reifydb_sub_flow/transaction/mod.rs
1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::CommitVersion;
5use reifydb_engine::StandardCommandTransaction;
6
7mod commit;
8mod iter_range;
9mod metrics;
10mod pending;
11mod read;
12mod state;
13#[cfg(test)]
14mod test_utils;
15mod write;
16
17pub use metrics::FlowTransactionMetrics;
18pub use pending::{Pending, PendingWrites};
19use reifydb_core::interface::{MultiVersionQueryTransaction, MultiVersionTransaction};
20use reifydb_transaction::multi::StandardQueryTransaction;
21
22/// A transaction wrapper for parallel flow processing with snapshot isolation.
23///
24/// # Architecture
25///
26/// FlowTransaction enables parallel processing of independent data flows by providing:
27/// 1. **Snapshot reads** - via a wrapped StandardQueryTransaction reading at a fixed version
28/// 2. **Isolated writes** - via a local PendingWrites buffer unique to this transaction
29/// 3. **Sequential merge** - buffered writes are applied back to parent at commit time
30///
31/// # Read Path
32///
33/// All reads go through the wrapped `query` transaction, which provides a consistent
34/// snapshot view of the database at `version`. The query transaction is read-only and
35/// cannot modify the underlying storage.
36///
37/// For keys that have been modified locally:
38/// - Reads check the `pending` buffer first
39/// - If found there (or marked for removal), return the local value
40/// - Otherwise, delegate to the `query` transaction for the snapshot value
41///
42/// # Write Path
43///
44/// All writes (`set`, `remove`) go to the local `pending` buffer only:
45/// - Writes are NOT visible to the parent transaction
46/// - Writes are NOT visible to other FlowTransactions
47/// - Writes are NOT persisted to storage
48///
49/// The pending buffer is committed back to the parent transaction via `commit()`.
50///
51/// # Parallel Processing Pattern
52///
53/// ```ignore
54/// let mut parent = engine.begin_command()?;
55///
56/// // Create multiple FlowTransactions from shared parent reference
57/// // Each uses the CDC event version for proper snapshot isolation
58/// let flow_txns: Vec<FlowTransaction> = cdc_events
59/// .iter()
60/// .map(|cdc| FlowTransaction::new(&parent, cdc.version))
61/// .collect();
62///
63/// // Process in parallel (e.g., using rayon)
64/// let results: Vec<FlowTransaction> = flow_txns
65/// .into_par_iter()
66/// .map(|mut txn| {
67/// // Process flow, making reads and writes
68/// process_flow(&mut txn)?;
69/// Ok(txn)
70/// })
71/// .collect()?;
72///
73/// // Sequential merge back to parent
74/// for flow_txn in results {
75/// flow_txn.commit(&mut parent)?;
76/// }
77///
78/// // Atomic commit of all changes
79/// parent.commit()?;
80/// ```
81///
82/// # Thread Safety
83///
84/// FlowTransaction implements `Send` because:
85/// - `version` is Copy
86/// - `query` wraps Arc-based multi-version transaction (Send + Sync)
87/// - `pending` and `metrics` are owned and not shared
88///
89/// This allows FlowTransactions to be moved to worker threads for parallel processing.
90pub struct FlowTransaction {
91 /// CDC event version for snapshot isolation.
92 ///
93 /// This is the version at which the CDC event was generated, NOT the parent transaction version.
94 /// All reads through the query transaction see the database state as of this CDC version.
95 /// This guarantees proper snapshot isolation - the flow processes data as it existed when
96 /// the CDC event was created, regardless of concurrent modifications.
97 version: CommitVersion,
98
99 /// Local write buffer for uncommitted changes.
100 ///
101 /// Stores all `set()` and `remove()` operations made by this transaction.
102 /// NOT shared with other FlowTransactions. Changes are invisible until commit().
103 pending: PendingWrites,
104
105 /// Performance metrics tracking reads, writes, and other operations.
106 metrics: FlowTransactionMetrics,
107
108 /// Read-only query transaction for accessing multi-version storage.
109 ///
110 /// Provides snapshot reads at `version`. This is the primary read path for data
111 /// not in the `pending` buffer. The query transaction is configured at construction
112 /// time to read at a specific version and cannot be modified.
113 query: StandardQueryTransaction,
114}
115
// SAFETY: FlowTransaction is asserted to be safe to move across threads. The
// justification relied on for each field is:
// - version: CommitVersion is Copy (u64 wrapper)
// - query: StandardQueryTransaction wraps Arc-based multi-version storage (Send + Sync)
// - pending: PendingWrites is owned exclusively by this transaction (Send)
// - metrics: FlowTransactionMetrics contains primitive counters (Send)
//
// This enables parallel flow processing where each FlowTransaction is moved to a
// worker thread via rayon or similar thread pool. Only `Send` is asserted, not `Sync`.
//
// NOTE(review): If every field above really is `Send`, the compiler already derives
// `Send` for this struct automatically, and this `unsafe impl` is redundant. It is also
// hazardous, because it would silently mask a non-`Send` field added later. If some
// field is NOT auto-`Send` (e.g. StandardQueryTransaction holds a raw pointer or
// non-`Send` guard), this impl is a manual soundness claim and must be justified
// against that field's internals. Confirm which case applies. If the fields are
// auto-`Send`, remove this impl and rely on the derived one.
unsafe impl Send for FlowTransaction {}
125
126impl FlowTransaction {
127 /// Create a new FlowTransaction from a parent transaction at a specific CDC version
128 ///
129 /// Takes a shared reference to the parent, allowing multiple FlowTransactions
130 /// to be created for parallel processing.
131 ///
132 /// # Parameters
133 /// * `parent` - The parent command transaction to derive from
134 /// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
135 pub fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self {
136 let mut query = parent.multi.begin_query().unwrap();
137 query.read_as_of_version_inclusive(version).unwrap();
138
139 Self {
140 version,
141 pending: PendingWrites::new(),
142 metrics: FlowTransactionMetrics::new(),
143 query,
144 }
145 }
146
147 /// Get the version this transaction is reading at
148 pub fn version(&self) -> CommitVersion {
149 self.version
150 }
151
152 /// Update the transaction to read at a new version
153 ///
154 /// This should be called after commit() and before processing the next unit of work.
155 /// The pending writes buffer is expected to be already cleared by the previous commit().
156 pub fn update_version(&mut self, new_version: CommitVersion) -> crate::Result<()> {
157 // Update the version field
158 self.version = new_version;
159
160 // Update the query transaction's read version
161 self.query.read_as_of_version_inclusive(new_version)?;
162
163 // Note: pending is already cleared by commit()
164 // Metrics are kept cumulative across units
165
166 Ok(())
167 }
168
169 /// Get immutable reference to the metrics
170 pub fn metrics(&self) -> &FlowTransactionMetrics {
171 &self.metrics
172 }
173}