use crate::arrow::array_reader::ArrayReader;
use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
};
use crate::errors::{ParquetError, Result};
use arrow_array::{Array, BooleanArray};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use arrow_select::filter::prep_null_mask_filter;
use std::collections::VecDeque;
/// Inputs for evaluating a single [`ArrowPredicate`] while building a read plan.
///
/// Consumed by `ReadPlanBuilder::with_predicate_options`.
pub struct PredicateOptions<'a> {
/// Reader that produces the batches the predicate is evaluated against.
array_reader: Box<dyn ArrayReader>,
/// Predicate evaluated once per batch read from `array_reader`.
predicate: &'a mut dyn ArrowPredicate,
/// Optional cap on the number of matching rows; once this many matches have
/// been collected, predicate evaluation stops early.
limit: Option<usize>,
/// Number of rows the resulting selection must span when a `limit` is set
/// and the builder has no prior selection. Defaults to 0 and is only
/// consulted in that case.
total_rows: usize,
}
impl<'a> PredicateOptions<'a> {
    /// Creates options that evaluate `predicate` over the batches produced by
    /// `array_reader`, with no limit on the number of matching rows.
    pub fn new(array_reader: Box<dyn ArrayReader>, predicate: &'a mut dyn ArrowPredicate) -> Self {
        let limit = None;
        let total_rows = 0;
        Self {
            array_reader,
            predicate,
            limit,
            total_rows,
        }
    }

    /// Caps the number of matching rows at `limit`.
    ///
    /// `total_rows` is the number of rows the resulting selection has to
    /// cover when the builder has no prior selection; rows that are never
    /// evaluated because of the limit are recorded as not selected.
    pub fn with_limit(self, limit: usize, total_rows: usize) -> Self {
        Self {
            limit: Some(limit),
            total_rows,
            ..self
        }
    }
}
/// Builder for a [`ReadPlan`].
///
/// Accumulates an optional [`RowSelection`] (refined by predicates, offsets
/// and limits) together with the policy used to turn that selection into a
/// cursor when the plan is built.
#[derive(Clone, Debug)]
pub struct ReadPlanBuilder {
/// Batch size handed through unchanged to the built [`ReadPlan`].
batch_size: usize,
/// Current selection; `None` is treated as "all rows are selected".
selection: Option<RowSelection>,
/// Policy resolved into a concrete `RowSelectionStrategy` at build time.
row_selection_policy: RowSelectionPolicy,
}
impl ReadPlanBuilder {
pub fn new(batch_size: usize) -> Self {
Self {
batch_size,
selection: None,
row_selection_policy: RowSelectionPolicy::default(),
}
}
pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
self.selection = selection;
self
}
pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
self.row_selection_policy = policy;
self
}
pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
&self.row_selection_policy
}
pub fn selection(&self) -> Option<&RowSelection> {
self.selection.as_ref()
}
pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
LimitedReadPlanBuilder::new(self, row_count)
}
pub fn selects_any(&self) -> bool {
self.selection
.as_ref()
.map(|s| s.selects_any())
.unwrap_or(true)
}
pub fn num_rows_selected(&self) -> Option<usize> {
self.selection.as_ref().map(|s| s.row_count())
}
pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
match self.row_selection_policy {
RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
RowSelectionPolicy::Auto { threshold, .. } => {
let selection = match self.selection.as_ref() {
Some(selection) => selection,
None => return RowSelectionStrategy::Selectors,
};
let (total_rows, effective_count) =
selection.iter().fold((0usize, 0usize), |(rows, count), s| {
if s.row_count > 0 {
(rows + s.row_count, count + 1)
} else {
(rows, count)
}
});
if effective_count == 0 {
return RowSelectionStrategy::Mask;
}
if total_rows < effective_count.saturating_mul(threshold) {
RowSelectionStrategy::Mask
} else {
RowSelectionStrategy::Selectors
}
}
}
}
pub fn with_predicate(
self,
array_reader: Box<dyn ArrayReader>,
predicate: &mut dyn ArrowPredicate,
) -> Result<Self> {
self.with_predicate_options(PredicateOptions::new(array_reader, predicate))
}
pub fn with_predicate_options(mut self, options: PredicateOptions<'_>) -> Result<Self> {
let PredicateOptions {
array_reader,
predicate,
limit,
total_rows,
} = options;
let expected_rows = match self.selection.as_ref() {
Some(s) => Some(s.row_count()),
None => limit.map(|_| total_rows),
};
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
let mut filters = vec![];
let mut processed_rows: usize = 0;
let mut matched_rows: usize = 0;
for maybe_batch in reader {
let maybe_batch = maybe_batch?;
let input_rows = maybe_batch.num_rows();
let filter = predicate.evaluate(maybe_batch)?;
if filter.len() != input_rows {
return Err(arrow_err!(
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
filter.len()
));
}
let filter = match filter.null_count() {
0 => filter,
_ => prep_null_mask_filter(&filter),
};
processed_rows += input_rows;
match limit {
Some(limit) if matched_rows + filter.true_count() >= limit => {
let needed = limit - matched_rows;
let truncated = truncate_filter_after_n_trues(filter, needed);
filters.push(truncated);
break;
}
_ => {
matched_rows += filter.true_count();
filters.push(filter);
}
}
}
if let Some(expected) = expected_rows {
if processed_rows < expected {
let pad_len = expected - processed_rows;
filters.push(BooleanArray::new(BooleanBuffer::new_unset(pad_len), None));
}
}
let all_selected = filters.iter().all(|f| f.true_count() == f.len());
if all_selected && self.selection.is_none() {
return Ok(self);
}
let raw = RowSelection::from_filters(&filters);
self.selection = match self.selection.take() {
Some(selection) => Some(selection.and_then(&raw)),
None => Some(raw),
};
Ok(self)
}
pub fn build(mut self) -> ReadPlan {
if !self.selects_any() {
self.selection = Some(RowSelection::from(vec![]));
}
let selection_strategy = self.resolve_selection_strategy();
let Self {
batch_size,
selection,
row_selection_policy: _,
} = self;
let selection = selection.map(|s| s.trim());
let row_selection_cursor = selection
.map(|s| {
let trimmed = s.trim();
let selectors: Vec<RowSelector> = trimmed.into();
match selection_strategy {
RowSelectionStrategy::Mask => {
RowSelectionCursor::new_mask_from_selectors(selectors)
}
RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
}
})
.unwrap_or(RowSelectionCursor::new_all());
ReadPlan {
batch_size,
row_selection_cursor,
}
}
}
/// Wrapper around a [`ReadPlanBuilder`] that applies an optional offset and
/// limit to its selection.
pub(crate) struct LimitedReadPlanBuilder {
/// Builder whose selection is adjusted by `build_limited`.
inner: ReadPlanBuilder,
/// Row count used to bound the offset and limit when `inner` has no
/// selection (presumably the row group's total row count -- confirm with
/// callers).
row_count: usize,
/// Number of leading rows to skip, if any.
offset: Option<usize>,
/// Maximum number of rows to select, if any.
limit: Option<usize>,
}
impl LimitedReadPlanBuilder {
    /// Wraps `inner` with the given `row_count` and neither offset nor limit.
    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
        Self {
            inner,
            row_count,
            offset: None,
            limit: None,
        }
    }

    /// Sets the number of leading rows to skip.
    pub(crate) fn with_offset(self, offset: Option<usize>) -> Self {
        Self { offset, ..self }
    }

    /// Sets the maximum number of rows to select.
    pub(crate) fn with_limit(self, limit: Option<usize>) -> Self {
        Self { limit, ..self }
    }

    /// Applies the offset and then the limit to the wrapped builder's
    /// selection and returns that builder.
    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
        let Self {
            mut inner,
            row_count,
            offset,
            limit,
        } = self;

        // A selection that picks no rows is collapsed to an empty one.
        if !inner.selects_any() {
            inner.selection = Some(RowSelection::from(vec![]));
        }

        // Apply the offset first.
        if let Some(offset) = offset {
            let shifted = match (row_count.checked_sub(offset), inner.selection.take()) {
                // Offset is past the last row: nothing remains.
                (None, _) => RowSelection::from(vec![]),
                (Some(_), Some(existing)) => existing.offset(offset),
                // No selection yet: skip `offset` rows, select the rest.
                (Some(remaining), None) => RowSelection::from(vec![
                    RowSelector::skip(offset),
                    RowSelector::select(remaining),
                ]),
            };
            inner.selection = Some(shifted);
        }

        // Then cap whatever is left at `limit`.
        if let Some(limit) = limit {
            let capped = match inner.selection.take() {
                Some(existing) => existing.limit(limit),
                None => RowSelection::from(vec![RowSelector::select(limit.min(row_count))]),
            };
            inner.selection = Some(capped);
        }

        inner
    }
}
/// Returns a filter with the same length as `filter` in which only the first
/// `n` set values stay set and every later position is cleared.
///
/// A filter with at most `n` true values is returned unchanged. Otherwise the
/// result is rebuilt from the raw value bits and carries no null buffer.
///
/// Validity is not consulted when locating the `n`-th set bit; the caller in
/// this file passes filters whose nulls have already been replaced.
fn truncate_filter_after_n_trues(filter: BooleanArray, n: usize) -> BooleanArray {
    if filter.true_count() <= n {
        return filter;
    }

    let total_len = filter.len();
    match n {
        // Nothing may be kept: the whole filter becomes false.
        0 => BooleanArray::new(BooleanBuffer::new_unset(total_len), None),
        _ => {
            let bits = filter.values();
            // Exclusive end of the prefix holding the first `n` set bits.
            let cutoff = bits
                .set_indices()
                .nth(n - 1)
                .expect("n - 1 < true_count, checked above")
                + 1;

            let mut out = BooleanBufferBuilder::new(total_len);
            out.append_buffer(&bits.slice(0, cutoff));
            out.append_n(total_len - cutoff, false);
            BooleanArray::new(out.finish(), None)
        }
    }
}
/// A finished read plan, produced by `ReadPlanBuilder::build`.
#[derive(Debug)]
pub struct ReadPlan {
/// Batch size copied from the builder.
batch_size: usize,
/// Cursor over the rows to read, built from the builder's selection.
row_selection_cursor: RowSelectionCursor,
}
impl ReadPlan {
    /// Returns mutable access to the queued selectors, or `None` when the
    /// plan's cursor is not selector based.
    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
    pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
        match &mut self.row_selection_cursor {
            RowSelectionCursor::Selectors(cursor) => Some(cursor.selectors_mut()),
            _ => None,
        }
    }

    /// Returns mutable access to the plan's [`RowSelectionCursor`].
    pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
        let cursor = &mut self.row_selection_cursor;
        cursor
    }

    /// Returns the batch size of this plan.
    #[inline(always)]
    pub fn batch_size(&self) -> usize {
        let Self { batch_size, .. } = self;
        *batch_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plan builder (batch size 1024) restricted to `selection`.
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        let builder = ReadPlanBuilder::new(1024);
        builder.with_selection(Some(selection))
    }

    // With the default policy, a single 8-row selector resolves to `Mask`.
    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let builder = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]));
        let strategy = builder.resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Mask);
    }

    // A threshold of 1 is never above the average run length of 8.
    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let builder = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]))
            .with_row_selection_policy(policy);
        let strategy = builder.resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Selectors);
    }

    #[test]
    fn truncate_filter_after_n_trues_keeps_first_n_matches() {
        let input = vec![true, false, true, true, false, true, true];
        let truncated = truncate_filter_after_n_trues(BooleanArray::from(input.clone()), 3);

        assert_eq!(truncated.len(), input.len());
        assert_eq!(truncated.true_count(), 3);

        let actual: Vec<bool> = (0..truncated.len()).map(|i| truncated.value(i)).collect();
        assert_eq!(
            actual,
            vec![true, false, true, true, false, false, false],
            "first three trues should survive, the rest become false"
        );
    }

    #[test]
    fn truncate_filter_after_n_trues_passes_through_when_already_small_enough() {
        let input = BooleanArray::from(vec![true, false, true, false]);
        let input_len = input.len();

        let truncated = truncate_filter_after_n_trues(input, 5);

        assert_eq!(truncated.len(), input_len);
        assert_eq!(truncated.true_count(), 2);
    }

    #[test]
    fn truncate_filter_after_n_trues_zero_returns_all_false() {
        let truncated = truncate_filter_after_n_trues(BooleanArray::from(vec![true, true, true]), 0);

        assert_eq!(truncated.len(), 3);
        assert_eq!(truncated.true_count(), 0);
    }

    #[test]
    fn with_predicate_options_limit_pads_tail_when_no_prior_selection() {
        use crate::arrow::ProjectionMask;
        use crate::arrow::array_reader::StructArrayReader;
        use crate::arrow::array_reader::test_util::make_int32_page_reader;
        use crate::arrow::arrow_reader::ArrowPredicateFn;
        use arrow_schema::{DataType as ArrowType, Field, Fields};

        const TOTAL_ROWS: usize = 100;
        const LIMIT: usize = 10;

        // One non-nullable int32 column holding 0..TOTAL_ROWS.
        let values: Vec<i32> = (0..TOTAL_ROWS as i32).collect();
        let zero_levels = vec![0; TOTAL_ROWS];
        let column_reader = make_int32_page_reader(&values, &zero_levels, &zero_levels, 0, 0);

        let column = Field::new("c0", ArrowType::Int32, false);
        let struct_type = ArrowType::Struct(Fields::from(vec![column]));
        let root_reader = StructArrayReader::new(struct_type, vec![column_reader], 0, 0, false);

        // Every row matches, so only the limit can cut the selection short.
        let mut match_all = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });

        let options = PredicateOptions::new(Box::new(root_reader), &mut match_all)
            .with_limit(LIMIT, TOTAL_ROWS);
        let builder = ReadPlanBuilder::new(16)
            .with_predicate_options(options)
            .unwrap();

        let selection = builder
            .selection()
            .expect("limit-driven early break must produce a selection");
        assert_eq!(selection.row_count(), LIMIT);

        let total: usize = selection.iter().map(|s| s.row_count).sum();
        assert_eq!(
            total, TOTAL_ROWS,
            "selection must span the full row group, not only the prefix evaluated before the limit"
        );
    }
}