Skip to main content

reifydb_sub_flow/transaction/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem;
5
6use pending::{Pending, PendingWrite};
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{
9	common::CommitVersion,
10	interface::{
11		catalog::schema::SchemaId,
12		change::{Change, ChangeOrigin, Diff},
13	},
14};
15use reifydb_transaction::{
16	change_accumulator::ChangeAccumulator,
17	interceptor::{
18		WithInterceptors,
19		authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
20		chain::InterceptorChain as Chain,
21		dictionary::{
22			DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
23			DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
24		},
25		dictionary_row::{
26			DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
27			DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
28			DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
29		},
30		granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
31		identity::{
32			IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
33			IdentityPreUpdateInterceptor,
34		},
35		interceptors::Interceptors,
36		namespace::{
37			NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
38			NamespacePreUpdateInterceptor,
39		},
40		ringbuffer::{
41			RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
42			RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
43		},
44		ringbuffer_row::{
45			RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
46			RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
47			RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
48		},
49		role::{
50			RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
51			RolePreUpdateInterceptor,
52		},
53		series::{
54			SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
55			SeriesPreUpdateInterceptor,
56		},
57		series_row::{
58			SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
59			SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
60		},
61		table::{
62			TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
63			TablePreUpdateInterceptor,
64		},
65		table_row::{
66			TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
67			TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
68		},
69		transaction::{PostCommitInterceptor, PreCommitInterceptor},
70		view::{
71			ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
72			ViewPreUpdateInterceptor,
73		},
74		view_row::{
75			ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
76			ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
77		},
78	},
79	multi::transaction::read::MultiReadTransaction,
80	transaction::admin::AdminTransaction,
81};
82use tracing::instrument;
83
84pub mod pending;
85pub mod range;
86pub mod read;
87pub mod state;
88pub mod write;
89
90/// Shared fields between Deferred and Transactional variants.
91pub struct FlowTransactionInner {
92	pub version: CommitVersion,
93	pub pending: Pending,
94	pub primitive_query: MultiReadTransaction,
95	pub state_query: MultiReadTransaction,
96	pub catalog: Catalog,
97	pub interceptors: Interceptors,
98	pub accumulator: ChangeAccumulator,
99}
100
101/// A transaction wrapper for flow processing with dual-version read semantics.
102///
103/// # Architecture
104///
105/// FlowTransaction provides **dual-version reads** critical for stateful flow processing:
106/// 1. **Source data** - Read at CDC event version (snapshot isolation)
107/// 2. **Flow state** - Read at latest version (state continuity across CDC events)
108/// 3. **Isolated writes** - Local PendingWrites buffer returned to caller
109///
110/// This dual-version approach allows stateful operators (joins, aggregates, distinct) to:
111/// - Process source data at a consistent snapshot (the CDC event version)
112/// - Access their own state at the latest version to maintain continuity
113///
114/// # Dual-Version Read Routing
115///
116/// Reads are automatically routed to the correct query transaction based on key type:
117///
118/// ```text
119/// ┌─────────────────┐
120/// │  FlowTransaction│
121/// └────────┬────────┘
122///          │
123///          ├──► pending (flow-generated writes)
124///          │
125///          ├──► variant
126///          │    ├─ Deferred: skip
127///          │    └─ Transactional { base_pending }: check base_pending
128///          │
129///          ├──► primitive_query (at CDC version)
130///          │    - Source tables / views / regular data
131///          │
132///          └──► state_query (at latest version)
133///               - FlowNodeState / FlowNodeInternalState
134/// ```
135///
136/// # Construction
137///
138/// Use named constructors to enforce correct initialization:
139/// - [`FlowTransaction::deferred`] — CDC path (no base pending)
140/// - [`FlowTransaction::transactional`] — inline pre-commit path (with base pending)
141///
142/// # Write Path
143///
144/// All writes (`set`, `remove`) go to the local `pending` buffer:
145/// - Reads check pending buffer first, then delegate to query transactions
146/// - Pending writes are extracted via [`FlowTransaction::take_pending`]
147///
148/// # Thread Safety
149///
150/// FlowTransaction is Send because all fields are either Copy, owned, or
151pub enum FlowTransaction {
152	/// CDC-driven async flow processing.
153	/// Reads only from committed storage + flow pending writes.
154	Deferred {
155		inner: FlowTransactionInner,
156	},
157
158	/// Inline flow processing within a committing transaction.
159	/// Can additionally read uncommitted writes from the parent transaction.
160	Transactional {
161		inner: FlowTransactionInner,
162		/// Read-only snapshot of the committing transaction's KV writes.
163		base_pending: Pending,
164	},
165}
166
167impl FlowTransaction {
168	fn inner(&self) -> &FlowTransactionInner {
169		match self {
170			Self::Deferred {
171				inner,
172				..
173			}
174			| Self::Transactional {
175				inner,
176				..
177			} => inner,
178		}
179	}
180
181	fn inner_mut(&mut self) -> &mut FlowTransactionInner {
182		match self {
183			Self::Deferred {
184				inner,
185				..
186			}
187			| Self::Transactional {
188				inner,
189				..
190			} => inner,
191		}
192	}
193
194	/// Create a deferred (CDC) FlowTransaction from a parent transaction.
195	///
196	/// Used by the async worker path. Reads only from committed storage +
197	/// flow-generated pending writes — no base pending from a parent transaction.
198	#[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
199	pub fn deferred(
200		parent: &AdminTransaction,
201		version: CommitVersion,
202		catalog: Catalog,
203		interceptors: Interceptors,
204	) -> Self {
205		let mut primitive_query = parent.multi.begin_query().unwrap();
206		primitive_query.read_as_of_version_inclusive(version);
207
208		let state_query = parent.multi.begin_query().unwrap();
209		Self::Deferred {
210			inner: FlowTransactionInner {
211				version,
212				pending: Pending::new(),
213				primitive_query,
214				state_query,
215				catalog,
216				interceptors,
217				accumulator: ChangeAccumulator::new(),
218			},
219		}
220	}
221
222	/// Create a deferred (CDC) FlowTransaction from pre-built parts.
223	///
224	/// Used by the worker actor which creates its own query transactions.
225	pub fn deferred_from_parts(
226		version: CommitVersion,
227		pending: Pending,
228		primitive_query: MultiReadTransaction,
229		state_query: MultiReadTransaction,
230		catalog: Catalog,
231		interceptors: Interceptors,
232	) -> Self {
233		Self::Deferred {
234			inner: FlowTransactionInner {
235				version,
236				pending,
237				primitive_query,
238				state_query,
239				catalog,
240				interceptors,
241				accumulator: ChangeAccumulator::new(),
242			},
243		}
244	}
245
246	/// Create a transactional (inline) FlowTransaction.
247	///
248	/// Used by the pre-commit interceptor path. `base_pending` is a read-only
249	/// snapshot of the committing transaction's KV writes so that flow operators
250	/// can see uncommitted row data.
251	pub fn transactional(
252		version: CommitVersion,
253		pending: Pending,
254		base_pending: Pending,
255		primitive_query: MultiReadTransaction,
256		state_query: MultiReadTransaction,
257		catalog: Catalog,
258		interceptors: Interceptors,
259	) -> Self {
260		Self::Transactional {
261			inner: FlowTransactionInner {
262				version,
263				pending,
264				primitive_query,
265				state_query,
266				catalog,
267				interceptors,
268				accumulator: ChangeAccumulator::new(),
269			},
270			base_pending,
271		}
272	}
273
274	/// Get the transaction version.
275	pub fn version(&self) -> CommitVersion {
276		self.inner().version
277	}
278
279	/// Extract pending writes, replacing them with an empty buffer.
280	pub fn take_pending(&mut self) -> Pending {
281		mem::take(&mut self.inner_mut().pending)
282	}
283
284	/// Track a view-level flow change in this transaction's accumulator.
285	pub fn track_flow_change(&mut self, change: Change) {
286		if let ChangeOrigin::Schema(id) = change.origin {
287			for diff in change.diffs {
288				self.inner_mut().accumulator.track(id, diff);
289			}
290		}
291	}
292
293	/// Drain the accumulator entries collected during flow processing.
294	pub fn take_accumulator_entries(&mut self) -> Vec<(SchemaId, Diff)> {
295		let acc = &mut self.inner_mut().accumulator;
296		let entries: Vec<_> = acc.entries_from(0).to_vec();
297		acc.clear();
298		entries
299	}
300
301	/// Get a reference to the pending writes.
302	#[cfg(test)]
303	pub fn pending(&self) -> &Pending {
304		&self.inner().pending
305	}
306
307	/// Update the transaction to read at a new version
308	pub fn update_version(&mut self, new_version: CommitVersion) {
309		let inner = self.inner_mut();
310		inner.version = new_version;
311		inner.primitive_query.read_as_of_version_inclusive(new_version);
312	}
313
314	/// Get access to the catalog for reading metadata
315	pub fn catalog(&self) -> &Catalog {
316		&self.inner().catalog
317	}
318}
319
320macro_rules! interceptor_method {
321	($method:ident, $field:ident, $trait_name:ident) => {
322		fn $method(&mut self) -> &mut Chain<dyn $trait_name + Send + Sync> {
323			&mut self.inner_mut().interceptors.$field
324		}
325	};
326}
327
328impl WithInterceptors for FlowTransaction {
329	interceptor_method!(table_row_pre_insert_interceptors, table_row_pre_insert, TableRowPreInsertInterceptor);
330	interceptor_method!(table_row_post_insert_interceptors, table_row_post_insert, TableRowPostInsertInterceptor);
331	interceptor_method!(table_row_pre_update_interceptors, table_row_pre_update, TableRowPreUpdateInterceptor);
332	interceptor_method!(table_row_post_update_interceptors, table_row_post_update, TableRowPostUpdateInterceptor);
333	interceptor_method!(table_row_pre_delete_interceptors, table_row_pre_delete, TableRowPreDeleteInterceptor);
334	interceptor_method!(table_row_post_delete_interceptors, table_row_post_delete, TableRowPostDeleteInterceptor);
335
336	interceptor_method!(
337		ringbuffer_row_pre_insert_interceptors,
338		ringbuffer_row_pre_insert,
339		RingBufferRowPreInsertInterceptor
340	);
341	interceptor_method!(
342		ringbuffer_row_post_insert_interceptors,
343		ringbuffer_row_post_insert,
344		RingBufferRowPostInsertInterceptor
345	);
346	interceptor_method!(
347		ringbuffer_row_pre_update_interceptors,
348		ringbuffer_row_pre_update,
349		RingBufferRowPreUpdateInterceptor
350	);
351	interceptor_method!(
352		ringbuffer_row_post_update_interceptors,
353		ringbuffer_row_post_update,
354		RingBufferRowPostUpdateInterceptor
355	);
356	interceptor_method!(
357		ringbuffer_row_pre_delete_interceptors,
358		ringbuffer_row_pre_delete,
359		RingBufferRowPreDeleteInterceptor
360	);
361	interceptor_method!(
362		ringbuffer_row_post_delete_interceptors,
363		ringbuffer_row_post_delete,
364		RingBufferRowPostDeleteInterceptor
365	);
366
367	interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
368	interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
369
370	interceptor_method!(namespace_post_create_interceptors, namespace_post_create, NamespacePostCreateInterceptor);
371	interceptor_method!(namespace_pre_update_interceptors, namespace_pre_update, NamespacePreUpdateInterceptor);
372	interceptor_method!(namespace_post_update_interceptors, namespace_post_update, NamespacePostUpdateInterceptor);
373	interceptor_method!(namespace_pre_delete_interceptors, namespace_pre_delete, NamespacePreDeleteInterceptor);
374
375	interceptor_method!(table_post_create_interceptors, table_post_create, TablePostCreateInterceptor);
376	interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
377	interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
378	interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
379
380	interceptor_method!(view_row_pre_insert_interceptors, view_row_pre_insert, ViewRowPreInsertInterceptor);
381	interceptor_method!(view_row_post_insert_interceptors, view_row_post_insert, ViewRowPostInsertInterceptor);
382	interceptor_method!(view_row_pre_update_interceptors, view_row_pre_update, ViewRowPreUpdateInterceptor);
383	interceptor_method!(view_row_post_update_interceptors, view_row_post_update, ViewRowPostUpdateInterceptor);
384	interceptor_method!(view_row_pre_delete_interceptors, view_row_pre_delete, ViewRowPreDeleteInterceptor);
385	interceptor_method!(view_row_post_delete_interceptors, view_row_post_delete, ViewRowPostDeleteInterceptor);
386
387	interceptor_method!(view_post_create_interceptors, view_post_create, ViewPostCreateInterceptor);
388	interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
389	interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
390	interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
391
392	interceptor_method!(
393		ringbuffer_post_create_interceptors,
394		ringbuffer_post_create,
395		RingBufferPostCreateInterceptor
396	);
397	interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
398	interceptor_method!(
399		ringbuffer_post_update_interceptors,
400		ringbuffer_post_update,
401		RingBufferPostUpdateInterceptor
402	);
403	interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
404
405	interceptor_method!(
406		dictionary_row_pre_insert_interceptors,
407		dictionary_row_pre_insert,
408		DictionaryRowPreInsertInterceptor
409	);
410	interceptor_method!(
411		dictionary_row_post_insert_interceptors,
412		dictionary_row_post_insert,
413		DictionaryRowPostInsertInterceptor
414	);
415	interceptor_method!(
416		dictionary_row_pre_update_interceptors,
417		dictionary_row_pre_update,
418		DictionaryRowPreUpdateInterceptor
419	);
420	interceptor_method!(
421		dictionary_row_post_update_interceptors,
422		dictionary_row_post_update,
423		DictionaryRowPostUpdateInterceptor
424	);
425	interceptor_method!(
426		dictionary_row_pre_delete_interceptors,
427		dictionary_row_pre_delete,
428		DictionaryRowPreDeleteInterceptor
429	);
430	interceptor_method!(
431		dictionary_row_post_delete_interceptors,
432		dictionary_row_post_delete,
433		DictionaryRowPostDeleteInterceptor
434	);
435
436	interceptor_method!(
437		dictionary_post_create_interceptors,
438		dictionary_post_create,
439		DictionaryPostCreateInterceptor
440	);
441	interceptor_method!(dictionary_pre_update_interceptors, dictionary_pre_update, DictionaryPreUpdateInterceptor);
442	interceptor_method!(
443		dictionary_post_update_interceptors,
444		dictionary_post_update,
445		DictionaryPostUpdateInterceptor
446	);
447	interceptor_method!(dictionary_pre_delete_interceptors, dictionary_pre_delete, DictionaryPreDeleteInterceptor);
448
449	interceptor_method!(series_row_pre_insert_interceptors, series_row_pre_insert, SeriesRowPreInsertInterceptor);
450	interceptor_method!(
451		series_row_post_insert_interceptors,
452		series_row_post_insert,
453		SeriesRowPostInsertInterceptor
454	);
455	interceptor_method!(series_row_pre_update_interceptors, series_row_pre_update, SeriesRowPreUpdateInterceptor);
456	interceptor_method!(
457		series_row_post_update_interceptors,
458		series_row_post_update,
459		SeriesRowPostUpdateInterceptor
460	);
461	interceptor_method!(series_row_pre_delete_interceptors, series_row_pre_delete, SeriesRowPreDeleteInterceptor);
462	interceptor_method!(
463		series_row_post_delete_interceptors,
464		series_row_post_delete,
465		SeriesRowPostDeleteInterceptor
466	);
467
468	interceptor_method!(series_post_create_interceptors, series_post_create, SeriesPostCreateInterceptor);
469	interceptor_method!(series_pre_update_interceptors, series_pre_update, SeriesPreUpdateInterceptor);
470	interceptor_method!(series_post_update_interceptors, series_post_update, SeriesPostUpdateInterceptor);
471	interceptor_method!(series_pre_delete_interceptors, series_pre_delete, SeriesPreDeleteInterceptor);
472	interceptor_method!(identity_post_create_interceptors, identity_post_create, IdentityPostCreateInterceptor);
473	interceptor_method!(identity_pre_update_interceptors, identity_pre_update, IdentityPreUpdateInterceptor);
474	interceptor_method!(identity_post_update_interceptors, identity_post_update, IdentityPostUpdateInterceptor);
475	interceptor_method!(identity_pre_delete_interceptors, identity_pre_delete, IdentityPreDeleteInterceptor);
476	interceptor_method!(role_post_create_interceptors, role_post_create, RolePostCreateInterceptor);
477	interceptor_method!(role_pre_update_interceptors, role_pre_update, RolePreUpdateInterceptor);
478	interceptor_method!(role_post_update_interceptors, role_post_update, RolePostUpdateInterceptor);
479	interceptor_method!(role_pre_delete_interceptors, role_pre_delete, RolePreDeleteInterceptor);
480	interceptor_method!(
481		granted_role_post_create_interceptors,
482		granted_role_post_create,
483		GrantedRolePostCreateInterceptor
484	);
485	interceptor_method!(
486		granted_role_pre_delete_interceptors,
487		granted_role_pre_delete,
488		GrantedRolePreDeleteInterceptor
489	);
490	interceptor_method!(
491		authentication_post_create_interceptors,
492		authentication_post_create,
493		AuthenticationPostCreateInterceptor
494	);
495	interceptor_method!(
496		authentication_pre_delete_interceptors,
497		authentication_pre_delete,
498		AuthenticationPreDeleteInterceptor
499	);
500}