reifydb_sub_flow/operator/
gate.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 encoded::{key::EncodedKey, row::EncodedRow},
8 interface::{
9 catalog::flow::FlowNodeId,
10 change::{Change, Diff},
11 },
12 value::column::columns::Columns,
13};
14use reifydb_engine::{
15 expression::{
16 compile::{CompiledExpr, compile_expression},
17 context::{CompileContext, EvalSession},
18 },
19 vm::stack::SymbolTable,
20};
21use reifydb_routine::function::registry::Functions;
22use reifydb_rql::expression::Expression;
23use reifydb_runtime::context::RuntimeContext;
24use reifydb_type::{
25 Result,
26 params::Params,
27 util::cowvec::CowVec,
28 value::{Value, identity::IdentityId, row_number::RowNumber},
29};
30
31use crate::{
32 operator::{Operator, Operators, stateful::raw::RawStatefulOperator},
33 transaction::FlowTransaction,
34};
35
36static EMPTY_PARAMS: Params = Params::None;
37static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
38
39static VISIBLE_MARKER: LazyLock<EncodedRow> = LazyLock::new(|| EncodedRow(CowVec::new(vec![1])));
41
/// Flow operator that filters changes through a list of compiled conditions
/// and remembers, per row number, which rows it has already let through.
pub struct GateOperator {
    /// Upstream operator; `pull` requests are forwarded to it unchanged.
    parent: Arc<Operators>,
    /// Identifier of this flow node; returned by `id` and passed to
    /// `Change::from_flow` for every emitted change.
    node: FlowNodeId,
    /// Conditions compiled once in `new`; a row passes only when every one
    /// of them evaluates to boolean `true`.
    compiled_conditions: Vec<CompiledExpr>,
    /// Function registry used when compiling and evaluating the conditions.
    functions: Functions,
    /// Runtime context handed to the evaluation session.
    runtime_context: RuntimeContext,
}
49
50impl GateOperator {
51 pub fn new(
52 parent: Arc<Operators>,
53 node: FlowNodeId,
54 conditions: Vec<Expression>,
55 functions: Functions,
56 runtime_context: RuntimeContext,
57 ) -> Self {
58 let compile_ctx = CompileContext {
59 functions: &functions,
60 symbols: &EMPTY_SYMBOL_TABLE,
61 };
62 let compiled_conditions: Vec<CompiledExpr> = conditions
63 .iter()
64 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile gate condition"))
65 .collect();
66
67 Self {
68 parent,
69 node,
70 compiled_conditions,
71 functions,
72 runtime_context,
73 }
74 }
75
76 fn evaluate(&self, columns: &Columns) -> Result<Vec<bool>> {
79 let row_count = columns.row_count();
80 if row_count == 0 {
81 return Ok(Vec::new());
82 }
83
84 let session = EvalSession {
85 params: &EMPTY_PARAMS,
86 symbols: &EMPTY_SYMBOL_TABLE,
87 functions: &self.functions,
88 runtime_context: &self.runtime_context,
89 arena: None,
90 identity: IdentityId::root(),
91 is_aggregate_context: false,
92 };
93 let exec_ctx = session.eval(columns.clone(), row_count);
94
95 let mut mask = vec![true; row_count];
96
97 for compiled_condition in &self.compiled_conditions {
98 let result_col = compiled_condition.execute(&exec_ctx)?;
99
100 for row_idx in 0..row_count {
101 if mask[row_idx] {
102 match result_col.data().get_value(row_idx) {
103 Value::Boolean(true) => {}
104 Value::Boolean(false) => mask[row_idx] = false,
105 _ => mask[row_idx] = false,
106 }
107 }
108 }
109 }
110
111 Ok(mask)
112 }
113
114 fn row_number_key(rn: RowNumber) -> EncodedKey {
115 EncodedKey::new(rn.0.to_be_bytes().to_vec())
116 }
117
118 fn is_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<bool> {
119 Ok(self.state_get(txn, &Self::row_number_key(rn))?.is_some())
120 }
121
122 fn mark_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
123 self.state_set(txn, &Self::row_number_key(rn), VISIBLE_MARKER.clone())
124 }
125
126 fn mark_invisible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
127 self.state_remove(txn, &Self::row_number_key(rn))
128 }
129}
130
// Empty impl: the gate relies entirely on the trait's provided methods.
// NOTE(review): presumably this is where `state_get` / `state_set` /
// `state_remove` used by the visibility helpers come from — confirm against
// the trait definition.
impl RawStatefulOperator for GateOperator {}
132
133impl Operator for GateOperator {
134 fn id(&self) -> FlowNodeId {
135 self.node
136 }
137
138 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
139 let mut result = Vec::new();
140
141 for diff in change.diffs {
142 match diff {
143 Diff::Insert {
144 post,
145 } => {
146 if post.row_numbers.is_empty() {
148 let mask = self.evaluate(&post)?;
149 let passing_indices: Vec<usize> = mask
150 .iter()
151 .enumerate()
152 .filter(|&(_, pass)| *pass)
153 .map(|(idx, _)| idx)
154 .collect();
155 if !passing_indices.is_empty() {
156 result.push(Diff::Insert {
157 post: post.extract_by_indices(&passing_indices),
158 });
159 }
160 } else {
161 let mask = self.evaluate(&post)?;
163 let mut passing_indices = Vec::new();
164 for (i, &pass) in mask.iter().enumerate() {
165 let rn = post.row_numbers[i];
166 if pass {
167 self.mark_visible(txn, rn)?;
168 passing_indices.push(i);
169 }
170 }
172 if !passing_indices.is_empty() {
173 result.push(Diff::Insert {
174 post: post.extract_by_indices(&passing_indices),
175 });
176 }
177 }
178 }
179
180 Diff::Update {
181 pre,
182 post,
183 } => {
184 if post.row_numbers.is_empty() {
185 result.push(Diff::Update {
187 pre,
188 post,
189 });
190 } else {
191 let mask = self.evaluate(&post)?;
192 let mut update_indices = Vec::new();
193 let mut insert_indices = Vec::new();
194
195 for i in 0..post.row_numbers.len() {
196 let rn = post.row_numbers[i];
197 let visible = self.is_visible(txn, rn)?;
198
199 if visible {
200 update_indices.push(i);
202 } else {
203 if mask[i] {
205 self.mark_visible(txn, rn)?;
207 insert_indices.push(i);
208 }
209 }
211 }
212
213 if !update_indices.is_empty() {
214 result.push(Diff::Update {
215 pre: pre.extract_by_indices(&update_indices),
216 post: post.extract_by_indices(&update_indices),
217 });
218 }
219 if !insert_indices.is_empty() {
220 result.push(Diff::Insert {
221 post: post.extract_by_indices(&insert_indices),
222 });
223 }
224 }
225 }
226
227 Diff::Remove {
228 pre,
229 } => {
230 if pre.row_numbers.is_empty() {
231 result.push(Diff::Remove {
233 pre,
234 });
235 } else {
236 let mut remove_indices = Vec::new();
237 for i in 0..pre.row_numbers.len() {
238 let rn = pre.row_numbers[i];
239 if self.is_visible(txn, rn)? {
240 self.mark_invisible(txn, rn)?;
241 remove_indices.push(i);
242 }
243 }
245
246 if !remove_indices.is_empty() {
247 result.push(Diff::Remove {
248 pre: pre.extract_by_indices(&remove_indices),
249 });
250 }
251 }
252 }
253 }
254 }
255
256 Ok(Change::from_flow(self.node, change.version, result))
257 }
258
259 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
260 self.parent.pull(txn, rows)
261 }
262}