Skip to main content

reifydb_sub_flow/transaction/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::mem;
5
6use pending::{Pending, PendingWrite};
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{common::CommitVersion, interface::change::Change};
9use reifydb_transaction::{
10	interceptor::{
11		WithInterceptors,
12		chain::InterceptorChain as Chain,
13		interceptors::Interceptors,
14		namespace_def::{
15			NamespaceDefPostCreateInterceptor, NamespaceDefPostUpdateInterceptor,
16			NamespaceDefPreDeleteInterceptor, NamespaceDefPreUpdateInterceptor,
17		},
18		ringbuffer::{
19			RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
20			RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
21			RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
22		},
23		ringbuffer_def::{
24			RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
25			RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
26		},
27		table::{
28			TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
29			TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
30		},
31		table_def::{
32			TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
33			TableDefPreUpdateInterceptor,
34		},
35		transaction::{PostCommitInterceptor, PreCommitInterceptor},
36		view::{
37			ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
38			ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
39		},
40		view_def::{
41			ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
42			ViewDefPreUpdateInterceptor,
43		},
44	},
45	multi::transaction::read::MultiReadTransaction,
46	transaction::admin::AdminTransaction,
47};
48use tracing::instrument;
49
50pub mod pending;
51pub mod range;
52pub mod read;
53pub mod state;
54pub mod write;
55
/// A transaction wrapper for flow processing with dual-version read semantics.
///
/// # Architecture
///
/// FlowTransaction provides **dual-version reads** critical for stateful flow processing:
/// 1. **Source data** - Read at CDC event version (snapshot isolation)
/// 2. **Flow state** - Read at latest version (state continuity across CDC events)
/// 3. **Isolated writes** - Local PendingWrites buffer returned to caller
///
/// This dual-version approach allows stateful operators (joins, aggregates, distinct) to:
/// - Process source data at a consistent snapshot (the CDC event version)
/// - Access their own state at the latest version to maintain continuity
///
/// # Dual-Version Read Routing
///
/// Reads are automatically routed to the correct query transaction based on key type:
///
/// ```text
/// ┌─────────────────┐
/// │  FlowTransaction│
/// └────────┬────────┘
///          │
///          ├──► pending (flow-generated writes)
///          │
///          ├──► variant
///          │    ├─ Deferred: skip
///          │    └─ Transactional { base_pending }: check base_pending
///          │
///          ├──► primitive_query (at CDC version)
///          │    - Source tables / views / regular data
///          │
///          └──► state_query (at latest version)
///               - FlowNodeState / FlowNodeInternalState
/// ```
///
/// # Construction
///
/// Use named constructors to enforce correct initialization:
/// - [`FlowTransaction::deferred`] — CDC path (no base pending)
/// - [`FlowTransaction::transactional`] — inline pre-commit path (with base pending)
///
/// # Write Path
///
/// All writes (`set`, `remove`) go to the local `pending` buffer:
/// - Reads check pending buffer first, then delegate to query transactions
/// - Pending writes are extracted via [`FlowTransaction::take_pending`]
///
/// # Thread Safety
///
/// Every field of both variants is held by value; the enum borrows nothing from
/// its creator, so it is intended to be movable across threads (`Send`).
///
/// NOTE(review): the original sentence here was cut off mid-way. Confirm that
/// `Pending`, `MultiReadTransaction`, `Catalog` and `Interceptors` are all `Send`
/// before relying on this.
pub enum FlowTransaction {
	/// CDC-driven async flow processing.
	/// Reads only from committed storage + flow pending writes.
	Deferred {
		/// Version reported by [`FlowTransaction::version`]; `deferred` and
		/// `update_version` pin `primitive_query` to this version (inclusive).
		version: CommitVersion,
		/// Local buffer of flow-generated writes, drained by `take_pending`.
		pending: Pending,
		/// Query transaction used for source data (see the routing diagram above).
		primitive_query: MultiReadTransaction,
		/// Query transaction used for flow state; `update_version` never re-pins it.
		state_query: MultiReadTransaction,
		/// Catalog handle, exposed crate-internally through `catalog()`.
		catalog: Catalog,
		/// Interceptor chains served by the `WithInterceptors` implementation.
		interceptors: Interceptors,
	},

	/// Inline flow processing within a committing transaction.
	/// Can additionally read uncommitted writes from the parent transaction.
	Transactional {
		/// Version reported by [`FlowTransaction::version`]; `update_version`
		/// pins `primitive_query` to this version (inclusive).
		version: CommitVersion,
		/// Local buffer of flow-generated writes, drained by `take_pending`.
		pending: Pending,
		/// Read-only snapshot of the committing transaction's KV writes.
		base_pending: Pending,
		/// Query transaction used for source data (see the routing diagram above).
		primitive_query: MultiReadTransaction,
		/// Query transaction used for flow state; `update_version` never re-pins it.
		state_query: MultiReadTransaction,
		/// Catalog handle, exposed crate-internally through `catalog()`.
		catalog: Catalog,
		/// Interceptor chains served by the `WithInterceptors` implementation.
		interceptors: Interceptors,
	},
}
131
132impl FlowTransaction {
133	/// Create a deferred (CDC) FlowTransaction from a parent transaction.
134	///
135	/// Used by the async worker path. Reads only from committed storage +
136	/// flow-generated pending writes — no base pending from a parent transaction.
137	#[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
138	pub fn deferred(
139		parent: &AdminTransaction,
140		version: CommitVersion,
141		catalog: Catalog,
142		interceptors: Interceptors,
143	) -> Self {
144		let mut primitive_query = parent.multi.begin_query().unwrap();
145		primitive_query.read_as_of_version_inclusive(version);
146
147		let state_query = parent.multi.begin_query().unwrap();
148		Self::Deferred {
149			version,
150			pending: Pending::new(),
151			primitive_query,
152			state_query,
153			catalog,
154			interceptors,
155		}
156	}
157
158	/// Create a deferred (CDC) FlowTransaction from pre-built parts.
159	///
160	/// Used by the worker actor which creates its own query transactions.
161	pub fn deferred_from_parts(
162		version: CommitVersion,
163		pending: Pending,
164		primitive_query: MultiReadTransaction,
165		state_query: MultiReadTransaction,
166		catalog: Catalog,
167		interceptors: Interceptors,
168	) -> Self {
169		Self::Deferred {
170			version,
171			pending,
172			primitive_query,
173			state_query,
174			catalog,
175			interceptors,
176		}
177	}
178
179	/// Create a transactional (inline) FlowTransaction.
180	///
181	/// Used by the pre-commit interceptor path. `base_pending` is a read-only
182	/// snapshot of the committing transaction's KV writes so that flow operators
183	/// can see uncommitted row data.
184	pub fn transactional(
185		version: CommitVersion,
186		pending: Pending,
187		base_pending: Pending,
188		primitive_query: MultiReadTransaction,
189		state_query: MultiReadTransaction,
190		catalog: Catalog,
191		interceptors: Interceptors,
192	) -> Self {
193		Self::Transactional {
194			version,
195			pending,
196			base_pending,
197			primitive_query,
198			state_query,
199			catalog,
200			interceptors,
201		}
202	}
203
204	/// Get the transaction version.
205	pub fn version(&self) -> CommitVersion {
206		match self {
207			Self::Deferred {
208				version,
209				..
210			} => *version,
211			Self::Transactional {
212				version,
213				..
214			} => *version,
215		}
216	}
217
218	/// Extract pending writes, replacing them with an empty buffer.
219	pub fn take_pending(&mut self) -> Pending {
220		match self {
221			Self::Deferred {
222				pending,
223				..
224			} => mem::take(pending),
225			Self::Transactional {
226				pending,
227				..
228			} => mem::take(pending),
229		}
230	}
231
232	/// Get a reference to the pending writes.
233	#[cfg(test)]
234	pub fn pending(&self) -> &Pending {
235		match self {
236			Self::Deferred {
237				pending,
238				..
239			} => pending,
240			Self::Transactional {
241				pending,
242				..
243			} => pending,
244		}
245	}
246
247	/// Drain all generated view changes, returning them.
248	pub fn take_view_changes(&mut self) -> pending::ViewChanges {
249		match self {
250			Self::Deferred {
251				pending,
252				..
253			} => pending.take_view_changes(),
254			Self::Transactional {
255				pending,
256				..
257			} => pending.take_view_changes(),
258		}
259	}
260
261	/// Append a view change (used by `SinkViewOperator`).
262	pub fn push_view_change(&mut self, change: Change) {
263		match self {
264			Self::Deferred {
265				pending,
266				..
267			} => pending.push_view_change(change),
268			Self::Transactional {
269				pending,
270				..
271			} => pending.push_view_change(change),
272		}
273	}
274
275	/// Update the transaction to read at a new version
276	pub fn update_version(&mut self, new_version: CommitVersion) {
277		match self {
278			Self::Deferred {
279				version,
280				primitive_query,
281				..
282			} => {
283				*version = new_version;
284				primitive_query.read_as_of_version_inclusive(new_version);
285			}
286			Self::Transactional {
287				version,
288				primitive_query,
289				..
290			} => {
291				*version = new_version;
292				primitive_query.read_as_of_version_inclusive(new_version);
293			}
294		}
295	}
296
297	/// Get access to the catalog for reading metadata
298	pub(crate) fn catalog(&self) -> &Catalog {
299		match self {
300			Self::Deferred {
301				catalog,
302				..
303			} => catalog,
304			Self::Transactional {
305				catalog,
306				..
307			} => catalog,
308		}
309	}
310}
311
/// Expands to one accessor method that returns the interceptor chain stored in
/// the `interceptors` field of whichever variant `self` currently is.
///
/// Arguments, in order: the method name to generate, the field of
/// `interceptors` to borrow, and the interceptor trait the chain holds.
macro_rules! interceptor_method {
	($accessor:ident, $slot:ident, $hook:ident) => {
		fn $accessor(&mut self) -> &mut Chain<dyn $hook + Send + Sync> {
			// Both variants carry an `interceptors` field, so one arm serves both.
			match self {
				Self::Deferred { interceptors, .. } | Self::Transactional { interceptors, .. } => {
					&mut interceptors.$slot
				}
			}
		}
	};
}
328
329impl WithInterceptors for FlowTransaction {
330	interceptor_method!(table_pre_insert_interceptors, table_pre_insert, TablePreInsertInterceptor);
331	interceptor_method!(table_post_insert_interceptors, table_post_insert, TablePostInsertInterceptor);
332	interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
333	interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
334	interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
335	interceptor_method!(table_post_delete_interceptors, table_post_delete, TablePostDeleteInterceptor);
336
337	interceptor_method!(ringbuffer_pre_insert_interceptors, ringbuffer_pre_insert, RingBufferPreInsertInterceptor);
338	interceptor_method!(
339		ringbuffer_post_insert_interceptors,
340		ringbuffer_post_insert,
341		RingBufferPostInsertInterceptor
342	);
343	interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
344	interceptor_method!(
345		ringbuffer_post_update_interceptors,
346		ringbuffer_post_update,
347		RingBufferPostUpdateInterceptor
348	);
349	interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
350	interceptor_method!(
351		ringbuffer_post_delete_interceptors,
352		ringbuffer_post_delete,
353		RingBufferPostDeleteInterceptor
354	);
355
356	interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
357	interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
358
359	interceptor_method!(
360		namespace_def_post_create_interceptors,
361		namespace_def_post_create,
362		NamespaceDefPostCreateInterceptor
363	);
364	interceptor_method!(
365		namespace_def_pre_update_interceptors,
366		namespace_def_pre_update,
367		NamespaceDefPreUpdateInterceptor
368	);
369	interceptor_method!(
370		namespace_def_post_update_interceptors,
371		namespace_def_post_update,
372		NamespaceDefPostUpdateInterceptor
373	);
374	interceptor_method!(
375		namespace_def_pre_delete_interceptors,
376		namespace_def_pre_delete,
377		NamespaceDefPreDeleteInterceptor
378	);
379
380	interceptor_method!(table_def_post_create_interceptors, table_def_post_create, TableDefPostCreateInterceptor);
381	interceptor_method!(table_def_pre_update_interceptors, table_def_pre_update, TableDefPreUpdateInterceptor);
382	interceptor_method!(table_def_post_update_interceptors, table_def_post_update, TableDefPostUpdateInterceptor);
383	interceptor_method!(table_def_pre_delete_interceptors, table_def_pre_delete, TableDefPreDeleteInterceptor);
384
385	interceptor_method!(view_pre_insert_interceptors, view_pre_insert, ViewPreInsertInterceptor);
386	interceptor_method!(view_post_insert_interceptors, view_post_insert, ViewPostInsertInterceptor);
387	interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
388	interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
389	interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
390	interceptor_method!(view_post_delete_interceptors, view_post_delete, ViewPostDeleteInterceptor);
391
392	interceptor_method!(view_def_post_create_interceptors, view_def_post_create, ViewDefPostCreateInterceptor);
393	interceptor_method!(view_def_pre_update_interceptors, view_def_pre_update, ViewDefPreUpdateInterceptor);
394	interceptor_method!(view_def_post_update_interceptors, view_def_post_update, ViewDefPostUpdateInterceptor);
395	interceptor_method!(view_def_pre_delete_interceptors, view_def_pre_delete, ViewDefPreDeleteInterceptor);
396
397	interceptor_method!(
398		ringbuffer_def_post_create_interceptors,
399		ringbuffer_def_post_create,
400		RingBufferDefPostCreateInterceptor
401	);
402	interceptor_method!(
403		ringbuffer_def_pre_update_interceptors,
404		ringbuffer_def_pre_update,
405		RingBufferDefPreUpdateInterceptor
406	);
407	interceptor_method!(
408		ringbuffer_def_post_update_interceptors,
409		ringbuffer_def_post_update,
410		RingBufferDefPostUpdateInterceptor
411	);
412	interceptor_method!(
413		ringbuffer_def_pre_delete_interceptors,
414		ringbuffer_def_pre_delete,
415		RingBufferDefPreDeleteInterceptor
416	);
417}