1use std::{collections::HashMap, mem, sync::Arc};
5
6use read::ReadFrom;
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{
9 actors::pending::{Pending, PendingWrite},
10 common::CommitVersion,
11 encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
12 interface::{
13 catalog::{flow::FlowNodeId, shape::ShapeId},
14 change::{Change, ChangeOrigin, Diff},
15 },
16};
17use reifydb_runtime::context::clock::Clock;
18use reifydb_transaction::{
19 change_accumulator::ChangeAccumulator,
20 interceptor::{
21 WithInterceptors,
22 authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
23 chain::InterceptorChain as Chain,
24 dictionary::{
25 DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
26 DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
27 },
28 dictionary_row::{
29 DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
30 DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
31 DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
32 },
33 granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
34 identity::{
35 IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
36 IdentityPreUpdateInterceptor,
37 },
38 interceptors::Interceptors,
39 namespace::{
40 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
41 NamespacePreUpdateInterceptor,
42 },
43 ringbuffer::{
44 RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
45 RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
46 },
47 ringbuffer_row::{
48 RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
49 RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
50 RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
51 },
52 role::{
53 RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
54 RolePreUpdateInterceptor,
55 },
56 series::{
57 SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
58 SeriesPreUpdateInterceptor,
59 },
60 series_row::{
61 SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
62 SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
63 },
64 table::{
65 TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
66 TablePreUpdateInterceptor,
67 },
68 table_row::{
69 TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
70 TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
71 },
72 transaction::{PostCommitInterceptor, PreCommitInterceptor},
73 view::{
74 ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
75 ViewPreUpdateInterceptor,
76 },
77 view_row::{
78 ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
79 ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
80 },
81 },
82 multi::transaction::read::MultiReadTransaction,
83 transaction::admin::AdminTransaction,
84};
85use reifydb_type::Result;
86use tracing::instrument;
87
88pub mod read;
89pub mod slot;
90pub mod state;
91pub mod write;
92
93use slot::{OperatorStateSlot, PersistFn};
94
/// Arguments consumed by [`FlowTransaction::transactional`].
///
/// The constructor takes this struct as its single parameter and moves every
/// field either into the shared [`FlowTransactionInner`] or onto the
/// `Transactional` variant itself.
pub struct TransactionalParams {
 /// Commit version stored as the transaction's `version`.
 pub version: CommitVersion,
 /// Pending-write set the new transaction starts with.
 pub pending: Pending,
 /// Stored on the `Transactional` variant as `base_pending`.
 /// NOTE(review): presumably the writes already pending in the enclosing
 /// transaction — confirm against the read path.
 pub base_pending: Pending,
 /// Read transaction stored as the primary `query` handle.
 pub query: MultiReadTransaction,
 /// Read transaction stored as the `state_query` handle.
 pub state_query: MultiReadTransaction,
 /// Catalog handle returned by [`FlowTransaction::catalog`].
 pub catalog: Catalog,
 /// Interceptor chains exposed through the `WithInterceptors` impl.
 pub interceptors: Interceptors,
 /// Clock returned by [`FlowTransaction::clock`].
 pub clock: Clock,
 /// Shared change list returned by [`FlowTransaction::view_overlay`].
 pub view_overlay: Arc<Vec<Change>>,
}
112
/// State shared by every [`FlowTransaction`] variant.
pub struct FlowTransactionInner {
 /// Commit version reported by `FlowTransaction::version`; overwritten by
 /// `FlowTransaction::update_version`.
 pub version: CommitVersion,
 /// Pending writes; moved out by `FlowTransaction::take_pending`.
 pub pending: Pending,
 /// Row shapes moved out by `FlowTransaction::take_pending_shapes`. Every
 /// constructor in this file starts it empty.
 pub pending_shapes: Vec<RowShape>,
 /// Primary read handle; `FlowTransaction::update_version` re-pins it with
 /// `read_as_of_version_inclusive`.
 pub query: MultiReadTransaction,
 /// Secondary read handle. It is `None` only for transactions built by
 /// `FlowTransaction::ephemeral`; the other constructors always set it.
 pub state_query: Option<MultiReadTransaction>,
 /// Catalog handle returned by `FlowTransaction::catalog`.
 pub catalog: Catalog,
 /// Interceptor chains exposed through the `WithInterceptors` impl.
 pub interceptors: Interceptors,
 /// Collects `(ShapeId, Diff)` entries fed by
 /// `FlowTransaction::track_flow_change` and drained by
 /// `FlowTransaction::take_accumulator_entries`.
 pub accumulator: ChangeAccumulator,
 /// Clock returned by `FlowTransaction::clock`.
 pub clock: Clock,
 /// Cache of operator state keyed by flow node; written back by
 /// `FlowTransaction::flush_operator_states`.
 pub operator_states: HashMap<FlowNodeId, OperatorStateSlot>,
}
129
/// A flow transaction in one of three modes. All variants carry the same
/// [`FlowTransactionInner`]; two of them add mode-specific data.
pub enum FlowTransaction {
 /// Built by `FlowTransaction::deferred` or
 /// `FlowTransaction::deferred_from_parts`; carries no extra data.
 Deferred {
  /// State shared with the other variants.
  inner: FlowTransactionInner,
 },

 /// Built by `FlowTransaction::transactional`.
 Transactional {
  /// State shared with the other variants.
  inner: FlowTransactionInner,
  /// Taken from `TransactionalParams::base_pending`.
  /// NOTE(review): presumably the enclosing transaction's pending writes
  /// — confirm against the read path.
  base_pending: Pending,
  /// Shared change list handed out by `FlowTransaction::view_overlay`.
  view_overlay: Arc<Vec<Change>>,
 },

 /// Built by `FlowTransaction::ephemeral`; has no `state_query` handle and
 /// starts with fresh, empty interceptors.
 Ephemeral {
  /// State shared with the other variants.
  inner: FlowTransactionInner,
  /// In-memory key-to-row map. `FlowTransaction::merge_state` folds pending
  /// state writes into it and `FlowTransaction::take_state` moves it out.
  state: HashMap<EncodedKey, EncodedRow>,
 },
}
161
162impl FlowTransaction {
163 fn inner(&self) -> &FlowTransactionInner {
164 match self {
165 Self::Deferred {
166 inner,
167 ..
168 }
169 | Self::Transactional {
170 inner,
171 ..
172 }
173 | Self::Ephemeral {
174 inner,
175 ..
176 } => inner,
177 }
178 }
179
180 pub(crate) fn inner_mut(&mut self) -> &mut FlowTransactionInner {
181 match self {
182 Self::Deferred {
183 inner,
184 ..
185 }
186 | Self::Transactional {
187 inner,
188 ..
189 }
190 | Self::Ephemeral {
191 inner,
192 ..
193 } => inner,
194 }
195 }
196
197 #[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors, clock), fields(version = version.0))]
202 pub fn deferred(
203 parent: &AdminTransaction,
204 version: CommitVersion,
205 catalog: Catalog,
206 interceptors: Interceptors,
207 clock: Clock,
208 ) -> Self {
209 let mut query = parent.multi.begin_query().unwrap();
210 query.read_as_of_version_inclusive(version);
211
212 let state_query = parent.multi.begin_query().unwrap();
213 Self::Deferred {
214 inner: FlowTransactionInner {
215 version,
216 pending: Pending::new(),
217 pending_shapes: Vec::new(),
218 query,
219 state_query: Some(state_query),
220 catalog,
221 interceptors,
222 accumulator: ChangeAccumulator::new(),
223 clock,
224 operator_states: HashMap::new(),
225 },
226 }
227 }
228
229 pub fn deferred_from_parts(
233 version: CommitVersion,
234 pending: Pending,
235 query: MultiReadTransaction,
236 state_query: MultiReadTransaction,
237 catalog: Catalog,
238 interceptors: Interceptors,
239 clock: Clock,
240 ) -> Self {
241 Self::Deferred {
242 inner: FlowTransactionInner {
243 version,
244 pending,
245 pending_shapes: Vec::new(),
246 query,
247 state_query: Some(state_query),
248 catalog,
249 interceptors,
250 accumulator: ChangeAccumulator::new(),
251 clock,
252 operator_states: HashMap::new(),
253 },
254 }
255 }
256
257 pub fn transactional(params: TransactionalParams) -> Self {
263 Self::Transactional {
264 inner: FlowTransactionInner {
265 version: params.version,
266 pending: params.pending,
267 pending_shapes: Vec::new(),
268 query: params.query,
269 state_query: Some(params.state_query),
270 catalog: params.catalog,
271 interceptors: params.interceptors,
272 accumulator: ChangeAccumulator::new(),
273 clock: params.clock,
274 operator_states: HashMap::new(),
275 },
276 base_pending: params.base_pending,
277 view_overlay: params.view_overlay,
278 }
279 }
280
281 pub fn view_overlay(&self) -> Option<Arc<Vec<Change>>> {
288 match self {
289 Self::Transactional {
290 view_overlay,
291 ..
292 } => Some(Arc::clone(view_overlay)),
293 _ => None,
294 }
295 }
296
297 pub fn ephemeral(
303 version: CommitVersion,
304 query: MultiReadTransaction,
305 catalog: Catalog,
306 state: HashMap<EncodedKey, EncodedRow>,
307 clock: Clock,
308 ) -> Self {
309 let mut pq = query;
310 pq.read_as_of_version_inclusive(version);
311
312 Self::Ephemeral {
313 inner: FlowTransactionInner {
314 version,
315 pending: Pending::new(),
316 pending_shapes: Vec::new(),
317 query: pq,
318 state_query: None,
319 catalog,
320 interceptors: Interceptors::new(),
321 accumulator: ChangeAccumulator::new(),
322 clock,
323 operator_states: HashMap::new(),
324 },
325 state,
326 }
327 }
328
329 pub fn merge_state(&mut self) {
338 if let Self::Ephemeral {
339 inner,
340 state,
341 } = self
342 {
343 for (key, write) in inner.pending.iter_sorted() {
344 if matches!(Self::read_from(key), ReadFrom::StateQuery) {
345 match write {
346 PendingWrite::Set(row) => {
347 state.insert(key.clone(), row.clone());
348 }
349 PendingWrite::Remove => {
350 state.remove(key);
351 }
352 }
353 }
354 }
355 inner.pending = Pending::new();
356 }
357 }
358
359 pub fn take_state(&mut self) -> HashMap<EncodedKey, EncodedRow> {
364 if let Self::Ephemeral {
365 state,
366 ..
367 } = self
368 {
369 mem::take(state)
370 } else {
371 HashMap::new()
372 }
373 }
374
375 pub fn version(&self) -> CommitVersion {
377 self.inner().version
378 }
379
380 pub fn take_pending(&mut self) -> Pending {
382 mem::take(&mut self.inner_mut().pending)
383 }
384
385 pub fn take_pending_shapes(&mut self) -> Vec<RowShape> {
387 mem::take(&mut self.inner_mut().pending_shapes)
388 }
389
390 pub fn track_flow_change(&mut self, change: Change) {
392 if let ChangeOrigin::Shape(id) = change.origin {
393 for diff in change.diffs {
394 self.inner_mut().accumulator.track(id, diff);
395 }
396 }
397 }
398
399 pub fn take_accumulator_entries(&mut self) -> Vec<(ShapeId, Diff)> {
401 let acc = &mut self.inner_mut().accumulator;
402 let entries: Vec<_> = acc.entries_from(0).to_vec();
403 acc.clear();
404 entries
405 }
406
407 #[cfg(test)]
409 pub fn pending(&self) -> &Pending {
410 &self.inner().pending
411 }
412
413 pub fn update_version(&mut self, new_version: CommitVersion) {
415 let inner = self.inner_mut();
416 inner.version = new_version;
417 inner.query.read_as_of_version_inclusive(new_version);
418 }
419
420 pub fn catalog(&self) -> &Catalog {
422 &self.inner().catalog
423 }
424
425 pub fn clock(&self) -> &Clock {
427 &self.inner().clock
428 }
429
430 pub fn operator_state<S, F>(&mut self, node: FlowNodeId, load: F) -> Result<&mut S>
440 where
441 S: 'static + Send,
442 F: FnOnce(&mut Self) -> Result<(S, PersistFn)>,
443 {
444 if !self.inner().operator_states.contains_key(&node) {
445 let (state, persist) = load(self)?;
446 let slot = OperatorStateSlot {
447 value: Box::new(state),
448 dirty: false,
449 persist,
450 };
451 self.inner_mut().operator_states.insert(node, slot);
452 }
453 let slot = self.inner_mut().operator_states.get_mut(&node).expect("just inserted");
454 Ok(slot.value.downcast_mut::<S>().expect("operator state type mismatch"))
455 }
456
457 pub fn mark_state_dirty(&mut self, node: FlowNodeId) {
460 if let Some(slot) = self.inner_mut().operator_states.get_mut(&node) {
461 slot.dirty = true;
462 }
463 }
464
465 pub fn take_operator_state<S, F>(&mut self, node: FlowNodeId, load: F) -> Result<(S, PersistFn)>
472 where
473 S: 'static + Send,
474 F: FnOnce(&mut Self) -> Result<(S, PersistFn)>,
475 {
476 if let Some(slot) = self.inner_mut().operator_states.remove(&node) {
477 let value = slot.value.downcast::<S>().map_err(|_| ()).expect("operator state type mismatch");
478 Ok((*value, slot.persist))
479 } else {
480 load(self)
481 }
482 }
483
484 pub fn put_operator_state<S>(&mut self, node: FlowNodeId, state: S, persist: PersistFn)
487 where
488 S: 'static + Send,
489 {
490 self.inner_mut().operator_states.insert(
491 node,
492 OperatorStateSlot {
493 value: Box::new(state),
494 dirty: true,
495 persist,
496 },
497 );
498 }
499
500 pub fn flush_operator_states(&mut self) -> Result<()> {
504 let states = mem::take(&mut self.inner_mut().operator_states);
505 for (_, slot) in states {
506 if slot.dirty {
507 (slot.persist)(self, slot.value)?;
508 }
509 }
510 Ok(())
511 }
512}
513
/// Generates one accessor method that returns a mutable interceptor chain.
///
/// * `$accessor` — name of the generated method.
/// * `$chain` — field of the transaction's `Interceptors` that is returned.
/// * `$interceptor` — interceptor trait the chain is made of.
macro_rules! interceptor_method {
 ($accessor:ident, $chain:ident, $interceptor:ident) => {
  fn $accessor(&mut self) -> &mut Chain<dyn $interceptor + Send + Sync> {
   let interceptors = &mut self.inner_mut().interceptors;
   &mut interceptors.$chain
  }
 };
}
521
522impl WithInterceptors for FlowTransaction {
523 interceptor_method!(table_row_pre_insert_interceptors, table_row_pre_insert, TableRowPreInsertInterceptor);
524 interceptor_method!(table_row_post_insert_interceptors, table_row_post_insert, TableRowPostInsertInterceptor);
525 interceptor_method!(table_row_pre_update_interceptors, table_row_pre_update, TableRowPreUpdateInterceptor);
526 interceptor_method!(table_row_post_update_interceptors, table_row_post_update, TableRowPostUpdateInterceptor);
527 interceptor_method!(table_row_pre_delete_interceptors, table_row_pre_delete, TableRowPreDeleteInterceptor);
528 interceptor_method!(table_row_post_delete_interceptors, table_row_post_delete, TableRowPostDeleteInterceptor);
529
530 interceptor_method!(
531 ringbuffer_row_pre_insert_interceptors,
532 ringbuffer_row_pre_insert,
533 RingBufferRowPreInsertInterceptor
534 );
535 interceptor_method!(
536 ringbuffer_row_post_insert_interceptors,
537 ringbuffer_row_post_insert,
538 RingBufferRowPostInsertInterceptor
539 );
540 interceptor_method!(
541 ringbuffer_row_pre_update_interceptors,
542 ringbuffer_row_pre_update,
543 RingBufferRowPreUpdateInterceptor
544 );
545 interceptor_method!(
546 ringbuffer_row_post_update_interceptors,
547 ringbuffer_row_post_update,
548 RingBufferRowPostUpdateInterceptor
549 );
550 interceptor_method!(
551 ringbuffer_row_pre_delete_interceptors,
552 ringbuffer_row_pre_delete,
553 RingBufferRowPreDeleteInterceptor
554 );
555 interceptor_method!(
556 ringbuffer_row_post_delete_interceptors,
557 ringbuffer_row_post_delete,
558 RingBufferRowPostDeleteInterceptor
559 );
560
561 interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
562 interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
563
564 interceptor_method!(namespace_post_create_interceptors, namespace_post_create, NamespacePostCreateInterceptor);
565 interceptor_method!(namespace_pre_update_interceptors, namespace_pre_update, NamespacePreUpdateInterceptor);
566 interceptor_method!(namespace_post_update_interceptors, namespace_post_update, NamespacePostUpdateInterceptor);
567 interceptor_method!(namespace_pre_delete_interceptors, namespace_pre_delete, NamespacePreDeleteInterceptor);
568
569 interceptor_method!(table_post_create_interceptors, table_post_create, TablePostCreateInterceptor);
570 interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
571 interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
572 interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
573
574 interceptor_method!(view_row_pre_insert_interceptors, view_row_pre_insert, ViewRowPreInsertInterceptor);
575 interceptor_method!(view_row_post_insert_interceptors, view_row_post_insert, ViewRowPostInsertInterceptor);
576 interceptor_method!(view_row_pre_update_interceptors, view_row_pre_update, ViewRowPreUpdateInterceptor);
577 interceptor_method!(view_row_post_update_interceptors, view_row_post_update, ViewRowPostUpdateInterceptor);
578 interceptor_method!(view_row_pre_delete_interceptors, view_row_pre_delete, ViewRowPreDeleteInterceptor);
579 interceptor_method!(view_row_post_delete_interceptors, view_row_post_delete, ViewRowPostDeleteInterceptor);
580
581 interceptor_method!(view_post_create_interceptors, view_post_create, ViewPostCreateInterceptor);
582 interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
583 interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
584 interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
585
586 interceptor_method!(
587 ringbuffer_post_create_interceptors,
588 ringbuffer_post_create,
589 RingBufferPostCreateInterceptor
590 );
591 interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
592 interceptor_method!(
593 ringbuffer_post_update_interceptors,
594 ringbuffer_post_update,
595 RingBufferPostUpdateInterceptor
596 );
597 interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
598
599 interceptor_method!(
600 dictionary_row_pre_insert_interceptors,
601 dictionary_row_pre_insert,
602 DictionaryRowPreInsertInterceptor
603 );
604 interceptor_method!(
605 dictionary_row_post_insert_interceptors,
606 dictionary_row_post_insert,
607 DictionaryRowPostInsertInterceptor
608 );
609 interceptor_method!(
610 dictionary_row_pre_update_interceptors,
611 dictionary_row_pre_update,
612 DictionaryRowPreUpdateInterceptor
613 );
614 interceptor_method!(
615 dictionary_row_post_update_interceptors,
616 dictionary_row_post_update,
617 DictionaryRowPostUpdateInterceptor
618 );
619 interceptor_method!(
620 dictionary_row_pre_delete_interceptors,
621 dictionary_row_pre_delete,
622 DictionaryRowPreDeleteInterceptor
623 );
624 interceptor_method!(
625 dictionary_row_post_delete_interceptors,
626 dictionary_row_post_delete,
627 DictionaryRowPostDeleteInterceptor
628 );
629
630 interceptor_method!(
631 dictionary_post_create_interceptors,
632 dictionary_post_create,
633 DictionaryPostCreateInterceptor
634 );
635 interceptor_method!(dictionary_pre_update_interceptors, dictionary_pre_update, DictionaryPreUpdateInterceptor);
636 interceptor_method!(
637 dictionary_post_update_interceptors,
638 dictionary_post_update,
639 DictionaryPostUpdateInterceptor
640 );
641 interceptor_method!(dictionary_pre_delete_interceptors, dictionary_pre_delete, DictionaryPreDeleteInterceptor);
642
643 interceptor_method!(series_row_pre_insert_interceptors, series_row_pre_insert, SeriesRowPreInsertInterceptor);
644 interceptor_method!(
645 series_row_post_insert_interceptors,
646 series_row_post_insert,
647 SeriesRowPostInsertInterceptor
648 );
649 interceptor_method!(series_row_pre_update_interceptors, series_row_pre_update, SeriesRowPreUpdateInterceptor);
650 interceptor_method!(
651 series_row_post_update_interceptors,
652 series_row_post_update,
653 SeriesRowPostUpdateInterceptor
654 );
655 interceptor_method!(series_row_pre_delete_interceptors, series_row_pre_delete, SeriesRowPreDeleteInterceptor);
656 interceptor_method!(
657 series_row_post_delete_interceptors,
658 series_row_post_delete,
659 SeriesRowPostDeleteInterceptor
660 );
661
662 interceptor_method!(series_post_create_interceptors, series_post_create, SeriesPostCreateInterceptor);
663 interceptor_method!(series_pre_update_interceptors, series_pre_update, SeriesPreUpdateInterceptor);
664 interceptor_method!(series_post_update_interceptors, series_post_update, SeriesPostUpdateInterceptor);
665 interceptor_method!(series_pre_delete_interceptors, series_pre_delete, SeriesPreDeleteInterceptor);
666 interceptor_method!(identity_post_create_interceptors, identity_post_create, IdentityPostCreateInterceptor);
667 interceptor_method!(identity_pre_update_interceptors, identity_pre_update, IdentityPreUpdateInterceptor);
668 interceptor_method!(identity_post_update_interceptors, identity_post_update, IdentityPostUpdateInterceptor);
669 interceptor_method!(identity_pre_delete_interceptors, identity_pre_delete, IdentityPreDeleteInterceptor);
670 interceptor_method!(role_post_create_interceptors, role_post_create, RolePostCreateInterceptor);
671 interceptor_method!(role_pre_update_interceptors, role_pre_update, RolePreUpdateInterceptor);
672 interceptor_method!(role_post_update_interceptors, role_post_update, RolePostUpdateInterceptor);
673 interceptor_method!(role_pre_delete_interceptors, role_pre_delete, RolePreDeleteInterceptor);
674 interceptor_method!(
675 granted_role_post_create_interceptors,
676 granted_role_post_create,
677 GrantedRolePostCreateInterceptor
678 );
679 interceptor_method!(
680 granted_role_pre_delete_interceptors,
681 granted_role_pre_delete,
682 GrantedRolePreDeleteInterceptor
683 );
684 interceptor_method!(
685 authentication_post_create_interceptors,
686 authentication_post_create,
687 AuthenticationPostCreateInterceptor
688 );
689 interceptor_method!(
690 authentication_pre_delete_interceptors,
691 authentication_pre_delete,
692 AuthenticationPreDeleteInterceptor
693 );
694}