Skip to main content

reifydb_transaction/interceptor/
transaction.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{encoded::EncodedValues, key::EncodedKey},
7	interface::change::Change,
8	testing::TestingContext,
9};
10use reifydb_type::Result;
11
12use crate::{
13	TransactionId,
14	change::{RowChange, TransactionalDefChanges},
15	interceptor::chain::InterceptorChain,
16};
17
18/// Context for pre-commit interceptors.
19///
20/// `flow_changes` carries the table-level changes accumulated during the transaction
21/// (input for transactional flow interceptors).
22/// `pending_writes` is populated by interceptors with view writes to be merged
23/// back into the transaction before it commits.
24pub struct PreCommitContext {
25	/// Table changes accumulated during this transaction (input to flow interceptors).
26	pub flow_changes: Vec<Change>,
27	/// View writes produced by flow interceptors to merge back into the transaction.
28	/// `Some(value)` = set the key, `None` = remove the key.
29	pub pending_writes: Vec<(EncodedKey, Option<EncodedValues>)>,
30	/// Snapshot of the committing transaction's pending KV writes (read-only base for flow processing).
31	/// `Some(value)` = set the key, `None` = remove the key.
32	pub transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)>,
33	/// Testing audit log. Threaded through flow processing so view mutations are captured.
34	pub testing: Option<TestingContext>,
35}
36
37impl PreCommitContext {
38	pub fn new() -> Self {
39		Self {
40			flow_changes: Vec::new(),
41			pending_writes: Vec::new(),
42			transaction_writes: Vec::new(),
43			testing: None,
44		}
45	}
46
47	pub fn new_with_flow_changes(flow_changes: Vec<Change>) -> Self {
48		Self {
49			flow_changes,
50			pending_writes: Vec::new(),
51			transaction_writes: Vec::new(),
52			testing: None,
53		}
54	}
55}
56
57impl Default for PreCommitContext {
58	fn default() -> Self {
59		Self::new()
60	}
61}
62
63pub trait PreCommitInterceptor: Send + Sync {
64	fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()>;
65}
66
67impl InterceptorChain<dyn PreCommitInterceptor + Send + Sync> {
68	pub fn execute(&self, ctx: &mut PreCommitContext) -> Result<()> {
69		for interceptor in &self.interceptors {
70			interceptor.intercept(ctx)?;
71		}
72		Ok(())
73	}
74}
75
76/// Closure wrapper for pre-commit interceptors
77pub struct ClosurePreCommitInterceptor<F>
78where
79	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
80{
81	closure: F,
82}
83
84impl<F> ClosurePreCommitInterceptor<F>
85where
86	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
87{
88	pub fn new(closure: F) -> Self {
89		Self {
90			closure,
91		}
92	}
93}
94
95impl<F> Clone for ClosurePreCommitInterceptor<F>
96where
97	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone,
98{
99	fn clone(&self) -> Self {
100		Self {
101			closure: self.closure.clone(),
102		}
103	}
104}
105
106impl<F> PreCommitInterceptor for ClosurePreCommitInterceptor<F>
107where
108	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
109{
110	fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()> {
111		(self.closure)(ctx)
112	}
113}
114
115/// Helper function to create a closure pre-commit interceptor
116pub fn pre_commit<F>(f: F) -> ClosurePreCommitInterceptor<F>
117where
118	F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
119{
120	ClosurePreCommitInterceptor::new(f)
121}
122
123/// Context for post-commit interceptors
124pub struct PostCommitContext {
125	pub id: TransactionId,
126	pub version: CommitVersion,
127	pub changes: TransactionalDefChanges,
128	pub row_changes: Vec<RowChange>,
129}
130
131impl PostCommitContext {
132	pub fn new(
133		id: TransactionId,
134		version: CommitVersion,
135		changes: TransactionalDefChanges,
136		row_changes: Vec<RowChange>,
137	) -> Self {
138		Self {
139			id,
140			version,
141			changes,
142			row_changes,
143		}
144	}
145}
146
147pub trait PostCommitInterceptor: Send + Sync {
148	fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()>;
149}
150
151impl InterceptorChain<dyn PostCommitInterceptor + Send + Sync> {
152	pub fn execute(&self, mut ctx: PostCommitContext) -> Result<()> {
153		for interceptor in &self.interceptors {
154			interceptor.intercept(&mut ctx)?;
155		}
156		Ok(())
157	}
158}
159
160/// Closure wrapper for post-commit interceptors
161pub struct ClosurePostCommitInterceptor<F>
162where
163	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
164{
165	closure: F,
166}
167
168impl<F> ClosurePostCommitInterceptor<F>
169where
170	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
171{
172	pub fn new(closure: F) -> Self {
173		Self {
174			closure,
175		}
176	}
177}
178
179impl<F> Clone for ClosurePostCommitInterceptor<F>
180where
181	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone,
182{
183	fn clone(&self) -> Self {
184		Self {
185			closure: self.closure.clone(),
186		}
187	}
188}
189
190impl<F> PostCommitInterceptor for ClosurePostCommitInterceptor<F>
191where
192	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
193{
194	fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()> {
195		(self.closure)(ctx)
196	}
197}
198
199/// Helper function to create a closure post-commit interceptor
200pub fn post_commit<F>(f: F) -> ClosurePostCommitInterceptor<F>
201where
202	F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
203{
204	ClosurePostCommitInterceptor::new(f)
205}