use std::any::Any;
use chrono::{DateTime, Utc};
use rustc_hash::FxHashMap;
use super::{find_column_index, resolve_alias, Expression};
use crate::core::{Operator, Result, Row, Schema, Value};
/// Range predicate over a single column (`column BETWEEN lower AND upper`), with
/// optional negation and optionally exclusive bounds.
///
/// The column is referenced by name until `prepare_for_schema` resolves it to a
/// position; evaluation yields "no match" while that position is still unresolved.
#[derive(Debug, Clone)]
pub struct BetweenExpr {
/// Name of the column being tested; `with_aliases` replaces it with the resolved
/// name when the alias map changes it.
column: String,
/// Lower end of the range.
lower_bound: Value,
/// Upper end of the range.
upper_bound: Value,
/// `true` compares with `>=` / `<=`, `false` with `>` / `<`.
inclusive: bool,
/// `true` inverts the range check (`NOT BETWEEN`).
not: bool,
/// Position of `column` inside a row; `None` until `prepare_for_schema` has run
/// (and again after `with_aliases`, which clears it).
col_index: Option<usize>,
/// Copy of the alias map most recently passed to `with_aliases`.
/// NOTE(review): written but never read in this file — confirm it is still needed.
aliases: FxHashMap<String, String>,
/// Column name as written before alias resolution; set only when `with_aliases`
/// actually changed the name.
/// NOTE(review): written but never read in this file — confirm it is still needed.
original_column: Option<String>,
}
impl BetweenExpr {
    /// Single construction path shared by the public constructors.
    ///
    /// The column index, alias map and original column name always start out empty;
    /// only the range itself and the two flags vary between constructors.
    fn from_parts(
        column: String,
        lower: Value,
        upper: Value,
        inclusive: bool,
        not: bool,
    ) -> Self {
        Self {
            column,
            lower_bound: lower,
            upper_bound: upper,
            inclusive,
            not,
            col_index: None,
            aliases: FxHashMap::default(),
            original_column: None,
        }
    }

    /// Creates an inclusive, non-negated predicate: `column BETWEEN lower AND upper`.
    pub fn new(column: impl Into<String>, lower: Value, upper: Value) -> Self {
        Self::from_parts(column.into(), lower, upper, true, false)
    }

    /// Creates an inclusive, negated predicate: `column NOT BETWEEN lower AND upper`.
    pub fn not_between(column: impl Into<String>, lower: Value, upper: Value) -> Self {
        Self::from_parts(column.into(), lower, upper, true, true)
    }

    /// Creates a non-negated predicate whose bounds are inclusive or exclusive
    /// depending on `inclusive`.
    pub fn with_inclusivity(
        column: impl Into<String>,
        lower: Value,
        upper: Value,
        inclusive: bool,
    ) -> Self {
        Self::from_parts(column.into(), lower, upper, inclusive, false)
    }

    /// Returns `true` when the bounds themselves count as inside the range.
    pub fn is_inclusive(&self) -> bool {
        self.inclusive
    }

    /// Returns the `(lower, upper)` bounds by reference.
    pub fn get_bounds(&self) -> (&Value, &Value) {
        (&self.lower_bound, &self.upper_bound)
    }

    /// Returns `true` when the predicate is the negated (`NOT BETWEEN`) form.
    pub fn is_negated(&self) -> bool {
        self.not
    }

    /// Range test shared by every typed `check_*` helper.
    ///
    /// Uses `<=` on both sides when the predicate is inclusive and `<` otherwise.
    /// The negation flag is deliberately not applied here.
    fn within_bounds<T: PartialOrd>(&self, val: T, lower: T, upper: T) -> bool {
        if self.inclusive {
            lower <= val && val <= upper
        } else {
            lower < val && val < upper
        }
    }

    /// Range test for integer values (negation not applied).
    #[inline]
    fn check_integer(&self, val: i64, lower: i64, upper: i64) -> bool {
        self.within_bounds(val, lower, upper)
    }

    /// Range test for floating-point values (negation not applied).
    #[inline]
    fn check_float(&self, val: f64, lower: f64, upper: f64) -> bool {
        self.within_bounds(val, lower, upper)
    }

    /// Range test for strings using `str` ordering (negation not applied).
    #[inline]
    fn check_string(&self, val: &str, lower: &str, upper: &str) -> bool {
        self.within_bounds(val, lower, upper)
    }

    /// Range test for UTC timestamps (negation not applied).
    #[inline]
    fn check_timestamp(
        &self,
        val: DateTime<Utc>,
        lower: DateTime<Utc>,
        upper: DateTime<Utc>,
    ) -> bool {
        self.within_bounds(val, lower, upper)
    }
}
impl Expression for BetweenExpr {
    /// Evaluates the predicate against `row`, reporting bound type mismatches as errors.
    ///
    /// Returns `Ok(false)` when the expression has not been prepared, when the resolved
    /// column index lies outside the row, when the column value is NULL, or when the
    /// column value is of a type this predicate does not compare (anything other than
    /// integer, float, text or timestamp). None of these outcomes is flipped by
    /// `NOT BETWEEN`, which keeps this method in agreement with `evaluate_fast`.
    ///
    /// # Errors
    ///
    /// Returns a type-conversion error when a bound cannot be converted to the type of
    /// the column value.
    fn evaluate(&self, row: &Row) -> Result<bool> {
        let col_idx = match self.col_index {
            Some(idx) if idx < row.len() => idx,
            _ => return Ok(false),
        };
        let col_value = &row[col_idx];
        if col_value.is_null() {
            return Ok(false);
        }
        let in_range = match col_value {
            Value::Integer(val) => {
                let lower = self.lower_bound.as_int64().ok_or_else(|| {
                    crate::core::Error::type_conversion("lower bound", "integer")
                })?;
                let upper = self.upper_bound.as_int64().ok_or_else(|| {
                    crate::core::Error::type_conversion("upper bound", "integer")
                })?;
                self.check_integer(*val, lower, upper)
            }
            Value::Float(val) => {
                let lower = self.lower_bound.as_float64().ok_or_else(|| {
                    crate::core::Error::type_conversion("lower bound", "float")
                })?;
                let upper = self.upper_bound.as_float64().ok_or_else(|| {
                    crate::core::Error::type_conversion("upper bound", "float")
                })?;
                self.check_float(*val, lower, upper)
            }
            Value::Text(val) => {
                let lower = self.lower_bound.as_string().ok_or_else(|| {
                    crate::core::Error::type_conversion("lower bound", "string")
                })?;
                let upper = self.upper_bound.as_string().ok_or_else(|| {
                    crate::core::Error::type_conversion("upper bound", "string")
                })?;
                self.check_string(val, &lower, &upper)
            }
            Value::Timestamp(val) => {
                let lower = self.lower_bound.as_timestamp().ok_or_else(|| {
                    crate::core::Error::type_conversion("lower bound", "timestamp")
                })?;
                let upper = self.upper_bound.as_timestamp().ok_or_else(|| {
                    crate::core::Error::type_conversion("upper bound", "timestamp")
                })?;
                self.check_timestamp(*val, lower, upper)
            }
            // Unsupported column type: bail out *before* negation is applied. Falling
            // through with `false` would make NOT BETWEEN match such rows here while
            // `evaluate_fast` rejects them.
            _ => return Ok(false),
        };
        // XOR with the negation flag: NOT BETWEEN inverts a decided range check.
        Ok(in_range != self.not)
    }

    /// Infallible variant of `evaluate`.
    ///
    /// Every situation in which `evaluate` returns `Ok(false)` early or an error
    /// (unprepared expression, out-of-range index, NULL, unsupported column type,
    /// bound that cannot be converted) yields `false` here, regardless of negation.
    fn evaluate_fast(&self, row: &Row) -> bool {
        let col_idx = match self.col_index {
            Some(idx) if idx < row.len() => idx,
            _ => return false,
        };
        let col_value = &row[col_idx];
        if col_value.is_null() {
            return false;
        }
        let in_range = match col_value {
            Value::Integer(val) => {
                match (self.lower_bound.as_int64(), self.upper_bound.as_int64()) {
                    (Some(lower), Some(upper)) => self.check_integer(*val, lower, upper),
                    _ => return false,
                }
            }
            Value::Float(val) => {
                match (self.lower_bound.as_float64(), self.upper_bound.as_float64()) {
                    (Some(lower), Some(upper)) => self.check_float(*val, lower, upper),
                    _ => return false,
                }
            }
            Value::Text(val) => {
                match (self.lower_bound.as_string(), self.upper_bound.as_string()) {
                    (Some(lower), Some(upper)) => self.check_string(val, &lower, &upper),
                    _ => return false,
                }
            }
            Value::Timestamp(val) => {
                match (
                    self.lower_bound.as_timestamp(),
                    self.upper_bound.as_timestamp(),
                ) {
                    (Some(lower), Some(upper)) => self.check_timestamp(*val, lower, upper),
                    _ => return false,
                }
            }
            _ => return false,
        };
        in_range != self.not
    }

    /// Returns a boxed copy of this expression with the column name passed through
    /// `resolve_alias`.
    ///
    /// When resolution changes the name, the previous name is kept in
    /// `original_column`. The copy stores a clone of `aliases` and has its column
    /// index cleared, so it must be prepared again before it can match anything.
    fn with_aliases(&self, aliases: &FxHashMap<String, String>) -> Box<dyn Expression> {
        let resolved = resolve_alias(&self.column, aliases);
        let mut expr = self.clone();
        if resolved != self.column {
            expr.original_column = Some(self.column.clone());
            expr.column = resolved.to_string();
        }
        expr.aliases = aliases.clone();
        // Any index carried over by the clone belongs to the old column name.
        expr.col_index = None;
        Box::new(expr)
    }

    /// Resolves the column name to an index via `find_column_index`.
    ///
    /// Does nothing when an index is already present, so an expression that was
    /// prepared once keeps its index even if a different schema is passed later.
    fn prepare_for_schema(&mut self, schema: &Schema) {
        if self.col_index.is_none() {
            self.col_index = find_column_index(schema, &self.column);
        }
    }

    /// Appends the resolved column index to `out` and returns `true`; returns `false`
    /// and leaves `out` untouched when the expression is not prepared.
    fn collect_column_indices(&self, out: &mut Vec<usize>) -> bool {
        match self.col_index {
            Some(idx) => {
                out.push(idx);
                true
            }
            None => false,
        }
    }

    /// Returns `true` once a column index has been resolved.
    fn is_prepared(&self) -> bool {
        self.col_index.is_some()
    }

    /// Returns the (possibly alias-resolved) column name; never `None` for this type.
    fn get_column_name(&self) -> Option<&str> {
        Some(&self.column)
    }

    /// Always reports `true`.
    ///
    /// NOTE(review): the negated form yields no comparisons from
    /// `collect_comparisons`; confirm callers expect `true` for NOT BETWEEN as well.
    fn can_use_index(&self) -> bool {
        true
    }

    /// Expresses the range as two single-column comparisons.
    ///
    /// Inclusive ranges produce `>= lower` and `<= upper`; exclusive ranges produce
    /// `> lower` and `< upper`. The negated form returns an empty list because
    /// NOT BETWEEN is not a conjunction of two comparisons.
    fn collect_comparisons(&self) -> Vec<(&str, Operator, &Value)> {
        if self.not {
            return Vec::new();
        }
        let column = self.column.as_str();
        let (lower_op, upper_op) = if self.inclusive {
            (Operator::Gte, Operator::Lte)
        } else {
            (Operator::Gt, Operator::Lt)
        };
        vec![
            (column, lower_op, &self.lower_bound),
            (column, upper_op, &self.upper_bound),
        ]
    }

    /// Always reports `true`.
    fn is_conjunctive_simple(&self) -> bool {
        true
    }

    /// Returns a boxed clone of this expression.
    fn clone_box(&self) -> Box<dyn Expression> {
        Box::new(self.clone())
    }

    /// Exposes `self` as `&dyn Any` so callers can downcast to `BetweenExpr`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{DataType, SchemaBuilder};

    /// Schema shared by the tests: `id` (integer primary key), `score` (float),
    /// `name` (text).
    fn test_schema() -> Schema {
        SchemaBuilder::new("test")
            .add_primary_key("id", DataType::Integer)
            .add("score", DataType::Float)
            .add("name", DataType::Text)
            .build()
    }

    /// Builds a row laid out like `test_schema`.
    fn row_of(id: i64, score: f64, name: &str) -> Row {
        Row::from_values(vec![
            Value::integer(id),
            Value::float(score),
            Value::text(name),
        ])
    }

    /// Prepares `expr` against `test_schema` and hands it back.
    fn prepared(mut expr: BetweenExpr) -> BetweenExpr {
        expr.prepare_for_schema(&test_schema());
        expr
    }

    #[test]
    fn test_integer_between() {
        let row = row_of(5, 75.0, "Alice");

        let hit = prepared(BetweenExpr::new("id", Value::integer(1), Value::integer(10)));
        assert!(hit.evaluate(&row).unwrap());
        assert!(hit.evaluate_fast(&row));

        let miss = prepared(BetweenExpr::new("id", Value::integer(1), Value::integer(4)));
        assert!(!miss.evaluate(&row).unwrap());
    }

    #[test]
    fn test_inclusive_bounds() {
        let expr = prepared(BetweenExpr::new("id", Value::integer(1), Value::integer(10)));

        // Both ends of the range must match.
        let at_lower = row_of(1, 0.0, "a");
        assert!(expr.evaluate(&at_lower).unwrap());

        let at_upper = row_of(10, 0.0, "a");
        assert!(expr.evaluate(&at_upper).unwrap());
    }

    #[test]
    fn test_exclusive_bounds() {
        let expr = prepared(BetweenExpr::with_inclusivity(
            "id",
            Value::integer(1),
            Value::integer(10),
            false,
        ));

        // The bound itself is outside an exclusive range.
        let on_bound = row_of(1, 0.0, "a");
        assert!(!expr.evaluate(&on_bound).unwrap());

        let inside = row_of(5, 0.0, "a");
        assert!(expr.evaluate(&inside).unwrap());
    }

    #[test]
    fn test_float_between() {
        let row = row_of(1, 75.0, "Alice");
        let expr = prepared(BetweenExpr::new(
            "score",
            Value::float(0.0),
            Value::float(100.0),
        ));
        assert!(expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_string_between() {
        let expr = prepared(BetweenExpr::new(
            "name",
            Value::text("Alice"),
            Value::text("Charlie"),
        ));

        let inside = row_of(1, 0.0, "Bob");
        assert!(expr.evaluate(&inside).unwrap());

        let outside = row_of(1, 0.0, "Zack");
        assert!(!expr.evaluate(&outside).unwrap());
    }

    #[test]
    fn test_null_in_between() {
        // A NULL column value never matches.
        let row = Row::from_values(vec![
            Value::null(DataType::Integer),
            Value::float(0.0),
            Value::text("Alice"),
        ]);
        let expr = prepared(BetweenExpr::new("id", Value::integer(1), Value::integer(10)));
        assert!(!expr.evaluate(&row).unwrap());
    }

    #[test]
    fn test_unprepared() {
        // Without `prepare_for_schema` there is no column index, so nothing matches.
        let row = Row::from_values(vec![Value::integer(5)]);
        let expr = BetweenExpr::new("id", Value::integer(1), Value::integer(10));
        assert!(!expr.evaluate(&row).unwrap());
        assert!(!expr.evaluate_fast(&row));
    }

    #[test]
    fn test_with_aliases() {
        let schema = test_schema();
        let row = row_of(5, 0.0, "Alice");

        let aliases: FxHashMap<String, String> =
            std::iter::once(("i".to_string(), "id".to_string())).collect();

        let by_alias = BetweenExpr::new("i", Value::integer(1), Value::integer(10));
        let mut aliased = by_alias.with_aliases(&aliases);
        aliased.prepare_for_schema(&schema);
        assert!(aliased.evaluate(&row).unwrap());
    }
}