Skip to main content

reifydb_sub_flow/transaction/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use pending::{Pending, PendingWrites};
5use reifydb_catalog::catalog::Catalog;
6use reifydb_core::common::CommitVersion;
7use reifydb_transaction::{
8	interceptor::{
9		WithInterceptors,
10		chain::InterceptorChain as Chain,
11		interceptors::Interceptors,
12		namespace_def::{
13			NamespaceDefPostCreateInterceptor, NamespaceDefPostUpdateInterceptor,
14			NamespaceDefPreDeleteInterceptor, NamespaceDefPreUpdateInterceptor,
15		},
16		ringbuffer::{
17			RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
18			RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
19			RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
20		},
21		ringbuffer_def::{
22			RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
23			RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
24		},
25		table::{
26			TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
27			TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
28		},
29		table_def::{
30			TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
31			TableDefPreUpdateInterceptor,
32		},
33		transaction::{PostCommitInterceptor, PreCommitInterceptor},
34		view::{
35			ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
36			ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
37		},
38		view_def::{
39			ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
40			ViewDefPreUpdateInterceptor,
41		},
42	},
43	multi::transaction::read::MultiReadTransaction,
44	transaction::admin::AdminTransaction,
45};
46use tracing::instrument;
47
48pub mod pending;
49pub mod range;
50pub mod read;
51pub mod state;
52pub mod write;
53
54/// A transaction wrapper for flow processing with dual-version read semantics.
55///
56/// # Architecture
57///
58/// FlowTransaction provides **dual-version reads** critical for stateful flow processing:
59/// 1. **Source data** - Read at CDC event version (snapshot isolation)
60/// 2. **Flow state** - Read at latest version (state continuity across CDC events)
61/// 3. **Isolated writes** - Local PendingWrites buffer returned to caller
62///
63/// This dual-version approach allows stateful operators (joins, aggregates, distinct) to:
64/// - Process source data at a consistent snapshot (the CDC event version)
65/// - Access their own state at the latest version to maintain continuity
66///
67/// # Dual-Version Read Routing
68///
69/// Reads are automatically routed to the correct query transaction based on key type:
70///
71/// ```text
72/// ┌─────────────────┐
73/// │  FlowTransaction│
74/// └────────┬────────┘
75///          │
76///          ├──► primitive_query (at CDC version)
77///          │    - Source tables
78///          │    - Source views
79///          │    - Regular data
80///          │
81///          └──► state_query (at latest version)
82///               - FlowNodeState
83///               - FlowNodeInternalState
84///               - Stateful operator state
85/// ```
86///
87/// # Why Dual Versions Matter
88///
89/// ## Example: Join Operator
90/// ```ignore
91/// // CDC event arrives at version 100
92/// let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(100));
93///
94/// // Join operator processes the event:
95/// // 1. Reads source table at version 100 (snapshot)
96/// let source_row = flow_txn.get(&source_key)?;
97///
98/// // 2. Reads join state at LATEST version (e.g., 150)
99/// //    This state contains results from ALL previous CDC events
100/// let join_state = flow_txn.state_get(node_id, &state_key)?;
101///
102/// // Without dual versions, join state would be stale at version 100
103/// ```
104///
105/// ## Example: Distinct Operator
106/// ```ignore
107/// // Maintains a set of seen values across ALL CDC events
108/// let seen = flow_txn.state_get(node_id, &value_key)?;
109///
110/// // If read at CDC version, would "forget" values seen in later events
111/// // Dual-version ensures we see ALL distinct values accumulated so far
112/// ```
113///
114/// # Current Usage Pattern
115///
116/// FlowTransaction is used in worker threads to process CDC batches:
117///
118/// ```ignore
119/// // In flow worker thread
120/// let primitive_query = engine.multi().begin_query_at_version(batch.version)?;
121/// let state_query = engine.multi().begin_query_at_version(state_version)?;
122///
123/// let mut txn = FlowTransaction {
124///     version: batch.version,
125///     pending: PendingWrites::new(),
126///     primitive_query,
127///     state_query,
128///     catalog: catalog.clone(),
129/// };
130///
131/// for change in batch.changes {
132///     flow_engine.process(&mut txn, change, flow_id)?;
133/// }
134///
135/// // Extract pending writes to merge into parent transaction
136/// let pending = txn.pending;
137/// ```
138///
139/// # Write Path
140///
141/// All writes (`set`, `remove`) go to the local `pending` buffer:
142/// - Reads check pending buffer first, then delegate to query transactions
143/// - Pending writes are extracted and applied to parent transaction by caller
144///
145/// # Thread Safety
146///
147/// FlowTransaction is Send because all fields are either Copy, owned, or
148pub struct FlowTransaction {
149	/// CDC event version for snapshot isolation.
150	///
151	/// This is the version at which the CDC event was generated, NOT the parent transaction version.
152	/// Source data reads see the database state as of this CDC version.
153	/// This guarantees proper snapshot isolation - the flow processes data as it existed when
154	/// the CDC event was created, regardless of concurrent modifications.
155	pub(crate) version: CommitVersion,
156
157	/// Local write buffer for pending changes.
158	///
159	/// Stores all `set()` and `remove()` operations made by this transaction.
160	/// Returned to caller for application to parent transaction.
161	pub(crate) pending: PendingWrites,
162
163	/// Read-only query transaction for accessing storage primitive data at CDC snapshot version.
164	///
165	/// Provides snapshot reads at `version`. Used for reading storage primitives tables/views
166	/// to ensure consistent view of the data being processed by the flow.
167	pub(crate) primitive_query: MultiReadTransaction,
168
169	/// Read-only query transaction for accessing flow state at latest version.
170	///
171	/// Reads at the latest committed version. Used for reading flow state
172	/// (join tables, distinct values, counters) that must be visible across
173	/// all CDC versions to maintain continuity.
174	pub(crate) state_query: MultiReadTransaction,
175
176	/// Catalog for metadata access (cloned from parent, Arc-based so cheap)
177	pub(crate) catalog: Catalog,
178
179	/// Interceptors for view data operations
180	pub(crate) interceptors: Interceptors,
181}
182
183impl FlowTransaction {
184	/// Create a new FlowTransaction from a parent transaction at a specific CDC version.
185	///
186	/// Creates dual query transactions:
187	/// - `primitive_query`: Reads at the specified CDC version (snapshot isolation)
188	/// - `state_query`: Reads at the latest version (state continuity)
189	///
190	/// # Parameters
191	/// * `parent` - The parent command transaction to derive from
192	/// * `version` - The CDC event version for snapshot isolation (NOT parent.version())
193	/// * `catalog` - The catalog for metadata access
194	#[instrument(name = "flow::transaction::new", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
195	pub fn new(
196		parent: &AdminTransaction,
197		version: CommitVersion,
198		catalog: Catalog,
199		interceptors: Interceptors,
200	) -> Self {
201		let mut primitive_query = parent.multi.begin_query().unwrap();
202		primitive_query.read_as_of_version_inclusive(version);
203
204		let state_query = parent.multi.begin_query().unwrap();
205		Self {
206			version,
207			pending: PendingWrites::new(),
208			primitive_query,
209			state_query,
210			catalog,
211			interceptors,
212		}
213	}
214
215	/// Update the transaction to read at a new version
216	pub fn update_version(&mut self, new_version: CommitVersion) {
217		self.version = new_version;
218		self.primitive_query.read_as_of_version_inclusive(new_version);
219	}
220
221	/// Get access to the catalog for reading metadata
222	pub(crate) fn catalog(&self) -> &Catalog {
223		&self.catalog
224	}
225}
226
227impl WithInterceptors for FlowTransaction {
228	fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync> {
229		&mut self.interceptors.table_pre_insert
230	}
231
232	fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync> {
233		&mut self.interceptors.table_post_insert
234	}
235
236	fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
237		&mut self.interceptors.table_pre_update
238	}
239
240	fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
241		&mut self.interceptors.table_post_update
242	}
243
244	fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
245		&mut self.interceptors.table_pre_delete
246	}
247
248	fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync> {
249		&mut self.interceptors.table_post_delete
250	}
251
252	fn ringbuffer_pre_insert_interceptors(
253		&mut self,
254	) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
255		&mut self.interceptors.ringbuffer_pre_insert
256	}
257
258	fn ringbuffer_post_insert_interceptors(
259		&mut self,
260	) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
261		&mut self.interceptors.ringbuffer_post_insert
262	}
263
264	fn ringbuffer_pre_update_interceptors(
265		&mut self,
266	) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
267		&mut self.interceptors.ringbuffer_pre_update
268	}
269
270	fn ringbuffer_post_update_interceptors(
271		&mut self,
272	) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
273		&mut self.interceptors.ringbuffer_post_update
274	}
275
276	fn ringbuffer_pre_delete_interceptors(
277		&mut self,
278	) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
279		&mut self.interceptors.ringbuffer_pre_delete
280	}
281
282	fn ringbuffer_post_delete_interceptors(
283		&mut self,
284	) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
285		&mut self.interceptors.ringbuffer_post_delete
286	}
287
288	fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
289		&mut self.interceptors.pre_commit
290	}
291
292	fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
293		&mut self.interceptors.post_commit
294	}
295
296	fn namespace_def_post_create_interceptors(
297		&mut self,
298	) -> &mut Chain<dyn NamespaceDefPostCreateInterceptor + Send + Sync> {
299		&mut self.interceptors.namespace_def_post_create
300	}
301
302	fn namespace_def_pre_update_interceptors(
303		&mut self,
304	) -> &mut Chain<dyn NamespaceDefPreUpdateInterceptor + Send + Sync> {
305		&mut self.interceptors.namespace_def_pre_update
306	}
307
308	fn namespace_def_post_update_interceptors(
309		&mut self,
310	) -> &mut Chain<dyn NamespaceDefPostUpdateInterceptor + Send + Sync> {
311		&mut self.interceptors.namespace_def_post_update
312	}
313
314	fn namespace_def_pre_delete_interceptors(
315		&mut self,
316	) -> &mut Chain<dyn NamespaceDefPreDeleteInterceptor + Send + Sync> {
317		&mut self.interceptors.namespace_def_pre_delete
318	}
319
320	fn table_def_post_create_interceptors(
321		&mut self,
322	) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync> {
323		&mut self.interceptors.table_def_post_create
324	}
325
326	fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync> {
327		&mut self.interceptors.table_def_pre_update
328	}
329
330	fn table_def_post_update_interceptors(
331		&mut self,
332	) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync> {
333		&mut self.interceptors.table_def_post_update
334	}
335
336	fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync> {
337		&mut self.interceptors.table_def_pre_delete
338	}
339
340	fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync> {
341		&mut self.interceptors.view_pre_insert
342	}
343
344	fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync> {
345		&mut self.interceptors.view_post_insert
346	}
347
348	fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
349		&mut self.interceptors.view_pre_update
350	}
351
352	fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
353		&mut self.interceptors.view_post_update
354	}
355
356	fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
357		&mut self.interceptors.view_pre_delete
358	}
359
360	fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync> {
361		&mut self.interceptors.view_post_delete
362	}
363
364	fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync> {
365		&mut self.interceptors.view_def_post_create
366	}
367
368	fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync> {
369		&mut self.interceptors.view_def_pre_update
370	}
371
372	fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync> {
373		&mut self.interceptors.view_def_post_update
374	}
375
376	fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync> {
377		&mut self.interceptors.view_def_pre_delete
378	}
379
380	fn ringbuffer_def_post_create_interceptors(
381		&mut self,
382	) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync> {
383		&mut self.interceptors.ringbuffer_def_post_create
384	}
385
386	fn ringbuffer_def_pre_update_interceptors(
387		&mut self,
388	) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync> {
389		&mut self.interceptors.ringbuffer_def_pre_update
390	}
391
392	fn ringbuffer_def_post_update_interceptors(
393		&mut self,
394	) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync> {
395		&mut self.interceptors.ringbuffer_def_post_update
396	}
397
398	fn ringbuffer_def_pre_delete_interceptors(
399		&mut self,
400	) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync> {
401		&mut self.interceptors.ringbuffer_def_pre_delete
402	}
403}