reifydb_catalog/store/ringbuffer/
set_pk.rs1use reifydb_core::{
5 interface::{CommandTransaction, PrimaryKeyId, RingBufferId, RingBufferKey},
6 return_internal_error,
7};
8
9use crate::{CatalogStore, store::ringbuffer::layout::ringbuffer};
10
11impl CatalogStore {
12 pub async fn set_ringbuffer_primary_key(
15 txn: &mut impl CommandTransaction,
16 ringbuffer_id: RingBufferId,
17 primary_key_id: PrimaryKeyId,
18 ) -> crate::Result<()> {
19 let multi = match txn.get(&RingBufferKey::encoded(ringbuffer_id)).await? {
20 Some(v) => v,
21 None => return_internal_error!(format!(
22 "Ring buffer with ID {} not found when setting primary key. This indicates a critical catalog inconsistency.",
23 ringbuffer_id.0
24 )),
25 };
26
27 let mut updated_row = multi.values.clone();
28 ringbuffer::LAYOUT.set_u64(&mut updated_row, ringbuffer::PRIMARY_KEY, primary_key_id.0);
29
30 txn.set(&RingBufferKey::encoded(ringbuffer_id), updated_row).await?;
31
32 Ok(())
33 }
34}
35
36#[cfg(test)]
37mod tests {
38 use reifydb_core::interface::{PrimaryKeyId, RingBufferId};
39 use reifydb_engine::test_utils::create_test_command_transaction;
40
41 use crate::{CatalogStore, test_utils::ensure_test_ringbuffer};
42
43 #[tokio::test]
44 async fn test_set_ringbuffer_primary_key() {
45 let mut txn = create_test_command_transaction().await;
46 let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
47
48 let pk_id = PrimaryKeyId(100);
50 CatalogStore::set_ringbuffer_primary_key(&mut txn, ringbuffer.id, pk_id).await.unwrap();
51
52 let retrieved_pk = CatalogStore::get_ringbuffer_pk_id(&mut txn, ringbuffer.id).await.unwrap();
54 assert_eq!(retrieved_pk, Some(pk_id));
55 }
56
57 #[tokio::test]
58 async fn test_set_ringbuffer_primary_key_nonexistent() {
59 let mut txn = create_test_command_transaction().await;
60
61 let result =
62 CatalogStore::set_ringbuffer_primary_key(&mut txn, RingBufferId(999), PrimaryKeyId(1)).await;
63
64 assert!(result.is_err());
65 let err = result.unwrap_err();
66 assert!(err.to_string().contains("Ring buffer with ID 999 not found"));
67 assert!(err.to_string().contains("critical catalog inconsistency"));
68 }
69
70 #[tokio::test]
71 async fn test_set_ringbuffer_primary_key_overwrites() {
72 let mut txn = create_test_command_transaction().await;
73 let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
74
75 let pk_id1 = PrimaryKeyId(100);
77 CatalogStore::set_ringbuffer_primary_key(&mut txn, ringbuffer.id, pk_id1).await.unwrap();
78
79 let pk_id2 = PrimaryKeyId(200);
81 CatalogStore::set_ringbuffer_primary_key(&mut txn, ringbuffer.id, pk_id2).await.unwrap();
82
83 let retrieved_pk = CatalogStore::get_ringbuffer_pk_id(&mut txn, ringbuffer.id).await.unwrap();
85 assert_eq!(retrieved_pk, Some(pk_id2));
86 }
87}