use arrow_schema::{Schema, SchemaRef};
use datafusion_expr::Expr;
use parquet::{
arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder},
file::metadata::ParquetMetaData,
};
use crate::{
compile::{SchemaPathIndex, build_schema_path_index, compile_pruning_ir_with_index},
prune::{
AsyncBloomFilterProvider, PruneOptions, PruneResult, prune_compiled,
prune_compiled_with_bloom_provider,
},
};
/// Reusable pruning handle: a schema, the path index derived from it, and the
/// options that are forwarded on every prune call.
///
/// The path index is built once when the value is constructed, so the same
/// `Pruner` can be applied to many expressions without rebuilding it.
#[derive(Clone, Debug)]
pub struct Pruner {
    /// Schema that filter expressions are compiled against.
    schema: SchemaRef,
    /// Index produced from `schema` by `build_schema_path_index`; handed to
    /// every compile call together with the schema.
    schema_index: SchemaPathIndex,
    /// Options passed through unchanged to the pruning routines.
    options: PruneOptions,
}
impl Pruner {
    /// Builds a pruner for `schema` using `PruneOptions::default()`.
    ///
    /// # Errors
    ///
    /// Fails under the same condition as [`Pruner::try_with_options`].
    pub fn try_new(schema: SchemaRef) -> Result<Self, String> {
        let default_options = PruneOptions::default();
        Self::try_with_options(schema, default_options)
    }

    /// Builds a pruner for `schema` with caller-supplied `options`.
    ///
    /// The schema path index is computed here, once, and stored alongside the
    /// schema.
    ///
    /// # Errors
    ///
    /// Returns an error message when `schema` has no fields.
    pub fn try_with_options(schema: SchemaRef, options: PruneOptions) -> Result<Self, String> {
        if schema.fields().is_empty() {
            Err(String::from("Schema must have at least one field"))
        } else {
            // Borrow the schema only long enough to build the index; the
            // owning handle is moved into the struct right after.
            let borrowed: &Schema = &schema;
            let schema_index = build_schema_path_index(borrowed);
            Ok(Self {
                schema,
                schema_index,
                options,
            })
        }
    }

    /// Borrows the schema this pruner was constructed with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Borrows the options this pruner was constructed with.
    pub fn options(&self) -> &PruneOptions {
        &self.options
    }

    /// Compiles `expr` against the stored schema and index, then prunes
    /// `metadata` with the compiled form via `prune_compiled`.
    ///
    /// The compile output is forwarded as-is; how compile problems surface in
    /// the returned `PruneResult` is decided by `prune_compiled`.
    pub fn prune(&self, metadata: &ParquetMetaData, expr: &Expr) -> PruneResult {
        let schema = self.schema();
        let compiled = compile_pruning_ir_with_index(expr, schema, &self.schema_index);
        prune_compiled(metadata, schema, compiled, self.options())
    }

    /// Async variant of [`Pruner::prune`] that reads the Parquet metadata from
    /// `builder` and also passes `builder` itself as the bloom filter provider.
    pub async fn prune_with_async_reader<T: AsyncFileReader + 'static>(
        &self,
        builder: &mut ParquetRecordBatchStreamBuilder<T>,
        expr: &Expr,
    ) -> PruneResult {
        // Take an owned copy of what `metadata()` returns so the shared borrow
        // of `builder` ends before `builder` is handed over mutably below.
        let metadata = builder.metadata().clone();
        let schema = self.schema();
        let compiled = compile_pruning_ir_with_index(expr, schema, &self.schema_index);
        prune_compiled_with_bloom_provider(
            metadata.as_ref(),
            schema,
            compiled,
            self.options(),
            builder,
        )
        .await
    }

    /// Async variant of [`Pruner::prune`] that additionally supplies a
    /// caller-provided bloom filter `provider` to the pruning routine.
    pub async fn prune_with_bloom_provider<P: AsyncBloomFilterProvider>(
        &self,
        metadata: &ParquetMetaData,
        expr: &Expr,
        provider: &mut P,
    ) -> PruneResult {
        let schema = self.schema();
        let compiled = compile_pruning_ir_with_index(expr, schema, &self.schema_index);
        prune_compiled_with_bloom_provider(metadata, schema, compiled, self.options(), provider)
            .await
    }
}