mod reader_builder;
mod remaining;
use crate::DecodeResult;
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::RowGroupReaderBuilder;
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;
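/// Builder for configuring a [`ParquetPushDecoder`].
///
/// This is an [`ArrowReaderBuilder`] with no input source, since the push
/// decoder is fed bytes incrementally via
/// [`ParquetPushDecoder::push_ranges`] rather than reading them itself.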
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
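/// Marker type used as the "input" of a [`ParquetPushDecoderBuilder`], which
/// has no input source of its own.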
#[derive(Debug, Clone, Copy)]
pub struct NoInput;
impl ParquetPushDecoderBuilder {
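    /// Creates a new builder from the given [`ParquetMetaData`] using default
    /// [`ArrowReaderOptions`].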
pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
}
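    /// Creates a new builder from the given [`ParquetMetaData`] and
    /// [`ArrowReaderOptions`].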
pub fn try_new_decoder_with_options(
parquet_metadata: Arc<ParquetMetaData>,
arrow_reader_options: ArrowReaderOptions,
) -> Result<Self, ParquetError> {
let arrow_reader_metadata =
ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
Ok(Self::new_with_metadata(arrow_reader_metadata))
}
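    /// Creates a new builder from previously decoded [`ArrowReaderMetadata`].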
pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
Self::new_builder(NoInput, arrow_reader_metadata)
}
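    /// Consumes the builder and creates a configured [`ParquetPushDecoder`].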
pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
let Self {
input: NoInput,
metadata: parquet_metadata,
schema: _,
fields,
batch_size,
row_groups,
projection,
filter,
selection,
limit,
offset,
metrics,
row_selection_policy,
max_predicate_cache_size,
} = self;
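        // By default, read all row groups in file order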
let row_groups =
row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
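        // The push decoder does not know the total file size; PushBuffers only
        // tracks the absolute byte ranges that have been pushed, so a
        // placeholder length of 0 suffices here.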
        let file_len = 0;
        let buffers = PushBuffers::new(file_len);
let row_group_reader_builder = RowGroupReaderBuilder::new(
batch_size,
projection,
Arc::clone(&parquet_metadata),
fields,
filter,
limit,
offset,
metrics,
max_predicate_cache_size,
buffers,
row_selection_policy,
);
let remaining_row_groups = RemainingRowGroups::new(
parquet_metadata,
row_groups,
selection,
row_group_reader_builder,
);
Ok(ParquetPushDecoder {
state: ParquetDecoderState::ReadingRowGroup {
remaining_row_groups: Box::new(remaining_row_groups),
},
})
}
}
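/// A push-based decoder for Parquet files.
///
/// Unlike the pull-based readers, a `ParquetPushDecoder` performs no IO
/// itself. [`Self::try_decode`] either returns a decoded [`RecordBatch`] or
/// reports which byte ranges of the file it needs next; the caller fetches
/// those ranges however it likes and supplies them via
/// [`Self::push_ranges`].
///
/// # Example
///
/// A minimal sketch of the decode loop. `fetch_ranges` is a hypothetical,
/// caller-provided function that reads the requested byte ranges from
/// storage:
///
/// ```no_run
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::errors::ParquetError;
/// # use parquet::file::metadata::ParquetMetaData;
/// # fn fetch_ranges(ranges: &[Range<u64>]) -> Vec<Bytes> { unimplemented!() }
/// fn decode_all(metadata: Arc<ParquetMetaData>) -> Result<(), ParquetError> {
///     let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)?.build()?;
///     loop {
///         match decoder.try_decode()? {
///             // The decoder needs more bytes before it can make progress
///             DecodeResult::NeedsData(ranges) => {
///                 let data = fetch_ranges(&ranges);
///                 decoder.push_ranges(ranges, data)?;
///             }
///             // A RecordBatch was decoded
///             DecodeResult::Data(batch) => println!("decoded {} rows", batch.num_rows()),
///             // All requested row groups have been decoded
///             DecodeResult::Finished => break,
///         }
///     }
///     Ok(())
/// }
/// ```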
#[derive(Debug)]
pub struct ParquetPushDecoder {
state: ParquetDecoderState,
}
impl ParquetPushDecoder {
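    /// Attempts to decode the next [`RecordBatch`].
    ///
    /// Returns [`DecodeResult::Data`] with the next batch,
    /// [`DecodeResult::NeedsData`] with the byte ranges that must be supplied
    /// via [`Self::push_ranges`] before decoding can continue, or
    /// [`DecodeResult::Finished`] once all requested row groups are decoded.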
pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
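        // Take ownership of the current state, leaving `Finished` as a
        // placeholder; it is overwritten with the new state below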
let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
let (new_state, decode_result) = current_state.try_next_batch()?;
self.state = new_state;
Ok(decode_result)
}
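    /// Attempts to create a [`ParquetRecordBatchReader`] that decodes the
    /// next row group directly, as an alternative to [`Self::try_decode`].
    /// Returns [`DecodeResult::NeedsData`] if more bytes are required first.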
pub fn try_next_reader(
&mut self,
) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
let (new_state, decode_result) = current_state.try_next_reader()?;
self.state = new_state;
Ok(decode_result)
}
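    /// Pushes the data for a single byte range of the file into the decoder.
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`].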
pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
self.push_ranges(vec![range], vec![data])
}
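    /// Pushes the data for multiple byte ranges into the decoder.
    ///
    /// `ranges[i]` gives the absolute file offsets covered by `data[i]`.
    /// Returns an error if the decoder has already finished.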
pub fn push_ranges(
&mut self,
ranges: Vec<Range<u64>>,
data: Vec<Bytes>,
) -> Result<(), ParquetError> {
let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
self.state = current_state.push_data(ranges, data)?;
Ok(())
}
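    /// Returns the number of bytes currently buffered in the decoder but not
    /// yet consumed.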
pub fn buffered_bytes(&self) -> u64 {
self.state.buffered_bytes()
}
}
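/// Internal state machine for [`ParquetPushDecoder`].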
#[derive(Debug)]
enum ParquetDecoderState {
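    /// Waiting for enough data to create a reader for the next row group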
ReadingRowGroup {
remaining_row_groups: Box<RemainingRowGroups>,
},
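    /// Decoding batches from the current row group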
DecodingRowGroup {
record_batch_reader: Box<ParquetRecordBatchReader>,
remaining_row_groups: Box<RemainingRowGroups>,
},
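    /// All requested row groups have been decoded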
Finished,
}
impl ParquetDecoderState {
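    /// Drives the state machine until a [`ParquetRecordBatchReader`] for the
    /// next row group is ready, more data is needed, or decoding is finished.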
fn try_next_reader(
self,
) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
let mut current_state = self;
loop {
let (next_state, decode_result) = current_state.transition()?;
match decode_result {
DecodeResult::NeedsData(ranges) => {
return Ok((next_state, DecodeResult::NeedsData(ranges)));
}
DecodeResult::Data(()) | DecodeResult::Finished => {}
}
match next_state {
Self::ReadingRowGroup { .. } => current_state = next_state,
Self::DecodingRowGroup {
record_batch_reader,
remaining_row_groups,
} => {
let result = DecodeResult::Data(*record_batch_reader);
let next_state = Self::ReadingRowGroup {
remaining_row_groups,
};
return Ok((next_state, result));
}
Self::Finished => {
return Ok((Self::Finished, DecodeResult::Finished));
}
}
}
}
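    /// Drives the state machine until the next [`RecordBatch`] is decoded,
    /// more data is needed, or decoding is finished.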
fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
let mut current_state = self;
loop {
let (new_state, decode_result) = current_state.transition()?;
match decode_result {
DecodeResult::NeedsData(ranges) => {
return Ok((new_state, DecodeResult::NeedsData(ranges)));
}
DecodeResult::Data(()) | DecodeResult::Finished => {}
}
match new_state {
Self::ReadingRowGroup { .. } => current_state = new_state,
Self::DecodingRowGroup {
mut record_batch_reader,
remaining_row_groups,
} => {
match record_batch_reader.next() {
Some(Ok(batch)) => {
let result = DecodeResult::Data(batch);
let next_state = Self::DecodingRowGroup {
record_batch_reader,
remaining_row_groups,
};
return Ok((next_state, result));
}
None => {
current_state = Self::ReadingRowGroup {
remaining_row_groups,
}
}
Some(Err(e)) => {
return Err(ParquetError::ArrowError(e.to_string()));
}
}
}
Self::Finished => {
return Ok((Self::Finished, DecodeResult::Finished));
}
}
}
}
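    /// Advances the state machine by at most one transition, returning the
    /// new state and a [`DecodeResult`] describing the outcome.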
fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
let data_ready = DecodeResult::Data(());
match self {
Self::ReadingRowGroup {
mut remaining_row_groups,
} => {
match remaining_row_groups.try_next_reader()? {
DecodeResult::Data(record_batch_reader) => {
Ok((
Self::DecodingRowGroup {
record_batch_reader: Box::new(record_batch_reader),
remaining_row_groups,
},
data_ready,
))
}
DecodeResult::NeedsData(ranges) => {
Ok((
Self::ReadingRowGroup {
remaining_row_groups,
},
DecodeResult::NeedsData(ranges),
))
}
DecodeResult::Finished => {
Ok((Self::Finished, DecodeResult::Finished))
}
}
}
Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
Self::Finished => Ok((self, DecodeResult::Finished)),
}
}
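    /// Pushes the given byte ranges into the buffers of the current state.
    ///
    /// Pushing data to a finished decoder is an error.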
pub fn push_data(
self,
ranges: Vec<Range<u64>>,
data: Vec<Bytes>,
) -> Result<Self, ParquetError> {
match self {
ParquetDecoderState::ReadingRowGroup {
mut remaining_row_groups,
} => {
remaining_row_groups.push_data(ranges, data);
Ok(ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
})
}
ParquetDecoderState::DecodingRowGroup {
record_batch_reader,
mut remaining_row_groups,
} => {
remaining_row_groups.push_data(ranges, data);
Ok(ParquetDecoderState::DecodingRowGroup {
record_batch_reader,
remaining_row_groups,
})
}
ParquetDecoderState::Finished => Err(ParquetError::General(
"Cannot push data to a finished decoder".to_string(),
)),
}
}
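    /// Returns the number of bytes currently buffered but not yet consumed.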
fn buffered_bytes(&self) -> u64 {
match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.buffered_bytes(),
ParquetDecoderState::DecodingRowGroup {
record_batch_reader: _,
remaining_row_groups,
} => remaining_row_groups.buffered_bytes(),
ParquetDecoderState::Finished => 0,
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::DecodeResult;
use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaDataPushDecoder;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::{gt, lt};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
use arrow_select::concat::concat_batches;
use bytes::Bytes;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::{Arc, LazyLock};
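    // The state is moved by value on every decode call, so keep it small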
#[test]
fn test_decoder_size() {
assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
}
#[test]
fn test_decoder_all_data() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();
decoder
.push_range(test_file_range(), TEST_FILE_DATA.clone())
.unwrap();
let results = vec![
expect_data(decoder.try_decode()),
expect_data(decoder.try_decode()),
];
expect_finished(decoder.try_decode());
let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
assert_eq!(all_output, *TEST_BATCH);
}
#[test]
fn test_decoder_incremental() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();
let mut results = vec![];
let ranges = expect_needs_data(decoder.try_decode());
let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
push_ranges_to_decoder(&mut decoder, ranges);
assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
results.push(expect_data(decoder.try_decode()));
assert_eq!(decoder.buffered_bytes(), 0);
let ranges = expect_needs_data(decoder.try_decode());
let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
push_ranges_to_decoder(&mut decoder, ranges);
assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
results.push(expect_data(decoder.try_decode()));
assert_eq!(decoder.buffered_bytes(), 0);
expect_finished(decoder.try_decode());
let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
assert_eq!(all_output, *TEST_BATCH);
}
#[test]
fn test_decoder_partial() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(0, 200);
assert_eq!(batch1, expected1);
let ranges = expect_needs_data(decoder.try_decode());
let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
assert!(!ranges1.is_empty());
assert!(!ranges2.is_empty());
push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
let ranges = expect_needs_data(decoder.try_decode());
        // Only half the requested data was supplied, so the decoder asks
        // again for the still-missing ranges
        assert_eq!(ranges, ranges2);
        // Pushing no data at all should leave the request unchanged
        push_ranges_to_decoder(&mut decoder, vec![]);
let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2);
        push_ranges_to_decoder(&mut decoder, ranges);
let batch2 = expect_data(decoder.try_decode());
let expected2 = TEST_BATCH.slice(200, 200);
assert_eq!(batch2, expected2);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_selection_does_one_request() {
let builder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
.build()
.unwrap();
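        // Each row group should be fetched with a single NeedsData request
        // covering the projected columns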
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
assert_eq!(batch1, expected1);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch2 = expect_data(decoder.try_decode());
let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
assert_eq!(batch2, expected2);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_single_filter_partial() {
let builder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let row_filter_a = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["a", "b"]),
|batch: RecordBatch| {
let scalar_250 = Int64Array::new_scalar(250);
let column = batch.column(0).as_primitive::<Int64Type>();
gt(column, &scalar_250)
},
);
let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
.with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
assert!(!ranges1.is_empty());
assert!(!ranges2.is_empty());
push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
let ranges = expect_needs_data(decoder.try_decode());
        // Only part of the requested data was supplied, so the decoder asks
        // again for the still-missing ranges
        assert_eq!(ranges, ranges2);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2);
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch = expect_data(decoder.try_decode());
let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
assert_eq!(batch, expected);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_single_filter_and_row_selection() {
let builder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let row_filter_a = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["a"]),
|batch: RecordBatch| {
let scalar_250 = Int64Array::new_scalar(250);
let column = batch.column(0).as_primitive::<Int64Type>();
gt(column, &scalar_250)
},
);
let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["b"]))
.with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
.with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),
                RowSelector::select(100),
                RowSelector::skip(100),
]))
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch = expect_data(decoder.try_decode());
let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
assert_eq!(batch, expected);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_multi_filters() {
let builder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let row_filter_a = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["a"]),
|batch: RecordBatch| {
let scalar_175 = Int64Array::new_scalar(175);
let column = batch.column(0).as_primitive::<Int64Type>();
gt(column, &scalar_175)
},
);
let row_filter_b = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["b"]),
|batch: RecordBatch| {
let scalar_625 = Int64Array::new_scalar(625);
let column = batch.column(0).as_primitive::<Int64Type>();
lt(column, &scalar_625)
},
);
let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
.with_row_filter(RowFilter::new(vec![
Box::new(row_filter_a),
Box::new(row_filter_b),
]))
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
assert_eq!(batch1, expected1);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch2 = expect_data(decoder.try_decode());
let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
assert_eq!(batch2, expected2);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_reuses_filter_pages() {
let builder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let row_filter_a = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["a"]),
|batch: RecordBatch| {
let scalar_250 = Int64Array::new_scalar(250);
let column = batch.column(0).as_primitive::<Int64Type>();
gt(column, &scalar_250)
},
);
let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
.with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
.build()
.unwrap();
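        // The filter and the projection both read column "a", so the pages
        // fetched to evaluate the filter should be reused for the final decode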
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch = expect_data(decoder.try_decode());
let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
assert_eq!(batch, expected);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_empty_filters() {
let builder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
            .with_row_filter(RowFilter::new(vec![]))
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
assert_eq!(batch1, expected1);
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch2 = expect_data(decoder.try_decode());
let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
assert_eq!(batch2, expected2);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_offset_limit() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(225)
.with_limit(20)
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(225, 20);
assert_eq!(batch1, expected1);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_row_group_selection() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![1])
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(200, 200);
assert_eq!(batch1, expected1);
expect_finished(decoder.try_decode());
}
#[test]
fn test_decoder_row_selection() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_selection(RowSelection::from(vec![
                RowSelector::skip(225),
                RowSelector::select(20),
            ]))
.build()
.unwrap();
let ranges = expect_needs_data(decoder.try_decode());
push_ranges_to_decoder(&mut decoder, ranges);
let batch1 = expect_data(decoder.try_decode());
let expected1 = TEST_BATCH.slice(225, 20);
assert_eq!(batch1, expected1);
expect_finished(decoder.try_decode());
}
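    /// 400-row RecordBatch with Int64 columns "a" and "b" and StringView
    /// column "c", used as the expected contents of the test file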
static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
if i % 2 == 0 {
format!("string_{i}")
} else {
format!("A string larger than 12 bytes and thus not inlined {i}")
}
})));
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
});
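    /// `TEST_BATCH` encoded as a Parquet file with two 200-row row groups and
    /// a 100-row data page limit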
static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
let input_batch = &TEST_BATCH;
let mut output = Vec::new();
let writer_options = WriterProperties::builder()
.set_max_row_group_row_count(Some(200))
.set_data_page_row_count_limit(100)
.build();
let mut writer =
ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
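        // Write in 50-row chunks so each column chunk contains multiple pages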
let mut row_remain = input_batch.num_rows();
while row_remain > 0 {
let chunk_size = row_remain.min(50);
let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
writer.write(&chunk).unwrap();
row_remain -= chunk_size;
}
writer.close().unwrap();
Bytes::from(output)
});
fn test_file_len() -> u64 {
TEST_FILE_DATA.len() as u64
}
fn test_file_range() -> Range<u64> {
0..test_file_len()
}
pub fn test_file_slice(range: Range<u64>) -> Bytes {
let start: usize = range.start.try_into().unwrap();
let end: usize = range.end.try_into().unwrap();
TEST_FILE_DATA.slice(start..end)
}
pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
let metadata = metadata_decoder.try_decode().unwrap();
let DecodeResult::Data(metadata) = metadata else {
panic!("Expected metadata to be decoded successfully");
};
Arc::new(metadata)
}
fn push_ranges_to_metadata_decoder(
metadata_decoder: &mut ParquetMetaDataPushDecoder,
ranges: Vec<Range<u64>>,
) {
let data = ranges
.iter()
.map(|range| test_file_slice(range.clone()))
.collect::<Vec<_>>();
metadata_decoder.push_ranges(ranges, data).unwrap();
}
fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
let data = ranges
.iter()
.map(|range| test_file_slice(range.clone()))
.collect::<Vec<_>>();
decoder.push_ranges(ranges, data).unwrap();
}
fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
match result.expect("Expected Ok(DecodeResult::Data(T))") {
DecodeResult::Data(data) => data,
result => panic!("Expected DecodeResult::Data, got {result:?}"),
}
}
fn expect_needs_data<T: Debug>(
result: Result<DecodeResult<T>, ParquetError>,
) -> Vec<Range<u64>> {
match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
DecodeResult::NeedsData(ranges) => ranges,
result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
}
}
fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
match result.expect("Expected Ok(DecodeResult::Finished)") {
DecodeResult::Finished => {}
result => panic!("Expected DecodeResult::Finished, got {result:?}"),
}
}
}