use crate::arrow::array_reader::ArrayReader;
use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
};
use crate::errors::{ParquetError, Result};
use arrow_array::Array;
use arrow_select::filter::prep_null_mask_filter;
use std::collections::VecDeque;
/// Builder that accumulates the inputs needed to produce a [`ReadPlan`].
///
/// It carries a batch size, an optional [`RowSelection`], and the
/// [`RowSelectionPolicy`] used to decide how that selection is represented
/// once [`ReadPlanBuilder::build`] is called.
#[derive(Clone, Debug)]
pub struct ReadPlanBuilder {
/// Batch size copied unchanged into the built [`ReadPlan`].
batch_size: usize,
/// Current row selection; `None` means no selection has been set.
selection: Option<RowSelection>,
/// Policy consulted by `resolve_selection_strategy` to pick a concrete strategy.
row_selection_policy: RowSelectionPolicy,
}
impl ReadPlanBuilder {
/// Creates a builder with the given `batch_size`, no row selection, and the
/// default [`RowSelectionPolicy`].
pub fn new(batch_size: usize) -> Self {
    let row_selection_policy = RowSelectionPolicy::default();
    Self {
        batch_size,
        selection: None,
        row_selection_policy,
    }
}
pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
self.selection = selection;
self
}
pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
self.row_selection_policy = policy;
self
}
pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
&self.row_selection_policy
}
pub fn selection(&self) -> Option<&RowSelection> {
self.selection.as_ref()
}
pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
LimitedReadPlanBuilder::new(self, row_count)
}
/// Reports whether this plan can select any rows.
///
/// Returns `true` when no selection is set; otherwise defers to
/// `RowSelection::selects_any` on the current selection.
pub fn selects_any(&self) -> bool {
    match &self.selection {
        Some(current) => current.selects_any(),
        None => true,
    }
}
/// Returns the `row_count()` of the current selection, or `None` when no
/// selection is set.
pub fn num_rows_selected(&self) -> Option<usize> {
    let current = self.selection.as_ref()?;
    Some(current.row_count())
}
/// Turns the configured [`RowSelectionPolicy`] into a concrete
/// [`RowSelectionStrategy`].
///
/// * `Selectors` and `Mask` policies map directly to the strategy of the
///   same name.
/// * `Auto` returns `Selectors` when no selection is set. Otherwise it sums
///   the `row_count` of every non-empty selector and counts those selectors;
///   it returns `Mask` when there are no non-empty selectors or when the
///   summed rows are fewer than `count * threshold` (i.e. the average
///   selector length is below `threshold`), and `Selectors` otherwise.
pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
    match self.row_selection_policy {
        RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
        RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
        RowSelectionPolicy::Auto { threshold, .. } => match self.selection.as_ref() {
            None => RowSelectionStrategy::Selectors,
            Some(current) => {
                // Zero-length selectors contribute to neither tally.
                let mut total_rows = 0usize;
                let mut effective_count = 0usize;
                for selector in current.iter() {
                    if selector.row_count > 0 {
                        total_rows += selector.row_count;
                        effective_count += 1;
                    }
                }
                let prefer_mask = effective_count == 0
                    || total_rows < effective_count.saturating_mul(threshold);
                if prefer_mask {
                    RowSelectionStrategy::Mask
                } else {
                    RowSelectionStrategy::Selectors
                }
            }
        },
    }
}
/// Evaluates `predicate` over the rows of the current plan and narrows the
/// selection to the rows it accepts.
///
/// A reader is built from `array_reader` and a clone of this builder; each
/// batch it yields is passed to `predicate.evaluate`. Filters containing
/// nulls are passed through `prep_null_mask_filter` before being recorded
/// (NOTE(review): per arrow-select this treats nulls as `false` — confirm).
///
/// If every filter selects all of its rows and no selection was previously
/// set, the builder is returned unchanged. Otherwise the filters are turned
/// into a [`RowSelection`], which is combined with any existing selection
/// via `and_then`.
///
/// # Errors
///
/// Propagates errors from reading batches or evaluating the predicate, and
/// returns an error when a filter's length differs from the row count of the
/// batch it was computed from.
pub fn with_predicate(
    mut self,
    array_reader: Box<dyn ArrayReader>,
    predicate: &mut dyn ArrowPredicate,
) -> Result<Self> {
    let plan = self.clone().build();
    let mut filters = Vec::new();
    for maybe_batch in ParquetRecordBatchReader::new(array_reader, plan) {
        let batch = maybe_batch?;
        let input_rows = batch.num_rows();
        let filter = predicate.evaluate(batch)?;
        // The predicate is user supplied, so validate its output length.
        if filter.len() != input_rows {
            return Err(arrow_err!(
                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                filter.len()
            ));
        }
        let filter = if filter.null_count() == 0 {
            filter
        } else {
            prep_null_mask_filter(&filter)
        };
        filters.push(filter);
    }

    // Nothing was filtered out and there is no prior selection to refine.
    let all_selected = filters.iter().all(|f| f.true_count() == f.len());
    if all_selected && self.selection.is_none() {
        return Ok(self);
    }

    let raw = RowSelection::from_filters(&filters);
    let combined = match self.selection.take() {
        Some(existing) => existing.and_then(&raw),
        None => raw,
    };
    self.selection = Some(combined);
    Ok(self)
}
/// Consumes the builder and produces the final [`ReadPlan`].
///
/// Steps:
/// 1. If the current selection selects no rows it is replaced by an empty
///    selection.
/// 2. The selection strategy is resolved from the configured policy.
/// 3. The selection (if any) is trimmed, converted to a list of
///    [`RowSelector`]s and wrapped in a mask or selector based
///    [`RowSelectionCursor`] according to the resolved strategy. With no
///    selection, `RowSelectionCursor::new_all()` is used.
pub fn build(mut self) -> ReadPlan {
    // Normalize a selection that selects nothing to an empty selection.
    if !self.selects_any() {
        self.selection = Some(RowSelection::from(vec![]));
    }

    // Must be resolved before `self` is destructured below.
    let selection_strategy = self.resolve_selection_strategy();

    let Self {
        batch_size,
        selection,
        row_selection_policy: _,
    } = self;

    let row_selection_cursor = selection
        .map(|s| {
            // Trim exactly once; re-trimming an already trimmed selection is
            // redundant work (assuming `trim` is idempotent).
            let selectors: Vec<RowSelector> = s.trim().into();
            match selection_strategy {
                RowSelectionStrategy::Mask => {
                    RowSelectionCursor::new_mask_from_selectors(selectors)
                }
                RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
            }
        })
        // Only construct the "select everything" cursor when it is needed.
        .unwrap_or_else(RowSelectionCursor::new_all);

    ReadPlan {
        batch_size,
        row_selection_cursor,
    }
}
}
/// Wrapper around a [`ReadPlanBuilder`] that applies an optional offset and
/// limit to the row selection when `build_limited` is called.
pub(crate) struct LimitedReadPlanBuilder {
/// The wrapped builder whose selection is adjusted.
inner: ReadPlanBuilder,
/// Row count used to bound the offset and limit when no selection is set.
row_count: usize,
/// Optional offset applied to the selection; `None` applies no offset.
offset: Option<usize>,
/// Optional limit applied to the selection; `None` applies no limit.
limit: Option<usize>,
}
impl LimitedReadPlanBuilder {
/// Wraps `inner` for `row_count` rows with no offset and no limit set.
fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
    let (offset, limit) = (None, None);
    Self {
        inner,
        row_count,
        offset,
        limit,
    }
}
pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
self.offset = offset;
self
}
pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
/// Applies the configured offset and limit to the wrapped builder's
/// selection and returns that builder.
///
/// * A selection that selects no rows is first replaced by an empty one.
/// * With an offset: if the offset exceeds `row_count` the selection becomes
///   empty; otherwise an existing selection has `offset` applied, and a
///   missing selection becomes "skip `offset`, select the remaining rows".
/// * With a limit: an existing selection has `limit` applied, and a missing
///   selection becomes "select `min(limit, row_count)` rows".
pub(crate) fn build_limited(self) -> ReadPlanBuilder {
    let Self {
        inner: mut builder,
        row_count: total_rows,
        offset,
        limit,
    } = self;

    if !builder.selects_any() {
        builder.selection = Some(RowSelection::from(vec![]));
    }

    if let Some(offset) = offset {
        let shifted = match (total_rows.checked_sub(offset), builder.selection.take()) {
            // Offset lies beyond the last row: nothing can be selected.
            (None, _) => RowSelection::from(vec![]),
            (Some(_), Some(existing)) => existing.offset(offset),
            (Some(remaining), None) => RowSelection::from(vec![
                RowSelector::skip(offset),
                RowSelector::select(remaining),
            ]),
        };
        builder.selection = Some(shifted);
    }

    if let Some(limit) = limit {
        let limited = match builder.selection.take() {
            Some(existing) => existing.limit(limit),
            None => RowSelection::from(vec![RowSelector::select(limit.min(total_rows))]),
        };
        builder.selection = Some(limited);
    }

    builder
}
}
/// The result of [`ReadPlanBuilder::build`]: a batch size together with the
/// cursor describing which rows are selected.
#[derive(Debug)]
pub struct ReadPlan {
/// Batch size copied from the builder.
batch_size: usize,
/// Cursor over the row selection, in the representation chosen at build time.
row_selection_cursor: RowSelectionCursor,
}
impl ReadPlan {
/// Returns mutable access to the selector queue when the plan's cursor is the
/// `Selectors` variant, and `None` for any other cursor variant.
#[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
    match &mut self.row_selection_cursor {
        RowSelectionCursor::Selectors(cursor) => Some(cursor.selectors_mut()),
        _ => None,
    }
}
pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
&mut self.row_selection_cursor
}
#[inline(always)]
pub fn batch_size(&self) -> usize {
self.batch_size
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ReadPlanBuilder` with batch size 1024 and the given selection.
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        let selection = Some(selection);
        ReadPlanBuilder::new(1024).with_selection(selection)
    }

    /// A single 8-row selector under the default policy resolves to `Mask`.
    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let eight_rows = RowSelection::from(vec![RowSelector::select(8)]);
        let strategy = builder_with_selection(eight_rows).resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Mask);
    }

    /// With `Auto { threshold: 1 }` the same selection resolves to `Selectors`.
    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let eight_rows = RowSelection::from(vec![RowSelector::select(8)]);
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let strategy = builder_with_selection(eight_rows)
            .with_row_selection_policy(policy)
            .resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Selectors);
    }
}