//! Converts pushed-down DataFusion [`PhysicalExpr`] predicates into Parquet
//! [`RowFilter`]s so that rows can be pruned while the file is decoded.

use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::SchemaDescriptor;
use datafusion_common::Result;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_physical_plan::metrics;
use super::ParquetFileMetrics;
use super::supported_predicates::supports_list_predicates;
/// An [`ArrowPredicate`] backed by a DataFusion [`PhysicalExpr`], evaluated
/// by the Parquet reader against the batches it decodes.
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
    /// The predicate, with column references rebased onto the filter schema.
    physical_expr: Arc<dyn PhysicalExpr>,
    /// The Parquet leaf columns this predicate needs decoded.
    projection_mask: ProjectionMask,
    /// How many rows this predicate filtered out.
    rows_pruned: metrics::Count,
    /// How many rows passed this predicate.
    rows_matched: metrics::Count,
    /// Time spent evaluating this predicate.
    time: metrics::Time,
}
impl DatafusionArrowPredicate {
    /// Creates a predicate from a [`FilterCandidate`], rebasing the
    /// expression's column indices onto the candidate's filter schema.
    pub fn try_new(
candidate: FilterCandidate,
metadata: &ParquetMetaData,
rows_pruned: metrics::Count,
rows_matched: metrics::Count,
time: metrics::Time,
) -> Result<Self> {
let physical_expr =
reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;
Ok(Self {
physical_expr,
projection_mask: ProjectionMask::leaves(
metadata.file_metadata().schema_descr(),
candidate.projection.leaf_indices.iter().copied(),
),
rows_pruned,
rows_matched,
time,
})
}
}
impl ArrowPredicate for DatafusionArrowPredicate {
fn projection(&self) -> &ProjectionMask {
&self.projection_mask
}
    /// Evaluates the predicate against a decoded batch, recording how many
    /// rows it pruned and matched and how long evaluation took.
    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
        let mut timer = self.time.timer();
        self.physical_expr
            .evaluate(&batch)
            .and_then(|v| v.into_array(batch.num_rows()))
            .and_then(|array| {
                // Cheap: cloning a `BooleanArray` only bumps buffer refcounts.
                let bool_arr = as_boolean_array(&array)?.clone();
                let num_matched = bool_arr.true_count();
                let num_pruned = bool_arr.len() - num_matched;
self.rows_pruned.add(num_pruned);
self.rows_matched.add(num_matched);
timer.stop();
Ok(bool_arr)
})
.map_err(|e| {
ArrowError::ComputeError(format!(
"Error evaluating filter predicate: {e:?}"
))
})
}
}
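// How the reader drives this trait (a sketch, not the actual parquet-rs
// internals): for each batch decoded with only the columns in `projection()`,
// the reader calls `evaluate(batch)` and keeps the rows whose mask entry is
// `true`, so later predicates and the final output projection decode less
// data.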
/// A single conjunct of the pushed-down filter, plus the information needed
/// to order candidates and build their Parquet projections.
pub(crate) struct FilterCandidate {
    expr: Arc<dyn PhysicalExpr>,
    /// Compressed size of the columns this predicate reads, summed across all
    /// row groups; used to evaluate cheaper predicates first.
    required_bytes: usize,
    /// Whether an index could be used to evaluate this predicate (currently
    /// always `false`, see [`columns_sorted`]).
    can_use_index: bool,
    projection: LeafProjection,
    /// Arrow schema restricted to the columns the predicate references.
    filter_schema: SchemaRef,
}
/// The Parquet leaf-column indices backing a candidate's root columns.
#[derive(Debug, Clone)]
struct LeafProjection {
    leaf_indices: Vec<usize>,
}
/// Builds a [`FilterCandidate`] for one conjunct, or rejects it if it cannot
/// be evaluated against the Parquet file schema.
struct FilterCandidateBuilder {
expr: Arc<dyn PhysicalExpr>,
file_schema: SchemaRef,
}
impl FilterCandidateBuilder {
pub fn new(expr: Arc<dyn PhysicalExpr>, file_schema: Arc<Schema>) -> Self {
Self { expr, file_schema }
}
    /// Returns `Ok(None)` when the expression cannot be pushed down, e.g. it
    /// references a column missing from the file or an unsupported nested type.
    pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
else {
return Ok(None);
};
        // Already sorted and deduplicated by `into_sorted_columns`.
        let root_indices = required_columns.required_columns;
let leaf_indices = leaf_indices_for_roots(
&root_indices,
metadata.file_metadata().schema_descr(),
);
let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
let required_bytes = size_of_columns(&leaf_indices, metadata)?;
let can_use_index = columns_sorted(&leaf_indices, metadata)?;
Ok(Some(FilterCandidate {
expr: self.expr,
required_bytes,
can_use_index,
projection: LeafProjection { leaf_indices },
filter_schema: projected_schema,
}))
}
}
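// Worked example (hypothetical schema): for `c IS NOT NULL` over a file with
// root columns `[a, b, c]`, `pushdown_columns` returns root index `[2]`;
// `leaf_indices_for_roots` expands that to the Parquet leaves under `c`, and
// `required_bytes` is the compressed size of those leaves across row groups.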
/// A [`TreeNodeVisitor`] that walks an expression and determines whether it
/// can be pushed down into the Parquet decoder.
struct PushdownChecker<'schema> {
    /// Does the expression reference a column with an unsupported nested type?
    non_primitive_columns: bool,
    /// Does the expression reference a column absent from the file schema
    /// (e.g. a partition or table-only column)?
    projected_columns: bool,
    /// Root indices (into `file_schema`) of all columns the expression needs.
    required_columns: Vec<usize>,
    /// Whether list-typed columns are acceptable for this expression.
    allow_list_columns: bool,
    file_schema: &'schema Schema,
}
impl<'schema> PushdownChecker<'schema> {
fn new(file_schema: &'schema Schema, allow_list_columns: bool) -> Self {
Self {
non_primitive_columns: false,
projected_columns: false,
required_columns: Vec::new(),
allow_list_columns,
file_schema,
}
}
    /// Records the root index of `column_name`. Returns `Some(Jump)` to stop
    /// descending into a subtree once pushdown is known to be impossible.
    fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
let idx = match self.file_schema.index_of(column_name) {
Ok(idx) => idx,
Err(_) => {
self.projected_columns = true;
return Some(TreeNodeRecursion::Jump);
}
};
self.required_columns.push(idx);
let data_type = self.file_schema.field(idx).data_type();
if DataType::is_nested(data_type) {
self.handle_nested_type(data_type)
} else {
None
}
}
fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
if self.is_nested_type_supported(data_type) {
None
} else {
self.non_primitive_columns = true;
Some(TreeNodeRecursion::Jump)
}
}
    /// Among nested types, only lists are supported, and only when the
    /// enclosing expression is known to handle them (`allow_list_columns`).
    fn is_nested_type_supported(&self, data_type: &DataType) -> bool {
let is_list = matches!(
data_type,
DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
);
self.allow_list_columns && is_list
}
#[inline]
fn prevents_pushdown(&self) -> bool {
self.non_primitive_columns || self.projected_columns
}
fn into_sorted_columns(mut self) -> PushdownColumns {
self.required_columns.sort_unstable();
self.required_columns.dedup();
PushdownColumns {
required_columns: self.required_columns,
}
}
}
impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
    type Node = Arc<dyn PhysicalExpr>;
    /// Checks every [`Column`] reference; the final verdict comes from
    /// [`PushdownChecker::prevents_pushdown`] after the walk completes.
    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
if let Some(column) = node.as_any().downcast_ref::<Column>()
&& let Some(recursion) = self.check_single_column(column.name())
{
return Ok(recursion);
}
Ok(TreeNodeRecursion::Continue)
}
}
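// E.g. (hypothetical): visiting `a = 1 AND d > 2` against file schema
// `[a, b, c]` records root index 0 for `a`, then sets `projected_columns`
// when `d` is not found, so `prevents_pushdown()` returns `true`.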
/// The result of a successful pushdown check: the sorted, deduplicated root
/// column indices the predicate requires.
#[derive(Debug)]
struct PushdownColumns {
required_columns: Vec<usize>,
}
/// Walks `expr` collecting the root indices of every column it references,
/// returning `None` if anything prevents pushdown (a column missing from the
/// file schema or an unsupported nested type).
fn pushdown_columns(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
) -> Result<Option<PushdownColumns>> {
let allow_list_columns = supports_list_predicates(expr);
let mut checker = PushdownChecker::new(file_schema, allow_list_columns);
expr.visit(&mut checker)?;
Ok((!checker.prevents_pushdown()).then(|| checker.into_sorted_columns()))
}
/// Expands root (top-level) column indices into the Parquet leaf-column
/// indices whose root matches, preserving the file's leaf order.
fn leaf_indices_for_roots(
root_indices: &[usize],
schema_descr: &SchemaDescriptor,
) -> Vec<usize> {
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
(0..schema_descr.num_columns())
.filter(|leaf_idx| {
root_set.contains(&schema_descr.get_column_root_idx(*leaf_idx))
})
.collect()
}
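// E.g. (hypothetical, same shape as the struct test below): with roots
// `[col_a, struct_col{x, y}, col_b]` the Parquet leaves are
// `[col_a, x, y, col_b]`, so root `[2]` (`col_b`) maps to leaf `[3]`.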
/// Returns `true` if `expr` can be evaluated against `file_schema` during the
/// scan; errors are conservatively treated as "cannot push down".
pub fn can_expr_be_pushed_down_with_schemas(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
) -> bool {
match pushdown_columns(expr, file_schema) {
Ok(Some(_)) => true,
Ok(None) | Err(_) => false,
}
}
/// Sums the compressed size of the given leaf columns across all row groups,
/// a proxy for how expensive a predicate is to evaluate.
fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
let mut total_size = 0;
let row_groups = metadata.row_groups();
for idx in columns {
for rg in row_groups.iter() {
total_size += rg.column(*idx).compressed_size() as usize;
}
}
Ok(total_size)
}
/// Placeholder: detecting sorted columns (e.g. via the column index) is not
/// implemented, so no candidate currently reports itself as index-usable.
fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
    Ok(false)
}
/// Splits `expr` into conjuncts, builds a [`FilterCandidate`] for each one
/// that can be pushed down, optionally reorders them cheapest-first, and
/// assembles the survivors into a Parquet [`RowFilter`].
///
/// Returns `Ok(None)` if no conjunct can be pushed down.
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.row_pushdown_eval_time;
let predicates = split_conjunction(expr);
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
.map(|expr| {
FilterCandidateBuilder::new(Arc::clone(expr), Arc::clone(file_schema))
.build(metadata)
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect();
if candidates.is_empty() {
return Ok(None);
}
    if reorder_predicates {
        // Evaluate cheaper predicates first: order by index usability, then
        // by the compressed size of the columns each predicate must decode.
        candidates.sort_unstable_by(|c1, c2| {
match c1.can_use_index.cmp(&c2.can_use_index) {
Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
ord => ord,
}
});
}
let total_candidates = candidates.len();
candidates
.into_iter()
.enumerate()
        .map(|(idx, candidate)| {
            // Every predicate adds to the shared "pruned" counter, but only
            // the last one reports "matched": each filter only sees rows that
            // survived its predecessors, so the final predicate's match count
            // is the number of rows passing the whole conjunction.
            let is_last = idx == total_candidates - 1;
            let predicate_rows_pruned = rows_pruned.clone();
            let predicate_rows_matched = if is_last {
                rows_matched.clone()
            } else {
                metrics::Count::new()
            };
DatafusionArrowPredicate::try_new(
candidate,
metadata,
predicate_rows_pruned,
predicate_rows_matched,
time.clone(),
)
.map(|pred| Box::new(pred) as _)
})
.collect::<Result<Vec<_>, _>>()
.map(|filters| Some(RowFilter::new(filters)))
}
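// Typical wiring (sketch; the tests below do exactly this): build the filter
// from the scan's predicate and hand it to the Parquet reader builder:
//
//     if let Some(row_filter) =
//         build_row_filter(&expr, &file_schema, &metadata, true, &file_metrics)?
//     {
//         builder = builder.with_row_filter(row_filter);
//     }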
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::ScalarValue;
use arrow::array::{ListBuilder, StringBuilder};
use arrow::datatypes::{Field, TimeUnit::Nanosecond};
use datafusion_expr::{Expr, col};
use datafusion_functions_nested::array_has::{
array_has_all_udf, array_has_any_udf, array_has_udf,
};
use datafusion_functions_nested::expr_fn::{
array_has, array_has_all, array_has_any, make_array,
};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, Time};
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use tempfile::NamedTempFile;
#[test]
fn test_filter_candidate_builder_supports_list_types() {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
.expect("opening file");
let reader = SerializedFileReader::new(file).expect("creating reader");
let metadata = reader.metadata();
let table_schema =
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema");
let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
        let table_schema = Arc::new(table_schema);
let list_index = table_schema
.index_of("int64_list")
.expect("list column should exist");
let candidate = FilterCandidateBuilder::new(expr, table_schema)
.build(metadata)
.expect("building candidate")
.expect("list pushdown should be supported");
assert_eq!(candidate.projection.leaf_indices, vec![list_index]);
}
#[test]
fn test_filter_type_coercion() {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");
let parquet_reader_builder =
ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
let metadata = parquet_reader_builder.metadata().clone();
let file_schema = parquet_reader_builder.schema().clone();
let table_schema = Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
)]);
let expr = col("timestamp_col").lt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
None,
));
let expr = logical2physical(&expr, &table_schema);
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
.expect("creating expr adapter")
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&metadata,
Count::new(),
Count::new(),
Time::new(),
)
.expect("creating filter predicate");
let mut parquet_reader = parquet_reader_builder
.with_projection(row_filter.projection().clone())
.build()
.expect("building reader");
let first_rb = parquet_reader
.next()
.expect("expected record batch")
.expect("expected error free record batch");
let filtered = row_filter.evaluate(first_rb.clone());
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));
let expr = col("timestamp_col").gt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
None,
));
let expr = logical2physical(&expr, &table_schema);
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema), Arc::clone(&file_schema))
.expect("creating expr adapter")
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema)
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&metadata,
Count::new(),
Count::new(),
Time::new(),
)
.expect("creating filter predicate");
let filtered = row_filter.evaluate(first_rb);
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
}
#[test]
fn struct_data_structures_prevent_pushdown() {
let table_schema = Arc::new(Schema::new(vec![Field::new(
"struct_col",
DataType::Struct(
vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
),
true,
)]));
let expr = col("struct_col").is_not_null();
let expr = logical2physical(&expr, &table_schema);
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn mixed_primitive_and_struct_prevents_pushdown() {
let table_schema = Arc::new(Schema::new(vec![
Field::new(
"struct_col",
DataType::Struct(
vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
),
true,
),
Field::new("int_col", DataType::Int32, false),
]));
let expr = col("struct_col")
.is_not_null()
.and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)));
let expr = logical2physical(&expr, &table_schema);
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
let expr_int_only =
col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None));
let expr_int_only = logical2physical(&expr_int_only, &table_schema);
assert!(can_expr_be_pushed_down_with_schemas(
&expr_int_only,
&table_schema
));
}
#[test]
fn nested_lists_allow_pushdown_checks() {
let table_schema = Arc::new(get_lists_table_schema());
let expr = col("utf8_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
        assert!(check_expression_can_evaluate_against_schema(
            &expr,
            &table_schema
        ));
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn array_has_all_pushdown_filters_rows() {
let expr = array_has_all(
col("letters"),
make_array(vec![Expr::Literal(
ScalarValue::Utf8(Some("c".to_string())),
None,
)]),
);
test_array_predicate_pushdown("array_has_all", expr, 1, 2, true);
}
    /// Writes a three-row list column (`["a","b"]`, `["c"]`, `["c","d"]`) to
    /// a temporary Parquet file, pushes `predicate_expr` down as a row filter,
    /// and checks the pruned/matched metrics plus the surviving row count.
    fn test_array_predicate_pushdown(
func_name: &str,
predicate_expr: Expr,
expected_pruned: usize,
expected_matched: usize,
expect_list_support: bool,
) {
let item_field = Arc::new(Field::new("item", DataType::Utf8, true));
let schema = Arc::new(Schema::new(vec![Field::new(
"letters",
DataType::List(item_field),
true,
)]));
let mut builder = ListBuilder::new(StringBuilder::new());
builder.values().append_value("a");
builder.values().append_value("b");
builder.append(true);
builder.values().append_value("c");
builder.append(true);
builder.values().append_value("c");
builder.values().append_value("d");
builder.append(true);
let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])
.expect("record batch");
let file = NamedTempFile::new().expect("temp file");
let mut writer =
ArrowWriter::try_new(file.reopen().unwrap(), schema, None).expect("writer");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
let reader_file = file.reopen().expect("reopen file");
let parquet_reader_builder =
ParquetRecordBatchReaderBuilder::try_new(reader_file)
.expect("reader builder");
let metadata = parquet_reader_builder.metadata().clone();
let file_schema = parquet_reader_builder.schema().clone();
let expr = logical2physical(&predicate_expr, &file_schema);
if expect_list_support {
assert!(supports_list_predicates(&expr));
}
let metrics = ExecutionPlanMetricsSet::new();
let file_metrics =
ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics);
let row_filter =
build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics)
.expect("building row filter")
.expect("row filter should exist");
let reader = parquet_reader_builder
.with_row_filter(row_filter)
.build()
.expect("build reader");
let mut total_rows = 0;
for batch in reader {
let batch = batch.expect("record batch");
total_rows += batch.num_rows();
}
assert_eq!(
file_metrics.pushdown_rows_pruned.value(),
expected_pruned,
"{func_name}: expected {expected_pruned} pruned rows"
);
assert_eq!(
file_metrics.pushdown_rows_matched.value(),
expected_matched,
"{func_name}: expected {expected_matched} matched rows"
);
assert_eq!(
total_rows, expected_matched,
"{func_name}: expected {expected_matched} total rows"
);
}
#[test]
fn array_has_pushdown_filters_rows() {
let expr = array_has(
col("letters"),
Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None),
);
test_array_predicate_pushdown("array_has", expr, 1, 2, true);
}
#[test]
fn array_has_any_pushdown_filters_rows() {
let expr = array_has_any(
col("letters"),
make_array(vec![
Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None),
Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None),
]),
);
test_array_predicate_pushdown("array_has_any", expr, 1, 2, true);
}
#[test]
fn array_has_udf_pushdown_filters_rows() {
let expr = array_has_udf().call(vec![
col("letters"),
Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None),
]);
test_array_predicate_pushdown("array_has_udf", expr, 1, 2, true);
}
#[test]
fn array_has_all_udf_pushdown_filters_rows() {
let expr = array_has_all_udf().call(vec![
col("letters"),
make_array(vec![Expr::Literal(
ScalarValue::Utf8(Some("c".to_string())),
None,
)]),
]);
test_array_predicate_pushdown("array_has_all_udf", expr, 1, 2, true);
}
#[test]
fn array_has_any_udf_pushdown_filters_rows() {
let expr = array_has_any_udf().call(vec![
col("letters"),
make_array(vec![
Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None),
Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None),
]),
]);
test_array_predicate_pushdown("array_has_any_udf", expr, 1, 2, true);
}
#[test]
fn projected_columns_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let expr =
Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn basic_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let expr = col("string_col").is_null();
let expr = logical2physical(&expr, &table_schema);
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
#[test]
fn complex_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
let expr = col("string_col")
.is_not_null()
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
let expr = logical2physical(&expr, &table_schema);
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}
fn get_basic_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");
let reader = SerializedFileReader::new(file).expect("creating reader");
let metadata = reader.metadata();
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
}
fn get_lists_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
.expect("opening file");
let reader = SerializedFileReader::new(file).expect("creating reader");
let metadata = reader.metadata();
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
}
#[test]
fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
use arrow::array::{Int32Array, StringArray, StructArray};
let schema = Arc::new(Schema::new(vec![
Field::new("col_a", DataType::Int32, false),
Field::new(
"struct_col",
DataType::Struct(
vec![
Arc::new(Field::new("x", DataType::Int32, true)),
Arc::new(Field::new("y", DataType::Int32, true)),
]
.into(),
),
true,
),
Field::new("col_b", DataType::Utf8, false),
]));
let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
let struct_col = Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("x", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
),
(
Arc::new(Field::new("y", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
),
]));
let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
let batch =
RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b])
.unwrap();
let file = NamedTempFile::new().expect("temp file");
let mut writer =
ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
.expect("writer");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
let reader_file = file.reopen().expect("reopen file");
let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
.expect("reader builder");
let metadata = builder.metadata().clone();
let file_schema = builder.schema().clone();
assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
assert_eq!(file_schema.fields().len(), 3);
let expr = col("col_b").eq(Expr::Literal(
ScalarValue::Utf8(Some("target".to_string())),
None,
));
let expr = logical2physical(&expr, &file_schema);
let candidate = FilterCandidateBuilder::new(expr, file_schema)
.build(&metadata)
.expect("building candidate")
.expect("filter on primitive col_b should be pushable");
assert_eq!(
candidate.projection.leaf_indices,
vec![3],
"leaf_indices should be [3] for col_b"
);
}
fn check_expression_can_evaluate_against_schema(
expr: &Arc<dyn PhysicalExpr>,
table_schema: &Arc<Schema>,
) -> bool {
let batch = RecordBatch::new_empty(Arc::clone(table_schema));
expr.evaluate(&batch).is_ok()
}
}