datafusion_common/
pruning.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{Array, NullArray, UInt64Array};
19use arrow::array::{ArrayRef, BooleanArray};
20use arrow::datatypes::{FieldRef, Schema, SchemaRef};
21use std::collections::HashSet;
22use std::sync::Arc;
23
24use crate::error::DataFusionError;
25use crate::stats::Precision;
26use crate::{Column, Statistics};
27use crate::{ColumnStatistics, ScalarValue};
28
/// A source of runtime statistical information to [`PruningPredicate`]s.
///
/// # Supported Information
///
/// 1. Minimum and maximum values for columns
///
/// 2. Null counts and row counts for columns
///
/// 3. Whether the values in a column are contained in a set of literals
///
/// # Vectorized Interface
///
/// Information for containers / files is returned as Arrow [`ArrayRef`], so
/// the evaluation happens once on a single `RecordBatch`, which amortizes the
/// overhead of evaluating the predicate. This is important when pruning 1000s
/// of containers which often happens in analytic systems that have 1000s of
/// potential files to consider.
///
/// For example, for the following three files with a single column `a`:
/// ```text
/// file1: column a: min=5, max=10
/// file2: column a: No stats
/// file3: column a: min=20, max=30
/// ```
///
/// PruningStatistics would return:
///
/// ```text
/// min_values("a") -> Some([5, Null, 20])
/// max_values("a") -> Some([10, Null, 30])
/// min_values("X") -> None
/// ```
///
/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
pub trait PruningStatistics {
    /// Return the minimum values for the named column, if known.
    ///
    /// If the minimum value for a particular container is not known, the
    /// returned array should have `null` in that row. If the minimum value is
    /// not known for any row, return `None`.
    ///
    /// Note: the returned array must contain [`Self::num_containers`] rows
    fn min_values(&self, column: &Column) -> Option<ArrayRef>;

    /// Return the maximum values for the named column, if known.
    ///
    /// See [`Self::min_values`] for when to return `None` and null values.
    ///
    /// Note: the returned array must contain [`Self::num_containers`] rows
    fn max_values(&self, column: &Column) -> Option<ArrayRef>;

    /// Return the number of containers (e.g. Row Groups) being pruned with
    /// these statistics.
    ///
    /// This value corresponds to the size of the [`ArrayRef`] returned by
    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
    /// and [`Self::row_counts`].
    fn num_containers(&self) -> usize;

    /// Return the number of null values for the named column as an
    /// [`UInt64Array`]
    ///
    /// See [`Self::min_values`] for when to return `None` and null values.
    ///
    /// Note: the returned array must contain [`Self::num_containers`] rows
    ///
    /// [`UInt64Array`]: arrow::array::UInt64Array
    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

    /// Return the number of rows for the named column in each container
    /// as an [`UInt64Array`].
    ///
    /// See [`Self::min_values`] for when to return `None` and null values.
    ///
    /// Note: the returned array must contain [`Self::num_containers`] rows
    ///
    /// [`UInt64Array`]: arrow::array::UInt64Array
    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

    /// Returns [`BooleanArray`] where each row represents information known
    /// about specific literal `values` in a column.
    ///
    /// For example, Parquet Bloom Filters implement this API to communicate
    /// that `values` are known not to be present in a Row Group.
    ///
    /// The returned array has one row for each container, with the following
    /// meanings:
    /// * `true` if the values in `column` ONLY contain values from `values`
    /// * `false` if the values in `column` are NOT ANY of `values`
    /// * `null` if neither of the above holds or is unknown.
    ///
    /// If these statistics can not determine column membership for any
    /// container, return `None`.
    ///
    /// Note: the returned array must contain [`Self::num_containers`] rows
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray>;
}
130
/// Prune files based on their partition values.
///
/// This is used both at planning time and execution time to prune
/// files based on their partition values.
/// This feeds into [`CompositePruningStatistics`] to allow pruning
/// with filters that depend both on partition columns and data columns
/// (e.g. `WHERE partition_col = data_col`).
#[derive(Clone)]
pub struct PartitionPruningStatistics {
    /// Values for each column for each container.
    ///
    /// Stored column-major: there is one array per partition column, and
    /// each array holds one value per container. The order of the arrays
    /// must match the order of the partition columns in
    /// [`PartitionPruningStatistics::partition_schema`].
    partition_values: Vec<ArrayRef>,
    /// The number of containers.
    ///
    /// Stored since the partition values are column-major and if
    /// there are no columns we wouldn't know the number of containers.
    num_containers: usize,
    /// The schema of the partition columns.
    ///
    /// This must **not** be the schema of the entire file or table: it must
    /// only be the schema of the partition columns, in the same order as the
    /// values in [`PartitionPruningStatistics::partition_values`].
    partition_schema: SchemaRef,
}
158
159impl PartitionPruningStatistics {
160    /// Create a new instance of [`PartitionPruningStatistics`].
161    ///
162    /// Args:
163    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
164    ///   The outer vector represents the containers while the inner
165    ///   vector represents the partition values for each column.
166    ///   Note that this is the **opposite** of the order of the
167    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
168    /// * `partition_schema`: The schema of the partition columns.
169    ///   This must **not** be the schema of the entire file or table:
170    ///   instead it must only be the schema of the partition columns,
171    ///   in the same order as the values in `partition_values`.
172    pub fn try_new(
173        partition_values: Vec<Vec<ScalarValue>>,
174        partition_fields: Vec<FieldRef>,
175    ) -> Result<Self, DataFusionError> {
176        let num_containers = partition_values.len();
177        let partition_schema = Arc::new(Schema::new(partition_fields));
178        let mut partition_values_by_column =
179            vec![
180                Vec::with_capacity(partition_values.len());
181                partition_schema.fields().len()
182            ];
183        for partition_value in partition_values {
184            for (i, value) in partition_value.into_iter().enumerate() {
185                partition_values_by_column[i].push(value);
186            }
187        }
188        Ok(Self {
189            partition_values: partition_values_by_column
190                .into_iter()
191                .map(|v| {
192                    if v.is_empty() {
193                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
194                    } else {
195                        ScalarValue::iter_to_array(v)
196                    }
197                })
198                .collect::<Result<Vec<_>, _>>()?,
199            num_containers,
200            partition_schema,
201        })
202    }
203}
204
205impl PruningStatistics for PartitionPruningStatistics {
206    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
207        let index = self.partition_schema.index_of(column.name()).ok()?;
208        self.partition_values.get(index).and_then(|v| {
209            if v.is_empty() || v.null_count() == v.len() {
210                // If the array is empty or all nulls, return None
211                None
212            } else {
213                // Otherwise, return the array as is
214                Some(Arc::clone(v))
215            }
216        })
217    }
218
219    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
220        self.min_values(column)
221    }
222
223    fn num_containers(&self) -> usize {
224        self.num_containers
225    }
226
227    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
228        None
229    }
230
231    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
232        None
233    }
234
235    fn contained(
236        &self,
237        column: &Column,
238        values: &HashSet<ScalarValue>,
239    ) -> Option<BooleanArray> {
240        let index = self.partition_schema.index_of(column.name()).ok()?;
241        let array = self.partition_values.get(index)?;
242        let boolean_array = values.iter().try_fold(None, |acc, v| {
243            let arrow_value = v.to_scalar().ok()?;
244            let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
245            match acc {
246                None => Some(Some(eq_result)),
247                Some(acc_array) => {
248                    arrow::compute::kernels::boolean::and(&acc_array, &eq_result)
249                        .map(Some)
250                        .ok()
251                }
252            }
253        })??;
254        // If the boolean array is empty or all null values, return None
255        if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
256            None
257        } else {
258            Some(boolean_array)
259        }
260    }
261}
262
/// Prune a set of containers represented by their statistics.
///
/// Each [`Statistics`] represents a "container" -- some collection of data
/// that has statistics of its columns.
///
/// It is up to the caller to decide what each container represents. For
/// example, they can come from a file (e.g. [`PartitionedFile`]) or a set of
/// files (e.g. [`FileGroup`])
///
/// [`PartitionedFile`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html
/// [`FileGroup`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroup.html
#[derive(Clone)]
pub struct PrunableStatistics {
    /// Statistics for each container.
    /// These are shared via [`Arc`] since they may be rather large / expensive to clone
    /// and we often won't return all of them as ArrayRefs (we only return the columns the predicate requests).
    statistics: Vec<Arc<Statistics>>,
    /// The schema of the file these statistics are for.
    schema: SchemaRef,
}
283
284impl PrunableStatistics {
285    /// Create a new instance of [`PrunableStatistics`].
286    /// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
287    /// The `schema` is the schema of the data in the containers and should apply to all files.
288    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
289        Self { statistics, schema }
290    }
291
292    fn get_exact_column_statistics(
293        &self,
294        column: &Column,
295        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
296    ) -> Option<ArrayRef> {
297        let index = self.schema.index_of(column.name()).ok()?;
298        let mut has_value = false;
299        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
300            s.column_statistics
301                .get(index)
302                .and_then(|stat| {
303                    if let Precision::Exact(min) = get_stat(stat) {
304                        has_value = true;
305                        Some(min.clone())
306                    } else {
307                        None
308                    }
309                })
310                .unwrap_or(ScalarValue::Null)
311        })) {
312            // If there is any non-null value and no errors, return the array
313            Ok(array) => has_value.then_some(array),
314            Err(_) => {
315                log::warn!(
316                    "Failed to convert min values to array for column {}",
317                    column.name()
318                );
319                None
320            }
321        }
322    }
323}
324
325impl PruningStatistics for PrunableStatistics {
326    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
327        self.get_exact_column_statistics(column, |stat| &stat.min_value)
328    }
329
330    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
331        self.get_exact_column_statistics(column, |stat| &stat.max_value)
332    }
333
334    fn num_containers(&self) -> usize {
335        self.statistics.len()
336    }
337
338    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
339        let index = self.schema.index_of(column.name()).ok()?;
340        if self.statistics.iter().any(|s| {
341            s.column_statistics
342                .get(index)
343                .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
344        }) {
345            Some(Arc::new(
346                self.statistics
347                    .iter()
348                    .map(|s| {
349                        s.column_statistics.get(index).and_then(|stat| {
350                            if let Precision::Exact(null_count) = &stat.null_count {
351                                u64::try_from(*null_count).ok()
352                            } else {
353                                None
354                            }
355                        })
356                    })
357                    .collect::<UInt64Array>(),
358            ))
359        } else {
360            None
361        }
362    }
363
364    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
365        // If the column does not exist in the schema, return None
366        if self.schema.index_of(column.name()).is_err() {
367            return None;
368        }
369        if self
370            .statistics
371            .iter()
372            .any(|s| s.num_rows.is_exact().unwrap_or(false))
373        {
374            Some(Arc::new(
375                self.statistics
376                    .iter()
377                    .map(|s| {
378                        if let Precision::Exact(row_count) = &s.num_rows {
379                            u64::try_from(*row_count).ok()
380                        } else {
381                            None
382                        }
383                    })
384                    .collect::<UInt64Array>(),
385            ))
386        } else {
387            None
388        }
389    }
390
391    fn contained(
392        &self,
393        _column: &Column,
394        _values: &HashSet<ScalarValue>,
395    ) -> Option<BooleanArray> {
396        None
397    }
398}
399
/// Combine multiple [`PruningStatistics`] into a single
/// [`CompositePruningStatistics`].
/// This can be used to combine statistics from different sources,
/// for example partition values and file statistics.
/// This allows pruning with filters that depend on multiple sources of statistics,
/// such as `WHERE partition_col = data_col`.
/// This is done by iterating over the statistics and returning the first
/// one that has information for the requested column.
/// If multiple statistics have information for the same column,
/// the first one is returned without any regard for completeness or accuracy.
/// That is: if the first statistics has information for a column, even if it is incomplete,
/// that is returned even if a later statistics has more complete information.
pub struct CompositePruningStatistics {
    /// The underlying statistics sources, consulted in order: the first
    /// source that returns `Some` for a request provides the answer.
    pub statistics: Vec<Box<dyn PruningStatistics>>,
}
415
416impl CompositePruningStatistics {
417    /// Create a new instance of [`CompositePruningStatistics`] from
418    /// a vector of [`PruningStatistics`].
419    pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
420        assert!(!statistics.is_empty());
421        // Check that all statistics have the same number of containers
422        let num_containers = statistics[0].num_containers();
423        for stats in &statistics {
424            assert_eq!(num_containers, stats.num_containers());
425        }
426        Self { statistics }
427    }
428}
429
430impl PruningStatistics for CompositePruningStatistics {
431    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
432        for stats in &self.statistics {
433            if let Some(array) = stats.min_values(column) {
434                return Some(array);
435            }
436        }
437        None
438    }
439
440    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
441        for stats in &self.statistics {
442            if let Some(array) = stats.max_values(column) {
443                return Some(array);
444            }
445        }
446        None
447    }
448
449    fn num_containers(&self) -> usize {
450        self.statistics[0].num_containers()
451    }
452
453    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
454        for stats in &self.statistics {
455            if let Some(array) = stats.null_counts(column) {
456                return Some(array);
457            }
458        }
459        None
460    }
461
462    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
463        for stats in &self.statistics {
464            if let Some(array) = stats.row_counts(column) {
465                return Some(array);
466            }
467        }
468        None
469    }
470
471    fn contained(
472        &self,
473        column: &Column,
474        values: &HashSet<ScalarValue>,
475    ) -> Option<BooleanArray> {
476        for stats in &self.statistics {
477            if let Some(array) = stats.contained(column, values) {
478                return Some(array);
479            }
480        }
481        None
482    }
483}
484
485#[cfg(test)]
486mod tests {
487    use crate::{
488        cast::{as_int32_array, as_uint64_array},
489        ColumnStatistics,
490    };
491
492    use super::*;
493    use arrow::datatypes::{DataType, Field};
494    use std::sync::Arc;
495
496    #[test]
497    fn test_partition_pruning_statistics() {
498        let partition_values = vec![
499            vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
500            vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
501        ];
502        let partition_fields = vec![
503            Arc::new(Field::new("a", DataType::Int32, false)),
504            Arc::new(Field::new("b", DataType::Int32, false)),
505        ];
506        let partition_stats =
507            PartitionPruningStatistics::try_new(partition_values, partition_fields)
508                .unwrap();
509
510        let column_a = Column::new_unqualified("a");
511        let column_b = Column::new_unqualified("b");
512
513        // Partition values don't know anything about nulls or row counts
514        assert!(partition_stats.null_counts(&column_a).is_none());
515        assert!(partition_stats.row_counts(&column_a).is_none());
516        assert!(partition_stats.null_counts(&column_b).is_none());
517        assert!(partition_stats.row_counts(&column_b).is_none());
518
519        // Min/max values are the same as the partition values
520        let min_values_a =
521            as_int32_array(&partition_stats.min_values(&column_a).unwrap())
522                .unwrap()
523                .into_iter()
524                .collect::<Vec<_>>();
525        let expected_values_a = vec![Some(1), Some(3)];
526        assert_eq!(min_values_a, expected_values_a);
527        let max_values_a =
528            as_int32_array(&partition_stats.max_values(&column_a).unwrap())
529                .unwrap()
530                .into_iter()
531                .collect::<Vec<_>>();
532        let expected_values_a = vec![Some(1), Some(3)];
533        assert_eq!(max_values_a, expected_values_a);
534
535        let min_values_b =
536            as_int32_array(&partition_stats.min_values(&column_b).unwrap())
537                .unwrap()
538                .into_iter()
539                .collect::<Vec<_>>();
540        let expected_values_b = vec![Some(2), Some(4)];
541        assert_eq!(min_values_b, expected_values_b);
542        let max_values_b =
543            as_int32_array(&partition_stats.max_values(&column_b).unwrap())
544                .unwrap()
545                .into_iter()
546                .collect::<Vec<_>>();
547        let expected_values_b = vec![Some(2), Some(4)];
548        assert_eq!(max_values_b, expected_values_b);
549
550        // Contained values are only true for the partition values
551        let values = HashSet::from([ScalarValue::from(1i32)]);
552        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
553        let expected_contained_a = BooleanArray::from(vec![true, false]);
554        assert_eq!(contained_a, expected_contained_a);
555        let contained_b = partition_stats.contained(&column_b, &values).unwrap();
556        let expected_contained_b = BooleanArray::from(vec![false, false]);
557        assert_eq!(contained_b, expected_contained_b);
558
559        // The number of containers is the length of the partition values
560        assert_eq!(partition_stats.num_containers(), 2);
561    }
562
563    #[test]
564    fn test_partition_pruning_statistics_empty() {
565        let partition_values = vec![];
566        let partition_fields = vec![
567            Arc::new(Field::new("a", DataType::Int32, false)),
568            Arc::new(Field::new("b", DataType::Int32, false)),
569        ];
570        let partition_stats =
571            PartitionPruningStatistics::try_new(partition_values, partition_fields)
572                .unwrap();
573
574        let column_a = Column::new_unqualified("a");
575        let column_b = Column::new_unqualified("b");
576
577        // Partition values don't know anything about nulls or row counts
578        assert!(partition_stats.null_counts(&column_a).is_none());
579        assert!(partition_stats.row_counts(&column_a).is_none());
580        assert!(partition_stats.null_counts(&column_b).is_none());
581        assert!(partition_stats.row_counts(&column_b).is_none());
582
583        // Min/max values are all missing
584        assert!(partition_stats.min_values(&column_a).is_none());
585        assert!(partition_stats.max_values(&column_a).is_none());
586        assert!(partition_stats.min_values(&column_b).is_none());
587        assert!(partition_stats.max_values(&column_b).is_none());
588
589        // Contained values are all empty
590        let values = HashSet::from([ScalarValue::from(1i32)]);
591        assert!(partition_stats.contained(&column_a, &values).is_none());
592    }
593
594    #[test]
595    fn test_statistics_pruning_statistics() {
596        let statistics = vec![
597            Arc::new(
598                Statistics::default()
599                    .add_column_statistics(
600                        ColumnStatistics::new_unknown()
601                            .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
602                            .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
603                            .with_null_count(Precision::Exact(0)),
604                    )
605                    .add_column_statistics(
606                        ColumnStatistics::new_unknown()
607                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
608                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
609                            .with_null_count(Precision::Exact(5)),
610                    )
611                    .with_num_rows(Precision::Exact(100)),
612            ),
613            Arc::new(
614                Statistics::default()
615                    .add_column_statistics(
616                        ColumnStatistics::new_unknown()
617                            .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
618                            .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
619                            .with_null_count(Precision::Exact(10)),
620                    )
621                    .add_column_statistics(
622                        ColumnStatistics::new_unknown()
623                            .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
624                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
625                            .with_null_count(Precision::Exact(0)),
626                    )
627                    .with_num_rows(Precision::Exact(200)),
628            ),
629        ];
630
631        let schema = Arc::new(Schema::new(vec![
632            Field::new("a", DataType::Int32, false),
633            Field::new("b", DataType::Int32, false),
634            Field::new("c", DataType::Int32, false),
635        ]));
636        let pruning_stats = PrunableStatistics::new(statistics, schema);
637
638        let column_a = Column::new_unqualified("a");
639        let column_b = Column::new_unqualified("b");
640
641        // Min/max values are the same as the statistics
642        let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
643            .unwrap()
644            .into_iter()
645            .collect::<Vec<_>>();
646        let expected_values_a = vec![Some(0), Some(50)];
647        assert_eq!(min_values_a, expected_values_a);
648        let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
649            .unwrap()
650            .into_iter()
651            .collect::<Vec<_>>();
652        let expected_values_a = vec![Some(100), Some(300)];
653        assert_eq!(max_values_a, expected_values_a);
654        let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
655            .unwrap()
656            .into_iter()
657            .collect::<Vec<_>>();
658        let expected_values_b = vec![Some(100), Some(200)];
659        assert_eq!(min_values_b, expected_values_b);
660        let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
661            .unwrap()
662            .into_iter()
663            .collect::<Vec<_>>();
664        let expected_values_b = vec![Some(200), Some(400)];
665        assert_eq!(max_values_b, expected_values_b);
666
667        // Null counts are the same as the statistics
668        let null_counts_a =
669            as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
670                .unwrap()
671                .into_iter()
672                .collect::<Vec<_>>();
673        let expected_null_counts_a = vec![Some(0), Some(10)];
674        assert_eq!(null_counts_a, expected_null_counts_a);
675        let null_counts_b =
676            as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
677                .unwrap()
678                .into_iter()
679                .collect::<Vec<_>>();
680        let expected_null_counts_b = vec![Some(5), Some(0)];
681        assert_eq!(null_counts_b, expected_null_counts_b);
682
683        // Row counts are the same as the statistics
684        let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
685            .unwrap()
686            .into_iter()
687            .collect::<Vec<_>>();
688        let expected_row_counts_a = vec![Some(100), Some(200)];
689        assert_eq!(row_counts_a, expected_row_counts_a);
690        let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
691            .unwrap()
692            .into_iter()
693            .collect::<Vec<_>>();
694        let expected_row_counts_b = vec![Some(100), Some(200)];
695        assert_eq!(row_counts_b, expected_row_counts_b);
696
697        // Contained values are all null/missing (we can't know this just from statistics)
698        let values = HashSet::from([ScalarValue::from(0i32)]);
699        assert!(pruning_stats.contained(&column_a, &values).is_none());
700        assert!(pruning_stats.contained(&column_b, &values).is_none());
701
702        // The number of containers is the length of the statistics
703        assert_eq!(pruning_stats.num_containers(), 2);
704
705        // Test with a column that has no statistics
706        let column_c = Column::new_unqualified("c");
707        assert!(pruning_stats.min_values(&column_c).is_none());
708        assert!(pruning_stats.max_values(&column_c).is_none());
709        assert!(pruning_stats.null_counts(&column_c).is_none());
710        // Since row counts uses the first column that has row counts we get them back even
711        // if this columns does not have them set.
712        // This is debatable, personally I think `row_count` should not take a `Column` as an argument
713        // at all since all columns should have the same number of rows.
714        // But for now we just document the current behavior in this test.
715        let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
716            .unwrap()
717            .into_iter()
718            .collect::<Vec<_>>();
719        let expected_row_counts_c = vec![Some(100), Some(200)];
720        assert_eq!(row_counts_c, expected_row_counts_c);
721        assert!(pruning_stats.contained(&column_c, &values).is_none());
722
723        // Test with a column that doesn't exist
724        let column_d = Column::new_unqualified("d");
725        assert!(pruning_stats.min_values(&column_d).is_none());
726        assert!(pruning_stats.max_values(&column_d).is_none());
727        assert!(pruning_stats.null_counts(&column_d).is_none());
728        assert!(pruning_stats.row_counts(&column_d).is_none());
729        assert!(pruning_stats.contained(&column_d, &values).is_none());
730    }
731
732    #[test]
733    fn test_statistics_pruning_statistics_empty() {
734        let statistics = vec![];
735        let schema = Arc::new(Schema::new(vec![
736            Field::new("a", DataType::Int32, false),
737            Field::new("b", DataType::Int32, false),
738            Field::new("c", DataType::Int32, false),
739        ]));
740        let pruning_stats = PrunableStatistics::new(statistics, schema);
741
742        let column_a = Column::new_unqualified("a");
743        let column_b = Column::new_unqualified("b");
744
745        // Min/max values are all missing
746        assert!(pruning_stats.min_values(&column_a).is_none());
747        assert!(pruning_stats.max_values(&column_a).is_none());
748        assert!(pruning_stats.min_values(&column_b).is_none());
749        assert!(pruning_stats.max_values(&column_b).is_none());
750
751        // Null counts are all missing
752        assert!(pruning_stats.null_counts(&column_a).is_none());
753        assert!(pruning_stats.null_counts(&column_b).is_none());
754
755        // Row counts are all missing
756        assert!(pruning_stats.row_counts(&column_a).is_none());
757        assert!(pruning_stats.row_counts(&column_b).is_none());
758
759        // Contained values are all empty
760        let values = HashSet::from([ScalarValue::from(1i32)]);
761        assert!(pruning_stats.contained(&column_a, &values).is_none());
762    }
763
764    #[test]
765    fn test_composite_pruning_statistics_partition_and_file() {
766        // Create partition statistics
767        let partition_values = vec![
768            vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
769            vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
770        ];
771        let partition_fields = vec![
772            Arc::new(Field::new("part_a", DataType::Int32, false)),
773            Arc::new(Field::new("part_b", DataType::Int32, false)),
774        ];
775        let partition_stats =
776            PartitionPruningStatistics::try_new(partition_values, partition_fields)
777                .unwrap();
778
779        // Create file statistics
780        let file_statistics = vec![
781            Arc::new(
782                Statistics::default()
783                    .add_column_statistics(
784                        ColumnStatistics::new_unknown()
785                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
786                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
787                            .with_null_count(Precision::Exact(0)),
788                    )
789                    .add_column_statistics(
790                        ColumnStatistics::new_unknown()
791                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
792                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
793                            .with_null_count(Precision::Exact(5)),
794                    )
795                    .with_num_rows(Precision::Exact(100)),
796            ),
797            Arc::new(
798                Statistics::default()
799                    .add_column_statistics(
800                        ColumnStatistics::new_unknown()
801                            .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
802                            .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
803                            .with_null_count(Precision::Exact(10)),
804                    )
805                    .add_column_statistics(
806                        ColumnStatistics::new_unknown()
807                            .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
808                            .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
809                            .with_null_count(Precision::Exact(0)),
810                    )
811                    .with_num_rows(Precision::Exact(200)),
812            ),
813        ];
814
815        let file_schema = Arc::new(Schema::new(vec![
816            Field::new("col_x", DataType::Int32, false),
817            Field::new("col_y", DataType::Int32, false),
818        ]));
819        let file_stats = PrunableStatistics::new(file_statistics, file_schema);
820
821        // Create composite statistics
822        let composite_stats = CompositePruningStatistics::new(vec![
823            Box::new(partition_stats),
824            Box::new(file_stats),
825        ]);
826
827        // Test accessing columns that are only in partition statistics
828        let part_a = Column::new_unqualified("part_a");
829        let part_b = Column::new_unqualified("part_b");
830
831        // Test accessing columns that are only in file statistics
832        let col_x = Column::new_unqualified("col_x");
833        let col_y = Column::new_unqualified("col_y");
834
835        // For partition columns, should get values from partition statistics
836        let min_values_part_a =
837            as_int32_array(&composite_stats.min_values(&part_a).unwrap())
838                .unwrap()
839                .into_iter()
840                .collect::<Vec<_>>();
841        let expected_values_part_a = vec![Some(1), Some(2)];
842        assert_eq!(min_values_part_a, expected_values_part_a);
843
844        let max_values_part_a =
845            as_int32_array(&composite_stats.max_values(&part_a).unwrap())
846                .unwrap()
847                .into_iter()
848                .collect::<Vec<_>>();
849        // For partition values, min and max are the same
850        assert_eq!(max_values_part_a, expected_values_part_a);
851
852        let min_values_part_b =
853            as_int32_array(&composite_stats.min_values(&part_b).unwrap())
854                .unwrap()
855                .into_iter()
856                .collect::<Vec<_>>();
857        let expected_values_part_b = vec![Some(10), Some(20)];
858        assert_eq!(min_values_part_b, expected_values_part_b);
859
860        // For file columns, should get values from file statistics
861        let min_values_col_x =
862            as_int32_array(&composite_stats.min_values(&col_x).unwrap())
863                .unwrap()
864                .into_iter()
865                .collect::<Vec<_>>();
866        let expected_values_col_x = vec![Some(100), Some(500)];
867        assert_eq!(min_values_col_x, expected_values_col_x);
868
869        let max_values_col_x =
870            as_int32_array(&composite_stats.max_values(&col_x).unwrap())
871                .unwrap()
872                .into_iter()
873                .collect::<Vec<_>>();
874        let expected_max_values_col_x = vec![Some(200), Some(600)];
875        assert_eq!(max_values_col_x, expected_max_values_col_x);
876
877        let min_values_col_y =
878            as_int32_array(&composite_stats.min_values(&col_y).unwrap())
879                .unwrap()
880                .into_iter()
881                .collect::<Vec<_>>();
882        let expected_values_col_y = vec![Some(300), Some(700)];
883        assert_eq!(min_values_col_y, expected_values_col_y);
884
885        // Test null counts - only available from file statistics
886        assert!(composite_stats.null_counts(&part_a).is_none());
887        assert!(composite_stats.null_counts(&part_b).is_none());
888
889        let null_counts_col_x =
890            as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
891                .unwrap()
892                .into_iter()
893                .collect::<Vec<_>>();
894        let expected_null_counts_col_x = vec![Some(0), Some(10)];
895        assert_eq!(null_counts_col_x, expected_null_counts_col_x);
896
897        // Test row counts - only available from file statistics
898        assert!(composite_stats.row_counts(&part_a).is_none());
899        let row_counts_col_x =
900            as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
901                .unwrap()
902                .into_iter()
903                .collect::<Vec<_>>();
904        let expected_row_counts = vec![Some(100), Some(200)];
905        assert_eq!(row_counts_col_x, expected_row_counts);
906
907        // Test contained values - only available from partition statistics
908        let values = HashSet::from([ScalarValue::from(1i32)]);
909        let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
910        let expected_contained_part_a = BooleanArray::from(vec![true, false]);
911        assert_eq!(contained_part_a, expected_contained_part_a);
912
913        // File statistics don't implement contained
914        assert!(composite_stats.contained(&col_x, &values).is_none());
915
916        // Non-existent column should return None for everything
917        let non_existent = Column::new_unqualified("non_existent");
918        assert!(composite_stats.min_values(&non_existent).is_none());
919        assert!(composite_stats.max_values(&non_existent).is_none());
920        assert!(composite_stats.null_counts(&non_existent).is_none());
921        assert!(composite_stats.row_counts(&non_existent).is_none());
922        assert!(composite_stats.contained(&non_existent, &values).is_none());
923
924        // Verify num_containers matches
925        assert_eq!(composite_stats.num_containers(), 2);
926    }
927
928    #[test]
929    fn test_composite_pruning_statistics_priority() {
930        // Create two sets of file statistics with the same column names
931        // but different values to test that the first one gets priority
932
933        // First set of statistics
934        let first_statistics = vec![
935            Arc::new(
936                Statistics::default()
937                    .add_column_statistics(
938                        ColumnStatistics::new_unknown()
939                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
940                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
941                            .with_null_count(Precision::Exact(0)),
942                    )
943                    .with_num_rows(Precision::Exact(100)),
944            ),
945            Arc::new(
946                Statistics::default()
947                    .add_column_statistics(
948                        ColumnStatistics::new_unknown()
949                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
950                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
951                            .with_null_count(Precision::Exact(5)),
952                    )
953                    .with_num_rows(Precision::Exact(200)),
954            ),
955        ];
956
957        let first_schema = Arc::new(Schema::new(vec![Field::new(
958            "col_a",
959            DataType::Int32,
960            false,
961        )]));
962        let first_stats = PrunableStatistics::new(first_statistics, first_schema);
963
964        // Second set of statistics with the same column name but different values
965        let second_statistics = vec![
966            Arc::new(
967                Statistics::default()
968                    .add_column_statistics(
969                        ColumnStatistics::new_unknown()
970                            .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
971                            .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
972                            .with_null_count(Precision::Exact(10)),
973                    )
974                    .with_num_rows(Precision::Exact(1000)),
975            ),
976            Arc::new(
977                Statistics::default()
978                    .add_column_statistics(
979                        ColumnStatistics::new_unknown()
980                            .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
981                            .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
982                            .with_null_count(Precision::Exact(20)),
983                    )
984                    .with_num_rows(Precision::Exact(2000)),
985            ),
986        ];
987
988        let second_schema = Arc::new(Schema::new(vec![Field::new(
989            "col_a",
990            DataType::Int32,
991            false,
992        )]));
993        let second_stats = PrunableStatistics::new(second_statistics, second_schema);
994
995        // Create composite statistics with first stats having priority
996        let composite_stats = CompositePruningStatistics::new(vec![
997            Box::new(first_stats.clone()),
998            Box::new(second_stats.clone()),
999        ]);
1000
1001        let col_a = Column::new_unqualified("col_a");
1002
1003        // Should get values from first statistics since it has priority
1004        let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
1005            .unwrap()
1006            .into_iter()
1007            .collect::<Vec<_>>();
1008        let expected_min_values = vec![Some(100), Some(300)];
1009        assert_eq!(min_values, expected_min_values);
1010
1011        let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
1012            .unwrap()
1013            .into_iter()
1014            .collect::<Vec<_>>();
1015        let expected_max_values = vec![Some(200), Some(400)];
1016        assert_eq!(max_values, expected_max_values);
1017
1018        let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
1019            .unwrap()
1020            .into_iter()
1021            .collect::<Vec<_>>();
1022        let expected_null_counts = vec![Some(0), Some(5)];
1023        assert_eq!(null_counts, expected_null_counts);
1024
1025        let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
1026            .unwrap()
1027            .into_iter()
1028            .collect::<Vec<_>>();
1029        let expected_row_counts = vec![Some(100), Some(200)];
1030        assert_eq!(row_counts, expected_row_counts);
1031
1032        // Create composite statistics with second stats having priority
1033        // Now that we've added Clone trait to PrunableStatistics, we can just clone them
1034
1035        let composite_stats_reversed = CompositePruningStatistics::new(vec![
1036            Box::new(second_stats.clone()),
1037            Box::new(first_stats.clone()),
1038        ]);
1039
1040        // Should get values from second statistics since it now has priority
1041        let min_values =
1042            as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
1043                .unwrap()
1044                .into_iter()
1045                .collect::<Vec<_>>();
1046        let expected_min_values = vec![Some(1000), Some(3000)];
1047        assert_eq!(min_values, expected_min_values);
1048
1049        let max_values =
1050            as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
1051                .unwrap()
1052                .into_iter()
1053                .collect::<Vec<_>>();
1054        let expected_max_values = vec![Some(2000), Some(4000)];
1055        assert_eq!(max_values, expected_max_values);
1056
1057        let null_counts =
1058            as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
1059                .unwrap()
1060                .into_iter()
1061                .collect::<Vec<_>>();
1062        let expected_null_counts = vec![Some(10), Some(20)];
1063        assert_eq!(null_counts, expected_null_counts);
1064
1065        let row_counts =
1066            as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
1067                .unwrap()
1068                .into_iter()
1069                .collect::<Vec<_>>();
1070        let expected_row_counts = vec![Some(1000), Some(2000)];
1071        assert_eq!(row_counts, expected_row_counts);
1072    }
1073
1074    #[test]
1075    fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
1076        // Test with empty statistics vector
1077        // This should never happen, so we panic instead of returning a Result which would burned callers
1078        let result = std::panic::catch_unwind(|| {
1079            CompositePruningStatistics::new(vec![]);
1080        });
1081        assert!(result.is_err());
1082
1083        // We should panic here because the number of containers is different
1084        let result = std::panic::catch_unwind(|| {
1085            // Create statistics with different number of containers
1086            // Use partition stats for the test
1087            let partition_values_1 = vec![
1088                vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
1089                vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
1090            ];
1091            let partition_fields_1 = vec![
1092                Arc::new(Field::new("part_a", DataType::Int32, false)),
1093                Arc::new(Field::new("part_b", DataType::Int32, false)),
1094            ];
1095            let partition_stats_1 = PartitionPruningStatistics::try_new(
1096                partition_values_1,
1097                partition_fields_1,
1098            )
1099            .unwrap();
1100            let partition_values_2 = vec![
1101                vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
1102                vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
1103                vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
1104            ];
1105            let partition_fields_2 = vec![
1106                Arc::new(Field::new("part_x", DataType::Int32, false)),
1107                Arc::new(Field::new("part_y", DataType::Int32, false)),
1108            ];
1109            let partition_stats_2 = PartitionPruningStatistics::try_new(
1110                partition_values_2,
1111                partition_fields_2,
1112            )
1113            .unwrap();
1114
1115            CompositePruningStatistics::new(vec![
1116                Box::new(partition_stats_1),
1117                Box::new(partition_stats_2),
1118            ]);
1119        });
1120        assert!(result.is_err());
1121    }
1122}