use crate::DecodeResult;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use bytes::Bytes;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
/// Tracks the row groups that have not yet been decoded, together with the
/// row selection still to be applied to them.
///
/// Row groups are handed to the wrapped `RowGroupReaderBuilder` one at a
/// time, in the order they were supplied to `RemainingRowGroups::new`.
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// Metadata for the file; used to look up each row group's row count.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Indices of row groups not yet started, consumed from the front.
    row_groups: VecDeque<usize>,
    /// Optional selection over the rows of the remaining row groups; the
    /// leading rows belonging to each row group are split off as that row
    /// group is started.
    selection: Option<RowSelection>,
    /// Builder for the reader of the current row group; it receives the data
    /// pushed via `push_data` and reports `buffered_bytes`.
    row_group_reader_builder: RowGroupReaderBuilder,
}
impl RemainingRowGroups {
    /// Creates a tracker that will decode `row_groups` in the given order.
    ///
    /// `selection`, if present, is applied across the listed row groups in
    /// order: as each row group is started, the selection entries covering
    /// that row group's rows are split off the front of `selection`.
    pub fn new(
        parquet_metadata: Arc<ParquetMetaData>,
        row_groups: Vec<usize>,
        selection: Option<RowSelection>,
        row_group_reader_builder: RowGroupReaderBuilder,
    ) -> Self {
        Self {
            parquet_metadata,
            row_groups: VecDeque::from(row_groups),
            selection,
            row_group_reader_builder,
        }
    }

    /// Forwards fetched byte `ranges` and their contents (`buffers`) to the
    /// row group reader builder.
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.row_group_reader_builder.push_data(ranges, buffers);
    }

    /// Returns the number of bytes currently buffered by the row group reader
    /// builder.
    pub fn buffered_bytes(&self) -> u64 {
        self.row_group_reader_builder.buffered_bytes()
    }

    /// Attempts to produce a reader for the next row group.
    ///
    /// Returns:
    /// - `DecodeResult::NeedsData(ranges)` when the current row group needs
    ///   more bytes, which should be supplied via `push_data`;
    /// - `DecodeResult::Data(reader)` when a reader for the current row group
    ///   is ready;
    /// - `DecodeResult::Finished` once the builder reports `Finished` and no
    ///   row groups remain.
    ///
    /// When the builder reports `Finished` for the current row group, the next
    /// queued row group is started and the builder is polled again.
    ///
    /// # Errors
    ///
    /// Propagates errors from the row group reader builder. Also returns a
    /// `ParquetError::General` if a queued row group index is out of range for
    /// the file's metadata, or if a row group's row count does not fit in
    /// `usize`.
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        loop {
            let result: DecodeResult<ParquetRecordBatchReader> =
                self.row_group_reader_builder.try_build()?;
            match result {
                DecodeResult::Finished => {
                    // The current row group is done; fall through and start
                    // the next one (if any).
                }
                DecodeResult::NeedsData(ranges) => {
                    return Ok(DecodeResult::NeedsData(ranges));
                }
                DecodeResult::Data(batch_reader) => {
                    return Ok(DecodeResult::Data(batch_reader));
                }
            }

            let row_group_idx = match self.row_groups.pop_front() {
                None => return Ok(DecodeResult::Finished),
                Some(idx) => idx,
            };

            // Checked lookup: an out-of-range index must surface as an error,
            // not a panic, since this method already returns `Result`.
            let row_group_metadata = self
                .parquet_metadata
                .row_groups()
                .get(row_group_idx)
                .ok_or_else(|| {
                    ParquetError::General(format!(
                        "Invalid row group index {row_group_idx}: file has {} row groups",
                        self.parquet_metadata.num_row_groups()
                    ))
                })?;

            // `num_rows` is an i64 in the metadata; reject values that do not
            // fit in usize (including negative counts from corrupt metadata).
            let row_count: usize = row_group_metadata
                .num_rows()
                .try_into()
                .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?;

            // Take the part of the selection covering this row group's rows,
            // leaving the remainder for subsequent row groups.
            let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
            self.row_group_reader_builder
                .next_row_group(row_group_idx, row_count, selection)?;
            // The next iteration tries to build a reader for the new row group.
        }
    }
}