// reifydb_sub_flow/operator/gate.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7	encoded::{key::EncodedKey, row::EncodedRow},
8	interface::{
9		catalog::flow::FlowNodeId,
10		change::{Change, Diff},
11	},
12	value::column::columns::Columns,
13};
14use reifydb_engine::{
15	expression::{
16		compile::{CompiledExpr, compile_expression},
17		context::{CompileContext, EvalContext},
18	},
19	vm::stack::SymbolTable,
20};
21use reifydb_routine::routine::registry::Routines;
22use reifydb_rql::expression::Expression;
23use reifydb_runtime::context::RuntimeContext;
24use reifydb_type::{
25	Result,
26	params::Params,
27	util::cowvec::CowVec,
28	value::{Value, identity::IdentityId, row_number::RowNumber},
29};
30
31use crate::{
32	operator::{Operator, Operators, stateful::raw::RawStatefulOperator},
33	transaction::FlowTransaction,
34};
35
36static EMPTY_PARAMS: Params = Params::None;
37static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
38
39/// A sentinel value stored to mark a row as "visible" (latch open).
40static VISIBLE_MARKER: LazyLock<EncodedRow> = LazyLock::new(|| EncodedRow(CowVec::new(vec![1])));
41
42pub struct GateOperator {
43	parent: Arc<Operators>,
44	node: FlowNodeId,
45	compiled_conditions: Vec<CompiledExpr>,
46	routines: Routines,
47	runtime_context: RuntimeContext,
48}
49
50impl GateOperator {
51	pub fn new(
52		parent: Arc<Operators>,
53		node: FlowNodeId,
54		conditions: Vec<Expression>,
55		routines: Routines,
56		runtime_context: RuntimeContext,
57	) -> Self {
58		let compile_ctx = CompileContext {
59			symbols: &EMPTY_SYMBOL_TABLE,
60		};
61		let compiled_conditions: Vec<CompiledExpr> = conditions
62			.iter()
63			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile gate condition"))
64			.collect();
65
66		Self {
67			parent,
68			node,
69			compiled_conditions,
70			routines,
71			runtime_context,
72		}
73	}
74
75	/// Evaluate conditions on all rows in Columns.
76	/// Returns a boolean mask indicating which rows pass the conditions.
77	fn evaluate(&self, columns: &Columns) -> Result<Vec<bool>> {
78		let row_count = columns.row_count();
79		if row_count == 0 {
80			return Ok(Vec::new());
81		}
82
83		let session = EvalContext {
84			params: &EMPTY_PARAMS,
85			symbols: &EMPTY_SYMBOL_TABLE,
86			routines: &self.routines,
87			runtime_context: &self.runtime_context,
88			arena: None,
89			identity: IdentityId::root(),
90			is_aggregate_context: false,
91			columns: Columns::empty(),
92			row_count: 1,
93			target: None,
94			take: None,
95		};
96		let exec_ctx = session.with_eval(columns.clone(), row_count);
97
98		let mut mask = vec![true; row_count];
99
100		for compiled_condition in &self.compiled_conditions {
101			let result_col = compiled_condition.execute(&exec_ctx)?;
102
103			for (row_idx, mask_val) in mask.iter_mut().enumerate() {
104				if *mask_val {
105					match result_col.data().get_value(row_idx) {
106						Value::Boolean(true) => {}
107						Value::Boolean(false) => *mask_val = false,
108						_ => *mask_val = false,
109					}
110				}
111			}
112		}
113
114		Ok(mask)
115	}
116
117	fn row_number_key(rn: RowNumber) -> EncodedKey {
118		EncodedKey::new(rn.0.to_be_bytes().to_vec())
119	}
120
121	fn is_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<bool> {
122		Ok(self.state_get(txn, &Self::row_number_key(rn))?.is_some())
123	}
124
125	fn mark_visible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
126		self.state_set(txn, &Self::row_number_key(rn), VISIBLE_MARKER.clone())
127	}
128
129	fn mark_invisible(&self, txn: &mut FlowTransaction, rn: RowNumber) -> Result<()> {
130		self.state_remove(txn, &Self::row_number_key(rn))
131	}
132}
133
134impl RawStatefulOperator for GateOperator {}
135
136impl Operator for GateOperator {
137	fn id(&self) -> FlowNodeId {
138		self.node
139	}
140
141	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
142		let mut result = Vec::new();
143
144		for diff in change.diffs {
145			match diff {
146				Diff::Insert {
147					post,
148				} => self.apply_gate_insert(txn, &post, &mut result)?,
149				Diff::Update {
150					pre,
151					post,
152				} => self.apply_gate_update(txn, pre, post, &mut result)?,
153				Diff::Remove {
154					pre,
155				} => self.apply_gate_remove(txn, pre, &mut result)?,
156			}
157		}
158
159		Ok(Change::from_flow(self.node, change.version, result, change.changed_at))
160	}
161
162	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
163		self.parent.pull(txn, rows)
164	}
165}
166
167impl GateOperator {
168	#[inline]
169	fn apply_gate_insert(
170		&self,
171		txn: &mut FlowTransaction,
172		post: &Arc<Columns>,
173		result: &mut Vec<Diff>,
174	) -> Result<()> {
175		// Without row_numbers there is no per-row state; behave as a pure filter.
176		if post.row_numbers.is_empty() {
177			let mask = self.evaluate(post)?;
178			let passing_indices: Vec<usize> =
179				mask.iter().enumerate().filter(|&(_, pass)| *pass).map(|(idx, _)| idx).collect();
180			if !passing_indices.is_empty() {
181				result.push(Diff::insert(post.extract_by_indices(&passing_indices)));
182			}
183			return Ok(());
184		}
185
186		let mask = self.evaluate(post)?;
187		let mut passing_indices = Vec::new();
188		for (i, &pass) in mask.iter().enumerate() {
189			let rn = post.row_numbers[i];
190			if pass {
191				self.mark_visible(txn, rn)?;
192				passing_indices.push(i);
193			}
194			// not pass: drop, latch stays closed
195		}
196		if !passing_indices.is_empty() {
197			result.push(Diff::insert(post.extract_by_indices(&passing_indices)));
198		}
199		Ok(())
200	}
201
202	#[inline]
203	fn apply_gate_update(
204		&self,
205		txn: &mut FlowTransaction,
206		pre: Arc<Columns>,
207		post: Arc<Columns>,
208		result: &mut Vec<Diff>,
209	) -> Result<()> {
210		// Without row_numbers there is no per-row state to consult; treat the
211		// row as already-visible and pass the Update through unchanged.
212		if post.row_numbers.is_empty() {
213			result.push(Diff::Update {
214				pre,
215				post,
216			});
217			return Ok(());
218		}
219
220		let mask = self.evaluate(&post)?;
221		let mut update_indices = Vec::new();
222		let mut insert_indices = Vec::new();
223
224		for (i, (&rn, &mask_val)) in post.row_numbers.iter().zip(mask.iter()).enumerate() {
225			if self.is_visible(txn, rn)? {
226				// latch already open: forward Update unconditionally
227				update_indices.push(i);
228			} else if mask_val {
229				// latch closed but condition now passes: open and emit as Insert
230				self.mark_visible(txn, rn)?;
231				insert_indices.push(i);
232			}
233			// closed and still failing: drop
234		}
235
236		if !update_indices.is_empty() {
237			result.push(Diff::update(
238				pre.extract_by_indices(&update_indices),
239				post.extract_by_indices(&update_indices),
240			));
241		}
242		if !insert_indices.is_empty() {
243			result.push(Diff::insert(post.extract_by_indices(&insert_indices)));
244		}
245		Ok(())
246	}
247
248	#[inline]
249	fn apply_gate_remove(
250		&self,
251		txn: &mut FlowTransaction,
252		pre: Arc<Columns>,
253		result: &mut Vec<Diff>,
254	) -> Result<()> {
255		// Without row_numbers there is no per-row state; pass through.
256		if pre.row_numbers.is_empty() {
257			result.push(Diff::Remove {
258				pre,
259			});
260			return Ok(());
261		}
262
263		let mut remove_indices = Vec::new();
264		for i in 0..pre.row_numbers.len() {
265			let rn = pre.row_numbers[i];
266			if self.is_visible(txn, rn)? {
267				self.mark_invisible(txn, rn)?;
268				remove_indices.push(i);
269			}
270			// never visible: drop
271		}
272
273		if !remove_indices.is_empty() {
274			result.push(Diff::remove(pre.extract_by_indices(&remove_indices)));
275		}
276		Ok(())
277	}
278}