use grafeo_common::types::Value;
use std::cmp::Ordering;
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct ZoneMapEntry {
pub min: Option<Value>,
pub max: Option<Value>,
pub null_count: u64,
pub row_count: u64,
pub bloom_filter: Option<BloomFilter>,
}
impl ZoneMapEntry {
pub fn new() -> Self {
Self {
min: None,
max: None,
null_count: 0,
row_count: 0,
bloom_filter: None,
}
}
pub fn with_min_max(min: Value, max: Value, null_count: u64, row_count: u64) -> Self {
Self {
min: Some(min),
max: Some(max),
null_count,
row_count,
bloom_filter: None,
}
}
pub fn with_bloom_filter(mut self, filter: BloomFilter) -> Self {
self.bloom_filter = Some(filter);
self
}
pub fn might_contain_equal(&self, value: &Value) -> bool {
if matches!(value, Value::Null) {
return self.null_count > 0;
}
if self.is_all_null() {
return false;
}
if let Some(ref bloom) = self.bloom_filter
&& !bloom.might_contain(value)
{
return false;
}
match (&self.min, &self.max) {
(Some(min), Some(max)) => {
let cmp_min = compare_values(value, min);
let cmp_max = compare_values(value, max);
match (cmp_min, cmp_max) {
(Some(Ordering::Less), _) => false, (_, Some(Ordering::Greater)) => false, _ => true,
}
}
_ => self.might_contain_non_null(),
}
}
pub fn might_contain_less_than(&self, value: &Value, inclusive: bool) -> bool {
match &self.min {
Some(min) => {
let cmp = compare_values(min, value);
match cmp {
Some(Ordering::Less) => true,
Some(Ordering::Equal) => inclusive,
Some(Ordering::Greater) => false,
None => true,
}
}
None => self.null_count > 0, }
}
pub fn might_contain_greater_than(&self, value: &Value, inclusive: bool) -> bool {
match &self.max {
Some(max) => {
let cmp = compare_values(max, value);
match cmp {
Some(Ordering::Greater) => true,
Some(Ordering::Equal) => inclusive,
Some(Ordering::Less) => false,
None => true,
}
}
None => self.null_count > 0,
}
}
pub fn might_contain_range(
&self,
lower: Option<&Value>,
upper: Option<&Value>,
lower_inclusive: bool,
upper_inclusive: bool,
) -> bool {
if let Some(lower_val) = lower
&& !self.might_contain_greater_than(lower_val, lower_inclusive)
{
return false;
}
if let Some(upper_val) = upper
&& !self.might_contain_less_than(upper_val, upper_inclusive)
{
return false;
}
true
}
pub fn might_contain_non_null(&self) -> bool {
self.row_count > self.null_count
}
pub fn is_all_null(&self) -> bool {
self.row_count > 0 && self.null_count == self.row_count
}
pub fn null_fraction(&self) -> f64 {
if self.row_count == 0 {
0.0
} else {
self.null_count as f64 / self.row_count as f64
}
}
}
impl Default for ZoneMapEntry {
fn default() -> Self {
Self::new()
}
}
pub struct ZoneMapBuilder {
min: Option<Value>,
max: Option<Value>,
null_count: u64,
row_count: u64,
bloom_builder: Option<BloomFilterBuilder>,
}
const DEFAULT_BLOOM_EXPECTED_ITEMS: usize = 2048;
const DEFAULT_BLOOM_FALSE_POSITIVE_RATE: f64 = 0.01;
impl ZoneMapBuilder {
pub fn new() -> Self {
Self {
min: None,
max: None,
null_count: 0,
row_count: 0,
bloom_builder: Some(BloomFilterBuilder::new(
DEFAULT_BLOOM_EXPECTED_ITEMS,
DEFAULT_BLOOM_FALSE_POSITIVE_RATE,
)),
}
}
pub fn without_bloom_filter() -> Self {
Self {
min: None,
max: None,
null_count: 0,
row_count: 0,
bloom_builder: None,
}
}
pub fn with_bloom_filter(expected_items: usize, false_positive_rate: f64) -> Self {
Self {
min: None,
max: None,
null_count: 0,
row_count: 0,
bloom_builder: Some(BloomFilterBuilder::new(expected_items, false_positive_rate)),
}
}
pub fn add(&mut self, value: &Value) {
self.row_count += 1;
if matches!(value, Value::Null) {
self.null_count += 1;
return;
}
self.min = match &self.min {
None => Some(value.clone()),
Some(current) => {
if compare_values(value, current) == Some(Ordering::Less) {
Some(value.clone())
} else {
self.min.clone()
}
}
};
self.max = match &self.max {
None => Some(value.clone()),
Some(current) => {
if compare_values(value, current) == Some(Ordering::Greater) {
Some(value.clone())
} else {
self.max.clone()
}
}
};
if let Some(ref mut bloom) = self.bloom_builder {
bloom.add(value);
}
}
pub fn build(self) -> ZoneMapEntry {
let bloom_filter = self.bloom_builder.map(|b| b.build());
ZoneMapEntry {
min: self.min,
max: self.max,
null_count: self.null_count,
row_count: self.row_count,
bloom_filter,
}
}
}
impl Default for ZoneMapBuilder {
fn default() -> Self {
Self::new()
}
}
pub struct ZoneMapIndex {
entries: HashMap<u64, ZoneMapEntry>,
property: String,
}
impl ZoneMapIndex {
pub fn new(property: impl Into<String>) -> Self {
Self {
entries: HashMap::new(),
property: property.into(),
}
}
pub fn property(&self) -> &str {
&self.property
}
pub fn insert(&mut self, chunk_id: u64, entry: ZoneMapEntry) {
self.entries.insert(chunk_id, entry);
}
pub fn get(&self, chunk_id: u64) -> Option<&ZoneMapEntry> {
self.entries.get(&chunk_id)
}
pub fn remove(&mut self, chunk_id: u64) -> Option<ZoneMapEntry> {
self.entries.remove(&chunk_id)
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn filter_equal<'a>(
&'a self,
value: &'a Value,
chunk_ids: impl Iterator<Item = u64> + 'a,
) -> impl Iterator<Item = u64> + 'a {
chunk_ids.filter(move |&id| {
self.entries
.get(&id)
.map_or(true, |e| e.might_contain_equal(value)) })
}
pub fn filter_range<'a>(
&'a self,
lower: Option<&'a Value>,
upper: Option<&'a Value>,
lower_inclusive: bool,
upper_inclusive: bool,
chunk_ids: impl Iterator<Item = u64> + 'a,
) -> impl Iterator<Item = u64> + 'a {
chunk_ids.filter(move |&id| {
self.entries.get(&id).map_or(true, |e| {
e.might_contain_range(lower, upper, lower_inclusive, upper_inclusive)
})
})
}
pub fn chunks_ordered_by_min(&self) -> Vec<u64> {
let mut chunks: Vec<_> = self.entries.iter().collect();
chunks.sort_by(|(_, a), (_, b)| match (&a.min, &b.min) {
(Some(a_min), Some(b_min)) => compare_values(a_min, b_min).unwrap_or(Ordering::Equal),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
});
chunks.into_iter().map(|(&id, _)| id).collect()
}
pub fn overall_stats(&self) -> (Option<Value>, Option<Value>, u64, u64) {
let mut min: Option<Value> = None;
let mut max: Option<Value> = None;
let mut null_count = 0u64;
let mut row_count = 0u64;
for entry in self.entries.values() {
null_count += entry.null_count;
row_count += entry.row_count;
if let Some(ref entry_min) = entry.min {
min = match min {
None => Some(entry_min.clone()),
Some(ref current) => {
if compare_values(entry_min, current) == Some(Ordering::Less) {
Some(entry_min.clone())
} else {
min
}
}
};
}
if let Some(ref entry_max) = entry.max {
max = match max {
None => Some(entry_max.clone()),
Some(ref current) => {
if compare_values(entry_max, current) == Some(Ordering::Greater) {
Some(entry_max.clone())
} else {
max
}
}
};
}
}
(min, max, null_count, row_count)
}
}
#[derive(Debug, Clone)]
pub struct BloomFilter {
bits: Vec<u64>,
num_hashes: usize,
num_bits: usize,
}
impl BloomFilter {
pub fn new(num_bits: usize, num_hashes: usize) -> Self {
let num_words = (num_bits + 63) / 64;
Self {
bits: vec![0; num_words],
num_hashes,
num_bits,
}
}
pub fn add(&mut self, value: &Value) {
let hashes = self.compute_hashes(value);
for h in hashes {
let bit_idx = h % self.num_bits;
let word_idx = bit_idx / 64;
let bit_pos = bit_idx % 64;
self.bits[word_idx] |= 1 << bit_pos;
}
}
pub fn might_contain(&self, value: &Value) -> bool {
let hashes = self.compute_hashes(value);
for h in hashes {
let bit_idx = h % self.num_bits;
let word_idx = bit_idx / 64;
let bit_pos = bit_idx % 64;
if (self.bits[word_idx] & (1 << bit_pos)) == 0 {
return false;
}
}
true
}
fn compute_hashes(&self, value: &Value) -> Vec<usize> {
let base_hash = value_hash(value);
let mut hashes = Vec::with_capacity(self.num_hashes);
for i in 0..self.num_hashes {
let h1 = base_hash;
let h2 = base_hash.rotate_left(17);
hashes.push((h1.wrapping_add((i as u64).wrapping_mul(h2))) as usize);
}
hashes
}
}
pub struct BloomFilterBuilder {
filter: BloomFilter,
}
impl BloomFilterBuilder {
pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
let num_bits = optimal_num_bits(expected_items, false_positive_rate);
let num_hashes = optimal_num_hashes(num_bits, expected_items);
Self {
filter: BloomFilter::new(num_bits, num_hashes),
}
}
pub fn add(&mut self, value: &Value) {
self.filter.add(value);
}
pub fn build(self) -> BloomFilter {
self.filter
}
}
fn optimal_num_bits(n: usize, p: f64) -> usize {
let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
((-(n as f64) * p.ln()) / ln2_squared).ceil() as usize
}
fn optimal_num_hashes(m: usize, n: usize) -> usize {
((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize
}
fn value_hash(value: &Value) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
match value {
Value::Null => 0u64.hash(&mut hasher),
Value::Bool(b) => b.hash(&mut hasher),
Value::Int64(i) => i.hash(&mut hasher),
Value::Float64(f) => f.to_bits().hash(&mut hasher),
Value::String(s) => s.hash(&mut hasher),
Value::Bytes(b) => b.hash(&mut hasher),
_ => format!("{value:?}").hash(&mut hasher),
}
hasher.finish()
}
fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
match (a, b) {
(Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
(Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
(Value::String(a), Value::String(b)) => Some(a.cmp(b)),
(Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
(Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
(Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
(Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
(Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
(Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_zone_map_entry_equal() {
let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);
assert!(entry.might_contain_equal(&Value::Int64(50)));
assert!(entry.might_contain_equal(&Value::Int64(10)));
assert!(entry.might_contain_equal(&Value::Int64(100)));
assert!(!entry.might_contain_equal(&Value::Int64(5)));
assert!(!entry.might_contain_equal(&Value::Int64(105)));
}
#[test]
fn test_zone_map_entry_range() {
let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);
assert!(entry.might_contain_range(
Some(&Value::Int64(0)),
Some(&Value::Int64(200)),
true,
true
));
assert!(entry.might_contain_range(
Some(&Value::Int64(50)),
Some(&Value::Int64(150)),
true,
true
));
assert!(!entry.might_contain_range(
Some(&Value::Int64(101)),
Some(&Value::Int64(200)),
true,
true
));
}
#[test]
fn test_zone_map_builder() {
let mut builder = ZoneMapBuilder::new();
for i in 1..=100 {
builder.add(&Value::Int64(i));
}
builder.add(&Value::Null);
builder.add(&Value::Null);
let entry = builder.build();
assert_eq!(entry.min, Some(Value::Int64(1)));
assert_eq!(entry.max, Some(Value::Int64(100)));
assert_eq!(entry.null_count, 2);
assert_eq!(entry.row_count, 102);
}
#[test]
fn test_zone_map_with_bloom() {
let mut builder = ZoneMapBuilder::with_bloom_filter(100, 0.01);
for i in 1..=100 {
builder.add(&Value::Int64(i));
}
let entry = builder.build();
assert!(entry.bloom_filter.is_some());
assert!(entry.might_contain_equal(&Value::Int64(50)));
}
#[test]
fn test_zone_map_index() {
let mut index = ZoneMapIndex::new("age");
index.insert(
0,
ZoneMapEntry::with_min_max(Value::Int64(0), Value::Int64(30), 0, 100),
);
index.insert(
1,
ZoneMapEntry::with_min_max(Value::Int64(31), Value::Int64(60), 0, 100),
);
index.insert(
2,
ZoneMapEntry::with_min_max(Value::Int64(61), Value::Int64(100), 0, 100),
);
let matching: Vec<_> = index.filter_equal(&Value::Int64(25), 0..3).collect();
assert_eq!(matching, vec![0]);
let matching: Vec<_> = index.filter_equal(&Value::Int64(75), 0..3).collect();
assert_eq!(matching, vec![2]);
let matching: Vec<_> = index
.filter_range(
Some(&Value::Int64(25)),
Some(&Value::Int64(65)),
true,
true,
0..3,
)
.collect();
assert_eq!(matching, vec![0, 1, 2]);
}
#[test]
fn test_bloom_filter() {
let mut filter = BloomFilter::new(1000, 7);
for i in 0..100 {
filter.add(&Value::Int64(i));
}
for i in 0..100 {
assert!(filter.might_contain(&Value::Int64(i)));
}
let _ = filter.might_contain(&Value::Int64(1000));
}
#[test]
fn test_zone_map_nulls() {
let entry = ZoneMapEntry {
min: None,
max: None,
null_count: 10,
row_count: 10,
bloom_filter: None,
};
assert!(entry.is_all_null());
assert!(!entry.might_contain_non_null());
assert!(entry.might_contain_equal(&Value::Null));
assert!(!entry.might_contain_equal(&Value::Int64(5)));
}
#[test]
fn test_zone_map_date_range() {
use grafeo_common::types::Date;
let min_date = Date::from_ymd(2024, 1, 1).unwrap();
let max_date = Date::from_ymd(2024, 12, 31).unwrap();
let entry =
ZoneMapEntry::with_min_max(Value::Date(min_date), Value::Date(max_date), 0, 365);
let mid = Date::from_ymd(2024, 6, 15).unwrap();
assert!(entry.might_contain_equal(&Value::Date(mid)));
assert!(entry.might_contain_equal(&Value::Date(min_date)));
assert!(entry.might_contain_equal(&Value::Date(max_date)));
let before = Date::from_ymd(2023, 12, 31).unwrap();
let after = Date::from_ymd(2025, 1, 1).unwrap();
assert!(!entry.might_contain_equal(&Value::Date(before)));
assert!(!entry.might_contain_equal(&Value::Date(after)));
let range_lo = Date::from_ymd(2024, 3, 1).unwrap();
let range_hi = Date::from_ymd(2024, 9, 30).unwrap();
assert!(entry.might_contain_range(
Some(&Value::Date(range_lo)),
Some(&Value::Date(range_hi)),
true,
true,
));
let early_lo = Date::from_ymd(2022, 1, 1).unwrap();
let early_hi = Date::from_ymd(2023, 6, 30).unwrap();
assert!(!entry.might_contain_range(
Some(&Value::Date(early_lo)),
Some(&Value::Date(early_hi)),
true,
true,
));
let late_lo = Date::from_ymd(2025, 2, 1).unwrap();
let late_hi = Date::from_ymd(2025, 12, 31).unwrap();
assert!(!entry.might_contain_range(
Some(&Value::Date(late_lo)),
Some(&Value::Date(late_hi)),
true,
true,
));
let mut builder = ZoneMapBuilder::without_bloom_filter();
let dates = [
Date::from_ymd(2024, 3, 10).unwrap(),
Date::from_ymd(2024, 7, 4).unwrap(),
Date::from_ymd(2024, 1, 1).unwrap(),
Date::from_ymd(2024, 11, 25).unwrap(),
];
for d in &dates {
builder.add(&Value::Date(*d));
}
let built = builder.build();
assert_eq!(
built.min,
Some(Value::Date(Date::from_ymd(2024, 1, 1).unwrap()))
);
assert_eq!(
built.max,
Some(Value::Date(Date::from_ymd(2024, 11, 25).unwrap()))
);
assert_eq!(built.row_count, 4);
}
#[test]
fn test_zone_map_timestamp_range() {
use grafeo_common::types::Timestamp;
let min_ts = Timestamp::from_secs(1_704_067_200); let max_ts = Timestamp::from_secs(1_735_689_599); let entry =
ZoneMapEntry::with_min_max(Value::Timestamp(min_ts), Value::Timestamp(max_ts), 0, 1000);
let mid = Timestamp::from_secs(1_719_792_000); assert!(entry.might_contain_equal(&Value::Timestamp(mid)));
assert!(entry.might_contain_equal(&Value::Timestamp(min_ts)));
assert!(entry.might_contain_equal(&Value::Timestamp(max_ts)));
let before = Timestamp::from_secs(1_704_067_199); let after = Timestamp::from_secs(1_735_689_600); assert!(!entry.might_contain_equal(&Value::Timestamp(before)));
assert!(!entry.might_contain_equal(&Value::Timestamp(after)));
assert!(entry.might_contain_less_than(&Value::Timestamp(max_ts), true));
assert!(!entry.might_contain_less_than(&Value::Timestamp(before), false));
assert!(entry.might_contain_greater_than(&Value::Timestamp(min_ts), true));
assert!(!entry.might_contain_greater_than(&Value::Timestamp(after), false));
let mut builder = ZoneMapBuilder::without_bloom_filter();
builder.add(&Value::Timestamp(Timestamp::from_secs(1_710_000_000)));
builder.add(&Value::Timestamp(Timestamp::from_secs(1_720_000_000)));
builder.add(&Value::Timestamp(Timestamp::from_secs(1_705_000_000)));
builder.add(&Value::Null);
let built = builder.build();
assert_eq!(
built.min,
Some(Value::Timestamp(Timestamp::from_secs(1_705_000_000)))
);
assert_eq!(
built.max,
Some(Value::Timestamp(Timestamp::from_secs(1_720_000_000)))
);
assert_eq!(built.null_count, 1);
assert_eq!(built.row_count, 4);
}
#[test]
fn test_zone_map_float64_range() {
let entry = ZoneMapEntry::with_min_max(Value::Float64(1.5), Value::Float64(99.9), 0, 500);
assert!(entry.might_contain_equal(&Value::Float64(50.0)));
assert!(entry.might_contain_equal(&Value::Float64(1.5)));
assert!(entry.might_contain_equal(&Value::Float64(99.9)));
assert!(!entry.might_contain_equal(&Value::Float64(1.0)));
assert!(!entry.might_contain_equal(&Value::Float64(100.0)));
assert!(entry.might_contain_range(
Some(&Value::Float64(10.0)),
Some(&Value::Float64(90.0)),
true,
true,
));
assert!(!entry.might_contain_range(
Some(&Value::Float64(-100.0)),
Some(&Value::Float64(1.0)),
true,
true,
));
assert!(!entry.might_contain_range(
Some(&Value::Float64(100.0)),
Some(&Value::Float64(200.0)),
true,
true,
));
assert!(entry.might_contain_less_than(&Value::Float64(50.0), false));
assert!(!entry.might_contain_less_than(&Value::Float64(1.0), false));
assert!(entry.might_contain_less_than(&Value::Float64(1.5), true));
assert!(!entry.might_contain_less_than(&Value::Float64(1.5), false));
assert!(entry.might_contain_greater_than(&Value::Float64(50.0), false));
assert!(!entry.might_contain_greater_than(&Value::Float64(100.0), false));
assert!(entry.might_contain_greater_than(&Value::Float64(99.9), true));
assert!(!entry.might_contain_greater_than(&Value::Float64(99.9), false));
let mut builder = ZoneMapBuilder::without_bloom_filter();
let values = [3.15, 2.72, 1.414, 1.618, 42.0];
for v in &values {
builder.add(&Value::Float64(*v));
}
let built = builder.build();
assert_eq!(built.min, Some(Value::Float64(1.414)));
assert_eq!(built.max, Some(Value::Float64(42.0)));
assert_eq!(built.row_count, 5);
assert_eq!(built.null_count, 0);
}
}