reifydb_catalog/store/ringbuffer/
find.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{
5	MultiVersionValues, NamespaceId, NamespaceRingBufferKey, QueryTransaction, RingBufferDef, RingBufferId,
6	RingBufferKey, RingBufferMetadata, RingBufferMetadataKey,
7};
8
9use crate::{
10	CatalogStore,
11	store::ringbuffer::layout::{ringbuffer, ringbuffer_metadata, ringbuffer_namespace},
12};
13
14impl CatalogStore {
15	pub async fn find_ringbuffer(
16		rx: &mut impl QueryTransaction,
17		ringbuffer: RingBufferId,
18	) -> crate::Result<Option<RingBufferDef>> {
19		let Some(multi) = rx.get(&RingBufferKey::encoded(ringbuffer)).await? else {
20			return Ok(None);
21		};
22
23		let row = multi.values;
24		let id = RingBufferId(ringbuffer::LAYOUT.get_u64(&row, ringbuffer::ID));
25		let namespace = NamespaceId(ringbuffer::LAYOUT.get_u64(&row, ringbuffer::NAMESPACE));
26		let name = ringbuffer::LAYOUT.get_utf8(&row, ringbuffer::NAME).to_string();
27		let capacity = ringbuffer::LAYOUT.get_u64(&row, ringbuffer::CAPACITY);
28
29		Ok(Some(RingBufferDef {
30			id,
31			namespace,
32			name,
33			capacity,
34			columns: Self::list_columns(rx, id).await?,
35			primary_key: Self::find_primary_key(rx, id).await?,
36		}))
37	}
38
39	pub async fn find_ringbuffer_metadata(
40		rx: &mut impl QueryTransaction,
41		ringbuffer: RingBufferId,
42	) -> crate::Result<Option<RingBufferMetadata>> {
43		let Some(multi) = rx.get(&RingBufferMetadataKey::encoded(ringbuffer)).await? else {
44			return Ok(None);
45		};
46
47		let row = multi.values;
48		let buffer_id = RingBufferId(ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::ID));
49		let capacity = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::CAPACITY);
50		let head = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::HEAD);
51		let tail = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::TAIL);
52		let current_size = ringbuffer_metadata::LAYOUT.get_u64(&row, ringbuffer_metadata::COUNT);
53
54		Ok(Some(RingBufferMetadata {
55			id: buffer_id,
56			capacity,
57			count: current_size,
58			head,
59			tail,
60		}))
61	}
62
63	pub async fn find_ringbuffer_by_name(
64		rx: &mut impl QueryTransaction,
65		namespace: NamespaceId,
66		name: impl AsRef<str>,
67	) -> crate::Result<Option<RingBufferDef>> {
68		let name = name.as_ref();
69		let batch = rx.range(NamespaceRingBufferKey::full_scan(namespace)).await?;
70		let Some(ringbuffer) = batch.items.into_iter().find_map(|multi: MultiVersionValues| {
71			let row = &multi.values;
72			let ringbuffer_name = ringbuffer_namespace::LAYOUT.get_utf8(row, ringbuffer_namespace::NAME);
73			if name == ringbuffer_name {
74				Some(RingBufferId(ringbuffer_namespace::LAYOUT.get_u64(row, ringbuffer_namespace::ID)))
75			} else {
76				None
77			}
78		}) else {
79			return Ok(None);
80		};
81
82		Ok(Some(Self::get_ringbuffer(rx, ringbuffer).await?))
83	}
84}
85
86#[cfg(test)]
87mod tests {
88	use reifydb_core::interface::RingBufferId;
89	use reifydb_engine::test_utils::create_test_command_transaction;
90	use reifydb_type::{Type, TypeConstraint};
91
92	use crate::{
93		CatalogStore,
94		namespace::NamespaceToCreate,
95		primary_key::PrimaryKeyToCreate,
96		ringbuffer::create::{RingBufferColumnToCreate, RingBufferToCreate},
97		test_utils::{ensure_test_namespace, ensure_test_ringbuffer},
98	};
99
100	#[tokio::test]
101	async fn test_find_ringbuffer_exists() {
102		let mut txn = create_test_command_transaction().await;
103		let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
104
105		let found = CatalogStore::find_ringbuffer(&mut txn, ringbuffer.id)
106			.await
107			.unwrap()
108			.expect("Ring buffer should exist");
109
110		assert_eq!(found.id, ringbuffer.id);
111		assert_eq!(found.name, ringbuffer.name);
112		assert_eq!(found.namespace, ringbuffer.namespace);
113		assert_eq!(found.capacity, ringbuffer.capacity);
114	}
115
116	#[tokio::test]
117	async fn test_find_ringbuffer_not_exists() {
118		let mut txn = create_test_command_transaction().await;
119
120		let result = CatalogStore::find_ringbuffer(&mut txn, RingBufferId(999)).await.unwrap();
121
122		assert!(result.is_none());
123	}
124
125	#[tokio::test]
126	async fn test_find_ringbuffer_metadata() {
127		let mut txn = create_test_command_transaction().await;
128		let ringbuffer = ensure_test_ringbuffer(&mut txn).await;
129
130		let metadata = CatalogStore::find_ringbuffer_metadata(&mut txn, ringbuffer.id)
131			.await
132			.unwrap()
133			.expect("Metadata should exist");
134
135		assert_eq!(metadata.id, ringbuffer.id);
136		assert_eq!(metadata.capacity, ringbuffer.capacity);
137		assert_eq!(metadata.count, 0);
138		assert_eq!(metadata.head, 0);
139		assert_eq!(metadata.tail, 0);
140	}
141
142	#[tokio::test]
143	async fn test_find_ringbuffer_metadata_not_exists() {
144		let mut txn = create_test_command_transaction().await;
145
146		let result = CatalogStore::find_ringbuffer_metadata(&mut txn, RingBufferId(999)).await.unwrap();
147
148		assert!(result.is_none());
149	}
150
151	#[tokio::test]
152	async fn test_find_ringbuffer_by_name_exists() {
153		let mut txn = create_test_command_transaction().await;
154		let namespace = ensure_test_namespace(&mut txn).await;
155
156		// Create a ring buffer with specific name
157		let to_create = RingBufferToCreate {
158			namespace: namespace.id,
159			ringbuffer: "trades_buffer".to_string(),
160			capacity: 200,
161			columns: vec![RingBufferColumnToCreate {
162				name: "symbol".to_string(),
163				constraint: TypeConstraint::unconstrained(Type::Utf8),
164				fragment: None,
165				policies: vec![],
166				auto_increment: false,
167				dictionary_id: None,
168			}],
169			fragment: None,
170		};
171
172		let created = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
173
174		// Find by name
175		let found = CatalogStore::find_ringbuffer_by_name(&mut txn, namespace.id, "trades_buffer")
176			.await
177			.unwrap()
178			.expect("Should find ring buffer by name");
179
180		assert_eq!(found.id, created.id);
181		assert_eq!(found.name, "trades_buffer");
182		assert_eq!(found.capacity, 200);
183		assert_eq!(found.columns.len(), 1);
184	}
185
186	#[tokio::test]
187	async fn test_find_ringbuffer_by_name_not_exists() {
188		let mut txn = create_test_command_transaction().await;
189		let namespace = ensure_test_namespace(&mut txn).await;
190
191		let result = CatalogStore::find_ringbuffer_by_name(&mut txn, namespace.id, "nonexistent_buffer")
192			.await
193			.unwrap();
194
195		assert!(result.is_none());
196	}
197
198	#[tokio::test]
199	async fn test_find_ringbuffer_by_name_different_namespace() {
200		let mut txn = create_test_command_transaction().await;
201		let namespace1 = ensure_test_namespace(&mut txn).await;
202
203		// Create namespace2
204		let namespace2 = CatalogStore::create_namespace(
205			&mut txn,
206			NamespaceToCreate {
207				namespace_fragment: None,
208				name: "namespace2".to_string(),
209			},
210		)
211		.await
212		.unwrap();
213
214		// Create ring buffer in namespace1
215		let to_create = RingBufferToCreate {
216			namespace: namespace1.id,
217			ringbuffer: "shared_name".to_string(),
218			capacity: 50,
219			columns: vec![],
220			fragment: None,
221		};
222
223		CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
224
225		// Try to find in namespace2 - should not exist
226		let result =
227			CatalogStore::find_ringbuffer_by_name(&mut txn, namespace2.id, "shared_name").await.unwrap();
228
229		assert!(result.is_none());
230
231		// Find in namespace1 - should exist
232		let found =
233			CatalogStore::find_ringbuffer_by_name(&mut txn, namespace1.id, "shared_name").await.unwrap();
234
235		assert!(found.is_some());
236	}
237
238	#[tokio::test]
239	async fn test_find_ringbuffer_with_columns_and_primary_key() {
240		let mut txn = create_test_command_transaction().await;
241		let namespace = ensure_test_namespace(&mut txn).await;
242
243		// Create ring buffer with columns
244		let to_create = RingBufferToCreate {
245			namespace: namespace.id,
246			ringbuffer: "pk_buffer".to_string(),
247			capacity: 100,
248			columns: vec![
249				RingBufferColumnToCreate {
250					name: "id".to_string(),
251					constraint: TypeConstraint::unconstrained(Type::Uint8),
252					fragment: None,
253					policies: vec![],
254					auto_increment: true,
255					dictionary_id: None,
256				},
257				RingBufferColumnToCreate {
258					name: "value".to_string(),
259					constraint: TypeConstraint::unconstrained(Type::Float8),
260					fragment: None,
261					policies: vec![],
262					auto_increment: false,
263					dictionary_id: None,
264				},
265			],
266			fragment: None,
267		};
268
269		let created = CatalogStore::create_ringbuffer(&mut txn, to_create).await.unwrap();
270
271		// Add primary key
272		let columns = CatalogStore::list_columns(&mut txn, created.id).await.unwrap();
273		let pk_id = CatalogStore::create_primary_key(
274			&mut txn,
275			PrimaryKeyToCreate {
276				source: created.id.into(),
277				column_ids: vec![columns[0].id],
278			},
279		)
280		.await
281		.unwrap();
282
283		// Find and verify
284		let found = CatalogStore::find_ringbuffer(&mut txn, created.id)
285			.await
286			.unwrap()
287			.expect("Ring buffer should exist");
288
289		assert_eq!(found.columns.len(), 2);
290		assert_eq!(found.columns[0].name, "id");
291		assert_eq!(found.columns[0].auto_increment, true);
292		assert_eq!(found.columns[1].name, "value");
293		assert!(found.primary_key.is_some());
294		assert_eq!(found.primary_key.unwrap().id, pk_id);
295	}
296}