Skip to main content

reifydb_sub_flow/transaction/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem;
5
6use pending::{Pending, PendingWrite};
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{common::CommitVersion, interface::change::Change, testing::TestingContext};
9use reifydb_transaction::{
10	interceptor::{
11		WithInterceptors,
12		chain::InterceptorChain as Chain,
13		interceptors::Interceptors,
14		namespace::{
15			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
16			NamespacePreUpdateInterceptor,
17		},
18		ringbuffer::{
19			RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
20			RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
21			RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
22		},
23		ringbuffer_def::{
24			RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
25			RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
26		},
27		table::{
28			TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
29			TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
30		},
31		table_def::{
32			TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
33			TableDefPreUpdateInterceptor,
34		},
35		transaction::{PostCommitInterceptor, PreCommitInterceptor},
36		view::{
37			ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
38			ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
39		},
40		view_def::{
41			ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
42			ViewDefPreUpdateInterceptor,
43		},
44	},
45	multi::transaction::read::MultiReadTransaction,
46	transaction::admin::AdminTransaction,
47};
48use tracing::instrument;
49
50pub mod pending;
51pub mod range;
52pub mod read;
53pub mod state;
54pub mod write;
55
56/// A transaction wrapper for flow processing with dual-version read semantics.
57///
58/// # Architecture
59///
60/// FlowTransaction provides **dual-version reads** critical for stateful flow processing:
61/// 1. **Source data** - Read at CDC event version (snapshot isolation)
62/// 2. **Flow state** - Read at latest version (state continuity across CDC events)
63/// 3. **Isolated writes** - Local PendingWrites buffer returned to caller
64///
65/// This dual-version approach allows stateful operators (joins, aggregates, distinct) to:
66/// - Process source data at a consistent snapshot (the CDC event version)
67/// - Access their own state at the latest version to maintain continuity
68///
69/// # Dual-Version Read Routing
70///
71/// Reads are automatically routed to the correct query transaction based on key type:
72///
73/// ```text
74/// ┌─────────────────┐
75/// │  FlowTransaction│
76/// └────────┬────────┘
77///          │
78///          ├──► pending (flow-generated writes)
79///          │
80///          ├──► variant
81///          │    ├─ Deferred: skip
82///          │    └─ Transactional { base_pending }: check base_pending
83///          │
84///          ├──► primitive_query (at CDC version)
85///          │    - Source tables / views / regular data
86///          │
87///          └──► state_query (at latest version)
88///               - FlowNodeState / FlowNodeInternalState
89/// ```
90///
91/// # Construction
92///
93/// Use named constructors to enforce correct initialization:
94/// - [`FlowTransaction::deferred`] — CDC path (no base pending)
95/// - [`FlowTransaction::transactional`] — inline pre-commit path (with base pending)
96///
97/// # Write Path
98///
99/// All writes (`set`, `remove`) go to the local `pending` buffer:
100/// - Reads check pending buffer first, then delegate to query transactions
101/// - Pending writes are extracted via [`FlowTransaction::take_pending`]
102///
103/// # Thread Safety
104///
105/// FlowTransaction is Send because all fields are either Copy, owned, or
106pub enum FlowTransaction {
107	/// CDC-driven async flow processing.
108	/// Reads only from committed storage + flow pending writes.
109	Deferred {
110		version: CommitVersion,
111		pending: Pending,
112		primitive_query: MultiReadTransaction,
113		state_query: MultiReadTransaction,
114		catalog: Catalog,
115		interceptors: Interceptors,
116		testing: Option<TestingContext>,
117	},
118
119	/// Inline flow processing within a committing transaction.
120	/// Can additionally read uncommitted writes from the parent transaction.
121	Transactional {
122		version: CommitVersion,
123		pending: Pending,
124		/// Read-only snapshot of the committing transaction's KV writes.
125		base_pending: Pending,
126		primitive_query: MultiReadTransaction,
127		state_query: MultiReadTransaction,
128		catalog: Catalog,
129		interceptors: Interceptors,
130		testing: Option<TestingContext>,
131	},
132}
133
134impl FlowTransaction {
135	/// Create a deferred (CDC) FlowTransaction from a parent transaction.
136	///
137	/// Used by the async worker path. Reads only from committed storage +
138	/// flow-generated pending writes — no base pending from a parent transaction.
139	#[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
140	pub fn deferred(
141		parent: &AdminTransaction,
142		version: CommitVersion,
143		catalog: Catalog,
144		interceptors: Interceptors,
145	) -> Self {
146		let mut primitive_query = parent.multi.begin_query().unwrap();
147		primitive_query.read_as_of_version_inclusive(version);
148
149		let state_query = parent.multi.begin_query().unwrap();
150		Self::Deferred {
151			version,
152			pending: Pending::new(),
153			primitive_query,
154			state_query,
155			catalog,
156			interceptors,
157			testing: None,
158		}
159	}
160
161	/// Create a deferred (CDC) FlowTransaction from pre-built parts.
162	///
163	/// Used by the worker actor which creates its own query transactions.
164	pub fn deferred_from_parts(
165		version: CommitVersion,
166		pending: Pending,
167		primitive_query: MultiReadTransaction,
168		state_query: MultiReadTransaction,
169		catalog: Catalog,
170		interceptors: Interceptors,
171	) -> Self {
172		Self::Deferred {
173			version,
174			pending,
175			primitive_query,
176			state_query,
177			catalog,
178			interceptors,
179			testing: None,
180		}
181	}
182
183	/// Create a transactional (inline) FlowTransaction.
184	///
185	/// Used by the pre-commit interceptor path. `base_pending` is a read-only
186	/// snapshot of the committing transaction's KV writes so that flow operators
187	/// can see uncommitted row data.
188	pub fn transactional(
189		version: CommitVersion,
190		pending: Pending,
191		base_pending: Pending,
192		primitive_query: MultiReadTransaction,
193		state_query: MultiReadTransaction,
194		catalog: Catalog,
195		interceptors: Interceptors,
196		testing: Option<TestingContext>,
197	) -> Self {
198		Self::Transactional {
199			version,
200			pending,
201			base_pending,
202			primitive_query,
203			state_query,
204			catalog,
205			interceptors,
206			testing,
207		}
208	}
209
210	/// Get the transaction version.
211	pub fn version(&self) -> CommitVersion {
212		match self {
213			Self::Deferred {
214				version,
215				..
216			} => *version,
217			Self::Transactional {
218				version,
219				..
220			} => *version,
221		}
222	}
223
224	/// Extract pending writes, replacing them with an empty buffer.
225	pub fn take_pending(&mut self) -> Pending {
226		match self {
227			Self::Deferred {
228				pending,
229				..
230			} => mem::take(pending),
231			Self::Transactional {
232				pending,
233				..
234			} => mem::take(pending),
235		}
236	}
237
238	/// Get a reference to the pending writes.
239	#[cfg(test)]
240	pub fn pending(&self) -> &Pending {
241		match self {
242			Self::Deferred {
243				pending,
244				..
245			} => pending,
246			Self::Transactional {
247				pending,
248				..
249			} => pending,
250		}
251	}
252
253	/// Drain all generated view changes, returning them.
254	pub fn take_view_changes(&mut self) -> pending::ViewChanges {
255		match self {
256			Self::Deferred {
257				pending,
258				..
259			} => pending.take_view_changes(),
260			Self::Transactional {
261				pending,
262				..
263			} => pending.take_view_changes(),
264		}
265	}
266
267	/// Append a view change (used by `SinkViewOperator`).
268	pub fn push_view_change(&mut self, change: Change) {
269		match self {
270			Self::Deferred {
271				pending,
272				..
273			} => pending.push_view_change(change),
274			Self::Transactional {
275				pending,
276				..
277			} => pending.push_view_change(change),
278		}
279	}
280
281	/// Update the transaction to read at a new version
282	pub fn update_version(&mut self, new_version: CommitVersion) {
283		match self {
284			Self::Deferred {
285				version,
286				primitive_query,
287				..
288			} => {
289				*version = new_version;
290				primitive_query.read_as_of_version_inclusive(new_version);
291			}
292			Self::Transactional {
293				version,
294				primitive_query,
295				..
296			} => {
297				*version = new_version;
298				primitive_query.read_as_of_version_inclusive(new_version);
299			}
300		}
301	}
302
303	/// Get access to the catalog for reading metadata
304	pub fn catalog(&self) -> &Catalog {
305		match self {
306			Self::Deferred {
307				catalog,
308				..
309			} => catalog,
310			Self::Transactional {
311				catalog,
312				..
313			} => catalog,
314		}
315	}
316
317	/// Get mutable access to the testing context (if active).
318	pub fn testing_mut(&mut self) -> Option<&mut TestingContext> {
319		match self {
320			Self::Deferred {
321				testing,
322				..
323			} => testing.as_mut(),
324			Self::Transactional {
325				testing,
326				..
327			} => testing.as_mut(),
328		}
329	}
330
331	/// Extract the testing context, replacing it with `None`.
332	pub fn take_testing(&mut self) -> Option<TestingContext> {
333		match self {
334			Self::Deferred {
335				testing,
336				..
337			} => testing.take(),
338			Self::Transactional {
339				testing,
340				..
341			} => testing.take(),
342		}
343	}
344}
345
346macro_rules! interceptor_method {
347	($method:ident, $field:ident, $trait_name:ident) => {
348		fn $method(&mut self) -> &mut Chain<dyn $trait_name + Send + Sync> {
349			match self {
350				Self::Deferred {
351					interceptors,
352					..
353				} => &mut interceptors.$field,
354				Self::Transactional {
355					interceptors,
356					..
357				} => &mut interceptors.$field,
358			}
359		}
360	};
361}
362
363impl WithInterceptors for FlowTransaction {
364	interceptor_method!(table_pre_insert_interceptors, table_pre_insert, TablePreInsertInterceptor);
365	interceptor_method!(table_post_insert_interceptors, table_post_insert, TablePostInsertInterceptor);
366	interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
367	interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
368	interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
369	interceptor_method!(table_post_delete_interceptors, table_post_delete, TablePostDeleteInterceptor);
370
371	interceptor_method!(ringbuffer_pre_insert_interceptors, ringbuffer_pre_insert, RingBufferPreInsertInterceptor);
372	interceptor_method!(
373		ringbuffer_post_insert_interceptors,
374		ringbuffer_post_insert,
375		RingBufferPostInsertInterceptor
376	);
377	interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
378	interceptor_method!(
379		ringbuffer_post_update_interceptors,
380		ringbuffer_post_update,
381		RingBufferPostUpdateInterceptor
382	);
383	interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
384	interceptor_method!(
385		ringbuffer_post_delete_interceptors,
386		ringbuffer_post_delete,
387		RingBufferPostDeleteInterceptor
388	);
389
390	interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
391	interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
392
393	interceptor_method!(namespace_post_create_interceptors, namespace_post_create, NamespacePostCreateInterceptor);
394	interceptor_method!(namespace_pre_update_interceptors, namespace_pre_update, NamespacePreUpdateInterceptor);
395	interceptor_method!(namespace_post_update_interceptors, namespace_post_update, NamespacePostUpdateInterceptor);
396	interceptor_method!(namespace_pre_delete_interceptors, namespace_pre_delete, NamespacePreDeleteInterceptor);
397
398	interceptor_method!(table_def_post_create_interceptors, table_def_post_create, TableDefPostCreateInterceptor);
399	interceptor_method!(table_def_pre_update_interceptors, table_def_pre_update, TableDefPreUpdateInterceptor);
400	interceptor_method!(table_def_post_update_interceptors, table_def_post_update, TableDefPostUpdateInterceptor);
401	interceptor_method!(table_def_pre_delete_interceptors, table_def_pre_delete, TableDefPreDeleteInterceptor);
402
403	interceptor_method!(view_pre_insert_interceptors, view_pre_insert, ViewPreInsertInterceptor);
404	interceptor_method!(view_post_insert_interceptors, view_post_insert, ViewPostInsertInterceptor);
405	interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
406	interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
407	interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
408	interceptor_method!(view_post_delete_interceptors, view_post_delete, ViewPostDeleteInterceptor);
409
410	interceptor_method!(view_def_post_create_interceptors, view_def_post_create, ViewDefPostCreateInterceptor);
411	interceptor_method!(view_def_pre_update_interceptors, view_def_pre_update, ViewDefPreUpdateInterceptor);
412	interceptor_method!(view_def_post_update_interceptors, view_def_post_update, ViewDefPostUpdateInterceptor);
413	interceptor_method!(view_def_pre_delete_interceptors, view_def_pre_delete, ViewDefPreDeleteInterceptor);
414
415	interceptor_method!(
416		ringbuffer_def_post_create_interceptors,
417		ringbuffer_def_post_create,
418		RingBufferDefPostCreateInterceptor
419	);
420	interceptor_method!(
421		ringbuffer_def_pre_update_interceptors,
422		ringbuffer_def_pre_update,
423		RingBufferDefPreUpdateInterceptor
424	);
425	interceptor_method!(
426		ringbuffer_def_post_update_interceptors,
427		ringbuffer_def_post_update,
428		RingBufferDefPostUpdateInterceptor
429	);
430	interceptor_method!(
431		ringbuffer_def_pre_delete_interceptors,
432		ringbuffer_def_pre_delete,
433		RingBufferDefPreDeleteInterceptor
434	);
435}