use crate::format::{BlockStats, BloomFilter};
/// Literal value a [`ScanPredicate`] compares a column against.
///
/// The variant selects which block statistics `ScanPredicate::can_skip_block`
/// consults: float min/max, integer min/max, or string min/max (plus bloom filter).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PredicateValue {
/// Floating-point literal, checked against the block's `f64` min/max.
Numeric(f64),
/// Integer literal, checked against the block's `i64` min/max when present.
Integer(i64),
/// String literal, checked against the block's string min/max.
String(String),
}
/// A single `column <op> literal` filter that can be tested against per-block
/// statistics to decide whether a block can be skipped.
#[derive(Debug, Clone)]
pub struct ScanPredicate {
/// Index of the column the predicate applies to.
pub col_idx: usize,
/// Comparison operator.
pub op: PredicateOp,
/// Literal on the right-hand side of the comparison.
pub value: PredicateValue,
}
/// Comparison operator of a [`ScanPredicate`] (`column <op> literal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PredicateOp {
/// `column > literal`
Gt,
/// `column >= literal`
Gte,
/// `column < literal`
Lt,
/// `column <= literal`
Lte,
/// `column == literal`
Eq,
/// `column != literal`
Ne,
}
impl ScanPredicate {
pub fn gt(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Gt,
value: PredicateValue::Numeric(value),
}
}
pub fn gte(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Gte,
value: PredicateValue::Numeric(value),
}
}
pub fn lt(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Lt,
value: PredicateValue::Numeric(value),
}
}
pub fn lte(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Lte,
value: PredicateValue::Numeric(value),
}
}
pub fn eq(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Eq,
value: PredicateValue::Numeric(value),
}
}
pub fn not_eq(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Ne,
value: PredicateValue::Numeric(value),
}
}
pub fn eq_i64(col_idx: usize, value: i64) -> Self {
Self {
col_idx,
op: PredicateOp::Eq,
value: PredicateValue::Integer(value),
}
}
pub fn not_eq_i64(col_idx: usize, value: i64) -> Self {
Self {
col_idx,
op: PredicateOp::Ne,
value: PredicateValue::Integer(value),
}
}
pub fn gt_i64(col_idx: usize, value: i64) -> Self {
Self {
col_idx,
op: PredicateOp::Gt,
value: PredicateValue::Integer(value),
}
}
pub fn gte_i64(col_idx: usize, value: i64) -> Self {
Self {
col_idx,
op: PredicateOp::Gte,
value: PredicateValue::Integer(value),
}
}
pub fn lt_i64(col_idx: usize, value: i64) -> Self {
Self {
col_idx,
op: PredicateOp::Lt,
value: PredicateValue::Integer(value),
}
}
pub fn lte_i64(col_idx: usize, value: i64) -> Self {
Self {
col_idx,
op: PredicateOp::Lte,
value: PredicateValue::Integer(value),
}
}
pub fn str_eq(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Eq,
value: PredicateValue::String(value.into()),
}
}
pub fn str_not_eq(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Ne,
value: PredicateValue::String(value.into()),
}
}
pub fn str_gt(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Gt,
value: PredicateValue::String(value.into()),
}
}
pub fn str_gte(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Gte,
value: PredicateValue::String(value.into()),
}
}
pub fn str_lt(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Lt,
value: PredicateValue::String(value.into()),
}
}
pub fn str_lte(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Lte,
value: PredicateValue::String(value.into()),
}
}
pub fn can_skip_block(&self, stats: &BlockStats) -> bool {
match &self.value {
PredicateValue::Numeric(v) => can_skip_numeric(self.op, *v, stats),
PredicateValue::Integer(v) => can_skip_integer(self.op, *v, stats),
PredicateValue::String(v) => can_skip_string(self.op, v, stats),
}
}
}
/// Decides from the block's `f64` min/max whether `column <op> value` can match
/// any row. Returns `true` only when the block is provably free of matches.
///
/// If either bound is NaN the function never skips.
fn can_skip_numeric(op: PredicateOp, value: f64, stats: &BlockStats) -> bool {
    let (lo, hi) = (stats.min, stats.max);
    if lo.is_nan() || hi.is_nan() {
        return false;
    }
    // Comparisons are kept explicit (no range helpers) so a NaN `value`
    // makes every arm false, i.e. "do not skip".
    match op {
        PredicateOp::Gt => value >= hi,
        PredicateOp::Gte => value > hi,
        PredicateOp::Lt => value <= lo,
        PredicateOp::Lte => value < lo,
        PredicateOp::Eq => lo > value || hi < value,
        // Every row equals `value` only when both bounds collapse onto it.
        PredicateOp::Ne => lo == value && hi == value,
    }
}
/// Decides whether `column <op> value` (integer literal) can match any row of
/// the block. Returns `true` only when the block is provably free of matches.
///
/// When both `min_i64` and `max_i64` are present the comparison is done in
/// exact integer arithmetic. Otherwise the `f64` min/max are used, but only if
/// `value` survives an `i64 -> f64 -> i64` round trip unchanged. A literal that
/// is rounded by the cast (for example `2^53 + 1`) would be compared as a
/// different number and could wrongly skip a block that contains matches, so
/// in that case the block is never skipped.
fn can_skip_integer(op: PredicateOp, value: i64, stats: &BlockStats) -> bool {
    if let (Some(smin), Some(smax)) = (stats.min_i64, stats.max_i64) {
        return match op {
            PredicateOp::Gt => smax <= value,
            PredicateOp::Gte => smax < value,
            PredicateOp::Lt => smin >= value,
            PredicateOp::Lte => smin > value,
            PredicateOp::Eq => value < smin || value > smax,
            PredicateOp::Ne => smin == value && smax == value,
        };
    }

    // Float fallback: `as f64` rounds to the nearest representable value, so
    // require that converting back yields exactly the original literal.
    let as_float = value as f64;
    if f64_to_exact_i64(as_float) != Some(value) {
        return false;
    }
    // NOTE(review): the f64 bounds themselves may have been rounded when the
    // block holds integers beyond ±2^53 — confirm how such stats are written.
    can_skip_numeric(op, as_float, stats)
}
/// Converts `v` to `i64` only when the conversion is lossless.
///
/// Returns `None` for NaN, ±infinity, values with a fractional part, and
/// values outside the half-open range `[i64::MIN as f64, i64::MAX as f64)`.
/// The upper bound is exclusive because `i64::MAX as f64` rounds up to `2^63`,
/// which does not fit in an `i64`.
pub fn f64_to_exact_i64(v: f64) -> Option<i64> {
    const LOWER: f64 = i64::MIN as f64;
    const UPPER: f64 = i64::MAX as f64;

    let is_whole = v.is_finite() && v.fract() == 0.0;
    let fits = (LOWER..UPPER).contains(&v);
    // `v as i64` saturates, so evaluating it eagerly is harmless even when
    // the result is discarded.
    (is_whole && fits).then_some(v as i64)
}
/// Decides whether `column <op> value` (string literal) can match any row of
/// the block. Returns `true` only when the block is provably free of matches.
///
/// Without both string bounds the block is never skipped. Otherwise the
/// lexicographic min/max range is checked first. For `Eq` only, a present
/// bloom filter that rules the value out also allows skipping.
fn can_skip_string(op: PredicateOp, value: &str, stats: &BlockStats) -> bool {
    let (lo, hi) = match (&stats.str_min, &stats.str_max) {
        (Some(lo), Some(hi)) => (lo.as_str(), hi.as_str()),
        _ => return false,
    };

    let outside_range = match op {
        PredicateOp::Gt => value >= hi,
        PredicateOp::Gte => value > hi,
        PredicateOp::Lt => value <= lo,
        PredicateOp::Lte => value < lo,
        PredicateOp::Eq => lo > value || hi < value,
        PredicateOp::Ne => lo == value && hi == value,
    };

    // The bloom filter can only disprove membership, so it helps `Eq` alone.
    outside_range
        || (op == PredicateOp::Eq
            && stats
                .bloom
                .as_ref()
                .is_some_and(|filter| !bloom_may_contain(filter, value)))
}
/// Number of bits (`m`) in a bloom filter built by `build_bloom`.
pub const BLOOM_BITS_DEFAULT: u32 = 2048;
/// Byte length of a default-sized filter (`BLOOM_BITS_DEFAULT / 8`).
pub const BLOOM_BYTES: usize = (BLOOM_BITS_DEFAULT as usize) / 8;
/// Number of hash probes (`k`) per value in a bloom filter built by `build_bloom`.
pub const BLOOM_K_DEFAULT: u8 = 3;
/// 64-bit FNV offset basis.
const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
/// 64-bit FNV prime.
const FNV_PRIME: u64 = 1_099_511_628_211;

/// Returns the bit index that probe number `hash_idx` selects for `value` in a
/// filter of `m` bits.
///
/// The hash is FNV-1a over the bytes of `value`, seeded per probe by XOR-ing
/// the offset basis with `hash_idx * FNV_PRIME`. The result is reduced with the
/// mask `m - 1`, so `m` is expected to be a non-zero power of two.
fn bloom_bit_pos(value: &str, hash_idx: u32, m: u32) -> usize {
    let seed = FNV_OFFSET ^ u64::from(hash_idx).wrapping_mul(FNV_PRIME);
    let digest = value
        .bytes()
        .fold(seed, |acc, b| (acc ^ u64::from(b)).wrapping_mul(FNV_PRIME));
    let mask = (m as usize) - 1;
    (digest as usize) & mask
}
/// Adds `value` to `bloom` by setting the bit chosen by each of its `k` probes.
///
/// Indexes `bloom.bytes` directly, so the byte buffer must cover all `m` bits.
pub fn bloom_insert(bloom: &mut BloomFilter, value: &str) {
    let (probes, width) = (u32::from(bloom.k), bloom.m);
    for probe in 0..probes {
        let pos = bloom_bit_pos(value, probe, width);
        let (byte_idx, bit_idx) = (pos / 8, pos % 8);
        bloom.bytes[byte_idx] |= 1u8 << bit_idx;
    }
}
/// Returns `false` only when `value` was definitely never inserted into
/// `bloom`. `true` means "possibly present".
///
/// A malformed filter is answered conservatively with `true` rather than a
/// panic. That covers a zero bit count, which would make the `m - 1` mask in
/// `bloom_bit_pos` overflow, and a byte buffer shorter than `m` bits require.
pub fn bloom_may_contain(bloom: &BloomFilter, value: &str) -> bool {
    if bloom.m == 0 {
        return true;
    }
    let expected_bytes = (bloom.m as usize).div_ceil(8);
    if bloom.bytes.len() < expected_bytes {
        return true;
    }
    // Every probed bit must be set for the value to be possibly present.
    (0..u32::from(bloom.k)).all(|i| {
        let bit = bloom_bit_pos(value, i, bloom.m);
        bloom.bytes[bit / 8] & (1u8 << (bit % 8)) != 0
    })
}
/// Builds a bloom filter over `values` using the default parameters
/// `BLOOM_K_DEFAULT` (probes) and `BLOOM_BITS_DEFAULT` (bits).
pub fn build_bloom(values: &[&str]) -> BloomFilter {
build_bloom_with_params(values, BLOOM_K_DEFAULT, BLOOM_BITS_DEFAULT)
}
/// Builds a bloom filter over `values` with `k` probes per value and `m` bits.
///
/// `m` must be a power of two of at least 8; this is checked in debug builds
/// only. The byte buffer is sized to `ceil(m / 8)` and starts zeroed.
pub fn build_bloom_with_params(values: &[&str], k: u8, m: u32) -> BloomFilter {
    debug_assert!(
        m >= 8 && m.is_power_of_two(),
        "m must be a power of two ≥ 8"
    );
    let mut filter = BloomFilter {
        k,
        m,
        bytes: vec![0u8; (m as usize).div_ceil(8)],
    };
    values
        .iter()
        .for_each(|item| bloom_insert(&mut filter, item));
    filter
}
// Unit tests for block-skipping predicates, the bloom filter helpers and
// `f64_to_exact_i64`.
#[cfg(test)]
mod tests {
use super::*;
use zerompk;
// Helper: block stats carrying only float min/max bounds.
fn stats(min: f64, max: f64) -> BlockStats {
BlockStats::numeric(min, max, 0, 1024)
}
// `> 50`: skip when max <= 50 (including max == 50), keep when max > 50.
#[test]
fn gt_predicate() {
let pred = ScanPredicate::gt(0, 50.0);
assert!(pred.can_skip_block(&stats(10.0, 40.0)));
assert!(!pred.can_skip_block(&stats(10.0, 60.0)));
assert!(pred.can_skip_block(&stats(10.0, 50.0)));
}
// `>= 50`: skip when max < 50, keep when max == 50.
#[test]
fn gte_predicate() {
let pred = ScanPredicate::gte(0, 50.0);
assert!(pred.can_skip_block(&stats(10.0, 49.0)));
assert!(!pred.can_skip_block(&stats(10.0, 50.0)));
}
// `< 50`: skip when min >= 50, keep when min < 50.
#[test]
fn lt_predicate() {
let pred = ScanPredicate::lt(0, 50.0);
assert!(pred.can_skip_block(&stats(60.0, 100.0)));
assert!(!pred.can_skip_block(&stats(40.0, 100.0)));
}
// `== 50`: skip when 50 lies outside [min, max] on either side.
#[test]
fn eq_predicate() {
let pred = ScanPredicate::eq(0, 50.0);
assert!(pred.can_skip_block(&stats(10.0, 40.0)));
assert!(pred.can_skip_block(&stats(60.0, 100.0)));
assert!(!pred.can_skip_block(&stats(40.0, 60.0)));
}
// `!= 50`: skip only when min == max == 50.
#[test]
fn ne_predicate() {
let pred = ScanPredicate::not_eq(0, 50.0);
assert!(pred.can_skip_block(&stats(50.0, 50.0)));
assert!(!pred.can_skip_block(&stats(40.0, 60.0)));
}
// A numeric predicate must not skip a block built with `BlockStats::non_numeric`.
#[test]
fn non_numeric_never_skipped_by_numeric_pred() {
let pred = ScanPredicate::gt(0, 50.0);
let nan_stats = BlockStats::non_numeric(0, 1024);
assert!(!pred.can_skip_block(&nan_stats));
}
// Helper: string block stats with min/max bounds and no bloom filter.
fn str_stats(smin: &str, smax: &str) -> BlockStats {
BlockStats::string_block(0, 1024, Some(smin.into()), Some(smax.into()), None)
}
// String `==` below the block's min is skippable.
#[test]
fn string_eq_skip_below_range() {
let stats = str_stats("apple", "banana");
assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(&stats));
}
// String `==` above the block's max is skippable.
#[test]
fn string_eq_skip_above_range() {
let stats = str_stats("apple", "banana");
assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(&stats));
}
// String `==` inside [min, max] with no bloom filter cannot be skipped.
#[test]
fn string_eq_no_skip_in_range() {
let stats = str_stats("apple", "banana");
assert!(!ScanPredicate::str_eq(0, "avocado").can_skip_block(&stats));
}
// String `>`: skip when max <= literal.
#[test]
fn string_gt_skip() {
let stats = str_stats("apple", "fig");
assert!(ScanPredicate::str_gt(0, "fig").can_skip_block(&stats));
assert!(!ScanPredicate::str_gt(0, "egg").can_skip_block(&stats));
}
// String `<`: skip when min >= literal.
#[test]
fn string_lt_skip() {
let stats = str_stats("mango", "pear");
assert!(ScanPredicate::str_lt(0, "mango").can_skip_block(&stats));
assert!(!ScanPredicate::str_lt(0, "orange").can_skip_block(&stats));
}
// String `>=`: skip when max < literal, keep when max == literal.
#[test]
fn string_gte_skip() {
let stats = str_stats("ant", "cat");
assert!(ScanPredicate::str_gte(0, "dog").can_skip_block(&stats));
assert!(!ScanPredicate::str_gte(0, "cat").can_skip_block(&stats));
}
// String `<=`: skip when min > literal, keep when min == literal.
#[test]
fn string_lte_skip() {
let stats = str_stats("zebra", "zoo");
assert!(ScanPredicate::str_lte(0, "yak").can_skip_block(&stats));
assert!(!ScanPredicate::str_lte(0, "zebra").can_skip_block(&stats));
}
// String `!=`: skip only when min == max == literal.
#[test]
fn string_ne_skip() {
let stats = str_stats("exact", "exact");
assert!(ScanPredicate::str_not_eq(0, "exact").can_skip_block(&stats));
let stats2 = str_stats("a", "z");
assert!(!ScanPredicate::str_not_eq(0, "exact").can_skip_block(&stats2));
}
// A string predicate must not skip a block built with `BlockStats::non_numeric`.
#[test]
fn string_no_zone_map_no_skip() {
let stats = BlockStats::non_numeric(0, 1024);
assert!(!ScanPredicate::str_eq(0, "anything").can_skip_block(&stats));
}
// Inserted values must always be reported as possibly present (no false negatives).
#[test]
fn bloom_insert_and_query() {
let values = ["hello", "world", "foo"];
let bloom = build_bloom(&values);
assert!(bloom_may_contain(&bloom, "hello"));
assert!(bloom_may_contain(&bloom, "world"));
assert!(bloom_may_contain(&bloom, "foo"));
}
// NOTE(review): this test is vacuous — the assertion only runs when the same
// call already returned `false`, so it can never fail. It checks only that
// querying an absent value does not panic.
#[test]
fn bloom_absent_value_rejected() {
let values = ["alpha", "beta", "gamma"];
let bloom = build_bloom(&values);
let delta_present = bloom_may_contain(&bloom, "delta");
if !delta_present {
assert!(!bloom_may_contain(&bloom, "delta"));
}
}
// An in-range `==` literal is skippable when the bloom filter rules it out;
// an inserted value is never skipped.
#[test]
fn bloom_eq_skip_via_filter() {
let bloom = build_bloom(&["apple", "apricot", "banana"]);
let stats = BlockStats::string_block(
0,
1024,
Some("apple".into()),
Some("banana".into()),
Some(bloom.clone()),
);
// Guarded because a false positive for "avocado" is legal filter behavior.
let absent = !bloom_may_contain(&bloom, "avocado");
if absent {
assert!(ScanPredicate::str_eq(0, "avocado").can_skip_block(&stats));
}
assert!(!ScanPredicate::str_eq(0, "apple").can_skip_block(&stats));
}
// Non-default `k`/`m` and the byte buffer must survive a MessagePack
// encode/decode of `BlockStats`, and the decoded filter must still answer queries.
#[test]
fn bloom_params_persist_through_msgpack_roundtrip() {
let bloom = build_bloom_with_params(&["hello", "world"], 7, 8192);
assert_eq!(bloom.k, 7);
assert_eq!(bloom.m, 8192);
assert_eq!(bloom.bytes.len(), 8192 / 8);
let stats = BlockStats::string_block(
0,
10,
Some("hello".into()),
Some("world".into()),
Some(bloom),
);
let encoded = zerompk::to_msgpack_vec(&stats).expect("MessagePack encode");
let decoded: BlockStats = zerompk::from_msgpack(&encoded).expect("MessagePack decode");
let persisted = decoded
.bloom
.expect("bloom must be present after roundtrip");
assert_eq!(persisted.k, 7, "k must be preserved on disk");
assert_eq!(persisted.m, 8192, "m must be preserved on disk");
assert_eq!(
persisted.bytes.len(),
1024,
"byte array length must match m/8"
);
assert!(bloom_may_contain(&persisted, "hello"));
assert!(bloom_may_contain(&persisted, "world"));
}
// Custom parameters produce a working filter; `build_bloom` uses the defaults.
#[test]
fn bloom_custom_params_functional() {
let bloom = build_bloom_with_params(&["alpha", "beta", "gamma"], 7, 8192);
assert!(bloom_may_contain(&bloom, "alpha"));
assert!(bloom_may_contain(&bloom, "beta"));
assert!(bloom_may_contain(&bloom, "gamma"));
let default_bloom = build_bloom(&["alpha", "beta", "gamma"]);
assert_eq!(default_bloom.k, BLOOM_K_DEFAULT);
assert_eq!(default_bloom.m, BLOOM_BITS_DEFAULT);
}
// Helper: block stats built from integer bounds.
fn i64_stats(min: i64, max: i64) -> BlockStats {
BlockStats::integer(min, max, 0, 1024)
}
// Helper: block stats built from float bounds only, used to exercise the
// f64 fallback of integer predicates.
fn f64_only_stats(min: f64, max: f64) -> BlockStats {
BlockStats::numeric(min, max, 0, 1024)
}
// Near i64::MAX the bounds and the target all round to the same f64, so
// only the exact integer path can tell them apart; the block must be kept.
#[test]
fn large_i64_eq_correct_no_skip() {
let min = i64::MAX - 2;
let max = i64::MAX;
let target = i64::MAX - 1;
let stats = i64_stats(min, max);
assert!(
!ScanPredicate::eq_i64(0, target).can_skip_block(&stats),
"must not skip: target={target} is within [{min}, {max}]"
);
// Demonstrates why the f64 path would be ambiguous for these values.
let min_f64 = min as f64;
let max_f64 = max as f64;
let target_f64 = target as f64;
assert_eq!(
min_f64, target_f64,
"f64 path is ambiguous: min and target round to the same f64"
);
assert_eq!(
max_f64, target_f64,
"f64 path is ambiguous: max and target round to the same f64"
);
}
// A literal of 2^53 + 1 is far outside [1000, 2000] and must be skipped.
#[test]
fn large_i64_definitely_outside() {
let stats = i64_stats(1000, 2000);
let outside: i64 = 9_007_199_254_740_993;
assert!(
ScanPredicate::eq_i64(0, outside).can_skip_block(&stats),
"must skip: {outside} is not in [1000, 2000]"
);
}
// With float-only stats, small integer literals are checked via the f64 bounds.
#[test]
fn integer_predicate_fallback_to_f64_for_v0_segment() {
let stats = f64_only_stats(10.0, 50.0);
assert!(ScanPredicate::eq_i64(0, 75).can_skip_block(&stats));
assert!(!ScanPredicate::eq_i64(0, 30).can_skip_block(&stats));
}
// 2^53 + 1 is not exactly representable as f64; with float-only stats the
// block must be kept.
#[test]
fn integer_predicate_v0_segment_unsafe_value_no_skip() {
let large: i64 = 9_007_199_254_740_993; let large_f64 = large as f64; let stats = f64_only_stats(large_f64, large_f64);
assert!(
!ScanPredicate::eq_i64(0, large).can_skip_block(&stats),
"must not skip: predicate value is not exactly representable in f64"
);
}
// Whole finite values convert; fractional, infinite and NaN values do not.
#[test]
fn f64_to_exact_i64_normal_values() {
assert_eq!(f64_to_exact_i64(0.0), Some(0));
assert_eq!(f64_to_exact_i64(42.0), Some(42));
assert_eq!(f64_to_exact_i64(-42.0), Some(-42));
assert_eq!(f64_to_exact_i64(1.5), None); assert_eq!(f64_to_exact_i64(f64::INFINITY), None);
assert_eq!(f64_to_exact_i64(f64::NAN), None);
}
// i64::MIN (-2^63) is exactly representable as f64 and must convert.
#[test]
fn f64_to_exact_i64_i64_min_is_representable() {
let v = i64::MIN as f64;
assert_eq!(f64_to_exact_i64(v), Some(i64::MIN));
}
// `i64::MAX as f64` rounds up past i64::MAX and must be rejected.
#[test]
fn f64_to_exact_i64_i64_max_rounds_up() {
let v = i64::MAX as f64;
assert_eq!(
f64_to_exact_i64(v),
None,
"i64::MAX as f64 exceeds i64::MAX and must not convert"
);
}
// 2^63 - 1024 is below the upper bound and must convert exactly.
#[test]
fn f64_to_exact_i64_just_below_i64_max_f64() {
let v: f64 = 9_223_372_036_854_774_784.0;
let result = f64_to_exact_i64(v);
assert!(
result.is_some(),
"large but exactly representable value should convert"
);
assert_eq!(result.unwrap() as f64, v);
}
}