mod data;
mod filter;
use crate::DecodeResult;
use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
};
use crate::arrow::in_memory_row_group::ColumnChunkData;
use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
use crate::arrow::schema::ParquetField;
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use data::DataRequest;
use filter::AdvanceResult;
use filter::FilterInfo;
use std::ops::Range;
use std::sync::{Arc, RwLock};
/// Per-row-group state carried through each [`RowGroupDecoderState`] variant
/// while a single row group is being decoded.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of this row group within the file's [`ParquetMetaData`]
    row_group_idx: usize,
    /// Total number of rows in this row group
    row_count: usize,
    /// Accumulates the read plan (row selection, selection policy, predicates)
    plan_builder: ReadPlanBuilder,
}
/// State machine for decoding a single row group.
///
/// Expected flow, driven by `RowGroupReaderBuilder::try_transition`:
///
/// `Start` → (`Filters` → `WaitingOnFilterData`)* → `StartData`
///   → `WaitingOnData` → `Finished`
///
/// The `WaitingOn*` states are returned to the caller (as
/// `DecodeResult::NeedsData`) until all required byte ranges have been
/// pushed into the decoder's buffers.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// Ready to begin decoding a new row group
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Evaluating filter predicates, one at a time
    Filters {
        row_group_info: RowGroupInfo,
        /// Column chunk data retained from earlier predicate passes, if any,
        /// so already-fetched chunks are not requested again
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        filter_info: FilterInfo,
    },
    /// Waiting on the data needed to evaluate the current predicate
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        data_request: DataRequest,
    },
    /// All predicates evaluated; ready to request data for the output projection
    StartData {
        row_group_info: RowGroupInfo,
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Predicate cache populated during filtering (`None` if no filters ran)
        cache_info: Option<CacheInfo>,
    },
    /// Waiting on the data needed to decode the output projection
    WaitingOnData {
        row_group_info: RowGroupInfo,
        data_request: DataRequest,
        cache_info: Option<CacheInfo>,
    },
    /// No row group is in progress
    Finished,
}
/// Outcome of a single state-machine transition: the state to store for the
/// next step and, optionally, a result to hand back to the caller.
#[derive(Debug)]
struct NextState {
    next_state: RowGroupDecoderState,
    /// `None` means "no output yet, keep driving the state machine"
    result: Option<DecodeResult<ParquetRecordBatchReader>>,
}
impl NextState {
    /// Transition that continues driving the state machine without
    /// yielding anything to the caller.
    fn again(state: RowGroupDecoderState) -> Self {
        Self {
            next_state: state,
            result: None,
        }
    }

    /// Transition that yields `res` to the caller while moving to `state`.
    fn result(state: RowGroupDecoderState, res: DecodeResult<ParquetRecordBatchReader>) -> Self {
        Self {
            next_state: state,
            result: Some(res),
        }
    }
}
/// Builds [`ParquetRecordBatchReader`]s, one row group at a time, from data
/// pushed incrementally into its [`PushBuffers`].
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// Number of rows per output record batch
    batch_size: usize,
    /// Columns to decode into the output batches
    projection: ProjectionMask,
    /// Parquet file metadata (schema, row groups, page indexes)
    metadata: Arc<ParquetMetaData>,
    /// Arrow field layout derived from the Parquet schema, if any
    fields: Option<Arc<ParquetField>>,
    /// Optional row filter; taken while predicates are being evaluated and
    /// restored once the row group's filtering is complete
    filter: Option<RowFilter>,
    /// Remaining overall row limit, decremented as row groups are planned
    limit: Option<usize>,
    /// Remaining overall row offset, decremented as rows are skipped
    offset: Option<usize>,
    /// Size budget for the predicate cache; 0 disables caching
    /// (units defined by [`RowGroupCache`])
    max_predicate_cache_size: usize,
    /// Metrics collector shared with the array readers
    metrics: ArrowReaderMetrics,
    /// Policy for representing row selections (mask vs. selectors)
    row_selection_policy: RowSelectionPolicy,
    /// Current decoder state; `None` only transiently while a transition is
    /// in flight (or permanently after an internal error)
    state: Option<RowGroupDecoderState>,
    /// Buffered file byte ranges pushed by the caller
    buffers: PushBuffers,
}
impl RowGroupReaderBuilder {
    /// Create a new builder.
    ///
    /// The builder starts in the `Finished` state (no row group in
    /// progress); call [`Self::next_row_group`] to begin reading one.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        batch_size: usize,
        projection: ProjectionMask,
        metadata: Arc<ParquetMetaData>,
        fields: Option<Arc<ParquetField>>,
        filter: Option<RowFilter>,
        limit: Option<usize>,
        offset: Option<usize>,
        metrics: ArrowReaderMetrics,
        max_predicate_cache_size: usize,
        buffers: PushBuffers,
        row_selection_policy: RowSelectionPolicy,
    ) -> Self {
        Self {
            batch_size,
            projection,
            metadata,
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            row_selection_policy,
            // `Finished` doubles as "idle": next_row_group requires it
            state: Some(RowGroupDecoderState::Finished),
            buffers,
        }
    }

    /// Push newly fetched byte `ranges` (and their backing `buffers`) into
    /// the decoder, satisfying earlier `DecodeResult::NeedsData` requests.
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.buffers.push_ranges(ranges, buffers);
    }

    /// Total number of bytes currently buffered.
    pub fn buffered_bytes(&self) -> u64 {
        self.buffers.buffered_bytes()
    }

    /// Take the current state, leaving `None` in its place.
    ///
    /// Errors if the state was already taken, which indicates an internal
    /// bug (or a previous error that left the builder poisoned).
    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
        self.state.take().ok_or_else(|| {
            ParquetError::General(String::from(
                "Internal Error: RowGroupReader in invalid state",
            ))
        })
    }

    /// Begin reading row group `row_group_idx` with `row_count` total rows
    /// and an optional initial row `selection`.
    ///
    /// Errors if the previous row group has not finished. Note that on the
    /// error path the internal state has been consumed and is not restored,
    /// so subsequent calls will also fail.
    pub(crate) fn next_row_group(
        &mut self,
        row_group_idx: usize,
        row_count: usize,
        selection: Option<RowSelection>,
    ) -> Result<(), ParquetError> {
        let state = self.take_state()?;
        if !matches!(state, RowGroupDecoderState::Finished) {
            return Err(ParquetError::General(format!(
                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
            )));
        }
        let plan_builder = ReadPlanBuilder::new(self.batch_size)
            .with_selection(selection)
            .with_row_selection_policy(self.row_selection_policy);
        let row_group_info = RowGroupInfo {
            row_group_idx,
            row_count,
            plan_builder,
        };
        self.state = Some(RowGroupDecoderState::Start { row_group_info });
        Ok(())
    }

    /// Drive the state machine until it either produces a result
    /// (`Data`, `NeedsData`, or `Finished`) or errors.
    pub(crate) fn try_build(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        loop {
            let current_state = self.take_state()?;
            match self.try_transition(current_state)? {
                // Transition produced output: store state and return it
                NextState {
                    next_state,
                    result: Some(result),
                } => {
                    self.state = Some(next_state);
                    return Ok(result);
                }
                // No output yet: store state and take another step
                NextState {
                    next_state,
                    result: None,
                } => {
                    self.state = Some(next_state);
                }
            }
        }
    }

    /// Perform a single state-machine step. See [`RowGroupDecoderState`]
    /// for the overall flow.
    fn try_transition(
        &mut self,
        current_state: RowGroupDecoderState,
    ) -> Result<NextState, ParquetError> {
        let result = match current_state {
            // Decide whether predicate filtering applies; if so, set up the
            // predicate cache and move to `Filters`, else skip to `StartData`.
            RowGroupDecoderState::Start { row_group_info } => {
                let column_chunks = None;
                // No filter configured: read the output projection directly
                let Some(filter) = self.filter.take() else {
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };
                // A filter with no predicates is equivalent to no filter
                // (note: the empty filter is dropped, not restored)
                if filter.predicates.is_empty() {
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };
                let cache_projection =
                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
                let cache_info = CacheInfo::new(
                    cache_projection,
                    Arc::new(RwLock::new(RowGroupCache::new(
                        self.batch_size,
                        self.max_predicate_cache_size,
                    ))),
                );
                let filter_info = FilterInfo::new(filter, cache_info);
                NextState::again(RowGroupDecoderState::Filters {
                    row_group_info,
                    filter_info,
                    column_chunks,
                })
            }
            // Build the data request for the current predicate's columns
            RowGroupDecoderState::Filters {
                row_group_info,
                column_chunks,
                filter_info,
            } => {
                // Destructure so `plan_builder.selection()` can be borrowed
                // while building the request; the info is rebuilt below
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;
                // Selection already excludes every row: nothing to read.
                // Restore the filter for subsequent row groups.
                if !plan_builder.selects_any() {
                    self.filter = Some(filter_info.into_filter());
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }
                let predicate = filter_info.current();
                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    predicate.projection(), )
                .with_selection(plan_builder.selection())
                .with_cache_projection(Some(filter_info.cache_projection()))
                .with_column_chunks(column_chunks)
                .build();
                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };
                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
                    row_group_info,
                    filter_info,
                    data_request,
                })
            }
            // If data is still missing, ask the caller for it; otherwise
            // evaluate the current predicate and advance to the next one
            // (or to `StartData` once all predicates have run).
            RowGroupDecoderState::WaitingOnFilterData {
                row_group_info,
                data_request,
                mut filter_info,
            } => {
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // Re-enter this same state once data has been pushed
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnFilterData {
                            row_group_info,
                            filter_info,
                            data_request,
                        },
                        DecodeResult::NeedsData(needed_ranges),
                    ));
                }
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    mut plan_builder,
                } = row_group_info;
                let predicate = filter_info.current();
                // Assemble the in-memory row group from the buffered bytes
                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    predicate.projection(),
                    &mut self.buffers,
                )?;
                // Produce (fill) the predicate cache while decoding
                let cache_options = filter_info.cache_builder().producer();
                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_cache_options(Some(&cache_options))
                    .with_parquet_metadata(&self.metadata)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    predicate.projection(),
                    self.row_group_offset_index(row_group_idx),
                );
                // Evaluate the predicate, narrowing the row selection
                plan_builder =
                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };
                // Keep the fetched chunks so later passes can reuse them
                let column_chunks = Some(row_group.column_chunks);
                match filter_info.advance() {
                    // More predicates remain: evaluate the next one
                    AdvanceResult::Continue(filter_info) => {
                        NextState::again(RowGroupDecoderState::Filters {
                            row_group_info,
                            column_chunks,
                            filter_info,
                        })
                    }
                    // All predicates done: restore the filter (it was taken
                    // in `Start`, hence the assert) and move on to the data
                    AdvanceResult::Done(filter, cache_info) => {
                        assert!(self.filter.is_none());
                        self.filter = Some(filter);
                        NextState::again(RowGroupDecoderState::StartData {
                            row_group_info,
                            column_chunks,
                            cache_info: Some(cache_info),
                        })
                    }
                }
            }
            // Apply limit/offset to the filtered selection, then build the
            // data request for the output projection.
            RowGroupDecoderState::StartData {
                row_group_info,
                column_chunks,
                cache_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;
                // `None` selection means "all rows", hence the row_count default
                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
                if rows_before == 0 {
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }
                let mut plan_builder = plan_builder
                    .limited(row_count)
                    .with_offset(self.offset)
                    .with_limit(self.limit)
                    .build_limited();
                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
                // Rows dropped by this row group consume the global offset
                if let Some(offset) = &mut self.offset {
                    *offset = offset.saturating_sub(rows_before - rows_after)
                }
                if rows_after == 0 {
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }
                // Rows this group will emit consume the global limit
                // (assumes `build_limited` caps rows_after at the limit,
                // so this cannot underflow — verify in ReadPlanBuilder)
                if let Some(limit) = &mut self.limit {
                    *limit -= rows_after;
                }
                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    &self.projection,
                )
                .with_selection(plan_builder.selection())
                .with_column_chunks(column_chunks)
                .build();
                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    &self.projection,
                    self.row_group_offset_index(row_group_idx),
                );
                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };
                NextState::again(RowGroupDecoderState::WaitingOnData {
                    row_group_info,
                    data_request,
                    cache_info,
                })
            }
            // If data is still missing, ask the caller for it; otherwise
            // build and return the record batch reader for this row group.
            RowGroupDecoderState::WaitingOnData {
                row_group_info,
                data_request,
                cache_info,
            } => {
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // Re-enter this same state once data has been pushed
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnData {
                            row_group_info,
                            data_request,
                            cache_info,
                        },
                        DecodeResult::NeedsData(needed_ranges),
                    ));
                }
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;
                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    &self.projection,
                    &mut self.buffers,
                )?;
                let plan = plan_builder.build();
                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_parquet_metadata(&self.metadata);
                // Consume the predicate cache, if filtering populated one
                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
                    let cache_options: CacheOptions = cache_info.builder().consumer();
                    array_reader_builder
                        .with_cache_options(Some(&cache_options))
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                } else {
                    array_reader_builder
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                }?;
                let reader = ParquetRecordBatchReader::new(array_reader, plan);
                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
            }
            // Idempotent: repeated calls keep reporting Finished
            RowGroupDecoderState::Finished => {
                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
            }
        };
        Ok(result)
    }

    /// Columns to cache for `filter` in row group `row_group_idx`; an empty
    /// mask if caching is disabled or no suitable columns exist.
    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
        let meta = self.metadata.row_group(row_group_idx);
        match self.compute_cache_projection_inner(filter) {
            Some(projection) => projection,
            None => ProjectionMask::none(meta.columns().len()),
        }
    }

    /// Union of all predicate projections, intersected with the output
    /// projection, minus nested columns. `None` if caching is disabled.
    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
        if self.max_predicate_cache_size == 0 {
            return None;
        }
        let mut cache_projection = filter.predicates.first()?.projection().clone();
        for predicate in filter.predicates.iter() {
            cache_projection.union(predicate.projection());
        }
        // Only columns that are also in the output benefit from caching
        cache_projection.intersect(&self.projection);
        self.exclude_nested_columns_from_cache(&cache_projection)
    }

    /// Drop nested (non-leaf-only) columns from the cache mask.
    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
    }

    /// Per-column offset index for `row_group_idx`, if the file has a
    /// non-empty offset index.
    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
        self.metadata
            .offset_index()
            .filter(|index| !index.is_empty())
            .and_then(|index| index.get(row_group_idx))
            .map(|columns| columns.as_slice())
    }
}
/// Possibly override an automatically chosen row-selection strategy.
///
/// When the plan's policy is `Auto`, resolve the preferred strategy and
/// downgrade a `Mask` choice to `Selectors` whenever the current row
/// selection requires selectors for this projection / offset-index
/// combination. Plans with an explicit (non-`Auto`) policy are returned
/// unchanged.
fn override_selector_strategy_if_needed(
    plan_builder: ReadPlanBuilder,
    projection_mask: &ProjectionMask,
    offset_index: Option<&[OffsetIndexMetaData]>,
) -> ReadPlanBuilder {
    // Explicitly configured policies are never overridden
    if !matches!(
        plan_builder.row_selection_policy(),
        RowSelectionPolicy::Auto { .. }
    ) {
        return plan_builder;
    }

    let mut strategy = plan_builder.resolve_selection_strategy();

    // A mask is only viable when the selection does not insist on
    // selectors for the given projection and offset index
    if matches!(strategy, RowSelectionStrategy::Mask) {
        let must_use_selectors = plan_builder
            .selection()
            .is_some_and(|sel| sel.should_force_selectors(projection_mask, offset_index));
        if must_use_selectors {
            strategy = RowSelectionStrategy::Selectors;
        }
    }

    // Pin the resolved strategy as a concrete (non-Auto) policy
    let pinned_policy = match strategy {
        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
    };
    plan_builder.with_row_selection_policy(pinned_policy)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Guard test that pins the size of [`RowGroupDecoderState`] so that
    /// accidental growth of the state enum (e.g. adding a large inline
    /// field to a variant) is caught at review time.
    #[test]
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
    }
}