use std::cmp::Ordering;
use crate::graph::lpg::CompareOp;
use grafeo_common::types::Value;
/// Summary statistics (value bounds plus counters) that `ZoneMap::might_match`
/// consults to decide whether a comparison predicate could possibly match.
#[derive(Debug, Clone, Default)]
pub struct ZoneMap {
/// Lower bound of the summarized values. When this is `None`,
/// `might_match` cannot exclude anything and answers `true`.
pub min: Option<Value>,
/// Upper bound of the summarized values. When this is `None`,
/// `might_match` cannot exclude anything and answers `true`.
pub max: Option<Value>,
/// Count of null entries. A non-zero count makes `might_match` answer
/// `true` for `CompareOp::Ne` regardless of the bounds.
pub null_count: usize,
/// Count of rows covered by this summary (presumably including nulls; TODO confirm).
/// Not read by `might_match`.
pub row_count: usize,
}
impl ZoneMap {
    /// Creates a zone map with no recorded bounds and both counters at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Reports whether some summarized value `x` could satisfy `x <op> value`.
    ///
    /// The answer is conservative: `false` is returned only when the recorded
    /// bounds rule out every possible match. If either bound is missing, or
    /// `compare_values` cannot order the operands, the result is `true`.
    #[must_use]
    pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
        let (lo, hi) = match (self.min.as_ref(), self.max.as_ref()) {
            (Some(lo), Some(hi)) => (lo, hi),
            // Without both bounds nothing can be excluded.
            _ => return true,
        };

        match op {
            // `value` has to fall inside `[lo, hi]`; an undefined ordering never excludes.
            CompareOp::Eq => {
                let below_lo = matches!(compare_values(value, lo), Some(Ordering::Less));
                let above_hi = matches!(compare_values(value, hi), Some(Ordering::Greater));
                !(below_lo || above_hi)
            }
            // Only a null-free zone whose bounds coincide and equal `value` can be excluded.
            // Equality against `value` uses `Value`'s own `==`, not `compare_values`.
            CompareOp::Ne => {
                let single_valued = matches!(compare_values(lo, hi), Some(Ordering::Equal));
                self.null_count > 0 || !(single_valued && lo == value)
            }
            // Something below `value` can exist only if the lower bound is below it.
            CompareOp::Lt => matches!(compare_values(lo, value), None | Some(Ordering::Less)),
            // Excluded only when even the lower bound exceeds `value`.
            CompareOp::Le => !matches!(compare_values(lo, value), Some(Ordering::Greater)),
            // Something above `value` can exist only if the upper bound is above it.
            CompareOp::Gt => matches!(compare_values(hi, value), None | Some(Ordering::Greater)),
            // Excluded only when even the upper bound falls short of `value`.
            CompareOp::Ge => !matches!(compare_values(hi, value), Some(Ordering::Less)),
        }
    }
}
/// Orders two values when their types are comparable.
///
/// Supported pairings are `Int64`/`Int64`, `Float64`/`Float64`, mixed
/// `Int64`/`Float64` (the integer side is cast to `f64`), `String`/`String`
/// and `Bool`/`Bool`. Any other pairing yields `None`, as does a float
/// comparison for which `partial_cmp` has no answer (e.g. NaN).
pub(super) fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
    let ordering = match (a, b) {
        // Totally ordered pairings: always produce an ordering.
        (Value::Bool(lhs), Value::Bool(rhs)) => lhs.cmp(rhs),
        (Value::String(lhs), Value::String(rhs)) => lhs.cmp(rhs),
        (Value::Int64(lhs), Value::Int64(rhs)) => lhs.cmp(rhs),
        // Floating-point pairings may be unordered, so hand back `partial_cmp` directly.
        (Value::Float64(lhs), Value::Float64(rhs)) => return lhs.partial_cmp(rhs),
        // NOTE: `as f64` rounds integers that are not exactly representable as `f64`.
        (Value::Int64(lhs), Value::Float64(rhs)) => return (*lhs as f64).partial_cmp(rhs),
        (Value::Float64(lhs), Value::Int64(rhs)) => return lhs.partial_cmp(&(*rhs as f64)),
        // Everything else is treated as incomparable.
        _ => return None,
    };
    Some(ordering)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a zone map over `Int64` bounds with the given counters.
    fn int_zone(min: i64, max: i64, null_count: usize, row_count: usize) -> ZoneMap {
        let (min, max) = (Some(Value::Int64(min)), Some(Value::Int64(max)));
        ZoneMap {
            min,
            max,
            null_count,
            row_count,
        }
    }

    /// Probes `zone` with an `Int64` literal.
    fn probe(zone: &ZoneMap, op: CompareOp, literal: i64) -> bool {
        zone.might_match(op, &Value::Int64(literal))
    }

    #[test]
    fn test_eq_in_range() {
        let zone = int_zone(10, 50, 0, 100);
        // An interior point and both inclusive bounds.
        for candidate in [25, 10, 50] {
            assert!(probe(&zone, CompareOp::Eq, candidate));
        }
    }

    #[test]
    fn test_eq_out_of_range() {
        let zone = int_zone(10, 50, 0, 100);
        for candidate in [5, 51] {
            assert!(!probe(&zone, CompareOp::Eq, candidate));
        }
    }

    #[test]
    fn test_lt() {
        let zone = int_zone(10, 50, 0, 100);
        assert!(probe(&zone, CompareOp::Lt, 20));
        // Nothing in the zone is strictly below its own minimum or anything smaller.
        for candidate in [10, 5] {
            assert!(!probe(&zone, CompareOp::Lt, candidate));
        }
    }

    #[test]
    fn test_le() {
        let zone = int_zone(10, 50, 0, 100);
        assert!(probe(&zone, CompareOp::Le, 10));
        assert!(!probe(&zone, CompareOp::Le, 9));
    }

    #[test]
    fn test_gt() {
        let zone = int_zone(10, 50, 0, 100);
        assert!(probe(&zone, CompareOp::Gt, 40));
        // Nothing in the zone is strictly above its own maximum or anything larger.
        for candidate in [50, 60] {
            assert!(!probe(&zone, CompareOp::Gt, candidate));
        }
    }

    #[test]
    fn test_ge() {
        let zone = int_zone(10, 50, 0, 100);
        assert!(probe(&zone, CompareOp::Ge, 50));
        assert!(!probe(&zone, CompareOp::Ge, 51));
    }

    #[test]
    fn test_ne() {
        let wide = int_zone(10, 50, 0, 100);
        for candidate in [10, 25] {
            assert!(probe(&wide, CompareOp::Ne, candidate));
        }

        // A null-free zone holding a single value can rule out `!=` against that value.
        let constant = int_zone(42, 42, 0, 10);
        assert!(!probe(&constant, CompareOp::Ne, 42));
        assert!(probe(&constant, CompareOp::Ne, 43));
    }

    #[test]
    fn test_ne_with_nulls() {
        let zone = int_zone(42, 42, 5, 10);
        assert!(probe(&zone, CompareOp::Ne, 42));
    }

    #[test]
    fn test_empty_zone_map() {
        let zone = ZoneMap::new();
        let every_op = [
            CompareOp::Eq,
            CompareOp::Ne,
            CompareOp::Lt,
            CompareOp::Le,
            CompareOp::Gt,
            CompareOp::Ge,
        ];
        // With no bounds recorded, every operator must be answered with `true`.
        for op in every_op {
            assert!(zone.might_match(op, &Value::Int64(1)));
        }
    }

    #[test]
    fn test_string_zone_map() {
        let zone = ZoneMap {
            min: Some(Value::from("apple")),
            max: Some(Value::from("grape")),
            null_count: 0,
            row_count: 50,
        };
        let banana = Value::from("banana");
        let zebra = Value::from("zebra");

        assert!(zone.might_match(CompareOp::Eq, &banana));
        assert!(!zone.might_match(CompareOp::Eq, &zebra));
        assert!(zone.might_match(CompareOp::Lt, &banana));
        assert!(!zone.might_match(CompareOp::Gt, &zebra));
    }

    #[test]
    fn test_incomparable_types_are_conservative() {
        let zone = int_zone(10, 50, 0, 100);
        let text = Value::from("hello");
        // An integer zone cannot be ordered against a string, so nothing is excluded.
        assert!(zone.might_match(CompareOp::Eq, &text));
        assert!(zone.might_match(CompareOp::Lt, &text));
    }

    #[test]
    fn test_default() {
        let zone = ZoneMap::default();
        assert!(zone.min.is_none());
        assert!(zone.max.is_none());
        assert_eq!(zone.null_count, 0);
        assert_eq!(zone.row_count, 0);
    }
}