//! Utilities for pushing DataFusion filter predicates down into the Parquet
//! decoder as an Arrow [`RowFilter`], so rows can be pruned during decode.

use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::file::metadata::ParquetMetaData;
use datafusion_common::Result;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_physical_plan::metrics;
use super::ParquetFileMetrics;
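
/// An [`ArrowPredicate`] that evaluates a DataFusion [`PhysicalExpr`] against
/// record batches decoded from the columns in its [`ProjectionMask`],
/// recording row-level pushdown metrics as it goes.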
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
    /// The filter expression, with column references remapped onto the
    /// candidate's filter schema
    physical_expr: Arc<dyn PhysicalExpr>,
    /// Columns of the Parquet file that must be decoded to evaluate the filter
    projection_mask: ProjectionMask,
    /// How many rows were filtered out by this predicate
    rows_pruned: metrics::Count,
    /// How many rows passed this predicate
    rows_matched: metrics::Count,
    /// Total time spent evaluating this predicate
    time: metrics::Time,
}
impl DatafusionArrowPredicate {
pub fn try_new(
candidate: FilterCandidate,
metadata: &ParquetMetaData,
rows_pruned: metrics::Count,
rows_matched: metrics::Count,
time: metrics::Time,
) -> Result<Self> {
let physical_expr =
reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;
Ok(Self {
physical_expr,
projection_mask: ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
candidate.projection,
),
rows_pruned,
rows_matched,
time,
})
}
}
impl ArrowPredicate for DatafusionArrowPredicate {
fn projection(&self) -> &ProjectionMask {
&self.projection_mask
}
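    /// Evaluates the expression against `batch` and returns a boolean mask,
    /// updating the pruned/matched counters and the evaluation timer.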
fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
let mut timer = self.time.timer();
self.physical_expr
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
let bool_arr = as_boolean_array(&array)?.clone();
let num_matched = bool_arr.true_count();
let num_pruned = bool_arr.len() - num_matched;
self.rows_pruned.add(num_pruned);
self.rows_matched.add(num_matched);
timer.stop();
Ok(bool_arr)
})
.map_err(|e| {
ArrowError::ComputeError(format!(
"Error evaluating filter predicate: {e:?}"
))
})
}
}
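
/// A candidate for a [`RowFilter`]: a single conjunct of the pushed-down
/// predicate, together with the metadata needed to decide how (and in what
/// order) to evaluate it.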
pub(crate) struct FilterCandidate {
    /// The filter expression; its column indices still refer to the file
    /// schema and are remapped onto `filter_schema` when the predicate is built
    expr: Arc<dyn PhysicalExpr>,
    /// Estimated total compressed bytes that must be decoded to evaluate `expr`
    required_bytes: usize,
    /// Whether the page index could help evaluate this filter
    /// (currently always `false`, see [`columns_sorted`])
    can_use_index: bool,
    /// Indices into the file schema of the columns required by `expr`
    projection: Vec<usize>,
    /// The file schema projected down to only the required columns
    filter_schema: SchemaRef,
}
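
/// Helper that inspects a [`PhysicalExpr`] against a file schema and builds a
/// [`FilterCandidate`] if the expression can be pushed down into the Parquet
/// scan.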
struct FilterCandidateBuilder {
expr: Arc<dyn PhysicalExpr>,
file_schema: SchemaRef,
}
impl FilterCandidateBuilder {
pub fn new(expr: Arc<dyn PhysicalExpr>, file_schema: Arc<Schema>) -> Self {
Self { expr, file_schema }
}
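    /// Attempts to build a [`FilterCandidate`].
    ///
    /// Returns `Ok(None)` if the expression cannot be pushed down, e.g.
    /// because it references nested columns or columns absent from the file
    /// schema.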
pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
let Some(required_column_indices) =
pushdown_columns(&self.expr, &self.file_schema)?
else {
return Ok(None);
};
let projected_schema =
Arc::new(self.file_schema.project(&required_column_indices)?);
let required_bytes = size_of_columns(&required_column_indices, metadata)?;
let can_use_index = columns_sorted(&required_column_indices, metadata)?;
Ok(Some(FilterCandidate {
expr: self.expr,
required_bytes,
can_use_index,
projection: required_column_indices,
filter_schema: projected_schema,
}))
}
}
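
/// Tree visitor that walks a [`PhysicalExpr`], collecting the indices of all
/// referenced columns and noting anything that rules out pushdown.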
struct PushdownChecker<'schema> {
    /// Does the expression reference any non-primitive (nested) columns?
    non_primitive_columns: bool,
    /// Does the expression reference any columns that are missing from the
    /// file schema (e.g. partition columns)?
    projected_columns: bool,
    /// Indices into the file schema of all columns the expression references
    required_columns: BTreeSet<usize>,
    file_schema: &'schema Schema,
}
impl<'schema> PushdownChecker<'schema> {
fn new(file_schema: &'schema Schema) -> Self {
Self {
non_primitive_columns: false,
projected_columns: false,
required_columns: BTreeSet::default(),
file_schema,
}
}
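    /// Records a single column reference. Returns `Some(TreeNodeRecursion::Jump)`
    /// to skip the rest of the subtree once the column is known to be nested
    /// or missing from the file schema; `None` means "keep walking".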
fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
if let Ok(idx) = self.file_schema.index_of(column_name) {
self.required_columns.insert(idx);
if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
return Some(TreeNodeRecursion::Jump);
}
} else {
self.projected_columns = true;
return Some(TreeNodeRecursion::Jump);
}
None
}
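    /// Returns true if anything seen so far rules out pushdown.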
#[inline]
fn prevents_pushdown(&self) -> bool {
self.non_primitive_columns || self.projected_columns
}
}
impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
type Node = Arc<dyn PhysicalExpr>;
fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
if let Some(column) = node.as_any().downcast_ref::<Column>()
&& let Some(recursion) = self.check_single_column(column.name())
{
return Ok(recursion);
}
Ok(TreeNodeRecursion::Continue)
}
}
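
/// Walks `expr` and, if every referenced column is a primitive column present
/// in `file_schema`, returns their indices in ascending order; otherwise
/// returns `None`.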
fn pushdown_columns(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
) -> Result<Option<Vec<usize>>> {
let mut checker = PushdownChecker::new(file_schema);
expr.visit(&mut checker)?;
Ok((!checker.prevents_pushdown())
.then_some(checker.required_columns.into_iter().collect()))
}
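
/// Returns `true` if `expr` references only primitive columns that exist in
/// `file_schema`, i.e. it is a valid pushdown candidate.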
pub fn can_expr_be_pushed_down_with_schemas(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
) -> bool {
match pushdown_columns(expr, file_schema) {
Ok(Some(_)) => true,
Ok(None) | Err(_) => false,
}
}
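
/// Calculates the total compressed size, across all row groups, of the given
/// columns. This is an upper bound on how much data must be read to evaluate
/// the filter, since entire column chunks are decoded.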
fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
let mut total_size = 0;
let row_groups = metadata.row_groups();
for idx in columns {
for rg in row_groups.iter() {
total_size += rg.column(*idx).compressed_size() as usize;
}
}
Ok(total_size)
}
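
/// For now, assume no column is sorted and the page index cannot be used to
/// prune rows; a placeholder until sort-order information is plumbed through.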
fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
Ok(false)
}
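
/// Builds a [`RowFilter`] from `expr`, or returns `Ok(None)` if no conjunct of
/// the predicate can be pushed down.
///
/// The expression is split into conjuncts, each conjunct becomes a
/// [`FilterCandidate`], and (when `reorder_predicates` is true) candidates are
/// sorted so that index-usable and cheaper-to-decode filters run first.
/// Metrics are wired so that `pushdown_rows_matched` is only incremented by
/// the final predicate, while every predicate contributes to
/// `pushdown_rows_pruned`.
///
/// A minimal usage sketch, assuming `expr`, `schema`, `metadata`, `metrics`,
/// and a `ParquetRecordBatchReaderBuilder` named `builder` already exist:
///
/// ```ignore
/// if let Some(filter) = build_row_filter(&expr, &schema, &metadata, true, &metrics)? {
///     builder = builder.with_row_filter(filter);
/// }
/// ```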
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.row_pushdown_eval_time;
let predicates = split_conjunction(expr);
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
.map(|expr| {
FilterCandidateBuilder::new(Arc::clone(expr), Arc::clone(file_schema))
.build(metadata)
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect();
if candidates.is_empty() {
return Ok(None);
}
if reorder_predicates {
candidates.sort_unstable_by(|c1, c2| {
match c1.can_use_index.cmp(&c2.can_use_index) {
Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
ord => ord,
}
});
}
let total_candidates = candidates.len();
candidates
.into_iter()
.enumerate()
.map(|(idx, candidate)| {
let is_last = idx == total_candidates - 1;
let predicate_rows_pruned = rows_pruned.clone();
let predicate_rows_matched = if is_last {
rows_matched.clone()
} else {
metrics::Count::new()
};
DatafusionArrowPredicate::try_new(
candidate,
metadata,
predicate_rows_pruned,
predicate_rows_matched,
time.clone(),
)
.map(|pred| Box::new(pred) as _)
})
.collect::<Result<Vec<_>, _>>()
.map(|filters| Some(RowFilter::new(filters)))
}
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::ScalarValue;
use arrow::datatypes::{Field, TimeUnit::Nanosecond};
use datafusion_expr::{Expr, col};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};
use datafusion_physical_plan::metrics::{Count, Time};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
#[test]
fn test_filter_candidate_builder_ignore_complex_types() {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
.expect("opening file");
let reader = SerializedFileReader::new(file).expect("creating reader");
let metadata = reader.metadata();
let table_schema =
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema");
let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
let table_schema = Arc::new(table_schema.clone());
let candidate = FilterCandidateBuilder::new(expr, table_schema)
.build(metadata)
.expect("building candidate");
assert!(candidate.is_none());
}
#[test]
fn test_filter_type_coercion() {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");
let parquet_reader_builder =
ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
let metadata = parquet_reader_builder.metadata().clone();
let file_schema = parquet_reader_builder.schema().clone();
let table_schema = Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
)]);
let expr = col("timestamp_col").lt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
None,
));
let expr = logical2physical(&expr, &table_schema);
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&metadata,
Count::new(),
Count::new(),
Time::new(),
)
.expect("creating filter predicate");
let mut parquet_reader = parquet_reader_builder
.with_projection(row_filter.projection().clone())
.build()
.expect("building reader");
let first_rb = parquet_reader
.next()
.expect("expected record batch")
.expect("expected error free record batch");
let filtered = row_filter.evaluate(first_rb.clone());
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));
let expr = col("timestamp_col").gt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
None,
));
let expr = logical2physical(&expr, &table_schema);
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema), Arc::clone(&file_schema))
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema)
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&metadata,
Count::new(),
Count::new(),
Time::new(),
)
.expect("creating filter predicate");
let filtered = row_filter.evaluate(first_rb);
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
}
#[test]
fn nested_data_structures_prevent_pushdown() {
let table_schema = Arc::new(get_lists_table_schema());
let expr = col("utf8_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
check_expression_can_evaluate_against_schema(&expr, &table_schema);
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn projected_columns_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let expr =
Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn basic_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let expr = col("string_col").is_null();
let expr = logical2physical(&expr, &table_schema);
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn complex_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let expr = col("string_col")
.is_not_null()
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
let expr = logical2physical(&expr, &table_schema);
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
fn get_basic_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");
let reader = SerializedFileReader::new(file).expect("creating reader");
let metadata = reader.metadata();
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
}
fn get_lists_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
.expect("opening file");
let reader = SerializedFileReader::new(file).expect("creating reader");
let metadata = reader.metadata();
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
}
fn check_expression_can_evaluate_against_schema(
expr: &Arc<dyn PhysicalExpr>,
table_schema: &Arc<Schema>,
) -> bool {
let batch = RecordBatch::new_empty(Arc::clone(table_schema));
expr.evaluate(&batch).is_ok()
}
}