use arrow::array::{Array, NullArray, UInt64Array};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::{FieldRef, Schema, SchemaRef};
use std::collections::HashSet;
use std::sync::Arc;

use crate::error::DataFusionError;
use crate::stats::Precision;
use crate::{Column, Statistics};
use crate::{ColumnStatistics, ScalarValue};

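/// Statistics-style information about a set of "containers" (for example files,
/// file partitions, or row groups) that can be used to prune (skip) containers
/// that cannot possibly match a predicate.
///
/// Each method returns per-container information as an Arrow array with one row
/// per container, or `None` if that information is not available.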
pub trait PruningStatistics {
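    /// Return the minimum value of `column` for each container, as an array
    /// with one row per container, or `None` if this information is unavailable.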
    fn min_values(&self, column: &Column) -> Option<ArrayRef>;

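    /// Return the maximum value of `column` for each container, as an array
    /// with one row per container, or `None` if this information is unavailable.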
    fn max_values(&self, column: &Column) -> Option<ArrayRef>;

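    /// Return the number of containers these statistics describe. Arrays
    /// returned by the other methods are expected to have this many rows.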
    fn num_containers(&self) -> usize;

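    /// Return the number of null values of `column` in each container, or
    /// `None` if this information is unavailable.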
    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

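    /// Return the total number of rows in each container, or `None` if this
    /// information is unavailable.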
    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

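    /// Return, for each container, whether the values of `column` are known to
    /// match the given set of literal `values`: `true` if the container only
    /// contains values from the set, `false` if it contains none of them, and
    /// null when this is unknown. Return `None` if no such information is
    /// available for the column.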
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray>;
}

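/// Prune containers based on their partition values.
///
/// Each container corresponds to one set of partition values (for example, the
/// partition columns of a single file); since a partition column holds a single
/// value per container, its minimum and maximum are that value itself.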
#[deprecated(
    since = "52.0.0",
    note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
)]
#[derive(Clone)]
pub struct PartitionPruningStatistics {
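    /// Values for each partition column, stored column-wise: one array per
    /// partition column, each with one row per container.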
    partition_values: Vec<ArrayRef>,
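    /// The number of containers (sets of partition values) represented.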
    num_containers: usize,
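    /// Schema of the partition columns, used to resolve columns by name.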
    partition_schema: SchemaRef,
}

#[expect(deprecated)]
impl PartitionPruningStatistics {
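    /// Create a new instance of [`PartitionPruningStatistics`].
    ///
    /// `partition_values` is indexed by container and then by partition column,
    /// in the same order as `partition_fields`; the values are transposed into
    /// one array per partition column. Columns with no values are represented
    /// by an empty `NullArray`.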
    pub fn try_new(
        partition_values: Vec<Vec<ScalarValue>>,
        partition_fields: Vec<FieldRef>,
    ) -> Result<Self, DataFusionError> {
        let num_containers = partition_values.len();
        let partition_schema = Arc::new(Schema::new(partition_fields));
        let mut partition_values_by_column =
            vec![
                Vec::with_capacity(partition_values.len());
                partition_schema.fields().len()
            ];
        for partition_value in partition_values {
            for (i, value) in partition_value.into_iter().enumerate() {
                partition_values_by_column[i].push(value);
            }
        }
        Ok(Self {
            partition_values: partition_values_by_column
                .into_iter()
                .map(|v| {
                    if v.is_empty() {
                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
                    } else {
                        ScalarValue::iter_to_array(v)
                    }
                })
                .collect::<Result<Vec<_>, _>>()?,
            num_containers,
            partition_schema,
        })
    }
}

#[expect(deprecated)]
impl PruningStatistics for PartitionPruningStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        let index = self.partition_schema.index_of(column.name()).ok()?;
        self.partition_values.get(index).and_then(|v| {
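            // An empty or all-null array means the partition values are unknown,
            // so report "no information" rather than a meaningless min/max.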
            if v.is_empty() || v.null_count() == v.len() {
                None
            } else {
                Some(Arc::clone(v))
            }
        })
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.min_values(column)
    }

    fn num_containers(&self) -> usize {
        self.num_containers
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        let index = self.partition_schema.index_of(column.name()).ok()?;
        let array = self.partition_values.get(index)?;
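        // OR together an equality comparison against each literal value, using
        // Kleene logic so that nulls propagate instead of being treated as false.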
        let boolean_array = values.iter().try_fold(None, |acc, v| {
            let arrow_value = v.to_scalar().ok()?;
            let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
            match acc {
                None => Some(Some(eq_result)),
                Some(acc_array) => {
                    arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result)
                        .map(Some)
                        .ok()
                }
            }
        })??;
        if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
            None
        } else {
            Some(boolean_array)
        }
    }
}

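/// Prune containers based on their [`Statistics`].
///
/// Each [`Statistics`] entry describes one container (for example a file or a
/// group of files), and `schema` names the columns those statistics refer to.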
#[derive(Clone)]
pub struct PrunableStatistics {
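    /// Statistics for the containers, one entry per container.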
    statistics: Vec<Arc<Statistics>>,
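    /// The schema of the data in the containers, used to resolve columns by name.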
    schema: SchemaRef,
}

impl PrunableStatistics {
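    /// Create a new instance of [`PrunableStatistics`] from per-container
    /// statistics and the schema they describe.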
    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
        Self { statistics, schema }
    }

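    /// Collect the exact value of a column statistic (selected by `get_stat`)
    /// for every container into a single array, filling in null for containers
    /// without an exact value. Returns `None` if no container has an exact value
    /// or if the values cannot be converted into an array.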
    fn get_exact_column_statistics(
        &self,
        column: &Column,
        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
    ) -> Option<ArrayRef> {
        let index = self.schema.index_of(column.name()).ok()?;
        let mut has_value = false;
        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
            s.column_statistics
                .get(index)
                .and_then(|stat| {
                    if let Precision::Exact(value) = get_stat(stat) {
                        has_value = true;
                        Some(value.clone())
                    } else {
                        None
                    }
                })
                .unwrap_or(ScalarValue::Null)
        })) {
            Ok(array) => has_value.then_some(array),
            Err(_) => {
                log::warn!(
                    "Failed to convert exact statistics values to array for column {}",
                    column.name()
                );
                None
            }
        }
    }
}

impl PruningStatistics for PrunableStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.get_exact_column_statistics(column, |stat| &stat.min_value)
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.get_exact_column_statistics(column, |stat| &stat.max_value)
    }

    fn num_containers(&self) -> usize {
        self.statistics.len()
    }

    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        let index = self.schema.index_of(column.name()).ok()?;
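        // Only materialize an array if at least one container has an exact null
        // count; otherwise report the information as unavailable.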
        if self.statistics.iter().any(|s| {
            s.column_statistics
                .get(index)
                .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
        }) {
            Some(Arc::new(
                self.statistics
                    .iter()
                    .map(|s| {
                        s.column_statistics.get(index).and_then(|stat| {
                            if let Precision::Exact(null_count) = &stat.null_count {
                                u64::try_from(*null_count).ok()
                            } else {
                                None
                            }
                        })
                    })
                    .collect::<UInt64Array>(),
            ))
        } else {
            None
        }
    }

    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
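        // Row counts are per container, not per column, but only report them for
        // columns that actually exist in the schema.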
        if self.schema.index_of(column.name()).is_err() {
            return None;
        }
        if self
            .statistics
            .iter()
            .any(|s| s.num_rows.is_exact().unwrap_or(false))
        {
            Some(Arc::new(
                self.statistics
                    .iter()
                    .map(|s| {
                        if let Precision::Exact(row_count) = &s.num_rows {
                            u64::try_from(*row_count).ok()
                        } else {
                            None
                        }
                    })
                    .collect::<UInt64Array>(),
            ))
        } else {
            None
        }
    }

    fn contained(
        &self,
        _column: &Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        None
    }
}

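/// Combine multiple [`PruningStatistics`] sources into one.
///
/// For each request the sources are consulted in order and the first one that
/// returns `Some` wins; all sources must describe the same number of containers.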
#[deprecated(
    since = "52.0.0",
    note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
)]
pub struct CompositePruningStatistics {
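    /// The underlying statistics sources, in priority order.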
    pub statistics: Vec<Box<dyn PruningStatistics>>,
}

#[expect(deprecated)]
impl CompositePruningStatistics {
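    /// Create a new instance from the given statistics sources.
    ///
    /// Panics if `statistics` is empty or if the sources do not all report the
    /// same number of containers.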
    pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
        assert!(!statistics.is_empty());
        let num_containers = statistics[0].num_containers();
        for stats in &statistics {
            assert_eq!(num_containers, stats.num_containers());
        }
        Self { statistics }
    }
}

#[expect(deprecated)]
impl PruningStatistics for CompositePruningStatistics {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        for stats in &self.statistics {
            if let Some(array) = stats.min_values(column) {
                return Some(array);
            }
        }
        None
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        for stats in &self.statistics {
            if let Some(array) = stats.max_values(column) {
                return Some(array);
            }
        }
        None
    }

    fn num_containers(&self) -> usize {
        self.statistics[0].num_containers()
    }

    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        for stats in &self.statistics {
            if let Some(array) = stats.null_counts(column) {
                return Some(array);
            }
        }
        None
    }

    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
        for stats in &self.statistics {
            if let Some(array) = stats.row_counts(column) {
                return Some(array);
            }
        }
        None
    }

    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        for stats in &self.statistics {
            if let Some(array) = stats.contained(column, values) {
                return Some(array);
            }
        }
        None
    }
}

#[cfg(test)]
#[expect(deprecated)]
mod tests {
    use crate::{
        ColumnStatistics,
        cast::{as_int32_array, as_uint64_array},
    };

    use super::*;
    use arrow::datatypes::{DataType, Field};
    use std::sync::Arc;

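    /// Build a [`PartitionPruningStatistics`] with two containers and two
    /// partition columns: `a` with values `[1, 3]` and `b` with values `[2, 4]`.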
    fn partition_pruning_statistics_setup() -> PartitionPruningStatistics {
        let partition_values = vec![
            vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
            vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
        ];
        let partition_fields = vec![
            Arc::new(Field::new("a", DataType::Int32, false)),
            Arc::new(Field::new("b", DataType::Int32, false)),
        ];
        PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap()
    }

    #[test]
    fn test_partition_pruning_statistics() {
        let partition_stats = partition_pruning_statistics_setup();

        let column_a = Column::new_unqualified("a");
        let column_b = Column::new_unqualified("b");

        assert!(partition_stats.null_counts(&column_a).is_none());
        assert!(partition_stats.row_counts(&column_a).is_none());
        assert!(partition_stats.null_counts(&column_b).is_none());
        assert!(partition_stats.row_counts(&column_b).is_none());

        let min_values_a =
            as_int32_array(&partition_stats.min_values(&column_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_a = vec![Some(1), Some(3)];
        assert_eq!(min_values_a, expected_values_a);
        let max_values_a =
            as_int32_array(&partition_stats.max_values(&column_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_a = vec![Some(1), Some(3)];
        assert_eq!(max_values_a, expected_values_a);

        let min_values_b =
            as_int32_array(&partition_stats.min_values(&column_b).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_b = vec![Some(2), Some(4)];
        assert_eq!(min_values_b, expected_values_b);
        let max_values_b =
            as_int32_array(&partition_stats.max_values(&column_b).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_b = vec![Some(2), Some(4)];
        assert_eq!(max_values_b, expected_values_b);

        let values = HashSet::from([ScalarValue::from(1i32)]);
        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
        let expected_contained_a = BooleanArray::from(vec![true, false]);
        assert_eq!(contained_a, expected_contained_a);
        let contained_b = partition_stats.contained(&column_b, &values).unwrap();
        let expected_contained_b = BooleanArray::from(vec![false, false]);
        assert_eq!(contained_b, expected_contained_b);

        assert_eq!(partition_stats.num_containers(), 2);
    }

    #[test]
    fn test_partition_pruning_statistics_multiple_positive_values() {
        let partition_stats = partition_pruning_statistics_setup();

        let column_a = Column::new_unqualified("a");

        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
        let expected_contained_a = BooleanArray::from(vec![true, true]);
        assert_eq!(contained_a, expected_contained_a);
    }

    #[test]
    fn test_partition_pruning_statistics_multiple_negative_values() {
        let partition_stats = partition_pruning_statistics_setup();

        let column_a = Column::new_unqualified("a");

        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(2i32)]);
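        // Column `a` holds [1, 3]: container 0 matches 1 from the set, while
        // container 1 matches neither 1 nor 2.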
        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
        let expected_contained_a = BooleanArray::from(vec![true, false]);
        assert_eq!(contained_a, expected_contained_a);
    }

    #[test]
    fn test_partition_pruning_statistics_null_in_values() {
        let partition_values = vec![
            vec![
                ScalarValue::from(1i32),
                ScalarValue::from(2i32),
                ScalarValue::from(3i32),
            ],
            vec![
                ScalarValue::from(4i32),
                ScalarValue::from(5i32),
                ScalarValue::from(6i32),
            ],
        ];
        let partition_fields = vec![
            Arc::new(Field::new("a", DataType::Int32, false)),
            Arc::new(Field::new("b", DataType::Int32, false)),
            Arc::new(Field::new("c", DataType::Int32, false)),
        ];
        let partition_stats =
            PartitionPruningStatistics::try_new(partition_values, partition_fields)
                .unwrap();

        let column_a = Column::new_unqualified("a");
        let column_b = Column::new_unqualified("b");
        let column_c = Column::new_unqualified("c");

        let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]);
        let contained_a = partition_stats.contained(&column_a, &values_a).unwrap();
        let mut builder = BooleanArray::builder(2);
        builder.append_value(true);
        builder.append_null();
        let expected_contained_a = builder.finish();
        assert_eq!(contained_a, expected_contained_a);

        let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]);
        let contained_b = partition_stats.contained(&column_b, &values_b).unwrap();
        let mut builder = BooleanArray::builder(2);
        builder.append_null();
        builder.append_value(true);
        let expected_contained_b = builder.finish();
        assert_eq!(contained_b, expected_contained_b);

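        // A lookup set containing only NULL compares as all-null, which is
        // reported as "no information" (`None`) rather than a boolean array.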
        let values_c = HashSet::from([ScalarValue::Int32(None)]);
        let contained_c = partition_stats.contained(&column_c, &values_c);
        assert!(contained_c.is_none());
    }

    #[test]
    fn test_partition_pruning_statistics_empty() {
        let partition_values = vec![];
        let partition_fields = vec![
            Arc::new(Field::new("a", DataType::Int32, false)),
            Arc::new(Field::new("b", DataType::Int32, false)),
        ];
        let partition_stats =
            PartitionPruningStatistics::try_new(partition_values, partition_fields)
                .unwrap();

        let column_a = Column::new_unqualified("a");
        let column_b = Column::new_unqualified("b");

        assert!(partition_stats.null_counts(&column_a).is_none());
        assert!(partition_stats.row_counts(&column_a).is_none());
        assert!(partition_stats.null_counts(&column_b).is_none());
        assert!(partition_stats.row_counts(&column_b).is_none());

        assert!(partition_stats.min_values(&column_a).is_none());
        assert!(partition_stats.max_values(&column_a).is_none());
        assert!(partition_stats.min_values(&column_b).is_none());
        assert!(partition_stats.max_values(&column_b).is_none());

        let values = HashSet::from([ScalarValue::from(1i32)]);
        assert!(partition_stats.contained(&column_a, &values).is_none());
    }

    #[test]
    fn test_statistics_pruning_statistics() {
        let statistics = vec![
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
                            .with_null_count(Precision::Exact(0)),
                    )
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
                            .with_null_count(Precision::Exact(5)),
                    )
                    .with_num_rows(Precision::Exact(100)),
            ),
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
                            .with_null_count(Precision::Exact(10)),
                    )
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
                            .with_null_count(Precision::Exact(0)),
                    )
                    .with_num_rows(Precision::Exact(200)),
            ),
        ];

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));
        let pruning_stats = PrunableStatistics::new(statistics, schema);

        let column_a = Column::new_unqualified("a");
        let column_b = Column::new_unqualified("b");

        let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_values_a = vec![Some(0), Some(50)];
        assert_eq!(min_values_a, expected_values_a);
        let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_values_a = vec![Some(100), Some(300)];
        assert_eq!(max_values_a, expected_values_a);
        let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_values_b = vec![Some(100), Some(200)];
        assert_eq!(min_values_b, expected_values_b);
        let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_values_b = vec![Some(200), Some(400)];
        assert_eq!(max_values_b, expected_values_b);

        let null_counts_a =
            as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_null_counts_a = vec![Some(0), Some(10)];
        assert_eq!(null_counts_a, expected_null_counts_a);
        let null_counts_b =
            as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_null_counts_b = vec![Some(5), Some(0)];
        assert_eq!(null_counts_b, expected_null_counts_b);

        let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_row_counts_a = vec![Some(100), Some(200)];
        assert_eq!(row_counts_a, expected_row_counts_a);
        let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_row_counts_b = vec![Some(100), Some(200)];
        assert_eq!(row_counts_b, expected_row_counts_b);

        let values = HashSet::from([ScalarValue::from(0i32)]);
        assert!(pruning_stats.contained(&column_a, &values).is_none());
        assert!(pruning_stats.contained(&column_b, &values).is_none());

        assert_eq!(pruning_stats.num_containers(), 2);

        let column_c = Column::new_unqualified("c");
        assert!(pruning_stats.min_values(&column_c).is_none());
        assert!(pruning_stats.max_values(&column_c).is_none());
        assert!(pruning_stats.null_counts(&column_c).is_none());
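        // Column `c` is in the schema but has no column statistics, so only the
        // per-container row counts are available for it.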
        let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_row_counts_c = vec![Some(100), Some(200)];
        assert_eq!(row_counts_c, expected_row_counts_c);
        assert!(pruning_stats.contained(&column_c, &values).is_none());

        let column_d = Column::new_unqualified("d");
        assert!(pruning_stats.min_values(&column_d).is_none());
        assert!(pruning_stats.max_values(&column_d).is_none());
        assert!(pruning_stats.null_counts(&column_d).is_none());
        assert!(pruning_stats.row_counts(&column_d).is_none());
        assert!(pruning_stats.contained(&column_d, &values).is_none());
    }

    #[test]
    fn test_statistics_pruning_statistics_empty() {
        let statistics = vec![];
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));
        let pruning_stats = PrunableStatistics::new(statistics, schema);

        let column_a = Column::new_unqualified("a");
        let column_b = Column::new_unqualified("b");

        assert!(pruning_stats.min_values(&column_a).is_none());
        assert!(pruning_stats.max_values(&column_a).is_none());
        assert!(pruning_stats.min_values(&column_b).is_none());
        assert!(pruning_stats.max_values(&column_b).is_none());

        assert!(pruning_stats.null_counts(&column_a).is_none());
        assert!(pruning_stats.null_counts(&column_b).is_none());

        assert!(pruning_stats.row_counts(&column_a).is_none());
        assert!(pruning_stats.row_counts(&column_b).is_none());

        let values = HashSet::from([ScalarValue::from(1i32)]);
        assert!(pruning_stats.contained(&column_a, &values).is_none());
    }

    #[test]
    fn test_composite_pruning_statistics_partition_and_file() {
        let partition_values = vec![
            vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
            vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
        ];
        let partition_fields = vec![
            Arc::new(Field::new("part_a", DataType::Int32, false)),
            Arc::new(Field::new("part_b", DataType::Int32, false)),
        ];
        let partition_stats =
            PartitionPruningStatistics::try_new(partition_values, partition_fields)
                .unwrap();

        let file_statistics = vec![
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
                            .with_null_count(Precision::Exact(0)),
                    )
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
                            .with_null_count(Precision::Exact(5)),
                    )
                    .with_num_rows(Precision::Exact(100)),
            ),
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
                            .with_null_count(Precision::Exact(10)),
                    )
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
                            .with_null_count(Precision::Exact(0)),
                    )
                    .with_num_rows(Precision::Exact(200)),
            ),
        ];

        let file_schema = Arc::new(Schema::new(vec![
            Field::new("col_x", DataType::Int32, false),
            Field::new("col_y", DataType::Int32, false),
        ]));
        let file_stats = PrunableStatistics::new(file_statistics, file_schema);

        let composite_stats = CompositePruningStatistics::new(vec![
            Box::new(partition_stats),
            Box::new(file_stats),
        ]);

        let part_a = Column::new_unqualified("part_a");
        let part_b = Column::new_unqualified("part_b");

        let col_x = Column::new_unqualified("col_x");
        let col_y = Column::new_unqualified("col_y");

        let min_values_part_a =
            as_int32_array(&composite_stats.min_values(&part_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_part_a = vec![Some(1), Some(2)];
        assert_eq!(min_values_part_a, expected_values_part_a);

        let max_values_part_a =
            as_int32_array(&composite_stats.max_values(&part_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        assert_eq!(max_values_part_a, expected_values_part_a);

        let min_values_part_b =
            as_int32_array(&composite_stats.min_values(&part_b).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_part_b = vec![Some(10), Some(20)];
        assert_eq!(min_values_part_b, expected_values_part_b);

        let min_values_col_x =
            as_int32_array(&composite_stats.min_values(&col_x).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_col_x = vec![Some(100), Some(500)];
        assert_eq!(min_values_col_x, expected_values_col_x);

        let max_values_col_x =
            as_int32_array(&composite_stats.max_values(&col_x).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_max_values_col_x = vec![Some(200), Some(600)];
        assert_eq!(max_values_col_x, expected_max_values_col_x);

        let min_values_col_y =
            as_int32_array(&composite_stats.min_values(&col_y).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_values_col_y = vec![Some(300), Some(700)];
        assert_eq!(min_values_col_y, expected_values_col_y);

        assert!(composite_stats.null_counts(&part_a).is_none());
        assert!(composite_stats.null_counts(&part_b).is_none());

        let null_counts_col_x =
            as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_null_counts_col_x = vec![Some(0), Some(10)];
        assert_eq!(null_counts_col_x, expected_null_counts_col_x);

        assert!(composite_stats.row_counts(&part_a).is_none());
        let row_counts_col_x =
            as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_row_counts = vec![Some(100), Some(200)];
        assert_eq!(row_counts_col_x, expected_row_counts);

        let values = HashSet::from([ScalarValue::from(1i32)]);
        let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
        let expected_contained_part_a = BooleanArray::from(vec![true, false]);
        assert_eq!(contained_part_a, expected_contained_part_a);

        assert!(composite_stats.contained(&col_x, &values).is_none());

        let non_existent = Column::new_unqualified("non_existent");
        assert!(composite_stats.min_values(&non_existent).is_none());
        assert!(composite_stats.max_values(&non_existent).is_none());
        assert!(composite_stats.null_counts(&non_existent).is_none());
        assert!(composite_stats.row_counts(&non_existent).is_none());
        assert!(composite_stats.contained(&non_existent, &values).is_none());

        assert_eq!(composite_stats.num_containers(), 2);
    }

    #[test]
    fn test_composite_pruning_statistics_priority() {
        let first_statistics = vec![
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
                            .with_null_count(Precision::Exact(0)),
                    )
                    .with_num_rows(Precision::Exact(100)),
            ),
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
                            .with_null_count(Precision::Exact(5)),
                    )
                    .with_num_rows(Precision::Exact(200)),
            ),
        ];

        let first_schema = Arc::new(Schema::new(vec![Field::new(
            "col_a",
            DataType::Int32,
            false,
        )]));
        let first_stats = PrunableStatistics::new(first_statistics, first_schema);

        let second_statistics = vec![
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
                            .with_null_count(Precision::Exact(10)),
                    )
                    .with_num_rows(Precision::Exact(1000)),
            ),
            Arc::new(
                Statistics::default()
                    .add_column_statistics(
                        ColumnStatistics::new_unknown()
                            .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
                            .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
                            .with_null_count(Precision::Exact(20)),
                    )
                    .with_num_rows(Precision::Exact(2000)),
            ),
        ];

        let second_schema = Arc::new(Schema::new(vec![Field::new(
            "col_a",
            DataType::Int32,
            false,
        )]));
        let second_stats = PrunableStatistics::new(second_statistics, second_schema);

        let composite_stats = CompositePruningStatistics::new(vec![
            Box::new(first_stats.clone()),
            Box::new(second_stats.clone()),
        ]);

        let col_a = Column::new_unqualified("col_a");

        let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_min_values = vec![Some(100), Some(300)];
        assert_eq!(min_values, expected_min_values);

        let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_max_values = vec![Some(200), Some(400)];
        assert_eq!(max_values, expected_max_values);

        let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_null_counts = vec![Some(0), Some(5)];
        assert_eq!(null_counts, expected_null_counts);

        let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();
        let expected_row_counts = vec![Some(100), Some(200)];
        assert_eq!(row_counts, expected_row_counts);

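        // Reverse the order of the sources: the second set of statistics should
        // now take priority for every column.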
        let composite_stats_reversed = CompositePruningStatistics::new(vec![
            Box::new(second_stats.clone()),
            Box::new(first_stats.clone()),
        ]);

        let min_values =
            as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_min_values = vec![Some(1000), Some(3000)];
        assert_eq!(min_values, expected_min_values);

        let max_values =
            as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_max_values = vec![Some(2000), Some(4000)];
        assert_eq!(max_values, expected_max_values);

        let null_counts =
            as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_null_counts = vec![Some(10), Some(20)];
        assert_eq!(null_counts, expected_null_counts);

        let row_counts =
            as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
                .unwrap()
                .into_iter()
                .collect::<Vec<_>>();
        let expected_row_counts = vec![Some(1000), Some(2000)];
        assert_eq!(row_counts, expected_row_counts);
    }

    #[test]
    fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
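        // Creating a composite with no statistics sources should panic.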
        let result = std::panic::catch_unwind(|| {
            CompositePruningStatistics::new(vec![]);
        });
        assert!(result.is_err());

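        // Sources that disagree on the number of containers should also panic.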
        let result = std::panic::catch_unwind(|| {
            let partition_values_1 = vec![
                vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
                vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
            ];
            let partition_fields_1 = vec![
                Arc::new(Field::new("part_a", DataType::Int32, false)),
                Arc::new(Field::new("part_b", DataType::Int32, false)),
            ];
            let partition_stats_1 = PartitionPruningStatistics::try_new(
                partition_values_1,
                partition_fields_1,
            )
            .unwrap();
            let partition_values_2 = vec![
                vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
                vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
                vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
            ];
            let partition_fields_2 = vec![
                Arc::new(Field::new("part_x", DataType::Int32, false)),
                Arc::new(Field::new("part_y", DataType::Int32, false)),
            ];
            let partition_stats_2 = PartitionPruningStatistics::try_new(
                partition_values_2,
                partition_fields_2,
            )
            .unwrap();

            CompositePruningStatistics::new(vec![
                Box::new(partition_stats_1),
                Box::new(partition_stats_2),
            ]);
        });
        assert!(result.is_err());
    }
}