reifydb_core/interface/transaction/
change.rs1use reifydb_type::IntoFragment;
5
6use crate::interface::{
7 DictionaryDef, DictionaryId, FlowDef, FlowId, NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef,
8 RingBufferId, TableDef, TableId, TransactionId, ViewDef, ViewId,
9};
10
11pub trait TransactionalChanges:
12 TransactionalDictionaryChanges
13 + TransactionalFlowChanges
14 + TransactionalNamespaceChanges
15 + TransactionalRingBufferChanges
16 + TransactionalTableChanges
17 + TransactionalViewChanges
18{
19}
20
21pub trait TransactionalDictionaryChanges {
22 fn find_dictionary(&self, id: DictionaryId) -> Option<&DictionaryDef>;
23
24 fn find_dictionary_by_name<'a>(
25 &self,
26 namespace: NamespaceId,
27 name: impl IntoFragment<'a>,
28 ) -> Option<&DictionaryDef>;
29
30 fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
31
32 fn is_dictionary_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
33}
34
35pub trait TransactionalNamespaceChanges {
36 fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
37
38 fn find_namespace_by_name<'a>(&self, name: impl IntoFragment<'a>) -> Option<&NamespaceDef>;
39
40 fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
41
42 fn is_namespace_deleted_by_name<'a>(&self, name: impl IntoFragment<'a>) -> bool;
43}
44
45pub trait TransactionalFlowChanges {
46 fn find_flow(&self, id: FlowId) -> Option<&FlowDef>;
47
48 fn find_flow_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&FlowDef>;
49
50 fn is_flow_deleted(&self, id: FlowId) -> bool;
51
52 fn is_flow_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
53}
54
55pub trait TransactionalTableChanges {
56 fn find_table(&self, id: TableId) -> Option<&TableDef>;
57
58 fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef>;
59
60 fn is_table_deleted(&self, id: TableId) -> bool;
61
62 fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
63}
64
65pub trait TransactionalRingBufferChanges {
66 fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
67
68 fn find_ring_buffer_by_name<'a>(
69 &self,
70 namespace: NamespaceId,
71 name: impl IntoFragment<'a>,
72 ) -> Option<&RingBufferDef>;
73
74 fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool;
75
76 fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
77}
78
79pub trait TransactionalViewChanges {
80 fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
81
82 fn find_view_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&ViewDef>;
83
84 fn is_view_deleted(&self, id: ViewId) -> bool;
85
86 fn is_view_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
87}
88
89#[derive(Default, Debug, Clone)]
90pub struct TransactionalDefChanges {
91 pub txn_id: TransactionId,
93 pub dictionary_def: Vec<Change<DictionaryDef>>,
95 pub flow_def: Vec<Change<FlowDef>>,
97 pub namespace_def: Vec<Change<NamespaceDef>>,
99 pub ring_buffer_def: Vec<Change<RingBufferDef>>,
101 pub table_def: Vec<Change<TableDef>>,
103 pub view_def: Vec<Change<ViewDef>>,
105 pub log: Vec<Operation>,
107}
108
109impl TransactionalDefChanges {
110 pub fn add_dictionary_def_change(&mut self, change: Change<DictionaryDef>) {
111 let id = change
112 .post
113 .as_ref()
114 .or(change.pre.as_ref())
115 .map(|d| d.id)
116 .expect("Change must have either pre or post state");
117 let op = change.op;
118 self.dictionary_def.push(change);
119 self.log.push(Operation::Dictionary {
120 id,
121 op,
122 });
123 }
124
125 pub fn add_flow_def_change(&mut self, change: Change<FlowDef>) {
126 let id = change
127 .post
128 .as_ref()
129 .or(change.pre.as_ref())
130 .map(|f| f.id)
131 .expect("Change must have either pre or post state");
132 let op = change.op;
133 self.flow_def.push(change);
134 self.log.push(Operation::Flow {
135 id,
136 op,
137 });
138 }
139
140 pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
141 let id = change
142 .post
143 .as_ref()
144 .or(change.pre.as_ref())
145 .map(|s| s.id)
146 .expect("Change must have either pre or post state");
147 let op = change.op;
148 self.namespace_def.push(change);
149 self.log.push(Operation::Namespace {
150 id,
151 op,
152 });
153 }
154
155 pub fn add_ring_buffer_def_change(&mut self, change: Change<RingBufferDef>) {
156 let id = change
157 .post
158 .as_ref()
159 .or(change.pre.as_ref())
160 .map(|rb| rb.id)
161 .expect("Change must have either pre or post state");
162 let op = change.op;
163 self.ring_buffer_def.push(change);
164 self.log.push(Operation::RingBuffer {
165 id,
166 op,
167 });
168 }
169
170 pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
171 let id = change
172 .post
173 .as_ref()
174 .or(change.pre.as_ref())
175 .map(|t| t.id)
176 .expect("Change must have either pre or post state");
177 let op = change.op;
178 self.table_def.push(change);
179 self.log.push(Operation::Table {
180 id,
181 op,
182 });
183 }
184
185 pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
186 let id = change
187 .post
188 .as_ref()
189 .or(change.pre.as_ref())
190 .map(|v| v.id)
191 .expect("Change must have either pre or post state");
192 let op = change.op;
193 self.view_def.push(change);
194 self.log.push(Operation::View {
195 id,
196 op,
197 });
198 }
199}
200
201#[derive(Debug, Clone)]
203pub struct Change<T> {
204 pub pre: Option<T>,
206
207 pub post: Option<T>,
209
210 pub op: OperationType,
212}
213
214#[derive(Debug, Clone, Copy, PartialEq)]
215pub enum OperationType {
216 Create,
217 Update,
218 Delete,
219}
220
221#[derive(Debug, Clone)]
223pub enum Operation {
224 Dictionary {
225 id: DictionaryId,
226 op: OperationType,
227 },
228 Flow {
229 id: FlowId,
230 op: OperationType,
231 },
232 Namespace {
233 id: NamespaceId,
234 op: OperationType,
235 },
236 RingBuffer {
237 id: RingBufferId,
238 op: OperationType,
239 },
240 Table {
241 id: TableId,
242 op: OperationType,
243 },
244 View {
245 id: ViewId,
246 op: OperationType,
247 },
248}
249
250impl TransactionalDefChanges {
251 pub fn new(txn_id: TransactionId) -> Self {
252 Self {
253 txn_id,
254 dictionary_def: Vec::new(),
255 flow_def: Vec::new(),
256 namespace_def: Vec::new(),
257 ring_buffer_def: Vec::new(),
258 table_def: Vec::new(),
259 view_def: Vec::new(),
260 log: Vec::new(),
261 }
262 }
263
264 pub fn table_def_exists(&self, id: TableId) -> bool {
266 self.get_table_def(id).is_some()
267 }
268
269 pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
271 for change in self.table_def.iter().rev() {
273 if let Some(table) = &change.post {
274 if table.id == id {
275 return Some(table);
276 }
277 } else if let Some(table) = &change.pre {
278 if table.id == id && change.op == Delete {
279 return None;
281 }
282 }
283 }
284 None
285 }
286
287 pub fn view_def_exists(&self, id: ViewId) -> bool {
289 self.get_view_def(id).is_some()
290 }
291
292 pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
294 for change in self.view_def.iter().rev() {
296 if let Some(view) = &change.post {
297 if view.id == id {
298 return Some(view);
299 }
300 } else if let Some(view) = &change.pre {
301 if view.id == id && change.op == Delete {
302 return None;
304 }
305 }
306 }
307 None
308 }
309
310 pub fn get_pending_changes(&self) -> &[Operation] {
312 &self.log
313 }
314
315 pub fn txn_id(&self) -> TransactionId {
317 self.txn_id
318 }
319
320 pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
322 &self.namespace_def
323 }
324
325 pub fn table_def(&self) -> &[Change<TableDef>] {
327 &self.table_def
328 }
329
330 pub fn view_def(&self) -> &[Change<ViewDef>] {
332 &self.view_def
333 }
334
335 pub fn clear(&mut self) {
337 self.dictionary_def.clear();
338 self.flow_def.clear();
339 self.namespace_def.clear();
340 self.ring_buffer_def.clear();
341 self.table_def.clear();
342 self.view_def.clear();
343 self.log.clear();
344 }
345}
346
347#[derive(Debug, Clone)]
349pub struct TableRowInsertion {
350 pub table_id: TableId,
351 pub row_number: reifydb_type::RowNumber,
352 pub encoded: crate::value::encoded::EncodedValues,
353}
354
355#[derive(Debug, Clone)]
357pub enum RowChange {
358 TableInsert(TableRowInsertion),
360 }