reifydb_core/interceptor/
interceptor.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_type::RowNumber;
6
7use crate::{
8	CommitVersion,
9	interceptor::{
10		NamespaceDefPostCreateContext, NamespaceDefPostUpdateContext, NamespaceDefPreDeleteContext,
11		NamespaceDefPreUpdateContext, PostCommitContext, PreCommitContext, RingBufferDefPostCreateContext,
12		RingBufferDefPostUpdateContext, RingBufferDefPreDeleteContext, RingBufferDefPreUpdateContext,
13		RingBufferPostDeleteContext, RingBufferPostInsertContext, RingBufferPostUpdateContext,
14		RingBufferPreDeleteContext, RingBufferPreInsertContext, RingBufferPreUpdateContext,
15		TableDefPostCreateContext, TableDefPostUpdateContext, TableDefPreDeleteContext,
16		TableDefPreUpdateContext, TablePostDeleteContext, TablePostInsertContext, TablePostUpdateContext,
17		TablePreDeleteContext, TablePreInsertContext, TablePreUpdateContext, ViewDefPostCreateContext,
18		ViewDefPostUpdateContext, ViewDefPreDeleteContext, ViewDefPreUpdateContext,
19	},
20	interface::{
21		CommandTransaction, NamespaceDef, RingBufferDef, RowChange, TableDef, TransactionId,
22		TransactionalDefChanges, ViewDef,
23		interceptor::{
24			NamespaceDefInterceptor, RingBufferDefInterceptor, RingBufferInterceptor, TableDefInterceptor,
25			TableInterceptor, TransactionInterceptor, ViewDefInterceptor, WithInterceptors,
26		},
27	},
28	value::encoded::EncodedValues,
29};
30
31#[async_trait]
32impl<CT: CommandTransaction + WithInterceptors<CT> + Send> TableInterceptor<CT> for CT {
33	async fn pre_insert(&mut self, table: &TableDef, rn: RowNumber, row: &EncodedValues) -> crate::Result<()> {
34		if self.table_pre_insert_interceptors().is_empty() {
35			return Ok(());
36		}
37		let interceptors = self.table_pre_insert_interceptors().interceptors.clone();
38		let mut ctx = TablePreInsertContext::new(self, table, rn, row);
39		for interceptor in interceptors {
40			interceptor.intercept(&mut ctx).await?;
41		}
42		Ok(())
43	}
44
45	async fn post_insert(&mut self, table: &TableDef, id: RowNumber, row: &EncodedValues) -> crate::Result<()> {
46		if self.table_post_insert_interceptors().is_empty() {
47			return Ok(());
48		}
49		let interceptors = self.table_post_insert_interceptors().interceptors.clone();
50		let mut ctx = TablePostInsertContext::new(self, table, id, row);
51		for interceptor in interceptors {
52			interceptor.intercept(&mut ctx).await?;
53		}
54		Ok(())
55	}
56
57	async fn pre_update(&mut self, table: &TableDef, id: RowNumber, row: &EncodedValues) -> crate::Result<()> {
58		if self.table_pre_update_interceptors().is_empty() {
59			return Ok(());
60		}
61		let interceptors = self.table_pre_update_interceptors().interceptors.clone();
62		let mut ctx = TablePreUpdateContext::new(self, table, id, row);
63		for interceptor in interceptors {
64			interceptor.intercept(&mut ctx).await?;
65		}
66		Ok(())
67	}
68
69	async fn post_update(
70		&mut self,
71		table: &TableDef,
72		id: RowNumber,
73		row: &EncodedValues,
74		old_row: &EncodedValues,
75	) -> crate::Result<()> {
76		if self.table_post_update_interceptors().is_empty() {
77			return Ok(());
78		}
79		let interceptors = self.table_post_update_interceptors().interceptors.clone();
80		let mut ctx = TablePostUpdateContext::new(self, table, id, row, old_row);
81		for interceptor in interceptors {
82			interceptor.intercept(&mut ctx).await?;
83		}
84		Ok(())
85	}
86
87	async fn pre_delete(&mut self, table: &TableDef, id: RowNumber) -> crate::Result<()> {
88		if self.table_pre_delete_interceptors().is_empty() {
89			return Ok(());
90		}
91		let interceptors = self.table_pre_delete_interceptors().interceptors.clone();
92		let mut ctx = TablePreDeleteContext::new(self, table, id);
93		for interceptor in interceptors {
94			interceptor.intercept(&mut ctx).await?;
95		}
96		Ok(())
97	}
98
99	async fn post_delete(
100		&mut self,
101		table: &TableDef,
102		id: RowNumber,
103		deleted_row: &EncodedValues,
104	) -> crate::Result<()> {
105		if self.table_post_delete_interceptors().is_empty() {
106			return Ok(());
107		}
108		let interceptors = self.table_post_delete_interceptors().interceptors.clone();
109		let mut ctx = TablePostDeleteContext::new(self, table, id, deleted_row);
110		for interceptor in interceptors {
111			interceptor.intercept(&mut ctx).await?;
112		}
113		Ok(())
114	}
115}
116
117#[async_trait]
118impl<CT: CommandTransaction + WithInterceptors<CT> + Send> RingBufferInterceptor<CT> for CT {
119	async fn pre_insert(&mut self, ringbuffer: &RingBufferDef, row: &EncodedValues) -> crate::Result<()> {
120		if self.ringbuffer_pre_insert_interceptors().is_empty() {
121			return Ok(());
122		}
123		let interceptors = self.ringbuffer_pre_insert_interceptors().interceptors.clone();
124		let mut ctx = RingBufferPreInsertContext::new(self, ringbuffer, row);
125		for interceptor in interceptors {
126			interceptor.intercept(&mut ctx).await?;
127		}
128		Ok(())
129	}
130
131	async fn post_insert(
132		&mut self,
133		ringbuffer: &RingBufferDef,
134		id: RowNumber,
135		row: &EncodedValues,
136	) -> crate::Result<()> {
137		if self.ringbuffer_post_insert_interceptors().is_empty() {
138			return Ok(());
139		}
140		let interceptors = self.ringbuffer_post_insert_interceptors().interceptors.clone();
141		let mut ctx = RingBufferPostInsertContext::new(self, ringbuffer, id, row);
142		for interceptor in interceptors {
143			interceptor.intercept(&mut ctx).await?;
144		}
145		Ok(())
146	}
147
148	async fn pre_update(
149		&mut self,
150		ringbuffer: &RingBufferDef,
151		id: RowNumber,
152		row: &EncodedValues,
153	) -> crate::Result<()> {
154		if self.ringbuffer_pre_update_interceptors().is_empty() {
155			return Ok(());
156		}
157		let interceptors = self.ringbuffer_pre_update_interceptors().interceptors.clone();
158		let mut ctx = RingBufferPreUpdateContext::new(self, ringbuffer, id, row);
159		for interceptor in interceptors {
160			interceptor.intercept(&mut ctx).await?;
161		}
162		Ok(())
163	}
164
165	async fn post_update(
166		&mut self,
167		ringbuffer: &RingBufferDef,
168		id: RowNumber,
169		row: &EncodedValues,
170		old_row: &EncodedValues,
171	) -> crate::Result<()> {
172		if self.ringbuffer_post_update_interceptors().is_empty() {
173			return Ok(());
174		}
175		let interceptors = self.ringbuffer_post_update_interceptors().interceptors.clone();
176		let mut ctx = RingBufferPostUpdateContext::new(self, ringbuffer, id, row, old_row);
177		for interceptor in interceptors {
178			interceptor.intercept(&mut ctx).await?;
179		}
180		Ok(())
181	}
182
183	async fn pre_delete(&mut self, ringbuffer: &RingBufferDef, id: RowNumber) -> crate::Result<()> {
184		if self.ringbuffer_pre_delete_interceptors().is_empty() {
185			return Ok(());
186		}
187		let interceptors = self.ringbuffer_pre_delete_interceptors().interceptors.clone();
188		let mut ctx = RingBufferPreDeleteContext::new(self, ringbuffer, id);
189		for interceptor in interceptors {
190			interceptor.intercept(&mut ctx).await?;
191		}
192		Ok(())
193	}
194
195	async fn post_delete(
196		&mut self,
197		ringbuffer: &RingBufferDef,
198		id: RowNumber,
199		deleted_row: &EncodedValues,
200	) -> crate::Result<()> {
201		if self.ringbuffer_post_delete_interceptors().is_empty() {
202			return Ok(());
203		}
204		let interceptors = self.ringbuffer_post_delete_interceptors().interceptors.clone();
205		let mut ctx = RingBufferPostDeleteContext::new(self, ringbuffer, id, deleted_row);
206		for interceptor in interceptors {
207			interceptor.intercept(&mut ctx).await?;
208		}
209		Ok(())
210	}
211}
212
213#[async_trait]
214impl<CT: CommandTransaction + WithInterceptors<CT> + Send> NamespaceDefInterceptor<CT> for CT {
215	async fn post_create(&mut self, post: &NamespaceDef) -> crate::Result<()> {
216		if self.namespace_def_post_create_interceptors().is_empty() {
217			return Ok(());
218		}
219		let interceptors = self.namespace_def_post_create_interceptors().interceptors.clone();
220		let mut ctx = NamespaceDefPostCreateContext::new(self, post);
221		for interceptor in interceptors {
222			interceptor.intercept(&mut ctx).await?;
223		}
224		Ok(())
225	}
226
227	async fn pre_update(&mut self, pre: &NamespaceDef) -> crate::Result<()> {
228		if self.namespace_def_pre_update_interceptors().is_empty() {
229			return Ok(());
230		}
231		let interceptors = self.namespace_def_pre_update_interceptors().interceptors.clone();
232		let mut ctx = NamespaceDefPreUpdateContext::new(self, pre);
233		for interceptor in interceptors {
234			interceptor.intercept(&mut ctx).await?;
235		}
236		Ok(())
237	}
238
239	async fn post_update(&mut self, pre: &NamespaceDef, post: &NamespaceDef) -> crate::Result<()> {
240		if self.namespace_def_post_update_interceptors().is_empty() {
241			return Ok(());
242		}
243		let interceptors = self.namespace_def_post_update_interceptors().interceptors.clone();
244		let mut ctx = NamespaceDefPostUpdateContext::new(self, pre, post);
245		for interceptor in interceptors {
246			interceptor.intercept(&mut ctx).await?;
247		}
248		Ok(())
249	}
250
251	async fn pre_delete(&mut self, pre: &NamespaceDef) -> crate::Result<()> {
252		if self.namespace_def_pre_delete_interceptors().is_empty() {
253			return Ok(());
254		}
255		let interceptors = self.namespace_def_pre_delete_interceptors().interceptors.clone();
256		let mut ctx = NamespaceDefPreDeleteContext::new(self, pre);
257		for interceptor in interceptors {
258			interceptor.intercept(&mut ctx).await?;
259		}
260		Ok(())
261	}
262}
263
264#[async_trait]
265impl<CT: CommandTransaction + WithInterceptors<CT> + Send> TableDefInterceptor<CT> for CT {
266	async fn post_create(&mut self, post: &TableDef) -> crate::Result<()> {
267		if self.table_def_post_create_interceptors().is_empty() {
268			return Ok(());
269		}
270		let interceptors = self.table_def_post_create_interceptors().interceptors.clone();
271		let mut ctx = TableDefPostCreateContext::new(self, post);
272		for interceptor in interceptors {
273			interceptor.intercept(&mut ctx).await?;
274		}
275		Ok(())
276	}
277
278	async fn pre_update(&mut self, pre: &TableDef) -> crate::Result<()> {
279		if self.table_def_pre_update_interceptors().is_empty() {
280			return Ok(());
281		}
282		let interceptors = self.table_def_pre_update_interceptors().interceptors.clone();
283		let mut ctx = TableDefPreUpdateContext::new(self, pre);
284		for interceptor in interceptors {
285			interceptor.intercept(&mut ctx).await?;
286		}
287		Ok(())
288	}
289
290	async fn post_update(&mut self, pre: &TableDef, post: &TableDef) -> crate::Result<()> {
291		if self.table_def_post_update_interceptors().is_empty() {
292			return Ok(());
293		}
294		let interceptors = self.table_def_post_update_interceptors().interceptors.clone();
295		let mut ctx = TableDefPostUpdateContext::new(self, pre, post);
296		for interceptor in interceptors {
297			interceptor.intercept(&mut ctx).await?;
298		}
299		Ok(())
300	}
301
302	async fn pre_delete(&mut self, pre: &TableDef) -> crate::Result<()> {
303		if self.table_def_pre_delete_interceptors().is_empty() {
304			return Ok(());
305		}
306		let interceptors = self.table_def_pre_delete_interceptors().interceptors.clone();
307		let mut ctx = TableDefPreDeleteContext::new(self, pre);
308		for interceptor in interceptors {
309			interceptor.intercept(&mut ctx).await?;
310		}
311		Ok(())
312	}
313}
314
315#[async_trait]
316impl<CT: CommandTransaction + WithInterceptors<CT> + Send> ViewDefInterceptor<CT> for CT {
317	async fn post_create(&mut self, post: &ViewDef) -> crate::Result<()> {
318		if self.view_def_post_create_interceptors().is_empty() {
319			return Ok(());
320		}
321		let interceptors = self.view_def_post_create_interceptors().interceptors.clone();
322		let mut ctx = ViewDefPostCreateContext::new(self, post);
323		for interceptor in interceptors {
324			interceptor.intercept(&mut ctx).await?;
325		}
326		Ok(())
327	}
328
329	async fn pre_update(&mut self, pre: &ViewDef) -> crate::Result<()> {
330		if self.view_def_pre_update_interceptors().is_empty() {
331			return Ok(());
332		}
333		let interceptors = self.view_def_pre_update_interceptors().interceptors.clone();
334		let mut ctx = ViewDefPreUpdateContext::new(self, pre);
335		for interceptor in interceptors {
336			interceptor.intercept(&mut ctx).await?;
337		}
338		Ok(())
339	}
340
341	async fn post_update(&mut self, pre: &ViewDef, post: &ViewDef) -> crate::Result<()> {
342		if self.view_def_post_update_interceptors().is_empty() {
343			return Ok(());
344		}
345		let interceptors = self.view_def_post_update_interceptors().interceptors.clone();
346		let mut ctx = ViewDefPostUpdateContext::new(self, pre, post);
347		for interceptor in interceptors {
348			interceptor.intercept(&mut ctx).await?;
349		}
350		Ok(())
351	}
352
353	async fn pre_delete(&mut self, pre: &ViewDef) -> crate::Result<()> {
354		if self.view_def_pre_delete_interceptors().is_empty() {
355			return Ok(());
356		}
357		let interceptors = self.view_def_pre_delete_interceptors().interceptors.clone();
358		let mut ctx = ViewDefPreDeleteContext::new(self, pre);
359		for interceptor in interceptors {
360			interceptor.intercept(&mut ctx).await?;
361		}
362		Ok(())
363	}
364}
365
366#[async_trait]
367impl<CT: CommandTransaction + WithInterceptors<CT> + Send> RingBufferDefInterceptor<CT> for CT {
368	async fn post_create(&mut self, post: &RingBufferDef) -> crate::Result<()> {
369		if self.ringbuffer_def_post_create_interceptors().is_empty() {
370			return Ok(());
371		}
372		let interceptors = self.ringbuffer_def_post_create_interceptors().interceptors.clone();
373		let mut ctx = RingBufferDefPostCreateContext::new(self, post);
374		for interceptor in interceptors {
375			interceptor.intercept(&mut ctx).await?;
376		}
377		Ok(())
378	}
379
380	async fn pre_update(&mut self, pre: &RingBufferDef) -> crate::Result<()> {
381		if self.ringbuffer_def_pre_update_interceptors().is_empty() {
382			return Ok(());
383		}
384		let interceptors = self.ringbuffer_def_pre_update_interceptors().interceptors.clone();
385		let mut ctx = RingBufferDefPreUpdateContext::new(self, pre);
386		for interceptor in interceptors {
387			interceptor.intercept(&mut ctx).await?;
388		}
389		Ok(())
390	}
391
392	async fn post_update(&mut self, pre: &RingBufferDef, post: &RingBufferDef) -> crate::Result<()> {
393		if self.ringbuffer_def_post_update_interceptors().is_empty() {
394			return Ok(());
395		}
396		let interceptors = self.ringbuffer_def_post_update_interceptors().interceptors.clone();
397		let mut ctx = RingBufferDefPostUpdateContext::new(self, pre, post);
398		for interceptor in interceptors {
399			interceptor.intercept(&mut ctx).await?;
400		}
401		Ok(())
402	}
403
404	async fn pre_delete(&mut self, pre: &RingBufferDef) -> crate::Result<()> {
405		if self.ringbuffer_def_pre_delete_interceptors().is_empty() {
406			return Ok(());
407		}
408		let interceptors = self.ringbuffer_def_pre_delete_interceptors().interceptors.clone();
409		let mut ctx = RingBufferDefPreDeleteContext::new(self, pre);
410		for interceptor in interceptors {
411			interceptor.intercept(&mut ctx).await?;
412		}
413		Ok(())
414	}
415}
416
417#[async_trait]
418impl<CT: CommandTransaction + WithInterceptors<CT> + Send> TransactionInterceptor<CT> for CT {
419	async fn pre_commit(&mut self) -> crate::Result<()> {
420		if self.pre_commit_interceptors().is_empty() {
421			return Ok(());
422		}
423		let interceptors = self.pre_commit_interceptors().interceptors.clone();
424		let mut ctx = PreCommitContext::new(self);
425		for interceptor in interceptors {
426			interceptor.intercept(&mut ctx).await?;
427		}
428		Ok(())
429	}
430
431	async fn post_commit(
432		&mut self,
433		id: TransactionId,
434		version: CommitVersion,
435		changes: TransactionalDefChanges,
436		row_changes: Vec<RowChange>,
437	) -> crate::Result<()> {
438		if self.post_commit_interceptors().is_empty() {
439			return Ok(());
440		}
441		let interceptors = self.post_commit_interceptors().interceptors.clone();
442		let mut ctx = PostCommitContext::new(id, version, changes, row_changes);
443		for interceptor in interceptors {
444			interceptor.intercept(&mut ctx).await?;
445		}
446		Ok(())
447	}
448}