reifydb_engine/transaction/
command.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
6use reifydb_core::{
7	CommitVersion, EncodedKey, EncodedKeyRange,
8	diagnostic::transaction,
9	event::EventBus,
10	interceptor,
11	interceptor::{
12		Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
13		RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
14		RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
15		TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
16		TablePreUpdateInterceptor,
17	},
18	interface::{
19		CdcTransaction, CommandTransaction, MultiVersionBatch, MultiVersionCommandTransaction,
20		MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction, RowChange,
21		SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
22		interceptor::{TransactionInterceptor, WithInterceptors},
23	},
24	return_error,
25	value::encoded::EncodedValues,
26};
27use reifydb_transaction::{
28	cdc::TransactionCdc,
29	multi::{TransactionMultiVersion, pending::PendingWrites},
30	single::TransactionSingle,
31};
32use tracing::instrument;
33
34use crate::transaction::query::StandardQueryTransaction;
35
36/// An active command transaction that holds a multi command transaction
37/// and provides query/command access to single storage.
38///
39/// The transaction will auto-rollback on drop if not explicitly committed.
40pub struct StandardCommandTransaction {
41	pub multi: TransactionMultiVersion,
42	pub single: TransactionSingle,
43	pub(crate) cdc: TransactionCdc,
44	state: TransactionState,
45
46	pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
47	pub(crate) event_bus: EventBus,
48	pub(crate) changes: TransactionalDefChanges,
49	pub(crate) catalog: MaterializedCatalog,
50
51	// Track row changes for post-commit events
52	pub(crate) row_changes: Vec<RowChange>,
53
54	pub(crate) interceptors: Interceptors<Self>,
55}
56
57#[derive(Clone, Copy, PartialEq)]
58enum TransactionState {
59	Active,
60	Committed,
61	RolledBack,
62}
63
64impl StandardCommandTransaction {
65	/// Creates a new active command transaction with a pre-commit callback
66	#[instrument(name = "engine::transaction::command::new", level = "debug", skip_all)]
67	pub async fn new(
68		multi: TransactionMultiVersion,
69		single: TransactionSingle,
70		cdc: TransactionCdc,
71		event_bus: EventBus,
72		catalog: MaterializedCatalog,
73		interceptors: Interceptors<Self>,
74	) -> reifydb_core::Result<Self> {
75		let cmd = multi.begin_command().await?;
76		let txn_id = cmd.id();
77		Ok(Self {
78			cmd: Some(cmd),
79			multi,
80			single,
81			cdc,
82			state: TransactionState::Active,
83			event_bus,
84			catalog,
85			interceptors,
86			changes: TransactionalDefChanges::new(txn_id),
87			row_changes: Vec::new(),
88		})
89	}
90
91	#[instrument(name = "engine::transaction::command::event_bus", level = "trace", skip(self))]
92	pub fn event_bus(&self) -> &EventBus {
93		&self.event_bus
94	}
95
96	/// Check if transaction is still active and return appropriate error if
97	/// not
98	fn check_active(&self) -> crate::Result<()> {
99		match self.state {
100			TransactionState::Active => Ok(()),
101			TransactionState::Committed => {
102				return_error!(transaction::transaction_already_committed())
103			}
104			TransactionState::RolledBack => {
105				return_error!(transaction::transaction_already_rolled_back())
106			}
107		}
108	}
109
110	/// Commit the transaction.
111	/// Since single transactions are short-lived and auto-commit,
112	/// this only commits the multi transaction.
113	#[instrument(name = "engine::transaction::command::commit", level = "debug", skip(self))]
114	pub async fn commit(&mut self) -> crate::Result<CommitVersion> {
115		self.check_active()?;
116
117		TransactionInterceptor::pre_commit(self).await?;
118
119		if let Some(mut multi) = self.cmd.take() {
120			let id = multi.id();
121			self.state = TransactionState::Committed;
122
123			let changes = std::mem::take(&mut self.changes);
124			let row_changes = std::mem::take(&mut self.row_changes);
125
126			let version = multi.commit().await?;
127			TransactionInterceptor::post_commit(self, id, version, changes, row_changes).await?;
128
129			Ok(version)
130		} else {
131			// This should never happen due to check_active
132			unreachable!("Transaction state inconsistency")
133		}
134	}
135
136	/// Rollback the transaction.
137	#[instrument(name = "engine::transaction::command::rollback", level = "debug", skip(self))]
138	pub fn rollback(&mut self) -> crate::Result<()> {
139		self.check_active()?;
140		if let Some(mut multi) = self.cmd.take() {
141			self.state = TransactionState::RolledBack;
142			multi.rollback()
143		} else {
144			// This should never happen due to check_active
145			unreachable!("Transaction state inconsistency")
146		}
147	}
148
149	/// Get access to the CDC transaction interface
150	#[instrument(name = "engine::transaction::command::cdc", level = "trace", skip(self))]
151	pub fn cdc(&self) -> &TransactionCdc {
152		&self.cdc
153	}
154
155	/// Get access to the pending writes in this transaction
156	///
157	/// This allows checking for key conflicts when committing FlowTransactions
158	/// to ensure they operate on non-overlapping keyspaces.
159	#[instrument(name = "engine::transaction::command::pending_writes", level = "trace", skip(self))]
160	pub fn pending_writes(&self) -> &PendingWrites {
161		self.cmd.as_ref().unwrap().pending_writes()
162	}
163
164	/// Execute a function with query access to the single transaction.
165	#[instrument(name = "engine::transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
166	pub async fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
167	where
168		I: IntoIterator<Item = &'a EncodedKey> + Send,
169		F: FnOnce(&mut <TransactionSingle as SingleVersionTransaction>::Query<'_>) -> crate::Result<R> + Send,
170		R: Send,
171	{
172		self.check_active()?;
173		self.single.with_query(keys, f).await
174	}
175
176	/// Execute a function with query access to the single transaction.
177	#[instrument(name = "engine::transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
178	pub async fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
179	where
180		I: IntoIterator<Item = &'a EncodedKey> + Send,
181		F: FnOnce(&mut <TransactionSingle as SingleVersionTransaction>::Command<'_>) -> crate::Result<R> + Send,
182		R: Send,
183	{
184		self.check_active()?;
185		self.single.with_command(keys, f).await
186	}
187
188	/// Execute a function with a query transaction view.
189	/// This creates a new query transaction using the stored multi-version storage.
190	/// The query transaction will operate independently but share the same single/CDC storage.
191	#[instrument(name = "engine::transaction::command::with_multi_query", level = "trace", skip(self, f))]
192	pub async fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
193	where
194		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
195	{
196		self.check_active()?;
197
198		let mut query_txn = StandardQueryTransaction::new(
199			self.multi.begin_query().await?,
200			self.single.clone(),
201			self.cdc.clone(),
202			self.catalog.clone(),
203		);
204
205		f(&mut query_txn)
206	}
207
208	#[instrument(
209		name = "engine::transaction::command::with_multi_query_as_of_exclusive",
210		level = "trace",
211		skip(self, f)
212	)]
213	pub async fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
214	where
215		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
216	{
217		self.check_active()?;
218
219		let mut query_txn = StandardQueryTransaction::new(
220			self.multi.begin_query().await?,
221			self.single.clone(),
222			self.cdc.clone(),
223			self.catalog.clone(),
224		);
225
226		query_txn.read_as_of_version_exclusive(version).await?;
227
228		f(&mut query_txn)
229	}
230
231	#[instrument(
232		name = "engine::transaction::command::with_multi_query_as_of_inclusive",
233		level = "trace",
234		skip(self, f)
235	)]
236	pub async fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
237	where
238		F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
239	{
240		self.check_active()?;
241
242		let mut query_txn = StandardQueryTransaction::new(
243			self.multi.begin_query().await?,
244			self.single.clone(),
245			self.cdc.clone(),
246			self.catalog.clone(),
247		);
248
249		query_txn.read_as_of_version_inclusive(version).await?;
250
251		f(&mut query_txn)
252	}
253}
254
255impl MaterializedCatalogTransaction for StandardCommandTransaction {
256	fn catalog(&self) -> &MaterializedCatalog {
257		&self.catalog
258	}
259}
260
261#[async_trait]
262impl MultiVersionQueryTransaction for StandardCommandTransaction {
263	#[inline]
264	fn version(&self) -> CommitVersion {
265		MultiVersionQueryTransaction::version(self.cmd.as_ref().unwrap())
266	}
267
268	#[inline]
269	fn id(&self) -> TransactionId {
270		MultiVersionQueryTransaction::id(self.cmd.as_ref().unwrap())
271	}
272
273	#[inline]
274	async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
275		self.check_active()?;
276		MultiVersionQueryTransaction::get(self.cmd.as_mut().unwrap(), key).await
277	}
278
279	#[inline]
280	async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
281		self.check_active()?;
282		MultiVersionQueryTransaction::contains_key(self.cmd.as_mut().unwrap(), key).await
283	}
284
285	#[inline]
286	async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<MultiVersionBatch> {
287		self.check_active()?;
288		MultiVersionQueryTransaction::range_batch(self.cmd.as_mut().unwrap(), range, batch_size).await
289	}
290
291	#[inline]
292	async fn range_rev_batch(
293		&mut self,
294		range: EncodedKeyRange,
295		batch_size: u64,
296	) -> crate::Result<MultiVersionBatch> {
297		self.check_active()?;
298		MultiVersionQueryTransaction::range_rev_batch(self.cmd.as_mut().unwrap(), range, batch_size).await
299	}
300
301	#[inline]
302	async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
303		self.check_active()?;
304		MultiVersionQueryTransaction::read_as_of_version_exclusive(self.cmd.as_mut().unwrap(), version).await
305	}
306}
307
308#[async_trait]
309impl MultiVersionCommandTransaction for StandardCommandTransaction {
310	#[inline]
311	async fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
312		self.check_active()?;
313		MultiVersionCommandTransaction::set(self.cmd.as_mut().unwrap(), key, row).await
314	}
315
316	#[inline]
317	async fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
318		self.check_active()?;
319		MultiVersionCommandTransaction::remove(self.cmd.as_mut().unwrap(), key).await
320	}
321
322	#[inline]
323	async fn commit(&mut self) -> crate::Result<CommitVersion> {
324		self.check_active()?;
325		let result = MultiVersionCommandTransaction::commit(self.cmd.as_mut().unwrap()).await;
326		if result.is_ok() {
327			self.state = TransactionState::Committed;
328		}
329		result
330	}
331
332	#[inline]
333	async fn rollback(&mut self) -> crate::Result<()> {
334		self.check_active()?;
335		let result = MultiVersionCommandTransaction::rollback(self.cmd.as_mut().unwrap()).await;
336		if result.is_ok() {
337			self.state = TransactionState::RolledBack;
338		}
339		result
340	}
341}
342
343impl WithEventBus for StandardCommandTransaction {
344	fn event_bus(&self) -> &EventBus {
345		&self.event_bus
346	}
347}
348
349#[async_trait]
350impl QueryTransaction for StandardCommandTransaction {
351	type SingleVersionQuery<'a> = <TransactionSingle as SingleVersionTransaction>::Query<'a>;
352
353	type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;
354
355	async fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
356	where
357		I: IntoIterator<Item = &'a EncodedKey> + Send,
358	{
359		self.check_active()?;
360		self.single.begin_query(keys).await
361	}
362
363	async fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
364		self.check_active()?;
365		Ok(self.cdc.begin_query()?)
366	}
367}
368
369#[async_trait]
370impl CommandTransaction for StandardCommandTransaction {
371	type SingleVersionCommand<'a> = <TransactionSingle as SingleVersionTransaction>::Command<'a>;
372
373	async fn begin_single_command<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionCommand<'_>>
374	where
375		I: IntoIterator<Item = &'a EncodedKey> + Send,
376	{
377		self.check_active()?;
378		self.single.begin_command(keys).await
379	}
380
381	fn get_changes(&self) -> &TransactionalDefChanges {
382		&self.changes
383	}
384}
385
386impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
387	fn table_pre_insert_interceptors(
388		&mut self,
389	) -> &mut Chain<
390		StandardCommandTransaction,
391		dyn TablePreInsertInterceptor<StandardCommandTransaction> + Send + Sync,
392	> {
393		&mut self.interceptors.table_pre_insert
394	}
395
396	fn table_post_insert_interceptors(
397		&mut self,
398	) -> &mut Chain<
399		StandardCommandTransaction,
400		dyn TablePostInsertInterceptor<StandardCommandTransaction> + Send + Sync,
401	> {
402		&mut self.interceptors.table_post_insert
403	}
404
405	fn table_pre_update_interceptors(
406		&mut self,
407	) -> &mut Chain<
408		StandardCommandTransaction,
409		dyn TablePreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
410	> {
411		&mut self.interceptors.table_pre_update
412	}
413
414	fn table_post_update_interceptors(
415		&mut self,
416	) -> &mut Chain<
417		StandardCommandTransaction,
418		dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
419	> {
420		&mut self.interceptors.table_post_update
421	}
422
423	fn table_pre_delete_interceptors(
424		&mut self,
425	) -> &mut Chain<
426		StandardCommandTransaction,
427		dyn TablePreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
428	> {
429		&mut self.interceptors.table_pre_delete
430	}
431
432	fn table_post_delete_interceptors(
433		&mut self,
434	) -> &mut Chain<
435		StandardCommandTransaction,
436		dyn TablePostDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
437	> {
438		&mut self.interceptors.table_post_delete
439	}
440
441	fn ringbuffer_pre_insert_interceptors(
442		&mut self,
443	) -> &mut Chain<
444		StandardCommandTransaction,
445		dyn RingBufferPreInsertInterceptor<StandardCommandTransaction> + Send + Sync,
446	> {
447		&mut self.interceptors.ringbuffer_pre_insert
448	}
449
450	fn ringbuffer_post_insert_interceptors(
451		&mut self,
452	) -> &mut Chain<
453		StandardCommandTransaction,
454		dyn RingBufferPostInsertInterceptor<StandardCommandTransaction> + Send + Sync,
455	> {
456		&mut self.interceptors.ringbuffer_post_insert
457	}
458
459	fn ringbuffer_pre_update_interceptors(
460		&mut self,
461	) -> &mut Chain<
462		StandardCommandTransaction,
463		dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
464	> {
465		&mut self.interceptors.ringbuffer_pre_update
466	}
467
468	fn ringbuffer_post_update_interceptors(
469		&mut self,
470	) -> &mut Chain<
471		StandardCommandTransaction,
472		dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
473	> {
474		&mut self.interceptors.ringbuffer_post_update
475	}
476
477	fn ringbuffer_pre_delete_interceptors(
478		&mut self,
479	) -> &mut Chain<
480		StandardCommandTransaction,
481		dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
482	> {
483		&mut self.interceptors.ringbuffer_pre_delete
484	}
485
486	fn ringbuffer_post_delete_interceptors(
487		&mut self,
488	) -> &mut Chain<
489		StandardCommandTransaction,
490		dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
491	> {
492		&mut self.interceptors.ringbuffer_post_delete
493	}
494
495	fn pre_commit_interceptors(
496		&mut self,
497	) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction> + Send + Sync>
498	{
499		&mut self.interceptors.pre_commit
500	}
501
502	fn post_commit_interceptors(
503		&mut self,
504	) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction> + Send + Sync>
505	{
506		&mut self.interceptors.post_commit
507	}
508
509	// Namespace definition interceptors
510	fn namespace_def_post_create_interceptors(
511		&mut self,
512	) -> &mut Chain<
513		StandardCommandTransaction,
514		dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
515	> {
516		&mut self.interceptors.namespace_def_post_create
517	}
518
519	fn namespace_def_pre_update_interceptors(
520		&mut self,
521	) -> &mut Chain<
522		StandardCommandTransaction,
523		dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
524	> {
525		&mut self.interceptors.namespace_def_pre_update
526	}
527
528	fn namespace_def_post_update_interceptors(
529		&mut self,
530	) -> &mut Chain<
531		StandardCommandTransaction,
532		dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
533	> {
534		&mut self.interceptors.namespace_def_post_update
535	}
536
537	fn namespace_def_pre_delete_interceptors(
538		&mut self,
539	) -> &mut Chain<
540		StandardCommandTransaction,
541		dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
542	> {
543		&mut self.interceptors.namespace_def_pre_delete
544	}
545
546	// Table definition interceptors
547	fn table_def_post_create_interceptors(
548		&mut self,
549	) -> &mut Chain<
550		StandardCommandTransaction,
551		dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
552	> {
553		&mut self.interceptors.table_def_post_create
554	}
555
556	fn table_def_pre_update_interceptors(
557		&mut self,
558	) -> &mut Chain<
559		StandardCommandTransaction,
560		dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
561	> {
562		&mut self.interceptors.table_def_pre_update
563	}
564
565	fn table_def_post_update_interceptors(
566		&mut self,
567	) -> &mut Chain<
568		StandardCommandTransaction,
569		dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
570	> {
571		&mut self.interceptors.table_def_post_update
572	}
573
574	fn table_def_pre_delete_interceptors(
575		&mut self,
576	) -> &mut Chain<
577		StandardCommandTransaction,
578		dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
579	> {
580		&mut self.interceptors.table_def_pre_delete
581	}
582
583	// View definition interceptors
584	fn view_def_post_create_interceptors(
585		&mut self,
586	) -> &mut Chain<
587		StandardCommandTransaction,
588		dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
589	> {
590		&mut self.interceptors.view_def_post_create
591	}
592
593	fn view_def_pre_update_interceptors(
594		&mut self,
595	) -> &mut Chain<
596		StandardCommandTransaction,
597		dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
598	> {
599		&mut self.interceptors.view_def_pre_update
600	}
601
602	fn view_def_post_update_interceptors(
603		&mut self,
604	) -> &mut Chain<
605		StandardCommandTransaction,
606		dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
607	> {
608		&mut self.interceptors.view_def_post_update
609	}
610
611	fn view_def_pre_delete_interceptors(
612		&mut self,
613	) -> &mut Chain<
614		StandardCommandTransaction,
615		dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
616	> {
617		&mut self.interceptors.view_def_pre_delete
618	}
619
620	// Ring buffer definition interceptors
621	fn ringbuffer_def_post_create_interceptors(
622		&mut self,
623	) -> &mut Chain<
624		StandardCommandTransaction,
625		dyn interceptor::RingBufferDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
626	> {
627		&mut self.interceptors.ringbuffer_def_post_create
628	}
629
630	fn ringbuffer_def_pre_update_interceptors(
631		&mut self,
632	) -> &mut Chain<
633		StandardCommandTransaction,
634		dyn interceptor::RingBufferDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
635	> {
636		&mut self.interceptors.ringbuffer_def_pre_update
637	}
638
639	fn ringbuffer_def_post_update_interceptors(
640		&mut self,
641	) -> &mut Chain<
642		StandardCommandTransaction,
643		dyn interceptor::RingBufferDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
644	> {
645		&mut self.interceptors.ringbuffer_def_post_update
646	}
647
648	fn ringbuffer_def_pre_delete_interceptors(
649		&mut self,
650	) -> &mut Chain<
651		StandardCommandTransaction,
652		dyn interceptor::RingBufferDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
653	> {
654		&mut self.interceptors.ringbuffer_def_pre_delete
655	}
656}
657
658impl TransactionalChanges for StandardCommandTransaction {}
659
660impl Drop for StandardCommandTransaction {
661	fn drop(&mut self) {
662		if let Some(mut multi) = self.cmd.take() {
663			// Auto-rollback if still active (not committed or
664			// rolled back)
665			if self.state == TransactionState::Active {
666				let _ = multi.rollback();
667			}
668		}
669	}
670}