use arrow_schema::Schema;
use datafusion_expr::Expr;
use parquet::file::metadata::ParquetMetaData;
use super::{
api::{prune_compiled_with_bloom, prune_compiled_with_bloom_provider},
options::{PruneOptions, PruneOptionsBuilder},
provider::AsyncBloomFilterProvider,
result::PruneResult,
};
use crate::compile::{CompileResult, compile_pruning_ir};
/// Chainable request that gathers everything needed for one pruning run:
/// Parquet metadata, a schema, an optional predicate, and an options builder.
///
/// Every borrowed input must outlive `'a`; the request itself owns only the
/// options builder.
#[derive(Debug)]
pub struct PruneRequest<'a> {
/// Parquet metadata handed to the pruning routines; its row-group count is
/// also used to build the result when no predicate is set.
metadata: &'a ParquetMetaData,
/// Schema passed both to predicate compilation and to the pruning routines.
schema: &'a Schema,
/// Predicate to prune with; stays `None` until `with_predicate` is called.
expr: Option<&'a Expr>,
/// Accumulates option toggles; turned into the final options value via
/// `build()` when a prune method runs.
options: PruneOptionsBuilder,
}
impl<'a> PruneRequest<'a> {
    /// Starts a request over `metadata` and `schema` with no predicate and an
    /// options builder obtained from `PruneOptions::builder()`.
    pub fn new(metadata: &'a ParquetMetaData, schema: &'a Schema) -> Self {
        let options = PruneOptions::builder();
        Self {
            metadata,
            schema,
            expr: None,
            options,
        }
    }

    /// Sets the predicate used for pruning, replacing any earlier one.
    pub fn with_predicate(self, expr: &'a Expr) -> Self {
        Self {
            expr: Some(expr),
            ..self
        }
    }

    /// Forwards `enable` to the options builder's `enable_page_index` setting.
    pub fn enable_page_index(self, enable: bool) -> Self {
        Self {
            options: self.options.enable_page_index(enable),
            ..self
        }
    }

    /// Forwards `enable` to the options builder's `enable_bloom_filter` setting.
    pub fn enable_bloom_filter(self, enable: bool) -> Self {
        Self {
            options: self.options.enable_bloom_filter(enable),
            ..self
        }
    }

    /// Forwards `enable` to the options builder's `emit_roaring` setting.
    pub fn emit_roaring(self, enable: bool) -> Self {
        Self {
            options: self.options.emit_roaring(enable),
            ..self
        }
    }

    /// Forwards `enable` to the options builder's
    /// `allow_truncated_byte_array_ordering` setting.
    pub fn allow_truncated_byte_array_ordering(self, enable: bool) -> Self {
        Self {
            options: self.options.allow_truncated_byte_array_ordering(enable),
            ..self
        }
    }

    /// Runs pruning synchronously and consumes the request.
    ///
    /// The options are built first. With a predicate set, it is compiled via
    /// `compile_pruning_ir` and the outcome is delegated to
    /// `prune_compiled_with_bloom`. Without a predicate, the result lists
    /// every row-group index found in the metadata.
    pub fn prune(self) -> PruneResult {
        let options = self.options.build();
        match self.expr {
            Some(predicate) => {
                let compiled = compile_pruning_ir(predicate, self.schema);
                prune_compiled_with_bloom(self.metadata, self.schema, compiled, &options)
            }
            None => Self::keep_all_row_groups(self.metadata),
        }
    }

    /// Asynchronous counterpart of [`Self::prune`].
    ///
    /// Identical flow, except that a set predicate is delegated to
    /// `prune_compiled_with_bloom_provider`, which additionally receives
    /// `provider` and is awaited.
    pub async fn prune_async<P: AsyncBloomFilterProvider>(self, provider: &mut P) -> PruneResult {
        let options = self.options.build();
        match self.expr {
            Some(predicate) => {
                let compiled = compile_pruning_ir(predicate, self.schema);
                prune_compiled_with_bloom_provider(
                    self.metadata,
                    self.schema,
                    compiled,
                    &options,
                    provider,
                )
                .await
            }
            None => Self::keep_all_row_groups(self.metadata),
        }
    }

    /// Builds the no-predicate result: indices `0..num_row_groups()`, both
    /// optional arguments left as `None`, and a default `CompileResult`.
    fn keep_all_row_groups(metadata: &ParquetMetaData) -> PruneResult {
        let every_index = (0..metadata.num_row_groups()).collect::<Vec<usize>>();
        PruneResult::new(every_index, None, None, CompileResult::default())
    }
}