reifydb_sub_flow/operator/
gate.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 encoded::{key::EncodedKey, row::EncodedRow},
8 interface::{
9 catalog::flow::FlowNodeId,
10 change::{Change, Diff},
11 },
12 value::column::columns::Columns,
13};
14use reifydb_engine::{
15 expression::{
16 compile::{CompiledExpr, compile_expression},
17 context::{CompileContext, EvalContext},
18 },
19 vm::stack::SymbolTable,
20};
21use reifydb_routine::routine::registry::Routines;
22use reifydb_rql::expression::Expression;
23use reifydb_runtime::context::RuntimeContext;
24use reifydb_type::{
25 Result,
26 params::Params,
27 util::cowvec::CowVec,
28 value::{Value, identity::IdentityId, row_number::RowNumber},
29};
30
31use crate::{
32 operator::{Operator, Operators, stateful::raw::RawStatefulOperator},
33 transaction::FlowTransaction,
34};
35
36static EMPTY_PARAMS: Params = Params::None;
37static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
38
39static VISIBLE_MARKER: LazyLock<EncodedRow> = LazyLock::new(|| EncodedRow(CowVec::new(vec![1])));
41
42pub struct GateOperator {
43 parent: Arc<Operators>,
44 node: FlowNodeId,
45 compiled_conditions: Vec<CompiledExpr>,
46 routines: Routines,
47 runtime_context: RuntimeContext,
48}
49
50impl GateOperator {
51 pub fn new(
52 parent: Arc<Operators>,
53 node: FlowNodeId,
54 conditions: Vec<Expression>,
55 routines: Routines,
56 runtime_context: RuntimeContext,
57 ) -> Self {
58 let compile_ctx = CompileContext {
59 symbols: &EMPTY_SYMBOL_TABLE,
60 };
61 let compiled_conditions: Vec<CompiledExpr> = conditions
62 .iter()
63 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile gate condition"))
64 .collect();
65
66 Self {
67 parent,
68 node,
69 compiled_conditions,
70 routines,
71 runtime_context,
72 }
73 }
74
75 fn evaluate(&self, columns: &Columns) -> Result<Vec<bool>> {
78 let row_count = columns.row_count();
79 if row_count == 0 {
80 return Ok(Vec::new());
81 }
82
83 let session = EvalContext {
84 params: &EMPTY_PARAMS,
85 symbols: &EMPTY_SYMBOL_TABLE,
86 routines: &self.routines,
87 runtime_context: &self.runtime_context,
88 arena: None,
89 identity: IdentityId::root(),
90 is_aggregate_context: false,
91 columns: Columns::empty(),
92 row_count: 1,
93 target: None,
94 take: None,
95 };
96 let exec_ctx = session.with_eval(columns.clone(), row_count);
97
98 let mut mask = vec![true; row_count];
99
100 for compiled_condition in &self.compiled_conditions {
101 let result_col = compiled_condition.execute(&exec_ctx)?;
102
103 for (row_idx, mask_val) in mask.iter_mut().enumerate() {
104 if *mask_val {
105 match result_col.data().get_value(row_idx) {
106 Value::Boolean(true) => {}
107 Value::Boolean(false) => *mask_val = false,
108 _ => *mask_val = false,
109 }
110 }
111 }
112 }
113
114 Ok(mask)
115 }
116
117 fn row_number_key(rn: RowNumber) -> EncodedKey {
118 EncodedKey::new(rn.0.to_be_bytes().to_vec())
119 }
120
121 fn is_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<bool> {
122 Ok(self.state_get(txn, &Self::row_number_key(rn))?.is_some())
123 }
124
125 fn mark_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
126 self.state_set(txn, &Self::row_number_key(rn), VISIBLE_MARKER.clone())
127 }
128
129 fn mark_invisible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
130 self.state_remove(txn, &Self::row_number_key(rn))
131 }
132}
133
134impl RawStatefulOperator for GateOperator {}
135
136impl Operator for GateOperator {
137 fn id(&self) -> FlowNodeId {
138 self.node
139 }
140
141 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
142 let mut result = Vec::new();
143
144 for diff in change.diffs {
145 match diff {
146 Diff::Insert {
147 post,
148 } => self.apply_gate_insert(txn, &post, &mut result)?,
149 Diff::Update {
150 pre,
151 post,
152 } => self.apply_gate_update(txn, pre, post, &mut result)?,
153 Diff::Remove {
154 pre,
155 } => self.apply_gate_remove(txn, pre, &mut result)?,
156 }
157 }
158
159 Ok(Change::from_flow(self.node, change.version, result, change.changed_at))
160 }
161
162 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
163 self.parent.pull(txn, rows)
164 }
165}
166
167impl GateOperator {
168 #[inline]
169 fn apply_gate_insert(
170 &self,
171 txn: &mut FlowTransaction,
172 post: &Arc<Columns>,
173 result: &mut Vec<Diff>,
174 ) -> Result<()> {
175 if post.row_numbers.is_empty() {
177 let mask = self.evaluate(post)?;
178 let passing_indices: Vec<usize> =
179 mask.iter().enumerate().filter(|&(_, pass)| *pass).map(|(idx, _)| idx).collect();
180 if !passing_indices.is_empty() {
181 result.push(Diff::insert(post.extract_by_indices(&passing_indices)));
182 }
183 return Ok(());
184 }
185
186 let mask = self.evaluate(post)?;
187 let mut passing_indices = Vec::new();
188 for (i, &pass) in mask.iter().enumerate() {
189 let rn = post.row_numbers[i];
190 if pass {
191 self.mark_visible(txn, rn)?;
192 passing_indices.push(i);
193 }
194 }
196 if !passing_indices.is_empty() {
197 result.push(Diff::insert(post.extract_by_indices(&passing_indices)));
198 }
199 Ok(())
200 }
201
202 #[inline]
203 fn apply_gate_update(
204 &self,
205 txn: &mut FlowTransaction,
206 pre: Arc<Columns>,
207 post: Arc<Columns>,
208 result: &mut Vec<Diff>,
209 ) -> Result<()> {
210 if post.row_numbers.is_empty() {
213 result.push(Diff::Update {
214 pre,
215 post,
216 });
217 return Ok(());
218 }
219
220 let mask = self.evaluate(&post)?;
221 let mut update_indices = Vec::new();
222 let mut insert_indices = Vec::new();
223
224 for (i, (&rn, &mask_val)) in post.row_numbers.iter().zip(mask.iter()).enumerate() {
225 if self.is_visible(txn, rn)? {
226 update_indices.push(i);
228 } else if mask_val {
229 self.mark_visible(txn, rn)?;
231 insert_indices.push(i);
232 }
233 }
235
236 if !update_indices.is_empty() {
237 result.push(Diff::update(
238 pre.extract_by_indices(&update_indices),
239 post.extract_by_indices(&update_indices),
240 ));
241 }
242 if !insert_indices.is_empty() {
243 result.push(Diff::insert(post.extract_by_indices(&insert_indices)));
244 }
245 Ok(())
246 }
247
248 #[inline]
249 fn apply_gate_remove(
250 &self,
251 txn: &mut FlowTransaction,
252 pre: Arc<Columns>,
253 result: &mut Vec<Diff>,
254 ) -> Result<()> {
255 if pre.row_numbers.is_empty() {
257 result.push(Diff::Remove {
258 pre,
259 });
260 return Ok(());
261 }
262
263 let mut remove_indices = Vec::new();
264 for i in 0..pre.row_numbers.len() {
265 let rn = pre.row_numbers[i];
266 if self.is_visible(txn, rn)? {
267 self.mark_invisible(txn, rn)?;
268 remove_indices.push(i);
269 }
270 }
272
273 if !remove_indices.is_empty() {
274 result.push(Diff::remove(pre.extract_by_indices(&remove_indices)));
275 }
276 Ok(())
277 }
278}