reifydb_core/interface/transaction/
change.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_type::IntoFragment;
5
6use crate::interface::{
7	DictionaryDef, DictionaryId, FlowDef, FlowId, NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef,
8	RingBufferId, TableDef, TableId, TransactionId, ViewDef, ViewId,
9};
10
11pub trait TransactionalChanges:
12	TransactionalDictionaryChanges
13	+ TransactionalFlowChanges
14	+ TransactionalNamespaceChanges
15	+ TransactionalRingBufferChanges
16	+ TransactionalTableChanges
17	+ TransactionalViewChanges
18{
19}
20
21pub trait TransactionalDictionaryChanges {
22	fn find_dictionary(&self, id: DictionaryId) -> Option<&DictionaryDef>;
23
24	fn find_dictionary_by_name<'a>(
25		&self,
26		namespace: NamespaceId,
27		name: impl IntoFragment<'a>,
28	) -> Option<&DictionaryDef>;
29
30	fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
31
32	fn is_dictionary_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
33}
34
35pub trait TransactionalNamespaceChanges {
36	fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
37
38	fn find_namespace_by_name<'a>(&self, name: impl IntoFragment<'a>) -> Option<&NamespaceDef>;
39
40	fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
41
42	fn is_namespace_deleted_by_name<'a>(&self, name: impl IntoFragment<'a>) -> bool;
43}
44
45pub trait TransactionalFlowChanges {
46	fn find_flow(&self, id: FlowId) -> Option<&FlowDef>;
47
48	fn find_flow_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&FlowDef>;
49
50	fn is_flow_deleted(&self, id: FlowId) -> bool;
51
52	fn is_flow_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
53}
54
55pub trait TransactionalTableChanges {
56	fn find_table(&self, id: TableId) -> Option<&TableDef>;
57
58	fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef>;
59
60	fn is_table_deleted(&self, id: TableId) -> bool;
61
62	fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
63}
64
65pub trait TransactionalRingBufferChanges {
66	fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
67
68	fn find_ring_buffer_by_name<'a>(
69		&self,
70		namespace: NamespaceId,
71		name: impl IntoFragment<'a>,
72	) -> Option<&RingBufferDef>;
73
74	fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool;
75
76	fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
77}
78
79pub trait TransactionalViewChanges {
80	fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
81
82	fn find_view_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&ViewDef>;
83
84	fn is_view_deleted(&self, id: ViewId) -> bool;
85
86	fn is_view_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
87}
88
89#[derive(Default, Debug, Clone)]
90pub struct TransactionalDefChanges {
91	/// Transaction ID this change set belongs to
92	pub txn_id: TransactionId,
93	/// All dictionary definition changes in order (no coalescing)
94	pub dictionary_def: Vec<Change<DictionaryDef>>,
95	/// All flow definition changes in order (no coalescing)
96	pub flow_def: Vec<Change<FlowDef>>,
97	/// All namespace definition changes in order (no coalescing)
98	pub namespace_def: Vec<Change<NamespaceDef>>,
99	/// All ring buffer definition changes in order (no coalescing)
100	pub ring_buffer_def: Vec<Change<RingBufferDef>>,
101	/// All table definition changes in order (no coalescing)
102	pub table_def: Vec<Change<TableDef>>,
103	/// All view definition changes in order (no coalescing)
104	pub view_def: Vec<Change<ViewDef>>,
105	/// Order of operations for replay/rollback
106	pub log: Vec<Operation>,
107}
108
109impl TransactionalDefChanges {
110	pub fn add_dictionary_def_change(&mut self, change: Change<DictionaryDef>) {
111		let id = change
112			.post
113			.as_ref()
114			.or(change.pre.as_ref())
115			.map(|d| d.id)
116			.expect("Change must have either pre or post state");
117		let op = change.op;
118		self.dictionary_def.push(change);
119		self.log.push(Operation::Dictionary {
120			id,
121			op,
122		});
123	}
124
125	pub fn add_flow_def_change(&mut self, change: Change<FlowDef>) {
126		let id = change
127			.post
128			.as_ref()
129			.or(change.pre.as_ref())
130			.map(|f| f.id)
131			.expect("Change must have either pre or post state");
132		let op = change.op;
133		self.flow_def.push(change);
134		self.log.push(Operation::Flow {
135			id,
136			op,
137		});
138	}
139
140	pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
141		let id = change
142			.post
143			.as_ref()
144			.or(change.pre.as_ref())
145			.map(|s| s.id)
146			.expect("Change must have either pre or post state");
147		let op = change.op;
148		self.namespace_def.push(change);
149		self.log.push(Operation::Namespace {
150			id,
151			op,
152		});
153	}
154
155	pub fn add_ring_buffer_def_change(&mut self, change: Change<RingBufferDef>) {
156		let id = change
157			.post
158			.as_ref()
159			.or(change.pre.as_ref())
160			.map(|rb| rb.id)
161			.expect("Change must have either pre or post state");
162		let op = change.op;
163		self.ring_buffer_def.push(change);
164		self.log.push(Operation::RingBuffer {
165			id,
166			op,
167		});
168	}
169
170	pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
171		let id = change
172			.post
173			.as_ref()
174			.or(change.pre.as_ref())
175			.map(|t| t.id)
176			.expect("Change must have either pre or post state");
177		let op = change.op;
178		self.table_def.push(change);
179		self.log.push(Operation::Table {
180			id,
181			op,
182		});
183	}
184
185	pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
186		let id = change
187			.post
188			.as_ref()
189			.or(change.pre.as_ref())
190			.map(|v| v.id)
191			.expect("Change must have either pre or post state");
192		let op = change.op;
193		self.view_def.push(change);
194		self.log.push(Operation::View {
195			id,
196			op,
197		});
198	}
199}
200
201/// Represents a single change
202#[derive(Debug, Clone)]
203pub struct Change<T> {
204	/// State before the change (None for CREATE)
205	pub pre: Option<T>,
206
207	/// State after the change (None for DELETE)
208	pub post: Option<T>,
209
210	/// Type of operation
211	pub op: OperationType,
212}
213
/// Kind of operation a [`Change`] represents.
///
/// `Eq` and `Hash` are derived alongside `PartialEq`: the enum has no fields,
/// so equality is total, and deriving both lets the type be used as a key in
/// hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
	/// A definition was created.
	Create,
	/// An existing definition was updated.
	Update,
	/// A definition was deleted.
	Delete,
}
220
221/// Log entry for operation ordering
222#[derive(Debug, Clone)]
223pub enum Operation {
224	Dictionary {
225		id: DictionaryId,
226		op: OperationType,
227	},
228	Flow {
229		id: FlowId,
230		op: OperationType,
231	},
232	Namespace {
233		id: NamespaceId,
234		op: OperationType,
235	},
236	RingBuffer {
237		id: RingBufferId,
238		op: OperationType,
239	},
240	Table {
241		id: TableId,
242		op: OperationType,
243	},
244	View {
245		id: ViewId,
246		op: OperationType,
247	},
248}
249
250impl TransactionalDefChanges {
251	pub fn new(txn_id: TransactionId) -> Self {
252		Self {
253			txn_id,
254			dictionary_def: Vec::new(),
255			flow_def: Vec::new(),
256			namespace_def: Vec::new(),
257			ring_buffer_def: Vec::new(),
258			table_def: Vec::new(),
259			view_def: Vec::new(),
260			log: Vec::new(),
261		}
262	}
263
264	/// Check if a table exists in this transaction's view
265	pub fn table_def_exists(&self, id: TableId) -> bool {
266		self.get_table_def(id).is_some()
267	}
268
269	/// Get current state of a table within this transaction
270	pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
271		// Find the last change for this table ID
272		for change in self.table_def.iter().rev() {
273			if let Some(table) = &change.post {
274				if table.id == id {
275					return Some(table);
276				}
277			} else if let Some(table) = &change.pre {
278				if table.id == id && change.op == Delete {
279					// Table was deleted
280					return None;
281				}
282			}
283		}
284		None
285	}
286
287	/// Check if a view exists in this transaction's view
288	pub fn view_def_exists(&self, id: ViewId) -> bool {
289		self.get_view_def(id).is_some()
290	}
291
292	/// Get current state of a view within this transaction
293	pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
294		// Find the last change for this view ID
295		for change in self.view_def.iter().rev() {
296			if let Some(view) = &change.post {
297				if view.id == id {
298					return Some(view);
299				}
300			} else if let Some(view) = &change.pre {
301				if view.id == id && change.op == Delete {
302					// View was deleted
303					return None;
304				}
305			}
306		}
307		None
308	}
309
310	/// Get all pending changes for commit
311	pub fn get_pending_changes(&self) -> &[Operation] {
312		&self.log
313	}
314
315	/// Get the transaction ID
316	pub fn txn_id(&self) -> TransactionId {
317		self.txn_id
318	}
319
320	/// Get namespace definition changes
321	pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
322		&self.namespace_def
323	}
324
325	/// Get table definition changes
326	pub fn table_def(&self) -> &[Change<TableDef>] {
327		&self.table_def
328	}
329
330	/// Get view definition changes
331	pub fn view_def(&self) -> &[Change<ViewDef>] {
332		&self.view_def
333	}
334
335	/// Clear all changes (for rollback)
336	pub fn clear(&mut self) {
337		self.dictionary_def.clear();
338		self.flow_def.clear();
339		self.namespace_def.clear();
340		self.ring_buffer_def.clear();
341		self.table_def.clear();
342		self.view_def.clear();
343		self.log.clear();
344	}
345}
346
347/// Tracks a table row insertion for post-commit event emission
348#[derive(Debug, Clone)]
349pub struct TableRowInsertion {
350	pub table_id: TableId,
351	pub row_number: reifydb_type::RowNumber,
352	pub encoded: crate::value::encoded::EncodedValues,
353}
354
355/// Tracks row changes across different entity types for post-commit event emission
356#[derive(Debug, Clone)]
357pub enum RowChange {
358	/// A row was inserted into a table
359	TableInsert(TableRowInsertion),
360	// Future variants:
361	// ViewInsert(ViewRowInsertion),
362	// RingBufferInsert(RingBufferRowInsertion),
363	// TableUpdate(TableRowUpdate),
364	// TableDelete(TableRowDelete),
365}