reifydb_sub_flow/transaction/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_catalog::transaction::MaterializedCatalogTransaction;
5use reifydb_core::{CommitVersion, interface::ViewDef, value::encoded::EncodedValues};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::RowNumber;
8use tracing::instrument;
9
10mod commit;
11mod iter_range;
12mod metrics;
13mod pending;
14mod read;
15mod state;
16#[cfg(test)]
17mod utils;
18mod write;
19
20pub use metrics::FlowTransactionMetrics;
21pub use pending::{Pending, PendingWrites};
22use reifydb_core::interface::{MultiVersionQueryTransaction, MultiVersionTransaction};
23use reifydb_transaction::multi::StandardQueryTransaction;
24
/// Represents a pending view operation that needs interceptor calls at commit time.
///
/// Every variant carries the [`ViewDef`] of the view it targets together with the
/// row number(s) involved; the write variants additionally carry the encoded row.
#[derive(Debug, Clone)]
pub enum ViewPending {
	/// Insert a new row into the view
	Insert {
		/// Definition of the view the row is inserted into.
		view: ViewDef,
		/// Row number of the row being inserted.
		row_number: RowNumber,
		/// Encoded values of the row being inserted.
		row: EncodedValues,
	},
	/// Update an existing row in the view (remove old, insert new)
	Update {
		/// Definition of the view being updated.
		view: ViewDef,
		/// Row number of the row being replaced (the "remove old" half).
		old_row_number: RowNumber,
		/// Row number of the replacement row (the "insert new" half).
		new_row_number: RowNumber,
		/// Encoded values of the replacement row.
		// NOTE(review): presumably the new row's payload only; the old payload is not carried — confirm.
		row: EncodedValues,
	},
	/// Remove a row from the view
	Remove {
		/// Definition of the view the row is removed from.
		view: ViewDef,
		/// Row number of the row being removed.
		row_number: RowNumber,
	},
}
47
/// A transaction wrapper for parallel flow processing with snapshot isolation.
///
/// # Architecture
///
/// FlowTransaction enables parallel processing of independent data flows by providing:
/// 1. **Snapshot reads** - via a wrapped StandardQueryTransaction (`source_query`) reading at a fixed version
/// 2. **Isolated writes** - via a local PendingWrites buffer unique to this transaction
/// 3. **Sequential merge** - buffered writes are applied back to parent at commit time
///
/// # Read Path
///
/// Reads go through one of two wrapped query transactions:
/// - `source_query` is pinned to `version` and provides a consistent snapshot view
///   of the source data as of the CDC event
/// - `state_query` is not pinned to `version`; it is used for flow state that must
///   remain visible across CDC versions
///
/// Both query transactions are read-only and cannot modify the underlying storage.
///
/// For keys that have been modified locally:
/// - Reads check the `pending` buffer first
/// - If found there (or marked for removal), return the local value
/// - Otherwise, delegate to the relevant query transaction
///
/// NOTE(review): the lookup logic itself lives in sibling modules that are not part
/// of this file; confirm the order above against them when changing it.
///
/// # Write Path
///
/// All writes (`set`, `remove`) go to the local `pending` buffer only:
/// - Writes are NOT visible to the parent transaction
/// - Writes are NOT visible to other FlowTransactions
/// - Writes are NOT persisted to storage
///
/// The pending buffer is committed back to the parent transaction via `commit()`.
///
/// # Parallel Processing Pattern
///
/// ```ignore
/// let mut parent = engine.begin_command()?;
///
/// // Create multiple FlowTransactions from shared parent reference
/// // Each uses the CDC event version for proper snapshot isolation
/// let flow_txns: Vec<FlowTransaction> = cdc_events
///     .iter()
///     .map(|cdc| FlowTransaction::new(&parent, cdc.version))
///     .collect();
///
/// // Process in parallel (e.g., using rayon)
/// let results: Vec<FlowTransaction> = flow_txns
///     .into_par_iter()
///     .map(|mut txn| {
///         // Process flow, making reads and writes
///         process_flow(&mut txn)?;
///         Ok(txn)
///     })
///     .collect()?;
///
/// // Sequential merge back to parent
/// for flow_txn in results {
///     flow_txn.commit(&mut parent)?;
/// }
///
/// // Atomic commit of all changes
/// parent.commit()?;
/// ```
///
/// # Thread Safety
///
/// FlowTransaction is intended to be `Send` because:
/// - `version` is Copy
/// - `source_query` and `state_query` wrap Arc-based multi-version transactions (Send + Sync)
/// - `pending`, `view_pending` and `metrics` are owned and not shared
/// - `catalog` is an Arc-based handle cloned from the parent
///
/// This allows FlowTransactions to be moved to worker threads for parallel processing.
///
/// NOTE(review): nothing in this file asserts `Send` explicitly; it follows from the
/// field types, so re-check when adding fields.
pub struct FlowTransaction {
	/// CDC event version for snapshot isolation.
	///
	/// This is the version at which the CDC event was generated, NOT the parent transaction version.
	/// Source data reads see the database state as of this CDC version.
	/// This guarantees proper snapshot isolation - the flow processes data as it existed when
	/// the CDC event was created, regardless of concurrent modifications.
	version: CommitVersion,

	/// Local write buffer for uncommitted changes.
	///
	/// Stores all `set()` and `remove()` operations made by this transaction.
	/// NOT shared with other FlowTransactions. Changes are invisible until commit().
	pending: PendingWrites,

	/// Pending view operations that require interceptor calls at commit time.
	///
	/// View operations are tracked separately because they need to invoke
	/// ViewInterceptor::pre_insert/post_insert etc. on the parent transaction
	/// when committing. The parent transaction has access to the interceptor chains,
	/// while FlowTransaction does not.
	view_pending: Vec<ViewPending>,

	/// Performance metrics tracking reads, writes, and other operations.
	metrics: FlowTransactionMetrics,

	/// Read-only query transaction for accessing source data at CDC snapshot version.
	///
	/// Provides snapshot reads at `version`. Used for reading source tables/views
	/// to ensure consistent view of the data being processed by the flow.
	/// Re-pinned whenever the version is changed through `update_version`.
	source_query: StandardQueryTransaction,

	/// Read-only query transaction for accessing flow state at latest version.
	///
	/// Reads at the latest committed version. Used for reading flow state
	/// (join tables, distinct values, counters) that must be visible across
	/// all CDC versions to maintain continuity.
	/// Unlike `source_query`, it is never pinned to `version`.
	state_query: StandardQueryTransaction,

	/// Catalog for metadata access (cloned from parent, Arc-based so cheap)
	catalog: reifydb_catalog::MaterializedCatalog,
}
158
159impl FlowTransaction {
160	/// Create a new FlowTransaction from a parent transaction at a specific CDC version
161	///
162	/// Takes a shared reference to the parent, allowing multiple FlowTransactions
163	/// to be created for parallel processing.
164	///
165	/// # Parameters
166	/// * `parent` - The parent command transaction to derive from
167	/// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
168	#[instrument(level = "debug", skip(parent), fields(version = version.0))]
169	pub fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self {
170		let mut source_query = parent.multi.begin_query().unwrap();
171		source_query.read_as_of_version_inclusive(version).unwrap();
172
173		let state_query = parent.multi.begin_query().unwrap();
174		Self {
175			version,
176			pending: PendingWrites::new(),
177			view_pending: Vec::new(),
178			metrics: FlowTransactionMetrics::new(),
179			source_query,
180			state_query,
181			catalog: parent.catalog().clone(),
182		}
183	}
184
185	/// Get the version this transaction is reading at
186	pub fn version(&self) -> CommitVersion {
187		self.version
188	}
189
190	/// Update the transaction to read at a new version
191	pub fn update_version(&mut self, new_version: CommitVersion) -> crate::Result<()> {
192		self.version = new_version;
193		self.source_query.read_as_of_version_inclusive(new_version)?;
194		Ok(())
195	}
196
197	/// Get immutable reference to the metrics
198	pub fn metrics(&self) -> &FlowTransactionMetrics {
199		&self.metrics
200	}
201
202	/// Get access to the catalog for reading metadata
203	pub(crate) fn catalog(&self) -> &reifydb_catalog::MaterializedCatalog {
204		&self.catalog
205	}
206}