1use grafeo_common::types::Value;
11use std::cmp::Ordering;
12use std::collections::HashMap;
13
/// Summary statistics for one chunk of values, used to decide whether the
/// chunk could contain rows matching a predicate.
#[derive(Debug, Clone)]
pub struct ZoneMapEntry {
    /// Lower bound of the chunk's values, or `None` when no bound is recorded.
    pub min: Option<Value>,
    /// Upper bound of the chunk's values, or `None` when no bound is recorded.
    pub max: Option<Value>,
    /// Number of null values in the chunk.
    pub null_count: u64,
    /// Total number of rows in the chunk (nulls included when produced by
    /// `ZoneMapBuilder`, which counts every added value).
    pub row_count: u64,
    /// Optional Bloom filter consulted by equality checks.
    pub bloom_filter: Option<BloomFilter>,
}
31
32impl ZoneMapEntry {
33 pub fn new() -> Self {
35 Self {
36 min: None,
37 max: None,
38 null_count: 0,
39 row_count: 0,
40 bloom_filter: None,
41 }
42 }
43
44 pub fn with_min_max(min: Value, max: Value, null_count: u64, row_count: u64) -> Self {
46 Self {
47 min: Some(min),
48 max: Some(max),
49 null_count,
50 row_count,
51 bloom_filter: None,
52 }
53 }
54
55 pub fn with_bloom_filter(mut self, filter: BloomFilter) -> Self {
57 self.bloom_filter = Some(filter);
58 self
59 }
60
61 pub fn might_contain_equal(&self, value: &Value) -> bool {
65 if matches!(value, Value::Null) {
67 return self.null_count > 0;
68 }
69
70 if self.is_all_null() {
72 return false;
73 }
74
75 if let Some(ref bloom) = self.bloom_filter {
77 if !bloom.might_contain(value) {
78 return false;
79 }
80 }
81
82 match (&self.min, &self.max) {
84 (Some(min), Some(max)) => {
85 let cmp_min = compare_values(value, min);
86 let cmp_max = compare_values(value, max);
87
88 match (cmp_min, cmp_max) {
90 (Some(Ordering::Less), _) => false, (_, Some(Ordering::Greater)) => false, _ => true,
93 }
94 }
95 _ => self.might_contain_non_null(),
97 }
98 }
99
100 pub fn might_contain_less_than(&self, value: &Value, inclusive: bool) -> bool {
104 match &self.min {
105 Some(min) => {
106 let cmp = compare_values(min, value);
107 match cmp {
108 Some(Ordering::Less) => true,
109 Some(Ordering::Equal) => inclusive,
110 Some(Ordering::Greater) => false,
111 None => true,
112 }
113 }
114 None => self.null_count > 0, }
116 }
117
118 pub fn might_contain_greater_than(&self, value: &Value, inclusive: bool) -> bool {
122 match &self.max {
123 Some(max) => {
124 let cmp = compare_values(max, value);
125 match cmp {
126 Some(Ordering::Greater) => true,
127 Some(Ordering::Equal) => inclusive,
128 Some(Ordering::Less) => false,
129 None => true,
130 }
131 }
132 None => self.null_count > 0,
133 }
134 }
135
136 pub fn might_contain_range(
138 &self,
139 lower: Option<&Value>,
140 upper: Option<&Value>,
141 lower_inclusive: bool,
142 upper_inclusive: bool,
143 ) -> bool {
144 if let Some(lower_val) = lower {
146 if !self.might_contain_greater_than(lower_val, lower_inclusive) {
147 return false;
148 }
149 }
150
151 if let Some(upper_val) = upper {
153 if !self.might_contain_less_than(upper_val, upper_inclusive) {
154 return false;
155 }
156 }
157
158 true
159 }
160
161 pub fn might_contain_non_null(&self) -> bool {
163 self.row_count > self.null_count
164 }
165
166 pub fn is_all_null(&self) -> bool {
168 self.row_count > 0 && self.null_count == self.row_count
169 }
170
171 pub fn null_fraction(&self) -> f64 {
173 if self.row_count == 0 {
174 0.0
175 } else {
176 self.null_count as f64 / self.row_count as f64
177 }
178 }
179}
180
181impl Default for ZoneMapEntry {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
/// Accumulates values one at a time and produces a `ZoneMapEntry`.
pub struct ZoneMapBuilder {
    /// Smallest non-null value seen so far, if any.
    min: Option<Value>,
    /// Largest non-null value seen so far, if any.
    max: Option<Value>,
    /// Number of null values seen so far.
    null_count: u64,
    /// Number of values seen so far, nulls included.
    row_count: u64,
    /// Bloom filter under construction, or `None` when disabled.
    bloom_builder: Option<BloomFilterBuilder>,
}
200
/// Expected item count used to size the Bloom filter created by `ZoneMapBuilder::new`.
const DEFAULT_BLOOM_EXPECTED_ITEMS: usize = 2048;

/// Target false-positive rate used to size the Bloom filter created by `ZoneMapBuilder::new`.
const DEFAULT_BLOOM_FALSE_POSITIVE_RATE: f64 = 0.01;
206
207impl ZoneMapBuilder {
208 pub fn new() -> Self {
214 Self {
215 min: None,
216 max: None,
217 null_count: 0,
218 row_count: 0,
219 bloom_builder: Some(BloomFilterBuilder::new(
220 DEFAULT_BLOOM_EXPECTED_ITEMS,
221 DEFAULT_BLOOM_FALSE_POSITIVE_RATE,
222 )),
223 }
224 }
225
226 pub fn without_bloom_filter() -> Self {
231 Self {
232 min: None,
233 max: None,
234 null_count: 0,
235 row_count: 0,
236 bloom_builder: None,
237 }
238 }
239
240 pub fn with_bloom_filter(expected_items: usize, false_positive_rate: f64) -> Self {
247 Self {
248 min: None,
249 max: None,
250 null_count: 0,
251 row_count: 0,
252 bloom_builder: Some(BloomFilterBuilder::new(expected_items, false_positive_rate)),
253 }
254 }
255
256 pub fn add(&mut self, value: &Value) {
258 self.row_count += 1;
259
260 if matches!(value, Value::Null) {
261 self.null_count += 1;
262 return;
263 }
264
265 self.min = match &self.min {
267 None => Some(value.clone()),
268 Some(current) => {
269 if compare_values(value, current) == Some(Ordering::Less) {
270 Some(value.clone())
271 } else {
272 self.min.clone()
273 }
274 }
275 };
276
277 self.max = match &self.max {
279 None => Some(value.clone()),
280 Some(current) => {
281 if compare_values(value, current) == Some(Ordering::Greater) {
282 Some(value.clone())
283 } else {
284 self.max.clone()
285 }
286 }
287 };
288
289 if let Some(ref mut bloom) = self.bloom_builder {
291 bloom.add(value);
292 }
293 }
294
295 pub fn build(self) -> ZoneMapEntry {
297 let bloom_filter = self.bloom_builder.map(|b| b.build());
298
299 ZoneMapEntry {
300 min: self.min,
301 max: self.max,
302 null_count: self.null_count,
303 row_count: self.row_count,
304 bloom_filter,
305 }
306 }
307}
308
309impl Default for ZoneMapBuilder {
310 fn default() -> Self {
311 Self::new()
312 }
313}
314
/// Zone map entries for a single property, keyed by chunk id.
pub struct ZoneMapIndex {
    /// Entry for each chunk id that has statistics.
    entries: HashMap<u64, ZoneMapEntry>,
    /// Name of the property this index describes.
    property: String,
}
325
326impl ZoneMapIndex {
327 pub fn new(property: impl Into<String>) -> Self {
329 Self {
330 entries: HashMap::new(),
331 property: property.into(),
332 }
333 }
334
335 pub fn property(&self) -> &str {
337 &self.property
338 }
339
340 pub fn insert(&mut self, chunk_id: u64, entry: ZoneMapEntry) {
342 self.entries.insert(chunk_id, entry);
343 }
344
345 pub fn get(&self, chunk_id: u64) -> Option<&ZoneMapEntry> {
347 self.entries.get(&chunk_id)
348 }
349
350 pub fn remove(&mut self, chunk_id: u64) -> Option<ZoneMapEntry> {
352 self.entries.remove(&chunk_id)
353 }
354
355 pub fn len(&self) -> usize {
357 self.entries.len()
358 }
359
360 pub fn is_empty(&self) -> bool {
362 self.entries.is_empty()
363 }
364
365 pub fn filter_equal<'a>(
367 &'a self,
368 value: &'a Value,
369 chunk_ids: impl Iterator<Item = u64> + 'a,
370 ) -> impl Iterator<Item = u64> + 'a {
371 chunk_ids.filter(move |&id| {
372 self.entries
373 .get(&id)
374 .map(|e| e.might_contain_equal(value))
375 .unwrap_or(true) })
377 }
378
379 pub fn filter_range<'a>(
381 &'a self,
382 lower: Option<&'a Value>,
383 upper: Option<&'a Value>,
384 lower_inclusive: bool,
385 upper_inclusive: bool,
386 chunk_ids: impl Iterator<Item = u64> + 'a,
387 ) -> impl Iterator<Item = u64> + 'a {
388 chunk_ids.filter(move |&id| {
389 self.entries
390 .get(&id)
391 .map(|e| e.might_contain_range(lower, upper, lower_inclusive, upper_inclusive))
392 .unwrap_or(true)
393 })
394 }
395
396 pub fn chunks_ordered_by_min(&self) -> Vec<u64> {
398 let mut chunks: Vec<_> = self.entries.iter().collect();
399 chunks.sort_by(|(_, a), (_, b)| match (&a.min, &b.min) {
400 (Some(a_min), Some(b_min)) => compare_values(a_min, b_min).unwrap_or(Ordering::Equal),
401 (Some(_), None) => Ordering::Less,
402 (None, Some(_)) => Ordering::Greater,
403 (None, None) => Ordering::Equal,
404 });
405 chunks.into_iter().map(|(&id, _)| id).collect()
406 }
407
408 pub fn overall_stats(&self) -> (Option<Value>, Option<Value>, u64, u64) {
410 let mut min: Option<Value> = None;
411 let mut max: Option<Value> = None;
412 let mut null_count = 0u64;
413 let mut row_count = 0u64;
414
415 for entry in self.entries.values() {
416 null_count += entry.null_count;
417 row_count += entry.row_count;
418
419 if let Some(ref entry_min) = entry.min {
420 min = match min {
421 None => Some(entry_min.clone()),
422 Some(ref current) => {
423 if compare_values(entry_min, current) == Some(Ordering::Less) {
424 Some(entry_min.clone())
425 } else {
426 min
427 }
428 }
429 };
430 }
431
432 if let Some(ref entry_max) = entry.max {
433 max = match max {
434 None => Some(entry_max.clone()),
435 Some(ref current) => {
436 if compare_values(entry_max, current) == Some(Ordering::Greater) {
437 Some(entry_max.clone())
438 } else {
439 max
440 }
441 }
442 };
443 }
444 }
445
446 (min, max, null_count, row_count)
447 }
448}
449
/// Bloom filter over `Value`s: a bit set probed at several hash-derived
/// positions per value.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    /// Bit storage, packed 64 bits per word.
    bits: Vec<u64>,
    /// Number of bit positions probed per value.
    num_hashes: usize,
    /// Number of addressable bits; bit positions are taken modulo this.
    num_bits: usize,
}
464
465impl BloomFilter {
466 pub fn new(num_bits: usize, num_hashes: usize) -> Self {
468 let num_words = (num_bits + 63) / 64;
469 Self {
470 bits: vec![0; num_words],
471 num_hashes,
472 num_bits,
473 }
474 }
475
476 pub fn add(&mut self, value: &Value) {
478 let hashes = self.compute_hashes(value);
479 for h in hashes {
480 let bit_idx = h % self.num_bits;
481 let word_idx = bit_idx / 64;
482 let bit_pos = bit_idx % 64;
483 self.bits[word_idx] |= 1 << bit_pos;
484 }
485 }
486
487 pub fn might_contain(&self, value: &Value) -> bool {
489 let hashes = self.compute_hashes(value);
490 for h in hashes {
491 let bit_idx = h % self.num_bits;
492 let word_idx = bit_idx / 64;
493 let bit_pos = bit_idx % 64;
494 if (self.bits[word_idx] & (1 << bit_pos)) == 0 {
495 return false;
496 }
497 }
498 true
499 }
500
501 fn compute_hashes(&self, value: &Value) -> Vec<usize> {
502 let base_hash = value_hash(value);
504 let mut hashes = Vec::with_capacity(self.num_hashes);
505
506 for i in 0..self.num_hashes {
507 let h1 = base_hash;
509 let h2 = base_hash.rotate_left(17);
510 hashes.push((h1.wrapping_add((i as u64).wrapping_mul(h2))) as usize);
511 }
512
513 hashes
514 }
515}
516
/// Builder that sizes a `BloomFilter` from an expected item count and a
/// false-positive rate, then fills it.
pub struct BloomFilterBuilder {
    /// The filter being populated.
    filter: BloomFilter,
}
521
522impl BloomFilterBuilder {
523 pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
525 let num_bits = optimal_num_bits(expected_items, false_positive_rate);
527 let num_hashes = optimal_num_hashes(num_bits, expected_items);
528
529 Self {
530 filter: BloomFilter::new(num_bits, num_hashes),
531 }
532 }
533
534 pub fn add(&mut self, value: &Value) {
536 self.filter.add(value);
537 }
538
539 pub fn build(self) -> BloomFilter {
541 self.filter
542 }
543}
544
/// Number of Bloom filter bits for `n` expected items at false-positive
/// rate `p`, computed as `ceil(-n * ln(p) / ln(2)^2)`.
///
/// The result is always at least 1 and always finite:
/// * `n == 0` or `p >= 1.0` would yield zero bits, which is raised to one
///   so the filter stays addressable;
/// * `p` that is zero, negative or NaN has no finite answer and is treated
///   as the smallest positive `f64`.
fn optimal_num_bits(n: usize, p: f64) -> usize {
    // `p > 0.0` is false for NaN as well, so NaN takes the fallback too.
    let p = if p > 0.0 { p } else { f64::MIN_POSITIVE };

    let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
    let bits = (-(n as f64) * p.ln()) / ln2_squared;

    // The float-to-int cast maps negative and NaN values to 0.
    (bits.ceil() as usize).max(1)
}
550
/// Number of hash probes for a Bloom filter of `m` bits holding `n`
/// expected items, computed as `ceil(m / n * ln(2))`.
///
/// The result is always at least 1. `n == 0` returns 1 directly instead of
/// dividing by zero, and a sparse filter (`m` much smaller than `n`) is
/// raised from 0 to 1 so the filter still discriminates.
fn optimal_num_hashes(m: usize, n: usize) -> usize {
    if n == 0 {
        return 1;
    }

    let hashes = (m as f64 / n as f64) * std::f64::consts::LN_2;
    (hashes.ceil() as usize).max(1)
}
555
556fn value_hash(value: &Value) -> u64 {
558 use std::collections::hash_map::DefaultHasher;
559 use std::hash::{Hash, Hasher};
560
561 let mut hasher = DefaultHasher::new();
562
563 match value {
564 Value::Null => 0u64.hash(&mut hasher),
565 Value::Bool(b) => b.hash(&mut hasher),
566 Value::Int64(i) => i.hash(&mut hasher),
567 Value::Float64(f) => f.to_bits().hash(&mut hasher),
568 Value::String(s) => s.hash(&mut hasher),
569 Value::Bytes(b) => b.hash(&mut hasher),
570 _ => format!("{value:?}").hash(&mut hasher),
571 }
572
573 hasher.finish()
574}
575
576fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
578 match (a, b) {
579 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
580 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
581 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
582 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
583 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
584 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
585 _ => None,
586 }
587}
588
// Unit tests for zone map entries, the builder, the index and the Bloom filter.
#[cfg(test)]
mod tests {
    use super::*;

    // Equality pruning against an entry covering [10, 100] with no nulls.
    #[test]
    fn test_zone_map_entry_equal() {
        let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);

        // Inside the bounds, including both endpoints.
        assert!(entry.might_contain_equal(&Value::Int64(50)));
        assert!(entry.might_contain_equal(&Value::Int64(10)));
        assert!(entry.might_contain_equal(&Value::Int64(100)));
        // Strictly outside the bounds.
        assert!(!entry.might_contain_equal(&Value::Int64(5)));
        assert!(!entry.might_contain_equal(&Value::Int64(105)));
    }

    // Range pruning against an entry covering [10, 100].
    #[test]
    fn test_zone_map_entry_range() {
        let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);

        // Query range [0, 200] fully covers the entry.
        assert!(entry.might_contain_range(
            Some(&Value::Int64(0)),
            Some(&Value::Int64(200)),
            true,
            true
        ));

        // Query range [50, 150] overlaps the upper part of the entry.
        assert!(entry.might_contain_range(
            Some(&Value::Int64(50)),
            Some(&Value::Int64(150)),
            true,
            true
        ));

        // Query range [101, 200] starts above the entry's maximum.
        assert!(!entry.might_contain_range(
            Some(&Value::Int64(101)),
            Some(&Value::Int64(200)),
            true,
            true
        ));
    }

    // The builder tracks min, max, null count and total row count.
    #[test]
    fn test_zone_map_builder() {
        let mut builder = ZoneMapBuilder::new();

        for i in 1..=100 {
            builder.add(&Value::Int64(i));
        }
        builder.add(&Value::Null);
        builder.add(&Value::Null);

        let entry = builder.build();

        assert_eq!(entry.min, Some(Value::Int64(1)));
        assert_eq!(entry.max, Some(Value::Int64(100)));
        assert_eq!(entry.null_count, 2);
        // 100 integers plus 2 nulls.
        assert_eq!(entry.row_count, 102);
    }

    // A builder created with a Bloom filter attaches it to the built entry.
    #[test]
    fn test_zone_map_with_bloom() {
        let mut builder = ZoneMapBuilder::with_bloom_filter(100, 0.01);

        for i in 1..=100 {
            builder.add(&Value::Int64(i));
        }

        let entry = builder.build();

        assert!(entry.bloom_filter.is_some());
        // A value that was added must never be rejected.
        assert!(entry.might_contain_equal(&Value::Int64(50)));
    }

    // Index-level filtering over three chunks with disjoint bounds.
    #[test]
    fn test_zone_map_index() {
        let mut index = ZoneMapIndex::new("age");

        // Chunk 0 covers [0, 30].
        index.insert(
            0,
            ZoneMapEntry::with_min_max(Value::Int64(0), Value::Int64(30), 0, 100),
        );
        // Chunk 1 covers [31, 60].
        index.insert(
            1,
            ZoneMapEntry::with_min_max(Value::Int64(31), Value::Int64(60), 0, 100),
        );
        // Chunk 2 covers [61, 100].
        index.insert(
            2,
            ZoneMapEntry::with_min_max(Value::Int64(61), Value::Int64(100), 0, 100),
        );

        // 25 falls only within chunk 0.
        let matching: Vec<_> = index.filter_equal(&Value::Int64(25), 0..3).collect();
        assert_eq!(matching, vec![0]);

        // 75 falls only within chunk 2.
        let matching: Vec<_> = index.filter_equal(&Value::Int64(75), 0..3).collect();
        assert_eq!(matching, vec![2]);

        // The range [25, 65] overlaps all three chunks.
        let matching: Vec<_> = index
            .filter_range(
                Some(&Value::Int64(25)),
                Some(&Value::Int64(65)),
                true,
                true,
                0..3,
            )
            .collect();
        assert_eq!(matching, vec![0, 1, 2]);
    }

    // Every value added to a Bloom filter must be reported as possibly present.
    #[test]
    fn test_bloom_filter() {
        let mut filter = BloomFilter::new(1000, 7);

        for i in 0..100 {
            filter.add(&Value::Int64(i));
        }

        for i in 0..100 {
            assert!(filter.might_contain(&Value::Int64(i)));
        }

        // The result for a value that was never added is not asserted
        // (presumably because a false positive is permitted); the call only
        // checks that the lookup runs.
        let _ = filter.might_contain(&Value::Int64(1000));
    }

    // An entry whose rows are all null matches only a null lookup.
    #[test]
    fn test_zone_map_nulls() {
        let entry = ZoneMapEntry {
            min: None,
            max: None,
            null_count: 10,
            row_count: 10,
            bloom_filter: None,
        };

        assert!(entry.is_all_null());
        assert!(!entry.might_contain_non_null());
        assert!(entry.might_contain_equal(&Value::Null));
        assert!(!entry.might_contain_equal(&Value::Int64(5)));
    }
}