reifydb_catalog/store/ringbuffer/
list.rs1use reifydb_core::interface::{Key, NamespaceId, QueryTransaction, RingBufferDef, RingBufferKey};
5
6use crate::{CatalogStore, store::ringbuffer::layout::ringbuffer};
7
8impl CatalogStore {
9 pub async fn list_ringbuffers_all(rx: &mut impl QueryTransaction) -> crate::Result<Vec<RingBufferDef>> {
10 let mut result = Vec::new();
11
12 let batch = rx.range(RingBufferKey::full_scan()).await?;
13
14 for entry in batch.items {
15 if let Some(key) = Key::decode(&entry.key) {
16 if let Key::RingBuffer(ringbuffer_key) = key {
17 let ringbuffer_id = ringbuffer_key.ringbuffer;
18
19 let namespace_id = NamespaceId(
20 ringbuffer::LAYOUT.get_u64(&entry.values, ringbuffer::NAMESPACE),
21 );
22
23 let name = ringbuffer::LAYOUT
24 .get_utf8(&entry.values, ringbuffer::NAME)
25 .to_string();
26
27 let capacity = ringbuffer::LAYOUT.get_u64(&entry.values, ringbuffer::CAPACITY);
28
29 let primary_key = Self::find_primary_key(rx, ringbuffer_id).await?;
30 let columns = Self::list_columns(rx, ringbuffer_id).await?;
31
32 let ringbuffer_def = RingBufferDef {
33 id: ringbuffer_id,
34 namespace: namespace_id,
35 name,
36 capacity,
37 columns,
38 primary_key,
39 };
40
41 result.push(ringbuffer_def);
42 }
43 }
44 }
45
46 Ok(result)
47 }
48}
49
#[cfg(test)]
mod tests {
    use reifydb_engine::test_utils::create_test_command_transaction;

    use crate::{
        CatalogStore, namespace::NamespaceToCreate, ringbuffer::create::RingBufferToCreate,
        test_utils::ensure_test_namespace,
    };

    /// Listing yields nothing when no ring buffer has been created.
    #[tokio::test]
    async fn test_list_ringbuffers_empty() {
        let mut txn = create_test_command_transaction().await;
        ensure_test_namespace(&mut txn).await;

        let listed = CatalogStore::list_ringbuffers_all(&mut txn).await.unwrap();

        assert!(listed.is_empty());
    }

    /// Two ring buffers created in the same namespace are both returned by the listing.
    #[tokio::test]
    async fn test_list_ringbuffers_multiple() {
        let mut txn = create_test_command_transaction().await;
        let namespace = ensure_test_namespace(&mut txn).await;

        // Create the buffers in order: buffer1 first, then buffer2.
        for (name, capacity) in [("buffer1", 100), ("buffer2", 200)] {
            let to_create = RingBufferToCreate {
                namespace: namespace.id,
                ringbuffer: name.to_string(),
                capacity,
                columns: vec![],
                fragment: None,
            };
            CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
        }

        let listed = CatalogStore::list_ringbuffers_all(&mut txn).await.unwrap();

        assert_eq!(listed.len(), 2);
        for expected in ["buffer1", "buffer2"] {
            assert!(listed.iter().any(|def| def.name == expected));
        }
    }

    /// Ring buffers living in different namespaces are all listed, each reporting the namespace
    /// it was created in.
    #[tokio::test]
    async fn test_list_ringbuffers_different_namespaces() {
        let mut txn = create_test_command_transaction().await;
        let namespace1 = ensure_test_namespace(&mut txn).await;

        let second_namespace = NamespaceToCreate {
            namespace_fragment: None,
            name: "namespace2".to_string(),
        };
        let namespace2 = CatalogStore::create_namespace(&mut txn, second_namespace)
            .await
            .unwrap();

        // One buffer per namespace, created in order: buffer1 first, then buffer2.
        let specs = [
            (namespace1.id, "buffer1", 100),
            (namespace2.id, "buffer2", 200),
        ];
        for (namespace, name, capacity) in specs {
            let to_create = RingBufferToCreate {
                namespace,
                ringbuffer: name.to_string(),
                capacity,
                columns: vec![],
                fragment: None,
            };
            CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
        }

        let listed = CatalogStore::list_ringbuffers_all(&mut txn).await.unwrap();
        assert_eq!(listed.len(), 2);

        let first = listed
            .iter()
            .find(|def| def.name == "buffer1")
            .expect("buffer1 should exist");
        assert_eq!(first.namespace, namespace1.id);

        let second = listed
            .iter()
            .find(|def| def.name == "buffer2")
            .expect("buffer2 should exist");
        assert_eq!(second.namespace, namespace2.id);
    }
}