reifydb_sub_flow/subsystem/
intercept.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use reifydb_core::{
8	Result,
9	interceptor::{
10		Interceptors, PreCommitContext, PreCommitInterceptor, RegisterInterceptor, RingBufferPostDeleteContext,
11		RingBufferPostDeleteInterceptor, RingBufferPostInsertContext, RingBufferPostInsertInterceptor,
12		RingBufferPostUpdateContext, RingBufferPostUpdateInterceptor, TablePostDeleteContext,
13		TablePostDeleteInterceptor, TablePostInsertContext, TablePostInsertInterceptor, TablePostUpdateContext,
14		TablePostUpdateInterceptor,
15	},
16	interface::CommandTransaction,
17	ioc::{IocContainer, LazyResolveArc},
18};
19use reifydb_engine::StandardEngine;
20use reifydb_type::RowNumber;
21use tokio::sync::Mutex;
22
23/// Event type for flow processing
24#[derive(Debug, Clone)]
25pub(crate) enum Change {
26	Insert {
27		row_number: RowNumber,
28		post: Vec<u8>,
29	},
30	Update {
31		row_number: RowNumber,
32		pre: Vec<u8>,
33		post: Vec<u8>,
34	},
35	Delete {
36		row_number: RowNumber,
37		pre: Vec<u8>,
38	},
39}
40
41pub struct TransactionalFlowInterceptor {
42	engine: LazyResolveArc<StandardEngine>,
43	ioc: IocContainer,
44	// Transaction-scoped change buffer
45	changes: Arc<Mutex<Vec<Change>>>,
46}
47
48impl TransactionalFlowInterceptor {
49	pub fn new(ioc: IocContainer) -> Self {
50		Self {
51			engine: LazyResolveArc::new(),
52			ioc,
53			changes: Arc::new(Mutex::new(Vec::new())),
54		}
55	}
56}
57
58impl Clone for TransactionalFlowInterceptor {
59	fn clone(&self) -> Self {
60		Self {
61			engine: self.engine.clone(),
62			ioc: self.ioc.clone(),
63			changes: Arc::clone(&self.changes),
64		}
65	}
66}
67
68#[async_trait]
69impl<CT: CommandTransaction + Send> TablePostInsertInterceptor<CT> for TransactionalFlowInterceptor {
70	async fn intercept<'a>(&self, ctx: &mut TablePostInsertContext<'a, CT>) -> Result<()> {
71		self.changes.lock().await.push(Change::Insert {
72			row_number: ctx.id,
73			post: ctx.row.to_vec(),
74		});
75
76		Ok(())
77	}
78}
79
80#[async_trait]
81impl<CT: CommandTransaction + Send> TablePostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
82	async fn intercept<'a>(&self, ctx: &mut TablePostUpdateContext<'a, CT>) -> Result<()> {
83		self.changes.lock().await.push(Change::Update {
84			row_number: ctx.id,
85			pre: ctx.old_row.to_vec(),
86			post: ctx.row.to_vec(),
87		});
88		Ok(())
89	}
90}
91
92#[async_trait]
93impl<CT: CommandTransaction + Send> TablePostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
94	async fn intercept<'a>(&self, ctx: &mut TablePostDeleteContext<'a, CT>) -> Result<()> {
95		self.changes.lock().await.push(Change::Delete {
96			row_number: ctx.id,
97			pre: ctx.deleted_row.to_vec(),
98		});
99		Ok(())
100	}
101}
102
103#[async_trait]
104impl<CT: CommandTransaction + Send> RingBufferPostInsertInterceptor<CT> for TransactionalFlowInterceptor {
105	async fn intercept<'a>(&self, ctx: &mut RingBufferPostInsertContext<'a, CT>) -> Result<()> {
106		self.changes.lock().await.push(Change::Insert {
107			row_number: ctx.id,
108			post: ctx.row.to_vec(),
109		});
110
111		Ok(())
112	}
113}
114
115#[async_trait]
116impl<CT: CommandTransaction + Send> RingBufferPostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
117	async fn intercept<'a>(&self, ctx: &mut RingBufferPostUpdateContext<'a, CT>) -> Result<()> {
118		self.changes.lock().await.push(Change::Update {
119			row_number: ctx.id,
120			pre: ctx.old_row.to_vec(),
121			post: ctx.row.to_vec(),
122		});
123		Ok(())
124	}
125}
126
127#[async_trait]
128impl<CT: CommandTransaction + Send> RingBufferPostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
129	async fn intercept<'a>(&self, ctx: &mut RingBufferPostDeleteContext<'a, CT>) -> Result<()> {
130		self.changes.lock().await.push(Change::Delete {
131			row_number: ctx.id,
132			pre: ctx.deleted_row.to_vec(),
133		});
134		Ok(())
135	}
136}
137
138#[async_trait]
139impl<CT: CommandTransaction + Send> PreCommitInterceptor<CT> for TransactionalFlowInterceptor {
140	async fn intercept<'a>(&self, _ctx: &mut PreCommitContext<'a, CT>) -> Result<()> {
141		let _engine = self.engine.get_or_resolve(&self.ioc)?;
142
143		// Process all collected changes through flow engine
144		let changes = self.changes.lock().await;
145		if !changes.is_empty() {
146			// TODO: Convert FlowChange to flow engine Change format
147			// and process through flow engine
148			// for change in changes.drain(..) {
149			// 	debug!("Intercepted change: {:?}", change);
150			// 	// The flow engine will be accessed via the
151			// engine/subsystem 	// This interceptor collects
152			// changes for the flow engine }
153		}
154
155		Ok(())
156	}
157}
158
159impl<CT: CommandTransaction + Send + 'static> RegisterInterceptor<CT> for TransactionalFlowInterceptor {
160	fn register(self: Arc<Self>, interceptors: &mut Interceptors<CT>) {
161		interceptors.table_post_insert.add(self.clone());
162		interceptors.table_post_update.add(self.clone());
163		interceptors.table_post_delete.add(self.clone());
164		interceptors.ringbuffer_post_insert.add(self.clone());
165		interceptors.ringbuffer_post_update.add(self.clone());
166		interceptors.ringbuffer_post_delete.add(self.clone());
167		interceptors.pre_commit.add(self);
168	}
169}