use arrow::{
array::ArrayRef,
datatypes::{DataType, Schema},
};
use datafusion_common::Column;
use datafusion_common::ScalarValue;
use log::debug;
use parquet::file::{
metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};
use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::{
datasource::listing::FileRange,
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
};
use super::ParquetFileMetrics;
/// Returns the indexes of the row groups in `groups` that survive filtering.
///
/// Two filters are applied, each only when its argument is `Some`:
/// * `range` — a row group is kept only if its first column chunk's file
///   offset falls inside `[range.start, range.end)`;
/// * `predicate` — a row group is dropped when the pruning predicate proves
///   (from the group's min/max statistics) that it cannot contain matches.
///
/// A predicate evaluation error is counted in `metrics` and the row group is
/// kept (pruning must stay conservative).
pub(crate) fn prune_row_groups(
    groups: &[RowGroupMetaData],
    range: Option<FileRange>,
    predicate: Option<&PruningPredicate>,
    metrics: &ParquetFileMetrics,
) -> Vec<usize> {
    let mut kept = Vec::with_capacity(groups.len());
    for (idx, metadata) in groups.iter().enumerate() {
        // Byte-range filter: locate the group by its first column's offset.
        if let Some(range) = &range {
            let offset = metadata.column(0).file_offset();
            let in_range = offset >= range.start && offset < range.end;
            if !in_range {
                continue;
            }
        }
        // Statistics-based filter.
        if let Some(predicate) = predicate {
            let pruning_stats = RowGroupPruningStatistics {
                row_group_metadata: metadata,
                parquet_schema: predicate.schema().as_ref(),
            };
            match predicate.prune(&pruning_stats) {
                // Single container (this row group), so the answer is element 0.
                Ok(mask) if !mask[0] => {
                    metrics.row_groups_pruned.add(1);
                    continue;
                }
                Ok(_) => {}
                Err(e) => {
                    // On error, keep the row group rather than risk dropping data.
                    debug!("Error evaluating row group predicate values {}", e);
                    metrics.predicate_evaluation_errors.add(1);
                }
            }
        }
        kept.push(idx);
    }
    kept
}
/// Adapts a single parquet row group's metadata to the [`PruningStatistics`]
/// trait so a [`PruningPredicate`] can be evaluated against it.
struct RowGroupPruningStatistics<'a> {
    // Column-chunk statistics (min/max/null counts) for the row group.
    row_group_metadata: &'a RowGroupMetaData,
    // Arrow schema of the pruning predicate, used to resolve column
    // names and arrow data types.
    parquet_schema: &'a Schema,
}
/// Extracts one side (min or max) of a parquet column chunk's statistics as a
/// `ScalarValue`.
///
/// * `$column_statistics` — the `ParquetStatistics` for one column chunk
/// * `$func` — typed accessor ident (`min` / `max`)
/// * `$bytes_func` — raw-bytes accessor ident (`min_bytes` / `max_bytes`)
/// * `$target_arrow_type` — `Option<DataType>`; `Some(Decimal128(..))` means
///   the physical value must be reinterpreted as a decimal
///
/// Evaluates to `Option<ScalarValue>`. NOTE: the `return None` below exits the
/// enclosing closure/function of the call site, not just this expression.
macro_rules! get_statistic {
    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
        // No min/max recorded for this column chunk — nothing to extract.
        if !$column_statistics.has_min_max_set() {
            return None;
        }
        match $column_statistics {
            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
            ParquetStatistics::Int32(s) => {
                match $target_arrow_type {
                    // INT32 backing a logical decimal: widen the raw value to i128.
                    Some(DataType::Decimal128(precision, scale)) => {
                        Some(ScalarValue::Decimal128(
                            Some(*s.$func() as i128),
                            precision,
                            scale,
                        ))
                    }
                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
                }
            }
            ParquetStatistics::Int64(s) => {
                match $target_arrow_type {
                    // INT64 backing a logical decimal: widen the raw value to i128.
                    Some(DataType::Decimal128(precision, scale)) => {
                        Some(ScalarValue::Decimal128(
                            Some(*s.$func() as i128),
                            precision,
                            scale,
                        ))
                    }
                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
                }
            }
            // INT96 (legacy timestamps) statistics are not supported.
            ParquetStatistics::Int96(_) => None,
            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
            ParquetStatistics::ByteArray(s) => {
                match $target_arrow_type {
                    // Decimal stored as big-endian bytes.
                    Some(DataType::Decimal128(precision, scale)) => {
                        Some(ScalarValue::Decimal128(
                            Some(from_bytes_to_i128(s.$bytes_func())),
                            precision,
                            scale,
                        ))
                    }
                    _ => {
                        // Otherwise treat the bytes as a UTF-8 string;
                        // invalid UTF-8 yields a NULL Utf8 scalar.
                        let s = std::str::from_utf8(s.$bytes_func())
                            .map(|s| s.to_string())
                            .ok();
                        Some(ScalarValue::Utf8(s))
                    }
                }
            }
            ParquetStatistics::FixedLenByteArray(s) => {
                match $target_arrow_type {
                    // Decimal stored as fixed-width big-endian bytes.
                    Some(DataType::Decimal128(precision, scale)) => {
                        Some(ScalarValue::Decimal128(
                            Some(from_bytes_to_i128(s.$bytes_func())),
                            precision,
                            scale,
                        ))
                    }
                    // Non-decimal fixed-length byte arrays are not supported.
                    _ => None,
                }
            }
        }
    }};
}
/// Looks up the min or max statistic of `$column` in the row group wrapped by
/// `$self`, as a one-element arrow array.
///
/// * `$func` / `$bytes_func` — accessor idents forwarded to [`get_statistic!`]
///   (`min`/`min_bytes` or `max`/`max_bytes`)
///
/// Returns `None` when the column is absent from the predicate schema or its
/// arrow type has no NULL scalar; otherwise falls back to a typed NULL scalar
/// when no usable statistic exists.
macro_rules! get_min_max_values {
    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
        // Resolve the column against the predicate's schema; unknown
        // columns have no statistics to offer.
        let field = match $self.parquet_schema.column_with_name(&$column.name) {
            Some((_idx, field)) => field,
            None => return None,
        };
        // Typed NULL used when the statistic is missing or unusable.
        let null_scalar: ScalarValue = field.data_type().try_into().ok()?;
        $self
            .row_group_metadata
            .columns()
            .iter()
            .find(|c| c.column_descr().name() == &$column.name)
            // Keep only column chunks that actually carry statistics.
            .and_then(|c| c.statistics().map(|stats| (stats, c.column_descr())))
            .and_then(|(stats, column_descr)| {
                // Decimal columns need the physical value reinterpreted.
                let target_data_type = parquet_to_arrow_decimal_type(column_descr);
                get_statistic!(stats, $func, $bytes_func, target_data_type)
            })
            .or_else(|| Some(null_scalar))
            .map(|s| s.to_array())
    }};
}
/// Looks up the null count of `$column` in the row group wrapped by `$self`,
/// as a one-element `UInt64` arrow array.
///
/// A column chunk without statistics yields a NULL count; a column missing
/// from the row group entirely is treated as all-null (`num_rows`).
macro_rules! get_null_count_values {
    ($self:expr, $column:expr) => {{
        let null_count = $self
            .row_group_metadata
            .columns()
            .iter()
            .find(|c| c.column_descr().name() == &$column.name)
            // Column present: use its recorded null count (None if no stats).
            .map(|col| col.statistics().map(|s| s.null_count()))
            // Column absent: every row counts as null.
            .unwrap_or_else(|| Some($self.row_group_metadata.num_rows() as u64));
        Some(ScalarValue::UInt64(null_count).to_array())
    }};
}
impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
    /// Minimum value of `column` in this row group, as a 1-element array.
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        get_min_max_values!(self, column, min, min_bytes)
    }
    /// Maximum value of `column` in this row group, as a 1-element array.
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        get_min_max_values!(self, column, max, max_bytes)
    }
    /// These statistics describe exactly one row group, so there is
    /// always a single container.
    fn num_containers(&self) -> usize {
        1
    }
    /// Null count of `column` in this row group, as a 1-element array.
    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        get_null_count_values!(self, column)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
    use arrow::datatypes::DataType::Decimal128;
    use arrow::datatypes::Schema;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::{cast, col, lit};
    use parquet::basic::LogicalType;
    use parquet::data_type::{ByteArray, FixedLenByteArray};
    use parquet::{
        basic::Type as PhysicalType,
        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
        schema::types::SchemaDescPtr,
    };
    use std::sync::Arc;

    /// `c1 > 15`: the group with max 10 is pruned, the group spanning 11..=20
    /// is kept.
    #[test]
    fn row_group_pruning_predicate_simple_expr() {
        use datafusion_expr::{col, lit};
        let expr = col("c1").gt(lit(15));
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::INT32,
            None,
            None,
            None,
            None,
        )]);
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
            vec![1]
        );
    }

    /// A group whose statistics have no min/max cannot be pruned, so both
    /// groups survive.
    #[test]
    fn row_group_pruning_predicate_missing_stats() {
        use datafusion_expr::{col, lit};
        let expr = col("c1").gt(lit(15));
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::INT32,
            None,
            None,
            None,
            None,
        )]);
        // rgm1 carries statistics with no recorded min/max.
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(None, None, None, 0, false)],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
            vec![0, 1]
        );
    }

    /// An un-prunable term (`c2 % 2`) ANDed with a prunable one still allows
    /// pruning on `c1`; ORed with it, no group can be pruned.
    #[test]
    fn row_group_pruning_predicate_partial_expr() {
        use datafusion_expr::{col, lit};
        let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Int32, false),
        ]));
        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
        let schema_descr = get_test_schema_descr(vec![
            ("c1", PhysicalType::INT32, None, None, None, None),
            ("c2", PhysicalType::INT32, None, None, None, None),
        ]);
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![
                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
            ],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![
                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
            ],
        );
        let metrics = parquet_file_metrics();
        let groups = &[rgm1, rgm2];
        // AND: the c1 > 15 term prunes rgm1.
        assert_eq!(
            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
            vec![1]
        );
        // OR: the un-prunable term makes the whole predicate un-prunable.
        let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
        assert_eq!(
            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
            vec![0, 1]
        );
    }

    /// Shared fixture: two row groups over (c1: int32, c2: boolean) where
    /// only the second group records a non-zero null count for c2.
    fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
        let schema_descr = get_test_schema_descr(vec![
            ("c1", PhysicalType::INT32, None, None, None, None),
            ("c2", PhysicalType::BOOLEAN, None, None, None, None),
        ]);
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![
                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
            ],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![
                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
                ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
            ],
        );
        vec![rgm1, rgm2]
    }

    /// `c1 > 15 AND c2 IS NULL`: group 0 has zero nulls in c2 (and c1 max 10),
    /// so only group 1 survives.
    #[test]
    fn row_group_pruning_predicate_null_expr() {
        use datafusion_expr::{col, lit};
        let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Boolean, false),
        ]));
        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
        let groups = gen_row_group_meta_data_for_pruning_predicate();
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
            vec![1]
        );
    }

    /// `c1 > 15 AND c2 = NULL`: the equality-with-NULL term does not prevent
    /// the `c1 > 15` term from pruning group 0.
    #[test]
    fn row_group_pruning_predicate_eq_null_expr() {
        use datafusion_expr::{col, lit};
        let expr = col("c1")
            .gt(lit(15))
            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Boolean, false),
        ]));
        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
        let groups = gen_row_group_meta_data_for_pruning_predicate();
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
            vec![1]
        );
    }

    /// Decimal pruning across the physical encodings a decimal column can use:
    /// INT32, INT64, FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY.
    #[test]
    fn row_group_pruning_predicate_decimal_type() {
        // Scenario 1: c1 decimal(9,2) stored as INT32; c1 > 5.00.
        // rgm1 spans 1.00..6.00 (kept), rgm2 spans 0.10..0.20 (pruned),
        // rgm3 has no max (kept).
        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::INT32,
            Some(LogicalType::Decimal {
                scale: 2,
                precision: 9,
            }),
            Some(9),
            Some(2),
            None,
        )]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(
                Some(100),
                Some(600),
                None,
                0,
                false,
            )],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
        );
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(100), None, None, 0, false)],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![0, 2]
        );
        // Scenario 2: c1 decimal(9,0) stored as INT32, compared through a cast
        // to decimal(11,2). Only rgm3 (0..2) is provably below 5; rgm4's
        // missing min keeps it.
        let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
            lit(ScalarValue::Decimal128(Some(500), 5, 2)),
            Decimal128(11, 2),
        ));
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::INT32,
            Some(LogicalType::Decimal {
                scale: 0,
                precision: 9,
            }),
            Some(9),
            Some(0),
            None,
        )]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(
                Some(100),
                Some(600),
                None,
                0,
                false,
            )],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
        );
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
        );
        let rgm4 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(None, Some(2), None, 0, false)],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3, rgm4],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![0, 1, 3]
        );
        // Scenario 3: c1 decimal(18,2) stored as INT64; c1 < 5.00.
        // NOTE(review): rgm1 deliberately carries int32-typed statistics on an
        // INT64 column — the decimal conversion still reads them (6.00..8.00),
        // so rgm1 is pruned; rgm3 has no stats and is kept.
        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::INT64,
            Some(LogicalType::Decimal {
                scale: 2,
                precision: 18,
            }),
            Some(18),
            Some(2),
            None,
        )]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(
                Some(600),
                Some(800),
                None,
                0,
                false,
            )],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
        );
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int64(None, None, None, 0, false)],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![1, 2]
        );
        // Scenario 4: c1 decimal(18,2) stored as FIXED_LEN_BYTE_ARRAY
        // (big-endian i128 bytes); cast(c1 as decimal(28,3)) = 100.000.
        // rgm1 (5.00..80.00) cannot contain 100 and is pruned.
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
        let left = cast(col("c1"), DataType::Decimal128(28, 3));
        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::FIXED_LEN_BYTE_ARRAY,
            Some(LogicalType::Decimal {
                scale: 2,
                precision: 18,
            }),
            Some(18),
            Some(2),
            Some(16),
        )]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::fixed_len_byte_array(
                Some(FixedLenByteArray::from(ByteArray::from(
                    500i128.to_be_bytes().to_vec(),
                ))),
                Some(FixedLenByteArray::from(ByteArray::from(
                    8000i128.to_be_bytes().to_vec(),
                ))),
                None,
                0,
                false,
            )],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::fixed_len_byte_array(
                Some(FixedLenByteArray::from(ByteArray::from(
                    500i128.to_be_bytes().to_vec(),
                ))),
                Some(FixedLenByteArray::from(ByteArray::from(
                    20000i128.to_be_bytes().to_vec(),
                ))),
                None,
                0,
                false,
            )],
        );
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::fixed_len_byte_array(
                None, None, None, 0, false,
            )],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![1, 2]
        );
        // Scenario 5: same as scenario 4 but stored as BYTE_ARRAY.
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
        let left = cast(col("c1"), DataType::Decimal128(28, 3));
        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
        let schema_descr = get_test_schema_descr(vec![(
            "c1",
            PhysicalType::BYTE_ARRAY,
            Some(LogicalType::Decimal {
                scale: 2,
                precision: 18,
            }),
            Some(18),
            Some(2),
            Some(16),
        )]);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::byte_array(
                Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
                Some(ByteArray::from(8000i128.to_be_bytes().to_vec())),
                None,
                0,
                false,
            )],
        );
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::byte_array(
                Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
                Some(ByteArray::from(20000i128.to_be_bytes().to_vec())),
                None,
                0,
                false,
            )],
        );
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::byte_array(None, None, None, 0, false)],
        );
        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![1, 2]
        );
    }

    /// Builds a `RowGroupMetaData` whose i-th column chunk carries the i-th
    /// entry of `column_statistics` (1000 rows, 2000 bytes total).
    fn get_row_group_meta_data(
        schema_descr: &SchemaDescPtr,
        column_statistics: Vec<ParquetStatistics>,
    ) -> RowGroupMetaData {
        use parquet::file::metadata::ColumnChunkMetaData;
        let mut columns = vec![];
        for (i, s) in column_statistics.iter().enumerate() {
            let column = ColumnChunkMetaData::builder(schema_descr.column(i))
                .set_statistics(s.clone())
                .build()
                .unwrap();
            columns.push(column);
        }
        RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(1000)
            .set_total_byte_size(2000)
            .set_column_metadata(columns)
            .build()
            .unwrap()
    }

    /// Builds a parquet schema descriptor from per-field tuples of
    /// (name, physical type, logical type, precision, scale, byte length);
    /// `None` entries leave the builder's default in place.
    #[allow(clippy::type_complexity)]
    fn get_test_schema_descr(
        fields: Vec<(
            &str,
            PhysicalType,
            Option<LogicalType>,
            Option<i32>, // precision
            Option<i32>, // scale
            Option<i32>, // length of bytes
        )>,
    ) -> SchemaDescPtr {
        use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
        let mut schema_fields = fields
            .iter()
            .map(|(n, t, logical, precision, scale, length)| {
                let mut builder = SchemaType::primitive_type_builder(n, *t);
                match logical {
                    None => {}
                    Some(logical_type) => {
                        builder = builder.with_logical_type(Some(logical_type.clone()));
                    }
                };
                match precision {
                    None => {}
                    Some(v) => {
                        builder = builder.with_precision(*v);
                    }
                };
                match scale {
                    None => {}
                    Some(v) => {
                        builder = builder.with_scale(*v);
                    }
                }
                match length {
                    None => {}
                    Some(v) => {
                        builder = builder.with_length(*v);
                    }
                }
                Arc::new(builder.build().unwrap())
            })
            .collect::<Vec<_>>();
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(&mut schema_fields)
            .build()
            .unwrap();
        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
    }

    /// Fresh `ParquetFileMetrics` for a dummy file, backed by a new metrics set.
    fn parquet_file_metrics() -> ParquetFileMetrics {
        let metrics = Arc::new(ExecutionPlanMetricsSet::new());
        ParquetFileMetrics::new(0, "file.parquet", &metrics)
    }
}