reifydb_catalog/transaction/
ring_buffer.rs1use reifydb_core::{
5 interface::{
6 CommandTransaction, NamespaceId, QueryTransaction, RingBufferDef, RingBufferId, TransactionalChanges,
7 interceptor::WithInterceptors,
8 },
9 return_error,
10};
11use reifydb_type::{
12 IntoFragment,
13 diagnostic::catalog::{ring_buffer_already_exists, ring_buffer_not_found},
14};
15use tracing::instrument;
16
17use crate::{
18 CatalogStore, store::ring_buffer::create::RingBufferToCreate, transaction::MaterializedCatalogTransaction,
19};
20
21pub trait CatalogRingBufferQueryOperations {
22 fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>>;
23
24 fn find_ring_buffer_by_name<'a>(
25 &mut self,
26 namespace: NamespaceId,
27 name: impl IntoFragment<'a>,
28 ) -> crate::Result<Option<RingBufferDef>>;
29
30 fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef>;
31
32 fn get_ring_buffer_by_name<'a>(
33 &mut self,
34 namespace: NamespaceId,
35 name: impl IntoFragment<'a>,
36 ) -> crate::Result<RingBufferDef>;
37}
38
39impl<QT: QueryTransaction + MaterializedCatalogTransaction> CatalogRingBufferQueryOperations for QT {
40 #[instrument(level = "trace", skip(self))]
41 fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>> {
42 CatalogStore::find_ring_buffer(self, id)
43 }
44
45 #[instrument(level = "trace", skip(self, name))]
46 fn find_ring_buffer_by_name<'a>(
47 &mut self,
48 namespace: NamespaceId,
49 name: impl IntoFragment<'a>,
50 ) -> crate::Result<Option<RingBufferDef>> {
51 let name = name.into_fragment();
52 CatalogStore::find_ring_buffer_by_name(self, namespace, name.text())
53 }
54
55 #[instrument(level = "trace", skip(self))]
56 fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef> {
57 CatalogStore::get_ring_buffer(self, id)
58 }
59
60 #[instrument(level = "trace", skip(self, name))]
61 fn get_ring_buffer_by_name<'a>(
62 &mut self,
63 namespace: NamespaceId,
64 name: impl IntoFragment<'a>,
65 ) -> crate::Result<RingBufferDef> {
66 let name = name.into_fragment();
67 let name_text = name.text().to_string();
68 let ring_buffer = self.find_ring_buffer_by_name(namespace, name.clone())?;
69 match ring_buffer {
70 Some(rb) => Ok(rb),
71 None => {
72 let namespace = CatalogStore::get_namespace(self, namespace)?;
73 return_error!(ring_buffer_not_found(name, &namespace.name, &name_text))
74 }
75 }
76 }
77}
78
79pub trait CatalogTrackRingBufferChangeOperations {
80 fn track_ring_buffer_def_created(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
82
83 fn track_ring_buffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> crate::Result<()>;
84
85 fn track_ring_buffer_def_deleted(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
86}
87
88pub trait CatalogRingBufferCommandOperations: CatalogRingBufferQueryOperations {
89 fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef>;
90}
91
92impl<
93 CT: CommandTransaction
94 + MaterializedCatalogTransaction
95 + CatalogTrackRingBufferChangeOperations
96 + WithInterceptors<CT>
97 + TransactionalChanges,
98> CatalogRingBufferCommandOperations for CT
99{
100 #[instrument(level = "debug", skip(self, to_create))]
101 fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef> {
102 if let Some(_ring_buffer) =
103 self.find_ring_buffer_by_name(to_create.namespace, &to_create.ring_buffer)?
104 {
105 let namespace = CatalogStore::get_namespace(self, to_create.namespace)?;
106 return_error!(ring_buffer_already_exists(
107 to_create.fragment.unwrap_or_default(),
108 &namespace.name,
109 &to_create.ring_buffer
110 ));
111 }
112
113 CatalogStore::create_ring_buffer(self, to_create)
114 }
115}