use std::sync::{Arc, LazyLock};
use reifydb_core::{
interface::{
catalog::flow::FlowNodeId,
change::{Change, Diff},
},
internal_err,
value::column::columns::Columns,
};
use reifydb_engine::{
expression::{
compile::{CompiledExpr, compile_expression},
context::{CompileContext, EvalSession},
},
vm::stack::SymbolTable,
};
use reifydb_routine::function::registry::Functions;
use reifydb_rql::expression::Expression;
use reifydb_runtime::context::RuntimeContext;
use reifydb_type::{
Result,
params::Params,
value::{Value, identity::IdentityId, row_number::RowNumber},
};
use crate::{
operator::{Operator, Operators},
transaction::FlowTransaction,
};
/// Shared empty parameter set: filter conditions are evaluated without bound parameters.
static EMPTY_PARAMS: Params = Params::None;
/// Shared empty symbol table, used both when compiling and when evaluating filter conditions.
static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
/// Flow operator that filters inserted and updated rows by the conjunction of a set of
/// compiled conditions.
pub struct FilterOperator {
/// Upstream operator; `pull` requests are delegated to it unchanged.
parent: Arc<Operators>,
/// Identifier returned by `Operator::id` and stamped on every emitted `Change`.
node: FlowNodeId,
/// Filter conditions compiled once in `new`; a row passes only if every one evaluates to `true`.
compiled_conditions: Vec<CompiledExpr>,
/// Function registry used both to compile and to evaluate the conditions.
functions: Functions,
/// Runtime context handed to each evaluation session.
runtime_context: RuntimeContext,
}
impl FilterOperator {
	/// Creates a filter operator, compiling every condition once up front so that
	/// evaluation per change only executes the compiled form.
	///
	/// # Panics
	///
	/// Panics if any condition fails to compile; the compile error is included in the
	/// panic message.
	pub fn new(
		parent: Arc<Operators>,
		node: FlowNodeId,
		conditions: Vec<Expression>,
		functions: Functions,
		runtime_context: RuntimeContext,
	) -> Self {
		let compile_ctx = CompileContext {
			functions: &functions,
			symbols: &EMPTY_SYMBOL_TABLE,
		};
		let compiled_conditions: Vec<CompiledExpr> = conditions
			.iter()
			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile filter condition"))
			.collect();
		Self {
			parent,
			node,
			compiled_conditions,
			functions,
			runtime_context,
		}
	}

	/// Evaluates all conditions against `columns` and returns a per-row mask.
	///
	/// `mask[i]` is `true` iff every condition produced `Value::Boolean(true)` for row `i`.
	/// An input with zero rows yields an empty mask without evaluating anything.
	///
	/// # Errors
	///
	/// Returns an error if a condition fails to execute, or if it yields a non-boolean
	/// value for a row that is still passing. Rows already rejected by an earlier
	/// condition are not inspected again.
	fn evaluate(&self, columns: &Columns) -> Result<Vec<bool>> {
		let row_count = columns.row_count();
		if row_count == 0 {
			return Ok(Vec::new());
		}
		let session = EvalSession {
			params: &EMPTY_PARAMS,
			symbols: &EMPTY_SYMBOL_TABLE,
			functions: &self.functions,
			runtime_context: &self.runtime_context,
			arena: None,
			identity: IdentityId::root(),
			is_aggregate_context: false,
		};
		let exec_ctx = session.eval(columns.clone(), row_count);
		let mut mask = vec![true; row_count];
		for compiled_condition in &self.compiled_conditions {
			let result_col = compiled_condition.execute(&exec_ctx)?;
			for (row_idx, keep) in mask.iter_mut().enumerate() {
				// Rejected rows stay rejected; skip them so their value is never checked.
				if !*keep {
					continue;
				}
				match result_col.data().get_value(row_idx) {
					Value::Boolean(true) => {}
					Value::Boolean(false) => *keep = false,
					result => {
						return internal_err!(
							"Filter condition did not evaluate to boolean, got: {:?}",
							result
						);
					}
				}
			}
		}
		Ok(mask)
	}

	/// Extracts the rows of `columns` whose mask entry equals `keep`.
	///
	/// Returns `Columns::empty()` when no row matches, so callers can use `is_empty()`
	/// to decide whether to emit a diff.
	fn select_rows(columns: &Columns, mask: &[bool], keep: bool) -> Columns {
		let indices: Vec<usize> = mask
			.iter()
			.enumerate()
			.filter_map(|(idx, &pass)| (pass == keep).then_some(idx))
			.collect();
		if indices.is_empty() {
			Columns::empty()
		} else {
			columns.extract_by_indices(&indices)
		}
	}

	/// Returns the rows of `columns` that passed the filter according to `mask`.
	fn filter_passing(&self, columns: &Columns, mask: &[bool]) -> Columns {
		Self::select_rows(columns, mask, true)
	}

	/// Returns the rows of `columns` that failed the filter according to `mask`.
	fn filter_failing(&self, columns: &Columns, mask: &[bool]) -> Columns {
		Self::select_rows(columns, mask, false)
	}
}
impl Operator for FilterOperator {
	/// Returns this operator's node identifier.
	fn id(&self) -> FlowNodeId {
		self.node
	}

	/// Filters each diff of `change` against the compiled conditions.
	///
	/// - `Insert`: only rows whose post-image passes every condition are forwarded.
	/// - `Update`: the predicate is evaluated on the post-image only.
	///   - Rows that pass are forwarded as an `Update`; pre and post rows are selected by
	///     the same mask.
	///   - Rows that fail are forwarded as a `Remove` of their pre-image.
	/// - `Remove`: forwarded unchanged, without evaluating the predicate.
	///
	/// Diffs left with no rows are dropped.
	///
	/// # Errors
	///
	/// Propagates any evaluation error, including a condition that does not produce a
	/// boolean for a passing row.
	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
		let mut result = Vec::new();
		for diff in change.diffs {
			match diff {
				Diff::Insert {
					post,
				} => {
					let mask = self.evaluate(&post)?;
					let passing = self.filter_passing(&post, &mask);
					if !passing.is_empty() {
						result.push(Diff::Insert {
							post: passing,
						});
					}
				}
				Diff::Update {
					pre,
					post,
				} => {
					// `pre` and `post` are assumed row-aligned, so the mask computed from
					// `post` selects the corresponding rows of `pre` as well.
					// NOTE(review): a row whose pre-image failed the filter but whose
					// post-image passes is emitted as an Update (not an Insert), and a row
					// failing both is still emitted as a Remove; confirm downstream
					// operators tolerate updates/removes for rows they never received.
					let mask = self.evaluate(&post)?;
					let passing = self.filter_passing(&post, &mask);
					if !passing.is_empty() {
						result.push(Diff::Update {
							pre: self.filter_passing(&pre, &mask),
							post: passing,
						});
					}
					// Only the pre-image of failing rows is emitted, so the failing post
					// rows are never materialized.
					let pre_failing = self.filter_failing(&pre, &mask);
					if !pre_failing.is_empty() {
						result.push(Diff::Remove {
							pre: pre_failing,
						});
					}
				}
				Diff::Remove {
					pre,
				} => {
					// Removals bypass the predicate and are forwarded as-is.
					result.push(Diff::Remove {
						pre,
					});
				}
			}
		}
		Ok(Change::from_flow(self.node, change.version, result))
	}

	/// Delegates row lookups to the parent operator.
	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
		self.parent.pull(txn, rows)
	}
}