1use std::{mem::take, sync::Arc};
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::{
9 key::{EncodedKey, EncodedKeyRange},
10 row::EncodedRow,
11 },
12 event::EventBus,
13 execution::ExecutionResult,
14 interface::{
15 WithEventBus,
16 change::{Change, ChangeOrigin},
17 store::{MultiVersionBatch, MultiVersionRow},
18 },
19};
20use reifydb_runtime::context::clock::Clock;
21use reifydb_type::{
22 Result,
23 error::Diagnostic,
24 params::Params,
25 value::{datetime::DateTime, identity::IdentityId},
26};
27use tracing::instrument;
28
29use crate::{
30 TransactionId,
31 change::{RowChange, TransactionalCatalogChanges},
32 change_accumulator::ChangeAccumulator,
33 error::TransactionError,
34 interceptor::{
35 WithInterceptors,
36 authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
37 chain::InterceptorChain as Chain,
38 dictionary::{
39 DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
40 DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
41 },
42 dictionary_row::{
43 DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
44 DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
45 DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
46 },
47 granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
48 identity::{
49 IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
50 IdentityPreUpdateInterceptor,
51 },
52 interceptors::Interceptors,
53 namespace::{
54 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
55 NamespacePreUpdateInterceptor,
56 },
57 ringbuffer::{
58 RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
59 RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
60 },
61 ringbuffer_row::{
62 RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
63 RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
64 RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
65 },
66 role::{
67 RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
68 RolePreUpdateInterceptor,
69 },
70 series::{
71 SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
72 SeriesPreUpdateInterceptor,
73 },
74 series_row::{
75 SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
76 SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
77 },
78 table::{
79 TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
80 TablePreUpdateInterceptor,
81 },
82 table_row::{
83 TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
84 TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
85 },
86 transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
87 view::{
88 ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
89 ViewPreUpdateInterceptor,
90 },
91 view_row::{
92 ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
93 ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
94 },
95 },
96 multi::{
97 pending::PendingWrites,
98 transaction::{MultiTransaction, write::MultiWriteTransaction},
99 },
100 single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
101 transaction::{
102 RqlExecutor, Transaction, apply_pre_commit_writes, collect_transaction_writes, query::QueryTransaction,
103 write::Write,
104 },
105};
106
107pub struct CommandTransaction {
112 pub multi: MultiTransaction,
113 pub single: SingleTransaction,
114 state: TransactionState,
115
116 pub cmd: Option<MultiWriteTransaction>,
117 pub event_bus: EventBus,
118
119 pub(crate) row_changes: Vec<RowChange>,
121 pub(crate) interceptors: Interceptors,
122
123 pub(crate) accumulator: ChangeAccumulator,
125
126 pub identity: IdentityId,
128
129 pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
131
132 pub(crate) clock: Clock,
134
135 poison_cause: Option<Diagnostic>,
137}
138
/// Lifecycle state of a `CommandTransaction`.
///
/// Only `Active` transactions accept further operations; every other state
/// makes `check_active` return an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// The transaction is open and usable.
	Active,
	/// The transaction went through the commit path.
	Committed,
	/// The transaction was rolled back.
	RolledBack,
	/// A failed statement poisoned the transaction; a cause is recorded alongside.
	Poisoned,
}
146
147impl CommandTransaction {
148 #[instrument(name = "transaction::command::new", level = "debug", skip_all)]
150 pub fn new(
151 multi: MultiTransaction,
152 single: SingleTransaction,
153 event_bus: EventBus,
154 interceptors: Interceptors,
155 identity: IdentityId,
156 clock: Clock,
157 ) -> Result<Self> {
158 let cmd = multi.begin_command()?;
159 Ok(Self {
160 cmd: Some(cmd),
161 multi,
162 single,
163 state: TransactionState::Active,
164 event_bus,
165 interceptors,
166 row_changes: Vec::new(),
167 accumulator: ChangeAccumulator::new(),
168 identity,
169 executor: None,
170 clock,
171 poison_cause: None,
172 })
173 }
174
175 pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
177 self.executor = Some(executor);
178 }
179
180 pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
184 if let Err(e) = self.check_active() {
185 return ExecutionResult {
186 frames: vec![],
187 error: Some(e),
188 metrics: Default::default(),
189 };
190 }
191 let executor = self.executor.clone().expect("RqlExecutor not set");
192 let result = executor.rql(&mut Transaction::Command(self), rql, params);
193 if let Some(ref e) = result.error {
194 self.poison(*e.0.clone());
195 }
196 result
197 }
198
199 #[instrument(name = "transaction::command::event_bus", level = "trace", skip(self))]
200 pub fn event_bus(&self) -> &EventBus {
201 &self.event_bus
202 }
203
204 fn check_active(&self) -> Result<()> {
207 match self.state {
208 TransactionState::Active => Ok(()),
209 TransactionState::Committed => Err(TransactionError::AlreadyCommitted.into()),
210 TransactionState::RolledBack => Err(TransactionError::AlreadyRolledBack.into()),
211 TransactionState::Poisoned => Err(TransactionError::Poisoned {
212 cause: Box::new(self.poison_cause.clone().unwrap()),
213 }
214 .into()),
215 }
216 }
217
218 pub(crate) fn poison(&mut self, cause: Diagnostic) {
220 self.state = TransactionState::Poisoned;
221 self.poison_cause = Some(cause);
222 }
223
224 #[instrument(name = "transaction::command::commit", level = "debug", skip(self))]
228 pub fn commit(&mut self) -> Result<CommitVersion> {
229 self.check_active()?;
230 let mut ctx = self.build_pre_commit_context();
231 self.interceptors.pre_commit.execute(&mut ctx)?;
232 self.finalize_commit(ctx, false)
233 }
234
235 #[inline]
236 fn build_pre_commit_context(&mut self) -> PreCommitContext {
237 let transaction_writes = collect_transaction_writes(self.pending_writes());
238 PreCommitContext {
239 flow_changes: self
240 .accumulator
241 .take_changes(CommitVersion(0), DateTime::from_nanos(self.clock.now_nanos())),
242 pending_writes: Vec::new(),
243 pending_shapes: Vec::new(),
244 transaction_writes,
245 view_entries: Vec::new(),
246 }
247 }
248
249 fn finalize_commit(&mut self, ctx: PreCommitContext, unchecked: bool) -> Result<CommitVersion> {
250 let Some(mut multi) = self.cmd.take() else {
251 unreachable!("Transaction state inconsistency")
252 };
253 apply_pre_commit_writes(&mut multi, &ctx.pending_writes)?;
254 let id = multi.id();
255 self.state = TransactionState::Committed;
256
257 let changes = TransactionalCatalogChanges::default();
258 let row_changes = take(&mut self.row_changes);
259 let version = if unchecked {
260 multi.commit_unchecked()?
261 } else {
262 multi.commit()?
263 };
264 self.interceptors.post_commit.execute(PostCommitContext::new(id, version, changes, row_changes))?;
265 Ok(version)
266 }
267
268 pub fn execute_bulk_unchecked<F, R>(&mut self, body: F) -> Result<R>
281 where
282 F: FnOnce(&mut CommandTransaction) -> Result<R>,
283 {
284 self.disable_conflict_tracking()?;
285 let r = match body(self) {
286 Ok(r) => r,
287 Err(e) => {
288 let _ = self.rollback();
289 return Err(e);
290 }
291 };
292 self.commit_unchecked()?;
293 Ok(r)
294 }
295
296 #[instrument(name = "transaction::command::commit_unchecked", level = "debug", skip(self))]
298 pub(crate) fn commit_unchecked(&mut self) -> Result<CommitVersion> {
299 self.check_active()?;
300 let mut ctx = self.build_pre_commit_context();
301 self.interceptors.pre_commit.execute(&mut ctx)?;
302 self.finalize_commit(ctx, true)
303 }
304
305 #[instrument(name = "transaction::command::rollback", level = "debug", skip(self))]
307 pub fn rollback(&mut self) -> Result<()> {
308 self.check_active()?;
309 if let Some(mut multi) = self.cmd.take() {
310 self.state = TransactionState::RolledBack;
311 multi.rollback()
312 } else {
313 unreachable!("Transaction state inconsistency")
315 }
316 }
317
318 #[instrument(name = "transaction::command::pending_writes", level = "trace", skip(self))]
323 pub fn pending_writes(&self) -> &PendingWrites {
324 self.cmd.as_ref().unwrap().pending_writes()
325 }
326
327 #[instrument(name = "transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
329 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
330 where
331 I: IntoIterator<Item = &'a EncodedKey> + Send,
332 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
333 R: Send,
334 {
335 self.check_active()?;
336 self.single.with_query(keys, f)
337 }
338
339 #[instrument(name = "transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
341 pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
342 where
343 I: IntoIterator<Item = &'a EncodedKey> + Send,
344 F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
345 R: Send,
346 {
347 self.check_active()?;
348 self.single.with_command(keys, f)
349 }
350
351 #[instrument(name = "transaction::command::with_multi_query", level = "trace", skip(self, f))]
355 pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
356 where
357 F: FnOnce(&mut QueryTransaction) -> Result<R>,
358 {
359 self.check_active()?;
360
361 let mut query_txn =
362 QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
363
364 f(&mut query_txn)
365 }
366
367 #[instrument(name = "transaction::command::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
368 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
369 where
370 F: FnOnce(&mut QueryTransaction) -> Result<R>,
371 {
372 self.check_active()?;
373
374 let mut query_txn =
375 QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
376
377 query_txn.read_as_of_version_exclusive(version)?;
378
379 f(&mut query_txn)
380 }
381
382 #[instrument(name = "transaction::command::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
383 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
384 where
385 F: FnOnce(&mut QueryTransaction) -> Result<R>,
386 {
387 self.check_active()?;
388
389 let mut query_txn =
390 QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
391
392 query_txn.multi.read_as_of_version_inclusive(version);
393
394 f(&mut query_txn)
395 }
396
397 #[instrument(name = "transaction::command::begin_single_query", level = "trace", skip(self, keys))]
399 pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
400 where
401 I: IntoIterator<Item = &'a EncodedKey>,
402 {
403 self.check_active()?;
404 self.single.begin_query(keys)
405 }
406
407 #[instrument(name = "transaction::command::begin_single_command", level = "trace", skip(self, keys))]
409 pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
410 where
411 I: IntoIterator<Item = &'a EncodedKey>,
412 {
413 self.check_active()?;
414 self.single.begin_command(keys)
415 }
416
417 pub fn track_row_change(&mut self, change: RowChange) {
419 self.row_changes.push(change);
420 }
421
422 pub fn track_flow_change(&mut self, change: Change) {
424 if let ChangeOrigin::Shape(id) = change.origin {
425 for diff in change.diffs {
426 self.accumulator.track(id, diff);
427 }
428 }
429 }
430
431 #[inline]
433 pub fn version(&self) -> CommitVersion {
434 self.cmd.as_ref().unwrap().version()
435 }
436
437 #[inline]
439 pub fn id(&self) -> TransactionId {
440 self.cmd.as_ref().unwrap().id()
441 }
442
443 #[inline]
445 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
446 self.check_active()?;
447 Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_row()))
448 }
449
450 #[inline]
453 pub fn get_committed(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
454 self.check_active()?;
455 Ok(self.cmd.as_mut().unwrap().get_committed(key)?.map(|v| v.into_multi_version_row()))
456 }
457
458 #[inline]
460 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
461 self.check_active()?;
462 self.cmd.as_mut().unwrap().contains_key(key)
463 }
464
465 #[inline]
467 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
468 self.check_active()?;
469 self.cmd.as_mut().unwrap().prefix(prefix)
470 }
471
472 #[inline]
474 pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
475 self.check_active()?;
476 self.cmd.as_mut().unwrap().prefix_rev(prefix)
477 }
478
479 #[inline]
481 pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
482 self.check_active()?;
483 self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
484 Ok(())
485 }
486
487 #[inline]
489 pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
490 self.check_active()?;
491 self.cmd.as_mut().unwrap().set(key, row)
492 }
493
494 #[inline]
497 pub fn reserve_writes(&mut self, additional: usize) -> Result<()> {
498 self.check_active()?;
499 self.cmd.as_mut().unwrap().reserve_writes(additional);
500 Ok(())
501 }
502
503 #[inline]
505 pub(crate) fn disable_conflict_tracking(&mut self) -> Result<()> {
506 self.check_active()?;
507 self.cmd.as_mut().unwrap().disable_conflict_tracking();
508 Ok(())
509 }
510
511 #[inline]
515 pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
516 self.check_active()?;
517 self.cmd.as_mut().unwrap().unset(key, row)
518 }
519
520 #[inline]
524 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
525 self.check_active()?;
526 self.cmd.as_mut().unwrap().remove(key)
527 }
528
529 #[inline]
530 pub fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
531 self.check_active()?;
532 self.cmd.as_mut().unwrap().mark_preexisting(key);
533 Ok(())
534 }
535
536 #[inline]
538 pub fn range(
539 &mut self,
540 range: EncodedKeyRange,
541 batch_size: usize,
542 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
543 self.check_active()?;
544 Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
545 }
546
547 #[inline]
549 pub fn range_rev(
550 &mut self,
551 range: EncodedKeyRange,
552 batch_size: usize,
553 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
554 self.check_active()?;
555 Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
556 }
557}
558
559impl WithEventBus for CommandTransaction {
560 fn event_bus(&self) -> &EventBus {
561 &self.event_bus
562 }
563}
564
565impl Write for CommandTransaction {
566 #[inline]
567 fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
568 CommandTransaction::set(self, key, row)
569 }
570 #[inline]
571 fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
572 CommandTransaction::unset(self, key, row)
573 }
574 #[inline]
575 fn remove(&mut self, key: &EncodedKey) -> Result<()> {
576 CommandTransaction::remove(self, key)
577 }
578 #[inline]
579 fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
580 CommandTransaction::mark_preexisting(self, key)
581 }
582 #[inline]
583 fn track_row_change(&mut self, change: RowChange) {
584 CommandTransaction::track_row_change(self, change)
585 }
586 #[inline]
587 fn track_flow_change(&mut self, change: Change) {
588 CommandTransaction::track_flow_change(self, change)
589 }
590}
591
592impl WithInterceptors for CommandTransaction {
593 fn table_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync> {
594 &mut self.interceptors.table_row_pre_insert
595 }
596
597 fn table_row_post_insert_interceptors(
598 &mut self,
599 ) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync> {
600 &mut self.interceptors.table_row_post_insert
601 }
602
603 fn table_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync> {
604 &mut self.interceptors.table_row_pre_update
605 }
606
607 fn table_row_post_update_interceptors(
608 &mut self,
609 ) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync> {
610 &mut self.interceptors.table_row_post_update
611 }
612
613 fn table_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync> {
614 &mut self.interceptors.table_row_pre_delete
615 }
616
617 fn table_row_post_delete_interceptors(
618 &mut self,
619 ) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync> {
620 &mut self.interceptors.table_row_post_delete
621 }
622
623 fn ringbuffer_row_pre_insert_interceptors(
624 &mut self,
625 ) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync> {
626 &mut self.interceptors.ringbuffer_row_pre_insert
627 }
628
629 fn ringbuffer_row_post_insert_interceptors(
630 &mut self,
631 ) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync> {
632 &mut self.interceptors.ringbuffer_row_post_insert
633 }
634
635 fn ringbuffer_row_pre_update_interceptors(
636 &mut self,
637 ) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync> {
638 &mut self.interceptors.ringbuffer_row_pre_update
639 }
640
641 fn ringbuffer_row_post_update_interceptors(
642 &mut self,
643 ) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync> {
644 &mut self.interceptors.ringbuffer_row_post_update
645 }
646
647 fn ringbuffer_row_pre_delete_interceptors(
648 &mut self,
649 ) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync> {
650 &mut self.interceptors.ringbuffer_row_pre_delete
651 }
652
653 fn ringbuffer_row_post_delete_interceptors(
654 &mut self,
655 ) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync> {
656 &mut self.interceptors.ringbuffer_row_post_delete
657 }
658
659 fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
660 &mut self.interceptors.pre_commit
661 }
662
663 fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
664 &mut self.interceptors.post_commit
665 }
666
667 fn namespace_post_create_interceptors(
668 &mut self,
669 ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
670 &mut self.interceptors.namespace_post_create
671 }
672
673 fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
674 &mut self.interceptors.namespace_pre_update
675 }
676
677 fn namespace_post_update_interceptors(
678 &mut self,
679 ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
680 &mut self.interceptors.namespace_post_update
681 }
682
683 fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
684 &mut self.interceptors.namespace_pre_delete
685 }
686
687 fn table_post_create_interceptors(&mut self) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync> {
688 &mut self.interceptors.table_post_create
689 }
690
691 fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
692 &mut self.interceptors.table_pre_update
693 }
694
695 fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
696 &mut self.interceptors.table_post_update
697 }
698
699 fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
700 &mut self.interceptors.table_pre_delete
701 }
702
703 fn view_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync> {
704 &mut self.interceptors.view_row_pre_insert
705 }
706
707 fn view_row_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync> {
708 &mut self.interceptors.view_row_post_insert
709 }
710
711 fn view_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync> {
712 &mut self.interceptors.view_row_pre_update
713 }
714
715 fn view_row_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync> {
716 &mut self.interceptors.view_row_post_update
717 }
718
719 fn view_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync> {
720 &mut self.interceptors.view_row_pre_delete
721 }
722
723 fn view_row_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync> {
724 &mut self.interceptors.view_row_post_delete
725 }
726
727 fn view_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync> {
728 &mut self.interceptors.view_post_create
729 }
730
731 fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
732 &mut self.interceptors.view_pre_update
733 }
734
735 fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
736 &mut self.interceptors.view_post_update
737 }
738
739 fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
740 &mut self.interceptors.view_pre_delete
741 }
742
743 fn ringbuffer_post_create_interceptors(
744 &mut self,
745 ) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync> {
746 &mut self.interceptors.ringbuffer_post_create
747 }
748
749 fn ringbuffer_pre_update_interceptors(
750 &mut self,
751 ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
752 &mut self.interceptors.ringbuffer_pre_update
753 }
754
755 fn ringbuffer_post_update_interceptors(
756 &mut self,
757 ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
758 &mut self.interceptors.ringbuffer_post_update
759 }
760
761 fn ringbuffer_pre_delete_interceptors(
762 &mut self,
763 ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
764 &mut self.interceptors.ringbuffer_pre_delete
765 }
766
767 fn dictionary_row_pre_insert_interceptors(
768 &mut self,
769 ) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync> {
770 &mut self.interceptors.dictionary_row_pre_insert
771 }
772
773 fn dictionary_row_post_insert_interceptors(
774 &mut self,
775 ) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync> {
776 &mut self.interceptors.dictionary_row_post_insert
777 }
778
779 fn dictionary_row_pre_update_interceptors(
780 &mut self,
781 ) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync> {
782 &mut self.interceptors.dictionary_row_pre_update
783 }
784
785 fn dictionary_row_post_update_interceptors(
786 &mut self,
787 ) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync> {
788 &mut self.interceptors.dictionary_row_post_update
789 }
790
791 fn dictionary_row_pre_delete_interceptors(
792 &mut self,
793 ) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync> {
794 &mut self.interceptors.dictionary_row_pre_delete
795 }
796
797 fn dictionary_row_post_delete_interceptors(
798 &mut self,
799 ) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync> {
800 &mut self.interceptors.dictionary_row_post_delete
801 }
802
803 fn dictionary_post_create_interceptors(
804 &mut self,
805 ) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync> {
806 &mut self.interceptors.dictionary_post_create
807 }
808
809 fn dictionary_pre_update_interceptors(
810 &mut self,
811 ) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync> {
812 &mut self.interceptors.dictionary_pre_update
813 }
814
815 fn dictionary_post_update_interceptors(
816 &mut self,
817 ) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync> {
818 &mut self.interceptors.dictionary_post_update
819 }
820
821 fn dictionary_pre_delete_interceptors(
822 &mut self,
823 ) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync> {
824 &mut self.interceptors.dictionary_pre_delete
825 }
826
827 fn series_row_pre_insert_interceptors(
828 &mut self,
829 ) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync> {
830 &mut self.interceptors.series_row_pre_insert
831 }
832
833 fn series_row_post_insert_interceptors(
834 &mut self,
835 ) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync> {
836 &mut self.interceptors.series_row_post_insert
837 }
838
839 fn series_row_pre_update_interceptors(
840 &mut self,
841 ) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync> {
842 &mut self.interceptors.series_row_pre_update
843 }
844
845 fn series_row_post_update_interceptors(
846 &mut self,
847 ) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync> {
848 &mut self.interceptors.series_row_post_update
849 }
850
851 fn series_row_pre_delete_interceptors(
852 &mut self,
853 ) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync> {
854 &mut self.interceptors.series_row_pre_delete
855 }
856
857 fn series_row_post_delete_interceptors(
858 &mut self,
859 ) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync> {
860 &mut self.interceptors.series_row_post_delete
861 }
862
863 fn series_post_create_interceptors(&mut self) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync> {
864 &mut self.interceptors.series_post_create
865 }
866
867 fn series_pre_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync> {
868 &mut self.interceptors.series_pre_update
869 }
870
871 fn series_post_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync> {
872 &mut self.interceptors.series_post_update
873 }
874
875 fn series_pre_delete_interceptors(&mut self) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync> {
876 &mut self.interceptors.series_pre_delete
877 }
878
879 fn identity_post_create_interceptors(&mut self) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync> {
880 &mut self.interceptors.identity_post_create
881 }
882
883 fn identity_pre_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync> {
884 &mut self.interceptors.identity_pre_update
885 }
886
887 fn identity_post_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync> {
888 &mut self.interceptors.identity_post_update
889 }
890
891 fn identity_pre_delete_interceptors(&mut self) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync> {
892 &mut self.interceptors.identity_pre_delete
893 }
894
895 fn role_post_create_interceptors(&mut self) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync> {
896 &mut self.interceptors.role_post_create
897 }
898
899 fn role_pre_update_interceptors(&mut self) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync> {
900 &mut self.interceptors.role_pre_update
901 }
902
903 fn role_post_update_interceptors(&mut self) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync> {
904 &mut self.interceptors.role_post_update
905 }
906
907 fn role_pre_delete_interceptors(&mut self) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync> {
908 &mut self.interceptors.role_pre_delete
909 }
910
911 fn granted_role_post_create_interceptors(
912 &mut self,
913 ) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync> {
914 &mut self.interceptors.granted_role_post_create
915 }
916
917 fn granted_role_pre_delete_interceptors(
918 &mut self,
919 ) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync> {
920 &mut self.interceptors.granted_role_pre_delete
921 }
922
923 fn authentication_post_create_interceptors(
924 &mut self,
925 ) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync> {
926 &mut self.interceptors.authentication_post_create
927 }
928
929 fn authentication_pre_delete_interceptors(
930 &mut self,
931 ) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync> {
932 &mut self.interceptors.authentication_pre_delete
933 }
934}
935
936impl Drop for CommandTransaction {
937 fn drop(&mut self) {
938 if let Some(mut multi) = self.cmd.take() {
939 if self.state == TransactionState::Active || self.state == TransactionState::Poisoned {
941 let _ = multi.rollback();
942 }
943 }
944 }
945}