// reifydb_sub_flow/operator/gate.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7	encoded::{encoded::EncodedValues, key::EncodedKey},
8	interface::{
9		catalog::flow::FlowNodeId,
10		change::{Change, Diff},
11	},
12	value::column::columns::Columns,
13};
14use reifydb_engine::{
15	expression::{
16		compile::{CompiledExpr, compile_expression},
17		context::{CompileContext, EvalContext},
18	},
19	vm::stack::SymbolTable,
20};
21use reifydb_function::registry::Functions;
22use reifydb_rql::expression::Expression;
23use reifydb_runtime::clock::Clock;
24use reifydb_type::{
25	Result,
26	params::Params,
27	util::cowvec::CowVec,
28	value::{Value, identity::IdentityId, row_number::RowNumber},
29};
30
31use crate::{
32	operator::{Operator, Operators, stateful::raw::RawStatefulOperator},
33	transaction::FlowTransaction,
34};
35
36static EMPTY_PARAMS: Params = Params::None;
37static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
38
39/// A sentinel value stored to mark a row as "visible" (latch open).
40static VISIBLE_MARKER: LazyLock<EncodedValues> = LazyLock::new(|| EncodedValues(CowVec::new(vec![1])));
41
42pub struct GateOperator {
43	parent: Arc<Operators>,
44	node: FlowNodeId,
45	compiled_conditions: Vec<CompiledExpr>,
46	functions: Functions,
47	clock: Clock,
48}
49
50impl GateOperator {
51	pub fn new(
52		parent: Arc<Operators>,
53		node: FlowNodeId,
54		conditions: Vec<Expression>,
55		functions: Functions,
56		clock: Clock,
57	) -> Self {
58		let compile_ctx = CompileContext {
59			functions: &functions,
60			symbol_table: &EMPTY_SYMBOL_TABLE,
61		};
62		let compiled_conditions: Vec<CompiledExpr> = conditions
63			.iter()
64			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile gate condition"))
65			.collect();
66
67		Self {
68			parent,
69			node,
70			compiled_conditions,
71			functions,
72			clock,
73		}
74	}
75
76	/// Evaluate conditions on all rows in Columns.
77	/// Returns a boolean mask indicating which rows pass the conditions.
78	fn evaluate(&self, columns: &Columns) -> Result<Vec<bool>> {
79		let row_count = columns.row_count();
80		if row_count == 0 {
81			return Ok(Vec::new());
82		}
83
84		let exec_ctx = EvalContext {
85			target: None,
86			columns: columns.clone(),
87			row_count,
88			take: None,
89			params: &EMPTY_PARAMS,
90			symbol_table: &EMPTY_SYMBOL_TABLE,
91			is_aggregate_context: false,
92			functions: &self.functions,
93			clock: &self.clock,
94			arena: None,
95			identity: IdentityId::root(),
96		};
97
98		let mut mask = vec![true; row_count];
99
100		for compiled_condition in &self.compiled_conditions {
101			let result_col = compiled_condition.execute(&exec_ctx)?;
102
103			for row_idx in 0..row_count {
104				if mask[row_idx] {
105					match result_col.data().get_value(row_idx) {
106						Value::Boolean(true) => {}
107						Value::Boolean(false) => mask[row_idx] = false,
108						_ => mask[row_idx] = false,
109					}
110				}
111			}
112		}
113
114		Ok(mask)
115	}
116
117	fn row_number_key(rn: RowNumber) -> EncodedKey {
118		EncodedKey::new(rn.0.to_be_bytes().to_vec())
119	}
120
121	fn is_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<bool> {
122		Ok(self.state_get(txn, &Self::row_number_key(rn))?.is_some())
123	}
124
125	fn mark_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
126		self.state_set(txn, &Self::row_number_key(rn), VISIBLE_MARKER.clone())
127	}
128
129	fn mark_invisible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
130		self.state_remove(txn, &Self::row_number_key(rn))
131	}
132}
133
134impl RawStatefulOperator for GateOperator {}
135
136impl Operator for GateOperator {
137	fn id(&self) -> FlowNodeId {
138		self.node
139	}
140
141	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
142		let mut result = Vec::new();
143
144		for diff in change.diffs {
145			match diff {
146				Diff::Insert {
147					post,
148				} => {
149					// No row_numbers? pass through as filter
150					if post.row_numbers.is_empty() {
151						let mask = self.evaluate(&post)?;
152						let passing_indices: Vec<usize> = mask
153							.iter()
154							.enumerate()
155							.filter(|&(_, pass)| *pass)
156							.map(|(idx, _)| idx)
157							.collect();
158						if !passing_indices.is_empty() {
159							result.push(Diff::Insert {
160								post: post.extract_by_indices(&passing_indices),
161							});
162						}
163					} else {
164						// Evaluate condition per row
165						let mask = self.evaluate(&post)?;
166						let mut passing_indices = Vec::new();
167						for (i, &pass) in mask.iter().enumerate() {
168							let rn = post.row_numbers[i];
169							if pass {
170								self.mark_visible(txn, rn)?;
171								passing_indices.push(i);
172							}
173							// if not pass: drop (latch stays closed)
174						}
175						if !passing_indices.is_empty() {
176							result.push(Diff::Insert {
177								post: post.extract_by_indices(&passing_indices),
178							});
179						}
180					}
181				}
182
183				Diff::Update {
184					pre,
185					post,
186				} => {
187					if post.row_numbers.is_empty() {
188						// No state info available — treat as visible (pass-through)
189						result.push(Diff::Update {
190							pre,
191							post,
192						});
193					} else {
194						let mask = self.evaluate(&post)?;
195						let mut update_indices = Vec::new();
196						let mut insert_indices = Vec::new();
197
198						for i in 0..post.row_numbers.len() {
199							let rn = post.row_numbers[i];
200							let visible = self.is_visible(txn, rn)?;
201
202							if visible {
203								// Already open — pass through as Update unconditionally
204								update_indices.push(i);
205							} else {
206								// Not yet open — check condition on post
207								if mask[i] {
208									// Open the latch, emit as Insert
209									self.mark_visible(txn, rn)?;
210									insert_indices.push(i);
211								}
212								// else: still fails — drop
213							}
214						}
215
216						if !update_indices.is_empty() {
217							result.push(Diff::Update {
218								pre: pre.extract_by_indices(&update_indices),
219								post: post.extract_by_indices(&update_indices),
220							});
221						}
222						if !insert_indices.is_empty() {
223							result.push(Diff::Insert {
224								post: post.extract_by_indices(&insert_indices),
225							});
226						}
227					}
228				}
229
230				Diff::Remove {
231					pre,
232				} => {
233					if pre.row_numbers.is_empty() {
234						// No state info available — treat as visible (pass-through)
235						result.push(Diff::Remove {
236							pre,
237						});
238					} else {
239						let mut remove_indices = Vec::new();
240						for i in 0..pre.row_numbers.len() {
241							let rn = pre.row_numbers[i];
242							if self.is_visible(txn, rn)? {
243								self.mark_invisible(txn, rn)?;
244								remove_indices.push(i);
245							}
246							// else: was never visible — drop
247						}
248
249						if !remove_indices.is_empty() {
250							result.push(Diff::Remove {
251								pre: pre.extract_by_indices(&remove_indices),
252							});
253						}
254					}
255				}
256			}
257		}
258
259		Ok(Change::from_flow(self.node, change.version, result))
260	}
261
262	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
263		self.parent.pull(txn, rows)
264	}
265}