reifydb_catalog/store/ringbuffer/
find.rs1use reifydb_core::interface::{
5 MultiVersionValues, NamespaceId, NamespaceRingBufferKey, QueryTransaction, RingBufferDef, RingBufferId,
6 RingBufferKey, RingBufferMetadata, RingBufferMetadataKey,
7};
8
9use crate::{
10 CatalogStore,
11 store::ringbuffer::layout::{ringbuffer, ringbuffer_metadata, ringbuffer_namespace},
12};
13
14impl CatalogStore {
15 pub async fn find_ringbuffer(
16 rx: &mut impl QueryTransaction,
17 ringbuffer: RingBufferId,
18 ) -> crate::Result<Option<RingBufferDef>> {
19 let Some(multi) = rx.get(&RingBufferKey::encoded(ringbuffer)).await? else {
20 return Ok(None);
21 };
22
23 let row = multi.values;
24 let id = RingBufferId(ringbuffer::LAYOUT.get_u64(&row, ringbuffer::ID));
25 let namespace = NamespaceId(ringbuffer::LAYOUT.get_u64(&row, ringbuffer::NAMESPACE));
26 let name = ringbuffer::LAYOUT.get_utf8(&row, ringbuffer::NAME).to_string();
27 let capacity = ringbuffer::LAYOUT.get_u64(&row, ringbuffer::CAPACITY);
28
29 Ok(Some(RingBufferDef {
30 id,
31 namespace,
32 name,
33 capacity,
34 columns: Self::list_columns(rx, id).await?,
35 primary_key: Self::find_primary_key(rx, id).await?,
36 }))
37 }
38
39 pub async fn find_ringbuffer_metadata(
40 rx: &mut impl QueryTransaction,
41 ringbuffer: RingBufferId,
42 ) -> crate::Result<Option<RingBufferMetadata>> {
43 let Some(multi) = rx.get(&RingBufferMetadataKey::encoded(ringbuffer)).await? else {
44 return Ok(None);
45 };
46
47 let row = multi.values;
48 let buffer_id = RingBufferId(ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::ID));
49 let capacity = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::CAPACITY);
50 let head = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::HEAD);
51 let tail = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::TAIL);
52 let current_size = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::COUNT);
53
54 Ok(Some(RingBufferMetadata {
55 id: buffer_id,
56 capacity,
57 count: current_size,
58 head,
59 tail,
60 }))
61 }
62
63 pub async fn find_ringbuffer_by_name(
64 rx: &mut impl QueryTransaction,
65 namespace: NamespaceId,
66 name: impl AsRef<str>,
67 ) -> crate::Result<Option<RingBufferDef>> {
68 let name = name.as_ref();
69 let batch = rx.range(NamespaceRingBufferKey::full_scan(namespace)).await?;
70 let Some(ringbuffer) = batch.items.into_iter().find_map(|multi: MultiVersionValues| {
71 let row = &multi.values;
72 let ringbuffer_name = ringbuffer_namespace::LAYOUT.get_utf8(row, ringbuffer_namespace::NAME);
73 if name == ringbuffer_name {
74 Some(RingBufferId(ringbuffer_namespace::LAYOUT.get_u64(row, ringbuffer_namespace::ID)))
75 } else {
76 None
77 }
78 }) else {
79 return Ok(None);
80 };
81
82 Ok(Some(Self::get_ringbuffer(rx, ringbuffer).await?))
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use reifydb_core::interface::RingBufferId;
89 use reifydb_engine::test_utils::create_test_command_transaction;
90 use reifydb_type::{Type, TypeConstraint};
91
92 use crate::{
93 CatalogStore,
94 namespace::NamespaceToCreate,
95 primary_key::PrimaryKeyToCreate,
96 ringbuffer::create::{RingBufferColumnToCreate, RingBufferToCreate},
97 test_utils::{ensure_test_namespace, ensure_test_ringbuffer},
98 };
99
100 #[tokio::test]
101 async fn test_find_ringbuffer_exists() {
102 let mut txn = create_test_command_transaction().await;
103 let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
104
105 let found = CatalogStore::find_ringbuffer(&mut txn, ringbuffer.id)
106 .await
107 .unwrap()
108 .expect("Ring buffer should exist");
109
110 assert_eq!(found.id, ringbuffer.id);
111 assert_eq!(found.name, ringbuffer.name);
112 assert_eq!(found.namespace, ringbuffer.namespace);
113 assert_eq!(found.capacity, ringbuffer.capacity);
114 }
115
116 #[tokio::test]
117 async fn test_find_ringbuffer_not_exists() {
118 let mut txn = create_test_command_transaction().await;
119
120 let result = CatalogStore::find_ringbuffer(&mut txn, RingBufferId(999)).await.unwrap();
121
122 assert!(result.is_none());
123 }
124
125 #[tokio::test]
126 async fn test_find_ringbuffer_metadata() {
127 let mut txn = create_test_command_transaction().await;
128 let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
129
130 let metadata = CatalogStore::find_ringbuffer_metadata(&mut txn, ringbuffer.id)
131 .await
132 .unwrap()
133 .expect("Metadata should exist");
134
135 assert_eq!(metadata.id, ringbuffer.id);
136 assert_eq!(metadata.capacity, ringbuffer.capacity);
137 assert_eq!(metadata.count, 0);
138 assert_eq!(metadata.head, 0);
139 assert_eq!(metadata.tail, 0);
140 }
141
142 #[tokio::test]
143 async fn test_find_ringbuffer_metadata_not_exists() {
144 let mut txn = create_test_command_transaction().await;
145
146 let result = CatalogStore::find_ringbuffer_metadata(&mut txn, RingBufferId(999)).await.unwrap();
147
148 assert!(result.is_none());
149 }
150
151 #[tokio::test]
152 async fn test_find_ringbuffer_by_name_exists() {
153 let mut txn = create_test_command_transaction().await;
154 let namespace = ensure_test_namespace(&mut txn).await;
155
156 let to_create = RingBufferToCreate {
158 namespace: namespace.id,
159 ringbuffer: "trades_buffer".to_string(),
160 capacity: 200,
161 columns: vec![RingBufferColumnToCreate {
162 name: "symbol".to_string(),
163 constraint: TypeConstraint::unconstrained(Type::Utf8),
164 fragment: None,
165 policies: vec![],
166 auto_increment: false,
167 dictionary_id: None,
168 }],
169 fragment: None,
170 };
171
172 let created = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
173
174 let found = CatalogStore::find_ringbuffer_by_name(&mut txn, namespace.id, "trades_buffer")
176 .await
177 .unwrap()
178 .expect("Should find ring buffer by name");
179
180 assert_eq!(found.id, created.id);
181 assert_eq!(found.name, "trades_buffer");
182 assert_eq!(found.capacity, 200);
183 assert_eq!(found.columns.len(), 1);
184 }
185
186 #[tokio::test]
187 async fn test_find_ringbuffer_by_name_not_exists() {
188 let mut txn = create_test_command_transaction().await;
189 let namespace = ensure_test_namespace(&mut txn).await;
190
191 let result = CatalogStore::find_ringbuffer_by_name(&mut txn, namespace.id, "nonexistent_buffer")
192 .await
193 .unwrap();
194
195 assert!(result.is_none());
196 }
197
198 #[tokio::test]
199 async fn test_find_ringbuffer_by_name_different_namespace() {
200 let mut txn = create_test_command_transaction().await;
201 let namespace1 = ensure_test_namespace(&mut txn).await;
202
203 let namespace2 = CatalogStore::create_namespace(
205 &mut txn,
206 NamespaceToCreate {
207 namespace_fragment: None,
208 name: "namespace2".to_string(),
209 },
210 )
211 .await
212 .unwrap();
213
214 let to_create = RingBufferToCreate {
216 namespace: namespace1.id,
217 ringbuffer: "shared_name".to_string(),
218 capacity: 50,
219 columns: vec![],
220 fragment: None,
221 };
222
223 CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
224
225 let result =
227 CatalogStore::find_ringbuffer_by_name(&mut txn, namespace2.id, "shared_name").await.unwrap();
228
229 assert!(result.is_none());
230
231 let found =
233 CatalogStore::find_ringbuffer_by_name(&mut txn, namespace1.id, "shared_name").await.unwrap();
234
235 assert!(found.is_some());
236 }
237
238 #[tokio::test]
239 async fn test_find_ringbuffer_with_columns_and_primary_key() {
240 let mut txn = create_test_command_transaction().await;
241 let namespace = ensure_test_namespace(&mut txn).await;
242
243 let to_create = RingBufferToCreate {
245 namespace: namespace.id,
246 ringbuffer: "pk_buffer".to_string(),
247 capacity: 100,
248 columns: vec![
249 RingBufferColumnToCreate {
250 name: "id".to_string(),
251 constraint: TypeConstraint::unconstrained(Type::Uint8),
252 fragment: None,
253 policies: vec![],
254 auto_increment: true,
255 dictionary_id: None,
256 },
257 RingBufferColumnToCreate {
258 name: "value".to_string(),
259 constraint: TypeConstraint::unconstrained(Type::Float8),
260 fragment: None,
261 policies: vec![],
262 auto_increment: false,
263 dictionary_id: None,
264 },
265 ],
266 fragment: None,
267 };
268
269 let created = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
270
271 let columns = CatalogStore::list_columns(&mut txn, created.id).await.unwrap();
273 let pk_id = CatalogStore::create_primary_key(
274 &mut txn,
275 PrimaryKeyToCreate {
276 source: created.id.into(),
277 column_ids: vec![columns[0].id],
278 },
279 )
280 .await
281 .unwrap();
282
283 let found = CatalogStore::find_ringbuffer(&mut txn, created.id)
285 .await
286 .unwrap()
287 .expect("Ring buffer should exist");
288
289 assert_eq!(found.columns.len(), 2);
290 assert_eq!(found.columns[0].name, "id");
291 assert_eq!(found.columns[0].auto_increment, true);
292 assert_eq!(found.columns[1].name, "value");
293 assert!(found.primary_key.is_some());
294 assert_eq!(found.primary_key.unwrap().id, pk_id);
295 }
296}