use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetOpener;
use crate::opener::build_pruning_predicates;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_common::config::TableParquetOptions;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{EquivalenceProperties, conjunction};
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PushedDownPredicate,
};
use datafusion_physical_plan::metrics::Count;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use itertools::Itertools;
use object_store::ObjectStore;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::decrypt::FileDecryptionProperties;
/// A [`FileSource`] for reading Parquet files.
///
/// Built incrementally via the `with_*` builder methods and converted into an
/// `Arc<dyn FileSource>` through the `From` impl below.
#[derive(Clone, Debug)]
pub struct ParquetSource {
    /// Parquet-specific table options (filter pushdown, page index, bloom
    /// filters, encryption, int96 coercion, ...).
    pub(crate) table_parquet_options: TableParquetOptions,
    /// Execution metrics shared with the openers created by this source.
    pub(crate) metrics: ExecutionPlanMetricsSet,
    /// Schema of the table this source scans.
    pub(crate) table_schema: TableSchema,
    /// Optional filter predicate; used for row-filter pushdown and for
    /// building pruning predicates (see `fmt_extra` / `try_pushdown_filters`).
    pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Custom reader factory; falls back to `DefaultParquetFileReaderFactory`
    /// in `create_file_opener` when `None`.
    pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
    /// Batch size; must be set (via `with_batch_size`) before an opener is created.
    pub(crate) batch_size: Option<usize>,
    /// Hint for how many trailing bytes to fetch when reading file metadata.
    pub(crate) metadata_size_hint: Option<usize>,
    /// Column projection applied by the opener; defaults to all columns.
    pub(crate) projection: ProjectionExprs,
    /// Factory used to obtain decryption properties for encrypted files.
    #[cfg(feature = "parquet_encryption")]
    pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
    // When true, row groups are scanned in reverse order; set by
    // `try_pushdown_sort` when a reversed ordering satisfies the requested sort.
    reverse_row_groups: bool,
}
impl ParquetSource {
    /// Create a new source for `table_schema` with default options and a
    /// projection covering every column of the table schema.
    pub fn new(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        let full_schema = table_schema.table_schema();
        // Identity projection: all columns, in schema order.
        let indices: Vec<usize> = (0..full_schema.fields().len()).collect();
        Self {
            projection: ProjectionExprs::from_indices(&indices, full_schema),
            table_schema,
            table_parquet_options: TableParquetOptions::default(),
            metrics: ExecutionPlanMetricsSet::new(),
            predicate: None,
            parquet_file_reader_factory: None,
            batch_size: None,
            metadata_size_hint: None,
            #[cfg(feature = "parquet_encryption")]
            encryption_factory: None,
            reverse_row_groups: false,
        }
    }
    /// Replace the table-level parquet options wholesale.
    pub fn with_table_parquet_options(
        mut self,
        table_parquet_options: TableParquetOptions,
    ) -> Self {
        self.table_parquet_options = table_parquet_options;
        self
    }
    /// Set the metadata size hint (bytes fetched from the end of the file
    /// when reading the footer/metadata).
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }
    /// Return a copy of this source with the given filter predicate.
    ///
    /// Note: takes `&self` and clones (rather than consuming `self` like the
    /// other builders), hence the clippy expectation.
    #[expect(clippy::needless_pass_by_value)]
    pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
        let mut conf = self.clone();
        conf.predicate = Some(Arc::clone(&predicate));
        conf
    }
    /// Set the factory used to obtain decryption properties for encrypted files.
    #[cfg(feature = "parquet_encryption")]
    pub fn with_encryption_factory(
        mut self,
        encryption_factory: Arc<dyn EncryptionFactory>,
    ) -> Self {
        self.encryption_factory = Some(encryption_factory);
        self
    }
    /// Current table-level parquet options.
    pub fn table_parquet_options(&self) -> &TableParquetOptions {
        &self.table_parquet_options
    }
    /// The configured filter predicate, if any.
    #[deprecated(since = "50.2.0", note = "use `filter` instead")]
    pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
        self.predicate.as_ref()
    }
    /// The custom reader factory, if one was set.
    pub fn parquet_file_reader_factory(
        &self,
    ) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
        self.parquet_file_reader_factory.as_ref()
    }
    /// Set a custom factory for creating parquet file readers.
    pub fn with_parquet_file_reader_factory(
        mut self,
        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
    ) -> Self {
        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
        self
    }
    /// Enable/disable pushing filters down into the parquet decoder.
    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
        self.table_parquet_options.global.pushdown_filters = pushdown_filters;
        self
    }
    /// Whether filter pushdown is enabled for this source.
    pub(crate) fn pushdown_filters(&self) -> bool {
        self.table_parquet_options.global.pushdown_filters
    }
    /// Enable/disable reordering of pushed-down filters by estimated cost.
    pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
        self.table_parquet_options.global.reorder_filters = reorder_filters;
        self
    }
    fn reorder_filters(&self) -> bool {
        self.table_parquet_options.global.reorder_filters
    }
    fn force_filter_selections(&self) -> bool {
        self.table_parquet_options.global.force_filter_selections
    }
    /// Enable/disable page-index based pruning.
    pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
        self.table_parquet_options.global.enable_page_index = enable_page_index;
        self
    }
    fn enable_page_index(&self) -> bool {
        self.table_parquet_options.global.enable_page_index
    }
    /// Enable/disable reading bloom filters for row-group pruning.
    pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
        self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
        self
    }
    /// Enable/disable writing bloom filters (write-side option).
    pub fn with_bloom_filter_on_write(
        mut self,
        enable_bloom_filter_on_write: bool,
    ) -> Self {
        self.table_parquet_options.global.bloom_filter_on_write =
            enable_bloom_filter_on_write;
        self
    }
    fn bloom_filter_on_read(&self) -> bool {
        self.table_parquet_options.global.bloom_filter_on_read
    }
    /// Maximum size of the predicate evaluation cache, if configured.
    pub fn max_predicate_cache_size(&self) -> Option<usize> {
        self.table_parquet_options.global.max_predicate_cache_size
    }
    /// Pair the configured encryption factory with its options, if one is set.
    #[cfg(feature = "parquet_encryption")]
    fn get_encryption_factory_with_config(
        &self,
    ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> {
        match &self.encryption_factory {
            None => None,
            Some(factory) => Some((
                Arc::clone(factory),
                self.table_parquet_options.crypto.factory_options.clone(),
            )),
        }
    }
    /// Set whether row groups are scanned in reverse order (sort pushdown).
    pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self {
        self.reverse_row_groups = reverse_row_groups;
        self
    }
    #[cfg(test)]
    pub(crate) fn reverse_row_groups(&self) -> bool {
        self.reverse_row_groups
    }
}
/// Parse a `coerce_int96` configuration value (case-insensitive) into the
/// corresponding arrow [`TimeUnit`].
///
/// Returns a [`DataFusionError::Configuration`] for any value other than
/// `ns`, `us`, `ms`, or `s`.
pub(crate) fn parse_coerce_int96_string(
    str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
    // Normalize once so the match is case-insensitive.
    let lowered = str_setting.to_lowercase();
    match lowered.as_str() {
        "s" => Ok(TimeUnit::Second),
        "ms" => Ok(TimeUnit::Millisecond),
        "us" => Ok(TimeUnit::Microsecond),
        "ns" => Ok(TimeUnit::Nanosecond),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet coerce_int96: {str_setting}. Valid values are: ns, us, ms, and s."
        ))),
    }
}
impl From<ParquetSource> for Arc<dyn FileSource> {
    /// Erase the concrete type, wrapping the source as an `Arc<dyn FileSource>`.
    fn from(source: ParquetSource) -> Self {
        as_file_source(source)
    }
}
impl FileSource for ParquetSource {
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
let expr_adapter_factory = base_config
.expr_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory) as _);
let parquet_file_reader_factory =
self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
});
#[cfg(feature = "parquet_encryption")]
let file_decryption_properties = self
.table_parquet_options()
.crypto
.file_decryption
.clone()
.map(FileDecryptionProperties::from)
.map(Arc::new);
let coerce_int96 = self
.table_parquet_options
.global
.coerce_int96
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
let opener = Arc::new(ParquetOpener {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetOpener"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
table_schema: self.table_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
});
Ok(opener)
}
fn as_any(&self) -> &dyn Any {
self
}
fn table_schema(&self) -> &TableSchema {
&self.table_schema
}
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
self.predicate.clone()
}
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.batch_size = Some(batch_size);
Arc::new(conf)
}
fn try_pushdown_projection(
&self,
projection: &ProjectionExprs,
) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
let mut source = self.clone();
source.projection = self.projection.try_merge(projection)?;
Ok(Some(Arc::new(source)))
}
fn projection(&self) -> Option<&ProjectionExprs> {
Some(&self.projection)
}
fn metrics(&self) -> &ExecutionPlanMetricsSet {
&self.metrics
}
fn file_type(&self) -> &str {
"parquet"
}
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.filter()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();
write!(f, "{predicate_string}")?;
if self.reverse_row_groups {
write!(f, ", reverse_row_groups=true")?;
}
if let Some(predicate) = &self.predicate {
let predicate_creation_errors = Count::new();
if let (Some(pruning_predicate), _) = build_pruning_predicates(
Some(predicate),
self.table_schema.table_schema(),
&predicate_creation_errors,
) {
let mut guarantees = pruning_predicate
.literal_guarantees()
.iter()
.map(|item| format!("{item}"))
.collect_vec();
guarantees.sort();
write!(
f,
", pruning_predicate={}, required_guarantees=[{}]",
pruning_predicate.predicate_expr(),
guarantees.join(", ")
)?;
}
};
Ok(())
}
DisplayFormatType::TreeRender => {
if let Some(predicate) = self.filter() {
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
}
Ok(())
}
}
}
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
let table_schema = self.table_schema.table_schema();
let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
let table_pushdown_enabled = self.pushdown_filters();
let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
let mut source = self.clone();
let filters: Vec<PushedDownPredicate> = filters
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
PushedDownPredicate::supported(filter)
} else {
PushedDownPredicate::unsupported(filter)
}
})
.collect();
if filters
.iter()
.all(|f| matches!(f.discriminant, PushedDown::No))
{
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
));
}
let allowed_filters = filters
.iter()
.filter_map(|f| match f.discriminant {
PushedDown::Yes => Some(Arc::clone(&f.predicate)),
PushedDown::No => None,
})
.collect_vec();
let predicate = match source.predicate {
Some(predicate) => {
conjunction(std::iter::once(predicate).chain(allowed_filters))
}
None => conjunction(allowed_filters),
};
source.predicate = Some(predicate);
source = source.with_pushdown_filters(pushdown_filters);
let source = Arc::new(source);
if !pushdown_filters {
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
)
.with_updated_node(source));
}
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
filters.iter().map(|f| f.discriminant).collect(),
)
.with_updated_node(source))
}
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
eq_properties: &EquivalenceProperties,
) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
if order.is_empty() {
return Ok(SortOrderPushdownResult::Unsupported);
}
let reversed_eq_properties = {
let mut new = eq_properties.clone();
new.clear_orderings();
let reversed_orderings = eq_properties
.oeq_class()
.iter()
.map(|ordering| {
ordering
.iter()
.map(|expr| expr.reverse())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
new.add_orderings(reversed_orderings);
new
};
if !reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
return Ok(SortOrderPushdownResult::Unsupported);
}
let new_source = self.clone().with_reverse_row_groups(true);
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::Schema;
    use datafusion_physical_expr::expressions::lit;
    // NOTE: `Schema`, `lit`, and `TableParquetOptions` are in scope for every
    // test via the imports above and `use super::*`; the previous
    // function-local duplicate `use` statements have been removed.
    #[test]
    #[expect(deprecated)]
    fn test_parquet_source_predicate_same_as_filter() {
        let predicate = lit(true);
        let parquet_source =
            ParquetSource::new(Arc::new(Schema::empty())).with_predicate(predicate);
        assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref());
    }
    #[test]
    fn test_reverse_scan_default_value() {
        let schema = Arc::new(Schema::empty());
        let source = ParquetSource::new(schema);
        assert!(!source.reverse_row_groups());
    }
    #[test]
    fn test_reverse_scan_with_setter() {
        let schema = Arc::new(Schema::empty());
        let source = ParquetSource::new(schema.clone()).with_reverse_row_groups(true);
        assert!(source.reverse_row_groups());
        let source = source.with_reverse_row_groups(false);
        assert!(!source.reverse_row_groups());
    }
    #[test]
    fn test_reverse_scan_clone_preserves_value() {
        let schema = Arc::new(Schema::empty());
        let source = ParquetSource::new(schema).with_reverse_row_groups(true);
        let cloned = source.clone();
        assert!(cloned.reverse_row_groups());
        assert_eq!(source.reverse_row_groups(), cloned.reverse_row_groups());
    }
    #[test]
    fn test_reverse_scan_with_other_options() {
        let schema = Arc::new(Schema::empty());
        let options = TableParquetOptions::default();
        let source = ParquetSource::new(schema)
            .with_table_parquet_options(options)
            .with_metadata_size_hint(8192)
            .with_reverse_row_groups(true);
        assert!(source.reverse_row_groups());
        assert_eq!(source.metadata_size_hint, Some(8192));
    }
    #[test]
    fn test_reverse_scan_builder_pattern() {
        // Last call wins when the setter is chained repeatedly.
        let schema = Arc::new(Schema::empty());
        let source = ParquetSource::new(schema)
            .with_reverse_row_groups(true)
            .with_reverse_row_groups(false)
            .with_reverse_row_groups(true);
        assert!(source.reverse_row_groups());
    }
    #[test]
    fn test_reverse_scan_independent_of_predicate() {
        let schema = Arc::new(Schema::empty());
        let predicate = lit(true);
        let source = ParquetSource::new(schema)
            .with_predicate(predicate)
            .with_reverse_row_groups(true);
        assert!(source.reverse_row_groups());
        assert!(source.filter().is_some());
    }
}