1use std::{mem::take, sync::Arc};
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::{
9 key::{EncodedKey, EncodedKeyRange},
10 row::EncodedRow,
11 },
12 event::EventBus,
13 execution::ExecutionResult,
14 interface::{
15 WithEventBus,
16 change::{Change, ChangeOrigin},
17 store::{MultiVersionBatch, MultiVersionRow},
18 },
19};
20use reifydb_runtime::context::clock::Clock;
21use reifydb_type::{
22 Result,
23 error::Diagnostic,
24 params::Params,
25 value::{datetime::DateTime, identity::IdentityId},
26};
27use tracing::instrument;
28
29use crate::{
30 TransactionId,
31 change::{RowChange, TransactionalCatalogChanges},
32 change_accumulator::ChangeAccumulator,
33 error::TransactionError,
34 interceptor::{
35 WithInterceptors,
36 authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
37 chain::InterceptorChain as Chain,
38 dictionary::{
39 DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
40 DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
41 },
42 dictionary_row::{
43 DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
44 DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
45 DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
46 },
47 granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
48 identity::{
49 IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
50 IdentityPreUpdateInterceptor,
51 },
52 interceptors::Interceptors,
53 namespace::{
54 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
55 NamespacePreUpdateInterceptor,
56 },
57 ringbuffer::{
58 RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
59 RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
60 },
61 ringbuffer_row::{
62 RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
63 RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
64 RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
65 },
66 role::{
67 RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
68 RolePreUpdateInterceptor,
69 },
70 series::{
71 SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
72 SeriesPreUpdateInterceptor,
73 },
74 series_row::{
75 SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
76 SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
77 },
78 table::{
79 TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
80 TablePreUpdateInterceptor,
81 },
82 table_row::{
83 TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
84 TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
85 },
86 transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
87 view::{
88 ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
89 ViewPreUpdateInterceptor,
90 },
91 view_row::{
92 ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
93 ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
94 },
95 },
96 multi::{
97 pending::PendingWrites,
98 transaction::{MultiTransaction, write::MultiWriteTransaction},
99 },
100 single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
101 transaction::{
102 RqlExecutor, Transaction, apply_pre_commit_writes, collect_transaction_writes, query::QueryTransaction,
103 write::Write,
104 },
105};
106
/// A read-write transaction that bundles the underlying multi-version write transaction with
/// interceptors, change tracking and lifecycle state.
pub struct CommandTransaction {
    /// Multi-version transaction handle; used to begin the write transaction in `new` and the
    /// nested read transactions in the `with_multi_query*` helpers.
    pub multi: MultiTransaction,
    /// Single-version transaction handle backing the `*_single_*` helpers.
    pub single: SingleTransaction,
    /// Lifecycle state consulted by `check_active`.
    state: TransactionState,

    /// The underlying write transaction. `Some` from construction until commit, rollback or
    /// drop takes it.
    pub cmd: Option<MultiWriteTransaction>,
    /// Event bus handed out by `event_bus()`.
    pub event_bus: EventBus,

    /// Row changes recorded via `track_row_change`; passed to the post-commit interceptors.
    pub(crate) row_changes: Vec<RowChange>,
    /// Interceptor chains exposed through the `WithInterceptors` implementation.
    pub(crate) interceptors: Interceptors,

    /// Collects diffs recorded via `track_flow_change`; drained when the pre-commit context
    /// is built.
    pub(crate) accumulator: ChangeAccumulator,

    /// Identity passed on to the query transactions created by the `with_multi_query*` helpers.
    pub identity: IdentityId,

    /// Executor used by `rql`; `None` until `set_executor` is called.
    pub(crate) executor: Option<Arc<dyn RqlExecutor>>,

    /// Clock read when the pre-commit context is built.
    pub(crate) clock: Clock,

    /// Diagnostic recorded by `poison`; reported back by `check_active` while poisoned.
    poison_cause: Option<Diagnostic>,
}
128
/// Lifecycle state of a `CommandTransaction`, as checked by `check_active`.
#[derive(Clone, Copy, PartialEq)]
enum TransactionState {
    /// Operations are accepted.
    Active,
    /// Further operations fail with `TransactionError::AlreadyCommitted`.
    Committed,
    /// Further operations fail with `TransactionError::AlreadyRolledBack`.
    RolledBack,
    /// Marked via `poison`; further operations fail with `TransactionError::Poisoned`
    /// carrying the recorded cause.
    Poisoned,
}
136
137impl CommandTransaction {
138 #[instrument(name = "transaction::command::new", level = "debug", skip_all)]
139 pub fn new(
140 multi: MultiTransaction,
141 single: SingleTransaction,
142 event_bus: EventBus,
143 interceptors: Interceptors,
144 identity: IdentityId,
145 clock: Clock,
146 ) -> Result<Self> {
147 let cmd = multi.begin_command()?;
148 Ok(Self {
149 cmd: Some(cmd),
150 multi,
151 single,
152 state: TransactionState::Active,
153 event_bus,
154 interceptors,
155 row_changes: Vec::new(),
156 accumulator: ChangeAccumulator::new(),
157 identity,
158 executor: None,
159 clock,
160 poison_cause: None,
161 })
162 }
163
    /// Installs the executor used by `rql`, replacing any previously set one.
    pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
        self.executor = Some(executor);
    }
167
168 pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
169 if let Err(e) = self.check_active() {
170 return ExecutionResult {
171 frames: vec![],
172 error: Some(e),
173 metrics: Default::default(),
174 };
175 }
176 let executor = self.executor.clone().expect("RqlExecutor not set");
177 let result = executor.rql(&mut Transaction::Command(self), rql, params);
178 if let Some(ref e) = result.error {
179 self.poison(*e.0.clone());
180 }
181 result
182 }
183
    /// Returns the event bus attached to this transaction.
    #[instrument(name = "transaction::command::event_bus", level = "trace", skip(self))]
    pub fn event_bus(&self) -> &EventBus {
        &self.event_bus
    }
188
189 fn check_active(&self) -> Result<()> {
190 match self.state {
191 TransactionState::Active => Ok(()),
192 TransactionState::Committed => Err(TransactionError::AlreadyCommitted.into()),
193 TransactionState::RolledBack => Err(TransactionError::AlreadyRolledBack.into()),
194 TransactionState::Poisoned => Err(TransactionError::Poisoned {
195 cause: Box::new(self.poison_cause.clone().unwrap()),
196 }
197 .into()),
198 }
199 }
200
    /// Marks the transaction as poisoned and records `cause`, which `check_active` reports
    /// from then on.
    pub(crate) fn poison(&mut self, cause: Diagnostic) {
        self.state = TransactionState::Poisoned;
        self.poison_cause = Some(cause);
    }
205
    /// Commits the transaction and returns the resulting commit version.
    ///
    /// Builds the pre-commit context, runs the pre-commit interceptors on it and then
    /// finalizes through the checked commit path (`finalize_commit` with `unchecked = false`).
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active, or if building the context, a pre-commit
    /// interceptor or the finalization step fails.
    #[instrument(name = "transaction::command::commit", level = "debug", skip(self))]
    pub fn commit(&mut self) -> Result<CommitVersion> {
        self.check_active()?;
        let mut ctx = self.build_pre_commit_context()?;
        self.interceptors.pre_commit.execute(&mut ctx)?;
        self.finalize_commit(ctx, false)
    }
213
214 #[inline]
215 fn build_pre_commit_context(&mut self) -> Result<PreCommitContext> {
216 let transaction_writes = collect_transaction_writes(self.pending_writes());
217 Ok(PreCommitContext {
218 flow_changes: self
219 .accumulator
220 .take_changes(CommitVersion(0), DateTime::from_nanos(self.clock.now_nanos()))?,
221 pending_writes: Vec::new(),
222 pending_shapes: Vec::new(),
223 transaction_writes,
224 view_entries: Vec::new(),
225 })
226 }
227
228 fn finalize_commit(&mut self, ctx: PreCommitContext, unchecked: bool) -> Result<CommitVersion> {
229 let Some(mut multi) = self.cmd.take() else {
230 unreachable!("Transaction state inconsistency")
231 };
232 apply_pre_commit_writes(&mut multi, &ctx.pending_writes)?;
233 let id = multi.id();
234 self.state = TransactionState::Committed;
235
236 let changes = TransactionalCatalogChanges::default();
237 let row_changes = take(&mut self.row_changes);
238 let version = if unchecked {
239 multi.commit_unchecked()?
240 } else {
241 multi.commit()?
242 };
243 self.interceptors.post_commit.execute(PostCommitContext::new(id, version, changes, row_changes))?;
244 Ok(version)
245 }
246
247 pub fn execute_bulk_unchecked<F, R>(&mut self, body: F) -> Result<R>
248 where
249 F: FnOnce(&mut CommandTransaction) -> Result<R>,
250 {
251 self.disable_conflict_tracking()?;
252 let r = match body(self) {
253 Ok(r) => r,
254 Err(e) => {
255 let _ = self.rollback();
256 return Err(e);
257 }
258 };
259 self.commit_unchecked()?;
260 Ok(r)
261 }
262
    /// Commits like `commit`, but finalizes through the unchecked commit path
    /// (`finalize_commit` with `unchecked = true`).
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active, or if building the context, a pre-commit
    /// interceptor or the finalization step fails.
    #[instrument(name = "transaction::command::commit_unchecked", level = "debug", skip(self))]
    pub(crate) fn commit_unchecked(&mut self) -> Result<CommitVersion> {
        self.check_active()?;
        let mut ctx = self.build_pre_commit_context()?;
        self.interceptors.pre_commit.execute(&mut ctx)?;
        self.finalize_commit(ctx, true)
    }
270
271 #[instrument(name = "transaction::command::rollback", level = "debug", skip(self))]
272 pub fn rollback(&mut self) -> Result<()> {
273 self.check_active()?;
274 if let Some(mut multi) = self.cmd.take() {
275 self.state = TransactionState::RolledBack;
276 multi.rollback()
277 } else {
278 unreachable!("Transaction state inconsistency")
279 }
280 }
281
    /// Returns the pending writes of the underlying write transaction.
    ///
    /// # Panics
    ///
    /// Panics if the write transaction has already been taken by commit or rollback.
    #[instrument(name = "transaction::command::pending_writes", level = "trace", skip(self))]
    pub fn pending_writes(&self) -> &PendingWrites {
        self.cmd.as_ref().unwrap().pending_writes()
    }
286
    /// Runs `f` against a single-version read transaction over `keys`, delegating to
    /// `SingleTransaction::with_query`.
    ///
    /// # Errors
    ///
    /// Fails if this transaction is not active, or with whatever `with_query` returns.
    #[instrument(name = "transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
    pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
    where
        I: IntoIterator<Item = &'a EncodedKey> + Send,
        F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
        R: Send,
    {
        self.check_active()?;
        self.single.with_query(keys, f)
    }
297
    /// Runs `f` against a single-version write transaction over `keys`, delegating to
    /// `SingleTransaction::with_command`.
    ///
    /// # Errors
    ///
    /// Fails if this transaction is not active, or with whatever `with_command` returns.
    #[instrument(name = "transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
    pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
    where
        I: IntoIterator<Item = &'a EncodedKey> + Send,
        F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
        R: Send,
    {
        self.check_active()?;
        self.single.with_command(keys, f)
    }
308
309 #[instrument(name = "transaction::command::with_multi_query", level = "trace", skip(self, f))]
310 pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
311 where
312 F: FnOnce(&mut QueryTransaction) -> Result<R>,
313 {
314 self.check_active()?;
315
316 let mut query_txn =
317 QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
318
319 f(&mut query_txn)
320 }
321
322 #[instrument(name = "transaction::command::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
323 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
324 where
325 F: FnOnce(&mut QueryTransaction) -> Result<R>,
326 {
327 self.check_active()?;
328
329 let mut query_txn =
330 QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
331
332 query_txn.read_as_of_version_exclusive(version)?;
333
334 f(&mut query_txn)
335 }
336
337 #[instrument(name = "transaction::command::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
338 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
339 where
340 F: FnOnce(&mut QueryTransaction) -> Result<R>,
341 {
342 self.check_active()?;
343
344 let mut query_txn =
345 QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), self.identity);
346
347 query_txn.multi.read_as_of_version_inclusive(version);
348
349 f(&mut query_txn)
350 }
351
    /// Begins a single-version read transaction over `keys`, delegating to
    /// `SingleTransaction::begin_query`.
    ///
    /// # Errors
    ///
    /// Fails if this transaction is not active, or with whatever `begin_query` returns.
    #[instrument(name = "transaction::command::begin_single_query", level = "trace", skip(self, keys))]
    pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
    {
        self.check_active()?;
        self.single.begin_query(keys)
    }
360
    /// Begins a single-version write transaction over `keys`, delegating to
    /// `SingleTransaction::begin_command`.
    ///
    /// # Errors
    ///
    /// Fails if this transaction is not active, or with whatever `begin_command` returns.
    #[instrument(name = "transaction::command::begin_single_command", level = "trace", skip(self, keys))]
    pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
    {
        self.check_active()?;
        self.single.begin_command(keys)
    }
369
    /// Appends copies of `changes` to the row changes that are passed to the post-commit
    /// interceptors. No active-state check is performed.
    pub fn track_row_change(&mut self, changes: &[RowChange]) {
        self.row_changes.extend_from_slice(changes);
    }
373
374 pub fn track_flow_change(&mut self, change: Change) {
375 if let ChangeOrigin::Shape(id) = change.origin {
376 for diff in change.diffs {
377 self.accumulator.track(id, diff);
378 }
379 }
380 }
381
    /// Returns the version reported by the underlying write transaction.
    ///
    /// # Panics
    ///
    /// Panics if the write transaction has already been taken by commit or rollback.
    #[inline]
    pub fn version(&self) -> CommitVersion {
        self.cmd.as_ref().unwrap().version()
    }
386
    /// Returns the id of the underlying write transaction.
    ///
    /// # Panics
    ///
    /// Panics if the write transaction has already been taken by commit or rollback.
    #[inline]
    pub fn id(&self) -> TransactionId {
        self.cmd.as_ref().unwrap().id()
    }
391
    /// Looks up `key` through the underlying write transaction and converts a hit into a
    /// `MultiVersionRow`.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying lookup fails.
    #[inline]
    pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
        self.check_active()?;
        Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_row()))
    }
397
    /// Looks up `key` via `get_committed` on the underlying write transaction and converts a
    /// hit into a `MultiVersionRow`.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying lookup fails.
    #[inline]
    pub fn get_committed(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
        self.check_active()?;
        Ok(self.cmd.as_mut().unwrap().get_committed(key)?.map(|v| v.into_multi_version_row()))
    }
403
    /// Reports whether `key` exists, as answered by the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying check fails.
    #[inline]
    pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().contains_key(key)
    }
409
    /// Returns the batch produced by `prefix` on the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying scan fails.
    #[inline]
    pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().prefix(prefix)
    }
415
    /// Returns the batch produced by `prefix_rev` on the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying scan fails.
    #[inline]
    pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().prefix_rev(prefix)
    }
421
    /// Forwards `version` to `read_as_of_version_exclusive` on the underlying write
    /// transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active.
    #[inline]
    pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
        Ok(())
    }
428
    /// Writes `row` under `key` through the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying write fails.
    #[inline]
    pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().set(key, row)
    }
434
    /// Forwards `additional` to `reserve_writes` on the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active.
    #[inline]
    pub fn reserve_writes(&mut self, additional: usize) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().reserve_writes(additional);
        Ok(())
    }
441
    /// Turns off conflict tracking on the underlying write transaction; used by
    /// `execute_bulk_unchecked`.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active.
    #[inline]
    pub(crate) fn disable_conflict_tracking(&mut self) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().disable_conflict_tracking();
        Ok(())
    }
448
    /// Forwards `key` and `row` to `unset` on the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying operation fails.
    #[inline]
    pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().unset(key, row)
    }
454
    /// Forwards `key` to `remove` on the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active or if the underlying operation fails.
    #[inline]
    pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().remove(key)
    }
460
    /// Forwards `key` to `mark_preexisting` on the underlying write transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction is not active.
    #[inline]
    pub fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
        self.check_active()?;
        self.cmd.as_mut().unwrap().mark_preexisting(key);
        Ok(())
    }
467
    /// Returns the iterator produced by `range` on the underlying write transaction for the
    /// given key range and batch size. The iterator borrows this transaction.
    ///
    /// # Errors
    ///
    /// Fails up front if the transaction is not active; individual items may also be errors.
    #[inline]
    pub fn range(
        &mut self,
        range: EncodedKeyRange,
        batch_size: usize,
    ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
        self.check_active()?;
        Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
    }
477
    /// Returns the iterator produced by `range_rev` on the underlying write transaction for
    /// the given key range and batch size. The iterator borrows this transaction.
    ///
    /// # Errors
    ///
    /// Fails up front if the transaction is not active; individual items may also be errors.
    #[inline]
    pub fn range_rev(
        &mut self,
        range: EncodedKeyRange,
        batch_size: usize,
    ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>> {
        self.check_active()?;
        Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
    }
487}
488
// Exposes the transaction's event bus through the `WithEventBus` trait.
impl WithEventBus for CommandTransaction {
    /// Returns the event bus attached to this transaction.
    fn event_bus(&self) -> &EventBus {
        &self.event_bus
    }
}
494
// `Write` implementation that forwards every method to the inherent method of the same name.
// The calls are spelled `CommandTransaction::name(self, ..)` so they name the inherent
// methods explicitly.
impl Write for CommandTransaction {
    /// Forwards to the inherent `set`.
    #[inline]
    fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
        CommandTransaction::set(self, key, row)
    }
    /// Forwards to the inherent `unset`.
    #[inline]
    fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
        CommandTransaction::unset(self, key, row)
    }
    /// Forwards to the inherent `remove`.
    #[inline]
    fn remove(&mut self, key: &EncodedKey) -> Result<()> {
        CommandTransaction::remove(self, key)
    }
    /// Forwards to the inherent `mark_preexisting`.
    #[inline]
    fn mark_preexisting(&mut self, key: &EncodedKey) -> Result<()> {
        CommandTransaction::mark_preexisting(self, key)
    }
    /// Forwards to the inherent `track_row_change`.
    #[inline]
    fn track_row_change(&mut self, changes: &[RowChange]) {
        CommandTransaction::track_row_change(self, changes)
    }
    /// Forwards to the inherent `track_flow_change`.
    #[inline]
    fn track_flow_change(&mut self, change: Change) {
        CommandTransaction::track_flow_change(self, change)
    }
}
521
// `WithInterceptors` implementation. Every accessor returns a mutable reference to the chain of
// the same name (minus the `_interceptors` suffix) stored in `self.interceptors`.
impl WithInterceptors for CommandTransaction {
    // Table row interceptors.

    fn table_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync> {
        &mut self.interceptors.table_row_pre_insert
    }

    fn table_row_post_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync> {
        &mut self.interceptors.table_row_post_insert
    }

    fn table_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.table_row_pre_update
    }

    fn table_row_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.table_row_post_update
    }

    fn table_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.table_row_pre_delete
    }

    fn table_row_post_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.table_row_post_delete
    }

    // Ring buffer row interceptors.

    fn ringbuffer_row_pre_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_row_pre_insert
    }

    fn ringbuffer_row_post_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_row_post_insert
    }

    fn ringbuffer_row_pre_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_row_pre_update
    }

    fn ringbuffer_row_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_row_post_update
    }

    fn ringbuffer_row_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_row_pre_delete
    }

    fn ringbuffer_row_post_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_row_post_delete
    }

    // Transaction commit interceptors.

    fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
        &mut self.interceptors.pre_commit
    }

    fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
        &mut self.interceptors.post_commit
    }

    // Namespace definition interceptors.

    fn namespace_post_create_interceptors(
        &mut self,
    ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.namespace_post_create
    }

    fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.namespace_pre_update
    }

    fn namespace_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.namespace_post_update
    }

    fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.namespace_pre_delete
    }

    // Table definition interceptors.

    fn table_post_create_interceptors(&mut self) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.table_post_create
    }

    fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.table_pre_update
    }

    fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.table_post_update
    }

    fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.table_pre_delete
    }

    // View row interceptors.

    fn view_row_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync> {
        &mut self.interceptors.view_row_pre_insert
    }

    fn view_row_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync> {
        &mut self.interceptors.view_row_post_insert
    }

    fn view_row_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.view_row_pre_update
    }

    fn view_row_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.view_row_post_update
    }

    fn view_row_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.view_row_pre_delete
    }

    fn view_row_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.view_row_post_delete
    }

    // View definition interceptors.

    fn view_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.view_post_create
    }

    fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.view_pre_update
    }

    fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.view_post_update
    }

    fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.view_pre_delete
    }

    // Ring buffer definition interceptors.

    fn ringbuffer_post_create_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_post_create
    }

    fn ringbuffer_pre_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_pre_update
    }

    fn ringbuffer_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_post_update
    }

    fn ringbuffer_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.ringbuffer_pre_delete
    }

    // Dictionary row interceptors.

    fn dictionary_row_pre_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_row_pre_insert
    }

    fn dictionary_row_post_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_row_post_insert
    }

    fn dictionary_row_pre_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_row_pre_update
    }

    fn dictionary_row_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_row_post_update
    }

    fn dictionary_row_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_row_pre_delete
    }

    fn dictionary_row_post_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_row_post_delete
    }

    // Dictionary definition interceptors.

    fn dictionary_post_create_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_post_create
    }

    fn dictionary_pre_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_pre_update
    }

    fn dictionary_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_post_update
    }

    fn dictionary_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.dictionary_pre_delete
    }

    // Series row interceptors.

    fn series_row_pre_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync> {
        &mut self.interceptors.series_row_pre_insert
    }

    fn series_row_post_insert_interceptors(
        &mut self,
    ) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync> {
        &mut self.interceptors.series_row_post_insert
    }

    fn series_row_pre_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.series_row_pre_update
    }

    fn series_row_post_update_interceptors(
        &mut self,
    ) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.series_row_post_update
    }

    fn series_row_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.series_row_pre_delete
    }

    fn series_row_post_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.series_row_post_delete
    }

    // Series definition interceptors.

    fn series_post_create_interceptors(&mut self) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.series_post_create
    }

    fn series_pre_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.series_pre_update
    }

    fn series_post_update_interceptors(&mut self) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.series_post_update
    }

    fn series_pre_delete_interceptors(&mut self) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.series_pre_delete
    }

    // Identity interceptors.

    fn identity_post_create_interceptors(&mut self) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.identity_post_create
    }

    fn identity_pre_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.identity_pre_update
    }

    fn identity_post_update_interceptors(&mut self) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.identity_post_update
    }

    fn identity_pre_delete_interceptors(&mut self) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.identity_pre_delete
    }

    // Role interceptors.

    fn role_post_create_interceptors(&mut self) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.role_post_create
    }

    fn role_pre_update_interceptors(&mut self) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.role_pre_update
    }

    fn role_post_update_interceptors(&mut self) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync> {
        &mut self.interceptors.role_post_update
    }

    fn role_pre_delete_interceptors(&mut self) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.role_pre_delete
    }

    // Granted role interceptors.

    fn granted_role_post_create_interceptors(
        &mut self,
    ) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.granted_role_post_create
    }

    fn granted_role_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.granted_role_pre_delete
    }

    // Authentication interceptors.

    fn authentication_post_create_interceptors(
        &mut self,
    ) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync> {
        &mut self.interceptors.authentication_post_create
    }

    fn authentication_pre_delete_interceptors(
        &mut self,
    ) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync> {
        &mut self.interceptors.authentication_pre_delete
    }
}
865
866impl Drop for CommandTransaction {
867 fn drop(&mut self) {
868 if let Some(mut multi) = self.cmd.take()
869 && (self.state == TransactionState::Active || self.state == TransactionState::Poisoned)
870 {
871 let _ = multi.rollback();
872 }
873 }
874}