// reifydb_core/interface/transaction/change.rs
use reifydb_type::IntoFragment;
5
6use crate::interface::{
7 NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef, RingBufferId, TableDef, TableId,
8 TransactionId, ViewDef, ViewId,
9};
10
11pub trait TransactionalChanges:
12 TransactionalNamespaceChanges + TransactionalRingBufferChanges + TransactionalTableChanges + TransactionalViewChanges
13{
14}
15
16pub trait TransactionalNamespaceChanges {
17 fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
18
19 fn find_namespace_by_name<'a>(&self, name: impl IntoFragment<'a>) -> Option<&NamespaceDef>;
20
21 fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
22
23 fn is_namespace_deleted_by_name<'a>(&self, name: impl IntoFragment<'a>) -> bool;
24}
25
26pub trait TransactionalTableChanges {
27 fn find_table(&self, id: TableId) -> Option<&TableDef>;
28
29 fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef>;
30
31 fn is_table_deleted(&self, id: TableId) -> bool;
32
33 fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
34}
35
36pub trait TransactionalRingBufferChanges {
37 fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
38
39 fn find_ring_buffer_by_name<'a>(
40 &self,
41 namespace: NamespaceId,
42 name: impl IntoFragment<'a>,
43 ) -> Option<&RingBufferDef>;
44
45 fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool;
46
47 fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
48}
49
50pub trait TransactionalViewChanges {
51 fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
52
53 fn find_view_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&ViewDef>;
54
55 fn is_view_deleted(&self, id: ViewId) -> bool;
56
57 fn is_view_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
58}
59
60#[derive(Default, Debug, Clone)]
61pub struct TransactionalDefChanges {
62 pub txn_id: TransactionId,
64 pub namespace_def: Vec<Change<NamespaceDef>>,
66 pub ring_buffer_def: Vec<Change<RingBufferDef>>,
68 pub table_def: Vec<Change<TableDef>>,
70 pub view_def: Vec<Change<ViewDef>>,
72 pub log: Vec<Operation>,
74}
75
76impl TransactionalDefChanges {
77 pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
78 let id = change
79 .post
80 .as_ref()
81 .or(change.pre.as_ref())
82 .map(|s| s.id)
83 .expect("Change must have either pre or post state");
84 let op = change.op;
85 self.namespace_def.push(change);
86 self.log.push(Operation::Namespace {
87 id,
88 op,
89 });
90 }
91
92 pub fn add_ring_buffer_def_change(&mut self, change: Change<RingBufferDef>) {
93 let id = change
94 .post
95 .as_ref()
96 .or(change.pre.as_ref())
97 .map(|rb| rb.id)
98 .expect("Change must have either pre or post state");
99 let op = change.op;
100 self.ring_buffer_def.push(change);
101 self.log.push(Operation::RingBuffer {
102 id,
103 op,
104 });
105 }
106
107 pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
108 let id = change
109 .post
110 .as_ref()
111 .or(change.pre.as_ref())
112 .map(|t| t.id)
113 .expect("Change must have either pre or post state");
114 let op = change.op;
115 self.table_def.push(change);
116 self.log.push(Operation::Table {
117 id,
118 op,
119 });
120 }
121
122 pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
123 let id = change
124 .post
125 .as_ref()
126 .or(change.pre.as_ref())
127 .map(|v| v.id)
128 .expect("Change must have either pre or post state");
129 let op = change.op;
130 self.view_def.push(change);
131 self.log.push(Operation::View {
132 id,
133 op,
134 });
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct Change<T> {
141 pub pre: Option<T>,
143
144 pub post: Option<T>,
146
147 pub op: OperationType,
149}
150
/// Kind of operation applied to a definition.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationType {
    /// The definition was created.
    Create,
    /// The definition was updated.
    Update,
    /// The definition was deleted.
    Delete,
}
157
158#[derive(Debug, Clone)]
160pub enum Operation {
161 Namespace {
162 id: NamespaceId,
163 op: OperationType,
164 },
165 RingBuffer {
166 id: RingBufferId,
167 op: OperationType,
168 },
169 Table {
170 id: TableId,
171 op: OperationType,
172 },
173 View {
174 id: ViewId,
175 op: OperationType,
176 },
177}
178
179impl TransactionalDefChanges {
180 pub fn new(txn_id: TransactionId) -> Self {
181 Self {
182 txn_id,
183 namespace_def: Vec::new(),
184 ring_buffer_def: Vec::new(),
185 table_def: Vec::new(),
186 view_def: Vec::new(),
187 log: Vec::new(),
188 }
189 }
190
191 pub fn table_def_exists(&self, id: TableId) -> bool {
193 self.get_table_def(id).is_some()
194 }
195
196 pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
198 for change in self.table_def.iter().rev() {
200 if let Some(table) = &change.post {
201 if table.id == id {
202 return Some(table);
203 }
204 } else if let Some(table) = &change.pre {
205 if table.id == id && change.op == Delete {
206 return None;
208 }
209 }
210 }
211 None
212 }
213
214 pub fn view_def_exists(&self, id: ViewId) -> bool {
216 self.get_view_def(id).is_some()
217 }
218
219 pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
221 for change in self.view_def.iter().rev() {
223 if let Some(view) = &change.post {
224 if view.id == id {
225 return Some(view);
226 }
227 } else if let Some(view) = &change.pre {
228 if view.id == id && change.op == Delete {
229 return None;
231 }
232 }
233 }
234 None
235 }
236
237 pub fn get_pending_changes(&self) -> &[Operation] {
239 &self.log
240 }
241
242 pub fn txn_id(&self) -> TransactionId {
244 self.txn_id
245 }
246
247 pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
249 &self.namespace_def
250 }
251
252 pub fn table_def(&self) -> &[Change<TableDef>] {
254 &self.table_def
255 }
256
257 pub fn view_def(&self) -> &[Change<ViewDef>] {
259 &self.view_def
260 }
261
262 pub fn clear(&mut self) {
264 self.namespace_def.clear();
265 self.table_def.clear();
266 self.view_def.clear();
267 self.log.clear();
268 }
269}