1use std::mem::take;
5
6use reifydb_core::{
7 common::CommitVersion,
8 delta::Delta,
9 encoded::{
10 encoded::EncodedValues,
11 key::{EncodedKey, EncodedKeyRange},
12 },
13 event::EventBus,
14 interface::{
15 WithEventBus,
16 change::Change,
17 store::{MultiVersionBatch, MultiVersionValues},
18 },
19 testing::TestingContext,
20};
21use reifydb_type::Result;
22use tracing::instrument;
23
24use crate::{
25 TransactionId,
26 change::{RowChange, TransactionalDefChanges},
27 error::TransactionError,
28 interceptor::{
29 WithInterceptors,
30 chain::InterceptorChain as Chain,
31 interceptors::Interceptors,
32 namespace::{
33 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
34 NamespacePreUpdateInterceptor,
35 },
36 ringbuffer::{
37 RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
38 RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
39 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
40 },
41 ringbuffer_def::{
42 RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
43 RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
44 },
45 table::{
46 TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
47 TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
48 },
49 table_def::{
50 TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
51 TableDefPreUpdateInterceptor,
52 },
53 transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
54 view::{
55 ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
56 ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
57 },
58 view_def::{
59 ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
60 ViewDefPreUpdateInterceptor,
61 },
62 },
63 multi::{
64 pending::PendingWrites,
65 transaction::{MultiTransaction, write::MultiWriteTransaction},
66 },
67 single::{SingleTransaction, read::SingleReadTransaction, write::SingleWriteTransaction},
68 transaction::query::QueryTransaction,
69};
70
71pub struct CommandTransaction {
76 pub multi: MultiTransaction,
77 pub single: SingleTransaction,
78 state: TransactionState,
79
80 pub cmd: Option<MultiWriteTransaction>,
81 pub event_bus: EventBus,
82
83 pub(crate) row_changes: Vec<RowChange>,
85 pub(crate) interceptors: Interceptors,
86
87 pub(crate) pending_flow_changes: Vec<Change>,
89
90 pub testing: Option<TestingContext>,
92}
93
94#[derive(Clone, Copy, PartialEq)]
95enum TransactionState {
96 Active,
97 Committed,
98 RolledBack,
99}
100
101impl CommandTransaction {
102 #[instrument(name = "transaction::command::new", level = "debug", skip_all)]
104 pub fn new(
105 multi: MultiTransaction,
106 single: SingleTransaction,
107 event_bus: EventBus,
108 interceptors: Interceptors,
109 ) -> Result<Self> {
110 let cmd = multi.begin_command()?;
111 Ok(Self {
112 cmd: Some(cmd),
113 multi,
114 single,
115 state: TransactionState::Active,
116 event_bus,
117 interceptors,
118 row_changes: Vec::new(),
119 pending_flow_changes: Vec::new(),
120 testing: None,
121 })
122 }
123
124 #[instrument(name = "transaction::command::event_bus", level = "trace", skip(self))]
125 pub fn event_bus(&self) -> &EventBus {
126 &self.event_bus
127 }
128
129 fn check_active(&self) -> Result<()> {
132 match self.state {
133 TransactionState::Active => Ok(()),
134 TransactionState::Committed => {
135 return Err(TransactionError::AlreadyCommitted.into());
136 }
137 TransactionState::RolledBack => {
138 return Err(TransactionError::AlreadyRolledBack.into());
139 }
140 }
141 }
142
143 #[instrument(name = "transaction::command::commit", level = "debug", skip(self))]
147 pub fn commit(&mut self) -> Result<CommitVersion> {
148 self.check_active()?;
149
150 let transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)> = self
151 .pending_writes()
152 .iter()
153 .map(|(key, pending)| match &pending.delta {
154 Delta::Set {
155 values,
156 ..
157 } => (key.clone(), Some(values.clone())),
158 _ => (key.clone(), None),
159 })
160 .collect();
161
162 let mut ctx = PreCommitContext {
163 flow_changes: take(&mut self.pending_flow_changes),
164 pending_writes: Vec::new(),
165 transaction_writes,
166 testing: self.testing.take(),
167 };
168 self.interceptors.pre_commit.execute(&mut ctx)?;
169 self.testing = ctx.testing;
170
171 if let Some(mut multi) = self.cmd.take() {
172 for (key, value) in &ctx.pending_writes {
174 match value {
175 Some(v) => multi.set(key, v.clone())?,
176 None => multi.remove(key)?,
177 }
178 }
179
180 let id = multi.tm.id();
181 self.state = TransactionState::Committed;
182
183 let changes = TransactionalDefChanges::default();
184 let row_changes = take(&mut self.row_changes);
185
186 let version = multi.commit()?;
187 self.interceptors.post_commit.execute(PostCommitContext::new(
188 id,
189 version,
190 changes,
191 row_changes,
192 ))?;
193
194 Ok(version)
195 } else {
196 unreachable!("Transaction state inconsistency")
198 }
199 }
200
201 #[instrument(name = "transaction::command::rollback", level = "debug", skip(self))]
203 pub fn rollback(&mut self) -> Result<()> {
204 self.check_active()?;
205 if let Some(mut multi) = self.cmd.take() {
206 self.state = TransactionState::RolledBack;
207 multi.rollback()
208 } else {
209 unreachable!("Transaction state inconsistency")
211 }
212 }
213
214 #[instrument(name = "transaction::command::pending_writes", level = "trace", skip(self))]
219 pub fn pending_writes(&self) -> &PendingWrites {
220 self.cmd.as_ref().unwrap().pending_writes()
221 }
222
223 #[instrument(name = "transaction::command::with_single_query", level = "trace", skip(self, keys, f))]
225 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
226 where
227 I: IntoIterator<Item = &'a EncodedKey> + Send,
228 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
229 R: Send,
230 {
231 self.check_active()?;
232 self.single.with_query(keys, f)
233 }
234
235 #[instrument(name = "transaction::command::with_single_command", level = "trace", skip(self, keys, f))]
237 pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
238 where
239 I: IntoIterator<Item = &'a EncodedKey> + Send,
240 F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R> + Send,
241 R: Send,
242 {
243 self.check_active()?;
244 self.single.with_command(keys, f)
245 }
246
247 #[instrument(name = "transaction::command::with_multi_query", level = "trace", skip(self, f))]
251 pub fn with_multi_query<F, R>(&self, f: F) -> Result<R>
252 where
253 F: FnOnce(&mut QueryTransaction) -> Result<R>,
254 {
255 self.check_active()?;
256
257 let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
258
259 f(&mut query_txn)
260 }
261
262 #[instrument(name = "transaction::command::with_multi_query_as_of_exclusive", level = "trace", skip(self, f))]
263 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
264 where
265 F: FnOnce(&mut QueryTransaction) -> Result<R>,
266 {
267 self.check_active()?;
268
269 let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
270
271 query_txn.read_as_of_version_exclusive(version)?;
272
273 f(&mut query_txn)
274 }
275
276 #[instrument(name = "transaction::command::with_multi_query_as_of_inclusive", level = "trace", skip(self, f))]
277 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> Result<R>
278 where
279 F: FnOnce(&mut QueryTransaction) -> Result<R>,
280 {
281 self.check_active()?;
282
283 let mut query_txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone());
284
285 query_txn.multi.read_as_of_version_inclusive(version);
286
287 f(&mut query_txn)
288 }
289
290 #[instrument(name = "transaction::command::begin_single_query", level = "trace", skip(self, keys))]
292 pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
293 where
294 I: IntoIterator<Item = &'a EncodedKey>,
295 {
296 self.check_active()?;
297 self.single.begin_query(keys)
298 }
299
300 #[instrument(name = "transaction::command::begin_single_command", level = "trace", skip(self, keys))]
302 pub fn begin_single_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
303 where
304 I: IntoIterator<Item = &'a EncodedKey>,
305 {
306 self.check_active()?;
307 self.single.begin_command(keys)
308 }
309
310 pub fn track_row_change(&mut self, change: RowChange) {
312 self.row_changes.push(change);
313 }
314
315 pub fn track_flow_change(&mut self, change: Change) {
317 self.pending_flow_changes.push(change);
318 }
319
320 #[inline]
322 pub fn version(&self) -> CommitVersion {
323 self.cmd.as_ref().unwrap().version()
324 }
325
326 #[inline]
328 pub fn id(&self) -> TransactionId {
329 self.cmd.as_ref().unwrap().tm.id()
330 }
331
332 #[inline]
334 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
335 self.check_active()?;
336 Ok(self.cmd.as_mut().unwrap().get(key)?.map(|v| v.into_multi_version_values()))
337 }
338
339 #[inline]
341 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
342 self.check_active()?;
343 self.cmd.as_mut().unwrap().contains_key(key)
344 }
345
346 #[inline]
348 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
349 self.check_active()?;
350 self.cmd.as_mut().unwrap().prefix(prefix)
351 }
352
353 #[inline]
355 pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
356 self.check_active()?;
357 self.cmd.as_mut().unwrap().prefix_rev(prefix)
358 }
359
360 #[inline]
362 pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
363 self.check_active()?;
364 self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version);
365 Ok(())
366 }
367
368 #[inline]
370 pub fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> Result<()> {
371 self.check_active()?;
372 self.cmd.as_mut().unwrap().set(key, row)
373 }
374
375 #[inline]
379 pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
380 self.check_active()?;
381 self.cmd.as_mut().unwrap().unset(key, values)
382 }
383
384 #[inline]
388 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
389 self.check_active()?;
390 self.cmd.as_mut().unwrap().remove(key)
391 }
392
393 #[inline]
395 pub fn range(
396 &mut self,
397 range: EncodedKeyRange,
398 batch_size: usize,
399 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
400 self.check_active()?;
401 Ok(self.cmd.as_mut().unwrap().range(range, batch_size))
402 }
403
404 #[inline]
406 pub fn range_rev(
407 &mut self,
408 range: EncodedKeyRange,
409 batch_size: usize,
410 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
411 self.check_active()?;
412 Ok(self.cmd.as_mut().unwrap().range_rev(range, batch_size))
413 }
414}
415
416impl WithEventBus for CommandTransaction {
417 fn event_bus(&self) -> &EventBus {
418 &self.event_bus
419 }
420}
421
422impl WithInterceptors for CommandTransaction {
423 fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync> {
424 &mut self.interceptors.table_pre_insert
425 }
426
427 fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync> {
428 &mut self.interceptors.table_post_insert
429 }
430
431 fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
432 &mut self.interceptors.table_pre_update
433 }
434
435 fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
436 &mut self.interceptors.table_post_update
437 }
438
439 fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
440 &mut self.interceptors.table_pre_delete
441 }
442
443 fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync> {
444 &mut self.interceptors.table_post_delete
445 }
446
447 fn ringbuffer_pre_insert_interceptors(
448 &mut self,
449 ) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
450 &mut self.interceptors.ringbuffer_pre_insert
451 }
452
453 fn ringbuffer_post_insert_interceptors(
454 &mut self,
455 ) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
456 &mut self.interceptors.ringbuffer_post_insert
457 }
458
459 fn ringbuffer_pre_update_interceptors(
460 &mut self,
461 ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
462 &mut self.interceptors.ringbuffer_pre_update
463 }
464
465 fn ringbuffer_post_update_interceptors(
466 &mut self,
467 ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
468 &mut self.interceptors.ringbuffer_post_update
469 }
470
471 fn ringbuffer_pre_delete_interceptors(
472 &mut self,
473 ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
474 &mut self.interceptors.ringbuffer_pre_delete
475 }
476
477 fn ringbuffer_post_delete_interceptors(
478 &mut self,
479 ) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
480 &mut self.interceptors.ringbuffer_post_delete
481 }
482
483 fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
484 &mut self.interceptors.pre_commit
485 }
486
487 fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
488 &mut self.interceptors.post_commit
489 }
490
491 fn namespace_post_create_interceptors(
492 &mut self,
493 ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync> {
494 &mut self.interceptors.namespace_post_create
495 }
496
497 fn namespace_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync> {
498 &mut self.interceptors.namespace_pre_update
499 }
500
501 fn namespace_post_update_interceptors(
502 &mut self,
503 ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync> {
504 &mut self.interceptors.namespace_post_update
505 }
506
507 fn namespace_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync> {
508 &mut self.interceptors.namespace_pre_delete
509 }
510
511 fn table_def_post_create_interceptors(
512 &mut self,
513 ) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync> {
514 &mut self.interceptors.table_def_post_create
515 }
516
517 fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync> {
518 &mut self.interceptors.table_def_pre_update
519 }
520
521 fn table_def_post_update_interceptors(
522 &mut self,
523 ) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync> {
524 &mut self.interceptors.table_def_post_update
525 }
526
527 fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync> {
528 &mut self.interceptors.table_def_pre_delete
529 }
530
531 fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync> {
532 &mut self.interceptors.view_pre_insert
533 }
534
535 fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync> {
536 &mut self.interceptors.view_post_insert
537 }
538
539 fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
540 &mut self.interceptors.view_pre_update
541 }
542
543 fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
544 &mut self.interceptors.view_post_update
545 }
546
547 fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
548 &mut self.interceptors.view_pre_delete
549 }
550
551 fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync> {
552 &mut self.interceptors.view_post_delete
553 }
554
555 fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync> {
556 &mut self.interceptors.view_def_post_create
557 }
558
559 fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync> {
560 &mut self.interceptors.view_def_pre_update
561 }
562
563 fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync> {
564 &mut self.interceptors.view_def_post_update
565 }
566
567 fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync> {
568 &mut self.interceptors.view_def_pre_delete
569 }
570
571 fn ringbuffer_def_post_create_interceptors(
572 &mut self,
573 ) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync> {
574 &mut self.interceptors.ringbuffer_def_post_create
575 }
576
577 fn ringbuffer_def_pre_update_interceptors(
578 &mut self,
579 ) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync> {
580 &mut self.interceptors.ringbuffer_def_pre_update
581 }
582
583 fn ringbuffer_def_post_update_interceptors(
584 &mut self,
585 ) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync> {
586 &mut self.interceptors.ringbuffer_def_post_update
587 }
588
589 fn ringbuffer_def_pre_delete_interceptors(
590 &mut self,
591 ) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync> {
592 &mut self.interceptors.ringbuffer_def_pre_delete
593 }
594}
595
596impl Drop for CommandTransaction {
597 fn drop(&mut self) {
598 if let Some(mut multi) = self.cmd.take() {
599 if self.state == TransactionState::Active {
601 let _ = multi.rollback();
602 }
603 }
604 }
605}