1use arrow::array::{Array, NullArray, UInt64Array};
19use arrow::array::{ArrayRef, BooleanArray};
20use arrow::datatypes::{FieldRef, Schema, SchemaRef};
21use std::collections::HashSet;
22use std::sync::Arc;
23
24use crate::error::DataFusionError;
25use crate::stats::Precision;
26use crate::{Column, Statistics};
27use crate::{ColumnStatistics, ScalarValue};
28
29pub trait PruningStatistics {
64 fn min_values(&self, column: &Column) -> Option<ArrayRef>;
72
73 fn max_values(&self, column: &Column) -> Option<ArrayRef>;
79
80 fn num_containers(&self) -> usize;
87
88 fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
97
98 fn row_counts(&self) -> Option<ArrayRef>;
109
110 #[allow(clippy::allow_attributes, clippy::mutable_key_type)] fn contained(
128 &self,
129 column: &Column,
130 values: &HashSet<ScalarValue>,
131 ) -> Option<BooleanArray>;
132}
133
134#[deprecated(
142 since = "52.0.0",
143 note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
144)]
145#[derive(Clone)]
146pub struct PartitionPruningStatistics {
147 partition_values: Vec<ArrayRef>,
153 num_containers: usize,
158 partition_schema: SchemaRef,
164}
165
166#[expect(deprecated)]
167impl PartitionPruningStatistics {
168 pub fn try_new(
211 partition_values: Vec<Vec<ScalarValue>>,
212 partition_fields: Vec<FieldRef>,
213 ) -> Result<Self, DataFusionError> {
214 let num_containers = partition_values.len();
215 let partition_schema = Arc::new(Schema::new(partition_fields));
216 let mut partition_values_by_column =
217 vec![
218 Vec::with_capacity(partition_values.len());
219 partition_schema.fields().len()
220 ];
221 for partition_value in partition_values {
222 for (i, value) in partition_value.into_iter().enumerate() {
223 partition_values_by_column[i].push(value);
224 }
225 }
226 Ok(Self {
227 partition_values: partition_values_by_column
228 .into_iter()
229 .map(|v| {
230 if v.is_empty() {
231 Ok(Arc::new(NullArray::new(0)) as ArrayRef)
232 } else {
233 ScalarValue::iter_to_array(v)
234 }
235 })
236 .collect::<Result<Vec<_>, _>>()?,
237 num_containers,
238 partition_schema,
239 })
240 }
241}
242
243#[expect(deprecated)]
244impl PruningStatistics for PartitionPruningStatistics {
245 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
246 let index = self.partition_schema.index_of(column.name()).ok()?;
247 self.partition_values.get(index).and_then(|v| {
248 if v.is_empty() || v.null_count() == v.len() {
249 None
251 } else {
252 Some(Arc::clone(v))
254 }
255 })
256 }
257
258 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
259 self.min_values(column)
260 }
261
262 fn num_containers(&self) -> usize {
263 self.num_containers
264 }
265
266 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
267 None
268 }
269
270 fn row_counts(&self) -> Option<ArrayRef> {
271 None
272 }
273
274 fn contained(
275 &self,
276 column: &Column,
277 values: &HashSet<ScalarValue>,
278 ) -> Option<BooleanArray> {
279 let index = self.partition_schema.index_of(column.name()).ok()?;
280 let array = self.partition_values.get(index)?;
281 let boolean_array = values.iter().try_fold(None, |acc, v| {
282 let arrow_value = v.to_scalar().ok()?;
283 let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
284 match acc {
285 None => Some(Some(eq_result)),
286 Some(acc_array) => {
287 arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result)
288 .map(Some)
289 .ok()
290 }
291 }
292 })??;
293 if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
295 None
296 } else {
297 Some(boolean_array)
298 }
299 }
300}
301
302#[derive(Clone)]
314pub struct PrunableStatistics {
315 statistics: Vec<Arc<Statistics>>,
319 schema: SchemaRef,
321}
322
323impl PrunableStatistics {
324 pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
328 Self { statistics, schema }
329 }
330
331 fn get_exact_column_statistics(
332 &self,
333 column: &Column,
334 get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
335 ) -> Option<ArrayRef> {
336 let index = self.schema.index_of(column.name()).ok()?;
337 let mut has_value = false;
338 match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
339 s.column_statistics
340 .get(index)
341 .and_then(|stat| {
342 if let Precision::Exact(min) = get_stat(stat) {
343 has_value = true;
344 Some(min.clone())
345 } else {
346 None
347 }
348 })
349 .unwrap_or(ScalarValue::Null)
350 })) {
351 Ok(array) => has_value.then_some(array),
353 Err(_) => {
354 log::warn!(
355 "Failed to convert min values to array for column {}",
356 column.name()
357 );
358 None
359 }
360 }
361 }
362}
363
364impl PruningStatistics for PrunableStatistics {
365 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
366 self.get_exact_column_statistics(column, |stat| &stat.min_value)
367 }
368
369 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
370 self.get_exact_column_statistics(column, |stat| &stat.max_value)
371 }
372
373 fn num_containers(&self) -> usize {
374 self.statistics.len()
375 }
376
377 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
378 let index = self.schema.index_of(column.name()).ok()?;
379 if self.statistics.iter().any(|s| {
380 s.column_statistics
381 .get(index)
382 .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
383 }) {
384 Some(Arc::new(
385 self.statistics
386 .iter()
387 .map(|s| {
388 s.column_statistics.get(index).and_then(|stat| {
389 if let Precision::Exact(null_count) = &stat.null_count {
390 u64::try_from(*null_count).ok()
391 } else {
392 None
393 }
394 })
395 })
396 .collect::<UInt64Array>(),
397 ))
398 } else {
399 None
400 }
401 }
402
403 fn row_counts(&self) -> Option<ArrayRef> {
404 if self
405 .statistics
406 .iter()
407 .any(|s| s.num_rows.is_exact().unwrap_or(false))
408 {
409 Some(Arc::new(
410 self.statistics
411 .iter()
412 .map(|s| {
413 if let Precision::Exact(row_count) = &s.num_rows {
414 u64::try_from(*row_count).ok()
415 } else {
416 None
417 }
418 })
419 .collect::<UInt64Array>(),
420 ))
421 } else {
422 None
423 }
424 }
425
426 fn contained(
427 &self,
428 _column: &Column,
429 _values: &HashSet<ScalarValue>,
430 ) -> Option<BooleanArray> {
431 None
432 }
433}
434
435#[deprecated(
448 since = "52.0.0",
449 note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
450)]
451pub struct CompositePruningStatistics {
452 pub statistics: Vec<Box<dyn PruningStatistics>>,
453}
454
455#[expect(deprecated)]
456impl CompositePruningStatistics {
457 pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
460 assert!(!statistics.is_empty());
461 let num_containers = statistics[0].num_containers();
463 for stats in &statistics {
464 assert_eq!(num_containers, stats.num_containers());
465 }
466 Self { statistics }
467 }
468}
469
470#[expect(deprecated)]
471impl PruningStatistics for CompositePruningStatistics {
472 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
473 for stats in &self.statistics {
474 if let Some(array) = stats.min_values(column) {
475 return Some(array);
476 }
477 }
478 None
479 }
480
481 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
482 for stats in &self.statistics {
483 if let Some(array) = stats.max_values(column) {
484 return Some(array);
485 }
486 }
487 None
488 }
489
490 fn num_containers(&self) -> usize {
491 self.statistics[0].num_containers()
492 }
493
494 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
495 for stats in &self.statistics {
496 if let Some(array) = stats.null_counts(column) {
497 return Some(array);
498 }
499 }
500 None
501 }
502
503 fn row_counts(&self) -> Option<ArrayRef> {
504 for stats in &self.statistics {
505 if let Some(array) = stats.row_counts() {
506 return Some(array);
507 }
508 }
509 None
510 }
511
512 fn contained(
513 &self,
514 column: &Column,
515 values: &HashSet<ScalarValue>,
516 ) -> Option<BooleanArray> {
517 for stats in &self.statistics {
518 if let Some(array) = stats.contained(column, values) {
519 return Some(array);
520 }
521 }
522 None
523 }
524}
525
526#[cfg(test)]
527#[expect(deprecated)]
528#[allow(clippy::allow_attributes, clippy::mutable_key_type)] mod tests {
530 use crate::{
531 ColumnStatistics,
532 cast::{as_int32_array, as_uint64_array},
533 };
534
535 use super::*;
536 use arrow::datatypes::{DataType, Field};
537 use std::sync::Arc;
538
539 fn partition_pruning_statistics_setup() -> PartitionPruningStatistics {
547 let partition_values = vec![
548 vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
549 vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
550 ];
551 let partition_fields = vec![
552 Arc::new(Field::new("a", DataType::Int32, false)),
553 Arc::new(Field::new("b", DataType::Int32, false)),
554 ];
555 PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap()
556 }
557
558 #[test]
559 fn test_partition_pruning_statistics() {
560 let partition_stats = partition_pruning_statistics_setup();
561
562 let column_a = Column::new_unqualified("a");
563 let column_b = Column::new_unqualified("b");
564
565 assert!(partition_stats.null_counts(&column_a).is_none());
567 assert!(partition_stats.row_counts().is_none());
568 assert!(partition_stats.null_counts(&column_b).is_none());
569 assert!(partition_stats.row_counts().is_none());
570
571 let min_values_a =
573 as_int32_array(&partition_stats.min_values(&column_a).unwrap())
574 .unwrap()
575 .into_iter()
576 .collect::<Vec<_>>();
577 let expected_values_a = vec![Some(1), Some(3)];
578 assert_eq!(min_values_a, expected_values_a);
579 let max_values_a =
580 as_int32_array(&partition_stats.max_values(&column_a).unwrap())
581 .unwrap()
582 .into_iter()
583 .collect::<Vec<_>>();
584 let expected_values_a = vec![Some(1), Some(3)];
585 assert_eq!(max_values_a, expected_values_a);
586
587 let min_values_b =
588 as_int32_array(&partition_stats.min_values(&column_b).unwrap())
589 .unwrap()
590 .into_iter()
591 .collect::<Vec<_>>();
592 let expected_values_b = vec![Some(2), Some(4)];
593 assert_eq!(min_values_b, expected_values_b);
594 let max_values_b =
595 as_int32_array(&partition_stats.max_values(&column_b).unwrap())
596 .unwrap()
597 .into_iter()
598 .collect::<Vec<_>>();
599 let expected_values_b = vec![Some(2), Some(4)];
600 assert_eq!(max_values_b, expected_values_b);
601
602 let values = HashSet::from([ScalarValue::from(1i32)]);
604 let contained_a = partition_stats.contained(&column_a, &values).unwrap();
605 let expected_contained_a = BooleanArray::from(vec![true, false]);
606 assert_eq!(contained_a, expected_contained_a);
607 let contained_b = partition_stats.contained(&column_b, &values).unwrap();
608 let expected_contained_b = BooleanArray::from(vec![false, false]);
609 assert_eq!(contained_b, expected_contained_b);
610
611 assert_eq!(partition_stats.num_containers(), 2);
613 }
614
615 #[test]
616 fn test_partition_pruning_statistics_multiple_positive_values() {
617 let partition_stats = partition_pruning_statistics_setup();
618
619 let column_a = Column::new_unqualified("a");
620
621 let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
623 let contained_a = partition_stats.contained(&column_a, &values).unwrap();
624 let expected_contained_a = BooleanArray::from(vec![true, true]);
625 assert_eq!(contained_a, expected_contained_a);
626 }
627
628 #[test]
629 fn test_partition_pruning_statistics_multiple_negative_values() {
630 let partition_stats = partition_pruning_statistics_setup();
631
632 let column_a = Column::new_unqualified("a");
633
634 let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(2i32)]);
638 let contained_a = partition_stats.contained(&column_a, &values).unwrap();
639 let expected_contained_a = BooleanArray::from(vec![true, false]);
640 assert_eq!(contained_a, expected_contained_a);
641 }
642
643 #[test]
644 fn test_partition_pruning_statistics_null_in_values() {
645 let partition_values = vec![
646 vec![
647 ScalarValue::from(1i32),
648 ScalarValue::from(2i32),
649 ScalarValue::from(3i32),
650 ],
651 vec![
652 ScalarValue::from(4i32),
653 ScalarValue::from(5i32),
654 ScalarValue::from(6i32),
655 ],
656 ];
657 let partition_fields = vec![
658 Arc::new(Field::new("a", DataType::Int32, false)),
659 Arc::new(Field::new("b", DataType::Int32, false)),
660 Arc::new(Field::new("c", DataType::Int32, false)),
661 ];
662 let partition_stats =
663 PartitionPruningStatistics::try_new(partition_values, partition_fields)
664 .unwrap();
665
666 let column_a = Column::new_unqualified("a");
667 let column_b = Column::new_unqualified("b");
668 let column_c = Column::new_unqualified("c");
669
670 let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]);
671 let contained_a = partition_stats.contained(&column_a, &values_a).unwrap();
672 let mut builder = BooleanArray::builder(2);
673 builder.append_value(true);
674 builder.append_null();
675 let expected_contained_a = builder.finish();
676 assert_eq!(contained_a, expected_contained_a);
677
678 let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]);
681 let contained_b = partition_stats.contained(&column_b, &values_b).unwrap();
682 let mut builder = BooleanArray::builder(2);
683 builder.append_null();
684 builder.append_value(true);
685 let expected_contained_b = builder.finish();
686 assert_eq!(contained_b, expected_contained_b);
687
688 let values_c = HashSet::from([ScalarValue::Int32(None)]);
690 let contained_c = partition_stats.contained(&column_c, &values_c);
691 assert!(contained_c.is_none());
692 }
693
694 #[test]
695 fn test_partition_pruning_statistics_empty() {
696 let partition_values = vec![];
697 let partition_fields = vec![
698 Arc::new(Field::new("a", DataType::Int32, false)),
699 Arc::new(Field::new("b", DataType::Int32, false)),
700 ];
701 let partition_stats =
702 PartitionPruningStatistics::try_new(partition_values, partition_fields)
703 .unwrap();
704
705 let column_a = Column::new_unqualified("a");
706 let column_b = Column::new_unqualified("b");
707
708 assert!(partition_stats.null_counts(&column_a).is_none());
710 assert!(partition_stats.row_counts().is_none());
711 assert!(partition_stats.null_counts(&column_b).is_none());
712 assert!(partition_stats.row_counts().is_none());
713
714 assert!(partition_stats.min_values(&column_a).is_none());
716 assert!(partition_stats.max_values(&column_a).is_none());
717 assert!(partition_stats.min_values(&column_b).is_none());
718 assert!(partition_stats.max_values(&column_b).is_none());
719
720 let values = HashSet::from([ScalarValue::from(1i32)]);
722 assert!(partition_stats.contained(&column_a, &values).is_none());
723 }
724
725 #[test]
726 fn test_statistics_pruning_statistics() {
727 let statistics = vec![
728 Arc::new(
729 Statistics::default()
730 .add_column_statistics(
731 ColumnStatistics::new_unknown()
732 .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
733 .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
734 .with_null_count(Precision::Exact(0)),
735 )
736 .add_column_statistics(
737 ColumnStatistics::new_unknown()
738 .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
739 .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
740 .with_null_count(Precision::Exact(5)),
741 )
742 .with_num_rows(Precision::Exact(100)),
743 ),
744 Arc::new(
745 Statistics::default()
746 .add_column_statistics(
747 ColumnStatistics::new_unknown()
748 .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
749 .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
750 .with_null_count(Precision::Exact(10)),
751 )
752 .add_column_statistics(
753 ColumnStatistics::new_unknown()
754 .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
755 .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
756 .with_null_count(Precision::Exact(0)),
757 )
758 .with_num_rows(Precision::Exact(200)),
759 ),
760 ];
761
762 let schema = Arc::new(Schema::new(vec![
763 Field::new("a", DataType::Int32, false),
764 Field::new("b", DataType::Int32, false),
765 Field::new("c", DataType::Int32, false),
766 ]));
767 let pruning_stats = PrunableStatistics::new(statistics, schema);
768
769 let column_a = Column::new_unqualified("a");
770 let column_b = Column::new_unqualified("b");
771
772 let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
774 .unwrap()
775 .into_iter()
776 .collect::<Vec<_>>();
777 let expected_values_a = vec![Some(0), Some(50)];
778 assert_eq!(min_values_a, expected_values_a);
779 let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
780 .unwrap()
781 .into_iter()
782 .collect::<Vec<_>>();
783 let expected_values_a = vec![Some(100), Some(300)];
784 assert_eq!(max_values_a, expected_values_a);
785 let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
786 .unwrap()
787 .into_iter()
788 .collect::<Vec<_>>();
789 let expected_values_b = vec![Some(100), Some(200)];
790 assert_eq!(min_values_b, expected_values_b);
791 let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
792 .unwrap()
793 .into_iter()
794 .collect::<Vec<_>>();
795 let expected_values_b = vec![Some(200), Some(400)];
796 assert_eq!(max_values_b, expected_values_b);
797
798 let null_counts_a =
800 as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
801 .unwrap()
802 .into_iter()
803 .collect::<Vec<_>>();
804 let expected_null_counts_a = vec![Some(0), Some(10)];
805 assert_eq!(null_counts_a, expected_null_counts_a);
806 let null_counts_b =
807 as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
808 .unwrap()
809 .into_iter()
810 .collect::<Vec<_>>();
811 let expected_null_counts_b = vec![Some(5), Some(0)];
812 assert_eq!(null_counts_b, expected_null_counts_b);
813
814 let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap())
816 .unwrap()
817 .into_iter()
818 .collect::<Vec<_>>();
819 let expected_row_counts_a = vec![Some(100), Some(200)];
820 assert_eq!(row_counts_a, expected_row_counts_a);
821 let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap())
822 .unwrap()
823 .into_iter()
824 .collect::<Vec<_>>();
825 let expected_row_counts_b = vec![Some(100), Some(200)];
826 assert_eq!(row_counts_b, expected_row_counts_b);
827
828 let values = HashSet::from([ScalarValue::from(0i32)]);
830 assert!(pruning_stats.contained(&column_a, &values).is_none());
831 assert!(pruning_stats.contained(&column_b, &values).is_none());
832
833 assert_eq!(pruning_stats.num_containers(), 2);
835
836 let column_c = Column::new_unqualified("c");
838 assert!(pruning_stats.min_values(&column_c).is_none());
839 assert!(pruning_stats.max_values(&column_c).is_none());
840 assert!(pruning_stats.null_counts(&column_c).is_none());
841 let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap())
847 .unwrap()
848 .into_iter()
849 .collect::<Vec<_>>();
850 let expected_row_counts_c = vec![Some(100), Some(200)];
851 assert_eq!(row_counts_c, expected_row_counts_c);
852 assert!(pruning_stats.contained(&column_c, &values).is_none());
853
854 let column_d = Column::new_unqualified("d");
857 assert!(pruning_stats.min_values(&column_d).is_none());
858 assert!(pruning_stats.max_values(&column_d).is_none());
859 assert!(pruning_stats.null_counts(&column_d).is_none());
860 assert!(pruning_stats.row_counts().is_some());
861 assert!(pruning_stats.contained(&column_d, &values).is_none());
862 }
863
864 #[test]
865 fn test_statistics_pruning_statistics_empty() {
866 let statistics = vec![];
867 let schema = Arc::new(Schema::new(vec![
868 Field::new("a", DataType::Int32, false),
869 Field::new("b", DataType::Int32, false),
870 Field::new("c", DataType::Int32, false),
871 ]));
872 let pruning_stats = PrunableStatistics::new(statistics, schema);
873
874 let column_a = Column::new_unqualified("a");
875 let column_b = Column::new_unqualified("b");
876
877 assert!(pruning_stats.min_values(&column_a).is_none());
879 assert!(pruning_stats.max_values(&column_a).is_none());
880 assert!(pruning_stats.min_values(&column_b).is_none());
881 assert!(pruning_stats.max_values(&column_b).is_none());
882
883 assert!(pruning_stats.null_counts(&column_a).is_none());
885 assert!(pruning_stats.null_counts(&column_b).is_none());
886
887 assert!(pruning_stats.row_counts().is_none());
889 assert!(pruning_stats.row_counts().is_none());
890
891 let values = HashSet::from([ScalarValue::from(1i32)]);
893 assert!(pruning_stats.contained(&column_a, &values).is_none());
894 }
895
896 #[test]
897 fn test_composite_pruning_statistics_partition_and_file() {
898 let partition_values = vec![
900 vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
901 vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
902 ];
903 let partition_fields = vec![
904 Arc::new(Field::new("part_a", DataType::Int32, false)),
905 Arc::new(Field::new("part_b", DataType::Int32, false)),
906 ];
907 let partition_stats =
908 PartitionPruningStatistics::try_new(partition_values, partition_fields)
909 .unwrap();
910
911 let file_statistics = vec![
913 Arc::new(
914 Statistics::default()
915 .add_column_statistics(
916 ColumnStatistics::new_unknown()
917 .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
918 .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
919 .with_null_count(Precision::Exact(0)),
920 )
921 .add_column_statistics(
922 ColumnStatistics::new_unknown()
923 .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
924 .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
925 .with_null_count(Precision::Exact(5)),
926 )
927 .with_num_rows(Precision::Exact(100)),
928 ),
929 Arc::new(
930 Statistics::default()
931 .add_column_statistics(
932 ColumnStatistics::new_unknown()
933 .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
934 .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
935 .with_null_count(Precision::Exact(10)),
936 )
937 .add_column_statistics(
938 ColumnStatistics::new_unknown()
939 .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
940 .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
941 .with_null_count(Precision::Exact(0)),
942 )
943 .with_num_rows(Precision::Exact(200)),
944 ),
945 ];
946
947 let file_schema = Arc::new(Schema::new(vec![
948 Field::new("col_x", DataType::Int32, false),
949 Field::new("col_y", DataType::Int32, false),
950 ]));
951 let file_stats = PrunableStatistics::new(file_statistics, file_schema);
952
953 let composite_stats = CompositePruningStatistics::new(vec![
955 Box::new(partition_stats),
956 Box::new(file_stats),
957 ]);
958
959 let part_a = Column::new_unqualified("part_a");
961 let part_b = Column::new_unqualified("part_b");
962
963 let col_x = Column::new_unqualified("col_x");
965 let col_y = Column::new_unqualified("col_y");
966
967 let min_values_part_a =
969 as_int32_array(&composite_stats.min_values(&part_a).unwrap())
970 .unwrap()
971 .into_iter()
972 .collect::<Vec<_>>();
973 let expected_values_part_a = vec![Some(1), Some(2)];
974 assert_eq!(min_values_part_a, expected_values_part_a);
975
976 let max_values_part_a =
977 as_int32_array(&composite_stats.max_values(&part_a).unwrap())
978 .unwrap()
979 .into_iter()
980 .collect::<Vec<_>>();
981 assert_eq!(max_values_part_a, expected_values_part_a);
983
984 let min_values_part_b =
985 as_int32_array(&composite_stats.min_values(&part_b).unwrap())
986 .unwrap()
987 .into_iter()
988 .collect::<Vec<_>>();
989 let expected_values_part_b = vec![Some(10), Some(20)];
990 assert_eq!(min_values_part_b, expected_values_part_b);
991
992 let min_values_col_x =
994 as_int32_array(&composite_stats.min_values(&col_x).unwrap())
995 .unwrap()
996 .into_iter()
997 .collect::<Vec<_>>();
998 let expected_values_col_x = vec![Some(100), Some(500)];
999 assert_eq!(min_values_col_x, expected_values_col_x);
1000
1001 let max_values_col_x =
1002 as_int32_array(&composite_stats.max_values(&col_x).unwrap())
1003 .unwrap()
1004 .into_iter()
1005 .collect::<Vec<_>>();
1006 let expected_max_values_col_x = vec![Some(200), Some(600)];
1007 assert_eq!(max_values_col_x, expected_max_values_col_x);
1008
1009 let min_values_col_y =
1010 as_int32_array(&composite_stats.min_values(&col_y).unwrap())
1011 .unwrap()
1012 .into_iter()
1013 .collect::<Vec<_>>();
1014 let expected_values_col_y = vec![Some(300), Some(700)];
1015 assert_eq!(min_values_col_y, expected_values_col_y);
1016
1017 assert!(composite_stats.null_counts(&part_a).is_none());
1019 assert!(composite_stats.null_counts(&part_b).is_none());
1020
1021 let null_counts_col_x =
1022 as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
1023 .unwrap()
1024 .into_iter()
1025 .collect::<Vec<_>>();
1026 let expected_null_counts_col_x = vec![Some(0), Some(10)];
1027 assert_eq!(null_counts_col_x, expected_null_counts_col_x);
1028
1029 let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap())
1031 .unwrap()
1032 .into_iter()
1033 .collect::<Vec<_>>();
1034 let expected_row_counts = vec![Some(100), Some(200)];
1035 assert_eq!(row_counts_col_x, expected_row_counts);
1036
1037 let values = HashSet::from([ScalarValue::from(1i32)]);
1039 let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
1040 let expected_contained_part_a = BooleanArray::from(vec![true, false]);
1041 assert_eq!(contained_part_a, expected_contained_part_a);
1042
1043 assert!(composite_stats.contained(&col_x, &values).is_none());
1045
1046 let non_existent = Column::new_unqualified("non_existent");
1049 assert!(composite_stats.min_values(&non_existent).is_none());
1050 assert!(composite_stats.max_values(&non_existent).is_none());
1051 assert!(composite_stats.null_counts(&non_existent).is_none());
1052 assert!(composite_stats.row_counts().is_some());
1053 assert!(composite_stats.contained(&non_existent, &values).is_none());
1054
1055 assert_eq!(composite_stats.num_containers(), 2);
1057 }
1058
1059 #[test]
1060 fn test_composite_pruning_statistics_priority() {
1061 let first_statistics = vec![
1066 Arc::new(
1067 Statistics::default()
1068 .add_column_statistics(
1069 ColumnStatistics::new_unknown()
1070 .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
1071 .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
1072 .with_null_count(Precision::Exact(0)),
1073 )
1074 .with_num_rows(Precision::Exact(100)),
1075 ),
1076 Arc::new(
1077 Statistics::default()
1078 .add_column_statistics(
1079 ColumnStatistics::new_unknown()
1080 .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
1081 .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
1082 .with_null_count(Precision::Exact(5)),
1083 )
1084 .with_num_rows(Precision::Exact(200)),
1085 ),
1086 ];
1087
1088 let first_schema = Arc::new(Schema::new(vec![Field::new(
1089 "col_a",
1090 DataType::Int32,
1091 false,
1092 )]));
1093 let first_stats = PrunableStatistics::new(first_statistics, first_schema);
1094
1095 let second_statistics = vec![
1097 Arc::new(
1098 Statistics::default()
1099 .add_column_statistics(
1100 ColumnStatistics::new_unknown()
1101 .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
1102 .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
1103 .with_null_count(Precision::Exact(10)),
1104 )
1105 .with_num_rows(Precision::Exact(1000)),
1106 ),
1107 Arc::new(
1108 Statistics::default()
1109 .add_column_statistics(
1110 ColumnStatistics::new_unknown()
1111 .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
1112 .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
1113 .with_null_count(Precision::Exact(20)),
1114 )
1115 .with_num_rows(Precision::Exact(2000)),
1116 ),
1117 ];
1118
1119 let second_schema = Arc::new(Schema::new(vec![Field::new(
1120 "col_a",
1121 DataType::Int32,
1122 false,
1123 )]));
1124 let second_stats = PrunableStatistics::new(second_statistics, second_schema);
1125
1126 let composite_stats = CompositePruningStatistics::new(vec![
1128 Box::new(first_stats.clone()),
1129 Box::new(second_stats.clone()),
1130 ]);
1131
1132 let col_a = Column::new_unqualified("col_a");
1133
1134 let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
1136 .unwrap()
1137 .into_iter()
1138 .collect::<Vec<_>>();
1139 let expected_min_values = vec![Some(100), Some(300)];
1140 assert_eq!(min_values, expected_min_values);
1141
1142 let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
1143 .unwrap()
1144 .into_iter()
1145 .collect::<Vec<_>>();
1146 let expected_max_values = vec![Some(200), Some(400)];
1147 assert_eq!(max_values, expected_max_values);
1148
1149 let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
1150 .unwrap()
1151 .into_iter()
1152 .collect::<Vec<_>>();
1153 let expected_null_counts = vec![Some(0), Some(5)];
1154 assert_eq!(null_counts, expected_null_counts);
1155
1156 let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap())
1157 .unwrap()
1158 .into_iter()
1159 .collect::<Vec<_>>();
1160 let expected_row_counts = vec![Some(100), Some(200)];
1161 assert_eq!(row_counts, expected_row_counts);
1162
1163 let composite_stats_reversed = CompositePruningStatistics::new(vec![
1167 Box::new(second_stats.clone()),
1168 Box::new(first_stats.clone()),
1169 ]);
1170
1171 let min_values =
1173 as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
1174 .unwrap()
1175 .into_iter()
1176 .collect::<Vec<_>>();
1177 let expected_min_values = vec![Some(1000), Some(3000)];
1178 assert_eq!(min_values, expected_min_values);
1179
1180 let max_values =
1181 as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
1182 .unwrap()
1183 .into_iter()
1184 .collect::<Vec<_>>();
1185 let expected_max_values = vec![Some(2000), Some(4000)];
1186 assert_eq!(max_values, expected_max_values);
1187
1188 let null_counts =
1189 as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
1190 .unwrap()
1191 .into_iter()
1192 .collect::<Vec<_>>();
1193 let expected_null_counts = vec![Some(10), Some(20)];
1194 assert_eq!(null_counts, expected_null_counts);
1195
1196 let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap())
1197 .unwrap()
1198 .into_iter()
1199 .collect::<Vec<_>>();
1200 let expected_row_counts = vec![Some(1000), Some(2000)];
1201 assert_eq!(row_counts, expected_row_counts);
1202 }
1203
1204 #[test]
1205 fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
1206 let result = std::panic::catch_unwind(|| {
1209 CompositePruningStatistics::new(vec![]);
1210 });
1211 assert!(result.is_err());
1212
1213 let result = std::panic::catch_unwind(|| {
1215 let partition_values_1 = vec![
1218 vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
1219 vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
1220 ];
1221 let partition_fields_1 = vec![
1222 Arc::new(Field::new("part_a", DataType::Int32, false)),
1223 Arc::new(Field::new("part_b", DataType::Int32, false)),
1224 ];
1225 let partition_stats_1 = PartitionPruningStatistics::try_new(
1226 partition_values_1,
1227 partition_fields_1,
1228 )
1229 .unwrap();
1230 let partition_values_2 = vec![
1231 vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
1232 vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
1233 vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
1234 ];
1235 let partition_fields_2 = vec![
1236 Arc::new(Field::new("part_x", DataType::Int32, false)),
1237 Arc::new(Field::new("part_y", DataType::Int32, false)),
1238 ];
1239 let partition_stats_2 = PartitionPruningStatistics::try_new(
1240 partition_values_2,
1241 partition_fields_2,
1242 )
1243 .unwrap();
1244
1245 CompositePruningStatistics::new(vec![
1246 Box::new(partition_stats_1),
1247 Box::new(partition_stats_2),
1248 ]);
1249 });
1250 assert!(result.is_err());
1251 }
1252}