reifydb_core/interface/transaction/
change.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use crate::interface::{
5	DictionaryDef, DictionaryId, FlowDef, FlowId, NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef,
6	RingBufferId, TableDef, TableId, TransactionId, ViewDef, ViewId,
7};
8
9pub trait TransactionalChanges:
10	TransactionalDictionaryChanges
11	+ TransactionalFlowChanges
12	+ TransactionalNamespaceChanges
13	+ TransactionalRingBufferChanges
14	+ TransactionalTableChanges
15	+ TransactionalViewChanges
16{
17}
18
19pub trait TransactionalDictionaryChanges {
20	fn find_dictionary(&self, id: DictionaryId) -> Option<&DictionaryDef>;
21
22	fn find_dictionary_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&DictionaryDef>;
23
24	fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
25
26	fn is_dictionary_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
27}
28
29pub trait TransactionalNamespaceChanges {
30	fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
31
32	fn find_namespace_by_name(&self, name: &str) -> Option<&NamespaceDef>;
33
34	fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
35
36	fn is_namespace_deleted_by_name(&self, name: &str) -> bool;
37}
38
39pub trait TransactionalFlowChanges {
40	fn find_flow(&self, id: FlowId) -> Option<&FlowDef>;
41
42	fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&FlowDef>;
43
44	fn is_flow_deleted(&self, id: FlowId) -> bool;
45
46	fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
47}
48
49pub trait TransactionalTableChanges {
50	fn find_table(&self, id: TableId) -> Option<&TableDef>;
51
52	fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&TableDef>;
53
54	fn is_table_deleted(&self, id: TableId) -> bool;
55
56	fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
57}
58
59pub trait TransactionalRingBufferChanges {
60	fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
61
62	fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBufferDef>;
63
64	fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool;
65
66	fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
67}
68
69pub trait TransactionalViewChanges {
70	fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
71
72	fn find_view_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&ViewDef>;
73
74	fn is_view_deleted(&self, id: ViewId) -> bool;
75
76	fn is_view_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
77}
78
79#[derive(Default, Debug, Clone)]
80pub struct TransactionalDefChanges {
81	/// Transaction ID this change set belongs to
82	pub txn_id: TransactionId,
83	/// All dictionary definition changes in order (no coalescing)
84	pub dictionary_def: Vec<Change<DictionaryDef>>,
85	/// All flow definition changes in order (no coalescing)
86	pub flow_def: Vec<Change<FlowDef>>,
87	/// All namespace definition changes in order (no coalescing)
88	pub namespace_def: Vec<Change<NamespaceDef>>,
89	/// All ring buffer definition changes in order (no coalescing)
90	pub ringbuffer_def: Vec<Change<RingBufferDef>>,
91	/// All table definition changes in order (no coalescing)
92	pub table_def: Vec<Change<TableDef>>,
93	/// All view definition changes in order (no coalescing)
94	pub view_def: Vec<Change<ViewDef>>,
95	/// Order of operations for replay/rollback
96	pub log: Vec<Operation>,
97}
98
99impl TransactionalDefChanges {
100	pub fn add_dictionary_def_change(&mut self, change: Change<DictionaryDef>) {
101		let id = change
102			.post
103			.as_ref()
104			.or(change.pre.as_ref())
105			.map(|d| d.id)
106			.expect("Change must have either pre or post state");
107		let op = change.op;
108		self.dictionary_def.push(change);
109		self.log.push(Operation::Dictionary {
110			id,
111			op,
112		});
113	}
114
115	pub fn add_flow_def_change(&mut self, change: Change<FlowDef>) {
116		let id = change
117			.post
118			.as_ref()
119			.or(change.pre.as_ref())
120			.map(|f| f.id)
121			.expect("Change must have either pre or post state");
122		let op = change.op;
123		self.flow_def.push(change);
124		self.log.push(Operation::Flow {
125			id,
126			op,
127		});
128	}
129
130	pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
131		let id = change
132			.post
133			.as_ref()
134			.or(change.pre.as_ref())
135			.map(|s| s.id)
136			.expect("Change must have either pre or post state");
137		let op = change.op;
138		self.namespace_def.push(change);
139		self.log.push(Operation::Namespace {
140			id,
141			op,
142		});
143	}
144
145	pub fn add_ringbuffer_def_change(&mut self, change: Change<RingBufferDef>) {
146		let id = change
147			.post
148			.as_ref()
149			.or(change.pre.as_ref())
150			.map(|rb| rb.id)
151			.expect("Change must have either pre or post state");
152		let op = change.op;
153		self.ringbuffer_def.push(change);
154		self.log.push(Operation::RingBuffer {
155			id,
156			op,
157		});
158	}
159
160	pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
161		let id = change
162			.post
163			.as_ref()
164			.or(change.pre.as_ref())
165			.map(|t| t.id)
166			.expect("Change must have either pre or post state");
167		let op = change.op;
168		self.table_def.push(change);
169		self.log.push(Operation::Table {
170			id,
171			op,
172		});
173	}
174
175	pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
176		let id = change
177			.post
178			.as_ref()
179			.or(change.pre.as_ref())
180			.map(|v| v.id)
181			.expect("Change must have either pre or post state");
182		let op = change.op;
183		self.view_def.push(change);
184		self.log.push(Operation::View {
185			id,
186			op,
187		});
188	}
189}
190
191/// Represents a single change
192#[derive(Debug, Clone)]
193pub struct Change<T> {
194	/// State before the change (None for CREATE)
195	pub pre: Option<T>,
196
197	/// State after the change (None for DELETE)
198	pub post: Option<T>,
199
200	/// Type of operation
201	pub op: OperationType,
202}
203
/// Kind of operation a [`Change`] or a log [`Operation`] represents.
///
/// The enum is fieldless, so besides `PartialEq` it also derives `Eq` and
/// `Hash`; this lets it be used as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
	/// The definition did not exist before the change.
	Create,
	/// The definition existed before and still exists after the change.
	Update,
	/// The definition no longer exists after the change.
	Delete,
}
210
211/// Log entry for operation ordering
212#[derive(Debug, Clone)]
213pub enum Operation {
214	Dictionary {
215		id: DictionaryId,
216		op: OperationType,
217	},
218	Flow {
219		id: FlowId,
220		op: OperationType,
221	},
222	Namespace {
223		id: NamespaceId,
224		op: OperationType,
225	},
226	RingBuffer {
227		id: RingBufferId,
228		op: OperationType,
229	},
230	Table {
231		id: TableId,
232		op: OperationType,
233	},
234	View {
235		id: ViewId,
236		op: OperationType,
237	},
238}
239
240impl TransactionalDefChanges {
241	pub fn new(txn_id: TransactionId) -> Self {
242		Self {
243			txn_id,
244			dictionary_def: Vec::new(),
245			flow_def: Vec::new(),
246			namespace_def: Vec::new(),
247			ringbuffer_def: Vec::new(),
248			table_def: Vec::new(),
249			view_def: Vec::new(),
250			log: Vec::new(),
251		}
252	}
253
254	/// Check if a table exists in this transaction's view
255	pub fn table_def_exists(&self, id: TableId) -> bool {
256		self.get_table_def(id).is_some()
257	}
258
259	/// Get current state of a table within this transaction
260	pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
261		// Find the last change for this table ID
262		for change in self.table_def.iter().rev() {
263			if let Some(table) = &change.post {
264				if table.id == id {
265					return Some(table);
266				}
267			} else if let Some(table) = &change.pre {
268				if table.id == id && change.op == Delete {
269					// Table was deleted
270					return None;
271				}
272			}
273		}
274		None
275	}
276
277	/// Check if a view exists in this transaction's view
278	pub fn view_def_exists(&self, id: ViewId) -> bool {
279		self.get_view_def(id).is_some()
280	}
281
282	/// Get current state of a view within this transaction
283	pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
284		// Find the last change for this view ID
285		for change in self.view_def.iter().rev() {
286			if let Some(view) = &change.post {
287				if view.id == id {
288					return Some(view);
289				}
290			} else if let Some(view) = &change.pre {
291				if view.id == id && change.op == Delete {
292					// View was deleted
293					return None;
294				}
295			}
296		}
297		None
298	}
299
300	/// Get all pending changes for commit
301	pub fn get_pending_changes(&self) -> &[Operation] {
302		&self.log
303	}
304
305	/// Get the transaction ID
306	pub fn txn_id(&self) -> TransactionId {
307		self.txn_id
308	}
309
310	/// Get namespace definition changes
311	pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
312		&self.namespace_def
313	}
314
315	/// Get table definition changes
316	pub fn table_def(&self) -> &[Change<TableDef>] {
317		&self.table_def
318	}
319
320	/// Get view definition changes
321	pub fn view_def(&self) -> &[Change<ViewDef>] {
322		&self.view_def
323	}
324
325	/// Clear all changes (for rollback)
326	pub fn clear(&mut self) {
327		self.dictionary_def.clear();
328		self.flow_def.clear();
329		self.namespace_def.clear();
330		self.ringbuffer_def.clear();
331		self.table_def.clear();
332		self.view_def.clear();
333		self.log.clear();
334	}
335}
336
337/// Tracks a table row insertion for post-commit event emission
338#[derive(Debug, Clone)]
339pub struct TableRowInsertion {
340	pub table_id: TableId,
341	pub row_number: reifydb_type::RowNumber,
342	pub encoded: crate::value::encoded::EncodedValues,
343}
344
345/// Tracks row changes across different entity types for post-commit event emission
346#[derive(Debug, Clone)]
347pub enum RowChange {
348	/// A row was inserted into a table
349	TableInsert(TableRowInsertion),
350	// Future variants:
351	// ViewInsert(ViewRowInsertion),
352	// RingBufferInsert(RingBufferRowInsertion),
353	// TableUpdate(TableRowUpdate),
354	// TableDelete(TableRowDelete),
355}