// reifydb_transaction/transaction/command.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem::take, sync::Arc};
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::{
9		key::{EncodedKey, EncodedKeyRange},
10		row::EncodedRow,
11	},
12	event::EventBus,
13	execution::ExecutionResult,
14	interface::{
15		WithEventBus,
16		change::{Change, ChangeOrigin},
17		store::{MultiVersionBatch, MultiVersionRow},
18	},
19};
20use reifydb_runtime::context::clock::Clock;
21use reifydb_type::{
22	Result,
23	error::Diagnostic,
24	params::Params,
25	value::{datetime::DateTime, identity::IdentityId},
26};
27use tracing::instrument;
28
29use crate::{
30	TransactionId,
31	change::{RowChange, TransactionalCatalogChanges},
32	change_accumulator::ChangeAccumulator,
33	error::TransactionError,
34	interceptor::{
35		WithInterceptors,
36		authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
37		chain::InterceptorChain as Chain,
38		dictionary::{
39			DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
40			DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
41		},
42		dictionary_row::{
43			DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
44			DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
45			DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
46		},
47		granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
48		identity::{
49			IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
50			IdentityPreUpdateInterceptor,
51		},
52		interceptors::Interceptors,
53		namespace::{
54			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
55			NamespacePreUpdateInterceptor,
56		},
57		ringbuffer::{
58			RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
59			RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
60		},
61		ringbuffer_row::{
62			RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
63			RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
64			RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
65		},
66		role::{
67			RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
68			RolePreUpdateInterceptor,
69		},
70		series::{
71			SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
72			SeriesPreUpdateInterceptor,
73		},
74		series_row::{
75			SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
76			SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
77		},
78		table::{
79			TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
80			TablePreUpdateInterceptor,
81		},
82		table_row::{
83			TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
84			TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
85		},
86		transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
87		view::{
88			ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
89			ViewPreUpdateInterceptor,
90		},
91		view_row::{
92			ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
93			ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
94		},
95	},
96	multi::{
97		pending::PendingWrites,
98		transaction::{MultiTransaction, write::MultiWriteTransaction},
99	},
100	single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
101	transaction::{
102		RqlExecutor, Transaction, apply_pre_commit_writes, collect_transaction_writes, query::QueryTransaction,
103		write::Write,
104	},
105};
106
107/// An active command transaction that holds a multi command transaction
108/// and provides query/command access to single storage.
109///
110/// The transaction will auto-rollback on drop if not explicitly committed.
111pub struct CommandTransaction {
112	pub multi: MultiTransaction,
113	pub single: SingleTransaction,
114	state: TransactionState,
115
116	pub cmd: Option<MultiWriteTransaction>,
117	pub event_bus: EventBus,
118
119	// Track row changes for post-commit events
120	pub(crate) row_changes: Vec<RowChange>,
121	pub(crate) interceptors: Interceptors,
122
123	// Accumulate flow changes for transactional view pre-commit processing
124	pub(crate) accumulator: ChangeAccumulator,
125
126	/// The identity executing this transaction.
127	pub identity: IdentityId,
128
129	/// Optional RQL executor for running RQL within this transaction.
130	pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
131
132	/// Clock for timestamping flow changes at commit.
133	pub(crate) clock: Clock,
134
135	/// When the transaction has been poisoned, stores the original error diagnostic.
136	poison_cause: Option<Diagnostic>,
137}
138
/// Lifecycle state of a `CommandTransaction`.
///
/// Only `Active` allows further operations; every other state makes
/// `check_active` return an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// The transaction accepts reads and writes.
	Active,
	/// The transaction has been committed.
	Committed,
	/// The transaction has been rolled back.
	RolledBack,
	/// An earlier error poisoned the transaction; the cause is stored separately.
	Poisoned,
}
146
147impl CommandTransaction {
148	/// Creates a new active command transaction with a pre-commit callback
149	#[instrument(name = "transaction::command::new", level = "debug", skip_all)]
150	pub fn new(
151		multi: MultiTransaction,
152		single: SingleTransaction,
153		event_bus: EventBus,
154		interceptors: Interceptors,
155		identity: IdentityId,
156		clock: Clock,
157	) -> Result<Self> {
158		let cmd = multi.begin_command()?;
159		Ok(Self {
160			cmd: Some(cmd),
161			multi,
162			single,
163			state: TransactionState::Active,
164			event_bus,
165			interceptors,
166			row_changes: Vec::new(),
167			accumulator: ChangeAccumulator::new(),
168			identity,
169			executor: None,
170			clock,
171			poison_cause: None,
172		})
173	}
174
175	/// Set the RQL executor for this transaction.
176	pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
177		self.executor = Some(executor);
178	}
179
180	/// Execute RQL within this transaction using the attached executor.
181	///
182	/// Panics if no `RqlExecutor` has been set on this transaction.
183	pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
184		if let Err(e) = self.check_active() {
185			return ExecutionResult {
186				frames: vec![],
187				error: Some(e),
188				metrics: Default::default(),
189			};
190		}
191		let executor = self.executor.clone().expect("RqlExecutor not set");
192		let result = executor.rql(&mut Transaction::Command(self), rql, params);
193		if let Some(ref e) = result.error {
194			self.poison(*e.0.clone());
195		}
196		result
197	}
198
199	#[instrument(name = "transaction::command::event_bus", level = "trace", skip(self))]
200	pub fn event_bus(&self) -> &EventBus {
201		&self.event_bus
202	}
203
204	/// Check if transaction is still active and return appropriate error if
205	/// not
206	fn check_active(&self) -> Result<()> {
207		match self.state {
208			TransactionState::Active => Ok(()),
209			TransactionState::Committed => Err(TransactionError::AlreadyCommitted.into()),
210			TransactionState::RolledBack => Err(TransactionError::AlreadyRolledBack.into()),
211			TransactionState::Poisoned => Err(TransactionError::Poisoned {
212				cause: Box::new(self.poison_cause.clone().unwrap()),
213			}
214			.into()),
215		}
216	}
217
218	/// Mark this transaction as poisoned, storing the original error diagnostic.
219	pub(crate) fn poison(&mut self, cause: Diagnostic) {
220		self.state = TransactionState::Poisoned;
221		self.poison_cause = Some(cause);
222	}
223
224	/// Commit the transaction.
225	/// Since single transactions are short-lived and auto-commit,
226	/// this only commits the multi transaction.
227	#[instrument(name = "transaction::command::commit", level = "debug", skip(self))]
228	pub fn commit(&mut self) -> Result<CommitVersion> {
229		self.check_active()?;
230		let mut ctx = self.build_pre_commit_context();
231		self.interceptors.pre_commit.execute(&mut ctx)?;
232		self.finalize_commit(ctx, /* unchecked = */ false)
233	}
234
235	#[inline]
236	fn build_pre_commit_context(&mut self) -> PreCommitContext {
237		let transaction_writes = collect_transaction_writes(self.pending_writes());
238		PreCommitContext {
239			flow_changes: self
240				.accumulator
241				.take_changes(CommitVersion(0), DateTime::from_nanos(self.clock.now_nanos())),
242			pending_writes: Vec::new(),
243			pending_shapes: Vec::new(),
244			transaction_writes,
245			view_entries: Vec::new(),
246		}
247	}
248
249	fn finalize_commit(&mut self, ctx: PreCommitContext, unchecked: bool) -> Result<CommitVersion> {
250		let Some(mut multi) = self.cmd.take() else {
251			unreachable!("Transaction state inconsistency")
252		};
253		apply_pre_commit_writes(&mut multi, &ctx.pending_writes)?;
254		let id = multi.id();
255		self.state = TransactionState::Committed;
256
257		let changes = TransactionalCatalogChanges::default();
258		let row_changes = take(&mut self.row_changes);
259		let version = if unchecked {
260			multi.commit_unchecked()?
261		} else {
262			multi.commit()?
263		};
264		self.interceptors.post_commit.execute(PostCommitContext::new(id, version, changes, row_changes))?;
265		Ok(version)
266	}
267
268	/// Run `body` with conflict tracking disabled, then commit via the
269	/// bypass path. This is the only public entry point that reaches
270	/// `commit_unchecked`; both halves it composes are crate-private.
271	///
272	/// On body failure the transaction is rolled back before the error is
273	/// returned. Without this, the conflict manager remains in its disabled
274	/// state and a subsequent `commit()` on the same transaction would
275	/// register an empty oracle window, silently bypassing SSI conflict
276	/// detection. `rollback` resets `ConflictManager.disabled` so a reused
277	/// transaction resumes normal tracking.
278	///
279	/// See the "Unchecked commits" section in `multi::transaction` for the safety contract.
280	pub fn execute_bulk_unchecked<F, R>(&mut self, body: F) -> Result<R>
281	where
282		F: FnOnce(&mut CommandTransaction) -> Result<R>,
283	{
284		self.disable_conflict_tracking()?;
285		let r = match body(self) {
286			Ok(r) => r,
287			Err(e) => {
288				let _ = self.rollback();
289				return Err(e);
290			}
291		};
292		self.commit_unchecked()?;
293		Ok(r)
294	}
295
296	/// See the "Unchecked commits" section in `multi::transaction` for the safety contract.
297	#[instrument(name = "transaction::command::commit_unchecked", level = "debug", skip(self))]
298	pub(crate) fn commit_unchecked(&mut self) -> Result<CommitVersion> {
299		self.check_active()?;
300		let mut ctx = self.build_pre_commit_context();
301		self.interceptors.pre_commit.execute(&mut ctx)?;
302		self.finalize_commit(ctx, /* unchecked = */ true)
303	}
304
305	/// Rollback the transaction.
306	#[instrument(name = "transaction::command::rollback", level = "debug", skip(self))]
307	pub fn rollback(&mut self) -> Result<()> {
308		self.check_active()?;
309		if let Some(mut multi) = self.cmd.take() {
310			self.state = TransactionState::RolledBack;
311			multi.rollback()
312		} else {
313			// This should never happen due to check_active
314			unreachable!("Transaction state inconsistency")
315		}
316	}
317
318	/// Get access to the pending writes in this transaction
319	///
320	/// This allows checking for key conflicts when committing FlowTransactions
321	/// to ensure they operate on non-overlapping keyspaces.
322	#[instrument(name = "transaction::command::pending_writes", level = "trace", skip(self))]
323	pub fn pending_writes(&self) -> &PendingWrites {
324		self.cmd.as_ref().unwrap().pending_writes()
325	}
326
327	/// Execute a function with query access to the single transaction.
328	#[instrument(name = "transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
329	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
330	where
331		I: IntoIterator<Item = &'a EncodedKey> + Send,
332		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
333		R: Send,
334	{
335		self.check_active()?;
336		self.single.with_query(keys, f)
337	}
338
339	/// Execute a function with query access to the single transaction.
340	#[instrument(name = "transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
341	pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
342	where
343		I: IntoIterator<Item = &'a EncodedKey> + Send,
344		F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
345		R: Send,
346	{
347		self.check_active()?;
348		self.single.with_command(keys, f)
349	}
350
351	/// Execute a function with a query transaction view.
352	/// This creates a new query transaction using the stored multi-version storage.
353	/// The query transaction will operate independently but share the same single/CDC storage.
354	#[instrument(name = "transaction::command::with_multi_query", level = "trace", skip(self, f))]
355	pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
356	where
357		F: FnOnce(&mut QueryTransaction) -> Result<R>,
358	{
359		self.check_active()?;
360
361		let mut query_txn =
362			QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
363
364		f(&mut query_txn)
365	}
366
367	#[instrument(name = "transaction::command::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
368	pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
369	where
370		F: FnOnce(&mut QueryTransaction) -> Result<R>,
371	{
372		self.check_active()?;
373
374		let mut query_txn =
375			QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
376
377		query_txn.read_as_of_version_exclusive(version)?;
378
379		f(&mut query_txn)
380	}
381
382	#[instrument(name = "transaction::command::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
383	pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
384	where
385		F: FnOnce(&mut QueryTransaction) -> Result<R>,
386	{
387		self.check_active()?;
388
389		let mut query_txn =
390			QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
391
392		query_txn.multi.read_as_of_version_inclusive(version);
393
394		f(&mut query_txn)
395	}
396
397	/// Begin a single-version query transaction for specific keys
398	#[instrument(name = "transaction::command::begin_single_query", level = "trace", skip(self, keys))]
399	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
400	where
401		I: IntoIterator<Item = &'a EncodedKey>,
402	{
403		self.check_active()?;
404		self.single.begin_query(keys)
405	}
406
407	/// Begin a single-version command transaction for specific keys
408	#[instrument(name = "transaction::command::begin_single_command", level = "trace", skip(self, keys))]
409	pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
410	where
411		I: IntoIterator<Item = &'a EncodedKey>,
412	{
413		self.check_active()?;
414		self.single.begin_command(keys)
415	}
416
417	/// Track a row change for post-commit event emission
418	pub fn track_row_change(&mut self, change: RowChange) {
419		self.row_changes.push(change);
420	}
421
422	/// Track a flow change for transactional view pre-commit processing.
423	pub fn track_flow_change(&mut self, change: Change) {
424		if let ChangeOrigin::Shape(id) = change.origin {
425			for diff in change.diffs {
426				self.accumulator.track(id, diff);
427			}
428		}
429	}
430
431	/// Get the transaction version
432	#[inline]
433	pub fn version(&self) -> CommitVersion {
434		self.cmd.as_ref().unwrap().version()
435	}
436
437	/// Get the transaction ID
438	#[inline]
439	pub fn id(&self) -> TransactionId {
440		self.cmd.as_ref().unwrap().id()
441	}
442
443	/// Get a value by key
444	#[inline]
445	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
446		self.check_active()?;
447		Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_row()))
448	}
449
450	/// Read the committed value at the transaction's read version, ignoring
451	/// pending intra-tx writes.
452	#[inline]
453	pub fn get_committed(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
454		self.check_active()?;
455		Ok(self.cmd.as_mut().unwrap().get_committed(key)?.map(|v| v.into_multi_version_row()))
456	}
457
458	/// Check if a key exists
459	#[inline]
460	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
461		self.check_active()?;
462		self.cmd.as_mut().unwrap().contains_key(key)
463	}
464
465	/// Get a prefix batch
466	#[inline]
467	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
468		self.check_active()?;
469		self.cmd.as_mut().unwrap().prefix(prefix)
470	}
471
472	/// Get a reverse prefix batch
473	#[inline]
474	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
475		self.check_active()?;
476		self.cmd.as_mut().unwrap().prefix_rev(prefix)
477	}
478
479	/// Read as of version exclusive
480	#[inline]
481	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
482		self.check_active()?;
483		self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
484		Ok(())
485	}
486
487	/// Set a key-value pair
488	#[inline]
489	pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
490		self.check_active()?;
491		self.cmd.as_mut().unwrap().set(key, row)
492	}
493
494	/// Reserve capacity in the conflict tracker for `additional` more write keys.
495	/// Use ahead of a known-size bulk write to avoid HashSet rehash churn.
496	#[inline]
497	pub fn reserve_writes(&mut self, additional: usize) -> Result<()> {
498		self.check_active()?;
499		self.cmd.as_mut().unwrap().reserve_writes(additional);
500		Ok(())
501	}
502
503	/// See the "Unchecked commits" section in `multi::transaction` for the safety contract.
504	#[inline]
505	pub(crate) fn disable_conflict_tracking(&mut self) -> Result<()> {
506		self.check_active()?;
507		self.cmd.as_mut().unwrap().disable_conflict_tracking();
508		Ok(())
509	}
510
511	/// Unset a key, preserving the deleted values.
512	///
513	/// The `row` parameter contains the deleted row for CDC and metrics.
514	#[inline]
515	pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
516		self.check_active()?;
517		self.cmd.as_mut().unwrap().unset(key, row)
518	}
519
520	/// Remove a key without preserving the deleted values.
521	///
522	/// Use when only the key matters (e.g., index entries, catalog metadata).
523	#[inline]
524	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
525		self.check_active()?;
526		self.cmd.as_mut().unwrap().remove(key)
527	}
528
529	#[inline]
530	pub fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
531		self.check_active()?;
532		self.cmd.as_mut().unwrap().mark_preexisting(key);
533		Ok(())
534	}
535
536	/// Create a streaming iterator for forward range queries.
537	#[inline]
538	pub fn range(
539		&mut self,
540		range: EncodedKeyRange,
541		batch_size: usize,
542	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
543		self.check_active()?;
544		Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
545	}
546
547	/// Create a streaming iterator for reverse range queries.
548	#[inline]
549	pub fn range_rev(
550		&mut self,
551		range: EncodedKeyRange,
552		batch_size: usize,
553	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
554		self.check_active()?;
555		Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
556	}
557}
558
559impl WithEventBus for CommandTransaction {
560	fn event_bus(&self) -> &EventBus {
561		&self.event_bus
562	}
563}
564
565impl Write for CommandTransaction {
566	#[inline]
567	fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
568		CommandTransaction::set(self, key, row)
569	}
570	#[inline]
571	fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
572		CommandTransaction::unset(self, key, row)
573	}
574	#[inline]
575	fn remove(&mut self, key: &EncodedKey) -> Result<()> {
576		CommandTransaction::remove(self, key)
577	}
578	#[inline]
579	fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
580		CommandTransaction::mark_preexisting(self, key)
581	}
582	#[inline]
583	fn track_row_change(&mut self, change: RowChange) {
584		CommandTransaction::track_row_change(self, change)
585	}
586	#[inline]
587	fn track_flow_change(&mut self, change: Change) {
588		CommandTransaction::track_flow_change(self, change)
589	}
590}
591
592impl WithInterceptors for CommandTransaction {
593	fn table_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync> {
594		&mut self.interceptors.table_row_pre_insert
595	}
596
597	fn table_row_post_insert_interceptors(
598		&mut self,
599	) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync> {
600		&mut self.interceptors.table_row_post_insert
601	}
602
603	fn table_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync> {
604		&mut self.interceptors.table_row_pre_update
605	}
606
607	fn table_row_post_update_interceptors(
608		&mut self,
609	) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync> {
610		&mut self.interceptors.table_row_post_update
611	}
612
613	fn table_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync> {
614		&mut self.interceptors.table_row_pre_delete
615	}
616
617	fn table_row_post_delete_interceptors(
618		&mut self,
619	) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync> {
620		&mut self.interceptors.table_row_post_delete
621	}
622
623	fn ringbuffer_row_pre_insert_interceptors(
624		&mut self,
625	) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync> {
626		&mut self.interceptors.ringbuffer_row_pre_insert
627	}
628
629	fn ringbuffer_row_post_insert_interceptors(
630		&mut self,
631	) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync> {
632		&mut self.interceptors.ringbuffer_row_post_insert
633	}
634
635	fn ringbuffer_row_pre_update_interceptors(
636		&mut self,
637	) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync> {
638		&mut self.interceptors.ringbuffer_row_pre_update
639	}
640
641	fn ringbuffer_row_post_update_interceptors(
642		&mut self,
643	) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync> {
644		&mut self.interceptors.ringbuffer_row_post_update
645	}
646
647	fn ringbuffer_row_pre_delete_interceptors(
648		&mut self,
649	) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync> {
650		&mut self.interceptors.ringbuffer_row_pre_delete
651	}
652
653	fn ringbuffer_row_post_delete_interceptors(
654		&mut self,
655	) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync> {
656		&mut self.interceptors.ringbuffer_row_post_delete
657	}
658
659	fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
660		&mut self.interceptors.pre_commit
661	}
662
663	fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
664		&mut self.interceptors.post_commit
665	}
666
667	fn namespace_post_create_interceptors(
668		&mut self,
669	) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
670		&mut self.interceptors.namespace_post_create
671	}
672
673	fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
674		&mut self.interceptors.namespace_pre_update
675	}
676
677	fn namespace_post_update_interceptors(
678		&mut self,
679	) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
680		&mut self.interceptors.namespace_post_update
681	}
682
683	fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
684		&mut self.interceptors.namespace_pre_delete
685	}
686
687	fn table_post_create_interceptors(&mut self) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync> {
688		&mut self.interceptors.table_post_create
689	}
690
691	fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
692		&mut self.interceptors.table_pre_update
693	}
694
695	fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
696		&mut self.interceptors.table_post_update
697	}
698
699	fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
700		&mut self.interceptors.table_pre_delete
701	}
702
703	fn view_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync> {
704		&mut self.interceptors.view_row_pre_insert
705	}
706
707	fn view_row_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync> {
708		&mut self.interceptors.view_row_post_insert
709	}
710
711	fn view_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync> {
712		&mut self.interceptors.view_row_pre_update
713	}
714
715	fn view_row_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync> {
716		&mut self.interceptors.view_row_post_update
717	}
718
719	fn view_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync> {
720		&mut self.interceptors.view_row_pre_delete
721	}
722
723	fn view_row_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync> {
724		&mut self.interceptors.view_row_post_delete
725	}
726
727	fn view_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync> {
728		&mut self.interceptors.view_post_create
729	}
730
731	fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
732		&mut self.interceptors.view_pre_update
733	}
734
735	fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
736		&mut self.interceptors.view_post_update
737	}
738
739	fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
740		&mut self.interceptors.view_pre_delete
741	}
742
743	fn ringbuffer_post_create_interceptors(
744		&mut self,
745	) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync> {
746		&mut self.interceptors.ringbuffer_post_create
747	}
748
749	fn ringbuffer_pre_update_interceptors(
750		&mut self,
751	) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
752		&mut self.interceptors.ringbuffer_pre_update
753	}
754
755	fn ringbuffer_post_update_interceptors(
756		&mut self,
757	) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
758		&mut self.interceptors.ringbuffer_post_update
759	}
760
761	fn ringbuffer_pre_delete_interceptors(
762		&mut self,
763	) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
764		&mut self.interceptors.ringbuffer_pre_delete
765	}
766
767	fn dictionary_row_pre_insert_interceptors(
768		&mut self,
769	) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync> {
770		&mut self.interceptors.dictionary_row_pre_insert
771	}
772
773	fn dictionary_row_post_insert_interceptors(
774		&mut self,
775	) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync> {
776		&mut self.interceptors.dictionary_row_post_insert
777	}
778
779	fn dictionary_row_pre_update_interceptors(
780		&mut self,
781	) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync> {
782		&mut self.interceptors.dictionary_row_pre_update
783	}
784
785	fn dictionary_row_post_update_interceptors(
786		&mut self,
787	) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync> {
788		&mut self.interceptors.dictionary_row_post_update
789	}
790
791	fn dictionary_row_pre_delete_interceptors(
792		&mut self,
793	) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync> {
794		&mut self.interceptors.dictionary_row_pre_delete
795	}
796
797	fn dictionary_row_post_delete_interceptors(
798		&mut self,
799	) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync> {
800		&mut self.interceptors.dictionary_row_post_delete
801	}
802
803	fn dictionary_post_create_interceptors(
804		&mut self,
805	) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync> {
806		&mut self.interceptors.dictionary_post_create
807	}
808
809	fn dictionary_pre_update_interceptors(
810		&mut self,
811	) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync> {
812		&mut self.interceptors.dictionary_pre_update
813	}
814
815	fn dictionary_post_update_interceptors(
816		&mut self,
817	) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync> {
818		&mut self.interceptors.dictionary_post_update
819	}
820
821	fn dictionary_pre_delete_interceptors(
822		&mut self,
823	) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync> {
824		&mut self.interceptors.dictionary_pre_delete
825	}
826
827	fn series_row_pre_insert_interceptors(
828		&mut self,
829	) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync> {
830		&mut self.interceptors.series_row_pre_insert
831	}
832
833	fn series_row_post_insert_interceptors(
834		&mut self,
835	) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync> {
836		&mut self.interceptors.series_row_post_insert
837	}
838
839	fn series_row_pre_update_interceptors(
840		&mut self,
841	) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync> {
842		&mut self.interceptors.series_row_pre_update
843	}
844
845	fn series_row_post_update_interceptors(
846		&mut self,
847	) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync> {
848		&mut self.interceptors.series_row_post_update
849	}
850
851	fn series_row_pre_delete_interceptors(
852		&mut self,
853	) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync> {
854		&mut self.interceptors.series_row_pre_delete
855	}
856
857	fn series_row_post_delete_interceptors(
858		&mut self,
859	) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync> {
860		&mut self.interceptors.series_row_post_delete
861	}
862
863	fn series_post_create_interceptors(&mut self) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync> {
864		&mut self.interceptors.series_post_create
865	}
866
867	fn series_pre_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync> {
868		&mut self.interceptors.series_pre_update
869	}
870
871	fn series_post_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync> {
872		&mut self.interceptors.series_post_update
873	}
874
875	fn series_pre_delete_interceptors(&mut self) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync> {
876		&mut self.interceptors.series_pre_delete
877	}
878
879	fn identity_post_create_interceptors(&mut self) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync> {
880		&mut self.interceptors.identity_post_create
881	}
882
883	fn identity_pre_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync> {
884		&mut self.interceptors.identity_pre_update
885	}
886
887	fn identity_post_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync> {
888		&mut self.interceptors.identity_post_update
889	}
890
891	fn identity_pre_delete_interceptors(&mut self) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync> {
892		&mut self.interceptors.identity_pre_delete
893	}
894
895	fn role_post_create_interceptors(&mut self) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync> {
896		&mut self.interceptors.role_post_create
897	}
898
899	fn role_pre_update_interceptors(&mut self) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync> {
900		&mut self.interceptors.role_pre_update
901	}
902
903	fn role_post_update_interceptors(&mut self) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync> {
904		&mut self.interceptors.role_post_update
905	}
906
907	fn role_pre_delete_interceptors(&mut self) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync> {
908		&mut self.interceptors.role_pre_delete
909	}
910
911	fn granted_role_post_create_interceptors(
912		&mut self,
913	) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync> {
914		&mut self.interceptors.granted_role_post_create
915	}
916
917	fn granted_role_pre_delete_interceptors(
918		&mut self,
919	) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync> {
920		&mut self.interceptors.granted_role_pre_delete
921	}
922
923	fn authentication_post_create_interceptors(
924		&mut self,
925	) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync> {
926		&mut self.interceptors.authentication_post_create
927	}
928
929	fn authentication_pre_delete_interceptors(
930		&mut self,
931	) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync> {
932		&mut self.interceptors.authentication_pre_delete
933	}
934}
935
936impl Drop for CommandTransaction {
937	fn drop(&mut self) {
938		if let Some(mut multi) = self.cmd.take() {
939			// Auto-rollback if still active or poisoned (not committed or rolled back)
940			if self.state == TransactionState::Active || self.state == TransactionState::Poisoned {
941				let _ = multi.rollback();
942			}
943		}
944	}
945}