reifydb_catalog/transaction/
ringbuffer.rs1use async_trait::async_trait;
5use reifydb_core::{
6 interface::{
7 CommandTransaction, NamespaceId, QueryTransaction, RingBufferDef, RingBufferId, TransactionalChanges,
8 interceptor::WithInterceptors,
9 },
10 return_error,
11};
12use reifydb_type::{
13 Fragment,
14 diagnostic::catalog::{ringbuffer_already_exists, ringbuffer_not_found},
15};
16use tracing::instrument;
17
18use crate::{CatalogStore, store::ringbuffer::create::RingBufferToCreate, transaction::MaterializedCatalogTransaction};
19
20#[async_trait]
21pub trait CatalogRingBufferQueryOperations: Send {
22 async fn find_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>>;
23
24 async fn find_ringbuffer_by_name(
25 &mut self,
26 namespace: NamespaceId,
27 name: &str,
28 ) -> crate::Result<Option<RingBufferDef>>;
29
30 async fn get_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef>;
31
32 async fn get_ringbuffer_by_name(
33 &mut self,
34 namespace: NamespaceId,
35 name: impl Into<Fragment> + Send,
36 ) -> crate::Result<RingBufferDef>;
37}
38
39#[async_trait]
40impl<QT: QueryTransaction + MaterializedCatalogTransaction + Send + 'static> CatalogRingBufferQueryOperations for QT {
41 #[instrument(name = "catalog::ringbuffer::find", level = "trace", skip(self))]
42 async fn find_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<Option<RingBufferDef>> {
43 CatalogStore::find_ringbuffer(self, id).await
44 }
45
46 #[instrument(name = "catalog::ringbuffer::find_by_name", level = "trace", skip(self, name))]
47 async fn find_ringbuffer_by_name(
48 &mut self,
49 namespace: NamespaceId,
50 name: &str,
51 ) -> crate::Result<Option<RingBufferDef>> {
52 CatalogStore::find_ringbuffer_by_name(self, namespace, name).await
53 }
54
55 #[instrument(name = "catalog::ringbuffer::get", level = "trace", skip(self))]
56 async fn get_ringbuffer(&mut self, id: RingBufferId) -> crate::Result<RingBufferDef> {
57 CatalogStore::get_ringbuffer(self, id).await
58 }
59
60 #[instrument(name = "catalog::ringbuffer::get_by_name", level = "trace", skip(self, name))]
61 async fn get_ringbuffer_by_name(
62 &mut self,
63 namespace: NamespaceId,
64 name: impl Into<Fragment> + Send,
65 ) -> crate::Result<RingBufferDef> {
66 let name = name.into();
67 let name_text = name.text().to_string();
68 let ringbuffer = self.find_ringbuffer_by_name(namespace, name.text()).await?;
69 match ringbuffer {
70 Some(rb) => Ok(rb),
71 None => {
72 let namespace = CatalogStore::get_namespace(self, namespace).await?;
73 return_error!(ringbuffer_not_found(name, &namespace.name, &name_text))
74 }
75 }
76 }
77}
78
79pub trait CatalogTrackRingBufferChangeOperations {
80 fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> crate::Result<()>;
82
83 fn track_ringbuffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> crate::Result<()>;
84
85 fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> crate::Result<()>;
86}
87
88#[async_trait]
89pub trait CatalogRingBufferCommandOperations: CatalogRingBufferQueryOperations {
90 async fn create_ringbuffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef>;
91}
92
93#[async_trait]
94impl<
95 CT: CommandTransaction
96 + MaterializedCatalogTransaction
97 + CatalogTrackRingBufferChangeOperations
98 + WithInterceptors<CT>
99 + TransactionalChanges
100 + Send
101 + 'static,
102> CatalogRingBufferCommandOperations for CT
103{
104 #[instrument(name = "catalog::ringbuffer::create", level = "debug", skip(self, to_create))]
105 async fn create_ringbuffer(&mut self, to_create: RingBufferToCreate) -> crate::Result<RingBufferDef> {
106 if let Some(_ringbuffer) =
107 self.find_ringbuffer_by_name(to_create.namespace, to_create.ringbuffer.as_str()).await?
108 {
109 let namespace = CatalogStore::get_namespace(self, to_create.namespace).await?;
110 return_error!(ringbuffer_already_exists(
111 to_create.fragment.unwrap_or_default(),
112 &namespace.name,
113 &to_create.ringbuffer
114 ));
115 }
116
117 CatalogStore::create_ringbuffer(self, to_create).await
118 }
119}