Skip to main content

reifydb_transaction/transaction/
command.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem::take, sync::Arc};
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::{
9		key::{EncodedKey, EncodedKeyRange},
10		row::EncodedRow,
11	},
12	event::EventBus,
13	execution::ExecutionResult,
14	interface::{
15		WithEventBus,
16		change::{Change, ChangeOrigin},
17		store::{MultiVersionBatch, MultiVersionRow},
18	},
19};
20use reifydb_runtime::context::clock::Clock;
21use reifydb_type::{
22	Result,
23	error::Diagnostic,
24	params::Params,
25	value::{datetime::DateTime, identity::IdentityId},
26};
27use tracing::instrument;
28
29use crate::{
30	TransactionId,
31	change::{RowChange, TransactionalCatalogChanges},
32	change_accumulator::ChangeAccumulator,
33	error::TransactionError,
34	interceptor::{
35		WithInterceptors,
36		authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
37		chain::InterceptorChain as Chain,
38		dictionary::{
39			DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
40			DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
41		},
42		dictionary_row::{
43			DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
44			DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
45			DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
46		},
47		granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
48		identity::{
49			IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
50			IdentityPreUpdateInterceptor,
51		},
52		interceptors::Interceptors,
53		namespace::{
54			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
55			NamespacePreUpdateInterceptor,
56		},
57		ringbuffer::{
58			RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
59			RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
60		},
61		ringbuffer_row::{
62			RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
63			RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
64			RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
65		},
66		role::{
67			RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
68			RolePreUpdateInterceptor,
69		},
70		series::{
71			SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
72			SeriesPreUpdateInterceptor,
73		},
74		series_row::{
75			SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
76			SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
77		},
78		table::{
79			TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
80			TablePreUpdateInterceptor,
81		},
82		table_row::{
83			TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
84			TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
85		},
86		transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
87		view::{
88			ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
89			ViewPreUpdateInterceptor,
90		},
91		view_row::{
92			ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
93			ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
94		},
95	},
96	multi::{
97		pending::PendingWrites,
98		transaction::{MultiTransaction, write::MultiWriteTransaction},
99	},
100	single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
101	transaction::{
102		RqlExecutor, Transaction, apply_pre_commit_writes, collect_transaction_writes, query::QueryTransaction,
103		write::Write,
104	},
105};
106
107pub struct CommandTransaction {
108	pub multi: MultiTransaction,
109	pub single: SingleTransaction,
110	state: TransactionState,
111
112	pub cmd: Option<MultiWriteTransaction>,
113	pub event_bus: EventBus,
114
115	pub(crate) row_changes: Vec<RowChange>,
116	pub(crate) interceptors: Interceptors,
117
118	pub(crate) accumulator: ChangeAccumulator,
119
120	pub identity: IdentityId,
121
122	pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
123
124	pub(crate) clock: Clock,
125
126	poison_cause: Option<Diagnostic>,
127}
128
129#[derive(Clone, Copy, PartialEq)]
130enum TransactionState {
131	Active,
132	Committed,
133	RolledBack,
134	Poisoned,
135}
136
137impl CommandTransaction {
138	#[instrument(name = "transaction::command::new", level = "debug", skip_all)]
139	pub fn new(
140		multi: MultiTransaction,
141		single: SingleTransaction,
142		event_bus: EventBus,
143		interceptors: Interceptors,
144		identity: IdentityId,
145		clock: Clock,
146	) -> Result<Self> {
147		let cmd = multi.begin_command()?;
148		Ok(Self {
149			cmd: Some(cmd),
150			multi,
151			single,
152			state: TransactionState::Active,
153			event_bus,
154			interceptors,
155			row_changes: Vec::new(),
156			accumulator: ChangeAccumulator::new(),
157			identity,
158			executor: None,
159			clock,
160			poison_cause: None,
161		})
162	}
163
164	pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
165		self.executor = Some(executor);
166	}
167
168	pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
169		if let Err(e) = self.check_active() {
170			return ExecutionResult {
171				frames: vec![],
172				error: Some(e),
173				metrics: Default::default(),
174			};
175		}
176		let executor = self.executor.clone().expect("RqlExecutor not set");
177		let result = executor.rql(&mut Transaction::Command(self), rql, params);
178		if let Some(ref e) = result.error {
179			self.poison(*e.0.clone());
180		}
181		result
182	}
183
184	#[instrument(name = "transaction::command::event_bus", level = "trace", skip(self))]
185	pub fn event_bus(&self) -> &EventBus {
186		&self.event_bus
187	}
188
189	fn check_active(&self) -> Result<()> {
190		match self.state {
191			TransactionState::Active => Ok(()),
192			TransactionState::Committed => Err(TransactionError::AlreadyCommitted.into()),
193			TransactionState::RolledBack => Err(TransactionError::AlreadyRolledBack.into()),
194			TransactionState::Poisoned => Err(TransactionError::Poisoned {
195				cause: Box::new(self.poison_cause.clone().unwrap()),
196			}
197			.into()),
198		}
199	}
200
201	pub(crate) fn poison(&mut self, cause: Diagnostic) {
202		self.state = TransactionState::Poisoned;
203		self.poison_cause = Some(cause);
204	}
205
206	#[instrument(name = "transaction::command::commit", level = "debug", skip(self))]
207	pub fn commit(&mut self) -> Result<CommitVersion> {
208		self.check_active()?;
209		let mut ctx = self.build_pre_commit_context()?;
210		self.interceptors.pre_commit.execute(&mut ctx)?;
211		self.finalize_commit(ctx, false)
212	}
213
214	#[inline]
215	fn build_pre_commit_context(&mut self) -> Result<PreCommitContext> {
216		let transaction_writes = collect_transaction_writes(self.pending_writes());
217		Ok(PreCommitContext {
218			flow_changes: self
219				.accumulator
220				.take_changes(CommitVersion(0), DateTime::from_nanos(self.clock.now_nanos()))?,
221			pending_writes: Vec::new(),
222			pending_shapes: Vec::new(),
223			transaction_writes,
224			view_entries: Vec::new(),
225		})
226	}
227
228	fn finalize_commit(&mut self, ctx: PreCommitContext, unchecked: bool) -> Result<CommitVersion> {
229		let Some(mut multi) = self.cmd.take() else {
230			unreachable!("Transaction state inconsistency")
231		};
232		apply_pre_commit_writes(&mut multi, &ctx.pending_writes)?;
233		let id = multi.id();
234		self.state = TransactionState::Committed;
235
236		let changes = TransactionalCatalogChanges::default();
237		let row_changes = take(&mut self.row_changes);
238		let version = if unchecked {
239			multi.commit_unchecked()?
240		} else {
241			multi.commit()?
242		};
243		self.interceptors.post_commit.execute(PostCommitContext::new(id, version, changes, row_changes))?;
244		Ok(version)
245	}
246
247	pub fn execute_bulk_unchecked<F, R>(&mut self, body: F) -> Result<R>
248	where
249		F: FnOnce(&mut CommandTransaction) -> Result<R>,
250	{
251		self.disable_conflict_tracking()?;
252		let r = match body(self) {
253			Ok(r) => r,
254			Err(e) => {
255				let _ = self.rollback();
256				return Err(e);
257			}
258		};
259		self.commit_unchecked()?;
260		Ok(r)
261	}
262
263	#[instrument(name = "transaction::command::commit_unchecked", level = "debug", skip(self))]
264	pub(crate) fn commit_unchecked(&mut self) -> Result<CommitVersion> {
265		self.check_active()?;
266		let mut ctx = self.build_pre_commit_context()?;
267		self.interceptors.pre_commit.execute(&mut ctx)?;
268		self.finalize_commit(ctx, true)
269	}
270
271	#[instrument(name = "transaction::command::rollback", level = "debug", skip(self))]
272	pub fn rollback(&mut self) -> Result<()> {
273		self.check_active()?;
274		if let Some(mut multi) = self.cmd.take() {
275			self.state = TransactionState::RolledBack;
276			multi.rollback()
277		} else {
278			unreachable!("Transaction state inconsistency")
279		}
280	}
281
282	#[instrument(name = "transaction::command::pending_writes", level = "trace", skip(self))]
283	pub fn pending_writes(&self) -> &PendingWrites {
284		self.cmd.as_ref().unwrap().pending_writes()
285	}
286
287	#[instrument(name = "transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
288	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
289	where
290		I: IntoIterator<Item = &'a EncodedKey> + Send,
291		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
292		R: Send,
293	{
294		self.check_active()?;
295		self.single.with_query(keys, f)
296	}
297
298	#[instrument(name = "transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
299	pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
300	where
301		I: IntoIterator<Item = &'a EncodedKey> + Send,
302		F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
303		R: Send,
304	{
305		self.check_active()?;
306		self.single.with_command(keys, f)
307	}
308
309	#[instrument(name = "transaction::command::with_multi_query", level = "trace", skip(self, f))]
310	pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
311	where
312		F: FnOnce(&mut QueryTransaction) -> Result<R>,
313	{
314		self.check_active()?;
315
316		let mut query_txn =
317			QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
318
319		f(&mut query_txn)
320	}
321
322	#[instrument(name = "transaction::command::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
323	pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
324	where
325		F: FnOnce(&mut QueryTransaction) -> Result<R>,
326	{
327		self.check_active()?;
328
329		let mut query_txn =
330			QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
331
332		query_txn.read_as_of_version_exclusive(version)?;
333
334		f(&mut query_txn)
335	}
336
337	#[instrument(name = "transaction::command::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
338	pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
339	where
340		F: FnOnce(&mut QueryTransaction) -> Result<R>,
341	{
342		self.check_active()?;
343
344		let mut query_txn =
345			QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
346
347		query_txn.multi.read_as_of_version_inclusive(version);
348
349		f(&mut query_txn)
350	}
351
352	#[instrument(name = "transaction::command::begin_single_query", level = "trace", skip(self, keys))]
353	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
354	where
355		I: IntoIterator<Item = &'a EncodedKey>,
356	{
357		self.check_active()?;
358		self.single.begin_query(keys)
359	}
360
361	#[instrument(name = "transaction::command::begin_single_command", level = "trace", skip(self, keys))]
362	pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
363	where
364		I: IntoIterator<Item = &'a EncodedKey>,
365	{
366		self.check_active()?;
367		self.single.begin_command(keys)
368	}
369
370	pub fn track_row_change(&mut self, changes: &[RowChange]) {
371		self.row_changes.extend_from_slice(changes);
372	}
373
374	pub fn track_flow_change(&mut self, change: Change) {
375		if let ChangeOrigin::Shape(id) = change.origin {
376			for diff in change.diffs {
377				self.accumulator.track(id, diff);
378			}
379		}
380	}
381
382	#[inline]
383	pub fn version(&self) -> CommitVersion {
384		self.cmd.as_ref().unwrap().version()
385	}
386
387	#[inline]
388	pub fn id(&self) -> TransactionId {
389		self.cmd.as_ref().unwrap().id()
390	}
391
392	#[inline]
393	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
394		self.check_active()?;
395		Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_row()))
396	}
397
398	#[inline]
399	pub fn get_committed(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
400		self.check_active()?;
401		Ok(self.cmd.as_mut().unwrap().get_committed(key)?.map(|v| v.into_multi_version_row()))
402	}
403
404	#[inline]
405	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
406		self.check_active()?;
407		self.cmd.as_mut().unwrap().contains_key(key)
408	}
409
410	#[inline]
411	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
412		self.check_active()?;
413		self.cmd.as_mut().unwrap().prefix(prefix)
414	}
415
416	#[inline]
417	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
418		self.check_active()?;
419		self.cmd.as_mut().unwrap().prefix_rev(prefix)
420	}
421
422	#[inline]
423	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
424		self.check_active()?;
425		self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
426		Ok(())
427	}
428
429	#[inline]
430	pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
431		self.check_active()?;
432		self.cmd.as_mut().unwrap().set(key, row)
433	}
434
435	#[inline]
436	pub fn reserve_writes(&mut self, additional: usize) -> Result<()> {
437		self.check_active()?;
438		self.cmd.as_mut().unwrap().reserve_writes(additional);
439		Ok(())
440	}
441
442	#[inline]
443	pub(crate) fn disable_conflict_tracking(&mut self) -> Result<()> {
444		self.check_active()?;
445		self.cmd.as_mut().unwrap().disable_conflict_tracking();
446		Ok(())
447	}
448
449	#[inline]
450	pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
451		self.check_active()?;
452		self.cmd.as_mut().unwrap().unset(key, row)
453	}
454
455	#[inline]
456	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
457		self.check_active()?;
458		self.cmd.as_mut().unwrap().remove(key)
459	}
460
461	#[inline]
462	pub fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
463		self.check_active()?;
464		self.cmd.as_mut().unwrap().mark_preexisting(key);
465		Ok(())
466	}
467
468	#[inline]
469	pub fn range(
470		&mut self,
471		range: EncodedKeyRange,
472		batch_size: usize,
473	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
474		self.check_active()?;
475		Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
476	}
477
478	#[inline]
479	pub fn range_rev(
480		&mut self,
481		range: EncodedKeyRange,
482		batch_size: usize,
483	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
484		self.check_active()?;
485		Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
486	}
487}
488
489impl WithEventBus for CommandTransaction {
490	fn event_bus(&self) -> &EventBus {
491		&self.event_bus
492	}
493}
494
495impl Write for CommandTransaction {
496	#[inline]
497	fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
498		CommandTransaction::set(self, key, row)
499	}
500	#[inline]
501	fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
502		CommandTransaction::unset(self, key, row)
503	}
504	#[inline]
505	fn remove(&mut self, key: &EncodedKey) -> Result<()> {
506		CommandTransaction::remove(self, key)
507	}
508	#[inline]
509	fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
510		CommandTransaction::mark_preexisting(self, key)
511	}
512	#[inline]
513	fn track_row_change(&mut self, changes: &[RowChange]) {
514		CommandTransaction::track_row_change(self, changes)
515	}
516	#[inline]
517	fn track_flow_change(&mut self, change: Change) {
518		CommandTransaction::track_flow_change(self, change)
519	}
520}
521
522impl WithInterceptors for CommandTransaction {
523	fn table_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync> {
524		&mut self.interceptors.table_row_pre_insert
525	}
526
527	fn table_row_post_insert_interceptors(
528		&mut self,
529	) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync> {
530		&mut self.interceptors.table_row_post_insert
531	}
532
533	fn table_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync> {
534		&mut self.interceptors.table_row_pre_update
535	}
536
537	fn table_row_post_update_interceptors(
538		&mut self,
539	) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync> {
540		&mut self.interceptors.table_row_post_update
541	}
542
543	fn table_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync> {
544		&mut self.interceptors.table_row_pre_delete
545	}
546
547	fn table_row_post_delete_interceptors(
548		&mut self,
549	) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync> {
550		&mut self.interceptors.table_row_post_delete
551	}
552
553	fn ringbuffer_row_pre_insert_interceptors(
554		&mut self,
555	) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync> {
556		&mut self.interceptors.ringbuffer_row_pre_insert
557	}
558
559	fn ringbuffer_row_post_insert_interceptors(
560		&mut self,
561	) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync> {
562		&mut self.interceptors.ringbuffer_row_post_insert
563	}
564
565	fn ringbuffer_row_pre_update_interceptors(
566		&mut self,
567	) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync> {
568		&mut self.interceptors.ringbuffer_row_pre_update
569	}
570
571	fn ringbuffer_row_post_update_interceptors(
572		&mut self,
573	) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync> {
574		&mut self.interceptors.ringbuffer_row_post_update
575	}
576
577	fn ringbuffer_row_pre_delete_interceptors(
578		&mut self,
579	) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync> {
580		&mut self.interceptors.ringbuffer_row_pre_delete
581	}
582
583	fn ringbuffer_row_post_delete_interceptors(
584		&mut self,
585	) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync> {
586		&mut self.interceptors.ringbuffer_row_post_delete
587	}
588
589	fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
590		&mut self.interceptors.pre_commit
591	}
592
593	fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
594		&mut self.interceptors.post_commit
595	}
596
597	fn namespace_post_create_interceptors(
598		&mut self,
599	) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
600		&mut self.interceptors.namespace_post_create
601	}
602
603	fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
604		&mut self.interceptors.namespace_pre_update
605	}
606
607	fn namespace_post_update_interceptors(
608		&mut self,
609	) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
610		&mut self.interceptors.namespace_post_update
611	}
612
613	fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
614		&mut self.interceptors.namespace_pre_delete
615	}
616
617	fn table_post_create_interceptors(&mut self) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync> {
618		&mut self.interceptors.table_post_create
619	}
620
621	fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
622		&mut self.interceptors.table_pre_update
623	}
624
625	fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
626		&mut self.interceptors.table_post_update
627	}
628
629	fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
630		&mut self.interceptors.table_pre_delete
631	}
632
633	fn view_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync> {
634		&mut self.interceptors.view_row_pre_insert
635	}
636
637	fn view_row_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync> {
638		&mut self.interceptors.view_row_post_insert
639	}
640
641	fn view_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync> {
642		&mut self.interceptors.view_row_pre_update
643	}
644
645	fn view_row_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync> {
646		&mut self.interceptors.view_row_post_update
647	}
648
649	fn view_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync> {
650		&mut self.interceptors.view_row_pre_delete
651	}
652
653	fn view_row_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync> {
654		&mut self.interceptors.view_row_post_delete
655	}
656
657	fn view_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync> {
658		&mut self.interceptors.view_post_create
659	}
660
661	fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
662		&mut self.interceptors.view_pre_update
663	}
664
665	fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
666		&mut self.interceptors.view_post_update
667	}
668
669	fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
670		&mut self.interceptors.view_pre_delete
671	}
672
673	fn ringbuffer_post_create_interceptors(
674		&mut self,
675	) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync> {
676		&mut self.interceptors.ringbuffer_post_create
677	}
678
679	fn ringbuffer_pre_update_interceptors(
680		&mut self,
681	) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
682		&mut self.interceptors.ringbuffer_pre_update
683	}
684
685	fn ringbuffer_post_update_interceptors(
686		&mut self,
687	) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
688		&mut self.interceptors.ringbuffer_post_update
689	}
690
691	fn ringbuffer_pre_delete_interceptors(
692		&mut self,
693	) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
694		&mut self.interceptors.ringbuffer_pre_delete
695	}
696
697	fn dictionary_row_pre_insert_interceptors(
698		&mut self,
699	) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync> {
700		&mut self.interceptors.dictionary_row_pre_insert
701	}
702
703	fn dictionary_row_post_insert_interceptors(
704		&mut self,
705	) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync> {
706		&mut self.interceptors.dictionary_row_post_insert
707	}
708
709	fn dictionary_row_pre_update_interceptors(
710		&mut self,
711	) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync> {
712		&mut self.interceptors.dictionary_row_pre_update
713	}
714
715	fn dictionary_row_post_update_interceptors(
716		&mut self,
717	) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync> {
718		&mut self.interceptors.dictionary_row_post_update
719	}
720
721	fn dictionary_row_pre_delete_interceptors(
722		&mut self,
723	) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync> {
724		&mut self.interceptors.dictionary_row_pre_delete
725	}
726
727	fn dictionary_row_post_delete_interceptors(
728		&mut self,
729	) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync> {
730		&mut self.interceptors.dictionary_row_post_delete
731	}
732
733	fn dictionary_post_create_interceptors(
734		&mut self,
735	) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync> {
736		&mut self.interceptors.dictionary_post_create
737	}
738
739	fn dictionary_pre_update_interceptors(
740		&mut self,
741	) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync> {
742		&mut self.interceptors.dictionary_pre_update
743	}
744
745	fn dictionary_post_update_interceptors(
746		&mut self,
747	) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync> {
748		&mut self.interceptors.dictionary_post_update
749	}
750
751	fn dictionary_pre_delete_interceptors(
752		&mut self,
753	) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync> {
754		&mut self.interceptors.dictionary_pre_delete
755	}
756
757	fn series_row_pre_insert_interceptors(
758		&mut self,
759	) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync> {
760		&mut self.interceptors.series_row_pre_insert
761	}
762
763	fn series_row_post_insert_interceptors(
764		&mut self,
765	) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync> {
766		&mut self.interceptors.series_row_post_insert
767	}
768
769	fn series_row_pre_update_interceptors(
770		&mut self,
771	) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync> {
772		&mut self.interceptors.series_row_pre_update
773	}
774
775	fn series_row_post_update_interceptors(
776		&mut self,
777	) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync> {
778		&mut self.interceptors.series_row_post_update
779	}
780
781	fn series_row_pre_delete_interceptors(
782		&mut self,
783	) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync> {
784		&mut self.interceptors.series_row_pre_delete
785	}
786
787	fn series_row_post_delete_interceptors(
788		&mut self,
789	) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync> {
790		&mut self.interceptors.series_row_post_delete
791	}
792
793	fn series_post_create_interceptors(&mut self) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync> {
794		&mut self.interceptors.series_post_create
795	}
796
797	fn series_pre_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync> {
798		&mut self.interceptors.series_pre_update
799	}
800
801	fn series_post_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync> {
802		&mut self.interceptors.series_post_update
803	}
804
805	fn series_pre_delete_interceptors(&mut self) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync> {
806		&mut self.interceptors.series_pre_delete
807	}
808
809	fn identity_post_create_interceptors(&mut self) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync> {
810		&mut self.interceptors.identity_post_create
811	}
812
813	fn identity_pre_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync> {
814		&mut self.interceptors.identity_pre_update
815	}
816
817	fn identity_post_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync> {
818		&mut self.interceptors.identity_post_update
819	}
820
821	fn identity_pre_delete_interceptors(&mut self) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync> {
822		&mut self.interceptors.identity_pre_delete
823	}
824
825	fn role_post_create_interceptors(&mut self) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync> {
826		&mut self.interceptors.role_post_create
827	}
828
829	fn role_pre_update_interceptors(&mut self) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync> {
830		&mut self.interceptors.role_pre_update
831	}
832
833	fn role_post_update_interceptors(&mut self) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync> {
834		&mut self.interceptors.role_post_update
835	}
836
837	fn role_pre_delete_interceptors(&mut self) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync> {
838		&mut self.interceptors.role_pre_delete
839	}
840
841	fn granted_role_post_create_interceptors(
842		&mut self,
843	) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync> {
844		&mut self.interceptors.granted_role_post_create
845	}
846
847	fn granted_role_pre_delete_interceptors(
848		&mut self,
849	) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync> {
850		&mut self.interceptors.granted_role_pre_delete
851	}
852
853	fn authentication_post_create_interceptors(
854		&mut self,
855	) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync> {
856		&mut self.interceptors.authentication_post_create
857	}
858
859	fn authentication_pre_delete_interceptors(
860		&mut self,
861	) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync> {
862		&mut self.interceptors.authentication_pre_delete
863	}
864}
865
866impl Drop for CommandTransaction {
867	fn drop(&mut self) {
868		if let Some(mut multi) = self.cmd.take()
869			&& (self.state == TransactionState::Active || self.state == TransactionState::Poisoned)
870		{
871			let _ = multi.rollback();
872		}
873	}
874}