reifydb_engine/transaction/catalog/
ringbuffer.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackRingBufferChangeOperations;
6use reifydb_core::interface::{
7 Change, NamespaceId, OperationType, OperationType::Delete, RingBufferDef, RingBufferId,
8 TransactionalRingBufferChanges,
9};
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackRingBufferChangeOperations for StandardCommandTransaction {
14 fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> reifydb_core::Result<()> {
15 let change = Change {
16 pre: None,
17 post: Some(ringbuffer),
18 op: Create,
19 };
20 self.changes.add_ringbuffer_def_change(change);
21 Ok(())
22 }
23
24 fn track_ringbuffer_def_updated(
25 &mut self,
26 pre: RingBufferDef,
27 post: RingBufferDef,
28 ) -> reifydb_core::Result<()> {
29 let change = Change {
30 pre: Some(pre),
31 post: Some(post),
32 op: Update,
33 };
34 self.changes.add_ringbuffer_def_change(change);
35 Ok(())
36 }
37
38 fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> reifydb_core::Result<()> {
39 let change = Change {
40 pre: Some(ringbuffer),
41 post: None,
42 op: Delete,
43 };
44 self.changes.add_ringbuffer_def_change(change);
45 Ok(())
46 }
47}
48
49impl TransactionalRingBufferChanges for StandardCommandTransaction {
50 fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
51 for change in self.changes.ringbuffer_def.iter().rev() {
53 if let Some(ringbuffer) = &change.post {
54 if ringbuffer.id == id {
55 return Some(ringbuffer);
56 }
57 }
58 if let Some(ringbuffer) = &change.pre {
59 if ringbuffer.id == id && change.op == Delete {
60 return None;
62 }
63 }
64 }
65 None
66 }
67
68 fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef> {
69 for change in self.changes.ringbuffer_def.iter().rev() {
71 if let Some(ringbuffer) = &change.post {
72 if ringbuffer.namespace == namespace && ringbuffer.name == name {
73 return Some(ringbuffer);
74 }
75 }
76 if let Some(ringbuffer) = &change.pre {
77 if ringbuffer.namespace == namespace && ringbuffer.name == name && change.op == Delete {
78 return None;
80 }
81 }
82 }
83 None
84 }
85
86 fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool {
87 self.changes
89 .ringbuffer_def
90 .iter()
91 .any(|change| change.op == Delete && change.pre.as_ref().map(|rb| rb.id == id).unwrap_or(false))
92 }
93
94 fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
95 self.changes.ringbuffer_def.iter().any(|change| {
97 change.op == Delete
98 && change
99 .pre
100 .as_ref()
101 .map(|rb| rb.namespace == namespace && rb.name == name)
102 .unwrap_or(false)
103 })
104 }
105}
106
107impl TransactionalRingBufferChanges for StandardQueryTransaction {
108 fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBufferDef> {
109 None
110 }
111
112 fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBufferDef> {
113 None
114 }
115
116 fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
117 false
118 }
119
120 fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
121 false
122 }
123}