use crate::DecodeResult;
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::FOOTER_SIZE;
use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
use crate::file::page_index::index_reader::acc_range;
use crate::file::reader::ChunkReader;
use bytes::Bytes;
use std::ops::Range;
use std::sync::Arc;
/// A push-based decoder for [`ParquetMetaData`].
///
/// This decoder never performs IO itself. Instead, [`Self::try_decode`]
/// reports the byte ranges it is missing via [`DecodeResult::NeedsData`]; the
/// caller fetches those ranges (from a file, object store, network, ...),
/// supplies them with [`Self::push_ranges`] or [`Self::push_range`], and then
/// calls [`Self::try_decode`] again.
///
/// Decoding proceeds through three stages: the footer at the end of the file,
/// the metadata the footer describes, and finally (depending on the configured
/// [`PageIndexPolicy`]s) the column and offset page indexes. Once the metadata
/// has been returned, further calls to [`Self::try_decode`] return
/// [`DecodeResult::Finished`] and pushing more data is an error.
#[cfg_attr(
feature = "arrow",
doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
# let file_bytes = {
# let mut buffer = vec![0];
# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
# writer.write(&batch).unwrap();
# writer.close().unwrap();
# Bytes::from(buffer)
# };
# // mimic IO by returning a function that returns the bytes for a given range
# let get_range = |range: &Range<u64>| -> Bytes {
# let start = range.start as usize;
# let end = range.end as usize;
# file_bytes.slice(start..end)
# };
#
# let file_len = file_bytes.len() as u64;
// The `ParquetMetaDataPushDecoder` needs to know the file length.
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
// Try to decode the metadata. If more data is needed, the decoder will tell you which ranges it needs.
loop {
match decoder.try_decode() {
Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
Ok(DecodeResult::NeedsData(ranges)) => {
// The decoder needs more data
//
// In this example, we call a function that returns the bytes for each given range.
// In a real application, you would likely read the data from a file or network.
let data = ranges.iter().map(|range| get_range(range)).collect();
// Push the data into the decoder and try to decode again on the next iteration.
decoder.push_ranges(ranges, data).unwrap();
}
Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
Err(e) => return Err(e),
}
}
# }
```
"##
)]
#[cfg_attr(
feature = "arrow",
doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
# fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
# let file_bytes = {
# let mut buffer = vec![0];
# let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
# let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
# writer.write(&batch).unwrap();
# writer.close().unwrap();
# Bytes::from(buffer)
# };
#
let file_len = file_bytes.len() as u64;
// For this example, we "prefetch" all the bytes which we have in memory,
// but in a real application, you would likely read a chunk from the end
// for example 1MB.
let prefetched_bytes = file_bytes.clone();
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
// push the prefetched bytes into the decoder
decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
// The decoder will now be able to decode the metadata. Note in a real application,
// unless you can guarantee that the pushed data is enough to decode the metadata,
// you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
// as shown in the previous example
match decoder.try_decode() {
Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
other => { panic!("expected DecodeResult::Data, got: {other:?}") }
}
# }
```
"##
)]
#[cfg_attr(
feature = "arrow",
doc = r##"
```rust
# use std::ops::Range;
# use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
# use arrow_array::record_batch;
# use parquet::DecodeResult;
# use parquet::arrow::ArrowWriter;
# use parquet::errors::ParquetError;
# use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
#
// This function decodes Parquet Metadata from anything that implements
// [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
async fn decode_metadata(
file_len: u64,
mut async_source: impl AsyncRead + AsyncSeek + Unpin
) -> Result<ParquetMetaData, ParquetError> {
// We need a ParquetMetaDataPushDecoder to decode the metadata.
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
loop {
match decoder.try_decode() {
Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
Ok(DecodeResult::NeedsData(ranges)) => {
// The decoder needs more data
//
// In this example we use the AsyncRead and AsyncSeek traits to read the
// required ranges from the async source.
let mut data = Vec::with_capacity(ranges.len());
for range in &ranges {
let mut buffer = vec![0; (range.end - range.start) as usize];
async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
async_source.read_exact(&mut buffer).await?;
data.push(Bytes::from(buffer));
}
// Push the data into the decoder and try to decode again on the next iteration.
decoder.push_ranges(ranges, data).unwrap();
}
Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
Err(e) => return Err(e),
}
}
}
```
"##
)]
#[derive(Debug)]
pub struct ParquetMetaDataPushDecoder {
    /// Current position in the decoding state machine.
    state: DecodeState,
    /// Whether (and how strictly) to load the column index.
    column_index_policy: PageIndexPolicy,
    /// Whether (and how strictly) to load the offset index.
    offset_index_policy: PageIndexPolicy,
    /// Bytes pushed by the caller so far, looked up by absolute file offset.
    buffers: crate::util::push_buffers::PushBuffers,
    /// Decodes the metadata bytes. It also receives the decryption properties
    /// when the `encryption` feature is enabled.
    metadata_parser: MetadataParser,
}
impl ParquetMetaDataPushDecoder {
    /// Create a new decoder for a Parquet file that is `file_len` bytes long.
    ///
    /// The first call to [`Self::try_decode`] requests the footer at the end
    /// of the file. Both the column index and the offset index default to
    /// [`PageIndexPolicy::Optional`].
    ///
    /// # Errors
    ///
    /// Returns an error if `file_len` is smaller than the Parquet footer.
    pub fn try_new(file_len: u64) -> Result<Self> {
        if file_len < FOOTER_SIZE as u64 {
            return Err(ParquetError::General(format!(
                "Parquet files are at least 8 bytes long, but file length is {file_len}"
            )));
        }
        Ok(Self {
            state: DecodeState::ReadingFooter,
            column_index_policy: PageIndexPolicy::Optional,
            offset_index_policy: PageIndexPolicy::Optional,
            buffers: crate::util::push_buffers::PushBuffers::new(file_len),
            metadata_parser: MetadataParser::new(),
        })
    }

    /// Create a decoder whose footer has already been decoded, so decoding
    /// starts by requesting the metadata described by `footer_tail`.
    pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: FooterTail) -> Result<Self> {
        let mut new_self = Self::try_new(file_len)?;
        new_self.state = DecodeState::ReadingMetadata(footer_tail);
        Ok(new_self)
    }

    /// Create a decoder for metadata that has already been decoded.
    ///
    /// Only the page indexes requested by the configured policies are loaded
    /// and attached to `metadata`.
    pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> Result<Self> {
        let mut new_self = Self::try_new(file_len)?;
        new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
        Ok(new_self)
    }

    /// Set the policy for both the column index and the offset index.
    pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.column_index_policy = page_index_policy;
        self.offset_index_policy = page_index_policy;
        self
    }

    /// Set the policy for loading the column index.
    pub fn with_column_index_policy(mut self, column_index_policy: PageIndexPolicy) -> Self {
        self.column_index_policy = column_index_policy;
        self
    }

    /// Set the policy for loading the offset index.
    pub fn with_offset_index_policy(mut self, offset_index_policy: PageIndexPolicy) -> Self {
        self.offset_index_policy = offset_index_policy;
        self
    }

    /// Pass metadata decoding `options` through to the underlying metadata parser.
    pub fn with_metadata_options(mut self, options: Option<Arc<ParquetMetaDataOptions>>) -> Self {
        self.metadata_parser = self.metadata_parser.with_metadata_options(options);
        self
    }

    /// Pass file decryption properties through to the underlying metadata parser.
    #[cfg(feature = "encryption")]
    pub(crate) fn with_file_decryption_properties(
        mut self,
        file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.metadata_parser = self
            .metadata_parser
            .with_file_decryption_properties(file_decryption_properties);
        self
    }

    /// Push data for several byte ranges at once; `buffers[i]` holds the bytes
    /// of `ranges[i]`.
    ///
    /// # Errors
    ///
    /// Returns an error if decoding has already finished.
    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) -> Result<()> {
        self.check_not_finished()?;
        self.buffers.push_ranges(ranges, buffers);
        Ok(())
    }

    /// Push the bytes of a single `range` of the file.
    ///
    /// # Errors
    ///
    /// Returns an error if decoding has already finished.
    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<()> {
        self.check_not_finished()?;
        self.buffers.push_range(range, buffer);
        Ok(())
    }

    /// Attempt to make decoding progress using the data pushed so far.
    ///
    /// Returns one of:
    /// * [`DecodeResult::NeedsData`] with the byte range that must be pushed
    ///   before decoding can continue.
    /// * [`DecodeResult::Data`] with the decoded metadata, including the page
    ///   indexes requested by the configured policies.
    /// * [`DecodeResult::Finished`] if the metadata was already returned by a
    ///   previous call.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following cannot be decoded: the footer,
    /// the metadata, or the page indexes. This includes a footer that reports a
    /// metadata length larger than the file.
    ///
    /// After an error the decoder is left in its internal placeholder state.
    /// Any later call then returns an "invalid state" error.
    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
        let file_len = self.buffers.file_len();
        let footer_len = FOOTER_SIZE as u64;
        loop {
            // Take ownership of the state so its payload can be moved out. Every
            // successful path below stores the next state before it
            // returns or continues.
            match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
                DecodeState::ReadingFooter => {
                    let footer_start = file_len.saturating_sub(footer_len);
                    let footer_range = footer_start..file_len;
                    if !self.buffers.has_range(&footer_range) {
                        self.state = DecodeState::ReadingFooter;
                        return Ok(needs_range(footer_range));
                    }
                    let footer_bytes = self.get_bytes(&footer_range)?;
                    let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
                    self.state = DecodeState::ReadingMetadata(footer_tail);
                    continue;
                }
                DecodeState::ReadingMetadata(footer_tail) => {
                    let metadata_len: u64 = footer_tail.metadata_length() as u64;
                    // The footer comes from the file and may be corrupt, so
                    // validate that the reported length fits before
                    // subtracting.
                    let metadata_range = Self::metadata_range(file_len, metadata_len)?;
                    if !self.buffers.has_range(&metadata_range) {
                        self.state = DecodeState::ReadingMetadata(footer_tail);
                        return Ok(needs_range(metadata_range));
                    }
                    let metadata = self.metadata_parser.decode_metadata(
                        &self.get_bytes(&metadata_range)?,
                        footer_tail.is_encrypted_footer(),
                    )?;
                    self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
                    continue;
                }
                DecodeState::ReadingPageIndex(mut metadata) => {
                    let range = range_for_page_index(
                        &metadata,
                        self.column_index_policy,
                        self.offset_index_policy,
                    );
                    // No page index bytes are wanted, so the metadata is complete.
                    let Some(page_index_range) = range else {
                        self.state = DecodeState::Finished;
                        return Ok(DecodeResult::Data(*metadata));
                    };
                    if !self.buffers.has_range(&page_index_range) {
                        self.state = DecodeState::ReadingPageIndex(metadata);
                        return Ok(needs_range(page_index_range));
                    }
                    let buffer = self.get_bytes(&page_index_range)?;
                    let offset = page_index_range.start;
                    parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
                    parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
                    self.state = DecodeState::Finished;
                    return Ok(DecodeResult::Data(*metadata));
                }
                DecodeState::Finished => return Ok(DecodeResult::Finished),
                DecodeState::Intermediate => {
                    return Err(general_err!(
                        "ParquetMetaDataPushDecoder: internal error, invalid state"
                    ));
                }
            }
        }
    }

    /// Return an error if decoding has already finished and no more data may
    /// be pushed.
    fn check_not_finished(&self) -> Result<()> {
        if matches!(&self.state, DecodeState::Finished) {
            return Err(general_err!(
                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
            ));
        }
        Ok(())
    }

    /// Compute the byte range of the metadata, which ends exactly where the
    /// footer begins.
    ///
    /// Returns an error instead of underflowing when `metadata_len` plus the
    /// footer size exceeds `file_len`. That indicates a truncated or corrupt file.
    fn metadata_range(file_len: u64, metadata_len: u64) -> Result<Range<u64>> {
        let footer_len = FOOTER_SIZE as u64;
        let metadata_start = file_len
            .checked_sub(footer_len)
            .and_then(|before_footer| before_footer.checked_sub(metadata_len))
            .ok_or_else(|| {
                ParquetError::General(format!(
                    "Invalid Parquet file. Reported metadata length of {metadata_len} + {footer_len} byte footer, but file is only {file_len} bytes"
                ))
            })?;
        // Cannot overflow: metadata_start + metadata_len == file_len - footer_len.
        Ok(metadata_start..metadata_start + metadata_len)
    }

    /// Fetch the previously pushed bytes for `range`.
    ///
    /// Returns an error if the range length does not fit in `usize`, or if
    /// the underlying buffers report an error.
    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
        let start = range.start;
        let raw_len = range.end - range.start;
        let len: usize = raw_len.try_into().map_err(|_| {
            ParquetError::General(format!(
                "ParquetMetaDataPushDecoder: Range length too large to fit in usize: {raw_len}",
            ))
        })?;
        self.buffers.get_bytes(start, len)
    }
}
/// Build a [`DecodeResult::NeedsData`] request for exactly one missing byte range.
fn needs_range(missing: Range<u64>) -> DecodeResult<ParquetMetaData> {
    let requested: Vec<Range<u64>> = std::iter::once(missing).collect();
    DecodeResult::NeedsData(requested)
}
/// States of the [`ParquetMetaDataPushDecoder`] state machine, listed in the
/// order `try_decode` normally moves through them.
#[derive(Debug)]
enum DecodeState {
    /// Waiting for the fixed-size footer at the end of the file.
    ReadingFooter,
    /// Footer decoded; waiting for the metadata bytes it describes.
    ReadingMetadata(FooterTail),
    /// Metadata decoded; waiting for the page index bytes selected by the
    /// configured column/offset index policies (if any).
    ReadingPageIndex(Box<ParquetMetaData>),
    /// Metadata has been returned to the caller; pushing more data is rejected.
    Finished,
    /// Placeholder stored while `try_decode` temporarily owns the real state.
    /// If it is observed, a previous `try_decode` call exited early via an error.
    Intermediate,
}
/// Compute the byte range to fetch in order to load the page indexes selected
/// by the given policies.
///
/// For every column chunk of every row group, these ranges are folded together
/// with `acc_range`, which presumably yields one range spanning all its inputs:
/// * the column index range, unless `column_index_policy` is
///   [`PageIndexPolicy::Skip`];
/// * then the offset index range, unless `offset_index_policy` is
///   [`PageIndexPolicy::Skip`].
///
/// Returns `None` when nothing was accumulated, e.g. when both policies skip.
pub fn range_for_page_index(
    metadata: &ParquetMetaData,
    column_index_policy: PageIndexPolicy,
    offset_index_policy: PageIndexPolicy,
) -> Option<Range<u64>> {
    let want_column_index = column_index_policy != PageIndexPolicy::Skip;
    let want_offset_index = offset_index_policy != PageIndexPolicy::Skip;
    metadata
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns())
        .fold(None, |covered, column| {
            let covered = if want_column_index {
                acc_range(covered, column.column_index_range())
            } else {
                covered
            };
            if want_offset_index {
                acc_range(covered, column.offset_index_range())
            } else {
                covered
            }
        })
}
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::*;
    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Pushing the entire file up front decodes everything in one call.
    #[test]
    fn test_metadata_decoder_all_data() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = expect_data(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// A large enough suffix prefetch covers footer, metadata and page index.
    #[test]
    fn test_metadata_decoder_prefetch_success() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        let prefetch_range = (file_len - 2 * 1024)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// A too-small suffix prefetch requires exactly one more round trip.
    #[test]
    fn test_metadata_decoder_prefetch_retry() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        let prefetch_range = (file_len - 1500)..file_len;
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// With no prefetch the decoder asks for footer, metadata, then page index.
    #[test]
    fn test_metadata_decoder_incremental() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }

    /// Skipping the page index removes the third round trip.
    #[test]
    fn test_metadata_decoder_incremental_no_page_index() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_page_index_policy(PageIndexPolicy::Skip);
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let ranges = expect_needs_data(metadata_decoder.try_decode());
        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
        let metadata = expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).num_rows(), 200);
        assert_eq!(metadata.row_group(1).num_rows(), 200);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
    }

    /// Files shorter than the footer are rejected at construction time.
    #[test]
    fn test_metadata_decoder_file_too_short() {
        let err = ParquetMetaDataPushDecoder::try_new(7).unwrap_err();
        assert!(
            err.to_string().contains("at least 8 bytes"),
            "unexpected error: {err}"
        );
    }

    /// Once the metadata has been returned, further pushes are rejected.
    #[test]
    fn test_metadata_decoder_push_after_finished() {
        let file_len = test_file_len();
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        expect_data(metadata_decoder.try_decode());
        expect_finished(metadata_decoder.try_decode());
        let err = metadata_decoder
            .push_range(test_file_range(), test_file_slice(test_file_range()))
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("cannot push data after decoding is finished"),
            "unexpected error: {err}"
        );
    }

    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// TEST_BATCH written as 2 row groups of 200 rows, with pages of at most
    /// 100 rows, fed to the writer in chunks of 50 rows.
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();
        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Push the test file bytes for each of `ranges` into the decoder.
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}