reifydb_transaction/interceptor/
transaction.rs1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{encoded::EncodedValues, key::EncodedKey},
7 interface::change::Change,
8 testing::TestingContext,
9};
10use reifydb_type::Result;
11
12use crate::{
13 TransactionId,
14 change::{RowChange, TransactionalDefChanges},
15 interceptor::chain::InterceptorChain,
16};
17
18pub struct PreCommitContext {
25 pub flow_changes: Vec<Change>,
27 pub pending_writes: Vec<(EncodedKey, Option<EncodedValues>)>,
30 pub transaction_writes: Vec<(EncodedKey, Option<EncodedValues>)>,
33 pub testing: Option<TestingContext>,
35}
36
37impl PreCommitContext {
38 pub fn new() -> Self {
39 Self {
40 flow_changes: Vec::new(),
41 pending_writes: Vec::new(),
42 transaction_writes: Vec::new(),
43 testing: None,
44 }
45 }
46
47 pub fn new_with_flow_changes(flow_changes: Vec<Change>) -> Self {
48 Self {
49 flow_changes,
50 pending_writes: Vec::new(),
51 transaction_writes: Vec::new(),
52 testing: None,
53 }
54 }
55}
56
57impl Default for PreCommitContext {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63pub trait PreCommitInterceptor: Send + Sync {
64 fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()>;
65}
66
67impl InterceptorChain<dyn PreCommitInterceptor + Send + Sync> {
68 pub fn execute(&self, ctx: &mut PreCommitContext) -> Result<()> {
69 for interceptor in &self.interceptors {
70 interceptor.intercept(ctx)?;
71 }
72 Ok(())
73 }
74}
75
76pub struct ClosurePreCommitInterceptor<F>
78where
79 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
80{
81 closure: F,
82}
83
84impl<F> ClosurePreCommitInterceptor<F>
85where
86 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
87{
88 pub fn new(closure: F) -> Self {
89 Self {
90 closure,
91 }
92 }
93}
94
95impl<F> Clone for ClosurePreCommitInterceptor<F>
96where
97 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone,
98{
99 fn clone(&self) -> Self {
100 Self {
101 closure: self.closure.clone(),
102 }
103 }
104}
105
106impl<F> PreCommitInterceptor for ClosurePreCommitInterceptor<F>
107where
108 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
109{
110 fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()> {
111 (self.closure)(ctx)
112 }
113}
114
115pub fn pre_commit<F>(f: F) -> ClosurePreCommitInterceptor<F>
117where
118 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
119{
120 ClosurePreCommitInterceptor::new(f)
121}
122
123pub struct PostCommitContext {
125 pub id: TransactionId,
126 pub version: CommitVersion,
127 pub changes: TransactionalDefChanges,
128 pub row_changes: Vec<RowChange>,
129}
130
131impl PostCommitContext {
132 pub fn new(
133 id: TransactionId,
134 version: CommitVersion,
135 changes: TransactionalDefChanges,
136 row_changes: Vec<RowChange>,
137 ) -> Self {
138 Self {
139 id,
140 version,
141 changes,
142 row_changes,
143 }
144 }
145}
146
147pub trait PostCommitInterceptor: Send + Sync {
148 fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()>;
149}
150
151impl InterceptorChain<dyn PostCommitInterceptor + Send + Sync> {
152 pub fn execute(&self, mut ctx: PostCommitContext) -> Result<()> {
153 for interceptor in &self.interceptors {
154 interceptor.intercept(&mut ctx)?;
155 }
156 Ok(())
157 }
158}
159
160pub struct ClosurePostCommitInterceptor<F>
162where
163 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
164{
165 closure: F,
166}
167
168impl<F> ClosurePostCommitInterceptor<F>
169where
170 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
171{
172 pub fn new(closure: F) -> Self {
173 Self {
174 closure,
175 }
176 }
177}
178
179impl<F> Clone for ClosurePostCommitInterceptor<F>
180where
181 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone,
182{
183 fn clone(&self) -> Self {
184 Self {
185 closure: self.closure.clone(),
186 }
187 }
188}
189
190impl<F> PostCommitInterceptor for ClosurePostCommitInterceptor<F>
191where
192 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
193{
194 fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()> {
195 (self.closure)(ctx)
196 }
197}
198
199pub fn post_commit<F>(f: F) -> ClosurePostCommitInterceptor<F>
201where
202 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
203{
204 ClosurePostCommitInterceptor::new(f)
205}