// reifydb_sub_flow/transaction/mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::CommitVersion;
5use reifydb_engine::StandardCommandTransaction;
6
7mod commit;
8mod iter_range;
9mod metrics;
10mod pending;
11mod read;
12mod state;
13#[cfg(test)]
14mod test_utils;
15mod write;
16
17pub use metrics::FlowTransactionMetrics;
18pub use pending::{Pending, PendingWrites};
19use reifydb_core::interface::{MultiVersionQueryTransaction, MultiVersionTransaction};
20use reifydb_transaction::multi::StandardQueryTransaction;
21
22/// A transaction wrapper for parallel flow processing with snapshot isolation.
23///
24/// # Architecture
25///
26/// FlowTransaction enables parallel processing of independent data flows by providing:
27/// 1. **Snapshot reads** - via a wrapped StandardQueryTransaction reading at a fixed version
28/// 2. **Isolated writes** - via a local PendingWrites buffer unique to this transaction
29/// 3. **Sequential merge** - buffered writes are applied back to parent at commit time
30///
31/// # Read Path
32///
33/// All reads go through the wrapped `query` transaction, which provides a consistent
34/// snapshot view of the database at `version`. The query transaction is read-only and
35/// cannot modify the underlying storage.
36///
37/// For keys that have been modified locally:
38/// - Reads check the `pending` buffer first
39/// - If found there (or marked for removal), return the local value
40/// - Otherwise, delegate to the `query` transaction for the snapshot value
41///
42/// # Write Path
43///
44/// All writes (`set`, `remove`) go to the local `pending` buffer only:
45/// - Writes are NOT visible to the parent transaction
46/// - Writes are NOT visible to other FlowTransactions
47/// - Writes are NOT persisted to storage
48///
49/// The pending buffer is committed back to the parent transaction via `commit()`.
50///
51/// # Parallel Processing Pattern
52///
53/// ```ignore
54/// let mut parent = engine.begin_command()?;
55///
56/// // Create multiple FlowTransactions from shared parent reference
57/// // Each uses the CDC event version for proper snapshot isolation
58/// let flow_txns: Vec<FlowTransaction> = cdc_events
59///     .iter()
60///     .map(|cdc| FlowTransaction::new(&parent, cdc.version))
61///     .collect();
62///
63/// // Process in parallel (e.g., using rayon)
64/// let results: Vec<FlowTransaction> = flow_txns
65///     .into_par_iter()
66///     .map(|mut txn| {
67///         // Process flow, making reads and writes
68///         process_flow(&mut txn)?;
69///         Ok(txn)
70///     })
71///     .collect()?;
72///
73/// // Sequential merge back to parent
74/// for flow_txn in results {
75///     flow_txn.commit(&mut parent)?;
76/// }
77///
78/// // Atomic commit of all changes
79/// parent.commit()?;
80/// ```
81///
82/// # Thread Safety
83///
84/// FlowTransaction implements `Send` because:
85/// - `version` is Copy
86/// - `query` wraps Arc-based multi-version transaction (Send + Sync)
87/// - `pending` and `metrics` are owned and not shared
88///
89/// This allows FlowTransactions to be moved to worker threads for parallel processing.
90pub struct FlowTransaction {
91	/// CDC event version for snapshot isolation.
92	///
93	/// This is the version at which the CDC event was generated, NOT the parent transaction version.
94	/// All reads through the query transaction see the database state as of this CDC version.
95	/// This guarantees proper snapshot isolation - the flow processes data as it existed when
96	/// the CDC event was created, regardless of concurrent modifications.
97	version: CommitVersion,
98
99	/// Local write buffer for uncommitted changes.
100	///
101	/// Stores all `set()` and `remove()` operations made by this transaction.
102	/// NOT shared with other FlowTransactions. Changes are invisible until commit().
103	pending: PendingWrites,
104
105	/// Performance metrics tracking reads, writes, and other operations.
106	metrics: FlowTransactionMetrics,
107
108	/// Read-only query transaction for accessing multi-version storage.
109	///
110	/// Provides snapshot reads at `version`. This is the primary read path for data
111	/// not in the `pending` buffer. The query transaction is configured at construction
112	/// time to read at a specific version and cannot be modified.
113	query: StandardQueryTransaction,
114}
115
116// SAFETY: FlowTransaction can be sent across threads because:
117// - version: CommitVersion is Copy (u64 wrapper)
118// - query: StandardQueryTransaction wraps Arc-based multi-version storage (Send + Sync)
119// - pending: PendingWrites is a BTreeMap owned by this transaction (Send)
120// - metrics: FlowTransactionMetrics contains primitive counters (Send)
121//
122// This enables parallel flow processing where each FlowTransaction is moved to a
123// worker thread via rayon or similar thread pool.
124unsafe impl Send for FlowTransaction {}
125
126impl FlowTransaction {
127	/// Create a new FlowTransaction from a parent transaction at a specific CDC version
128	///
129	/// Takes a shared reference to the parent, allowing multiple FlowTransactions
130	/// to be created for parallel processing.
131	///
132	/// # Parameters
133	/// * `parent` - The parent command transaction to derive from
134	/// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
135	pub fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self {
136		let mut query = parent.multi.begin_query().unwrap();
137		query.read_as_of_version_inclusive(version).unwrap();
138
139		Self {
140			version,
141			pending: PendingWrites::new(),
142			metrics: FlowTransactionMetrics::new(),
143			query,
144		}
145	}
146
147	/// Get the version this transaction is reading at
148	pub fn version(&self) -> CommitVersion {
149		self.version
150	}
151
152	/// Update the transaction to read at a new version
153	///
154	/// This should be called after commit() and before processing the next unit of work.
155	/// The pending writes buffer is expected to be already cleared by the previous commit().
156	pub fn update_version(&mut self, new_version: CommitVersion) -> crate::Result<()> {
157		// Update the version field
158		self.version = new_version;
159
160		// Update the query transaction's read version
161		self.query.read_as_of_version_inclusive(new_version)?;
162
163		// Note: pending is already cleared by commit()
164		// Metrics are kept cumulative across units
165
166		Ok(())
167	}
168
169	/// Get immutable reference to the metrics
170	pub fn metrics(&self) -> &FlowTransactionMetrics {
171		&self.metrics
172	}
173}