reifydb_catalog/transaction/
ring_buffer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{
6		CommandTransaction, NamespaceId, QueryTransaction, RingBufferDef, RingBufferId, TransactionalChanges,
7		interceptor::WithInterceptors,
8	},
9	return_error,
10};
11use reifydb_type::{
12	IntoFragment,
13	diagnostic::catalog::{ring_buffer_already_exists, ring_buffer_not_found},
14};
15
16use crate::{
17	CatalogStore, store::ring_buffer::create::RingBufferToCreate, transaction::MaterializedCatalogTransaction,
18};
19
20pub trait CatalogRingBufferQueryOperations {
21	fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>>;
22
23	fn find_ring_buffer_by_name<'a>(
24		&mut self,
25		namespace: NamespaceId,
26		name: impl IntoFragment<'a>,
27	) -> crate::Result<Option<RingBufferDef>>;
28
29	fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef>;
30
31	fn get_ring_buffer_by_name<'a>(
32		&mut self,
33		namespace: NamespaceId,
34		name: impl IntoFragment<'a>,
35	) -> crate::Result<RingBufferDef>;
36}
37
38impl<QT: QueryTransaction + MaterializedCatalogTransaction> CatalogRingBufferQueryOperations for QT {
39	fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>> {
40		CatalogStore::find_ring_buffer(self, id)
41	}
42
43	fn find_ring_buffer_by_name<'a>(
44		&mut self,
45		namespace: NamespaceId,
46		name: impl IntoFragment<'a>,
47	) -> crate::Result<Option<RingBufferDef>> {
48		let name = name.into_fragment();
49		CatalogStore::find_ring_buffer_by_name(self, namespace, name.text())
50	}
51
52	fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef> {
53		CatalogStore::get_ring_buffer(self, id)
54	}
55
56	fn get_ring_buffer_by_name<'a>(
57		&mut self,
58		namespace: NamespaceId,
59		name: impl IntoFragment<'a>,
60	) -> crate::Result<RingBufferDef> {
61		let name = name.into_fragment();
62		let name_text = name.text().to_string();
63		let ring_buffer = self.find_ring_buffer_by_name(namespace, name.clone())?;
64		match ring_buffer {
65			Some(rb) => Ok(rb),
66			None => {
67				let namespace = CatalogStore::get_namespace(self, namespace)?;
68				return_error!(ring_buffer_not_found(name, &namespace.name, &name_text))
69			}
70		}
71	}
72}
73
74pub trait CatalogTrackRingBufferChangeOperations {
75	// Ring buffer tracking methods
76	fn track_ring_buffer_def_created(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
77
78	fn track_ring_buffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> crate::Result<()>;
79
80	fn track_ring_buffer_def_deleted(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
81}
82
83pub trait CatalogRingBufferCommandOperations: CatalogRingBufferQueryOperations {
84	fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef>;
85}
86
87impl<
88	CT: CommandTransaction
89		+ MaterializedCatalogTransaction
90		+ CatalogTrackRingBufferChangeOperations
91		+ WithInterceptors<CT>
92		+ TransactionalChanges,
93> CatalogRingBufferCommandOperations for CT
94{
95	fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef> {
96		if let Some(_ring_buffer) =
97			self.find_ring_buffer_by_name(to_create.namespace, &to_create.ring_buffer)?
98		{
99			let namespace = CatalogStore::get_namespace(self, to_create.namespace)?;
100			return_error!(ring_buffer_already_exists(
101				to_create.fragment.unwrap_or_default(),
102				&namespace.name,
103				&to_create.ring_buffer
104			));
105		}
106
107		CatalogStore::create_ring_buffer(self, to_create)
108	}
109}