reifydb_sub_flow/transaction/mod.rs
1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::CommitVersion;
5use reifydb_engine::StandardCommandTransaction;
6
7mod commit;
8mod iter_range;
9mod metrics;
10mod pending;
11mod read;
12mod state;
13#[cfg(test)]
14mod utils;
15mod write;
16
17pub use metrics::FlowTransactionMetrics;
18pub use pending::{Pending, PendingWrites};
19use reifydb_core::interface::{MultiVersionQueryTransaction, MultiVersionTransaction};
20use reifydb_transaction::multi::StandardQueryTransaction;
21
/// A transaction wrapper for parallel flow processing with snapshot isolation.
///
/// # Architecture
///
/// FlowTransaction enables parallel processing of independent data flows by providing:
/// 1. **Snapshot reads of source data** - via `source_query`, a `StandardQueryTransaction`
///    pinned to the CDC `version`
/// 2. **Reads of flow state** - via `state_query`, a second `StandardQueryTransaction`
///    that is not pinned to the CDC version
/// 3. **Isolated writes** - via a local `PendingWrites` buffer unique to this transaction
/// 4. **Sequential merge** - buffered writes are applied back to the parent at commit time
///
/// # Read Path
///
/// Two read-only query transactions back the reads:
/// - `source_query` provides a consistent snapshot view of the database at `version`
///   (inclusive) and is used for source tables/views.
/// - `state_query` is opened without a version pin and is used for flow state
///   (join tables, distinct values, counters) that must stay visible across CDC versions.
///
/// Neither query transaction is used for writing; all modifications go to `pending`.
///
/// For keys that have been modified locally (implemented in the read/state modules,
/// not shown in this file):
/// - Reads check the `pending` buffer first
/// - If found there (or marked for removal), return the local value
/// - Otherwise, delegate to the appropriate query transaction
///
/// # Write Path
///
/// All writes (`set`, `remove`) go to the local `pending` buffer only:
/// - Writes are NOT visible to the parent transaction
/// - Writes are NOT visible to other FlowTransactions
/// - Writes are NOT persisted to storage
///
/// The pending buffer is committed back to the parent transaction via `commit()`.
///
/// # Parallel Processing Pattern
///
/// ```ignore
/// let mut parent = engine.begin_command()?;
///
/// // Create multiple FlowTransactions from shared parent reference
/// // Each uses the CDC event version for proper snapshot isolation
/// let flow_txns: Vec<FlowTransaction> = cdc_events
///     .iter()
///     .map(|cdc| FlowTransaction::new(&parent, cdc.version))
///     .collect();
///
/// // Process in parallel (e.g., using rayon)
/// let results: Vec<FlowTransaction> = flow_txns
///     .into_par_iter()
///     .map(|mut txn| {
///         // Process flow, making reads and writes
///         process_flow(&mut txn)?;
///         Ok(txn)
///     })
///     .collect()?;
///
/// // Sequential merge back to parent
/// for flow_txn in results {
///     flow_txn.commit(&mut parent)?;
/// }
///
/// // Atomic commit of all changes
/// parent.commit()?;
/// ```
///
/// # Thread Safety
///
/// FlowTransaction is intended to be `Send` because:
/// - `version` is Copy
/// - `source_query` and `state_query` wrap the multi-version transaction, which is
///   expected to be Send + Sync (NOTE(review): confirm against `StandardQueryTransaction`)
/// - `pending` and `metrics` are owned and not shared
///
/// This allows FlowTransactions to be moved to worker threads for parallel processing.
pub struct FlowTransaction {
    /// CDC event version for snapshot isolation.
    ///
    /// This is the version at which the CDC event was generated, NOT the parent transaction version.
    /// Source data reads (through `source_query`) see the database state as of this CDC version,
    /// so the flow processes data as it existed when the CDC event was created, regardless of
    /// concurrent modifications.
    ///
    /// Kept in sync with `source_query` by `new` and `update_version`.
    version: CommitVersion,

    /// Local write buffer for uncommitted changes.
    ///
    /// Stores all `set()` and `remove()` operations made by this transaction.
    /// NOT shared with other FlowTransactions. Changes are invisible until commit().
    pending: PendingWrites,

    /// Performance metrics tracking reads, writes, and other operations.
    ///
    /// Exposed read-only through `metrics()`.
    metrics: FlowTransactionMetrics,

    /// Read-only query transaction for accessing source data at CDC snapshot version.
    ///
    /// Provides snapshot reads at `version` (inclusive). Used for reading source tables/views
    /// to ensure a consistent view of the data being processed by the flow.
    source_query: StandardQueryTransaction,

    /// Read-only query transaction for accessing flow state at latest version.
    ///
    /// Opened without a version pin, so it reads at whatever version the parent's
    /// multi-version store assigns to a fresh query (presumably the latest committed
    /// version). Used for reading flow state (join tables, distinct values, counters)
    /// that must be visible across all CDC versions to maintain continuity.
    state_query: StandardQueryTransaction,
}
121
122impl FlowTransaction {
123 /// Create a new FlowTransaction from a parent transaction at a specific CDC version
124 ///
125 /// Takes a shared reference to the parent, allowing multiple FlowTransactions
126 /// to be created for parallel processing.
127 ///
128 /// # Parameters
129 /// * `parent` - The parent command transaction to derive from
130 /// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
131 pub fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self {
132 let mut source_query = parent.multi.begin_query().unwrap();
133 source_query.read_as_of_version_inclusive(version).unwrap();
134
135 let state_query = parent.multi.begin_query().unwrap();
136 Self {
137 version,
138 pending: PendingWrites::new(),
139 metrics: FlowTransactionMetrics::new(),
140 source_query,
141 state_query,
142 }
143 }
144
145 /// Get the version this transaction is reading at
146 pub fn version(&self) -> CommitVersion {
147 self.version
148 }
149
150 /// Update the transaction to read at a new version
151 pub fn update_version(&mut self, new_version: CommitVersion) -> crate::Result<()> {
152 self.version = new_version;
153 self.source_query.read_as_of_version_inclusive(new_version)?;
154 Ok(())
155 }
156
157 /// Get immutable reference to the metrics
158 pub fn metrics(&self) -> &FlowTransactionMetrics {
159 &self.metrics
160 }
161}