1use std::marker::PhantomData;
5
6use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
7use reifydb_core::{
8 CommitVersion, EncodedKey, EncodedKeyRange,
9 diagnostic::transaction,
10 event::EventBus,
11 interceptor,
12 interceptor::{
13 Chain, Interceptors, PostCommitInterceptor, PreCommitInterceptor, RingBufferPostDeleteInterceptor,
14 RingBufferPostInsertInterceptor, RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
15 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor, TablePostDeleteInterceptor,
16 TablePostInsertInterceptor, TablePreDeleteInterceptor, TablePreInsertInterceptor,
17 TablePreUpdateInterceptor,
18 },
19 interface::{
20 BoxedMultiVersionIter, CdcTransaction, CommandTransaction, MultiVersionCommandTransaction,
21 MultiVersionQueryTransaction, MultiVersionTransaction, MultiVersionValues, QueryTransaction,
22 SingleVersionTransaction, TransactionId, TransactionalChanges, TransactionalDefChanges, WithEventBus,
23 interceptor::{TransactionInterceptor, WithInterceptors},
24 },
25 return_error,
26 value::encoded::EncodedValues,
27};
28use reifydb_transaction::{
29 cdc::TransactionCdc,
30 multi::{TransactionMultiVersion, pending::PendingWrites},
31 single::TransactionSingleVersion,
32};
33
34use crate::transaction::query::StandardQueryTransaction;
35
/// Read-write transaction handle that bundles the multi-version, single-version and CDC
/// transaction layers with the per-command state tracked in this file.
///
/// Field order is significant for drop order; do not reorder casually.
pub struct StandardCommandTransaction {
	/// Multi-version layer. `new` begins the command on it and the `with_multi_query*`
	/// helpers begin their query transactions on it.
	pub multi: TransactionMultiVersion,
	/// Single-version layer used by the `with_single_*` / `begin_single_*` methods.
	pub single: TransactionSingleVersion,
	/// CDC layer returned by `cdc()` and used by `begin_cdc_query()`.
	pub(crate) cdc: TransactionCdc,
	/// Lifecycle flag consulted by `check_active` before most operations.
	state: TransactionState,

	/// The in-flight multi-version command. It is taken (left as `None`) by commit,
	/// rollback and `Drop`.
	pub(crate) cmd: Option<<TransactionMultiVersion as MultiVersionTransaction>::Command>,
	/// Event bus handed out by `event_bus()`.
	pub(crate) event_bus: EventBus,
	/// Definition-change record created in `new` for this transaction's id; the `&mut self`
	/// `commit` moves it out and passes it to the post-commit interceptors.
	pub(crate) changes: TransactionalDefChanges,
	/// Materialized catalog exposed through `MaterializedCatalogTransaction::catalog`.
	pub(crate) catalog: MaterializedCatalog,

	/// Interceptor chains exposed through the `WithInterceptors` implementation.
	pub(crate) interceptors: Interceptors<Self>,
	/// Raw-pointer marker: keeps this type from being `Send` or `Sync`.
	_not_send_sync: PhantomData<*const ()>,
}
55
/// Lifecycle of a command transaction.
///
/// A transaction starts out `Active` and moves to exactly one of the terminal states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
	/// The transaction still accepts reads and writes.
	Active,
	/// The transaction was committed.
	Committed,
	/// The transaction was rolled back.
	RolledBack,
}
62
63impl StandardCommandTransaction {
64 pub fn new(
66 multi: TransactionMultiVersion,
67 single: TransactionSingleVersion,
68 cdc: TransactionCdc,
69 event_bus: EventBus,
70 catalog: MaterializedCatalog,
71 interceptors: Interceptors<Self>,
72 ) -> reifydb_core::Result<Self> {
73 let cmd = multi.begin_command()?;
74 let txn_id = cmd.id();
75 Ok(Self {
76 cmd: Some(cmd),
77 multi,
78 single,
79 cdc,
80 state: TransactionState::Active,
81 event_bus,
82 catalog,
83 interceptors,
84 changes: TransactionalDefChanges::new(txn_id),
85 _not_send_sync: PhantomData,
86 })
87 }
88
89 pub fn event_bus(&self) -> &EventBus {
90 &self.event_bus
91 }
92
93 fn check_active(&self) -> crate::Result<()> {
96 match self.state {
97 TransactionState::Active => Ok(()),
98 TransactionState::Committed => {
99 return_error!(transaction::transaction_already_committed())
100 }
101 TransactionState::RolledBack => {
102 return_error!(transaction::transaction_already_rolled_back())
103 }
104 }
105 }
106
107 pub fn commit(&mut self) -> crate::Result<CommitVersion> {
111 self.check_active()?;
112
113 TransactionInterceptor::pre_commit(self)?;
114
115 if let Some(multi) = self.cmd.take() {
116 let id = multi.id();
117 self.state = TransactionState::Committed;
118
119 let changes = std::mem::take(&mut self.changes);
120
121 let version = multi.commit()?;
122 TransactionInterceptor::post_commit(self, id, version, changes)?;
123
124 Ok(version)
125 } else {
126 unreachable!("Transaction state inconsistency")
128 }
129 }
130
131 pub fn rollback(&mut self) -> crate::Result<()> {
133 self.check_active()?;
134 if let Some(multi) = self.cmd.take() {
135 self.state = TransactionState::RolledBack;
136 multi.rollback()
137 } else {
138 unreachable!("Transaction state inconsistency")
140 }
141 }
142
143 pub fn cdc(&self) -> &TransactionCdc {
145 &self.cdc
146 }
147
148 pub fn pending_writes(&self) -> &PendingWrites {
153 self.cmd.as_ref().unwrap().pending_writes()
154 }
155
156 pub fn with_single_query<F, R>(&self, f: F) -> crate::Result<R>
158 where
159 F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Query<'_>) -> crate::Result<R>,
160 {
161 self.check_active()?;
162 self.single.with_query(f)
163 }
164
165 pub fn with_single_command<F, R>(&self, f: F) -> crate::Result<R>
167 where
168 F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Command<'_>) -> crate::Result<R>,
169 {
170 self.check_active()?;
171 self.single.with_command(f)
172 }
173
174 pub fn with_multi_query<F, R>(&self, f: F) -> crate::Result<R>
178 where
179 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
180 {
181 self.check_active()?;
182
183 let mut query_txn = StandardQueryTransaction::new(
184 self.multi.begin_query()?,
185 self.single.clone(),
186 self.cdc.clone(),
187 self.catalog.clone(),
188 );
189
190 f(&mut query_txn)
191 }
192
193 pub fn with_multi_query_as_of_exclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
194 where
195 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
196 {
197 self.check_active()?;
198
199 let mut query_txn = StandardQueryTransaction::new(
200 self.multi.begin_query()?,
201 self.single.clone(),
202 self.cdc.clone(),
203 self.catalog.clone(),
204 );
205
206 query_txn.read_as_of_version_exclusive(version)?;
207
208 f(&mut query_txn)
209 }
210
211 pub fn with_multi_query_as_of_inclusive<F, R>(&self, version: CommitVersion, f: F) -> crate::Result<R>
212 where
213 F: FnOnce(&mut StandardQueryTransaction) -> crate::Result<R>,
214 {
215 self.check_active()?;
216
217 let mut query_txn = StandardQueryTransaction::new(
218 self.multi.begin_query()?,
219 self.single.clone(),
220 self.cdc.clone(),
221 self.catalog.clone(),
222 );
223
224 query_txn.read_as_of_version_inclusive(version)?;
225
226 f(&mut query_txn)
227 }
228}
229
230impl MaterializedCatalogTransaction for StandardCommandTransaction {
231 fn catalog(&self) -> &MaterializedCatalog {
232 &self.catalog
233 }
234}
235
236impl MultiVersionQueryTransaction for StandardCommandTransaction {
237 #[inline]
238 fn version(&self) -> CommitVersion {
239 self.cmd.as_ref().unwrap().version()
240 }
241
242 #[inline]
243 fn id(&self) -> TransactionId {
244 self.cmd.as_ref().unwrap().id()
245 }
246
247 #[inline]
248 fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
249 self.check_active()?;
250 self.cmd.as_mut().unwrap().get(key)
251 }
252
253 #[inline]
254 fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
255 self.check_active()?;
256 self.cmd.as_mut().unwrap().contains_key(key)
257 }
258
259 #[inline]
260 fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<BoxedMultiVersionIter> {
261 self.check_active()?;
262 self.cmd.as_mut().unwrap().range_batched(range, batch_size)
263 }
264
265 #[inline]
266 fn range_rev_batched(
267 &mut self,
268 range: EncodedKeyRange,
269 batch_size: u64,
270 ) -> crate::Result<BoxedMultiVersionIter> {
271 self.check_active()?;
272 self.cmd.as_mut().unwrap().range_rev_batched(range, batch_size)
273 }
274
275 #[inline]
276 fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
277 self.check_active()?;
278 self.cmd.as_mut().unwrap().prefix(prefix)
279 }
280
281 #[inline]
282 fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
283 self.check_active()?;
284 self.cmd.as_mut().unwrap().prefix_rev(prefix)
285 }
286
287 #[inline]
288 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
289 self.check_active()?;
290 self.cmd.as_mut().unwrap().read_as_of_version_exclusive(version)
291 }
292}
293
294impl MultiVersionCommandTransaction for StandardCommandTransaction {
295 #[inline]
296 fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
297 self.check_active()?;
298 self.cmd.as_mut().unwrap().set(key, row)
299 }
300
301 #[inline]
302 fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
303 self.check_active()?;
304 self.cmd.as_mut().unwrap().remove(key)
305 }
306
307 #[inline]
308 fn commit(mut self) -> crate::Result<CommitVersion> {
309 self.check_active()?;
310 self.state = TransactionState::Committed;
311 self.cmd.take().unwrap().commit()
312 }
313
314 #[inline]
315 fn rollback(mut self) -> crate::Result<()> {
316 self.check_active()?;
317 self.state = TransactionState::RolledBack;
318 self.cmd.take().unwrap().rollback()
319 }
320}
321
322impl WithEventBus for StandardCommandTransaction {
323 fn event_bus(&self) -> &EventBus {
324 &self.event_bus
325 }
326}
327
328impl QueryTransaction for StandardCommandTransaction {
329 type SingleVersionQuery<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Query<'a>;
330
331 type CdcQuery<'a> = <TransactionCdc as CdcTransaction>::Query<'a>;
332
333 fn begin_single_query(&self) -> crate::Result<Self::SingleVersionQuery<'_>> {
334 self.check_active()?;
335 self.single.begin_query()
336 }
337
338 fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
339 self.check_active()?;
340 self.cdc.begin_query()
341 }
342}
343
344impl CommandTransaction for StandardCommandTransaction {
345 type SingleVersionCommand<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Command<'a>;
346
347 fn begin_single_command(&self) -> crate::Result<Self::SingleVersionCommand<'_>> {
348 self.check_active()?;
349 self.single.begin_command()
350 }
351
352 fn get_changes(&self) -> &TransactionalDefChanges {
353 &self.changes
354 }
355}
356
357impl WithInterceptors<StandardCommandTransaction> for StandardCommandTransaction {
358 fn table_pre_insert_interceptors(
359 &mut self,
360 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreInsertInterceptor<StandardCommandTransaction>> {
361 &mut self.interceptors.table_pre_insert
362 }
363
364 fn table_post_insert_interceptors(
365 &mut self,
366 ) -> &mut Chain<StandardCommandTransaction, dyn TablePostInsertInterceptor<StandardCommandTransaction>> {
367 &mut self.interceptors.table_post_insert
368 }
369
370 fn table_pre_update_interceptors(
371 &mut self,
372 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreUpdateInterceptor<StandardCommandTransaction>> {
373 &mut self.interceptors.table_pre_update
374 }
375
376 fn table_post_update_interceptors(
377 &mut self,
378 ) -> &mut Chain<
379 StandardCommandTransaction,
380 dyn interceptor::TablePostUpdateInterceptor<StandardCommandTransaction>,
381 > {
382 &mut self.interceptors.table_post_update
383 }
384
385 fn table_pre_delete_interceptors(
386 &mut self,
387 ) -> &mut Chain<StandardCommandTransaction, dyn TablePreDeleteInterceptor<StandardCommandTransaction>> {
388 &mut self.interceptors.table_pre_delete
389 }
390
391 fn table_post_delete_interceptors(
392 &mut self,
393 ) -> &mut Chain<StandardCommandTransaction, dyn TablePostDeleteInterceptor<StandardCommandTransaction>> {
394 &mut self.interceptors.table_post_delete
395 }
396
397 fn ring_buffer_pre_insert_interceptors(
398 &mut self,
399 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreInsertInterceptor<StandardCommandTransaction>> {
400 &mut self.interceptors.ring_buffer_pre_insert
401 }
402
403 fn ring_buffer_post_insert_interceptors(
404 &mut self,
405 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostInsertInterceptor<StandardCommandTransaction>> {
406 &mut self.interceptors.ring_buffer_post_insert
407 }
408
409 fn ring_buffer_pre_update_interceptors(
410 &mut self,
411 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreUpdateInterceptor<StandardCommandTransaction>> {
412 &mut self.interceptors.ring_buffer_pre_update
413 }
414
415 fn ring_buffer_post_update_interceptors(
416 &mut self,
417 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostUpdateInterceptor<StandardCommandTransaction>> {
418 &mut self.interceptors.ring_buffer_post_update
419 }
420
421 fn ring_buffer_pre_delete_interceptors(
422 &mut self,
423 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPreDeleteInterceptor<StandardCommandTransaction>> {
424 &mut self.interceptors.ring_buffer_pre_delete
425 }
426
427 fn ring_buffer_post_delete_interceptors(
428 &mut self,
429 ) -> &mut Chain<StandardCommandTransaction, dyn RingBufferPostDeleteInterceptor<StandardCommandTransaction>> {
430 &mut self.interceptors.ring_buffer_post_delete
431 }
432
433 fn pre_commit_interceptors(
434 &mut self,
435 ) -> &mut Chain<StandardCommandTransaction, dyn PreCommitInterceptor<StandardCommandTransaction>> {
436 &mut self.interceptors.pre_commit
437 }
438
439 fn post_commit_interceptors(
440 &mut self,
441 ) -> &mut Chain<StandardCommandTransaction, dyn PostCommitInterceptor<StandardCommandTransaction>> {
442 &mut self.interceptors.post_commit
443 }
444
445 fn namespace_def_post_create_interceptors(
447 &mut self,
448 ) -> &mut Chain<
449 StandardCommandTransaction,
450 dyn interceptor::NamespaceDefPostCreateInterceptor<StandardCommandTransaction>,
451 > {
452 &mut self.interceptors.namespace_def_post_create
453 }
454
455 fn namespace_def_pre_update_interceptors(
456 &mut self,
457 ) -> &mut Chain<
458 StandardCommandTransaction,
459 dyn interceptor::NamespaceDefPreUpdateInterceptor<StandardCommandTransaction>,
460 > {
461 &mut self.interceptors.namespace_def_pre_update
462 }
463
464 fn namespace_def_post_update_interceptors(
465 &mut self,
466 ) -> &mut Chain<
467 StandardCommandTransaction,
468 dyn interceptor::NamespaceDefPostUpdateInterceptor<StandardCommandTransaction>,
469 > {
470 &mut self.interceptors.namespace_def_post_update
471 }
472
473 fn namespace_def_pre_delete_interceptors(
474 &mut self,
475 ) -> &mut Chain<
476 StandardCommandTransaction,
477 dyn interceptor::NamespaceDefPreDeleteInterceptor<StandardCommandTransaction>,
478 > {
479 &mut self.interceptors.namespace_def_pre_delete
480 }
481
482 fn table_def_post_create_interceptors(
484 &mut self,
485 ) -> &mut Chain<
486 StandardCommandTransaction,
487 dyn interceptor::TableDefPostCreateInterceptor<StandardCommandTransaction>,
488 > {
489 &mut self.interceptors.table_def_post_create
490 }
491
492 fn table_def_pre_update_interceptors(
493 &mut self,
494 ) -> &mut Chain<
495 StandardCommandTransaction,
496 dyn interceptor::TableDefPreUpdateInterceptor<StandardCommandTransaction>,
497 > {
498 &mut self.interceptors.table_def_pre_update
499 }
500
501 fn table_def_post_update_interceptors(
502 &mut self,
503 ) -> &mut Chain<
504 StandardCommandTransaction,
505 dyn interceptor::TableDefPostUpdateInterceptor<StandardCommandTransaction>,
506 > {
507 &mut self.interceptors.table_def_post_update
508 }
509
510 fn table_def_pre_delete_interceptors(
511 &mut self,
512 ) -> &mut Chain<
513 StandardCommandTransaction,
514 dyn interceptor::TableDefPreDeleteInterceptor<StandardCommandTransaction>,
515 > {
516 &mut self.interceptors.table_def_pre_delete
517 }
518
519 fn view_def_post_create_interceptors(
521 &mut self,
522 ) -> &mut Chain<
523 StandardCommandTransaction,
524 dyn interceptor::ViewDefPostCreateInterceptor<StandardCommandTransaction>,
525 > {
526 &mut self.interceptors.view_def_post_create
527 }
528
529 fn view_def_pre_update_interceptors(
530 &mut self,
531 ) -> &mut Chain<
532 StandardCommandTransaction,
533 dyn interceptor::ViewDefPreUpdateInterceptor<StandardCommandTransaction>,
534 > {
535 &mut self.interceptors.view_def_pre_update
536 }
537
538 fn view_def_post_update_interceptors(
539 &mut self,
540 ) -> &mut Chain<
541 StandardCommandTransaction,
542 dyn interceptor::ViewDefPostUpdateInterceptor<StandardCommandTransaction>,
543 > {
544 &mut self.interceptors.view_def_post_update
545 }
546
547 fn view_def_pre_delete_interceptors(
548 &mut self,
549 ) -> &mut Chain<
550 StandardCommandTransaction,
551 dyn interceptor::ViewDefPreDeleteInterceptor<StandardCommandTransaction>,
552 > {
553 &mut self.interceptors.view_def_pre_delete
554 }
555}
556
// Empty implementation: nothing is overridden here, so this only opts the type into
// `TransactionalChanges` with whatever items the trait itself provides.
impl TransactionalChanges for StandardCommandTransaction {}
558
559impl Drop for StandardCommandTransaction {
560 fn drop(&mut self) {
561 if let Some(multi) = self.cmd.take() {
562 if self.state == TransactionState::Active {
565 let _ = multi.rollback();
566 }
567 }
568 }
569}