reifydb_sub_flow/transaction/mod.rs
1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_catalog::transaction::MaterializedCatalogTransaction;
5use reifydb_core::{CommitVersion, interface::ViewDef, value::encoded::EncodedValues};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::RowNumber;
8use tracing::instrument;
9
10mod commit;
11mod iter_range;
12mod metrics;
13mod pending;
14mod read;
15mod state;
16#[cfg(test)]
17mod utils;
18mod write;
19
20pub use metrics::FlowTransactionMetrics;
21pub use pending::{Pending, PendingWrites};
22use reifydb_transaction::multi::StandardQueryTransaction;
23
24/// Represents a pending view operation that needs interceptor calls at commit time.
25#[derive(Debug, Clone)]
26pub enum ViewPending {
27 /// Insert a new row into the view
28 Insert {
29 view: ViewDef,
30 row_number: RowNumber,
31 row: EncodedValues,
32 },
33 /// Update an existing row in the view (remove old, insert new)
34 Update {
35 view: ViewDef,
36 old_row_number: RowNumber,
37 new_row_number: RowNumber,
38 row: EncodedValues,
39 },
40 /// Remove a row from the view
41 Remove {
42 view: ViewDef,
43 row_number: RowNumber,
44 },
45}
46
47/// A transaction wrapper for parallel flow processing with snapshot isolation.
48///
49/// # Architecture
50///
51/// FlowTransaction enables parallel processing of independent data flows by providing:
52/// 1. **Snapshot reads** - via a wrapped StandardQueryTransaction reading at a fixed version
53/// 2. **Isolated writes** - via a local PendingWrites buffer unique to this transaction
54/// 3. **Sequential merge** - buffered writes are applied back to parent at commit time
55///
56/// # Read Path
57///
58/// All reads go through the wrapped `query` transaction, which provides a consistent
59/// snapshot view of the database at `version`. The query transaction is read-only and
60/// cannot modify the underlying storage.
61///
62/// For keys that have been modified locally:
63/// - Reads check the `pending` buffer first
64/// - If found there (or marked for removal), return the local value
65/// - Otherwise, delegate to the `query` transaction for the snapshot value
66///
67/// # Write Path
68///
69/// All writes (`set`, `remove`) go to the local `pending` buffer only:
70/// - Writes are NOT visible to the parent transaction
71/// - Writes are NOT visible to other FlowTransactions
72/// - Writes are NOT persisted to storage
73///
74/// The pending buffer is committed back to the parent transaction via `commit()`.
75///
76/// # Parallel Processing Pattern
77///
78/// ```ignore
79/// let mut parent = engine.begin_command()?;
80///
81/// // Create multiple FlowTransactions from shared parent reference
82/// // Each uses the CDC event version for proper snapshot isolation
83/// let flow_txns: Vec<FlowTransaction> = cdc_events
84/// .iter()
85/// .map(|cdc| FlowTransaction::new(&parent, cdc.version))
86/// .collect();
87///
88/// // Process in parallel (e.g., using rayon)
89/// let results: Vec<FlowTransaction> = flow_txns
90/// .into_par_iter()
91/// .map(|mut txn| {
92/// // Process flow, making reads and writes
93/// process_flow(&mut txn)?;
94/// Ok(txn)
95/// })
96/// .collect()?;
97///
98/// // Sequential merge back to parent
99/// for flow_txn in results {
100/// flow_txn.commit(&mut parent)?;
101/// }
102///
103/// // Atomic commit of all changes
104/// parent.commit()?;
105/// ```
106///
107/// # Thread Safety
108///
109/// FlowTransaction implements `Send` because:
110/// - `version` is Copy
111/// - `query` wraps Arc-based multi-version transaction (Send + Sync)
112/// - `pending` and `metrics` are owned and not shared
113///
114/// This allows FlowTransactions to be moved to worker threads for parallel processing.
115pub struct FlowTransaction {
116 /// CDC event version for snapshot isolation.
117 ///
118 /// This is the version at which the CDC event was generated, NOT the parent transaction version.
119 /// Source data reads see the database state as of this CDC version.
120 /// This guarantees proper snapshot isolation - the flow processes data as it existed when
121 /// the CDC event was created, regardless of concurrent modifications.
122 version: CommitVersion,
123
124 /// Local write buffer for uncommitted changes.
125 ///
126 /// Stores all `set()` and `remove()` operations made by this transaction.
127 /// NOT shared with other FlowTransactions. Changes are invisible until commit().
128 pending: PendingWrites,
129
130 /// Pending view operations that require interceptor calls at commit time.
131 ///
132 /// View operations are tracked separately because they need to invoke
133 /// ViewInterceptor::pre_insert/post_insert etc. on the parent transaction
134 /// when committing. The parent transaction has access to the interceptor chains,
135 /// while FlowTransaction does not.
136 view_pending: Vec<ViewPending>,
137
138 /// Performance metrics tracking reads, writes, and other operations.
139 metrics: FlowTransactionMetrics,
140
141 /// Read-only query transaction for accessing source data at CDC snapshot version.
142 ///
143 /// Provides snapshot reads at `version`. Used for reading source tables/views
144 /// to ensure consistent view of the data being processed by the flow.
145 source_query: StandardQueryTransaction,
146
147 /// Read-only query transaction for accessing flow state at latest version.
148 ///
149 /// Reads at the latest committed version. Used for reading flow state
150 /// (join tables, distinct values, counters) that must be visible across
151 /// all CDC versions to maintain continuity.
152 state_query: StandardQueryTransaction,
153
154 /// Catalog for metadata access (cloned from parent, Arc-based so cheap)
155 catalog: reifydb_catalog::MaterializedCatalog,
156}
157
158impl FlowTransaction {
159 /// Create a new FlowTransaction from a parent transaction at a specific CDC version
160 ///
161 /// Takes a shared reference to the parent, allowing multiple FlowTransactions
162 /// to be created for parallel processing.
163 ///
164 /// # Parameters
165 /// * `parent` - The parent command transaction to derive from
166 /// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
167 #[instrument(level = "debug", skip(parent), fields(version = version.0))]
168 pub fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self {
169 let mut source_query = parent.multi.begin_query().unwrap();
170 source_query.read_as_of_version_inclusive(version);
171
172 let state_query = parent.multi.begin_query().unwrap();
173 Self {
174 version,
175 pending: PendingWrites::new(),
176 view_pending: Vec::new(),
177 metrics: FlowTransactionMetrics::new(),
178 source_query,
179 state_query,
180 catalog: parent.catalog().clone(),
181 }
182 }
183
184 /// Get the version this transaction is reading at
185 pub fn version(&self) -> CommitVersion {
186 self.version
187 }
188
189 /// Update the transaction to read at a new version
190 pub fn update_version(&mut self, new_version: CommitVersion) {
191 self.version = new_version;
192 self.source_query.read_as_of_version_inclusive(new_version);
193 }
194
195 /// Get immutable reference to the metrics
196 pub fn metrics(&self) -> &FlowTransactionMetrics {
197 &self.metrics
198 }
199
200 /// Get access to the catalog for reading metadata
201 pub(crate) fn catalog(&self) -> &reifydb_catalog::MaterializedCatalog {
202 &self.catalog
203 }
204}