1use async_trait::async_trait;
5use reifydb_type::RowNumber;
6
7use crate::{
8 CommitVersion,
9 interceptor::{
10 NamespaceDefPostCreateContext, NamespaceDefPostUpdateContext, NamespaceDefPreDeleteContext,
11 NamespaceDefPreUpdateContext, PostCommitContext, PreCommitContext, RingBufferDefPostCreateContext,
12 RingBufferDefPostUpdateContext, RingBufferDefPreDeleteContext, RingBufferDefPreUpdateContext,
13 RingBufferPostDeleteContext, RingBufferPostInsertContext, RingBufferPostUpdateContext,
14 RingBufferPreDeleteContext, RingBufferPreInsertContext, RingBufferPreUpdateContext,
15 TableDefPostCreateContext, TableDefPostUpdateContext, TableDefPreDeleteContext,
16 TableDefPreUpdateContext, TablePostDeleteContext, TablePostInsertContext, TablePostUpdateContext,
17 TablePreDeleteContext, TablePreInsertContext, TablePreUpdateContext, ViewDefPostCreateContext,
18 ViewDefPostUpdateContext, ViewDefPreDeleteContext, ViewDefPreUpdateContext,
19 },
20 interface::{
21 CommandTransaction, NamespaceDef, RingBufferDef, RowChange, TableDef, TransactionId,
22 TransactionalDefChanges, ViewDef,
23 interceptor::{
24 NamespaceDefInterceptor, RingBufferDefInterceptor, RingBufferInterceptor, TableDefInterceptor,
25 TableInterceptor, TransactionInterceptor, ViewDefInterceptor, WithInterceptors,
26 },
27 },
28 value::encoded::EncodedValues,
29};
30
31#[async_trait]
32impl<CT: CommandTransaction + WithInterceptors<CT> + Send> TableInterceptor<CT> for CT {
33 async fn pre_insert(&mut self, table: &TableDef, rn: RowNumber, row: &EncodedValues) -> crate::Result<()> {
34 if self.table_pre_insert_interceptors().is_empty() {
35 return Ok(());
36 }
37 let interceptors = self.table_pre_insert_interceptors().interceptors.clone();
38 let mut ctx = TablePreInsertContext::new(self, table, rn, row);
39 for interceptor in interceptors {
40 interceptor.intercept(&mut ctx).await?;
41 }
42 Ok(())
43 }
44
45 async fn post_insert(&mut self, table: &TableDef, id: RowNumber, row: &EncodedValues) -> crate::Result<()> {
46 if self.table_post_insert_interceptors().is_empty() {
47 return Ok(());
48 }
49 let interceptors = self.table_post_insert_interceptors().interceptors.clone();
50 let mut ctx = TablePostInsertContext::new(self, table, id, row);
51 for interceptor in interceptors {
52 interceptor.intercept(&mut ctx).await?;
53 }
54 Ok(())
55 }
56
57 async fn pre_update(&mut self, table: &TableDef, id: RowNumber, row: &EncodedValues) -> crate::Result<()> {
58 if self.table_pre_update_interceptors().is_empty() {
59 return Ok(());
60 }
61 let interceptors = self.table_pre_update_interceptors().interceptors.clone();
62 let mut ctx = TablePreUpdateContext::new(self, table, id, row);
63 for interceptor in interceptors {
64 interceptor.intercept(&mut ctx).await?;
65 }
66 Ok(())
67 }
68
69 async fn post_update(
70 &mut self,
71 table: &TableDef,
72 id: RowNumber,
73 row: &EncodedValues,
74 old_row: &EncodedValues,
75 ) -> crate::Result<()> {
76 if self.table_post_update_interceptors().is_empty() {
77 return Ok(());
78 }
79 let interceptors = self.table_post_update_interceptors().interceptors.clone();
80 let mut ctx = TablePostUpdateContext::new(self, table, id, row, old_row);
81 for interceptor in interceptors {
82 interceptor.intercept(&mut ctx).await?;
83 }
84 Ok(())
85 }
86
87 async fn pre_delete(&mut self, table: &TableDef, id: RowNumber) -> crate::Result<()> {
88 if self.table_pre_delete_interceptors().is_empty() {
89 return Ok(());
90 }
91 let interceptors = self.table_pre_delete_interceptors().interceptors.clone();
92 let mut ctx = TablePreDeleteContext::new(self, table, id);
93 for interceptor in interceptors {
94 interceptor.intercept(&mut ctx).await?;
95 }
96 Ok(())
97 }
98
99 async fn post_delete(
100 &mut self,
101 table: &TableDef,
102 id: RowNumber,
103 deleted_row: &EncodedValues,
104 ) -> crate::Result<()> {
105 if self.table_post_delete_interceptors().is_empty() {
106 return Ok(());
107 }
108 let interceptors = self.table_post_delete_interceptors().interceptors.clone();
109 let mut ctx = TablePostDeleteContext::new(self, table, id, deleted_row);
110 for interceptor in interceptors {
111 interceptor.intercept(&mut ctx).await?;
112 }
113 Ok(())
114 }
115}
116
117#[async_trait]
118impl<CT: CommandTransaction + WithInterceptors<CT> + Send> RingBufferInterceptor<CT> for CT {
119 async fn pre_insert(&mut self, ringbuffer: &RingBufferDef, row: &EncodedValues) -> crate::Result<()> {
120 if self.ringbuffer_pre_insert_interceptors().is_empty() {
121 return Ok(());
122 }
123 let interceptors = self.ringbuffer_pre_insert_interceptors().interceptors.clone();
124 let mut ctx = RingBufferPreInsertContext::new(self, ringbuffer, row);
125 for interceptor in interceptors {
126 interceptor.intercept(&mut ctx).await?;
127 }
128 Ok(())
129 }
130
131 async fn post_insert(
132 &mut self,
133 ringbuffer: &RingBufferDef,
134 id: RowNumber,
135 row: &EncodedValues,
136 ) -> crate::Result<()> {
137 if self.ringbuffer_post_insert_interceptors().is_empty() {
138 return Ok(());
139 }
140 let interceptors = self.ringbuffer_post_insert_interceptors().interceptors.clone();
141 let mut ctx = RingBufferPostInsertContext::new(self, ringbuffer, id, row);
142 for interceptor in interceptors {
143 interceptor.intercept(&mut ctx).await?;
144 }
145 Ok(())
146 }
147
148 async fn pre_update(
149 &mut self,
150 ringbuffer: &RingBufferDef,
151 id: RowNumber,
152 row: &EncodedValues,
153 ) -> crate::Result<()> {
154 if self.ringbuffer_pre_update_interceptors().is_empty() {
155 return Ok(());
156 }
157 let interceptors = self.ringbuffer_pre_update_interceptors().interceptors.clone();
158 let mut ctx = RingBufferPreUpdateContext::new(self, ringbuffer, id, row);
159 for interceptor in interceptors {
160 interceptor.intercept(&mut ctx).await?;
161 }
162 Ok(())
163 }
164
165 async fn post_update(
166 &mut self,
167 ringbuffer: &RingBufferDef,
168 id: RowNumber,
169 row: &EncodedValues,
170 old_row: &EncodedValues,
171 ) -> crate::Result<()> {
172 if self.ringbuffer_post_update_interceptors().is_empty() {
173 return Ok(());
174 }
175 let interceptors = self.ringbuffer_post_update_interceptors().interceptors.clone();
176 let mut ctx = RingBufferPostUpdateContext::new(self, ringbuffer, id, row, old_row);
177 for interceptor in interceptors {
178 interceptor.intercept(&mut ctx).await?;
179 }
180 Ok(())
181 }
182
183 async fn pre_delete(&mut self, ringbuffer: &RingBufferDef, id: RowNumber) -> crate::Result<()> {
184 if self.ringbuffer_pre_delete_interceptors().is_empty() {
185 return Ok(());
186 }
187 let interceptors = self.ringbuffer_pre_delete_interceptors().interceptors.clone();
188 let mut ctx = RingBufferPreDeleteContext::new(self, ringbuffer, id);
189 for interceptor in interceptors {
190 interceptor.intercept(&mut ctx).await?;
191 }
192 Ok(())
193 }
194
195 async fn post_delete(
196 &mut self,
197 ringbuffer: &RingBufferDef,
198 id: RowNumber,
199 deleted_row: &EncodedValues,
200 ) -> crate::Result<()> {
201 if self.ringbuffer_post_delete_interceptors().is_empty() {
202 return Ok(());
203 }
204 let interceptors = self.ringbuffer_post_delete_interceptors().interceptors.clone();
205 let mut ctx = RingBufferPostDeleteContext::new(self, ringbuffer, id, deleted_row);
206 for interceptor in interceptors {
207 interceptor.intercept(&mut ctx).await?;
208 }
209 Ok(())
210 }
211}
212
213#[async_trait]
214impl<CT: CommandTransaction + WithInterceptors<CT> + Send> NamespaceDefInterceptor<CT> for CT {
215 async fn post_create(&mut self, post: &NamespaceDef) -> crate::Result<()> {
216 if self.namespace_def_post_create_interceptors().is_empty() {
217 return Ok(());
218 }
219 let interceptors = self.namespace_def_post_create_interceptors().interceptors.clone();
220 let mut ctx = NamespaceDefPostCreateContext::new(self, post);
221 for interceptor in interceptors {
222 interceptor.intercept(&mut ctx).await?;
223 }
224 Ok(())
225 }
226
227 async fn pre_update(&mut self, pre: &NamespaceDef) -> crate::Result<()> {
228 if self.namespace_def_pre_update_interceptors().is_empty() {
229 return Ok(());
230 }
231 let interceptors = self.namespace_def_pre_update_interceptors().interceptors.clone();
232 let mut ctx = NamespaceDefPreUpdateContext::new(self, pre);
233 for interceptor in interceptors {
234 interceptor.intercept(&mut ctx).await?;
235 }
236 Ok(())
237 }
238
239 async fn post_update(&mut self, pre: &NamespaceDef, post: &NamespaceDef) -> crate::Result<()> {
240 if self.namespace_def_post_update_interceptors().is_empty() {
241 return Ok(());
242 }
243 let interceptors = self.namespace_def_post_update_interceptors().interceptors.clone();
244 let mut ctx = NamespaceDefPostUpdateContext::new(self, pre, post);
245 for interceptor in interceptors {
246 interceptor.intercept(&mut ctx).await?;
247 }
248 Ok(())
249 }
250
251 async fn pre_delete(&mut self, pre: &NamespaceDef) -> crate::Result<()> {
252 if self.namespace_def_pre_delete_interceptors().is_empty() {
253 return Ok(());
254 }
255 let interceptors = self.namespace_def_pre_delete_interceptors().interceptors.clone();
256 let mut ctx = NamespaceDefPreDeleteContext::new(self, pre);
257 for interceptor in interceptors {
258 interceptor.intercept(&mut ctx).await?;
259 }
260 Ok(())
261 }
262}
263
264#[async_trait]
265impl<CT: CommandTransaction + WithInterceptors<CT> + Send> TableDefInterceptor<CT> for CT {
266 async fn post_create(&mut self, post: &TableDef) -> crate::Result<()> {
267 if self.table_def_post_create_interceptors().is_empty() {
268 return Ok(());
269 }
270 let interceptors = self.table_def_post_create_interceptors().interceptors.clone();
271 let mut ctx = TableDefPostCreateContext::new(self, post);
272 for interceptor in interceptors {
273 interceptor.intercept(&mut ctx).await?;
274 }
275 Ok(())
276 }
277
278 async fn pre_update(&mut self, pre: &TableDef) -> crate::Result<()> {
279 if self.table_def_pre_update_interceptors().is_empty() {
280 return Ok(());
281 }
282 let interceptors = self.table_def_pre_update_interceptors().interceptors.clone();
283 let mut ctx = TableDefPreUpdateContext::new(self, pre);
284 for interceptor in interceptors {
285 interceptor.intercept(&mut ctx).await?;
286 }
287 Ok(())
288 }
289
290 async fn post_update(&mut self, pre: &TableDef, post: &TableDef) -> crate::Result<()> {
291 if self.table_def_post_update_interceptors().is_empty() {
292 return Ok(());
293 }
294 let interceptors = self.table_def_post_update_interceptors().interceptors.clone();
295 let mut ctx = TableDefPostUpdateContext::new(self, pre, post);
296 for interceptor in interceptors {
297 interceptor.intercept(&mut ctx).await?;
298 }
299 Ok(())
300 }
301
302 async fn pre_delete(&mut self, pre: &TableDef) -> crate::Result<()> {
303 if self.table_def_pre_delete_interceptors().is_empty() {
304 return Ok(());
305 }
306 let interceptors = self.table_def_pre_delete_interceptors().interceptors.clone();
307 let mut ctx = TableDefPreDeleteContext::new(self, pre);
308 for interceptor in interceptors {
309 interceptor.intercept(&mut ctx).await?;
310 }
311 Ok(())
312 }
313}
314
315#[async_trait]
316impl<CT: CommandTransaction + WithInterceptors<CT> + Send> ViewDefInterceptor<CT> for CT {
317 async fn post_create(&mut self, post: &ViewDef) -> crate::Result<()> {
318 if self.view_def_post_create_interceptors().is_empty() {
319 return Ok(());
320 }
321 let interceptors = self.view_def_post_create_interceptors().interceptors.clone();
322 let mut ctx = ViewDefPostCreateContext::new(self, post);
323 for interceptor in interceptors {
324 interceptor.intercept(&mut ctx).await?;
325 }
326 Ok(())
327 }
328
329 async fn pre_update(&mut self, pre: &ViewDef) -> crate::Result<()> {
330 if self.view_def_pre_update_interceptors().is_empty() {
331 return Ok(());
332 }
333 let interceptors = self.view_def_pre_update_interceptors().interceptors.clone();
334 let mut ctx = ViewDefPreUpdateContext::new(self, pre);
335 for interceptor in interceptors {
336 interceptor.intercept(&mut ctx).await?;
337 }
338 Ok(())
339 }
340
341 async fn post_update(&mut self, pre: &ViewDef, post: &ViewDef) -> crate::Result<()> {
342 if self.view_def_post_update_interceptors().is_empty() {
343 return Ok(());
344 }
345 let interceptors = self.view_def_post_update_interceptors().interceptors.clone();
346 let mut ctx = ViewDefPostUpdateContext::new(self, pre, post);
347 for interceptor in interceptors {
348 interceptor.intercept(&mut ctx).await?;
349 }
350 Ok(())
351 }
352
353 async fn pre_delete(&mut self, pre: &ViewDef) -> crate::Result<()> {
354 if self.view_def_pre_delete_interceptors().is_empty() {
355 return Ok(());
356 }
357 let interceptors = self.view_def_pre_delete_interceptors().interceptors.clone();
358 let mut ctx = ViewDefPreDeleteContext::new(self, pre);
359 for interceptor in interceptors {
360 interceptor.intercept(&mut ctx).await?;
361 }
362 Ok(())
363 }
364}
365
366#[async_trait]
367impl<CT: CommandTransaction + WithInterceptors<CT> + Send> RingBufferDefInterceptor<CT> for CT {
368 async fn post_create(&mut self, post: &RingBufferDef) -> crate::Result<()> {
369 if self.ringbuffer_def_post_create_interceptors().is_empty() {
370 return Ok(());
371 }
372 let interceptors = self.ringbuffer_def_post_create_interceptors().interceptors.clone();
373 let mut ctx = RingBufferDefPostCreateContext::new(self, post);
374 for interceptor in interceptors {
375 interceptor.intercept(&mut ctx).await?;
376 }
377 Ok(())
378 }
379
380 async fn pre_update(&mut self, pre: &RingBufferDef) -> crate::Result<()> {
381 if self.ringbuffer_def_pre_update_interceptors().is_empty() {
382 return Ok(());
383 }
384 let interceptors = self.ringbuffer_def_pre_update_interceptors().interceptors.clone();
385 let mut ctx = RingBufferDefPreUpdateContext::new(self, pre);
386 for interceptor in interceptors {
387 interceptor.intercept(&mut ctx).await?;
388 }
389 Ok(())
390 }
391
392 async fn post_update(&mut self, pre: &RingBufferDef, post: &RingBufferDef) -> crate::Result<()> {
393 if self.ringbuffer_def_post_update_interceptors().is_empty() {
394 return Ok(());
395 }
396 let interceptors = self.ringbuffer_def_post_update_interceptors().interceptors.clone();
397 let mut ctx = RingBufferDefPostUpdateContext::new(self, pre, post);
398 for interceptor in interceptors {
399 interceptor.intercept(&mut ctx).await?;
400 }
401 Ok(())
402 }
403
404 async fn pre_delete(&mut self, pre: &RingBufferDef) -> crate::Result<()> {
405 if self.ringbuffer_def_pre_delete_interceptors().is_empty() {
406 return Ok(());
407 }
408 let interceptors = self.ringbuffer_def_pre_delete_interceptors().interceptors.clone();
409 let mut ctx = RingBufferDefPreDeleteContext::new(self, pre);
410 for interceptor in interceptors {
411 interceptor.intercept(&mut ctx).await?;
412 }
413 Ok(())
414 }
415}
416
417#[async_trait]
418impl<CT: CommandTransaction + WithInterceptors<CT> + Send> TransactionInterceptor<CT> for CT {
419 async fn pre_commit(&mut self) -> crate::Result<()> {
420 if self.pre_commit_interceptors().is_empty() {
421 return Ok(());
422 }
423 let interceptors = self.pre_commit_interceptors().interceptors.clone();
424 let mut ctx = PreCommitContext::new(self);
425 for interceptor in interceptors {
426 interceptor.intercept(&mut ctx).await?;
427 }
428 Ok(())
429 }
430
431 async fn post_commit(
432 &mut self,
433 id: TransactionId,
434 version: CommitVersion,
435 changes: TransactionalDefChanges,
436 row_changes: Vec<RowChange>,
437 ) -> crate::Result<()> {
438 if self.post_commit_interceptors().is_empty() {
439 return Ok(());
440 }
441 let interceptors = self.post_commit_interceptors().interceptors.clone();
442 let mut ctx = PostCommitContext::new(id, version, changes, row_changes);
443 for interceptor in interceptors {
444 interceptor.intercept(&mut ctx).await?;
445 }
446 Ok(())
447 }
448}