reifydb_engine/transaction/
command.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
7use reifydb_core::{
8	CommitVersion, EncodedKey, EncodedKeyRange,
9	diagnostic::transaction,
10	event::EventBus,
11	interceptor,
12	interceptor::{
13		Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
14		RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
15		RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
16		TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
17		TablePreUpdateInterceptor,
18	},
19	interface::{
20		BoxedMultiVersionIter, CdcTransaction, CommandTransaction, MultiVersionCommandTransaction,
21		MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction,
22		SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
23		interceptor::{TransactionInterceptor, WithInterceptors},
24	},
25	return_error,
26	value::encoded::EncodedValues,
27};
28use reifydb_transaction::{
29	cdc::TransactionCdc,
30	multi::{TransactionMultiVersion, pending::PendingWrites},
31	single::TransactionSingleVersion,
32};
33
34use crate::transaction::query::StandardQueryTransaction;
35
36/// An active command transaction that holds a multi command transaction
37/// and provides query/command access to single storage.
38///
39/// The transaction will auto-rollback on drop if not explicitly committed.
40pub struct StandardCommandTransaction {
41	pub multi: TransactionMultiVersion,
42	pub single: TransactionSingleVersion,
43	pub(crate) cdc: TransactionCdc,
44	state: TransactionState,
45
46	pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
47	pub(crate) event_bus: EventBus,
48	pub(crate) changes: TransactionalDefChanges,
49	pub(crate) catalog: MaterializedCatalog,
50
51	pub(crate) interceptors: Interceptors<Self>,
52	// Marker to prevent Send and Sync
53	_not_send_sync: PhantomData<*const ()>,
54}
55
/// Lifecycle state of a command transaction.
///
/// A transaction starts out `Active` and moves to exactly one of the two
/// terminal states when it is committed or rolled back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// The transaction still accepts reads and writes.
	Active,
	/// The transaction has been committed.
	Committed,
	/// The transaction has been rolled back.
	RolledBack,
}
62
63impl StandardCommandTransaction {
64	/// Creates a new active command transaction with a pre-commit callback
65	pub fn new(
66		multi: TransactionMultiVersion,
67		single: TransactionSingleVersion,
68		cdc: TransactionCdc,
69		event_bus: EventBus,
70		catalog: MaterializedCatalog,
71		interceptors: Interceptors<Self>,
72	) -> reifydb_core::Result<Self> {
73		let cmd = multi.begin_command()?;
74		let txn_id = cmd.id();
75		Ok(Self {
76			cmd: Some(cmd),
77			multi,
78			single,
79			cdc,
80			state: TransactionState::Active,
81			event_bus,
82			catalog,
83			interceptors,
84			changes: TransactionalDefChanges::new(txn_id),
85			_not_send_sync: PhantomData,
86		})
87	}
88
89	pub fn event_bus(&self) -> &EventBus {
90		&self.event_bus
91	}
92
93	/// Check if transaction is still active and return appropriate error if
94	/// not
95	fn check_active(&self) -> crate::Result<()> {
96		match self.state {
97			TransactionState::Active => Ok(()),
98			TransactionState::Committed => {
99				return_error!(transaction::transaction_already_committed())
100			}
101			TransactionState::RolledBack => {
102				return_error!(transaction::transaction_already_rolled_back())
103			}
104		}
105	}
106
107	/// Commit the transaction.
108	/// Since single transactions are short-lived and auto-commit,
109	/// this only commits the multi transaction.
110	pub fn commit(&mut self) -> crate::Result<CommitVersion> {
111		self.check_active()?;
112
113		TransactionInterceptor::pre_commit(self)?;
114
115		if let Some(multi) = self.cmd.take() {
116			let id = multi.id();
117			self.state = TransactionState::Committed;
118
119			let changes = std::mem::take(&mut self.changes);
120
121			let version = multi.commit()?;
122			TransactionInterceptor::post_commit(self, id, version, changes)?;
123
124			Ok(version)
125		} else {
126			// This should never happen due to check_active
127			unreachable!("Transaction state inconsistency")
128		}
129	}
130
131	/// Rollback the transaction.
132	pub fn rollback(&mut self) -> crate::Result<()> {
133		self.check_active()?;
134		if let Some(multi) = self.cmd.take() {
135			self.state = TransactionState::RolledBack;
136			multi.rollback()
137		} else {
138			// This should never happen due to check_active
139			unreachable!("Transaction state inconsistency")
140		}
141	}
142
143	/// Get access to the CDC transaction interface
144	pub fn cdc(&self) -> &TransactionCdc {
145		&self.cdc
146	}
147
148	/// Get access to the pending writes in this transaction
149	///
150	/// This allows checking for key conflicts when committing FlowTransactions
151	/// to ensure they operate on non-overlapping keyspaces.
152	pub fn pending_writes(&self) -> &PendingWrites {
153		self.cmd.as_ref().unwrap().pending_writes()
154	}
155
156	/// Execute a function with query access to the single transaction.
157	pub fn with_single_query<F, R>(&self, f: F) -> crate::Result<R>
158	where
159		F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Query<'_>) -> crate::Result<R>,
160	{
161		self.check_active()?;
162		self.single.with_query(f)
163	}
164
165	/// Execute a function with query access to the single transaction.
166	pub fn with_single_command<F, R>(&self, f: F) -> crate::Result<R>
167	where
168		F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Command<'_>) -> crate::Result<R>,
169	{
170		self.check_active()?;
171		self.single.with_command(f)
172	}
173
174	/// Execute a function with a query transaction view.
175	/// This creates a new query transaction using the stored multi-version storage.
176	/// The query transaction will operate independently but share the same single/CDC storage.
177	pub fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
178	where
179		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
180	{
181		self.check_active()?;
182
183		let mut query_txn = StandardQueryTransaction::new(
184			self.multi.begin_query()?,
185			self.single.clone(),
186			self.cdc.clone(),
187			self.catalog.clone(),
188		);
189
190		f(&mut query_txn)
191	}
192
193	pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
194	where
195		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
196	{
197		self.check_active()?;
198
199		let mut query_txn = StandardQueryTransaction::new(
200			self.multi.begin_query()?,
201			self.single.clone(),
202			self.cdc.clone(),
203			self.catalog.clone(),
204		);
205
206		query_txn.read_as_of_version_exclusive(version)?;
207
208		f(&mut query_txn)
209	}
210
211	pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
212	where
213		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
214	{
215		self.check_active()?;
216
217		let mut query_txn = StandardQueryTransaction::new(
218			self.multi.begin_query()?,
219			self.single.clone(),
220			self.cdc.clone(),
221			self.catalog.clone(),
222		);
223
224		query_txn.read_as_of_version_inclusive(version)?;
225
226		f(&mut query_txn)
227	}
228}
229
230impl MaterializedCatalogTransaction for StandardCommandTransaction {
231	fn catalog(&self) -> &MaterializedCatalog {
232		&self.catalog
233	}
234}
235
236impl MultiVersionQueryTransaction for StandardCommandTransaction {
237	#[inline]
238	fn version(&self) -> CommitVersion {
239		self.cmd.as_ref().unwrap().version()
240	}
241
242	#[inline]
243	fn id(&self) -> TransactionId {
244		self.cmd.as_ref().unwrap().id()
245	}
246
247	#[inline]
248	fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
249		self.check_active()?;
250		self.cmd.as_mut().unwrap().get(key)
251	}
252
253	#[inline]
254	fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
255		self.check_active()?;
256		self.cmd.as_mut().unwrap().contains_key(key)
257	}
258
259	#[inline]
260	fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<BoxedMultiVersionIter> {
261		self.check_active()?;
262		self.cmd.as_mut().unwrap().range_batched(range, batch_size)
263	}
264
265	#[inline]
266	fn range_rev_batched(
267		&mut self,
268		range: EncodedKeyRange,
269		batch_size: u64,
270	) -> crate::Result<BoxedMultiVersionIter> {
271		self.check_active()?;
272		self.cmd.as_mut().unwrap().range_rev_batched(range, batch_size)
273	}
274
275	#[inline]
276	fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
277		self.check_active()?;
278		self.cmd.as_mut().unwrap().prefix(prefix)
279	}
280
281	#[inline]
282	fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
283		self.check_active()?;
284		self.cmd.as_mut().unwrap().prefix_rev(prefix)
285	}
286
287	#[inline]
288	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
289		self.check_active()?;
290		self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version)
291	}
292}
293
294impl MultiVersionCommandTransaction for StandardCommandTransaction {
295	#[inline]
296	fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
297		self.check_active()?;
298		self.cmd.as_mut().unwrap().set(key, row)
299	}
300
301	#[inline]
302	fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
303		self.check_active()?;
304		self.cmd.as_mut().unwrap().remove(key)
305	}
306
307	#[inline]
308	fn commit(mut self) -> crate::Result<CommitVersion> {
309		self.check_active()?;
310		self.state = TransactionState::Committed;
311		self.cmd.take().unwrap().commit()
312	}
313
314	#[inline]
315	fn rollback(mut self) -> crate::Result<()> {
316		self.check_active()?;
317		self.state = TransactionState::RolledBack;
318		self.cmd.take().unwrap().rollback()
319	}
320}
321
322impl WithEventBus for StandardCommandTransaction {
323	fn event_bus(&self) -> &EventBus {
324		&self.event_bus
325	}
326}
327
328impl QueryTransaction for StandardCommandTransaction {
329	type SingleVersionQuery<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Query<'a>;
330
331	type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;
332
333	fn begin_single_query(&self) -> crate::Result<Self::SingleVersionQuery<'_>> {
334		self.check_active()?;
335		self.single.begin_query()
336	}
337
338	fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
339		self.check_active()?;
340		self.cdc.begin_query()
341	}
342}
343
344impl CommandTransaction for StandardCommandTransaction {
345	type SingleVersionCommand<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Command<'a>;
346
347	fn begin_single_command(&self) -> crate::Result<Self::SingleVersionCommand<'_>> {
348		self.check_active()?;
349		self.single.begin_command()
350	}
351
352	fn get_changes(&self) -> &TransactionalDefChanges {
353		&self.changes
354	}
355}
356
357impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
358	fn table_pre_insert_interceptors(
359		&mut self,
360	) -> &mut Chain<StandardCommandTransaction, dyn TablePreInsertInterceptor<StandardCommandTransaction>> {
361		&mut self.interceptors.table_pre_insert
362	}
363
364	fn table_post_insert_interceptors(
365		&mut self,
366	) -> &mut Chain<StandardCommandTransaction, dyn TablePostInsertInterceptor<StandardCommandTransaction>> {
367		&mut self.interceptors.table_post_insert
368	}
369
370	fn table_pre_update_interceptors(
371		&mut self,
372	) -> &mut Chain<StandardCommandTransaction, dyn TablePreUpdateInterceptor<StandardCommandTransaction>> {
373		&mut self.interceptors.table_pre_update
374	}
375
376	fn table_post_update_interceptors(
377		&mut self,
378	) -> &mut Chain<
379		StandardCommandTransaction,
380		dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction>,
381	> {
382		&mut self.interceptors.table_post_update
383	}
384
385	fn table_pre_delete_interceptors(
386		&mut self,
387	) -> &mut Chain<StandardCommandTransaction, dyn TablePreDeleteInterceptor<StandardCommandTransaction>> {
388		&mut self.interceptors.table_pre_delete
389	}
390
391	fn table_post_delete_interceptors(
392		&mut self,
393	) -> &mut Chain<StandardCommandTransaction, dyn TablePostDeleteInterceptor<StandardCommandTransaction>> {
394		&mut self.interceptors.table_post_delete
395	}
396
397	fn ring_buffer_pre_insert_interceptors(
398		&mut self,
399	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreInsertInterceptor<StandardCommandTransaction>> {
400		&mut self.interceptors.ring_buffer_pre_insert
401	}
402
403	fn ring_buffer_post_insert_interceptors(
404		&mut self,
405	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostInsertInterceptor<StandardCommandTransaction>> {
406		&mut self.interceptors.ring_buffer_post_insert
407	}
408
409	fn ring_buffer_pre_update_interceptors(
410		&mut self,
411	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction>> {
412		&mut self.interceptors.ring_buffer_pre_update
413	}
414
415	fn ring_buffer_post_update_interceptors(
416		&mut self,
417	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction>> {
418		&mut self.interceptors.ring_buffer_post_update
419	}
420
421	fn ring_buffer_pre_delete_interceptors(
422		&mut self,
423	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction>> {
424		&mut self.interceptors.ring_buffer_pre_delete
425	}
426
427	fn ring_buffer_post_delete_interceptors(
428		&mut self,
429	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction>> {
430		&mut self.interceptors.ring_buffer_post_delete
431	}
432
433	fn pre_commit_interceptors(
434		&mut self,
435	) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction>> {
436		&mut self.interceptors.pre_commit
437	}
438
439	fn post_commit_interceptors(
440		&mut self,
441	) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction>> {
442		&mut self.interceptors.post_commit
443	}
444
445	// Namespace definition interceptors
446	fn namespace_def_post_create_interceptors(
447		&mut self,
448	) -> &mut Chain<
449		StandardCommandTransaction,
450		dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction>,
451	> {
452		&mut self.interceptors.namespace_def_post_create
453	}
454
455	fn namespace_def_pre_update_interceptors(
456		&mut self,
457	) -> &mut Chain<
458		StandardCommandTransaction,
459		dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction>,
460	> {
461		&mut self.interceptors.namespace_def_pre_update
462	}
463
464	fn namespace_def_post_update_interceptors(
465		&mut self,
466	) -> &mut Chain<
467		StandardCommandTransaction,
468		dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction>,
469	> {
470		&mut self.interceptors.namespace_def_post_update
471	}
472
473	fn namespace_def_pre_delete_interceptors(
474		&mut self,
475	) -> &mut Chain<
476		StandardCommandTransaction,
477		dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction>,
478	> {
479		&mut self.interceptors.namespace_def_pre_delete
480	}
481
482	// Table definition interceptors
483	fn table_def_post_create_interceptors(
484		&mut self,
485	) -> &mut Chain<
486		StandardCommandTransaction,
487		dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction>,
488	> {
489		&mut self.interceptors.table_def_post_create
490	}
491
492	fn table_def_pre_update_interceptors(
493		&mut self,
494	) -> &mut Chain<
495		StandardCommandTransaction,
496		dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction>,
497	> {
498		&mut self.interceptors.table_def_pre_update
499	}
500
501	fn table_def_post_update_interceptors(
502		&mut self,
503	) -> &mut Chain<
504		StandardCommandTransaction,
505		dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction>,
506	> {
507		&mut self.interceptors.table_def_post_update
508	}
509
510	fn table_def_pre_delete_interceptors(
511		&mut self,
512	) -> &mut Chain<
513		StandardCommandTransaction,
514		dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction>,
515	> {
516		&mut self.interceptors.table_def_pre_delete
517	}
518
519	// View definition interceptors
520	fn view_def_post_create_interceptors(
521		&mut self,
522	) -> &mut Chain<
523		StandardCommandTransaction,
524		dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction>,
525	> {
526		&mut self.interceptors.view_def_post_create
527	}
528
529	fn view_def_pre_update_interceptors(
530		&mut self,
531	) -> &mut Chain<
532		StandardCommandTransaction,
533		dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction>,
534	> {
535		&mut self.interceptors.view_def_pre_update
536	}
537
538	fn view_def_post_update_interceptors(
539		&mut self,
540	) -> &mut Chain<
541		StandardCommandTransaction,
542		dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction>,
543	> {
544		&mut self.interceptors.view_def_post_update
545	}
546
547	fn view_def_pre_delete_interceptors(
548		&mut self,
549	) -> &mut Chain<
550		StandardCommandTransaction,
551		dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction>,
552	> {
553		&mut self.interceptors.view_def_pre_delete
554	}
555}
556
557impl TransactionalChanges for StandardCommandTransaction {}
558
559impl Drop for StandardCommandTransaction {
560	fn drop(&mut self) {
561		if let Some(multi) = self.cmd.take() {
562			// Auto-rollback if still active (not committed or
563			// rolled back)
564			if self.state == TransactionState::Active {
565				let _ = multi.rollback();
566			}
567		}
568	}
569}