datafusion_pruning/
file_pruner.rs1use std::sync::Arc;
21
22use arrow::datatypes::{FieldRef, SchemaRef};
23use datafusion_common::{Result, internal_datafusion_err, pruning::PrunableStatistics};
24use datafusion_datasource::PartitionedFile;
25use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, snapshot_generation};
26use datafusion_physical_plan::metrics::Count;
27use log::debug;
28
29use crate::build_pruning_predicate;
30
31pub struct FilePruner {
37 predicate_generation: Option<u64>,
38 predicate: Arc<dyn PhysicalExpr>,
39 file_schema: SchemaRef,
41 file_stats_pruning: PrunableStatistics,
42 predicate_creation_errors: Count,
43}
44
45impl FilePruner {
46 #[deprecated(
47 since = "52.0.0",
48 note = "Use `try_new` instead which returns None if no statistics are available"
49 )]
50 #[expect(clippy::needless_pass_by_value)]
51 pub fn new(
52 predicate: Arc<dyn PhysicalExpr>,
53 logical_file_schema: &SchemaRef,
54 _partition_fields: Vec<FieldRef>,
55 partitioned_file: PartitionedFile,
56 predicate_creation_errors: Count,
57 ) -> Result<Self> {
58 Self::try_new(
59 predicate,
60 logical_file_schema,
61 &partitioned_file,
62 predicate_creation_errors,
63 )
64 .ok_or_else(|| {
65 internal_datafusion_err!(
66 "FilePruner::new called on a file without statistics: {:?}",
67 partitioned_file
68 )
69 })
70 }
71
72 pub fn try_new(
75 predicate: Arc<dyn PhysicalExpr>,
76 file_schema: &SchemaRef,
77 partitioned_file: &PartitionedFile,
78 predicate_creation_errors: Count,
79 ) -> Option<Self> {
80 let file_stats = partitioned_file.statistics.as_ref()?;
81 let file_stats_pruning =
82 PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema));
83 Some(Self {
84 predicate_generation: None,
85 predicate,
86 file_schema: Arc::clone(file_schema),
87 file_stats_pruning,
88 predicate_creation_errors,
89 })
90 }
91
92 pub fn should_prune(&mut self) -> Result<bool> {
93 let new_generation = snapshot_generation(&self.predicate);
101 if let Some(current_generation) = self.predicate_generation.as_mut() {
102 if *current_generation == new_generation {
103 return Ok(false);
104 }
105 *current_generation = new_generation;
106 } else {
107 self.predicate_generation = Some(new_generation);
108 }
109 let pruning_predicate = build_pruning_predicate(
110 Arc::clone(&self.predicate),
111 &self.file_schema,
112 &self.predicate_creation_errors,
113 );
114 let Some(pruning_predicate) = pruning_predicate else {
115 return Ok(false);
116 };
117 match pruning_predicate.prune(&self.file_stats_pruning) {
118 Ok(values) => {
119 assert!(values.len() == 1);
120 if values.into_iter().all(|v| !v) {
122 return Ok(true);
123 }
124 }
125 Err(e) => {
127 debug!("Ignoring error building pruning predicate for file: {e}");
128 self.predicate_creation_errors.add(1);
129 }
130 }
131
132 Ok(false)
133 }
134}