use std::collections::HashSet;
use std::sync::Arc;
use super::metrics::ParquetFileMetrics;
use crate::ParquetAccessPlan;
use arrow::array::BooleanArray;
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
};
use datafusion_common::ScalarValue;
use datafusion_common::pruning::PruningStatistics;
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_pruning::PruningPredicate;
use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::offset_index::PageLocation;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
file::metadata::{ParquetMetaData, RowGroupMetaData},
};
/// Filters a [`ParquetAccessPlan`] using parquet page-level (data page
/// index) statistics, when they are present in the file metadata.
#[derive(Debug)]
pub struct PagePruningAccessPlanFilter {
    // One predicate per usable conjunct of the original filter expression.
    // Each is guaranteed to be non-trivial (not always true) and to
    // reference exactly one column (enforced in `new`).
    predicates: Vec<PruningPredicate>,
}
impl PagePruningAccessPlanFilter {
    /// Creates a new filter from a physical filter expression and the file's
    /// Arrow schema.
    ///
    /// The expression is split into its AND-ed conjuncts, and each conjunct
    /// is converted into a [`PruningPredicate`]. A conjunct is kept only if:
    /// 1. the conversion succeeded (errors are logged and the conjunct is
    ///    dropped — pruning is best-effort),
    /// 2. it is not trivially `true` (such predicates prune nothing), and
    /// 3. it references exactly one column (page statistics are evaluated
    ///    one column at a time).
    #[expect(clippy::needless_pass_by_value)]
    pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
        let predicates = split_conjunction(expr)
            .into_iter()
            .filter_map(|predicate| {
                let pp = match PruningPredicate::try_new(
                    Arc::clone(predicate),
                    Arc::clone(&schema),
                ) {
                    Ok(pp) => pp,
                    Err(e) => {
                        // Best effort: skip this conjunct rather than fail
                        // the whole scan
                        debug!("Ignoring error creating page pruning predicate: {e}");
                        return None;
                    }
                };
                if pp.always_true() {
                    debug!("Ignoring always true page pruning predicate: {predicate}");
                    return None;
                }
                if pp.required_columns().single_column().is_none() {
                    debug!("Ignoring multi-column page pruning predicate: {predicate}");
                    return None;
                }
                Some(pp)
            })
            .collect::<Vec<_>>();
        Self { predicates }
    }

    /// Returns an updated [`ParquetAccessPlan`] with rows and/or whole row
    /// groups pruned using the parquet page index in `parquet_metadata`.
    ///
    /// For every row group still selected in `access_plan`, each predicate
    /// is evaluated against that column's per-page statistics. The resulting
    /// per-predicate `RowSelection`s are intersected (AND semantics) and the
    /// combined selection is applied to the plan; a row group whose combined
    /// selection is empty is skipped entirely. Pruned/matched row and page
    /// counts are recorded in `file_metrics`.
    ///
    /// If there are no usable predicates, no row groups, or the file lacks
    /// the offset/column indexes, the plan is returned unchanged.
    pub fn prune_plan_with_page_index(
        &self,
        mut access_plan: ParquetAccessPlan,
        arrow_schema: &Schema,
        parquet_schema: &SchemaDescriptor,
        parquet_metadata: &ParquetMetaData,
        file_metrics: &ParquetFileMetrics,
    ) -> ParquetAccessPlan {
        // Scoped timer guard for the page_index_eval_time metric
        let _timer_guard = file_metrics.page_index_eval_time.timer();
        if self.predicates.is_empty() {
            return access_plan;
        }

        let page_index_predicates = &self.predicates;
        let groups = parquet_metadata.row_groups();
        if groups.is_empty() {
            return access_plan;
        }

        // Both the offset index and the column index are required for
        // page-level pruning
        if parquet_metadata.offset_index().is_none()
            || parquet_metadata.column_index().is_none()
        {
            debug!(
                "Can not prune pages due to lack of indexes. Have offset: {}, column index: {}",
                parquet_metadata.offset_index().is_some(),
                parquet_metadata.column_index().is_some()
            );
            return access_plan;
        };

        // Totals across all row groups, reported to metrics at the end
        let mut total_skip = 0;
        let mut total_select = 0;
        let mut total_pages_skip = 0;
        let mut total_pages_select = 0;

        // Only consider row groups the plan still intends to scan
        let row_group_indexes = access_plan.row_group_indexes();
        for row_group_index in row_group_indexes {
            // `None` until the first predicate yields a selection; later
            // selections are intersected into it
            let mut overall_selection = None;
            for predicate in page_index_predicates {
                let column = predicate
                    .required_columns()
                    .single_column()
                    // `new` only retains single-column predicates
                    .expect("Page pruning requires single column predicates");

                let converter = StatisticsConverter::try_new(
                    column.name(),
                    arrow_schema,
                    parquet_schema,
                );

                let converter = match converter {
                    Ok(converter) => converter,
                    Err(e) => {
                        // Best effort: skip this predicate rather than fail
                        debug!(
                            "Could not create statistics converter for column {}: {e}",
                            column.name()
                        );
                        continue;
                    }
                };

                let selection = prune_pages_in_one_row_group(
                    row_group_index,
                    predicate,
                    converter,
                    parquet_metadata,
                    file_metrics,
                );

                let Some((selection, total_pages, matched_pages)) = selection else {
                    trace!("No pages pruned in prune_pages_in_one_row_group");
                    continue;
                };
                total_pages_select += matched_pages;
                total_pages_skip += total_pages - matched_pages;

                debug!(
                    "Use filter and page index to create RowSelection {:?} from predicate: {:?}",
                    &selection,
                    predicate.predicate_expr(),
                );

                overall_selection = update_selection(overall_selection, selection);

                // If the combined selection already selects no rows,
                // evaluating further predicates for this row group cannot
                // change the outcome
                let selects_any = overall_selection
                    .as_ref()
                    .map(|selection| selection.selects_any())
                    .unwrap_or(true);

                if !selects_any {
                    break;
                }
            }

            if let Some(overall_selection) = overall_selection {
                let rows_selected = overall_selection.row_count();
                if rows_selected > 0 {
                    // Some rows survive: narrow this row group's scan to
                    // the combined selection
                    let rows_skipped = overall_selection.skipped_row_count();
                    trace!(
                        "Overall selection from predicate skipped {rows_skipped}, selected {rows_selected}: {overall_selection:?}"
                    );
                    total_skip += rows_skipped;
                    total_select += rows_selected;
                    access_plan.scan_selection(row_group_index, overall_selection)
                } else {
                    // No rows survive: skip the entire row group
                    let rows_skipped = groups[row_group_index].num_rows() as usize;
                    access_plan.skip(row_group_index);
                    total_skip += rows_skipped;
                    trace!(
                        "Overall selection from predicate is empty, \
                        skipping all {rows_skipped} rows in row group {row_group_index}"
                    );
                }
            }
        }

        file_metrics.page_index_rows_pruned.add_pruned(total_skip);
        file_metrics
            .page_index_rows_pruned
            .add_matched(total_select);
        file_metrics
            .page_index_pages_pruned
            .add_pruned(total_pages_skip);
        file_metrics
            .page_index_pages_pruned
            .add_matched(total_pages_select);
        access_plan
    }

    /// Returns the number of page-pruning predicates held by this filter.
    pub fn filter_number(&self) -> usize {
        self.predicates.len()
    }
}
/// Folds a newly computed per-predicate selection into the accumulated
/// selection for a row group: the first selection seeds the accumulator,
/// and subsequent ones are intersected with it (AND semantics).
fn update_selection(
    current_selection: Option<RowSelection>,
    row_selection: RowSelection,
) -> Option<RowSelection> {
    let merged = match current_selection {
        Some(existing) => existing.intersection(&row_selection),
        None => row_selection,
    };
    Some(merged)
}
/// Evaluates `pruning_predicate` against one column's page statistics in
/// one row group.
///
/// Returns `Some((selection, total_pages, matched_pages))` where
/// `selection` selects the rows of pages that may contain matching rows
/// and the page counts feed the pruning metrics. Returns `None` when the
/// page statistics are unavailable or the predicate cannot be evaluated
/// (errors are counted in `metrics` and logged).
fn prune_pages_in_one_row_group(
    row_group_index: usize,
    pruning_predicate: &PruningPredicate,
    converter: StatisticsConverter<'_>,
    parquet_metadata: &ParquetMetaData,
    metrics: &ParquetFileMetrics,
) -> Option<(RowSelection, usize, usize)> {
    let pruning_stats =
        PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?;

    // `values[i]` is true when page `i` may contain rows matching the
    // predicate (false means the page is provably all-non-matching)
    let values = match pruning_predicate.prune(&pruning_stats) {
        Ok(values) => values,
        Err(e) => {
            debug!("Error evaluating page index predicate values {e}");
            metrics.predicate_evaluation_errors.add(1);
            return None;
        }
    };

    // Number of rows in each data page of this column chunk
    let Some(page_row_counts) = pruning_stats.page_row_counts() else {
        debug!(
            "Can not determine page row counts for row group {row_group_index}, skipping"
        );
        metrics.predicate_evaluation_errors.add(1);
        return None;
    };

    // Run-length encode consecutive pages with the same keep/skip verdict
    // into RowSelectors ("select N rows" / "skip N rows").
    let mut vec = Vec::with_capacity(values.len());
    assert_eq!(page_row_counts.len(), values.len());
    // NOTE(review): `first().unwrap()` panics if the row group has zero
    // pages — presumably the offset index always lists at least one page
    // per column chunk; confirm upstream invariant
    let mut sum_row = *page_row_counts.first().unwrap();
    let mut selected = *values.first().unwrap();
    trace!("Pruned to {values:?} using {pruning_stats:?}");
    for (i, &f) in values.iter().enumerate().skip(1) {
        if f == selected {
            // Same verdict as previous page: extend the current run
            sum_row += *page_row_counts.get(i).unwrap();
        } else {
            // Verdict flipped: emit the completed run and start a new one
            let selector = if selected {
                RowSelector::select(sum_row)
            } else {
                RowSelector::skip(sum_row)
            };
            vec.push(selector);
            sum_row = *page_row_counts.get(i).unwrap();
            selected = f;
        }
    }

    // Emit the final (still open) run
    let selector = if selected {
        RowSelector::select(sum_row)
    } else {
        RowSelector::skip(sum_row)
    };
    vec.push(selector);
    let total_pages = values.len();
    let matched_pages = values.iter().filter(|v| **v).count();
    Some((RowSelection::from(vec), total_pages, matched_pages))
}
/// Per-page statistics for a single column in a single row group, used to
/// evaluate a [`PruningPredicate`] against the parquet data page index.
#[derive(Debug)]
struct PagesPruningStatistics<'a> {
    // Index of the row group these statistics describe
    row_group_index: usize,
    // Metadata for all row groups in the file
    row_group_metadatas: &'a [RowGroupMetaData],
    // Converter bound to the single column this instance covers
    converter: StatisticsConverter<'a>,
    // Column index (min/max/null-count statistics) for the whole file
    column_index: &'a ParquetColumnIndex,
    // Offset index (page locations) for the whole file
    offset_index: &'a ParquetOffsetIndex,
    // Page locations for this column chunk within `row_group_index`
    page_offsets: &'a Vec<PageLocation>,
}
impl<'a> PagesPruningStatistics<'a> {
    /// Creates page statistics for `converter`'s column in the given row
    /// group, or `None` when the column is absent from the file or the
    /// column/offset indexes do not cover it (each case is traced).
    fn try_new(
        row_group_index: usize,
        converter: StatisticsConverter<'a>,
        parquet_metadata: &'a ParquetMetaData,
    ) -> Option<Self> {
        let Some(parquet_column_index) = converter.parquet_column_index() else {
            trace!(
                "Column {:?} not in parquet file, skipping",
                converter.arrow_field()
            );
            return None;
        };

        let column_index = parquet_metadata.column_index()?;
        let offset_index = parquet_metadata.offset_index()?;
        let row_group_metadatas = parquet_metadata.row_groups();

        let Some(row_group_page_offsets) = offset_index.get(row_group_index) else {
            trace!("No page offsets for row group {row_group_index}, skipping");
            return None;
        };
        let Some(offset_index_metadata) =
            row_group_page_offsets.get(parquet_column_index)
        else {
            trace!(
                "No page offsets for column {:?} in row group {row_group_index}, skipping",
                converter.arrow_field()
            );
            return None;
        };
        let page_offsets = offset_index_metadata.page_locations();

        Some(Self {
            row_group_index,
            row_group_metadatas,
            converter,
            column_index,
            offset_index,
            page_offsets,
        })
    }

    /// Returns the number of rows in each data page of this column chunk,
    /// or `None` when there are no page offsets at all.
    ///
    /// Interior page counts are the differences between adjacent pages'
    /// first-row indexes; the last page runs to the end of the row group.
    fn page_row_counts(&self) -> Option<Vec<usize>> {
        let row_group_metadata = self
            .row_group_metadatas
            .get(self.row_group_index)
            // assumes row_group_index is in range — `try_new` already found
            // this row group's page offsets; confirm the two lists align
            .unwrap();

        let num_rows_in_row_group = row_group_metadata.num_rows() as usize;

        let page_offsets = self.page_offsets;
        let mut vec = Vec::with_capacity(page_offsets.len());
        // Difference between adjacent first_row_index values gives each
        // interior page's row count
        page_offsets.windows(2).for_each(|x| {
            let start = x[0].first_row_index as usize;
            let end = x[1].first_row_index as usize;
            vec.push(end - start);
        });
        // Final page: rows remaining after its first row. The `?` returns
        // None when there are no pages at all.
        vec.push(num_rows_in_row_group - page_offsets.last()?.first_row_index as usize);
        Some(vec)
    }
}
// Each "container" for the pruning framework is one data page of the
// single column this instance was built for. The `_column` argument is
// ignored throughout because `self.converter` is already bound to that
// column (instances are constructed per column by the caller).
impl PruningStatistics for PagesPruningStatistics<'_> {
    /// Per-page minimum values for this column, or `None` on error
    /// (errors are logged, not propagated — pruning is best-effort).
    fn min_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
        match self.converter.data_page_mins(
            self.column_index,
            self.offset_index,
            [&self.row_group_index],
        ) {
            Ok(min_values) => Some(min_values),
            Err(e) => {
                debug!("Error evaluating data page min values {e}");
                None
            }
        }
    }

    /// Per-page maximum values for this column, or `None` on error.
    fn max_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
        match self.converter.data_page_maxes(
            self.column_index,
            self.offset_index,
            [&self.row_group_index],
        ) {
            // renamed binding (was `min_values`, a copy-paste leftover)
            Ok(max_values) => Some(max_values),
            Err(e) => {
                debug!("Error evaluating data page max values {e}");
                None
            }
        }
    }

    /// One container per data page in this column chunk.
    fn num_containers(&self) -> usize {
        self.page_offsets.len()
    }

    /// Per-page null counts for this column, or `None` on error.
    fn null_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
        match self.converter.data_page_null_counts(
            self.column_index,
            self.offset_index,
            [&self.row_group_index],
        ) {
            Ok(null_counts) => Some(Arc::new(null_counts)),
            Err(e) => {
                debug!("Error evaluating data page null counts {e}");
                None
            }
        }
    }

    /// Per-page row counts for this column, or `None` on error or when
    /// the converter cannot determine them.
    fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
        match self.converter.data_page_row_counts(
            self.offset_index,
            self.row_group_metadatas,
            [&self.row_group_index],
        ) {
            Ok(row_counts) => row_counts.map(|a| Arc::new(a) as ArrayRef),
            Err(e) => {
                debug!("Error evaluating data page row counts {e}");
                None
            }
        }
    }

    /// Set-membership pruning is not supported at the page level.
    fn contained(
        &self,
        _column: &datafusion_common::Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        None
    }
}