reifydb_engine/transaction/
command.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
7use reifydb_core::{
8	CommitVersion, EncodedKey, EncodedKeyRange,
9	diagnostic::transaction,
10	event::EventBus,
11	interceptor,
12	interceptor::{
13		Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
14		RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
15		RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
16		TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
17		TablePreUpdateInterceptor,
18	},
19	interface::{
20		BoxedMultiVersionIter, CdcTransaction, CommandTransaction, MultiVersionCommandTransaction,
21		MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction,
22		SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
23		interceptor::{TransactionInterceptor, WithInterceptors},
24	},
25	return_error,
26	value::encoded::EncodedValues,
27};
28use reifydb_transaction::{
29	cdc::TransactionCdc,
30	multi::{TransactionMultiVersion, pending::PendingWrites},
31	single::TransactionSingleVersion,
32};
33
34use crate::transaction::query::StandardQueryTransaction;
35
/// An active command transaction that holds a multi command transaction
/// and provides query/command access to single storage.
///
/// The transaction will auto-rollback on drop if not explicitly committed.
pub struct StandardCommandTransaction {
	/// Multi-version transaction handle. The command stored in `cmd` is begun
	/// from it, and read-only query views are begun from it on demand.
	pub multi: TransactionMultiVersion,
	/// Single-version transaction handle used for the single-version queries
	/// and commands started through this transaction.
	pub single: TransactionSingleVersion,
	/// CDC transaction handle, exposed through `cdc()` and used to begin CDC
	/// queries.
	pub(crate) cdc: TransactionCdc,
	/// Lifecycle state; guarded operations fail unless it is still `Active`.
	state: TransactionState,

	/// The in-flight multi-version command. It is `Some` from construction
	/// until commit, rollback or drop takes it out, leaving `None`.
	pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
	/// Event bus supplied at construction and exposed through `event_bus()`.
	pub(crate) event_bus: EventBus,
	/// Definition changes created with this transaction's id; moved out and
	/// handed to the post-commit interceptors by the inherent `commit`.
	pub(crate) changes: TransactionalDefChanges,
	/// Materialized catalog, exposed through `catalog()` and cloned into the
	/// query transactions created by the `with_multi_query*` helpers.
	pub(crate) catalog: MaterializedCatalog,

	/// Interceptor chains returned by the `WithInterceptors` accessors.
	pub(crate) interceptors: Interceptors<Self>,
	// Marker to prevent Send and Sync: a raw pointer is neither, so the
	// auto traits are not implemented for this struct.
	_not_send_sync: PhantomData<*const ()>,
}
55
/// Lifecycle of a command transaction.
///
/// A transaction starts out `Active` and moves to exactly one of the two
/// terminal states when it is committed or rolled back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// The transaction can still be read from, written to, committed or
	/// rolled back.
	Active,
	/// The transaction was committed; further operations are rejected.
	Committed,
	/// The transaction was rolled back; further operations are rejected.
	RolledBack,
}
62
63impl StandardCommandTransaction {
64	/// Creates a new active command transaction with a pre-commit callback
65	pub fn new(
66		multi: TransactionMultiVersion,
67		single: TransactionSingleVersion,
68		cdc: TransactionCdc,
69		event_bus: EventBus,
70		catalog: MaterializedCatalog,
71		interceptors: Interceptors<Self>,
72	) -> reifydb_core::Result<Self> {
73		let cmd = multi.begin_command()?;
74		let txn_id = cmd.id();
75		Ok(Self {
76			cmd: Some(cmd),
77			multi,
78			single,
79			cdc,
80			state: TransactionState::Active,
81			event_bus,
82			catalog,
83			interceptors,
84			changes: TransactionalDefChanges::new(txn_id),
85			_not_send_sync: PhantomData,
86		})
87	}
88
89	pub fn event_bus(&self) -> &EventBus {
90		&self.event_bus
91	}
92
93	/// Check if transaction is still active and return appropriate error if
94	/// not
95	fn check_active(&self) -> crate::Result<()> {
96		match self.state {
97			TransactionState::Active => Ok(()),
98			TransactionState::Committed => {
99				return_error!(transaction::transaction_already_committed())
100			}
101			TransactionState::RolledBack => {
102				return_error!(transaction::transaction_already_rolled_back())
103			}
104		}
105	}
106
107	/// Commit the transaction.
108	/// Since single transactions are short-lived and auto-commit,
109	/// this only commits the multi transaction.
110	pub fn commit(&mut self) -> crate::Result<CommitVersion> {
111		self.check_active()?;
112
113		TransactionInterceptor::pre_commit(self)?;
114
115		if let Some(multi) = self.cmd.take() {
116			let id = multi.id();
117			self.state = TransactionState::Committed;
118
119			let changes = std::mem::take(&mut self.changes);
120
121			let version = multi.commit()?;
122			TransactionInterceptor::post_commit(self, id, version, changes)?;
123
124			Ok(version)
125		} else {
126			// This should never happen due to check_active
127			unreachable!("Transaction state inconsistency")
128		}
129	}
130
131	/// Rollback the transaction.
132	pub fn rollback(&mut self) -> crate::Result<()> {
133		self.check_active()?;
134		if let Some(multi) = self.cmd.take() {
135			self.state = TransactionState::RolledBack;
136			multi.rollback()
137		} else {
138			// This should never happen due to check_active
139			unreachable!("Transaction state inconsistency")
140		}
141	}
142
143	/// Get access to the CDC transaction interface
144	pub fn cdc(&self) -> &TransactionCdc {
145		&self.cdc
146	}
147
148	/// Get access to the pending writes in this transaction
149	///
150	/// This allows checking for key conflicts when committing FlowTransactions
151	/// to ensure they operate on non-overlapping keyspaces.
152	pub fn pending_writes(&self) -> &PendingWrites {
153		self.cmd.as_ref().unwrap().pending_writes()
154	}
155
156	/// Execute a function with query access to the single transaction.
157	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
158	where
159		I: IntoIterator<Item = &'a EncodedKey>,
160		F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Query<'_>) -> crate::Result<R>,
161	{
162		self.check_active()?;
163		self.single.with_query(keys, f)
164	}
165
166	/// Execute a function with query access to the single transaction.
167	pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
168	where
169		I: IntoIterator<Item = &'a EncodedKey>,
170		F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Command<'_>) -> crate::Result<R>,
171	{
172		self.check_active()?;
173		self.single.with_command(keys, f)
174	}
175
176	/// Execute a function with a query transaction view.
177	/// This creates a new query transaction using the stored multi-version storage.
178	/// The query transaction will operate independently but share the same single/CDC storage.
179	pub fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
180	where
181		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
182	{
183		self.check_active()?;
184
185		let mut query_txn = StandardQueryTransaction::new(
186			self.multi.begin_query()?,
187			self.single.clone(),
188			self.cdc.clone(),
189			self.catalog.clone(),
190		);
191
192		f(&mut query_txn)
193	}
194
195	pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
196	where
197		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
198	{
199		self.check_active()?;
200
201		let mut query_txn = StandardQueryTransaction::new(
202			self.multi.begin_query()?,
203			self.single.clone(),
204			self.cdc.clone(),
205			self.catalog.clone(),
206		);
207
208		query_txn.read_as_of_version_exclusive(version)?;
209
210		f(&mut query_txn)
211	}
212
213	pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
214	where
215		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
216	{
217		self.check_active()?;
218
219		let mut query_txn = StandardQueryTransaction::new(
220			self.multi.begin_query()?,
221			self.single.clone(),
222			self.cdc.clone(),
223			self.catalog.clone(),
224		);
225
226		query_txn.read_as_of_version_inclusive(version)?;
227
228		f(&mut query_txn)
229	}
230}
231
/// Gives catalog-aware code access to the catalog held by this transaction.
impl MaterializedCatalogTransaction for StandardCommandTransaction {
	/// Returns a reference to the transaction's `catalog` field.
	fn catalog(&self) -> &MaterializedCatalog {
		&self.catalog
	}
}
237
238impl MultiVersionQueryTransaction for StandardCommandTransaction {
239	#[inline]
240	fn version(&self) -> CommitVersion {
241		self.cmd.as_ref().unwrap().version()
242	}
243
244	#[inline]
245	fn id(&self) -> TransactionId {
246		self.cmd.as_ref().unwrap().id()
247	}
248
249	#[inline]
250	fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
251		self.check_active()?;
252		self.cmd.as_mut().unwrap().get(key)
253	}
254
255	#[inline]
256	fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
257		self.check_active()?;
258		self.cmd.as_mut().unwrap().contains_key(key)
259	}
260
261	#[inline]
262	fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<BoxedMultiVersionIter> {
263		self.check_active()?;
264		self.cmd.as_mut().unwrap().range_batched(range, batch_size)
265	}
266
267	#[inline]
268	fn range_rev_batched(
269		&mut self,
270		range: EncodedKeyRange,
271		batch_size: u64,
272	) -> crate::Result<BoxedMultiVersionIter> {
273		self.check_active()?;
274		self.cmd.as_mut().unwrap().range_rev_batched(range, batch_size)
275	}
276
277	#[inline]
278	fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
279		self.check_active()?;
280		self.cmd.as_mut().unwrap().prefix(prefix)
281	}
282
283	#[inline]
284	fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
285		self.check_active()?;
286		self.cmd.as_mut().unwrap().prefix_rev(prefix)
287	}
288
289	#[inline]
290	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
291		self.check_active()?;
292		self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version)
293	}
294}
295
296impl MultiVersionCommandTransaction for StandardCommandTransaction {
297	#[inline]
298	fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
299		self.check_active()?;
300		self.cmd.as_mut().unwrap().set(key, row)
301	}
302
303	#[inline]
304	fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
305		self.check_active()?;
306		self.cmd.as_mut().unwrap().remove(key)
307	}
308
309	#[inline]
310	fn commit(mut self) -> crate::Result<CommitVersion> {
311		self.check_active()?;
312		self.state = TransactionState::Committed;
313		self.cmd.take().unwrap().commit()
314	}
315
316	#[inline]
317	fn rollback(mut self) -> crate::Result<()> {
318		self.check_active()?;
319		self.state = TransactionState::RolledBack;
320		self.cmd.take().unwrap().rollback()
321	}
322}
323
/// Trait-level counterpart of the inherent `event_bus()` accessor.
impl WithEventBus for StandardCommandTransaction {
	/// Returns a reference to the transaction's `event_bus` field.
	fn event_bus(&self) -> &EventBus {
		&self.event_bus
	}
}
329
/// Single-version and CDC query access; both entry points are rejected once
/// the transaction is no longer active.
impl QueryTransaction for StandardCommandTransaction {
	// Query type produced by the single-version transaction handle.
	type SingleVersionQuery<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Query<'a>;

	// Query type produced by the CDC transaction handle.
	type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;

	/// Begins a single-version query for `keys` on the `single` handle.
	///
	/// # Errors
	///
	/// Fails if the transaction is no longer active, or if
	/// `single.begin_query` fails.
	fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
	where
		I: IntoIterator<Item = &'a EncodedKey>,
	{
		self.check_active()?;
		self.single.begin_query(keys)
	}

	/// Begins a CDC query on the `cdc` handle.
	///
	/// # Errors
	///
	/// Fails if the transaction is no longer active, or if
	/// `cdc.begin_query` fails.
	fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
		self.check_active()?;
		self.cdc.begin_query()
	}
}
348
/// Single-version command access plus a view of the definition changes
/// recorded by this transaction.
impl CommandTransaction for StandardCommandTransaction {
	// Command type produced by the single-version transaction handle.
	type SingleVersionCommand<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Command<'a>;

	/// Begins a single-version command for `keys` on the `single` handle.
	///
	/// # Errors
	///
	/// Fails if the transaction is no longer active, or if
	/// `single.begin_command` fails.
	fn begin_single_command<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionCommand<'_>>
	where
		I: IntoIterator<Item = &'a EncodedKey>,
	{
		self.check_active()?;
		self.single.begin_command(keys)
	}

	/// Returns the definition changes tracked by this transaction.
	///
	/// This does not check whether the transaction is still active. After
	/// the inherent `commit` has run, the field holds a `Default` value
	/// because commit moves the recorded changes out.
	fn get_changes(&self) -> &TransactionalDefChanges {
		&self.changes
	}
}
364
365impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
366	fn table_pre_insert_interceptors(
367		&mut self,
368	) -> &mut Chain<StandardCommandTransaction, dyn TablePreInsertInterceptor<StandardCommandTransaction>> {
369		&mut self.interceptors.table_pre_insert
370	}
371
372	fn table_post_insert_interceptors(
373		&mut self,
374	) -> &mut Chain<StandardCommandTransaction, dyn TablePostInsertInterceptor<StandardCommandTransaction>> {
375		&mut self.interceptors.table_post_insert
376	}
377
378	fn table_pre_update_interceptors(
379		&mut self,
380	) -> &mut Chain<StandardCommandTransaction, dyn TablePreUpdateInterceptor<StandardCommandTransaction>> {
381		&mut self.interceptors.table_pre_update
382	}
383
384	fn table_post_update_interceptors(
385		&mut self,
386	) -> &mut Chain<
387		StandardCommandTransaction,
388		dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction>,
389	> {
390		&mut self.interceptors.table_post_update
391	}
392
393	fn table_pre_delete_interceptors(
394		&mut self,
395	) -> &mut Chain<StandardCommandTransaction, dyn TablePreDeleteInterceptor<StandardCommandTransaction>> {
396		&mut self.interceptors.table_pre_delete
397	}
398
399	fn table_post_delete_interceptors(
400		&mut self,
401	) -> &mut Chain<StandardCommandTransaction, dyn TablePostDeleteInterceptor<StandardCommandTransaction>> {
402		&mut self.interceptors.table_post_delete
403	}
404
405	fn ring_buffer_pre_insert_interceptors(
406		&mut self,
407	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreInsertInterceptor<StandardCommandTransaction>> {
408		&mut self.interceptors.ring_buffer_pre_insert
409	}
410
411	fn ring_buffer_post_insert_interceptors(
412		&mut self,
413	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostInsertInterceptor<StandardCommandTransaction>> {
414		&mut self.interceptors.ring_buffer_post_insert
415	}
416
417	fn ring_buffer_pre_update_interceptors(
418		&mut self,
419	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction>> {
420		&mut self.interceptors.ring_buffer_pre_update
421	}
422
423	fn ring_buffer_post_update_interceptors(
424		&mut self,
425	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction>> {
426		&mut self.interceptors.ring_buffer_post_update
427	}
428
429	fn ring_buffer_pre_delete_interceptors(
430		&mut self,
431	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction>> {
432		&mut self.interceptors.ring_buffer_pre_delete
433	}
434
435	fn ring_buffer_post_delete_interceptors(
436		&mut self,
437	) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction>> {
438		&mut self.interceptors.ring_buffer_post_delete
439	}
440
441	fn pre_commit_interceptors(
442		&mut self,
443	) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction>> {
444		&mut self.interceptors.pre_commit
445	}
446
447	fn post_commit_interceptors(
448		&mut self,
449	) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction>> {
450		&mut self.interceptors.post_commit
451	}
452
453	// Namespace definition interceptors
454	fn namespace_def_post_create_interceptors(
455		&mut self,
456	) -> &mut Chain<
457		StandardCommandTransaction,
458		dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction>,
459	> {
460		&mut self.interceptors.namespace_def_post_create
461	}
462
463	fn namespace_def_pre_update_interceptors(
464		&mut self,
465	) -> &mut Chain<
466		StandardCommandTransaction,
467		dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction>,
468	> {
469		&mut self.interceptors.namespace_def_pre_update
470	}
471
472	fn namespace_def_post_update_interceptors(
473		&mut self,
474	) -> &mut Chain<
475		StandardCommandTransaction,
476		dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction>,
477	> {
478		&mut self.interceptors.namespace_def_post_update
479	}
480
481	fn namespace_def_pre_delete_interceptors(
482		&mut self,
483	) -> &mut Chain<
484		StandardCommandTransaction,
485		dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction>,
486	> {
487		&mut self.interceptors.namespace_def_pre_delete
488	}
489
490	// Table definition interceptors
491	fn table_def_post_create_interceptors(
492		&mut self,
493	) -> &mut Chain<
494		StandardCommandTransaction,
495		dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction>,
496	> {
497		&mut self.interceptors.table_def_post_create
498	}
499
500	fn table_def_pre_update_interceptors(
501		&mut self,
502	) -> &mut Chain<
503		StandardCommandTransaction,
504		dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction>,
505	> {
506		&mut self.interceptors.table_def_pre_update
507	}
508
509	fn table_def_post_update_interceptors(
510		&mut self,
511	) -> &mut Chain<
512		StandardCommandTransaction,
513		dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction>,
514	> {
515		&mut self.interceptors.table_def_post_update
516	}
517
518	fn table_def_pre_delete_interceptors(
519		&mut self,
520	) -> &mut Chain<
521		StandardCommandTransaction,
522		dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction>,
523	> {
524		&mut self.interceptors.table_def_pre_delete
525	}
526
527	// View definition interceptors
528	fn view_def_post_create_interceptors(
529		&mut self,
530	) -> &mut Chain<
531		StandardCommandTransaction,
532		dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction>,
533	> {
534		&mut self.interceptors.view_def_post_create
535	}
536
537	fn view_def_pre_update_interceptors(
538		&mut self,
539	) -> &mut Chain<
540		StandardCommandTransaction,
541		dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction>,
542	> {
543		&mut self.interceptors.view_def_pre_update
544	}
545
546	fn view_def_post_update_interceptors(
547		&mut self,
548	) -> &mut Chain<
549		StandardCommandTransaction,
550		dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction>,
551	> {
552		&mut self.interceptors.view_def_post_update
553	}
554
555	fn view_def_pre_delete_interceptors(
556		&mut self,
557	) -> &mut Chain<
558		StandardCommandTransaction,
559		dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction>,
560	> {
561		&mut self.interceptors.view_def_pre_delete
562	}
563}
564
// Empty impl: nothing is overridden here, so this only opts the transaction
// into `TransactionalChanges` with whatever the trait itself provides.
impl TransactionalChanges for StandardCommandTransaction {}
566
567impl Drop for StandardCommandTransaction {
568	fn drop(&mut self) {
569		if let Some(multi) = self.cmd.take() {
570			// Auto-rollback if still active (not committed or
571			// rolled back)
572			if self.state == TransactionState::Active {
573				let _ = multi.rollback();
574			}
575		}
576	}
577}