use crate::DecodeResult;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use crate::arrow::push_decoder::reader_builder::{
RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
#[derive(Debug)]
enum QueuedRowGroupDecision {
Read(NextRowGroup),
Skip { remaining_budget: RowBudget },
}
#[derive(Debug)]
struct NextRowGroup {
row_group_idx: usize,
row_count: usize,
selection: Option<RowSelection>,
budget: RowBudget,
}
#[derive(Debug)]
struct RowGroupFrontier {
parquet_metadata: Arc<ParquetMetaData>,
row_groups: VecDeque<usize>,
selection: Option<RowSelection>,
budget: RowBudget,
has_predicates: bool,
}
impl RowGroupFrontier {
fn new(
parquet_metadata: Arc<ParquetMetaData>,
row_groups: Vec<usize>,
selection: Option<RowSelection>,
budget: RowBudget,
has_predicates: bool,
) -> Self {
Self {
parquet_metadata,
row_groups: VecDeque::from(row_groups),
selection,
budget,
has_predicates,
}
}
fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
self.parquet_metadata
.row_group(row_group_idx)
.num_rows()
.try_into()
.map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
}
fn update_budget_after_row_group(&mut self, budget: RowBudget) {
self.budget = budget;
}
fn clear_remaining(&mut self) {
self.selection = None;
self.row_groups.clear();
}
fn plan_selected_row_group(
&self,
next_row_group: NextRowGroup,
selected_rows: usize,
) -> QueuedRowGroupDecision {
if self.has_predicates {
return QueuedRowGroupDecision::Read(next_row_group);
}
let rows_after_budget = self.budget.rows_after(selected_rows);
if rows_after_budget != 0 {
return QueuedRowGroupDecision::Read(next_row_group);
}
QueuedRowGroupDecision::Skip {
remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
}
}
fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
loop {
let Some(&row_group_idx) = self.row_groups.front() else {
return Ok(None);
};
if self.budget.is_exhausted()
|| self
.selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
self.clear_remaining();
return Ok(None);
}
let row_count = self.row_group_num_rows(row_group_idx)?;
let (selection, selected_rows) = match self.selection.as_mut() {
Some(selection) => {
let selection = selection.split_off(row_count);
let selected_rows = selection.row_count();
if selected_rows == 0 {
self.row_groups.pop_front();
continue;
}
let selection = if selected_rows == row_count {
None
} else {
Some(selection)
};
(selection, selected_rows)
}
None => (None, row_count),
};
let next_row_group = NextRowGroup {
row_group_idx,
row_count,
selection,
budget: self.budget,
};
match self.plan_selected_row_group(next_row_group, selected_rows) {
QueuedRowGroupDecision::Read(next_row_group) => {
self.row_groups.pop_front();
return Ok(Some(next_row_group));
}
QueuedRowGroupDecision::Skip { remaining_budget } => {
self.row_groups.pop_front();
self.budget = remaining_budget;
}
}
}
}
}
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
schema: SchemaRef,
frontier: RowGroupFrontier,
row_group_reader_builder: RowGroupReaderBuilder,
}
#[derive(Debug)]
pub(crate) struct RemainingRowGroupsParts {
pub schema: SchemaRef,
pub metadata: Arc<ParquetMetaData>,
pub row_groups: Vec<usize>,
pub selection: Option<RowSelection>,
pub offset: Option<usize>,
pub limit: Option<usize>,
pub reader_builder: RowGroupReaderBuilderParts,
}
impl RemainingRowGroups {
pub fn new(
schema: SchemaRef,
parquet_metadata: Arc<ParquetMetaData>,
row_groups: Vec<usize>,
selection: Option<RowSelection>,
budget: RowBudget,
has_predicates: bool,
row_group_reader_builder: RowGroupReaderBuilder,
) -> Self {
Self {
schema,
frontier: RowGroupFrontier::new(
parquet_metadata,
row_groups,
selection,
budget,
has_predicates,
),
row_group_reader_builder,
}
}
pub(crate) fn into_parts(self) -> RemainingRowGroupsParts {
let Self {
schema,
frontier,
row_group_reader_builder,
} = self;
let RowGroupFrontier {
parquet_metadata,
row_groups,
selection,
budget,
has_predicates: _,
} = frontier;
RemainingRowGroupsParts {
schema,
metadata: parquet_metadata,
row_groups: Vec::from(row_groups),
selection,
offset: budget.offset(),
limit: budget.limit(),
reader_builder: row_group_reader_builder.into_parts(),
}
}
pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
self.row_group_reader_builder.push_data(ranges, buffers);
}
pub fn buffered_bytes(&self) -> u64 {
self.row_group_reader_builder.buffered_bytes()
}
pub fn clear_all_ranges(&mut self) {
self.row_group_reader_builder.clear_all_ranges();
}
pub fn is_at_row_group_boundary(&self) -> bool {
self.row_group_reader_builder.is_finished()
}
pub fn row_groups_remaining(&self) -> usize {
self.frontier.row_groups.len()
}
pub fn try_next_reader(
&mut self,
) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
loop {
if !self.row_group_reader_builder.has_active_row_group() {
match self.frontier.next_readable_row_group()? {
Some(NextRowGroup {
row_group_idx,
row_count,
selection,
budget,
}) => {
self.row_group_reader_builder.next_row_group(
row_group_idx,
row_count,
selection,
budget,
)?;
}
None => return Ok(DecodeResult::Finished),
}
}
match self.row_group_reader_builder.try_build()? {
RowGroupBuildResult::Finished { remaining_budget } => {
self.frontier
.update_budget_after_row_group(remaining_budget);
}
RowGroupBuildResult::NeedsData(ranges) => {
return Ok(DecodeResult::NeedsData(ranges));
}
RowGroupBuildResult::Data {
batch_reader,
remaining_budget,
} => {
self.frontier
.update_budget_after_row_group(remaining_budget);
return Ok(DecodeResult::Data(batch_reader));
}
}
}
}
}