reifydb_catalog/store/ringbuffer/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	diagnostic::catalog::ringbuffer_already_exists,
6	interface::{
7		ColumnIndex, ColumnPolicyKind, CommandTransaction, DictionaryId, NamespaceId, RingBufferDef,
8		RingBufferId, TableId,
9	},
10	return_error,
11};
12use reifydb_type::{Fragment, TypeConstraint};
13
14use crate::{CatalogStore, store::sequence::SystemSequence};
15
16#[derive(Debug, Clone)]
17pub struct RingBufferColumnToCreate {
18	pub name: String,
19	pub constraint: TypeConstraint,
20	pub policies: Vec<ColumnPolicyKind>,
21	pub auto_increment: bool,
22	pub fragment: Option<Fragment>,
23	pub dictionary_id: Option<DictionaryId>,
24}
25
26#[derive(Debug, Clone)]
27pub struct RingBufferToCreate {
28	pub fragment: Option<Fragment>,
29	pub ringbuffer: String,
30	pub namespace: NamespaceId,
31	pub columns: Vec<RingBufferColumnToCreate>,
32	pub capacity: u64,
33}
34
35impl CatalogStore {
36	pub async fn create_ringbuffer(
37		txn: &mut impl CommandTransaction,
38		to_create: RingBufferToCreate,
39	) -> crate::Result<RingBufferDef> {
40		let namespace_id = to_create.namespace;
41
42		// Check if ring buffer already exists
43		if let Some(ringbuffer) =
44			CatalogStore::find_ringbuffer_by_name(txn, namespace_id, &to_create.ringbuffer).await?
45		{
46			let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
47			return_error!(ringbuffer_already_exists(
48				to_create.fragment.unwrap_or_else(|| Fragment::None),
49				&namespace.name,
50				&ringbuffer.name
51			));
52		}
53
54		// Allocate new ring buffer ID
55		let ringbuffer_id = SystemSequence::next_ringbuffer_id(txn).await?;
56
57		// Store the ring buffer
58		Self::store_ringbuffer(txn, ringbuffer_id, namespace_id, &to_create).await?;
59
60		// Link ring buffer to namespace
61		Self::link_ringbuffer_to_namespace(txn, namespace_id, ringbuffer_id, &to_create.ringbuffer).await?;
62
63		// Save capacity before moving to_create
64		let capacity = to_create.capacity;
65
66		// Insert columns
67		Self::insert_ringbuffer_columns(txn, ringbuffer_id, to_create).await?;
68
69		// Initialize ring buffer metadata
70		Self::initialize_ringbuffer_metadata(txn, ringbuffer_id, capacity).await?;
71
72		Ok(Self::get_ringbuffer(txn, ringbuffer_id).await?)
73	}
74
75	async fn store_ringbuffer(
76		txn: &mut impl CommandTransaction,
77		ringbuffer: RingBufferId,
78		namespace: NamespaceId,
79		to_create: &RingBufferToCreate,
80	) -> crate::Result<()> {
81		use reifydb_core::interface::RingBufferKey;
82
83		use crate::store::ringbuffer::layout::ringbuffer;
84
85		let mut row = ringbuffer::LAYOUT.allocate();
86		ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::ID, ringbuffer);
87		ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::NAMESPACE, namespace);
88		ringbuffer::LAYOUT.set_utf8(&mut row, ringbuffer::NAME, &to_create.ringbuffer);
89		ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::CAPACITY, to_create.capacity);
90		// Initialize with no primary key
91		ringbuffer::LAYOUT.set_u64(&mut row, ringbuffer::PRIMARY_KEY, 0u64);
92
93		txn.set(&RingBufferKey::encoded(ringbuffer), row).await?;
94
95		Ok(())
96	}
97
98	async fn link_ringbuffer_to_namespace(
99		txn: &mut impl CommandTransaction,
100		namespace: NamespaceId,
101		ringbuffer: RingBufferId,
102		name: &str,
103	) -> crate::Result<()> {
104		use reifydb_core::interface::NamespaceRingBufferKey;
105
106		use crate::store::ringbuffer::layout::ringbuffer_namespace;
107
108		let mut row = ringbuffer_namespace::LAYOUT.allocate();
109		ringbuffer_namespace::LAYOUT.set_u64(&mut row, ringbuffer_namespace::ID, ringbuffer);
110		ringbuffer_namespace::LAYOUT.set_utf8(&mut row, ringbuffer_namespace::NAME, name);
111
112		txn.set(&NamespaceRingBufferKey::encoded(namespace, ringbuffer), row).await?;
113
114		Ok(())
115	}
116
117	async fn insert_ringbuffer_columns(
118		txn: &mut impl CommandTransaction,
119		ringbuffer_id: RingBufferId,
120		to_create: RingBufferToCreate,
121	) -> crate::Result<()> {
122		use crate::store::column::ColumnToCreate;
123
124		for (idx, col) in to_create.columns.into_iter().enumerate() {
125			CatalogStore::create_column(
126				txn,
127				ringbuffer_id,
128				ColumnToCreate {
129					fragment: col.fragment,
130					namespace_name: String::new(), /* Not used in
131					                                * create_column */
132					table: TableId(0), /* Not used in
133					                    * create_column -
134					                    * source is passed
135					                    * separately */
136					table_name: String::new(), /* Not used in
137					                            * create_column */
138					column: col.name,
139					constraint: col.constraint,
140					if_not_exists: false,
141					policies: col.policies,
142					index: ColumnIndex(idx as u8),
143					auto_increment: col.auto_increment,
144					dictionary_id: col.dictionary_id,
145				},
146			)
147			.await?;
148		}
149
150		Ok(())
151	}
152
153	async fn initialize_ringbuffer_metadata(
154		txn: &mut impl CommandTransaction,
155		ringbuffer_id: RingBufferId,
156		capacity: u64,
157	) -> crate::Result<()> {
158		use reifydb_core::interface::RingBufferMetadataKey;
159
160		use crate::store::ringbuffer::layout::ringbuffer_metadata;
161
162		let mut row = ringbuffer_metadata::LAYOUT.allocate();
163		ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::ID, ringbuffer_id);
164		ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::CAPACITY, capacity);
165		ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::HEAD, 0u64);
166		ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::TAIL, 0u64);
167		ringbuffer_metadata::LAYOUT.set_u64(&mut row, ringbuffer_metadata::COUNT, 0u64);
168
169		txn.set(&RingBufferMetadataKey::encoded(ringbuffer_id), row).await?;
170
171		Ok(())
172	}
173}
174
175#[cfg(test)]
176mod tests {
177	use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceRingBufferKey};
178	use reifydb_engine::test_utils::create_test_command_transaction;
179	use reifydb_type::{Type, TypeConstraint};
180
181	use super::*;
182	use crate::{store::ringbuffer::layout::ringbuffer_namespace, test_utils::ensure_test_namespace};
183
184	#[tokio::test]
185	async fn test_create_simple_ringbuffer() {
186		let mut txn = create_test_command_transaction().await;
187		let test_namespace = ensure_test_namespace(&mut txn).await;
188
189		let to_create = RingBufferToCreate {
190			namespace: test_namespace.id,
191			ringbuffer: "trades".to_string(),
192			capacity: 1000,
193			columns: vec![
194				RingBufferColumnToCreate {
195					name: "symbol".to_string(),
196					constraint: TypeConstraint::unconstrained(Type::Utf8),
197					fragment: None,
198					policies: vec![],
199					auto_increment: false,
200					dictionary_id: None,
201				},
202				RingBufferColumnToCreate {
203					name: "price".to_string(),
204					constraint: TypeConstraint::unconstrained(Type::Float8),
205					fragment: None,
206					policies: vec![],
207					auto_increment: false,
208					dictionary_id: None,
209				},
210			],
211			fragment: None,
212		};
213
214		let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
215
216		assert!(result.id.0 > 0);
217		assert_eq!(result.namespace, test_namespace.id);
218		assert_eq!(result.name, "trades");
219		assert_eq!(result.capacity, 1000);
220		assert_eq!(result.columns.len(), 2);
221		assert_eq!(result.columns[0].name, "symbol");
222		assert_eq!(result.columns[1].name, "price");
223		assert_eq!(result.primary_key, None);
224	}
225
226	#[tokio::test]
227	async fn test_create_ringbuffer_empty_columns() {
228		let mut txn = create_test_command_transaction().await;
229		let test_namespace = ensure_test_namespace(&mut txn).await;
230
231		let to_create = RingBufferToCreate {
232			namespace: test_namespace.id,
233			ringbuffer: "empty_buffer".to_string(),
234			capacity: 100,
235			columns: vec![],
236			fragment: None,
237		};
238
239		let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
240
241		assert!(result.id.0 > 0);
242		assert_eq!(result.namespace, test_namespace.id);
243		assert_eq!(result.name, "empty_buffer");
244		assert_eq!(result.capacity, 100);
245		assert_eq!(result.columns.len(), 0);
246	}
247
248	#[tokio::test]
249	async fn test_create_duplicate_ringbuffer() {
250		let mut txn = create_test_command_transaction().await;
251		let test_namespace = ensure_test_namespace(&mut txn).await;
252
253		let to_create = RingBufferToCreate {
254			namespace: test_namespace.id,
255			ringbuffer: "test_ringbuffer".to_string(),
256			capacity: 50,
257			columns: vec![],
258			fragment: None,
259		};
260
261		// First creation should succeed
262		let result = CatalogStore::create_ringbuffer(&mut txn, to_create.clone()).await.unwrap();
263		assert!(result.id.0 > 0);
264		assert_eq!(result.namespace, test_namespace.id);
265		assert_eq!(result.name, "test_ringbuffer");
266
267		// Second creation should fail with duplicate error
268		let err = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap_err();
269		assert_eq!(err.diagnostic().code, "CA_005");
270	}
271
272	#[tokio::test]
273	async fn test_ringbuffer_linked_to_namespace() {
274		let mut txn = create_test_command_transaction().await;
275		let test_namespace = ensure_test_namespace(&mut txn).await;
276
277		let to_create = RingBufferToCreate {
278			namespace: test_namespace.id,
279			ringbuffer: "buffer1".to_string(),
280			capacity: 10,
281			columns: vec![],
282			fragment: None,
283		};
284
285		CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
286
287		let to_create = RingBufferToCreate {
288			namespace: test_namespace.id,
289			ringbuffer: "buffer2".to_string(),
290			capacity: 20,
291			columns: vec![],
292			fragment: None,
293		};
294
295		CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
296
297		// Check namespace links
298		let links = txn
299			.range(NamespaceRingBufferKey::full_scan(test_namespace.id))
300			.await
301			.unwrap()
302			.items
303			.into_iter()
304			.collect::<Vec<_>>();
305		assert_eq!(links.len(), 2);
306
307		// Check first link (descending order, so buffer2 comes first)
308		let link = &links[0];
309		let row = &link.values;
310		let id2 = ringbuffer_namespace::LAYOUT.get_u64(row, ringbuffer_namespace::ID);
311		assert!(id2 > 0);
312		assert_eq!(ringbuffer_namespace::LAYOUT.get_utf8(row, ringbuffer_namespace::NAME), "buffer2");
313
314		// Check second link (buffer1 comes second)
315		let link = &links[1];
316		let row = &link.values;
317		let id1 = ringbuffer_namespace::LAYOUT.get_u64(row, ringbuffer_namespace::ID);
318		assert!(id2 > id1);
319		assert_eq!(ringbuffer_namespace::LAYOUT.get_utf8(row, ringbuffer_namespace::NAME), "buffer1");
320	}
321
322	#[tokio::test]
323	async fn test_create_ringbuffer_with_metadata() {
324		let mut txn = create_test_command_transaction().await;
325		let test_namespace = ensure_test_namespace(&mut txn).await;
326
327		let to_create = RingBufferToCreate {
328			namespace: test_namespace.id,
329			ringbuffer: "metadata_buffer".to_string(),
330			capacity: 500,
331			columns: vec![],
332			fragment: None,
333		};
334
335		let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
336
337		// Check that metadata was created
338		let metadata = CatalogStore::find_ringbuffer_metadata(&mut txn, result.id)
339			.await
340			.unwrap()
341			.expect("Metadata should exist");
342
343		assert_eq!(metadata.id, result.id);
344		assert_eq!(metadata.capacity, 500);
345		assert_eq!(metadata.count, 0);
346		assert_eq!(metadata.head, 0);
347		assert_eq!(metadata.tail, 0);
348	}
349
350	#[tokio::test]
351	async fn test_create_multiple_ringbuffers_with_different_capacities() {
352		let mut txn = create_test_command_transaction().await;
353		let test_namespace = ensure_test_namespace(&mut txn).await;
354
355		// Create small buffer
356		let small = RingBufferToCreate {
357			namespace: test_namespace.id,
358			ringbuffer: "small_buffer".to_string(),
359			capacity: 10,
360			columns: vec![],
361			fragment: None,
362		};
363		let small_result = CatalogStore::create_ringbuffer(&mut txn, small).await.unwrap();
364		assert_eq!(small_result.capacity, 10);
365
366		// Create medium buffer
367		let medium = RingBufferToCreate {
368			namespace: test_namespace.id,
369			ringbuffer: "medium_buffer".to_string(),
370			capacity: 1000,
371			columns: vec![],
372			fragment: None,
373		};
374		let medium_result = CatalogStore::create_ringbuffer(&mut txn, medium).await.unwrap();
375		assert_eq!(medium_result.capacity, 1000);
376
377		// Create large buffer
378		let large = RingBufferToCreate {
379			namespace: test_namespace.id,
380			ringbuffer: "large_buffer".to_string(),
381			capacity: 1000000,
382			columns: vec![],
383			fragment: None,
384		};
385		let large_result = CatalogStore::create_ringbuffer(&mut txn, large).await.unwrap();
386		assert_eq!(large_result.capacity, 1000000);
387
388		// Verify they have different IDs
389		assert_ne!(small_result.id, medium_result.id);
390		assert_ne!(medium_result.id, large_result.id);
391		assert_ne!(small_result.id, large_result.id);
392	}
393
394	#[tokio::test]
395	async fn test_create_ringbuffer_preserves_column_order() {
396		let mut txn = create_test_command_transaction().await;
397		let test_namespace = ensure_test_namespace(&mut txn).await;
398
399		let columns = vec![
400			RingBufferColumnToCreate {
401				name: "first".to_string(),
402				constraint: TypeConstraint::unconstrained(Type::Uint8),
403				fragment: None,
404				policies: vec![],
405				auto_increment: false,
406				dictionary_id: None,
407			},
408			RingBufferColumnToCreate {
409				name: "second".to_string(),
410				constraint: TypeConstraint::unconstrained(Type::Uint16),
411				fragment: None,
412				policies: vec![],
413				auto_increment: false,
414				dictionary_id: None,
415			},
416			RingBufferColumnToCreate {
417				name: "third".to_string(),
418				constraint: TypeConstraint::unconstrained(Type::Uint4),
419				fragment: None,
420				policies: vec![],
421				auto_increment: false,
422				dictionary_id: None,
423			},
424		];
425
426		let to_create = RingBufferToCreate {
427			namespace: test_namespace.id,
428			ringbuffer: "ordered_buffer".to_string(),
429			capacity: 100,
430			columns: columns.clone(),
431			fragment: None,
432		};
433
434		let result = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
435
436		assert_eq!(result.columns.len(), 3);
437		assert_eq!(result.columns[0].name, "first");
438		assert_eq!(result.columns[0].index.0, 0);
439		assert_eq!(result.columns[1].name, "second");
440		assert_eq!(result.columns[1].index.0, 1);
441		assert_eq!(result.columns[2].name, "third");
442		assert_eq!(result.columns[2].index.0, 2);
443	}
444}