reifydb_transaction/interceptor/
transaction.rs1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
7 interface::{
8 catalog::shape::ShapeId,
9 change::{Change, Diff},
10 },
11};
12use reifydb_type::Result;
13
14use crate::{
15 TransactionId,
16 change::{RowChange, TransactionalCatalogChanges},
17 interceptor::chain::InterceptorChain,
18};
19
20pub struct PreCommitContext {
25 pub flow_changes: Vec<Change>,
27 pub pending_writes: Vec<(EncodedKey, Option<EncodedRow>)>,
30 pub pending_shapes: Vec<RowShape>,
32 pub transaction_writes: Vec<(EncodedKey, Option<EncodedRow>)>,
35 pub view_entries: Vec<(ShapeId, Diff)>,
38}
39
40impl PreCommitContext {
41 pub fn new() -> Self {
42 Self {
43 flow_changes: Vec::new(),
44 pending_writes: Vec::new(),
45 pending_shapes: Vec::new(),
46 transaction_writes: Vec::new(),
47 view_entries: Vec::new(),
48 }
49 }
50}
51
52impl Default for PreCommitContext {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58pub trait PreCommitInterceptor: Send + Sync {
59 fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()>;
60}
61
62impl InterceptorChain<dyn PreCommitInterceptor + Send + Sync> {
63 pub fn execute(&self, ctx: &mut PreCommitContext) -> Result<()> {
64 for interceptor in &self.interceptors {
65 interceptor.intercept(ctx)?;
66 }
67 Ok(())
68 }
69}
70
/// Adapter that stores a closure so it can be used as a
/// [`PreCommitInterceptor`].
///
/// The type definition deliberately carries no trait bounds: the
/// `Fn(&mut PreCommitContext) -> Result<()> + Send + Sync` requirement is
/// stated on the `impl` blocks that actually need it, so the bound does not
/// have to be repeated by every piece of code that merely names this type.
pub struct ClosurePreCommitInterceptor<F> {
    /// The wrapped callable; invoked by the `PreCommitInterceptor` impl.
    closure: F,
}
78
79impl<F> ClosurePreCommitInterceptor<F>
80where
81 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
82{
83 pub fn new(closure: F) -> Self {
84 Self {
85 closure,
86 }
87 }
88}
89
90impl<F> Clone for ClosurePreCommitInterceptor<F>
91where
92 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone,
93{
94 fn clone(&self) -> Self {
95 Self {
96 closure: self.closure.clone(),
97 }
98 }
99}
100
101impl<F> PreCommitInterceptor for ClosurePreCommitInterceptor<F>
102where
103 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync,
104{
105 fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()> {
106 (self.closure)(ctx)
107 }
108}
109
110pub fn pre_commit<F>(f: F) -> ClosurePreCommitInterceptor<F>
112where
113 F: Fn(&mut PreCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
114{
115 ClosurePreCommitInterceptor::new(f)
116}
117
118pub struct PostCommitContext {
119 pub id: TransactionId,
120 pub version: CommitVersion,
121 pub changes: TransactionalCatalogChanges,
122 pub row_changes: Vec<RowChange>,
123}
124
125impl PostCommitContext {
126 pub fn new(
127 id: TransactionId,
128 version: CommitVersion,
129 changes: TransactionalCatalogChanges,
130 row_changes: Vec<RowChange>,
131 ) -> Self {
132 Self {
133 id,
134 version,
135 changes,
136 row_changes,
137 }
138 }
139}
140
141pub trait PostCommitInterceptor: Send + Sync {
142 fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()>;
143}
144
145impl InterceptorChain<dyn PostCommitInterceptor + Send + Sync> {
146 pub fn execute(&self, mut ctx: PostCommitContext) -> Result<()> {
147 for interceptor in &self.interceptors {
148 interceptor.intercept(&mut ctx)?;
149 }
150 Ok(())
151 }
152}
153
/// Adapter that stores a closure so it can be used as a
/// [`PostCommitInterceptor`].
///
/// The type definition deliberately carries no trait bounds: the
/// `Fn(&mut PostCommitContext) -> Result<()> + Send + Sync` requirement is
/// stated on the `impl` blocks that actually need it, so the bound does not
/// have to be repeated by every piece of code that merely names this type.
pub struct ClosurePostCommitInterceptor<F> {
    /// The wrapped callable; invoked by the `PostCommitInterceptor` impl.
    closure: F,
}
161
162impl<F> ClosurePostCommitInterceptor<F>
163where
164 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
165{
166 pub fn new(closure: F) -> Self {
167 Self {
168 closure,
169 }
170 }
171}
172
173impl<F> Clone for ClosurePostCommitInterceptor<F>
174where
175 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone,
176{
177 fn clone(&self) -> Self {
178 Self {
179 closure: self.closure.clone(),
180 }
181 }
182}
183
184impl<F> PostCommitInterceptor for ClosurePostCommitInterceptor<F>
185where
186 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync,
187{
188 fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()> {
189 (self.closure)(ctx)
190 }
191}
192
193pub fn post_commit<F>(f: F) -> ClosurePostCommitInterceptor<F>
195where
196 F: Fn(&mut PostCommitContext) -> Result<()> + Send + Sync + Clone + 'static,
197{
198 ClosurePostCommitInterceptor::new(f)
199}