use std::sync::Arc;
use arrow::datatypes::{FieldRef, SchemaRef};
use datafusion_common::{Result, internal_datafusion_err, pruning::PrunableStatistics};
use datafusion_datasource::PartitionedFile;
use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, snapshot_generation};
use datafusion_physical_plan::metrics::Count;
use log::debug;
use crate::build_pruning_predicate;
/// Decides whether a single file can be skipped by evaluating a predicate
/// against that file's statistics.
///
/// Instances are created with [`FilePruner::try_new`]; the actual check is
/// performed by [`FilePruner::should_prune`].
pub struct FilePruner {
/// Value returned by `snapshot_generation` for `predicate` during the most
/// recent `should_prune` call; `None` until `should_prune` is first called.
predicate_generation: Option<u64>,
/// Predicate from which a pruning predicate is built on each evaluation.
predicate: Arc<dyn PhysicalExpr>,
/// Schema handed to `build_pruning_predicate` and used when wrapping the
/// file statistics.
file_schema: SchemaRef,
/// The file's statistics wrapped for pruning. Constructed from exactly one
/// statistics entry (the file this pruner was created for).
file_stats_pruning: PrunableStatistics,
/// Counter passed to `build_pruning_predicate` and incremented when
/// evaluating the pruning predicate against the statistics fails.
predicate_creation_errors: Count,
}
impl FilePruner {
    /// Builds a pruner for `partitioned_file`, failing if the file has no statistics.
    ///
    /// This is a thin wrapper around [`Self::try_new`] kept for backwards
    /// compatibility. `_partition_fields` is accepted but not used.
    ///
    /// # Errors
    ///
    /// Returns an internal error when [`Self::try_new`] yields `None`, which
    /// happens when `partitioned_file.statistics` is `None`.
    #[deprecated(
        since = "52.0.0",
        note = "Use `try_new` instead which returns None if no statistics are available"
    )]
    #[expect(clippy::needless_pass_by_value)]
    pub fn new(
        predicate: Arc<dyn PhysicalExpr>,
        logical_file_schema: &SchemaRef,
        _partition_fields: Vec<FieldRef>,
        partitioned_file: PartitionedFile,
        predicate_creation_errors: Count,
    ) -> Result<Self> {
        let Some(pruner) = Self::try_new(
            predicate,
            logical_file_schema,
            &partitioned_file,
            predicate_creation_errors,
        ) else {
            return Err(internal_datafusion_err!(
                "FilePruner::new called on a file without statistics: {:?}",
                partitioned_file
            ));
        };
        Ok(pruner)
    }

    /// Creates a pruner for `partitioned_file` when it carries statistics.
    ///
    /// Returns `None` if `partitioned_file.statistics` is `None`. Otherwise the
    /// statistics are cloned and wrapped, together with `file_schema`, into a
    /// [`PrunableStatistics`] containing that single entry.
    ///
    /// No predicate evaluation happens here; the recorded predicate generation
    /// starts out as `None`, so the first [`Self::should_prune`] call always
    /// evaluates the predicate.
    pub fn try_new(
        predicate: Arc<dyn PhysicalExpr>,
        file_schema: &SchemaRef,
        partitioned_file: &PartitionedFile,
        predicate_creation_errors: Count,
    ) -> Option<Self> {
        partitioned_file.statistics.as_ref().map(|file_stats| {
            let file_stats_pruning = PrunableStatistics::new(
                vec![file_stats.clone()],
                Arc::clone(file_schema),
            );
            Self {
                predicate_generation: None,
                predicate,
                file_schema: Arc::clone(file_schema),
                file_stats_pruning,
                predicate_creation_errors,
            }
        })
    }

    /// Reports whether the file can be skipped according to its statistics.
    ///
    /// Returns `Ok(true)` only when a pruning predicate could be built and
    /// evaluating it against the file's statistics produced `false` for the
    /// file's single entry.
    ///
    /// Returns `Ok(false)` when:
    /// - the predicate's generation (as reported by `snapshot_generation`) is
    ///   the same as on the previous call, in which case nothing is re-evaluated;
    /// - `build_pruning_predicate` returns `None`;
    /// - evaluating the pruning predicate fails (the error is logged at debug
    ///   level and `predicate_creation_errors` is incremented);
    /// - the evaluation produced `true` for the file.
    ///
    /// # Panics
    ///
    /// Panics if the pruning predicate does not return exactly one value.
    pub fn should_prune(&mut self) -> Result<bool> {
        // Skip re-evaluation if the predicate reports the same generation as
        // last time; otherwise remember the generation we are about to evaluate.
        let latest_generation = snapshot_generation(&self.predicate);
        if self.predicate_generation == Some(latest_generation) {
            return Ok(false);
        }
        self.predicate_generation = Some(latest_generation);

        let Some(pruning_predicate) = build_pruning_predicate(
            Arc::clone(&self.predicate),
            &self.file_schema,
            &self.predicate_creation_errors,
        ) else {
            // No pruning predicate available, so the file cannot be pruned.
            return Ok(false);
        };

        let values = match pruning_predicate.prune(&self.file_stats_pruning) {
            Ok(values) => values,
            Err(e) => {
                // Evaluation failure is treated as "cannot prune", not as an error.
                debug!("Ignoring error building pruning predicate for file: {e}");
                self.predicate_creation_errors.add(1);
                return Ok(false);
            }
        };

        // Exactly one statistics entry was supplied in `try_new`, so exactly
        // one result is expected back.
        assert!(values.len() == 1);
        Ok(values.into_iter().all(|keep| !keep))
    }
}