reifydb_sub_flow/subsystem/
intercept.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use reifydb_core::{
8 Result,
9 interceptor::{
10 Interceptors, PreCommitContext, PreCommitInterceptor, RegisterInterceptor, RingBufferPostDeleteContext,
11 RingBufferPostDeleteInterceptor, RingBufferPostInsertContext, RingBufferPostInsertInterceptor,
12 RingBufferPostUpdateContext, RingBufferPostUpdateInterceptor, TablePostDeleteContext,
13 TablePostDeleteInterceptor, TablePostInsertContext, TablePostInsertInterceptor, TablePostUpdateContext,
14 TablePostUpdateInterceptor,
15 },
16 interface::CommandTransaction,
17 ioc::{IocContainer, LazyResolveArc},
18};
19use reifydb_engine::StandardEngine;
20use reifydb_type::RowNumber;
21use tokio::sync::Mutex;
22
23#[derive(Debug, Clone)]
25pub(crate) enum Change {
26 Insert {
27 row_number: RowNumber,
28 post: Vec<u8>,
29 },
30 Update {
31 row_number: RowNumber,
32 pre: Vec<u8>,
33 post: Vec<u8>,
34 },
35 Delete {
36 row_number: RowNumber,
37 pre: Vec<u8>,
38 },
39}
40
41pub struct TransactionalFlowInterceptor {
42 engine: LazyResolveArc<StandardEngine>,
43 ioc: IocContainer,
44 changes: Arc<Mutex<Vec<Change>>>,
46}
47
48impl TransactionalFlowInterceptor {
49 pub fn new(ioc: IocContainer) -> Self {
50 Self {
51 engine: LazyResolveArc::new(),
52 ioc,
53 changes: Arc::new(Mutex::new(Vec::new())),
54 }
55 }
56}
57
58impl Clone for TransactionalFlowInterceptor {
59 fn clone(&self) -> Self {
60 Self {
61 engine: self.engine.clone(),
62 ioc: self.ioc.clone(),
63 changes: Arc::clone(&self.changes),
64 }
65 }
66}
67
68#[async_trait]
69impl<CT: CommandTransaction + Send> TablePostInsertInterceptor<CT> for TransactionalFlowInterceptor {
70 async fn intercept<'a>(&self, ctx: &mut TablePostInsertContext<'a, CT>) -> Result<()> {
71 self.changes.lock().await.push(Change::Insert {
72 row_number: ctx.id,
73 post: ctx.row.to_vec(),
74 });
75
76 Ok(())
77 }
78}
79
80#[async_trait]
81impl<CT: CommandTransaction + Send> TablePostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
82 async fn intercept<'a>(&self, ctx: &mut TablePostUpdateContext<'a, CT>) -> Result<()> {
83 self.changes.lock().await.push(Change::Update {
84 row_number: ctx.id,
85 pre: ctx.old_row.to_vec(),
86 post: ctx.row.to_vec(),
87 });
88 Ok(())
89 }
90}
91
92#[async_trait]
93impl<CT: CommandTransaction + Send> TablePostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
94 async fn intercept<'a>(&self, ctx: &mut TablePostDeleteContext<'a, CT>) -> Result<()> {
95 self.changes.lock().await.push(Change::Delete {
96 row_number: ctx.id,
97 pre: ctx.deleted_row.to_vec(),
98 });
99 Ok(())
100 }
101}
102
103#[async_trait]
104impl<CT: CommandTransaction + Send> RingBufferPostInsertInterceptor<CT> for TransactionalFlowInterceptor {
105 async fn intercept<'a>(&self, ctx: &mut RingBufferPostInsertContext<'a, CT>) -> Result<()> {
106 self.changes.lock().await.push(Change::Insert {
107 row_number: ctx.id,
108 post: ctx.row.to_vec(),
109 });
110
111 Ok(())
112 }
113}
114
115#[async_trait]
116impl<CT: CommandTransaction + Send> RingBufferPostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
117 async fn intercept<'a>(&self, ctx: &mut RingBufferPostUpdateContext<'a, CT>) -> Result<()> {
118 self.changes.lock().await.push(Change::Update {
119 row_number: ctx.id,
120 pre: ctx.old_row.to_vec(),
121 post: ctx.row.to_vec(),
122 });
123 Ok(())
124 }
125}
126
127#[async_trait]
128impl<CT: CommandTransaction + Send> RingBufferPostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
129 async fn intercept<'a>(&self, ctx: &mut RingBufferPostDeleteContext<'a, CT>) -> Result<()> {
130 self.changes.lock().await.push(Change::Delete {
131 row_number: ctx.id,
132 pre: ctx.deleted_row.to_vec(),
133 });
134 Ok(())
135 }
136}
137
138#[async_trait]
139impl<CT: CommandTransaction + Send> PreCommitInterceptor<CT> for TransactionalFlowInterceptor {
140 async fn intercept<'a>(&self, _ctx: &mut PreCommitContext<'a, CT>) -> Result<()> {
141 let _engine = self.engine.get_or_resolve(&self.ioc)?;
142
143 let changes = self.changes.lock().await;
145 if !changes.is_empty() {
146 }
154
155 Ok(())
156 }
157}
158
159impl<CT: CommandTransaction + Send + 'static> RegisterInterceptor<CT> for TransactionalFlowInterceptor {
160 fn register(self: Arc<Self>, interceptors: &mut Interceptors<CT>) {
161 interceptors.table_post_insert.add(self.clone());
162 interceptors.table_post_update.add(self.clone());
163 interceptors.table_post_delete.add(self.clone());
164 interceptors.ringbuffer_post_insert.add(self.clone());
165 interceptors.ringbuffer_post_update.add(self.clone());
166 interceptors.ringbuffer_post_delete.add(self.clone());
167 interceptors.pre_commit.add(self);
168 }
169}