reifydb_transaction/transaction/catalog/
ringbuffer.rs1use reifydb_core::interface::catalog::{
5 change::CatalogTrackRingBufferChangeOperations,
6 id::{NamespaceId, RingBufferId},
7 ringbuffer::RingBufferDef,
8};
9use reifydb_type::Result;
10
11use crate::{
12 change::{
13 Change,
14 OperationType::{Create, Delete, Update},
15 TransactionalRingBufferChanges,
16 },
17 transaction::{admin::AdminTransaction, subscription::SubscriptionTransaction},
18};
19
20impl CatalogTrackRingBufferChangeOperations for AdminTransaction {
21 fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
22 let change = Change {
23 pre: None,
24 post: Some(ringbuffer),
25 op: Create,
26 };
27 self.changes.add_ringbuffer_def_change(change);
28 Ok(())
29 }
30
31 fn track_ringbuffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> Result<()> {
32 let change = Change {
33 pre: Some(pre),
34 post: Some(post),
35 op: Update,
36 };
37 self.changes.add_ringbuffer_def_change(change);
38 Ok(())
39 }
40
41 fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
42 let change = Change {
43 pre: Some(ringbuffer),
44 post: None,
45 op: Delete,
46 };
47 self.changes.add_ringbuffer_def_change(change);
48 Ok(())
49 }
50}
51
52impl TransactionalRingBufferChanges for AdminTransaction {
53 fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
54 for change in self.changes.ringbuffer_def.iter().rev() {
55 if let Some(ringbuffer) = &change.post {
56 if ringbuffer.id == id {
57 return Some(ringbuffer);
58 }
59 }
60 if let Some(ringbuffer) = &change.pre {
61 if ringbuffer.id == id && change.op == Delete {
62 return None;
63 }
64 }
65 }
66 None
67 }
68
69 fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef> {
70 for change in self.changes.ringbuffer_def.iter().rev() {
71 if let Some(ringbuffer) = &change.post {
72 if ringbuffer.namespace == namespace && ringbuffer.name == name {
73 return Some(ringbuffer);
74 }
75 }
76 if let Some(ringbuffer) = &change.pre {
77 if ringbuffer.namespace == namespace && ringbuffer.name == name && change.op == Delete {
78 return None;
79 }
80 }
81 }
82 None
83 }
84
85 fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool {
86 self.changes
87 .ringbuffer_def
88 .iter()
89 .any(|change| change.op == Delete && change.pre.as_ref().map(|rb| rb.id == id).unwrap_or(false))
90 }
91
92 fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
93 self.changes.ringbuffer_def.iter().any(|change| {
94 change.op == Delete
95 && change
96 .pre
97 .as_ref()
98 .map(|rb| rb.namespace == namespace && rb.name == name)
99 .unwrap_or(false)
100 })
101 }
102}
103
104impl CatalogTrackRingBufferChangeOperations for SubscriptionTransaction {
105 fn track_ringbuffer_def_created(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
106 self.inner.track_ringbuffer_def_created(ringbuffer)
107 }
108
109 fn track_ringbuffer_def_updated(&mut self, pre: RingBufferDef, post: RingBufferDef) -> Result<()> {
110 self.inner.track_ringbuffer_def_updated(pre, post)
111 }
112
113 fn track_ringbuffer_def_deleted(&mut self, ringbuffer: RingBufferDef) -> Result<()> {
114 self.inner.track_ringbuffer_def_deleted(ringbuffer)
115 }
116}
117
118impl TransactionalRingBufferChanges for SubscriptionTransaction {
119 fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef> {
120 self.inner.find_ringbuffer(id)
121 }
122
123 fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef> {
124 self.inner.find_ringbuffer_by_name(namespace, name)
125 }
126
127 fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool {
128 self.inner.is_ringbuffer_deleted(id)
129 }
130
131 fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
132 self.inner.is_ringbuffer_deleted_by_name(namespace, name)
133 }
134}