Skip to main content

reifydb_transaction/interceptor/
transaction.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
7	interface::{
8		catalog::shape::ShapeId,
9		change::{Change, Diff},
10	},
11};
12use reifydb_type::Result;
13
14use crate::{
15	TransactionId,
16	change::{RowChange, TransactionalCatalogChanges},
17	interceptor::chain::InterceptorChain,
18};
19
20/// `flow_changes` carries the table-level changes accumulated during the transaction
21/// (input for transactional flow interceptors).
22/// `pending_writes` is populated by interceptors with view writes to be merged
23/// back into the transaction before it commits.
24pub struct PreCommitContext {
25	/// Table changes accumulated during this transaction (input to flow interceptors).
26	pub flow_changes: Vec<Change>,
27	/// View writes produced by flow interceptors to merge back into the transaction.
28	/// `Some(value)` = set the key, `None` = remove the key.
29	pub pending_writes: Vec<(EncodedKey, Option<EncodedRow>)>,
30	/// Row shapes created during flow processing, to be persisted at commit time.
31	pub pending_shapes: Vec<RowShape>,
32	/// Snapshot of the committing transaction's pending KV writes (read-only base for flow processing).
33	/// `Some(value)` = set the key, `None` = remove the key.
34	pub transaction_writes: Vec<(EncodedKey, Option<EncodedRow>)>,
35	/// View-level accumulator entries produced by flow interceptors.
36	/// Used by test infrastructure to feed view diffs back into the change accumulator.
37	pub view_entries: Vec<(ShapeId, Diff)>,
38}
39
40impl PreCommitContext {
41	pub fn new() -> Self {
42		Self {
43			flow_changes: Vec::new(),
44			pending_writes: Vec::new(),
45			pending_shapes: Vec::new(),
46			transaction_writes: Vec::new(),
47			view_entries: Vec::new(),
48		}
49	}
50}
51
52impl Default for PreCommitContext {
53	fn default() -> Self {
54		Self::new()
55	}
56}
57
58pub trait PreCommitInterceptor: Send + Sync {
59	fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()>;
60}
61
62impl InterceptorChain<dyn PreCommitInterceptor + Send + Sync> {
63	pub fn execute(&self, ctx: &mut PreCommitContext) -> Result<()> {
64		for interceptor in &self.interceptors {
65			interceptor.intercept(ctx)?;
66		}
67		Ok(())
68	}
69}
70
71/// Closure wrapper for pre-commit interceptors
72pub struct ClosurePreCommitInterceptor<F>
73where
74	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
75{
76	closure: F,
77}
78
79impl<F> ClosurePreCommitInterceptor<F>
80where
81	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
82{
83	pub fn new(closure: F) -> Self {
84		Self {
85			closure,
86		}
87	}
88}
89
90impl<F> Clone for ClosurePreCommitInterceptor<F>
91where
92	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone,
93{
94	fn clone(&self) -> Self {
95		Self {
96			closure: self.closure.clone(),
97		}
98	}
99}
100
101impl<F> PreCommitInterceptor for ClosurePreCommitInterceptor<F>
102where
103	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
104{
105	fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()> {
106		(self.closure)(ctx)
107	}
108}
109
110/// Helper function to create a closure pre-commit interceptor
111pub fn pre_commit<F>(f: F) -> ClosurePreCommitInterceptor<F>
112where
113	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
114{
115	ClosurePreCommitInterceptor::new(f)
116}
117
118pub struct PostCommitContext {
119	pub id: TransactionId,
120	pub version: CommitVersion,
121	pub changes: TransactionalCatalogChanges,
122	pub row_changes: Vec<RowChange>,
123}
124
125impl PostCommitContext {
126	pub fn new(
127		id: TransactionId,
128		version: CommitVersion,
129		changes: TransactionalCatalogChanges,
130		row_changes: Vec<RowChange>,
131	) -> Self {
132		Self {
133			id,
134			version,
135			changes,
136			row_changes,
137		}
138	}
139}
140
141pub trait PostCommitInterceptor: Send + Sync {
142	fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()>;
143}
144
145impl InterceptorChain<dyn PostCommitInterceptor + Send + Sync> {
146	pub fn execute(&self, mut ctx: PostCommitContext) -> Result<()> {
147		for interceptor in &self.interceptors {
148			interceptor.intercept(&mut ctx)?;
149		}
150		Ok(())
151	}
152}
153
154/// Closure wrapper for post-commit interceptors
155pub struct ClosurePostCommitInterceptor<F>
156where
157	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
158{
159	closure: F,
160}
161
162impl<F> ClosurePostCommitInterceptor<F>
163where
164	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
165{
166	pub fn new(closure: F) -> Self {
167		Self {
168			closure,
169		}
170	}
171}
172
173impl<F> Clone for ClosurePostCommitInterceptor<F>
174where
175	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone,
176{
177	fn clone(&self) -> Self {
178		Self {
179			closure: self.closure.clone(),
180		}
181	}
182}
183
184impl<F> PostCommitInterceptor for ClosurePostCommitInterceptor<F>
185where
186	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
187{
188	fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()> {
189		(self.closure)(ctx)
190	}
191}
192
193/// Helper function to create a closure post-commit interceptor
194pub fn post_commit<F>(f: F) -> ClosurePostCommitInterceptor<F>
195where
196	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
197{
198	ClosurePostCommitInterceptor::new(f)
199}