1use std::marker::PhantomData;
5
6use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
7use reifydb_core::{
8 CommitVersion, EncodedKey, EncodedKeyRange,
9 diagnostic::transaction,
10 event::EventBus,
11 interceptor,
12 interceptor::{
13 Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
14 RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
15 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
16 TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
17 TablePreUpdateInterceptor,
18 },
19 interface::{
20 BoxedMultiVersionIter, CdcTransaction, CommandTransaction, MultiVersionCommandTransaction,
21 MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction,
22 SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
23 interceptor::{TransactionInterceptor, WithInterceptors},
24 },
25 return_error,
26 value::encoded::EncodedValues,
27};
28use reifydb_transaction::{
29 cdc::TransactionCdc,
30 multi::{TransactionMultiVersion, pending::PendingWrites},
31 single::TransactionSingleVersion,
32};
33
34use crate::transaction::query::StandardQueryTransaction;
35
36pub struct StandardCommandTransaction {
41 pub multi: TransactionMultiVersion,
42 pub single: TransactionSingleVersion,
43 pub(crate) cdc: TransactionCdc,
44 state: TransactionState,
45
46 pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
47 pub(crate) event_bus: EventBus,
48 pub(crate) changes: TransactionalDefChanges,
49 pub(crate) catalog: MaterializedCatalog,
50
51 pub(crate) interceptors: Interceptors<Self>,
52 _not_send_sync: PhantomData<*const ()>,
54}
55
56#[derive(Clone, Copy, PartialEq)]
57enum TransactionState {
58 Active,
59 Committed,
60 RolledBack,
61}
62
63impl StandardCommandTransaction {
64 pub fn new(
66 multi: TransactionMultiVersion,
67 single: TransactionSingleVersion,
68 cdc: TransactionCdc,
69 event_bus: EventBus,
70 catalog: MaterializedCatalog,
71 interceptors: Interceptors<Self>,
72 ) -> reifydb_core::Result<Self> {
73 let cmd = multi.begin_command()?;
74 let txn_id = cmd.id();
75 Ok(Self {
76 cmd: Some(cmd),
77 multi,
78 single,
79 cdc,
80 state: TransactionState::Active,
81 event_bus,
82 catalog,
83 interceptors,
84 changes: TransactionalDefChanges::new(txn_id),
85 _not_send_sync: PhantomData,
86 })
87 }
88
89 pub fn event_bus(&self) -> &EventBus {
90 &self.event_bus
91 }
92
93 fn check_active(&self) -> crate::Result<()> {
96 match self.state {
97 TransactionState::Active => Ok(()),
98 TransactionState::Committed => {
99 return_error!(transaction::transaction_already_committed())
100 }
101 TransactionState::RolledBack => {
102 return_error!(transaction::transaction_already_rolled_back())
103 }
104 }
105 }
106
107 pub fn commit(&mut self) -> crate::Result<CommitVersion> {
111 self.check_active()?;
112
113 TransactionInterceptor::pre_commit(self)?;
114
115 if let Some(multi) = self.cmd.take() {
116 let id = multi.id();
117 self.state = TransactionState::Committed;
118
119 let changes = std::mem::take(&mut self.changes);
120
121 let version = multi.commit()?;
122 TransactionInterceptor::post_commit(self, id, version, changes)?;
123
124 Ok(version)
125 } else {
126 unreachable!("Transaction state inconsistency")
128 }
129 }
130
131 pub fn rollback(&mut self) -> crate::Result<()> {
133 self.check_active()?;
134 if let Some(multi) = self.cmd.take() {
135 self.state = TransactionState::RolledBack;
136 multi.rollback()
137 } else {
138 unreachable!("Transaction state inconsistency")
140 }
141 }
142
143 pub fn cdc(&self) -> &TransactionCdc {
145 &self.cdc
146 }
147
148 pub fn pending_writes(&self) -> &PendingWrites {
153 self.cmd.as_ref().unwrap().pending_writes()
154 }
155
156 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
158 where
159 I: IntoIterator<Item = &'a EncodedKey>,
160 F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Query<'_>) -> crate::Result<R>,
161 {
162 self.check_active()?;
163 self.single.with_query(keys, f)
164 }
165
166 pub fn with_single_command<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
168 where
169 I: IntoIterator<Item = &'a EncodedKey>,
170 F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Command<'_>) -> crate::Result<R>,
171 {
172 self.check_active()?;
173 self.single.with_command(keys, f)
174 }
175
176 pub fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
180 where
181 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
182 {
183 self.check_active()?;
184
185 let mut query_txn = StandardQueryTransaction::new(
186 self.multi.begin_query()?,
187 self.single.clone(),
188 self.cdc.clone(),
189 self.catalog.clone(),
190 );
191
192 f(&mut query_txn)
193 }
194
195 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
196 where
197 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
198 {
199 self.check_active()?;
200
201 let mut query_txn = StandardQueryTransaction::new(
202 self.multi.begin_query()?,
203 self.single.clone(),
204 self.cdc.clone(),
205 self.catalog.clone(),
206 );
207
208 query_txn.read_as_of_version_exclusive(version)?;
209
210 f(&mut query_txn)
211 }
212
213 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
214 where
215 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
216 {
217 self.check_active()?;
218
219 let mut query_txn = StandardQueryTransaction::new(
220 self.multi.begin_query()?,
221 self.single.clone(),
222 self.cdc.clone(),
223 self.catalog.clone(),
224 );
225
226 query_txn.read_as_of_version_inclusive(version)?;
227
228 f(&mut query_txn)
229 }
230}
231
232impl MaterializedCatalogTransaction for StandardCommandTransaction {
233 fn catalog(&self) -> &MaterializedCatalog {
234 &self.catalog
235 }
236}
237
238impl MultiVersionQueryTransaction for StandardCommandTransaction {
239 #[inline]
240 fn version(&self) -> CommitVersion {
241 self.cmd.as_ref().unwrap().version()
242 }
243
244 #[inline]
245 fn id(&self) -> TransactionId {
246 self.cmd.as_ref().unwrap().id()
247 }
248
249 #[inline]
250 fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
251 self.check_active()?;
252 self.cmd.as_mut().unwrap().get(key)
253 }
254
255 #[inline]
256 fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
257 self.check_active()?;
258 self.cmd.as_mut().unwrap().contains_key(key)
259 }
260
261 #[inline]
262 fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<BoxedMultiVersionIter> {
263 self.check_active()?;
264 self.cmd.as_mut().unwrap().range_batched(range, batch_size)
265 }
266
267 #[inline]
268 fn range_rev_batched(
269 &mut self,
270 range: EncodedKeyRange,
271 batch_size: u64,
272 ) -> crate::Result<BoxedMultiVersionIter> {
273 self.check_active()?;
274 self.cmd.as_mut().unwrap().range_rev_batched(range, batch_size)
275 }
276
277 #[inline]
278 fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
279 self.check_active()?;
280 self.cmd.as_mut().unwrap().prefix(prefix)
281 }
282
283 #[inline]
284 fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
285 self.check_active()?;
286 self.cmd.as_mut().unwrap().prefix_rev(prefix)
287 }
288
289 #[inline]
290 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
291 self.check_active()?;
292 self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version)
293 }
294}
295
296impl MultiVersionCommandTransaction for StandardCommandTransaction {
297 #[inline]
298 fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
299 self.check_active()?;
300 self.cmd.as_mut().unwrap().set(key, row)
301 }
302
303 #[inline]
304 fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
305 self.check_active()?;
306 self.cmd.as_mut().unwrap().remove(key)
307 }
308
309 #[inline]
310 fn commit(mut self) -> crate::Result<CommitVersion> {
311 self.check_active()?;
312 self.state = TransactionState::Committed;
313 self.cmd.take().unwrap().commit()
314 }
315
316 #[inline]
317 fn rollback(mut self) -> crate::Result<()> {
318 self.check_active()?;
319 self.state = TransactionState::RolledBack;
320 self.cmd.take().unwrap().rollback()
321 }
322}
323
324impl WithEventBus for StandardCommandTransaction {
325 fn event_bus(&self) -> &EventBus {
326 &self.event_bus
327 }
328}
329
330impl QueryTransaction for StandardCommandTransaction {
331 type SingleVersionQuery<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Query<'a>;
332
333 type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;
334
335 fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
336 where
337 I: IntoIterator<Item = &'a EncodedKey>,
338 {
339 self.check_active()?;
340 self.single.begin_query(keys)
341 }
342
343 fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
344 self.check_active()?;
345 self.cdc.begin_query()
346 }
347}
348
349impl CommandTransaction for StandardCommandTransaction {
350 type SingleVersionCommand<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Command<'a>;
351
352 fn begin_single_command<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionCommand<'_>>
353 where
354 I: IntoIterator<Item = &'a EncodedKey>,
355 {
356 self.check_active()?;
357 self.single.begin_command(keys)
358 }
359
360 fn get_changes(&self) -> &TransactionalDefChanges {
361 &self.changes
362 }
363}
364
365impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
366 fn table_pre_insert_interceptors(
367 &mut self,
368 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreInsertInterceptor<StandardCommandTransaction>> {
369 &mut self.interceptors.table_pre_insert
370 }
371
372 fn table_post_insert_interceptors(
373 &mut self,
374 ) -> &mut Chain<StandardCommandTransaction, dyn TablePostInsertInterceptor<StandardCommandTransaction>> {
375 &mut self.interceptors.table_post_insert
376 }
377
378 fn table_pre_update_interceptors(
379 &mut self,
380 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreUpdateInterceptor<StandardCommandTransaction>> {
381 &mut self.interceptors.table_pre_update
382 }
383
384 fn table_post_update_interceptors(
385 &mut self,
386 ) -> &mut Chain<
387 StandardCommandTransaction,
388 dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction>,
389 > {
390 &mut self.interceptors.table_post_update
391 }
392
393 fn table_pre_delete_interceptors(
394 &mut self,
395 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreDeleteInterceptor<StandardCommandTransaction>> {
396 &mut self.interceptors.table_pre_delete
397 }
398
399 fn table_post_delete_interceptors(
400 &mut self,
401 ) -> &mut Chain<StandardCommandTransaction, dyn TablePostDeleteInterceptor<StandardCommandTransaction>> {
402 &mut self.interceptors.table_post_delete
403 }
404
405 fn ring_buffer_pre_insert_interceptors(
406 &mut self,
407 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreInsertInterceptor<StandardCommandTransaction>> {
408 &mut self.interceptors.ring_buffer_pre_insert
409 }
410
411 fn ring_buffer_post_insert_interceptors(
412 &mut self,
413 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostInsertInterceptor<StandardCommandTransaction>> {
414 &mut self.interceptors.ring_buffer_post_insert
415 }
416
417 fn ring_buffer_pre_update_interceptors(
418 &mut self,
419 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction>> {
420 &mut self.interceptors.ring_buffer_pre_update
421 }
422
423 fn ring_buffer_post_update_interceptors(
424 &mut self,
425 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction>> {
426 &mut self.interceptors.ring_buffer_post_update
427 }
428
429 fn ring_buffer_pre_delete_interceptors(
430 &mut self,
431 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction>> {
432 &mut self.interceptors.ring_buffer_pre_delete
433 }
434
435 fn ring_buffer_post_delete_interceptors(
436 &mut self,
437 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction>> {
438 &mut self.interceptors.ring_buffer_post_delete
439 }
440
441 fn pre_commit_interceptors(
442 &mut self,
443 ) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction>> {
444 &mut self.interceptors.pre_commit
445 }
446
447 fn post_commit_interceptors(
448 &mut self,
449 ) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction>> {
450 &mut self.interceptors.post_commit
451 }
452
453 fn namespace_def_post_create_interceptors(
455 &mut self,
456 ) -> &mut Chain<
457 StandardCommandTransaction,
458 dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction>,
459 > {
460 &mut self.interceptors.namespace_def_post_create
461 }
462
463 fn namespace_def_pre_update_interceptors(
464 &mut self,
465 ) -> &mut Chain<
466 StandardCommandTransaction,
467 dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction>,
468 > {
469 &mut self.interceptors.namespace_def_pre_update
470 }
471
472 fn namespace_def_post_update_interceptors(
473 &mut self,
474 ) -> &mut Chain<
475 StandardCommandTransaction,
476 dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction>,
477 > {
478 &mut self.interceptors.namespace_def_post_update
479 }
480
481 fn namespace_def_pre_delete_interceptors(
482 &mut self,
483 ) -> &mut Chain<
484 StandardCommandTransaction,
485 dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction>,
486 > {
487 &mut self.interceptors.namespace_def_pre_delete
488 }
489
490 fn table_def_post_create_interceptors(
492 &mut self,
493 ) -> &mut Chain<
494 StandardCommandTransaction,
495 dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction>,
496 > {
497 &mut self.interceptors.table_def_post_create
498 }
499
500 fn table_def_pre_update_interceptors(
501 &mut self,
502 ) -> &mut Chain<
503 StandardCommandTransaction,
504 dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction>,
505 > {
506 &mut self.interceptors.table_def_pre_update
507 }
508
509 fn table_def_post_update_interceptors(
510 &mut self,
511 ) -> &mut Chain<
512 StandardCommandTransaction,
513 dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction>,
514 > {
515 &mut self.interceptors.table_def_post_update
516 }
517
518 fn table_def_pre_delete_interceptors(
519 &mut self,
520 ) -> &mut Chain<
521 StandardCommandTransaction,
522 dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction>,
523 > {
524 &mut self.interceptors.table_def_pre_delete
525 }
526
527 fn view_def_post_create_interceptors(
529 &mut self,
530 ) -> &mut Chain<
531 StandardCommandTransaction,
532 dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction>,
533 > {
534 &mut self.interceptors.view_def_post_create
535 }
536
537 fn view_def_pre_update_interceptors(
538 &mut self,
539 ) -> &mut Chain<
540 StandardCommandTransaction,
541 dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction>,
542 > {
543 &mut self.interceptors.view_def_pre_update
544 }
545
546 fn view_def_post_update_interceptors(
547 &mut self,
548 ) -> &mut Chain<
549 StandardCommandTransaction,
550 dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction>,
551 > {
552 &mut self.interceptors.view_def_post_update
553 }
554
555 fn view_def_pre_delete_interceptors(
556 &mut self,
557 ) -> &mut Chain<
558 StandardCommandTransaction,
559 dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction>,
560 > {
561 &mut self.interceptors.view_def_pre_delete
562 }
563}
564
565impl TransactionalChanges for StandardCommandTransaction {}
566
567impl Drop for StandardCommandTransaction {
568 fn drop(&mut self) {
569 if let Some(multi) = self.cmd.take() {
570 if self.state == TransactionState::Active {
573 let _ = multi.rollback();
574 }
575 }
576 }
577}