reddb_server/storage/index/
zone_map.rs1use crate::storage::index::bloom_segment::BloomSegment;
36use crate::storage::index::stats::{IndexKind, IndexStats};
37use crate::storage::index::{HasBloom, IndexBase};
38use crate::storage::primitives::HyperLogLog;
39
/// A predicate that [`ZoneMap::block_skip`] can evaluate against a block's
/// summary.
///
/// Keys are raw byte slices compared lexicographically (byte-wise).
/// `PartialEq`/`Eq` are derived so predicates can be compared directly, in
/// line with [`ZoneDecision`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ZonePredicate<'a> {
    /// The column equals exactly this key.
    Equals(&'a [u8]),
    /// The column falls within a range; a `None` bound is unbounded on that
    /// side. Both bounds are treated as inclusive by `block_skip`.
    Range {
        /// Lower bound of the range, if any.
        start: Option<&'a [u8]>,
        /// Upper bound of the range, if any.
        end: Option<&'a [u8]>,
    },
    /// The column is null.
    IsNull,
    /// The column is not null.
    IsNotNull,
}
55
/// Outcome of evaluating a [`ZonePredicate`] against a zone map.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ZoneDecision {
    /// The summary cannot rule the block out; it has to be read.
    MustRead,
    /// The summary proves no row in the block can match; it may be skipped.
    Skip,
}
64
65pub struct ZoneMap {
67 min_key: Option<Vec<u8>>,
68 max_key: Option<Vec<u8>>,
69 total_count: u64,
70 null_count: u64,
71 hll: HyperLogLog,
72 bloom: BloomSegment,
73}
74
75impl ZoneMap {
76 pub fn with_capacity(expected_rows: usize) -> Self {
79 Self {
80 min_key: None,
81 max_key: None,
82 total_count: 0,
83 null_count: 0,
84 hll: HyperLogLog::new(),
85 bloom: BloomSegment::with_capacity(expected_rows.max(64)),
86 }
87 }
88
89 pub fn new() -> Self {
91 Self::with_capacity(128)
92 }
93
94 pub fn observe(&mut self, key: &[u8]) {
96 self.total_count = self.total_count.saturating_add(1);
97 self.hll.add(key);
98 self.bloom.insert(key);
99
100 match &self.min_key {
101 None => self.min_key = Some(key.to_vec()),
102 Some(cur) if key < cur.as_slice() => self.min_key = Some(key.to_vec()),
103 _ => {}
104 }
105 match &self.max_key {
106 None => self.max_key = Some(key.to_vec()),
107 Some(cur) if key > cur.as_slice() => self.max_key = Some(key.to_vec()),
108 _ => {}
109 }
110 }
111
112 pub fn observe_null(&mut self) {
115 self.total_count = self.total_count.saturating_add(1);
116 self.null_count = self.null_count.saturating_add(1);
117 }
118
119 pub fn min(&self) -> Option<&[u8]> {
121 self.min_key.as_deref()
122 }
123
124 pub fn max(&self) -> Option<&[u8]> {
126 self.max_key.as_deref()
127 }
128
129 pub fn total_count(&self) -> u64 {
131 self.total_count
132 }
133
134 pub fn null_count(&self) -> u64 {
136 self.null_count
137 }
138
139 pub fn non_null_count(&self) -> u64 {
141 self.total_count.saturating_sub(self.null_count)
142 }
143
144 pub fn distinct_estimate(&self) -> u64 {
146 self.hll.count()
147 }
148
149 pub fn bloom(&self) -> &BloomSegment {
151 &self.bloom
152 }
153
154 pub fn block_skip(&self, predicate: &ZonePredicate<'_>) -> ZoneDecision {
158 if self.total_count == 0 {
160 return ZoneDecision::Skip;
161 }
162
163 match predicate {
164 ZonePredicate::Equals(key) => {
165 if let (Some(min), Some(max)) = (self.min(), self.max()) {
167 if *key < min || *key > max {
168 return ZoneDecision::Skip;
169 }
170 }
171 if self.bloom.definitely_absent(key) {
173 return ZoneDecision::Skip;
174 }
175 ZoneDecision::MustRead
176 }
177 ZonePredicate::Range { start, end } => {
178 if let (Some(a), Some(qend)) = (self.min(), end) {
179 if *qend < a {
180 return ZoneDecision::Skip;
181 }
182 }
183 if let (Some(b), Some(qstart)) = (self.max(), start) {
184 if *qstart > b {
185 return ZoneDecision::Skip;
186 }
187 }
188 ZoneDecision::MustRead
189 }
190 ZonePredicate::IsNull => {
191 if self.null_count == 0 {
192 ZoneDecision::Skip
193 } else {
194 ZoneDecision::MustRead
195 }
196 }
197 ZonePredicate::IsNotNull => {
198 if self.non_null_count() == 0 {
199 ZoneDecision::Skip
200 } else {
201 ZoneDecision::MustRead
202 }
203 }
204 }
205 }
206
207 pub fn union(&mut self, other: &ZoneMap) {
210 self.total_count = self.total_count.saturating_add(other.total_count);
211 self.null_count = self.null_count.saturating_add(other.null_count);
212
213 match (&self.min_key, &other.min_key) {
214 (None, Some(o)) => self.min_key = Some(o.clone()),
215 (Some(s), Some(o)) if o < s => self.min_key = Some(o.clone()),
216 _ => {}
217 }
218 match (&self.max_key, &other.max_key) {
219 (None, Some(o)) => self.max_key = Some(o.clone()),
220 (Some(s), Some(o)) if o > s => self.max_key = Some(o.clone()),
221 _ => {}
222 }
223
224 self.hll.merge(&other.hll);
225 let _ = self.bloom.union_inplace(&other.bloom);
229 }
230
231 pub fn clear(&mut self) {
233 self.min_key = None;
234 self.max_key = None;
235 self.total_count = 0;
236 self.null_count = 0;
237 self.hll.clear();
238 self.bloom = BloomSegment::with_capacity(128);
240 }
241}
242
243impl Default for ZoneMap {
244 fn default() -> Self {
245 Self::new()
246 }
247}
248
249impl HasBloom for ZoneMap {
250 fn bloom_segment(&self) -> Option<&BloomSegment> {
251 Some(&self.bloom)
252 }
253}
254
255impl IndexBase for ZoneMap {
256 fn name(&self) -> &str {
257 "zone_map"
258 }
259
260 fn kind(&self) -> IndexKind {
261 IndexKind::ZoneMap
262 }
263
264 fn stats(&self) -> IndexStats {
265 IndexStats {
266 entries: self.total_count as usize,
267 distinct_keys: self.distinct_estimate() as usize,
268 approx_bytes: self.bloom.filter().byte_size() + 16 * 1024, kind: IndexKind::ZoneMap,
270 has_bloom: true,
271 index_correlation: 0.0,
272 }
273 }
274
275 fn bloom(&self) -> Option<&crate::storage::primitives::BloomFilter> {
276 Some(self.bloom.filter())
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// Min and max follow byte-wise ordering regardless of insertion order.
    #[test]
    fn tracks_min_max() {
        let mut zm = ZoneMap::with_capacity(64);
        zm.observe(b"delta");
        zm.observe(b"alpha");
        zm.observe(b"charlie");
        zm.observe(b"beta");
        assert_eq!(zm.min(), Some(b"alpha".as_slice()));
        assert_eq!(zm.max(), Some(b"delta".as_slice()));
        assert_eq!(zm.total_count(), 4);
        assert_eq!(zm.null_count(), 0);
    }

    /// Nulls count towards the total but are tracked in their own counter.
    #[test]
    fn tracks_nulls_separately() {
        let mut zm = ZoneMap::with_capacity(64);
        zm.observe(b"a");
        zm.observe_null();
        zm.observe_null();
        zm.observe(b"b");
        assert_eq!(zm.total_count(), 4);
        assert_eq!(zm.null_count(), 2);
        assert_eq!(zm.non_null_count(), 2);
    }

    /// Observing the same 1000 keys twice must still estimate ~1000 distinct.
    #[test]
    fn distinct_estimate_approximates_cardinality() {
        let mut zm = ZoneMap::with_capacity(2000);
        for i in 0..1000 {
            zm.observe(format!("user{i}").as_bytes());
        }
        // Second pass: duplicates only.
        for i in 0..1000 {
            zm.observe(format!("user{i}").as_bytes());
        }
        let est = zm.distinct_estimate();
        assert!(est > 900 && est < 1100, "estimate={est}");
    }

    /// Keys outside `[min, max]` are pruned by the bounds alone.
    #[test]
    fn block_skip_equality_out_of_range() {
        let mut zm = ZoneMap::with_capacity(64);
        zm.observe(b"mango");
        zm.observe(b"orange");
        zm.observe(b"peach");
        // Below the minimum.
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"apple")),
            ZoneDecision::Skip
        );
        // Above the maximum.
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"strawberry")),
            ZoneDecision::Skip
        );
        // Present key.
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"mango")),
            ZoneDecision::MustRead
        );
    }

    /// An in-range key can only be pruned by the bloom segment.
    #[test]
    fn block_skip_equality_bloom_prune() {
        let mut zm = ZoneMap::with_capacity(1024);
        zm.observe(b"alpha");
        zm.observe(b"zulu");

        // "needle" lies inside [alpha, zulu]. A bloom false positive is
        // legal, so either decision is acceptable here; the call only has to
        // complete without panicking.
        let _ = zm.block_skip(&ZonePredicate::Equals(b"needle"));

        // What must hold unconditionally: observed keys are never pruned.
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"alpha")),
            ZoneDecision::MustRead
        );
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"zulu")),
            ZoneDecision::MustRead
        );
    }

    /// Ranges entirely outside the bounds are skipped; overlapping ones are not.
    #[test]
    fn block_skip_range_non_overlapping() {
        let mut zm = ZoneMap::with_capacity(64);
        zm.observe(&10u32.to_be_bytes());
        zm.observe(&50u32.to_be_bytes());
        zm.observe(&100u32.to_be_bytes());

        let lo = 200u32.to_be_bytes();
        let hi = 300u32.to_be_bytes();
        assert_eq!(
            zm.block_skip(&ZonePredicate::Range {
                start: Some(&lo),
                end: Some(&hi),
            }),
            ZoneDecision::Skip
        );

        let qlo = 40u32.to_be_bytes();
        let qhi = 60u32.to_be_bytes();
        assert_eq!(
            zm.block_skip(&ZonePredicate::Range {
                start: Some(&qlo),
                end: Some(&qhi),
            }),
            ZoneDecision::MustRead
        );
    }

    /// Null predicates are answered from the counters.
    #[test]
    fn block_skip_null_predicates() {
        let mut empty_nulls = ZoneMap::with_capacity(64);
        empty_nulls.observe(b"x");
        assert_eq!(
            empty_nulls.block_skip(&ZonePredicate::IsNull),
            ZoneDecision::Skip
        );
        assert_eq!(
            empty_nulls.block_skip(&ZonePredicate::IsNotNull),
            ZoneDecision::MustRead
        );

        let mut all_nulls = ZoneMap::with_capacity(64);
        all_nulls.observe_null();
        all_nulls.observe_null();
        assert_eq!(
            all_nulls.block_skip(&ZonePredicate::IsNull),
            ZoneDecision::MustRead
        );
        assert_eq!(
            all_nulls.block_skip(&ZonePredicate::IsNotNull),
            ZoneDecision::Skip
        );
    }

    /// A map with no observations skips every predicate.
    #[test]
    fn empty_block_skips_everything() {
        let zm = ZoneMap::with_capacity(64);
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"whatever")),
            ZoneDecision::Skip
        );
        assert_eq!(
            zm.block_skip(&ZonePredicate::Range {
                start: None,
                end: None,
            }),
            ZoneDecision::Skip
        );
    }

    /// `union` widens the bounds, adds the counters, and merges the sketches.
    #[test]
    fn union_merges_bounds_and_counts() {
        let mut a = ZoneMap::with_capacity(256);
        a.observe(b"cherry");
        a.observe(b"apple");
        a.observe_null();

        let mut b = ZoneMap::with_capacity(256);
        b.observe(b"zebra");
        b.observe(b"banana");

        a.union(&b);
        assert_eq!(a.min(), Some(b"apple".as_slice()));
        assert_eq!(a.max(), Some(b"zebra".as_slice()));
        assert_eq!(a.total_count(), 5);
        assert_eq!(a.null_count(), 1);
        assert!(a.distinct_estimate() >= 4);

        // Keys that came from `b` must stay readable through the merged
        // bloom; both maps were built with the same capacity.
        assert_eq!(
            a.block_skip(&ZonePredicate::Equals(b"zebra")),
            ZoneDecision::MustRead
        );
        assert_eq!(
            a.block_skip(&ZonePredicate::Equals(b"banana")),
            ZoneDecision::MustRead
        );
    }

    /// `stats` mirrors the observation counters.
    #[test]
    fn stats_match_observation_counts() {
        let mut zm = ZoneMap::with_capacity(64);
        for i in 0..50u32 {
            zm.observe(&i.to_be_bytes());
        }
        let s = zm.stats();
        assert_eq!(s.entries, 50);
        assert_eq!(s.kind, IndexKind::ZoneMap);
        assert!(s.has_bloom);
    }

    /// `clear` returns the map to its freshly constructed state.
    #[test]
    fn clear_resets_all_state() {
        let mut zm = ZoneMap::with_capacity(64);
        zm.observe(b"x");
        zm.observe_null();
        zm.clear();
        assert_eq!(zm.total_count(), 0);
        assert_eq!(zm.null_count(), 0);
        assert_eq!(zm.non_null_count(), 0);
        assert_eq!(zm.min(), None);
        assert_eq!(zm.max(), None);
        // A cleared map behaves like an empty block for pruning.
        assert_eq!(
            zm.block_skip(&ZonePredicate::Equals(b"x")),
            ZoneDecision::Skip
        );
    }
}