use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::sort::reverse_row_selection;
use crate::{
    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
    apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
};

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
};
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{PartitionedFile, TableSchema};
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_adapter::{
    PhysicalExprAdapterFactory, replace_columns_with_literals,
};
use datafusion_physical_expr_common::physical_expr::{
    PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
};
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
use futures::{Stream, StreamExt, TryStreamExt, ready};
use log::debug;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{
    ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
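
/// Implements [`FileOpener`] for parquet files: opens a single file, applies
/// projection and (optional) predicate pushdown, prunes row groups and pages,
/// and yields a stream of [`RecordBatch`]es in the output schema.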
pub(super) struct ParquetOpener {
pub partition_index: usize,
pub projection: ProjectionExprs,
pub batch_size: usize,
pub limit: Option<usize>,
pub predicate: Option<Arc<dyn PhysicalExpr>>,
pub table_schema: TableSchema,
pub metadata_size_hint: Option<usize>,
pub metrics: ExecutionPlanMetricsSet,
pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
pub pushdown_filters: bool,
pub reorder_filters: bool,
pub force_filter_selections: bool,
pub enable_page_index: bool,
pub enable_bloom_filter: bool,
pub enable_row_group_stats_pruning: bool,
pub coerce_int96: Option<TimeUnit>,
#[cfg(feature = "parquet_encryption")]
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
pub(crate) expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
#[cfg(feature = "parquet_encryption")]
pub encryption_factory:
Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
pub max_predicate_cache_size: Option<usize>,
pub reverse_row_groups: bool,
}
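
/// A [`ParquetAccessPlan`] lowered into the inputs the parquet reader needs:
/// the row group indexes to scan plus an optional overall row selection
/// spanning those row groups.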
pub(crate) struct PreparedAccessPlan {
pub(crate) row_group_indexes: Vec<usize>,
pub(crate) row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
}
impl PreparedAccessPlan {
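    /// Converts an access plan into scanned row group indexes and an overall
    /// row selection, using `rg_metadata` to resolve per-group row counts.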
pub(crate) fn from_access_plan(
access_plan: ParquetAccessPlan,
rg_metadata: &[RowGroupMetaData],
) -> Result<Self> {
let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(rg_metadata)?;
Ok(Self {
row_group_indexes,
row_selection,
})
}
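    /// Reverses the order in which row groups are scanned, remapping the
    /// overall row selection so each row group keeps its own selection.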
pub(crate) fn reverse(
mut self,
file_metadata: &parquet::file::metadata::ParquetMetaData,
) -> Result<Self> {
let row_groups_to_scan = self.row_group_indexes.clone();
self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();
if let Some(row_selection) = self.row_selection {
self.row_selection = Some(reverse_row_selection(
&row_selection,
file_metadata,
                &row_groups_to_scan,
            )?);
}
Ok(self)
}
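    /// Applies the row selection (if any) and the row group indexes to the
    /// reader builder.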
fn apply_to_builder(
self,
mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>,
) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
if let Some(row_selection) = self.row_selection {
builder = builder.with_row_selection(row_selection);
}
builder.with_row_groups(self.row_group_indexes)
}
}
impl FileOpener for ParquetOpener {
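    /// Opens `partitioned_file` and returns a future resolving to a record
    /// batch stream, after file-, row-group-, and page-level pruning.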
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let file_range = partitioned_file.range.clone();
let extensions = partitioned_file.extensions.clone();
let file_location = partitioned_file.object_meta.location.clone();
let file_name = file_location.to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
let metadata_size_hint = partitioned_file
.metadata_size_hint
.or(self.metadata_size_hint);
let mut async_file_reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
partitioned_file.clone(),
metadata_size_hint,
&self.metrics,
)?;
let batch_size = self.batch_size;
let logical_file_schema = Arc::clone(self.table_schema.file_schema());
let output_schema = Arc::new(
self.projection
.project_schema(self.table_schema.table_schema())?,
);
let mut literal_columns: HashMap<String, ScalarValue> = self
.table_schema
.table_partition_cols()
.iter()
.zip(partitioned_file.partition_values.iter())
.map(|(field, value)| (field.name().clone(), value.clone()))
.collect();
literal_columns.extend(constant_columns_from_stats(
partitioned_file.statistics.as_deref(),
&logical_file_schema,
));
let mut projection = self.projection.clone();
let mut predicate = self.predicate.clone();
if !literal_columns.is_empty() {
projection = projection.try_map_exprs(|expr| {
replace_columns_with_literals(Arc::clone(&expr), &literal_columns)
})?;
predicate = predicate
.map(|p| replace_columns_with_literals(p, &literal_columns))
.transpose()?;
}
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let force_filter_selections = self.force_filter_selections;
let coerce_int96 = self.coerce_int96;
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
let limit = self.limit;
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
let enable_page_index = self.enable_page_index;
#[cfg(feature = "parquet_encryption")]
let encryption_context = self.get_encryption_context();
let max_predicate_cache_size = self.max_predicate_cache_size;
let reverse_row_groups = self.reverse_row_groups;
Ok(Box::pin(async move {
#[cfg(feature = "parquet_encryption")]
let file_decryption_properties = encryption_context
.get_file_decryption_properties(&file_location)
.await?;
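            // Try to prune the whole file up front using file-level statistics,
            // partition values, and any dynamic filters, before doing any I/O.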
let mut file_pruner = predicate
.as_ref()
.filter(|p| {
is_dynamic_physical_expr(p) || partitioned_file.has_statistics()
})
.and_then(|p| {
FilePruner::try_new(
Arc::clone(p),
&logical_file_schema,
&partitioned_file,
predicate_creation_errors.clone(),
)
});
if let Some(file_pruner) = &mut file_pruner
&& file_pruner.should_prune()?
{
file_metrics.files_ranges_pruned_statistics.add_pruned(1);
return Ok(futures::stream::empty().boxed());
}
file_metrics.files_ranges_pruned_statistics.add_matched(1);
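            // Don't read the page index yet; it is loaded later only if page
            // pruning will actually be used.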
let mut options = ArrowReaderOptions::new().with_page_index(false);
#[cfg(feature = "parquet_encryption")]
if let Some(fd_val) = file_decryption_properties {
options = options.with_file_decryption_properties(Arc::clone(&fd_val));
}
let mut metadata_timer = file_metrics.metadata_load_time.timer();
let mut reader_metadata =
ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
.await?;
let mut physical_file_schema = Arc::clone(reader_metadata.schema());
if let Some(merged) = apply_file_schema_type_coercions(
&logical_file_schema,
&physical_file_schema,
) {
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
reader_metadata = ArrowReaderMetadata::try_new(
Arc::clone(reader_metadata.metadata()),
options.clone(),
)?;
}
if let Some(ref coerce) = coerce_int96
&& let Some(merged) = coerce_int96_to_resolution(
reader_metadata.parquet_schema(),
&physical_file_schema,
coerce,
)
{
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
reader_metadata = ArrowReaderMetadata::try_new(
Arc::clone(reader_metadata.metadata()),
options.clone(),
)?;
}
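            // Adapt the predicate and projection from the logical file schema
            // to the physical schema actually stored in this file.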
let rewriter = expr_adapter_factory.create(
Arc::clone(&logical_file_schema),
Arc::clone(&physical_file_schema),
);
let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
predicate = predicate
.map(|p| simplifier.simplify(rewriter.rewrite(p)?))
.transpose()?;
projection = projection
.try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?;
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
predicate.as_ref(),
&physical_file_schema,
&predicate_creation_errors,
);
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
reader_metadata = load_page_index(
reader_metadata,
&mut async_file_reader,
options.with_page_index(true),
)
.await?;
}
metadata_timer.stop();
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
async_file_reader,
reader_metadata,
);
let indices = projection.column_indices();
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);
match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
};
};
if force_filter_selections {
builder =
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
}
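            // Decide which row groups (and rows within them) to read, applying
            // byte-range, statistics, bloom filter, and page index pruning.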
let file_metadata = Arc::clone(builder.metadata());
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
let access_plan =
create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
}
if let Some(predicate) = predicate.as_ref() {
if enable_row_group_stats_pruning {
row_groups.prune_by_statistics(
&physical_file_schema,
builder.parquet_schema(),
rg_metadata,
predicate,
&file_metrics,
);
} else {
file_metrics
.row_groups_pruned_statistics
.add_matched(row_groups.remaining_row_group_count());
}
if enable_bloom_filter && !row_groups.is_empty() {
row_groups
.prune_by_bloom_filters(
&physical_file_schema,
&mut builder,
predicate,
&file_metrics,
)
.await;
} else {
file_metrics
.row_groups_pruned_bloom_filter
.add_matched(row_groups.remaining_row_group_count());
}
} else {
let n_remaining_row_groups = row_groups.remaining_row_group_count();
file_metrics
.row_groups_pruned_statistics
.add_matched(n_remaining_row_groups);
file_metrics
.row_groups_pruned_bloom_filter
.add_matched(n_remaining_row_groups);
}
let mut access_plan = row_groups.build();
if enable_page_index
&& !access_plan.is_empty()
&& let Some(p) = page_pruning_predicate
{
access_plan = p.prune_plan_with_page_index(
access_plan,
&physical_file_schema,
builder.parquet_schema(),
file_metadata.as_ref(),
&file_metrics,
);
}
let mut prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;
if reverse_row_groups {
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
}
builder = prepared_plan.apply_to_builder(builder);
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
if let Some(max_predicate_cache_size) = max_predicate_cache_size {
builder = builder.with_max_predicate_cache_size(max_predicate_cache_size);
}
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
.with_metrics(arrow_reader_metrics.clone())
.build()?;
let files_ranges_pruned_statistics =
file_metrics.files_ranges_pruned_statistics.clone();
let predicate_cache_inner_records =
file_metrics.predicate_cache_inner_records.clone();
let predicate_cache_records = file_metrics.predicate_cache_records.clone();
let stream_schema = Arc::clone(stream.schema());
let replace_schema = !stream_schema.eq(&output_schema);
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
let projector = projection.make_projector(&stream_schema)?;
let stream = stream.map_err(DataFusionError::from).map(move |b| {
b.and_then(|mut b| {
copy_arrow_reader_metrics(
&arrow_reader_metrics,
&predicate_cache_inner_records,
&predicate_cache_records,
);
b = projector.project_batch(&b)?;
if replace_schema {
let (_stream_schema, arrays, num_rows) = b.into_parts();
let options =
RecordBatchOptions::new().with_row_count(Some(num_rows));
RecordBatch::try_new_with_options(
Arc::clone(&output_schema),
arrays,
&options,
)
.map_err(Into::into)
} else {
Ok(b)
}
})
});
if let Some(file_pruner) = file_pruner {
Ok(EarlyStoppingStream::new(
stream,
file_pruner,
files_ranges_pruned_statistics,
)
.boxed())
} else {
Ok(stream.boxed())
}
}))
}
}
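
/// Copies the predicate cache counters reported by the parquet reader into
/// the corresponding DataFusion file metrics.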
fn copy_arrow_reader_metrics(
arrow_reader_metrics: &ArrowReaderMetrics,
predicate_cache_inner_records: &Count,
predicate_cache_records: &Count,
) {
if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
predicate_cache_inner_records.add(v);
}
if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
predicate_cache_records.add(v);
}
}
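
/// Maps column names to the constant value each column takes for an entire file.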
type ConstantColumns = HashMap<String, ScalarValue>;
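
/// Extracts columns that the exact file statistics prove constant across the
/// whole file, so they can be replaced with literals before reading.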
fn constant_columns_from_stats(
statistics: Option<&Statistics>,
file_schema: &SchemaRef,
) -> ConstantColumns {
let mut constants = HashMap::new();
let Some(statistics) = statistics else {
return constants;
};
let num_rows = match statistics.num_rows {
Precision::Exact(num_rows) => Some(num_rows),
_ => None,
};
for (idx, column_stats) in statistics
.column_statistics
.iter()
.take(file_schema.fields().len())
.enumerate()
{
let field = file_schema.field(idx);
if let Some(value) =
constant_value_from_stats(column_stats, num_rows, field.data_type())
{
constants.insert(field.name().clone(), value);
}
}
constants
}
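
/// Returns the constant value of a column if the statistics prove one exists:
/// either `min == max` with zero nulls, or every row null.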
fn constant_value_from_stats(
column_stats: &ColumnStatistics,
num_rows: Option<usize>,
data_type: &DataType,
) -> Option<ScalarValue> {
if let (Precision::Exact(min), Precision::Exact(max)) =
(&column_stats.min_value, &column_stats.max_value)
&& min == max
&& !min.is_null()
&& matches!(column_stats.null_count, Precision::Exact(0))
{
if min.data_type() != *data_type {
return min.cast_to(data_type).ok();
}
return Some(min.clone());
}
if let (Some(num_rows), Precision::Exact(nulls)) =
(num_rows, &column_stats.null_count)
&& *nulls == num_rows
{
return ScalarValue::try_new_null(data_type).ok();
}
None
}
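
/// Wraps the record batch stream and re-checks the file-level pruner as
/// batches flow through, ending the stream early once the (dynamic) predicate
/// proves no further batches can match.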
struct EarlyStoppingStream<S> {
done: bool,
file_pruner: FilePruner,
files_ranges_pruned_statistics: PruningMetrics,
inner: S,
}
impl<S> EarlyStoppingStream<S> {
pub fn new(
stream: S,
file_pruner: FilePruner,
files_ranges_pruned_statistics: PruningMetrics,
) -> Self {
Self {
done: false,
inner: stream,
file_pruner,
files_ranges_pruned_statistics,
}
}
}
impl<S> EarlyStoppingStream<S>
where
S: Stream<Item = Result<RecordBatch>> + Unpin,
{
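    /// Passes `input` through unless the file pruner now proves the rest of
    /// the file cannot match, in which case the stream is marked done.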
fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
let batch = input?;
if self.file_pruner.should_prune()? {
self.files_ranges_pruned_statistics.add_pruned(1);
self.files_ranges_pruned_statistics.subtract_matched(1);
self.done = true;
Ok(None)
} else {
Ok(Some(batch))
}
}
}
impl<S> Stream for EarlyStoppingStream<S>
where
S: Stream<Item = Result<RecordBatch>> + Unpin,
{
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
match ready!(self.inner.poll_next_unpin(cx)) {
None => {
self.done = true;
Poll::Ready(None)
}
Some(input_batch) => {
let output = self.check_prune(input_batch);
Poll::Ready(output.transpose())
}
}
}
}
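
/// Bundles the possible sources of parquet decryption properties: explicitly
/// configured properties, or an encryption factory that resolves them per file.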
#[derive(Default)]
struct EncryptionContext {
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
#[cfg(feature = "parquet_encryption")]
encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
}
#[cfg(feature = "parquet_encryption")]
impl EncryptionContext {
fn new(
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
encryption_factory: Option<(
Arc<dyn EncryptionFactory>,
EncryptionFactoryOptions,
)>,
) -> Self {
Self {
file_decryption_properties,
encryption_factory,
}
}
async fn get_file_decryption_properties(
&self,
file_location: &object_store::path::Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
match &self.file_decryption_properties {
Some(file_decryption_properties) => {
Ok(Some(Arc::clone(file_decryption_properties)))
}
None => match &self.encryption_factory {
Some((encryption_factory, encryption_config)) => Ok(encryption_factory
.get_file_decryption_properties(encryption_config, file_location)
.await?),
None => Ok(None),
},
}
}
}
#[cfg(not(feature = "parquet_encryption"))]
#[expect(dead_code)]
impl EncryptionContext {
async fn get_file_decryption_properties(
&self,
_file_location: &object_store::path::Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
Ok(None)
}
}
impl ParquetOpener {
#[cfg(feature = "parquet_encryption")]
fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::new(
self.file_decryption_properties.clone(),
self.encryption_factory.clone(),
)
}
#[cfg(not(feature = "parquet_encryption"))]
#[expect(dead_code)]
fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::default()
}
}
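
/// Returns the initial [`ParquetAccessPlan`]: a plan supplied via the file's
/// extensions if present (validated against the file's row group count),
/// otherwise a plan that scans all row groups.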
fn create_initial_plan(
file_name: &str,
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
row_group_count: usize,
) -> Result<ParquetAccessPlan> {
if let Some(extensions) = extensions {
if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
let plan_len = access_plan.len();
if plan_len != row_group_count {
return exec_err!(
"Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
);
}
return Ok(access_plan.clone());
} else {
debug!("DataSourceExec Ignoring unknown extension specified for {file_name}");
}
}
Ok(ParquetAccessPlan::new_all(row_group_count))
}
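
/// Builds a [`PagePruningAccessPlanFilter`] from the predicate for pruning
/// individual pages using the page index.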
pub(crate) fn build_page_pruning_predicate(
predicate: &Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
) -> Arc<PagePruningAccessPlanFilter> {
Arc::new(PagePruningAccessPlanFilter::new(
predicate,
Arc::clone(file_schema),
))
}
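
/// Builds both the row-group-level [`PruningPredicate`] and the page-level
/// [`PagePruningAccessPlanFilter`] for `predicate`, if one is provided.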
pub(crate) fn build_pruning_predicates(
predicate: Option<&Arc<dyn PhysicalExpr>>,
file_schema: &SchemaRef,
predicate_creation_errors: &Count,
) -> (
Option<Arc<PruningPredicate>>,
Option<Arc<PagePruningAccessPlanFilter>>,
) {
let Some(predicate) = predicate.as_ref() else {
return (None, None);
};
let pruning_predicate = build_pruning_predicate(
Arc::clone(predicate),
file_schema,
predicate_creation_errors,
);
let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
(pruning_predicate, Some(page_pruning_predicate))
}
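
/// Loads the page index (column and offset indexes) into the metadata if
/// either index is missing, returning the metadata unchanged otherwise.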
async fn load_page_index<T: AsyncFileReader>(
reader_metadata: ArrowReaderMetadata,
input: &mut T,
options: ArrowReaderOptions,
) -> Result<ArrowReaderMetadata> {
let parquet_metadata = reader_metadata.metadata();
let missing_column_index = parquet_metadata.column_index().is_none();
let missing_offset_index = parquet_metadata.offset_index().is_none();
if missing_column_index || missing_offset_index {
let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
.unwrap_or_else(|e| e.as_ref().clone());
let mut reader = ParquetMetaDataReader::new_with_metadata(m)
.with_page_index_policy(PageIndexPolicy::Optional);
reader.load_page_index(input).await?;
let new_parquet_metadata = reader.finish()?;
let new_arrow_reader =
ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
Ok(new_arrow_reader)
} else {
Ok(reader_metadata)
}
}
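
/// The page index should only be read when it is enabled by configuration and
/// there is a page pruning predicate with at least one filter to apply.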
fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
) -> bool {
    enable_page_index
        && page_pruning_predicate
            .as_ref()
            .is_some_and(|p| p.filter_number() > 0)
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use super::{ConstantColumns, constant_columns_from_stats};
use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion_common::{
ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch,
stats::Precision,
};
use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::{
PhysicalExpr,
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
planner::logical2physical,
projection::ProjectionExprs,
};
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, replace_columns_with_literals,
};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{Stream, StreamExt};
use object_store::{ObjectStore, memory::InMemory, path::Path};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
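
    /// Test-only builder that assembles a [`ParquetOpener`] with defaults so
    /// each test sets only the fields it cares about.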
struct ParquetOpenerBuilder {
store: Option<Arc<dyn ObjectStore>>,
table_schema: Option<TableSchema>,
partition_index: usize,
projection_indices: Option<Vec<usize>>,
projection: Option<ProjectionExprs>,
batch_size: usize,
limit: Option<usize>,
predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
pushdown_filters: bool,
reorder_filters: bool,
force_filter_selections: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
enable_row_group_stats_pruning: bool,
coerce_int96: Option<arrow::datatypes::TimeUnit>,
max_predicate_cache_size: Option<usize>,
reverse_row_groups: bool,
}
impl ParquetOpenerBuilder {
fn new() -> Self {
Self {
store: None,
table_schema: None,
partition_index: 0,
projection_indices: None,
projection: None,
batch_size: 1024,
limit: None,
predicate: None,
metadata_size_hint: None,
metrics: ExecutionPlanMetricsSet::new(),
pushdown_filters: false,
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_row_group_stats_pruning: false,
coerce_int96: None,
max_predicate_cache_size: None,
reverse_row_groups: false,
}
}
fn with_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
self.store = Some(store);
self
}
fn with_schema(mut self, file_schema: SchemaRef) -> Self {
self.table_schema = Some(TableSchema::from_file_schema(file_schema));
self
}
fn with_table_schema(mut self, table_schema: TableSchema) -> Self {
self.table_schema = Some(table_schema);
self
}
fn with_projection_indices(mut self, indices: &[usize]) -> Self {
self.projection_indices = Some(indices.to_vec());
self
}
fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
self.predicate = Some(predicate);
self
}
fn with_pushdown_filters(mut self, enable: bool) -> Self {
self.pushdown_filters = enable;
self
}
fn with_reorder_filters(mut self, enable: bool) -> Self {
self.reorder_filters = enable;
self
}
fn with_row_group_stats_pruning(mut self, enable: bool) -> Self {
self.enable_row_group_stats_pruning = enable;
self
}
fn with_reverse_row_groups(mut self, enable: bool) -> Self {
self.reverse_row_groups = enable;
self
}
fn build(self) -> ParquetOpener {
let store = self
.store
.expect("ParquetOpenerBuilder: store must be set via with_store()");
let table_schema = self.table_schema.expect(
"ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()",
);
let file_schema = Arc::clone(table_schema.file_schema());
let projection = if let Some(projection) = self.projection {
projection
} else if let Some(indices) = self.projection_indices {
ProjectionExprs::from_indices(&indices, &file_schema)
} else {
let all_indices: Vec<usize> = (0..file_schema.fields().len()).collect();
ProjectionExprs::from_indices(&all_indices, &file_schema)
};
ParquetOpener {
partition_index: self.partition_index,
projection,
batch_size: self.batch_size,
limit: self.limit,
predicate: self.predicate,
table_schema,
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics,
parquet_file_reader_factory: Arc::new(
DefaultParquetFileReaderFactory::new(store),
),
pushdown_filters: self.pushdown_filters,
reorder_filters: self.reorder_filters,
force_filter_selections: self.force_filter_selections,
enable_page_index: self.enable_page_index,
enable_bloom_filter: self.enable_bloom_filter,
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
coerce_int96: self.coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
max_predicate_cache_size: self.max_predicate_cache_size,
reverse_row_groups: self.reverse_row_groups,
}
}
}
fn constant_int_stats() -> (Statistics, SchemaRef) {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let statistics = Statistics {
num_rows: Precision::Exact(3),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::from(5i32)),
min_value: Precision::Exact(ScalarValue::from(5i32)),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
byte_size: Precision::Absent,
},
ColumnStatistics::new_unknown(),
],
};
(statistics, schema)
}
#[test]
fn extract_constant_columns_non_null() {
let (statistics, schema) = constant_int_stats();
let constants = constant_columns_from_stats(Some(&statistics), &schema);
assert_eq!(constants.len(), 1);
assert_eq!(constants.get("a"), Some(&ScalarValue::from(5i32)));
assert!(!constants.contains_key("b"));
}
#[test]
fn extract_constant_columns_all_null() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let statistics = Statistics {
num_rows: Precision::Exact(2),
total_byte_size: Precision::Absent,
column_statistics: vec![ColumnStatistics {
null_count: Precision::Exact(2),
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
byte_size: Precision::Absent,
}],
};
let constants = constant_columns_from_stats(Some(&statistics), &schema);
assert_eq!(
constants.get("a"),
Some(&ScalarValue::Utf8(None)),
"all-null column should be treated as constant null"
);
}
#[test]
fn rewrite_projection_to_literals() {
let (statistics, schema) = constant_int_stats();
let constants = constant_columns_from_stats(Some(&statistics), &schema);
let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
let rewritten = projection
.try_map_exprs(|expr| replace_columns_with_literals(expr, &constants))
.unwrap();
let exprs = rewritten.as_ref();
assert!(exprs[0].expr.as_any().downcast_ref::<Literal>().is_some());
assert!(exprs[1].expr.as_any().downcast_ref::<Column>().is_some());
assert_eq!(rewritten.column_indices(), vec![1]);
}
#[test]
fn rewrite_physical_expr_literal() {
let mut constants = ConstantColumns::new();
constants.insert("a".to_string(), ScalarValue::from(7i32));
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
let rewritten = replace_columns_with_literals(expr, &constants).unwrap();
assert!(rewritten.as_any().downcast_ref::<Literal>().is_some());
}
async fn count_batches_and_rows(
mut stream: std::pin::Pin<
Box<
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+ Send,
>,
>,
) -> (usize, usize) {
let mut num_batches = 0;
let mut num_rows = 0;
while let Some(Ok(batch)) = stream.next().await {
num_rows += batch.num_rows();
num_batches += 1;
}
(num_batches, num_rows)
}
async fn collect_int32_values(
mut stream: std::pin::Pin<
Box<
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+ Send,
>,
>,
) -> Vec<i32> {
use arrow::array::Array;
let mut values = vec![];
while let Some(Ok(batch)) = stream.next().await {
let array = batch
.column(0)
.as_any()
.downcast_ref::<arrow::array::Int32Array>()
.unwrap();
for i in 0..array.len() {
if !array.is_null(i) {
values.push(array.value(i));
}
}
}
values
}
async fn write_parquet(
store: Arc<dyn ObjectStore>,
filename: &str,
batch: arrow::record_batch::RecordBatch,
) -> usize {
write_parquet_batches(store, filename, vec![batch], None).await
}
async fn write_parquet_batches(
store: Arc<dyn ObjectStore>,
filename: &str,
batches: Vec<arrow::record_batch::RecordBatch>,
props: Option<WriterProperties>,
) -> usize {
let mut out = BytesMut::new().writer();
{
let schema = batches[0].schema();
let mut writer = ArrowWriter::try_new(&mut out, schema, props).unwrap();
for batch in batches {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
}
let data = out.into_inner().freeze();
let data_len = data.len();
store.put(&Path::from(filename), data.into()).await.unwrap();
data_len
}
fn make_dynamic_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
Arc::new(DynamicFilterPhysicalExpr::new(
expr.children().into_iter().map(Arc::clone).collect(),
expr,
))
}
#[tokio::test]
async fn test_prune_on_statistics() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch = record_batch!(
("a", Int32, vec![Some(1), Some(2), Some(2)]),
("b", Float32, vec![Some(1.0), Some(2.0), None])
)
.unwrap();
let data_size =
write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
let schema = batch.schema();
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_size).unwrap(),
)
.with_statistics(Arc::new(
Statistics::new_unknown(&schema)
.add_column_statistics(ColumnStatistics::new_unknown())
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Float32(Some(1.0))))
.with_max_value(Precision::Exact(ScalarValue::Float32(Some(2.0))))
.with_null_count(Precision::Exact(1)),
),
));
let make_opener = |predicate| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0, 1])
.with_predicate(predicate)
.with_row_group_stats_pruning(true)
.build()
};
let expr = col("a").eq(lit(1));
let predicate = logical2physical(&expr, &schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 3);
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
let predicate = logical2physical(&expr, &schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
#[tokio::test]
async fn test_prune_on_partition_statistics_with_dynamic_expression() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let data_size =
write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
let file_schema = batch.schema();
let mut file = PartitionedFile::new(
"part=1/file.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);
file.partition_values = vec![ScalarValue::Int32(Some(1))];
let table_schema = Arc::new(Schema::new(vec![
Field::new("part", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));
let table_schema_for_opener = TableSchema::new(
file_schema.clone(),
vec![Arc::new(Field::new("part", DataType::Int32, false))],
);
let make_opener = |predicate| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_table_schema(table_schema_for_opener.clone())
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_row_group_stats_pruning(true)
.build()
};
let expr = col("part").eq(lit(1));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 3);
let expr = col("part").eq(lit(2));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
#[tokio::test]
async fn test_prune_on_partition_values_and_file_statistics() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch = record_batch!(
("a", Int32, vec![Some(1), Some(2), Some(3)]),
("b", Float64, vec![Some(1.0), Some(2.0), None])
)
.unwrap();
let data_size =
write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
let file_schema = batch.schema();
let mut file = PartitionedFile::new(
"part=1/file.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);
file.partition_values = vec![ScalarValue::Int32(Some(1))];
file.statistics = Some(Arc::new(
Statistics::new_unknown(&file_schema)
.add_column_statistics(ColumnStatistics::new_unknown())
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(1.0))))
.with_max_value(Precision::Exact(ScalarValue::Float64(Some(2.0))))
.with_null_count(Precision::Exact(1)),
),
));
let table_schema = Arc::new(Schema::new(vec![
Field::new("part", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Float32, true),
]));
let table_schema_for_opener = TableSchema::new(
file_schema.clone(),
vec![Arc::new(Field::new("part", DataType::Int32, false))],
);
let make_opener = |predicate| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_table_schema(table_schema_for_opener.clone())
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_row_group_stats_pruning(true)
.build()
};
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 3);
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
#[tokio::test]
async fn test_prune_on_partition_value_and_data_value() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap();
let data_size =
write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
let file_schema = batch.schema();
let mut file = PartitionedFile::new(
"part=1/file.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);
file.partition_values = vec![ScalarValue::Int32(Some(1))];
let table_schema = Arc::new(Schema::new(vec![
Field::new("part", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));
let table_schema_for_opener = TableSchema::new(
file_schema.clone(),
vec![Arc::new(Field::new("part", DataType::Int32, false))],
);
let make_opener = |predicate| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_table_schema(table_schema_for_opener.clone())
.with_projection_indices(&[0])
.with_predicate(predicate)
                .with_pushdown_filters(true)
                .with_reorder_filters(true)
.build()
};
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 3);
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 3);
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 1);
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
#[tokio::test]
async fn test_opener_pruning_skipped_on_static_filters() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let data_size =
write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
let file_schema = batch.schema();
let mut file = PartitionedFile::new(
"part=1/file.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);
file.partition_values = vec![ScalarValue::Int32(Some(1))];
file.statistics = Some(Arc::new(
Statistics::default().add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(1))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(3))))
.with_null_count(Precision::Exact(0)),
),
));
let table_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("part", DataType::Int32, false),
]));
let table_schema_for_opener = TableSchema::new(
file_schema.clone(),
vec![Arc::new(Field::new("part", DataType::Int32, false))],
);
let make_opener = |predicate| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_table_schema(table_schema_for_opener.clone())
.with_projection_indices(&[0])
.with_predicate(predicate)
.build()
};
let expr = col("a").eq(lit(42));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
let expr = col("part").eq(lit(2));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
#[tokio::test]
async fn test_reverse_scan_row_groups() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch1 =
record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let batch2 =
record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap();
let batch3 =
record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap();
let props = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
let data_len = write_parquet_batches(
Arc::clone(&store),
"test.parquet",
vec![batch1.clone(), batch2, batch3],
Some(props),
)
.await;
let schema = batch1.schema();
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_len).unwrap(),
);
let make_opener = |reverse_scan: bool| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_reverse_row_groups(reverse_scan)
.build()
};
let opener = make_opener(false);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let forward_values = collect_int32_values(stream).await;
let opener = make_opener(true);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let reverse_values = collect_int32_values(stream).await;
assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
}
#[tokio::test]
async fn test_reverse_scan_single_row_group() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let data_size =
write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
let schema = batch.schema();
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);
let make_opener = |reverse_scan: bool| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_reverse_row_groups(reverse_scan)
.build()
};
let opener_forward = make_opener(false);
let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap();
let (batches_forward, _) = count_batches_and_rows(stream_forward).await;
let opener_reverse = make_opener(true);
let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap();
let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await;
assert_eq!(batches_forward, batches_reverse);
}
#[tokio::test]
async fn test_reverse_scan_with_row_selection() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let batch1 =
            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)]))
                .unwrap();
        let batch2 =
            record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)]))
                .unwrap();
        let batch3 =
            record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)]))
                .unwrap();
let props = WriterProperties::builder()
.set_max_row_group_size(4)
.build();
let data_len = write_parquet_batches(
Arc::clone(&store),
"test.parquet",
vec![batch1.clone(), batch2, batch3],
Some(props),
)
.await;
let schema = batch1.schema();
use crate::ParquetAccessPlan;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
let mut access_plan = ParquetAccessPlan::new_all(3);
access_plan.scan_selection(
0,
RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]),
);
access_plan.scan_selection(
2,
RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]),
);
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_len).unwrap(),
)
.with_extensions(Arc::new(access_plan));
let make_opener = |reverse_scan: bool| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_reverse_row_groups(reverse_scan)
.build()
};
let opener = make_opener(false);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let forward_values = collect_int32_values(stream).await;
assert_eq!(
forward_values,
vec![3, 4, 5, 6, 7, 8, 9, 10],
"Forward scan should select correct rows based on RowSelection"
);
let opener = make_opener(true);
let stream = opener.open(file).unwrap().await.unwrap();
let reverse_values = collect_int32_values(stream).await;
assert_eq!(
reverse_values,
vec![9, 10, 5, 6, 7, 8, 3, 4],
"Reverse scan should reverse row group order while maintaining correct RowSelection for each group"
);
}
#[tokio::test]
async fn test_reverse_scan_with_non_contiguous_row_groups() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let batch0 = record_batch!(("a", Int32, vec![Some(1), Some(2)])).unwrap();
let batch1 = record_batch!(("a", Int32, vec![Some(3), Some(4)])).unwrap();
let batch2 = record_batch!(("a", Int32, vec![Some(5), Some(6)])).unwrap();
let batch3 = record_batch!(("a", Int32, vec![Some(7), Some(8)])).unwrap();
let props = WriterProperties::builder()
.set_max_row_group_size(2)
.build();
let data_len = write_parquet_batches(
Arc::clone(&store),
"test.parquet",
vec![batch0.clone(), batch1, batch2, batch3],
Some(props),
)
.await;
let schema = batch0.schema();
use crate::ParquetAccessPlan;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
let mut access_plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);
access_plan.scan_selection(
0,
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
);
access_plan.scan_selection(
2,
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
);
access_plan.scan_selection(
3,
RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
);
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_len).unwrap(),
)
.with_extensions(Arc::new(access_plan));
let make_opener = |reverse_scan: bool| {
ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_reverse_row_groups(reverse_scan)
.build()
};
let opener = make_opener(false);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let forward_values = collect_int32_values(stream).await;
assert_eq!(
forward_values,
vec![1, 5, 7],
"Forward scan with non-contiguous row groups"
);
let opener = make_opener(true);
let stream = opener.open(file).unwrap().await.unwrap();
let reverse_values = collect_int32_values(stream).await;
assert_eq!(
reverse_values,
vec![7, 5, 1],
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
);
}
}