reifydb_core/interface/transaction/
change.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_type::IntoFragment;
5
6use crate::interface::{
7	DictionaryDef, DictionaryId, NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef, RingBufferId,
8	TableDef, TableId, TransactionId, ViewDef, ViewId,
9};
10
11pub trait TransactionalChanges:
12	TransactionalDictionaryChanges
13	+ TransactionalNamespaceChanges
14	+ TransactionalRingBufferChanges
15	+ TransactionalTableChanges
16	+ TransactionalViewChanges
17{
18}
19
20pub trait TransactionalDictionaryChanges {
21	fn find_dictionary(&self, id: DictionaryId) -> Option<&DictionaryDef>;
22
23	fn find_dictionary_by_name<'a>(
24		&self,
25		namespace: NamespaceId,
26		name: impl IntoFragment<'a>,
27	) -> Option<&DictionaryDef>;
28
29	fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
30
31	fn is_dictionary_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
32}
33
34pub trait TransactionalNamespaceChanges {
35	fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
36
37	fn find_namespace_by_name<'a>(&self, name: impl IntoFragment<'a>) -> Option<&NamespaceDef>;
38
39	fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
40
41	fn is_namespace_deleted_by_name<'a>(&self, name: impl IntoFragment<'a>) -> bool;
42}
43
44pub trait TransactionalTableChanges {
45	fn find_table(&self, id: TableId) -> Option<&TableDef>;
46
47	fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef>;
48
49	fn is_table_deleted(&self, id: TableId) -> bool;
50
51	fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
52}
53
54pub trait TransactionalRingBufferChanges {
55	fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
56
57	fn find_ring_buffer_by_name<'a>(
58		&self,
59		namespace: NamespaceId,
60		name: impl IntoFragment<'a>,
61	) -> Option<&RingBufferDef>;
62
63	fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool;
64
65	fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
66}
67
68pub trait TransactionalViewChanges {
69	fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
70
71	fn find_view_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&ViewDef>;
72
73	fn is_view_deleted(&self, id: ViewId) -> bool;
74
75	fn is_view_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
76}
77
78#[derive(Default, Debug, Clone)]
79pub struct TransactionalDefChanges {
80	/// Transaction ID this change set belongs to
81	pub txn_id: TransactionId,
82	/// All dictionary definition changes in order (no coalescing)
83	pub dictionary_def: Vec<Change<DictionaryDef>>,
84	/// All namespace definition changes in order (no coalescing)
85	pub namespace_def: Vec<Change<NamespaceDef>>,
86	/// All ring buffer definition changes in order (no coalescing)
87	pub ring_buffer_def: Vec<Change<RingBufferDef>>,
88	/// All table definition changes in order (no coalescing)
89	pub table_def: Vec<Change<TableDef>>,
90	/// All view definition changes in order (no coalescing)
91	pub view_def: Vec<Change<ViewDef>>,
92	/// Order of operations for replay/rollback
93	pub log: Vec<Operation>,
94}
95
96impl TransactionalDefChanges {
97	pub fn add_dictionary_def_change(&mut self, change: Change<DictionaryDef>) {
98		let id = change
99			.post
100			.as_ref()
101			.or(change.pre.as_ref())
102			.map(|d| d.id)
103			.expect("Change must have either pre or post state");
104		let op = change.op;
105		self.dictionary_def.push(change);
106		self.log.push(Operation::Dictionary {
107			id,
108			op,
109		});
110	}
111
112	pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
113		let id = change
114			.post
115			.as_ref()
116			.or(change.pre.as_ref())
117			.map(|s| s.id)
118			.expect("Change must have either pre or post state");
119		let op = change.op;
120		self.namespace_def.push(change);
121		self.log.push(Operation::Namespace {
122			id,
123			op,
124		});
125	}
126
127	pub fn add_ring_buffer_def_change(&mut self, change: Change<RingBufferDef>) {
128		let id = change
129			.post
130			.as_ref()
131			.or(change.pre.as_ref())
132			.map(|rb| rb.id)
133			.expect("Change must have either pre or post state");
134		let op = change.op;
135		self.ring_buffer_def.push(change);
136		self.log.push(Operation::RingBuffer {
137			id,
138			op,
139		});
140	}
141
142	pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
143		let id = change
144			.post
145			.as_ref()
146			.or(change.pre.as_ref())
147			.map(|t| t.id)
148			.expect("Change must have either pre or post state");
149		let op = change.op;
150		self.table_def.push(change);
151		self.log.push(Operation::Table {
152			id,
153			op,
154		});
155	}
156
157	pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
158		let id = change
159			.post
160			.as_ref()
161			.or(change.pre.as_ref())
162			.map(|v| v.id)
163			.expect("Change must have either pre or post state");
164		let op = change.op;
165		self.view_def.push(change);
166		self.log.push(Operation::View {
167			id,
168			op,
169		});
170	}
171}
172
173/// Represents a single change
174#[derive(Debug, Clone)]
175pub struct Change<T> {
176	/// State before the change (None for CREATE)
177	pub pre: Option<T>,
178
179	/// State after the change (None for DELETE)
180	pub post: Option<T>,
181
182	/// Type of operation
183	pub op: OperationType,
184}
185
/// Kind of operation a [`Change`] records.
///
/// The enum is field-less, so equality is total: it derives `Eq` and `Hash`
/// in addition to `PartialEq`, which lets it serve as a map/set key and
/// satisfy `Eq` bounds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
	/// A definition was created (the change has no pre state).
	Create,
	/// An existing definition was modified.
	Update,
	/// A definition was removed (the change has no post state).
	Delete,
}
192
193/// Log entry for operation ordering
194#[derive(Debug, Clone)]
195pub enum Operation {
196	Dictionary {
197		id: DictionaryId,
198		op: OperationType,
199	},
200	Namespace {
201		id: NamespaceId,
202		op: OperationType,
203	},
204	RingBuffer {
205		id: RingBufferId,
206		op: OperationType,
207	},
208	Table {
209		id: TableId,
210		op: OperationType,
211	},
212	View {
213		id: ViewId,
214		op: OperationType,
215	},
216}
217
218impl TransactionalDefChanges {
219	pub fn new(txn_id: TransactionId) -> Self {
220		Self {
221			txn_id,
222			dictionary_def: Vec::new(),
223			namespace_def: Vec::new(),
224			ring_buffer_def: Vec::new(),
225			table_def: Vec::new(),
226			view_def: Vec::new(),
227			log: Vec::new(),
228		}
229	}
230
231	/// Check if a table exists in this transaction's view
232	pub fn table_def_exists(&self, id: TableId) -> bool {
233		self.get_table_def(id).is_some()
234	}
235
236	/// Get current state of a table within this transaction
237	pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
238		// Find the last change for this table ID
239		for change in self.table_def.iter().rev() {
240			if let Some(table) = &change.post {
241				if table.id == id {
242					return Some(table);
243				}
244			} else if let Some(table) = &change.pre {
245				if table.id == id && change.op == Delete {
246					// Table was deleted
247					return None;
248				}
249			}
250		}
251		None
252	}
253
254	/// Check if a view exists in this transaction's view
255	pub fn view_def_exists(&self, id: ViewId) -> bool {
256		self.get_view_def(id).is_some()
257	}
258
259	/// Get current state of a view within this transaction
260	pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
261		// Find the last change for this view ID
262		for change in self.view_def.iter().rev() {
263			if let Some(view) = &change.post {
264				if view.id == id {
265					return Some(view);
266				}
267			} else if let Some(view) = &change.pre {
268				if view.id == id && change.op == Delete {
269					// View was deleted
270					return None;
271				}
272			}
273		}
274		None
275	}
276
277	/// Get all pending changes for commit
278	pub fn get_pending_changes(&self) -> &[Operation] {
279		&self.log
280	}
281
282	/// Get the transaction ID
283	pub fn txn_id(&self) -> TransactionId {
284		self.txn_id
285	}
286
287	/// Get namespace definition changes
288	pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
289		&self.namespace_def
290	}
291
292	/// Get table definition changes
293	pub fn table_def(&self) -> &[Change<TableDef>] {
294		&self.table_def
295	}
296
297	/// Get view definition changes
298	pub fn view_def(&self) -> &[Change<ViewDef>] {
299		&self.view_def
300	}
301
302	/// Clear all changes (for rollback)
303	pub fn clear(&mut self) {
304		self.dictionary_def.clear();
305		self.namespace_def.clear();
306		self.ring_buffer_def.clear();
307		self.table_def.clear();
308		self.view_def.clear();
309		self.log.clear();
310	}
311}