reifydb_transaction/interceptor/
ringbuffer_row.rs1use reifydb_core::{encoded::row::EncodedRow, interface::catalog::ringbuffer::RingBuffer};
5use reifydb_type::{Result, value::row_number::RowNumber};
6
7use super::WithInterceptors;
8use crate::interceptor::chain::InterceptorChain;
9
10pub struct RingBufferRowPreInsertContext<'a> {
12 pub ringbuffer: &'a RingBuffer,
13 pub row: EncodedRow,
14}
15
16impl<'a> RingBufferRowPreInsertContext<'a> {
17 pub fn new(ringbuffer: &'a RingBuffer, row: EncodedRow) -> Self {
18 Self {
19 ringbuffer,
20 row,
21 }
22 }
23}
24
25pub trait RingBufferRowPreInsertInterceptor: Send + Sync {
26 fn intercept<'a>(&self, ctx: &mut RingBufferRowPreInsertContext<'a>) -> Result<()>;
27}
28
29impl InterceptorChain<dyn RingBufferRowPreInsertInterceptor + Send + Sync> {
30 pub fn execute(&self, mut ctx: RingBufferRowPreInsertContext) -> Result<EncodedRow> {
31 for interceptor in &self.interceptors {
32 interceptor.intercept(&mut ctx)?;
33 }
34 Ok(ctx.row)
35 }
36}
37
38pub struct ClosureRingBufferRowPreInsertInterceptor<F>
39where
40 F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync,
41{
42 closure: F,
43}
44
45impl<F> ClosureRingBufferRowPreInsertInterceptor<F>
46where
47 F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync,
48{
49 pub fn new(closure: F) -> Self {
50 Self {
51 closure,
52 }
53 }
54}
55
56impl<F> Clone for ClosureRingBufferRowPreInsertInterceptor<F>
57where
58 F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
59{
60 fn clone(&self) -> Self {
61 Self {
62 closure: self.closure.clone(),
63 }
64 }
65}
66
67impl<F> RingBufferRowPreInsertInterceptor for ClosureRingBufferRowPreInsertInterceptor<F>
68where
69 F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync,
70{
71 fn intercept<'a>(&self, ctx: &mut RingBufferRowPreInsertContext<'a>) -> Result<()> {
72 (self.closure)(ctx)
73 }
74}
75
76pub fn ringbuffer_row_pre_insert<F>(f: F) -> ClosureRingBufferRowPreInsertInterceptor<F>
77where
78 F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
79{
80 ClosureRingBufferRowPreInsertInterceptor::new(f)
81}
82
83pub struct RingBufferRowPostInsertContext<'a> {
85 pub ringbuffer: &'a RingBuffer,
86 pub id: RowNumber,
87 pub row: &'a EncodedRow,
88}
89
90impl<'a> RingBufferRowPostInsertContext<'a> {
91 pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, row: &'a EncodedRow) -> Self {
92 Self {
93 ringbuffer,
94 id,
95 row,
96 }
97 }
98}
99
100pub trait RingBufferRowPostInsertInterceptor: Send + Sync {
101 fn intercept<'a>(&self, ctx: &mut RingBufferRowPostInsertContext<'a>) -> Result<()>;
102}
103
104impl InterceptorChain<dyn RingBufferRowPostInsertInterceptor + Send + Sync> {
105 pub fn execute<'a>(&self, mut ctx: RingBufferRowPostInsertContext<'a>) -> Result<()> {
106 for interceptor in &self.interceptors {
107 interceptor.intercept(&mut ctx)?;
108 }
109 Ok(())
110 }
111}
112
113pub struct ClosureRingBufferRowPostInsertInterceptor<F>
114where
115 F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync,
116{
117 closure: F,
118}
119
120impl<F> ClosureRingBufferRowPostInsertInterceptor<F>
121where
122 F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync,
123{
124 pub fn new(closure: F) -> Self {
125 Self {
126 closure,
127 }
128 }
129}
130
131impl<F> Clone for ClosureRingBufferRowPostInsertInterceptor<F>
132where
133 F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
134{
135 fn clone(&self) -> Self {
136 Self {
137 closure: self.closure.clone(),
138 }
139 }
140}
141
142impl<F> RingBufferRowPostInsertInterceptor for ClosureRingBufferRowPostInsertInterceptor<F>
143where
144 F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync,
145{
146 fn intercept<'a>(&self, ctx: &mut RingBufferRowPostInsertContext<'a>) -> Result<()> {
147 (self.closure)(ctx)
148 }
149}
150
151pub fn ringbuffer_row_post_insert<F>(f: F) -> ClosureRingBufferRowPostInsertInterceptor<F>
152where
153 F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
154{
155 ClosureRingBufferRowPostInsertInterceptor::new(f)
156}
157
158pub struct RingBufferRowPreUpdateContext<'a> {
160 pub ringbuffer: &'a RingBuffer,
161 pub id: RowNumber,
162 pub row: EncodedRow,
163}
164
165impl<'a> RingBufferRowPreUpdateContext<'a> {
166 pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, row: EncodedRow) -> Self {
167 Self {
168 ringbuffer,
169 id,
170 row,
171 }
172 }
173}
174
175pub trait RingBufferRowPreUpdateInterceptor: Send + Sync {
176 fn intercept<'a>(&self, ctx: &mut RingBufferRowPreUpdateContext<'a>) -> Result<()>;
177}
178
179impl InterceptorChain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync> {
180 pub fn execute(&self, mut ctx: RingBufferRowPreUpdateContext) -> Result<EncodedRow> {
181 for interceptor in &self.interceptors {
182 interceptor.intercept(&mut ctx)?;
183 }
184 Ok(ctx.row)
185 }
186}
187
188pub struct ClosureRingBufferRowPreUpdateInterceptor<F>
189where
190 F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync,
191{
192 closure: F,
193}
194
195impl<F> ClosureRingBufferRowPreUpdateInterceptor<F>
196where
197 F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync,
198{
199 pub fn new(closure: F) -> Self {
200 Self {
201 closure,
202 }
203 }
204}
205
206impl<F> Clone for ClosureRingBufferRowPreUpdateInterceptor<F>
207where
208 F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
209{
210 fn clone(&self) -> Self {
211 Self {
212 closure: self.closure.clone(),
213 }
214 }
215}
216
217impl<F> RingBufferRowPreUpdateInterceptor for ClosureRingBufferRowPreUpdateInterceptor<F>
218where
219 F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync,
220{
221 fn intercept<'a>(&self, ctx: &mut RingBufferRowPreUpdateContext<'a>) -> Result<()> {
222 (self.closure)(ctx)
223 }
224}
225
226pub fn ringbuffer_row_pre_update<F>(f: F) -> ClosureRingBufferRowPreUpdateInterceptor<F>
227where
228 F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
229{
230 ClosureRingBufferRowPreUpdateInterceptor::new(f)
231}
232
233pub struct RingBufferRowPostUpdateContext<'a> {
235 pub ringbuffer: &'a RingBuffer,
236 pub id: RowNumber,
237 pub post: &'a EncodedRow,
238 pub pre: &'a EncodedRow,
239}
240
241impl<'a> RingBufferRowPostUpdateContext<'a> {
242 pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, post: &'a EncodedRow, pre: &'a EncodedRow) -> Self {
243 Self {
244 ringbuffer,
245 id,
246 post,
247 pre,
248 }
249 }
250}
251
252pub trait RingBufferRowPostUpdateInterceptor: Send + Sync {
253 fn intercept<'a>(&self, ctx: &mut RingBufferRowPostUpdateContext<'a>) -> Result<()>;
254}
255
256impl InterceptorChain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync> {
257 pub fn execute(&self, mut ctx: RingBufferRowPostUpdateContext) -> Result<()> {
258 for interceptor in &self.interceptors {
259 interceptor.intercept(&mut ctx)?;
260 }
261 Ok(())
262 }
263}
264
265pub struct ClosureRingBufferRowPostUpdateInterceptor<F>
266where
267 F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync,
268{
269 closure: F,
270}
271
272impl<F> ClosureRingBufferRowPostUpdateInterceptor<F>
273where
274 F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync,
275{
276 pub fn new(closure: F) -> Self {
277 Self {
278 closure,
279 }
280 }
281}
282
283impl<F> Clone for ClosureRingBufferRowPostUpdateInterceptor<F>
284where
285 F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
286{
287 fn clone(&self) -> Self {
288 Self {
289 closure: self.closure.clone(),
290 }
291 }
292}
293
294impl<F> RingBufferRowPostUpdateInterceptor for ClosureRingBufferRowPostUpdateInterceptor<F>
295where
296 F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync,
297{
298 fn intercept<'a>(&self, ctx: &mut RingBufferRowPostUpdateContext<'a>) -> Result<()> {
299 (self.closure)(ctx)
300 }
301}
302
303pub fn ringbuffer_row_post_update<F>(f: F) -> ClosureRingBufferRowPostUpdateInterceptor<F>
304where
305 F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
306{
307 ClosureRingBufferRowPostUpdateInterceptor::new(f)
308}
309
310pub struct RingBufferRowPreDeleteContext<'a> {
312 pub ringbuffer: &'a RingBuffer,
313 pub id: RowNumber,
314}
315
316impl<'a> RingBufferRowPreDeleteContext<'a> {
317 pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber) -> Self {
318 Self {
319 ringbuffer,
320 id,
321 }
322 }
323}
324
325pub trait RingBufferRowPreDeleteInterceptor: Send + Sync {
326 fn intercept<'a>(&self, ctx: &mut RingBufferRowPreDeleteContext<'a>) -> Result<()>;
327}
328
329impl InterceptorChain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync> {
330 pub fn execute(&self, mut ctx: RingBufferRowPreDeleteContext) -> Result<()> {
331 for interceptor in &self.interceptors {
332 interceptor.intercept(&mut ctx)?;
333 }
334 Ok(())
335 }
336}
337
338pub struct ClosureRingBufferRowPreDeleteInterceptor<F>
339where
340 F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync,
341{
342 closure: F,
343}
344
345impl<F> ClosureRingBufferRowPreDeleteInterceptor<F>
346where
347 F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync,
348{
349 pub fn new(closure: F) -> Self {
350 Self {
351 closure,
352 }
353 }
354}
355
356impl<F> Clone for ClosureRingBufferRowPreDeleteInterceptor<F>
357where
358 F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
359{
360 fn clone(&self) -> Self {
361 Self {
362 closure: self.closure.clone(),
363 }
364 }
365}
366
367impl<F> RingBufferRowPreDeleteInterceptor for ClosureRingBufferRowPreDeleteInterceptor<F>
368where
369 F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync,
370{
371 fn intercept<'a>(&self, ctx: &mut RingBufferRowPreDeleteContext<'a>) -> Result<()> {
372 (self.closure)(ctx)
373 }
374}
375
376pub fn ringbuffer_row_pre_delete<F>(f: F) -> ClosureRingBufferRowPreDeleteInterceptor<F>
377where
378 F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
379{
380 ClosureRingBufferRowPreDeleteInterceptor::new(f)
381}
382
383pub struct RingBufferRowPostDeleteContext<'a> {
385 pub ringbuffer: &'a RingBuffer,
386 pub id: RowNumber,
387 pub deleted_row: &'a EncodedRow,
388}
389
390impl<'a> RingBufferRowPostDeleteContext<'a> {
391 pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, deleted_row: &'a EncodedRow) -> Self {
392 Self {
393 ringbuffer,
394 id,
395 deleted_row,
396 }
397 }
398}
399
400pub trait RingBufferRowPostDeleteInterceptor: Send + Sync {
401 fn intercept<'a>(&self, ctx: &mut RingBufferRowPostDeleteContext<'a>) -> Result<()>;
402}
403
404impl InterceptorChain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync> {
405 pub fn execute(&self, mut ctx: RingBufferRowPostDeleteContext) -> Result<()> {
406 for interceptor in &self.interceptors {
407 interceptor.intercept(&mut ctx)?;
408 }
409 Ok(())
410 }
411}
412
413pub struct ClosureRingBufferRowPostDeleteInterceptor<F>
414where
415 F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync,
416{
417 closure: F,
418}
419
420impl<F> ClosureRingBufferRowPostDeleteInterceptor<F>
421where
422 F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync,
423{
424 pub fn new(closure: F) -> Self {
425 Self {
426 closure,
427 }
428 }
429}
430
431impl<F> Clone for ClosureRingBufferRowPostDeleteInterceptor<F>
432where
433 F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
434{
435 fn clone(&self) -> Self {
436 Self {
437 closure: self.closure.clone(),
438 }
439 }
440}
441
442impl<F> RingBufferRowPostDeleteInterceptor for ClosureRingBufferRowPostDeleteInterceptor<F>
443where
444 F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync,
445{
446 fn intercept<'a>(&self, ctx: &mut RingBufferRowPostDeleteContext<'a>) -> Result<()> {
447 (self.closure)(ctx)
448 }
449}
450
451pub fn ringbuffer_row_post_delete<F>(f: F) -> ClosureRingBufferRowPostDeleteInterceptor<F>
452where
453 F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
454{
455 ClosureRingBufferRowPostDeleteInterceptor::new(f)
456}
457
458pub struct RingBufferRowInterceptor;
460
461impl RingBufferRowInterceptor {
462 pub fn pre_insert(
463 txn: &mut impl WithInterceptors,
464 ringbuffer: &RingBuffer,
465 row: EncodedRow,
466 ) -> Result<EncodedRow> {
467 let ctx = RingBufferRowPreInsertContext::new(ringbuffer, row);
468 txn.ringbuffer_row_pre_insert_interceptors().execute(ctx)
469 }
470
471 pub fn post_insert(
472 txn: &mut impl WithInterceptors,
473 ringbuffer: &RingBuffer,
474 id: RowNumber,
475 row: &EncodedRow,
476 ) -> Result<()> {
477 let ctx = RingBufferRowPostInsertContext::new(ringbuffer, id, row);
478 txn.ringbuffer_row_post_insert_interceptors().execute(ctx)
479 }
480
481 pub fn pre_update(
482 txn: &mut impl WithInterceptors,
483 ringbuffer: &RingBuffer,
484 id: RowNumber,
485 row: EncodedRow,
486 ) -> Result<EncodedRow> {
487 let ctx = RingBufferRowPreUpdateContext::new(ringbuffer, id, row);
488 txn.ringbuffer_row_pre_update_interceptors().execute(ctx)
489 }
490
491 pub fn post_update(
492 txn: &mut impl WithInterceptors,
493 ringbuffer: &RingBuffer,
494 id: RowNumber,
495 post: &EncodedRow,
496 pre: &EncodedRow,
497 ) -> Result<()> {
498 let ctx = RingBufferRowPostUpdateContext::new(ringbuffer, id, post, pre);
499 txn.ringbuffer_row_post_update_interceptors().execute(ctx)
500 }
501
502 pub fn pre_delete(txn: &mut impl WithInterceptors, ringbuffer: &RingBuffer, id: RowNumber) -> Result<()> {
503 let ctx = RingBufferRowPreDeleteContext::new(ringbuffer, id);
504 txn.ringbuffer_row_pre_delete_interceptors().execute(ctx)
505 }
506
507 pub fn post_delete(
508 txn: &mut impl WithInterceptors,
509 ringbuffer: &RingBuffer,
510 id: RowNumber,
511 deleted_row: &EncodedRow,
512 ) -> Result<()> {
513 let ctx = RingBufferRowPostDeleteContext::new(ringbuffer, id, deleted_row);
514 txn.ringbuffer_row_post_delete_interceptors().execute(ctx)
515 }
516}