reifydb_sub_flow/operator/
gate.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 encoded::{encoded::EncodedValues, key::EncodedKey},
8 interface::{
9 catalog::flow::FlowNodeId,
10 change::{Change, Diff},
11 },
12 value::column::columns::Columns,
13};
14use reifydb_engine::{
15 expression::{
16 compile::{CompiledExpr, compile_expression},
17 context::{CompileContext, EvalContext},
18 },
19 vm::stack::SymbolTable,
20};
21use reifydb_function::registry::Functions;
22use reifydb_rql::expression::Expression;
23use reifydb_runtime::clock::Clock;
24use reifydb_type::{
25 Result,
26 params::Params,
27 util::cowvec::CowVec,
28 value::{Value, identity::IdentityId, row_number::RowNumber},
29};
30
31use crate::{
32 operator::{Operator, Operators, stateful::raw::RawStatefulOperator},
33 transaction::FlowTransaction,
34};
35
36static EMPTY_PARAMS: Params = Params::None;
37static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
38
39static VISIBLE_MARKER: LazyLock<EncodedValues> = LazyLock::new(|| EncodedValues(CowVec::new(vec![1])));
41
42pub struct GateOperator {
43 parent: Arc<Operators>,
44 node: FlowNodeId,
45 compiled_conditions: Vec<CompiledExpr>,
46 functions: Functions,
47 clock: Clock,
48}
49
50impl GateOperator {
51 pub fn new(
52 parent: Arc<Operators>,
53 node: FlowNodeId,
54 conditions: Vec<Expression>,
55 functions: Functions,
56 clock: Clock,
57 ) -> Self {
58 let compile_ctx = CompileContext {
59 functions: &functions,
60 symbol_table: &EMPTY_SYMBOL_TABLE,
61 };
62 let compiled_conditions: Vec<CompiledExpr> = conditions
63 .iter()
64 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile gate condition"))
65 .collect();
66
67 Self {
68 parent,
69 node,
70 compiled_conditions,
71 functions,
72 clock,
73 }
74 }
75
76 fn evaluate(&self, columns: &Columns) -> Result<Vec<bool>> {
79 let row_count = columns.row_count();
80 if row_count == 0 {
81 return Ok(Vec::new());
82 }
83
84 let exec_ctx = EvalContext {
85 target: None,
86 columns: columns.clone(),
87 row_count,
88 take: None,
89 params: &EMPTY_PARAMS,
90 symbol_table: &EMPTY_SYMBOL_TABLE,
91 is_aggregate_context: false,
92 functions: &self.functions,
93 clock: &self.clock,
94 arena: None,
95 identity: IdentityId::root(),
96 };
97
98 let mut mask = vec![true; row_count];
99
100 for compiled_condition in &self.compiled_conditions {
101 let result_col = compiled_condition.execute(&exec_ctx)?;
102
103 for row_idx in 0..row_count {
104 if mask[row_idx] {
105 match result_col.data().get_value(row_idx) {
106 Value::Boolean(true) => {}
107 Value::Boolean(false) => mask[row_idx] = false,
108 _ => mask[row_idx] = false,
109 }
110 }
111 }
112 }
113
114 Ok(mask)
115 }
116
117 fn row_number_key(rn: RowNumber) -> EncodedKey {
118 EncodedKey::new(rn.0.to_be_bytes().to_vec())
119 }
120
121 fn is_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<bool> {
122 Ok(self.state_get(txn, &Self::row_number_key(rn))?.is_some())
123 }
124
125 fn mark_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
126 self.state_set(txn, &Self::row_number_key(rn), VISIBLE_MARKER.clone())
127 }
128
129 fn mark_invisible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
130 self.state_remove(txn, &Self::row_number_key(rn))
131 }
132}
133
134impl RawStatefulOperator for GateOperator {}
135
136impl Operator for GateOperator {
137 fn id(&self) -> FlowNodeId {
138 self.node
139 }
140
141 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
142 let mut result = Vec::new();
143
144 for diff in change.diffs {
145 match diff {
146 Diff::Insert {
147 post,
148 } => {
149 if post.row_numbers.is_empty() {
151 let mask = self.evaluate(&post)?;
152 let passing_indices: Vec<usize> = mask
153 .iter()
154 .enumerate()
155 .filter(|&(_, pass)| *pass)
156 .map(|(idx, _)| idx)
157 .collect();
158 if !passing_indices.is_empty() {
159 result.push(Diff::Insert {
160 post: post.extract_by_indices(&passing_indices),
161 });
162 }
163 } else {
164 let mask = self.evaluate(&post)?;
166 let mut passing_indices = Vec::new();
167 for (i, &pass) in mask.iter().enumerate() {
168 let rn = post.row_numbers[i];
169 if pass {
170 self.mark_visible(txn, rn)?;
171 passing_indices.push(i);
172 }
173 }
175 if !passing_indices.is_empty() {
176 result.push(Diff::Insert {
177 post: post.extract_by_indices(&passing_indices),
178 });
179 }
180 }
181 }
182
183 Diff::Update {
184 pre,
185 post,
186 } => {
187 if post.row_numbers.is_empty() {
188 result.push(Diff::Update {
190 pre,
191 post,
192 });
193 } else {
194 let mask = self.evaluate(&post)?;
195 let mut update_indices = Vec::new();
196 let mut insert_indices = Vec::new();
197
198 for i in 0..post.row_numbers.len() {
199 let rn = post.row_numbers[i];
200 let visible = self.is_visible(txn, rn)?;
201
202 if visible {
203 update_indices.push(i);
205 } else {
206 if mask[i] {
208 self.mark_visible(txn, rn)?;
210 insert_indices.push(i);
211 }
212 }
214 }
215
216 if !update_indices.is_empty() {
217 result.push(Diff::Update {
218 pre: pre.extract_by_indices(&update_indices),
219 post: post.extract_by_indices(&update_indices),
220 });
221 }
222 if !insert_indices.is_empty() {
223 result.push(Diff::Insert {
224 post: post.extract_by_indices(&insert_indices),
225 });
226 }
227 }
228 }
229
230 Diff::Remove {
231 pre,
232 } => {
233 if pre.row_numbers.is_empty() {
234 result.push(Diff::Remove {
236 pre,
237 });
238 } else {
239 let mut remove_indices = Vec::new();
240 for i in 0..pre.row_numbers.len() {
241 let rn = pre.row_numbers[i];
242 if self.is_visible(txn, rn)? {
243 self.mark_invisible(txn, rn)?;
244 remove_indices.push(i);
245 }
246 }
248
249 if !remove_indices.is_empty() {
250 result.push(Diff::Remove {
251 pre: pre.extract_by_indices(&remove_indices),
252 });
253 }
254 }
255 }
256 }
257 }
258
259 Ok(Change::from_flow(self.node, change.version, result))
260 }
261
262 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
263 self.parent.pull(txn, rows)
264 }
265}