use crate::{
ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution,
};
use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_execution::cache::cache_manager::{
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
};
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
use parquet::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
SortingColumn,
};
use parquet::schema::types::SchemaDescriptor;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
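
/// Interface for reading the metadata, Arrow schema, and statistics of a
/// single Parquet file from an [`ObjectStore`].
///
/// Created with [`DFParquetMetadata::new`] and configured via the `with_*`
/// builder methods.
///
/// Example (illustrative sketch, not compiled as a doctest; the local file
/// path and the size hint below are assumptions, not part of this API):
///
/// ```ignore
/// use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
///
/// let store = LocalFileSystem::new();
/// let location = Path::from("data/example.parquet");
/// let object_meta = store.head(&location).await?;
/// let schema = DFParquetMetadata::new(&store, &object_meta)
///     .with_metadata_size_hint(Some(64 * 1024))
///     .fetch_schema()
///     .await?;
/// ```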
#[derive(Debug)]
pub struct DFParquetMetadata<'a> {
store: &'a dyn ObjectStore,
object_meta: &'a ObjectMeta,
metadata_size_hint: Option<usize>,
decryption_properties: Option<Arc<FileDecryptionProperties>>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
pub coerce_int96: Option<TimeUnit>,
}
impl<'a> DFParquetMetadata<'a> {
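    /// Creates a new [`DFParquetMetadata`] for the given store and file, with
    /// no size hint, decryption, caching, or INT96 coercion configured.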
pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self {
Self {
store,
object_meta,
metadata_size_hint: None,
decryption_properties: None,
file_metadata_cache: None,
coerce_int96: None,
}
}
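
    /// Sets the number of trailing bytes to prefetch when reading the
    /// metadata footer.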
pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
self.metadata_size_hint = metadata_size_hint;
self
}
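
    /// Sets the [`FileDecryptionProperties`] used to read an encrypted file.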
pub fn with_decryption_properties(
mut self,
decryption_properties: Option<Arc<FileDecryptionProperties>>,
) -> Self {
self.decryption_properties = decryption_properties;
self
}
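
    /// Sets the [`FileMetadataCache`] used to store and reuse parsed metadata.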
pub fn with_file_metadata_cache(
mut self,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Self {
self.file_metadata_cache = file_metadata_cache;
self
}
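
    /// Sets the [`TimeUnit`] to which INT96 timestamp columns are coerced.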
pub fn with_coerce_int96(mut self, time_unit: Option<TimeUnit>) -> Self {
self.coerce_int96 = time_unit;
self
}
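
    /// Reads the [`ParquetMetaData`] for the file, consulting and populating
    /// the file metadata cache when one is configured and the file is not
    /// being decrypted.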
pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
let Self {
store,
object_meta,
metadata_size_hint,
decryption_properties,
file_metadata_cache,
coerce_int96: _,
} = self;
let fetch = ObjectStoreFetch::new(*store, object_meta);
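        // The metadata cache is only used when no decryption is involved
        // (encryption feature disabled or no decryption properties supplied).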
let cache_metadata =
!cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
if cache_metadata
&& let Some(file_metadata_cache) = file_metadata_cache.as_ref()
&& let Some(cached) = file_metadata_cache.get(&object_meta.location)
&& cached.is_valid_for(object_meta)
&& let Some(cached_parquet) = cached
.file_metadata
.as_any()
.downcast_ref::<CachedParquetMetaData>()
{
return Ok(Arc::clone(cached_parquet.parquet_metadata()));
}
let mut reader =
ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint);
#[cfg(feature = "parquet_encryption")]
if let Some(decryption_properties) = decryption_properties {
reader = reader
.with_decryption_properties(Some(Arc::clone(decryption_properties)));
}
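        // When the result will be cached, also attempt to load the page index
        // so the cached entry can serve readers that need it.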
if cache_metadata && file_metadata_cache.is_some() {
reader = reader.with_page_index_policy(PageIndexPolicy::Optional);
}
let metadata = Arc::new(
reader
.load_and_finish(fetch, object_meta.size)
.await
.map_err(DataFusionError::from)?,
);
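        // Cache the freshly parsed metadata so later reads of this file can
        // skip fetching and decoding the footer again.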
if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
file_metadata_cache.put(
&object_meta.location,
CachedFileMetadataEntry::new(
(*object_meta).clone(),
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
),
);
}
Ok(metadata)
}
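
    /// Reads the Arrow [`Schema`] of the file, coercing INT96 timestamps to
    /// the configured resolution if one was set.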
pub async fn fetch_schema(&self) -> Result<Schema> {
let metadata = self.fetch_metadata().await?;
let file_metadata = metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
let schema = self
.coerce_int96
.as_ref()
.and_then(|time_unit| {
coerce_int96_to_resolution(
file_metadata.schema_descr(),
&schema,
time_unit,
)
})
.unwrap_or(schema);
Ok(schema)
}
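
    /// Reads the Arrow schema and returns it together with the file's location.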
pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> {
let loc_path = self.object_meta.location.clone();
let schema = self.fetch_schema().await?;
Ok((loc_path, schema))
}
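
    /// Reads the Parquet metadata and computes file-level [`Statistics`] for
    /// the given table schema.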
pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result<Statistics> {
let metadata = self.fetch_metadata().await?;
Self::statistics_from_parquet_metadata(&metadata, table_schema)
}
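
    /// Computes file-level [`Statistics`] from already-loaded
    /// [`ParquetMetaData`], mapping Parquet row-group statistics onto the
    /// logical file schema.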
pub fn statistics_from_parquet_metadata(
metadata: &ParquetMetaData,
logical_file_schema: &SchemaRef,
) -> Result<Statistics> {
let row_groups_metadata = metadata.row_groups();
let mut statistics = Statistics::default();
let mut has_statistics = false;
let mut num_rows = 0_usize;
for row_group_meta in row_groups_metadata {
num_rows += row_group_meta.num_rows() as usize;
if !has_statistics {
has_statistics = row_group_meta
.columns()
.iter()
.any(|column| column.statistics().is_some());
}
}
statistics.num_rows = Precision::Exact(num_rows);
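        // Derive the file's physical Arrow schema and reconcile it with the
        // logical table schema via type coercions.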
let file_metadata = metadata.file_metadata();
let mut physical_file_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
if let Some(merged) =
apply_file_schema_type_coercions(logical_file_schema, &physical_file_schema)
{
physical_file_schema = merged;
}
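        // With row-group statistics available, accumulate per-column
        // min/max/null counts; otherwise fall back to unknown statistics with
        // byte-size estimates only.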
statistics.column_statistics =
if has_statistics {
let (mut max_accs, mut min_accs) =
create_max_min_accs(logical_file_schema);
let mut null_counts_array =
vec![Precision::Absent; logical_file_schema.fields().len()];
let mut column_byte_sizes =
vec![Precision::Absent; logical_file_schema.fields().len()];
let mut is_max_value_exact =
vec![Some(true); logical_file_schema.fields().len()];
let mut is_min_value_exact =
vec![Some(true); logical_file_schema.fields().len()];
logical_file_schema.fields().iter().enumerate().for_each(
|(idx, field)| match StatisticsConverter::try_new(
field.name(),
&physical_file_schema,
file_metadata.schema_descr(),
) {
Ok(stats_converter) => {
let mut accumulators = StatisticsAccumulators {
min_accs: &mut min_accs,
max_accs: &mut max_accs,
null_counts_array: &mut null_counts_array,
is_min_value_exact: &mut is_min_value_exact,
is_max_value_exact: &mut is_max_value_exact,
column_byte_sizes: &mut column_byte_sizes,
};
summarize_min_max_null_counts(
file_metadata.schema_descr(),
logical_file_schema,
&physical_file_schema,
&mut accumulators,
idx,
&stats_converter,
row_groups_metadata,
)
.ok();
}
Err(e) => {
debug!("Failed to create statistics converter: {e}");
null_counts_array[idx] = Precision::Exact(num_rows);
}
},
);
get_col_stats(
logical_file_schema,
&null_counts_array,
&mut max_accs,
&mut min_accs,
&mut is_max_value_exact,
&mut is_min_value_exact,
&column_byte_sizes,
)
} else {
logical_file_schema
.fields()
.iter()
.enumerate()
.map(|(logical_file_schema_index, field)| {
let arrow_field =
logical_file_schema.field(logical_file_schema_index);
let parquet_idx = parquet_column(
file_metadata.schema_descr(),
&physical_file_schema,
arrow_field.name(),
)
.map(|(idx, _)| idx);
let byte_size = compute_arrow_column_size(
field.data_type(),
row_groups_metadata,
parquet_idx,
num_rows,
);
ColumnStatistics::new_unknown().with_byte_size(byte_size)
})
.collect()
};
#[cfg(debug_assertions)]
{
assert_eq!(
statistics.column_statistics.len(),
logical_file_schema.fields().len(),
"Column statistics length does not match table schema fields length"
);
}
Ok(statistics)
}
}
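
/// Returns the data type to use for min/max accumulators: the value type for
/// dictionary columns, otherwise the input type itself.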
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
if let DataType::Dictionary(_, value_type) = input_type {
value_type.as_ref()
} else {
input_type
}
}
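
/// Creates one optional [`MaxAccumulator`] and [`MinAccumulator`] per field;
/// an entry is `None` when the field's type does not support min/max.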
fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
let max_values: Vec<Option<MaxAccumulator>> = schema
.fields()
.iter()
.map(|field| {
MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
})
.collect();
let min_values: Vec<Option<MinAccumulator>> = schema
.fields()
.iter()
.map(|field| {
MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
})
.collect();
(max_values, min_values)
}
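
/// Assembles per-column [`ColumnStatistics`] from the accumulated null counts,
/// min/max values, exactness flags, and byte sizes.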
fn get_col_stats(
schema: &Schema,
null_counts: &[Precision<usize>],
max_values: &mut [Option<MaxAccumulator>],
min_values: &mut [Option<MinAccumulator>],
is_max_value_exact: &mut [Option<bool>],
is_min_value_exact: &mut [Option<bool>],
column_byte_sizes: &[Precision<usize>],
) -> Vec<ColumnStatistics> {
(0..schema.fields().len())
.map(|i| {
let max_value = match (
max_values.get_mut(i).unwrap(),
is_max_value_exact.get(i).unwrap(),
) {
(Some(max_value), Some(true)) => {
max_value.evaluate().ok().map(Precision::Exact)
}
(Some(max_value), Some(false)) | (Some(max_value), None) => {
max_value.evaluate().ok().map(Precision::Inexact)
}
(None, _) => None,
};
let min_value = match (
min_values.get_mut(i).unwrap(),
is_min_value_exact.get(i).unwrap(),
) {
(Some(min_value), Some(true)) => {
min_value.evaluate().ok().map(Precision::Exact)
}
(Some(min_value), Some(false)) | (Some(min_value), None) => {
min_value.evaluate().ok().map(Precision::Inexact)
}
(None, _) => None,
};
ColumnStatistics {
null_count: null_counts[i],
max_value: max_value.unwrap_or(Precision::Absent),
min_value: min_value.unwrap_or(Precision::Absent),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
byte_size: column_byte_sizes[i],
}
})
.collect()
}
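
/// Mutable state gathered while summarizing row-group statistics per column.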
struct StatisticsAccumulators<'a> {
min_accs: &'a mut [Option<MinAccumulator>],
max_accs: &'a mut [Option<MaxAccumulator>],
null_counts_array: &'a mut [Precision<usize>],
is_min_value_exact: &'a mut [Option<bool>],
is_max_value_exact: &'a mut [Option<bool>],
column_byte_sizes: &'a mut [Precision<usize>],
}
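
/// Folds the row-group min/max/null-count statistics of a single column into
/// the shared [`StatisticsAccumulators`], tracking whether the resulting
/// min/max values are exact.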
fn summarize_min_max_null_counts(
parquet_schema: &SchemaDescriptor,
logical_file_schema: &Schema,
physical_file_schema: &Schema,
accumulators: &mut StatisticsAccumulators,
logical_schema_index: usize,
stats_converter: &StatisticsConverter,
row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
let is_max_value_exact_stat =
stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
let is_min_value_exact_stat =
stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;
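    // The accumulated max is exact only if every row-group max statistic is
    // exact; if none are, it is inexact; otherwise check whether the chosen
    // value comes from at least one exact statistic.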
if let Some(max_acc) = &mut accumulators.max_accs[logical_schema_index] {
max_acc.update_batch(&[Arc::clone(&max_values)])?;
let exactness = &is_max_value_exact_stat;
if !exactness.is_empty()
&& exactness.null_count() == 0
&& exactness.true_count() == exactness.len()
{
accumulators.is_max_value_exact[logical_schema_index] = Some(true);
} else if exactness.true_count() == 0 {
accumulators.is_max_value_exact[logical_schema_index] = Some(false);
} else {
let val = max_acc.evaluate()?;
accumulators.is_max_value_exact[logical_schema_index] =
has_any_exact_match(&val, &max_values, exactness);
}
}
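    // Same exactness reasoning as above, applied to the min values.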
if let Some(min_acc) = &mut accumulators.min_accs[logical_schema_index] {
min_acc.update_batch(&[Arc::clone(&min_values)])?;
let exactness = &is_min_value_exact_stat;
if !exactness.is_empty()
&& exactness.null_count() == 0
&& exactness.true_count() == exactness.len()
{
accumulators.is_min_value_exact[logical_schema_index] = Some(true);
} else if exactness.true_count() == 0 {
accumulators.is_min_value_exact[logical_schema_index] = Some(false);
} else {
let val = min_acc.evaluate()?;
accumulators.is_min_value_exact[logical_schema_index] =
has_any_exact_match(&val, &min_values, exactness);
}
}
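    // `sum` yields `None` when there are no non-null null counts; a file with
    // zero row groups has exactly zero nulls, otherwise the count is unknown.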
accumulators.null_counts_array[logical_schema_index] = match sum(&null_counts) {
Some(null_count) => Precision::Exact(null_count as usize),
None => match null_counts.len() {
0 => Precision::Exact(0),
_ => Precision::Absent,
},
};
let parquet_index = parquet_column(
parquet_schema,
physical_file_schema,
logical_file_schema.field(logical_schema_index).name(),
)
.map(|(idx, _)| idx);
let arrow_field = logical_file_schema.field(logical_schema_index);
accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size(
arrow_field.data_type(),
row_groups_metadata,
parquet_index,
row_groups_metadata
.iter()
.map(|rg| rg.num_rows() as usize)
.sum(),
);
Ok(())
}
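
/// Estimates the in-memory Arrow size of a column: exact (`width * num_rows`)
/// for fixed-width primitive types, otherwise an inexact estimate from the
/// uncompressed Parquet column chunk sizes, or `Absent` when the column is not
/// present in the file.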
fn compute_arrow_column_size(
data_type: &DataType,
row_groups_metadata: &[RowGroupMetaData],
parquet_idx: Option<usize>,
num_rows: usize,
) -> Precision<usize> {
if let Some(byte_width) = data_type.primitive_width() {
return Precision::Exact(byte_width * num_rows);
}
if let Some(parquet_idx) = parquet_idx {
let uncompressed_bytes: i64 = row_groups_metadata
.iter()
.filter_map(|rg| rg.columns().get(parquet_idx))
.map(|col| col.uncompressed_size())
.sum();
return Precision::Inexact(uncompressed_bytes as usize);
}
Precision::Absent
}
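
/// Returns whether the aggregated `value` matches at least one row-group
/// statistic flagged as exact, i.e. whether the overall min/max can itself be
/// considered exact.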
fn has_any_exact_match(
value: &ScalarValue,
array: &ArrayRef,
exactness: &BooleanArray,
) -> Option<bool> {
if value.is_null() {
return Some(false);
}
if array.len() == 1 {
return Some(exactness.is_valid(0) && exactness.value(0));
}
let scalar_array = value.to_scalar().ok()?;
let eq_mask = eq(&scalar_array, &array).ok()?;
let combined_mask = and(&eq_mask, exactness).ok()?;
Some(combined_mask.true_count() > 0)
}
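
/// Wrapper that lets [`ParquetMetaData`] be stored in a [`FileMetadataCache`].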
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);
impl CachedParquetMetaData {
pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
Self(metadata)
}
pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
&self.0
}
}
impl FileMetadata for CachedParquetMetaData {
fn as_any(&self) -> &dyn Any {
self
}
fn memory_size(&self) -> usize {
self.0.memory_size()
}
fn extra_info(&self) -> HashMap<String, String> {
let page_index =
self.0.column_index().is_some() && self.0.offset_index().is_some();
HashMap::from([("page_index".to_owned(), page_index.to_string())])
}
}
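
/// Converts a [`PhysicalSortExpr`] over a simple column reference into a
/// Parquet [`SortingColumn`]; any other expression yields a plan error.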
pub(crate) fn sort_expr_to_sorting_column(
sort_expr: &PhysicalSortExpr,
) -> Result<SortingColumn> {
let column = sort_expr
.expr
.as_any()
.downcast_ref::<Column>()
.ok_or_else(|| {
DataFusionError::Plan(format!(
"Parquet sorting_columns only supports simple column references, \
but got expression: {}",
sort_expr.expr
))
})?;
let column_idx: i32 = column.index().try_into().map_err(|_| {
DataFusionError::Plan(format!(
"Column index {} is too large to be represented as i32",
column.index()
))
})?;
Ok(SortingColumn {
column_idx,
descending: sort_expr.options.descending,
nulls_first: sort_expr.options.nulls_first,
})
}
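
/// Converts a [`LexOrdering`] into the equivalent Parquet [`SortingColumn`]s.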
pub(crate) fn lex_ordering_to_sorting_columns(
ordering: &LexOrdering,
) -> Result<Vec<SortingColumn>> {
ordering.iter().map(sort_expr_to_sorting_column).collect()
}
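
/// Reconstructs a [`LexOrdering`] from the sorting columns recorded in the
/// first row group, if any are present and resolvable against the Arrow schema.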
pub fn ordering_from_parquet_metadata(
metadata: &ParquetMetaData,
schema: &SchemaRef,
) -> Result<Option<LexOrdering>> {
let sorting_columns = metadata
.row_groups()
.first()
.and_then(|rg| rg.sorting_columns())
.filter(|cols| !cols.is_empty());
let Some(sorting_columns) = sorting_columns else {
return Ok(None);
};
let parquet_schema = metadata.file_metadata().schema_descr();
let sort_exprs =
sorting_columns_to_physical_exprs(sorting_columns, parquet_schema, schema);
if sort_exprs.is_empty() {
return Ok(None);
}
Ok(LexOrdering::new(sort_exprs))
}
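
/// Maps Parquet [`SortingColumn`]s to [`PhysicalSortExpr`]s, skipping columns
/// that cannot be resolved in the Arrow schema.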
fn sorting_columns_to_physical_exprs(
sorting_columns: &[SortingColumn],
parquet_schema: &SchemaDescriptor,
arrow_schema: &SchemaRef,
) -> Vec<PhysicalSortExpr> {
use arrow::compute::SortOptions;
sorting_columns
.iter()
.filter_map(|sc| {
let parquet_column = parquet_schema.column(sc.column_idx as usize);
let name = parquet_column.name();
let (index, _) = arrow_schema.column_with_name(name)?;
let expr = Arc::new(Column::new(name, index));
let options = SortOptions {
descending: sc.descending,
nulls_first: sc.nulls_first,
};
Some(PhysicalSortExpr::new(expr, options))
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use datafusion_common::ScalarValue;
use std::sync::Arc;
#[test]
fn test_has_any_exact_match() {
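        // The computed min matches a row-group min whose statistic is exact.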
{
let computed_min = ScalarValue::Int32(Some(0));
let row_group_mins =
Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
let exactness =
BooleanArray::from(vec![true, false, false, false, false, false]);
let result = has_any_exact_match(&computed_min, &row_group_mins, &exactness);
assert_eq!(result, Some(true));
}
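        // No row-group statistic is exact, so the computed min is not exact.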
{
let computed_min = ScalarValue::Int32(Some(0));
let row_group_mins =
Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
let exactness =
BooleanArray::from(vec![false, false, false, false, false, false]);
let result = has_any_exact_match(&computed_min, &row_group_mins, &exactness);
assert_eq!(result, Some(false));
}
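        // The computed max matches several exact row-group maxes.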
{
let computed_max = ScalarValue::Int32(Some(5));
let row_group_maxes =
Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef;
let exactness =
BooleanArray::from(vec![false, true, true, true, false, true]);
let result = has_any_exact_match(&computed_max, &row_group_maxes, &exactness);
assert_eq!(result, Some(true));
}
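        // A null computed value is never considered an exact match.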
{
let computed_max = ScalarValue::Int32(None);
let row_group_maxes =
Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef;
let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]);
let result = has_any_exact_match(&computed_max, &row_group_maxes, &exactness);
assert_eq!(result, Some(false));
}
}
}