reifydb_sub_flow/subsystem/
intercept.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{cell::RefCell, rc::Rc};
5
6use reifydb_core::{
7	Result,
8	interceptor::{
9		Interceptors, PreCommitContext, PreCommitInterceptor, RegisterInterceptor, RingBufferPostDeleteContext,
10		RingBufferPostDeleteInterceptor, RingBufferPostInsertContext, RingBufferPostInsertInterceptor,
11		RingBufferPostUpdateContext, RingBufferPostUpdateInterceptor, TablePostDeleteContext,
12		TablePostDeleteInterceptor, TablePostInsertContext, TablePostInsertInterceptor, TablePostUpdateContext,
13		TablePostUpdateInterceptor,
14	},
15	interface::{CommandTransaction, SourceId},
16	ioc::{IocContainer, LazyResolveRc},
17};
18use reifydb_engine::StandardEngine;
19use reifydb_type::RowNumber;
20
21/// Event type for flow processing
22#[derive(Debug, Clone)]
23pub(crate) enum Change {
24	Insert {
25		_source_id: SourceId,
26		row_number: RowNumber,
27		post: Vec<u8>,
28	},
29	Update {
30		_source_id: SourceId,
31		row_number: RowNumber,
32		pre: Vec<u8>,
33		post: Vec<u8>,
34	},
35	Delete {
36		_source_id: SourceId,
37		row_number: RowNumber,
38		pre: Vec<u8>,
39	},
40}
41
42pub struct TransactionalFlowInterceptor {
43	engine: LazyResolveRc<StandardEngine>,
44	ioc: IocContainer,
45	// Transaction-scoped change buffer
46	changes: Rc<RefCell<Vec<Change>>>,
47}
48
49impl TransactionalFlowInterceptor {
50	pub fn new(ioc: IocContainer) -> Self {
51		Self {
52			engine: LazyResolveRc::new(),
53			ioc,
54			changes: Rc::new(RefCell::new(Vec::new())),
55		}
56	}
57}
58
59impl Clone for TransactionalFlowInterceptor {
60	fn clone(&self) -> Self {
61		Self {
62			engine: self.engine.clone(),
63			ioc: self.ioc.clone(),
64			changes: Rc::clone(&self.changes),
65		}
66	}
67}
68
69impl<CT: CommandTransaction> TablePostInsertInterceptor<CT> for TransactionalFlowInterceptor {
70	fn intercept(&self, ctx: &mut TablePostInsertContext<CT>) -> Result<()> {
71		self.changes.borrow_mut().push(Change::Insert {
72			_source_id: SourceId::from(ctx.table.id),
73			row_number: ctx.id,
74			post: ctx.row.to_vec(),
75		});
76
77		Ok(())
78	}
79}
80
81impl<CT: CommandTransaction> TablePostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
82	fn intercept(&self, ctx: &mut TablePostUpdateContext<CT>) -> Result<()> {
83		self.changes.borrow_mut().push(Change::Update {
84			_source_id: SourceId::from(ctx.table.id),
85			row_number: ctx.id,
86			pre: ctx.old_row.to_vec(),
87			post: ctx.row.to_vec(),
88		});
89		Ok(())
90	}
91}
92
93impl<CT: CommandTransaction> TablePostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
94	fn intercept(&self, ctx: &mut TablePostDeleteContext<CT>) -> Result<()> {
95		self.changes.borrow_mut().push(Change::Delete {
96			_source_id: SourceId::from(ctx.table.id),
97			row_number: ctx.id,
98			pre: ctx.deleted_row.to_vec(),
99		});
100		Ok(())
101	}
102}
103
104impl<CT: CommandTransaction> RingBufferPostInsertInterceptor<CT> for TransactionalFlowInterceptor {
105	fn intercept(&self, ctx: &mut RingBufferPostInsertContext<CT>) -> Result<()> {
106		self.changes.borrow_mut().push(Change::Insert {
107			_source_id: SourceId::from(ctx.ring_buffer.id),
108			row_number: ctx.id,
109			post: ctx.row.to_vec(),
110		});
111
112		Ok(())
113	}
114}
115
116impl<CT: CommandTransaction> RingBufferPostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
117	fn intercept(&self, ctx: &mut RingBufferPostUpdateContext<CT>) -> Result<()> {
118		self.changes.borrow_mut().push(Change::Update {
119			_source_id: SourceId::from(ctx.ring_buffer.id),
120			row_number: ctx.id,
121			pre: ctx.old_row.to_vec(),
122			post: ctx.row.to_vec(),
123		});
124		Ok(())
125	}
126}
127
128impl<CT: CommandTransaction> RingBufferPostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
129	fn intercept(&self, ctx: &mut RingBufferPostDeleteContext<CT>) -> Result<()> {
130		self.changes.borrow_mut().push(Change::Delete {
131			_source_id: SourceId::from(ctx.ring_buffer.id),
132			row_number: ctx.id,
133			pre: ctx.deleted_row.to_vec(),
134		});
135		Ok(())
136	}
137}
138
139impl<CT: CommandTransaction> PreCommitInterceptor<CT> for TransactionalFlowInterceptor {
140	fn intercept(&self, _ctx: &mut PreCommitContext<CT>) -> Result<()> {
141		let _engine = self.engine.get_or_resolve(&self.ioc)?;
142
143		// Process all collected changes through flow engine
144		let changes = self.changes.borrow_mut();
145		if !changes.is_empty() {
146			// TODO: Convert FlowChange to flow engine Change format
147			// and process through flow engine
148			// for change in changes.drain(..) {
149			// 	log_debug!("Intercepted change: {:?}", change);
150			// 	// The flow engine will be accessed via the
151			// engine/subsystem 	// This interceptor collects
152			// changes for the flow engine }
153		}
154
155		Ok(())
156	}
157}
158
159impl<CT: CommandTransaction> RegisterInterceptor<CT> for TransactionalFlowInterceptor {
160	fn register(self: Rc<Self>, interceptors: &mut Interceptors<CT>) {
161		interceptors.table_post_insert.add(self.clone());
162		interceptors.table_post_update.add(self.clone());
163		interceptors.table_post_delete.add(self.clone());
164		interceptors.ring_buffer_post_insert.add(self.clone());
165		interceptors.ring_buffer_post_update.add(self.clone());
166		interceptors.ring_buffer_post_delete.add(self.clone());
167		interceptors.pre_commit.add(self);
168	}
169}