use std::sync::Arc;
use chrono::{DateTime, Utc};
use crate::core::{Operator, Row, Schema, Value};
use super::between::BetweenExpr;
use super::comparison::ComparisonExpr;
use super::function::{FunctionArg, FunctionExpr};
use super::in_list::InListExpr;
use super::like::LikeExpr;
use super::logical::{AndExpr, ConstBoolExpr, NotExpr, OrExpr};
use super::null_check::NullCheckExpr;
use super::Expression;
#[derive(Debug, Clone)]
pub enum CompiledPattern {
Exact(Arc<str>),
Prefix(Arc<str>),
Suffix(Arc<str>),
Contains(Arc<str>),
Regex(regex::Regex),
}
impl CompiledPattern {
pub fn compile(pattern: &str, case_insensitive: bool) -> Self {
let pattern = if case_insensitive {
pattern.to_lowercase()
} else {
pattern.to_string()
};
let has_leading_percent = pattern.starts_with('%');
let has_trailing_percent = pattern.ends_with('%');
let inner = pattern.trim_matches('%');
let has_inner_wildcards = inner.contains('%') || inner.contains('_');
if !has_inner_wildcards {
match (has_leading_percent, has_trailing_percent) {
(false, false) => CompiledPattern::Exact(Arc::from(inner)),
(false, true) => CompiledPattern::Prefix(Arc::from(inner)),
(true, false) => CompiledPattern::Suffix(Arc::from(inner)),
(true, true) => CompiledPattern::Contains(Arc::from(inner)),
}
} else {
let regex_pattern = like_to_regex(&pattern);
let flags = if case_insensitive { "(?i)" } else { "" };
let full_pattern = format!("^{}{}$", flags, regex_pattern);
CompiledPattern::Regex(regex::Regex::new(&full_pattern).unwrap_or_else(|_e| {
#[cfg(debug_assertions)]
eprintln!(
"Warning: Failed to compile LIKE pattern '{}' as regex '{}': {}",
pattern, full_pattern, _e
);
regex::Regex::new("^$").unwrap()
}))
}
}
#[inline(always)]
pub fn matches(&self, s: &str, case_insensitive: bool) -> bool {
use std::borrow::Cow;
let s: Cow<'_, str> = if case_insensitive {
Cow::Owned(s.to_lowercase())
} else {
Cow::Borrowed(s)
};
match self {
CompiledPattern::Exact(pattern) => s.as_ref() == pattern.as_ref(),
CompiledPattern::Prefix(prefix) => s.starts_with(prefix.as_ref()),
CompiledPattern::Suffix(suffix) => s.ends_with(suffix.as_ref()),
CompiledPattern::Contains(substr) => s.contains(substr.as_ref()),
CompiledPattern::Regex(re) => re.is_match(&s),
}
}
}
fn like_to_regex(pattern: &str) -> String {
let mut result = String::with_capacity(pattern.len() * 2);
let mut chars = pattern.chars().peekable();
while let Some(c) = chars.next() {
match c {
'%' => result.push_str(".*"),
'_' => result.push('.'),
'\\' => {
if let Some(&next) = chars.peek() {
if next == '%' || next == '_' || next == '\\' {
result.push(chars.next().unwrap());
continue;
}
}
result.push_str("\\\\");
}
'.' | '^' | '$' | '*' | '+' | '?' | '(' | ')' | '[' | ']' | '{' | '}' | '|' => {
result.push('\\');
result.push(c);
}
_ => result.push(c),
}
}
result
}
#[derive(Debug, Clone)]
pub enum CompiledFilter {
IntegerEq { col_idx: usize, value: i64 },
IntegerNe { col_idx: usize, value: i64 },
IntegerGt { col_idx: usize, value: i64 },
IntegerGte { col_idx: usize, value: i64 },
IntegerLt { col_idx: usize, value: i64 },
IntegerLte { col_idx: usize, value: i64 },
IntegerBetween { col_idx: usize, min: i64, max: i64 },
IntegerIn { col_idx: usize, values: Vec<i64> },
FloatEq { col_idx: usize, value: f64 },
FloatNe { col_idx: usize, value: f64 },
FloatGt { col_idx: usize, value: f64 },
FloatGte { col_idx: usize, value: f64 },
FloatLt { col_idx: usize, value: f64 },
FloatLte { col_idx: usize, value: f64 },
FloatBetween { col_idx: usize, min: f64, max: f64 },
StringEq { col_idx: usize, value: Arc<str> },
StringNe { col_idx: usize, value: Arc<str> },
StringGt { col_idx: usize, value: Arc<str> },
StringGte { col_idx: usize, value: Arc<str> },
StringLt { col_idx: usize, value: Arc<str> },
StringLte { col_idx: usize, value: Arc<str> },
StringIn {
col_idx: usize,
values: Vec<Arc<str>>,
},
StringLike {
col_idx: usize,
pattern: CompiledPattern,
case_insensitive: bool,
negated: bool,
},
BooleanEq { col_idx: usize, value: bool },
BooleanNe { col_idx: usize, value: bool },
TimestampEq {
col_idx: usize,
value: DateTime<Utc>,
},
TimestampNe {
col_idx: usize,
value: DateTime<Utc>,
},
TimestampGt {
col_idx: usize,
value: DateTime<Utc>,
},
TimestampGte {
col_idx: usize,
value: DateTime<Utc>,
},
TimestampLt {
col_idx: usize,
value: DateTime<Utc>,
},
TimestampLte {
col_idx: usize,
value: DateTime<Utc>,
},
TimestampBetween {
col_idx: usize,
min: DateTime<Utc>,
max: DateTime<Utc>,
},
IsNull { col_idx: usize },
IsNotNull { col_idx: usize },
And(Box<CompiledFilter>, Box<CompiledFilter>),
AndN(Vec<CompiledFilter>),
Or(Box<CompiledFilter>, Box<CompiledFilter>),
OrN(Vec<CompiledFilter>),
Not(Box<CompiledFilter>),
True,
False,
UpperEq { col_idx: usize, value: Arc<str> },
LowerEq { col_idx: usize, value: Arc<str> },
TrimEq { col_idx: usize, value: Arc<str> },
LengthEq { col_idx: usize, value: i64 },
LengthNe { col_idx: usize, value: i64 },
LengthGt { col_idx: usize, value: i64 },
LengthGte { col_idx: usize, value: i64 },
LengthLt { col_idx: usize, value: i64 },
LengthLte { col_idx: usize, value: i64 },
Dynamic(Box<dyn Expression>),
}
impl CompiledFilter {
pub fn compile(expr: &dyn Expression, schema: &Schema) -> Self {
if let Some(comparison) = expr.as_any().downcast_ref::<ComparisonExpr>() {
return Self::compile_comparison(comparison, schema);
}
if let Some(and_expr) = expr.as_any().downcast_ref::<AndExpr>() {
return Self::compile_and(and_expr, schema);
}
if let Some(or_expr) = expr.as_any().downcast_ref::<OrExpr>() {
return Self::compile_or(or_expr, schema);
}
if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
if let Some(inner) = not_expr.get_inner() {
return CompiledFilter::Not(Box::new(Self::compile(inner, schema)));
}
}
if let Some(null_check) = expr.as_any().downcast_ref::<NullCheckExpr>() {
return Self::compile_null_check(null_check, schema);
}
if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
return Self::compile_in_list(in_list, schema);
}
if let Some(between) = expr.as_any().downcast_ref::<BetweenExpr>() {
return Self::compile_between(between, schema);
}
if let Some(like_expr) = expr.as_any().downcast_ref::<LikeExpr>() {
return Self::compile_like(like_expr, schema);
}
if let Some(const_bool) = expr.as_any().downcast_ref::<ConstBoolExpr>() {
return if const_bool.value() {
CompiledFilter::True
} else {
CompiledFilter::False
};
}
if let Some(func_expr) = expr.as_any().downcast_ref::<FunctionExpr>() {
return Self::compile_function_expr(func_expr, schema);
}
CompiledFilter::Dynamic(expr.clone_box())
}
pub fn compile_boxed(expr: &dyn Expression, schema: &Schema) -> Self {
Self::compile(expr, schema)
}
fn compile_comparison(expr: &ComparisonExpr, schema: &Schema) -> Self {
let col_name = expr.get_column_name().unwrap_or("");
let col_idx = match super::find_column_index(schema, col_name) {
Some(idx) => idx,
None => return CompiledFilter::Dynamic(expr.clone_box()),
};
let (_, op, value) = match expr.get_comparison_info() {
Some(info) => info,
None => return CompiledFilter::Dynamic(expr.clone_box()),
};
match value {
Value::Integer(i) => {
let i = *i;
match op {
Operator::Eq => CompiledFilter::IntegerEq { col_idx, value: i },
Operator::Ne => CompiledFilter::IntegerNe { col_idx, value: i },
Operator::Gt => CompiledFilter::IntegerGt { col_idx, value: i },
Operator::Gte => CompiledFilter::IntegerGte { col_idx, value: i },
Operator::Lt => CompiledFilter::IntegerLt { col_idx, value: i },
Operator::Lte => CompiledFilter::IntegerLte { col_idx, value: i },
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
Value::Float(f) => {
let f = *f;
match op {
Operator::Eq => CompiledFilter::FloatEq { col_idx, value: f },
Operator::Ne => CompiledFilter::FloatNe { col_idx, value: f },
Operator::Gt => CompiledFilter::FloatGt { col_idx, value: f },
Operator::Gte => CompiledFilter::FloatGte { col_idx, value: f },
Operator::Lt => CompiledFilter::FloatLt { col_idx, value: f },
Operator::Lte => CompiledFilter::FloatLte { col_idx, value: f },
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
Value::Text(s) => {
let s: Arc<str> = Arc::from(s.as_ref());
match op {
Operator::Eq => CompiledFilter::StringEq { col_idx, value: s },
Operator::Ne => CompiledFilter::StringNe { col_idx, value: s },
Operator::Gt => CompiledFilter::StringGt { col_idx, value: s },
Operator::Gte => CompiledFilter::StringGte { col_idx, value: s },
Operator::Lt => CompiledFilter::StringLt { col_idx, value: s },
Operator::Lte => CompiledFilter::StringLte { col_idx, value: s },
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
Value::Boolean(b) => {
let b = *b;
match op {
Operator::Eq => CompiledFilter::BooleanEq { col_idx, value: b },
Operator::Ne => CompiledFilter::BooleanNe { col_idx, value: b },
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
Value::Timestamp(t) => {
let t = *t;
match op {
Operator::Eq => CompiledFilter::TimestampEq { col_idx, value: t },
Operator::Ne => CompiledFilter::TimestampNe { col_idx, value: t },
Operator::Gt => CompiledFilter::TimestampGt { col_idx, value: t },
Operator::Gte => CompiledFilter::TimestampGte { col_idx, value: t },
Operator::Lt => CompiledFilter::TimestampLt { col_idx, value: t },
Operator::Lte => CompiledFilter::TimestampLte { col_idx, value: t },
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
fn compile_and(expr: &AndExpr, schema: &Schema) -> Self {
if let Some(operands) = expr.get_and_operands() {
let mut flattened = Vec::with_capacity(operands.len());
for op in operands.iter() {
let compiled = Self::compile(op.as_ref(), schema);
Self::flatten_and(&mut flattened, compiled);
}
if flattened.iter().any(|f| matches!(f, CompiledFilter::False)) {
return CompiledFilter::False;
}
flattened.retain(|f| !matches!(f, CompiledFilter::True));
match flattened.len() {
0 => CompiledFilter::True, 1 => flattened.pop().unwrap(),
2 => {
let right = flattened.pop().unwrap();
let left = flattened.pop().unwrap();
CompiledFilter::And(Box::new(left), Box::new(right))
}
_ => CompiledFilter::AndN(flattened),
}
} else {
CompiledFilter::Dynamic(expr.clone_box())
}
}
fn flatten_and(result: &mut Vec<CompiledFilter>, filter: CompiledFilter) {
match filter {
CompiledFilter::And(left, right) => {
Self::flatten_and(result, *left);
Self::flatten_and(result, *right);
}
CompiledFilter::AndN(filters) => {
for f in filters {
Self::flatten_and(result, f);
}
}
other => result.push(other),
}
}
fn compile_or(expr: &OrExpr, schema: &Schema) -> Self {
if let Some(operands) = expr.get_or_operands() {
let mut flattened = Vec::with_capacity(operands.len());
for op in operands.iter() {
let compiled = Self::compile(op.as_ref(), schema);
Self::flatten_or(&mut flattened, compiled);
}
if flattened.iter().any(|f| matches!(f, CompiledFilter::True)) {
return CompiledFilter::True;
}
flattened.retain(|f| !matches!(f, CompiledFilter::False));
match flattened.len() {
0 => CompiledFilter::False, 1 => flattened.pop().unwrap(),
2 => {
let right = flattened.pop().unwrap();
let left = flattened.pop().unwrap();
CompiledFilter::Or(Box::new(left), Box::new(right))
}
_ => CompiledFilter::OrN(flattened),
}
} else {
CompiledFilter::Dynamic(expr.clone_box())
}
}
fn flatten_or(result: &mut Vec<CompiledFilter>, filter: CompiledFilter) {
match filter {
CompiledFilter::Or(left, right) => {
Self::flatten_or(result, *left);
Self::flatten_or(result, *right);
}
CompiledFilter::OrN(filters) => {
for f in filters {
Self::flatten_or(result, f);
}
}
other => result.push(other),
}
}
fn compile_null_check(expr: &NullCheckExpr, schema: &Schema) -> Self {
let col_name = expr.get_column_name().unwrap_or("");
match super::find_column_index(schema, col_name) {
Some(col_idx) => {
if expr.is_null_check() {
CompiledFilter::IsNull { col_idx }
} else {
CompiledFilter::IsNotNull { col_idx }
}
}
None => CompiledFilter::Dynamic(expr.clone_box()),
}
}
fn compile_in_list(expr: &InListExpr, schema: &Schema) -> Self {
let col_name = expr.get_column_name().unwrap_or("");
let col_idx = match super::find_column_index(schema, col_name) {
Some(idx) => idx,
None => return CompiledFilter::Dynamic(expr.clone_box()),
};
let values = expr.get_values();
let is_negated = expr.is_not();
if values.is_empty() {
return if is_negated {
CompiledFilter::True
} else {
CompiledFilter::False
};
}
let first = &values[0];
let in_filter = match first {
Value::Integer(_) => {
let int_values: Vec<i64> = values
.iter()
.filter_map(|v| match v {
Value::Integer(i) => Some(*i),
_ => None,
})
.collect();
if int_values.len() == values.len() {
Some(CompiledFilter::IntegerIn {
col_idx,
values: int_values,
})
} else {
None
}
}
Value::Text(_) => {
let str_values: Vec<Arc<str>> = values
.iter()
.filter_map(|v| match v {
Value::Text(s) => Some(Arc::from(s.as_ref())),
_ => None,
})
.collect();
if str_values.len() == values.len() {
Some(CompiledFilter::StringIn {
col_idx,
values: str_values,
})
} else {
None
}
}
_ => None,
};
match in_filter {
Some(filter) => {
if is_negated {
CompiledFilter::Not(Box::new(filter))
} else {
filter
}
}
None => CompiledFilter::Dynamic(expr.clone_box()),
}
}
fn compile_between(expr: &BetweenExpr, schema: &Schema) -> Self {
let col_name = expr.get_column_name().unwrap_or("");
let col_idx = match super::find_column_index(schema, col_name) {
Some(idx) => idx,
None => return CompiledFilter::Dynamic(expr.clone_box()),
};
let (low, high) = expr.get_bounds();
let is_negated = expr.is_negated();
let between_filter = match (low, high) {
(Value::Integer(min), Value::Integer(max)) => CompiledFilter::IntegerBetween {
col_idx,
min: *min,
max: *max,
},
(Value::Float(min), Value::Float(max)) => CompiledFilter::FloatBetween {
col_idx,
min: *min,
max: *max,
},
(Value::Timestamp(min), Value::Timestamp(max)) => CompiledFilter::TimestampBetween {
col_idx,
min: *min,
max: *max,
},
_ => return CompiledFilter::Dynamic(expr.clone_box()),
};
if is_negated {
CompiledFilter::Not(Box::new(between_filter))
} else {
between_filter
}
}
fn compile_like(expr: &LikeExpr, schema: &Schema) -> Self {
let col_name = expr.get_column_name().unwrap_or("");
let col_idx = match super::find_column_index(schema, col_name) {
Some(idx) => idx,
None => return CompiledFilter::Dynamic(expr.clone_box()),
};
let pattern_str = expr.get_pattern();
let case_insensitive = expr.is_case_insensitive();
let negated = expr.is_negated();
let pattern = CompiledPattern::compile(pattern_str, case_insensitive);
CompiledFilter::StringLike {
col_idx,
pattern,
case_insensitive,
negated,
}
}
fn compile_function_expr(expr: &FunctionExpr, schema: &Schema) -> Self {
let func_name = expr.function_name().to_uppercase();
let args = expr.get_arguments();
let op = expr.get_operator();
let compare_val = expr.get_compare_value();
if args.len() != 1 {
return CompiledFilter::Dynamic(expr.clone_box());
}
let col_idx = match &args[0] {
FunctionArg::Column(col_name) => match super::find_column_index(schema, col_name) {
Some(idx) => idx,
None => return CompiledFilter::Dynamic(expr.clone_box()),
},
_ => return CompiledFilter::Dynamic(expr.clone_box()),
};
match func_name.as_str() {
"UPPER" => {
if op != Operator::Eq {
return CompiledFilter::Dynamic(expr.clone_box());
}
match compare_val {
Value::Text(s) => CompiledFilter::UpperEq {
col_idx,
value: Arc::from(s.as_ref()),
},
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
"LOWER" => {
if op != Operator::Eq {
return CompiledFilter::Dynamic(expr.clone_box());
}
match compare_val {
Value::Text(s) => CompiledFilter::LowerEq {
col_idx,
value: Arc::from(s.as_ref()),
},
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
"TRIM" | "BTRIM" => {
if op != Operator::Eq {
return CompiledFilter::Dynamic(expr.clone_box());
}
match compare_val {
Value::Text(s) => CompiledFilter::TrimEq {
col_idx,
value: Arc::from(s.as_ref()),
},
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
"LENGTH" | "LEN" | "CHAR_LENGTH" | "CHARACTER_LENGTH" => {
match compare_val {
Value::Integer(n) => match op {
Operator::Eq => CompiledFilter::LengthEq { col_idx, value: *n },
Operator::Ne => CompiledFilter::LengthNe { col_idx, value: *n },
Operator::Gt => CompiledFilter::LengthGt { col_idx, value: *n },
Operator::Gte => CompiledFilter::LengthGte { col_idx, value: *n },
Operator::Lt => CompiledFilter::LengthLt { col_idx, value: *n },
Operator::Lte => CompiledFilter::LengthLte { col_idx, value: *n },
_ => CompiledFilter::Dynamic(expr.clone_box()),
},
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
_ => CompiledFilter::Dynamic(expr.clone_box()),
}
}
#[inline(always)]
pub fn matches(&self, row: &Row) -> bool {
match self {
CompiledFilter::IntegerEq { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i == *value)
}
CompiledFilter::IntegerNe { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i != *value)
}
CompiledFilter::IntegerGt { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i > *value)
}
CompiledFilter::IntegerGte { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i >= *value)
}
CompiledFilter::IntegerLt { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i < *value)
}
CompiledFilter::IntegerLte { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i <= *value)
}
CompiledFilter::IntegerBetween { col_idx, min, max } => {
matches!(row.get(*col_idx), Some(Value::Integer(i)) if *i >= *min && *i <= *max)
}
CompiledFilter::IntegerIn { col_idx, values } => {
if let Some(Value::Integer(i)) = row.get(*col_idx) {
values.contains(i)
} else {
false
}
}
CompiledFilter::FloatEq { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f == *value)
}
CompiledFilter::FloatNe { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f != *value)
}
CompiledFilter::FloatGt { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f > *value)
}
CompiledFilter::FloatGte { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f >= *value)
}
CompiledFilter::FloatLt { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f < *value)
}
CompiledFilter::FloatLte { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f <= *value)
}
CompiledFilter::FloatBetween { col_idx, min, max } => {
matches!(row.get(*col_idx), Some(Value::Float(f)) if *f >= *min && *f <= *max)
}
CompiledFilter::StringEq { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.as_ref() == value.as_ref()
} else {
false
}
}
CompiledFilter::StringNe { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.as_ref() != value.as_ref()
} else {
false
}
}
CompiledFilter::StringGt { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.as_ref() > value.as_ref()
} else {
false
}
}
CompiledFilter::StringGte { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.as_ref() >= value.as_ref()
} else {
false
}
}
CompiledFilter::StringLt { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.as_ref() < value.as_ref()
} else {
false
}
}
CompiledFilter::StringLte { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.as_ref() <= value.as_ref()
} else {
false
}
}
CompiledFilter::StringIn { col_idx, values } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
values.iter().any(|v| s.as_ref() == v.as_ref())
} else {
false
}
}
CompiledFilter::StringLike {
col_idx,
pattern,
case_insensitive,
negated,
} => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
let matches = pattern.matches(s.as_ref(), *case_insensitive);
if *negated {
!matches
} else {
matches
}
} else {
false
}
}
CompiledFilter::BooleanEq { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Boolean(b)) if *b == *value)
}
CompiledFilter::BooleanNe { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Boolean(b)) if *b != *value)
}
CompiledFilter::TimestampEq { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t == *value)
}
CompiledFilter::TimestampNe { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t != *value)
}
CompiledFilter::TimestampGt { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t > *value)
}
CompiledFilter::TimestampGte { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t >= *value)
}
CompiledFilter::TimestampLt { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t < *value)
}
CompiledFilter::TimestampLte { col_idx, value } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t <= *value)
}
CompiledFilter::TimestampBetween { col_idx, min, max } => {
matches!(row.get(*col_idx), Some(Value::Timestamp(t)) if *t >= *min && *t <= *max)
}
CompiledFilter::IsNull { col_idx } => {
row.get(*col_idx).map(|v| v.is_null()).unwrap_or(true)
}
CompiledFilter::IsNotNull { col_idx } => {
row.get(*col_idx).map(|v| !v.is_null()).unwrap_or(false)
}
CompiledFilter::And(left, right) => left.matches(row) && right.matches(row),
CompiledFilter::AndN(filters) => filters.iter().all(|f| f.matches(row)),
CompiledFilter::Or(left, right) => left.matches(row) || right.matches(row),
CompiledFilter::OrN(filters) => filters.iter().any(|f| f.matches(row)),
CompiledFilter::Not(inner) => {
if inner.is_unknown_due_to_null(row) {
return false;
}
!inner.matches(row)
}
CompiledFilter::True => true,
CompiledFilter::False => false,
CompiledFilter::UpperEq { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.to_uppercase() == value.as_ref()
} else {
false
}
}
CompiledFilter::LowerEq { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.to_lowercase() == value.as_ref()
} else {
false
}
}
CompiledFilter::TrimEq { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.trim() == value.as_ref()
} else {
false
}
}
CompiledFilter::LengthEq { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.chars().count() as i64 == *value
} else {
false
}
}
CompiledFilter::LengthNe { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.chars().count() as i64 != *value
} else {
false
}
}
CompiledFilter::LengthGt { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.chars().count() as i64 > *value
} else {
false
}
}
CompiledFilter::LengthGte { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.chars().count() as i64 >= *value
} else {
false
}
}
CompiledFilter::LengthLt { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
(s.chars().count() as i64) < *value
} else {
false
}
}
CompiledFilter::LengthLte { col_idx, value } => {
if let Some(Value::Text(s)) = row.get(*col_idx) {
s.chars().count() as i64 <= *value
} else {
false
}
}
CompiledFilter::Dynamic(expr) => expr.evaluate_fast(row),
}
}
pub fn is_fully_compiled(&self) -> bool {
match self {
CompiledFilter::Dynamic(_) => false,
CompiledFilter::And(left, right) => {
left.is_fully_compiled() && right.is_fully_compiled()
}
CompiledFilter::AndN(filters) => filters.iter().all(|f| f.is_fully_compiled()),
CompiledFilter::Or(left, right) => {
left.is_fully_compiled() && right.is_fully_compiled()
}
CompiledFilter::OrN(filters) => filters.iter().all(|f| f.is_fully_compiled()),
CompiledFilter::Not(inner) => inner.is_fully_compiled(),
_ => true,
}
}
#[inline(always)]
pub fn is_unknown_due_to_null(&self, row: &Row) -> bool {
match self {
CompiledFilter::IntegerEq { col_idx, .. }
| CompiledFilter::IntegerNe { col_idx, .. }
| CompiledFilter::IntegerGt { col_idx, .. }
| CompiledFilter::IntegerGte { col_idx, .. }
| CompiledFilter::IntegerLt { col_idx, .. }
| CompiledFilter::IntegerLte { col_idx, .. }
| CompiledFilter::IntegerBetween { col_idx, .. }
| CompiledFilter::IntegerIn { col_idx, .. }
| CompiledFilter::FloatEq { col_idx, .. }
| CompiledFilter::FloatNe { col_idx, .. }
| CompiledFilter::FloatGt { col_idx, .. }
| CompiledFilter::FloatGte { col_idx, .. }
| CompiledFilter::FloatLt { col_idx, .. }
| CompiledFilter::FloatLte { col_idx, .. }
| CompiledFilter::FloatBetween { col_idx, .. }
| CompiledFilter::StringEq { col_idx, .. }
| CompiledFilter::StringNe { col_idx, .. }
| CompiledFilter::StringGt { col_idx, .. }
| CompiledFilter::StringGte { col_idx, .. }
| CompiledFilter::StringLt { col_idx, .. }
| CompiledFilter::StringLte { col_idx, .. }
| CompiledFilter::StringIn { col_idx, .. }
| CompiledFilter::StringLike { col_idx, .. }
| CompiledFilter::BooleanEq { col_idx, .. }
| CompiledFilter::BooleanNe { col_idx, .. }
| CompiledFilter::TimestampEq { col_idx, .. }
| CompiledFilter::TimestampNe { col_idx, .. }
| CompiledFilter::TimestampGt { col_idx, .. }
| CompiledFilter::TimestampGte { col_idx, .. }
| CompiledFilter::TimestampLt { col_idx, .. }
| CompiledFilter::TimestampLte { col_idx, .. }
| CompiledFilter::TimestampBetween { col_idx, .. }
| CompiledFilter::UpperEq { col_idx, .. }
| CompiledFilter::LowerEq { col_idx, .. }
| CompiledFilter::TrimEq { col_idx, .. }
| CompiledFilter::LengthEq { col_idx, .. }
| CompiledFilter::LengthNe { col_idx, .. }
| CompiledFilter::LengthGt { col_idx, .. }
| CompiledFilter::LengthGte { col_idx, .. }
| CompiledFilter::LengthLt { col_idx, .. }
| CompiledFilter::LengthLte { col_idx, .. } => {
row.get(*col_idx).map(|v| v.is_null()).unwrap_or(true)
}
CompiledFilter::IsNull { .. } | CompiledFilter::IsNotNull { .. } => false,
CompiledFilter::And(_, _) | CompiledFilter::AndN(_) => false,
CompiledFilter::Or(_, _) | CompiledFilter::OrN(_) => false,
CompiledFilter::Not(inner) => inner.is_unknown_due_to_null(row),
CompiledFilter::True | CompiledFilter::False => false,
CompiledFilter::Dynamic(expr) => expr.is_unknown_due_to_null(row),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_row(id: i64, name: &str, age: i64, score: f64, active: bool) -> Row {
Row::from_values(vec![
Value::Integer(id),
Value::text(name),
Value::Integer(age),
Value::Float(score),
Value::Boolean(active),
])
}
#[test]
fn test_integer_eq() {
let filter = CompiledFilter::IntegerEq {
col_idx: 0,
value: 42,
};
let row1 = make_row(42, "Alice", 30, 95.5, true);
let row2 = make_row(43, "Bob", 25, 88.0, false);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
}
#[test]
fn test_integer_between() {
let filter = CompiledFilter::IntegerBetween {
col_idx: 2,
min: 25,
max: 35,
};
let row1 = make_row(1, "Alice", 30, 95.5, true);
let row2 = make_row(2, "Bob", 40, 88.0, false);
let row3 = make_row(3, "Charlie", 25, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(filter.matches(&row3));
}
#[test]
fn test_string_eq() {
let filter = CompiledFilter::StringEq {
col_idx: 1,
value: Arc::from("Alice"),
};
let row1 = make_row(1, "Alice", 30, 95.5, true);
let row2 = make_row(2, "Bob", 25, 88.0, false);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
}
#[test]
fn test_and_filter() {
let filter = CompiledFilter::And(
Box::new(CompiledFilter::IntegerGte {
col_idx: 2,
value: 25,
}),
Box::new(CompiledFilter::BooleanEq {
col_idx: 4,
value: true,
}),
);
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(2, "Bob", 30, 88.0, false); let row3 = make_row(3, "Charlie", 20, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(!filter.matches(&row3));
}
#[test]
fn test_or_filter() {
let filter = CompiledFilter::Or(
Box::new(CompiledFilter::StringEq {
col_idx: 1,
value: Arc::from("Alice"),
}),
Box::new(CompiledFilter::StringEq {
col_idx: 1,
value: Arc::from("Bob"),
}),
);
let row1 = make_row(1, "Alice", 30, 95.5, true);
let row2 = make_row(2, "Bob", 25, 88.0, false);
let row3 = make_row(3, "Charlie", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(filter.matches(&row2));
assert!(!filter.matches(&row3));
}
#[test]
fn test_integer_in() {
let filter = CompiledFilter::IntegerIn {
col_idx: 0,
values: vec![1, 3, 5, 7, 9],
};
let row1 = make_row(1, "Alice", 30, 95.5, true);
let row2 = make_row(2, "Bob", 25, 88.0, false);
let row3 = make_row(5, "Charlie", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(filter.matches(&row3));
}
#[test]
fn test_like_prefix() {
let pattern = CompiledPattern::compile("Al%", false);
let filter = CompiledFilter::StringLike {
col_idx: 1,
pattern,
case_insensitive: false,
negated: false,
};
let row1 = make_row(1, "Alice", 30, 95.5, true);
let row2 = make_row(2, "Bob", 25, 88.0, false);
let row3 = make_row(3, "Albert", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(filter.matches(&row3));
}
#[test]
fn test_is_fully_compiled() {
let filter1 = CompiledFilter::IntegerEq {
col_idx: 0,
value: 42,
};
assert!(filter1.is_fully_compiled());
let filter2 = CompiledFilter::And(
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 42,
}),
Box::new(CompiledFilter::StringEq {
col_idx: 1,
value: Arc::from("test"),
}),
);
assert!(filter2.is_fully_compiled());
}
#[test]
fn test_upper_eq() {
let filter = CompiledFilter::UpperEq {
col_idx: 1,
value: Arc::from("ALICE"),
};
let row1 = make_row(1, "alice", 30, 95.5, true);
let row2 = make_row(2, "Alice", 25, 88.0, false);
let row3 = make_row(3, "ALICE", 35, 70.0, true);
let row4 = make_row(4, "bob", 40, 80.0, false);
assert!(filter.matches(&row1)); assert!(filter.matches(&row2)); assert!(filter.matches(&row3)); assert!(!filter.matches(&row4)); }
#[test]
fn test_lower_eq() {
let filter = CompiledFilter::LowerEq {
col_idx: 1,
value: Arc::from("alice"),
};
let row1 = make_row(1, "alice", 30, 95.5, true);
let row2 = make_row(2, "Alice", 25, 88.0, false);
let row3 = make_row(3, "ALICE", 35, 70.0, true);
let row4 = make_row(4, "Bob", 40, 80.0, false);
assert!(filter.matches(&row1)); assert!(filter.matches(&row2)); assert!(filter.matches(&row3)); assert!(!filter.matches(&row4)); }
#[test]
fn test_trim_eq() {
let filter = CompiledFilter::TrimEq {
col_idx: 1,
value: Arc::from("Alice"),
};
let row1 = make_row(1, "Alice", 30, 95.5, true);
let row2 = make_row(2, " Alice ", 25, 88.0, false);
let row3 = make_row(3, "Alice ", 35, 70.0, true);
let row4 = make_row(4, " Bob ", 40, 80.0, false);
assert!(filter.matches(&row1)); assert!(filter.matches(&row2)); assert!(filter.matches(&row3)); assert!(!filter.matches(&row4)); }
#[test]
fn test_length_eq() {
let filter = CompiledFilter::LengthEq {
col_idx: 1,
value: 5,
};
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(2, "Bob", 25, 88.0, false);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
}
#[test]
fn test_length_gt() {
let filter = CompiledFilter::LengthGt {
col_idx: 1,
value: 3,
};
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(2, "Bob", 25, 88.0, false); let row3 = make_row(3, "Al", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(!filter.matches(&row3));
}
#[test]
fn test_length_gte() {
let filter = CompiledFilter::LengthGte {
col_idx: 1,
value: 3,
};
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(2, "Bob", 25, 88.0, false); let row3 = make_row(3, "Al", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(filter.matches(&row2));
assert!(!filter.matches(&row3));
}
#[test]
fn test_length_lt() {
let filter = CompiledFilter::LengthLt {
col_idx: 1,
value: 4,
};
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(2, "Bob", 25, 88.0, false); let row3 = make_row(3, "Al", 35, 70.0, true);
assert!(!filter.matches(&row1));
assert!(filter.matches(&row2));
assert!(filter.matches(&row3));
}
#[test]
fn test_length_lte() {
let filter = CompiledFilter::LengthLte {
col_idx: 1,
value: 3,
};
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(2, "Bob", 25, 88.0, false); let row3 = make_row(3, "Al", 35, 70.0, true);
assert!(!filter.matches(&row1));
assert!(filter.matches(&row2));
assert!(filter.matches(&row3));
}
#[test]
fn test_scalar_func_with_null() {
let filter = CompiledFilter::UpperEq {
col_idx: 1,
value: Arc::from("ALICE"),
};
let row = Row::from_values(vec![
Value::Integer(1),
Value::null(crate::core::DataType::Text),
Value::Integer(30),
Value::Float(95.5),
Value::Boolean(true),
]);
assert!(!filter.matches(&row));
assert!(filter.is_unknown_due_to_null(&row));
}
#[test]
fn test_and_short_circuit_false() {
let filter = CompiledFilter::And(
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 42,
}),
Box::new(CompiledFilter::False),
);
let row = make_row(42, "Alice", 30, 95.5, true);
assert!(!filter.matches(&row)); }
#[test]
fn test_or_short_circuit_true() {
let filter = CompiledFilter::Or(
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 999, }),
Box::new(CompiledFilter::True),
);
let row = make_row(42, "Alice", 30, 95.5, true);
assert!(filter.matches(&row)); }
#[test]
fn test_and_with_true_constant() {
let filter = CompiledFilter::And(
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 42,
}),
Box::new(CompiledFilter::True),
);
let row1 = make_row(42, "Alice", 30, 95.5, true);
let row2 = make_row(43, "Bob", 25, 88.0, false);
assert!(filter.matches(&row1)); assert!(!filter.matches(&row2)); }
#[test]
fn test_or_with_false_constant() {
let filter = CompiledFilter::Or(
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 42,
}),
Box::new(CompiledFilter::False),
);
let row1 = make_row(42, "Alice", 30, 95.5, true);
let row2 = make_row(43, "Bob", 25, 88.0, false);
assert!(filter.matches(&row1)); assert!(!filter.matches(&row2)); }
#[test]
fn test_andn_flattened() {
let filter = CompiledFilter::AndN(vec![
CompiledFilter::IntegerGte {
col_idx: 0,
value: 1,
},
CompiledFilter::IntegerLte {
col_idx: 0,
value: 10,
},
CompiledFilter::BooleanEq {
col_idx: 4,
value: true,
},
]);
let row1 = make_row(5, "Alice", 30, 95.5, true); let row2 = make_row(5, "Bob", 25, 88.0, false); let row3 = make_row(15, "Charlie", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(!filter.matches(&row3));
}
#[test]
fn test_orn_flattened() {
let filter = CompiledFilter::OrN(vec![
CompiledFilter::IntegerEq {
col_idx: 0,
value: 1,
},
CompiledFilter::IntegerEq {
col_idx: 0,
value: 5,
},
CompiledFilter::IntegerEq {
col_idx: 0,
value: 10,
},
]);
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(5, "Bob", 25, 88.0, false); let row3 = make_row(10, "Charlie", 35, 70.0, true); let row4 = make_row(7, "Dave", 40, 80.0, false);
assert!(filter.matches(&row1));
assert!(filter.matches(&row2));
assert!(filter.matches(&row3));
assert!(!filter.matches(&row4));
}
#[test]
fn test_nested_and_should_flatten() {
let inner = CompiledFilter::And(
Box::new(CompiledFilter::IntegerGte {
col_idx: 0,
value: 1,
}),
Box::new(CompiledFilter::IntegerLte {
col_idx: 0,
value: 10,
}),
);
let filter = CompiledFilter::And(
Box::new(inner),
Box::new(CompiledFilter::BooleanEq {
col_idx: 4,
value: true,
}),
);
let row1 = make_row(5, "Alice", 30, 95.5, true); let row2 = make_row(5, "Bob", 25, 88.0, false); let row3 = make_row(15, "Charlie", 35, 70.0, true);
assert!(filter.matches(&row1));
assert!(!filter.matches(&row2));
assert!(!filter.matches(&row3));
}
#[test]
fn test_nested_or_should_flatten() {
let inner = CompiledFilter::Or(
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 1,
}),
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 5,
}),
);
let filter = CompiledFilter::Or(
Box::new(inner),
Box::new(CompiledFilter::IntegerEq {
col_idx: 0,
value: 10,
}),
);
let row1 = make_row(1, "Alice", 30, 95.5, true); let row2 = make_row(5, "Bob", 25, 88.0, false); let row3 = make_row(10, "Charlie", 35, 70.0, true); let row4 = make_row(7, "Dave", 40, 80.0, false);
assert!(filter.matches(&row1));
assert!(filter.matches(&row2));
assert!(filter.matches(&row3));
assert!(!filter.matches(&row4));
}
}