Skip to main content

reifydb_transaction/interceptor/
ringbuffer_row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{encoded::row::EncodedRow, interface::catalog::ringbuffer::RingBuffer};
5use reifydb_type::{Result, value::row_number::RowNumber};
6
7use super::WithInterceptors;
8use crate::interceptor::chain::InterceptorChain;
9
10// PRE INSERT
11pub struct RingBufferRowPreInsertContext<'a> {
12	pub ringbuffer: &'a RingBuffer,
13	pub row: EncodedRow,
14}
15
16impl<'a> RingBufferRowPreInsertContext<'a> {
17	pub fn new(ringbuffer: &'a RingBuffer, row: EncodedRow) -> Self {
18		Self {
19			ringbuffer,
20			row,
21		}
22	}
23}
24
25pub trait RingBufferRowPreInsertInterceptor: Send + Sync {
26	fn intercept<'a>(&self, ctx: &mut RingBufferRowPreInsertContext<'a>) -> Result<()>;
27}
28
29impl InterceptorChain<dyn RingBufferRowPreInsertInterceptor + Send + Sync> {
30	pub fn execute(&self, mut ctx: RingBufferRowPreInsertContext) -> Result<EncodedRow> {
31		for interceptor in &self.interceptors {
32			interceptor.intercept(&mut ctx)?;
33		}
34		Ok(ctx.row)
35	}
36}
37
38pub struct ClosureRingBufferRowPreInsertInterceptor<F>
39where
40	F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync,
41{
42	closure: F,
43}
44
45impl<F> ClosureRingBufferRowPreInsertInterceptor<F>
46where
47	F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync,
48{
49	pub fn new(closure: F) -> Self {
50		Self {
51			closure,
52		}
53	}
54}
55
56impl<F> Clone for ClosureRingBufferRowPreInsertInterceptor<F>
57where
58	F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
59{
60	fn clone(&self) -> Self {
61		Self {
62			closure: self.closure.clone(),
63		}
64	}
65}
66
67impl<F> RingBufferRowPreInsertInterceptor for ClosureRingBufferRowPreInsertInterceptor<F>
68where
69	F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync,
70{
71	fn intercept<'a>(&self, ctx: &mut RingBufferRowPreInsertContext<'a>) -> Result<()> {
72		(self.closure)(ctx)
73	}
74}
75
76pub fn ringbuffer_row_pre_insert<F>(f: F) -> ClosureRingBufferRowPreInsertInterceptor<F>
77where
78	F: for<'a> Fn(&mut RingBufferRowPreInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
79{
80	ClosureRingBufferRowPreInsertInterceptor::new(f)
81}
82
83// POST INSERT
84pub struct RingBufferRowPostInsertContext<'a> {
85	pub ringbuffer: &'a RingBuffer,
86	pub id: RowNumber,
87	pub row: &'a EncodedRow,
88}
89
90impl<'a> RingBufferRowPostInsertContext<'a> {
91	pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, row: &'a EncodedRow) -> Self {
92		Self {
93			ringbuffer,
94			id,
95			row,
96		}
97	}
98}
99
100pub trait RingBufferRowPostInsertInterceptor: Send + Sync {
101	fn intercept<'a>(&self, ctx: &mut RingBufferRowPostInsertContext<'a>) -> Result<()>;
102}
103
104impl InterceptorChain<dyn RingBufferRowPostInsertInterceptor + Send + Sync> {
105	pub fn execute<'a>(&self, mut ctx: RingBufferRowPostInsertContext<'a>) -> Result<()> {
106		for interceptor in &self.interceptors {
107			interceptor.intercept(&mut ctx)?;
108		}
109		Ok(())
110	}
111}
112
113pub struct ClosureRingBufferRowPostInsertInterceptor<F>
114where
115	F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync,
116{
117	closure: F,
118}
119
120impl<F> ClosureRingBufferRowPostInsertInterceptor<F>
121where
122	F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync,
123{
124	pub fn new(closure: F) -> Self {
125		Self {
126			closure,
127		}
128	}
129}
130
131impl<F> Clone for ClosureRingBufferRowPostInsertInterceptor<F>
132where
133	F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone,
134{
135	fn clone(&self) -> Self {
136		Self {
137			closure: self.closure.clone(),
138		}
139	}
140}
141
142impl<F> RingBufferRowPostInsertInterceptor for ClosureRingBufferRowPostInsertInterceptor<F>
143where
144	F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync,
145{
146	fn intercept<'a>(&self, ctx: &mut RingBufferRowPostInsertContext<'a>) -> Result<()> {
147		(self.closure)(ctx)
148	}
149}
150
151pub fn ringbuffer_row_post_insert<F>(f: F) -> ClosureRingBufferRowPostInsertInterceptor<F>
152where
153	F: for<'a> Fn(&mut RingBufferRowPostInsertContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
154{
155	ClosureRingBufferRowPostInsertInterceptor::new(f)
156}
157
158// PRE UPDATE
159pub struct RingBufferRowPreUpdateContext<'a> {
160	pub ringbuffer: &'a RingBuffer,
161	pub id: RowNumber,
162	pub row: EncodedRow,
163}
164
165impl<'a> RingBufferRowPreUpdateContext<'a> {
166	pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, row: EncodedRow) -> Self {
167		Self {
168			ringbuffer,
169			id,
170			row,
171		}
172	}
173}
174
175pub trait RingBufferRowPreUpdateInterceptor: Send + Sync {
176	fn intercept<'a>(&self, ctx: &mut RingBufferRowPreUpdateContext<'a>) -> Result<()>;
177}
178
179impl InterceptorChain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync> {
180	pub fn execute(&self, mut ctx: RingBufferRowPreUpdateContext) -> Result<EncodedRow> {
181		for interceptor in &self.interceptors {
182			interceptor.intercept(&mut ctx)?;
183		}
184		Ok(ctx.row)
185	}
186}
187
188pub struct ClosureRingBufferRowPreUpdateInterceptor<F>
189where
190	F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync,
191{
192	closure: F,
193}
194
195impl<F> ClosureRingBufferRowPreUpdateInterceptor<F>
196where
197	F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync,
198{
199	pub fn new(closure: F) -> Self {
200		Self {
201			closure,
202		}
203	}
204}
205
206impl<F> Clone for ClosureRingBufferRowPreUpdateInterceptor<F>
207where
208	F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
209{
210	fn clone(&self) -> Self {
211		Self {
212			closure: self.closure.clone(),
213		}
214	}
215}
216
217impl<F> RingBufferRowPreUpdateInterceptor for ClosureRingBufferRowPreUpdateInterceptor<F>
218where
219	F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync,
220{
221	fn intercept<'a>(&self, ctx: &mut RingBufferRowPreUpdateContext<'a>) -> Result<()> {
222		(self.closure)(ctx)
223	}
224}
225
226pub fn ringbuffer_row_pre_update<F>(f: F) -> ClosureRingBufferRowPreUpdateInterceptor<F>
227where
228	F: for<'a> Fn(&mut RingBufferRowPreUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
229{
230	ClosureRingBufferRowPreUpdateInterceptor::new(f)
231}
232
233// POST UPDATE
234pub struct RingBufferRowPostUpdateContext<'a> {
235	pub ringbuffer: &'a RingBuffer,
236	pub id: RowNumber,
237	pub post: &'a EncodedRow,
238	pub pre: &'a EncodedRow,
239}
240
241impl<'a> RingBufferRowPostUpdateContext<'a> {
242	pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, post: &'a EncodedRow, pre: &'a EncodedRow) -> Self {
243		Self {
244			ringbuffer,
245			id,
246			post,
247			pre,
248		}
249	}
250}
251
252pub trait RingBufferRowPostUpdateInterceptor: Send + Sync {
253	fn intercept<'a>(&self, ctx: &mut RingBufferRowPostUpdateContext<'a>) -> Result<()>;
254}
255
256impl InterceptorChain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync> {
257	pub fn execute(&self, mut ctx: RingBufferRowPostUpdateContext) -> Result<()> {
258		for interceptor in &self.interceptors {
259			interceptor.intercept(&mut ctx)?;
260		}
261		Ok(())
262	}
263}
264
265pub struct ClosureRingBufferRowPostUpdateInterceptor<F>
266where
267	F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync,
268{
269	closure: F,
270}
271
272impl<F> ClosureRingBufferRowPostUpdateInterceptor<F>
273where
274	F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync,
275{
276	pub fn new(closure: F) -> Self {
277		Self {
278			closure,
279		}
280	}
281}
282
283impl<F> Clone for ClosureRingBufferRowPostUpdateInterceptor<F>
284where
285	F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone,
286{
287	fn clone(&self) -> Self {
288		Self {
289			closure: self.closure.clone(),
290		}
291	}
292}
293
294impl<F> RingBufferRowPostUpdateInterceptor for ClosureRingBufferRowPostUpdateInterceptor<F>
295where
296	F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync,
297{
298	fn intercept<'a>(&self, ctx: &mut RingBufferRowPostUpdateContext<'a>) -> Result<()> {
299		(self.closure)(ctx)
300	}
301}
302
303pub fn ringbuffer_row_post_update<F>(f: F) -> ClosureRingBufferRowPostUpdateInterceptor<F>
304where
305	F: for<'a> Fn(&mut RingBufferRowPostUpdateContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
306{
307	ClosureRingBufferRowPostUpdateInterceptor::new(f)
308}
309
310// PRE DELETE
311pub struct RingBufferRowPreDeleteContext<'a> {
312	pub ringbuffer: &'a RingBuffer,
313	pub id: RowNumber,
314}
315
316impl<'a> RingBufferRowPreDeleteContext<'a> {
317	pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber) -> Self {
318		Self {
319			ringbuffer,
320			id,
321		}
322	}
323}
324
325pub trait RingBufferRowPreDeleteInterceptor: Send + Sync {
326	fn intercept<'a>(&self, ctx: &mut RingBufferRowPreDeleteContext<'a>) -> Result<()>;
327}
328
329impl InterceptorChain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync> {
330	pub fn execute(&self, mut ctx: RingBufferRowPreDeleteContext) -> Result<()> {
331		for interceptor in &self.interceptors {
332			interceptor.intercept(&mut ctx)?;
333		}
334		Ok(())
335	}
336}
337
338pub struct ClosureRingBufferRowPreDeleteInterceptor<F>
339where
340	F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync,
341{
342	closure: F,
343}
344
345impl<F> ClosureRingBufferRowPreDeleteInterceptor<F>
346where
347	F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync,
348{
349	pub fn new(closure: F) -> Self {
350		Self {
351			closure,
352		}
353	}
354}
355
356impl<F> Clone for ClosureRingBufferRowPreDeleteInterceptor<F>
357where
358	F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
359{
360	fn clone(&self) -> Self {
361		Self {
362			closure: self.closure.clone(),
363		}
364	}
365}
366
367impl<F> RingBufferRowPreDeleteInterceptor for ClosureRingBufferRowPreDeleteInterceptor<F>
368where
369	F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync,
370{
371	fn intercept<'a>(&self, ctx: &mut RingBufferRowPreDeleteContext<'a>) -> Result<()> {
372		(self.closure)(ctx)
373	}
374}
375
376pub fn ringbuffer_row_pre_delete<F>(f: F) -> ClosureRingBufferRowPreDeleteInterceptor<F>
377where
378	F: for<'a> Fn(&mut RingBufferRowPreDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
379{
380	ClosureRingBufferRowPreDeleteInterceptor::new(f)
381}
382
383// POST DELETE
384pub struct RingBufferRowPostDeleteContext<'a> {
385	pub ringbuffer: &'a RingBuffer,
386	pub id: RowNumber,
387	pub deleted_row: &'a EncodedRow,
388}
389
390impl<'a> RingBufferRowPostDeleteContext<'a> {
391	pub fn new(ringbuffer: &'a RingBuffer, id: RowNumber, deleted_row: &'a EncodedRow) -> Self {
392		Self {
393			ringbuffer,
394			id,
395			deleted_row,
396		}
397	}
398}
399
400pub trait RingBufferRowPostDeleteInterceptor: Send + Sync {
401	fn intercept<'a>(&self, ctx: &mut RingBufferRowPostDeleteContext<'a>) -> Result<()>;
402}
403
404impl InterceptorChain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync> {
405	pub fn execute(&self, mut ctx: RingBufferRowPostDeleteContext) -> Result<()> {
406		for interceptor in &self.interceptors {
407			interceptor.intercept(&mut ctx)?;
408		}
409		Ok(())
410	}
411}
412
413pub struct ClosureRingBufferRowPostDeleteInterceptor<F>
414where
415	F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync,
416{
417	closure: F,
418}
419
420impl<F> ClosureRingBufferRowPostDeleteInterceptor<F>
421where
422	F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync,
423{
424	pub fn new(closure: F) -> Self {
425		Self {
426			closure,
427		}
428	}
429}
430
431impl<F> Clone for ClosureRingBufferRowPostDeleteInterceptor<F>
432where
433	F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone,
434{
435	fn clone(&self) -> Self {
436		Self {
437			closure: self.closure.clone(),
438		}
439	}
440}
441
442impl<F> RingBufferRowPostDeleteInterceptor for ClosureRingBufferRowPostDeleteInterceptor<F>
443where
444	F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync,
445{
446	fn intercept<'a>(&self, ctx: &mut RingBufferRowPostDeleteContext<'a>) -> Result<()> {
447		(self.closure)(ctx)
448	}
449}
450
451pub fn ringbuffer_row_post_delete<F>(f: F) -> ClosureRingBufferRowPostDeleteInterceptor<F>
452where
453	F: for<'a> Fn(&mut RingBufferRowPostDeleteContext<'a>) -> Result<()> + Send + Sync + Clone + 'static,
454{
455	ClosureRingBufferRowPostDeleteInterceptor::new(f)
456}
457
458/// Helper struct for executing ring buffer interceptors via static methods.
459pub struct RingBufferRowInterceptor;
460
461impl RingBufferRowInterceptor {
462	pub fn pre_insert(
463		txn: &mut impl WithInterceptors,
464		ringbuffer: &RingBuffer,
465		row: EncodedRow,
466	) -> Result<EncodedRow> {
467		let ctx = RingBufferRowPreInsertContext::new(ringbuffer, row);
468		txn.ringbuffer_row_pre_insert_interceptors().execute(ctx)
469	}
470
471	pub fn post_insert(
472		txn: &mut impl WithInterceptors,
473		ringbuffer: &RingBuffer,
474		id: RowNumber,
475		row: &EncodedRow,
476	) -> Result<()> {
477		let ctx = RingBufferRowPostInsertContext::new(ringbuffer, id, row);
478		txn.ringbuffer_row_post_insert_interceptors().execute(ctx)
479	}
480
481	pub fn pre_update(
482		txn: &mut impl WithInterceptors,
483		ringbuffer: &RingBuffer,
484		id: RowNumber,
485		row: EncodedRow,
486	) -> Result<EncodedRow> {
487		let ctx = RingBufferRowPreUpdateContext::new(ringbuffer, id, row);
488		txn.ringbuffer_row_pre_update_interceptors().execute(ctx)
489	}
490
491	pub fn post_update(
492		txn: &mut impl WithInterceptors,
493		ringbuffer: &RingBuffer,
494		id: RowNumber,
495		post: &EncodedRow,
496		pre: &EncodedRow,
497	) -> Result<()> {
498		let ctx = RingBufferRowPostUpdateContext::new(ringbuffer, id, post, pre);
499		txn.ringbuffer_row_post_update_interceptors().execute(ctx)
500	}
501
502	pub fn pre_delete(txn: &mut impl WithInterceptors, ringbuffer: &RingBuffer, id: RowNumber) -> Result<()> {
503		let ctx = RingBufferRowPreDeleteContext::new(ringbuffer, id);
504		txn.ringbuffer_row_pre_delete_interceptors().execute(ctx)
505	}
506
507	pub fn post_delete(
508		txn: &mut impl WithInterceptors,
509		ringbuffer: &RingBuffer,
510		id: RowNumber,
511		deleted_row: &EncodedRow,
512	) -> Result<()> {
513		let ctx = RingBufferRowPostDeleteContext::new(ringbuffer, id, deleted_row);
514		txn.ringbuffer_row_post_delete_interceptors().execute(ctx)
515	}
516}