1use std::marker::PhantomData;
5
6use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
7use reifydb_core::{
8 CommitVersion, EncodedKey, EncodedKeyRange,
9 diagnostic::transaction,
10 event::EventBus,
11 interceptor,
12 interceptor::{
13 Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
14 RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
15 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
16 TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
17 TablePreUpdateInterceptor,
18 },
19 interface::{
20 BoxedMultiVersionIter, CdcTransaction, CommandTransaction, MultiVersionCommandTransaction,
21 MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction, RowChange,
22 SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
23 interceptor::{TransactionInterceptor, WithInterceptors},
24 },
25 return_error,
26 value::encoded::EncodedValues,
27};
28use reifydb_transaction::{
29 cdc::TransactionCdc,
30 multi::{TransactionMultiVersion, pending::PendingWrites},
31 single::TransactionSingleVersion,
32};
33use tracing::instrument;
34
35use crate::transaction::query::StandardQueryTransaction;
36
/// A read-write transaction that bundles a multi-version command with the
/// single-version and CDC transaction handles, the materialized catalog, the
/// event bus and the interceptor chains.
///
/// The transaction starts out active. It is finished by `commit` or
/// `rollback`; if it is dropped while still active the underlying
/// multi-version command is rolled back.
pub struct StandardCommandTransaction {
    /// Multi-version transaction handle; used to begin the underlying command
    /// and to begin independent query transactions.
    pub multi: TransactionMultiVersion,
    /// Single-version transaction handle used by the `with_single_*` and
    /// `begin_single_*` methods.
    pub single: TransactionSingleVersion,
    /// CDC transaction handle; exposed via `cdc()` and used by `begin_cdc_query`.
    pub(crate) cdc: TransactionCdc,
    /// Lifecycle state, consulted by `check_active` before most operations.
    state: TransactionState,

    /// The in-flight multi-version command. `Some` from construction until
    /// `commit`, `rollback` or `Drop` takes it.
    pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
    /// Event bus handed out by `event_bus()`.
    pub(crate) event_bus: EventBus,
    /// Definition changes tracked for this transaction; created with the
    /// command's transaction id and passed to the post-commit interceptors.
    pub(crate) changes: TransactionalDefChanges,
    /// Materialized catalog; cloned into every query transaction created here.
    pub(crate) catalog: MaterializedCatalog,

    /// Row changes accumulated during the transaction; starts empty and is
    /// passed to the post-commit interceptors on commit.
    pub(crate) row_changes: Vec<RowChange>,

    /// Interceptor chains returned by the `WithInterceptors` accessors.
    pub(crate) interceptors: Interceptors<Self>,
    /// A raw-pointer `PhantomData` is neither `Send` nor `Sync`, which makes
    /// the whole struct `!Send` and `!Sync`.
    _not_send_sync: PhantomData<*const ()>,
}
59
/// Lifecycle state of a command transaction.
///
/// `Debug` and `Eq` are derived in addition to the comparison and copy traits
/// so the state can be used in assertions and diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
    /// The transaction can still be read from, written to, committed or rolled back.
    Active,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been rolled back.
    RolledBack,
}
66
67impl StandardCommandTransaction {
68 #[instrument(level = "debug", skip_all)]
70 pub fn new(
71 multi: TransactionMultiVersion,
72 single: TransactionSingleVersion,
73 cdc: TransactionCdc,
74 event_bus: EventBus,
75 catalog: MaterializedCatalog,
76 interceptors: Interceptors<Self>,
77 ) -> reifydb_core::Result<Self> {
78 let cmd = multi.begin_command()?;
79 let txn_id = cmd.id();
80 Ok(Self {
81 cmd: Some(cmd),
82 multi,
83 single,
84 cdc,
85 state: TransactionState::Active,
86 event_bus,
87 catalog,
88 interceptors,
89 changes: TransactionalDefChanges::new(txn_id),
90 row_changes: Vec::new(),
91 _not_send_sync: PhantomData,
92 })
93 }
94
95 #[instrument(level = "trace", skip(self))]
96 pub fn event_bus(&self) -> &EventBus {
97 &self.event_bus
98 }
99
100 fn check_active(&self) -> crate::Result<()> {
103 match self.state {
104 TransactionState::Active => Ok(()),
105 TransactionState::Committed => {
106 return_error!(transaction::transaction_already_committed())
107 }
108 TransactionState::RolledBack => {
109 return_error!(transaction::transaction_already_rolled_back())
110 }
111 }
112 }
113
114 #[instrument(level = "debug", skip(self))]
118 pub fn commit(&mut self) -> crate::Result<CommitVersion> {
119 self.check_active()?;
120
121 TransactionInterceptor::pre_commit(self)?;
122
123 if let Some(multi) = self.cmd.take() {
124 let id = multi.id();
125 self.state = TransactionState::Committed;
126
127 let changes = std::mem::take(&mut self.changes);
128 let row_changes = std::mem::take(&mut self.row_changes);
129
130 let version = multi.commit()?;
131 TransactionInterceptor::post_commit(self, id, version, changes, row_changes)?;
132
133 Ok(version)
134 } else {
135 unreachable!("Transaction state inconsistency")
137 }
138 }
139
140 #[instrument(level = "debug", skip(self))]
142 pub fn rollback(&mut self) -> crate::Result<()> {
143 self.check_active()?;
144 if let Some(multi) = self.cmd.take() {
145 self.state = TransactionState::RolledBack;
146 multi.rollback()
147 } else {
148 unreachable!("Transaction state inconsistency")
150 }
151 }
152
153 #[instrument(level = "trace", skip(self))]
155 pub fn cdc(&self) -> &TransactionCdc {
156 &self.cdc
157 }
158
159 #[instrument(level = "trace", skip(self))]
164 pub fn pending_writes(&self) -> &PendingWrites {
165 self.cmd.as_ref().unwrap().pending_writes()
166 }
167
168 #[instrument(level = "trace", skip(self, keys, f))]
170 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
171 where
172 I: IntoIterator<Item = &'a EncodedKey>,
173 F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Query<'_>) -> crate::Result<R>,
174 {
175 self.check_active()?;
176 self.single.with_query(keys, f)
177 }
178
179 #[instrument(level = "trace", skip(self, keys, f))]
181 pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
182 where
183 I: IntoIterator<Item = &'a EncodedKey>,
184 F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Command<'_>) -> crate::Result<R>,
185 {
186 self.check_active()?;
187 self.single.with_command(keys, f)
188 }
189
190 #[instrument(level = "trace", skip(self, f))]
194 pub fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
195 where
196 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
197 {
198 self.check_active()?;
199
200 let mut query_txn = StandardQueryTransaction::new(
201 self.multi.begin_query()?,
202 self.single.clone(),
203 self.cdc.clone(),
204 self.catalog.clone(),
205 );
206
207 f(&mut query_txn)
208 }
209
210 #[instrument(level = "trace", skip(self, f))]
211 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
212 where
213 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
214 {
215 self.check_active()?;
216
217 let mut query_txn = StandardQueryTransaction::new(
218 self.multi.begin_query()?,
219 self.single.clone(),
220 self.cdc.clone(),
221 self.catalog.clone(),
222 );
223
224 query_txn.read_as_of_version_exclusive(version)?;
225
226 f(&mut query_txn)
227 }
228
229 #[instrument(level = "trace", skip(self, f))]
230 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
231 where
232 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
233 {
234 self.check_active()?;
235
236 let mut query_txn = StandardQueryTransaction::new(
237 self.multi.begin_query()?,
238 self.single.clone(),
239 self.cdc.clone(),
240 self.catalog.clone(),
241 );
242
243 query_txn.read_as_of_version_inclusive(version)?;
244
245 f(&mut query_txn)
246 }
247}
248
249impl MaterializedCatalogTransaction for StandardCommandTransaction {
250 fn catalog(&self) -> &MaterializedCatalog {
251 &self.catalog
252 }
253}
254
255impl MultiVersionQueryTransaction for StandardCommandTransaction {
256 #[inline]
257 fn version(&self) -> CommitVersion {
258 MultiVersionQueryTransaction::version(self.cmd.as_ref().unwrap())
259 }
260
261 #[inline]
262 fn id(&self) -> TransactionId {
263 MultiVersionQueryTransaction::id(self.cmd.as_ref().unwrap())
264 }
265
266 #[inline]
267 fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
268 self.check_active()?;
269 MultiVersionQueryTransaction::get(self.cmd.as_mut().unwrap(), key)
270 }
271
272 #[inline]
273 fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
274 self.check_active()?;
275 MultiVersionQueryTransaction::contains_key(self.cmd.as_mut().unwrap(), key)
276 }
277
278 #[inline]
279 fn range_batched(
280 &mut self,
281 range: EncodedKeyRange,
282 batch_size: u64,
283 ) -> crate::Result<BoxedMultiVersionIter<'_>> {
284 self.check_active()?;
285 MultiVersionQueryTransaction::range_batched(self.cmd.as_mut().unwrap(), range, batch_size)
286 }
287
288 #[inline]
289 fn range_rev_batched(
290 &mut self,
291 range: EncodedKeyRange,
292 batch_size: u64,
293 ) -> crate::Result<BoxedMultiVersionIter<'_>> {
294 self.check_active()?;
295 MultiVersionQueryTransaction::range_rev_batched(self.cmd.as_mut().unwrap(), range, batch_size)
296 }
297
298 #[inline]
299 fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter<'_>> {
300 self.check_active()?;
301 MultiVersionQueryTransaction::prefix(self.cmd.as_mut().unwrap(), prefix)
302 }
303
304 #[inline]
305 fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter<'_>> {
306 self.check_active()?;
307 MultiVersionQueryTransaction::prefix_rev(self.cmd.as_mut().unwrap(), prefix)
308 }
309
310 #[inline]
311 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
312 self.check_active()?;
313 MultiVersionQueryTransaction::read_as_of_version_exclusive(self.cmd.as_mut().unwrap(), version)
314 }
315}
316
317impl MultiVersionCommandTransaction for StandardCommandTransaction {
318 #[inline]
319 fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
320 self.check_active()?;
321 MultiVersionCommandTransaction::set(self.cmd.as_mut().unwrap(), key, row)
322 }
323
324 #[inline]
325 fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
326 self.check_active()?;
327 MultiVersionCommandTransaction::remove(self.cmd.as_mut().unwrap(), key)
328 }
329
330 #[inline]
331 fn commit(mut self) -> crate::Result<CommitVersion> {
332 self.check_active()?;
333 self.state = TransactionState::Committed;
334 MultiVersionCommandTransaction::commit(self.cmd.take().unwrap())
335 }
336
337 #[inline]
338 fn rollback(mut self) -> crate::Result<()> {
339 self.check_active()?;
340 self.state = TransactionState::RolledBack;
341 MultiVersionCommandTransaction::rollback(self.cmd.take().unwrap())
342 }
343}
344
345impl WithEventBus for StandardCommandTransaction {
346 fn event_bus(&self) -> &EventBus {
347 &self.event_bus
348 }
349}
350
351impl QueryTransaction for StandardCommandTransaction {
352 type SingleVersionQuery<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Query<'a>;
353
354 type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;
355
356 fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
357 where
358 I: IntoIterator<Item = &'a EncodedKey>,
359 {
360 self.check_active()?;
361 self.single.begin_query(keys)
362 }
363
364 fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
365 self.check_active()?;
366 self.cdc.begin_query()
367 }
368}
369
370impl CommandTransaction for StandardCommandTransaction {
371 type SingleVersionCommand<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Command<'a>;
372
373 fn begin_single_command<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionCommand<'_>>
374 where
375 I: IntoIterator<Item = &'a EncodedKey>,
376 {
377 self.check_active()?;
378 self.single.begin_command(keys)
379 }
380
381 fn get_changes(&self) -> &TransactionalDefChanges {
382 &self.changes
383 }
384}
385
386impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
387 fn table_pre_insert_interceptors(
388 &mut self,
389 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreInsertInterceptor<StandardCommandTransaction>> {
390 &mut self.interceptors.table_pre_insert
391 }
392
393 fn table_post_insert_interceptors(
394 &mut self,
395 ) -> &mut Chain<StandardCommandTransaction, dyn TablePostInsertInterceptor<StandardCommandTransaction>> {
396 &mut self.interceptors.table_post_insert
397 }
398
399 fn table_pre_update_interceptors(
400 &mut self,
401 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreUpdateInterceptor<StandardCommandTransaction>> {
402 &mut self.interceptors.table_pre_update
403 }
404
405 fn table_post_update_interceptors(
406 &mut self,
407 ) -> &mut Chain<
408 StandardCommandTransaction,
409 dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction>,
410 > {
411 &mut self.interceptors.table_post_update
412 }
413
414 fn table_pre_delete_interceptors(
415 &mut self,
416 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreDeleteInterceptor<StandardCommandTransaction>> {
417 &mut self.interceptors.table_pre_delete
418 }
419
420 fn table_post_delete_interceptors(
421 &mut self,
422 ) -> &mut Chain<StandardCommandTransaction, dyn TablePostDeleteInterceptor<StandardCommandTransaction>> {
423 &mut self.interceptors.table_post_delete
424 }
425
426 fn ring_buffer_pre_insert_interceptors(
427 &mut self,
428 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreInsertInterceptor<StandardCommandTransaction>> {
429 &mut self.interceptors.ring_buffer_pre_insert
430 }
431
432 fn ring_buffer_post_insert_interceptors(
433 &mut self,
434 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostInsertInterceptor<StandardCommandTransaction>> {
435 &mut self.interceptors.ring_buffer_post_insert
436 }
437
438 fn ring_buffer_pre_update_interceptors(
439 &mut self,
440 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction>> {
441 &mut self.interceptors.ring_buffer_pre_update
442 }
443
444 fn ring_buffer_post_update_interceptors(
445 &mut self,
446 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction>> {
447 &mut self.interceptors.ring_buffer_post_update
448 }
449
450 fn ring_buffer_pre_delete_interceptors(
451 &mut self,
452 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction>> {
453 &mut self.interceptors.ring_buffer_pre_delete
454 }
455
456 fn ring_buffer_post_delete_interceptors(
457 &mut self,
458 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction>> {
459 &mut self.interceptors.ring_buffer_post_delete
460 }
461
462 fn pre_commit_interceptors(
463 &mut self,
464 ) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction>> {
465 &mut self.interceptors.pre_commit
466 }
467
468 fn post_commit_interceptors(
469 &mut self,
470 ) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction>> {
471 &mut self.interceptors.post_commit
472 }
473
474 fn namespace_def_post_create_interceptors(
476 &mut self,
477 ) -> &mut Chain<
478 StandardCommandTransaction,
479 dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction>,
480 > {
481 &mut self.interceptors.namespace_def_post_create
482 }
483
484 fn namespace_def_pre_update_interceptors(
485 &mut self,
486 ) -> &mut Chain<
487 StandardCommandTransaction,
488 dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction>,
489 > {
490 &mut self.interceptors.namespace_def_pre_update
491 }
492
493 fn namespace_def_post_update_interceptors(
494 &mut self,
495 ) -> &mut Chain<
496 StandardCommandTransaction,
497 dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction>,
498 > {
499 &mut self.interceptors.namespace_def_post_update
500 }
501
502 fn namespace_def_pre_delete_interceptors(
503 &mut self,
504 ) -> &mut Chain<
505 StandardCommandTransaction,
506 dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction>,
507 > {
508 &mut self.interceptors.namespace_def_pre_delete
509 }
510
511 fn table_def_post_create_interceptors(
513 &mut self,
514 ) -> &mut Chain<
515 StandardCommandTransaction,
516 dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction>,
517 > {
518 &mut self.interceptors.table_def_post_create
519 }
520
521 fn table_def_pre_update_interceptors(
522 &mut self,
523 ) -> &mut Chain<
524 StandardCommandTransaction,
525 dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction>,
526 > {
527 &mut self.interceptors.table_def_pre_update
528 }
529
530 fn table_def_post_update_interceptors(
531 &mut self,
532 ) -> &mut Chain<
533 StandardCommandTransaction,
534 dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction>,
535 > {
536 &mut self.interceptors.table_def_post_update
537 }
538
539 fn table_def_pre_delete_interceptors(
540 &mut self,
541 ) -> &mut Chain<
542 StandardCommandTransaction,
543 dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction>,
544 > {
545 &mut self.interceptors.table_def_pre_delete
546 }
547
548 fn view_def_post_create_interceptors(
550 &mut self,
551 ) -> &mut Chain<
552 StandardCommandTransaction,
553 dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction>,
554 > {
555 &mut self.interceptors.view_def_post_create
556 }
557
558 fn view_def_pre_update_interceptors(
559 &mut self,
560 ) -> &mut Chain<
561 StandardCommandTransaction,
562 dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction>,
563 > {
564 &mut self.interceptors.view_def_pre_update
565 }
566
567 fn view_def_post_update_interceptors(
568 &mut self,
569 ) -> &mut Chain<
570 StandardCommandTransaction,
571 dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction>,
572 > {
573 &mut self.interceptors.view_def_post_update
574 }
575
576 fn view_def_pre_delete_interceptors(
577 &mut self,
578 ) -> &mut Chain<
579 StandardCommandTransaction,
580 dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction>,
581 > {
582 &mut self.interceptors.view_def_pre_delete
583 }
584
585 fn ring_buffer_def_post_create_interceptors(
587 &mut self,
588 ) -> &mut Chain<
589 StandardCommandTransaction,
590 dyn interceptor::RingBufferDefPostCreateInterceptor<StandardCommandTransaction>,
591 > {
592 &mut self.interceptors.ring_buffer_def_post_create
593 }
594
595 fn ring_buffer_def_pre_update_interceptors(
596 &mut self,
597 ) -> &mut Chain<
598 StandardCommandTransaction,
599 dyn interceptor::RingBufferDefPreUpdateInterceptor<StandardCommandTransaction>,
600 > {
601 &mut self.interceptors.ring_buffer_def_pre_update
602 }
603
604 fn ring_buffer_def_post_update_interceptors(
605 &mut self,
606 ) -> &mut Chain<
607 StandardCommandTransaction,
608 dyn interceptor::RingBufferDefPostUpdateInterceptor<StandardCommandTransaction>,
609 > {
610 &mut self.interceptors.ring_buffer_def_post_update
611 }
612
613 fn ring_buffer_def_pre_delete_interceptors(
614 &mut self,
615 ) -> &mut Chain<
616 StandardCommandTransaction,
617 dyn interceptor::RingBufferDefPreDeleteInterceptor<StandardCommandTransaction>,
618 > {
619 &mut self.interceptors.ring_buffer_def_pre_delete
620 }
621
622 fn view_pre_insert_interceptors(
624 &mut self,
625 ) -> &mut Chain<StandardCommandTransaction, dyn interceptor::ViewPreInsertInterceptor<StandardCommandTransaction>>
626 {
627 &mut self.interceptors.view_pre_insert
628 }
629
630 fn view_post_insert_interceptors(
631 &mut self,
632 ) -> &mut Chain<
633 StandardCommandTransaction,
634 dyn interceptor::ViewPostInsertInterceptor<StandardCommandTransaction>,
635 > {
636 &mut self.interceptors.view_post_insert
637 }
638
639 fn view_pre_update_interceptors(
640 &mut self,
641 ) -> &mut Chain<StandardCommandTransaction, dyn interceptor::ViewPreUpdateInterceptor<StandardCommandTransaction>>
642 {
643 &mut self.interceptors.view_pre_update
644 }
645
646 fn view_post_update_interceptors(
647 &mut self,
648 ) -> &mut Chain<
649 StandardCommandTransaction,
650 dyn interceptor::ViewPostUpdateInterceptor<StandardCommandTransaction>,
651 > {
652 &mut self.interceptors.view_post_update
653 }
654
655 fn view_pre_delete_interceptors(
656 &mut self,
657 ) -> &mut Chain<StandardCommandTransaction, dyn interceptor::ViewPreDeleteInterceptor<StandardCommandTransaction>>
658 {
659 &mut self.interceptors.view_pre_delete
660 }
661
662 fn view_post_delete_interceptors(
663 &mut self,
664 ) -> &mut Chain<
665 StandardCommandTransaction,
666 dyn interceptor::ViewPostDeleteInterceptor<StandardCommandTransaction>,
667 > {
668 &mut self.interceptors.view_post_delete
669 }
670}
671
// Empty impl: nothing is overridden here, so the transaction relies entirely on
// whatever `TransactionalChanges` provides by default.
impl TransactionalChanges for StandardCommandTransaction {}
673
674impl Drop for StandardCommandTransaction {
675 fn drop(&mut self) {
676 if let Some(multi) = self.cmd.take() {
677 if self.state == TransactionState::Active {
680 let _ = multi.rollback();
681 }
682 }
683 }
684}