1use arrow::array::{Array, NullArray, UInt64Array};
19use arrow::array::{ArrayRef, BooleanArray};
20use arrow::datatypes::{FieldRef, Schema, SchemaRef};
21use std::collections::HashSet;
22use std::sync::Arc;
23
24use crate::error::DataFusionError;
25use crate::stats::Precision;
26use crate::{Column, Statistics};
27use crate::{ColumnStatistics, ScalarValue};
28
29pub trait PruningStatistics {
64 fn min_values(&self, column: &Column) -> Option<ArrayRef>;
72
73 fn max_values(&self, column: &Column) -> Option<ArrayRef>;
79
80 fn num_containers(&self) -> usize;
87
88 fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
97
98 fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
107
108 fn contained(
125 &self,
126 column: &Column,
127 values: &HashSet<ScalarValue>,
128 ) -> Option<BooleanArray>;
129}
130
131#[derive(Clone)]
139pub struct PartitionPruningStatistics {
140 partition_values: Vec<ArrayRef>,
146 num_containers: usize,
151 partition_schema: SchemaRef,
157}
158
159impl PartitionPruningStatistics {
160 pub fn try_new(
173 partition_values: Vec<Vec<ScalarValue>>,
174 partition_fields: Vec<FieldRef>,
175 ) -> Result<Self, DataFusionError> {
176 let num_containers = partition_values.len();
177 let partition_schema = Arc::new(Schema::new(partition_fields));
178 let mut partition_values_by_column =
179 vec![
180 Vec::with_capacity(partition_values.len());
181 partition_schema.fields().len()
182 ];
183 for partition_value in partition_values {
184 for (i, value) in partition_value.into_iter().enumerate() {
185 partition_values_by_column[i].push(value);
186 }
187 }
188 Ok(Self {
189 partition_values: partition_values_by_column
190 .into_iter()
191 .map(|v| {
192 if v.is_empty() {
193 Ok(Arc::new(NullArray::new(0)) as ArrayRef)
194 } else {
195 ScalarValue::iter_to_array(v)
196 }
197 })
198 .collect::<Result<Vec<_>, _>>()?,
199 num_containers,
200 partition_schema,
201 })
202 }
203}
204
205impl PruningStatistics for PartitionPruningStatistics {
206 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
207 let index = self.partition_schema.index_of(column.name()).ok()?;
208 self.partition_values.get(index).and_then(|v| {
209 if v.is_empty() || v.null_count() == v.len() {
210 None
212 } else {
213 Some(Arc::clone(v))
215 }
216 })
217 }
218
219 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
220 self.min_values(column)
221 }
222
223 fn num_containers(&self) -> usize {
224 self.num_containers
225 }
226
227 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
228 None
229 }
230
231 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
232 None
233 }
234
235 fn contained(
236 &self,
237 column: &Column,
238 values: &HashSet<ScalarValue>,
239 ) -> Option<BooleanArray> {
240 let index = self.partition_schema.index_of(column.name()).ok()?;
241 let array = self.partition_values.get(index)?;
242 let boolean_array = values.iter().try_fold(None, |acc, v| {
243 let arrow_value = v.to_scalar().ok()?;
244 let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
245 match acc {
246 None => Some(Some(eq_result)),
247 Some(acc_array) => {
248 arrow::compute::kernels::boolean::and(&acc_array, &eq_result)
249 .map(Some)
250 .ok()
251 }
252 }
253 })??;
254 if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
256 None
257 } else {
258 Some(boolean_array)
259 }
260 }
261}
262
263#[derive(Clone)]
275pub struct PrunableStatistics {
276 statistics: Vec<Arc<Statistics>>,
280 schema: SchemaRef,
282}
283
284impl PrunableStatistics {
285 pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
289 Self { statistics, schema }
290 }
291
292 fn get_exact_column_statistics(
293 &self,
294 column: &Column,
295 get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
296 ) -> Option<ArrayRef> {
297 let index = self.schema.index_of(column.name()).ok()?;
298 let mut has_value = false;
299 match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
300 s.column_statistics
301 .get(index)
302 .and_then(|stat| {
303 if let Precision::Exact(min) = get_stat(stat) {
304 has_value = true;
305 Some(min.clone())
306 } else {
307 None
308 }
309 })
310 .unwrap_or(ScalarValue::Null)
311 })) {
312 Ok(array) => has_value.then_some(array),
314 Err(_) => {
315 log::warn!(
316 "Failed to convert min values to array for column {}",
317 column.name()
318 );
319 None
320 }
321 }
322 }
323}
324
325impl PruningStatistics for PrunableStatistics {
326 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
327 self.get_exact_column_statistics(column, |stat| &stat.min_value)
328 }
329
330 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
331 self.get_exact_column_statistics(column, |stat| &stat.max_value)
332 }
333
334 fn num_containers(&self) -> usize {
335 self.statistics.len()
336 }
337
338 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
339 let index = self.schema.index_of(column.name()).ok()?;
340 if self.statistics.iter().any(|s| {
341 s.column_statistics
342 .get(index)
343 .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
344 }) {
345 Some(Arc::new(
346 self.statistics
347 .iter()
348 .map(|s| {
349 s.column_statistics.get(index).and_then(|stat| {
350 if let Precision::Exact(null_count) = &stat.null_count {
351 u64::try_from(*null_count).ok()
352 } else {
353 None
354 }
355 })
356 })
357 .collect::<UInt64Array>(),
358 ))
359 } else {
360 None
361 }
362 }
363
364 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
365 if self.schema.index_of(column.name()).is_err() {
367 return None;
368 }
369 if self
370 .statistics
371 .iter()
372 .any(|s| s.num_rows.is_exact().unwrap_or(false))
373 {
374 Some(Arc::new(
375 self.statistics
376 .iter()
377 .map(|s| {
378 if let Precision::Exact(row_count) = &s.num_rows {
379 u64::try_from(*row_count).ok()
380 } else {
381 None
382 }
383 })
384 .collect::<UInt64Array>(),
385 ))
386 } else {
387 None
388 }
389 }
390
391 fn contained(
392 &self,
393 _column: &Column,
394 _values: &HashSet<ScalarValue>,
395 ) -> Option<BooleanArray> {
396 None
397 }
398}
399
400pub struct CompositePruningStatistics {
413 pub statistics: Vec<Box<dyn PruningStatistics>>,
414}
415
416impl CompositePruningStatistics {
417 pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
420 assert!(!statistics.is_empty());
421 let num_containers = statistics[0].num_containers();
423 for stats in &statistics {
424 assert_eq!(num_containers, stats.num_containers());
425 }
426 Self { statistics }
427 }
428}
429
430impl PruningStatistics for CompositePruningStatistics {
431 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
432 for stats in &self.statistics {
433 if let Some(array) = stats.min_values(column) {
434 return Some(array);
435 }
436 }
437 None
438 }
439
440 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
441 for stats in &self.statistics {
442 if let Some(array) = stats.max_values(column) {
443 return Some(array);
444 }
445 }
446 None
447 }
448
449 fn num_containers(&self) -> usize {
450 self.statistics[0].num_containers()
451 }
452
453 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
454 for stats in &self.statistics {
455 if let Some(array) = stats.null_counts(column) {
456 return Some(array);
457 }
458 }
459 None
460 }
461
462 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
463 for stats in &self.statistics {
464 if let Some(array) = stats.row_counts(column) {
465 return Some(array);
466 }
467 }
468 None
469 }
470
471 fn contained(
472 &self,
473 column: &Column,
474 values: &HashSet<ScalarValue>,
475 ) -> Option<BooleanArray> {
476 for stats in &self.statistics {
477 if let Some(array) = stats.contained(column, values) {
478 return Some(array);
479 }
480 }
481 None
482 }
483}
484
485#[cfg(test)]
486mod tests {
487 use crate::{
488 cast::{as_int32_array, as_uint64_array},
489 ColumnStatistics,
490 };
491
492 use super::*;
493 use arrow::datatypes::{DataType, Field};
494 use std::sync::Arc;
495
496 #[test]
497 fn test_partition_pruning_statistics() {
498 let partition_values = vec![
499 vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
500 vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
501 ];
502 let partition_fields = vec![
503 Arc::new(Field::new("a", DataType::Int32, false)),
504 Arc::new(Field::new("b", DataType::Int32, false)),
505 ];
506 let partition_stats =
507 PartitionPruningStatistics::try_new(partition_values, partition_fields)
508 .unwrap();
509
510 let column_a = Column::new_unqualified("a");
511 let column_b = Column::new_unqualified("b");
512
513 assert!(partition_stats.null_counts(&column_a).is_none());
515 assert!(partition_stats.row_counts(&column_a).is_none());
516 assert!(partition_stats.null_counts(&column_b).is_none());
517 assert!(partition_stats.row_counts(&column_b).is_none());
518
519 let min_values_a =
521 as_int32_array(&partition_stats.min_values(&column_a).unwrap())
522 .unwrap()
523 .into_iter()
524 .collect::<Vec<_>>();
525 let expected_values_a = vec![Some(1), Some(3)];
526 assert_eq!(min_values_a, expected_values_a);
527 let max_values_a =
528 as_int32_array(&partition_stats.max_values(&column_a).unwrap())
529 .unwrap()
530 .into_iter()
531 .collect::<Vec<_>>();
532 let expected_values_a = vec![Some(1), Some(3)];
533 assert_eq!(max_values_a, expected_values_a);
534
535 let min_values_b =
536 as_int32_array(&partition_stats.min_values(&column_b).unwrap())
537 .unwrap()
538 .into_iter()
539 .collect::<Vec<_>>();
540 let expected_values_b = vec![Some(2), Some(4)];
541 assert_eq!(min_values_b, expected_values_b);
542 let max_values_b =
543 as_int32_array(&partition_stats.max_values(&column_b).unwrap())
544 .unwrap()
545 .into_iter()
546 .collect::<Vec<_>>();
547 let expected_values_b = vec![Some(2), Some(4)];
548 assert_eq!(max_values_b, expected_values_b);
549
550 let values = HashSet::from([ScalarValue::from(1i32)]);
552 let contained_a = partition_stats.contained(&column_a, &values).unwrap();
553 let expected_contained_a = BooleanArray::from(vec![true, false]);
554 assert_eq!(contained_a, expected_contained_a);
555 let contained_b = partition_stats.contained(&column_b, &values).unwrap();
556 let expected_contained_b = BooleanArray::from(vec![false, false]);
557 assert_eq!(contained_b, expected_contained_b);
558
559 assert_eq!(partition_stats.num_containers(), 2);
561 }
562
563 #[test]
564 fn test_partition_pruning_statistics_empty() {
565 let partition_values = vec![];
566 let partition_fields = vec![
567 Arc::new(Field::new("a", DataType::Int32, false)),
568 Arc::new(Field::new("b", DataType::Int32, false)),
569 ];
570 let partition_stats =
571 PartitionPruningStatistics::try_new(partition_values, partition_fields)
572 .unwrap();
573
574 let column_a = Column::new_unqualified("a");
575 let column_b = Column::new_unqualified("b");
576
577 assert!(partition_stats.null_counts(&column_a).is_none());
579 assert!(partition_stats.row_counts(&column_a).is_none());
580 assert!(partition_stats.null_counts(&column_b).is_none());
581 assert!(partition_stats.row_counts(&column_b).is_none());
582
583 assert!(partition_stats.min_values(&column_a).is_none());
585 assert!(partition_stats.max_values(&column_a).is_none());
586 assert!(partition_stats.min_values(&column_b).is_none());
587 assert!(partition_stats.max_values(&column_b).is_none());
588
589 let values = HashSet::from([ScalarValue::from(1i32)]);
591 assert!(partition_stats.contained(&column_a, &values).is_none());
592 }
593
594 #[test]
595 fn test_statistics_pruning_statistics() {
596 let statistics = vec![
597 Arc::new(
598 Statistics::default()
599 .add_column_statistics(
600 ColumnStatistics::new_unknown()
601 .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
602 .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
603 .with_null_count(Precision::Exact(0)),
604 )
605 .add_column_statistics(
606 ColumnStatistics::new_unknown()
607 .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
608 .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
609 .with_null_count(Precision::Exact(5)),
610 )
611 .with_num_rows(Precision::Exact(100)),
612 ),
613 Arc::new(
614 Statistics::default()
615 .add_column_statistics(
616 ColumnStatistics::new_unknown()
617 .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
618 .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
619 .with_null_count(Precision::Exact(10)),
620 )
621 .add_column_statistics(
622 ColumnStatistics::new_unknown()
623 .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
624 .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
625 .with_null_count(Precision::Exact(0)),
626 )
627 .with_num_rows(Precision::Exact(200)),
628 ),
629 ];
630
631 let schema = Arc::new(Schema::new(vec![
632 Field::new("a", DataType::Int32, false),
633 Field::new("b", DataType::Int32, false),
634 Field::new("c", DataType::Int32, false),
635 ]));
636 let pruning_stats = PrunableStatistics::new(statistics, schema);
637
638 let column_a = Column::new_unqualified("a");
639 let column_b = Column::new_unqualified("b");
640
641 let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
643 .unwrap()
644 .into_iter()
645 .collect::<Vec<_>>();
646 let expected_values_a = vec![Some(0), Some(50)];
647 assert_eq!(min_values_a, expected_values_a);
648 let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
649 .unwrap()
650 .into_iter()
651 .collect::<Vec<_>>();
652 let expected_values_a = vec![Some(100), Some(300)];
653 assert_eq!(max_values_a, expected_values_a);
654 let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
655 .unwrap()
656 .into_iter()
657 .collect::<Vec<_>>();
658 let expected_values_b = vec![Some(100), Some(200)];
659 assert_eq!(min_values_b, expected_values_b);
660 let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
661 .unwrap()
662 .into_iter()
663 .collect::<Vec<_>>();
664 let expected_values_b = vec![Some(200), Some(400)];
665 assert_eq!(max_values_b, expected_values_b);
666
667 let null_counts_a =
669 as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
670 .unwrap()
671 .into_iter()
672 .collect::<Vec<_>>();
673 let expected_null_counts_a = vec![Some(0), Some(10)];
674 assert_eq!(null_counts_a, expected_null_counts_a);
675 let null_counts_b =
676 as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
677 .unwrap()
678 .into_iter()
679 .collect::<Vec<_>>();
680 let expected_null_counts_b = vec![Some(5), Some(0)];
681 assert_eq!(null_counts_b, expected_null_counts_b);
682
683 let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
685 .unwrap()
686 .into_iter()
687 .collect::<Vec<_>>();
688 let expected_row_counts_a = vec![Some(100), Some(200)];
689 assert_eq!(row_counts_a, expected_row_counts_a);
690 let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
691 .unwrap()
692 .into_iter()
693 .collect::<Vec<_>>();
694 let expected_row_counts_b = vec![Some(100), Some(200)];
695 assert_eq!(row_counts_b, expected_row_counts_b);
696
697 let values = HashSet::from([ScalarValue::from(0i32)]);
699 assert!(pruning_stats.contained(&column_a, &values).is_none());
700 assert!(pruning_stats.contained(&column_b, &values).is_none());
701
702 assert_eq!(pruning_stats.num_containers(), 2);
704
705 let column_c = Column::new_unqualified("c");
707 assert!(pruning_stats.min_values(&column_c).is_none());
708 assert!(pruning_stats.max_values(&column_c).is_none());
709 assert!(pruning_stats.null_counts(&column_c).is_none());
710 let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
716 .unwrap()
717 .into_iter()
718 .collect::<Vec<_>>();
719 let expected_row_counts_c = vec![Some(100), Some(200)];
720 assert_eq!(row_counts_c, expected_row_counts_c);
721 assert!(pruning_stats.contained(&column_c, &values).is_none());
722
723 let column_d = Column::new_unqualified("d");
725 assert!(pruning_stats.min_values(&column_d).is_none());
726 assert!(pruning_stats.max_values(&column_d).is_none());
727 assert!(pruning_stats.null_counts(&column_d).is_none());
728 assert!(pruning_stats.row_counts(&column_d).is_none());
729 assert!(pruning_stats.contained(&column_d, &values).is_none());
730 }
731
732 #[test]
733 fn test_statistics_pruning_statistics_empty() {
734 let statistics = vec![];
735 let schema = Arc::new(Schema::new(vec![
736 Field::new("a", DataType::Int32, false),
737 Field::new("b", DataType::Int32, false),
738 Field::new("c", DataType::Int32, false),
739 ]));
740 let pruning_stats = PrunableStatistics::new(statistics, schema);
741
742 let column_a = Column::new_unqualified("a");
743 let column_b = Column::new_unqualified("b");
744
745 assert!(pruning_stats.min_values(&column_a).is_none());
747 assert!(pruning_stats.max_values(&column_a).is_none());
748 assert!(pruning_stats.min_values(&column_b).is_none());
749 assert!(pruning_stats.max_values(&column_b).is_none());
750
751 assert!(pruning_stats.null_counts(&column_a).is_none());
753 assert!(pruning_stats.null_counts(&column_b).is_none());
754
755 assert!(pruning_stats.row_counts(&column_a).is_none());
757 assert!(pruning_stats.row_counts(&column_b).is_none());
758
759 let values = HashSet::from([ScalarValue::from(1i32)]);
761 assert!(pruning_stats.contained(&column_a, &values).is_none());
762 }
763
764 #[test]
765 fn test_composite_pruning_statistics_partition_and_file() {
766 let partition_values = vec![
768 vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
769 vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
770 ];
771 let partition_fields = vec![
772 Arc::new(Field::new("part_a", DataType::Int32, false)),
773 Arc::new(Field::new("part_b", DataType::Int32, false)),
774 ];
775 let partition_stats =
776 PartitionPruningStatistics::try_new(partition_values, partition_fields)
777 .unwrap();
778
779 let file_statistics = vec![
781 Arc::new(
782 Statistics::default()
783 .add_column_statistics(
784 ColumnStatistics::new_unknown()
785 .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
786 .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
787 .with_null_count(Precision::Exact(0)),
788 )
789 .add_column_statistics(
790 ColumnStatistics::new_unknown()
791 .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
792 .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
793 .with_null_count(Precision::Exact(5)),
794 )
795 .with_num_rows(Precision::Exact(100)),
796 ),
797 Arc::new(
798 Statistics::default()
799 .add_column_statistics(
800 ColumnStatistics::new_unknown()
801 .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
802 .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
803 .with_null_count(Precision::Exact(10)),
804 )
805 .add_column_statistics(
806 ColumnStatistics::new_unknown()
807 .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
808 .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
809 .with_null_count(Precision::Exact(0)),
810 )
811 .with_num_rows(Precision::Exact(200)),
812 ),
813 ];
814
815 let file_schema = Arc::new(Schema::new(vec![
816 Field::new("col_x", DataType::Int32, false),
817 Field::new("col_y", DataType::Int32, false),
818 ]));
819 let file_stats = PrunableStatistics::new(file_statistics, file_schema);
820
821 let composite_stats = CompositePruningStatistics::new(vec![
823 Box::new(partition_stats),
824 Box::new(file_stats),
825 ]);
826
827 let part_a = Column::new_unqualified("part_a");
829 let part_b = Column::new_unqualified("part_b");
830
831 let col_x = Column::new_unqualified("col_x");
833 let col_y = Column::new_unqualified("col_y");
834
835 let min_values_part_a =
837 as_int32_array(&composite_stats.min_values(&part_a).unwrap())
838 .unwrap()
839 .into_iter()
840 .collect::<Vec<_>>();
841 let expected_values_part_a = vec![Some(1), Some(2)];
842 assert_eq!(min_values_part_a, expected_values_part_a);
843
844 let max_values_part_a =
845 as_int32_array(&composite_stats.max_values(&part_a).unwrap())
846 .unwrap()
847 .into_iter()
848 .collect::<Vec<_>>();
849 assert_eq!(max_values_part_a, expected_values_part_a);
851
852 let min_values_part_b =
853 as_int32_array(&composite_stats.min_values(&part_b).unwrap())
854 .unwrap()
855 .into_iter()
856 .collect::<Vec<_>>();
857 let expected_values_part_b = vec![Some(10), Some(20)];
858 assert_eq!(min_values_part_b, expected_values_part_b);
859
860 let min_values_col_x =
862 as_int32_array(&composite_stats.min_values(&col_x).unwrap())
863 .unwrap()
864 .into_iter()
865 .collect::<Vec<_>>();
866 let expected_values_col_x = vec![Some(100), Some(500)];
867 assert_eq!(min_values_col_x, expected_values_col_x);
868
869 let max_values_col_x =
870 as_int32_array(&composite_stats.max_values(&col_x).unwrap())
871 .unwrap()
872 .into_iter()
873 .collect::<Vec<_>>();
874 let expected_max_values_col_x = vec![Some(200), Some(600)];
875 assert_eq!(max_values_col_x, expected_max_values_col_x);
876
877 let min_values_col_y =
878 as_int32_array(&composite_stats.min_values(&col_y).unwrap())
879 .unwrap()
880 .into_iter()
881 .collect::<Vec<_>>();
882 let expected_values_col_y = vec![Some(300), Some(700)];
883 assert_eq!(min_values_col_y, expected_values_col_y);
884
885 assert!(composite_stats.null_counts(&part_a).is_none());
887 assert!(composite_stats.null_counts(&part_b).is_none());
888
889 let null_counts_col_x =
890 as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
891 .unwrap()
892 .into_iter()
893 .collect::<Vec<_>>();
894 let expected_null_counts_col_x = vec![Some(0), Some(10)];
895 assert_eq!(null_counts_col_x, expected_null_counts_col_x);
896
897 assert!(composite_stats.row_counts(&part_a).is_none());
899 let row_counts_col_x =
900 as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
901 .unwrap()
902 .into_iter()
903 .collect::<Vec<_>>();
904 let expected_row_counts = vec![Some(100), Some(200)];
905 assert_eq!(row_counts_col_x, expected_row_counts);
906
907 let values = HashSet::from([ScalarValue::from(1i32)]);
909 let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
910 let expected_contained_part_a = BooleanArray::from(vec![true, false]);
911 assert_eq!(contained_part_a, expected_contained_part_a);
912
913 assert!(composite_stats.contained(&col_x, &values).is_none());
915
916 let non_existent = Column::new_unqualified("non_existent");
918 assert!(composite_stats.min_values(&non_existent).is_none());
919 assert!(composite_stats.max_values(&non_existent).is_none());
920 assert!(composite_stats.null_counts(&non_existent).is_none());
921 assert!(composite_stats.row_counts(&non_existent).is_none());
922 assert!(composite_stats.contained(&non_existent, &values).is_none());
923
924 assert_eq!(composite_stats.num_containers(), 2);
926 }
927
928 #[test]
929 fn test_composite_pruning_statistics_priority() {
930 let first_statistics = vec![
935 Arc::new(
936 Statistics::default()
937 .add_column_statistics(
938 ColumnStatistics::new_unknown()
939 .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
940 .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
941 .with_null_count(Precision::Exact(0)),
942 )
943 .with_num_rows(Precision::Exact(100)),
944 ),
945 Arc::new(
946 Statistics::default()
947 .add_column_statistics(
948 ColumnStatistics::new_unknown()
949 .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
950 .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
951 .with_null_count(Precision::Exact(5)),
952 )
953 .with_num_rows(Precision::Exact(200)),
954 ),
955 ];
956
957 let first_schema = Arc::new(Schema::new(vec![Field::new(
958 "col_a",
959 DataType::Int32,
960 false,
961 )]));
962 let first_stats = PrunableStatistics::new(first_statistics, first_schema);
963
964 let second_statistics = vec![
966 Arc::new(
967 Statistics::default()
968 .add_column_statistics(
969 ColumnStatistics::new_unknown()
970 .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
971 .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
972 .with_null_count(Precision::Exact(10)),
973 )
974 .with_num_rows(Precision::Exact(1000)),
975 ),
976 Arc::new(
977 Statistics::default()
978 .add_column_statistics(
979 ColumnStatistics::new_unknown()
980 .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
981 .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
982 .with_null_count(Precision::Exact(20)),
983 )
984 .with_num_rows(Precision::Exact(2000)),
985 ),
986 ];
987
988 let second_schema = Arc::new(Schema::new(vec![Field::new(
989 "col_a",
990 DataType::Int32,
991 false,
992 )]));
993 let second_stats = PrunableStatistics::new(second_statistics, second_schema);
994
995 let composite_stats = CompositePruningStatistics::new(vec![
997 Box::new(first_stats.clone()),
998 Box::new(second_stats.clone()),
999 ]);
1000
1001 let col_a = Column::new_unqualified("col_a");
1002
1003 let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
1005 .unwrap()
1006 .into_iter()
1007 .collect::<Vec<_>>();
1008 let expected_min_values = vec![Some(100), Some(300)];
1009 assert_eq!(min_values, expected_min_values);
1010
1011 let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
1012 .unwrap()
1013 .into_iter()
1014 .collect::<Vec<_>>();
1015 let expected_max_values = vec![Some(200), Some(400)];
1016 assert_eq!(max_values, expected_max_values);
1017
1018 let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
1019 .unwrap()
1020 .into_iter()
1021 .collect::<Vec<_>>();
1022 let expected_null_counts = vec![Some(0), Some(5)];
1023 assert_eq!(null_counts, expected_null_counts);
1024
1025 let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
1026 .unwrap()
1027 .into_iter()
1028 .collect::<Vec<_>>();
1029 let expected_row_counts = vec![Some(100), Some(200)];
1030 assert_eq!(row_counts, expected_row_counts);
1031
1032 let composite_stats_reversed = CompositePruningStatistics::new(vec![
1036 Box::new(second_stats.clone()),
1037 Box::new(first_stats.clone()),
1038 ]);
1039
1040 let min_values =
1042 as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
1043 .unwrap()
1044 .into_iter()
1045 .collect::<Vec<_>>();
1046 let expected_min_values = vec![Some(1000), Some(3000)];
1047 assert_eq!(min_values, expected_min_values);
1048
1049 let max_values =
1050 as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
1051 .unwrap()
1052 .into_iter()
1053 .collect::<Vec<_>>();
1054 let expected_max_values = vec![Some(2000), Some(4000)];
1055 assert_eq!(max_values, expected_max_values);
1056
1057 let null_counts =
1058 as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
1059 .unwrap()
1060 .into_iter()
1061 .collect::<Vec<_>>();
1062 let expected_null_counts = vec![Some(10), Some(20)];
1063 assert_eq!(null_counts, expected_null_counts);
1064
1065 let row_counts =
1066 as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
1067 .unwrap()
1068 .into_iter()
1069 .collect::<Vec<_>>();
1070 let expected_row_counts = vec![Some(1000), Some(2000)];
1071 assert_eq!(row_counts, expected_row_counts);
1072 }
1073
1074 #[test]
1075 fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
1076 let result = std::panic::catch_unwind(|| {
1079 CompositePruningStatistics::new(vec![]);
1080 });
1081 assert!(result.is_err());
1082
1083 let result = std::panic::catch_unwind(|| {
1085 let partition_values_1 = vec![
1088 vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
1089 vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
1090 ];
1091 let partition_fields_1 = vec![
1092 Arc::new(Field::new("part_a", DataType::Int32, false)),
1093 Arc::new(Field::new("part_b", DataType::Int32, false)),
1094 ];
1095 let partition_stats_1 = PartitionPruningStatistics::try_new(
1096 partition_values_1,
1097 partition_fields_1,
1098 )
1099 .unwrap();
1100 let partition_values_2 = vec![
1101 vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
1102 vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
1103 vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
1104 ];
1105 let partition_fields_2 = vec![
1106 Arc::new(Field::new("part_x", DataType::Int32, false)),
1107 Arc::new(Field::new("part_y", DataType::Int32, false)),
1108 ];
1109 let partition_stats_2 = PartitionPruningStatistics::try_new(
1110 partition_values_2,
1111 partition_fields_2,
1112 )
1113 .unwrap();
1114
1115 CompositePruningStatistics::new(vec![
1116 Box::new(partition_stats_1),
1117 Box::new(partition_stats_2),
1118 ]);
1119 });
1120 assert!(result.is_err());
1121 }
1122}