mod data;
mod filter;
use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
ParquetRecordBatchReader, PredicateOptions, ReadPlanBuilder, RowFilter, RowSelection,
RowSelectionPolicy,
};
use crate::arrow::in_memory_row_group::ColumnChunkData;
use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
use crate::arrow::schema::ParquetField;
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use data::DataRequest;
use filter::AdvanceResult;
use filter::FilterInfo;
use std::ops::Range;
use std::sync::{Arc, RwLock};
/// Per-row-group bookkeeping carried through every state of the decoder
/// state machine.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of the row group within the file metadata.
    row_group_idx: usize,
    /// Total number of rows in the row group.
    row_count: usize,
    /// Read plan under construction; updated with each evaluated predicate.
    plan_builder: ReadPlanBuilder,
    /// Offset / limit still to be applied when this row group is read.
    budget: RowBudget,
}
/// States of the per-row-group state machine advanced by
/// `RowGroupReaderBuilder::try_transition`.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// A row group was registered by `next_row_group`; no work has started yet.
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Evaluating filter predicates: the data request for the current
    /// predicate has not been planned yet.
    Filters {
        row_group_info: RowGroupInfo,
        /// Column chunks already fetched for earlier predicates, if any.
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// The filter being evaluated together with its predicate cache.
        filter_info: FilterInfo,
    },
    /// Waiting for the bytes required to evaluate the current predicate.
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        data_request: DataRequest,
    },
    /// All predicates (if any) were evaluated; the budget has not been applied
    /// and the data request for the output projection is not planned yet.
    StartData {
        row_group_info: RowGroupInfo,
        /// Column chunks already fetched while evaluating predicates, if any.
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Predicate cache to read from when decoding; `None` without a filter.
        cache_info: Option<CacheInfo>,
    },
    /// Waiting for the bytes required to decode the output projection.
    WaitingOnData {
        row_group_info: RowGroupInfo,
        data_request: DataRequest,
        cache_info: Option<CacheInfo>,
    },
    /// No row group is active.
    Finished,
}
/// Output offset / limit that is carried from one row group to the next.
///
/// `offset` is the number of selected rows still to skip and `limit` the
/// number of rows still allowed to be emitted; `None` means "no offset" /
/// "no limit" respectively.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct RowBudget {
    offset: Option<usize>,
    limit: Option<usize>,
}
impl RowBudget {
    /// Creates a budget that skips `offset` selected rows and then emits at
    /// most `limit` rows. `None` disables the corresponding bound.
    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
        Self { offset, limit }
    }

    /// Returns `true` once the limit has been fully consumed
    /// (`limit == Some(0)`). A budget without a limit is never exhausted.
    pub(crate) fn is_exhausted(self) -> bool {
        matches!(self.limit, Some(0))
    }

    /// Number of selected rows still to skip, if an offset is set.
    pub(crate) fn offset(self) -> Option<usize> {
        self.offset
    }

    /// Number of rows still allowed to be emitted, if a limit is set.
    pub(crate) fn limit(self) -> Option<usize> {
        self.limit
    }

    /// Number of rows that survive this budget out of `rows_before_budget`
    /// selected rows: the offset is skipped first, then the remainder is
    /// capped at the limit.
    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
        let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
        match self.limit {
            Some(limit) => rows_after_offset.min(limit),
            None => rows_after_offset,
        }
    }

    /// Number of selected rows that must be produced to satisfy the budget
    /// (`offset + limit`, saturating), or `None` when there is no limit.
    fn selected_row_limit(self) -> Option<usize> {
        self.limit
            .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
    }

    /// Applies this budget to the plan of a row group with `row_count` rows.
    ///
    /// Returns the budgeted plan, the number of selected rows before and after
    /// the budget, and the budget that remains for subsequent row groups.
    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
        // Without an explicit selection every row of the row group is selected.
        let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
        let plan_builder = plan_builder
            .limited(row_count)
            .with_offset(self.offset)
            .with_limit(self.limit)
            .build_limited();
        let rows_after_budget = self.rows_after(rows_before_budget);
        BudgetedReadPlan {
            plan_builder,
            rows_before_budget,
            rows_after_budget,
            remaining_budget: self.advance(rows_before_budget, rows_after_budget),
        }
    }

    /// Consumes the budget for a row group in which `rows_before_budget` rows
    /// were selected and `rows_after_budget` of them were emitted.
    ///
    /// The rows that were selected but not emitted are charged against the
    /// offset, and the emitted rows against the limit. Callers are expected to
    /// pass `rows_after_budget == self.rows_after(rows_before_budget)`.
    /// Inconsistent counts saturate at zero instead of panicking (debug) or
    /// wrapping around (release). Wrapping would silently turn an almost-spent
    /// limit into an effectively unlimited one.
    pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
        if let Some(offset) = &mut self.offset {
            let rows_not_emitted = rows_before_budget.saturating_sub(rows_after_budget);
            *offset = offset.saturating_sub(rows_not_emitted);
        }
        if let Some(limit) = &mut self.limit {
            *limit = limit.saturating_sub(rows_after_budget);
        }
        self
    }
}
/// Result of `RowBudget::apply_to_plan` for a single row group.
#[derive(Debug)]
struct BudgetedReadPlan {
    /// Plan builder with the budget's offset and limit applied.
    plan_builder: ReadPlanBuilder,
    /// Selected rows before the budget (the row count when nothing narrowed the selection).
    rows_before_budget: usize,
    /// Selected rows left after skipping the offset and capping at the limit.
    rows_after_budget: usize,
    /// Budget left over for the following row groups.
    remaining_budget: RowBudget,
}
/// Outcome of `RowGroupReaderBuilder::try_build`.
#[derive(Debug)]
pub(crate) enum RowGroupBuildResult {
    /// The row group yields no rows (nothing selected, or all selected rows
    /// were consumed by the budget).
    Finished {
        remaining_budget: RowBudget,
    },
    /// These byte ranges must be pushed before decoding can make progress.
    NeedsData(Vec<Range<u64>>),
    /// A reader for the row group is ready.
    Data {
        batch_reader: ParquetRecordBatchReader,
        remaining_budget: RowBudget,
    },
}
/// One step of the state machine: the state to store next and, optionally,
/// a result to return to the caller of `try_build`.
#[derive(Debug)]
struct NextState {
    next_state: RowGroupDecoderState,
    /// `None` means the state machine should immediately be stepped again.
    result: Option<RowGroupBuildResult>,
}
impl NextState {
    /// Transitions to `next_state` without yielding; the caller keeps stepping.
    fn again(next_state: RowGroupDecoderState) -> Self {
        Self::from_parts(next_state, None)
    }

    /// Transitions to `next_state` and yields `result` to the caller.
    fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) -> Self {
        Self::from_parts(next_state, Some(result))
    }

    /// Shared constructor used by [`Self::again`] and [`Self::result`].
    fn from_parts(next_state: RowGroupDecoderState, result: Option<RowGroupBuildResult>) -> Self {
        Self { next_state, result }
    }
}
/// Builds a [`ParquetRecordBatchReader`] for one row group at a time,
/// evaluating any row filter and requesting the bytes it needs via
/// [`RowGroupBuildResult::NeedsData`].
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// Batch size passed to the read plans and array readers.
    batch_size: usize,
    /// Columns to decode for the output.
    projection: ProjectionMask,
    metadata: Arc<ParquetMetaData>,
    /// Field tree passed to `build_array_reader`.
    fields: Option<Arc<ParquetField>>,
    /// Row filter; taken out of `self` while its predicates are being evaluated.
    filter: Option<RowFilter>,
    /// Size passed to the predicate `RowGroupCache`; `0` disables caching.
    max_predicate_cache_size: usize,
    metrics: ArrowReaderMetrics,
    row_selection_policy: RowSelectionPolicy,
    /// Current state; `None` only while a transition is running or after a
    /// transition returned an error.
    state: Option<RowGroupDecoderState>,
    /// Bytes pushed by the caller that have not been consumed yet.
    buffers: PushBuffers,
}
/// Reusable pieces of a [`RowGroupReaderBuilder`], returned by
/// `RowGroupReaderBuilder::into_parts` (the metadata and decoder state are
/// not included).
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilderParts {
    pub batch_size: usize,
    pub projection: ProjectionMask,
    pub fields: Option<Arc<ParquetField>>,
    pub filter: Option<RowFilter>,
    pub max_predicate_cache_size: usize,
    pub metrics: ArrowReaderMetrics,
    pub row_selection_policy: RowSelectionPolicy,
    pub buffers: PushBuffers,
}
impl RowGroupReaderBuilder {
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
batch_size: usize,
projection: ProjectionMask,
metadata: Arc<ParquetMetaData>,
fields: Option<Arc<ParquetField>>,
filter: Option<RowFilter>,
metrics: ArrowReaderMetrics,
max_predicate_cache_size: usize,
buffers: PushBuffers,
row_selection_policy: RowSelectionPolicy,
) -> Self {
Self {
batch_size,
projection,
metadata,
fields,
filter,
metrics,
max_predicate_cache_size,
row_selection_policy,
state: Some(RowGroupDecoderState::Finished),
buffers,
}
}
pub(crate) fn into_parts(self) -> RowGroupReaderBuilderParts {
let Self {
batch_size,
projection,
metadata: _,
fields,
filter,
max_predicate_cache_size,
metrics,
row_selection_policy,
state: _,
buffers,
} = self;
RowGroupReaderBuilderParts {
batch_size,
projection,
fields,
filter,
max_predicate_cache_size,
metrics,
row_selection_policy,
buffers,
}
}
pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
self.buffers.push_ranges(ranges, buffers);
}
pub(crate) fn is_finished(&self) -> bool {
matches!(self.state, Some(RowGroupDecoderState::Finished))
}
pub fn buffered_bytes(&self) -> u64 {
self.buffers.buffered_bytes()
}
pub fn clear_all_ranges(&mut self) {
self.buffers.clear_all_ranges();
}
fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
self.state.take().ok_or_else(|| {
ParquetError::General(String::from(
"Internal Error: RowGroupReader in invalid state",
))
})
}
pub(crate) fn has_active_row_group(&self) -> bool {
!matches!(self.state, Some(RowGroupDecoderState::Finished))
}
pub(crate) fn next_row_group(
&mut self,
row_group_idx: usize,
row_count: usize,
selection: Option<RowSelection>,
budget: RowBudget,
) -> Result<(), ParquetError> {
let state = self.take_state()?;
if !matches!(state, RowGroupDecoderState::Finished) {
return Err(ParquetError::General(format!(
"Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
)));
}
let plan_builder = ReadPlanBuilder::new(self.batch_size)
.with_selection(selection)
.with_row_selection_policy(self.row_selection_policy);
let row_group_info = RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget,
};
self.state = Some(RowGroupDecoderState::Start { row_group_info });
Ok(())
}
pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult, ParquetError> {
loop {
let current_state = self.take_state()?;
match self.try_transition(current_state)? {
NextState {
next_state,
result: Some(result),
} => {
self.state = Some(next_state);
return Ok(result);
}
NextState {
next_state,
result: None,
} => {
self.state = Some(next_state);
}
}
}
}
fn try_transition(
&mut self,
current_state: RowGroupDecoderState,
) -> Result<NextState, ParquetError> {
let result = match current_state {
RowGroupDecoderState::Start { row_group_info } => {
debug_assert!(
!row_group_info.budget.is_exhausted(),
"RowGroupFrontier should not hand off row groups after the output limit is exhausted"
);
let column_chunks = None;
let Some(filter) = self.filter.take() else {
return Ok(NextState::again(RowGroupDecoderState::StartData {
row_group_info,
column_chunks,
cache_info: None,
}));
};
if filter.predicates.is_empty() {
return Ok(NextState::again(RowGroupDecoderState::StartData {
row_group_info,
column_chunks,
cache_info: None,
}));
};
let cache_projection =
self.compute_cache_projection(row_group_info.row_group_idx, &filter);
let cache_info = CacheInfo::new(
cache_projection,
Arc::new(RwLock::new(RowGroupCache::new(
self.batch_size,
self.max_predicate_cache_size,
))),
);
let filter_info = FilterInfo::new(filter, cache_info);
NextState::again(RowGroupDecoderState::Filters {
row_group_info,
filter_info,
column_chunks,
})
}
RowGroupDecoderState::Filters {
row_group_info,
column_chunks,
filter_info,
} => {
let RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget,
} = row_group_info;
if !plan_builder.selects_any() {
self.filter = Some(filter_info.into_filter());
return Ok(NextState::result(
RowGroupDecoderState::Finished,
RowGroupBuildResult::Finished {
remaining_budget: budget,
},
));
}
let predicate = filter_info.current();
let data_request = DataRequestBuilder::new(
row_group_idx,
row_count,
self.batch_size,
&self.metadata,
predicate.projection(), )
.with_selection(plan_builder.selection())
.with_cache_projection(Some(filter_info.cache_projection()))
.with_column_chunks(column_chunks)
.build();
let row_group_info = RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget,
};
NextState::again(RowGroupDecoderState::WaitingOnFilterData {
row_group_info,
filter_info,
data_request,
})
}
RowGroupDecoderState::WaitingOnFilterData {
row_group_info,
data_request,
mut filter_info,
} => {
let needed_ranges = data_request.needed_ranges(&self.buffers);
if !needed_ranges.is_empty() {
return Ok(NextState::result(
RowGroupDecoderState::WaitingOnFilterData {
row_group_info,
filter_info,
data_request,
},
RowGroupBuildResult::NeedsData(needed_ranges),
));
}
let RowGroupInfo {
row_group_idx,
row_count,
mut plan_builder,
budget,
} = row_group_info;
let predicate = filter_info.current();
let row_group = data_request.try_into_in_memory_row_group(
row_group_idx,
row_count,
&self.metadata,
predicate.projection(),
&mut self.buffers,
)?;
let cache_options = filter_info.cache_builder().producer();
let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
.with_batch_size(self.batch_size)
.with_cache_options(Some(&cache_options))
.with_parquet_metadata(&self.metadata)
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
plan_builder = override_selector_strategy_if_needed(
plan_builder,
predicate.projection(),
self.row_group_offset_index(row_group_idx),
);
let predicate_limit = filter_info
.is_last()
.then(|| budget.selected_row_limit())
.flatten();
let mut predicate_options =
PredicateOptions::new(array_reader, filter_info.current_mut());
if let Some(limit) = predicate_limit {
predicate_options = predicate_options.with_limit(limit, row_count);
}
plan_builder = plan_builder.with_predicate_options(predicate_options)?;
let row_group_info = RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget,
};
let column_chunks = Some(row_group.column_chunks);
match filter_info.advance() {
AdvanceResult::Continue(filter_info) => {
NextState::again(RowGroupDecoderState::Filters {
row_group_info,
column_chunks,
filter_info,
})
}
AdvanceResult::Done(filter, cache_info) => {
assert!(self.filter.is_none());
self.filter = Some(filter);
NextState::again(RowGroupDecoderState::StartData {
row_group_info,
column_chunks,
cache_info: Some(cache_info),
})
}
}
}
RowGroupDecoderState::StartData {
row_group_info,
column_chunks,
cache_info,
} => {
let RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget,
} = row_group_info;
let BudgetedReadPlan {
mut plan_builder,
rows_before_budget,
rows_after_budget,
remaining_budget,
} = budget.apply_to_plan(plan_builder, row_count);
if rows_before_budget == 0 {
return Ok(NextState::result(
RowGroupDecoderState::Finished,
RowGroupBuildResult::Finished { remaining_budget },
));
}
if rows_after_budget == 0 {
return Ok(NextState::result(
RowGroupDecoderState::Finished,
RowGroupBuildResult::Finished { remaining_budget },
));
}
let data_request = DataRequestBuilder::new(
row_group_idx,
row_count,
self.batch_size,
&self.metadata,
&self.projection,
)
.with_selection(plan_builder.selection())
.with_column_chunks(column_chunks)
.build();
plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
plan_builder = override_selector_strategy_if_needed(
plan_builder,
&self.projection,
self.row_group_offset_index(row_group_idx),
);
let row_group_info = RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget: remaining_budget,
};
NextState::again(RowGroupDecoderState::WaitingOnData {
row_group_info,
data_request,
cache_info,
})
}
RowGroupDecoderState::WaitingOnData {
row_group_info,
data_request,
cache_info,
} => {
let needed_ranges = data_request.needed_ranges(&self.buffers);
if !needed_ranges.is_empty() {
return Ok(NextState::result(
RowGroupDecoderState::WaitingOnData {
row_group_info,
data_request,
cache_info,
},
RowGroupBuildResult::NeedsData(needed_ranges),
));
}
let RowGroupInfo {
row_group_idx,
row_count,
plan_builder,
budget,
} = row_group_info;
let row_group = data_request.try_into_in_memory_row_group(
row_group_idx,
row_count,
&self.metadata,
&self.projection,
&mut self.buffers,
)?;
let plan = plan_builder.build();
let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
.with_batch_size(self.batch_size)
.with_parquet_metadata(&self.metadata);
let array_reader = if let Some(cache_info) = cache_info.as_ref() {
let cache_options: CacheOptions = cache_info.builder().consumer();
array_reader_builder
.with_cache_options(Some(&cache_options))
.build_array_reader(self.fields.as_deref(), &self.projection)
} else {
array_reader_builder
.build_array_reader(self.fields.as_deref(), &self.projection)
}?;
let reader = ParquetRecordBatchReader::new(array_reader, plan);
NextState::result(
RowGroupDecoderState::Finished,
RowGroupBuildResult::Data {
batch_reader: reader,
remaining_budget: budget,
},
)
}
RowGroupDecoderState::Finished => {
return Err(ParquetError::General(String::from(
"Internal Error: try_build called without an active row group",
)));
}
};
Ok(result)
}
fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
let meta = self.metadata.row_group(row_group_idx);
match self.compute_cache_projection_inner(filter) {
Some(projection) => projection,
None => ProjectionMask::none(meta.columns().len()),
}
}
fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
if self.max_predicate_cache_size == 0 {
return None;
}
let mut cache_projection = filter.predicates.first()?.projection().clone();
for predicate in filter.predicates.iter() {
cache_projection.union(predicate.projection());
}
cache_projection.intersect(&self.projection);
self.exclude_nested_columns_from_cache(&cache_projection)
}
fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
mask.without_nested_types(self.metadata.file_metadata().schema_descr())
}
fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
self.metadata
.offset_index()
.filter(|index| !index.is_empty())
.and_then(|index| index.get(row_group_idx))
.map(|columns| columns.as_slice())
}
}
/// Resolves an `Auto` row selection policy into a concrete one.
///
/// Non-`Auto` policies are returned untouched. For `Auto`, the builder's
/// preferred strategy is used. The exception is when it prefers `Mask` but
/// the selection reports (via `should_force_selectors`) that selectors must
/// be used for this projection and offset index; then `Selectors` is chosen.
fn override_selector_strategy_if_needed(
    plan_builder: ReadPlanBuilder,
    projection_mask: &ProjectionMask,
    offset_index: Option<&[OffsetIndexMetaData]>,
) -> ReadPlanBuilder {
    if !matches!(
        plan_builder.row_selection_policy(),
        RowSelectionPolicy::Auto { .. }
    ) {
        return plan_builder;
    }
    let concrete_policy = match plan_builder.resolve_selection_strategy() {
        RowSelectionStrategy::Mask
            if plan_builder.selection().is_some_and(|selection| {
                selection.should_force_selectors(projection_mask, offset_index)
            }) =>
        {
            RowSelectionPolicy::Selectors
        }
        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
    };
    plan_builder.with_row_selection_policy(concrete_policy)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{RowSelection, RowSelector};

    /// Guards against accidental growth of the state enum.
    #[test]
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 232);
    }

    #[test]
    fn test_row_budget_offset_limit_across_row_groups() {
        let first =
            RowBudget::new(Some(225), Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
        assert_eq!(first.rows_before_budget, 200);
        assert_eq!(first.rows_after_budget, 0);
        assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
        assert_eq!(first.plan_builder.num_rows_selected(), Some(0));
        let second = first
            .remaining_budget
            .apply_to_plan(ReadPlanBuilder::new(1024), 200);
        assert_eq!(second.rows_before_budget, 200);
        assert_eq!(second.rows_after_budget, 20);
        assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
        assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_limit_only() {
        let budgeted =
            RowBudget::new(None, Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
        assert_eq!(budgeted.rows_before_budget, 200);
        assert_eq!(budgeted.rows_after_budget, 20);
        assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_empty_selection() {
        let empty_selection = RowSelection::from(vec![RowSelector::skip(200)]);
        let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(
            ReadPlanBuilder::new(1024).with_selection(Some(empty_selection)),
            200,
        );
        assert_eq!(budgeted.rows_before_budget, 0);
        assert_eq!(budgeted.rows_after_budget, 0);
        assert_eq!(
            budgeted.remaining_budget,
            RowBudget::new(Some(10), Some(20))
        );
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
    }

    /// `rows_after` skips the offset first, then caps at the limit.
    #[test]
    fn test_row_budget_rows_after() {
        let budget = RowBudget::new(Some(10), Some(5));
        assert_eq!(budget.rows_after(3), 0);
        assert_eq!(budget.rows_after(10), 0);
        assert_eq!(budget.rows_after(12), 2);
        assert_eq!(budget.rows_after(100), 5);
        assert_eq!(RowBudget::new(None, None).rows_after(42), 42);
        assert_eq!(RowBudget::new(Some(2), None).rows_after(42), 40);
        assert_eq!(RowBudget::new(None, Some(7)).rows_after(42), 7);
    }

    /// `selected_row_limit` is `offset + limit`, saturating, and `None` without a limit.
    #[test]
    fn test_row_budget_selected_row_limit() {
        assert_eq!(RowBudget::new(Some(10), Some(5)).selected_row_limit(), Some(15));
        assert_eq!(RowBudget::new(None, Some(5)).selected_row_limit(), Some(5));
        assert_eq!(RowBudget::new(Some(10), None).selected_row_limit(), None);
        assert_eq!(
            RowBudget::new(Some(1), Some(usize::MAX)).selected_row_limit(),
            Some(usize::MAX)
        );
    }

    /// `advance` charges skipped rows to the offset and emitted rows to the limit.
    #[test]
    fn test_row_budget_advance() {
        let budget = RowBudget::new(Some(10), Some(5));
        // Row group smaller than the offset: only the offset shrinks.
        assert_eq!(budget.advance(3, 0), RowBudget::new(Some(7), Some(5)));
        // Offset consumed and limit partially used.
        assert_eq!(budget.advance(12, 2), RowBudget::new(Some(0), Some(3)));
        // Offset consumed and limit fully used.
        assert_eq!(budget.advance(100, 5), RowBudget::new(Some(0), Some(0)));
        let unbounded = RowBudget::new(None, None);
        assert_eq!(unbounded.advance(50, 50), unbounded);
    }

    #[test]
    fn test_row_budget_accessors_and_exhaustion() {
        let budget = RowBudget::new(Some(3), Some(4));
        assert_eq!(budget.offset(), Some(3));
        assert_eq!(budget.limit(), Some(4));
        assert!(!budget.is_exhausted());
        assert!(RowBudget::new(Some(3), Some(0)).is_exhausted());
        assert!(!RowBudget::new(None, None).is_exhausted());
    }
}