reifydb_transaction/interceptor/
ringbuffer.rs1use reifydb_core::{encoded::encoded::EncodedValues, interface::catalog::ringbuffer::RingBufferDef};
5use reifydb_type::{Result, value::row_number::RowNumber};
6
7use super::WithInterceptors;
8use crate::interceptor::chain::InterceptorChain;
9
10pub struct RingBufferPreInsertContext<'a> {
13 pub ringbuffer: &'a RingBufferDef,
14 pub row: &'a EncodedValues,
15}
16
17impl<'a> RingBufferPreInsertContext<'a> {
18 pub fn new(ringbuffer: &'a RingBufferDef, row: &'a EncodedValues) -> Self {
19 Self {
20 ringbuffer,
21 row,
22 }
23 }
24}
25
26pub trait RingBufferPreInsertInterceptor: Send + Sync {
27 fn intercept<'a>(&self, ctx: &mut RingBufferPreInsertContext<'a>) -> Result<()>;
28}
29
30impl InterceptorChain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
31 pub fn execute(&self, mut ctx: RingBufferPreInsertContext) -> Result<()> {
32 for interceptor in &self.interceptors {
33 interceptor.intercept(&mut ctx)?;
34 }
35 Ok(())
36 }
37}
38
39pub struct ClosureRingBufferPreInsertInterceptor<F>
40where
41 F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync,
42{
43 closure: F,
44}
45
46impl<F> ClosureRingBufferPreInsertInterceptor<F>
47where
48 F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync,
49{
50 pub fn new(closure: F) -> Self {
51 Self {
52 closure,
53 }
54 }
55}
56
57impl<F> Clone for ClosureRingBufferPreInsertInterceptor<F>
58where
59 F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
60{
61 fn clone(&self) -> Self {
62 Self {
63 closure: self.closure.clone(),
64 }
65 }
66}
67
68impl<F> RingBufferPreInsertInterceptor for ClosureRingBufferPreInsertInterceptor<F>
69where
70 F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync,
71{
72 fn intercept<'a>(&self, ctx: &mut RingBufferPreInsertContext<'a>) -> Result<()> {
73 (self.closure)(ctx)
74 }
75}
76
77pub fn ringbuffer_pre_insert<F>(f: F) -> ClosureRingBufferPreInsertInterceptor<F>
78where
79 F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
80{
81 ClosureRingBufferPreInsertInterceptor::new(f)
82}
83
84pub struct RingBufferPostInsertContext<'a> {
87 pub ringbuffer: &'a RingBufferDef,
88 pub id: RowNumber,
89 pub row: &'a EncodedValues,
90}
91
92impl<'a> RingBufferPostInsertContext<'a> {
93 pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber, row: &'a EncodedValues) -> Self {
94 Self {
95 ringbuffer,
96 id,
97 row,
98 }
99 }
100}
101
102pub trait RingBufferPostInsertInterceptor: Send + Sync {
103 fn intercept<'a>(&self, ctx: &mut RingBufferPostInsertContext<'a>) -> Result<()>;
104}
105
106impl InterceptorChain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
107 pub fn execute<'a>(&self, mut ctx: RingBufferPostInsertContext<'a>) -> Result<()> {
108 for interceptor in &self.interceptors {
109 interceptor.intercept(&mut ctx)?;
110 }
111 Ok(())
112 }
113}
114
115pub struct ClosureRingBufferPostInsertInterceptor<F>
116where
117 F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync,
118{
119 closure: F,
120}
121
122impl<F> ClosureRingBufferPostInsertInterceptor<F>
123where
124 F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync,
125{
126 pub fn new(closure: F) -> Self {
127 Self {
128 closure,
129 }
130 }
131}
132
133impl<F> Clone for ClosureRingBufferPostInsertInterceptor<F>
134where
135 F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
136{
137 fn clone(&self) -> Self {
138 Self {
139 closure: self.closure.clone(),
140 }
141 }
142}
143
144impl<F> RingBufferPostInsertInterceptor for ClosureRingBufferPostInsertInterceptor<F>
145where
146 F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync,
147{
148 fn intercept<'a>(&self, ctx: &mut RingBufferPostInsertContext<'a>) -> Result<()> {
149 (self.closure)(ctx)
150 }
151}
152
153pub fn ringbuffer_post_insert<F>(f: F) -> ClosureRingBufferPostInsertInterceptor<F>
154where
155 F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
156{
157 ClosureRingBufferPostInsertInterceptor::new(f)
158}
159
160pub struct RingBufferPreUpdateContext<'a> {
163 pub ringbuffer: &'a RingBufferDef,
164 pub id: RowNumber,
165 pub row: &'a EncodedValues,
166}
167
168impl<'a> RingBufferPreUpdateContext<'a> {
169 pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber, row: &'a EncodedValues) -> Self {
170 Self {
171 ringbuffer,
172 id,
173 row,
174 }
175 }
176}
177
178pub trait RingBufferPreUpdateInterceptor: Send + Sync {
179 fn intercept<'a>(&self, ctx: &mut RingBufferPreUpdateContext<'a>) -> Result<()>;
180}
181
182impl InterceptorChain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
183 pub fn execute(&self, mut ctx: RingBufferPreUpdateContext) -> Result<()> {
184 for interceptor in &self.interceptors {
185 interceptor.intercept(&mut ctx)?;
186 }
187 Ok(())
188 }
189}
190
191pub struct ClosureRingBufferPreUpdateInterceptor<F>
192where
193 F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync,
194{
195 closure: F,
196}
197
198impl<F> ClosureRingBufferPreUpdateInterceptor<F>
199where
200 F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync,
201{
202 pub fn new(closure: F) -> Self {
203 Self {
204 closure,
205 }
206 }
207}
208
209impl<F> Clone for ClosureRingBufferPreUpdateInterceptor<F>
210where
211 F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
212{
213 fn clone(&self) -> Self {
214 Self {
215 closure: self.closure.clone(),
216 }
217 }
218}
219
220impl<F> RingBufferPreUpdateInterceptor for ClosureRingBufferPreUpdateInterceptor<F>
221where
222 F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync,
223{
224 fn intercept<'a>(&self, ctx: &mut RingBufferPreUpdateContext<'a>) -> Result<()> {
225 (self.closure)(ctx)
226 }
227}
228
229pub fn ringbuffer_pre_update<F>(f: F) -> ClosureRingBufferPreUpdateInterceptor<F>
230where
231 F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
232{
233 ClosureRingBufferPreUpdateInterceptor::new(f)
234}
235
236pub struct RingBufferPostUpdateContext<'a> {
239 pub ringbuffer: &'a RingBufferDef,
240 pub id: RowNumber,
241 pub row: &'a EncodedValues,
242 pub old_row: &'a EncodedValues,
243}
244
245impl<'a> RingBufferPostUpdateContext<'a> {
246 pub fn new(
247 ringbuffer: &'a RingBufferDef,
248 id: RowNumber,
249 row: &'a EncodedValues,
250 old_row: &'a EncodedValues,
251 ) -> Self {
252 Self {
253 ringbuffer,
254 id,
255 row,
256 old_row,
257 }
258 }
259}
260
261pub trait RingBufferPostUpdateInterceptor: Send + Sync {
262 fn intercept<'a>(&self, ctx: &mut RingBufferPostUpdateContext<'a>) -> Result<()>;
263}
264
265impl InterceptorChain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
266 pub fn execute(&self, mut ctx: RingBufferPostUpdateContext) -> Result<()> {
267 for interceptor in &self.interceptors {
268 interceptor.intercept(&mut ctx)?;
269 }
270 Ok(())
271 }
272}
273
274pub struct ClosureRingBufferPostUpdateInterceptor<F>
275where
276 F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync,
277{
278 closure: F,
279}
280
281impl<F> ClosureRingBufferPostUpdateInterceptor<F>
282where
283 F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync,
284{
285 pub fn new(closure: F) -> Self {
286 Self {
287 closure,
288 }
289 }
290}
291
292impl<F> Clone for ClosureRingBufferPostUpdateInterceptor<F>
293where
294 F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
295{
296 fn clone(&self) -> Self {
297 Self {
298 closure: self.closure.clone(),
299 }
300 }
301}
302
303impl<F> RingBufferPostUpdateInterceptor for ClosureRingBufferPostUpdateInterceptor<F>
304where
305 F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync,
306{
307 fn intercept<'a>(&self, ctx: &mut RingBufferPostUpdateContext<'a>) -> Result<()> {
308 (self.closure)(ctx)
309 }
310}
311
312pub fn ringbuffer_post_update<F>(f: F) -> ClosureRingBufferPostUpdateInterceptor<F>
313where
314 F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
315{
316 ClosureRingBufferPostUpdateInterceptor::new(f)
317}
318
319pub struct RingBufferPreDeleteContext<'a> {
322 pub ringbuffer: &'a RingBufferDef,
323 pub id: RowNumber,
324}
325
326impl<'a> RingBufferPreDeleteContext<'a> {
327 pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber) -> Self {
328 Self {
329 ringbuffer,
330 id,
331 }
332 }
333}
334
335pub trait RingBufferPreDeleteInterceptor: Send + Sync {
336 fn intercept<'a>(&self, ctx: &mut RingBufferPreDeleteContext<'a>) -> Result<()>;
337}
338
339impl InterceptorChain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
340 pub fn execute(&self, mut ctx: RingBufferPreDeleteContext) -> Result<()> {
341 for interceptor in &self.interceptors {
342 interceptor.intercept(&mut ctx)?;
343 }
344 Ok(())
345 }
346}
347
348pub struct ClosureRingBufferPreDeleteInterceptor<F>
349where
350 F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync,
351{
352 closure: F,
353}
354
355impl<F> ClosureRingBufferPreDeleteInterceptor<F>
356where
357 F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync,
358{
359 pub fn new(closure: F) -> Self {
360 Self {
361 closure,
362 }
363 }
364}
365
366impl<F> Clone for ClosureRingBufferPreDeleteInterceptor<F>
367where
368 F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
369{
370 fn clone(&self) -> Self {
371 Self {
372 closure: self.closure.clone(),
373 }
374 }
375}
376
377impl<F> RingBufferPreDeleteInterceptor for ClosureRingBufferPreDeleteInterceptor<F>
378where
379 F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync,
380{
381 fn intercept<'a>(&self, ctx: &mut RingBufferPreDeleteContext<'a>) -> Result<()> {
382 (self.closure)(ctx)
383 }
384}
385
386pub fn ringbuffer_pre_delete<F>(f: F) -> ClosureRingBufferPreDeleteInterceptor<F>
387where
388 F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
389{
390 ClosureRingBufferPreDeleteInterceptor::new(f)
391}
392
393pub struct RingBufferPostDeleteContext<'a> {
396 pub ringbuffer: &'a RingBufferDef,
397 pub id: RowNumber,
398 pub deleted_row: &'a EncodedValues,
399}
400
401impl<'a> RingBufferPostDeleteContext<'a> {
402 pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber, deleted_row: &'a EncodedValues) -> Self {
403 Self {
404 ringbuffer,
405 id,
406 deleted_row,
407 }
408 }
409}
410
411pub trait RingBufferPostDeleteInterceptor: Send + Sync {
412 fn intercept<'a>(&self, ctx: &mut RingBufferPostDeleteContext<'a>) -> Result<()>;
413}
414
415impl InterceptorChain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
416 pub fn execute(&self, mut ctx: RingBufferPostDeleteContext) -> Result<()> {
417 for interceptor in &self.interceptors {
418 interceptor.intercept(&mut ctx)?;
419 }
420 Ok(())
421 }
422}
423
424pub struct ClosureRingBufferPostDeleteInterceptor<F>
425where
426 F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync,
427{
428 closure: F,
429}
430
431impl<F> ClosureRingBufferPostDeleteInterceptor<F>
432where
433 F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync,
434{
435 pub fn new(closure: F) -> Self {
436 Self {
437 closure,
438 }
439 }
440}
441
442impl<F> Clone for ClosureRingBufferPostDeleteInterceptor<F>
443where
444 F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
445{
446 fn clone(&self) -> Self {
447 Self {
448 closure: self.closure.clone(),
449 }
450 }
451}
452
453impl<F> RingBufferPostDeleteInterceptor for ClosureRingBufferPostDeleteInterceptor<F>
454where
455 F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync,
456{
457 fn intercept<'a>(&self, ctx: &mut RingBufferPostDeleteContext<'a>) -> Result<()> {
458 (self.closure)(ctx)
459 }
460}
461
462pub fn ringbuffer_post_delete<F>(f: F) -> ClosureRingBufferPostDeleteInterceptor<F>
463where
464 F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
465{
466 ClosureRingBufferPostDeleteInterceptor::new(f)
467}
468
/// Field-less type whose associated functions build a ring buffer context and
/// run it through the matching interceptor chain of a transaction.
pub struct RingBufferInterceptor;
471
472impl RingBufferInterceptor {
473 pub fn pre_insert(
474 txn: &mut impl WithInterceptors,
475 ringbuffer: &RingBufferDef,
476 row: &EncodedValues,
477 ) -> Result<()> {
478 let ctx = RingBufferPreInsertContext::new(ringbuffer, row);
479 txn.ringbuffer_pre_insert_interceptors().execute(ctx)
480 }
481
482 pub fn post_insert(
483 txn: &mut impl WithInterceptors,
484 ringbuffer: &RingBufferDef,
485 id: RowNumber,
486 row: &EncodedValues,
487 ) -> Result<()> {
488 let ctx = RingBufferPostInsertContext::new(ringbuffer, id, row);
489 txn.ringbuffer_post_insert_interceptors().execute(ctx)
490 }
491
492 pub fn pre_update(
493 txn: &mut impl WithInterceptors,
494 ringbuffer: &RingBufferDef,
495 id: RowNumber,
496 row: &EncodedValues,
497 ) -> Result<()> {
498 let ctx = RingBufferPreUpdateContext::new(ringbuffer, id, row);
499 txn.ringbuffer_pre_update_interceptors().execute(ctx)
500 }
501
502 pub fn post_update(
503 txn: &mut impl WithInterceptors,
504 ringbuffer: &RingBufferDef,
505 id: RowNumber,
506 row: &EncodedValues,
507 old_row: &EncodedValues,
508 ) -> Result<()> {
509 let ctx = RingBufferPostUpdateContext::new(ringbuffer, id, row, old_row);
510 txn.ringbuffer_post_update_interceptors().execute(ctx)
511 }
512
513 pub fn pre_delete(txn: &mut impl WithInterceptors, ringbuffer: &RingBufferDef, id: RowNumber) -> Result<()> {
514 let ctx = RingBufferPreDeleteContext::new(ringbuffer, id);
515 txn.ringbuffer_pre_delete_interceptors().execute(ctx)
516 }
517
518 pub fn post_delete(
519 txn: &mut impl WithInterceptors,
520 ringbuffer: &RingBufferDef,
521 id: RowNumber,
522 deleted_row: &EncodedValues,
523 ) -> Result<()> {
524 let ctx = RingBufferPostDeleteContext::new(ringbuffer, id, deleted_row);
525 txn.ringbuffer_post_delete_interceptors().execute(ctx)
526 }
527}