// grafeo_core/index/zone_map.rs

//! Zone maps for intelligent data skipping.
//!
//! Each chunk of property data tracks its min, max, and null count. When filtering
//! with a predicate like `age < 30`, we check the zone map first: if the minimum
//! age in a chunk is 50, we skip the entire chunk without reading a single value.
//!
//! This pays off on large scans. Combined with columnar storage, selective
//! predicates often skip 90%+ of the data.
9
10use grafeo_common::types::Value;
11use std::cmp::Ordering;
12use std::collections::HashMap;
13
/// Statistics for a single chunk of property data.
///
/// The query optimizer uses these to skip chunks that can't match a predicate.
/// For example, if `max < 30` and the predicate is `value > 50`, skip the chunk.
///
/// All fields are public, so an entry can be assembled by hand as well as
/// through the constructors or [`ZoneMapBuilder`].
#[derive(Debug, Clone)]
pub struct ZoneMapEntry {
    /// Minimum value in the chunk (None if all nulls).
    pub min: Option<Value>,
    /// Maximum value in the chunk (None if all nulls).
    pub max: Option<Value>,
    /// Number of null values in the chunk.
    pub null_count: u64,
    /// Total number of values in the chunk.
    ///
    /// Nulls are included: the chunk is considered all-null when this equals
    /// `null_count` (and is non-zero).
    pub row_count: u64,
    /// Optional Bloom filter for equality checks.
    pub bloom_filter: Option<BloomFilter>,
}
31
32impl ZoneMapEntry {
33    /// Creates a new empty zone map entry.
34    pub fn new() -> Self {
35        Self {
36            min: None,
37            max: None,
38            null_count: 0,
39            row_count: 0,
40            bloom_filter: None,
41        }
42    }
43
44    /// Creates a zone map entry with min/max.
45    pub fn with_min_max(min: Value, max: Value, null_count: u64, row_count: u64) -> Self {
46        Self {
47            min: Some(min),
48            max: Some(max),
49            null_count,
50            row_count,
51            bloom_filter: None,
52        }
53    }
54
55    /// Sets the Bloom filter.
56    pub fn with_bloom_filter(mut self, filter: BloomFilter) -> Self {
57        self.bloom_filter = Some(filter);
58        self
59    }
60
61    /// Checks if this chunk might contain values matching an equality predicate.
62    ///
63    /// Returns `true` if the chunk might contain matches, `false` if it definitely doesn't.
64    pub fn might_contain_equal(&self, value: &Value) -> bool {
65        // If value is null, check if there are nulls
66        if matches!(value, Value::Null) {
67            return self.null_count > 0;
68        }
69
70        // If the chunk is all nulls, it can't contain non-null values
71        if self.is_all_null() {
72            return false;
73        }
74
75        // Check Bloom filter first if available
76        if let Some(ref bloom) = self.bloom_filter
77            && !bloom.might_contain(value)
78        {
79            return false;
80        }
81
82        // Check min/max bounds
83        match (&self.min, &self.max) {
84            (Some(min), Some(max)) => {
85                let cmp_min = compare_values(value, min);
86                let cmp_max = compare_values(value, max);
87
88                // Value must be >= min and <= max
89                match (cmp_min, cmp_max) {
90                    (Some(Ordering::Less), _) => false,    // value < min
91                    (_, Some(Ordering::Greater)) => false, // value > max
92                    _ => true,
93                }
94            }
95            // No bounds but might have non-null values - can't skip
96            _ => self.might_contain_non_null(),
97        }
98    }
99
100    /// Checks if this chunk might contain values matching a less-than predicate.
101    ///
102    /// Returns `true` if the chunk might contain matches, `false` if it definitely doesn't.
103    pub fn might_contain_less_than(&self, value: &Value, inclusive: bool) -> bool {
104        match &self.min {
105            Some(min) => {
106                let cmp = compare_values(min, value);
107                match cmp {
108                    Some(Ordering::Less) => true,
109                    Some(Ordering::Equal) => inclusive,
110                    Some(Ordering::Greater) => false,
111                    None => true,
112                }
113            }
114            None => self.null_count > 0, // Only nulls, which don't satisfy < predicate
115        }
116    }
117
118    /// Checks if this chunk might contain values matching a greater-than predicate.
119    ///
120    /// Returns `true` if the chunk might contain matches, `false` if it definitely doesn't.
121    pub fn might_contain_greater_than(&self, value: &Value, inclusive: bool) -> bool {
122        match &self.max {
123            Some(max) => {
124                let cmp = compare_values(max, value);
125                match cmp {
126                    Some(Ordering::Greater) => true,
127                    Some(Ordering::Equal) => inclusive,
128                    Some(Ordering::Less) => false,
129                    None => true,
130                }
131            }
132            None => self.null_count > 0,
133        }
134    }
135
136    /// Checks if this chunk might contain values in a range.
137    pub fn might_contain_range(
138        &self,
139        lower: Option<&Value>,
140        upper: Option<&Value>,
141        lower_inclusive: bool,
142        upper_inclusive: bool,
143    ) -> bool {
144        // Check lower bound
145        if let Some(lower_val) = lower
146            && !self.might_contain_greater_than(lower_val, lower_inclusive)
147        {
148            return false;
149        }
150
151        // Check upper bound
152        if let Some(upper_val) = upper
153            && !self.might_contain_less_than(upper_val, upper_inclusive)
154        {
155            return false;
156        }
157
158        true
159    }
160
161    /// Checks if this chunk might contain non-null values.
162    pub fn might_contain_non_null(&self) -> bool {
163        self.row_count > self.null_count
164    }
165
166    /// Checks if this chunk contains only null values.
167    pub fn is_all_null(&self) -> bool {
168        self.row_count > 0 && self.null_count == self.row_count
169    }
170
171    /// Returns the null fraction.
172    pub fn null_fraction(&self) -> f64 {
173        if self.row_count == 0 {
174            0.0
175        } else {
176            self.null_count as f64 / self.row_count as f64
177        }
178    }
179}
180
181impl Default for ZoneMapEntry {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
/// Incrementally builds a zone map entry as you add values.
///
/// Feed it values one at a time; it tracks min/max/nulls automatically.
///
/// By default, Bloom filters are enabled for efficient equality checks.
/// Use [`without_bloom_filter`](Self::without_bloom_filter) if you don't need them.
pub struct ZoneMapBuilder {
    // Smallest non-null value added so far.
    min: Option<Value>,
    // Largest non-null value added so far.
    max: Option<Value>,
    // Number of nulls added so far.
    null_count: u64,
    // Number of values added so far, nulls included.
    row_count: u64,
    // Bloom filter under construction; `None` when Bloom filtering is disabled.
    bloom_builder: Option<BloomFilterBuilder>,
}
200
/// Default expected items for Bloom filter in a chunk.
///
/// Used by [`ZoneMapBuilder::new`] to size the filter.
const DEFAULT_BLOOM_EXPECTED_ITEMS: usize = 2048;

/// Default false positive rate for Bloom filter (1%).
///
/// Used by [`ZoneMapBuilder::new`] together with the expected item count.
const DEFAULT_BLOOM_FALSE_POSITIVE_RATE: f64 = 0.01;
206
207impl ZoneMapBuilder {
208    /// Creates a new zone map builder with Bloom filter enabled by default.
209    ///
210    /// Uses reasonable defaults:
211    /// - Expected items: 2048 (typical chunk size)
212    /// - False positive rate: 1%
213    pub fn new() -> Self {
214        Self {
215            min: None,
216            max: None,
217            null_count: 0,
218            row_count: 0,
219            bloom_builder: Some(BloomFilterBuilder::new(
220                DEFAULT_BLOOM_EXPECTED_ITEMS,
221                DEFAULT_BLOOM_FALSE_POSITIVE_RATE,
222            )),
223        }
224    }
225
226    /// Creates a builder without Bloom filter support.
227    ///
228    /// Use this when you know you won't need equality checks, or when
229    /// memory is constrained and the Bloom filter overhead isn't worth it.
230    pub fn without_bloom_filter() -> Self {
231        Self {
232            min: None,
233            max: None,
234            null_count: 0,
235            row_count: 0,
236            bloom_builder: None,
237        }
238    }
239
240    /// Creates a builder with custom Bloom filter settings.
241    ///
242    /// # Arguments
243    ///
244    /// * `expected_items` - Expected number of items in the chunk
245    /// * `false_positive_rate` - Desired false positive rate (e.g., 0.01 for 1%)
246    pub fn with_bloom_filter(expected_items: usize, false_positive_rate: f64) -> Self {
247        Self {
248            min: None,
249            max: None,
250            null_count: 0,
251            row_count: 0,
252            bloom_builder: Some(BloomFilterBuilder::new(expected_items, false_positive_rate)),
253        }
254    }
255
256    /// Adds a value to the zone map.
257    pub fn add(&mut self, value: &Value) {
258        self.row_count += 1;
259
260        if matches!(value, Value::Null) {
261            self.null_count += 1;
262            return;
263        }
264
265        // Update min
266        self.min = match &self.min {
267            None => Some(value.clone()),
268            Some(current) => {
269                if compare_values(value, current) == Some(Ordering::Less) {
270                    Some(value.clone())
271                } else {
272                    self.min.clone()
273                }
274            }
275        };
276
277        // Update max
278        self.max = match &self.max {
279            None => Some(value.clone()),
280            Some(current) => {
281                if compare_values(value, current) == Some(Ordering::Greater) {
282                    Some(value.clone())
283                } else {
284                    self.max.clone()
285                }
286            }
287        };
288
289        // Add to Bloom filter
290        if let Some(ref mut bloom) = self.bloom_builder {
291            bloom.add(value);
292        }
293    }
294
295    /// Builds the zone map entry.
296    pub fn build(self) -> ZoneMapEntry {
297        let bloom_filter = self.bloom_builder.map(|b| b.build());
298
299        ZoneMapEntry {
300            min: self.min,
301            max: self.max,
302            null_count: self.null_count,
303            row_count: self.row_count,
304            bloom_filter,
305        }
306    }
307}
308
309impl Default for ZoneMapBuilder {
310    fn default() -> Self {
311        Self::new()
312    }
313}
314
315/// Collection of zone maps for all chunks of a property column.
316///
317/// Use [`filter_equal`](Self::filter_equal) or [`filter_range`](Self::filter_range)
318/// to get chunk IDs that might contain matching values.
319pub struct ZoneMapIndex {
320    /// Zone map entries per chunk.
321    entries: HashMap<u64, ZoneMapEntry>,
322    /// Property name.
323    property: String,
324}
325
326impl ZoneMapIndex {
327    /// Creates a new zone map index for a property.
328    pub fn new(property: impl Into<String>) -> Self {
329        Self {
330            entries: HashMap::new(),
331            property: property.into(),
332        }
333    }
334
335    /// Returns the property name.
336    pub fn property(&self) -> &str {
337        &self.property
338    }
339
340    /// Adds or updates a zone map entry for a chunk.
341    pub fn insert(&mut self, chunk_id: u64, entry: ZoneMapEntry) {
342        self.entries.insert(chunk_id, entry);
343    }
344
345    /// Gets the zone map entry for a chunk.
346    pub fn get(&self, chunk_id: u64) -> Option<&ZoneMapEntry> {
347        self.entries.get(&chunk_id)
348    }
349
350    /// Removes the zone map entry for a chunk.
351    pub fn remove(&mut self, chunk_id: u64) -> Option<ZoneMapEntry> {
352        self.entries.remove(&chunk_id)
353    }
354
355    /// Returns the number of chunks with zone maps.
356    pub fn len(&self) -> usize {
357        self.entries.len()
358    }
359
360    /// Returns true if there are no zone maps.
361    pub fn is_empty(&self) -> bool {
362        self.entries.is_empty()
363    }
364
365    /// Filters chunk IDs that might match an equality predicate.
366    pub fn filter_equal<'a>(
367        &'a self,
368        value: &'a Value,
369        chunk_ids: impl Iterator<Item = u64> + 'a,
370    ) -> impl Iterator<Item = u64> + 'a {
371        chunk_ids.filter(move |&id| {
372            self.entries
373                .get(&id)
374                .map_or(true, |e| e.might_contain_equal(value)) // No zone map = assume might contain
375        })
376    }
377
378    /// Filters chunk IDs that might match a range predicate.
379    pub fn filter_range<'a>(
380        &'a self,
381        lower: Option<&'a Value>,
382        upper: Option<&'a Value>,
383        lower_inclusive: bool,
384        upper_inclusive: bool,
385        chunk_ids: impl Iterator<Item = u64> + 'a,
386    ) -> impl Iterator<Item = u64> + 'a {
387        chunk_ids.filter(move |&id| {
388            self.entries.get(&id).map_or(true, |e| {
389                e.might_contain_range(lower, upper, lower_inclusive, upper_inclusive)
390            })
391        })
392    }
393
394    /// Returns chunk IDs sorted by their minimum value.
395    pub fn chunks_ordered_by_min(&self) -> Vec<u64> {
396        let mut chunks: Vec<_> = self.entries.iter().collect();
397        chunks.sort_by(|(_, a), (_, b)| match (&a.min, &b.min) {
398            (Some(a_min), Some(b_min)) => compare_values(a_min, b_min).unwrap_or(Ordering::Equal),
399            (Some(_), None) => Ordering::Less,
400            (None, Some(_)) => Ordering::Greater,
401            (None, None) => Ordering::Equal,
402        });
403        chunks.into_iter().map(|(&id, _)| id).collect()
404    }
405
406    /// Returns overall statistics across all chunks.
407    pub fn overall_stats(&self) -> (Option<Value>, Option<Value>, u64, u64) {
408        let mut min: Option<Value> = None;
409        let mut max: Option<Value> = None;
410        let mut null_count = 0u64;
411        let mut row_count = 0u64;
412
413        for entry in self.entries.values() {
414            null_count += entry.null_count;
415            row_count += entry.row_count;
416
417            if let Some(ref entry_min) = entry.min {
418                min = match min {
419                    None => Some(entry_min.clone()),
420                    Some(ref current) => {
421                        if compare_values(entry_min, current) == Some(Ordering::Less) {
422                            Some(entry_min.clone())
423                        } else {
424                            min
425                        }
426                    }
427                };
428            }
429
430            if let Some(ref entry_max) = entry.max {
431                max = match max {
432                    None => Some(entry_max.clone()),
433                    Some(ref current) => {
434                        if compare_values(entry_max, current) == Some(Ordering::Greater) {
435                            Some(entry_max.clone())
436                        } else {
437                            max
438                        }
439                    }
440                };
441            }
442        }
443
444        (min, max, null_count, row_count)
445    }
446}
447
/// A probabilistic data structure for fast "definitely not in set" checks.
///
/// Bloom filters can have false positives (says "maybe" when absent) but never
/// false negatives (if it says "no", the value is definitely absent). Use this
/// to quickly rule out chunks that can't contain a value.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    /// Bit array, packed 64 bits per word.
    bits: Vec<u64>,
    /// Number of hash functions.
    num_hashes: usize,
    /// Number of bits.
    num_bits: usize,
}
462
463impl BloomFilter {
464    /// Creates a new Bloom filter.
465    pub fn new(num_bits: usize, num_hashes: usize) -> Self {
466        let num_words = (num_bits + 63) / 64;
467        Self {
468            bits: vec![0; num_words],
469            num_hashes,
470            num_bits,
471        }
472    }
473
474    /// Adds a value to the filter.
475    pub fn add(&mut self, value: &Value) {
476        let hashes = self.compute_hashes(value);
477        for h in hashes {
478            let bit_idx = h % self.num_bits;
479            let word_idx = bit_idx / 64;
480            let bit_pos = bit_idx % 64;
481            self.bits[word_idx] |= 1 << bit_pos;
482        }
483    }
484
485    /// Checks if the filter might contain the value.
486    pub fn might_contain(&self, value: &Value) -> bool {
487        let hashes = self.compute_hashes(value);
488        for h in hashes {
489            let bit_idx = h % self.num_bits;
490            let word_idx = bit_idx / 64;
491            let bit_pos = bit_idx % 64;
492            if (self.bits[word_idx] & (1 << bit_pos)) == 0 {
493                return false;
494            }
495        }
496        true
497    }
498
499    fn compute_hashes(&self, value: &Value) -> Vec<usize> {
500        // Use a simple hash combination scheme
501        let base_hash = value_hash(value);
502        let mut hashes = Vec::with_capacity(self.num_hashes);
503
504        for i in 0..self.num_hashes {
505            // Double hashing: h(i) = h1 + i * h2
506            let h1 = base_hash;
507            let h2 = base_hash.rotate_left(17);
508            // reason: hash value truncated to usize for bit array indexing
509            #[allow(clippy::cast_possible_truncation)]
510            hashes.push((h1.wrapping_add((i as u64).wrapping_mul(h2))) as usize);
511        }
512
513        hashes
514    }
515}
516
517/// Builder for Bloom filters.
518pub struct BloomFilterBuilder {
519    filter: BloomFilter,
520}
521
522impl BloomFilterBuilder {
523    /// Creates a new Bloom filter builder.
524    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
525        // Calculate optimal number of bits and hashes
526        let num_bits = optimal_num_bits(expected_items, false_positive_rate);
527        let num_hashes = optimal_num_hashes(num_bits, expected_items);
528
529        Self {
530            filter: BloomFilter::new(num_bits, num_hashes),
531        }
532    }
533
534    /// Adds a value to the filter.
535    pub fn add(&mut self, value: &Value) {
536        self.filter.add(value);
537    }
538
539    /// Builds the Bloom filter.
540    pub fn build(self) -> BloomFilter {
541        self.filter
542    }
543}
544
/// Calculates optimal number of bits for a Bloom filter.
///
/// Uses `m = ceil(-n * ln(p) / ln(2)^2)` for `n` expected items and a target
/// false positive rate `p`.
///
/// `p` is clamped to `[f64::MIN_POSITIVE, 1.0]` first: a rate of zero or below
/// would otherwise make `ln(p)` infinite or NaN, producing a saturated
/// `usize::MAX` or a meaningless zero. A NaN rate still yields 0.
fn optimal_num_bits(n: usize, p: f64) -> usize {
    let p = p.clamp(f64::MIN_POSITIVE, 1.0);
    let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
    // reason: Bloom filter size calculation result is non-negative and bounded
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let result = ((-(n as f64) * p.ln()) / ln2_squared).ceil() as usize;
    result
}
553
/// Calculates optimal number of hash functions for a Bloom filter.
///
/// Uses `k = ceil((m / n) * ln(2))` for `m` bits and `n` expected items.
///
/// Degenerate inputs are handled explicitly instead of going through a
/// division by zero:
/// - `m == 0`: returns 0 (there are no bits to hash into).
/// - `n == 0` with `m > 0`: returns 1 rather than a saturated `usize::MAX`.
fn optimal_num_hashes(m: usize, n: usize) -> usize {
    if m == 0 {
        return 0;
    }
    if n == 0 {
        return 1;
    }

    // reason: optimal hash count is non-negative and small
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let result = ((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize;
    result
}
561
562/// Computes a hash for a value.
563fn value_hash(value: &Value) -> u64 {
564    use std::collections::hash_map::DefaultHasher;
565    use std::hash::{Hash, Hasher};
566
567    let mut hasher = DefaultHasher::new();
568
569    match value {
570        Value::Null => 0u64.hash(&mut hasher),
571        Value::Bool(b) => b.hash(&mut hasher),
572        Value::Int64(i) => i.hash(&mut hasher),
573        Value::Float64(f) => f.to_bits().hash(&mut hasher),
574        Value::String(s) => s.hash(&mut hasher),
575        Value::Bytes(b) => b.hash(&mut hasher),
576        _ => format!("{value:?}").hash(&mut hasher),
577    }
578
579    hasher.finish()
580}
581
582/// Compares two values.
583fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
584    match (a, b) {
585        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
586        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
587        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
588        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
589        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
590        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
591        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
592        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
593        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
594        _ => None,
595    }
596}
597
// Unit tests for zone map entries, the builder, the index, and the Bloom filter.
#[cfg(test)]
mod tests {
    use super::*;

    // Equality pruning against Int64 bounds [10, 100]: bounds are inclusive.
    #[test]
    fn test_zone_map_entry_equal() {
        let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);

        assert!(entry.might_contain_equal(&Value::Int64(50)));
        assert!(entry.might_contain_equal(&Value::Int64(10)));
        assert!(entry.might_contain_equal(&Value::Int64(100)));
        assert!(!entry.might_contain_equal(&Value::Int64(5)));
        assert!(!entry.might_contain_equal(&Value::Int64(105)));
    }

    // Range pruning against Int64 bounds [10, 100].
    #[test]
    fn test_zone_map_entry_range() {
        let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);

        // Range fully contains chunk
        assert!(entry.might_contain_range(
            Some(&Value::Int64(0)),
            Some(&Value::Int64(200)),
            true,
            true
        ));

        // Range overlaps
        assert!(entry.might_contain_range(
            Some(&Value::Int64(50)),
            Some(&Value::Int64(150)),
            true,
            true
        ));

        // Range doesn't overlap
        assert!(!entry.might_contain_range(
            Some(&Value::Int64(101)),
            Some(&Value::Int64(200)),
            true,
            true
        ));
    }

    // The builder tracks min/max over non-null values and counts nulls as rows.
    #[test]
    fn test_zone_map_builder() {
        let mut builder = ZoneMapBuilder::new();

        for i in 1..=100 {
            builder.add(&Value::Int64(i));
        }
        builder.add(&Value::Null);
        builder.add(&Value::Null);

        let entry = builder.build();

        assert_eq!(entry.min, Some(Value::Int64(1)));
        assert_eq!(entry.max, Some(Value::Int64(100)));
        assert_eq!(entry.null_count, 2);
        assert_eq!(entry.row_count, 102);
    }

    // A builder with custom Bloom settings produces an entry carrying a filter.
    #[test]
    fn test_zone_map_with_bloom() {
        let mut builder = ZoneMapBuilder::with_bloom_filter(100, 0.01);

        for i in 1..=100 {
            builder.add(&Value::Int64(i));
        }

        let entry = builder.build();

        assert!(entry.bloom_filter.is_some());
        assert!(entry.might_contain_equal(&Value::Int64(50)));
        // Note: Bloom filters may have false positives but not false negatives
    }

    // Index-level filtering over three chunks with disjoint ranges.
    #[test]
    fn test_zone_map_index() {
        let mut index = ZoneMapIndex::new("age");

        index.insert(
            0,
            ZoneMapEntry::with_min_max(Value::Int64(0), Value::Int64(30), 0, 100),
        );
        index.insert(
            1,
            ZoneMapEntry::with_min_max(Value::Int64(31), Value::Int64(60), 0, 100),
        );
        index.insert(
            2,
            ZoneMapEntry::with_min_max(Value::Int64(61), Value::Int64(100), 0, 100),
        );

        // Filter for age = 25 - should only return chunk 0
        let matching: Vec<_> = index.filter_equal(&Value::Int64(25), 0..3).collect();
        assert_eq!(matching, vec![0]);

        // Filter for age = 75 - should only return chunk 2
        let matching: Vec<_> = index.filter_equal(&Value::Int64(75), 0..3).collect();
        assert_eq!(matching, vec![2]);

        // Filter for age between 25 and 65 - should return chunks 0, 1, 2
        let matching: Vec<_> = index
            .filter_range(
                Some(&Value::Int64(25)),
                Some(&Value::Int64(65)),
                true,
                true,
                0..3,
            )
            .collect();
        assert_eq!(matching, vec![0, 1, 2]);
    }

    // Every value added to a Bloom filter must be reported as possibly present.
    #[test]
    fn test_bloom_filter() {
        let mut filter = BloomFilter::new(1000, 7);

        for i in 0..100 {
            filter.add(&Value::Int64(i));
        }

        // Should definitely contain values we added
        for i in 0..100 {
            assert!(filter.might_contain(&Value::Int64(i)));
        }

        // May have false positives for values not added, but let's test a few
        // (we can't assert false because of false positive rate)
        let _ = filter.might_contain(&Value::Int64(1000));
    }

    // An all-null chunk matches a null probe but no non-null probe.
    #[test]
    fn test_zone_map_nulls() {
        let entry = ZoneMapEntry {
            min: None,
            max: None,
            null_count: 10,
            row_count: 10,
            bloom_filter: None,
        };

        assert!(entry.is_all_null());
        assert!(!entry.might_contain_non_null());
        assert!(entry.might_contain_equal(&Value::Null));
        assert!(!entry.might_contain_equal(&Value::Int64(5)));
    }

    // Equality, range, and builder behavior for Date values.
    #[test]
    fn test_zone_map_date_range() {
        use grafeo_common::types::Date;

        let min_date = Date::from_ymd(2024, 1, 1).unwrap();
        let max_date = Date::from_ymd(2024, 12, 31).unwrap();
        let entry =
            ZoneMapEntry::with_min_max(Value::Date(min_date), Value::Date(max_date), 0, 365);

        // In-range date: should be a candidate
        let mid = Date::from_ymd(2024, 6, 15).unwrap();
        assert!(entry.might_contain_equal(&Value::Date(mid)));

        // Boundary dates: should be candidates
        assert!(entry.might_contain_equal(&Value::Date(min_date)));
        assert!(entry.might_contain_equal(&Value::Date(max_date)));

        // Out-of-range dates: must be pruned
        let before = Date::from_ymd(2023, 12, 31).unwrap();
        let after = Date::from_ymd(2025, 1, 1).unwrap();
        assert!(!entry.might_contain_equal(&Value::Date(before)));
        assert!(!entry.might_contain_equal(&Value::Date(after)));

        // Range predicates
        let range_lo = Date::from_ymd(2024, 3, 1).unwrap();
        let range_hi = Date::from_ymd(2024, 9, 30).unwrap();
        assert!(entry.might_contain_range(
            Some(&Value::Date(range_lo)),
            Some(&Value::Date(range_hi)),
            true,
            true,
        ));

        // Non-overlapping range (entirely before chunk)
        let early_lo = Date::from_ymd(2022, 1, 1).unwrap();
        let early_hi = Date::from_ymd(2023, 6, 30).unwrap();
        assert!(!entry.might_contain_range(
            Some(&Value::Date(early_lo)),
            Some(&Value::Date(early_hi)),
            true,
            true,
        ));

        // Non-overlapping range (entirely after chunk)
        let late_lo = Date::from_ymd(2025, 2, 1).unwrap();
        let late_hi = Date::from_ymd(2025, 12, 31).unwrap();
        assert!(!entry.might_contain_range(
            Some(&Value::Date(late_lo)),
            Some(&Value::Date(late_hi)),
            true,
            true,
        ));

        // Builder round-trip
        let mut builder = ZoneMapBuilder::without_bloom_filter();
        let dates = [
            Date::from_ymd(2024, 3, 10).unwrap(),
            Date::from_ymd(2024, 7, 4).unwrap(),
            Date::from_ymd(2024, 1, 1).unwrap(),
            Date::from_ymd(2024, 11, 25).unwrap(),
        ];
        for d in &dates {
            builder.add(&Value::Date(*d));
        }
        let built = builder.build();
        assert_eq!(
            built.min,
            Some(Value::Date(Date::from_ymd(2024, 1, 1).unwrap()))
        );
        assert_eq!(
            built.max,
            Some(Value::Date(Date::from_ymd(2024, 11, 25).unwrap()))
        );
        assert_eq!(built.row_count, 4);
    }

    // Equality, one-sided predicates, and builder behavior for Timestamp values.
    #[test]
    fn test_zone_map_timestamp_range() {
        use grafeo_common::types::Timestamp;

        // 2024-01-01T00:00:00Z and 2024-12-31T23:59:59Z
        let min_ts = Timestamp::from_secs(1_704_067_200); // 2024-01-01
        let max_ts = Timestamp::from_secs(1_735_689_599); // 2024-12-31T23:59:59
        let entry =
            ZoneMapEntry::with_min_max(Value::Timestamp(min_ts), Value::Timestamp(max_ts), 0, 1000);

        // Mid-range timestamp: should be a candidate
        let mid = Timestamp::from_secs(1_719_792_000); // ~2024-07-01
        assert!(entry.might_contain_equal(&Value::Timestamp(mid)));

        // Boundaries
        assert!(entry.might_contain_equal(&Value::Timestamp(min_ts)));
        assert!(entry.might_contain_equal(&Value::Timestamp(max_ts)));

        // Out-of-range timestamps
        let before = Timestamp::from_secs(1_704_067_199); // 1 second before min
        let after = Timestamp::from_secs(1_735_689_600); // 1 second after max
        assert!(!entry.might_contain_equal(&Value::Timestamp(before)));
        assert!(!entry.might_contain_equal(&Value::Timestamp(after)));

        // Less-than predicate
        assert!(entry.might_contain_less_than(&Value::Timestamp(max_ts), true));
        assert!(!entry.might_contain_less_than(&Value::Timestamp(before), false));

        // Greater-than predicate
        assert!(entry.might_contain_greater_than(&Value::Timestamp(min_ts), true));
        assert!(!entry.might_contain_greater_than(&Value::Timestamp(after), false));

        // Builder round-trip
        let mut builder = ZoneMapBuilder::without_bloom_filter();
        builder.add(&Value::Timestamp(Timestamp::from_secs(1_710_000_000)));
        builder.add(&Value::Timestamp(Timestamp::from_secs(1_720_000_000)));
        builder.add(&Value::Timestamp(Timestamp::from_secs(1_705_000_000)));
        builder.add(&Value::Null);
        let built = builder.build();
        assert_eq!(
            built.min,
            Some(Value::Timestamp(Timestamp::from_secs(1_705_000_000)))
        );
        assert_eq!(
            built.max,
            Some(Value::Timestamp(Timestamp::from_secs(1_720_000_000)))
        );
        assert_eq!(built.null_count, 1);
        assert_eq!(built.row_count, 4);
    }

    // Equality, range, one-sided predicates, and builder behavior for Float64 values.
    #[test]
    fn test_zone_map_float64_range() {
        let entry = ZoneMapEntry::with_min_max(Value::Float64(1.5), Value::Float64(99.9), 0, 500);

        // In-range value
        assert!(entry.might_contain_equal(&Value::Float64(50.0)));

        // Boundaries
        assert!(entry.might_contain_equal(&Value::Float64(1.5)));
        assert!(entry.might_contain_equal(&Value::Float64(99.9)));

        // Out-of-range values
        assert!(!entry.might_contain_equal(&Value::Float64(1.0)));
        assert!(!entry.might_contain_equal(&Value::Float64(100.0)));

        // Range predicates
        assert!(entry.might_contain_range(
            Some(&Value::Float64(10.0)),
            Some(&Value::Float64(90.0)),
            true,
            true,
        ));

        // Non-overlapping range (below)
        assert!(!entry.might_contain_range(
            Some(&Value::Float64(-100.0)),
            Some(&Value::Float64(1.0)),
            true,
            true,
        ));

        // Non-overlapping range (above)
        assert!(!entry.might_contain_range(
            Some(&Value::Float64(100.0)),
            Some(&Value::Float64(200.0)),
            true,
            true,
        ));

        // Less-than and greater-than predicates
        assert!(entry.might_contain_less_than(&Value::Float64(50.0), false));
        assert!(!entry.might_contain_less_than(&Value::Float64(1.0), false));
        assert!(entry.might_contain_less_than(&Value::Float64(1.5), true));
        assert!(!entry.might_contain_less_than(&Value::Float64(1.5), false));

        assert!(entry.might_contain_greater_than(&Value::Float64(50.0), false));
        assert!(!entry.might_contain_greater_than(&Value::Float64(100.0), false));
        assert!(entry.might_contain_greater_than(&Value::Float64(99.9), true));
        assert!(!entry.might_contain_greater_than(&Value::Float64(99.9), false));

        // Builder round-trip
        let mut builder = ZoneMapBuilder::without_bloom_filter();
        let values = [3.15, 2.72, 1.414, 1.618, 42.0];
        for v in &values {
            builder.add(&Value::Float64(*v));
        }
        let built = builder.build();
        assert_eq!(built.min, Some(Value::Float64(1.414)));
        assert_eq!(built.max, Some(Value::Float64(42.0)));
        assert_eq!(built.row_count, 5);
        assert_eq!(built.null_count, 0);
    }
}