reifydb_engine/transaction/catalog/
ring_buffer.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackRingBufferChangeOperations;
6use reifydb_core::interface::{
7 Change, NamespaceId, OperationType, OperationType::Delete, RingBufferDef, RingBufferId,
8 TransactionalRingBufferChanges,
9};
10use reifydb_type::IntoFragment;
11
12use crate::{StandardCommandTransaction, StandardQueryTransaction};
13
14impl CatalogTrackRingBufferChangeOperations for StandardCommandTransaction {
15 fn track_ring_buffer_def_created(&mut self, ring_buffer: RingBufferDef) -> reifydb_core::Result<()> {
16 let change = Change {
17 pre: None,
18 post: Some(ring_buffer),
19 op: Create,
20 };
21 self.changes.add_ring_buffer_def_change(change);
22 Ok(())
23 }
24
25 fn track_ring_buffer_def_updated(
26 &mut self,
27 pre: RingBufferDef,
28 post: RingBufferDef,
29 ) -> reifydb_core::Result<()> {
30 let change = Change {
31 pre: Some(pre),
32 post: Some(post),
33 op: Update,
34 };
35 self.changes.add_ring_buffer_def_change(change);
36 Ok(())
37 }
38
39 fn track_ring_buffer_def_deleted(&mut self, ring_buffer: RingBufferDef) -> reifydb_core::Result<()> {
40 let change = Change {
41 pre: Some(ring_buffer),
42 post: None,
43 op: Delete,
44 };
45 self.changes.add_ring_buffer_def_change(change);
46 Ok(())
47 }
48}
49
50impl TransactionalRingBufferChanges for StandardCommandTransaction {
51 fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
52 for change in self.changes.ring_buffer_def.iter().rev() {
54 if let Some(ring_buffer) = &change.post {
55 if ring_buffer.id == id {
56 return Some(ring_buffer);
57 }
58 }
59 if let Some(ring_buffer) = &change.pre {
60 if ring_buffer.id == id && change.op == Delete {
61 return None;
63 }
64 }
65 }
66 None
67 }
68
69 fn find_ring_buffer_by_name<'a>(
70 &self,
71 namespace: NamespaceId,
72 name: impl IntoFragment<'a>,
73 ) -> Option<&RingBufferDef> {
74 let name = name.into_fragment();
75 for change in self.changes.ring_buffer_def.iter().rev() {
77 if let Some(ring_buffer) = &change.post {
78 if ring_buffer.namespace == namespace && ring_buffer.name == name.text() {
79 return Some(ring_buffer);
80 }
81 }
82 if let Some(ring_buffer) = &change.pre {
83 if ring_buffer.namespace == namespace
84 && ring_buffer.name == name.text() && change.op == Delete
85 {
86 return None;
88 }
89 }
90 }
91 None
92 }
93
94 fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool {
95 self.changes
97 .ring_buffer_def
98 .iter()
99 .any(|change| change.op == Delete && change.pre.as_ref().map(|rb| rb.id == id).unwrap_or(false))
100 }
101
102 fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool {
103 let name = name.into_fragment();
104 self.changes.ring_buffer_def.iter().any(|change| {
106 change.op == Delete
107 && change
108 .pre
109 .as_ref()
110 .map(|rb| rb.namespace == namespace && rb.name == name.text())
111 .unwrap_or(false)
112 })
113 }
114}
115
116impl TransactionalRingBufferChanges for StandardQueryTransaction {
117 fn find_ring_buffer(&self, _id: RingBufferId) -> Option<&RingBufferDef> {
118 None
119 }
120
121 fn find_ring_buffer_by_name<'a>(
122 &self,
123 _namespace: NamespaceId,
124 _name: impl IntoFragment<'a>,
125 ) -> Option<&RingBufferDef> {
126 None
127 }
128
129 fn is_ring_buffer_deleted(&self, _id: RingBufferId) -> bool {
130 false
131 }
132
133 fn is_ring_buffer_deleted_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> bool {
134 false
135 }
136}