use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
mod metadata;
pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
use crate::DecodeResult;
use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
#[cfg(feature = "object_store")]
pub use store::*;
/// Asynchronous source of parquet bytes and metadata.
///
/// This is the input abstraction required by [`ParquetRecordBatchStream`] and
/// its builder: implementors hand back raw byte ranges and the parsed
/// [`ParquetMetaData`] of the file they represent.
pub trait AsyncFileReader: Send {
    /// Returns a future resolving to the bytes covered by `range`.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Returns a future resolving to the bytes of every range in `ranges`,
    /// in the same order as requested.
    ///
    /// The provided implementation awaits one [`Self::get_bytes`] call per
    /// range, one after the other, and stops at the first error.
    /// Implementations may override it.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut fetched = Vec::with_capacity(ranges.len());
            for range in ranges {
                fetched.push(self.get_bytes(range).await?);
            }
            Ok(fetched)
        }
        .boxed()
    }

    /// Returns a future resolving to the [`ParquetMetaData`] of the file.
    ///
    /// `options`, when provided, carries the reader options the caller wants
    /// honoured while loading the metadata.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}
/// Forwarding implementation so a boxed trait object can be used wherever an
/// [`AsyncFileReader`] is expected. Every method delegates to the boxed value.
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        (**self).get_bytes(range)
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        // Delegate explicitly so an overridden batch implementation is used
        // instead of the trait's default one.
        (**self).get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        (**self).get_metadata(options)
    }
}
/// Suffix fetching for seekable async readers: seeks relative to the end of
/// the reader and reads the requested number of trailing bytes.
impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
    /// Returns a future resolving to the last `suffix` bytes of the reader.
    ///
    /// # Errors
    ///
    /// Fails if `suffix` does not fit in an `i64`, or if seeking or reading
    /// the underlying reader fails.
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            // Checked conversion: a plain `as i64` cast would silently wrap for
            // very large values and seek to an unintended position.
            let distance_from_end = i64::try_from(suffix)?;
            // `distance_from_end` is non-negative, so the negation cannot overflow.
            self.seek(SeekFrom::End(-distance_from_end)).await?;
            let mut buf = Vec::with_capacity(suffix);
            self.take(suffix as _).read_to_end(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
async move {
self.seek(SeekFrom::Start(range.start)).await?;
let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read.try_into()?);
let read = self.take(to_read).read_to_end(&mut buffer).await?;
if read as u64 != to_read {
return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
}
Ok(buffer.into())
}
.boxed()
}
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
async move {
let metadata_opts = options.map(|o| o.metadata_options().clone());
let metadata_reader = ParquetMetaDataReader::new()
.with_page_index_policy(PageIndexPolicy::from(
options.is_some_and(|o| o.page_index()),
))
.with_metadata_options(metadata_opts);
#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader.with_decryption_properties(
options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
);
let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
Ok(Arc::new(parquet_metadata))
}
.boxed()
}
}
impl ArrowReaderMetadata {
    /// Asynchronously builds an [`ArrowReaderMetadata`] from `input`.
    ///
    /// The parquet metadata is obtained through
    /// [`AsyncFileReader::get_metadata`], passing `options` along, and is then
    /// combined with the same `options` via `Self::try_new`.
    ///
    /// # Errors
    ///
    /// Propagates any error from fetching the metadata or from `Self::try_new`.
    pub async fn load_async<T: AsyncFileReader>(
        input: &mut T,
        options: ArrowReaderOptions,
    ) -> Result<Self> {
        let pending_metadata = input.get_metadata(Some(&options));
        let parquet_metadata = pending_metadata.await?;
        Self::try_new(parquet_metadata, options)
    }
}
/// Newtype wrapper around an asynchronous input of type `T`.
///
/// It is the input type plugged into [`ArrowReaderBuilder`] by the
/// [`ParquetRecordBatchStreamBuilder`] alias; the wrapped value is accessed
/// through field `.0`.
#[doc(hidden)]
pub struct AsyncReader<T>(T);
/// Builder for a [`ParquetRecordBatchStream`] reading from an input of type `T`.
///
/// This is an [`ArrowReaderBuilder`] whose input is wrapped in [`AsyncReader`].
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
    /// Creates a builder for `input` using default [`ArrowReaderOptions`].
    ///
    /// The file metadata is loaded from `input` as part of this call.
    pub async fn new(input: T) -> Result<Self> {
        Self::new_with_options(input, Default::default()).await
    }

    /// Creates a builder for `input`, loading its metadata with `options`.
    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
        Ok(Self::new_with_metadata(input, metadata))
    }

    /// Creates a builder from `input` and already loaded `metadata`,
    /// performing no I/O.
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Reads the bloom filter of column `column_idx` in row group
    /// `row_group_idx`.
    ///
    /// Returns `Ok(None)` when the column chunk records no bloom filter offset.
    ///
    /// When the metadata records the filter length, the header and bitset are
    /// fetched in a single request; otherwise the header is fetched first
    /// (using an estimated header size) and the bitset in a second request.
    ///
    /// # Errors
    ///
    /// Returns an error when the recorded offset or length is invalid (for
    /// example negative), when the bitset position derived from the header
    /// lies outside the fetched data, or when fetching or decoding fails.
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let invalid_length =
            || ParquetError::General("Bloom filter length is invalid".to_string());

        // Checked conversion and addition: a negative or oversized length in
        // corrupt metadata must surface as an error, not wrap into a bogus range.
        let known_length = column_metadata.bloom_filter_length();
        let fetch_length = match known_length {
            Some(length) => u64::try_from(length).map_err(|_| invalid_length())?,
            None => SBBF_HEADER_SIZE_ESTIMATE as u64,
        };
        let fetch_end = offset
            .checked_add(fetch_length)
            .ok_or_else(invalid_length)?;
        let buffer = self.input.0.get_bytes(offset..fetch_end).await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // The following matches are exhaustive over single-variant enums: they
        // turn into compile errors if a new variant is ever added.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {}
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {}
        }
        match header.hash {
            BloomFilterHash::XXHASH => {}
        }

        let bitset = match known_length {
            // The whole filter is already in `buffer`; skip over the header.
            Some(_) => {
                let bitset_start = bitset_offset
                    .checked_sub(offset)
                    .and_then(|skip| usize::try_from(skip).ok())
                    .filter(|skip| *skip <= buffer.len())
                    .ok_or_else(|| {
                        ParquetError::General("Bloom filter bitset offset is invalid".to_string())
                    })?;
                buffer.slice(bitset_start..)
            }
            // Only the header was fetched; request the bitset it describes.
            None => {
                let bitset_length: u64 = header
                    .num_bytes
                    .try_into()
                    .map_err(|_| invalid_length())?;
                let bitset_end = bitset_offset
                    .checked_add(bitset_length)
                    .ok_or_else(invalid_length)?;
                self.input
                    .0
                    .get_bytes(bitset_offset..bitset_end)
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Consumes the builder and creates the [`ParquetRecordBatchStream`].
    ///
    /// The schema reported by the stream is built from the projected leaf
    /// fields only. All remaining configuration is handed to a
    /// [`ParquetPushDecoderBuilder`], while the stream itself keeps the input
    /// to satisfy the decoder's data requests.
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let Self {
            input,
            metadata,
            schema,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            row_selection_policy: selection_strategy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        } = self;

        // Without an explicit mask every leaf index passes the length check.
        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
        let projected_fields = schema
            .fields
            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
        let projected_schema = Arc::new(Schema::new(projected_fields));

        // The decoder performs no I/O itself, hence `NoInput`.
        let decoder = ParquetPushDecoderBuilder {
            input: NoInput,
            metadata,
            schema,
            fields,
            projection,
            filter,
            selection,
            row_selection_policy: selection_strategy,
            batch_size,
            row_groups,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        }
        .build()?;

        let request_state = RequestState::None { input: input.0 };
        Ok(ParquetRecordBatchStream {
            schema: projected_schema,
            decoder,
            request_state,
        })
    }
}
/// I/O state of a [`ParquetRecordBatchStream`]: records whether the input is
/// currently held by the stream or has been moved into an in-flight fetch.
enum RequestState<T> {
    /// No fetch is in flight; the input is available for the next request.
    None {
        /// The reader that data is fetched from.
        input: T,
    },
    /// A fetch created by `begin_request` has not completed yet.
    Outstanding {
        /// The byte ranges that were requested.
        ranges: Vec<Range<u64>>,
        /// Future that resolves to the input together with the bytes fetched
        /// for `ranges`.
        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
    },
    /// No further work will be done. Also used as the placeholder left in
    /// place while the current state is taken out for processing.
    Done,
}
impl<T> RequestState<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Starts fetching `ranges` from `input` and returns the corresponding
    /// [`RequestState::Outstanding`] state.
    ///
    /// `input` is moved into the future and returned alongside the fetched
    /// bytes once the request succeeds.
    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
        // One copy of the ranges goes to the reader, the other is kept so the
        // result can later be matched to what was asked for.
        let requested = ranges.clone();
        let pending = async move {
            let fetched = input.get_byte_ranges(requested).await;
            fetched.map(|data| (input, data))
        };
        RequestState::Outstanding {
            ranges,
            future: pending.boxed(),
        }
    }
}
/// Debug output that never requires `T: Debug`: the input is elided and the
/// in-flight future is omitted.
impl<T> std::fmt::Debug for RequestState<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            RequestState::Done => f.write_str("RequestState::Done"),
            RequestState::None { .. } => {
                let mut out = f.debug_struct("RequestState::None");
                out.field("input", &"...");
                out.finish()
            }
            RequestState::Outstanding { ranges, .. } => {
                let mut out = f.debug_struct("RequestState::Outstanding");
                out.field("ranges", ranges);
                out.finish()
            }
        }
    }
}
/// An asynchronous stream of [`RecordBatch`] read from a parquet input of
/// type `T`, created by `ParquetRecordBatchStreamBuilder::build`.
pub struct ParquetRecordBatchStream<T> {
    /// Schema exposed through [`Self::schema`].
    schema: SchemaRef,
    /// Current I/O state; owns the input unless a fetch is in flight.
    request_state: RequestState<T>,
    /// Decoder that reports which byte ranges it needs and produces the output.
    decoder: ParquetPushDecoder,
}
/// Debug output limited to the request state, so `T` need not be `Debug`.
impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ParquetRecordBatchStream");
        out.field("request_state", &self.request_state);
        out.finish()
    }
}
impl<T> ParquetRecordBatchStream<T> {
    /// Returns the schema stored in this stream.
    pub fn schema(&self) -> &SchemaRef {
        let Self { schema, .. } = self;
        schema
    }
}
impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Fetches the data for the next reader reported by the decoder and
    /// returns it as a [`ParquetRecordBatchReader`].
    ///
    /// Returns `Ok(None)` once the decoder reports that it is finished, or if
    /// the stream is already in its terminal state.
    ///
    /// # Errors
    ///
    /// Propagates decoder and I/O errors. Because the state is swapped for
    /// `RequestState::Done` before processing, an early return on error leaves
    /// the stream in the terminal state, so later calls return `Ok(None)`.
    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
        loop {
            // Take ownership of the current state, leaving `Done` behind so
            // `self` stays valid while the state is processed.
            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
            match request_state {
                // No fetch in flight: ask the decoder what to do next.
                RequestState::None { input } => {
                    match self.decoder.try_next_reader()? {
                        DecodeResult::NeedsData(ranges) => {
                            self.request_state = RequestState::begin_request(input, ranges);
                            // Loop again to await the request just started.
                            continue; }
                        DecodeResult::Data(reader) => {
                            // Hand the input back so the next call can continue.
                            self.request_state = RequestState::None { input };
                            return Ok(Some(reader));
                        }
                        // The state stays `Done`; the input is dropped here.
                        DecodeResult::Finished => return Ok(None),
                    }
                }
                // A fetch is in flight: wait for it, then feed the decoder.
                RequestState::Outstanding { ranges, future } => {
                    let (input, data) = future.await?;
                    self.decoder.push_ranges(ranges, data)?;
                    self.request_state = RequestState::None { input };
                    // Loop again to retry decoding with the new data.
                    continue; }
                RequestState::Done => {
                    self.request_state = RequestState::Done;
                    return Ok(None);
                }
            }
        }
    }
}
impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    type Item = Result<RecordBatch>;

    /// Polls for the next batch.
    ///
    /// An error is yielded as a stream item and also moves the stream into its
    /// terminal state, so subsequent polls report the end of the stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.poll_next_inner(cx) {
            Ok(Poll::Ready(Some(batch))) => Poll::Ready(Some(Ok(batch))),
            Ok(Poll::Ready(None)) => Poll::Ready(None),
            Ok(Poll::Pending) => Poll::Pending,
            Err(e) => {
                self.request_state = RequestState::Done;
                Poll::Ready(Some(Err(e)))
            }
        }
    }
}
impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Drives the stream state machine for `poll_next`.
    ///
    /// Returns `Ok(Poll::Ready(Some(batch)))` for a decoded batch,
    /// `Ok(Poll::Ready(None))` at the end of the stream, `Ok(Poll::Pending)`
    /// while a fetch is still in flight, and `Err` for decoder or I/O failures.
    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
        loop {
            // Take ownership of the current state, leaving `Done` behind so
            // `self` stays valid while the state is processed.
            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
            match request_state {
                // No fetch in flight: try to decode the next batch.
                RequestState::None { input } => {
                    match self.decoder.try_decode()? {
                        DecodeResult::NeedsData(ranges) => {
                            self.request_state = RequestState::begin_request(input, ranges);
                            // Loop again so the new future is polled right away.
                            continue; }
                        DecodeResult::Data(batch) => {
                            self.request_state = RequestState::None { input };
                            return Ok(Poll::Ready(Some(batch)));
                        }
                        DecodeResult::Finished => {
                            self.request_state = RequestState::Done;
                            return Ok(Poll::Ready(None));
                        }
                    }
                }
                // A fetch is in flight: poll it once.
                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
                    Poll::Ready(result) => {
                        let (input, data) = result?;
                        self.decoder.push_ranges(ranges, data)?;
                        self.request_state = RequestState::None { input };
                        // Loop again to retry decoding with the new data.
                        continue; }
                    Poll::Pending => {
                        // Put the unfinished request back for the next poll.
                        self.request_state = RequestState::Outstanding { ranges, future };
                        return Ok(Poll::Pending);
                    }
                },
                RequestState::Done => {
                    self.request_state = RequestState::Done;
                    return Ok(Poll::Ready(None));
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::arrow_reader::RowSelectionPolicy;
use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use crate::arrow::schema::virtual_type::RowNumber;
use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::or;
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::{Int32Type, TimestampNanosecondType};
use arrow_array::{
Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
Scalar, StringArray, StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::concat::concat_batches;
use futures::{StreamExt, TryStreamExt};
use rand::{Rng, rng};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tempfile::tempfile;
/// In-memory [`AsyncFileReader`] used by the tests; it records every byte
/// range requested through `get_bytes`.
#[derive(Clone)]
struct TestReader {
    /// The complete parquet file contents.
    data: Bytes,
    /// Metadata parsed by the most recent `get_metadata` call, if any.
    metadata: Option<Arc<ParquetMetaData>>,
    /// Log of requested ranges; the `Arc` is shared between clones of the reader.
    requests: Arc<Mutex<Vec<Range<usize>>>>,
}
impl TestReader {
fn new(data: Bytes) -> Self {
Self {
data,
metadata: Default::default(),
requests: Default::default(),
}
}
}
impl AsyncFileReader for TestReader {
    /// Logs the requested range and serves it from the in-memory buffer.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        let (start, end) = (range.start as usize, range.end as usize);
        self.requests.lock().unwrap().push(start..end);
        let chunk = self.data.slice(start..end);
        futures::future::ready(Ok(chunk)).boxed()
    }

    /// Parses the metadata from the in-memory buffer, caches it on the reader
    /// and returns it. Only the page index setting of `options` is consulted.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        let page_index_policy = PageIndexPolicy::from(options.is_some_and(|o| o.page_index()));
        let parsed = ParquetMetaDataReader::new()
            .with_page_index_policy(page_index_policy)
            .parse_and_finish(&self.data)
            .unwrap();
        let shared = Arc::new(parsed);
        self.metadata = Some(Arc::clone(&shared));
        futures::future::ready(Ok(shared)).boxed()
    }
}
#[tokio::test]
async fn test_async_reader() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
let metadata = builder.metadata().clone();
assert_eq!(metadata.num_row_groups(), 1);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(104)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
assert_eq!(
&requests[..],
&[
offset_1 as usize..(offset_1 + length_1) as usize,
offset_2 as usize..(offset_2 + length_2) as usize
]
);
}
#[tokio::test]
async fn test_async_reader_with_next_row_group() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
let metadata = builder.metadata().clone();
assert_eq!(metadata.num_row_groups(), 1);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let mut stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();
let mut readers = vec![];
while let Some(reader) = stream.next_row_group().await.unwrap() {
readers.push(reader);
}
let async_batches: Vec<_> = readers
.into_iter()
.flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
.collect();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(104)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
assert_eq!(
&requests[..],
&[
offset_1 as usize..(offset_1 + length_1) as usize,
offset_2 as usize..(offset_2 + length_2) as usize
]
);
}
#[tokio::test]
async fn test_async_reader_with_index() {
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    // Ask for the page index while loading the metadata.
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
        TestReader::new(data.clone()),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();

    // Both indexes must be present, with one entry per row group ...
    let indexed_metadata = builder.metadata();
    let row_group_count = indexed_metadata.num_row_groups();
    assert_eq!(row_group_count, 1);

    let offset_index = indexed_metadata.offset_index().unwrap();
    let column_index = indexed_metadata.column_index().unwrap();
    assert_eq!(offset_index.len(), row_group_count);
    assert_eq!(column_index.len(), row_group_count);

    // ... and one entry per column inside every row group.
    let num_columns = indexed_metadata
        .file_metadata()
        .schema_descr()
        .num_columns();
    for per_row_group in offset_index.iter() {
        assert_eq!(per_row_group.len(), num_columns);
    }
    for per_row_group in column_index.iter() {
        assert_eq!(per_row_group.len(), num_columns);
    }

    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
    let async_batches: Vec<_> = builder
        .with_projection(mask.clone())
        .with_batch_size(1024)
        .build()
        .unwrap()
        .try_collect()
        .await
        .unwrap();

    let sync_reader = ParquetRecordBatchReaderBuilder::try_new(data)
        .unwrap()
        .with_projection(mask)
        .with_batch_size(1024)
        .build()
        .unwrap();
    let sync_batches = sync_reader.collect::<ArrowResult<Vec<_>>>().unwrap();

    assert_eq!(async_batches, sync_batches);
}
#[tokio::test]
async fn test_async_reader_with_limit() {
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    // Sanity check on the file, parsed independently of the async reader.
    let parsed = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    let metadata = Arc::new(parsed);
    assert_eq!(metadata.num_row_groups(), 1);

    let builder = ParquetRecordBatchStreamBuilder::new(TestReader::new(data.clone()))
        .await
        .unwrap();
    assert_eq!(builder.metadata().num_row_groups(), 1);

    // The same limit is applied to the async and the sync reader.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
    let async_batches: Vec<_> = builder
        .with_projection(mask.clone())
        .with_batch_size(1024)
        .with_limit(1)
        .build()
        .unwrap()
        .try_collect()
        .await
        .unwrap();

    let sync_reader = ParquetRecordBatchReaderBuilder::try_new(data)
        .unwrap()
        .with_projection(mask)
        .with_batch_size(1024)
        .with_limit(1)
        .build()
        .unwrap();
    let sync_batches = sync_reader.collect::<ArrowResult<Vec<_>>>().unwrap();

    assert_eq!(async_batches, sync_batches);
}
#[tokio::test]
async fn test_async_reader_skip_pages() {
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
        TestReader::new(data.clone()),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();
    assert_eq!(builder.metadata().num_row_groups(), 1);

    // Alternating skip/select runs of varying sizes.
    let selectors = vec![
        RowSelector::skip(21),
        RowSelector::select(21),
        RowSelector::skip(41),
        RowSelector::select(41),
        RowSelector::skip(25),
        RowSelector::select(25),
        RowSelector::skip(7116),
        RowSelector::select(10),
    ];
    let selection = RowSelection::from(selectors);

    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
    let async_batches: Vec<_> = builder
        .with_projection(mask.clone())
        .with_row_selection(selection.clone())
        .build()
        .expect("building stream")
        .try_collect()
        .await
        .unwrap();

    // The synchronous reader with the same selection is the reference.
    let sync_reader = ParquetRecordBatchReaderBuilder::try_new(data)
        .unwrap()
        .with_projection(mask)
        .with_batch_size(1024)
        .with_row_selection(selection)
        .build()
        .unwrap();
    let sync_batches = sync_reader.collect::<ArrowResult<Vec<_>>>().unwrap();

    assert_eq!(async_batches, sync_batches);
}
#[tokio::test]
async fn test_fuzz_async_reader_selection() {
    // Number of rows the generated selections must cover.
    const TOTAL_ROWS: usize = 7300;

    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    let mut random = rng();

    for _ in 0..100 {
        // Build a random selection alternating between select and skip runs,
        // starting with a select run, until every row is covered.
        let mut expected_rows = 0;
        let mut covered_rows = 0;
        let mut skip = false;
        let mut selectors = vec![];
        while covered_rows < TOTAL_ROWS {
            let drawn: usize = random.random_range(1..100);
            let row_count = drawn.min(TOTAL_ROWS - covered_rows);
            selectors.push(RowSelector { row_count, skip });
            covered_rows += row_count;
            if !skip {
                expected_rows += row_count;
            }
            skip = !skip;
        }
        let selection = RowSelection::from(selectors);

        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
            TestReader::new(data.clone()),
            ArrowReaderOptions::new().with_page_index(true),
        )
        .await
        .unwrap();
        assert_eq!(builder.metadata().num_row_groups(), 1);

        // Project a single, randomly chosen leaf column.
        let col_idx: usize = random.random_range(0..13);
        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

        let async_batches: Vec<_> = builder
            .with_projection(mask.clone())
            .with_row_selection(selection.clone())
            .build()
            .expect("building stream")
            .try_collect()
            .await
            .unwrap();

        let actual_rows: usize = async_batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(actual_rows, expected_rows);
    }
}
#[tokio::test]
async fn test_async_reader_zero_row_selector() {
    // Number of rows the generated selection must cover.
    const TOTAL_ROWS: usize = 7300;

    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    let mut random = rng();

    // The selection deliberately starts with an empty (zero row) selector.
    let mut selectors = vec![RowSelector {
        row_count: 0,
        skip: false,
    }];

    // Followed by random runs alternating between select and skip.
    let mut expected_rows = 0;
    let mut covered_rows = 0;
    let mut skip = false;
    while covered_rows < TOTAL_ROWS {
        let drawn: usize = random.random_range(1..100);
        let row_count = drawn.min(TOTAL_ROWS - covered_rows);
        selectors.push(RowSelector { row_count, skip });
        covered_rows += row_count;
        if !skip {
            expected_rows += row_count;
        }
        skip = !skip;
    }
    let selection = RowSelection::from(selectors);

    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
        TestReader::new(data.clone()),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();
    assert_eq!(builder.metadata().num_row_groups(), 1);

    // Project a single, randomly chosen leaf column.
    let col_idx: usize = random.random_range(0..13);
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

    let async_batches: Vec<_> = builder
        .with_projection(mask.clone())
        .with_row_selection(selection.clone())
        .build()
        .expect("building stream")
        .try_collect()
        .await
        .unwrap();

    let actual_rows: usize = async_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(actual_rows, expected_rows);
}
#[tokio::test]
async fn test_row_filter_full_page_skip_is_handled_async() {
    let first_value: i64 = 1111;
    let last_value: i64 = 9999;
    let num_rows: usize = 12;

    // Two identical Int64 columns whose first and last rows carry marker values.
    let arrow_schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("value", DataType::Int64, false),
    ]));
    let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
    int_values[0] = first_value;
    int_values[num_rows - 1] = last_value;
    let keys: ArrayRef = Arc::new(Int64Array::from(int_values.clone()));
    let values: ArrayRef = Arc::new(Int64Array::from(int_values.clone()));
    let batch = RecordBatch::try_new(Arc::clone(&arrow_schema), vec![keys, values]).unwrap();

    // Write the batch with a limit of two rows per data page.
    let props = WriterProperties::builder()
        .set_write_batch_size(2)
        .set_data_page_row_count_limit(2)
        .build();
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, arrow_schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let data = Bytes::from(buffer);

    // A first builder is only used to obtain the parquet schema for the mask.
    let schema_source = ParquetRecordBatchStreamBuilder::new_with_options(
        TestReader::new(data.clone()),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();
    let filter_mask = ProjectionMask::leaves(schema_source.parquet_schema(), [0]);

    // Keep only the rows holding one of the two marker values.
    let predicate = ArrowPredicateFn::new(filter_mask, move |batch: RecordBatch| {
        let column = batch.column(0);
        let is_first = eq(column, &Int64Array::new_scalar(first_value))?;
        let is_last = eq(column, &Int64Array::new_scalar(last_value))?;
        or(&is_first, &is_last)
    });

    let stream = ParquetRecordBatchStreamBuilder::new_with_options(
        TestReader::new(data.clone()),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap()
    .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    .with_batch_size(12)
    .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
    .build()
    .unwrap();

    let stream_schema = stream.schema().clone();
    let batches: Vec<_> = stream.try_collect().await.unwrap();
    let combined = concat_batches(&stream_schema, &batches).unwrap();
    assert_eq!(combined.num_rows(), 2);
}
#[tokio::test]
async fn test_row_filter() {
    // Two string columns, written to an in-memory parquet file.
    let col_a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
    let col_b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
    let input_batch = RecordBatch::try_from_iter([
        ("a", Arc::new(col_a) as ArrayRef),
        ("b", Arc::new(col_b) as ArrayRef),
    ])
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, input_batch.schema(), None).unwrap();
    writer.write(&input_batch).unwrap();
    writer.close().unwrap();
    let data: Bytes = buf.into();

    let metadata = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    let parquet_schema = metadata.file_metadata().schema_descr_ptr();

    let reader = TestReader::new(data);
    let request_log = reader.requests.clone();

    // Predicate on column "a": keep the rows equal to "b".
    let a_scalar = StringArray::from_iter_values(["b"]);
    let a_filter = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![0]),
        move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
    );
    let filter = RowFilter::new(vec![Box::new(a_filter)]);

    let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
    let batches: Vec<_> = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .with_projection(mask.clone())
        .with_batch_size(1024)
        .with_row_filter(filter)
        .build()
        .unwrap()
        .try_collect()
        .await
        .unwrap();

    assert_eq!(batches.len(), 1);
    let filtered = &batches[0];
    assert_eq!(filtered.num_columns(), 2);

    // Only the three "b" rows survive, in both columns.
    assert_eq!(
        filtered.column(0).as_ref(),
        &StringArray::from_iter_values(["b", "b", "b"])
    );
    assert_eq!(
        filtered.column(1).as_ref(),
        &StringArray::from_iter_values(["2", "3", "4"])
    );

    // Two byte-range requests are expected in total.
    assert_eq!(request_log.lock().unwrap().len(), 2);
}
#[tokio::test]
async fn test_two_row_filters() {
    // Three columns, written to an in-memory parquet file.
    let col_a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
    let col_b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
    let col_c = Int32Array::from_iter(0..6);
    let input_batch = RecordBatch::try_from_iter([
        ("a", Arc::new(col_a) as ArrayRef),
        ("b", Arc::new(col_b) as ArrayRef),
        ("c", Arc::new(col_c) as ArrayRef),
    ])
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, input_batch.schema(), None).unwrap();
    writer.write(&input_batch).unwrap();
    writer.close().unwrap();
    let data: Bytes = buf.into();

    let metadata = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    let parquet_schema = metadata.file_metadata().schema_descr_ptr();

    let reader = TestReader::new(data);
    let request_log = reader.requests.clone();

    // First predicate: column "a" equals "b".
    let a_scalar = StringArray::from_iter_values(["b"]);
    let a_filter = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![0]),
        move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
    );

    // Second predicate: column "b" equals "4".
    let b_scalar = StringArray::from_iter_values(["4"]);
    let b_filter = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![1]),
        move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
    );

    let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

    // The output projects columns "a" and "c".
    let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
    let batches: Vec<_> = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .with_projection(mask.clone())
        .with_batch_size(1024)
        .with_row_filter(filter)
        .build()
        .unwrap()
        .try_collect()
        .await
        .unwrap();

    assert_eq!(batches.len(), 1);
    let filtered = &batches[0];
    assert_eq!(filtered.num_rows(), 1);
    assert_eq!(filtered.num_columns(), 2);

    // The single surviving row is ("b", 3).
    let a_values = filtered
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(a_values.value(0), "b");

    let c_values = filtered
        .column(1)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(c_values.value(0), 3);

    // Three byte-range requests are expected in total.
    assert_eq!(request_log.lock().unwrap().len(), 3);
}
#[tokio::test]
async fn test_limit_multiple_row_groups() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
let c = Int32Array::from_iter(0..6);
let data = RecordBatch::try_from_iter([
("a", Arc::new(a) as ArrayRef),
("b", Arc::new(b) as ArrayRef),
("c", Arc::new(c) as ArrayRef),
])
.unwrap();
let mut buf = Vec::with_capacity(1024);
let props = WriterProperties::builder()
.set_max_row_group_size(3)
.build();
let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
writer.write(&data).unwrap();
writer.close().unwrap();
let data: Bytes = buf.into();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
assert_eq!(metadata.num_row_groups(), 2);
let test = TestReader::new(data);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_batch_size(1024)
.with_limit(4)
.build()
.unwrap();
let batches: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[0, 1, 2]);
let batch = &batches[1];
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[3]);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_offset(2)
.with_limit(3)
.build()
.unwrap();
let batches: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[2]);
let batch = &batches[1];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[3, 4]);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_offset(4)
.with_limit(20)
.build()
.unwrap();
let batches: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 1);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[4, 5]);
}
#[tokio::test]
async fn test_row_filter_with_index() {
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_tiny_pages_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    let metadata = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    let parquet_schema = metadata.file_metadata().schema_descr_ptr();
    assert_eq!(metadata.num_row_groups(), 1);

    let reader = TestReader::new(data.clone());

    // First predicate: leaf column 1 is used directly as the boolean mask.
    let a_mask = ProjectionMask::leaves(&parquet_schema, vec![1]);
    let a_filter = ArrowPredicateFn::new(a_mask, |batch| {
        Ok(batch.column(0).as_boolean().clone())
    });

    // Second predicate: leaf column 2 equals 2.
    let b_scalar = Int8Array::from(vec![2]);
    let b_filter = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![2]),
        move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)),
    );

    let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
    let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

    let batches: Vec<RecordBatch> = ParquetRecordBatchStreamBuilder::new_with_options(
        reader,
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap()
    .with_projection(mask.clone())
    .with_batch_size(1024)
    .with_row_filter(filter)
    .build()
    .unwrap()
    .try_collect()
    .await
    .unwrap();

    let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(total_rows, 730);
}
#[tokio::test]
async fn test_batch_size_overallocate() {
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{test_dir}/alltypes_plain.parquet");
    let data = Bytes::from(std::fs::read(file_path).unwrap());

    let builder = ParquetRecordBatchStreamBuilder::new(TestReader::new(data.clone()))
        .await
        .unwrap();
    let file_rows = builder.metadata().file_metadata().num_rows() as usize;

    let requested_batch_size = 1024;
    let builder = builder
        .with_projection(ProjectionMask::all())
        .with_batch_size(requested_batch_size);

    // The requested size differs from the row count of the file, and the
    // builder is expected to end up with the row count as its batch size.
    assert_ne!(requested_batch_size, file_rows);
    assert_eq!(builder.batch_size, file_rows);

    let _stream = builder.build().unwrap();
}
/// Runs the shared bloom filter check against the checked-in test file,
/// telling the helper not to expect a bloom filter length in the metadata.
#[tokio::test]
async fn test_get_row_group_column_bloom_filter_without_length() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let raw = std::fs::read(format!(
        "{testdata}/data_index_bloom_encoding_stats.parquet"
    ))
    .unwrap();
    test_get_row_group_column_bloom_filter(Bytes::from(raw), false).await;
}
/// Compares the schemas reported by the builder, the reader and the produced
/// batch, for both the sync and the async reader, across several leaf
/// projections of a file with a nested struct column.
#[tokio::test]
async fn test_parquet_record_batch_stream_schema() {
    /// Names of all fields returned by `Schema::flattened_fields`.
    fn flattened_names(schema: &Schema) -> Vec<&String> {
        schema.flattened_fields().iter().map(|f| f.name()).collect()
    }

    // Schema-level key/value metadata written with the file.
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert("key".to_string(), "value".to_string());

    // Inner struct with the two string children "d" and "e".
    let inner = StructArray::from(vec![
        (
            Arc::new(Field::new("d", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("e", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
        ),
    ]);
    let inner_field = Arc::new(Field::new("c", inner.data_type().clone(), true));

    // Top-level columns: "a", "b" and the nested struct "c".
    let outer = StructArray::from(vec![
        (
            Arc::new(Field::new("a", DataType::Int32, true)),
            Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("b", DataType::UInt64, true)),
            Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
        ),
        (inner_field, Arc::new(inner) as ArrayRef),
    ]);

    let schema = Arc::new(Schema::new(outer.fields().clone()).with_metadata(metadata.clone()));
    let written = RecordBatch::from(outer)
        .with_schema(schema.clone())
        .unwrap();

    let mut file = tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let all_fields = ["a", "b", "c", "d", "e"];
    // (leaf indices to project, flattened field names expected after projection)
    let projections = [
        (vec![], vec![]),
        (vec![0], vec!["a"]),
        (vec![0, 1], vec!["a", "b"]),
        (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
        (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
    ];

    for (leaf_indices, projected_names) in projections {
        // The builder schema must be the full one with the metadata attached;
        // reader and batch schemas must be projected and carry no metadata.
        let check =
            |builder_schema: SchemaRef, reader_schema: SchemaRef, batch_schema: SchemaRef| {
                assert_eq!(flattened_names(&builder_schema), all_fields);
                assert_eq!(builder_schema.metadata, metadata);
                assert_eq!(flattened_names(&reader_schema), projected_names);
                assert_eq!(reader_schema.metadata, HashMap::default());
                assert_eq!(flattened_names(&batch_schema), projected_names);
                assert_eq!(batch_schema.metadata, HashMap::default());
            };

        // Synchronous reader.
        let sync_builder =
            ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let sync_builder_schema = sync_builder.schema().clone();
        let sync_mask = ProjectionMask::leaves(sync_builder.parquet_schema(), leaf_indices.clone());
        let mut sync_reader = sync_builder.with_projection(sync_mask).build().unwrap();
        let sync_reader_schema = sync_reader.schema();
        let sync_batch = sync_reader.next().unwrap().unwrap();
        let sync_batch_schema = sync_batch.schema();
        check(sync_builder_schema, sync_reader_schema, sync_batch_schema);

        // Asynchronous reader over the same file.
        let async_file = tokio::fs::File::from(file.try_clone().unwrap());
        let async_builder = ParquetRecordBatchStreamBuilder::new(async_file)
            .await
            .unwrap();
        let async_builder_schema = async_builder.schema().clone();
        let async_mask = ProjectionMask::leaves(async_builder.parquet_schema(), leaf_indices);
        let mut async_reader = async_builder.with_projection(async_mask).build().unwrap();
        let async_reader_schema = async_reader.schema().clone();
        let async_batch = async_reader.next().await.unwrap().unwrap();
        let async_batch_schema = async_batch.schema();
        check(
            async_builder_schema,
            async_reader_schema,
            async_batch_schema,
        );
    }
}
/// Reads the bloom filter test file, rewrites its batches through `ArrowWriter`
/// with bloom filters enabled, and runs the shared bloom filter check on the
/// rewritten bytes, telling the helper to expect a bloom filter length.
#[tokio::test]
async fn test_get_row_group_column_bloom_filter_with_length() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
    let data = Bytes::from(std::fs::read(path).unwrap());

    // `data` is not used again, so it is moved into the reader instead of cloned.
    let async_reader = TestReader::new(data);
    let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
        .await
        .unwrap();
    // Keep the schema: the builder is consumed by `build`.
    let schema = builder.schema().clone();
    let stream = builder.build().unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();

    // Re-encode the same batches into an in-memory file with bloom filters on.
    let mut parquet_data = Vec::new();
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build();
    let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
    for batch in batches {
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
}
/// Shared body of the bloom filter tests.
///
/// Opens `data` as a parquet file, asserts that it has exactly one row group
/// and that the presence of a bloom filter length on column 0 of that row
/// group matches `with_length`, then loads the bloom filter for row group 0 /
/// column 0 and probes it with one value expected to hit and one expected to
/// miss.
async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
    // `data` is owned and not used again, so it is moved rather than cloned.
    let async_reader = TestReader::new(data);

    // Mutable because fetching the bloom filter below takes the builder mutably.
    let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
        .await
        .unwrap();

    let metadata = builder.metadata();
    assert_eq!(metadata.num_row_groups(), 1);
    let row_group = metadata.row_group(0);
    let column = row_group.column(0);
    assert_eq!(column.bloom_filter_length().is_some(), with_length);

    // Outer `unwrap`: the fetch succeeded; inner `unwrap`: a filter is present.
    let sbbf = builder
        .get_row_group_column_bloom_filter(0, 0)
        .await
        .unwrap()
        .unwrap();
    assert!(sbbf.check(&"Hello"));
    assert!(!sbbf.check(&"Hello_Not_Exists"));
}
/// Writes 1024 rows with a nullable list column using a 256-row data page
/// limit, then reads the file back under several row selections and checks
/// that each read yields exactly as many rows as the selection selects.
#[tokio::test]
async fn test_nested_skip() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_1", DataType::UInt64, false),
        Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
    ]));

    let writer_props = WriterProperties::builder()
        .set_data_page_row_count_limit(256)
        .set_write_batch_size(256)
        .set_max_row_group_size(1024)
        .build();

    let mut file = tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), Some(writer_props)).unwrap();

    // Cycle through a two-element list, a one-element list and a null list.
    let mut lists = ListBuilder::new(StringBuilder::new());
    for id in 0..1024 {
        let remainder = id % 3;
        if remainder == 0 {
            lists.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]);
        } else if remainder == 1 {
            lists.append_value([Some(format!("id_{id}"))]);
        } else {
            lists.append_null();
        }
    }

    let columns = vec![
        Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
        Arc::new(lists.finish()) as ArrayRef,
    ];
    let written = RecordBatch::try_new(schema.clone(), columns).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let selections = [
        RowSelection::from(vec![
            RowSelector::skip(313),
            RowSelector::select(1),
            RowSelector::skip(709),
            RowSelector::select(1),
        ]),
        RowSelection::from(vec![
            RowSelector::skip(255),
            RowSelector::select(1),
            RowSelector::skip(767),
            RowSelector::select(1),
        ]),
        RowSelection::from(vec![
            RowSelector::select(255),
            RowSelector::skip(1),
            RowSelector::select(767),
            RowSelector::skip(1),
        ]),
        RowSelection::from(vec![
            RowSelector::skip(254),
            RowSelector::select(1),
            RowSelector::select(1),
            RowSelector::skip(767),
            RowSelector::select(1),
        ]),
    ];

    for selection in selections {
        // Record the expected count before the selection is handed to the builder.
        let expected = selection.row_count();

        let input = tokio::fs::File::from_std(file.try_clone().unwrap());
        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(input, options)
            .await
            .unwrap()
            .with_row_selection(selection);
        let mut stream = builder.build().unwrap();

        let mut total_rows = 0;
        while let Some(next) = stream.next().await {
            total_rows += next.unwrap().num_rows();
        }

        assert_eq!(total_rows, expected);
    }
}
/// Filters a file containing a struct column with two predicates, one on a
/// top-level string leaf and one on a leaf inside the struct, then checks the
/// single surviving row and the number of requests issued to the reader.
#[tokio::test]
async fn test_row_filter_nested() {
    let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
    let b = StructArray::from(vec![
        (
            Arc::new(Field::new("aa", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec!["a", "b", "b", "b", "c", "c"])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("bb", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5", "6"])) as ArrayRef,
        ),
    ]);
    let c = Int32Array::from_iter(0..6);
    let data = RecordBatch::try_from_iter([
        ("a", Arc::new(a) as ArrayRef),
        ("b", Arc::new(b) as ArrayRef),
        ("c", Arc::new(c) as ArrayRef),
    ])
    .unwrap();

    // Encode the batch into an in-memory parquet file.
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
    writer.write(&data).unwrap();
    writer.close().unwrap();

    let data: Bytes = buf.into();
    let metadata = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    let parquet_schema = metadata.file_metadata().schema_descr_ptr();

    let test = TestReader::new(data);
    // Shared handle to the reader's request log, kept for the final assertion.
    let requests = test.requests.clone();

    // First predicate: leaf 0 must equal "b".
    let a_scalar = StringArray::from_iter_values(["b"]);
    let a_filter = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![0]),
        move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
    );

    // Second predicate: leaf 2 is delivered wrapped in its parent struct, so
    // the struct's first projected child is compared against "4".
    // NOTE(review): presumably leaf 2 is `b.bb` (leaves numbered a, b.aa, b.bb, c);
    // the "4" value only occurs in `bb`, which is consistent with that.
    let b_scalar = StringArray::from_iter_values(["4"]);
    let b_filter = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![2]),
        move |batch| {
            let struct_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap();
            eq(struct_array.column(0), &Scalar::new(&b_scalar))
        },
    );

    let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

    // Output projection; consumed by the builder below, so no clone is needed.
    let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 3]);
    let stream = ParquetRecordBatchStreamBuilder::new(test)
        .await
        .unwrap()
        .with_projection(mask)
        .with_batch_size(1024)
        .with_row_filter(filter)
        .build()
        .unwrap();

    let batches: Vec<_> = stream.try_collect().await.unwrap();
    assert_eq!(batches.len(), 1);

    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 2);

    let col = batch.column(0);
    let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
    assert_eq!(val, "b");

    let col = batch.column(1);
    let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
    assert_eq!(val, 3);

    // NOTE(review): presumably one request per predicate plus one for the
    // final projection — confirm against the reader implementation.
    assert_eq!(requests.lock().unwrap().len(), 3);
}
/// Loads metadata with page indexes, replaces the offset index with an empty
/// one, and checks that reading the file with that metadata still succeeds.
#[tokio::test]
#[allow(deprecated)]
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
    use tokio::fs::File;

    let data_dir = arrow::util::test_util::parquet_test_data();
    let input_path = format!("{data_dir}/alltypes_plain.parquet");
    let mut input = File::open(&input_path).await.unwrap();
    let input_len = input.metadata().await.unwrap().len();

    let mut parsed = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut input, input_len)
        .await
        .unwrap();
    // Overwrite the loaded offset index with an empty list.
    parsed.set_offset_index(Some(vec![]));

    let reader_options = ArrowReaderOptions::new().with_page_index(true);
    let reader_metadata = ArrowReaderMetadata::try_new(parsed.into(), reader_options).unwrap();
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(input, reader_metadata)
        .build()
        .unwrap();

    let collected = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(collected.len(), 1);
}
/// Loads metadata with page indexes from `alltypes_tiny_pages.parquet` and
/// checks that reading the file with that pre-loaded metadata yields the
/// expected number of batches.
#[tokio::test]
#[allow(deprecated)]
async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
    use tokio::fs::File;

    let data_dir = arrow::util::test_util::parquet_test_data();
    let input_path = format!("{data_dir}/alltypes_tiny_pages.parquet");
    let mut input = File::open(&input_path).await.unwrap();
    let input_len = input.metadata().await.unwrap().len();

    let parsed = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut input, input_len)
        .await
        .unwrap();

    let reader_options = ArrowReaderOptions::new().with_page_index(true);
    let reader_metadata = ArrowReaderMetadata::try_new(parsed.into(), reader_options).unwrap();
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(input, reader_metadata)
        .build()
        .unwrap();

    let collected = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(collected.len(), 8);
}
/// Round-trips the file's metadata through `ParquetMetaDataWriter` and
/// `ParquetMetaDataReader` via a temporary file, then checks that the data can
/// still be read using the re-loaded metadata.
#[tokio::test]
#[allow(deprecated)]
async fn empty_offset_index_doesnt_panic_in_column_chunks() {
    use tempfile::TempDir;
    use tokio::fs::File;

    /// Serialises `metadata` into a newly created file at `file`.
    fn write_metadata_to_local_file(
        metadata: ParquetMetaData,
        file: impl AsRef<std::path::Path>,
    ) {
        use crate::file::metadata::ParquetMetaDataWriter;
        use std::fs::File;

        let sink = File::create(file).unwrap();
        ParquetMetaDataWriter::new(sink, &metadata)
            .finish()
            .unwrap();
    }

    /// Parses metadata, page indexes requested, from the file at `file`.
    fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
        use std::fs::File;

        let source = File::open(file).unwrap();
        ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&source)
            .unwrap()
    }

    let data_dir = arrow::util::test_util::parquet_test_data();
    let input_path = format!("{data_dir}/alltypes_plain.parquet");
    let mut input = File::open(&input_path).await.unwrap();
    let input_len = input.metadata().await.unwrap().len();

    let original = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut input, input_len)
        .await
        .unwrap();

    // The temporary directory must stay alive until the metadata is read back.
    let scratch = TempDir::new().unwrap();
    let metadata_path = scratch.path().join("thrift_metadata.dat");
    write_metadata_to_local_file(original, &metadata_path);
    let reloaded = read_metadata_from_local_file(&metadata_path);

    let reader_options = ArrowReaderOptions::new().with_page_index(true);
    let reader_metadata = ArrowReaderMetadata::try_new(reloaded.into(), reader_options).unwrap();
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(input, reader_metadata)
        .build()
        .unwrap();

    let collected = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(collected.len(), 1);
}
/// Combines a row selection that skips 22 rows and selects 3 with an
/// always-true row filter and a batch size of 8, and checks that the stream
/// can be collected without an error.
#[tokio::test]
async fn test_cached_array_reader_sparse_offset_error() {
    use futures::TryStreamExt;

    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use arrow_array::{BooleanArray, RecordBatch};

    let data_dir = arrow::util::test_util::parquet_test_data();
    let raw = std::fs::read(format!("{data_dir}/alltypes_tiny_pages_plain.parquet")).unwrap();
    let async_reader = TestReader::new(Bytes::from(raw));

    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
        async_reader,
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();

    let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);

    // The same single-leaf mask serves as predicate input and output projection.
    let projection = ProjectionMask::leaves(builder.parquet_schema(), vec![0]);

    // Predicate that keeps every row it is shown.
    let accept_all = ArrowPredicateFn::new(projection.clone(), |batch: RecordBatch| {
        let keep = vec![true; batch.num_rows()];
        Ok(BooleanArray::from(keep))
    });
    let row_filter = RowFilter::new(vec![Box::new(accept_all)]);

    let stream = builder
        .with_batch_size(8)
        .with_projection(projection)
        .with_row_selection(selection)
        .with_row_filter(row_filter)
        .build()
        .unwrap();

    // Only successful completion matters; the batches themselves are not inspected.
    let _batches: Vec<_> = stream.try_collect().await.unwrap();
}
/// Reads the same file twice with an identical selection and filter, once with
/// default settings and once with a maximum predicate cache size of 0, and
/// checks that the results match while the recorded requests differ.
#[tokio::test]
async fn test_predicate_cache_disabled() {
    let values = Int32Array::from_iter_values(0..10);
    let written = RecordBatch::try_from_iter([("k", Arc::new(values) as ArrayRef)]).unwrap();

    // One row per data page, a single ten-row row group.
    let writer_props = WriterProperties::builder()
        .set_data_page_row_count_limit(1)
        .set_write_batch_size(1)
        .set_max_row_group_size(10)
        .set_write_page_header_statistics(true)
        .build();
    let mut encoded = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut encoded, written.schema(), Some(writer_props)).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();

    let data = Bytes::from(encoded);
    let metadata = ParquetMetaDataReader::new()
        .with_page_index_policy(PageIndexPolicy::Required)
        .parse_and_finish(&data)
        .unwrap();
    let parquet_schema = metadata.file_metadata().schema_descr_ptr();

    // Produces a fresh `k == 5` filter for each of the two reads.
    let make_filter = || {
        let five = Int32Array::from_iter_values([5]);
        let is_five = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            move |batch| eq(batch.column(0), &Scalar::new(&five)),
        );
        RowFilter::new(vec![Box::new(is_five)])
    };

    let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);

    let reader_options =
        ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
    let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), reader_options).unwrap();

    // First read: default predicate cache settings.
    let cached_reader = TestReader::new(data.clone());
    let cached_requests = cached_reader.requests.clone();
    let cached_stream =
        ParquetRecordBatchStreamBuilder::new_with_metadata(cached_reader, reader_metadata.clone())
            .with_batch_size(1000)
            .with_row_selection(selection.clone())
            .with_row_filter(make_filter())
            .build()
            .unwrap();
    let cached_batches: Vec<_> = cached_stream.try_collect().await.unwrap();

    // Second read: identical, except the predicate cache size is capped at 0.
    let uncached_reader = TestReader::new(data);
    let uncached_requests = uncached_reader.requests.clone();
    let uncached_stream =
        ParquetRecordBatchStreamBuilder::new_with_metadata(uncached_reader, reader_metadata)
            .with_batch_size(1000)
            .with_row_selection(selection)
            .with_row_filter(make_filter())
            .with_max_predicate_cache_size(0)
            .build()
            .unwrap();
    let uncached_batches: Vec<_> = uncached_stream.try_collect().await.unwrap();

    assert_eq!(cached_batches, uncached_batches);

    let cached_log = cached_requests.lock().unwrap();
    let uncached_log = uncached_requests.lock().unwrap();

    // Number of requests recorded by each reader.
    assert_eq!(cached_log.len(), 11);
    assert_eq!(uncached_log.len(), 2);

    // Sum of the lengths of the requested ranges for each reader.
    let cached_total: usize = cached_log.iter().map(|r| r.len()).sum();
    assert_eq!(cached_total, 433);
    let uncached_total: usize = uncached_log.iter().map(|r| r.len()).sum();
    assert_eq!(uncached_total, 92);
}
/// Runs the shared row-number helper (first argument `false`) with a reader
/// callback that reads through the async stream on a current-thread tokio
/// runtime and ignores the row filter it is offered.
#[test]
fn test_row_numbers_with_multiple_row_groups() {
    test_row_numbers_with_multiple_row_groups_helper(
        false,
        |path, selection, _row_filter, batch_size| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("Could not create runtime");

            let read_all = async move {
                let input = tokio::fs::File::open(path).await.unwrap();

                // Virtual column tagged with the `RowNumber` extension type.
                let row_number = Field::new("row_number", DataType::Int64, false)
                    .with_extension_type(RowNumber);
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![Arc::new(row_number)])
                    .unwrap();

                let builder = ParquetRecordBatchStreamBuilder::new_with_options(input, options)
                    .await
                    .unwrap();
                let stream = builder
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                stream.try_collect::<Vec<_>>().await.unwrap()
            };

            rt.block_on(read_all)
        },
    );
}
/// Runs the shared row-number helper (first argument `true`) with a reader
/// callback that reads through the async stream on a current-thread tokio
/// runtime and applies the row filter it is given.
#[test]
fn test_row_numbers_with_multiple_row_groups_and_filter() {
    test_row_numbers_with_multiple_row_groups_helper(
        true,
        |path, selection, row_filter, batch_size| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("Could not create runtime");

            let read_all = async move {
                let input = tokio::fs::File::open(path).await.unwrap();

                // Virtual column tagged with the `RowNumber` extension type.
                let row_number = Field::new("row_number", DataType::Int64, false)
                    .with_extension_type(RowNumber);
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![Arc::new(row_number)])
                    .unwrap();

                let builder = ParquetRecordBatchStreamBuilder::new_with_options(input, options)
                    .await
                    .unwrap();
                let stream = builder
                    .with_row_selection(selection)
                    .with_row_filter(row_filter.expect("No row filter"))
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                stream.try_collect::<Vec<_>>().await.unwrap()
            };

            rt.block_on(read_all)
        },
    );
}
/// Writes a batch with a `List<Float32>` column through `AsyncArrowWriter`,
/// then reads it back with an always-true row filter over all columns and
/// checks that every produced batch is `Ok`.
#[tokio::test]
async fn test_nested_lists() -> Result<()> {
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("vector", DataType::List(item_field.clone()), true),
    ]));

    // Four non-null lists, each holding the same value three times.
    let mut vectors = ListBuilder::new(Float32Builder::new()).with_field(item_field.clone());
    for value in [10.0, 20.0, 30.0, 40.0] {
        vectors.values().append_slice(&[value, value, value]);
        vectors.append(true);
    }
    let vector_column = vectors.finish();

    let written = RecordBatch::try_new(
        table_schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
            Arc::new(vector_column),
        ],
    )?;

    let mut encoded = Vec::new();
    let mut writer = AsyncArrowWriter::try_new(&mut encoded, table_schema, None)?;
    writer.write(&written).await?;
    writer.close().await?;

    let reader = TestReader::new(Bytes::from(encoded));
    let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;

    // Predicate over every column that keeps every row.
    let keep_all = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let row_filter = RowFilter::new(vec![Box::new(keep_all)]);

    let mut stream = builder
        .with_row_filter(row_filter)
        .with_projection(ProjectionMask::all())
        .build()?;

    // Drain the stream; each item must decode successfully.
    while let Some(item) = stream.next().await {
        let _ = item.unwrap();
    }

    Ok(())
}
/// Writes two flushes of 300 rows (tags "a", "b", "c", 100 rows each, with
/// every other row's timestamp inside the filter range), then, for each page
/// index policy, reads the "tag" column using a row selection that skips the
/// middle 100 rows per row group plus two timestamp predicates, and checks the
/// surviving tags.
#[tokio::test]
async fn test_predicate_pushdown_with_skipped_pages() {
    use arrow_array::TimestampNanosecondArray;
    use arrow_schema::TimeUnit;

    const TIME_IN_RANGE_START: i64 = 1_704_092_400_000_000_000;
    const TIME_IN_RANGE_END: i64 = 1_704_110_400_000_000_000;
    const TIME_BEFORE_RANGE: i64 = 1_704_078_000_000_000_000;

    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new("tag", DataType::Utf8, false),
    ]));

    let writer_props = WriterProperties::builder()
        .set_max_row_group_size(300)
        .set_data_page_row_count_limit(33)
        .build();

    let mut encoded = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut encoded, schema.clone(), Some(writer_props)).unwrap();

    // Each outer iteration writes 3 x 100 rows and then flushes the writer.
    for _ in 0..2 {
        for (tag_idx, tag) in ["a", "b", "c"].iter().enumerate() {
            let times: Vec<i64> = (0..100)
                .map(|j| {
                    // Even rows fall inside the filter range, odd rows before it.
                    let row_idx = tag_idx * 100 + j;
                    let base = if row_idx % 2 == 0 {
                        TIME_IN_RANGE_START
                    } else {
                        TIME_BEFORE_RANGE
                    };
                    base + (j as i64 * 1_000_000)
                })
                .collect();
            let tags = vec![*tag; 100];

            let written = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(TimestampNanosecondArray::from(times)) as ArrayRef,
                    Arc::new(StringArray::from(tags)) as ArrayRef,
                ],
            )
            .unwrap();
            writer.write(&written).unwrap();
        }
        writer.flush().unwrap();
    }
    writer.close().unwrap();
    let encoded = Bytes::from(encoded);

    for policy in [
        PageIndexPolicy::Skip,
        PageIndexPolicy::Optional,
        PageIndexPolicy::Required,
    ] {
        println!("Testing with page index policy: {:?}", policy);

        let reader = TestReader::new(encoded.clone());
        let options = ArrowReaderOptions::default().with_page_index_policy(policy);
        let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
            .await
            .unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
        let num_row_groups = builder.metadata().num_row_groups();

        // Per row group: keep the first 100 rows, drop the next 100, keep 100 more.
        let selectors: Vec<_> = (0..num_row_groups)
            .flat_map(|_| {
                [
                    RowSelector::select(100),
                    RowSelector::skip(100),
                    RowSelector::select(100),
                ]
            })
            .collect();
        let selection = RowSelection::from(selectors);

        // Lower bound on the timestamp (inclusive).
        let not_before_start =
            ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
                let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
                Ok(BooleanArray::from_iter(
                    col.iter().map(|t| t.map(|v| v >= TIME_IN_RANGE_START)),
                ))
            });
        // Upper bound on the timestamp (exclusive).
        let before_end =
            ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
                let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
                Ok(BooleanArray::from_iter(
                    col.iter().map(|t| t.map(|v| v < TIME_IN_RANGE_END)),
                ))
            });
        let row_filter = RowFilter::new(vec![Box::new(not_before_start), Box::new(before_end)]);

        // Only the "tag" column is returned.
        let projection = ProjectionMask::roots(&schema_descr, [1]);

        let stream = builder
            .with_row_filter(row_filter)
            .with_row_selection(selection)
            .with_projection(projection)
            .build()
            .unwrap();
        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

        let combined = concat_batches(&batches[0].schema(), &batches).unwrap();
        assert_eq!(combined.num_columns(), 1);

        // 50 in-range rows for each of "a" and "c", repeated for the second flush.
        let expected = StringArray::from_iter_values(
            ["a", "c", "a", "c"]
                .into_iter()
                .flat_map(|tag| std::iter::repeat_n(tag, 50)),
        );
        assert_eq!(combined.column(0).as_string(), &expected);
    }
}
}