use crate::arrow::array_reader::ArrayReader;
use crate::arrow::arrow_reader::{
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
};
use crate::errors::{ParquetError, Result};
use arrow_array::Array;
use arrow_select::filter::prep_null_mask_filter;
use std::collections::VecDeque;
/// Builder for a [`ReadPlan`]: a batch size plus an optional [`RowSelection`].
///
/// The selection can be replaced (`with_selection`), narrowed by a predicate
/// (`with_predicate`), or adjusted by an offset/limit (`limited`) before the
/// final plan is produced by `build`.
#[derive(Clone)]
pub(crate) struct ReadPlanBuilder {
/// Batch size copied unchanged into the built [`ReadPlan`].
batch_size: usize,
/// Current row selection; `None` is treated as "selects rows" by `selects_any`.
selection: Option<RowSelection>,
}
impl ReadPlanBuilder {
    /// Creates a builder with the given `batch_size` and no row selection.
    pub(crate) fn new(batch_size: usize) -> Self {
        Self {
            selection: None,
            batch_size,
        }
    }

    /// Replaces the current selection with `selection`; passing `None` clears it.
    pub(crate) fn with_selection(self, selection: Option<RowSelection>) -> Self {
        Self { selection, ..self }
    }

    /// Returns the current selection, if one has been set.
    #[cfg(feature = "async")]
    pub(crate) fn selection(&self) -> Option<&RowSelection> {
        self.selection.as_ref()
    }

    /// Wraps this builder in a [`LimitedReadPlanBuilder`] so that an offset
    /// and/or limit can be applied relative to `row_count`.
    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
        LimitedReadPlanBuilder::new(self, row_count)
    }

    /// Returns `true` when there is no selection, otherwise whatever the
    /// selection's own `selects_any` reports.
    pub(crate) fn selects_any(&self) -> bool {
        match &self.selection {
            Some(selection) => selection.selects_any(),
            None => true,
        }
    }

    /// Returns the selection's `row_count`, or `None` when no selection is set.
    #[cfg(feature = "async")]
    pub(crate) fn num_rows_selected(&self) -> Option<usize> {
        let selection = self.selection.as_ref()?;
        Some(selection.row_count())
    }

    /// Evaluates `predicate` over the batches read from `array_reader` under the
    /// current plan and folds the resulting filters into the selection.
    ///
    /// Filters containing nulls are passed through `prep_null_mask_filter`
    /// before being collected. The collected filters are turned into a
    /// [`RowSelection`] with `RowSelection::from_filters`; if a selection
    /// already exists the new one is chained onto it with `and_then`,
    /// otherwise it becomes the selection.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading a batch or from `predicate.evaluate`,
    /// and returns an error when the filter's length differs from the number
    /// of rows in the batch it was evaluated on.
    pub(crate) fn with_predicate(
        mut self,
        array_reader: Box<dyn ArrayReader>,
        predicate: &mut dyn ArrowPredicate,
    ) -> Result<Self> {
        // `build` consumes the builder, and `self` is still needed below, so
        // the reader is driven by a plan built from a clone.
        let plan = self.clone().build();
        let batches = ParquetRecordBatchReader::new(array_reader, plan);

        let mut collected = Vec::new();
        for batch in batches {
            let batch = batch?;
            let input_rows = batch.num_rows();
            let mask = predicate.evaluate(batch)?;

            // The predicate is caller-supplied: reject a filter whose length
            // does not match the batch it was evaluated on.
            let returned_rows = mask.len();
            if returned_rows != input_rows {
                return Err(arrow_err!(
                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                    returned_rows
                ));
            }

            let mask = if mask.null_count() == 0 {
                mask
            } else {
                prep_null_mask_filter(&mask)
            };
            collected.push(mask);
        }

        let from_predicate = RowSelection::from_filters(&collected);
        self.selection = Some(match self.selection.take() {
            Some(existing) => existing.and_then(&from_predicate),
            None => from_predicate,
        });
        Ok(self)
    }

    /// Consumes the builder and produces the final [`ReadPlan`].
    ///
    /// A selection for which `selects_any` is `false` is replaced by an empty
    /// selection. Any selection present is then trimmed with
    /// `RowSelection::trim` and converted into the plan's queue representation.
    pub(crate) fn build(self) -> ReadPlan {
        let batch_size = self.batch_size;
        let selection = if self.selects_any() {
            self.selection
        } else {
            Some(RowSelection::from(vec![]))
        };

        ReadPlan {
            batch_size,
            selection: selection.map(|s| s.trim().into()),
        }
    }
}
/// Wraps a [`ReadPlanBuilder`] together with a row count and an optional
/// offset and limit, which `build_limited` folds into the inner builder's
/// selection.
pub(crate) struct LimitedReadPlanBuilder {
/// Builder whose selection is adjusted and then returned by `build_limited`.
inner: ReadPlanBuilder,
/// Row count that `offset` and `limit` are checked against.
/// NOTE(review): presumably the total rows in the unit being read — confirm with callers.
row_count: usize,
/// Optional offset applied to the selection by `build_limited`; `None` leaves it untouched.
offset: Option<usize>,
/// Optional limit applied to the selection by `build_limited`; `None` leaves it untouched.
limit: Option<usize>,
}
impl LimitedReadPlanBuilder {
    /// Wraps `inner` with the given `row_count`; no offset or limit is set yet.
    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
        Self {
            offset: None,
            limit: None,
            inner,
            row_count,
        }
    }

    /// Sets the offset to apply in `build_limited` (`None` means no offset).
    pub(crate) fn with_offset(self, offset: Option<usize>) -> Self {
        Self { offset, ..self }
    }

    /// Sets the limit to apply in `build_limited` (`None` means no limit).
    pub(crate) fn with_limit(self, limit: Option<usize>) -> Self {
        Self { limit, ..self }
    }

    /// Applies the configured offset and limit to the inner builder's
    /// selection and returns that builder.
    ///
    /// Steps, in order:
    /// 1. A selection for which `selects_any` is `false` is replaced by an
    ///    empty selection.
    /// 2. With an offset: if it exceeds `row_count` the selection becomes
    ///    empty; otherwise an existing selection gets `offset` applied, and a
    ///    missing selection becomes "skip `offset`, select the remainder".
    /// 3. With a limit: an existing selection gets `limit` applied, and a
    ///    missing selection becomes "select `min(limit, row_count)`".
    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
        let Self {
            mut inner,
            row_count,
            offset,
            limit,
        } = self;

        if !inner.selects_any() {
            inner.selection = Some(RowSelection::from(vec![]));
        }

        if let Some(offset) = offset {
            let current = inner.selection.take();
            let shifted = match (row_count.checked_sub(offset), current) {
                // `offset` is larger than `row_count`: nothing is left to read.
                (None, _) => RowSelection::from(vec![]),
                (Some(_), Some(selection)) => selection.offset(offset),
                (Some(remaining), None) => RowSelection::from(vec![
                    RowSelector::skip(offset),
                    RowSelector::select(remaining),
                ]),
            };
            inner.selection = Some(shifted);
        }

        if let Some(limit) = limit {
            // After an offset the selection is always `Some`, so the `None`
            // arm is only reached when no offset was configured.
            let capped = match inner.selection.take() {
                Some(selection) => selection.limit(limit),
                None => RowSelection::from(vec![RowSelector::select(limit.min(row_count))]),
            };
            inner.selection = Some(capped);
        }

        inner
    }
}
/// Finalized read plan produced by `ReadPlanBuilder::build`.
pub(crate) struct ReadPlan {
/// Batch size copied from the builder.
batch_size: usize,
/// The builder's selection (if any), trimmed and converted into a `VecDeque`
/// of selectors; `None` when the builder had no selection.
selection: Option<VecDeque<RowSelector>>,
}
impl ReadPlan {
    /// Returns mutable access to the selector queue, or `None` when the plan
    /// carries no selection.
    pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
        let Self { selection, .. } = self;
        selection.as_mut()
    }

    /// Returns the batch size this plan was built with.
    #[inline(always)]
    pub fn batch_size(&self) -> usize {
        let Self { batch_size, .. } = self;
        *batch_size
    }
}