1use async_trait::async_trait;
5use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
6use reifydb_core::{
7 CommitVersion, EncodedKey, EncodedKeyRange,
8 diagnostic::transaction,
9 event::EventBus,
10 interceptor,
11 interceptor::{
12 Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
13 RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
14 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
15 TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
16 TablePreUpdateInterceptor,
17 },
18 interface::{
19 CdcTransaction, CommandTransaction, MultiVersionBatch, MultiVersionCommandTransaction,
20 MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction, RowChange,
21 SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
22 interceptor::{TransactionInterceptor, WithInterceptors},
23 },
24 return_error,
25 value::encoded::EncodedValues,
26};
27use reifydb_transaction::{
28 cdc::TransactionCdc,
29 multi::{TransactionMultiVersion, pending::PendingWrites},
30 single::TransactionSingle,
31};
32use tracing::instrument;
33
34use crate::transaction::query::StandardQueryTransaction;
35
/// Read-write transaction that bundles a multi-version command with the
/// single-version and CDC transaction layers, the materialized catalog, the
/// event bus and the interceptor chains that run around it.
pub struct StandardCommandTransaction {
    /// Multi-version transaction layer. `new` begins the command on it and the
    /// `with_multi_query*` methods begin their queries on it.
    pub multi: TransactionMultiVersion,
    /// Single-version transaction layer used by the `with_single_*` and
    /// `begin_single_*` methods.
    pub single: TransactionSingle,
    /// CDC transaction layer, exposed through `cdc()` and `begin_cdc_query`.
    pub(crate) cdc: TransactionCdc,
    /// Lifecycle state; every guarded operation checks it via `check_active`.
    state: TransactionState,

    /// The underlying multi-version command. It is `Some` from construction
    /// until the inherent `commit`/`rollback` or `Drop` takes it out.
    pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
    /// Event bus handed in at construction.
    pub(crate) event_bus: EventBus,
    /// Definition changes recorded for this transaction; created with the
    /// command's id and passed to the post-commit interceptors on commit.
    pub(crate) changes: TransactionalDefChanges,
    /// Materialized catalog handed in at construction.
    pub(crate) catalog: MaterializedCatalog,

    /// Row changes collected for this transaction; passed to the post-commit
    /// interceptors on commit.
    pub(crate) row_changes: Vec<RowChange>,

    /// Interceptor chains exposed through the `WithInterceptors` impl.
    pub(crate) interceptors: Interceptors<Self>,
}
56
/// Lifecycle state of a command transaction.
///
/// `Debug` and `Eq` are derived in addition to `Clone`, `Copy` and
/// `PartialEq` so the state can be logged and used in `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
    /// The transaction accepts reads and writes.
    Active,
    /// The transaction has been committed; further operations are rejected.
    Committed,
    /// The transaction has been rolled back; further operations are rejected.
    RolledBack,
}
63
64impl StandardCommandTransaction {
65 #[instrument(name = "engine::transaction::command::new", level = "debug", skip_all)]
67 pub async fn new(
68 multi: TransactionMultiVersion,
69 single: TransactionSingle,
70 cdc: TransactionCdc,
71 event_bus: EventBus,
72 catalog: MaterializedCatalog,
73 interceptors: Interceptors<Self>,
74 ) -> reifydb_core::Result<Self> {
75 let cmd = multi.begin_command().await?;
76 let txn_id = cmd.id();
77 Ok(Self {
78 cmd: Some(cmd),
79 multi,
80 single,
81 cdc,
82 state: TransactionState::Active,
83 event_bus,
84 catalog,
85 interceptors,
86 changes: TransactionalDefChanges::new(txn_id),
87 row_changes: Vec::new(),
88 })
89 }
90
91 #[instrument(name = "engine::transaction::command::event_bus", level = "trace", skip(self))]
92 pub fn event_bus(&self) -> &EventBus {
93 &self.event_bus
94 }
95
96 fn check_active(&self) -> crate::Result<()> {
99 match self.state {
100 TransactionState::Active => Ok(()),
101 TransactionState::Committed => {
102 return_error!(transaction::transaction_already_committed())
103 }
104 TransactionState::RolledBack => {
105 return_error!(transaction::transaction_already_rolled_back())
106 }
107 }
108 }
109
110 #[instrument(name = "engine::transaction::command::commit", level = "debug", skip(self))]
114 pub async fn commit(&mut self) -> crate::Result<CommitVersion> {
115 self.check_active()?;
116
117 TransactionInterceptor::pre_commit(self).await?;
118
119 if let Some(mut multi) = self.cmd.take() {
120 let id = multi.id();
121 self.state = TransactionState::Committed;
122
123 let changes = std::mem::take(&mut self.changes);
124 let row_changes = std::mem::take(&mut self.row_changes);
125
126 let version = multi.commit().await?;
127 TransactionInterceptor::post_commit(self, id, version, changes, row_changes).await?;
128
129 Ok(version)
130 } else {
131 unreachable!("Transaction state inconsistency")
133 }
134 }
135
136 #[instrument(name = "engine::transaction::command::rollback", level = "debug", skip(self))]
138 pub fn rollback(&mut self) -> crate::Result<()> {
139 self.check_active()?;
140 if let Some(mut multi) = self.cmd.take() {
141 self.state = TransactionState::RolledBack;
142 multi.rollback()
143 } else {
144 unreachable!("Transaction state inconsistency")
146 }
147 }
148
149 #[instrument(name = "engine::transaction::command::cdc", level = "trace", skip(self))]
151 pub fn cdc(&self) -> &TransactionCdc {
152 &self.cdc
153 }
154
155 #[instrument(name = "engine::transaction::command::pending_writes", level = "trace", skip(self))]
160 pub fn pending_writes(&self) -> &PendingWrites {
161 self.cmd.as_ref().unwrap().pending_writes()
162 }
163
164 #[instrument(name = "engine::transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
166 pub async fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
167 where
168 I: IntoIterator<Item = &'a EncodedKey> + Send,
169 F: FnOnce(&mut <TransactionSingle as SingleVersionTransaction>::Query<'_>) -> crate::Result<R> + Send,
170 R: Send,
171 {
172 self.check_active()?;
173 self.single.with_query(keys, f).await
174 }
175
176 #[instrument(name = "engine::transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
178 pub async fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
179 where
180 I: IntoIterator<Item = &'a EncodedKey> + Send,
181 F: FnOnce(&mut <TransactionSingle as SingleVersionTransaction>::Command<'_>) -> crate::Result<R> + Send,
182 R: Send,
183 {
184 self.check_active()?;
185 self.single.with_command(keys, f).await
186 }
187
188 #[instrument(name = "engine::transaction::command::with_multi_query", level = "trace", skip(self, f))]
192 pub async fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
193 where
194 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
195 {
196 self.check_active()?;
197
198 let mut query_txn = StandardQueryTransaction::new(
199 self.multi.begin_query().await?,
200 self.single.clone(),
201 self.cdc.clone(),
202 self.catalog.clone(),
203 );
204
205 f(&mut query_txn)
206 }
207
208 #[instrument(
209 name = "engine::transaction::command::with_multi_query_as_of_exclusive",
210 level = "trace",
211 skip(self, f)
212 )]
213 pub async fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
214 where
215 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
216 {
217 self.check_active()?;
218
219 let mut query_txn = StandardQueryTransaction::new(
220 self.multi.begin_query().await?,
221 self.single.clone(),
222 self.cdc.clone(),
223 self.catalog.clone(),
224 );
225
226 query_txn.read_as_of_version_exclusive(version).await?;
227
228 f(&mut query_txn)
229 }
230
231 #[instrument(
232 name = "engine::transaction::command::with_multi_query_as_of_inclusive",
233 level = "trace",
234 skip(self, f)
235 )]
236 pub async fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
237 where
238 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
239 {
240 self.check_active()?;
241
242 let mut query_txn = StandardQueryTransaction::new(
243 self.multi.begin_query().await?,
244 self.single.clone(),
245 self.cdc.clone(),
246 self.catalog.clone(),
247 );
248
249 query_txn.read_as_of_version_inclusive(version).await?;
250
251 f(&mut query_txn)
252 }
253}
254
/// Exposes the materialized catalog carried by the command transaction.
impl MaterializedCatalogTransaction for StandardCommandTransaction {
    /// Returns the catalog this transaction was created with.
    fn catalog(&self) -> &MaterializedCatalog {
        &self.catalog
    }
}
260
261#[async_trait]
262impl MultiVersionQueryTransaction for StandardCommandTransaction {
263 #[inline]
264 fn version(&self) -> CommitVersion {
265 MultiVersionQueryTransaction::version(self.cmd.as_ref().unwrap())
266 }
267
268 #[inline]
269 fn id(&self) -> TransactionId {
270 MultiVersionQueryTransaction::id(self.cmd.as_ref().unwrap())
271 }
272
273 #[inline]
274 async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
275 self.check_active()?;
276 MultiVersionQueryTransaction::get(self.cmd.as_mut().unwrap(), key).await
277 }
278
279 #[inline]
280 async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
281 self.check_active()?;
282 MultiVersionQueryTransaction::contains_key(self.cmd.as_mut().unwrap(), key).await
283 }
284
285 #[inline]
286 async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<MultiVersionBatch> {
287 self.check_active()?;
288 MultiVersionQueryTransaction::range_batch(self.cmd.as_mut().unwrap(), range, batch_size).await
289 }
290
291 #[inline]
292 async fn range_rev_batch(
293 &mut self,
294 range: EncodedKeyRange,
295 batch_size: u64,
296 ) -> crate::Result<MultiVersionBatch> {
297 self.check_active()?;
298 MultiVersionQueryTransaction::range_rev_batch(self.cmd.as_mut().unwrap(), range, batch_size).await
299 }
300
301 #[inline]
302 async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
303 self.check_active()?;
304 MultiVersionQueryTransaction::read_as_of_version_exclusive(self.cmd.as_mut().unwrap(), version).await
305 }
306}
307
308#[async_trait]
309impl MultiVersionCommandTransaction for StandardCommandTransaction {
310 #[inline]
311 async fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
312 self.check_active()?;
313 MultiVersionCommandTransaction::set(self.cmd.as_mut().unwrap(), key, row).await
314 }
315
316 #[inline]
317 async fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
318 self.check_active()?;
319 MultiVersionCommandTransaction::remove(self.cmd.as_mut().unwrap(), key).await
320 }
321
322 #[inline]
323 async fn commit(&mut self) -> crate::Result<CommitVersion> {
324 self.check_active()?;
325 let result = MultiVersionCommandTransaction::commit(self.cmd.as_mut().unwrap()).await;
326 if result.is_ok() {
327 self.state = TransactionState::Committed;
328 }
329 result
330 }
331
332 #[inline]
333 async fn rollback(&mut self) -> crate::Result<()> {
334 self.check_active()?;
335 let result = MultiVersionCommandTransaction::rollback(self.cmd.as_mut().unwrap()).await;
336 if result.is_ok() {
337 self.state = TransactionState::RolledBack;
338 }
339 result
340 }
341}
342
/// Trait-level access to the transaction's event bus.
impl WithEventBus for StandardCommandTransaction {
    /// Returns the event bus this transaction was created with.
    fn event_bus(&self) -> &EventBus {
        &self.event_bus
    }
}
348
349#[async_trait]
350impl QueryTransaction for StandardCommandTransaction {
351 type SingleVersionQuery<'a> = <TransactionSingle as SingleVersionTransaction>::Query<'a>;
352
353 type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;
354
355 async fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
356 where
357 I: IntoIterator<Item = &'a EncodedKey> + Send,
358 {
359 self.check_active()?;
360 self.single.begin_query(keys).await
361 }
362
363 async fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
364 self.check_active()?;
365 Ok(self.cdc.begin_query()?)
366 }
367}
368
369#[async_trait]
370impl CommandTransaction for StandardCommandTransaction {
371 type SingleVersionCommand<'a> = <TransactionSingle as SingleVersionTransaction>::Command<'a>;
372
373 async fn begin_single_command<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionCommand<'_>>
374 where
375 I: IntoIterator<Item = &'a EncodedKey> + Send,
376 {
377 self.check_active()?;
378 self.single.begin_command(keys).await
379 }
380
381 fn get_changes(&self) -> &TransactionalDefChanges {
382 &self.changes
383 }
384}
385
386impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
387 fn table_pre_insert_interceptors(
388 &mut self,
389 ) -> &mut Chain<
390 StandardCommandTransaction,
391 dyn TablePreInsertInterceptor<StandardCommandTransaction> + Send + Sync,
392 > {
393 &mut self.interceptors.table_pre_insert
394 }
395
396 fn table_post_insert_interceptors(
397 &mut self,
398 ) -> &mut Chain<
399 StandardCommandTransaction,
400 dyn TablePostInsertInterceptor<StandardCommandTransaction> + Send + Sync,
401 > {
402 &mut self.interceptors.table_post_insert
403 }
404
405 fn table_pre_update_interceptors(
406 &mut self,
407 ) -> &mut Chain<
408 StandardCommandTransaction,
409 dyn TablePreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
410 > {
411 &mut self.interceptors.table_pre_update
412 }
413
414 fn table_post_update_interceptors(
415 &mut self,
416 ) -> &mut Chain<
417 StandardCommandTransaction,
418 dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
419 > {
420 &mut self.interceptors.table_post_update
421 }
422
423 fn table_pre_delete_interceptors(
424 &mut self,
425 ) -> &mut Chain<
426 StandardCommandTransaction,
427 dyn TablePreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
428 > {
429 &mut self.interceptors.table_pre_delete
430 }
431
432 fn table_post_delete_interceptors(
433 &mut self,
434 ) -> &mut Chain<
435 StandardCommandTransaction,
436 dyn TablePostDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
437 > {
438 &mut self.interceptors.table_post_delete
439 }
440
441 fn ringbuffer_pre_insert_interceptors(
442 &mut self,
443 ) -> &mut Chain<
444 StandardCommandTransaction,
445 dyn RingBufferPreInsertInterceptor<StandardCommandTransaction> + Send + Sync,
446 > {
447 &mut self.interceptors.ringbuffer_pre_insert
448 }
449
450 fn ringbuffer_post_insert_interceptors(
451 &mut self,
452 ) -> &mut Chain<
453 StandardCommandTransaction,
454 dyn RingBufferPostInsertInterceptor<StandardCommandTransaction> + Send + Sync,
455 > {
456 &mut self.interceptors.ringbuffer_post_insert
457 }
458
459 fn ringbuffer_pre_update_interceptors(
460 &mut self,
461 ) -> &mut Chain<
462 StandardCommandTransaction,
463 dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
464 > {
465 &mut self.interceptors.ringbuffer_pre_update
466 }
467
468 fn ringbuffer_post_update_interceptors(
469 &mut self,
470 ) -> &mut Chain<
471 StandardCommandTransaction,
472 dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
473 > {
474 &mut self.interceptors.ringbuffer_post_update
475 }
476
477 fn ringbuffer_pre_delete_interceptors(
478 &mut self,
479 ) -> &mut Chain<
480 StandardCommandTransaction,
481 dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
482 > {
483 &mut self.interceptors.ringbuffer_pre_delete
484 }
485
486 fn ringbuffer_post_delete_interceptors(
487 &mut self,
488 ) -> &mut Chain<
489 StandardCommandTransaction,
490 dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
491 > {
492 &mut self.interceptors.ringbuffer_post_delete
493 }
494
495 fn pre_commit_interceptors(
496 &mut self,
497 ) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction> + Send + Sync>
498 {
499 &mut self.interceptors.pre_commit
500 }
501
502 fn post_commit_interceptors(
503 &mut self,
504 ) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction> + Send + Sync>
505 {
506 &mut self.interceptors.post_commit
507 }
508
509 fn namespace_def_post_create_interceptors(
511 &mut self,
512 ) -> &mut Chain<
513 StandardCommandTransaction,
514 dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
515 > {
516 &mut self.interceptors.namespace_def_post_create
517 }
518
519 fn namespace_def_pre_update_interceptors(
520 &mut self,
521 ) -> &mut Chain<
522 StandardCommandTransaction,
523 dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
524 > {
525 &mut self.interceptors.namespace_def_pre_update
526 }
527
528 fn namespace_def_post_update_interceptors(
529 &mut self,
530 ) -> &mut Chain<
531 StandardCommandTransaction,
532 dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
533 > {
534 &mut self.interceptors.namespace_def_post_update
535 }
536
537 fn namespace_def_pre_delete_interceptors(
538 &mut self,
539 ) -> &mut Chain<
540 StandardCommandTransaction,
541 dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
542 > {
543 &mut self.interceptors.namespace_def_pre_delete
544 }
545
546 fn table_def_post_create_interceptors(
548 &mut self,
549 ) -> &mut Chain<
550 StandardCommandTransaction,
551 dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
552 > {
553 &mut self.interceptors.table_def_post_create
554 }
555
556 fn table_def_pre_update_interceptors(
557 &mut self,
558 ) -> &mut Chain<
559 StandardCommandTransaction,
560 dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
561 > {
562 &mut self.interceptors.table_def_pre_update
563 }
564
565 fn table_def_post_update_interceptors(
566 &mut self,
567 ) -> &mut Chain<
568 StandardCommandTransaction,
569 dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
570 > {
571 &mut self.interceptors.table_def_post_update
572 }
573
574 fn table_def_pre_delete_interceptors(
575 &mut self,
576 ) -> &mut Chain<
577 StandardCommandTransaction,
578 dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
579 > {
580 &mut self.interceptors.table_def_pre_delete
581 }
582
583 fn view_def_post_create_interceptors(
585 &mut self,
586 ) -> &mut Chain<
587 StandardCommandTransaction,
588 dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
589 > {
590 &mut self.interceptors.view_def_post_create
591 }
592
593 fn view_def_pre_update_interceptors(
594 &mut self,
595 ) -> &mut Chain<
596 StandardCommandTransaction,
597 dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
598 > {
599 &mut self.interceptors.view_def_pre_update
600 }
601
602 fn view_def_post_update_interceptors(
603 &mut self,
604 ) -> &mut Chain<
605 StandardCommandTransaction,
606 dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
607 > {
608 &mut self.interceptors.view_def_post_update
609 }
610
611 fn view_def_pre_delete_interceptors(
612 &mut self,
613 ) -> &mut Chain<
614 StandardCommandTransaction,
615 dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
616 > {
617 &mut self.interceptors.view_def_pre_delete
618 }
619
620 fn ringbuffer_def_post_create_interceptors(
622 &mut self,
623 ) -> &mut Chain<
624 StandardCommandTransaction,
625 dyn interceptor::RingBufferDefPostCreateInterceptor<StandardCommandTransaction> + Send + Sync,
626 > {
627 &mut self.interceptors.ringbuffer_def_post_create
628 }
629
630 fn ringbuffer_def_pre_update_interceptors(
631 &mut self,
632 ) -> &mut Chain<
633 StandardCommandTransaction,
634 dyn interceptor::RingBufferDefPreUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
635 > {
636 &mut self.interceptors.ringbuffer_def_pre_update
637 }
638
639 fn ringbuffer_def_post_update_interceptors(
640 &mut self,
641 ) -> &mut Chain<
642 StandardCommandTransaction,
643 dyn interceptor::RingBufferDefPostUpdateInterceptor<StandardCommandTransaction> + Send + Sync,
644 > {
645 &mut self.interceptors.ringbuffer_def_post_update
646 }
647
648 fn ringbuffer_def_pre_delete_interceptors(
649 &mut self,
650 ) -> &mut Chain<
651 StandardCommandTransaction,
652 dyn interceptor::RingBufferDefPreDeleteInterceptor<StandardCommandTransaction> + Send + Sync,
653 > {
654 &mut self.interceptors.ringbuffer_def_pre_delete
655 }
656}
657
// Empty impl: no items are overridden here, so `TransactionalChanges` is
// implemented purely through whatever the trait itself provides.
impl TransactionalChanges for StandardCommandTransaction {}
659
660impl Drop for StandardCommandTransaction {
661 fn drop(&mut self) {
662 if let Some(mut multi) = self.cmd.take() {
663 if self.state == TransactionState::Active {
666 let _ = multi.rollback();
667 }
668 }
669 }
670}