Skip to main content

reifydb_transaction/transaction/
command.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem::take;
5
6use reifydb_core::{
7	common::CommitVersion,
8	delta::Delta,
9	encoded::{
10		encoded::EncodedValues,
11		key::{EncodedKey, EncodedKeyRange},
12	},
13	event::EventBus,
14	interface::{
15		WithEventBus,
16		change::Change,
17		store::{MultiVersionBatch, MultiVersionValues},
18	},
19	testing::TestingContext,
20};
21use reifydb_type::Result;
22use tracing::instrument;
23
24use crate::{
25	TransactionId,
26	change::{RowChange, TransactionalDefChanges},
27	error::TransactionError,
28	interceptor::{
29		WithInterceptors,
30		chain::InterceptorChain as Chain,
31		interceptors::Interceptors,
32		namespace::{
33			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
34			NamespacePreUpdateInterceptor,
35		},
36		ringbuffer::{
37			RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
38			RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
39			RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
40		},
41		ringbuffer_def::{
42			RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
43			RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
44		},
45		table::{
46			TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
47			TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
48		},
49		table_def::{
50			TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
51			TableDefPreUpdateInterceptor,
52		},
53		transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
54		view::{
55			ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
56			ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
57		},
58		view_def::{
59			ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
60			ViewDefPreUpdateInterceptor,
61		},
62	},
63	multi::{
64		pending::PendingWrites,
65		transaction::{MultiTransaction, write::MultiWriteTransaction},
66	},
67	single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
68	transaction::query::QueryTransaction,
69};
70
71/// An active command transaction that holds a multi command transaction
72/// and provides query/command access to single storage.
73///
74/// The transaction will auto-rollback on drop if not explicitly committed.
75pub struct CommandTransaction {
76	pub multi: MultiTransaction,
77	pub single: SingleTransaction,
78	state: TransactionState,
79
80	pub cmd: Option<MultiWriteTransaction>,
81	pub event_bus: EventBus,
82
83	// Track row changes for post-commit events
84	pub(crate) row_changes: Vec<RowChange>,
85	pub(crate) interceptors: Interceptors,
86
87	// Track table changes for transactional flow pre-commit processing
88	pub(crate) pending_flow_changes: Vec<Change>,
89
90	/// Testing audit log. Set by the VM when in test context.
91	pub testing: Option<TestingContext>,
92}
93
/// Lifecycle of a command transaction.
///
/// A transaction starts out `Active` and moves exactly once to one of the
/// two terminal states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// Operations are still accepted.
	Active,
	/// The transaction has been committed.
	Committed,
	/// The transaction has been rolled back.
	RolledBack,
}
100
101impl CommandTransaction {
102	/// Creates a new active command transaction with a pre-commit callback
103	#[instrument(name = "transaction::command::new", level = "debug", skip_all)]
104	pub fn new(
105		multi: MultiTransaction,
106		single: SingleTransaction,
107		event_bus: EventBus,
108		interceptors: Interceptors,
109	) -> Result<Self> {
110		let cmd = multi.begin_command()?;
111		Ok(Self {
112			cmd: Some(cmd),
113			multi,
114			single,
115			state: TransactionState::Active,
116			event_bus,
117			interceptors,
118			row_changes: Vec::new(),
119			pending_flow_changes: Vec::new(),
120			testing: None,
121		})
122	}
123
124	#[instrument(name = "transaction::command::event_bus", level = "trace", skip(self))]
125	pub fn event_bus(&self) -> &EventBus {
126		&self.event_bus
127	}
128
129	/// Check if transaction is still active and return appropriate error if
130	/// not
131	fn check_active(&self) -> Result<()> {
132		match self.state {
133			TransactionState::Active => Ok(()),
134			TransactionState::Committed => {
135				return Err(TransactionError::AlreadyCommitted.into());
136			}
137			TransactionState::RolledBack => {
138				return Err(TransactionError::AlreadyRolledBack.into());
139			}
140		}
141	}
142
143	/// Commit the transaction.
144	/// Since single transactions are short-lived and auto-commit,
145	/// this only commits the multi transaction.
146	#[instrument(name = "transaction::command::commit", level = "debug", skip(self))]
147	pub fn commit(&mut self) -> Result<CommitVersion> {
148		self.check_active()?;
149
150		let transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)> = self
151			.pending_writes()
152			.iter()
153			.map(|(key, pending)| match &pending.delta {
154				Delta::Set {
155					values,
156					..
157				} => (key.clone(), Some(values.clone())),
158				_ => (key.clone(), None),
159			})
160			.collect();
161
162		let mut ctx = PreCommitContext {
163			flow_changes: take(&mut self.pending_flow_changes),
164			pending_writes: Vec::new(),
165			transaction_writes,
166			testing: self.testing.take(),
167		};
168		self.interceptors.pre_commit.execute(&mut ctx)?;
169		self.testing = ctx.testing;
170
171		if let Some(mut multi) = self.cmd.take() {
172			// Apply pending view writes produced by pre-commit interceptors
173			for (key, value) in &ctx.pending_writes {
174				match value {
175					Some(v) => multi.set(key, v.clone())?,
176					None => multi.remove(key)?,
177				}
178			}
179
180			let id = multi.tm.id();
181			self.state = TransactionState::Committed;
182
183			let changes = TransactionalDefChanges::default();
184			let row_changes = take(&mut self.row_changes);
185
186			let version = multi.commit()?;
187			self.interceptors.post_commit.execute(PostCommitContext::new(
188				id,
189				version,
190				changes,
191				row_changes,
192			))?;
193
194			Ok(version)
195		} else {
196			// This should never happen due to check_active
197			unreachable!("Transaction state inconsistency")
198		}
199	}
200
201	/// Rollback the transaction.
202	#[instrument(name = "transaction::command::rollback", level = "debug", skip(self))]
203	pub fn rollback(&mut self) -> Result<()> {
204		self.check_active()?;
205		if let Some(mut multi) = self.cmd.take() {
206			self.state = TransactionState::RolledBack;
207			multi.rollback()
208		} else {
209			// This should never happen due to check_active
210			unreachable!("Transaction state inconsistency")
211		}
212	}
213
214	/// Get access to the pending writes in this transaction
215	///
216	/// This allows checking for key conflicts when committing FlowTransactions
217	/// to ensure they operate on non-overlapping keyspaces.
218	#[instrument(name = "transaction::command::pending_writes", level = "trace", skip(self))]
219	pub fn pending_writes(&self) -> &PendingWrites {
220		self.cmd.as_ref().unwrap().pending_writes()
221	}
222
223	/// Execute a function with query access to the single transaction.
224	#[instrument(name = "transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
225	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
226	where
227		I: IntoIterator<Item = &'a EncodedKey> + Send,
228		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
229		R: Send,
230	{
231		self.check_active()?;
232		self.single.with_query(keys, f)
233	}
234
235	/// Execute a function with query access to the single transaction.
236	#[instrument(name = "transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
237	pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
238	where
239		I: IntoIterator<Item = &'a EncodedKey> + Send,
240		F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
241		R: Send,
242	{
243		self.check_active()?;
244		self.single.with_command(keys, f)
245	}
246
247	/// Execute a function with a query transaction view.
248	/// This creates a new query transaction using the stored multi-version storage.
249	/// The query transaction will operate independently but share the same single/CDC storage.
250	#[instrument(name = "transaction::command::with_multi_query", level = "trace", skip(self, f))]
251	pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
252	where
253		F: FnOnce(&mut QueryTransaction) -> Result<R>,
254	{
255		self.check_active()?;
256
257		let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
258
259		f(&mut query_txn)
260	}
261
262	#[instrument(name = "transaction::command::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
263	pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
264	where
265		F: FnOnce(&mut QueryTransaction) -> Result<R>,
266	{
267		self.check_active()?;
268
269		let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
270
271		query_txn.read_as_of_version_exclusive(version)?;
272
273		f(&mut query_txn)
274	}
275
276	#[instrument(name = "transaction::command::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
277	pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
278	where
279		F: FnOnce(&mut QueryTransaction) -> Result<R>,
280	{
281		self.check_active()?;
282
283		let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
284
285		query_txn.multi.read_as_of_version_inclusive(version);
286
287		f(&mut query_txn)
288	}
289
290	/// Begin a single-version query transaction for specific keys
291	#[instrument(name = "transaction::command::begin_single_query", level = "trace", skip(self, keys))]
292	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
293	where
294		I: IntoIterator<Item = &'a EncodedKey>,
295	{
296		self.check_active()?;
297		self.single.begin_query(keys)
298	}
299
300	/// Begin a single-version command transaction for specific keys
301	#[instrument(name = "transaction::command::begin_single_command", level = "trace", skip(self, keys))]
302	pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
303	where
304		I: IntoIterator<Item = &'a EncodedKey>,
305	{
306		self.check_active()?;
307		self.single.begin_command(keys)
308	}
309
310	/// Track a row change for post-commit event emission
311	pub fn track_row_change(&mut self, change: RowChange) {
312		self.row_changes.push(change);
313	}
314
315	/// Track a flow change for transactional view pre-commit processing
316	pub fn track_flow_change(&mut self, change: Change) {
317		self.pending_flow_changes.push(change);
318	}
319
320	/// Get the transaction version
321	#[inline]
322	pub fn version(&self) -> CommitVersion {
323		self.cmd.as_ref().unwrap().version()
324	}
325
326	/// Get the transaction ID
327	#[inline]
328	pub fn id(&self) -> TransactionId {
329		self.cmd.as_ref().unwrap().tm.id()
330	}
331
332	/// Get a value by key
333	#[inline]
334	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
335		self.check_active()?;
336		Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_values()))
337	}
338
339	/// Check if a key exists
340	#[inline]
341	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
342		self.check_active()?;
343		self.cmd.as_mut().unwrap().contains_key(key)
344	}
345
346	/// Get a prefix batch
347	#[inline]
348	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
349		self.check_active()?;
350		self.cmd.as_mut().unwrap().prefix(prefix)
351	}
352
353	/// Get a reverse prefix batch
354	#[inline]
355	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
356		self.check_active()?;
357		self.cmd.as_mut().unwrap().prefix_rev(prefix)
358	}
359
360	/// Read as of version exclusive
361	#[inline]
362	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
363		self.check_active()?;
364		self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
365		Ok(())
366	}
367
368	/// Set a key-value pair
369	#[inline]
370	pub fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> Result<()> {
371		self.check_active()?;
372		self.cmd.as_mut().unwrap().set(key, row)
373	}
374
375	/// Unset a key, preserving the deleted values.
376	///
377	/// The `values` parameter contains the deleted values for CDC and metrics.
378	#[inline]
379	pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
380		self.check_active()?;
381		self.cmd.as_mut().unwrap().unset(key, values)
382	}
383
384	/// Remove a key without preserving the deleted values.
385	///
386	/// Use when only the key matters (e.g., index entries, catalog metadata).
387	#[inline]
388	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
389		self.check_active()?;
390		self.cmd.as_mut().unwrap().remove(key)
391	}
392
393	/// Create a streaming iterator for forward range queries.
394	#[inline]
395	pub fn range(
396		&mut self,
397		range: EncodedKeyRange,
398		batch_size: usize,
399	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
400		self.check_active()?;
401		Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
402	}
403
404	/// Create a streaming iterator for reverse range queries.
405	#[inline]
406	pub fn range_rev(
407		&mut self,
408		range: EncodedKeyRange,
409		batch_size: usize,
410	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
411		self.check_active()?;
412		Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
413	}
414}
415
416impl WithEventBus for CommandTransaction {
417	fn event_bus(&self) -> &EventBus {
418		&self.event_bus
419	}
420}
421
422impl WithInterceptors for CommandTransaction {
423	fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync> {
424		&mut self.interceptors.table_pre_insert
425	}
426
427	fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync> {
428		&mut self.interceptors.table_post_insert
429	}
430
431	fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
432		&mut self.interceptors.table_pre_update
433	}
434
435	fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
436		&mut self.interceptors.table_post_update
437	}
438
439	fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
440		&mut self.interceptors.table_pre_delete
441	}
442
443	fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync> {
444		&mut self.interceptors.table_post_delete
445	}
446
447	fn ringbuffer_pre_insert_interceptors(
448		&mut self,
449	) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
450		&mut self.interceptors.ringbuffer_pre_insert
451	}
452
453	fn ringbuffer_post_insert_interceptors(
454		&mut self,
455	) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
456		&mut self.interceptors.ringbuffer_post_insert
457	}
458
459	fn ringbuffer_pre_update_interceptors(
460		&mut self,
461	) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
462		&mut self.interceptors.ringbuffer_pre_update
463	}
464
465	fn ringbuffer_post_update_interceptors(
466		&mut self,
467	) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
468		&mut self.interceptors.ringbuffer_post_update
469	}
470
471	fn ringbuffer_pre_delete_interceptors(
472		&mut self,
473	) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
474		&mut self.interceptors.ringbuffer_pre_delete
475	}
476
477	fn ringbuffer_post_delete_interceptors(
478		&mut self,
479	) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
480		&mut self.interceptors.ringbuffer_post_delete
481	}
482
483	fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
484		&mut self.interceptors.pre_commit
485	}
486
487	fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
488		&mut self.interceptors.post_commit
489	}
490
491	fn namespace_post_create_interceptors(
492		&mut self,
493	) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
494		&mut self.interceptors.namespace_post_create
495	}
496
497	fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
498		&mut self.interceptors.namespace_pre_update
499	}
500
501	fn namespace_post_update_interceptors(
502		&mut self,
503	) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
504		&mut self.interceptors.namespace_post_update
505	}
506
507	fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
508		&mut self.interceptors.namespace_pre_delete
509	}
510
511	fn table_def_post_create_interceptors(
512		&mut self,
513	) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync> {
514		&mut self.interceptors.table_def_post_create
515	}
516
517	fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync> {
518		&mut self.interceptors.table_def_pre_update
519	}
520
521	fn table_def_post_update_interceptors(
522		&mut self,
523	) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync> {
524		&mut self.interceptors.table_def_post_update
525	}
526
527	fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync> {
528		&mut self.interceptors.table_def_pre_delete
529	}
530
531	fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync> {
532		&mut self.interceptors.view_pre_insert
533	}
534
535	fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync> {
536		&mut self.interceptors.view_post_insert
537	}
538
539	fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
540		&mut self.interceptors.view_pre_update
541	}
542
543	fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
544		&mut self.interceptors.view_post_update
545	}
546
547	fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
548		&mut self.interceptors.view_pre_delete
549	}
550
551	fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync> {
552		&mut self.interceptors.view_post_delete
553	}
554
555	fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync> {
556		&mut self.interceptors.view_def_post_create
557	}
558
559	fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync> {
560		&mut self.interceptors.view_def_pre_update
561	}
562
563	fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync> {
564		&mut self.interceptors.view_def_post_update
565	}
566
567	fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync> {
568		&mut self.interceptors.view_def_pre_delete
569	}
570
571	fn ringbuffer_def_post_create_interceptors(
572		&mut self,
573	) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync> {
574		&mut self.interceptors.ringbuffer_def_post_create
575	}
576
577	fn ringbuffer_def_pre_update_interceptors(
578		&mut self,
579	) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync> {
580		&mut self.interceptors.ringbuffer_def_pre_update
581	}
582
583	fn ringbuffer_def_post_update_interceptors(
584		&mut self,
585	) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync> {
586		&mut self.interceptors.ringbuffer_def_post_update
587	}
588
589	fn ringbuffer_def_pre_delete_interceptors(
590		&mut self,
591	) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync> {
592		&mut self.interceptors.ringbuffer_def_pre_delete
593	}
594}
595
596impl Drop for CommandTransaction {
597	fn drop(&mut self) {
598		if let Some(mut multi) = self.cmd.take() {
599			// Auto-rollback if still active (not committed or rolled back)
600			if self.state == TransactionState::Active {
601				let _ = multi.rollback();
602			}
603		}
604	}
605}