1use graphos_common::types::Value;
10use std::cmp::Ordering;
11use std::collections::HashMap;
12
13#[derive(Debug, Clone)]
15pub struct ZoneMapEntry {
16 pub min: Option<Value>,
18 pub max: Option<Value>,
20 pub null_count: u64,
22 pub row_count: u64,
24 pub bloom_filter: Option<BloomFilter>,
26}
27
28impl ZoneMapEntry {
29 pub fn new() -> Self {
31 Self {
32 min: None,
33 max: None,
34 null_count: 0,
35 row_count: 0,
36 bloom_filter: None,
37 }
38 }
39
40 pub fn with_min_max(min: Value, max: Value, null_count: u64, row_count: u64) -> Self {
42 Self {
43 min: Some(min),
44 max: Some(max),
45 null_count,
46 row_count,
47 bloom_filter: None,
48 }
49 }
50
51 pub fn with_bloom_filter(mut self, filter: BloomFilter) -> Self {
53 self.bloom_filter = Some(filter);
54 self
55 }
56
57 pub fn might_contain_equal(&self, value: &Value) -> bool {
61 if matches!(value, Value::Null) {
63 return self.null_count > 0;
64 }
65
66 if self.is_all_null() {
68 return false;
69 }
70
71 if let Some(ref bloom) = self.bloom_filter {
73 if !bloom.might_contain(value) {
74 return false;
75 }
76 }
77
78 match (&self.min, &self.max) {
80 (Some(min), Some(max)) => {
81 let cmp_min = compare_values(value, min);
82 let cmp_max = compare_values(value, max);
83
84 match (cmp_min, cmp_max) {
86 (Some(Ordering::Less), _) => false, (_, Some(Ordering::Greater)) => false, _ => true,
89 }
90 }
91 _ => self.might_contain_non_null(),
93 }
94 }
95
96 pub fn might_contain_less_than(&self, value: &Value, inclusive: bool) -> bool {
100 match &self.min {
101 Some(min) => {
102 let cmp = compare_values(min, value);
103 match cmp {
104 Some(Ordering::Less) => true,
105 Some(Ordering::Equal) => inclusive,
106 Some(Ordering::Greater) => false,
107 None => true,
108 }
109 }
110 None => self.null_count > 0, }
112 }
113
114 pub fn might_contain_greater_than(&self, value: &Value, inclusive: bool) -> bool {
118 match &self.max {
119 Some(max) => {
120 let cmp = compare_values(max, value);
121 match cmp {
122 Some(Ordering::Greater) => true,
123 Some(Ordering::Equal) => inclusive,
124 Some(Ordering::Less) => false,
125 None => true,
126 }
127 }
128 None => self.null_count > 0,
129 }
130 }
131
132 pub fn might_contain_range(
134 &self,
135 lower: Option<&Value>,
136 upper: Option<&Value>,
137 lower_inclusive: bool,
138 upper_inclusive: bool,
139 ) -> bool {
140 if let Some(lower_val) = lower {
142 if !self.might_contain_greater_than(lower_val, lower_inclusive) {
143 return false;
144 }
145 }
146
147 if let Some(upper_val) = upper {
149 if !self.might_contain_less_than(upper_val, upper_inclusive) {
150 return false;
151 }
152 }
153
154 true
155 }
156
157 pub fn might_contain_non_null(&self) -> bool {
159 self.row_count > self.null_count
160 }
161
162 pub fn is_all_null(&self) -> bool {
164 self.row_count > 0 && self.null_count == self.row_count
165 }
166
167 pub fn null_fraction(&self) -> f64 {
169 if self.row_count == 0 {
170 0.0
171 } else {
172 self.null_count as f64 / self.row_count as f64
173 }
174 }
175}
176
177impl Default for ZoneMapEntry {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183pub struct ZoneMapBuilder {
185 min: Option<Value>,
186 max: Option<Value>,
187 null_count: u64,
188 row_count: u64,
189 bloom_builder: Option<BloomFilterBuilder>,
190}
191
192impl ZoneMapBuilder {
193 pub fn new() -> Self {
195 Self {
196 min: None,
197 max: None,
198 null_count: 0,
199 row_count: 0,
200 bloom_builder: None,
201 }
202 }
203
204 pub fn with_bloom_filter(expected_items: usize, false_positive_rate: f64) -> Self {
206 Self {
207 min: None,
208 max: None,
209 null_count: 0,
210 row_count: 0,
211 bloom_builder: Some(BloomFilterBuilder::new(expected_items, false_positive_rate)),
212 }
213 }
214
215 pub fn add(&mut self, value: &Value) {
217 self.row_count += 1;
218
219 if matches!(value, Value::Null) {
220 self.null_count += 1;
221 return;
222 }
223
224 self.min = match &self.min {
226 None => Some(value.clone()),
227 Some(current) => {
228 if compare_values(value, current) == Some(Ordering::Less) {
229 Some(value.clone())
230 } else {
231 self.min.clone()
232 }
233 }
234 };
235
236 self.max = match &self.max {
238 None => Some(value.clone()),
239 Some(current) => {
240 if compare_values(value, current) == Some(Ordering::Greater) {
241 Some(value.clone())
242 } else {
243 self.max.clone()
244 }
245 }
246 };
247
248 if let Some(ref mut bloom) = self.bloom_builder {
250 bloom.add(value);
251 }
252 }
253
254 pub fn build(self) -> ZoneMapEntry {
256 let bloom_filter = self.bloom_builder.map(|b| b.build());
257
258 ZoneMapEntry {
259 min: self.min,
260 max: self.max,
261 null_count: self.null_count,
262 row_count: self.row_count,
263 bloom_filter,
264 }
265 }
266}
267
268impl Default for ZoneMapBuilder {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274pub struct ZoneMapIndex {
278 entries: HashMap<u64, ZoneMapEntry>,
280 property: String,
282}
283
284impl ZoneMapIndex {
285 pub fn new(property: impl Into<String>) -> Self {
287 Self {
288 entries: HashMap::new(),
289 property: property.into(),
290 }
291 }
292
293 pub fn property(&self) -> &str {
295 &self.property
296 }
297
298 pub fn insert(&mut self, chunk_id: u64, entry: ZoneMapEntry) {
300 self.entries.insert(chunk_id, entry);
301 }
302
303 pub fn get(&self, chunk_id: u64) -> Option<&ZoneMapEntry> {
305 self.entries.get(&chunk_id)
306 }
307
308 pub fn remove(&mut self, chunk_id: u64) -> Option<ZoneMapEntry> {
310 self.entries.remove(&chunk_id)
311 }
312
313 pub fn len(&self) -> usize {
315 self.entries.len()
316 }
317
318 pub fn is_empty(&self) -> bool {
320 self.entries.is_empty()
321 }
322
323 pub fn filter_equal<'a>(
325 &'a self,
326 value: &'a Value,
327 chunk_ids: impl Iterator<Item = u64> + 'a,
328 ) -> impl Iterator<Item = u64> + 'a {
329 chunk_ids.filter(move |&id| {
330 self.entries
331 .get(&id)
332 .map(|e| e.might_contain_equal(value))
333 .unwrap_or(true) })
335 }
336
337 pub fn filter_range<'a>(
339 &'a self,
340 lower: Option<&'a Value>,
341 upper: Option<&'a Value>,
342 lower_inclusive: bool,
343 upper_inclusive: bool,
344 chunk_ids: impl Iterator<Item = u64> + 'a,
345 ) -> impl Iterator<Item = u64> + 'a {
346 chunk_ids.filter(move |&id| {
347 self.entries
348 .get(&id)
349 .map(|e| e.might_contain_range(lower, upper, lower_inclusive, upper_inclusive))
350 .unwrap_or(true)
351 })
352 }
353
354 pub fn chunks_ordered_by_min(&self) -> Vec<u64> {
356 let mut chunks: Vec<_> = self.entries.iter().collect();
357 chunks.sort_by(|(_, a), (_, b)| match (&a.min, &b.min) {
358 (Some(a_min), Some(b_min)) => compare_values(a_min, b_min).unwrap_or(Ordering::Equal),
359 (Some(_), None) => Ordering::Less,
360 (None, Some(_)) => Ordering::Greater,
361 (None, None) => Ordering::Equal,
362 });
363 chunks.into_iter().map(|(&id, _)| id).collect()
364 }
365
366 pub fn overall_stats(&self) -> (Option<Value>, Option<Value>, u64, u64) {
368 let mut min: Option<Value> = None;
369 let mut max: Option<Value> = None;
370 let mut null_count = 0u64;
371 let mut row_count = 0u64;
372
373 for entry in self.entries.values() {
374 null_count += entry.null_count;
375 row_count += entry.row_count;
376
377 if let Some(ref entry_min) = entry.min {
378 min = match min {
379 None => Some(entry_min.clone()),
380 Some(ref current) => {
381 if compare_values(entry_min, current) == Some(Ordering::Less) {
382 Some(entry_min.clone())
383 } else {
384 min
385 }
386 }
387 };
388 }
389
390 if let Some(ref entry_max) = entry.max {
391 max = match max {
392 None => Some(entry_max.clone()),
393 Some(ref current) => {
394 if compare_values(entry_max, current) == Some(Ordering::Greater) {
395 Some(entry_max.clone())
396 } else {
397 max
398 }
399 }
400 };
401 }
402 }
403
404 (min, max, null_count, row_count)
405 }
406}
407
408#[derive(Debug, Clone)]
410pub struct BloomFilter {
411 bits: Vec<u64>,
413 num_hashes: usize,
415 num_bits: usize,
417}
418
419impl BloomFilter {
420 pub fn new(num_bits: usize, num_hashes: usize) -> Self {
422 let num_words = (num_bits + 63) / 64;
423 Self {
424 bits: vec![0; num_words],
425 num_hashes,
426 num_bits,
427 }
428 }
429
430 pub fn add(&mut self, value: &Value) {
432 let hashes = self.compute_hashes(value);
433 for h in hashes {
434 let bit_idx = h % self.num_bits;
435 let word_idx = bit_idx / 64;
436 let bit_pos = bit_idx % 64;
437 self.bits[word_idx] |= 1 << bit_pos;
438 }
439 }
440
441 pub fn might_contain(&self, value: &Value) -> bool {
443 let hashes = self.compute_hashes(value);
444 for h in hashes {
445 let bit_idx = h % self.num_bits;
446 let word_idx = bit_idx / 64;
447 let bit_pos = bit_idx % 64;
448 if (self.bits[word_idx] & (1 << bit_pos)) == 0 {
449 return false;
450 }
451 }
452 true
453 }
454
455 fn compute_hashes(&self, value: &Value) -> Vec<usize> {
456 let base_hash = value_hash(value);
458 let mut hashes = Vec::with_capacity(self.num_hashes);
459
460 for i in 0..self.num_hashes {
461 let h1 = base_hash;
463 let h2 = base_hash.rotate_left(17);
464 hashes.push((h1.wrapping_add((i as u64).wrapping_mul(h2))) as usize);
465 }
466
467 hashes
468 }
469}
470
471pub struct BloomFilterBuilder {
473 filter: BloomFilter,
474}
475
476impl BloomFilterBuilder {
477 pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
479 let num_bits = optimal_num_bits(expected_items, false_positive_rate);
481 let num_hashes = optimal_num_hashes(num_bits, expected_items);
482
483 Self {
484 filter: BloomFilter::new(num_bits, num_hashes),
485 }
486 }
487
488 pub fn add(&mut self, value: &Value) {
490 self.filter.add(value);
491 }
492
493 pub fn build(self) -> BloomFilter {
495 self.filter
496 }
497}
498
499fn optimal_num_bits(n: usize, p: f64) -> usize {
501 let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
502 ((-(n as f64) * p.ln()) / ln2_squared).ceil() as usize
503}
504
505fn optimal_num_hashes(m: usize, n: usize) -> usize {
507 ((m as f64 / n as f64) * std::f64::consts::LN_2).ceil() as usize
508}
509
510fn value_hash(value: &Value) -> u64 {
512 use std::collections::hash_map::DefaultHasher;
513 use std::hash::{Hash, Hasher};
514
515 let mut hasher = DefaultHasher::new();
516
517 match value {
518 Value::Null => 0u64.hash(&mut hasher),
519 Value::Bool(b) => b.hash(&mut hasher),
520 Value::Int64(i) => i.hash(&mut hasher),
521 Value::Float64(f) => f.to_bits().hash(&mut hasher),
522 Value::String(s) => s.hash(&mut hasher),
523 Value::Bytes(b) => b.hash(&mut hasher),
524 _ => format!("{value:?}").hash(&mut hasher),
525 }
526
527 hasher.finish()
528}
529
530fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
532 match (a, b) {
533 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
534 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
535 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
536 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
537 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
538 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
539 _ => None,
540 }
541}
542
543#[cfg(test)]
544mod tests {
545 use super::*;
546
547 #[test]
548 fn test_zone_map_entry_equal() {
549 let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);
550
551 assert!(entry.might_contain_equal(&Value::Int64(50)));
552 assert!(entry.might_contain_equal(&Value::Int64(10)));
553 assert!(entry.might_contain_equal(&Value::Int64(100)));
554 assert!(!entry.might_contain_equal(&Value::Int64(5)));
555 assert!(!entry.might_contain_equal(&Value::Int64(105)));
556 }
557
558 #[test]
559 fn test_zone_map_entry_range() {
560 let entry = ZoneMapEntry::with_min_max(Value::Int64(10), Value::Int64(100), 0, 100);
561
562 assert!(entry.might_contain_range(
564 Some(&Value::Int64(0)),
565 Some(&Value::Int64(200)),
566 true,
567 true
568 ));
569
570 assert!(entry.might_contain_range(
572 Some(&Value::Int64(50)),
573 Some(&Value::Int64(150)),
574 true,
575 true
576 ));
577
578 assert!(!entry.might_contain_range(
580 Some(&Value::Int64(101)),
581 Some(&Value::Int64(200)),
582 true,
583 true
584 ));
585 }
586
587 #[test]
588 fn test_zone_map_builder() {
589 let mut builder = ZoneMapBuilder::new();
590
591 for i in 1..=100 {
592 builder.add(&Value::Int64(i));
593 }
594 builder.add(&Value::Null);
595 builder.add(&Value::Null);
596
597 let entry = builder.build();
598
599 assert_eq!(entry.min, Some(Value::Int64(1)));
600 assert_eq!(entry.max, Some(Value::Int64(100)));
601 assert_eq!(entry.null_count, 2);
602 assert_eq!(entry.row_count, 102);
603 }
604
605 #[test]
606 fn test_zone_map_with_bloom() {
607 let mut builder = ZoneMapBuilder::with_bloom_filter(100, 0.01);
608
609 for i in 1..=100 {
610 builder.add(&Value::Int64(i));
611 }
612
613 let entry = builder.build();
614
615 assert!(entry.bloom_filter.is_some());
616 assert!(entry.might_contain_equal(&Value::Int64(50)));
617 }
619
620 #[test]
621 fn test_zone_map_index() {
622 let mut index = ZoneMapIndex::new("age");
623
624 index.insert(
625 0,
626 ZoneMapEntry::with_min_max(Value::Int64(0), Value::Int64(30), 0, 100),
627 );
628 index.insert(
629 1,
630 ZoneMapEntry::with_min_max(Value::Int64(31), Value::Int64(60), 0, 100),
631 );
632 index.insert(
633 2,
634 ZoneMapEntry::with_min_max(Value::Int64(61), Value::Int64(100), 0, 100),
635 );
636
637 let matching: Vec<_> = index.filter_equal(&Value::Int64(25), 0..3).collect();
639 assert_eq!(matching, vec![0]);
640
641 let matching: Vec<_> = index.filter_equal(&Value::Int64(75), 0..3).collect();
643 assert_eq!(matching, vec![2]);
644
645 let matching: Vec<_> = index
647 .filter_range(
648 Some(&Value::Int64(25)),
649 Some(&Value::Int64(65)),
650 true,
651 true,
652 0..3,
653 )
654 .collect();
655 assert_eq!(matching, vec![0, 1, 2]);
656 }
657
658 #[test]
659 fn test_bloom_filter() {
660 let mut filter = BloomFilter::new(1000, 7);
661
662 for i in 0..100 {
663 filter.add(&Value::Int64(i));
664 }
665
666 for i in 0..100 {
668 assert!(filter.might_contain(&Value::Int64(i)));
669 }
670
671 let _ = filter.might_contain(&Value::Int64(1000));
674 }
675
676 #[test]
677 fn test_zone_map_nulls() {
678 let entry = ZoneMapEntry {
679 min: None,
680 max: None,
681 null_count: 10,
682 row_count: 10,
683 bloom_filter: None,
684 };
685
686 assert!(entry.is_all_null());
687 assert!(!entry.might_contain_non_null());
688 assert!(entry.might_contain_equal(&Value::Null));
689 assert!(!entry.might_contain_equal(&Value::Int64(5)));
690 }
691}