Skip to main content

grafeo_core/index/
zone_map.rs

1//! Zone maps for intelligent data skipping.
2//!
3//! Each chunk of property data tracks its min, max, and null count. When filtering
4//! with a predicate like `age < 30`, we check the zone map first - if the minimum
5//! age in a chunk is 50, we skip the entire chunk without reading a single value.
6//!
7//! This is huge for large scans. Combined with columnar storage, you often skip
8//! 90%+ of the data for selective predicates.
9
10use grafeo_common::types::Value;
11use std::cmp::Ordering;
12use std::collections::HashMap;
13
14/// Statistics for a single chunk of property data.
15///
16/// The query optimizer uses these to skip chunks that can't match a predicate.
17/// For example, if `max < 30` and the predicate is `value > 50`, skip the chunk.
18#[derive(Debug, Clone)]
19pub struct ZoneMapEntry {
20    /// Minimum value in the chunk (None if all nulls).
21    pub min: Option<Value>,
22    /// Maximum value in the chunk (None if all nulls).
23    pub max: Option<Value>,
24    /// Number of null values in the chunk.
25    pub null_count: u64,
26    /// Total number of values in the chunk.
27    pub row_count: u64,
28    /// Optional Bloom filter for equality checks.
29    pub bloom_filter: Option<BloomFilter>,
30}
31
32impl ZoneMapEntry {
33    /// Creates a new empty zone map entry.
34    pub fn new() -> Self {
35        Self {
36            min: None,
37            max: None,
38            null_count: 0,
39            row_count: 0,
40            bloom_filter: None,
41        }
42    }
43
44    /// Creates a zone map entry with min/max.
45    pub fn with_min_max(min: Value, max: Value, null_count: u64, row_count: u64) -> Self {
46        Self {
47            min: Some(min),
48            max: Some(max),
49            null_count,
50            row_count,
51            bloom_filter: None,
52        }
53    }
54
55    /// Sets the Bloom filter.
56    pub fn with_bloom_filter(mut self, filter: BloomFilter) -> Self {
57        self.bloom_filter = Some(filter);
58        self
59    }
60
61    /// Checks if this chunk might contain values matching an equality predicate.
62    ///
63    /// Returns `true` if the chunk might contain matches, `false` if it definitely doesn't.
64    pub fn might_contain_equal(&self, value: &Value) -> bool {
65        // If value is null, check if there are nulls
66        if matches!(value, Value::Null) {
67            return self.null_count > 0;
68        }
69
70        // If the chunk is all nulls, it can't contain non-null values
71        if self.is_all_null() {
72            return false;
73        }
74
75        // Check Bloom filter first if available
76        if let Some(ref bloom) = self.bloom_filter
77            && !bloom.might_contain(value)
78        {
79            return false;
80        }
81
82        // Check min/max bounds
83        match (&self.min, &self.max) {
84            (Some(min), Some(max)) => {
85                let cmp_min = compare_values(value, min);
86                let cmp_max = compare_values(value, max);
87
88                // Value must be >= min and <= max
89                match (cmp_min, cmp_max) {
90                    (Some(Ordering::Less), _) => false,    // value < min
91                    (_, Some(Ordering::Greater)) => false, // value > max
92                    _ => true,
93                }
94            }
95            // No bounds but might have non-null values - can't skip
96            _ => self.might_contain_non_null(),
97        }
98    }
99
100    /// Checks if this chunk might contain values matching a less-than predicate.
101    ///
102    /// Returns `true` if the chunk might contain matches, `false` if it definitely doesn't.
103    pub fn might_contain_less_than(&self, value: &Value, inclusive: bool) -> bool {
104        match &self.min {
105            Some(min) => {
106                let cmp = compare_values(min, value);
107                match cmp {
108                    Some(Ordering::Less) => true,
109                    Some(Ordering::Equal) => inclusive,
110                    Some(Ordering::Greater) => false,
111                    None => true,
112                }
113            }
114            None => self.null_count > 0, // Only nulls, which don't satisfy < predicate
115        }
116    }
117
118    /// Checks if this chunk might contain values matching a greater-than predicate.
119    ///
120    /// Returns `true` if the chunk might contain matches, `false` if it definitely doesn't.
121    pub fn might_contain_greater_than(&self, value: &Value, inclusive: bool) -> bool {
122        match &self.max {
123            Some(max) => {
124                let cmp = compare_values(max, value);
125                match cmp {
126                    Some(Ordering::Greater) => true,
127                    Some(Ordering::Equal) => inclusive,
128                    Some(Ordering::Less) => false,
129                    None => true,
130                }
131            }
132            None => self.null_count > 0,
133        }
134    }
135
136    /// Checks if this chunk might contain values in a range.
137    pub fn might_contain_range(
138        &self,
139        lower: Option<&Value>,
140        upper: Option<&Value>,
141        lower_inclusive: bool,
142        upper_inclusive: bool,
143    ) -> bool {
144        // Check lower bound
145        if let Some(lower_val) = lower
146            && !self.might_contain_greater_than(lower_val, lower_inclusive)
147        {
148            return false;
149        }
150
151        // Check upper bound
152        if let Some(upper_val) = upper
153            && !self.might_contain_less_than(upper_val, upper_inclusive)
154        {
155            return false;
156        }
157
158        true
159    }
160
161    /// Checks if this chunk might contain non-null values.
162    pub fn might_contain_non_null(&self) -> bool {
163        self.row_count > self.null_count
164    }
165
166    /// Checks if this chunk contains only null values.
167    pub fn is_all_null(&self) -> bool {
168        self.row_count > 0 && self.null_count == self.row_count
169    }
170
171    /// Returns the null fraction.
172    pub fn null_fraction(&self) -> f64 {
173        if self.row_count == 0 {
174            0.0
175        } else {
176            self.null_count as f64 / self.row_count as f64
177        }
178    }
179}
180
181impl Default for ZoneMapEntry {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
187/// Incrementally builds a zone map entry as you add values.
188///
189/// Feed it values one at a time; it tracks min/max/nulls automatically.
190///
191/// By default, Bloom filters are enabled for efficient equality checks.
192/// Use [`without_bloom_filter`](Self::without_bloom_filter) if you don't need them.
193pub struct ZoneMapBuilder {
194    min: Option<Value>,
195    max: Option<Value>,
196    null_count: u64,
197    row_count: u64,
198    bloom_builder: Option<BloomFilterBuilder>,
199}
200
/// Number of items a chunk's Bloom filter is sized for by default.
const DEFAULT_BLOOM_EXPECTED_ITEMS: usize = 2_048;

/// Target false-positive rate for a default Bloom filter (1%).
const DEFAULT_BLOOM_FALSE_POSITIVE_RATE: f64 = 0.01;
206
207impl ZoneMapBuilder {
208    /// Creates a new zone map builder with Bloom filter enabled by default.
209    ///
210    /// Uses reasonable defaults:
211    /// - Expected items: 2048 (typical chunk size)
212    /// - False positive rate: 1%
213    pub fn new() -> Self {
214        Self {
215            min: None,
216            max: None,
217            null_count: 0,
218            row_count: 0,
219            bloom_builder: Some(BloomFilterBuilder::new(
220                DEFAULT_BLOOM_EXPECTED_ITEMS,
221                DEFAULT_BLOOM_FALSE_POSITIVE_RATE,
222            )),
223        }
224    }
225
226    /// Creates a builder without Bloom filter support.
227    ///
228    /// Use this when you know you won't need equality checks, or when
229    /// memory is constrained and the Bloom filter overhead isn't worth it.
230    pub fn without_bloom_filter() -> Self {
231        Self {
232            min: None,
233            max: None,
234            null_count: 0,
235            row_count: 0,
236            bloom_builder: None,
237        }
238    }
239
240    /// Creates a builder with custom Bloom filter settings.
241    ///
242    /// # Arguments
243    ///
244    /// * `expected_items` - Expected number of items in the chunk
245    /// * `false_positive_rate` - Desired false positive rate (e.g., 0.01 for 1%)
246    pub fn with_bloom_filter(expected_items: usize, false_positive_rate: f64) -> Self {
247        Self {
248            min: None,
249            max: None,
250            null_count: 0,
251            row_count: 0,
252            bloom_builder: Some(BloomFilterBuilder::new(expected_items, false_positive_rate)),
253        }
254    }
255
256    /// Adds a value to the zone map.
257    pub fn add(&mut self, value: &Value) {
258        self.row_count += 1;
259
260        if matches!(value, Value::Null) {
261            self.null_count += 1;
262            return;
263        }
264
265        // Update min
266        self.min = match &self.min {
267            None => Some(value.clone()),
268            Some(current) => {
269                if compare_values(value, current) == Some(Ordering::Less) {
270                    Some(value.clone())
271                } else {
272                    self.min.clone()
273                }
274            }
275        };
276
277        // Update max
278        self.max = match &self.max {
279            None => Some(value.clone()),
280            Some(current) => {
281                if compare_values(value, current) == Some(Ordering::Greater) {
282                    Some(value.clone())
283                } else {
284                    self.max.clone()
285                }
286            }
287        };
288
289        // Add to Bloom filter
290        if let Some(ref mut bloom) = self.bloom_builder {
291            bloom.add(value);
292        }
293    }
294
295    /// Builds the zone map entry.
296    pub fn build(self) -> ZoneMapEntry {
297        let bloom_filter = self.bloom_builder.map(|b| b.build());
298
299        ZoneMapEntry {
300            min: self.min,
301            max: self.max,
302            null_count: self.null_count,
303            row_count: self.row_count,
304            bloom_filter,
305        }
306    }
307}
308
309impl Default for ZoneMapBuilder {
310    fn default() -> Self {
311        Self::new()
312    }
313}
314
315/// Collection of zone maps for all chunks of a property column.
316///
317/// Use [`filter_equal`](Self::filter_equal) or [`filter_range`](Self::filter_range)
318/// to get chunk IDs that might contain matching values.
319pub struct ZoneMapIndex {
320    /// Zone map entries per chunk.
321    entries: HashMap<u64, ZoneMapEntry>,
322    /// Property name.
323    property: String,
324}
325
326impl ZoneMapIndex {
327    /// Creates a new zone map index for a property.
328    pub fn new(property: impl Into<String>) -> Self {
329        Self {
330            entries: HashMap::new(),
331            property: property.into(),
332        }
333    }
334
335    /// Returns the property name.
336    pub fn property(&self) -> &str {
337        &self.property
338    }
339
340    /// Adds or updates a zone map entry for a chunk.
341    pub fn insert(&mut self, chunk_id: u64, entry: ZoneMapEntry) {
342        self.entries.insert(chunk_id, entry);
343    }
344
345    /// Gets the zone map entry for a chunk.
346    pub fn get(&self, chunk_id: u64) -> Option<&ZoneMapEntry> {
347        self.entries.get(&chunk_id)
348    }
349
350    /// Removes the zone map entry for a chunk.
351    pub fn remove(&mut self, chunk_id: u64) -> Option<ZoneMapEntry> {
352        self.entries.remove(&chunk_id)
353    }
354
355    /// Returns the number of chunks with zone maps.
356    pub fn len(&self) -> usize {
357        self.entries.len()
358    }
359
360    /// Returns true if there are no zone maps.
361    pub fn is_empty(&self) -> bool {
362        self.entries.is_empty()
363    }
364
365    /// Filters chunk IDs that might match an equality predicate.
366    pub fn filter_equal<'a>(
367        &'a self,
368        value: &'a Value,
369        chunk_ids: impl Iterator<Item = u64> + 'a,
370    ) -> impl Iterator<Item = u64> + 'a {
371        chunk_ids.filter(move |&id| {
372            self.entries
373                .get(&id)
374                .map_or(true, |e| e.might_contain_equal(value)) // No zone map = assume might contain
375        })
376    }
377
378    /// Filters chunk IDs that might match a range predicate.
379    pub fn filter_range<'a>(
380        &'a self,
381        lower: Option<&'a Value>,
382        upper: Option<&'a Value>,
383        lower_inclusive: bool,
384        upper_inclusive: bool,
385        chunk_ids: impl Iterator<Item = u64> + 'a,
386    ) -> impl Iterator<Item = u64> + 'a {
387        chunk_ids.filter(move |&id| {
388            self.entries.get(&id).map_or(true, |e| {
389                e.might_contain_range(lower, upper, lower_inclusive, upper_inclusive)
390            })
391        })
392    }
393
394    /// Returns chunk IDs sorted by their minimum value.
395    pub fn chunks_ordered_by_min(&self) -> Vec<u64> {
396        let mut chunks: Vec<_> = self.entries.iter().collect();
397        chunks.sort_by(|(_, a), (_, b)| match (&a.min, &b.min) {
398            (Some(a_min), Some(b_min)) => compare_values(a_min, b_min).unwrap_or(Ordering::Equal),
399            (Some(_), None) => Ordering::Less,
400            (None, Some(_)) => Ordering::Greater,
401            (None, None) => Ordering::Equal,
402        });
403        chunks.into_iter().map(|(&id, _)| id).collect()
404    }
405
406    /// Returns overall statistics across all chunks.
407    pub fn overall_stats(&self) -> (Option<Value>, Option<Value>, u64, u64) {
408        let mut min: Option<Value> = None;
409        let mut max: Option<Value> = None;
410        let mut null_count = 0u64;
411        let mut row_count = 0u64;
412
413        for entry in self.entries.values() {
414            null_count += entry.null_count;
415            row_count += entry.row_count;
416
417            if let Some(ref entry_min) = entry.min {
418                min = match min {
419                    None => Some(entry_min.clone()),
420                    Some(ref current) => {
421                        if compare_values(entry_min, current) == Some(Ordering::Less) {
422                            Some(entry_min.clone())
423                        } else {
424                            min
425                        }
426                    }
427                };
428            }
429
430            if let Some(ref entry_max) = entry.max {
431                max = match max {
432                    None => Some(entry_max.clone()),
433                    Some(ref current) => {
434                        if compare_values(entry_max, current) == Some(Ordering::Greater) {
435                            Some(entry_max.clone())
436                        } else {
437                            max
438                        }
439                    }
440                };
441            }
442        }
443
444        (min, max, null_count, row_count)
445    }
446}
447
/// A compact, probabilistic set used to rule values out quickly.
///
/// A Bloom filter may report a false positive (answering "maybe" for a value
/// that was never added) but never a false negative: when it answers "no", the
/// value is definitely absent. That makes it a cheap first check for chunks
/// that cannot contain a value.
#[derive(Clone, Debug)]
pub struct BloomFilter {
    /// Backing bit array, stored as 64-bit words.
    bits: Vec<u64>,
    /// How many hash functions are applied per value.
    num_hashes: usize,
    /// How many bits the filter has.
    num_bits: usize,
}
462
463impl BloomFilter {
464    /// Creates a new Bloom filter.
465    pub fn new(num_bits: usize, num_hashes: usize) -> Self {
466        let num_words = (num_bits + 63) / 64;
467        Self {
468            bits: vec![0; num_words],
469            num_hashes,
470            num_bits,
471        }
472    }
473
474    /// Adds a value to the filter.
475    pub fn add(&mut self, value: &Value) {
476        let hashes = self.compute_hashes(value);
477        for h in hashes {
478            let bit_idx = h % self.num_bits;
479            let word_idx = bit_idx / 64;
480            let bit_pos = bit_idx % 64;
481            self.bits[word_idx] |= 1 << bit_pos;
482        }
483    }
484
485    /// Checks if the filter might contain the value.
486    pub fn might_contain(&self, value: &Value) -> bool {
487        let hashes = self.compute_hashes(value);
488        for h in hashes {
489            let bit_idx = h % self.num_bits;
490            let word_idx = bit_idx / 64;
491            let bit_pos = bit_idx % 64;
492            if (self.bits[word_idx] & (1 << bit_pos)) == 0 {
493                return false;
494            }
495        }
496        true
497    }
498
499    fn compute_hashes(&self, value: &Value) -> Vec<usize> {
500        // Use a simple hash combination scheme
501        let base_hash = value_hash(value);
502        let mut hashes = Vec::with_capacity(self.num_hashes);
503
504        for i in 0..self.num_hashes {
505            // Double hashing: h(i) = h1 + i * h2
506            let h1 = base_hash;
507            let h2 = base_hash.rotate_left(17);
508            hashes.push((h1.wrapping_add((i as u64).wrapping_mul(h2))) as usize);
509        }
510
511        hashes
512    }
513}
514
515/// Builder for Bloom filters.
516pub struct BloomFilterBuilder {
517    filter: BloomFilter,
518}
519
520impl BloomFilterBuilder {
521    /// Creates a new Bloom filter builder.
522    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
523        // Calculate optimal number of bits and hashes
524        let num_bits = optimal_num_bits(expected_items, false_positive_rate);
525        let num_hashes = optimal_num_hashes(num_bits, expected_items);
526
527        Self {
528            filter: BloomFilter::new(num_bits, num_hashes),
529        }
530    }
531
532    /// Adds a value to the filter.
533    pub fn add(&mut self, value: &Value) {
534        self.filter.add(value);
535    }
536
537    /// Builds the Bloom filter.
538    pub fn build(self) -> BloomFilter {
539        self.filter
540    }
541}
542
/// Calculates the optimal number of bits for a Bloom filter holding `n` items
/// at false-positive rate `p`, as `ceil(-n * ln(p) / ln(2)^2)`.
///
/// A rate that is zero, negative or NaN is raised to the smallest positive
/// normal `f64`. Without that, `ln(0)` is negative infinity and the cast
/// saturates to `usize::MAX`, a size no filter can be allocated at. Rates of
/// one or more need no bits, and `n == 0` yields zero bits.
fn optimal_num_bits(n: usize, p: f64) -> usize {
    // `f64::max` ignores a NaN operand, so this also replaces a NaN rate.
    let rate = p.max(f64::MIN_POSITIVE);
    let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
    // Float-to-int `as` casts saturate, so non-positive and NaN results become 0.
    ((-(n as f64) * rate.ln()) / ln2_squared).ceil() as usize
}
548
/// Calculates the optimal number of hash functions for a Bloom filter of `m`
/// bits holding `n` items, as `ceil((m / n) * ln 2)`.
///
/// Returns zero when `n` is zero. Dividing by zero would otherwise produce an
/// infinite or NaN ratio, which the cast turns into `usize::MAX` or zero
/// depending on `m`. Zero hash functions make a filter that answers "maybe" to
/// every probe, which is the safe reading of "no items expected".
fn optimal_num_hashes(m: usize, n: usize) -> usize {
    if n == 0 {
        return 0;
    }
    ((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize
}
553
554/// Computes a hash for a value.
555fn value_hash(value: &Value) -> u64 {
556    use std::collections::hash_map::DefaultHasher;
557    use std::hash::{Hash, Hasher};
558
559    let mut hasher = DefaultHasher::new();
560
561    match value {
562        Value::Null => 0u64.hash(&mut hasher),
563        Value::Bool(b) => b.hash(&mut hasher),
564        Value::Int64(i) => i.hash(&mut hasher),
565        Value::Float64(f) => f.to_bits().hash(&mut hasher),
566        Value::String(s) => s.hash(&mut hasher),
567        Value::Bytes(b) => b.hash(&mut hasher),
568        _ => format!("{value:?}").hash(&mut hasher),
569    }
570
571    hasher.finish()
572}
573
574/// Compares two values.
575fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
576    match (a, b) {
577        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
578        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
579        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
580        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
581        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
582        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
583        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
584        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
585        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
586        _ => None,
587    }
588}
589
#[cfg(test)]
mod tests {
    use super::*;

    /// Equality probes are accepted inside the bounds and rejected outside.
    #[test]
    fn test_zone_map_entry_equal() {
        let chunk = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);

        // Inside the bounds, both endpoints included.
        for candidate in [50, 10, 100] {
            assert!(chunk.might_contain_equal(&Value::Int64(candidate)));
        }

        // Strictly outside the bounds.
        for candidate in [5, 105] {
            assert!(!chunk.might_contain_equal(&Value::Int64(candidate)));
        }
    }

    /// Closed ranges are accepted only when they overlap the chunk bounds.
    #[test]
    fn test_zone_map_entry_range() {
        let chunk = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);
        let overlaps = |lo: i64, hi: i64| {
            chunk.might_contain_range(
                Some(&Value::Int64(lo)),
                Some(&Value::Int64(hi)),
                true,
                true,
            )
        };

        // Range fully contains the chunk.
        assert!(overlaps(0, 200));

        // Range overlaps the upper part of the chunk.
        assert!(overlaps(50, 150));

        // Range starts just above the chunk maximum.
        assert!(!overlaps(101, 200));
    }

    /// The builder tracks bounds, null count and row count as values arrive.
    #[test]
    fn test_zone_map_builder() {
        let mut builder = ZoneMapBuilder::new();

        (1..=100).for_each(|i| builder.add(&Value::Int64(i)));
        for _ in 0..2 {
            builder.add(&Value::Null);
        }

        let built = builder.build();

        assert_eq!(built.min, Some(Value::Int64(1)));
        assert_eq!(built.max, Some(Value::Int64(100)));
        assert_eq!(built.null_count, 2);
        assert_eq!(built.row_count, 102);
    }

    /// A builder with a custom Bloom filter produces an entry that carries it.
    #[test]
    fn test_zone_map_with_bloom() {
        let mut builder = ZoneMapBuilder::with_bloom_filter(100, 0.01);

        (1..=100).for_each(|i| builder.add(&Value::Int64(i)));

        let built = builder.build();

        assert!(built.bloom_filter.is_some());
        // Bloom filters may give false positives but never false negatives,
        // so a value that was added must always be reported.
        assert!(built.might_contain_equal(&Value::Int64(50)));
    }

    /// The index prunes chunks for equality and range predicates.
    #[test]
    fn test_zone_map_index() {
        let mut index = ZoneMapIndex::new("age");

        for (chunk_id, lo, hi) in [(0u64, 0i64, 30i64), (1, 31, 60), (2, 61, 100)] {
            index.insert(
                chunk_id,
                ZoneMapEntry::with_min_max(Value::Int64(lo), Value::Int64(hi), 0, 100),
            );
        }

        // age = 25 can only live in chunk 0.
        let young = Value::Int64(25);
        let hits: Vec<_> = index.filter_equal(&young, 0..3).collect();
        assert_eq!(hits, vec![0]);

        // age = 75 can only live in chunk 2.
        let old = Value::Int64(75);
        let hits: Vec<_> = index.filter_equal(&old, 0..3).collect();
        assert_eq!(hits, vec![2]);

        // 25 <= age <= 65 touches all three chunks.
        let lo = Value::Int64(25);
        let hi = Value::Int64(65);
        let hits: Vec<_> = index
            .filter_range(Some(&lo), Some(&hi), true, true, 0..3)
            .collect();
        assert_eq!(hits, vec![0, 1, 2]);
    }

    /// Every value added to a Bloom filter is reported as possibly present.
    #[test]
    fn test_bloom_filter() {
        let mut filter = BloomFilter::new(1000, 7);

        (0..100).for_each(|i| filter.add(&Value::Int64(i)));

        // No false negatives for anything that was added.
        assert!((0..100).all(|i| filter.might_contain(&Value::Int64(i))));

        // A value that was never added may or may not be reported (false
        // positives are allowed), so only check that the probe runs.
        let _ = filter.might_contain(&Value::Int64(1000));
    }

    /// An all-null chunk matches null probes and nothing else.
    #[test]
    fn test_zone_map_nulls() {
        let all_null = ZoneMapEntry {
            min: None,
            max: None,
            null_count: 10,
            row_count: 10,
            bloom_filter: None,
        };

        assert!(all_null.is_all_null());
        assert!(!all_null.might_contain_non_null());
        assert!(all_null.might_contain_equal(&Value::Null));
        assert!(!all_null.might_contain_equal(&Value::Int64(5)));
    }

    /// Date bounds support equality, range and builder round-trips.
    #[test]
    fn test_zone_map_date_range() {
        use grafeo_common::types::Date;

        let day = |y, m, d| Value::Date(Date::from_ymd(y, m, d).unwrap());

        let chunk = ZoneMapEntry::with_min_max(day(2024, 1, 1), day(2024, 12, 31), 0, 365);

        // A date inside the bounds is a candidate.
        assert!(chunk.might_contain_equal(&day(2024, 6, 15)));

        // Both boundary dates are candidates.
        assert!(chunk.might_contain_equal(&day(2024, 1, 1)));
        assert!(chunk.might_contain_equal(&day(2024, 12, 31)));

        // Dates just outside the bounds must be pruned.
        assert!(!chunk.might_contain_equal(&day(2023, 12, 31)));
        assert!(!chunk.might_contain_equal(&day(2025, 1, 1)));

        // A range inside the chunk bounds.
        assert!(chunk.might_contain_range(
            Some(&day(2024, 3, 1)),
            Some(&day(2024, 9, 30)),
            true,
            true,
        ));

        // A range that ends before the chunk starts.
        assert!(!chunk.might_contain_range(
            Some(&day(2022, 1, 1)),
            Some(&day(2023, 6, 30)),
            true,
            true,
        ));

        // A range that starts after the chunk ends.
        assert!(!chunk.might_contain_range(
            Some(&day(2025, 2, 1)),
            Some(&day(2025, 12, 31)),
            true,
            true,
        ));

        // Builder round-trip with dates added out of order.
        let mut builder = ZoneMapBuilder::without_bloom_filter();
        for value in [
            day(2024, 3, 10),
            day(2024, 7, 4),
            day(2024, 1, 1),
            day(2024, 11, 25),
        ] {
            builder.add(&value);
        }
        let built = builder.build();
        assert_eq!(built.min, Some(day(2024, 1, 1)));
        assert_eq!(built.max, Some(day(2024, 11, 25)));
        assert_eq!(built.row_count, 4);
    }

    /// Timestamp bounds support equality, one-sided predicates and the builder.
    #[test]
    fn test_zone_map_timestamp_range() {
        use grafeo_common::types::Timestamp;

        let at = |secs| Value::Timestamp(Timestamp::from_secs(secs));

        // 2024-01-01T00:00:00Z through 2024-12-31T23:59:59Z.
        let first_second = 1_704_067_200;
        let last_second = 1_735_689_599;
        let chunk = ZoneMapEntry::with_min_max(at(first_second), at(last_second), 0, 1000);

        // Roughly 2024-07-01: inside the bounds.
        assert!(chunk.might_contain_equal(&at(1_719_792_000)));

        // Both boundaries are candidates.
        assert!(chunk.might_contain_equal(&at(first_second)));
        assert!(chunk.might_contain_equal(&at(last_second)));

        // One second outside either boundary is pruned.
        let just_before = first_second - 1;
        let just_after = last_second + 1;
        assert!(!chunk.might_contain_equal(&at(just_before)));
        assert!(!chunk.might_contain_equal(&at(just_after)));

        // Less-than predicate.
        assert!(chunk.might_contain_less_than(&at(last_second), true));
        assert!(!chunk.might_contain_less_than(&at(just_before), false));

        // Greater-than predicate.
        assert!(chunk.might_contain_greater_than(&at(first_second), true));
        assert!(!chunk.might_contain_greater_than(&at(just_after), false));

        // Builder round-trip, including one null.
        let mut builder = ZoneMapBuilder::without_bloom_filter();
        for secs in [1_710_000_000, 1_720_000_000, 1_705_000_000] {
            builder.add(&at(secs));
        }
        builder.add(&Value::Null);
        let built = builder.build();
        assert_eq!(built.min, Some(at(1_705_000_000)));
        assert_eq!(built.max, Some(at(1_720_000_000)));
        assert_eq!(built.null_count, 1);
        assert_eq!(built.row_count, 4);
    }

    /// Float bounds support equality, range, one-sided predicates and the builder.
    #[test]
    fn test_zone_map_float64_range() {
        let chunk = ZoneMapEntry::with_min_max(Value::Float64(1.5), Value::Float64(99.9), 0, 500);

        // Inside the bounds, then each boundary.
        for candidate in [50.0, 1.5, 99.9] {
            assert!(chunk.might_contain_equal(&Value::Float64(candidate)));
        }

        // Outside the bounds.
        for candidate in [1.0, 100.0] {
            assert!(!chunk.might_contain_equal(&Value::Float64(candidate)));
        }

        let overlaps = |lo: f64, hi: f64| {
            chunk.might_contain_range(
                Some(&Value::Float64(lo)),
                Some(&Value::Float64(hi)),
                true,
                true,
            )
        };

        // Range inside the chunk bounds.
        assert!(overlaps(10.0, 90.0));

        // Range entirely below the chunk.
        assert!(!overlaps(-100.0, 1.0));

        // Range entirely above the chunk.
        assert!(!overlaps(100.0, 200.0));

        // Less-than, exclusive and inclusive at the minimum.
        assert!(chunk.might_contain_less_than(&Value::Float64(50.0), false));
        assert!(!chunk.might_contain_less_than(&Value::Float64(1.0), false));
        assert!(chunk.might_contain_less_than(&Value::Float64(1.5), true));
        assert!(!chunk.might_contain_less_than(&Value::Float64(1.5), false));

        // Greater-than, exclusive and inclusive at the maximum.
        assert!(chunk.might_contain_greater_than(&Value::Float64(50.0), false));
        assert!(!chunk.might_contain_greater_than(&Value::Float64(100.0), false));
        assert!(chunk.might_contain_greater_than(&Value::Float64(99.9), true));
        assert!(!chunk.might_contain_greater_than(&Value::Float64(99.9), false));

        // Builder round-trip.
        let mut builder = ZoneMapBuilder::without_bloom_filter();
        for sample in [3.15, 2.72, 1.414, 1.618, 42.0] {
            builder.add(&Value::Float64(sample));
        }
        let built = builder.build();
        assert_eq!(built.min, Some(Value::Float64(1.414)));
        assert_eq!(built.max, Some(Value::Float64(42.0)));
        assert_eq!(built.row_count, 5);
        assert_eq!(built.null_count, 0);
    }
}