1use reifydb_core::{
5 diagnostic::catalog::ringbuffer_already_exists,
6 interface::{
7 ColumnIndex, ColumnPolicyKind, CommandTransaction, DictionaryId, NamespaceId, RingBufferDef,
8 RingBufferId, TableId,
9 },
10 return_error,
11};
12use reifydb_type::{Fragment, TypeConstraint};
13
14use crate::{CatalogStore, store::sequence::SystemSequence};
15
16#[derive(Debug, Clone)]
17pub struct RingBufferColumnToCreate {
18 pub name: String,
19 pub constraint: TypeConstraint,
20 pub policies: Vec<ColumnPolicyKind>,
21 pub auto_increment: bool,
22 pub fragment: Option<Fragment>,
23 pub dictionary_id: Option<DictionaryId>,
24}
25
26#[derive(Debug, Clone)]
27pub struct RingBufferToCreate {
28 pub fragment: Option<Fragment>,
29 pub ringbuffer: String,
30 pub namespace: NamespaceId,
31 pub columns: Vec<RingBufferColumnToCreate>,
32 pub capacity: u64,
33}
34
35impl CatalogStore {
36 pub async fn create_ringbuffer(
37 txn: &mut impl CommandTransaction,
38 to_create: RingBufferToCreate,
39 ) -> crate::Result<RingBufferDef> {
40 let namespace_id = to_create.namespace;
41
42 if let Some(ringbuffer) =
44 CatalogStore::find_ringbuffer_by_name(txn, namespace_id, &to_create.ringbuffer).await?
45 {
46 let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
47 return_error!(ringbuffer_already_exists(
48 to_create.fragment.unwrap_or_else(|| Fragment::None),
49 &namespace.name,
50 &ringbuffer.name
51 ));
52 }
53
54 let ringbuffer_id = SystemSequence::next_ringbuffer_id(txn).await?;
56
57 Self::store_ringbuffer(txn, ringbuffer_id, namespace_id, &to_create).await?;
59
60 Self::link_ringbuffer_to_namespace(txn, namespace_id, ringbuffer_id, &to_create.ringbuffer).await?;
62
63 let capacity = to_create.capacity;
65
66 Self::insert_ringbuffer_columns(txn, ringbuffer_id, to_create).await?;
68
69 Self::initialize_ringbuffer_metadata(txn, ringbuffer_id, capacity).await?;
71
72 Ok(Self::get_ringbuffer(txn, ringbuffer_id).await?)
73 }
74
75 async fn store_ringbuffer(
76 txn: &mut impl CommandTransaction,
77 ringbuffer: RingBufferId,
78 namespace: NamespaceId,
79 to_create: &RingBufferToCreate,
80 ) -> crate::Result<()> {
81 use reifydb_core::interface::RingBufferKey;
82
83 use crate::store::ringbuffer::layout::ringbuffer;
84
85 let mut row = ringbuffer::LAYOUT.allocate();
86 ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::ID, ringbuffer);
87 ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::NAMESPACE, namespace);
88 ringbuffer::LAYOUT.set_utf8(&mut row, ringbuffer::NAME, &to_create.ringbuffer);
89 ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::CAPACITY, to_create.capacity);
90 ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::PRIMARY_KEY, 0u64);
92
93 txn.set(&RingBufferKey::encoded(ringbuffer), row).await?;
94
95 Ok(())
96 }
97
98 async fn link_ringbuffer_to_namespace(
99 txn: &mut impl CommandTransaction,
100 namespace: NamespaceId,
101 ringbuffer: RingBufferId,
102 name: &str,
103 ) -> crate::Result<()> {
104 use reifydb_core::interface::NamespaceRingBufferKey;
105
106 use crate::store::ringbuffer::layout::ringbuffer_namespace;
107
108 let mut row = ringbuffer_namespace::LAYOUT.allocate();
109 ringbuffer_namespace::LAYOUT.set_u64(&mut row, ringbuffer_namespace::ID, ringbuffer);
110 ringbuffer_namespace::LAYOUT.set_utf8(&mut row, ringbuffer_namespace::NAME, name);
111
112 txn.set(&NamespaceRingBufferKey::encoded(namespace, ringbuffer), row).await?;
113
114 Ok(())
115 }
116
117 async fn insert_ringbuffer_columns(
118 txn: &mut impl CommandTransaction,
119 ringbuffer_id: RingBufferId,
120 to_create: RingBufferToCreate,
121 ) -> crate::Result<()> {
122 use crate::store::column::ColumnToCreate;
123
124 for (idx, col) in to_create.columns.into_iter().enumerate() {
125 CatalogStore::create_column(
126 txn,
127 ringbuffer_id,
128 ColumnToCreate {
129 fragment: col.fragment,
130 namespace_name: String::new(), table: TableId(0), table_name: String::new(), column: col.name,
139 constraint: col.constraint,
140 if_not_exists: false,
141 policies: col.policies,
142 index: ColumnIndex(idx as u8),
143 auto_increment: col.auto_increment,
144 dictionary_id: col.dictionary_id,
145 },
146 )
147 .await?;
148 }
149
150 Ok(())
151 }
152
153 async fn initialize_ringbuffer_metadata(
154 txn: &mut impl CommandTransaction,
155 ringbuffer_id: RingBufferId,
156 capacity: u64,
157 ) -> crate::Result<()> {
158 use reifydb_core::interface::RingBufferMetadataKey;
159
160 use crate::store::ringbuffer::layout::ringbuffer_metadata;
161
162 let mut row = ringbuffer_metadata::LAYOUT.allocate();
163 ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::ID, ringbuffer_id);
164 ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::CAPACITY, capacity);
165 ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::HEAD, 0u64);
166 ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::TAIL, 0u64);
167 ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::COUNT, 0u64);
168
169 txn.set(&RingBufferMetadataKey::encoded(ringbuffer_id), row).await?;
170
171 Ok(())
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceRingBufferKey};
178 use reifydb_engine::test_utils::create_test_command_transaction;
179 use reifydb_type::{Type, TypeConstraint};
180
181 use super::*;
182 use crate::{store::ringbuffer::layout::ringbuffer_namespace, test_utils::ensure_test_namespace};
183
184 #[tokio::test]
185 async fn test_create_simple_ringbuffer() {
186 let mut txn = create_test_command_transaction().await;
187 let test_namespace = ensure_test_namespace(&mut txn).await;
188
189 let to_create = RingBufferToCreate {
190 namespace: test_namespace.id,
191 ringbuffer: "trades".to_string(),
192 capacity: 1000,
193 columns: vec![
194 RingBufferColumnToCreate {
195 name: "symbol".to_string(),
196 constraint: TypeConstraint::unconstrained(Type::Utf8),
197 fragment: None,
198 policies: vec![],
199 auto_increment: false,
200 dictionary_id: None,
201 },
202 RingBufferColumnToCreate {
203 name: "price".to_string(),
204 constraint: TypeConstraint::unconstrained(Type::Float8),
205 fragment: None,
206 policies: vec![],
207 auto_increment: false,
208 dictionary_id: None,
209 },
210 ],
211 fragment: None,
212 };
213
214 let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
215
216 assert!(result.id.0 > 0);
217 assert_eq!(result.namespace, test_namespace.id);
218 assert_eq!(result.name, "trades");
219 assert_eq!(result.capacity, 1000);
220 assert_eq!(result.columns.len(), 2);
221 assert_eq!(result.columns[0].name, "symbol");
222 assert_eq!(result.columns[1].name, "price");
223 assert_eq!(result.primary_key, None);
224 }
225
226 #[tokio::test]
227 async fn test_create_ringbuffer_empty_columns() {
228 let mut txn = create_test_command_transaction().await;
229 let test_namespace = ensure_test_namespace(&mut txn).await;
230
231 let to_create = RingBufferToCreate {
232 namespace: test_namespace.id,
233 ringbuffer: "empty_buffer".to_string(),
234 capacity: 100,
235 columns: vec![],
236 fragment: None,
237 };
238
239 let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
240
241 assert!(result.id.0 > 0);
242 assert_eq!(result.namespace, test_namespace.id);
243 assert_eq!(result.name, "empty_buffer");
244 assert_eq!(result.capacity, 100);
245 assert_eq!(result.columns.len(), 0);
246 }
247
248 #[tokio::test]
249 async fn test_create_duplicate_ringbuffer() {
250 let mut txn = create_test_command_transaction().await;
251 let test_namespace = ensure_test_namespace(&mut txn).await;
252
253 let to_create = RingBufferToCreate {
254 namespace: test_namespace.id,
255 ringbuffer: "test_ringbuffer".to_string(),
256 capacity: 50,
257 columns: vec![],
258 fragment: None,
259 };
260
261 let result = CatalogStore::create_ringbuffer(&mut txn, to_create.clone()).await.unwrap();
263 assert!(result.id.0 > 0);
264 assert_eq!(result.namespace, test_namespace.id);
265 assert_eq!(result.name, "test_ringbuffer");
266
267 let err = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap_err();
269 assert_eq!(err.diagnostic().code, "CA_005");
270 }
271
272 #[tokio::test]
273 async fn test_ringbuffer_linked_to_namespace() {
274 let mut txn = create_test_command_transaction().await;
275 let test_namespace = ensure_test_namespace(&mut txn).await;
276
277 let to_create = RingBufferToCreate {
278 namespace: test_namespace.id,
279 ringbuffer: "buffer1".to_string(),
280 capacity: 10,
281 columns: vec![],
282 fragment: None,
283 };
284
285 CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
286
287 let to_create = RingBufferToCreate {
288 namespace: test_namespace.id,
289 ringbuffer: "buffer2".to_string(),
290 capacity: 20,
291 columns: vec![],
292 fragment: None,
293 };
294
295 CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
296
297 let links = txn
299 .range(NamespaceRingBufferKey::full_scan(test_namespace.id))
300 .await
301 .unwrap()
302 .items
303 .into_iter()
304 .collect::<Vec<_>>();
305 assert_eq!(links.len(), 2);
306
307 let link = &links[0];
309 let row = &link.values;
310 let id2 = ringbuffer_namespace::LAYOUT.get_u64(row, ringbuffer_namespace::ID);
311 assert!(id2 > 0);
312 assert_eq!(ringbuffer_namespace::LAYOUT.get_utf8(row, ringbuffer_namespace::NAME), "buffer2");
313
314 let link = &links[1];
316 let row = &link.values;
317 let id1 = ringbuffer_namespace::LAYOUT.get_u64(row, ringbuffer_namespace::ID);
318 assert!(id2 > id1);
319 assert_eq!(ringbuffer_namespace::LAYOUT.get_utf8(row, ringbuffer_namespace::NAME), "buffer1");
320 }
321
322 #[tokio::test]
323 async fn test_create_ringbuffer_with_metadata() {
324 let mut txn = create_test_command_transaction().await;
325 let test_namespace = ensure_test_namespace(&mut txn).await;
326
327 let to_create = RingBufferToCreate {
328 namespace: test_namespace.id,
329 ringbuffer: "metadata_buffer".to_string(),
330 capacity: 500,
331 columns: vec![],
332 fragment: None,
333 };
334
335 let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
336
337 let metadata = CatalogStore::find_ringbuffer_metadata(&mut txn, result.id)
339 .await
340 .unwrap()
341 .expect("Metadata should exist");
342
343 assert_eq!(metadata.id, result.id);
344 assert_eq!(metadata.capacity, 500);
345 assert_eq!(metadata.count, 0);
346 assert_eq!(metadata.head, 0);
347 assert_eq!(metadata.tail, 0);
348 }
349
350 #[tokio::test]
351 async fn test_create_multiple_ringbuffers_with_different_capacities() {
352 let mut txn = create_test_command_transaction().await;
353 let test_namespace = ensure_test_namespace(&mut txn).await;
354
355 let small = RingBufferToCreate {
357 namespace: test_namespace.id,
358 ringbuffer: "small_buffer".to_string(),
359 capacity: 10,
360 columns: vec![],
361 fragment: None,
362 };
363 let small_result = CatalogStore::create_ringbuffer(&mut txn, small).await.unwrap();
364 assert_eq!(small_result.capacity, 10);
365
366 let medium = RingBufferToCreate {
368 namespace: test_namespace.id,
369 ringbuffer: "medium_buffer".to_string(),
370 capacity: 1000,
371 columns: vec![],
372 fragment: None,
373 };
374 let medium_result = CatalogStore::create_ringbuffer(&mut txn, medium).await.unwrap();
375 assert_eq!(medium_result.capacity, 1000);
376
377 let large = RingBufferToCreate {
379 namespace: test_namespace.id,
380 ringbuffer: "large_buffer".to_string(),
381 capacity: 1000000,
382 columns: vec![],
383 fragment: None,
384 };
385 let large_result = CatalogStore::create_ringbuffer(&mut txn, large).await.unwrap();
386 assert_eq!(large_result.capacity, 1000000);
387
388 assert_ne!(small_result.id, medium_result.id);
390 assert_ne!(medium_result.id, large_result.id);
391 assert_ne!(small_result.id, large_result.id);
392 }
393
394 #[tokio::test]
395 async fn test_create_ringbuffer_preserves_column_order() {
396 let mut txn = create_test_command_transaction().await;
397 let test_namespace = ensure_test_namespace(&mut txn).await;
398
399 let columns = vec![
400 RingBufferColumnToCreate {
401 name: "first".to_string(),
402 constraint: TypeConstraint::unconstrained(Type::Uint8),
403 fragment: None,
404 policies: vec![],
405 auto_increment: false,
406 dictionary_id: None,
407 },
408 RingBufferColumnToCreate {
409 name: "second".to_string(),
410 constraint: TypeConstraint::unconstrained(Type::Uint16),
411 fragment: None,
412 policies: vec![],
413 auto_increment: false,
414 dictionary_id: None,
415 },
416 RingBufferColumnToCreate {
417 name: "third".to_string(),
418 constraint: TypeConstraint::unconstrained(Type::Uint4),
419 fragment: None,
420 policies: vec![],
421 auto_increment: false,
422 dictionary_id: None,
423 },
424 ];
425
426 let to_create = RingBufferToCreate {
427 namespace: test_namespace.id,
428 ringbuffer: "ordered_buffer".to_string(),
429 capacity: 100,
430 columns: columns.clone(),
431 fragment: None,
432 };
433
434 let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
435
436 assert_eq!(result.columns.len(), 3);
437 assert_eq!(result.columns[0].name, "first");
438 assert_eq!(result.columns[0].index.0, 0);
439 assert_eq!(result.columns[1].name, "second");
440 assert_eq!(result.columns[1].index.0, 1);
441 assert_eq!(result.columns[2].name, "third");
442 assert_eq!(result.columns[2].index.0, 2);
443 }
444}