datafusion_pruning/
file_pruner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! File-level pruning based on file-level statistics (partition column values are
//! substituted into the predicate as literals before it reaches this pruner).
19
20use std::sync::Arc;
21
22use arrow::datatypes::{FieldRef, SchemaRef};
23use datafusion_common::{Result, internal_datafusion_err, pruning::PrunableStatistics};
24use datafusion_datasource::PartitionedFile;
25use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, snapshot_generation};
26use datafusion_physical_plan::metrics::Count;
27use log::debug;
28
29use crate::build_pruning_predicate;
30
31/// Prune based on file-level statistics.
32///
33/// Note: Partition column pruning is handled earlier via `replace_columns_with_literals`
34/// which substitutes partition column references with their literal values before
35/// the predicate reaches this pruner.
36pub struct FilePruner {
37    predicate_generation: Option<u64>,
38    predicate: Arc<dyn PhysicalExpr>,
39    /// Schema used for pruning (the logical file schema).
40    file_schema: SchemaRef,
41    file_stats_pruning: PrunableStatistics,
42    predicate_creation_errors: Count,
43}
44
45impl FilePruner {
46    #[deprecated(
47        since = "52.0.0",
48        note = "Use `try_new` instead which returns None if no statistics are available"
49    )]
50    #[expect(clippy::needless_pass_by_value)]
51    pub fn new(
52        predicate: Arc<dyn PhysicalExpr>,
53        logical_file_schema: &SchemaRef,
54        _partition_fields: Vec<FieldRef>,
55        partitioned_file: PartitionedFile,
56        predicate_creation_errors: Count,
57    ) -> Result<Self> {
58        Self::try_new(
59            predicate,
60            logical_file_schema,
61            &partitioned_file,
62            predicate_creation_errors,
63        )
64        .ok_or_else(|| {
65            internal_datafusion_err!(
66                "FilePruner::new called on a file without statistics: {:?}",
67                partitioned_file
68            )
69        })
70    }
71
72    /// Create a new file pruner if statistics are available.
73    /// Returns None if this file does not have statistics.
74    pub fn try_new(
75        predicate: Arc<dyn PhysicalExpr>,
76        file_schema: &SchemaRef,
77        partitioned_file: &PartitionedFile,
78        predicate_creation_errors: Count,
79    ) -> Option<Self> {
80        let file_stats = partitioned_file.statistics.as_ref()?;
81        let file_stats_pruning =
82            PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema));
83        Some(Self {
84            predicate_generation: None,
85            predicate,
86            file_schema: Arc::clone(file_schema),
87            file_stats_pruning,
88            predicate_creation_errors,
89        })
90    }
91
92    pub fn should_prune(&mut self) -> Result<bool> {
93        // Check if the predicate has changed since last invocation by tracking
94        // its "generation". Dynamic filter expressions can change their values
95        // during query execution, so we use generation tracking to detect when
96        // the predicate has been updated and needs to be rebuilt.
97        //
98        // If the generation hasn't changed, we can skip rebuilding the pruning
99        // predicate, which is an expensive operation involving expression analysis.
100        let new_generation = snapshot_generation(&self.predicate);
101        if let Some(current_generation) = self.predicate_generation.as_mut() {
102            if *current_generation == new_generation {
103                return Ok(false);
104            }
105            *current_generation = new_generation;
106        } else {
107            self.predicate_generation = Some(new_generation);
108        }
109        let pruning_predicate = build_pruning_predicate(
110            Arc::clone(&self.predicate),
111            &self.file_schema,
112            &self.predicate_creation_errors,
113        );
114        let Some(pruning_predicate) = pruning_predicate else {
115            return Ok(false);
116        };
117        match pruning_predicate.prune(&self.file_stats_pruning) {
118            Ok(values) => {
119                assert!(values.len() == 1);
120                // We expect a single container -> if all containers are false skip this file
121                if values.into_iter().all(|v| !v) {
122                    return Ok(true);
123                }
124            }
125            // Stats filter array could not be built, so we can't prune
126            Err(e) => {
127                debug!("Ignoring error building pruning predicate for file: {e}");
128                self.predicate_creation_errors.add(1);
129            }
130        }
131
132        Ok(false)
133    }
134}