datafusion_common/
pruning.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{Array, NullArray, UInt64Array};
19use arrow::array::{ArrayRef, BooleanArray};
20use arrow::datatypes::{FieldRef, Schema, SchemaRef};
21use std::collections::HashSet;
22use std::sync::Arc;
23
24use crate::error::DataFusionError;
25use crate::stats::Precision;
26use crate::{Column, Statistics};
27use crate::{ColumnStatistics, ScalarValue};
28
29/// A source of runtime statistical information to [`PruningPredicate`]s.
30///
31/// # Supported Information
32///
33/// 1. Minimum and maximum values for columns
34///
35/// 2. Null counts and row counts for columns
36///
37/// 3. Whether the values in a column are contained in a set of literals
38///
39/// # Vectorized Interface
40///
41/// Information for containers / files are returned as Arrow [`ArrayRef`], so
42/// the evaluation happens once on a single `RecordBatch`, which amortizes the
43/// overhead of evaluating the predicate. This is important when pruning 1000s
44/// of containers which often happens in analytic systems that have 1000s of
45/// potential files to consider.
46///
47/// For example, for the following three files with a single column `a`:
48/// ```text
49/// file1: column a: min=5, max=10
50/// file2: column a: No stats
51/// file2: column a: min=20, max=30
52/// ```
53///
54/// PruningStatistics would return:
55///
56/// ```text
57/// min_values("a") -> Some([5, Null, 20])
58/// max_values("a") -> Some([10, Null, 30])
59/// min_values("X") -> None
60/// ```
61///
62/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
63pub trait PruningStatistics {
64    /// Return the minimum values for the named column, if known.
65    ///
66    /// If the minimum value for a particular container is not known, the
67    /// returned array should have `null` in that row. If the minimum value is
68    /// not known for any row, return `None`.
69    ///
70    /// Note: the returned array must contain [`Self::num_containers`] rows
71    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
72
73    /// Return the maximum values for the named column, if known.
74    ///
75    /// See [`Self::min_values`] for when to return `None` and null values.
76    ///
77    /// Note: the returned array must contain [`Self::num_containers`] rows
78    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
79
80    /// Return the number of containers (e.g. Row Groups) being pruned with
81    /// these statistics.
82    ///
83    /// This value corresponds to the size of the [`ArrayRef`] returned by
84    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
85    /// and [`Self::row_counts`].
86    fn num_containers(&self) -> usize;
87
88    /// Return the number of null values for the named column as an
89    /// [`UInt64Array`]
90    ///
91    /// See [`Self::min_values`] for when to return `None` and null values.
92    ///
93    /// Note: the returned array must contain [`Self::num_containers`] rows
94    ///
95    /// [`UInt64Array`]: arrow::array::UInt64Array
96    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
97
98    /// Return the number of rows for the named column in each container
99    /// as an [`UInt64Array`].
100    ///
101    /// See [`Self::min_values`] for when to return `None` and null values.
102    ///
103    /// Note: the returned array must contain [`Self::num_containers`] rows
104    ///
105    /// [`UInt64Array`]: arrow::array::UInt64Array
106    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
107
108    /// Returns [`BooleanArray`] where each row represents information known
109    /// about specific literal `values` in a column.
110    ///
111    /// For example, Parquet Bloom Filters implement this API to communicate
112    /// that `values` are known not to be present in a Row Group.
113    ///
114    /// The returned array has one row for each container, with the following
115    /// meanings:
116    /// * `true` if the values in `column`  ONLY contain values from `values`
117    /// * `false` if the values in `column` are NOT ANY of `values`
118    /// * `null` if the neither of the above holds or is unknown.
119    ///
120    /// If these statistics can not determine column membership for any
121    /// container, return `None` (the default).
122    ///
123    /// Note: the returned array must contain [`Self::num_containers`] rows
124    fn contained(
125        &self,
126        column: &Column,
127        values: &HashSet<ScalarValue>,
128    ) -> Option<BooleanArray>;
129}
130
131/// Prune files based on their partition values.
132///
133/// This is used both at planning time and execution time to prune
134/// files based on their partition values.
135/// This feeds into [`CompositePruningStatistics`] to allow pruning
136/// with filters that depend both on partition columns and data columns
137/// (e.g. `WHERE partition_col = data_col`).
138#[deprecated(
139    since = "52.0.0",
140    note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
141)]
142#[derive(Clone)]
143pub struct PartitionPruningStatistics {
144    /// Values for each column for each container.
145    ///
146    /// The outer vectors represent the columns while the inner vectors
147    /// represent the containers. The order must match the order of the
148    /// partition columns in [`PartitionPruningStatistics::partition_schema`].
149    partition_values: Vec<ArrayRef>,
150    /// The number of containers.
151    ///
152    /// Stored since the partition values are column-major and if
153    /// there are no columns we wouldn't know the number of containers.
154    num_containers: usize,
155    /// The schema of the partition columns.
156    ///
157    /// This must **not** be the schema of the entire file or table: it must
158    /// only be the schema of the partition columns, in the same order as the
159    /// values in [`PartitionPruningStatistics::partition_values`].
160    partition_schema: SchemaRef,
161}
162
163#[expect(deprecated)]
164impl PartitionPruningStatistics {
165    /// Create a new instance of [`PartitionPruningStatistics`].
166    ///
167    /// Args:
168    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
169    ///   The outer vector represents the containers while the inner
170    ///   vector represents the partition values for each column.
171    ///   Note that this is the **opposite** of the order of the
172    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
173    /// * `partition_schema`: The schema of the partition columns.
174    ///   This must **not** be the schema of the entire file or table:
175    ///   instead it must only be the schema of the partition columns,
176    ///   in the same order as the values in `partition_values`.
177    ///
178    /// # Example
179    ///
180    /// To create [`PartitionPruningStatistics`] for two partition columns `a` and `b`,
181    /// for three containers like this:
182    ///
183    /// | a | b |
184    /// | - | - |
185    /// | 1 | 2 |
186    /// | 3 | 4 |
187    /// | 5 | 6 |
188    ///
189    /// ```
190    /// # use std::sync::Arc;
191    /// # use datafusion_common::ScalarValue;
192    /// # use arrow::datatypes::{DataType, Field};
193    /// # use datafusion_common::pruning::PartitionPruningStatistics;
194    ///
195    /// let partition_values = vec![
196    ///     vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
197    ///     vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
198    ///     vec![ScalarValue::from(5i32), ScalarValue::from(6i32)],
199    /// ];
200    /// let partition_fields = vec![
201    ///     Arc::new(Field::new("a", DataType::Int32, false)),
202    ///     Arc::new(Field::new("b", DataType::Int32, false)),
203    /// ];
204    /// let partition_stats =
205    ///     PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap();
206    /// ```
207    pub fn try_new(
208        partition_values: Vec<Vec<ScalarValue>>,
209        partition_fields: Vec<FieldRef>,
210    ) -> Result<Self, DataFusionError> {
211        let num_containers = partition_values.len();
212        let partition_schema = Arc::new(Schema::new(partition_fields));
213        let mut partition_values_by_column =
214            vec![
215                Vec::with_capacity(partition_values.len());
216                partition_schema.fields().len()
217            ];
218        for partition_value in partition_values {
219            for (i, value) in partition_value.into_iter().enumerate() {
220                partition_values_by_column[i].push(value);
221            }
222        }
223        Ok(Self {
224            partition_values: partition_values_by_column
225                .into_iter()
226                .map(|v| {
227                    if v.is_empty() {
228                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
229                    } else {
230                        ScalarValue::iter_to_array(v)
231                    }
232                })
233                .collect::<Result<Vec<_>, _>>()?,
234            num_containers,
235            partition_schema,
236        })
237    }
238}
239
240#[expect(deprecated)]
241impl PruningStatistics for PartitionPruningStatistics {
242    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
243        let index = self.partition_schema.index_of(column.name()).ok()?;
244        self.partition_values.get(index).and_then(|v| {
245            if v.is_empty() || v.null_count() == v.len() {
246                // If the array is empty or all nulls, return None
247                None
248            } else {
249                // Otherwise, return the array as is
250                Some(Arc::clone(v))
251            }
252        })
253    }
254
255    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
256        self.min_values(column)
257    }
258
259    fn num_containers(&self) -> usize {
260        self.num_containers
261    }
262
263    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
264        None
265    }
266
267    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
268        None
269    }
270
271    fn contained(
272        &self,
273        column: &Column,
274        values: &HashSet<ScalarValue>,
275    ) -> Option<BooleanArray> {
276        let index = self.partition_schema.index_of(column.name()).ok()?;
277        let array = self.partition_values.get(index)?;
278        let boolean_array = values.iter().try_fold(None, |acc, v| {
279            let arrow_value = v.to_scalar().ok()?;
280            let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
281            match acc {
282                None => Some(Some(eq_result)),
283                Some(acc_array) => {
284                    arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result)
285                        .map(Some)
286                        .ok()
287                }
288            }
289        })??;
290        // If the boolean array is empty or all null values, return None
291        if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
292            None
293        } else {
294            Some(boolean_array)
295        }
296    }
297}
298
299/// Prune a set of containers represented by their statistics.
300///
301/// Each [`Statistics`] represents a "container" -- some collection of data
302/// that has statistics of its columns.
303///
304/// It is up to the caller to decide what each container represents. For
305/// example, they can come from a file (e.g. [`PartitionedFile`]) or a set of of
306/// files (e.g. [`FileGroup`])
307///
308/// [`PartitionedFile`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html
309/// [`FileGroup`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroup.html
310#[derive(Clone)]
311pub struct PrunableStatistics {
312    /// Statistics for each container.
313    /// These are taken as a reference since they may be rather large / expensive to clone
314    /// and we often won't return all of them as ArrayRefs (we only return the columns the predicate requests).
315    statistics: Vec<Arc<Statistics>>,
316    /// The schema of the file these statistics are for.
317    schema: SchemaRef,
318}
319
320impl PrunableStatistics {
321    /// Create a new instance of [`PrunableStatistics`].
322    /// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
323    /// The `schema` is the schema of the data in the containers and should apply to all files.
324    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
325        Self { statistics, schema }
326    }
327
328    fn get_exact_column_statistics(
329        &self,
330        column: &Column,
331        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
332    ) -> Option<ArrayRef> {
333        let index = self.schema.index_of(column.name()).ok()?;
334        let mut has_value = false;
335        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
336            s.column_statistics
337                .get(index)
338                .and_then(|stat| {
339                    if let Precision::Exact(min) = get_stat(stat) {
340                        has_value = true;
341                        Some(min.clone())
342                    } else {
343                        None
344                    }
345                })
346                .unwrap_or(ScalarValue::Null)
347        })) {
348            // If there is any non-null value and no errors, return the array
349            Ok(array) => has_value.then_some(array),
350            Err(_) => {
351                log::warn!(
352                    "Failed to convert min values to array for column {}",
353                    column.name()
354                );
355                None
356            }
357        }
358    }
359}
360
361impl PruningStatistics for PrunableStatistics {
362    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
363        self.get_exact_column_statistics(column, |stat| &stat.min_value)
364    }
365
366    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
367        self.get_exact_column_statistics(column, |stat| &stat.max_value)
368    }
369
370    fn num_containers(&self) -> usize {
371        self.statistics.len()
372    }
373
374    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
375        let index = self.schema.index_of(column.name()).ok()?;
376        if self.statistics.iter().any(|s| {
377            s.column_statistics
378                .get(index)
379                .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
380        }) {
381            Some(Arc::new(
382                self.statistics
383                    .iter()
384                    .map(|s| {
385                        s.column_statistics.get(index).and_then(|stat| {
386                            if let Precision::Exact(null_count) = &stat.null_count {
387                                u64::try_from(*null_count).ok()
388                            } else {
389                                None
390                            }
391                        })
392                    })
393                    .collect::<UInt64Array>(),
394            ))
395        } else {
396            None
397        }
398    }
399
400    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
401        // If the column does not exist in the schema, return None
402        if self.schema.index_of(column.name()).is_err() {
403            return None;
404        }
405        if self
406            .statistics
407            .iter()
408            .any(|s| s.num_rows.is_exact().unwrap_or(false))
409        {
410            Some(Arc::new(
411                self.statistics
412                    .iter()
413                    .map(|s| {
414                        if let Precision::Exact(row_count) = &s.num_rows {
415                            u64::try_from(*row_count).ok()
416                        } else {
417                            None
418                        }
419                    })
420                    .collect::<UInt64Array>(),
421            ))
422        } else {
423            None
424        }
425    }
426
427    fn contained(
428        &self,
429        _column: &Column,
430        _values: &HashSet<ScalarValue>,
431    ) -> Option<BooleanArray> {
432        None
433    }
434}
435
436/// Combine multiple [`PruningStatistics`] into a single
437/// [`CompositePruningStatistics`].
438/// This can be used to combine statistics from different sources,
439/// for example partition values and file statistics.
440/// This allows pruning with filters that depend on multiple sources of statistics,
441/// such as `WHERE partition_col = data_col`.
442/// This is done by iterating over the statistics and returning the first
443/// one that has information for the requested column.
444/// If multiple statistics have information for the same column,
445/// the first one is returned without any regard for completeness or accuracy.
446/// That is: if the first statistics has information for a column, even if it is incomplete,
447/// that is returned even if a later statistics has more complete information.
448#[deprecated(
449    since = "52.0.0",
450    note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
451)]
452pub struct CompositePruningStatistics {
453    pub statistics: Vec<Box<dyn PruningStatistics>>,
454}
455
456#[expect(deprecated)]
457impl CompositePruningStatistics {
458    /// Create a new instance of [`CompositePruningStatistics`] from
459    /// a vector of [`PruningStatistics`].
460    pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
461        assert!(!statistics.is_empty());
462        // Check that all statistics have the same number of containers
463        let num_containers = statistics[0].num_containers();
464        for stats in &statistics {
465            assert_eq!(num_containers, stats.num_containers());
466        }
467        Self { statistics }
468    }
469}
470
471#[expect(deprecated)]
472impl PruningStatistics for CompositePruningStatistics {
473    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
474        for stats in &self.statistics {
475            if let Some(array) = stats.min_values(column) {
476                return Some(array);
477            }
478        }
479        None
480    }
481
482    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
483        for stats in &self.statistics {
484            if let Some(array) = stats.max_values(column) {
485                return Some(array);
486            }
487        }
488        None
489    }
490
491    fn num_containers(&self) -> usize {
492        self.statistics[0].num_containers()
493    }
494
495    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
496        for stats in &self.statistics {
497            if let Some(array) = stats.null_counts(column) {
498                return Some(array);
499            }
500        }
501        None
502    }
503
504    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
505        for stats in &self.statistics {
506            if let Some(array) = stats.row_counts(column) {
507                return Some(array);
508            }
509        }
510        None
511    }
512
513    fn contained(
514        &self,
515        column: &Column,
516        values: &HashSet<ScalarValue>,
517    ) -> Option<BooleanArray> {
518        for stats in &self.statistics {
519            if let Some(array) = stats.contained(column, values) {
520                return Some(array);
521            }
522        }
523        None
524    }
525}
526
527#[cfg(test)]
528#[expect(deprecated)]
529mod tests {
530    use crate::{
531        ColumnStatistics,
532        cast::{as_int32_array, as_uint64_array},
533    };
534
535    use super::*;
536    use arrow::datatypes::{DataType, Field};
537    use std::sync::Arc;
538
539    /// return a PartitionPruningStatistics for two columns 'a' and 'b'
540    /// and the following stats
541    ///
542    /// | a | b |
543    /// | - | - |
544    /// | 1 | 2 |
545    /// | 3 | 4 |
546    fn partition_pruning_statistics_setup() -> PartitionPruningStatistics {
547        let partition_values = vec![
548            vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
549            vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
550        ];
551        let partition_fields = vec![
552            Arc::new(Field::new("a", DataType::Int32, false)),
553            Arc::new(Field::new("b", DataType::Int32, false)),
554        ];
555        PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap()
556    }
557
558    #[test]
559    fn test_partition_pruning_statistics() {
560        let partition_stats = partition_pruning_statistics_setup();
561
562        let column_a = Column::new_unqualified("a");
563        let column_b = Column::new_unqualified("b");
564
565        // Partition values don't know anything about nulls or row counts
566        assert!(partition_stats.null_counts(&column_a).is_none());
567        assert!(partition_stats.row_counts(&column_a).is_none());
568        assert!(partition_stats.null_counts(&column_b).is_none());
569        assert!(partition_stats.row_counts(&column_b).is_none());
570
571        // Min/max values are the same as the partition values
572        let min_values_a =
573            as_int32_array(&partition_stats.min_values(&column_a).unwrap())
574                .unwrap()
575                .into_iter()
576                .collect::<Vec<_>>();
577        let expected_values_a = vec![Some(1), Some(3)];
578        assert_eq!(min_values_a, expected_values_a);
579        let max_values_a =
580            as_int32_array(&partition_stats.max_values(&column_a).unwrap())
581                .unwrap()
582                .into_iter()
583                .collect::<Vec<_>>();
584        let expected_values_a = vec![Some(1), Some(3)];
585        assert_eq!(max_values_a, expected_values_a);
586
587        let min_values_b =
588            as_int32_array(&partition_stats.min_values(&column_b).unwrap())
589                .unwrap()
590                .into_iter()
591                .collect::<Vec<_>>();
592        let expected_values_b = vec![Some(2), Some(4)];
593        assert_eq!(min_values_b, expected_values_b);
594        let max_values_b =
595            as_int32_array(&partition_stats.max_values(&column_b).unwrap())
596                .unwrap()
597                .into_iter()
598                .collect::<Vec<_>>();
599        let expected_values_b = vec![Some(2), Some(4)];
600        assert_eq!(max_values_b, expected_values_b);
601
602        // Contained values are only true for the partition values
603        let values = HashSet::from([ScalarValue::from(1i32)]);
604        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
605        let expected_contained_a = BooleanArray::from(vec![true, false]);
606        assert_eq!(contained_a, expected_contained_a);
607        let contained_b = partition_stats.contained(&column_b, &values).unwrap();
608        let expected_contained_b = BooleanArray::from(vec![false, false]);
609        assert_eq!(contained_b, expected_contained_b);
610
611        // The number of containers is the length of the partition values
612        assert_eq!(partition_stats.num_containers(), 2);
613    }
614
615    #[test]
616    fn test_partition_pruning_statistics_multiple_positive_values() {
617        let partition_stats = partition_pruning_statistics_setup();
618
619        let column_a = Column::new_unqualified("a");
620
621        // The two containers have `a` values 1 and 3, so they both only contain values from 1 and 3
622        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
623        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
624        let expected_contained_a = BooleanArray::from(vec![true, true]);
625        assert_eq!(contained_a, expected_contained_a);
626    }
627
628    #[test]
629    fn test_partition_pruning_statistics_multiple_negative_values() {
630        let partition_stats = partition_pruning_statistics_setup();
631
632        let column_a = Column::new_unqualified("a");
633
634        // The two containers have `a` values 1 and 3,
635        // so the first contains ONLY values from 1,2
636        // but the second does not
637        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(2i32)]);
638        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
639        let expected_contained_a = BooleanArray::from(vec![true, false]);
640        assert_eq!(contained_a, expected_contained_a);
641    }
642
643    #[test]
644    fn test_partition_pruning_statistics_null_in_values() {
645        let partition_values = vec![
646            vec![
647                ScalarValue::from(1i32),
648                ScalarValue::from(2i32),
649                ScalarValue::from(3i32),
650            ],
651            vec![
652                ScalarValue::from(4i32),
653                ScalarValue::from(5i32),
654                ScalarValue::from(6i32),
655            ],
656        ];
657        let partition_fields = vec![
658            Arc::new(Field::new("a", DataType::Int32, false)),
659            Arc::new(Field::new("b", DataType::Int32, false)),
660            Arc::new(Field::new("c", DataType::Int32, false)),
661        ];
662        let partition_stats =
663            PartitionPruningStatistics::try_new(partition_values, partition_fields)
664                .unwrap();
665
666        let column_a = Column::new_unqualified("a");
667        let column_b = Column::new_unqualified("b");
668        let column_c = Column::new_unqualified("c");
669
670        let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]);
671        let contained_a = partition_stats.contained(&column_a, &values_a).unwrap();
672        let mut builder = BooleanArray::builder(2);
673        builder.append_value(true);
674        builder.append_null();
675        let expected_contained_a = builder.finish();
676        assert_eq!(contained_a, expected_contained_a);
677
678        // First match creates a NULL boolean array
679        // The accumulator should update the value to true for the second value
680        let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]);
681        let contained_b = partition_stats.contained(&column_b, &values_b).unwrap();
682        let mut builder = BooleanArray::builder(2);
683        builder.append_null();
684        builder.append_value(true);
685        let expected_contained_b = builder.finish();
686        assert_eq!(contained_b, expected_contained_b);
687
688        // All matches are null, contained should return None
689        let values_c = HashSet::from([ScalarValue::Int32(None)]);
690        let contained_c = partition_stats.contained(&column_c, &values_c);
691        assert!(contained_c.is_none());
692    }
693
694    #[test]
695    fn test_partition_pruning_statistics_empty() {
696        let partition_values = vec![];
697        let partition_fields = vec![
698            Arc::new(Field::new("a", DataType::Int32, false)),
699            Arc::new(Field::new("b", DataType::Int32, false)),
700        ];
701        let partition_stats =
702            PartitionPruningStatistics::try_new(partition_values, partition_fields)
703                .unwrap();
704
705        let column_a = Column::new_unqualified("a");
706        let column_b = Column::new_unqualified("b");
707
708        // Partition values don't know anything about nulls or row counts
709        assert!(partition_stats.null_counts(&column_a).is_none());
710        assert!(partition_stats.row_counts(&column_a).is_none());
711        assert!(partition_stats.null_counts(&column_b).is_none());
712        assert!(partition_stats.row_counts(&column_b).is_none());
713
714        // Min/max values are all missing
715        assert!(partition_stats.min_values(&column_a).is_none());
716        assert!(partition_stats.max_values(&column_a).is_none());
717        assert!(partition_stats.min_values(&column_b).is_none());
718        assert!(partition_stats.max_values(&column_b).is_none());
719
720        // Contained values are all empty
721        let values = HashSet::from([ScalarValue::from(1i32)]);
722        assert!(partition_stats.contained(&column_a, &values).is_none());
723    }
724
725    #[test]
726    fn test_statistics_pruning_statistics() {
727        let statistics = vec![
728            Arc::new(
729                Statistics::default()
730                    .add_column_statistics(
731                        ColumnStatistics::new_unknown()
732                            .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
733                            .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
734                            .with_null_count(Precision::Exact(0)),
735                    )
736                    .add_column_statistics(
737                        ColumnStatistics::new_unknown()
738                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
739                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
740                            .with_null_count(Precision::Exact(5)),
741                    )
742                    .with_num_rows(Precision::Exact(100)),
743            ),
744            Arc::new(
745                Statistics::default()
746                    .add_column_statistics(
747                        ColumnStatistics::new_unknown()
748                            .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
749                            .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
750                            .with_null_count(Precision::Exact(10)),
751                    )
752                    .add_column_statistics(
753                        ColumnStatistics::new_unknown()
754                            .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
755                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
756                            .with_null_count(Precision::Exact(0)),
757                    )
758                    .with_num_rows(Precision::Exact(200)),
759            ),
760        ];
761
762        let schema = Arc::new(Schema::new(vec![
763            Field::new("a", DataType::Int32, false),
764            Field::new("b", DataType::Int32, false),
765            Field::new("c", DataType::Int32, false),
766        ]));
767        let pruning_stats = PrunableStatistics::new(statistics, schema);
768
769        let column_a = Column::new_unqualified("a");
770        let column_b = Column::new_unqualified("b");
771
772        // Min/max values are the same as the statistics
773        let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
774            .unwrap()
775            .into_iter()
776            .collect::<Vec<_>>();
777        let expected_values_a = vec![Some(0), Some(50)];
778        assert_eq!(min_values_a, expected_values_a);
779        let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
780            .unwrap()
781            .into_iter()
782            .collect::<Vec<_>>();
783        let expected_values_a = vec![Some(100), Some(300)];
784        assert_eq!(max_values_a, expected_values_a);
785        let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
786            .unwrap()
787            .into_iter()
788            .collect::<Vec<_>>();
789        let expected_values_b = vec![Some(100), Some(200)];
790        assert_eq!(min_values_b, expected_values_b);
791        let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
792            .unwrap()
793            .into_iter()
794            .collect::<Vec<_>>();
795        let expected_values_b = vec![Some(200), Some(400)];
796        assert_eq!(max_values_b, expected_values_b);
797
798        // Null counts are the same as the statistics
799        let null_counts_a =
800            as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
801                .unwrap()
802                .into_iter()
803                .collect::<Vec<_>>();
804        let expected_null_counts_a = vec![Some(0), Some(10)];
805        assert_eq!(null_counts_a, expected_null_counts_a);
806        let null_counts_b =
807            as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
808                .unwrap()
809                .into_iter()
810                .collect::<Vec<_>>();
811        let expected_null_counts_b = vec![Some(5), Some(0)];
812        assert_eq!(null_counts_b, expected_null_counts_b);
813
814        // Row counts are the same as the statistics
815        let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
816            .unwrap()
817            .into_iter()
818            .collect::<Vec<_>>();
819        let expected_row_counts_a = vec![Some(100), Some(200)];
820        assert_eq!(row_counts_a, expected_row_counts_a);
821        let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
822            .unwrap()
823            .into_iter()
824            .collect::<Vec<_>>();
825        let expected_row_counts_b = vec![Some(100), Some(200)];
826        assert_eq!(row_counts_b, expected_row_counts_b);
827
828        // Contained values are all null/missing (we can't know this just from statistics)
829        let values = HashSet::from([ScalarValue::from(0i32)]);
830        assert!(pruning_stats.contained(&column_a, &values).is_none());
831        assert!(pruning_stats.contained(&column_b, &values).is_none());
832
833        // The number of containers is the length of the statistics
834        assert_eq!(pruning_stats.num_containers(), 2);
835
836        // Test with a column that has no statistics
837        let column_c = Column::new_unqualified("c");
838        assert!(pruning_stats.min_values(&column_c).is_none());
839        assert!(pruning_stats.max_values(&column_c).is_none());
840        assert!(pruning_stats.null_counts(&column_c).is_none());
841        // Since row counts uses the first column that has row counts we get them back even
842        // if this columns does not have them set.
843        // This is debatable, personally I think `row_count` should not take a `Column` as an argument
844        // at all since all columns should have the same number of rows.
845        // But for now we just document the current behavior in this test.
846        let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
847            .unwrap()
848            .into_iter()
849            .collect::<Vec<_>>();
850        let expected_row_counts_c = vec![Some(100), Some(200)];
851        assert_eq!(row_counts_c, expected_row_counts_c);
852        assert!(pruning_stats.contained(&column_c, &values).is_none());
853
854        // Test with a column that doesn't exist
855        let column_d = Column::new_unqualified("d");
856        assert!(pruning_stats.min_values(&column_d).is_none());
857        assert!(pruning_stats.max_values(&column_d).is_none());
858        assert!(pruning_stats.null_counts(&column_d).is_none());
859        assert!(pruning_stats.row_counts(&column_d).is_none());
860        assert!(pruning_stats.contained(&column_d, &values).is_none());
861    }
862
863    #[test]
864    fn test_statistics_pruning_statistics_empty() {
865        let statistics = vec![];
866        let schema = Arc::new(Schema::new(vec![
867            Field::new("a", DataType::Int32, false),
868            Field::new("b", DataType::Int32, false),
869            Field::new("c", DataType::Int32, false),
870        ]));
871        let pruning_stats = PrunableStatistics::new(statistics, schema);
872
873        let column_a = Column::new_unqualified("a");
874        let column_b = Column::new_unqualified("b");
875
876        // Min/max values are all missing
877        assert!(pruning_stats.min_values(&column_a).is_none());
878        assert!(pruning_stats.max_values(&column_a).is_none());
879        assert!(pruning_stats.min_values(&column_b).is_none());
880        assert!(pruning_stats.max_values(&column_b).is_none());
881
882        // Null counts are all missing
883        assert!(pruning_stats.null_counts(&column_a).is_none());
884        assert!(pruning_stats.null_counts(&column_b).is_none());
885
886        // Row counts are all missing
887        assert!(pruning_stats.row_counts(&column_a).is_none());
888        assert!(pruning_stats.row_counts(&column_b).is_none());
889
890        // Contained values are all empty
891        let values = HashSet::from([ScalarValue::from(1i32)]);
892        assert!(pruning_stats.contained(&column_a, &values).is_none());
893    }
894
895    #[test]
896    fn test_composite_pruning_statistics_partition_and_file() {
897        // Create partition statistics
898        let partition_values = vec![
899            vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
900            vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
901        ];
902        let partition_fields = vec![
903            Arc::new(Field::new("part_a", DataType::Int32, false)),
904            Arc::new(Field::new("part_b", DataType::Int32, false)),
905        ];
906        let partition_stats =
907            PartitionPruningStatistics::try_new(partition_values, partition_fields)
908                .unwrap();
909
910        // Create file statistics
911        let file_statistics = vec![
912            Arc::new(
913                Statistics::default()
914                    .add_column_statistics(
915                        ColumnStatistics::new_unknown()
916                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
917                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
918                            .with_null_count(Precision::Exact(0)),
919                    )
920                    .add_column_statistics(
921                        ColumnStatistics::new_unknown()
922                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
923                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
924                            .with_null_count(Precision::Exact(5)),
925                    )
926                    .with_num_rows(Precision::Exact(100)),
927            ),
928            Arc::new(
929                Statistics::default()
930                    .add_column_statistics(
931                        ColumnStatistics::new_unknown()
932                            .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
933                            .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
934                            .with_null_count(Precision::Exact(10)),
935                    )
936                    .add_column_statistics(
937                        ColumnStatistics::new_unknown()
938                            .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
939                            .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
940                            .with_null_count(Precision::Exact(0)),
941                    )
942                    .with_num_rows(Precision::Exact(200)),
943            ),
944        ];
945
946        let file_schema = Arc::new(Schema::new(vec![
947            Field::new("col_x", DataType::Int32, false),
948            Field::new("col_y", DataType::Int32, false),
949        ]));
950        let file_stats = PrunableStatistics::new(file_statistics, file_schema);
951
952        // Create composite statistics
953        let composite_stats = CompositePruningStatistics::new(vec![
954            Box::new(partition_stats),
955            Box::new(file_stats),
956        ]);
957
958        // Test accessing columns that are only in partition statistics
959        let part_a = Column::new_unqualified("part_a");
960        let part_b = Column::new_unqualified("part_b");
961
962        // Test accessing columns that are only in file statistics
963        let col_x = Column::new_unqualified("col_x");
964        let col_y = Column::new_unqualified("col_y");
965
966        // For partition columns, should get values from partition statistics
967        let min_values_part_a =
968            as_int32_array(&composite_stats.min_values(&part_a).unwrap())
969                .unwrap()
970                .into_iter()
971                .collect::<Vec<_>>();
972        let expected_values_part_a = vec![Some(1), Some(2)];
973        assert_eq!(min_values_part_a, expected_values_part_a);
974
975        let max_values_part_a =
976            as_int32_array(&composite_stats.max_values(&part_a).unwrap())
977                .unwrap()
978                .into_iter()
979                .collect::<Vec<_>>();
980        // For partition values, min and max are the same
981        assert_eq!(max_values_part_a, expected_values_part_a);
982
983        let min_values_part_b =
984            as_int32_array(&composite_stats.min_values(&part_b).unwrap())
985                .unwrap()
986                .into_iter()
987                .collect::<Vec<_>>();
988        let expected_values_part_b = vec![Some(10), Some(20)];
989        assert_eq!(min_values_part_b, expected_values_part_b);
990
991        // For file columns, should get values from file statistics
992        let min_values_col_x =
993            as_int32_array(&composite_stats.min_values(&col_x).unwrap())
994                .unwrap()
995                .into_iter()
996                .collect::<Vec<_>>();
997        let expected_values_col_x = vec![Some(100), Some(500)];
998        assert_eq!(min_values_col_x, expected_values_col_x);
999
1000        let max_values_col_x =
1001            as_int32_array(&composite_stats.max_values(&col_x).unwrap())
1002                .unwrap()
1003                .into_iter()
1004                .collect::<Vec<_>>();
1005        let expected_max_values_col_x = vec![Some(200), Some(600)];
1006        assert_eq!(max_values_col_x, expected_max_values_col_x);
1007
1008        let min_values_col_y =
1009            as_int32_array(&composite_stats.min_values(&col_y).unwrap())
1010                .unwrap()
1011                .into_iter()
1012                .collect::<Vec<_>>();
1013        let expected_values_col_y = vec![Some(300), Some(700)];
1014        assert_eq!(min_values_col_y, expected_values_col_y);
1015
1016        // Test null counts - only available from file statistics
1017        assert!(composite_stats.null_counts(&part_a).is_none());
1018        assert!(composite_stats.null_counts(&part_b).is_none());
1019
1020        let null_counts_col_x =
1021            as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
1022                .unwrap()
1023                .into_iter()
1024                .collect::<Vec<_>>();
1025        let expected_null_counts_col_x = vec![Some(0), Some(10)];
1026        assert_eq!(null_counts_col_x, expected_null_counts_col_x);
1027
1028        // Test row counts - only available from file statistics
1029        assert!(composite_stats.row_counts(&part_a).is_none());
1030        let row_counts_col_x =
1031            as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
1032                .unwrap()
1033                .into_iter()
1034                .collect::<Vec<_>>();
1035        let expected_row_counts = vec![Some(100), Some(200)];
1036        assert_eq!(row_counts_col_x, expected_row_counts);
1037
1038        // Test contained values - only available from partition statistics
1039        let values = HashSet::from([ScalarValue::from(1i32)]);
1040        let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
1041        let expected_contained_part_a = BooleanArray::from(vec![true, false]);
1042        assert_eq!(contained_part_a, expected_contained_part_a);
1043
1044        // File statistics don't implement contained
1045        assert!(composite_stats.contained(&col_x, &values).is_none());
1046
1047        // Non-existent column should return None for everything
1048        let non_existent = Column::new_unqualified("non_existent");
1049        assert!(composite_stats.min_values(&non_existent).is_none());
1050        assert!(composite_stats.max_values(&non_existent).is_none());
1051        assert!(composite_stats.null_counts(&non_existent).is_none());
1052        assert!(composite_stats.row_counts(&non_existent).is_none());
1053        assert!(composite_stats.contained(&non_existent, &values).is_none());
1054
1055        // Verify num_containers matches
1056        assert_eq!(composite_stats.num_containers(), 2);
1057    }
1058
1059    #[test]
1060    fn test_composite_pruning_statistics_priority() {
1061        // Create two sets of file statistics with the same column names
1062        // but different values to test that the first one gets priority
1063
1064        // First set of statistics
1065        let first_statistics = vec![
1066            Arc::new(
1067                Statistics::default()
1068                    .add_column_statistics(
1069                        ColumnStatistics::new_unknown()
1070                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
1071                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
1072                            .with_null_count(Precision::Exact(0)),
1073                    )
1074                    .with_num_rows(Precision::Exact(100)),
1075            ),
1076            Arc::new(
1077                Statistics::default()
1078                    .add_column_statistics(
1079                        ColumnStatistics::new_unknown()
1080                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
1081                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
1082                            .with_null_count(Precision::Exact(5)),
1083                    )
1084                    .with_num_rows(Precision::Exact(200)),
1085            ),
1086        ];
1087
1088        let first_schema = Arc::new(Schema::new(vec![Field::new(
1089            "col_a",
1090            DataType::Int32,
1091            false,
1092        )]));
1093        let first_stats = PrunableStatistics::new(first_statistics, first_schema);
1094
1095        // Second set of statistics with the same column name but different values
1096        let second_statistics = vec![
1097            Arc::new(
1098                Statistics::default()
1099                    .add_column_statistics(
1100                        ColumnStatistics::new_unknown()
1101                            .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
1102                            .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
1103                            .with_null_count(Precision::Exact(10)),
1104                    )
1105                    .with_num_rows(Precision::Exact(1000)),
1106            ),
1107            Arc::new(
1108                Statistics::default()
1109                    .add_column_statistics(
1110                        ColumnStatistics::new_unknown()
1111                            .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
1112                            .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
1113                            .with_null_count(Precision::Exact(20)),
1114                    )
1115                    .with_num_rows(Precision::Exact(2000)),
1116            ),
1117        ];
1118
1119        let second_schema = Arc::new(Schema::new(vec![Field::new(
1120            "col_a",
1121            DataType::Int32,
1122            false,
1123        )]));
1124        let second_stats = PrunableStatistics::new(second_statistics, second_schema);
1125
1126        // Create composite statistics with first stats having priority
1127        let composite_stats = CompositePruningStatistics::new(vec![
1128            Box::new(first_stats.clone()),
1129            Box::new(second_stats.clone()),
1130        ]);
1131
1132        let col_a = Column::new_unqualified("col_a");
1133
1134        // Should get values from first statistics since it has priority
1135        let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
1136            .unwrap()
1137            .into_iter()
1138            .collect::<Vec<_>>();
1139        let expected_min_values = vec![Some(100), Some(300)];
1140        assert_eq!(min_values, expected_min_values);
1141
1142        let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
1143            .unwrap()
1144            .into_iter()
1145            .collect::<Vec<_>>();
1146        let expected_max_values = vec![Some(200), Some(400)];
1147        assert_eq!(max_values, expected_max_values);
1148
1149        let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
1150            .unwrap()
1151            .into_iter()
1152            .collect::<Vec<_>>();
1153        let expected_null_counts = vec![Some(0), Some(5)];
1154        assert_eq!(null_counts, expected_null_counts);
1155
1156        let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
1157            .unwrap()
1158            .into_iter()
1159            .collect::<Vec<_>>();
1160        let expected_row_counts = vec![Some(100), Some(200)];
1161        assert_eq!(row_counts, expected_row_counts);
1162
1163        // Create composite statistics with second stats having priority
1164        // Now that we've added Clone trait to PrunableStatistics, we can just clone them
1165
1166        let composite_stats_reversed = CompositePruningStatistics::new(vec![
1167            Box::new(second_stats.clone()),
1168            Box::new(first_stats.clone()),
1169        ]);
1170
1171        // Should get values from second statistics since it now has priority
1172        let min_values =
1173            as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
1174                .unwrap()
1175                .into_iter()
1176                .collect::<Vec<_>>();
1177        let expected_min_values = vec![Some(1000), Some(3000)];
1178        assert_eq!(min_values, expected_min_values);
1179
1180        let max_values =
1181            as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
1182                .unwrap()
1183                .into_iter()
1184                .collect::<Vec<_>>();
1185        let expected_max_values = vec![Some(2000), Some(4000)];
1186        assert_eq!(max_values, expected_max_values);
1187
1188        let null_counts =
1189            as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
1190                .unwrap()
1191                .into_iter()
1192                .collect::<Vec<_>>();
1193        let expected_null_counts = vec![Some(10), Some(20)];
1194        assert_eq!(null_counts, expected_null_counts);
1195
1196        let row_counts =
1197            as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
1198                .unwrap()
1199                .into_iter()
1200                .collect::<Vec<_>>();
1201        let expected_row_counts = vec![Some(1000), Some(2000)];
1202        assert_eq!(row_counts, expected_row_counts);
1203    }
1204
1205    #[test]
1206    fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
1207        // Test with empty statistics vector
1208        // This should never happen, so we panic instead of returning a Result which would burned callers
1209        let result = std::panic::catch_unwind(|| {
1210            CompositePruningStatistics::new(vec![]);
1211        });
1212        assert!(result.is_err());
1213
1214        // We should panic here because the number of containers is different
1215        let result = std::panic::catch_unwind(|| {
1216            // Create statistics with different number of containers
1217            // Use partition stats for the test
1218            let partition_values_1 = vec![
1219                vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
1220                vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
1221            ];
1222            let partition_fields_1 = vec![
1223                Arc::new(Field::new("part_a", DataType::Int32, false)),
1224                Arc::new(Field::new("part_b", DataType::Int32, false)),
1225            ];
1226            let partition_stats_1 = PartitionPruningStatistics::try_new(
1227                partition_values_1,
1228                partition_fields_1,
1229            )
1230            .unwrap();
1231            let partition_values_2 = vec![
1232                vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
1233                vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
1234                vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
1235            ];
1236            let partition_fields_2 = vec![
1237                Arc::new(Field::new("part_x", DataType::Int32, false)),
1238                Arc::new(Field::new("part_y", DataType::Int32, false)),
1239            ];
1240            let partition_stats_2 = PartitionPruningStatistics::try_new(
1241                partition_values_2,
1242                partition_fields_2,
1243            )
1244            .unwrap();
1245
1246            CompositePruningStatistics::new(vec![
1247                Box::new(partition_stats_1),
1248                Box::new(partition_stats_2),
1249            ]);
1250        });
1251        assert!(result.is_err());
1252    }
1253}