Skip to main content

reifydb_transaction/transaction/
admin.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem::take;
5
6use reifydb_core::{
7	common::CommitVersion,
8	delta::Delta,
9	encoded::{
10		encoded::EncodedValues,
11		key::{EncodedKey, EncodedKeyRange},
12	},
13	event::EventBus,
14	interface::{
15		WithEventBus,
16		change::Change,
17		store::{MultiVersionBatch, MultiVersionValues},
18	},
19	testing::TestingContext,
20};
21use reifydb_type::Result;
22use tracing::instrument;
23
24use crate::{
25	TransactionId,
26	change::{RowChange, TransactionalChanges, TransactionalDefChanges},
27	error::TransactionError,
28	interceptor::{
29		WithInterceptors,
30		chain::InterceptorChain as Chain,
31		interceptors::Interceptors,
32		namespace::{
33			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
34			NamespacePreUpdateInterceptor,
35		},
36		ringbuffer::{
37			RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
38			RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
39			RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
40		},
41		ringbuffer_def::{
42			RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
43			RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
44		},
45		table::{
46			TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
47			TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
48		},
49		table_def::{
50			TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
51			TableDefPreUpdateInterceptor,
52		},
53		transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
54		view::{
55			ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
56			ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
57		},
58		view_def::{
59			ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
60			ViewDefPreUpdateInterceptor,
61		},
62	},
63	multi::{
64		pending::PendingWrites,
65		transaction::{
66			MultiTransaction,
67			write::{MultiWriteTransaction, WriteSavepoint},
68		},
69	},
70	single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
71	transaction::query::QueryTransaction,
72};
73
/// An active admin transaction that supports Query + DML + DDL operations.
///
/// AdminTransaction is the most privileged transaction type, capable of
/// executing DDL (schema changes), DML (data mutations), and queries.
/// It tracks catalog definition changes (TransactionalDefChanges) for DDL.
///
/// The transaction will auto-rollback on drop if not explicitly committed.
pub struct AdminTransaction {
	/// Multi-version transaction handle. Used to begin the command transaction
	/// held in `cmd` and the query transactions handed to the
	/// `with_multi_query*` closures.
	pub multi: MultiTransaction,
	/// Single-version transaction handle backing the `*_single_*` helpers.
	pub single: SingleTransaction,
	/// Lifecycle state; consulted by `check_active` before most operations.
	state: TransactionState,

	/// The underlying multi-version write transaction. `Some` after
	/// construction; taken (left as `None`) by `commit`, `rollback` and `Drop`.
	pub cmd: Option<MultiWriteTransaction>,
	/// Event bus returned by `event_bus()` and the `WithEventBus` impl.
	pub event_bus: EventBus,
	/// Catalog definition changes of this transaction; moved into the
	/// post-commit context on commit.
	pub changes: TransactionalDefChanges,

	// Track row changes for post-commit events
	pub(crate) row_changes: Vec<RowChange>,
	// Interceptor chains exposed through the `WithInterceptors` impl; the
	// pre-commit and post-commit chains are executed by `commit`.
	pub(crate) interceptors: Interceptors,

	// Track table changes for transactional flow pre-commit processing
	pub(crate) pending_flow_changes: Vec<Change>,

	/// Testing audit log. Set by the VM when in test context.
	pub testing: Option<TestingContext>,
}
100
/// Opaque savepoint for per-test transaction isolation.
///
/// Created by `AdminTransaction::savepoint` and consumed by
/// `AdminTransaction::restore_savepoint`.
pub struct Savepoint {
	/// Savepoint of the underlying multi-version write transaction.
	write: WriteSavepoint,
	/// Number of tracked row changes at capture time; restoring truncates the
	/// row-change list back to this length.
	row_changes_len: usize,
	/// Number of pending flow changes at capture time; restoring truncates the
	/// pending flow-change list back to this length.
	flow_changes_len: usize,
}
107
/// Lifecycle state of an admin transaction.
///
/// A transaction starts out `Active` and moves to exactly one of the two
/// terminal states when it is committed or rolled back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// The transaction still accepts operations.
	Active,
	/// `commit` has been invoked on the transaction.
	Committed,
	/// `rollback` has been invoked on the transaction.
	RolledBack,
}
114
115impl AdminTransaction {
116	/// Create a savepoint capturing current transaction state.
117	pub fn savepoint(&self) -> Savepoint {
118		Savepoint {
119			write: self.cmd.as_ref().unwrap().savepoint(),
120			row_changes_len: self.row_changes.len(),
121			flow_changes_len: self.pending_flow_changes.len(),
122		}
123	}
124
125	/// Restore transaction state to a previously created savepoint.
126	pub fn restore_savepoint(&mut self, sp: Savepoint) {
127		self.cmd.as_mut().unwrap().restore_savepoint(sp.write);
128		self.row_changes.truncate(sp.row_changes_len);
129		self.pending_flow_changes.truncate(sp.flow_changes_len);
130	}
131
	/// Reset test-only flow bookkeeping so setup statements do not appear as
	/// in-test mutations when `RUN TESTS` later performs an inline flush.
	///
	/// Discards all pending flow changes and drops the testing context.
	pub fn clear_test_flow_state(&mut self) {
		self.pending_flow_changes.clear();
		self.testing = None;
	}
138
139	/// Execute test-only pre-commit style processing without committing.
140	///
141	/// This is used by testing helpers that need commit-time flow work
142	/// materialized while still staying inside the test savepoint.
143	pub fn capture_testing_pre_commit<F>(&mut self, f: F) -> Result<()>
144	where
145		F: FnOnce(&mut PreCommitContext) -> Result<()>,
146	{
147		let transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)> = self
148			.pending_writes()
149			.iter()
150			.map(|(key, pending)| match &pending.delta {
151				Delta::Set {
152					values,
153					..
154				} => (key.clone(), Some(values.clone())),
155				_ => (key.clone(), None),
156			})
157			.collect();
158
159		let mut ctx = PreCommitContext {
160			flow_changes: take(&mut self.pending_flow_changes),
161			pending_writes: Vec::new(),
162			transaction_writes,
163			testing: self.testing.take().or_else(|| Some(TestingContext::new())),
164		};
165
166		f(&mut ctx)?;
167		self.testing = ctx.testing;
168
169		for (key, value) in &ctx.pending_writes {
170			match value {
171				Some(v) => self.cmd.as_mut().unwrap().set(key, v.clone())?,
172				None => self.cmd.as_mut().unwrap().remove(key)?,
173			}
174		}
175
176		Ok(())
177	}
178}
179
180impl AdminTransaction {
181	/// Creates a new active admin transaction with a pre-commit callback
182	#[instrument(name = "transaction::admin::new", level = "debug", skip_all)]
183	pub fn new(
184		multi: MultiTransaction,
185		single: SingleTransaction,
186		event_bus: EventBus,
187		interceptors: Interceptors,
188	) -> Result<Self> {
189		let cmd = multi.begin_command()?;
190		let txn_id = cmd.tm.id();
191		Ok(Self {
192			cmd: Some(cmd),
193			multi,
194			single,
195			state: TransactionState::Active,
196			event_bus,
197			interceptors,
198			changes: TransactionalDefChanges::new(txn_id),
199			row_changes: Vec::new(),
200			pending_flow_changes: Vec::new(),
201			testing: None,
202		})
203	}
204
	/// Returns a reference to the event bus held by this transaction.
	#[instrument(name = "transaction::admin::event_bus", level = "trace", skip(self))]
	pub fn event_bus(&self) -> &EventBus {
		&self.event_bus
	}
209
210	/// Check if transaction is still active and return appropriate error if
211	/// not
212	fn check_active(&self) -> Result<()> {
213		match self.state {
214			TransactionState::Active => Ok(()),
215			TransactionState::Committed => {
216				return Err(TransactionError::AlreadyCommitted.into());
217			}
218			TransactionState::RolledBack => {
219				return Err(TransactionError::AlreadyRolledBack.into());
220			}
221		}
222	}
223
224	/// Commit the transaction.
225	/// Since single transactions are short-lived and auto-commit,
226	/// this only commits the multi transaction.
227	#[instrument(name = "transaction::admin::commit", level = "debug", skip(self))]
228	pub fn commit(&mut self) -> Result<CommitVersion> {
229		self.check_active()?;
230
231		let transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)> = self
232			.pending_writes()
233			.iter()
234			.map(|(key, pending)| match &pending.delta {
235				Delta::Set {
236					values,
237					..
238				} => (key.clone(), Some(values.clone())),
239				_ => (key.clone(), None),
240			})
241			.collect();
242
243		let mut ctx = PreCommitContext {
244			flow_changes: take(&mut self.pending_flow_changes),
245			pending_writes: Vec::new(),
246			transaction_writes,
247			testing: self.testing.take(),
248		};
249		self.interceptors.pre_commit.execute(&mut ctx)?;
250		self.testing = ctx.testing;
251
252		if let Some(mut multi) = self.cmd.take() {
253			// Apply pending view writes produced by pre-commit interceptors
254			for (key, value) in &ctx.pending_writes {
255				match value {
256					Some(v) => multi.set(key, v.clone())?,
257					None => multi.remove(key)?,
258				}
259			}
260
261			let id = multi.tm.id();
262			self.state = TransactionState::Committed;
263
264			let changes = take(&mut self.changes);
265			let row_changes = take(&mut self.row_changes);
266
267			let version = multi.commit()?;
268			self.interceptors.post_commit.execute(PostCommitContext::new(
269				id,
270				version,
271				changes,
272				row_changes,
273			))?;
274
275			Ok(version)
276		} else {
277			// This should never happen due to check_active
278			unreachable!("Transaction state inconsistency")
279		}
280	}
281
282	/// Rollback the transaction.
283	#[instrument(name = "transaction::admin::rollback", level = "debug", skip(self))]
284	pub fn rollback(&mut self) -> Result<()> {
285		self.check_active()?;
286		if let Some(mut multi) = self.cmd.take() {
287			self.state = TransactionState::RolledBack;
288			multi.rollback()
289		} else {
290			// This should never happen due to check_active
291			unreachable!("Transaction state inconsistency")
292		}
293	}
294
295	/// Get access to the pending writes in this transaction
296	#[instrument(name = "transaction::admin::pending_writes", level = "trace", skip(self))]
297	pub fn pending_writes(&self) -> &PendingWrites {
298		self.cmd.as_ref().unwrap().pending_writes()
299	}
300
	/// Execute a function with query access to the single transaction.
	///
	/// `keys` and `f` are passed straight through to `SingleTransaction::with_query`,
	/// whose result is returned.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active, or whatever
	/// error `with_query` produces.
	#[instrument(name = "transaction::admin::with_single_query", level = "trace", skip(self, keys, f))]
	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
	where
		I: IntoIterator<Item = &'a EncodedKey> + Send,
		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
		R: Send,
	{
		self.check_active()?;
		self.single.with_query(keys, f)
	}
312
	/// Execute a function with command (write) access to the single transaction.
	///
	/// `keys` and `f` are passed straight through to
	/// `SingleTransaction::with_command`, whose result is returned.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active, or whatever
	/// error `with_command` produces.
	#[instrument(name = "transaction::admin::with_single_command", level = "trace", skip(self, keys, f))]
	pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
	where
		I: IntoIterator<Item = &'a EncodedKey> + Send,
		F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
		R: Send,
	{
		self.check_active()?;
		self.single.with_command(keys, f)
	}
324
325	/// Execute a function with a query transaction view.
326	#[instrument(name = "transaction::admin::with_multi_query", level = "trace", skip(self, f))]
327	pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
328	where
329		F: FnOnce(&mut QueryTransaction) -> Result<R>,
330	{
331		self.check_active()?;
332
333		let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
334
335		f(&mut query_txn)
336	}
337
338	#[instrument(name = "transaction::admin::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
339	pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
340	where
341		F: FnOnce(&mut QueryTransaction) -> Result<R>,
342	{
343		self.check_active()?;
344
345		let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
346
347		query_txn.read_as_of_version_exclusive(version)?;
348
349		f(&mut query_txn)
350	}
351
352	#[instrument(name = "transaction::admin::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
353	pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
354	where
355		F: FnOnce(&mut QueryTransaction) -> Result<R>,
356	{
357		self.check_active()?;
358
359		let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
360
361		query_txn.multi.read_as_of_version_inclusive(version);
362
363		f(&mut query_txn)
364	}
365
	/// Begin a single-version query transaction for specific keys
	///
	/// `keys` is passed through to `SingleTransaction::begin_query`.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active, or whatever
	/// error `begin_query` produces.
	#[instrument(name = "transaction::admin::begin_single_query", level = "trace", skip(self, keys))]
	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
	where
		I: IntoIterator<Item = &'a EncodedKey>,
	{
		self.check_active()?;
		self.single.begin_query(keys)
	}
375
	/// Begin a single-version command transaction for specific keys
	///
	/// `keys` is passed through to `SingleTransaction::begin_command`.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active, or whatever
	/// error `begin_command` produces.
	#[instrument(name = "transaction::admin::begin_single_command", level = "trace", skip(self, keys))]
	pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
	where
		I: IntoIterator<Item = &'a EncodedKey>,
	{
		self.check_active()?;
		self.single.begin_command(keys)
	}
385
	/// Get reference to catalog changes for this transaction
	///
	/// Does not check whether the transaction is still active.
	pub fn get_changes(&self) -> &TransactionalDefChanges {
		&self.changes
	}
390
	/// Track a row change for post-commit event emission
	///
	/// The change is appended to the list that `commit` hands to the
	/// post-commit interceptors. Does not check whether the transaction is
	/// still active.
	pub fn track_row_change(&mut self, change: RowChange) {
		self.row_changes.push(change);
	}
395
	/// Track a flow change for transactional view pre-commit processing
	///
	/// The change is appended to the list that `commit` drains into the
	/// pre-commit context. Does not check whether the transaction is still
	/// active.
	pub fn track_flow_change(&mut self, change: Change) {
		self.pending_flow_changes.push(change);
	}
400
	/// Get the transaction version
	///
	/// # Panics
	///
	/// Panics if the command transaction is gone (`cmd` is `None`), which is
	/// the case once `commit` or `rollback` has taken it; there is no
	/// `check_active` guard here.
	#[inline]
	pub fn version(&self) -> CommitVersion {
		self.cmd.as_ref().unwrap().version()
	}
406
	/// Get the transaction ID
	///
	/// # Panics
	///
	/// Panics if the command transaction is gone (`cmd` is `None`), which is
	/// the case once `commit` or `rollback` has taken it; there is no
	/// `check_active` guard here.
	#[inline]
	pub fn id(&self) -> TransactionId {
		self.cmd.as_ref().unwrap().tm.id()
	}
412
413	/// Get a value by key
414	#[inline]
415	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
416		self.check_active()?;
417		Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_values()))
418	}
419
	/// Check if a key exists
	///
	/// Delegates to the command transaction's `contains_key`.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active or if the
	/// underlying check fails.
	#[inline]
	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().contains_key(key)
	}
426
	/// Get a prefix batch
	///
	/// Delegates to the command transaction's `prefix` with the given key
	/// prefix.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active or if the
	/// underlying scan fails.
	#[inline]
	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().prefix(prefix)
	}
433
	/// Get a reverse prefix batch
	///
	/// Delegates to the command transaction's `prefix_rev` with the given key
	/// prefix.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active or if the
	/// underlying scan fails.
	#[inline]
	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().prefix_rev(prefix)
	}
440
	/// Read as of version exclusive
	///
	/// Forwards `version` to the command transaction's
	/// `read_as_of_version_exclusive`.
	/// NOTE(review): presumably `version` itself is excluded from what later
	/// reads observe — confirm against `MultiWriteTransaction`.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active.
	#[inline]
	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
		Ok(())
	}
448
	/// Set a key-value pair
	///
	/// Delegates to the command transaction's `set`.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active or if the
	/// underlying write fails.
	#[inline]
	pub fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> Result<()> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().set(key, row)
	}
455
	/// Unset a key, preserving the deleted values.
	///
	/// Delegates to the command transaction's `unset`, passing along the
	/// values being deleted.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active or if the
	/// underlying write fails.
	#[inline]
	pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().unset(key, values)
	}
462
	/// Remove a key without preserving the deleted values.
	///
	/// Delegates to the command transaction's `remove`.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active or if the
	/// underlying write fails.
	#[inline]
	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
		self.check_active()?;
		self.cmd.as_mut().unwrap().remove(key)
	}
469
	/// Create a streaming iterator for forward range queries.
	///
	/// `range` and `batch_size` are passed through to the command
	/// transaction's `range`. The returned iterator borrows this transaction
	/// mutably for as long as it lives.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active. Failures
	/// during iteration are reported per item.
	#[inline]
	pub fn range(
		&mut self,
		range: EncodedKeyRange,
		batch_size: usize,
	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
		self.check_active()?;
		Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
	}
480
	/// Create a streaming iterator for reverse range queries.
	///
	/// `range` and `batch_size` are passed through to the command
	/// transaction's `range_rev`. The returned iterator borrows this
	/// transaction mutably for as long as it lives.
	///
	/// # Errors
	///
	/// Returns an error if the transaction is no longer active. Failures
	/// during iteration are reported per item.
	#[inline]
	pub fn range_rev(
		&mut self,
		range: EncodedKeyRange,
		batch_size: usize,
	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
		self.check_active()?;
		Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
	}
491}
492
/// Exposes the transaction's event bus through the `WithEventBus` trait.
impl WithEventBus for AdminTransaction {
	/// Returns a reference to the `event_bus` field.
	fn event_bus(&self) -> &EventBus {
		&self.event_bus
	}
}
498
/// Exposes every interceptor chain stored in `self.interceptors`.
///
/// Each method returns a mutable reference to the identically named field of
/// the `Interceptors` value held by this transaction; no method performs any
/// other work or checks the transaction state.
impl WithInterceptors for AdminTransaction {
	// Table row interceptors (insert / update / delete, pre and post).

	fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync> {
		&mut self.interceptors.table_pre_insert
	}

	fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync> {
		&mut self.interceptors.table_post_insert
	}

	fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.table_pre_update
	}

	fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.table_post_update
	}

	fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.table_pre_delete
	}

	fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.table_post_delete
	}

	// Ring buffer row interceptors (insert / update / delete, pre and post).

	fn ringbuffer_pre_insert_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_pre_insert
	}

	fn ringbuffer_post_insert_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_post_insert
	}

	fn ringbuffer_pre_update_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_pre_update
	}

	fn ringbuffer_post_update_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_post_update
	}

	fn ringbuffer_pre_delete_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_pre_delete
	}

	fn ringbuffer_post_delete_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_post_delete
	}

	// Transaction-level interceptors; these two chains are the ones `commit` executes.

	fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
		&mut self.interceptors.pre_commit
	}

	fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
		&mut self.interceptors.post_commit
	}

	// Namespace interceptors (post-create, pre/post-update, pre-delete).

	fn namespace_post_create_interceptors(
		&mut self,
	) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
		&mut self.interceptors.namespace_post_create
	}

	fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.namespace_pre_update
	}

	fn namespace_post_update_interceptors(
		&mut self,
	) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.namespace_post_update
	}

	fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.namespace_pre_delete
	}

	// Table definition interceptors (post-create, pre/post-update, pre-delete).

	fn table_def_post_create_interceptors(
		&mut self,
	) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync> {
		&mut self.interceptors.table_def_post_create
	}

	fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.table_def_pre_update
	}

	fn table_def_post_update_interceptors(
		&mut self,
	) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.table_def_post_update
	}

	fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.table_def_pre_delete
	}

	// View row interceptors (insert / update / delete, pre and post).

	fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync> {
		&mut self.interceptors.view_pre_insert
	}

	fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync> {
		&mut self.interceptors.view_post_insert
	}

	fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.view_pre_update
	}

	fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.view_post_update
	}

	fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.view_pre_delete
	}

	fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.view_post_delete
	}

	// View definition interceptors (post-create, pre/post-update, pre-delete).

	fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync> {
		&mut self.interceptors.view_def_post_create
	}

	fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.view_def_pre_update
	}

	fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.view_def_post_update
	}

	fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.view_def_pre_delete
	}

	// Ring buffer definition interceptors (post-create, pre/post-update, pre-delete).

	fn ringbuffer_def_post_create_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_def_post_create
	}

	fn ringbuffer_def_pre_update_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_def_pre_update
	}

	fn ringbuffer_def_post_update_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_def_post_update
	}

	fn ringbuffer_def_pre_delete_interceptors(
		&mut self,
	) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync> {
		&mut self.interceptors.ringbuffer_def_pre_delete
	}
}
672
// Empty impl: no `TransactionalChanges` items are overridden for `AdminTransaction`.
impl TransactionalChanges for AdminTransaction {}
674
675impl Drop for AdminTransaction {
676	fn drop(&mut self) {
677		if let Some(mut multi) = self.cmd.take() {
678			// Auto-rollback if still active (not committed or rolled back)
679			if self.state == TransactionState::Active {
680				let _ = multi.rollback();
681			}
682		}
683	}
684}