use crate::errors::ParquetError;
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use crate::DecodeResult;
/// Push-style decoder that produces [`ParquetMetaData`] from byte ranges
/// supplied by the caller.
///
/// Bytes are handed over with `push_ranges`; `try_decode` then either returns
/// the decoded metadata or reports which byte ranges it still needs.
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
/// Set to `true` once `try_decode` has handed out the decoded metadata;
/// afterwards `push_ranges` is rejected and `try_decode` reports `Finished`.
done: bool,
/// Reader that `try_decode` drives (`try_parse_sized` / `finish`) to build
/// the metadata.
metadata_reader: ParquetMetaDataReader,
/// Holds the file length given to `try_new` and the byte ranges pushed so far.
buffers: crate::util::push_buffers::PushBuffers,
}
impl ParquetMetaDataPushDecoder {
    /// Creates a decoder for a file that is `file_len` bytes long.
    ///
    /// The internal [`ParquetMetaDataReader`] starts out configured with
    /// [`PageIndexPolicy::Optional`]; call [`Self::with_page_index_policy`]
    /// to choose a different policy.
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::General`] when `file_len` is smaller than 8.
    pub fn try_new(file_len: u64) -> Result<Self, ParquetError> {
        if file_len >= 8 {
            let metadata_reader =
                ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
            let buffers = crate::util::push_buffers::PushBuffers::new(file_len);
            Ok(Self {
                done: false,
                metadata_reader,
                buffers,
            })
        } else {
            Err(ParquetError::General(format!(
                "Parquet files are at least 8 bytes long, but file length is {file_len}"
            )))
        }
    }

    /// Replaces the page index policy of the internal metadata reader and
    /// returns the updated decoder (builder style).
    pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        let reconfigured = self
            .metadata_reader
            .with_page_index_policy(page_index_policy);
        self.metadata_reader = reconfigured;
        self
    }

    /// Hands `ranges` and their `buffers` to the decoder's internal buffers.
    ///
    /// NOTE(review): each buffer is presumably the bytes of the range at the
    /// same index (that is how the tests in this file call it) — confirm
    /// against `PushBuffers::push_ranges`.
    ///
    /// # Errors
    ///
    /// Returns an error message if the metadata has already been decoded.
    pub fn push_ranges(
        &mut self,
        ranges: Vec<std::ops::Range<u64>>,
        buffers: Vec<bytes::Bytes>,
    ) -> std::result::Result<(), String> {
        if !self.done {
            self.buffers.push_ranges(ranges, buffers);
            return Ok(());
        }
        Err(String::from(
            "ParquetMetaDataPushDecoder: cannot push data after decoding is finished",
        ))
    }

    /// Attempts to decode the metadata from the bytes pushed so far.
    ///
    /// Returns
    /// * [`DecodeResult::Finished`] if the metadata was already returned by an
    ///   earlier call,
    /// * [`DecodeResult::NeedsData`] with the ranges to push before calling
    ///   again (the last 8 bytes of the file are always requested first),
    /// * [`DecodeResult::Data`] with the decoded [`ParquetMetaData`].
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::General`] if the reader asks for more bytes
    /// than the file contains, and passes through any other error reported by
    /// the metadata reader.
    pub fn try_decode(
        &mut self,
    ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
        if self.done {
            return Ok(DecodeResult::Finished);
        }

        // Nothing is attempted until the final 8 bytes of the file are present.
        // `try_new` guarantees `file_len >= 8`, so the subtraction cannot wrap.
        let file_len = self.buffers.file_len();
        let footer_range = file_len - 8..file_len;
        if !self.buffers.has_range(&footer_range) {
            return Ok(DecodeResult::NeedsData(vec![footer_range]));
        }

        let parse_outcome = self
            .metadata_reader
            .try_parse_sized(&self.buffers, self.buffers.file_len());

        if let Err(err) = parse_outcome {
            // Translate "need more bytes" errors into a request for the caller;
            // everything else is a genuine failure.
            let wanted = match err {
                ParquetError::NeedMoreData(needed) => {
                    let needed = needed as u64;
                    match file_len.checked_sub(needed) {
                        // `start + needed == file_len`: the trailing `needed`
                        // bytes of the file.
                        Some(start) => start..start + needed,
                        None => {
                            return Err(ParquetError::General(format!(
                                "Parquet metadata reader needs at least {needed} bytes, but file length is only {file_len}"
                            )));
                        }
                    }
                }
                ParquetError::NeedMoreDataRange(range) => range,
                other => return Err(other),
            };
            return Ok(DecodeResult::NeedsData(vec![wanted]));
        }

        // Parsing succeeded: take the metadata and mark the decoder finished.
        let metadata = self.metadata_reader.finish()?;
        self.done = true;
        Ok(DecodeResult::Data(metadata))
    }
}
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// The whole file is available up front: a single decode yields the metadata.
    #[test]
    fn test_metadata_decoder_all_data() {
        let mut decoder = new_decoder();
        push_ranges_to_metadata_decoder(&mut decoder, vec![test_file_range()]);

        let metadata = expect_data(decoder.try_decode());
        assert_test_file_metadata(&metadata, true);
    }

    /// The last 2 KiB of the file are pushed ahead of time and the decoder
    /// needs nothing else.
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let mut decoder = new_decoder();
        let file_len = test_file_len();
        let tail = file_len - 2 * 1024..file_len;
        push_ranges_to_metadata_decoder(&mut decoder, vec![tail]);

        let metadata = expect_data(decoder.try_decode());
        expect_finished(decoder.try_decode());
        assert_test_file_metadata(&metadata, true);
    }

    /// Only the last 1500 bytes are pushed ahead of time; the decoder asks for
    /// more data once before it can produce the metadata.
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let mut decoder = new_decoder();
        let file_len = test_file_len();
        let tail = file_len - 1500..file_len;
        push_ranges_to_metadata_decoder(&mut decoder, vec![tail]);

        let follow_up = expect_needs_data(decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut decoder, follow_up);

        let metadata = expect_data(decoder.try_decode());
        expect_finished(decoder.try_decode());
        assert_test_file_metadata(&metadata, true);
    }

    /// Nothing is pushed ahead of time: the decoder first asks for the last
    /// 8 bytes, then makes two more requests before returning the metadata.
    #[test]
    fn test_metadata_decoder_incremental() {
        let mut decoder = new_decoder();

        let footer_request = expect_needs_data(decoder.try_decode());
        let expected_footer = test_file_len() - 8..test_file_len();
        assert_eq!(footer_request, [expected_footer]);
        push_ranges_to_metadata_decoder(&mut decoder, footer_request);

        for _ in 0..2 {
            let requested = expect_needs_data(decoder.try_decode());
            push_ranges_to_metadata_decoder(&mut decoder, requested);
        }

        let metadata = expect_data(decoder.try_decode());
        expect_finished(decoder.try_decode());
        assert_test_file_metadata(&metadata, true);
    }

    /// Same as the incremental test, but with `PageIndexPolicy::Skip`: one
    /// request fewer is made and the decoded metadata carries no page indexes.
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let mut decoder = new_decoder().with_page_index_policy(PageIndexPolicy::Skip);

        let footer_request = expect_needs_data(decoder.try_decode());
        let expected_footer = test_file_len() - 8..test_file_len();
        assert_eq!(footer_request, [expected_footer]);
        push_ranges_to_metadata_decoder(&mut decoder, footer_request);

        let requested = expect_needs_data(decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut decoder, requested);

        let metadata = expect_data(decoder.try_decode());
        expect_finished(decoder.try_decode());
        assert_test_file_metadata(&metadata, false);
    }

    /// Creates a decoder sized for the in-memory test file.
    fn new_decoder() -> ParquetMetaDataPushDecoder {
        ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap()
    }

    /// Asserts the layout every test expects (two row groups of 200 rows) and
    /// that the column/offset indexes are present exactly when
    /// `page_index_expected` is true.
    #[track_caller]
    fn assert_test_file_metadata(metadata: &ParquetMetaData, page_index_expected: bool) {
        assert_eq!(metadata.num_row_groups(), 2);
        for index in 0..2 {
            assert_eq!(metadata.row_group(index).num_rows(), 200);
        }
        assert_eq!(metadata.column_index().is_some(), page_index_expected);
        assert_eq!(metadata.offset_index().is_some(), page_index_expected);
    }

    /// 400-row batch with three columns: `a` (Int64, 0..400), `b` (Int64,
    /// 400..800) and `c` (string view values alternating between a short
    /// string and one longer than 12 bytes).
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let strings = (0..400).map(|i| match i % 2 {
            0 => format!("string_{i}"),
            _ => format!("A string larger than 12 bytes and thus not inlined {i}"),
        });

        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values(strings));

        let columns = vec![("a", a), ("b", b), ("c", c)];
        RecordBatch::try_from_iter(columns).unwrap()
    });

    /// `TEST_BATCH` written to an in-memory Parquet file, 50 rows at a time,
    /// with a max row group size of 200 and a data page row count limit of 100.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        const ROWS_PER_WRITE: usize = 50;

        let batch = &TEST_BATCH;
        let mut file_bytes = Vec::new();
        let properties = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut file_bytes, batch.schema(), Some(properties)).unwrap();

        let total_rows = batch.num_rows();
        for offset in (0..total_rows).step_by(ROWS_PER_WRITE) {
            let length = (total_rows - offset).min(ROWS_PER_WRITE);
            writer.write(&batch.slice(offset, length)).unwrap();
        }
        writer.close().unwrap();

        Bytes::from(file_bytes)
    });

    /// Length of the test file in bytes.
    fn test_file_len() -> u64 {
        let len = TEST_FILE_DATA.len();
        len as u64
    }

    /// Range covering the entire test file.
    fn test_file_range() -> Range<u64> {
        Range {
            start: 0,
            end: test_file_len(),
        }
    }

    /// Returns the bytes of the test file that lie in `range`.
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let Range { start, end } = range;
        let start = usize::try_from(start).unwrap();
        let end = usize::try_from(end).unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Pushes the test-file bytes for each of `ranges` into `metadata_decoder`.
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data: Vec<Bytes> = ranges.iter().cloned().map(test_file_slice).collect();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Unwraps `Ok(DecodeResult::Data(_))`, panicking on anything else.
    fn expect_data<T>(result: Result<DecodeResult<T>, ParquetError>) -> T
    where
        T: Debug,
    {
        let result = result.expect("Expected Ok(DecodeResult::Data(T))");
        match result {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Unwraps `Ok(DecodeResult::NeedsData(_))`, panicking on anything else.
    fn expect_needs_data<T>(result: Result<DecodeResult<T>, ParquetError>) -> Vec<Range<u64>>
    where
        T: Debug,
    {
        let result = result.expect("Expected Ok(DecodeResult::NeedsData{ranges})");
        match result {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Panics unless `result` is `Ok(DecodeResult::Finished)`.
    fn expect_finished<T>(result: Result<DecodeResult<T>, ParquetError>)
    where
        T: Debug,
    {
        let result = result.expect("Expected Ok(DecodeResult::Finished)");
        if !matches!(result, DecodeResult::Finished) {
            panic!("Expected DecodeResult::Finished, got {result:?}");
        }
    }
}