// reifydb_sub_flow/transaction/mod.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file

4use reifydb_catalog::transaction::MaterializedCatalogTransaction;
5use reifydb_core::CommitVersion;
6use reifydb_engine::StandardCommandTransaction;
7use tracing::instrument;
8
9mod commit;
10mod iter_range;
11mod metrics;
12mod pending;
13mod read;
14mod state;
15#[cfg(test)]
16mod utils;
17mod write;
18
19pub use metrics::FlowTransactionMetrics;
20pub use pending::{Pending, PendingWrites};
21use reifydb_transaction::multi::StandardQueryTransaction;
22
23/// A transaction wrapper for parallel flow processing with snapshot isolation.
24///
25/// # Architecture
26///
27/// FlowTransaction enables parallel processing of independent data flows by providing:
28/// 1. **Snapshot reads** - via a wrapped StandardQueryTransaction reading at a fixed version
29/// 2. **Isolated writes** - via a local PendingWrites buffer unique to this transaction
30/// 3. **Sequential merge** - buffered writes are applied back to parent at commit time
31///
32/// # Read Path
33///
34/// All reads go through the wrapped `query` transaction, which provides a consistent
35/// snapshot view of the database at `version`. The query transaction is read-only and
36/// cannot modify the underlying storage.
37///
38/// For keys that have been modified locally:
39/// - Reads check the `pending` buffer first
40/// - If found there (or marked for removal), return the local value
41/// - Otherwise, delegate to the `query` transaction for the snapshot value
42///
43/// # Write Path
44///
45/// All writes (`set`, `remove`) go to the local `pending` buffer only:
46/// - Writes are NOT visible to the parent transaction
47/// - Writes are NOT visible to other FlowTransactions
48/// - Writes are NOT persisted to storage
49///
50/// The pending buffer is committed back to the parent transaction via `commit()`.
51///
52/// # Parallel Processing Pattern
53///
54/// ```ignore
55/// let mut parent = engine.begin_command()?;
56///
57/// // Create multiple FlowTransactions from shared parent reference
58/// // Each uses the CDC event version for proper snapshot isolation
59/// let flow_txns: Vec<FlowTransaction> = cdc_events
60///     .iter()
61///     .map(|cdc| FlowTransaction::new(&parent, cdc.version))
62///     .collect();
63///
64/// // Process in parallel (e.g., using rayon)
65/// let results: Vec<FlowTransaction> = flow_txns
66///     .into_par_iter()
67///     .map(|mut txn| {
68///         // Process flow, making reads and writes
69///         process_flow(&mut txn)?;
70///         Ok(txn)
71///     })
72///     .collect()?;
73///
74/// // Sequential merge back to parent
75/// for flow_txn in results {
76///     flow_txn.commit(&mut parent)?;
77/// }
78///
79/// // Atomic commit of all changes
80/// parent.commit()?;
81/// ```
82///
83/// # Thread Safety
84///
85/// FlowTransaction implements `Send` because:
86/// - `version` is Copy
87/// - `query` wraps Arc-based multi-version transaction (Send + Sync)
88/// - `pending` and `metrics` are owned and not shared
89///
90/// This allows FlowTransactions to be moved to worker threads for parallel processing.
91pub struct FlowTransaction {
92	/// CDC event version for snapshot isolation.
93	///
94	/// This is the version at which the CDC event was generated, NOT the parent transaction version.
95	/// Source data reads see the database state as of this CDC version.
96	/// This guarantees proper snapshot isolation - the flow processes data as it existed when
97	/// the CDC event was created, regardless of concurrent modifications.
98	version: CommitVersion,
99
100	/// Local write buffer for uncommitted changes.
101	///
102	/// Stores all `set()` and `remove()` operations made by this transaction.
103	/// NOT shared with other FlowTransactions. Changes are invisible until commit().
104	pending: PendingWrites,
105
106	/// Performance metrics tracking reads, writes, and other operations.
107	metrics: FlowTransactionMetrics,
108
109	/// Read-only query transaction for accessing source data at CDC snapshot version.
110	///
111	/// Provides snapshot reads at `version`. Used for reading source tables/views
112	/// to ensure consistent view of the data being processed by the flow.
113	source_query: StandardQueryTransaction,
114
115	/// Read-only query transaction for accessing flow state at latest version.
116	///
117	/// Reads at the latest committed version. Used for reading flow state
118	/// (join tables, distinct values, counters) that must be visible across
119	/// all CDC versions to maintain continuity.
120	state_query: StandardQueryTransaction,
121
122	/// Catalog for metadata access (cloned from parent, Arc-based so cheap)
123	catalog: reifydb_catalog::MaterializedCatalog,
124}
125
126impl FlowTransaction {
127	/// Create a new FlowTransaction from a parent transaction at a specific CDC version
128	///
129	/// Takes a shared reference to the parent, allowing multiple FlowTransactions
130	/// to be created for parallel processing.
131	///
132	/// # Parameters
133	/// * `parent` - The parent command transaction to derive from
134	/// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
135	#[instrument(name = "flow::transaction::new", level = "debug", skip(parent), fields(version = version.0))]
136	pub async fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self {
137		let mut source_query = parent.multi.begin_query().await.unwrap();
138		source_query.read_as_of_version_inclusive(version);
139
140		let state_query = parent.multi.begin_query().await.unwrap();
141		Self {
142			version,
143			pending: PendingWrites::new(),
144			metrics: FlowTransactionMetrics::new(),
145			source_query,
146			state_query,
147			catalog: parent.catalog().clone(),
148		}
149	}
150
151	/// Get the version this transaction is reading at
152	pub fn version(&self) -> CommitVersion {
153		self.version
154	}
155
156	/// Update the transaction to read at a new version
157	pub async fn update_version(&mut self, new_version: CommitVersion) {
158		self.version = new_version;
159		self.source_query.read_as_of_version_inclusive(new_version);
160	}
161
162	/// Get immutable reference to the metrics
163	pub fn metrics(&self) -> &FlowTransactionMetrics {
164		&self.metrics
165	}
166
167	/// Get access to the catalog for reading metadata
168	pub(crate) fn catalog(&self) -> &reifydb_catalog::MaterializedCatalog {
169		&self.catalog
170	}
171}