use std::any::Any;
use std::collections::HashMap;
use rustc_hash::FxHashSet;
use super::{find_column_index, resolve_alias, Expression};
use crate::core::{Result, Row, Schema, Value};
/// Type-specialised lookup structure for the values of an IN-list.
///
/// Selected by `InListExpr::build_hash_sets`; the `check_*` helpers consult
/// the matching variant and otherwise fall back to scanning the value list.
#[derive(Debug, Clone)]
enum HashedValues {
/// Nothing has been built: the initial state, an empty list, or the state
/// after `with_aliases` resets the expression.
None,
/// Every non-NULL list value could be stored as an `i64`.
Integers(FxHashSet<i64>),
/// Every non-NULL list value is text.
Strings(FxHashSet<String>),
/// Boolean list, reduced to "does it contain true / false" flags.
Booleans { has_true: bool, has_false: bool },
/// No specialised structure applies; lookups scan the value list.
Mixed,
}
/// `column IN (values…)` / `column NOT IN (values…)` predicate.
///
/// The expression must be bound to a schema with `prepare_for_schema` before
/// it can match anything: while `col_index` is `None`, evaluation returns false.
#[derive(Debug, Clone)]
pub struct InListExpr {
/// Name of the column being tested (replaced by the resolved name in `with_aliases`).
column: String,
/// The literal list the column value is compared against.
values: Vec<Value>,
/// `true` for `NOT IN`, `false` for `IN`.
not: bool,
/// Position of `column` in a row; resolved by `prepare_for_schema`.
col_index: Option<usize>,
/// Lookup structure derived from `values` by `build_hash_sets`.
hashed: HashedValues,
/// Whether `values` contains a NULL; computed once at construction.
has_null: bool,
/// Copy of the alias map handed to `with_aliases` (only written, never read,
/// in the code visible here).
aliases: HashMap<String, String>,
/// Pre-alias column name, set by `with_aliases` when resolution changed `column`.
original_column: Option<String>,
}
impl InListExpr {
/// Creates an unprepared `column IN (values…)` predicate.
///
/// The column index and lookup structure are left unset until
/// `prepare_for_schema` is called.
pub fn new(column: impl Into<String>, values: Vec<Value>) -> Self {
    // NULL presence is recorded up front because it changes the outcome
    // of every non-matching evaluation.
    let has_null = values.iter().any(|candidate| candidate.is_null());
    let column = column.into();
    Self {
        column,
        values,
        has_null,
        not: false,
        hashed: HashedValues::None,
        col_index: None,
        original_column: None,
        aliases: HashMap::new(),
    }
}
pub fn not_in(column: impl Into<String>, values: Vec<Value>) -> Self {
let has_null = values.iter().any(|v| v.is_null());
Self {
column: column.into(),
values,
not: true,
col_index: None,
hashed: HashedValues::None,
has_null,
aliases: HashMap::new(),
original_column: None,
}
}
/// Returns `true` when this expression is a `NOT IN` predicate.
pub fn is_not(&self) -> bool {
self.not
}
/// Borrows the literal values of the IN-list, in their original order.
pub fn values(&self) -> &[Value] {
    self.values.as_slice()
}
/// Borrows the literal values of the IN-list; same result as `values`.
pub fn get_values(&self) -> &[Value] {
    self.values()
}
fn build_hash_sets(&mut self) {
if self.values.is_empty() {
self.hashed = HashedValues::None;
return;
}
let first_type = self.values.iter().find_map(|v| match v {
Value::Integer(_) => Some("int"),
Value::Float(_) => Some("float"),
Value::Text(_) => Some("text"),
Value::Boolean(_) => Some("bool"),
Value::Null(_) => None,
_ => Some("other"),
});
match first_type {
Some("int") => {
let mut set = FxHashSet::default();
let mut all_int = true;
for v in &self.values {
match v {
Value::Integer(i) => {
set.insert(*i);
}
Value::Float(f) if f.fract() == 0.0 => {
set.insert(*f as i64);
}
Value::Null(_) => {} _ => {
all_int = false;
break;
}
}
}
if all_int {
self.hashed = HashedValues::Integers(set);
} else {
self.hashed = HashedValues::Mixed;
}
}
Some("text") => {
let mut set = FxHashSet::default();
let mut all_text = true;
for v in &self.values {
match v {
Value::Text(s) => {
set.insert(s.to_string());
}
Value::Null(_) => {}
_ => {
all_text = false;
break;
}
}
}
if all_text {
self.hashed = HashedValues::Strings(set);
} else {
self.hashed = HashedValues::Mixed;
}
}
Some("bool") => {
let mut has_true = false;
let mut has_false = false;
for v in &self.values {
match v {
Value::Boolean(true) => has_true = true,
Value::Boolean(false) => has_false = true,
Value::Null(_) => {}
_ => {}
}
}
self.hashed = HashedValues::Booleans {
has_true,
has_false,
};
}
_ => {
self.hashed = HashedValues::Mixed;
}
}
}
/// Returns `true` when the integer `val` equals some entry of the IN-list.
///
/// Uses the integer hash set when one was built; otherwise scans the list,
/// comparing against each entry's integer view or, failing that, its float view.
#[inline]
fn check_integer(&self, val: i64) -> bool {
    if let HashedValues::Integers(set) = &self.hashed {
        return set.contains(&val);
    }
    self.values.iter().any(|candidate| match candidate.as_int64() {
        Some(as_int) => val == as_int,
        // Only consulted when the entry has no integer view.
        None => matches!(candidate.as_float64(), Some(as_float) if val as f64 == as_float),
    })
}
/// Returns `true` when the float `val` equals some entry of the IN-list.
///
/// Always a linear scan: each entry is compared through its float view or,
/// failing that, through its integer view widened to `f64`.
#[inline]
fn check_float(&self, val: f64) -> bool {
    self.values.iter().any(|candidate| match candidate.as_float64() {
        Some(as_float) => val == as_float,
        None => matches!(candidate.as_int64(), Some(as_int) if val == as_int as f64),
    })
}
/// Returns `true` when the text `val` equals some entry of the IN-list.
///
/// Uses the string hash set when one was built; otherwise scans the list and
/// compares against each entry's string view.
#[inline]
fn check_string(&self, val: &str) -> bool {
    if let HashedValues::Strings(set) = &self.hashed {
        return set.contains(val);
    }
    self.values.iter().any(|candidate| match candidate.as_string() {
        Some(list_val) => val == list_val,
        None => false,
    })
}
/// Returns `true` when the boolean `val` equals some entry of the IN-list.
///
/// Answers from the pre-computed flags when available; otherwise scans the
/// list and compares against each entry's boolean view.
#[inline]
fn check_boolean(&self, val: bool) -> bool {
    match &self.hashed {
        HashedValues::Booleans {
            has_true,
            has_false,
        } => (val && *has_true) || (!val && *has_false),
        _ => {
            let wanted = Some(val);
            self.values
                .iter()
                .any(|candidate| candidate.as_boolean() == wanted)
        }
    }
}
}
impl Expression for InListExpr {
/// Evaluates the predicate against `row`; never returns `Err`.
///
/// The decision logic is exactly that of `evaluate_fast`: an unprepared
/// expression, an out-of-range column index, a NULL column value, or a
/// non-match against a list containing NULL all yield `Ok(false)`.
fn evaluate(&self, row: &Row) -> Result<bool> {
    Ok(self.evaluate_fast(row))
}
/// Evaluates the predicate against `row` without the `Result` wrapper.
///
/// Returns `false` when the expression is unprepared, the column index is
/// past the end of the row, or the column value is NULL. Otherwise:
/// a match yields `true` for `IN` and `false` for `NOT IN`; a non-match
/// yields `false` if the list contains NULL, else `false` for `IN` and
/// `true` for `NOT IN`.
fn evaluate_fast(&self, row: &Row) -> bool {
    let col_value = match self.col_index {
        Some(idx) if idx < row.len() => &row[idx],
        _ => return false,
    };
    if col_value.is_null() {
        return false;
    }

    // Value kinds without a dedicated check are never considered a match.
    let found = match col_value {
        Value::Integer(val) => self.check_integer(*val),
        Value::Float(val) => self.check_float(*val),
        Value::Text(val) => self.check_string(val),
        Value::Boolean(val) => self.check_boolean(*val),
        _ => false,
    };

    match (found, self.has_null) {
        (true, _) => !self.not,
        // No match but the list holds a NULL: the comparison is unknown,
        // which is reported as false for both IN and NOT IN.
        (false, true) => false,
        (false, false) => self.not,
    }
}
fn with_aliases(&self, aliases: &HashMap<String, String>) -> Box<dyn Expression> {
let resolved = resolve_alias(&self.column, aliases);
let mut expr = self.clone();
if resolved != self.column {
expr.original_column = Some(self.column.clone());
expr.column = resolved.to_string();
}
expr.aliases = aliases.clone();
expr.col_index = None;
expr.hashed = HashedValues::None; Box::new(expr)
}
/// Binds the expression to `schema`: looks up the column index and builds
/// the lookup structure.
///
/// Does nothing when a column index is already set. If the column is not
/// found the index stays `None`, so a later call will try again.
fn prepare_for_schema(&mut self, schema: &Schema) {
    if self.col_index.is_none() {
        self.col_index = find_column_index(schema, &self.column);
        self.build_hash_sets();
    }
}
/// Returns `true` once a column index has been resolved by `prepare_for_schema`.
fn is_prepared(&self) -> bool {
self.col_index.is_some()
}
/// Returns the name of the column this predicate tests; always `Some`.
fn get_column_name(&self) -> Option<&str> {
    Some(self.column.as_str())
}
/// Always returns `true`.
// NOTE(review): presumably tells the caller that an index lookup may serve
// this predicate — confirm against the `Expression` trait documentation.
fn can_use_index(&self) -> bool {
true
}
/// Returns a boxed clone of this expression as a trait object.
fn clone_box(&self) -> Box<dyn Expression> {
Box::new(self.clone())
}
/// Exposes `self` as `&dyn Any` so callers can downcast to `InListExpr`.
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{DataType, SchemaBuilder};

    /// Schema shared by all tests: `id` (integer primary key), `name`, `status`.
    fn test_schema() -> Schema {
        SchemaBuilder::new("test")
            .add_primary_key("id", DataType::Integer)
            .add("name", DataType::Text)
            .add("status", DataType::Text)
            .build()
    }

    /// Row laid out for `test_schema`; the status column is always "active".
    fn sample_row(id: Value, name: &'static str) -> Row {
        Row::from_values(vec![id, Value::text(name), Value::text("active")])
    }

    /// Binds `expr` to `schema` and hands it back ready for evaluation.
    fn prepared(mut expr: InListExpr, schema: &Schema) -> InListExpr {
        expr.prepare_for_schema(schema);
        expr
    }

    #[test]
    fn test_integer_in() {
        let schema = test_schema();
        let row = sample_row(Value::integer(2), "Alice");

        let hit = prepared(
            InListExpr::new(
                "id",
                vec![Value::integer(1), Value::integer(2), Value::integer(3)],
            ),
            &schema,
        );
        assert!(hit.evaluate(&row).unwrap());
        assert!(hit.evaluate_fast(&row));

        let miss = prepared(
            InListExpr::new(
                "id",
                vec![Value::integer(5), Value::integer(6), Value::integer(7)],
            ),
            &schema,
        );
        assert!(!miss.evaluate(&row).unwrap());
    }

    #[test]
    fn test_string_in() {
        let schema = test_schema();
        let row = sample_row(Value::integer(1), "Alice");

        let statuses = vec![
            Value::text("active"),
            Value::text("inactive"),
            Value::text("pending"),
        ];
        let expr = prepared(InListExpr::new("status", statuses), &schema);
        assert!(expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_not_in() {
        let schema = test_schema();
        let row = sample_row(Value::integer(4), "Alice");

        // 4 is absent from the list, so NOT IN holds.
        let absent = prepared(
            InListExpr::not_in(
                "id",
                vec![Value::integer(1), Value::integer(2), Value::integer(3)],
            ),
            &schema,
        );
        assert!(absent.evaluate(&row).unwrap());

        // 4 is present in the list, so NOT IN fails.
        let present = prepared(
            InListExpr::not_in(
                "id",
                vec![Value::integer(4), Value::integer(5), Value::integer(6)],
            ),
            &schema,
        );
        assert!(!present.evaluate(&row).unwrap());
    }

    #[test]
    fn test_null_in() {
        let schema = test_schema();
        let row = sample_row(Value::null(DataType::Integer), "Alice");

        // A NULL column value satisfies neither IN nor NOT IN.
        let in_expr = prepared(
            InListExpr::new(
                "id",
                vec![Value::integer(1), Value::integer(2), Value::integer(3)],
            ),
            &schema,
        );
        assert!(!in_expr.evaluate(&row).unwrap());

        let not_in_expr = prepared(
            InListExpr::not_in(
                "id",
                vec![Value::integer(1), Value::integer(2), Value::integer(3)],
            ),
            &schema,
        );
        assert!(!not_in_expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_empty_list() {
        let schema = test_schema();
        let row = sample_row(Value::integer(1), "Alice");

        let in_empty = prepared(InListExpr::new("id", vec![]), &schema);
        assert!(!in_empty.evaluate(&row).unwrap());

        let not_in_empty = prepared(InListExpr::not_in("id", vec![]), &schema);
        assert!(not_in_empty.evaluate(&row).unwrap());
    }

    #[test]
    fn test_mixed_numeric_types() {
        let schema = test_schema();
        let row = sample_row(Value::integer(2), "Alice");

        // Integer column compared against a list of floats.
        let expr = prepared(
            InListExpr::new(
                "id",
                vec![Value::float(1.0), Value::float(2.0), Value::float(3.0)],
            ),
            &schema,
        );
        assert!(expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_with_aliases() {
        let schema = test_schema();
        let row = sample_row(Value::integer(1), "Alice");

        let mut aliases = HashMap::new();
        aliases.insert("i".to_string(), "id".to_string());

        let expr = InListExpr::new("i", vec![Value::integer(1), Value::integer(2)]);
        let mut aliased = expr.with_aliases(&aliases);
        aliased.prepare_for_schema(&schema);
        assert!(aliased.evaluate(&row).unwrap());
    }

    #[test]
    fn test_is_not() {
        let plain = InListExpr::new("id", vec![Value::integer(1)]);
        assert!(!plain.is_not());

        let negated = InListExpr::not_in("id", vec![Value::integer(1)]);
        assert!(negated.is_not());
    }

    #[test]
    fn test_not_in_with_null_in_list() {
        let schema = test_schema();
        let expr = prepared(
            InListExpr::not_in(
                "id",
                vec![Value::integer(2), Value::null(DataType::Integer)],
            ),
            &schema,
        );

        // No match, but the NULL in the list makes the result false.
        let row = sample_row(Value::integer(1), "Alice");
        assert!(!expr.evaluate(&row).unwrap());
        assert!(!expr.evaluate_fast(&row));

        // Direct match: NOT IN is false.
        let row2 = sample_row(Value::integer(2), "Bob");
        assert!(!expr.evaluate(&row2).unwrap());
        assert!(!expr.evaluate_fast(&row2));
    }

    #[test]
    fn test_in_with_null_in_list() {
        let schema = test_schema();
        let expr = prepared(
            InListExpr::new(
                "id",
                vec![Value::integer(2), Value::null(DataType::Integer)],
            ),
            &schema,
        );

        // No match, and the NULL in the list keeps the result false.
        let row = sample_row(Value::integer(1), "Alice");
        assert!(!expr.evaluate(&row).unwrap());
        assert!(!expr.evaluate_fast(&row));

        // Direct match: IN is true despite the NULL.
        let row2 = sample_row(Value::integer(2), "Bob");
        assert!(expr.evaluate(&row2).unwrap());
        assert!(expr.evaluate_fast(&row2));
    }

    #[test]
    fn test_not_in_without_null() {
        let schema = test_schema();
        let row = sample_row(Value::integer(1), "Alice");

        let expr = prepared(
            InListExpr::not_in("id", vec![Value::integer(2), Value::integer(3)]),
            &schema,
        );
        assert!(expr.evaluate(&row).unwrap());
        assert!(expr.evaluate_fast(&row));
    }
}