reifydb_sub_flow/subsystem/
intercept.rs1use std::{cell::RefCell, rc::Rc};
5
6use reifydb_core::{
7 Result,
8 interceptor::{
9 Interceptors, PreCommitContext, PreCommitInterceptor, RegisterInterceptor, RingBufferPostDeleteContext,
10 RingBufferPostDeleteInterceptor, RingBufferPostInsertContext, RingBufferPostInsertInterceptor,
11 RingBufferPostUpdateContext, RingBufferPostUpdateInterceptor, TablePostDeleteContext,
12 TablePostDeleteInterceptor, TablePostInsertContext, TablePostInsertInterceptor, TablePostUpdateContext,
13 TablePostUpdateInterceptor,
14 },
15 interface::{CommandTransaction, SourceId},
16 ioc::{IocContainer, LazyResolveRc},
17};
18use reifydb_engine::StandardEngine;
19use reifydb_type::RowNumber;
20
21#[derive(Debug, Clone)]
23pub(crate) enum Change {
24 Insert {
25 _source_id: SourceId,
26 row_number: RowNumber,
27 post: Vec<u8>,
28 },
29 Update {
30 _source_id: SourceId,
31 row_number: RowNumber,
32 pre: Vec<u8>,
33 post: Vec<u8>,
34 },
35 Delete {
36 _source_id: SourceId,
37 row_number: RowNumber,
38 pre: Vec<u8>,
39 },
40}
41
42pub struct TransactionalFlowInterceptor {
43 engine: LazyResolveRc<StandardEngine>,
44 ioc: IocContainer,
45 changes: Rc<RefCell<Vec<Change>>>,
47}
48
49impl TransactionalFlowInterceptor {
50 pub fn new(ioc: IocContainer) -> Self {
51 Self {
52 engine: LazyResolveRc::new(),
53 ioc,
54 changes: Rc::new(RefCell::new(Vec::new())),
55 }
56 }
57}
58
59impl Clone for TransactionalFlowInterceptor {
60 fn clone(&self) -> Self {
61 Self {
62 engine: self.engine.clone(),
63 ioc: self.ioc.clone(),
64 changes: Rc::clone(&self.changes),
65 }
66 }
67}
68
69impl<CT: CommandTransaction> TablePostInsertInterceptor<CT> for TransactionalFlowInterceptor {
70 fn intercept(&self, ctx: &mut TablePostInsertContext<CT>) -> Result<()> {
71 self.changes.borrow_mut().push(Change::Insert {
72 _source_id: SourceId::from(ctx.table.id),
73 row_number: ctx.id,
74 post: ctx.row.to_vec(),
75 });
76
77 Ok(())
78 }
79}
80
81impl<CT: CommandTransaction> TablePostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
82 fn intercept(&self, ctx: &mut TablePostUpdateContext<CT>) -> Result<()> {
83 self.changes.borrow_mut().push(Change::Update {
84 _source_id: SourceId::from(ctx.table.id),
85 row_number: ctx.id,
86 pre: ctx.old_row.to_vec(),
87 post: ctx.row.to_vec(),
88 });
89 Ok(())
90 }
91}
92
93impl<CT: CommandTransaction> TablePostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
94 fn intercept(&self, ctx: &mut TablePostDeleteContext<CT>) -> Result<()> {
95 self.changes.borrow_mut().push(Change::Delete {
96 _source_id: SourceId::from(ctx.table.id),
97 row_number: ctx.id,
98 pre: ctx.deleted_row.to_vec(),
99 });
100 Ok(())
101 }
102}
103
104impl<CT: CommandTransaction> RingBufferPostInsertInterceptor<CT> for TransactionalFlowInterceptor {
105 fn intercept(&self, ctx: &mut RingBufferPostInsertContext<CT>) -> Result<()> {
106 self.changes.borrow_mut().push(Change::Insert {
107 _source_id: SourceId::from(ctx.ring_buffer.id),
108 row_number: ctx.id,
109 post: ctx.row.to_vec(),
110 });
111
112 Ok(())
113 }
114}
115
116impl<CT: CommandTransaction> RingBufferPostUpdateInterceptor<CT> for TransactionalFlowInterceptor {
117 fn intercept(&self, ctx: &mut RingBufferPostUpdateContext<CT>) -> Result<()> {
118 self.changes.borrow_mut().push(Change::Update {
119 _source_id: SourceId::from(ctx.ring_buffer.id),
120 row_number: ctx.id,
121 pre: ctx.old_row.to_vec(),
122 post: ctx.row.to_vec(),
123 });
124 Ok(())
125 }
126}
127
128impl<CT: CommandTransaction> RingBufferPostDeleteInterceptor<CT> for TransactionalFlowInterceptor {
129 fn intercept(&self, ctx: &mut RingBufferPostDeleteContext<CT>) -> Result<()> {
130 self.changes.borrow_mut().push(Change::Delete {
131 _source_id: SourceId::from(ctx.ring_buffer.id),
132 row_number: ctx.id,
133 pre: ctx.deleted_row.to_vec(),
134 });
135 Ok(())
136 }
137}
138
139impl<CT: CommandTransaction> PreCommitInterceptor<CT> for TransactionalFlowInterceptor {
140 fn intercept(&self, _ctx: &mut PreCommitContext<CT>) -> Result<()> {
141 let _engine = self.engine.get_or_resolve(&self.ioc)?;
142
143 let changes = self.changes.borrow_mut();
145 if !changes.is_empty() {
146 }
154
155 Ok(())
156 }
157}
158
159impl<CT: CommandTransaction> RegisterInterceptor<CT> for TransactionalFlowInterceptor {
160 fn register(self: Rc<Self>, interceptors: &mut Interceptors<CT>) {
161 interceptors.table_post_insert.add(self.clone());
162 interceptors.table_post_update.add(self.clone());
163 interceptors.table_post_delete.add(self.clone());
164 interceptors.ring_buffer_post_insert.add(self.clone());
165 interceptors.ring_buffer_post_update.add(self.clone());
166 interceptors.ring_buffer_post_delete.add(self.clone());
167 interceptors.pre_commit.add(self);
168 }
169}