use crate::arrow::ProjectionMask;
use crate::arrow::arrow_reader::RowSelection;
use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRowGroup};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use std::ops::Range;
use std::sync::Arc;
/// A pending I/O request describing the byte ranges needed to decode a
/// row group.
///
/// Built by [`DataRequestBuilder::build`]. Once the requested ranges have
/// been loaded into a [`PushBuffers`], convert this into an
/// [`InMemoryRowGroup`] via [`DataRequest::try_into_in_memory_row_group`].
#[derive(Debug)]
pub(super) struct DataRequest {
    /// Per-column chunk data; `None` for columns whose data has not yet
    /// been loaded.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// File byte ranges that must be available to fill the column chunks.
    ranges: Vec<Range<u64>>,
    /// Per-column page start offsets, present when sparse (per-page)
    /// ranges were computed — presumably from the page index; `None` when
    /// whole column chunks are fetched. TODO(review): confirm against
    /// `InMemoryRowGroup::fetch_ranges`.
    page_start_offsets: Option<Vec<Vec<u64>>>,
}
impl DataRequest {
    /// Returns the subset of `self.ranges` not already present in
    /// `buffers`, i.e. the ranges that still need to be fetched.
    pub fn needed_ranges(&self, buffers: &PushBuffers) -> Vec<Range<u64>> {
        self.ranges
            .iter()
            .filter(|&range| !buffers.has_range(range))
            .cloned()
            .collect()
    }

    /// Extracts the bytes for every range in `self.ranges` from `buffers`.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::General`] if a range's length does not fit
    /// in `usize` (possible on 32-bit targets), or if `buffers` is missing
    /// data for a range.
    fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
        self.ranges
            .iter()
            .map(|range| {
                // Return an error on length overflow instead of panicking
                // (the old `expect` also mislabeled this as an *offset*
                // overflow); this keeps error handling consistent with the
                // missing-data case below.
                let length = usize::try_from(range.end - range.start).map_err(|_| {
                    ParquetError::General(format!(
                        "Internal Error range length for {range:?} overflows usize",
                    ))
                })?;
                buffers.get_bytes(range.start, length).map_err(|e| {
                    ParquetError::General(format!(
                        "Internal Error missing data for range {range:?} in buffers: {e}",
                    ))
                })
            })
            .collect()
    }

    /// Consumes this request, filling its column chunks with the data held
    /// in `buffers`, and returns the resulting [`InMemoryRowGroup`].
    ///
    /// The ranges consumed by this request are cleared from `buffers`
    /// afterwards so the memory can be reclaimed.
    ///
    /// # Errors
    ///
    /// Returns an error if `buffers` does not contain all requested ranges
    /// (see [`Self::get_chunks`]).
    pub fn try_into_in_memory_row_group<'a>(
        self,
        row_group_idx: usize,
        row_count: usize,
        parquet_metadata: &'a ParquetMetaData,
        projection: &ProjectionMask,
        buffers: &mut PushBuffers,
    ) -> Result<InMemoryRowGroup<'a>, ParquetError> {
        // Pull the raw bytes out of the buffers before tearing `self` apart.
        let chunks = self.get_chunks(buffers)?;
        let Self {
            column_chunks,
            ranges,
            page_start_offsets,
        } = self;
        let mut in_memory_row_group = InMemoryRowGroup {
            row_count,
            column_chunks,
            offset_index: get_offset_index(parquet_metadata, row_group_idx),
            row_group_idx,
            metadata: parquet_metadata,
        };
        in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);
        // The data now lives in the row group; drop it from the buffers.
        buffers.clear_ranges(&ranges);
        Ok(in_memory_row_group)
    }
}
/// Builder for a [`DataRequest`].
///
/// Construct with [`DataRequestBuilder::new`], optionally refine with the
/// `with_*` methods, then call [`DataRequestBuilder::build`].
pub(super) struct DataRequestBuilder<'a> {
    /// Index of the row group within `parquet_metadata`.
    row_group_idx: usize,
    /// Number of rows in the row group.
    row_count: usize,
    /// Batch size used when computing which ranges to fetch.
    batch_size: usize,
    /// Metadata for the whole parquet file.
    parquet_metadata: &'a ParquetMetaData,
    /// Columns that will be decoded.
    projection: &'a ProjectionMask,
    /// Optional row selection used to narrow the fetched ranges.
    selection: Option<&'a RowSelection>,
    /// Optional extra projection of columns to cache — TODO(review):
    /// confirm semantics against `InMemoryRowGroup::fetch_ranges`.
    cache_projection: Option<&'a ProjectionMask>,
    /// Previously loaded column chunk data, if any; `None` means start
    /// with all columns unloaded.
    column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
}
impl<'a> DataRequestBuilder<'a> {
    /// Creates a builder with the required parameters; optional settings
    /// start out unset.
    pub(super) fn new(
        row_group_idx: usize,
        row_count: usize,
        batch_size: usize,
        parquet_metadata: &'a ParquetMetaData,
        projection: &'a ProjectionMask,
    ) -> Self {
        Self {
            row_group_idx,
            row_count,
            batch_size,
            parquet_metadata,
            projection,
            selection: None,
            cache_projection: None,
            column_chunks: None,
        }
    }

    /// Sets the optional row selection used to narrow fetched ranges.
    pub(super) fn with_selection(self, selection: Option<&'a RowSelection>) -> Self {
        Self { selection, ..self }
    }

    /// Sets the optional cache projection.
    pub(super) fn with_cache_projection(
        self,
        cache_projection: Option<&'a ProjectionMask>,
    ) -> Self {
        Self {
            cache_projection,
            ..self
        }
    }

    /// Seeds the request with previously loaded column chunk data.
    pub(super) fn with_column_chunks(
        self,
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
    ) -> Self {
        Self {
            column_chunks,
            ..self
        }
    }

    /// Computes the byte ranges required for this row group and returns
    /// them as a [`DataRequest`].
    pub(crate) fn build(self) -> DataRequest {
        let num_columns = self
            .parquet_metadata
            .row_group(self.row_group_idx)
            .columns()
            .len();
        // Start from the supplied chunks, or all-unloaded columns.
        let column_chunks = match self.column_chunks {
            Some(chunks) => chunks,
            None => vec![None; num_columns],
        };
        // Assemble a transient row group purely to compute fetch ranges;
        // its column chunks are then moved into the DataRequest.
        let row_group = InMemoryRowGroup {
            row_count: self.row_count,
            column_chunks,
            offset_index: get_offset_index(self.parquet_metadata, self.row_group_idx),
            row_group_idx: self.row_group_idx,
            metadata: self.parquet_metadata,
        };
        let fetch_ranges = row_group.fetch_ranges(
            self.projection,
            self.selection,
            self.batch_size,
            self.cache_projection,
        );
        DataRequest {
            column_chunks: row_group.column_chunks,
            ranges: fetch_ranges.ranges,
            page_start_offsets: fetch_ranges.page_start_offsets,
        }
    }
}
/// Returns the offset index entries for the given row group, or `None` if
/// the file has no (or an empty) offset index, or if `row_group_idx` is
/// out of bounds.
///
/// Uses a checked lookup (`slice::get`) rather than indexing, so an
/// out-of-range row group index yields `None` instead of a panic.
fn get_offset_index(
    parquet_metadata: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<&[OffsetIndexMetaData]> {
    parquet_metadata
        .offset_index()
        .filter(|index| !index.is_empty())
        .and_then(|index| index.get(row_group_idx))
        .map(|row_group_index| row_group_index.as_slice())
}