use std::any::Any;
use rustc_hash::{FxHashMap, FxHashSet};
use super::{find_column_index, resolve_alias, Expression};
use crate::common::I64Set;
use crate::core::{Result, Row, Schema, Value};
/// Pre-computed lookup structure for the literals of an `IN` list.
///
/// Built by `InListExpr::build_hash_sets`; the `check_*` helpers consult it
/// and fall back to a linear scan over the raw value list when no matching
/// specialised variant is available.
#[derive(Debug, Clone)]
enum HashedValues {
/// Nothing has been built: the initial state, the state after
/// `with_aliases`, and the state chosen for an empty list.
None,
/// Exact-match set of 64-bit integers, used for lists made up of integer
/// (and integer-valued float) literals.
Integers(I64Set),
/// Exact-match set of strings, used for lists made up of text literals.
Strings(FxHashSet<String>),
/// Presence flags for the two boolean literals.
Booleans { has_true: bool, has_false: bool },
/// No specialised structure applies; lookups scan the value list.
Mixed,
}
/// Predicate testing whether a column's value is (or, when negated, is not)
/// a member of a fixed list of literal values.
#[derive(Debug, Clone)]
pub struct InListExpr {
/// Name of the column being tested; replaced by the resolved name when
/// `with_aliases` finds an alias for it.
column: String,
/// The literal values of the list, in the order they were supplied.
values: Vec<Value>,
/// `true` for `NOT IN`, `false` for `IN`.
not: bool,
/// Position of `column` in the schema, filled in by `prepare_for_schema`;
/// `None` until then (or when the column was not found).
col_index: Option<usize>,
/// Lookup structure derived from `values` during preparation.
hashed: HashedValues,
/// Whether `values` contains at least one NULL (computed at construction).
has_null: bool,
/// Alias map captured by the most recent `with_aliases` call.
aliases: FxHashMap<String, String>,
/// The pre-alias column name, recorded only when `with_aliases` actually
/// rewrote `column`.
original_column: Option<String>,
/// Smallest non-NULL list value according to `Value::compare`, computed
/// during preparation and exposed through `collect_comparisons`.
cached_min: Option<Value>,
/// Largest non-NULL list value according to `Value::compare`, computed
/// during preparation and exposed through `collect_comparisons`.
cached_max: Option<Value>,
}
impl InListExpr {
pub fn new(column: impl Into<String>, values: Vec<Value>) -> Self {
let has_null = values.iter().any(|v| v.is_null());
Self {
column: column.into(),
values,
not: false,
col_index: None,
hashed: HashedValues::None,
has_null,
aliases: FxHashMap::default(),
cached_min: None,
cached_max: None,
original_column: None,
}
}
pub fn not_in(column: impl Into<String>, values: Vec<Value>) -> Self {
let has_null = values.iter().any(|v| v.is_null());
Self {
column: column.into(),
values,
not: true,
col_index: None,
hashed: HashedValues::None,
has_null,
aliases: FxHashMap::default(),
original_column: None,
cached_min: None,
cached_max: None,
}
}
pub fn is_not(&self) -> bool {
self.not
}
pub fn values(&self) -> &[Value] {
&self.values
}
pub fn get_values(&self) -> &[Value] {
&self.values
}
fn build_hash_sets(&mut self) {
if self.values.is_empty() {
self.hashed = HashedValues::None;
return;
}
let first_type = self.values.iter().find_map(|v| match v {
Value::Integer(_) => Some("int"),
Value::Float(_) => Some("float"),
Value::Text(_) => Some("text"),
Value::Boolean(_) => Some("bool"),
Value::Null(_) => None,
_ => Some("other"),
});
match first_type {
Some("int") => {
let mut set = I64Set::new();
let mut all_int = true;
for v in &self.values {
match v {
Value::Integer(i) => {
set.insert(*i);
}
Value::Float(f) => {
if f.fract() == 0.0 {
set.insert(*f as i64);
} else {
all_int = false;
break;
}
}
Value::Null(_) => {} _ => {
all_int = false;
break;
}
}
}
if all_int {
self.hashed = HashedValues::Integers(set);
} else {
self.hashed = HashedValues::Mixed;
}
}
Some("text") => {
let mut set = FxHashSet::default();
let mut all_text = true;
for v in &self.values {
match v {
Value::Text(s) => {
set.insert(s.to_string());
}
Value::Null(_) => {}
_ => {
all_text = false;
break;
}
}
}
if all_text {
self.hashed = HashedValues::Strings(set);
} else {
self.hashed = HashedValues::Mixed;
}
}
Some("bool") => {
let mut has_true = false;
let mut has_false = false;
for v in &self.values {
match v {
Value::Boolean(true) => has_true = true,
Value::Boolean(false) => has_false = true,
Value::Null(_) => {}
_ => {}
}
}
self.hashed = HashedValues::Booleans {
has_true,
has_false,
};
}
_ => {
self.hashed = HashedValues::Mixed;
}
}
let mut min_val: Option<Value> = None;
let mut max_val: Option<Value> = None;
for v in &self.values {
if v.is_null() {
continue;
}
match (&min_val, &max_val) {
(None, _) => {
min_val = Some(v.clone());
max_val = Some(v.clone());
}
(Some(cur_min), Some(cur_max)) => {
if let Ok(std::cmp::Ordering::Less) = v.compare(cur_min) {
min_val = Some(v.clone());
}
if let Ok(std::cmp::Ordering::Greater) = v.compare(cur_max) {
max_val = Some(v.clone());
}
}
_ => {}
}
}
self.cached_min = min_val;
self.cached_max = max_val;
}
#[inline]
fn check_integer(&self, val: i64) -> bool {
match &self.hashed {
HashedValues::Integers(set) => set.contains(val),
_ => {
for v in &self.values {
if let Some(list_val) = v.as_int64() {
if val == list_val {
return true;
}
} else if let Some(list_val) = v.as_float64() {
if val as f64 == list_val {
return true;
}
}
}
false
}
}
}
#[inline]
fn check_float(&self, val: f64) -> bool {
for v in &self.values {
if let Some(list_val) = v.as_float64() {
if val == list_val {
return true;
}
} else if let Some(list_val) = v.as_int64() {
if val == list_val as f64 {
return true;
}
}
}
false
}
#[inline]
fn check_string(&self, val: &str) -> bool {
match &self.hashed {
HashedValues::Strings(set) => set.contains(val),
_ => {
for v in &self.values {
if let Some(list_val) = v.as_string() {
if val == list_val {
return true;
}
}
}
false
}
}
}
#[inline]
fn check_boolean(&self, val: bool) -> bool {
match &self.hashed {
HashedValues::Booleans {
has_true,
has_false,
} => {
if val {
*has_true
} else {
*has_false
}
}
_ => {
self.values.iter().any(|v| v.as_boolean() == Some(val))
}
}
}
}
impl Expression for InListExpr {
    /// Evaluates the predicate against `row`; never fails.
    ///
    /// Same decision as [`Expression::evaluate_fast`], wrapped in `Ok`.
    fn evaluate(&self, row: &Row) -> Result<bool> {
        Ok(self.evaluate_fast(row))
    }

    /// Evaluates the predicate against `row`.
    ///
    /// Returns `false` when the expression is unprepared, the column index is
    /// out of range for the row, or the column value is NULL. Otherwise a
    /// match yields `true` for `IN` and `false` for `NOT IN`; a miss yields
    /// `false` for `IN`, and for `NOT IN` yields `true` only if the list
    /// contains no NULL.
    fn evaluate_fast(&self, row: &Row) -> bool {
        let cell = match self.col_index {
            Some(pos) if pos < row.len() => &row[pos],
            _ => return false,
        };
        // Must be checked before the type dispatch: a NULL cell would
        // otherwise count as a plain miss and satisfy NOT IN.
        if cell.is_null() {
            return false;
        }

        let matched = match cell {
            Value::Integer(n) => self.check_integer(*n),
            Value::Float(x) => self.check_float(*x),
            Value::Text(s) => self.check_string(s),
            Value::Boolean(b) => self.check_boolean(*b),
            _ => false,
        };

        match (matched, self.not) {
            (true, negated) => !negated,
            (false, negated) => negated && !self.has_null,
        }
    }

    /// Returns a copy of this expression with its column resolved through
    /// `aliases`. The copy is left unprepared (no column index, no lookup
    /// structure) and remembers the pre-alias column name when it changed.
    fn with_aliases(&self, aliases: &FxHashMap<String, String>) -> Box<dyn Expression> {
        let mut rewritten = self.clone();
        let resolved = resolve_alias(&self.column, aliases);
        if resolved != self.column {
            rewritten.column = resolved.to_string();
            rewritten.original_column = Some(self.column.clone());
        }
        rewritten.col_index = None;
        rewritten.hashed = HashedValues::None;
        rewritten.aliases = aliases.clone();
        Box::new(rewritten)
    }

    /// Resolves the column index against `schema` and builds the lookup
    /// structures. Does nothing when an index has already been resolved.
    fn prepare_for_schema(&mut self, schema: &Schema) {
        if self.col_index.is_none() {
            self.col_index = find_column_index(schema, &self.column);
            self.build_hash_sets();
        }
    }

    /// Appends the resolved column index to `out`; returns whether an index
    /// was available.
    fn collect_column_indices(&self, out: &mut Vec<usize>) -> bool {
        self.col_index.map(|idx| out.push(idx)).is_some()
    }

    /// Reports whether a column index has been resolved.
    fn is_prepared(&self) -> bool {
        self.col_index.is_some()
    }

    /// Returns the (possibly alias-resolved) column name.
    fn get_column_name(&self) -> Option<&str> {
        Some(self.column.as_str())
    }

    /// For `IN`, returns `column >= min` and `column <= max` over the cached
    /// list bounds; returns nothing for `NOT IN` or when no bounds are cached.
    fn collect_comparisons(&self) -> Vec<(&str, crate::core::Operator, &Value)> {
        if self.not {
            return Vec::new();
        }
        if let (Some(lower), Some(upper)) = (&self.cached_min, &self.cached_max) {
            let name = self.column.as_str();
            vec![
                (name, crate::core::Operator::Gte, lower),
                (name, crate::core::Operator::Lte, upper),
            ]
        } else {
            Vec::new()
        }
    }

    /// Always `true`.
    fn can_use_index(&self) -> bool {
        true
    }

    /// Always `false`.
    fn is_conjunctive_simple(&self) -> bool {
        false
    }

    /// Returns a boxed clone of this expression.
    fn clone_box(&self) -> Box<dyn Expression> {
        Box::new(self.clone())
    }

    /// Exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{DataType, SchemaBuilder};

    /// Schema shared by every test: `id` (integer primary key), `name`, `status`.
    fn test_schema() -> Schema {
        SchemaBuilder::new("test")
            .add_primary_key("id", DataType::Integer)
            .add("name", DataType::Text)
            .add("status", DataType::Text)
            .build()
    }

    /// Builds a row `(id, name, "active")` matching `test_schema`.
    fn row_of(id: Value, name: &'static str) -> Row {
        Row::from_values(vec![id, Value::text(name), Value::text("active")])
    }

    /// Prepares `expr` against `test_schema` and hands it back.
    fn prepared(mut expr: InListExpr) -> InListExpr {
        expr.prepare_for_schema(&test_schema());
        expr
    }

    #[test]
    fn test_integer_in() {
        let row = row_of(Value::integer(2), "Alice");

        let hit = prepared(InListExpr::new(
            "id",
            vec![Value::integer(1), Value::integer(2), Value::integer(3)],
        ));
        assert!(hit.evaluate(&row).unwrap());
        assert!(hit.evaluate_fast(&row));

        let miss = prepared(InListExpr::new(
            "id",
            vec![Value::integer(5), Value::integer(6), Value::integer(7)],
        ));
        assert!(!miss.evaluate(&row).unwrap());
    }

    #[test]
    fn test_string_in() {
        let row = row_of(Value::integer(1), "Alice");

        let expr = prepared(InListExpr::new(
            "status",
            vec![
                Value::text("active"),
                Value::text("inactive"),
                Value::text("pending"),
            ],
        ));
        assert!(expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_not_in() {
        let row = row_of(Value::integer(4), "Alice");

        let absent = prepared(InListExpr::not_in(
            "id",
            vec![Value::integer(1), Value::integer(2), Value::integer(3)],
        ));
        assert!(absent.evaluate(&row).unwrap());

        let present = prepared(InListExpr::not_in(
            "id",
            vec![Value::integer(4), Value::integer(5), Value::integer(6)],
        ));
        assert!(!present.evaluate(&row).unwrap());
    }

    #[test]
    fn test_null_in() {
        let row = row_of(Value::null(DataType::Integer), "Alice");

        // A NULL column value satisfies neither IN nor NOT IN.
        let in_expr = prepared(InListExpr::new(
            "id",
            vec![Value::integer(1), Value::integer(2), Value::integer(3)],
        ));
        assert!(!in_expr.evaluate(&row).unwrap());

        let not_in_expr = prepared(InListExpr::not_in(
            "id",
            vec![Value::integer(1), Value::integer(2), Value::integer(3)],
        ));
        assert!(!not_in_expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_empty_list() {
        let row = row_of(Value::integer(1), "Alice");

        let in_expr = prepared(InListExpr::new("id", vec![]));
        assert!(!in_expr.evaluate(&row).unwrap());

        let not_in_expr = prepared(InListExpr::not_in("id", vec![]));
        assert!(not_in_expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_mixed_numeric_types() {
        let row = row_of(Value::integer(2), "Alice");

        // Integer column checked against a list of float literals.
        let expr = prepared(InListExpr::new(
            "id",
            vec![Value::float(1.0), Value::float(2.0), Value::float(3.0)],
        ));
        assert!(expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_with_aliases() {
        let schema = test_schema();
        let row = row_of(Value::integer(1), "Alice");

        let mut aliases = FxHashMap::default();
        aliases.insert("i".to_string(), "id".to_string());

        let expr = InListExpr::new("i", vec![Value::integer(1), Value::integer(2)]);
        let mut aliased = expr.with_aliases(&aliases);
        aliased.prepare_for_schema(&schema);
        assert!(aliased.evaluate(&row).unwrap());
    }

    #[test]
    fn test_is_not() {
        assert!(!InListExpr::new("id", vec![Value::integer(1)]).is_not());
        assert!(InListExpr::not_in("id", vec![Value::integer(1)]).is_not());
    }

    #[test]
    fn test_not_in_with_null_in_list() {
        let expr = prepared(InListExpr::not_in(
            "id",
            vec![Value::integer(2), Value::null(DataType::Integer)],
        ));

        // Not in the list, but the NULL makes the outcome unknown -> false.
        let unmatched = row_of(Value::integer(1), "Alice");
        assert!(!expr.evaluate(&unmatched).unwrap());
        assert!(!expr.evaluate_fast(&unmatched));

        // In the list -> NOT IN is false.
        let matched = row_of(Value::integer(2), "Bob");
        assert!(!expr.evaluate(&matched).unwrap());
        assert!(!expr.evaluate_fast(&matched));
    }

    #[test]
    fn test_in_with_null_in_list() {
        let expr = prepared(InListExpr::new(
            "id",
            vec![Value::integer(2), Value::null(DataType::Integer)],
        ));

        let unmatched = row_of(Value::integer(1), "Alice");
        assert!(!expr.evaluate(&unmatched).unwrap());
        assert!(!expr.evaluate_fast(&unmatched));

        let matched = row_of(Value::integer(2), "Bob");
        assert!(expr.evaluate(&matched).unwrap());
        assert!(expr.evaluate_fast(&matched));
    }

    #[test]
    fn test_not_in_without_null() {
        let row = row_of(Value::integer(1), "Alice");

        let expr = prepared(InListExpr::not_in(
            "id",
            vec![Value::integer(2), Value::integer(3)],
        ));
        assert!(expr.evaluate(&row).unwrap());
        assert!(expr.evaluate_fast(&row));
    }
}