Skip to main content

reifydb_transaction/interceptor/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{encoded::encoded::EncodedValues, interface::catalog::ringbuffer::RingBufferDef};
5use reifydb_type::{Result, value::row_number::RowNumber};
6
7use super::WithInterceptors;
8use crate::interceptor::chain::InterceptorChain;
9
10// PRE INSERT
/// Context handed to ring buffer pre-insert interceptors.
///
/// Holds only borrowed data; both references share the lifetime `'a`.
pub struct RingBufferPreInsertContext<'a> {
	/// Definition of the ring buffer the event refers to.
	pub ringbuffer: &'a RingBufferDef,
	/// Encoded row values (presumably the row about to be inserted; confirm against callers).
	pub row: &'a EncodedValues,
}
16
17impl<'a> RingBufferPreInsertContext<'a> {
18	pub fn new(ringbuffer: &'a RingBufferDef, row: &'a EncodedValues) -> Self {
19		Self {
20			ringbuffer,
21			row,
22		}
23	}
24}
25
/// Hook that receives a [`RingBufferPreInsertContext`].
///
/// Implementors must be `Send + Sync`.
pub trait RingBufferPreInsertInterceptor: Send + Sync {
	/// Handles one event described by `ctx`; returning `Err` reports failure to the caller.
	fn intercept<'a>(&self, ctx: &mut RingBufferPreInsertContext<'a>) -> Result<()>;
}
29
30impl InterceptorChain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
31	pub fn execute(&self, mut ctx: RingBufferPreInsertContext) -> Result<()> {
32		for interceptor in &self.interceptors {
33			interceptor.intercept(&mut ctx)?;
34		}
35		Ok(())
36	}
37}
38
/// Adapter that stores a closure so it can act as a pre-insert interceptor.
///
/// The type itself places no bounds on `F`; the `impl` blocks that construct,
/// clone, or invoke the closure state the bounds they need.
pub struct ClosureRingBufferPreInsertInterceptor<F> {
	/// The wrapped closure.
	closure: F,
}
45
46impl<F> ClosureRingBufferPreInsertInterceptor<F>
47where
48	F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync,
49{
50	pub fn new(closure: F) -> Self {
51		Self {
52			closure,
53		}
54	}
55}
56
57impl<F> Clone for ClosureRingBufferPreInsertInterceptor<F>
58where
59	F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
60{
61	fn clone(&self) -> Self {
62		Self {
63			closure: self.closure.clone(),
64		}
65	}
66}
67
68impl<F> RingBufferPreInsertInterceptor for ClosureRingBufferPreInsertInterceptor<F>
69where
70	F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync,
71{
72	fn intercept<'a>(&self, ctx: &mut RingBufferPreInsertContext<'a>) -> Result<()> {
73		(self.closure)(ctx)
74	}
75}
76
77pub fn ringbuffer_pre_insert<F>(f: F) -> ClosureRingBufferPreInsertInterceptor<F>
78where
79	F: for<'a> Fn(&mut RingBufferPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
80{
81	ClosureRingBufferPreInsertInterceptor::new(f)
82}
83
84// POST INSERT
/// Context handed to ring buffer post-insert interceptors.
///
/// The references share the lifetime `'a`; `id` is stored by value.
pub struct RingBufferPostInsertContext<'a> {
	/// Definition of the ring buffer the event refers to.
	pub ringbuffer: &'a RingBufferDef,
	/// Row number associated with the event (presumably that of the inserted row; confirm against callers).
	pub id: RowNumber,
	/// Encoded row values (presumably the row that was inserted; confirm against callers).
	pub row: &'a EncodedValues,
}
91
92impl<'a> RingBufferPostInsertContext<'a> {
93	pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber, row: &'a EncodedValues) -> Self {
94		Self {
95			ringbuffer,
96			id,
97			row,
98		}
99	}
100}
101
/// Hook that receives a [`RingBufferPostInsertContext`].
///
/// Implementors must be `Send + Sync`.
pub trait RingBufferPostInsertInterceptor: Send + Sync {
	/// Handles one event described by `ctx`; returning `Err` reports failure to the caller.
	fn intercept<'a>(&self, ctx: &mut RingBufferPostInsertContext<'a>) -> Result<()>;
}
105
106impl InterceptorChain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
107	pub fn execute<'a>(&self, mut ctx: RingBufferPostInsertContext<'a>) -> Result<()> {
108		for interceptor in &self.interceptors {
109			interceptor.intercept(&mut ctx)?;
110		}
111		Ok(())
112	}
113}
114
/// Adapter that stores a closure so it can act as a post-insert interceptor.
///
/// The type itself places no bounds on `F`; the `impl` blocks that construct,
/// clone, or invoke the closure state the bounds they need.
pub struct ClosureRingBufferPostInsertInterceptor<F> {
	/// The wrapped closure.
	closure: F,
}
121
122impl<F> ClosureRingBufferPostInsertInterceptor<F>
123where
124	F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync,
125{
126	pub fn new(closure: F) -> Self {
127		Self {
128			closure,
129		}
130	}
131}
132
133impl<F> Clone for ClosureRingBufferPostInsertInterceptor<F>
134where
135	F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
136{
137	fn clone(&self) -> Self {
138		Self {
139			closure: self.closure.clone(),
140		}
141	}
142}
143
144impl<F> RingBufferPostInsertInterceptor for ClosureRingBufferPostInsertInterceptor<F>
145where
146	F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync,
147{
148	fn intercept<'a>(&self, ctx: &mut RingBufferPostInsertContext<'a>) -> Result<()> {
149		(self.closure)(ctx)
150	}
151}
152
153pub fn ringbuffer_post_insert<F>(f: F) -> ClosureRingBufferPostInsertInterceptor<F>
154where
155	F: for<'a> Fn(&mut RingBufferPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
156{
157	ClosureRingBufferPostInsertInterceptor::new(f)
158}
159
160// PRE UPDATE
/// Context handed to ring buffer pre-update interceptors.
///
/// The references share the lifetime `'a`; `id` is stored by value.
pub struct RingBufferPreUpdateContext<'a> {
	/// Definition of the ring buffer the event refers to.
	pub ringbuffer: &'a RingBufferDef,
	/// Row number associated with the event (presumably the row being updated; confirm against callers).
	pub id: RowNumber,
	/// Encoded row values (presumably the new contents; confirm against callers).
	pub row: &'a EncodedValues,
}
167
168impl<'a> RingBufferPreUpdateContext<'a> {
169	pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber, row: &'a EncodedValues) -> Self {
170		Self {
171			ringbuffer,
172			id,
173			row,
174		}
175	}
176}
177
/// Hook that receives a [`RingBufferPreUpdateContext`].
///
/// Implementors must be `Send + Sync`.
pub trait RingBufferPreUpdateInterceptor: Send + Sync {
	/// Handles one event described by `ctx`; returning `Err` reports failure to the caller.
	fn intercept<'a>(&self, ctx: &mut RingBufferPreUpdateContext<'a>) -> Result<()>;
}
181
182impl InterceptorChain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
183	pub fn execute(&self, mut ctx: RingBufferPreUpdateContext) -> Result<()> {
184		for interceptor in &self.interceptors {
185			interceptor.intercept(&mut ctx)?;
186		}
187		Ok(())
188	}
189}
190
/// Adapter that stores a closure so it can act as a pre-update interceptor.
///
/// The type itself places no bounds on `F`; the `impl` blocks that construct,
/// clone, or invoke the closure state the bounds they need.
pub struct ClosureRingBufferPreUpdateInterceptor<F> {
	/// The wrapped closure.
	closure: F,
}
197
198impl<F> ClosureRingBufferPreUpdateInterceptor<F>
199where
200	F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync,
201{
202	pub fn new(closure: F) -> Self {
203		Self {
204			closure,
205		}
206	}
207}
208
209impl<F> Clone for ClosureRingBufferPreUpdateInterceptor<F>
210where
211	F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
212{
213	fn clone(&self) -> Self {
214		Self {
215			closure: self.closure.clone(),
216		}
217	}
218}
219
220impl<F> RingBufferPreUpdateInterceptor for ClosureRingBufferPreUpdateInterceptor<F>
221where
222	F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync,
223{
224	fn intercept<'a>(&self, ctx: &mut RingBufferPreUpdateContext<'a>) -> Result<()> {
225		(self.closure)(ctx)
226	}
227}
228
229pub fn ringbuffer_pre_update<F>(f: F) -> ClosureRingBufferPreUpdateInterceptor<F>
230where
231	F: for<'a> Fn(&mut RingBufferPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
232{
233	ClosureRingBufferPreUpdateInterceptor::new(f)
234}
235
236// POST UPDATE
/// Context handed to ring buffer post-update interceptors.
///
/// The references share the lifetime `'a`; `id` is stored by value.
pub struct RingBufferPostUpdateContext<'a> {
	/// Definition of the ring buffer the event refers to.
	pub ringbuffer: &'a RingBufferDef,
	/// Row number associated with the event (presumably the updated row; confirm against callers).
	pub id: RowNumber,
	/// Encoded row values (presumably the contents after the update; confirm against callers).
	pub row: &'a EncodedValues,
	/// Encoded row values (presumably the contents before the update; confirm against callers).
	pub old_row: &'a EncodedValues,
}
244
245impl<'a> RingBufferPostUpdateContext<'a> {
246	pub fn new(
247		ringbuffer: &'a RingBufferDef,
248		id: RowNumber,
249		row: &'a EncodedValues,
250		old_row: &'a EncodedValues,
251	) -> Self {
252		Self {
253			ringbuffer,
254			id,
255			row,
256			old_row,
257		}
258	}
259}
260
/// Hook that receives a [`RingBufferPostUpdateContext`].
///
/// Implementors must be `Send + Sync`.
pub trait RingBufferPostUpdateInterceptor: Send + Sync {
	/// Handles one event described by `ctx`; returning `Err` reports failure to the caller.
	fn intercept<'a>(&self, ctx: &mut RingBufferPostUpdateContext<'a>) -> Result<()>;
}
264
265impl InterceptorChain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
266	pub fn execute(&self, mut ctx: RingBufferPostUpdateContext) -> Result<()> {
267		for interceptor in &self.interceptors {
268			interceptor.intercept(&mut ctx)?;
269		}
270		Ok(())
271	}
272}
273
/// Adapter that stores a closure so it can act as a post-update interceptor.
///
/// The type itself places no bounds on `F`; the `impl` blocks that construct,
/// clone, or invoke the closure state the bounds they need.
pub struct ClosureRingBufferPostUpdateInterceptor<F> {
	/// The wrapped closure.
	closure: F,
}
280
281impl<F> ClosureRingBufferPostUpdateInterceptor<F>
282where
283	F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync,
284{
285	pub fn new(closure: F) -> Self {
286		Self {
287			closure,
288		}
289	}
290}
291
292impl<F> Clone for ClosureRingBufferPostUpdateInterceptor<F>
293where
294	F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
295{
296	fn clone(&self) -> Self {
297		Self {
298			closure: self.closure.clone(),
299		}
300	}
301}
302
303impl<F> RingBufferPostUpdateInterceptor for ClosureRingBufferPostUpdateInterceptor<F>
304where
305	F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync,
306{
307	fn intercept<'a>(&self, ctx: &mut RingBufferPostUpdateContext<'a>) -> Result<()> {
308		(self.closure)(ctx)
309	}
310}
311
312pub fn ringbuffer_post_update<F>(f: F) -> ClosureRingBufferPostUpdateInterceptor<F>
313where
314	F: for<'a> Fn(&mut RingBufferPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
315{
316	ClosureRingBufferPostUpdateInterceptor::new(f)
317}
318
319// PRE DELETE
/// Context handed to ring buffer pre-delete interceptors.
///
/// `ringbuffer` is borrowed for `'a`; `id` is stored by value.
pub struct RingBufferPreDeleteContext<'a> {
	/// Definition of the ring buffer the event refers to.
	pub ringbuffer: &'a RingBufferDef,
	/// Row number associated with the event (presumably the row about to be deleted; confirm against callers).
	pub id: RowNumber,
}
325
326impl<'a> RingBufferPreDeleteContext<'a> {
327	pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber) -> Self {
328		Self {
329			ringbuffer,
330			id,
331		}
332	}
333}
334
/// Hook that receives a [`RingBufferPreDeleteContext`].
///
/// Implementors must be `Send + Sync`.
pub trait RingBufferPreDeleteInterceptor: Send + Sync {
	/// Handles one event described by `ctx`; returning `Err` reports failure to the caller.
	fn intercept<'a>(&self, ctx: &mut RingBufferPreDeleteContext<'a>) -> Result<()>;
}
338
339impl InterceptorChain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
340	pub fn execute(&self, mut ctx: RingBufferPreDeleteContext) -> Result<()> {
341		for interceptor in &self.interceptors {
342			interceptor.intercept(&mut ctx)?;
343		}
344		Ok(())
345	}
346}
347
/// Adapter that stores a closure so it can act as a pre-delete interceptor.
///
/// The type itself places no bounds on `F`; the `impl` blocks that construct,
/// clone, or invoke the closure state the bounds they need.
pub struct ClosureRingBufferPreDeleteInterceptor<F> {
	/// The wrapped closure.
	closure: F,
}
354
355impl<F> ClosureRingBufferPreDeleteInterceptor<F>
356where
357	F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync,
358{
359	pub fn new(closure: F) -> Self {
360		Self {
361			closure,
362		}
363	}
364}
365
366impl<F> Clone for ClosureRingBufferPreDeleteInterceptor<F>
367where
368	F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
369{
370	fn clone(&self) -> Self {
371		Self {
372			closure: self.closure.clone(),
373		}
374	}
375}
376
377impl<F> RingBufferPreDeleteInterceptor for ClosureRingBufferPreDeleteInterceptor<F>
378where
379	F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync,
380{
381	fn intercept<'a>(&self, ctx: &mut RingBufferPreDeleteContext<'a>) -> Result<()> {
382		(self.closure)(ctx)
383	}
384}
385
386pub fn ringbuffer_pre_delete<F>(f: F) -> ClosureRingBufferPreDeleteInterceptor<F>
387where
388	F: for<'a> Fn(&mut RingBufferPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
389{
390	ClosureRingBufferPreDeleteInterceptor::new(f)
391}
392
393// POST DELETE
/// Context handed to ring buffer post-delete interceptors.
///
/// The references share the lifetime `'a`; `id` is stored by value.
pub struct RingBufferPostDeleteContext<'a> {
	/// Definition of the ring buffer the event refers to.
	pub ringbuffer: &'a RingBufferDef,
	/// Row number associated with the event (presumably the deleted row; confirm against callers).
	pub id: RowNumber,
	/// Encoded row values (presumably the contents that were removed; confirm against callers).
	pub deleted_row: &'a EncodedValues,
}
400
401impl<'a> RingBufferPostDeleteContext<'a> {
402	pub fn new(ringbuffer: &'a RingBufferDef, id: RowNumber, deleted_row: &'a EncodedValues) -> Self {
403		Self {
404			ringbuffer,
405			id,
406			deleted_row,
407		}
408	}
409}
410
/// Hook that receives a [`RingBufferPostDeleteContext`].
///
/// Implementors must be `Send + Sync`.
pub trait RingBufferPostDeleteInterceptor: Send + Sync {
	/// Handles one event described by `ctx`; returning `Err` reports failure to the caller.
	fn intercept<'a>(&self, ctx: &mut RingBufferPostDeleteContext<'a>) -> Result<()>;
}
414
415impl InterceptorChain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
416	pub fn execute(&self, mut ctx: RingBufferPostDeleteContext) -> Result<()> {
417		for interceptor in &self.interceptors {
418			interceptor.intercept(&mut ctx)?;
419		}
420		Ok(())
421	}
422}
423
/// Adapter that stores a closure so it can act as a post-delete interceptor.
///
/// The type itself places no bounds on `F`; the `impl` blocks that construct,
/// clone, or invoke the closure state the bounds they need.
pub struct ClosureRingBufferPostDeleteInterceptor<F> {
	/// The wrapped closure.
	closure: F,
}
430
431impl<F> ClosureRingBufferPostDeleteInterceptor<F>
432where
433	F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync,
434{
435	pub fn new(closure: F) -> Self {
436		Self {
437			closure,
438		}
439	}
440}
441
442impl<F> Clone for ClosureRingBufferPostDeleteInterceptor<F>
443where
444	F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
445{
446	fn clone(&self) -> Self {
447		Self {
448			closure: self.closure.clone(),
449		}
450	}
451}
452
453impl<F> RingBufferPostDeleteInterceptor for ClosureRingBufferPostDeleteInterceptor<F>
454where
455	F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync,
456{
457	fn intercept<'a>(&self, ctx: &mut RingBufferPostDeleteContext<'a>) -> Result<()> {
458		(self.closure)(ctx)
459	}
460}
461
462pub fn ringbuffer_post_delete<F>(f: F) -> ClosureRingBufferPostDeleteInterceptor<F>
463where
464	F: for<'a> Fn(&mut RingBufferPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
465{
466	ClosureRingBufferPostDeleteInterceptor::new(f)
467}
468
/// Namespace-style helper for running ring buffer interceptor chains.
///
/// Carries no data; its associated functions build the matching context and
/// hand it to the chain obtained from a [`WithInterceptors`] implementor.
pub struct RingBufferInterceptor;
471
472impl RingBufferInterceptor {
473	pub fn pre_insert(
474		txn: &mut impl WithInterceptors,
475		ringbuffer: &RingBufferDef,
476		row: &EncodedValues,
477	) -> Result<()> {
478		let ctx = RingBufferPreInsertContext::new(ringbuffer, row);
479		txn.ringbuffer_pre_insert_interceptors().execute(ctx)
480	}
481
482	pub fn post_insert(
483		txn: &mut impl WithInterceptors,
484		ringbuffer: &RingBufferDef,
485		id: RowNumber,
486		row: &EncodedValues,
487	) -> Result<()> {
488		let ctx = RingBufferPostInsertContext::new(ringbuffer, id, row);
489		txn.ringbuffer_post_insert_interceptors().execute(ctx)
490	}
491
492	pub fn pre_update(
493		txn: &mut impl WithInterceptors,
494		ringbuffer: &RingBufferDef,
495		id: RowNumber,
496		row: &EncodedValues,
497	) -> Result<()> {
498		let ctx = RingBufferPreUpdateContext::new(ringbuffer, id, row);
499		txn.ringbuffer_pre_update_interceptors().execute(ctx)
500	}
501
502	pub fn post_update(
503		txn: &mut impl WithInterceptors,
504		ringbuffer: &RingBufferDef,
505		id: RowNumber,
506		row: &EncodedValues,
507		old_row: &EncodedValues,
508	) -> Result<()> {
509		let ctx = RingBufferPostUpdateContext::new(ringbuffer, id, row, old_row);
510		txn.ringbuffer_post_update_interceptors().execute(ctx)
511	}
512
513	pub fn pre_delete(txn: &mut impl WithInterceptors, ringbuffer: &RingBufferDef, id: RowNumber) -> Result<()> {
514		let ctx = RingBufferPreDeleteContext::new(ringbuffer, id);
515		txn.ringbuffer_pre_delete_interceptors().execute(ctx)
516	}
517
518	pub fn post_delete(
519		txn: &mut impl WithInterceptors,
520		ringbuffer: &RingBufferDef,
521		id: RowNumber,
522		deleted_row: &EncodedValues,
523	) -> Result<()> {
524		let ctx = RingBufferPostDeleteContext::new(ringbuffer, id, deleted_row);
525		txn.ringbuffer_post_delete_interceptors().execute(ctx)
526	}
527}