use crate::format::BlockStats;
/// Literal operand that a [`ScanPredicate`] compares a column against.
///
/// The variant decides which block statistics are consulted when checking
/// whether a block can be skipped.
#[derive(Debug, Clone)]
pub enum PredicateValue {
/// Floating-point literal; checked against the block's numeric `min`/`max`.
Numeric(f64),
/// String literal; checked against the block's string bounds and, for
/// equality predicates, its bloom filter.
String(String),
}
/// A single-column comparison of the form `column <op> value`.
///
/// Call [`ScanPredicate::can_skip_block`] with a block's statistics to find
/// out whether the block can be ruled out without being read.
#[derive(Debug, Clone)]
pub struct ScanPredicate {
/// Index of the column the predicate refers to. `can_skip_block` does not
/// read this field; presumably the caller uses it to pick the matching
/// column's statistics (confirm against call sites).
pub col_idx: usize,
/// Comparison operator.
pub op: PredicateOp,
/// Literal on the right-hand side of the comparison.
pub value: PredicateValue,
}
/// Comparison operator of a [`ScanPredicate`], read as `column <op> value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PredicateOp {
/// `column > value`
Gt,
/// `column >= value`
Gte,
/// `column < value`
Lt,
/// `column <= value`
Lte,
/// `column == value`
Eq,
/// `column != value`
Ne,
}
impl ScanPredicate {
pub fn gt(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Gt,
value: PredicateValue::Numeric(value),
}
}
pub fn gte(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Gte,
value: PredicateValue::Numeric(value),
}
}
pub fn lt(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Lt,
value: PredicateValue::Numeric(value),
}
}
pub fn lte(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Lte,
value: PredicateValue::Numeric(value),
}
}
pub fn eq(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Eq,
value: PredicateValue::Numeric(value),
}
}
pub fn not_eq(col_idx: usize, value: f64) -> Self {
Self {
col_idx,
op: PredicateOp::Ne,
value: PredicateValue::Numeric(value),
}
}
pub fn str_eq(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Eq,
value: PredicateValue::String(value.into()),
}
}
pub fn str_not_eq(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Ne,
value: PredicateValue::String(value.into()),
}
}
pub fn str_gt(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Gt,
value: PredicateValue::String(value.into()),
}
}
pub fn str_gte(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Gte,
value: PredicateValue::String(value.into()),
}
}
pub fn str_lt(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Lt,
value: PredicateValue::String(value.into()),
}
}
pub fn str_lte(col_idx: usize, value: impl Into<String>) -> Self {
Self {
col_idx,
op: PredicateOp::Lte,
value: PredicateValue::String(value.into()),
}
}
pub fn can_skip_block(&self, stats: &BlockStats) -> bool {
match &self.value {
PredicateValue::Numeric(v) => can_skip_numeric(self.op, *v, stats),
PredicateValue::String(v) => can_skip_string(self.op, v, stats),
}
}
}
/// Decides from the numeric `min`/`max` statistics whether a block can be
/// skipped for `column <op> value`.
///
/// Returns `false` (do not skip) whenever either bound is NaN, since the
/// range is then unknown. A NaN `value` also never causes a skip because
/// every comparison below evaluates to `false`.
fn can_skip_numeric(op: PredicateOp, value: f64, stats: &BlockStats) -> bool {
    let (lo, hi) = (stats.min, stats.max);
    if lo.is_nan() || hi.is_nan() {
        return false;
    }
    match op {
        // Nothing in the block exceeds `value`.
        PredicateOp::Gt => hi <= value,
        // Nothing in the block reaches `value`.
        PredicateOp::Gte => hi < value,
        // Everything in the block is at or above `value`.
        PredicateOp::Lt => value <= lo,
        // Everything in the block is strictly above `value`.
        PredicateOp::Lte => value < lo,
        // `value` lies outside the block's [min, max] range.
        PredicateOp::Eq => value < lo || hi < value,
        // The range collapses to exactly `value`.
        PredicateOp::Ne => lo == value && hi == value,
    }
}
/// Decides from the string bounds, and for equality the bloom filter,
/// whether a block can be skipped for `column <op> value`.
///
/// Returns `false` when either string bound is missing. Strings are compared
/// with `str`'s ordering. The bloom filter is consulted only for
/// `PredicateOp::Eq`, and only after the range check failed to rule the
/// block out.
fn can_skip_string(op: PredicateOp, value: &str, stats: &BlockStats) -> bool {
    let (lo, hi) = match (&stats.str_min, &stats.str_max) {
        (Some(lo), Some(hi)) => (lo.as_str(), hi.as_str()),
        _ => return false,
    };

    let outside_range = match op {
        PredicateOp::Gt => hi <= value,
        PredicateOp::Gte => hi < value,
        PredicateOp::Lt => value <= lo,
        PredicateOp::Lte => value < lo,
        PredicateOp::Eq => value < lo || hi < value,
        PredicateOp::Ne => lo == value && hi == value,
    };
    if outside_range {
        return true;
    }

    // A bloom filter can only prove absence, so it helps equality lookups only.
    match &stats.bloom {
        Some(filter) if op == PredicateOp::Eq => !bloom_may_contain(filter, value),
        _ => false,
    }
}
/// Size in bytes of a bloom filter (256 bytes = 2048 bits).
///
/// `bloom_bit_pos` reduces hashes with the mask `BLOOM_BYTES * 8 - 1`, which
/// reaches every bit only while the bit count is a power of two.
pub const BLOOM_BYTES: usize = 256;
/// Number of bit positions set on insert and probed on lookup per value.
const BLOOM_HASH_COUNT: u32 = 3;
/// 64-bit FNV offset basis (`0xcbf29ce484222325`); the hash's starting state
/// before the per-probe seed is mixed in.
const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
/// 64-bit FNV prime (`0x100000001b3`); the multiplier applied after each byte.
const FNV_PRIME: u64 = 1_099_511_628_211;
/// Maps `value` to a bit index in `0..BLOOM_BYTES * 8` for probe number
/// `hash_idx`.
///
/// The hash is an FNV-1a style pass over the UTF-8 bytes of `value` (xor the
/// byte, then multiply by the prime), started from the offset basis xor-ed
/// with `hash_idx * FNV_PRIME` so that each probe gets its own seed. Only the
/// low bits of the result are kept.
fn bloom_bit_pos(value: &str, hash_idx: u32) -> usize {
    let seed = FNV_OFFSET ^ u64::from(hash_idx).wrapping_mul(FNV_PRIME);
    let digest = value
        .bytes()
        .fold(seed, |state, byte| (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME));
    (digest as usize) & (BLOOM_BYTES * 8 - 1)
}
/// Records `value` in `bloom` by setting one bit per probe
/// (`BLOOM_HASH_COUNT` bits in total).
///
/// Bit `n` is stored in byte `n / 8` at bit offset `n % 8`, counted from the
/// least significant bit.
///
/// # Panics
///
/// Panics if a computed bit position falls past the end of `bloom`. Positions
/// are always below `BLOOM_BYTES * 8`, so this can only happen when `bloom`
/// is shorter than `BLOOM_BYTES`.
pub fn bloom_insert(bloom: &mut [u8], value: &str) {
    for hash_idx in 0..BLOOM_HASH_COUNT {
        let pos = bloom_bit_pos(value, hash_idx);
        let byte_idx = pos / 8;
        let mask = 1 << (pos % 8);
        bloom[byte_idx] |= mask;
    }
}
/// Tests whether `value` might have been inserted into `bloom`.
///
/// Returns `false` only when at least one of the value's probe bits is clear,
/// which proves the value was never inserted. A `true` result may be a false
/// positive. A filter shorter than `BLOOM_BYTES` cannot be interpreted, so it
/// is answered conservatively with `true`.
pub fn bloom_may_contain(bloom: &[u8], value: &str) -> bool {
    if bloom.len() < BLOOM_BYTES {
        return true;
    }
    (0..BLOOM_HASH_COUNT).all(|hash_idx| {
        let pos = bloom_bit_pos(value, hash_idx);
        (bloom[pos / 8] & (1 << (pos % 8))) != 0
    })
}
/// Builds a fresh `BLOOM_BYTES`-long filter containing every string in
/// `values`.
///
/// An empty `values` slice yields an all-zero filter.
pub fn build_bloom(values: &[&str]) -> Vec<u8> {
    values
        .iter()
        .fold(vec![0u8; BLOOM_BYTES], |mut filter, value| {
            bloom_insert(&mut filter, value);
            filter
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Numeric block statistics spanning `[min, max]`.
    fn stats(min: f64, max: f64) -> BlockStats {
        BlockStats::numeric(min, max, 0, 1024)
    }

    /// String block statistics with the given bounds and no bloom filter.
    fn str_stats(smin: &str, smax: &str) -> BlockStats {
        BlockStats::string_block(0, 1024, Some(smin.into()), Some(smax.into()), None)
    }

    #[test]
    fn gt_predicate() {
        let pred = ScanPredicate::gt(0, 50.0);
        assert!(pred.can_skip_block(&stats(10.0, 40.0)));
        assert!(!pred.can_skip_block(&stats(10.0, 60.0)));
        assert!(pred.can_skip_block(&stats(10.0, 50.0)));
    }

    #[test]
    fn gte_predicate() {
        let pred = ScanPredicate::gte(0, 50.0);
        assert!(pred.can_skip_block(&stats(10.0, 49.0)));
        assert!(!pred.can_skip_block(&stats(10.0, 50.0)));
    }

    #[test]
    fn lt_predicate() {
        let pred = ScanPredicate::lt(0, 50.0);
        assert!(pred.can_skip_block(&stats(60.0, 100.0)));
        assert!(!pred.can_skip_block(&stats(40.0, 100.0)));
    }

    #[test]
    fn lte_predicate() {
        let pred = ScanPredicate::lte(0, 50.0);
        assert!(pred.can_skip_block(&stats(60.0, 100.0)));
        // The boundary value itself satisfies `<=`, so the block must be kept.
        assert!(!pred.can_skip_block(&stats(50.0, 100.0)));
    }

    #[test]
    fn eq_predicate() {
        let pred = ScanPredicate::eq(0, 50.0);
        assert!(pred.can_skip_block(&stats(10.0, 40.0)));
        assert!(pred.can_skip_block(&stats(60.0, 100.0)));
        assert!(!pred.can_skip_block(&stats(40.0, 60.0)));
    }

    #[test]
    fn ne_predicate() {
        let pred = ScanPredicate::not_eq(0, 50.0);
        assert!(pred.can_skip_block(&stats(50.0, 50.0)));
        assert!(!pred.can_skip_block(&stats(40.0, 60.0)));
    }

    #[test]
    fn non_numeric_never_skipped_by_numeric_pred() {
        let pred = ScanPredicate::gt(0, 50.0);
        let nan_stats = BlockStats::non_numeric(0, 1024);
        assert!(!pred.can_skip_block(&nan_stats));
    }

    #[test]
    fn string_eq_skip_below_range() {
        let stats = str_stats("apple", "banana");
        assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(&stats));
    }

    #[test]
    fn string_eq_skip_above_range() {
        let stats = str_stats("apple", "banana");
        assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(&stats));
    }

    #[test]
    fn string_eq_no_skip_in_range() {
        let stats = str_stats("apple", "banana");
        assert!(!ScanPredicate::str_eq(0, "avocado").can_skip_block(&stats));
    }

    #[test]
    fn string_gt_skip() {
        let stats = str_stats("apple", "fig");
        assert!(ScanPredicate::str_gt(0, "fig").can_skip_block(&stats));
        assert!(!ScanPredicate::str_gt(0, "egg").can_skip_block(&stats));
    }

    #[test]
    fn string_lt_skip() {
        let stats = str_stats("mango", "pear");
        assert!(ScanPredicate::str_lt(0, "mango").can_skip_block(&stats));
        assert!(!ScanPredicate::str_lt(0, "orange").can_skip_block(&stats));
    }

    #[test]
    fn string_gte_skip() {
        let stats = str_stats("ant", "cat");
        assert!(ScanPredicate::str_gte(0, "dog").can_skip_block(&stats));
        assert!(!ScanPredicate::str_gte(0, "cat").can_skip_block(&stats));
    }

    #[test]
    fn string_lte_skip() {
        let stats = str_stats("zebra", "zoo");
        assert!(ScanPredicate::str_lte(0, "yak").can_skip_block(&stats));
        assert!(!ScanPredicate::str_lte(0, "zebra").can_skip_block(&stats));
    }

    #[test]
    fn string_ne_skip() {
        let stats = str_stats("exact", "exact");
        assert!(ScanPredicate::str_not_eq(0, "exact").can_skip_block(&stats));
        let stats2 = str_stats("a", "z");
        assert!(!ScanPredicate::str_not_eq(0, "exact").can_skip_block(&stats2));
    }

    #[test]
    fn string_no_zone_map_no_skip() {
        let stats = BlockStats::non_numeric(0, 1024);
        assert!(!ScanPredicate::str_eq(0, "anything").can_skip_block(&stats));
    }

    #[test]
    fn bloom_insert_and_query() {
        let values = ["hello", "world", "foo"];
        let bloom = build_bloom(&values);
        assert!(bloom_may_contain(&bloom, "hello"));
        assert!(bloom_may_contain(&bloom, "world"));
        assert!(bloom_may_contain(&bloom, "foo"));
    }

    #[test]
    fn bloom_absent_value_rejected() {
        // A filter without insertions has no bits set, so every probe is
        // rejected.
        let empty = build_bloom(&[]);
        assert!(!bloom_may_contain(&empty, "delta"));

        // Three insertions set at most 3 * BLOOM_HASH_COUNT = 9 bits. The ten
        // probes below differ only in their final byte, which the hash maps to
        // ten distinct first-probe bits, so they cannot all be false positives.
        let bloom = build_bloom(&["alpha", "beta", "gamma"]);
        let rejected = (0..10)
            .filter(|i| !bloom_may_contain(&bloom, &format!("absent-{i}")))
            .count();
        assert!(rejected > 0);
    }

    #[test]
    fn bloom_short_filter_is_conservative() {
        // A buffer shorter than BLOOM_BYTES cannot be interpreted, so lookups
        // must never claim absence.
        assert!(bloom_may_contain(&[0u8; 4], "anything"));
        assert!(bloom_may_contain(&[], "anything"));
    }

    #[test]
    fn bloom_eq_skip_via_filter() {
        let bloom = build_bloom(&["apple", "apricot", "banana"]);
        let stats = BlockStats::string_block(
            0,
            1024,
            Some("apple".into()),
            Some("banana".into()),
            Some(bloom.clone()),
        );
        // "avocado" lies inside [apple, banana], so the range check cannot
        // skip. The decision must be exactly what the bloom filter says.
        let rejected_by_bloom = !bloom_may_contain(&bloom, "avocado");
        assert_eq!(
            ScanPredicate::str_eq(0, "avocado").can_skip_block(&stats),
            rejected_by_bloom
        );
        // A value that was inserted can never be skipped.
        assert!(!ScanPredicate::str_eq(0, "apple").can_skip_block(&stats));
    }
}