reifydb_catalog/transaction/
ring_buffer.rs1use reifydb_core::{
5 interface::{
6 CommandTransaction, NamespaceId, QueryTransaction, RingBufferDef, RingBufferId, TransactionalChanges,
7 interceptor::WithInterceptors,
8 },
9 return_error,
10};
11use reifydb_type::{
12 IntoFragment,
13 diagnostic::catalog::{ring_buffer_already_exists, ring_buffer_not_found},
14};
15
16use crate::{
17 CatalogStore, store::ring_buffer::create::RingBufferToCreate, transaction::MaterializedCatalogTransaction,
18};
19
20pub trait CatalogRingBufferQueryOperations {
21 fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>>;
22
23 fn find_ring_buffer_by_name<'a>(
24 &mut self,
25 namespace: NamespaceId,
26 name: impl IntoFragment<'a>,
27 ) -> crate::Result<Option<RingBufferDef>>;
28
29 fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef>;
30
31 fn get_ring_buffer_by_name<'a>(
32 &mut self,
33 namespace: NamespaceId,
34 name: impl IntoFragment<'a>,
35 ) -> crate::Result<RingBufferDef>;
36}
37
38impl<QT: QueryTransaction + MaterializedCatalogTransaction> CatalogRingBufferQueryOperations for QT {
39 fn find_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>> {
40 CatalogStore::find_ring_buffer(self, id)
41 }
42
43 fn find_ring_buffer_by_name<'a>(
44 &mut self,
45 namespace: NamespaceId,
46 name: impl IntoFragment<'a>,
47 ) -> crate::Result<Option<RingBufferDef>> {
48 let name = name.into_fragment();
49 CatalogStore::find_ring_buffer_by_name(self, namespace, name.text())
50 }
51
52 fn get_ring_buffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef> {
53 CatalogStore::get_ring_buffer(self, id)
54 }
55
56 fn get_ring_buffer_by_name<'a>(
57 &mut self,
58 namespace: NamespaceId,
59 name: impl IntoFragment<'a>,
60 ) -> crate::Result<RingBufferDef> {
61 let name = name.into_fragment();
62 let name_text = name.text().to_string();
63 let ring_buffer = self.find_ring_buffer_by_name(namespace, name.clone())?;
64 match ring_buffer {
65 Some(rb) => Ok(rb),
66 None => {
67 let namespace = CatalogStore::get_namespace(self, namespace)?;
68 return_error!(ring_buffer_not_found(name, &namespace.name, &name_text))
69 }
70 }
71 }
72}
73
74pub trait CatalogTrackRingBufferChangeOperations {
75 fn track_ring_buffer_def_created(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
77
78 fn track_ring_buffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> crate::Result<()>;
79
80 fn track_ring_buffer_def_deleted(&mut self, ring_buffer: RingBufferDef) -> crate::Result<()>;
81}
82
83pub trait CatalogRingBufferCommandOperations: CatalogRingBufferQueryOperations {
84 fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef>;
85}
86
87impl<
88 CT: CommandTransaction
89 + MaterializedCatalogTransaction
90 + CatalogTrackRingBufferChangeOperations
91 + WithInterceptors<CT>
92 + TransactionalChanges,
93> CatalogRingBufferCommandOperations for CT
94{
95 fn create_ring_buffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef> {
96 if let Some(_ring_buffer) =
97 self.find_ring_buffer_by_name(to_create.namespace, &to_create.ring_buffer)?
98 {
99 let namespace = CatalogStore::get_namespace(self, to_create.namespace)?;
100 return_error!(ring_buffer_already_exists(
101 to_create.fragment.unwrap_or_default(),
102 &namespace.name,
103 &to_create.ring_buffer
104 ));
105 }
106
107 CatalogStore::create_ring_buffer(self, to_create)
108 }
109}