reifydb_core/interface/transaction/
change.rs1use crate::interface::{
5 DictionaryDef, DictionaryId, FlowDef, FlowId, NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef,
6 RingBufferId, TableDef, TableId, TransactionId, ViewDef, ViewId,
7};
8
9pub trait TransactionalChanges:
10 TransactionalDictionaryChanges
11 + TransactionalFlowChanges
12 + TransactionalNamespaceChanges
13 + TransactionalRingBufferChanges
14 + TransactionalTableChanges
15 + TransactionalViewChanges
16{
17}
18
19pub trait TransactionalDictionaryChanges {
20 fn find_dictionary(&self, id: DictionaryId) -> Option<&DictionaryDef>;
21
22 fn find_dictionary_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&DictionaryDef>;
23
24 fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
25
26 fn is_dictionary_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
27}
28
29pub trait TransactionalNamespaceChanges {
30 fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
31
32 fn find_namespace_by_name(&self, name: &str) -> Option<&NamespaceDef>;
33
34 fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
35
36 fn is_namespace_deleted_by_name(&self, name: &str) -> bool;
37}
38
39pub trait TransactionalFlowChanges {
40 fn find_flow(&self, id: FlowId) -> Option<&FlowDef>;
41
42 fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&FlowDef>;
43
44 fn is_flow_deleted(&self, id: FlowId) -> bool;
45
46 fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
47}
48
49pub trait TransactionalTableChanges {
50 fn find_table(&self, id: TableId) -> Option<&TableDef>;
51
52 fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&TableDef>;
53
54 fn is_table_deleted(&self, id: TableId) -> bool;
55
56 fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
57}
58
59pub trait TransactionalRingBufferChanges {
60 fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
61
62 fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef>;
63
64 fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool;
65
66 fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
67}
68
69pub trait TransactionalViewChanges {
70 fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
71
72 fn find_view_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&ViewDef>;
73
74 fn is_view_deleted(&self, id: ViewId) -> bool;
75
76 fn is_view_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
77}
78
79#[derive(Default, Debug, Clone)]
80pub struct TransactionalDefChanges {
81 pub txn_id: TransactionId,
83 pub dictionary_def: Vec<Change<DictionaryDef>>,
85 pub flow_def: Vec<Change<FlowDef>>,
87 pub namespace_def: Vec<Change<NamespaceDef>>,
89 pub ringbuffer_def: Vec<Change<RingBufferDef>>,
91 pub table_def: Vec<Change<TableDef>>,
93 pub view_def: Vec<Change<ViewDef>>,
95 pub log: Vec<Operation>,
97}
98
99impl TransactionalDefChanges {
100 pub fn add_dictionary_def_change(&mut self, change: Change<DictionaryDef>) {
101 let id = change
102 .post
103 .as_ref()
104 .or(change.pre.as_ref())
105 .map(|d| d.id)
106 .expect("Change must have either pre or post state");
107 let op = change.op;
108 self.dictionary_def.push(change);
109 self.log.push(Operation::Dictionary {
110 id,
111 op,
112 });
113 }
114
115 pub fn add_flow_def_change(&mut self, change: Change<FlowDef>) {
116 let id = change
117 .post
118 .as_ref()
119 .or(change.pre.as_ref())
120 .map(|f| f.id)
121 .expect("Change must have either pre or post state");
122 let op = change.op;
123 self.flow_def.push(change);
124 self.log.push(Operation::Flow {
125 id,
126 op,
127 });
128 }
129
130 pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
131 let id = change
132 .post
133 .as_ref()
134 .or(change.pre.as_ref())
135 .map(|s| s.id)
136 .expect("Change must have either pre or post state");
137 let op = change.op;
138 self.namespace_def.push(change);
139 self.log.push(Operation::Namespace {
140 id,
141 op,
142 });
143 }
144
145 pub fn add_ringbuffer_def_change(&mut self, change: Change<RingBufferDef>) {
146 let id = change
147 .post
148 .as_ref()
149 .or(change.pre.as_ref())
150 .map(|rb| rb.id)
151 .expect("Change must have either pre or post state");
152 let op = change.op;
153 self.ringbuffer_def.push(change);
154 self.log.push(Operation::RingBuffer {
155 id,
156 op,
157 });
158 }
159
160 pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
161 let id = change
162 .post
163 .as_ref()
164 .or(change.pre.as_ref())
165 .map(|t| t.id)
166 .expect("Change must have either pre or post state");
167 let op = change.op;
168 self.table_def.push(change);
169 self.log.push(Operation::Table {
170 id,
171 op,
172 });
173 }
174
175 pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
176 let id = change
177 .post
178 .as_ref()
179 .or(change.pre.as_ref())
180 .map(|v| v.id)
181 .expect("Change must have either pre or post state");
182 let op = change.op;
183 self.view_def.push(change);
184 self.log.push(Operation::View {
185 id,
186 op,
187 });
188 }
189}
190
191#[derive(Debug, Clone)]
193pub struct Change<T> {
194 pub pre: Option<T>,
196
197 pub post: Option<T>,
199
200 pub op: OperationType,
202}
203
/// Kind of operation applied to a definition.
///
/// This is a fieldless enum, so equality is total and hashing is trivial:
/// `Eq` and `Hash` are derived alongside `PartialEq` so the type can be used
/// as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The object was created.
    Create,
    /// The object was updated.
    Update,
    /// The object was deleted.
    Delete,
}
210
211#[derive(Debug, Clone)]
213pub enum Operation {
214 Dictionary {
215 id: DictionaryId,
216 op: OperationType,
217 },
218 Flow {
219 id: FlowId,
220 op: OperationType,
221 },
222 Namespace {
223 id: NamespaceId,
224 op: OperationType,
225 },
226 RingBuffer {
227 id: RingBufferId,
228 op: OperationType,
229 },
230 Table {
231 id: TableId,
232 op: OperationType,
233 },
234 View {
235 id: ViewId,
236 op: OperationType,
237 },
238}
239
240impl TransactionalDefChanges {
241 pub fn new(txn_id: TransactionId) -> Self {
242 Self {
243 txn_id,
244 dictionary_def: Vec::new(),
245 flow_def: Vec::new(),
246 namespace_def: Vec::new(),
247 ringbuffer_def: Vec::new(),
248 table_def: Vec::new(),
249 view_def: Vec::new(),
250 log: Vec::new(),
251 }
252 }
253
254 pub fn table_def_exists(&self, id: TableId) -> bool {
256 self.get_table_def(id).is_some()
257 }
258
259 pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
261 for change in self.table_def.iter().rev() {
263 if let Some(table) = &change.post {
264 if table.id == id {
265 return Some(table);
266 }
267 } else if let Some(table) = &change.pre {
268 if table.id == id && change.op == Delete {
269 return None;
271 }
272 }
273 }
274 None
275 }
276
277 pub fn view_def_exists(&self, id: ViewId) -> bool {
279 self.get_view_def(id).is_some()
280 }
281
282 pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
284 for change in self.view_def.iter().rev() {
286 if let Some(view) = &change.post {
287 if view.id == id {
288 return Some(view);
289 }
290 } else if let Some(view) = &change.pre {
291 if view.id == id && change.op == Delete {
292 return None;
294 }
295 }
296 }
297 None
298 }
299
300 pub fn get_pending_changes(&self) -> &[Operation] {
302 &self.log
303 }
304
305 pub fn txn_id(&self) -> TransactionId {
307 self.txn_id
308 }
309
310 pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
312 &self.namespace_def
313 }
314
315 pub fn table_def(&self) -> &[Change<TableDef>] {
317 &self.table_def
318 }
319
320 pub fn view_def(&self) -> &[Change<ViewDef>] {
322 &self.view_def
323 }
324
325 pub fn clear(&mut self) {
327 self.dictionary_def.clear();
328 self.flow_def.clear();
329 self.namespace_def.clear();
330 self.ringbuffer_def.clear();
331 self.table_def.clear();
332 self.view_def.clear();
333 self.log.clear();
334 }
335}
336
337#[derive(Debug, Clone)]
339pub struct TableRowInsertion {
340 pub table_id: TableId,
341 pub row_number: reifydb_type::RowNumber,
342 pub encoded: crate::value::encoded::EncodedValues,
343}
344
345#[derive(Debug, Clone)]
347pub enum RowChange {
348 TableInsert(TableRowInsertion),
350 }