reifydb_catalog/store/ringbuffer/
set_pk.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{CommandTransaction, PrimaryKeyId, RingBufferId, RingBufferKey},
6	return_internal_error,
7};
8
9use crate::{CatalogStore, store::ringbuffer::layout::ringbuffer};
10
11impl CatalogStore {
12	/// Set the primary key ID for a ring buffer
13	/// Returns an internal error if the ring buffer doesn't exist
14	pub async fn set_ringbuffer_primary_key(
15		txn: &mut impl CommandTransaction,
16		ringbuffer_id: RingBufferId,
17		primary_key_id: PrimaryKeyId,
18	) -> crate::Result<()> {
19		let multi = match txn.get(&RingBufferKey::encoded(ringbuffer_id)).await? {
20			Some(v) => v,
21			None => return_internal_error!(format!(
22				"Ring buffer with ID {} not found when setting primary key. This indicates a critical catalog inconsistency.",
23				ringbuffer_id.0
24			)),
25		};
26
27		let mut updated_row = multi.values.clone();
28		ringbuffer::LAYOUT.set_u64(&mut updated_row, ringbuffer::PRIMARY_KEY, primary_key_id.0);
29
30		txn.set(&RingBufferKey::encoded(ringbuffer_id), updated_row).await?;
31
32		Ok(())
33	}
34}
35
#[cfg(test)]
mod tests {
	use reifydb_core::interface::{PrimaryKeyId, RingBufferId};
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{CatalogStore, test_utils::ensure_test_ringbuffer};

	/// A primary key set on an existing ring buffer is returned by `get_ringbuffer_pk_id`.
	#[tokio::test]
	async fn test_set_ringbuffer_primary_key() {
		let mut txn = create_test_command_transaction().await;
		let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
		let expected = PrimaryKeyId(100);

		// Store the primary key on the freshly created ring buffer.
		CatalogStore::set_ringbuffer_primary_key(&mut txn, ringbuffer.id, expected).await.unwrap();

		// Read it back through the catalog.
		let stored = CatalogStore::get_ringbuffer_pk_id(&mut txn, ringbuffer.id).await.unwrap();
		assert_eq!(stored, Some(expected));
	}

	/// Setting a primary key on an unknown ring buffer ID fails with the internal-error message.
	#[tokio::test]
	async fn test_set_ringbuffer_primary_key_nonexistent() {
		let mut txn = create_test_command_transaction().await;
		let missing_id = RingBufferId(999);

		let result = CatalogStore::set_ringbuffer_primary_key(&mut txn, missing_id, PrimaryKeyId(1)).await;

		assert!(result.is_err());
		// Render the error once and check both fragments of the message.
		let message = result.unwrap_err().to_string();
		assert!(message.contains("Ring buffer with ID 999 not found"));
		assert!(message.contains("critical catalog inconsistency"));
	}

	/// A second call replaces the primary key stored by the first call.
	#[tokio::test]
	async fn test_set_ringbuffer_primary_key_overwrites() {
		let mut txn = create_test_command_transaction().await;
		let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
		let first = PrimaryKeyId(100);
		let second = PrimaryKeyId(200);

		// Initial assignment.
		CatalogStore::set_ringbuffer_primary_key(&mut txn, ringbuffer.id, first).await.unwrap();

		// Second assignment is expected to overwrite the first.
		CatalogStore::set_ringbuffer_primary_key(&mut txn, ringbuffer.id, second).await.unwrap();

		// Only the latest value should be visible.
		let stored = CatalogStore::get_ringbuffer_pk_id(&mut txn, ringbuffer.id).await.unwrap();
		assert_eq!(stored, Some(second));
	}
}