reifydb_core/interface/transaction/
change.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_type::IntoFragment;
5
6use crate::interface::{
7	NamespaceDef, NamespaceId, OperationType::Delete, RingBufferDef, RingBufferId, TableDef, TableId,
8	TransactionId, ViewDef, ViewId,
9};
10
11pub trait TransactionalChanges:
12	TransactionalNamespaceChanges + TransactionalRingBufferChanges + TransactionalTableChanges + TransactionalViewChanges
13{
14}
15
16pub trait TransactionalNamespaceChanges {
17	fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef>;
18
19	fn find_namespace_by_name<'a>(&self, name: impl IntoFragment<'a>) -> Option<&NamespaceDef>;
20
21	fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
22
23	fn is_namespace_deleted_by_name<'a>(&self, name: impl IntoFragment<'a>) -> bool;
24}
25
26pub trait TransactionalTableChanges {
27	fn find_table(&self, id: TableId) -> Option<&TableDef>;
28
29	fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef>;
30
31	fn is_table_deleted(&self, id: TableId) -> bool;
32
33	fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
34}
35
36pub trait TransactionalRingBufferChanges {
37	fn find_ring_buffer(&self, id: RingBufferId) -> Option<&RingBufferDef>;
38
39	fn find_ring_buffer_by_name<'a>(
40		&self,
41		namespace: NamespaceId,
42		name: impl IntoFragment<'a>,
43	) -> Option<&RingBufferDef>;
44
45	fn is_ring_buffer_deleted(&self, id: RingBufferId) -> bool;
46
47	fn is_ring_buffer_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
48}
49
50pub trait TransactionalViewChanges {
51	fn find_view(&self, id: ViewId) -> Option<&ViewDef>;
52
53	fn find_view_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&ViewDef>;
54
55	fn is_view_deleted(&self, id: ViewId) -> bool;
56
57	fn is_view_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool;
58}
59
60#[derive(Default, Debug, Clone)]
61pub struct TransactionalDefChanges {
62	/// Transaction ID this change set belongs to
63	pub txn_id: TransactionId,
64	/// All namespace definition changes in order (no coalescing)
65	pub namespace_def: Vec<Change<NamespaceDef>>,
66	/// All ring buffer definition changes in order (no coalescing)
67	pub ring_buffer_def: Vec<Change<RingBufferDef>>,
68	/// All table definition changes in order (no coalescing)
69	pub table_def: Vec<Change<TableDef>>,
70	/// All view definition changes in order (no coalescing)
71	pub view_def: Vec<Change<ViewDef>>,
72	/// Order of operations for replay/rollback
73	pub log: Vec<Operation>,
74}
75
76impl TransactionalDefChanges {
77	pub fn add_namespace_def_change(&mut self, change: Change<NamespaceDef>) {
78		let id = change
79			.post
80			.as_ref()
81			.or(change.pre.as_ref())
82			.map(|s| s.id)
83			.expect("Change must have either pre or post state");
84		let op = change.op;
85		self.namespace_def.push(change);
86		self.log.push(Operation::Namespace {
87			id,
88			op,
89		});
90	}
91
92	pub fn add_ring_buffer_def_change(&mut self, change: Change<RingBufferDef>) {
93		let id = change
94			.post
95			.as_ref()
96			.or(change.pre.as_ref())
97			.map(|rb| rb.id)
98			.expect("Change must have either pre or post state");
99		let op = change.op;
100		self.ring_buffer_def.push(change);
101		self.log.push(Operation::RingBuffer {
102			id,
103			op,
104		});
105	}
106
107	pub fn add_table_def_change(&mut self, change: Change<TableDef>) {
108		let id = change
109			.post
110			.as_ref()
111			.or(change.pre.as_ref())
112			.map(|t| t.id)
113			.expect("Change must have either pre or post state");
114		let op = change.op;
115		self.table_def.push(change);
116		self.log.push(Operation::Table {
117			id,
118			op,
119		});
120	}
121
122	pub fn add_view_def_change(&mut self, change: Change<ViewDef>) {
123		let id = change
124			.post
125			.as_ref()
126			.or(change.pre.as_ref())
127			.map(|v| v.id)
128			.expect("Change must have either pre or post state");
129		let op = change.op;
130		self.view_def.push(change);
131		self.log.push(Operation::View {
132			id,
133			op,
134		});
135	}
136}
137
138/// Represents a single change
139#[derive(Debug, Clone)]
140pub struct Change<T> {
141	/// State before the change (None for CREATE)
142	pub pre: Option<T>,
143
144	/// State after the change (None for DELETE)
145	pub post: Option<T>,
146
147	/// Type of operation
148	pub op: OperationType,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq)]
152pub enum OperationType {
153	Create,
154	Update,
155	Delete,
156}
157
158/// Log entry for operation ordering
159#[derive(Debug, Clone)]
160pub enum Operation {
161	Namespace {
162		id: NamespaceId,
163		op: OperationType,
164	},
165	RingBuffer {
166		id: RingBufferId,
167		op: OperationType,
168	},
169	Table {
170		id: TableId,
171		op: OperationType,
172	},
173	View {
174		id: ViewId,
175		op: OperationType,
176	},
177}
178
179impl TransactionalDefChanges {
180	pub fn new(txn_id: TransactionId) -> Self {
181		Self {
182			txn_id,
183			namespace_def: Vec::new(),
184			ring_buffer_def: Vec::new(),
185			table_def: Vec::new(),
186			view_def: Vec::new(),
187			log: Vec::new(),
188		}
189	}
190
191	/// Check if a table exists in this transaction's view
192	pub fn table_def_exists(&self, id: TableId) -> bool {
193		self.get_table_def(id).is_some()
194	}
195
196	/// Get current state of a table within this transaction
197	pub fn get_table_def(&self, id: TableId) -> Option<&TableDef> {
198		// Find the last change for this table ID
199		for change in self.table_def.iter().rev() {
200			if let Some(table) = &change.post {
201				if table.id == id {
202					return Some(table);
203				}
204			} else if let Some(table) = &change.pre {
205				if table.id == id && change.op == Delete {
206					// Table was deleted
207					return None;
208				}
209			}
210		}
211		None
212	}
213
214	/// Check if a view exists in this transaction's view
215	pub fn view_def_exists(&self, id: ViewId) -> bool {
216		self.get_view_def(id).is_some()
217	}
218
219	/// Get current state of a view within this transaction
220	pub fn get_view_def(&self, id: ViewId) -> Option<&ViewDef> {
221		// Find the last change for this view ID
222		for change in self.view_def.iter().rev() {
223			if let Some(view) = &change.post {
224				if view.id == id {
225					return Some(view);
226				}
227			} else if let Some(view) = &change.pre {
228				if view.id == id && change.op == Delete {
229					// View was deleted
230					return None;
231				}
232			}
233		}
234		None
235	}
236
237	/// Get all pending changes for commit
238	pub fn get_pending_changes(&self) -> &[Operation] {
239		&self.log
240	}
241
242	/// Get the transaction ID
243	pub fn txn_id(&self) -> TransactionId {
244		self.txn_id
245	}
246
247	/// Get namespace definition changes
248	pub fn namespace_def(&self) -> &[Change<NamespaceDef>] {
249		&self.namespace_def
250	}
251
252	/// Get table definition changes
253	pub fn table_def(&self) -> &[Change<TableDef>] {
254		&self.table_def
255	}
256
257	/// Get view definition changes
258	pub fn view_def(&self) -> &[Change<ViewDef>] {
259		&self.view_def
260	}
261
262	/// Clear all changes (for rollback)
263	pub fn clear(&mut self) {
264		self.namespace_def.clear();
265		self.table_def.clear();
266		self.view_def.clear();
267		self.log.clear();
268	}
269}