Skip to main content

datafusion_common/
pruning.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{Array, NullArray, UInt64Array};
19use arrow::array::{ArrayRef, BooleanArray};
20use arrow::datatypes::{FieldRef, Schema, SchemaRef};
21use std::collections::HashSet;
22use std::sync::Arc;
23
24use crate::error::DataFusionError;
25use crate::stats::Precision;
26use crate::{Column, Statistics};
27use crate::{ColumnStatistics, ScalarValue};
28
29/// A source of runtime statistical information to [`PruningPredicate`]s.
30///
31/// # Supported Information
32///
33/// 1. Minimum and maximum values for columns
34///
35/// 2. Null counts and row counts for columns
36///
37/// 3. Whether the values in a column are contained in a set of literals
38///
39/// # Vectorized Interface
40///
41/// Information for containers / files are returned as Arrow [`ArrayRef`], so
42/// the evaluation happens once on a single `RecordBatch`, which amortizes the
43/// overhead of evaluating the predicate. This is important when pruning 1000s
44/// of containers which often happens in analytic systems that have 1000s of
45/// potential files to consider.
46///
47/// For example, for the following three files with a single column `a`:
48/// ```text
49/// file1: column a: min=5, max=10
50/// file2: column a: No stats
51/// file2: column a: min=20, max=30
52/// ```
53///
54/// PruningStatistics would return:
55///
56/// ```text
57/// min_values("a") -> Some([5, Null, 20])
58/// max_values("a") -> Some([10, Null, 30])
59/// min_values("X") -> None
60/// ```
61///
62/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
63pub trait PruningStatistics {
64    /// Return the minimum values for the named column, if known.
65    ///
66    /// If the minimum value for a particular container is not known, the
67    /// returned array should have `null` in that row. If the minimum value is
68    /// not known for any row, return `None`.
69    ///
70    /// Note: the returned array must contain [`Self::num_containers`] rows
71    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
72
73    /// Return the maximum values for the named column, if known.
74    ///
75    /// See [`Self::min_values`] for when to return `None` and null values.
76    ///
77    /// Note: the returned array must contain [`Self::num_containers`] rows
78    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
79
80    /// Return the number of containers (e.g. Row Groups) being pruned with
81    /// these statistics.
82    ///
83    /// This value corresponds to the size of the [`ArrayRef`] returned by
84    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
85    /// and [`Self::row_counts`].
86    fn num_containers(&self) -> usize;
87
88    /// Return the number of null values for the named column as an
89    /// [`UInt64Array`]
90    ///
91    /// See [`Self::min_values`] for when to return `None` and null values.
92    ///
93    /// Note: the returned array must contain [`Self::num_containers`] rows
94    ///
95    /// [`UInt64Array`]: arrow::array::UInt64Array
96    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
97
98    /// Return the number of rows in each container as an [`UInt64Array`].
99    ///
100    /// Row counts are container-level (not column-specific) — the value
101    /// is the same regardless of which column is being considered.
102    ///
103    /// See [`Self::min_values`] for when to return `None` and null values.
104    ///
105    /// Note: the returned array must contain [`Self::num_containers`] rows
106    ///
107    /// [`UInt64Array`]: arrow::array::UInt64Array
108    fn row_counts(&self) -> Option<ArrayRef>;
109
110    /// Returns [`BooleanArray`] where each row represents information known
111    /// about specific literal `values` in a column.
112    ///
113    /// For example, Parquet Bloom Filters implement this API to communicate
114    /// that `values` are known not to be present in a Row Group.
115    ///
116    /// The returned array has one row for each container, with the following
117    /// meanings:
118    /// * `true` if the values in `column`  ONLY contain values from `values`
119    /// * `false` if the values in `column` are NOT ANY of `values`
120    /// * `null` if the neither of the above holds or is unknown.
121    ///
122    /// If these statistics can not determine column membership for any
123    /// container, return `None` (the default).
124    ///
125    /// Note: the returned array must contain [`Self::num_containers`] rows
126    #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // ScalarValue has interior mutability but is intentionally used as hash key
127    fn contained(
128        &self,
129        column: &Column,
130        values: &HashSet<ScalarValue>,
131    ) -> Option<BooleanArray>;
132}
133
134/// Prune files based on their partition values.
135///
136/// This is used both at planning time and execution time to prune
137/// files based on their partition values.
138/// This feeds into [`CompositePruningStatistics`] to allow pruning
139/// with filters that depend both on partition columns and data columns
140/// (e.g. `WHERE partition_col = data_col`).
141#[deprecated(
142    since = "52.0.0",
143    note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
144)]
145#[derive(Clone)]
146pub struct PartitionPruningStatistics {
147    /// Values for each column for each container.
148    ///
149    /// The outer vectors represent the columns while the inner vectors
150    /// represent the containers. The order must match the order of the
151    /// partition columns in [`PartitionPruningStatistics::partition_schema`].
152    partition_values: Vec<ArrayRef>,
153    /// The number of containers.
154    ///
155    /// Stored since the partition values are column-major and if
156    /// there are no columns we wouldn't know the number of containers.
157    num_containers: usize,
158    /// The schema of the partition columns.
159    ///
160    /// This must **not** be the schema of the entire file or table: it must
161    /// only be the schema of the partition columns, in the same order as the
162    /// values in [`PartitionPruningStatistics::partition_values`].
163    partition_schema: SchemaRef,
164}
165
166#[expect(deprecated)]
167impl PartitionPruningStatistics {
168    /// Create a new instance of [`PartitionPruningStatistics`].
169    ///
170    /// Args:
171    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
172    ///   The outer vector represents the containers while the inner
173    ///   vector represents the partition values for each column.
174    ///   Note that this is the **opposite** of the order of the
175    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
176    /// * `partition_schema`: The schema of the partition columns.
177    ///   This must **not** be the schema of the entire file or table:
178    ///   instead it must only be the schema of the partition columns,
179    ///   in the same order as the values in `partition_values`.
180    ///
181    /// # Example
182    ///
183    /// To create [`PartitionPruningStatistics`] for two partition columns `a` and `b`,
184    /// for three containers like this:
185    ///
186    /// | a | b |
187    /// | - | - |
188    /// | 1 | 2 |
189    /// | 3 | 4 |
190    /// | 5 | 6 |
191    ///
192    /// ```
193    /// # use std::sync::Arc;
194    /// # use datafusion_common::ScalarValue;
195    /// # use arrow::datatypes::{DataType, Field};
196    /// # use datafusion_common::pruning::PartitionPruningStatistics;
197    ///
198    /// let partition_values = vec![
199    ///     vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
200    ///     vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
201    ///     vec![ScalarValue::from(5i32), ScalarValue::from(6i32)],
202    /// ];
203    /// let partition_fields = vec![
204    ///     Arc::new(Field::new("a", DataType::Int32, false)),
205    ///     Arc::new(Field::new("b", DataType::Int32, false)),
206    /// ];
207    /// let partition_stats =
208    ///     PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap();
209    /// ```
210    pub fn try_new(
211        partition_values: Vec<Vec<ScalarValue>>,
212        partition_fields: Vec<FieldRef>,
213    ) -> Result<Self, DataFusionError> {
214        let num_containers = partition_values.len();
215        let partition_schema = Arc::new(Schema::new(partition_fields));
216        let mut partition_values_by_column =
217            vec![
218                Vec::with_capacity(partition_values.len());
219                partition_schema.fields().len()
220            ];
221        for partition_value in partition_values {
222            for (i, value) in partition_value.into_iter().enumerate() {
223                partition_values_by_column[i].push(value);
224            }
225        }
226        Ok(Self {
227            partition_values: partition_values_by_column
228                .into_iter()
229                .map(|v| {
230                    if v.is_empty() {
231                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
232                    } else {
233                        ScalarValue::iter_to_array(v)
234                    }
235                })
236                .collect::<Result<Vec<_>, _>>()?,
237            num_containers,
238            partition_schema,
239        })
240    }
241}
242
243#[expect(deprecated)]
244impl PruningStatistics for PartitionPruningStatistics {
245    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
246        let index = self.partition_schema.index_of(column.name()).ok()?;
247        self.partition_values.get(index).and_then(|v| {
248            if v.is_empty() || v.null_count() == v.len() {
249                // If the array is empty or all nulls, return None
250                None
251            } else {
252                // Otherwise, return the array as is
253                Some(Arc::clone(v))
254            }
255        })
256    }
257
258    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
259        self.min_values(column)
260    }
261
262    fn num_containers(&self) -> usize {
263        self.num_containers
264    }
265
266    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
267        None
268    }
269
270    fn row_counts(&self) -> Option<ArrayRef> {
271        None
272    }
273
274    fn contained(
275        &self,
276        column: &Column,
277        values: &HashSet<ScalarValue>,
278    ) -> Option<BooleanArray> {
279        let index = self.partition_schema.index_of(column.name()).ok()?;
280        let array = self.partition_values.get(index)?;
281        let boolean_array = values.iter().try_fold(None, |acc, v| {
282            let arrow_value = v.to_scalar().ok()?;
283            let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
284            match acc {
285                None => Some(Some(eq_result)),
286                Some(acc_array) => {
287                    arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result)
288                        .map(Some)
289                        .ok()
290                }
291            }
292        })??;
293        // If the boolean array is empty or all null values, return None
294        if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
295            None
296        } else {
297            Some(boolean_array)
298        }
299    }
300}
301
302/// Prune a set of containers represented by their statistics.
303///
304/// Each [`Statistics`] represents a "container" -- some collection of data
305/// that has statistics of its columns.
306///
307/// It is up to the caller to decide what each container represents. For
308/// example, they can come from a file (e.g. [`PartitionedFile`]) or a set of of
309/// files (e.g. [`FileGroup`])
310///
311/// [`PartitionedFile`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html
312/// [`FileGroup`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroup.html
313#[derive(Clone)]
314pub struct PrunableStatistics {
315    /// Statistics for each container.
316    /// These are taken as a reference since they may be rather large / expensive to clone
317    /// and we often won't return all of them as ArrayRefs (we only return the columns the predicate requests).
318    statistics: Vec<Arc<Statistics>>,
319    /// The schema of the file these statistics are for.
320    schema: SchemaRef,
321}
322
323impl PrunableStatistics {
324    /// Create a new instance of [`PrunableStatistics`].
325    /// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
326    /// The `schema` is the schema of the data in the containers and should apply to all files.
327    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
328        Self { statistics, schema }
329    }
330
331    fn get_exact_column_statistics(
332        &self,
333        column: &Column,
334        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
335    ) -> Option<ArrayRef> {
336        let index = self.schema.index_of(column.name()).ok()?;
337        let mut has_value = false;
338        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
339            s.column_statistics
340                .get(index)
341                .and_then(|stat| {
342                    if let Precision::Exact(min) = get_stat(stat) {
343                        has_value = true;
344                        Some(min.clone())
345                    } else {
346                        None
347                    }
348                })
349                .unwrap_or(ScalarValue::Null)
350        })) {
351            // If there is any non-null value and no errors, return the array
352            Ok(array) => has_value.then_some(array),
353            Err(_) => {
354                log::warn!(
355                    "Failed to convert min values to array for column {}",
356                    column.name()
357                );
358                None
359            }
360        }
361    }
362}
363
364impl PruningStatistics for PrunableStatistics {
365    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
366        self.get_exact_column_statistics(column, |stat| &stat.min_value)
367    }
368
369    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
370        self.get_exact_column_statistics(column, |stat| &stat.max_value)
371    }
372
373    fn num_containers(&self) -> usize {
374        self.statistics.len()
375    }
376
377    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
378        let index = self.schema.index_of(column.name()).ok()?;
379        if self.statistics.iter().any(|s| {
380            s.column_statistics
381                .get(index)
382                .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
383        }) {
384            Some(Arc::new(
385                self.statistics
386                    .iter()
387                    .map(|s| {
388                        s.column_statistics.get(index).and_then(|stat| {
389                            if let Precision::Exact(null_count) = &stat.null_count {
390                                u64::try_from(*null_count).ok()
391                            } else {
392                                None
393                            }
394                        })
395                    })
396                    .collect::<UInt64Array>(),
397            ))
398        } else {
399            None
400        }
401    }
402
403    fn row_counts(&self) -> Option<ArrayRef> {
404        if self
405            .statistics
406            .iter()
407            .any(|s| s.num_rows.is_exact().unwrap_or(false))
408        {
409            Some(Arc::new(
410                self.statistics
411                    .iter()
412                    .map(|s| {
413                        if let Precision::Exact(row_count) = &s.num_rows {
414                            u64::try_from(*row_count).ok()
415                        } else {
416                            None
417                        }
418                    })
419                    .collect::<UInt64Array>(),
420            ))
421        } else {
422            None
423        }
424    }
425
426    fn contained(
427        &self,
428        _column: &Column,
429        _values: &HashSet<ScalarValue>,
430    ) -> Option<BooleanArray> {
431        None
432    }
433}
434
435/// Combine multiple [`PruningStatistics`] into a single
436/// [`CompositePruningStatistics`].
437/// This can be used to combine statistics from different sources,
438/// for example partition values and file statistics.
439/// This allows pruning with filters that depend on multiple sources of statistics,
440/// such as `WHERE partition_col = data_col`.
441/// This is done by iterating over the statistics and returning the first
442/// one that has information for the requested column.
443/// If multiple statistics have information for the same column,
444/// the first one is returned without any regard for completeness or accuracy.
445/// That is: if the first statistics has information for a column, even if it is incomplete,
446/// that is returned even if a later statistics has more complete information.
447#[deprecated(
448    since = "52.0.0",
449    note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
450)]
451pub struct CompositePruningStatistics {
452    pub statistics: Vec<Box<dyn PruningStatistics>>,
453}
454
455#[expect(deprecated)]
456impl CompositePruningStatistics {
457    /// Create a new instance of [`CompositePruningStatistics`] from
458    /// a vector of [`PruningStatistics`].
459    pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
460        assert!(!statistics.is_empty());
461        // Check that all statistics have the same number of containers
462        let num_containers = statistics[0].num_containers();
463        for stats in &statistics {
464            assert_eq!(num_containers, stats.num_containers());
465        }
466        Self { statistics }
467    }
468}
469
470#[expect(deprecated)]
471impl PruningStatistics for CompositePruningStatistics {
472    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
473        for stats in &self.statistics {
474            if let Some(array) = stats.min_values(column) {
475                return Some(array);
476            }
477        }
478        None
479    }
480
481    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
482        for stats in &self.statistics {
483            if let Some(array) = stats.max_values(column) {
484                return Some(array);
485            }
486        }
487        None
488    }
489
490    fn num_containers(&self) -> usize {
491        self.statistics[0].num_containers()
492    }
493
494    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
495        for stats in &self.statistics {
496            if let Some(array) = stats.null_counts(column) {
497                return Some(array);
498            }
499        }
500        None
501    }
502
503    fn row_counts(&self) -> Option<ArrayRef> {
504        for stats in &self.statistics {
505            if let Some(array) = stats.row_counts() {
506                return Some(array);
507            }
508        }
509        None
510    }
511
512    fn contained(
513        &self,
514        column: &Column,
515        values: &HashSet<ScalarValue>,
516    ) -> Option<BooleanArray> {
517        for stats in &self.statistics {
518            if let Some(array) = stats.contained(column, values) {
519                return Some(array);
520            }
521        }
522        None
523    }
524}
525
526#[cfg(test)]
527#[expect(deprecated)]
528#[allow(clippy::allow_attributes, clippy::mutable_key_type)] // ScalarValue has interior mutability but is intentionally used as hash key
529mod tests {
530    use crate::{
531        ColumnStatistics,
532        cast::{as_int32_array, as_uint64_array},
533    };
534
535    use super::*;
536    use arrow::datatypes::{DataType, Field};
537    use std::sync::Arc;
538
539    /// return a PartitionPruningStatistics for two columns 'a' and 'b'
540    /// and the following stats
541    ///
542    /// | a | b |
543    /// | - | - |
544    /// | 1 | 2 |
545    /// | 3 | 4 |
546    fn partition_pruning_statistics_setup() -> PartitionPruningStatistics {
547        let partition_values = vec![
548            vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
549            vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
550        ];
551        let partition_fields = vec![
552            Arc::new(Field::new("a", DataType::Int32, false)),
553            Arc::new(Field::new("b", DataType::Int32, false)),
554        ];
555        PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap()
556    }
557
558    #[test]
559    fn test_partition_pruning_statistics() {
560        let partition_stats = partition_pruning_statistics_setup();
561
562        let column_a = Column::new_unqualified("a");
563        let column_b = Column::new_unqualified("b");
564
565        // Partition values don't know anything about nulls or row counts
566        assert!(partition_stats.null_counts(&column_a).is_none());
567        assert!(partition_stats.row_counts().is_none());
568        assert!(partition_stats.null_counts(&column_b).is_none());
569        assert!(partition_stats.row_counts().is_none());
570
571        // Min/max values are the same as the partition values
572        let min_values_a =
573            as_int32_array(&partition_stats.min_values(&column_a).unwrap())
574                .unwrap()
575                .into_iter()
576                .collect::<Vec<_>>();
577        let expected_values_a = vec![Some(1), Some(3)];
578        assert_eq!(min_values_a, expected_values_a);
579        let max_values_a =
580            as_int32_array(&partition_stats.max_values(&column_a).unwrap())
581                .unwrap()
582                .into_iter()
583                .collect::<Vec<_>>();
584        let expected_values_a = vec![Some(1), Some(3)];
585        assert_eq!(max_values_a, expected_values_a);
586
587        let min_values_b =
588            as_int32_array(&partition_stats.min_values(&column_b).unwrap())
589                .unwrap()
590                .into_iter()
591                .collect::<Vec<_>>();
592        let expected_values_b = vec![Some(2), Some(4)];
593        assert_eq!(min_values_b, expected_values_b);
594        let max_values_b =
595            as_int32_array(&partition_stats.max_values(&column_b).unwrap())
596                .unwrap()
597                .into_iter()
598                .collect::<Vec<_>>();
599        let expected_values_b = vec![Some(2), Some(4)];
600        assert_eq!(max_values_b, expected_values_b);
601
602        // Contained values are only true for the partition values
603        let values = HashSet::from([ScalarValue::from(1i32)]);
604        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
605        let expected_contained_a = BooleanArray::from(vec![true, false]);
606        assert_eq!(contained_a, expected_contained_a);
607        let contained_b = partition_stats.contained(&column_b, &values).unwrap();
608        let expected_contained_b = BooleanArray::from(vec![false, false]);
609        assert_eq!(contained_b, expected_contained_b);
610
611        // The number of containers is the length of the partition values
612        assert_eq!(partition_stats.num_containers(), 2);
613    }
614
615    #[test]
616    fn test_partition_pruning_statistics_multiple_positive_values() {
617        let partition_stats = partition_pruning_statistics_setup();
618
619        let column_a = Column::new_unqualified("a");
620
621        // The two containers have `a` values 1 and 3, so they both only contain values from 1 and 3
622        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
623        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
624        let expected_contained_a = BooleanArray::from(vec![true, true]);
625        assert_eq!(contained_a, expected_contained_a);
626    }
627
628    #[test]
629    fn test_partition_pruning_statistics_multiple_negative_values() {
630        let partition_stats = partition_pruning_statistics_setup();
631
632        let column_a = Column::new_unqualified("a");
633
634        // The two containers have `a` values 1 and 3,
635        // so the first contains ONLY values from 1,2
636        // but the second does not
637        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(2i32)]);
638        let contained_a = partition_stats.contained(&column_a, &values).unwrap();
639        let expected_contained_a = BooleanArray::from(vec![true, false]);
640        assert_eq!(contained_a, expected_contained_a);
641    }
642
643    #[test]
644    fn test_partition_pruning_statistics_null_in_values() {
645        let partition_values = vec![
646            vec![
647                ScalarValue::from(1i32),
648                ScalarValue::from(2i32),
649                ScalarValue::from(3i32),
650            ],
651            vec![
652                ScalarValue::from(4i32),
653                ScalarValue::from(5i32),
654                ScalarValue::from(6i32),
655            ],
656        ];
657        let partition_fields = vec![
658            Arc::new(Field::new("a", DataType::Int32, false)),
659            Arc::new(Field::new("b", DataType::Int32, false)),
660            Arc::new(Field::new("c", DataType::Int32, false)),
661        ];
662        let partition_stats =
663            PartitionPruningStatistics::try_new(partition_values, partition_fields)
664                .unwrap();
665
666        let column_a = Column::new_unqualified("a");
667        let column_b = Column::new_unqualified("b");
668        let column_c = Column::new_unqualified("c");
669
670        let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]);
671        let contained_a = partition_stats.contained(&column_a, &values_a).unwrap();
672        let mut builder = BooleanArray::builder(2);
673        builder.append_value(true);
674        builder.append_null();
675        let expected_contained_a = builder.finish();
676        assert_eq!(contained_a, expected_contained_a);
677
678        // First match creates a NULL boolean array
679        // The accumulator should update the value to true for the second value
680        let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]);
681        let contained_b = partition_stats.contained(&column_b, &values_b).unwrap();
682        let mut builder = BooleanArray::builder(2);
683        builder.append_null();
684        builder.append_value(true);
685        let expected_contained_b = builder.finish();
686        assert_eq!(contained_b, expected_contained_b);
687
688        // All matches are null, contained should return None
689        let values_c = HashSet::from([ScalarValue::Int32(None)]);
690        let contained_c = partition_stats.contained(&column_c, &values_c);
691        assert!(contained_c.is_none());
692    }
693
694    #[test]
695    fn test_partition_pruning_statistics_empty() {
696        let partition_values = vec![];
697        let partition_fields = vec![
698            Arc::new(Field::new("a", DataType::Int32, false)),
699            Arc::new(Field::new("b", DataType::Int32, false)),
700        ];
701        let partition_stats =
702            PartitionPruningStatistics::try_new(partition_values, partition_fields)
703                .unwrap();
704
705        let column_a = Column::new_unqualified("a");
706        let column_b = Column::new_unqualified("b");
707
708        // Partition values don't know anything about nulls or row counts
709        assert!(partition_stats.null_counts(&column_a).is_none());
710        assert!(partition_stats.row_counts().is_none());
711        assert!(partition_stats.null_counts(&column_b).is_none());
712        assert!(partition_stats.row_counts().is_none());
713
714        // Min/max values are all missing
715        assert!(partition_stats.min_values(&column_a).is_none());
716        assert!(partition_stats.max_values(&column_a).is_none());
717        assert!(partition_stats.min_values(&column_b).is_none());
718        assert!(partition_stats.max_values(&column_b).is_none());
719
720        // Contained values are all empty
721        let values = HashSet::from([ScalarValue::from(1i32)]);
722        assert!(partition_stats.contained(&column_a, &values).is_none());
723    }
724
725    #[test]
726    fn test_statistics_pruning_statistics() {
727        let statistics = vec![
728            Arc::new(
729                Statistics::default()
730                    .add_column_statistics(
731                        ColumnStatistics::new_unknown()
732                            .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
733                            .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
734                            .with_null_count(Precision::Exact(0)),
735                    )
736                    .add_column_statistics(
737                        ColumnStatistics::new_unknown()
738                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
739                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
740                            .with_null_count(Precision::Exact(5)),
741                    )
742                    .with_num_rows(Precision::Exact(100)),
743            ),
744            Arc::new(
745                Statistics::default()
746                    .add_column_statistics(
747                        ColumnStatistics::new_unknown()
748                            .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
749                            .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
750                            .with_null_count(Precision::Exact(10)),
751                    )
752                    .add_column_statistics(
753                        ColumnStatistics::new_unknown()
754                            .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
755                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
756                            .with_null_count(Precision::Exact(0)),
757                    )
758                    .with_num_rows(Precision::Exact(200)),
759            ),
760        ];
761
762        let schema = Arc::new(Schema::new(vec![
763            Field::new("a", DataType::Int32, false),
764            Field::new("b", DataType::Int32, false),
765            Field::new("c", DataType::Int32, false),
766        ]));
767        let pruning_stats = PrunableStatistics::new(statistics, schema);
768
769        let column_a = Column::new_unqualified("a");
770        let column_b = Column::new_unqualified("b");
771
772        // Min/max values are the same as the statistics
773        let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
774            .unwrap()
775            .into_iter()
776            .collect::<Vec<_>>();
777        let expected_values_a = vec![Some(0), Some(50)];
778        assert_eq!(min_values_a, expected_values_a);
779        let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
780            .unwrap()
781            .into_iter()
782            .collect::<Vec<_>>();
783        let expected_values_a = vec![Some(100), Some(300)];
784        assert_eq!(max_values_a, expected_values_a);
785        let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
786            .unwrap()
787            .into_iter()
788            .collect::<Vec<_>>();
789        let expected_values_b = vec![Some(100), Some(200)];
790        assert_eq!(min_values_b, expected_values_b);
791        let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
792            .unwrap()
793            .into_iter()
794            .collect::<Vec<_>>();
795        let expected_values_b = vec![Some(200), Some(400)];
796        assert_eq!(max_values_b, expected_values_b);
797
798        // Null counts are the same as the statistics
799        let null_counts_a =
800            as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
801                .unwrap()
802                .into_iter()
803                .collect::<Vec<_>>();
804        let expected_null_counts_a = vec![Some(0), Some(10)];
805        assert_eq!(null_counts_a, expected_null_counts_a);
806        let null_counts_b =
807            as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
808                .unwrap()
809                .into_iter()
810                .collect::<Vec<_>>();
811        let expected_null_counts_b = vec![Some(5), Some(0)];
812        assert_eq!(null_counts_b, expected_null_counts_b);
813
814        // Row counts are the same as the statistics
815        let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap())
816            .unwrap()
817            .into_iter()
818            .collect::<Vec<_>>();
819        let expected_row_counts_a = vec![Some(100), Some(200)];
820        assert_eq!(row_counts_a, expected_row_counts_a);
821        let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap())
822            .unwrap()
823            .into_iter()
824            .collect::<Vec<_>>();
825        let expected_row_counts_b = vec![Some(100), Some(200)];
826        assert_eq!(row_counts_b, expected_row_counts_b);
827
828        // Contained values are all null/missing (we can't know this just from statistics)
829        let values = HashSet::from([ScalarValue::from(0i32)]);
830        assert!(pruning_stats.contained(&column_a, &values).is_none());
831        assert!(pruning_stats.contained(&column_b, &values).is_none());
832
833        // The number of containers is the length of the statistics
834        assert_eq!(pruning_stats.num_containers(), 2);
835
836        // Test with a column that has no statistics
837        let column_c = Column::new_unqualified("c");
838        assert!(pruning_stats.min_values(&column_c).is_none());
839        assert!(pruning_stats.max_values(&column_c).is_none());
840        assert!(pruning_stats.null_counts(&column_c).is_none());
841        // Since row counts uses the first column that has row counts we get them back even
842        // if this columns does not have them set.
843        // This is debatable, personally I think `row_count` should not take a `Column` as an argument
844        // at all since all columns should have the same number of rows.
845        // But for now we just document the current behavior in this test.
846        let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap())
847            .unwrap()
848            .into_iter()
849            .collect::<Vec<_>>();
850        let expected_row_counts_c = vec![Some(100), Some(200)];
851        assert_eq!(row_counts_c, expected_row_counts_c);
852        assert!(pruning_stats.contained(&column_c, &values).is_none());
853
854        // Test with a column that doesn't exist — column-specific stats
855        // return None, but row_counts is container-level and still available
856        let column_d = Column::new_unqualified("d");
857        assert!(pruning_stats.min_values(&column_d).is_none());
858        assert!(pruning_stats.max_values(&column_d).is_none());
859        assert!(pruning_stats.null_counts(&column_d).is_none());
860        assert!(pruning_stats.row_counts().is_some());
861        assert!(pruning_stats.contained(&column_d, &values).is_none());
862    }
863
864    #[test]
865    fn test_statistics_pruning_statistics_empty() {
866        let statistics = vec![];
867        let schema = Arc::new(Schema::new(vec![
868            Field::new("a", DataType::Int32, false),
869            Field::new("b", DataType::Int32, false),
870            Field::new("c", DataType::Int32, false),
871        ]));
872        let pruning_stats = PrunableStatistics::new(statistics, schema);
873
874        let column_a = Column::new_unqualified("a");
875        let column_b = Column::new_unqualified("b");
876
877        // Min/max values are all missing
878        assert!(pruning_stats.min_values(&column_a).is_none());
879        assert!(pruning_stats.max_values(&column_a).is_none());
880        assert!(pruning_stats.min_values(&column_b).is_none());
881        assert!(pruning_stats.max_values(&column_b).is_none());
882
883        // Null counts are all missing
884        assert!(pruning_stats.null_counts(&column_a).is_none());
885        assert!(pruning_stats.null_counts(&column_b).is_none());
886
887        // Row counts are all missing
888        assert!(pruning_stats.row_counts().is_none());
889        assert!(pruning_stats.row_counts().is_none());
890
891        // Contained values are all empty
892        let values = HashSet::from([ScalarValue::from(1i32)]);
893        assert!(pruning_stats.contained(&column_a, &values).is_none());
894    }
895
896    #[test]
897    fn test_composite_pruning_statistics_partition_and_file() {
898        // Create partition statistics
899        let partition_values = vec![
900            vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
901            vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
902        ];
903        let partition_fields = vec![
904            Arc::new(Field::new("part_a", DataType::Int32, false)),
905            Arc::new(Field::new("part_b", DataType::Int32, false)),
906        ];
907        let partition_stats =
908            PartitionPruningStatistics::try_new(partition_values, partition_fields)
909                .unwrap();
910
911        // Create file statistics
912        let file_statistics = vec![
913            Arc::new(
914                Statistics::default()
915                    .add_column_statistics(
916                        ColumnStatistics::new_unknown()
917                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
918                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
919                            .with_null_count(Precision::Exact(0)),
920                    )
921                    .add_column_statistics(
922                        ColumnStatistics::new_unknown()
923                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
924                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
925                            .with_null_count(Precision::Exact(5)),
926                    )
927                    .with_num_rows(Precision::Exact(100)),
928            ),
929            Arc::new(
930                Statistics::default()
931                    .add_column_statistics(
932                        ColumnStatistics::new_unknown()
933                            .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
934                            .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
935                            .with_null_count(Precision::Exact(10)),
936                    )
937                    .add_column_statistics(
938                        ColumnStatistics::new_unknown()
939                            .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
940                            .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
941                            .with_null_count(Precision::Exact(0)),
942                    )
943                    .with_num_rows(Precision::Exact(200)),
944            ),
945        ];
946
947        let file_schema = Arc::new(Schema::new(vec![
948            Field::new("col_x", DataType::Int32, false),
949            Field::new("col_y", DataType::Int32, false),
950        ]));
951        let file_stats = PrunableStatistics::new(file_statistics, file_schema);
952
953        // Create composite statistics
954        let composite_stats = CompositePruningStatistics::new(vec![
955            Box::new(partition_stats),
956            Box::new(file_stats),
957        ]);
958
959        // Test accessing columns that are only in partition statistics
960        let part_a = Column::new_unqualified("part_a");
961        let part_b = Column::new_unqualified("part_b");
962
963        // Test accessing columns that are only in file statistics
964        let col_x = Column::new_unqualified("col_x");
965        let col_y = Column::new_unqualified("col_y");
966
967        // For partition columns, should get values from partition statistics
968        let min_values_part_a =
969            as_int32_array(&composite_stats.min_values(&part_a).unwrap())
970                .unwrap()
971                .into_iter()
972                .collect::<Vec<_>>();
973        let expected_values_part_a = vec![Some(1), Some(2)];
974        assert_eq!(min_values_part_a, expected_values_part_a);
975
976        let max_values_part_a =
977            as_int32_array(&composite_stats.max_values(&part_a).unwrap())
978                .unwrap()
979                .into_iter()
980                .collect::<Vec<_>>();
981        // For partition values, min and max are the same
982        assert_eq!(max_values_part_a, expected_values_part_a);
983
984        let min_values_part_b =
985            as_int32_array(&composite_stats.min_values(&part_b).unwrap())
986                .unwrap()
987                .into_iter()
988                .collect::<Vec<_>>();
989        let expected_values_part_b = vec![Some(10), Some(20)];
990        assert_eq!(min_values_part_b, expected_values_part_b);
991
992        // For file columns, should get values from file statistics
993        let min_values_col_x =
994            as_int32_array(&composite_stats.min_values(&col_x).unwrap())
995                .unwrap()
996                .into_iter()
997                .collect::<Vec<_>>();
998        let expected_values_col_x = vec![Some(100), Some(500)];
999        assert_eq!(min_values_col_x, expected_values_col_x);
1000
1001        let max_values_col_x =
1002            as_int32_array(&composite_stats.max_values(&col_x).unwrap())
1003                .unwrap()
1004                .into_iter()
1005                .collect::<Vec<_>>();
1006        let expected_max_values_col_x = vec![Some(200), Some(600)];
1007        assert_eq!(max_values_col_x, expected_max_values_col_x);
1008
1009        let min_values_col_y =
1010            as_int32_array(&composite_stats.min_values(&col_y).unwrap())
1011                .unwrap()
1012                .into_iter()
1013                .collect::<Vec<_>>();
1014        let expected_values_col_y = vec![Some(300), Some(700)];
1015        assert_eq!(min_values_col_y, expected_values_col_y);
1016
1017        // Test null counts - only available from file statistics
1018        assert!(composite_stats.null_counts(&part_a).is_none());
1019        assert!(composite_stats.null_counts(&part_b).is_none());
1020
1021        let null_counts_col_x =
1022            as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
1023                .unwrap()
1024                .into_iter()
1025                .collect::<Vec<_>>();
1026        let expected_null_counts_col_x = vec![Some(0), Some(10)];
1027        assert_eq!(null_counts_col_x, expected_null_counts_col_x);
1028
1029        // Test row counts — container-level, available from file statistics
1030        let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap())
1031            .unwrap()
1032            .into_iter()
1033            .collect::<Vec<_>>();
1034        let expected_row_counts = vec![Some(100), Some(200)];
1035        assert_eq!(row_counts_col_x, expected_row_counts);
1036
1037        // Test contained values - only available from partition statistics
1038        let values = HashSet::from([ScalarValue::from(1i32)]);
1039        let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
1040        let expected_contained_part_a = BooleanArray::from(vec![true, false]);
1041        assert_eq!(contained_part_a, expected_contained_part_a);
1042
1043        // File statistics don't implement contained
1044        assert!(composite_stats.contained(&col_x, &values).is_none());
1045
1046        // Non-existent column should return None for column-specific stats,
1047        // but row_counts is container-level and still available
1048        let non_existent = Column::new_unqualified("non_existent");
1049        assert!(composite_stats.min_values(&non_existent).is_none());
1050        assert!(composite_stats.max_values(&non_existent).is_none());
1051        assert!(composite_stats.null_counts(&non_existent).is_none());
1052        assert!(composite_stats.row_counts().is_some());
1053        assert!(composite_stats.contained(&non_existent, &values).is_none());
1054
1055        // Verify num_containers matches
1056        assert_eq!(composite_stats.num_containers(), 2);
1057    }
1058
1059    #[test]
1060    fn test_composite_pruning_statistics_priority() {
1061        // Create two sets of file statistics with the same column names
1062        // but different values to test that the first one gets priority
1063
1064        // First set of statistics
1065        let first_statistics = vec![
1066            Arc::new(
1067                Statistics::default()
1068                    .add_column_statistics(
1069                        ColumnStatistics::new_unknown()
1070                            .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
1071                            .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
1072                            .with_null_count(Precision::Exact(0)),
1073                    )
1074                    .with_num_rows(Precision::Exact(100)),
1075            ),
1076            Arc::new(
1077                Statistics::default()
1078                    .add_column_statistics(
1079                        ColumnStatistics::new_unknown()
1080                            .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
1081                            .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
1082                            .with_null_count(Precision::Exact(5)),
1083                    )
1084                    .with_num_rows(Precision::Exact(200)),
1085            ),
1086        ];
1087
1088        let first_schema = Arc::new(Schema::new(vec![Field::new(
1089            "col_a",
1090            DataType::Int32,
1091            false,
1092        )]));
1093        let first_stats = PrunableStatistics::new(first_statistics, first_schema);
1094
1095        // Second set of statistics with the same column name but different values
1096        let second_statistics = vec![
1097            Arc::new(
1098                Statistics::default()
1099                    .add_column_statistics(
1100                        ColumnStatistics::new_unknown()
1101                            .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
1102                            .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
1103                            .with_null_count(Precision::Exact(10)),
1104                    )
1105                    .with_num_rows(Precision::Exact(1000)),
1106            ),
1107            Arc::new(
1108                Statistics::default()
1109                    .add_column_statistics(
1110                        ColumnStatistics::new_unknown()
1111                            .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
1112                            .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
1113                            .with_null_count(Precision::Exact(20)),
1114                    )
1115                    .with_num_rows(Precision::Exact(2000)),
1116            ),
1117        ];
1118
1119        let second_schema = Arc::new(Schema::new(vec![Field::new(
1120            "col_a",
1121            DataType::Int32,
1122            false,
1123        )]));
1124        let second_stats = PrunableStatistics::new(second_statistics, second_schema);
1125
1126        // Create composite statistics with first stats having priority
1127        let composite_stats = CompositePruningStatistics::new(vec![
1128            Box::new(first_stats.clone()),
1129            Box::new(second_stats.clone()),
1130        ]);
1131
1132        let col_a = Column::new_unqualified("col_a");
1133
1134        // Should get values from first statistics since it has priority
1135        let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
1136            .unwrap()
1137            .into_iter()
1138            .collect::<Vec<_>>();
1139        let expected_min_values = vec![Some(100), Some(300)];
1140        assert_eq!(min_values, expected_min_values);
1141
1142        let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
1143            .unwrap()
1144            .into_iter()
1145            .collect::<Vec<_>>();
1146        let expected_max_values = vec![Some(200), Some(400)];
1147        assert_eq!(max_values, expected_max_values);
1148
1149        let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
1150            .unwrap()
1151            .into_iter()
1152            .collect::<Vec<_>>();
1153        let expected_null_counts = vec![Some(0), Some(5)];
1154        assert_eq!(null_counts, expected_null_counts);
1155
1156        let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap())
1157            .unwrap()
1158            .into_iter()
1159            .collect::<Vec<_>>();
1160        let expected_row_counts = vec![Some(100), Some(200)];
1161        assert_eq!(row_counts, expected_row_counts);
1162
1163        // Create composite statistics with second stats having priority
1164        // Now that we've added Clone trait to PrunableStatistics, we can just clone them
1165
1166        let composite_stats_reversed = CompositePruningStatistics::new(vec![
1167            Box::new(second_stats.clone()),
1168            Box::new(first_stats.clone()),
1169        ]);
1170
1171        // Should get values from second statistics since it now has priority
1172        let min_values =
1173            as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
1174                .unwrap()
1175                .into_iter()
1176                .collect::<Vec<_>>();
1177        let expected_min_values = vec![Some(1000), Some(3000)];
1178        assert_eq!(min_values, expected_min_values);
1179
1180        let max_values =
1181            as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
1182                .unwrap()
1183                .into_iter()
1184                .collect::<Vec<_>>();
1185        let expected_max_values = vec![Some(2000), Some(4000)];
1186        assert_eq!(max_values, expected_max_values);
1187
1188        let null_counts =
1189            as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
1190                .unwrap()
1191                .into_iter()
1192                .collect::<Vec<_>>();
1193        let expected_null_counts = vec![Some(10), Some(20)];
1194        assert_eq!(null_counts, expected_null_counts);
1195
1196        let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap())
1197            .unwrap()
1198            .into_iter()
1199            .collect::<Vec<_>>();
1200        let expected_row_counts = vec![Some(1000), Some(2000)];
1201        assert_eq!(row_counts, expected_row_counts);
1202    }
1203
1204    #[test]
1205    fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
1206        // Test with empty statistics vector
1207        // This should never happen, so we panic instead of returning a Result which would burned callers
1208        let result = std::panic::catch_unwind(|| {
1209            CompositePruningStatistics::new(vec![]);
1210        });
1211        assert!(result.is_err());
1212
1213        // We should panic here because the number of containers is different
1214        let result = std::panic::catch_unwind(|| {
1215            // Create statistics with different number of containers
1216            // Use partition stats for the test
1217            let partition_values_1 = vec![
1218                vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
1219                vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
1220            ];
1221            let partition_fields_1 = vec![
1222                Arc::new(Field::new("part_a", DataType::Int32, false)),
1223                Arc::new(Field::new("part_b", DataType::Int32, false)),
1224            ];
1225            let partition_stats_1 = PartitionPruningStatistics::try_new(
1226                partition_values_1,
1227                partition_fields_1,
1228            )
1229            .unwrap();
1230            let partition_values_2 = vec![
1231                vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
1232                vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
1233                vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
1234            ];
1235            let partition_fields_2 = vec![
1236                Arc::new(Field::new("part_x", DataType::Int32, false)),
1237                Arc::new(Field::new("part_y", DataType::Int32, false)),
1238            ];
1239            let partition_stats_2 = PartitionPruningStatistics::try_new(
1240                partition_values_2,
1241                partition_fields_2,
1242            )
1243            .unwrap();
1244
1245            CompositePruningStatistics::new(vec![
1246                Box::new(partition_stats_1),
1247                Box::new(partition_stats_2),
1248            ]);
1249        });
1250        assert!(result.is_err());
1251    }
1252}