Skip to main content

reifydb_transaction/transaction/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::CommitVersion,
8	delta::Delta,
9	encoded::{
10		key::{EncodedKey, EncodedKeyRange},
11		row::EncodedRow,
12	},
13	execution::ExecutionResult,
14	interface::{
15		catalog::{policy::SessionOp, shape::ShapeId},
16		change::{Change, Diff},
17		store::{MultiVersionBatch, MultiVersionRow},
18	},
19	testing::{CapturedEvent, CapturedInvocation},
20	value::column::columns::Columns,
21};
22use reifydb_type::{
23	Result,
24	error::Diagnostic,
25	params::Params,
26	value::{datetime::DateTime, identity::IdentityId},
27};
28
29use crate::{
30	TransactionId,
31	change::{CatalogChangesSavepoint, RowChange},
32	interceptor::{
33		WithInterceptors,
34		authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
35		chain::InterceptorChain as Chain,
36		dictionary::{
37			DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
38			DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
39		},
40		dictionary_row::{
41			DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
42			DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
43			DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
44		},
45		granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
46		identity::{
47			IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
48			IdentityPreUpdateInterceptor,
49		},
50		namespace::{
51			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
52			NamespacePreUpdateInterceptor,
53		},
54		ringbuffer::{
55			RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
56			RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
57		},
58		ringbuffer_row::{
59			RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
60			RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
61			RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
62		},
63		role::{
64			RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
65			RolePreUpdateInterceptor,
66		},
67		series::{
68			SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
69			SeriesPreUpdateInterceptor,
70		},
71		series_row::{
72			SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
73			SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
74		},
75		table::{
76			TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
77			TablePreUpdateInterceptor,
78		},
79		table_row::{
80			TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
81			TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
82		},
83		transaction::{PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
84		view::{
85			ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
86			ViewPreUpdateInterceptor,
87		},
88		view_row::{
89			ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
90			ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
91		},
92	},
93	multi::transaction::write::WriteSavepoint,
94	single::{read::SingleReadTransaction, write::SingleWriteTransaction},
95	transaction::{
96		admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
97		replica::ReplicaTransaction, write::Write,
98	},
99};
100
101/// Trait for executing RQL within a transaction.
102///
103/// This trait decouples RQL execution from the transaction layer, allowing
104/// any component (procedures, ProcedureContext, tests, etc.) to execute
105/// RQL through a transaction without a direct dependency on the engine crate.
106pub trait RqlExecutor: Send + Sync {
107	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult;
108}
109
110pub mod admin;
111pub mod catalog;
112pub mod command;
113pub mod query;
114pub mod replica;
115pub mod write;
116
117use crate::multi::{pending::PendingWrites, transaction::write::MultiWriteTransaction};
118
119/// Collect transaction writes from pending writes for use in `PreCommitContext`.
120#[inline]
121pub(super) fn collect_transaction_writes(pending: &PendingWrites) -> Vec<(EncodedKey, Option<EncodedRow>)> {
122	pending.iter()
123		.map(|(key, p)| match &p.delta {
124			Delta::Set {
125				row,
126				..
127			} => (key.clone(), Some(row.clone())),
128			_ => (key.clone(), None),
129		})
130		.collect()
131}
132
133/// Apply pending writes produced by pre-commit interceptors to the multi transaction.
134#[inline]
135pub(super) fn apply_pre_commit_writes(
136	multi: &mut MultiWriteTransaction,
137	pending_writes: &[(EncodedKey, Option<EncodedRow>)],
138) -> Result<()> {
139	for (key, value) in pending_writes {
140		match value {
141			Some(v) => multi.set(key, v.clone())?,
142			None => multi.remove(key)?,
143		}
144	}
145	Ok(())
146}
147
148/// Opaque savepoint for per-test transaction isolation.
149pub struct Savepoint {
150	write: WriteSavepoint,
151	row_changes_len: usize,
152	accumulator_len: usize,
153	changes: CatalogChangesSavepoint,
154}
155
156pub struct TestTransaction<'a> {
157	pub inner: &'a mut AdminTransaction,
158	pub baseline: usize,
159	pub events: &'a mut Vec<CapturedEvent>,
160	pub invocations: &'a mut Vec<CapturedInvocation>,
161	pub event_seq: &'a mut u64,
162	pub handler_seq: &'a mut u64,
163	pub savepoint: Option<Savepoint>,
164	pub session_type: SessionOp,
165	pub session_default_deny: bool,
166}
167
168impl<'a> TestTransaction<'a> {
169	pub fn new(
170		inner: &'a mut AdminTransaction,
171		events: &'a mut Vec<CapturedEvent>,
172		invocations: &'a mut Vec<CapturedInvocation>,
173		event_seq: &'a mut u64,
174		handler_seq: &'a mut u64,
175		session_type: SessionOp,
176		session_default_deny: bool,
177	) -> Self {
178		let baseline = inner.accumulator.len();
179		let savepoint = Savepoint {
180			write: inner.cmd.as_ref().unwrap().savepoint(),
181			row_changes_len: inner.row_changes.len(),
182			accumulator_len: inner.accumulator.len(),
183			changes: inner.changes.savepoint(),
184		};
185		Self {
186			inner,
187			baseline,
188			events,
189			invocations,
190			event_seq,
191			handler_seq,
192			savepoint: Some(savepoint),
193			session_type,
194			session_default_deny,
195		}
196	}
197
198	/// Restore transaction state to the savepoint captured at construction.
199	pub fn restore(&mut self) {
200		if let Some(sp) = self.savepoint.take() {
201			self.inner.cmd.as_mut().unwrap().restore_savepoint(sp.write);
202			self.inner.row_changes.truncate(sp.row_changes_len);
203			self.inner.accumulator.truncate(sp.accumulator_len);
204			self.inner.changes.restore_savepoint(sp.changes);
205			self.inner.unpoison();
206		}
207	}
208
209	/// Re-borrow this test transaction with a shorter lifetime,
210	/// producing a TestTransaction suitable for embedding in a
211	/// `Transaction::Test` variant without consuming `self`.
212	pub fn reborrow(&mut self) -> TestTransaction<'_> {
213		TestTransaction {
214			inner: &mut *self.inner,
215			baseline: self.baseline,
216			events: &mut *self.events,
217			invocations: &mut *self.invocations,
218			event_seq: &mut *self.event_seq,
219			handler_seq: &mut *self.handler_seq,
220			savepoint: None,
221			session_type: self.session_type,
222			session_default_deny: self.session_default_deny,
223		}
224	}
225
226	/// Read accumulator entries since the baseline.
227	/// Used by testing helpers to inspect mutations within the current test.
228	pub fn accumulator_entries_from(&self) -> &[(ShapeId, Diff)] {
229		self.inner.accumulator.entries_from(self.baseline)
230	}
231
232	/// Execute test-only pre-commit style processing without committing.
233	///
234	/// If a `test_pre_commit` hook is registered on the interceptors, it is
235	/// called first to ensure uncommitted flows are registered in the shared
236	/// flow engine.  Then the pre-commit interceptor chain runs (which
237	/// includes transactional flow processing) over accumulator entries from
238	/// the baseline onwards.
239	///
240	/// Used by testing helpers that need commit-time flow work materialized
241	/// while still staying inside the test savepoint.
242	pub fn capture_testing_pre_commit(&mut self) -> Result<()> {
243		// Only process if there are non-view source changes; view-only entries
244		// are flow output from a previous call and must not be re-consumed.
245		let has_source_changes = self
246			.inner
247			.accumulator
248			.entries_from(self.baseline)
249			.iter()
250			.any(|(id, _)| !matches!(id, ShapeId::View(_)));
251
252		if !has_source_changes {
253			return Ok(());
254		}
255
256		// Clone the hook before re-borrowing self.
257		let hook = self.inner.interceptors.test_pre_commit.clone();
258
259		if let Some(hook) = hook {
260			hook(self)?;
261		}
262
263		let offset = self.baseline;
264		let transaction_writes: Vec<(EncodedKey, Option<EncodedRow>)> = self
265			.inner
266			.pending_writes()
267			.iter()
268			.map(|(key, pending)| match &pending.delta {
269				Delta::Set {
270					row,
271					..
272				} => (key.clone(), Some(row.clone())),
273				_ => (key.clone(), None),
274			})
275			.collect();
276
277		let mut ctx = PreCommitContext {
278			flow_changes: self.inner.accumulator.take_changes_from(
279				offset,
280				CommitVersion(0),
281				DateTime::from_nanos(self.inner.clock.now_nanos()),
282			),
283			pending_writes: Vec::new(),
284			pending_shapes: Vec::new(),
285			transaction_writes,
286			view_entries: Vec::new(),
287		};
288
289		self.inner.interceptors.pre_commit.execute(&mut ctx)?;
290
291		for (key, value) in &ctx.pending_writes {
292			match value {
293				Some(v) => self.inner.cmd.as_mut().unwrap().set(key, v.clone())?,
294				None => self.inner.cmd.as_mut().unwrap().remove(key)?,
295			}
296		}
297
298		for (id, diff) in ctx.view_entries {
299			self.inner.accumulator.track(id, diff);
300		}
301
302		Ok(())
303	}
304}
305
306/// An enum that can hold either a command, admin, query, or subscription transaction
307/// for flexible execution
308pub enum Transaction<'a> {
309	Command(&'a mut CommandTransaction),
310	Admin(&'a mut AdminTransaction),
311	Query(&'a mut QueryTransaction),
312	Test(Box<TestTransaction<'a>>),
313	Replica(&'a mut ReplicaTransaction),
314}
315
316impl<'a> Transaction<'a> {
317	/// Get the transaction version
318	pub fn version(&self) -> CommitVersion {
319		match self {
320			Self::Command(txn) => txn.version(),
321			Self::Admin(txn) => txn.version(),
322			Self::Query(txn) => txn.version(),
323			Self::Test(t) => t.inner.version(),
324			Self::Replica(txn) => txn.version(),
325		}
326	}
327
328	/// Get the transaction ID
329	pub fn id(&self) -> TransactionId {
330		match self {
331			Self::Command(txn) => txn.id(),
332			Self::Admin(txn) => txn.id(),
333			Self::Query(txn) => txn.id(),
334			Self::Test(t) => t.inner.id(),
335			Self::Replica(txn) => txn.id(),
336		}
337	}
338
339	/// Get a value by key (async method)
340	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
341		match self {
342			Self::Command(txn) => txn.get(key),
343			Self::Admin(txn) => txn.get(key),
344			Self::Query(txn) => txn.get(key),
345			Self::Test(t) => t.inner.get(key),
346			Self::Replica(txn) => txn.get(key),
347		}
348	}
349
350	/// Read the committed value at the transaction's read version, ignoring
351	/// pending intra-tx writes. For read-only transaction variants (Query,
352	/// Replica) this is equivalent to `get` since they hold no pending
353	/// writes.
354	pub fn get_committed(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
355		match self {
356			Self::Command(txn) => txn.get_committed(key),
357			Self::Admin(txn) => txn.get_committed(key),
358			Self::Query(txn) => txn.get(key),
359			Self::Test(t) => t.inner.get_committed(key),
360			Self::Replica(txn) => txn.get(key),
361		}
362	}
363
364	/// Check if a key exists (async method)
365	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
366		match self {
367			Self::Command(txn) => txn.contains_key(key),
368			Self::Admin(txn) => txn.contains_key(key),
369			Self::Query(txn) => txn.contains_key(key),
370			Self::Test(t) => t.inner.contains_key(key),
371			Self::Replica(txn) => txn.contains_key(key),
372		}
373	}
374
375	/// Get a prefix batch (async method)
376	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
377		match self {
378			Self::Command(txn) => txn.prefix(prefix),
379			Self::Admin(txn) => txn.prefix(prefix),
380			Self::Query(txn) => txn.prefix(prefix),
381			Self::Test(t) => t.inner.prefix(prefix),
382			Self::Replica(txn) => txn.prefix(prefix),
383		}
384	}
385
386	/// Get a reverse prefix batch (async method)
387	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
388		match self {
389			Self::Command(txn) => txn.prefix_rev(prefix),
390			Self::Admin(txn) => txn.prefix_rev(prefix),
391			Self::Query(txn) => txn.prefix_rev(prefix),
392			Self::Test(t) => t.inner.prefix_rev(prefix),
393			Self::Replica(txn) => txn.prefix_rev(prefix),
394		}
395	}
396
397	/// Read as of version exclusive (async method)
398	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
399		match self {
400			Transaction::Command(txn) => txn.read_as_of_version_exclusive(version),
401			Transaction::Admin(txn) => txn.read_as_of_version_exclusive(version),
402			Transaction::Query(txn) => txn.read_as_of_version_exclusive(version),
403			Transaction::Test(t) => t.inner.read_as_of_version_exclusive(version),
404			Transaction::Replica(_) => {
405				panic!("read_as_of_version_exclusive not supported on Replica transaction")
406			}
407		}
408	}
409
410	/// Create a streaming iterator for forward range queries.
411	pub fn range(
412		&mut self,
413		range: EncodedKeyRange,
414		batch_size: usize,
415	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
416		match self {
417			Transaction::Command(txn) => txn.range(range, batch_size),
418			Transaction::Admin(txn) => txn.range(range, batch_size),
419			Transaction::Query(txn) => Ok(txn.range(range, batch_size)),
420			Transaction::Test(t) => t.inner.range(range, batch_size),
421			Transaction::Replica(txn) => txn.range(range, batch_size),
422		}
423	}
424
425	/// Create a streaming iterator for reverse range queries.
426	pub fn range_rev(
427		&mut self,
428		range: EncodedKeyRange,
429		batch_size: usize,
430	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
431		match self {
432			Transaction::Command(txn) => txn.range_rev(range, batch_size),
433			Transaction::Admin(txn) => txn.range_rev(range, batch_size),
434			Transaction::Query(txn) => Ok(txn.range_rev(range, batch_size)),
435			Transaction::Test(t) => t.inner.range_rev(range, batch_size),
436			Transaction::Replica(txn) => txn.range_rev(range, batch_size),
437		}
438	}
439}
440
441impl<'a> From<&'a mut CommandTransaction> for Transaction<'a> {
442	fn from(txn: &'a mut CommandTransaction) -> Self {
443		Self::Command(txn)
444	}
445}
446
447impl<'a> From<&'a mut AdminTransaction> for Transaction<'a> {
448	fn from(txn: &'a mut AdminTransaction) -> Self {
449		Self::Admin(txn)
450	}
451}
452
453impl<'a> From<&'a mut QueryTransaction> for Transaction<'a> {
454	fn from(txn: &'a mut QueryTransaction) -> Self {
455		Self::Query(txn)
456	}
457}
458
459impl<'a> From<&'a mut ReplicaTransaction> for Transaction<'a> {
460	fn from(txn: &'a mut ReplicaTransaction) -> Self {
461		Self::Replica(txn)
462	}
463}
464
465impl<'a> Transaction<'a> {
466	/// Get the identity associated with this transaction.
467	pub fn identity(&self) -> IdentityId {
468		match self {
469			Self::Command(txn) => txn.identity,
470			Self::Admin(txn) => txn.identity,
471			Self::Query(txn) => txn.identity,
472			Self::Test(t) => t.inner.identity,
473			Self::Replica(_) => IdentityId::system(),
474		}
475	}
476
477	/// Set the identity associated with this transaction.
478	pub fn set_identity(&mut self, identity: IdentityId) {
479		match self {
480			Self::Command(txn) => txn.identity = identity,
481			Self::Admin(txn) => txn.identity = identity,
482			Self::Query(txn) => txn.identity = identity,
483			Self::Test(t) => t.inner.identity = identity,
484			Self::Replica(_) => {}
485		}
486	}
487
488	/// Clone the RQL executor, if one is set.
489	fn executor_clone(&self) -> Option<Arc<dyn RqlExecutor>> {
490		match self {
491			Self::Command(txn) => txn.executor.clone(),
492			Self::Admin(txn) => txn.executor.clone(),
493			Self::Query(txn) => txn.executor.clone(),
494			Self::Test(t) => t.inner.executor.clone(),
495			Self::Replica(_) => None,
496		}
497	}
498
499	/// Execute RQL within this transaction using the attached executor.
500	///
501	/// Panics if no `RqlExecutor` has been set on the underlying transaction.
502	pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
503		let executor = self.executor_clone().expect("RqlExecutor not set");
504		let mut tx = self.reborrow();
505		let result = executor.rql(&mut tx, rql, params);
506		if let Some(ref e) = result.error {
507			self.poison(*e.0.clone());
508		}
509		result
510	}
511
512	/// Mark this transaction as poisoned, storing the original error diagnostic.
513	/// No-op for Query and Replica transactions.
514	fn poison(&mut self, cause: Diagnostic) {
515		match self {
516			Transaction::Command(txn) => txn.poison(cause),
517			Transaction::Admin(txn) => txn.poison(cause),
518			Transaction::Query(_) => {}
519			Transaction::Test(t) => t.inner.poison(cause),
520			Transaction::Replica(_) => {}
521		}
522	}
523
524	/// Re-borrow this transaction with a shorter lifetime, enabling
525	/// multiple sequential uses of the same transaction binding.
526	pub fn reborrow(&mut self) -> Transaction<'_> {
527		match self {
528			Transaction::Command(cmd) => Transaction::Command(cmd),
529			Transaction::Admin(admin) => Transaction::Admin(admin),
530			Transaction::Query(qry) => Transaction::Query(qry),
531			Transaction::Test(t) => Transaction::Test(Box::new(TestTransaction {
532				inner: t.inner,
533				baseline: t.baseline,
534				events: t.events,
535				invocations: t.invocations,
536				event_seq: t.event_seq,
537				handler_seq: t.handler_seq,
538				savepoint: None,
539				session_type: t.session_type,
540				session_default_deny: t.session_default_deny,
541			})),
542			Transaction::Replica(rep) => Transaction::Replica(rep),
543		}
544	}
545
546	/// Extract the underlying CommandTransaction, panics if this is
547	/// not a Command transaction
548	pub fn command(self) -> &'a mut CommandTransaction {
549		match self {
550			Self::Command(txn) => txn,
551			_ => panic!("Expected Command transaction"),
552		}
553	}
554
555	/// Extract the underlying AdminTransaction, panics if this is
556	/// not an Admin transaction
557	pub fn admin(self) -> &'a mut AdminTransaction {
558		match self {
559			Self::Admin(txn) => txn,
560			Self::Test(t) => t.inner,
561			_ => panic!("Expected Admin transaction"),
562		}
563	}
564
565	/// Extract the underlying QueryTransaction, panics if this is
566	/// not a Query transaction
567	pub fn query(self) -> &'a mut QueryTransaction {
568		match self {
569			Self::Query(txn) => txn,
570			_ => panic!("Expected Query transaction"),
571		}
572	}
573
574	/// Extract the underlying ReplicaTransaction, panics if this is
575	/// not a Replica transaction
576	pub fn replica(self) -> &'a mut ReplicaTransaction {
577		match self {
578			Self::Replica(txn) => txn,
579			_ => panic!("Expected Replica transaction"),
580		}
581	}
582
583	/// Get a mutable reference to the underlying
584	/// CommandTransaction, panics if this is not a Command transaction
585	pub fn command_mut(&mut self) -> &mut CommandTransaction {
586		match self {
587			Self::Command(txn) => txn,
588			_ => panic!("Expected Command transaction"),
589		}
590	}
591
592	/// Get a mutable reference to the underlying
593	/// AdminTransaction, panics if this is not an Admin transaction
594	pub fn admin_mut(&mut self) -> &mut AdminTransaction {
595		match self {
596			Self::Admin(txn) => txn,
597			Self::Test(t) => t.inner,
598			_ => panic!("Expected Admin transaction"),
599		}
600	}
601
602	/// Get a mutable reference to the underlying QueryTransaction,
603	/// panics if this is not a Query transaction
604	pub fn query_mut(&mut self) -> &mut QueryTransaction {
605		match self {
606			Self::Query(txn) => txn,
607			_ => panic!("Expected Query transaction"),
608		}
609	}
610
611	/// Get a mutable reference to the underlying ReplicaTransaction,
612	/// panics if this is not a Replica transaction
613	pub fn replica_mut(&mut self) -> &mut ReplicaTransaction {
614		match self {
615			Self::Replica(txn) => txn,
616			_ => panic!("Expected Replica transaction"),
617		}
618	}
619
620	/// Begin a single-version query transaction for specific keys
621	pub fn begin_single_query<'b, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
622	where
623		I: IntoIterator<Item = &'b EncodedKey>,
624	{
625		match self {
626			Transaction::Command(txn) => txn.begin_single_query(keys),
627			Transaction::Admin(txn) => txn.begin_single_query(keys),
628			Transaction::Query(txn) => txn.begin_single_query(keys),
629			Transaction::Test(t) => t.inner.begin_single_query(keys),
630			Transaction::Replica(_) => panic!("Single queries not supported on Replica transaction"),
631		}
632	}
633
634	/// Begin a single-version write transaction for specific keys.
635	/// Panics on Query and Replica transactions.
636	pub fn begin_single_command<'b, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
637	where
638		I: IntoIterator<Item = &'b EncodedKey>,
639	{
640		match self {
641			Transaction::Command(txn) => txn.begin_single_command(keys),
642			Transaction::Admin(txn) => txn.begin_single_command(keys),
643			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
644			Transaction::Test(t) => t.inner.begin_single_command(keys),
645			Transaction::Replica(_) => panic!("Single commands not supported on Replica transaction"),
646		}
647	}
648
649	/// Resolve to the underlying `WriteOps` impl. Panics on `Query`,
650	/// matching the prior per-method panic site.
651	fn write_ops(&mut self) -> &mut dyn Write {
652		match self {
653			Transaction::Command(txn) => &mut **txn,
654			Transaction::Admin(txn) => &mut **txn,
655			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
656			Transaction::Test(t) => &mut *t.inner,
657			Transaction::Replica(txn) => &mut **txn,
658		}
659	}
660
661	/// Set a key-value pair. Panics on Query transactions.
662	pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
663		Write::set(self.write_ops(), key, row)
664	}
665
666	/// Unset (delete with tombstone) a key-value pair. Panics on Query transactions.
667	pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
668		Write::unset(self.write_ops(), key, row)
669	}
670
671	/// Remove a key. Panics on Query transactions.
672	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
673		Write::remove(self.write_ops(), key)
674	}
675
676	pub fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
677		Write::mark_preexisting(self.write_ops(), key)
678	}
679
680	/// Track a row change for post-commit event emission.
681	/// No-op on Replica transactions. Panics on Query transactions.
682	pub fn track_row_change(&mut self, change: RowChange) {
683		Write::track_row_change(self.write_ops(), change)
684	}
685
686	/// Track a flow change for transactional view pre-commit processing.
687	/// No-op on Replica transactions. Panics on Query transactions.
688	pub fn track_flow_change(&mut self, change: Change) {
689		Write::track_flow_change(self.write_ops(), change)
690	}
691
692	/// Record a test event dispatch. No-op for non-Test transactions.
693	pub fn record_test_event(
694		&mut self,
695		namespace: String,
696		event: String,
697		variant: String,
698		depth: u8,
699		columns: Columns,
700	) {
701		if let Transaction::Test(t) = self {
702			*t.event_seq += 1;
703			t.events.push(CapturedEvent {
704				sequence: *t.event_seq,
705				namespace,
706				event,
707				variant,
708				depth,
709				columns,
710			});
711		}
712	}
713
714	/// Record a test handler invocation. No-op for non-Test transactions.
715	///
716	/// The `sequence` field of `invocation` will be overwritten with the next handler sequence number.
717	pub fn record_test_handler(&mut self, mut invocation: CapturedInvocation) {
718		if let Transaction::Test(t) = self {
719			*t.handler_seq += 1;
720			invocation.sequence = *t.handler_seq;
721			t.invocations.push(invocation);
722		}
723	}
724}
725
726macro_rules! delegate_interceptor {
727	($method:ident, $ret:ty) => {
728		fn $method(&mut self) -> $ret {
729			match self {
730				Transaction::Command(txn) => txn.$method(),
731				Transaction::Admin(txn) => txn.$method(),
732				Transaction::Query(_) => panic!("Interceptors not supported on Query transaction"),
733				Transaction::Test(t) => t.inner.$method(),
734				Transaction::Replica(_) => panic!("Interceptors not supported on Replica transaction"),
735			}
736		}
737	};
738}
739
740impl WithInterceptors for Transaction<'_> {
741	delegate_interceptor!(
742		table_row_pre_insert_interceptors,
743		&mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync>
744	);
745	delegate_interceptor!(
746		table_row_post_insert_interceptors,
747		&mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync>
748	);
749	delegate_interceptor!(
750		table_row_pre_update_interceptors,
751		&mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync>
752	);
753	delegate_interceptor!(
754		table_row_post_update_interceptors,
755		&mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync>
756	);
757	delegate_interceptor!(
758		table_row_pre_delete_interceptors,
759		&mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync>
760	);
761	delegate_interceptor!(
762		table_row_post_delete_interceptors,
763		&mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync>
764	);
765	delegate_interceptor!(
766		ringbuffer_row_pre_insert_interceptors,
767		&mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync>
768	);
769	delegate_interceptor!(
770		ringbuffer_row_post_insert_interceptors,
771		&mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync>
772	);
773	delegate_interceptor!(
774		ringbuffer_row_pre_update_interceptors,
775		&mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync>
776	);
777	delegate_interceptor!(
778		ringbuffer_row_post_update_interceptors,
779		&mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync>
780	);
781	delegate_interceptor!(
782		ringbuffer_row_pre_delete_interceptors,
783		&mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync>
784	);
785	delegate_interceptor!(
786		ringbuffer_row_post_delete_interceptors,
787		&mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync>
788	);
789	delegate_interceptor!(pre_commit_interceptors, &mut Chain<dyn PreCommitInterceptor + Send + Sync>);
790	delegate_interceptor!(post_commit_interceptors, &mut Chain<dyn PostCommitInterceptor + Send + Sync>);
791	delegate_interceptor!(
792		namespace_post_create_interceptors,
793		&mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync>
794	);
795	delegate_interceptor!(
796		namespace_pre_update_interceptors,
797		&mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync>
798	);
799	delegate_interceptor!(
800		namespace_post_update_interceptors,
801		&mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync>
802	);
803	delegate_interceptor!(
804		namespace_pre_delete_interceptors,
805		&mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync>
806	);
807	delegate_interceptor!(table_post_create_interceptors, &mut Chain<dyn TablePostCreateInterceptor + Send + Sync>);
808	delegate_interceptor!(table_pre_update_interceptors, &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>);
809	delegate_interceptor!(table_post_update_interceptors, &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>);
810	delegate_interceptor!(table_pre_delete_interceptors, &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>);
811	delegate_interceptor!(
812		view_row_pre_insert_interceptors,
813		&mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync>
814	);
815	delegate_interceptor!(
816		view_row_post_insert_interceptors,
817		&mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync>
818	);
819	delegate_interceptor!(
820		view_row_pre_update_interceptors,
821		&mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync>
822	);
823	delegate_interceptor!(
824		view_row_post_update_interceptors,
825		&mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync>
826	);
827	delegate_interceptor!(
828		view_row_pre_delete_interceptors,
829		&mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync>
830	);
831	delegate_interceptor!(
832		view_row_post_delete_interceptors,
833		&mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync>
834	);
835	delegate_interceptor!(view_post_create_interceptors, &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync>);
836	delegate_interceptor!(view_pre_update_interceptors, &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>);
837	delegate_interceptor!(view_post_update_interceptors, &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>);
838	delegate_interceptor!(view_pre_delete_interceptors, &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>);
839	delegate_interceptor!(
840		ringbuffer_post_create_interceptors,
841		&mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync>
842	);
843	delegate_interceptor!(
844		ringbuffer_pre_update_interceptors,
845		&mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>
846	);
847	delegate_interceptor!(
848		ringbuffer_post_update_interceptors,
849		&mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>
850	);
851	delegate_interceptor!(
852		ringbuffer_pre_delete_interceptors,
853		&mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>
854	);
855	delegate_interceptor!(
856		dictionary_row_pre_insert_interceptors,
857		&mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync>
858	);
859	delegate_interceptor!(
860		dictionary_row_post_insert_interceptors,
861		&mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync>
862	);
863	delegate_interceptor!(
864		dictionary_row_pre_update_interceptors,
865		&mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync>
866	);
867	delegate_interceptor!(
868		dictionary_row_post_update_interceptors,
869		&mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync>
870	);
871	delegate_interceptor!(
872		dictionary_row_pre_delete_interceptors,
873		&mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync>
874	);
875	delegate_interceptor!(
876		dictionary_row_post_delete_interceptors,
877		&mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync>
878	);
879	delegate_interceptor!(
880		dictionary_post_create_interceptors,
881		&mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync>
882	);
883	delegate_interceptor!(
884		dictionary_pre_update_interceptors,
885		&mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync>
886	);
887	delegate_interceptor!(
888		dictionary_post_update_interceptors,
889		&mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync>
890	);
891	delegate_interceptor!(
892		dictionary_pre_delete_interceptors,
893		&mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync>
894	);
895	delegate_interceptor!(
896		series_row_pre_insert_interceptors,
897		&mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync>
898	);
899	delegate_interceptor!(
900		series_row_post_insert_interceptors,
901		&mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync>
902	);
903	delegate_interceptor!(
904		series_row_pre_update_interceptors,
905		&mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync>
906	);
907	delegate_interceptor!(
908		series_row_post_update_interceptors,
909		&mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync>
910	);
911	delegate_interceptor!(
912		series_row_pre_delete_interceptors,
913		&mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync>
914	);
915	delegate_interceptor!(
916		series_row_post_delete_interceptors,
917		&mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync>
918	);
919	delegate_interceptor!(
920		series_post_create_interceptors,
921		&mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync>
922	);
923	delegate_interceptor!(series_pre_update_interceptors, &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync>);
924	delegate_interceptor!(
925		series_post_update_interceptors,
926		&mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync>
927	);
928	delegate_interceptor!(series_pre_delete_interceptors, &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync>);
929	delegate_interceptor!(
930		identity_post_create_interceptors,
931		&mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync>
932	);
933	delegate_interceptor!(
934		identity_pre_update_interceptors,
935		&mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync>
936	);
937	delegate_interceptor!(
938		identity_post_update_interceptors,
939		&mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync>
940	);
941	delegate_interceptor!(
942		identity_pre_delete_interceptors,
943		&mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync>
944	);
945	delegate_interceptor!(role_post_create_interceptors, &mut Chain<dyn RolePostCreateInterceptor + Send + Sync>);
946	delegate_interceptor!(role_pre_update_interceptors, &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync>);
947	delegate_interceptor!(role_post_update_interceptors, &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync>);
948	delegate_interceptor!(role_pre_delete_interceptors, &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync>);
949	delegate_interceptor!(
950		granted_role_post_create_interceptors,
951		&mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync>
952	);
953	delegate_interceptor!(
954		granted_role_pre_delete_interceptors,
955		&mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync>
956	);
957	delegate_interceptor!(
958		authentication_post_create_interceptors,
959		&mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync>
960	);
961	delegate_interceptor!(
962		authentication_pre_delete_interceptors,
963		&mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync>
964	);
965}