1use std::mem::take;
5
6use reifydb_core::{
7 common::CommitVersion,
8 delta::Delta,
9 encoded::{
10 encoded::EncodedValues,
11 key::{EncodedKey, EncodedKeyRange},
12 },
13 event::EventBus,
14 interface::{
15 WithEventBus,
16 change::Change,
17 store::{MultiVersionBatch, MultiVersionValues},
18 },
19 testing::TestingContext,
20};
21use reifydb_type::Result;
22use tracing::instrument;
23
24use crate::{
25 TransactionId,
26 change::{RowChange, TransactionalChanges, TransactionalDefChanges},
27 error::TransactionError,
28 interceptor::{
29 WithInterceptors,
30 chain::InterceptorChain as Chain,
31 interceptors::Interceptors,
32 namespace::{
33 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
34 NamespacePreUpdateInterceptor,
35 },
36 ringbuffer::{
37 RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
38 RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
39 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
40 },
41 ringbuffer_def::{
42 RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
43 RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
44 },
45 table::{
46 TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
47 TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
48 },
49 table_def::{
50 TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
51 TableDefPreUpdateInterceptor,
52 },
53 transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
54 view::{
55 ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
56 ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
57 },
58 view_def::{
59 ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
60 ViewDefPreUpdateInterceptor,
61 },
62 },
63 multi::{
64 pending::PendingWrites,
65 transaction::{
66 MultiTransaction,
67 write::{MultiWriteTransaction, WriteSavepoint},
68 },
69 },
70 single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
71 transaction::query::QueryTransaction,
72};
73
74pub struct AdminTransaction {
82 pub multi: MultiTransaction,
83 pub single: SingleTransaction,
84 state: TransactionState,
85
86 pub cmd: Option<MultiWriteTransaction>,
87 pub event_bus: EventBus,
88 pub changes: TransactionalDefChanges,
89
90 pub(crate) row_changes: Vec<RowChange>,
92 pub(crate) interceptors: Interceptors,
93
94 pub(crate) pending_flow_changes: Vec<Change>,
96
97 pub testing: Option<TestingContext>,
99}
100
101pub struct Savepoint {
103 write: WriteSavepoint,
104 row_changes_len: usize,
105 flow_changes_len: usize,
106}
107
108#[derive(Clone, Copy, PartialEq)]
109enum TransactionState {
110 Active,
111 Committed,
112 RolledBack,
113}
114
115impl AdminTransaction {
116 pub fn savepoint(&self) -> Savepoint {
118 Savepoint {
119 write: self.cmd.as_ref().unwrap().savepoint(),
120 row_changes_len: self.row_changes.len(),
121 flow_changes_len: self.pending_flow_changes.len(),
122 }
123 }
124
125 pub fn restore_savepoint(&mut self, sp: Savepoint) {
127 self.cmd.as_mut().unwrap().restore_savepoint(sp.write);
128 self.row_changes.truncate(sp.row_changes_len);
129 self.pending_flow_changes.truncate(sp.flow_changes_len);
130 }
131
132 pub fn clear_test_flow_state(&mut self) {
135 self.pending_flow_changes.clear();
136 self.testing = None;
137 }
138
139 pub fn capture_testing_pre_commit<F>(&mut self, f: F) -> Result<()>
144 where
145 F: FnOnce(&mut PreCommitContext) -> Result<()>,
146 {
147 let transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)> = self
148 .pending_writes()
149 .iter()
150 .map(|(key, pending)| match &pending.delta {
151 Delta::Set {
152 values,
153 ..
154 } => (key.clone(), Some(values.clone())),
155 _ => (key.clone(), None),
156 })
157 .collect();
158
159 let mut ctx = PreCommitContext {
160 flow_changes: take(&mut self.pending_flow_changes),
161 pending_writes: Vec::new(),
162 transaction_writes,
163 testing: self.testing.take().or_else(|| Some(TestingContext::new())),
164 };
165
166 f(&mut ctx)?;
167 self.testing = ctx.testing;
168
169 for (key, value) in &ctx.pending_writes {
170 match value {
171 Some(v) => self.cmd.as_mut().unwrap().set(key, v.clone())?,
172 None => self.cmd.as_mut().unwrap().remove(key)?,
173 }
174 }
175
176 Ok(())
177 }
178}
179
180impl AdminTransaction {
181 #[instrument(name = "transaction::admin::new", level = "debug", skip_all)]
183 pub fn new(
184 multi: MultiTransaction,
185 single: SingleTransaction,
186 event_bus: EventBus,
187 interceptors: Interceptors,
188 ) -> Result<Self> {
189 let cmd = multi.begin_command()?;
190 let txn_id = cmd.tm.id();
191 Ok(Self {
192 cmd: Some(cmd),
193 multi,
194 single,
195 state: TransactionState::Active,
196 event_bus,
197 interceptors,
198 changes: TransactionalDefChanges::new(txn_id),
199 row_changes: Vec::new(),
200 pending_flow_changes: Vec::new(),
201 testing: None,
202 })
203 }
204
205 #[instrument(name = "transaction::admin::event_bus", level = "trace", skip(self))]
206 pub fn event_bus(&self) -> &EventBus {
207 &self.event_bus
208 }
209
210 fn check_active(&self) -> Result<()> {
213 match self.state {
214 TransactionState::Active => Ok(()),
215 TransactionState::Committed => {
216 return Err(TransactionError::AlreadyCommitted.into());
217 }
218 TransactionState::RolledBack => {
219 return Err(TransactionError::AlreadyRolledBack.into());
220 }
221 }
222 }
223
224 #[instrument(name = "transaction::admin::commit", level = "debug", skip(self))]
228 pub fn commit(&mut self) -> Result<CommitVersion> {
229 self.check_active()?;
230
231 let transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)> = self
232 .pending_writes()
233 .iter()
234 .map(|(key, pending)| match &pending.delta {
235 Delta::Set {
236 values,
237 ..
238 } => (key.clone(), Some(values.clone())),
239 _ => (key.clone(), None),
240 })
241 .collect();
242
243 let mut ctx = PreCommitContext {
244 flow_changes: take(&mut self.pending_flow_changes),
245 pending_writes: Vec::new(),
246 transaction_writes,
247 testing: self.testing.take(),
248 };
249 self.interceptors.pre_commit.execute(&mut ctx)?;
250 self.testing = ctx.testing;
251
252 if let Some(mut multi) = self.cmd.take() {
253 for (key, value) in &ctx.pending_writes {
255 match value {
256 Some(v) => multi.set(key, v.clone())?,
257 None => multi.remove(key)?,
258 }
259 }
260
261 let id = multi.tm.id();
262 self.state = TransactionState::Committed;
263
264 let changes = take(&mut self.changes);
265 let row_changes = take(&mut self.row_changes);
266
267 let version = multi.commit()?;
268 self.interceptors.post_commit.execute(PostCommitContext::new(
269 id,
270 version,
271 changes,
272 row_changes,
273 ))?;
274
275 Ok(version)
276 } else {
277 unreachable!("Transaction state inconsistency")
279 }
280 }
281
282 #[instrument(name = "transaction::admin::rollback", level = "debug", skip(self))]
284 pub fn rollback(&mut self) -> Result<()> {
285 self.check_active()?;
286 if let Some(mut multi) = self.cmd.take() {
287 self.state = TransactionState::RolledBack;
288 multi.rollback()
289 } else {
290 unreachable!("Transaction state inconsistency")
292 }
293 }
294
295 #[instrument(name = "transaction::admin::pending_writes", level = "trace", skip(self))]
297 pub fn pending_writes(&self) -> &PendingWrites {
298 self.cmd.as_ref().unwrap().pending_writes()
299 }
300
301 #[instrument(name = "transaction::admin::with_single_query", level = "trace", skip(self, keys, f))]
303 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
304 where
305 I: IntoIterator<Item = &'a EncodedKey> + Send,
306 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
307 R: Send,
308 {
309 self.check_active()?;
310 self.single.with_query(keys, f)
311 }
312
313 #[instrument(name = "transaction::admin::with_single_command", level = "trace", skip(self, keys, f))]
315 pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
316 where
317 I: IntoIterator<Item = &'a EncodedKey> + Send,
318 F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
319 R: Send,
320 {
321 self.check_active()?;
322 self.single.with_command(keys, f)
323 }
324
325 #[instrument(name = "transaction::admin::with_multi_query", level = "trace", skip(self, f))]
327 pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
328 where
329 F: FnOnce(&mut QueryTransaction) -> Result<R>,
330 {
331 self.check_active()?;
332
333 let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
334
335 f(&mut query_txn)
336 }
337
338 #[instrument(name = "transaction::admin::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
339 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
340 where
341 F: FnOnce(&mut QueryTransaction) -> Result<R>,
342 {
343 self.check_active()?;
344
345 let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
346
347 query_txn.read_as_of_version_exclusive(version)?;
348
349 f(&mut query_txn)
350 }
351
352 #[instrument(name = "transaction::admin::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
353 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
354 where
355 F: FnOnce(&mut QueryTransaction) -> Result<R>,
356 {
357 self.check_active()?;
358
359 let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
360
361 query_txn.multi.read_as_of_version_inclusive(version);
362
363 f(&mut query_txn)
364 }
365
366 #[instrument(name = "transaction::admin::begin_single_query", level = "trace", skip(self, keys))]
368 pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
369 where
370 I: IntoIterator<Item = &'a EncodedKey>,
371 {
372 self.check_active()?;
373 self.single.begin_query(keys)
374 }
375
376 #[instrument(name = "transaction::admin::begin_single_command", level = "trace", skip(self, keys))]
378 pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
379 where
380 I: IntoIterator<Item = &'a EncodedKey>,
381 {
382 self.check_active()?;
383 self.single.begin_command(keys)
384 }
385
386 pub fn get_changes(&self) -> &TransactionalDefChanges {
388 &self.changes
389 }
390
391 pub fn track_row_change(&mut self, change: RowChange) {
393 self.row_changes.push(change);
394 }
395
396 pub fn track_flow_change(&mut self, change: Change) {
398 self.pending_flow_changes.push(change);
399 }
400
401 #[inline]
403 pub fn version(&self) -> CommitVersion {
404 self.cmd.as_ref().unwrap().version()
405 }
406
407 #[inline]
409 pub fn id(&self) -> TransactionId {
410 self.cmd.as_ref().unwrap().tm.id()
411 }
412
413 #[inline]
415 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
416 self.check_active()?;
417 Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_values()))
418 }
419
420 #[inline]
422 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
423 self.check_active()?;
424 self.cmd.as_mut().unwrap().contains_key(key)
425 }
426
427 #[inline]
429 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
430 self.check_active()?;
431 self.cmd.as_mut().unwrap().prefix(prefix)
432 }
433
434 #[inline]
436 pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
437 self.check_active()?;
438 self.cmd.as_mut().unwrap().prefix_rev(prefix)
439 }
440
441 #[inline]
443 pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
444 self.check_active()?;
445 self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
446 Ok(())
447 }
448
449 #[inline]
451 pub fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> Result<()> {
452 self.check_active()?;
453 self.cmd.as_mut().unwrap().set(key, row)
454 }
455
456 #[inline]
458 pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
459 self.check_active()?;
460 self.cmd.as_mut().unwrap().unset(key, values)
461 }
462
463 #[inline]
465 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
466 self.check_active()?;
467 self.cmd.as_mut().unwrap().remove(key)
468 }
469
470 #[inline]
472 pub fn range(
473 &mut self,
474 range: EncodedKeyRange,
475 batch_size: usize,
476 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
477 self.check_active()?;
478 Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
479 }
480
481 #[inline]
483 pub fn range_rev(
484 &mut self,
485 range: EncodedKeyRange,
486 batch_size: usize,
487 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
488 self.check_active()?;
489 Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
490 }
491}
492
493impl WithEventBus for AdminTransaction {
494 fn event_bus(&self) -> &EventBus {
495 &self.event_bus
496 }
497}
498
499impl WithInterceptors for AdminTransaction {
500 fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync> {
501 &mut self.interceptors.table_pre_insert
502 }
503
504 fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync> {
505 &mut self.interceptors.table_post_insert
506 }
507
508 fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
509 &mut self.interceptors.table_pre_update
510 }
511
512 fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
513 &mut self.interceptors.table_post_update
514 }
515
516 fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
517 &mut self.interceptors.table_pre_delete
518 }
519
520 fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync> {
521 &mut self.interceptors.table_post_delete
522 }
523
524 fn ringbuffer_pre_insert_interceptors(
525 &mut self,
526 ) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
527 &mut self.interceptors.ringbuffer_pre_insert
528 }
529
530 fn ringbuffer_post_insert_interceptors(
531 &mut self,
532 ) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
533 &mut self.interceptors.ringbuffer_post_insert
534 }
535
536 fn ringbuffer_pre_update_interceptors(
537 &mut self,
538 ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
539 &mut self.interceptors.ringbuffer_pre_update
540 }
541
542 fn ringbuffer_post_update_interceptors(
543 &mut self,
544 ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
545 &mut self.interceptors.ringbuffer_post_update
546 }
547
548 fn ringbuffer_pre_delete_interceptors(
549 &mut self,
550 ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
551 &mut self.interceptors.ringbuffer_pre_delete
552 }
553
554 fn ringbuffer_post_delete_interceptors(
555 &mut self,
556 ) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
557 &mut self.interceptors.ringbuffer_post_delete
558 }
559
560 fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
561 &mut self.interceptors.pre_commit
562 }
563
564 fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
565 &mut self.interceptors.post_commit
566 }
567
568 fn namespace_post_create_interceptors(
569 &mut self,
570 ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
571 &mut self.interceptors.namespace_post_create
572 }
573
574 fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
575 &mut self.interceptors.namespace_pre_update
576 }
577
578 fn namespace_post_update_interceptors(
579 &mut self,
580 ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
581 &mut self.interceptors.namespace_post_update
582 }
583
584 fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
585 &mut self.interceptors.namespace_pre_delete
586 }
587
588 fn table_def_post_create_interceptors(
589 &mut self,
590 ) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync> {
591 &mut self.interceptors.table_def_post_create
592 }
593
594 fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync> {
595 &mut self.interceptors.table_def_pre_update
596 }
597
598 fn table_def_post_update_interceptors(
599 &mut self,
600 ) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync> {
601 &mut self.interceptors.table_def_post_update
602 }
603
604 fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync> {
605 &mut self.interceptors.table_def_pre_delete
606 }
607
608 fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync> {
609 &mut self.interceptors.view_pre_insert
610 }
611
612 fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync> {
613 &mut self.interceptors.view_post_insert
614 }
615
616 fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
617 &mut self.interceptors.view_pre_update
618 }
619
620 fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
621 &mut self.interceptors.view_post_update
622 }
623
624 fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
625 &mut self.interceptors.view_pre_delete
626 }
627
628 fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync> {
629 &mut self.interceptors.view_post_delete
630 }
631
632 fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync> {
633 &mut self.interceptors.view_def_post_create
634 }
635
636 fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync> {
637 &mut self.interceptors.view_def_pre_update
638 }
639
640 fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync> {
641 &mut self.interceptors.view_def_post_update
642 }
643
644 fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync> {
645 &mut self.interceptors.view_def_pre_delete
646 }
647
648 fn ringbuffer_def_post_create_interceptors(
649 &mut self,
650 ) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync> {
651 &mut self.interceptors.ringbuffer_def_post_create
652 }
653
654 fn ringbuffer_def_pre_update_interceptors(
655 &mut self,
656 ) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync> {
657 &mut self.interceptors.ringbuffer_def_pre_update
658 }
659
660 fn ringbuffer_def_post_update_interceptors(
661 &mut self,
662 ) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync> {
663 &mut self.interceptors.ringbuffer_def_post_update
664 }
665
666 fn ringbuffer_def_pre_delete_interceptors(
667 &mut self,
668 ) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync> {
669 &mut self.interceptors.ringbuffer_def_pre_delete
670 }
671}
672
673impl TransactionalChanges for AdminTransaction {}
674
675impl Drop for AdminTransaction {
676 fn drop(&mut self) {
677 if let Some(mut multi) = self.cmd.take() {
678 if self.state == TransactionState::Active {
680 let _ = multi.rollback();
681 }
682 }
683 }
684}