1use std::ops::Bound;
29
/// A filter expression in the restricted shape the partition pruner understands.
///
/// The `derive_*` helpers in this file walk this tree to work out which values of the
/// partitioning column could still satisfy the filter.
#[derive(Debug, Clone)]
pub enum PrunePredicate {
    /// Conjunction of the nested predicates.
    And(Vec<PrunePredicate>),
    /// Disjunction of the nested predicates.
    Or(Vec<PrunePredicate>),
    /// A single `column <op> value` comparison.
    Compare {
        /// Name of the column being compared.
        column: String,
        /// Comparison operator applied to the column.
        op: PruneOp,
        /// Literal the column is compared against.
        value: PruneValue,
    },
    /// Membership test: `column` is one of `values`.
    In {
        /// Name of the column being tested.
        column: String,
        /// Candidate literals. The `derive_*` helpers only interpret a non-empty list.
        values: Vec<PruneValue>,
    },
    /// A predicate the pruner does not interpret. The `derive_*` helpers fall through to
    /// their catch-all arm for it, so it places no constraint on any column.
    Opaque,
}
52
/// Comparison operator used by [`PrunePredicate::Compare`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneOp {
    /// `column = value`
    Eq,
    /// `column < value`
    Lt,
    /// `column <= value`
    LtEq,
    /// `column > value`
    Gt,
    /// `column >= value`
    GtEq,
    /// `column != value`
    NotEq,
}
62
/// A literal value that can appear in a predicate or as a partition bound.
///
/// Equality is implemented by hand rather than derived so that it agrees with this type's
/// `PartialOrd` implementation: an `Int` and a `Float` that order as equal must also compare
/// as `==`. With the derived implementation `Int(1) != Float(1.0)`, which made list pruning
/// (which matches values with `==`) drop the partition a numerically equal literal belongs to.
#[derive(Debug, Clone)]
pub enum PruneValue {
    /// Signed 64-bit integer literal.
    Int(i64),
    /// 64-bit floating point literal.
    Float(f64),
    /// Text literal.
    Text(String),
    /// Boolean literal.
    Bool(bool),
}

impl PartialEq for PruneValue {
    /// Two values are equal when they hold equal payloads of the same variant, or when one is
    /// an `Int` and the other a `Float` holding the same number.
    ///
    /// The integer is widened with `as f64`, exactly as the ordering does, so `a == b` holds
    /// precisely when `a.partial_cmp(&b)` is `Some(Ordering::Equal)`. As with any float
    /// comparison, `Float(NaN)` is not equal to anything, including itself.
    fn eq(&self, other: &Self) -> bool {
        use PruneValue::*;
        match (self, other) {
            (Int(a), Int(b)) => a == b,
            (Float(a), Float(b)) => a == b,
            (Int(i), Float(f)) | (Float(f), Int(i)) => (*i as f64) == *f,
            (Text(a), Text(b)) => a == b,
            (Bool(a), Bool(b)) => a == b,
            // Different, non-numeric kinds never match.
            _ => false,
        }
    }
}
74
75impl PartialOrd for PruneValue {
76 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
77 use PruneValue::*;
78 match (self, other) {
79 (Int(a), Int(b)) => a.partial_cmp(b),
80 (Float(a), Float(b)) => a.partial_cmp(b),
81 (Int(a), Float(b)) => (*a as f64).partial_cmp(b),
82 (Float(a), Int(b)) => a.partial_cmp(&(*b as f64)),
83 (Text(a), Text(b)) => a.partial_cmp(b),
84 (Bool(a), Bool(b)) => a.partial_cmp(b),
85 _ => None,
86 }
87 }
88}
89
/// Partitioning scheme. Each `prune_*` entry point debug-asserts the kind it expects.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneKind {
    /// Expected by `prune_range`.
    Range,
    /// Expected by `prune_list`.
    List,
    /// Expected by `prune_hash`.
    Hash,
}
98
/// Describes how a set of children is partitioned.
#[derive(Debug, Clone)]
pub struct PrunePartitioning {
    /// Partitioning scheme; the `prune_*` functions only check it in a `debug_assert!`.
    pub kind: PruneKind,
    /// Name of the partitioning column. Only predicates on this column influence pruning.
    pub column: String,
}
106
/// One child of a range-partitioned set, covering `[low, high_exclusive)`.
#[derive(Debug, Clone)]
pub struct RangeChild {
    /// Identifier returned by `prune_range` when the child survives pruning.
    pub name: String,
    /// Lower bound, treated as inclusive by the overlap check; `None` means no lower bound.
    pub low: Option<PruneValue>,
    /// Upper bound, treated as exclusive by the overlap check; `None` means no upper bound.
    pub high_exclusive: Option<PruneValue>,
}
115
/// One child of a list-partitioned set.
#[derive(Debug, Clone)]
pub struct ListChild {
    /// Identifier returned by `prune_list` when the child survives pruning.
    pub name: String,
    /// Values assigned to this child. `prune_list` keeps the child when any of them is among
    /// the values the predicate allows.
    pub values: Vec<PruneValue>,
}
122
/// One child of a hash-partitioned set.
#[derive(Debug, Clone)]
pub struct HashChild {
    /// Identifier returned by `prune_hash` when the child survives pruning.
    pub name: String,
    /// Divisor applied to the hashed value. `prune_hash` always keeps a child whose modulus
    /// is `0` instead of dividing by it.
    pub modulus: u32,
    /// The child is kept when `hash % modulus` equals this value.
    pub remainder: u32,
}
131
132pub fn prune_range(
135 partitioning: &PrunePartitioning,
136 children: &[RangeChild],
137 predicate: &PrunePredicate,
138) -> Vec<String> {
139 debug_assert!(matches!(partitioning.kind, PruneKind::Range));
140 let interval = derive_interval(predicate, &partitioning.column);
141 children
142 .iter()
143 .filter(|child| child_range_overlaps(child, &interval))
144 .map(|c| c.name.clone())
145 .collect()
146}
147
148pub fn prune_list(
149 partitioning: &PrunePartitioning,
150 children: &[ListChild],
151 predicate: &PrunePredicate,
152) -> Vec<String> {
153 debug_assert!(matches!(partitioning.kind, PruneKind::List));
154 let allowed = derive_allowed_set(predicate, &partitioning.column);
155 children
156 .iter()
157 .filter(|child| match &allowed {
158 AllowedSet::Any => true,
159 AllowedSet::Only(set) => child.values.iter().any(|v| set.iter().any(|a| a == v)),
160 })
161 .map(|c| c.name.clone())
162 .collect()
163}
164
165pub fn prune_hash(
166 partitioning: &PrunePartitioning,
167 children: &[HashChild],
168 predicate: &PrunePredicate,
169) -> Vec<String> {
170 debug_assert!(matches!(partitioning.kind, PruneKind::Hash));
171 let Some(value) = derive_single_eq(predicate, &partitioning.column) else {
172 return children.iter().map(|c| c.name.clone()).collect();
173 };
174 let hashed = hash_prune_value(&value);
175 children
176 .iter()
177 .filter(|child| {
178 child.modulus == 0 || (hashed % child.modulus as u64) == child.remainder as u64
179 })
180 .map(|c| c.name.clone())
181 .collect()
182}
183
/// A single contiguous range of values, used as an over-approximation of what a predicate
/// allows for one column.
#[derive(Debug, Clone)]
struct Interval {
    /// Lower end of the range.
    low: Bound<PruneValue>,
    /// Upper end of the range.
    high: Bound<PruneValue>,
    /// Values excluded by `!=` comparisons. `derive_interval` records them, but no code
    /// visible in this file reads the field back, and `intersect` and the `Or` handling
    /// reset it to empty.
    _holes: Vec<PruneValue>,
}
198
199impl Interval {
200 fn unbounded() -> Self {
201 Self {
202 low: Bound::Unbounded,
203 high: Bound::Unbounded,
204 _holes: Vec::new(),
205 }
206 }
207}
208
209fn derive_interval(predicate: &PrunePredicate, column: &str) -> Interval {
210 match predicate {
211 PrunePredicate::Compare {
212 column: c,
213 op,
214 value,
215 } if c == column => match op {
216 PruneOp::Eq => Interval {
217 low: Bound::Included(value.clone()),
218 high: Bound::Included(value.clone()),
219 _holes: Vec::new(),
220 },
221 PruneOp::Lt => Interval {
222 low: Bound::Unbounded,
223 high: Bound::Excluded(value.clone()),
224 _holes: Vec::new(),
225 },
226 PruneOp::LtEq => Interval {
227 low: Bound::Unbounded,
228 high: Bound::Included(value.clone()),
229 _holes: Vec::new(),
230 },
231 PruneOp::Gt => Interval {
232 low: Bound::Excluded(value.clone()),
233 high: Bound::Unbounded,
234 _holes: Vec::new(),
235 },
236 PruneOp::GtEq => Interval {
237 low: Bound::Included(value.clone()),
238 high: Bound::Unbounded,
239 _holes: Vec::new(),
240 },
241 PruneOp::NotEq => {
242 let mut i = Interval::unbounded();
243 i._holes.push(value.clone());
244 i
245 }
246 },
247 PrunePredicate::In { column: c, values } if c == column && !values.is_empty() => {
248 let mut min = None;
249 let mut max = None;
250 for v in values {
251 if min.as_ref().map(|m: &PruneValue| v < m).unwrap_or(true) {
252 min = Some(v.clone());
253 }
254 if max.as_ref().map(|m: &PruneValue| v > m).unwrap_or(true) {
255 max = Some(v.clone());
256 }
257 }
258 Interval {
259 low: min.map(Bound::Included).unwrap_or(Bound::Unbounded),
260 high: max.map(Bound::Included).unwrap_or(Bound::Unbounded),
261 _holes: Vec::new(),
262 }
263 }
264 PrunePredicate::And(children) => {
265 let mut acc = Interval::unbounded();
266 for child in children {
267 let i = derive_interval(child, column);
268 acc = intersect(&acc, &i);
269 }
270 acc
271 }
272 PrunePredicate::Or(children) => {
273 let mut min: Option<Bound<PruneValue>> = None;
276 let mut max: Option<Bound<PruneValue>> = None;
277 for child in children {
278 let i = derive_interval(child, column);
279 if matches!(i.low, Bound::Unbounded) {
280 min = Some(Bound::Unbounded);
281 } else if min != Some(Bound::Unbounded) {
282 min = Some(widest_low(min.clone(), i.low.clone()));
283 }
284 if matches!(i.high, Bound::Unbounded) {
285 max = Some(Bound::Unbounded);
286 } else if max != Some(Bound::Unbounded) {
287 max = Some(widest_high(max.clone(), i.high.clone()));
288 }
289 }
290 Interval {
291 low: min.unwrap_or(Bound::Unbounded),
292 high: max.unwrap_or(Bound::Unbounded),
293 _holes: Vec::new(),
294 }
295 }
296 _ => Interval::unbounded(),
297 }
298}
299
300fn widest_low(current: Option<Bound<PruneValue>>, next: Bound<PruneValue>) -> Bound<PruneValue> {
301 match (current, next) {
302 (None, b) => b,
303 (Some(Bound::Unbounded), _) | (_, Bound::Unbounded) => Bound::Unbounded,
304 (Some(a), b) => {
305 if bound_low_less_or_equal(&a, &b) {
306 a
307 } else {
308 b
309 }
310 }
311 }
312}
313
314fn widest_high(current: Option<Bound<PruneValue>>, next: Bound<PruneValue>) -> Bound<PruneValue> {
315 match (current, next) {
316 (None, b) => b,
317 (Some(Bound::Unbounded), _) | (_, Bound::Unbounded) => Bound::Unbounded,
318 (Some(a), b) => {
319 if bound_high_greater_or_equal(&a, &b) {
320 a
321 } else {
322 b
323 }
324 }
325 }
326}
327
328fn bound_low_less_or_equal(a: &Bound<PruneValue>, b: &Bound<PruneValue>) -> bool {
329 match (a, b) {
330 (Bound::Unbounded, _) => true,
331 (_, Bound::Unbounded) => false,
332 (Bound::Included(x), Bound::Included(y)) | (Bound::Excluded(x), Bound::Excluded(y)) => {
333 x.partial_cmp(y).map(|o| o.is_le()).unwrap_or(true)
334 }
335 (Bound::Included(x), Bound::Excluded(y)) => {
336 x.partial_cmp(y).map(|o| o.is_le()).unwrap_or(true)
337 }
338 (Bound::Excluded(x), Bound::Included(y)) => {
339 x.partial_cmp(y).map(|o| o.is_lt()).unwrap_or(true)
340 }
341 }
342}
343
344fn bound_high_greater_or_equal(a: &Bound<PruneValue>, b: &Bound<PruneValue>) -> bool {
345 match (a, b) {
346 (Bound::Unbounded, _) => true,
347 (_, Bound::Unbounded) => false,
348 (Bound::Included(x), Bound::Included(y)) | (Bound::Excluded(x), Bound::Excluded(y)) => {
349 x.partial_cmp(y).map(|o| o.is_ge()).unwrap_or(true)
350 }
351 (Bound::Included(x), Bound::Excluded(y)) => {
352 x.partial_cmp(y).map(|o| o.is_ge()).unwrap_or(true)
353 }
354 (Bound::Excluded(x), Bound::Included(y)) => {
355 x.partial_cmp(y).map(|o| o.is_gt()).unwrap_or(true)
356 }
357 }
358}
359
360fn intersect(a: &Interval, b: &Interval) -> Interval {
361 let low = tighter_low(&a.low, &b.low);
362 let high = tighter_high(&a.high, &b.high);
363 Interval {
364 low,
365 high,
366 _holes: Vec::new(),
367 }
368}
369
370fn tighter_low(a: &Bound<PruneValue>, b: &Bound<PruneValue>) -> Bound<PruneValue> {
371 match (a, b) {
372 (Bound::Unbounded, other) | (other, Bound::Unbounded) => other.clone(),
373 (x, y) => {
374 if bound_low_less_or_equal(x, y) {
375 y.clone()
376 } else {
377 x.clone()
378 }
379 }
380 }
381}
382
383fn tighter_high(a: &Bound<PruneValue>, b: &Bound<PruneValue>) -> Bound<PruneValue> {
384 match (a, b) {
385 (Bound::Unbounded, other) | (other, Bound::Unbounded) => other.clone(),
386 (x, y) => {
387 if bound_high_greater_or_equal(x, y) {
388 y.clone()
389 } else {
390 x.clone()
391 }
392 }
393 }
394}
395
396fn child_range_overlaps(child: &RangeChild, interval: &Interval) -> bool {
397 let child_low_ok = match &interval.high {
400 Bound::Unbounded => true,
401 Bound::Included(upper) => match &child.low {
402 None => true,
403 Some(cl) => cl.partial_cmp(upper).map(|o| o.is_le()).unwrap_or(true),
404 },
405 Bound::Excluded(upper) => match &child.low {
406 None => true,
407 Some(cl) => cl.partial_cmp(upper).map(|o| o.is_lt()).unwrap_or(true),
408 },
409 };
410 let child_high_ok = match &interval.low {
411 Bound::Unbounded => true,
412 Bound::Included(lower) => match &child.high_exclusive {
413 None => true,
414 Some(ch) => ch.partial_cmp(lower).map(|o| o.is_gt()).unwrap_or(true),
415 },
416 Bound::Excluded(lower) => match &child.high_exclusive {
417 None => true,
418 Some(ch) => ch.partial_cmp(lower).map(|o| o.is_gt()).unwrap_or(true),
419 },
420 };
421 child_low_ok && child_high_ok
422}
423
/// The set of values a predicate allows for one column, as far as the pruner can tell.
enum AllowedSet {
    /// No usable restriction: any value may match.
    Any,
    /// Only the listed values may match. An empty list means nothing can match.
    Only(Vec<PruneValue>),
}
432
433fn derive_allowed_set(predicate: &PrunePredicate, column: &str) -> AllowedSet {
434 match predicate {
435 PrunePredicate::Compare {
436 column: c,
437 op: PruneOp::Eq,
438 value,
439 } if c == column => AllowedSet::Only(vec![value.clone()]),
440 PrunePredicate::In { column: c, values } if c == column && !values.is_empty() => {
441 AllowedSet::Only(values.clone())
442 }
443 PrunePredicate::And(children) => {
444 let mut acc: Option<Vec<PruneValue>> = None;
446 for child in children {
447 match derive_allowed_set(child, column) {
448 AllowedSet::Any => {}
449 AllowedSet::Only(set) => {
450 acc = Some(match acc {
451 None => set,
452 Some(existing) => existing
453 .into_iter()
454 .filter(|v| set.iter().any(|w| w == v))
455 .collect(),
456 });
457 }
458 }
459 }
460 acc.map(AllowedSet::Only).unwrap_or(AllowedSet::Any)
461 }
462 PrunePredicate::Or(children) => {
463 let mut merged: Vec<PruneValue> = Vec::new();
465 for child in children {
466 match derive_allowed_set(child, column) {
467 AllowedSet::Any => return AllowedSet::Any,
468 AllowedSet::Only(set) => {
469 for v in set {
470 if !merged.iter().any(|m| m == &v) {
471 merged.push(v);
472 }
473 }
474 }
475 }
476 }
477 AllowedSet::Only(merged)
478 }
479 _ => AllowedSet::Any,
480 }
481}
482
483fn derive_single_eq(predicate: &PrunePredicate, column: &str) -> Option<PruneValue> {
488 match predicate {
489 PrunePredicate::Compare {
490 column: c,
491 op: PruneOp::Eq,
492 value,
493 } if c == column => Some(value.clone()),
494 PrunePredicate::And(children) => {
495 for child in children {
496 if let Some(v) = derive_single_eq(child, column) {
497 return Some(v);
498 }
499 }
500 None
501 }
502 _ => None,
503 }
504}
505
506fn hash_prune_value(v: &PruneValue) -> u64 {
507 const OFFSET: u64 = 0xcbf29ce484222325;
509 const PRIME: u64 = 0x100000001b3;
510 let mut h = OFFSET;
511 let bytes: Vec<u8> = match v {
512 PruneValue::Int(i) => i.to_le_bytes().to_vec(),
513 PruneValue::Float(f) => f.to_le_bytes().to_vec(),
514 PruneValue::Text(s) => s.as_bytes().to_vec(),
515 PruneValue::Bool(b) => vec![*b as u8],
516 };
517 for b in bytes {
518 h ^= b as u64;
519 h = h.wrapping_mul(PRIME);
520 }
521 h
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------------------------

    /// Integer literal.
    fn pint(v: i64) -> PruneValue {
        PruneValue::Int(v)
    }

    /// Text literal.
    fn ptext(s: &str) -> PruneValue {
        PruneValue::Text(s.to_string())
    }

    /// Range child over integer bounds; `None` leaves that side open.
    fn range_child(name: &str, low: Option<i64>, high_exclusive: Option<i64>) -> RangeChild {
        RangeChild {
            name: name.to_string(),
            low: low.map(pint),
            high_exclusive: high_exclusive.map(pint),
        }
    }

    /// `count` adjacent range children `c0`, `c1`, ... each `width` wide, starting at 0.
    fn chunks(count: i64, width: i64) -> Vec<RangeChild> {
        (0..count)
            .map(|i| range_child(&format!("c{i}"), Some(i * width), Some((i + 1) * width)))
            .collect()
    }

    /// List child holding a single text value.
    fn list_child(name: &str, value: &str) -> ListChild {
        ListChild {
            name: name.to_string(),
            values: vec![ptext(value)],
        }
    }

    /// One hash child per remainder `0..modulus`, named `h0`, `h1`, ...
    fn hash_children(modulus: u32) -> Vec<HashChild> {
        (0..modulus)
            .map(|remainder| HashChild {
                name: format!("h{remainder}"),
                modulus,
                remainder,
            })
            .collect()
    }

    /// `column <op> value` predicate.
    fn cmp(column: &str, op: PruneOp, value: PruneValue) -> PrunePredicate {
        PrunePredicate::Compare {
            column: column.to_string(),
            op,
            value,
        }
    }

    fn spec(kind: PruneKind, column: &str) -> PrunePartitioning {
        PrunePartitioning {
            kind,
            column: column.to_string(),
        }
    }

    fn spec_range() -> PrunePartitioning {
        spec(PruneKind::Range, "ts")
    }

    fn spec_list() -> PrunePartitioning {
        spec(PruneKind::List, "region")
    }

    fn spec_hash() -> PrunePartitioning {
        spec(PruneKind::Hash, "user_id")
    }

    // ---- range ----------------------------------------------------------------------------

    #[test]
    fn range_keeps_only_chunks_overlapping_equality() {
        let children = chunks(3, 100);
        let pred = cmp("ts", PruneOp::Eq, pint(150));
        assert_eq!(prune_range(&spec_range(), &children, &pred), vec!["c1"]);
    }

    #[test]
    fn range_handles_greater_than_predicate() {
        let children = chunks(3, 100);
        let pred = cmp("ts", PruneOp::GtEq, pint(150));
        let kept = prune_range(&spec_range(), &children, &pred);
        assert_eq!(kept, vec!["c1", "c2"]);
    }

    #[test]
    fn range_handles_between_via_and() {
        let children = chunks(4, 100);
        let pred = PrunePredicate::And(vec![
            cmp("ts", PruneOp::GtEq, pint(150)),
            cmp("ts", PruneOp::Lt, pint(250)),
        ]);
        let kept = prune_range(&spec_range(), &children, &pred);
        assert_eq!(kept, vec!["c1", "c2"]);
    }

    #[test]
    fn range_conservative_when_predicate_on_different_column() {
        let children = chunks(1, 100);
        let pred = cmp("other", PruneOp::Eq, pint(42));
        assert_eq!(prune_range(&spec_range(), &children, &pred), vec!["c0"]);
    }

    #[test]
    fn range_handles_unbounded_children() {
        let children = vec![
            range_child("past", None, Some(0)),
            range_child("mid", Some(0), Some(100)),
            range_child("future", Some(100), None),
        ];
        let pred = cmp("ts", PruneOp::GtEq, pint(50));
        let kept = prune_range(&spec_range(), &children, &pred);
        assert_eq!(kept, vec!["mid", "future"]);
    }

    #[test]
    fn range_in_clause_bounds() {
        let children = chunks(3, 100);
        let pred = PrunePredicate::In {
            column: "ts".to_string(),
            values: vec![pint(50), pint(250)],
        };
        // The IN list collapses to [50, 250], which also spans the middle chunk.
        let kept = prune_range(&spec_range(), &children, &pred);
        assert_eq!(kept, vec!["c0", "c1", "c2"]);
    }

    #[test]
    fn and_tightens_across_child_predicates() {
        let children = chunks(3, 50);
        let pred = PrunePredicate::And(vec![
            cmp("ts", PruneOp::GtEq, pint(60)),
            cmp("ts", PruneOp::Lt, pint(90)),
        ]);
        assert_eq!(prune_range(&spec_range(), &children, &pred), vec!["c1"]);
    }

    #[test]
    fn or_widens_across_child_predicates() {
        let children = chunks(3, 100);
        let pred = PrunePredicate::Or(vec![
            cmp("ts", PruneOp::Eq, pint(50)),
            cmp("ts", PruneOp::Eq, pint(250)),
        ]);
        // The OR collapses to one covering interval, so the middle chunk stays too.
        let kept = prune_range(&spec_range(), &children, &pred);
        assert_eq!(kept, vec!["c0", "c1", "c2"]);
    }

    // ---- list -----------------------------------------------------------------------------

    #[test]
    fn list_prunes_non_matching_value_sets() {
        let children = vec![
            list_child("us", "us-east"),
            list_child("eu", "eu-west"),
            list_child("apac", "ap-south"),
        ];
        let pred = cmp("region", PruneOp::Eq, ptext("eu-west"));
        assert_eq!(prune_list(&spec_list(), &children, &pred), vec!["eu"]);
    }

    #[test]
    fn list_handles_in_clause() {
        let children = vec![
            list_child("a", "a"),
            list_child("b", "b"),
            list_child("c", "c"),
        ];
        let pred = PrunePredicate::In {
            column: "region".to_string(),
            values: vec![ptext("a"), ptext("c")],
        };
        let kept = prune_list(&spec_list(), &children, &pred);
        assert_eq!(kept, vec!["a", "c"]);
    }

    #[test]
    fn list_keeps_all_when_predicate_is_opaque() {
        let children = vec![list_child("x", "x")];
        let pred = PrunePredicate::Opaque;
        assert_eq!(prune_list(&spec_list(), &children, &pred), vec!["x"]);
    }

    // ---- hash -----------------------------------------------------------------------------

    #[test]
    fn hash_routes_to_single_partition_on_equality() {
        let children = hash_children(4);
        let pred = cmp("user_id", PruneOp::Eq, pint(42));
        let kept = prune_hash(&spec_hash(), &children, &pred);
        assert_eq!(
            kept.len(),
            1,
            "hash eq must land on one partition: {kept:?}"
        );
    }

    #[test]
    fn hash_keeps_all_without_equality() {
        let children = hash_children(2);
        let pred = cmp("user_id", PruneOp::Gt, pint(0));
        let kept = prune_hash(&spec_hash(), &children, &pred);
        assert_eq!(kept.len(), 2);
    }
}