use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
mod metadata;
pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
use crate::DecodeResult;
use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
#[cfg(feature = "object_store")]
pub use store::*;
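/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files.
///
/// Notes:
///
/// 1. There is a blanket implementation for any type that implements
///    [`AsyncRead`] + [`AsyncSeek`] + `Unpin` + `Send`, for example `tokio::fs::File`.
///
/// 2. When the `object_store` feature is enabled, additional implementations
///    are re-exported from the `store` module.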
pub trait AsyncFileReader: Send {
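    /// Retrieves the bytes in `range` from the underlying input.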
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
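    /// Retrieves multiple byte ranges.
    ///
    /// The default implementation fetches each range sequentially with
    /// [`Self::get_bytes`]; implementations that can coalesce or issue
    /// concurrent requests should override this.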
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
async move {
let mut result = Vec::with_capacity(ranges.len());
for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
result.push(data);
}
Ok(result)
}
.boxed()
}
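    /// Provides asynchronous access to the [`ParquetMetaData`] of this file,
    /// e.g. loaded from the footer.
    ///
    /// `options`, if provided, carries settings such as the page index
    /// policies that influence how the metadata is read.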
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}
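/// Forwarding implementation so a `Box<dyn AsyncFileReader + '_>` can be used
/// wherever an [`AsyncFileReader`] is expected.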
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
self.as_mut().get_bytes(range)
}
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
self.as_mut().get_byte_ranges(ranges)
}
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata(options)
}
}
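/// Implements [`MetadataSuffixFetch`] for seekable async readers by seeking
/// relative to the end of the file and reading the last `suffix` bytes, as
/// used by [`ParquetMetaDataReader::load_via_suffix_and_finish`].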
impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
async move {
self.seek(SeekFrom::End(-(suffix as i64))).await?;
let mut buf = Vec::with_capacity(suffix);
self.take(suffix as _).read_to_end(&mut buf).await?;
Ok(buf.into())
}
.boxed()
}
}
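/// Blanket implementation that lets any seekable async reader (for example
/// `tokio::fs::File`) be used as an [`AsyncFileReader`]: `get_bytes` seeks to
/// the start of the requested range and reads it, while `get_metadata` loads
/// the footer via [`ParquetMetaDataReader`].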
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
async move {
self.seek(SeekFrom::Start(range.start)).await?;
let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read.try_into()?);
let read = self.take(to_read).read_to_end(&mut buffer).await?;
if read as u64 != to_read {
return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
}
Ok(buffer.into())
}
.boxed()
}
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
async move {
let metadata_opts = options.map(|o| o.metadata_options().clone());
let mut metadata_reader =
ParquetMetaDataReader::new().with_metadata_options(metadata_opts);
if let Some(opts) = options {
metadata_reader = metadata_reader
.with_column_index_policy(opts.column_index_policy())
.with_offset_index_policy(opts.offset_index_policy());
}
#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader.with_decryption_properties(
options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
);
let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
Ok(Arc::new(parquet_metadata))
}
.boxed()
}
}
impl ArrowReaderMetadata {
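    /// Asynchronously loads [`ArrowReaderMetadata`] from the provided
    /// [`AsyncFileReader`], applying the supplied [`ArrowReaderOptions`].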
pub async fn load_async<T: AsyncFileReader>(
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
let metadata = input.get_metadata(Some(&options)).await?;
Self::try_new(metadata, options)
}
}
#[doc(hidden)]
pub struct AsyncReader<T>(T);
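/// A builder for reading parquet files from an [`AsyncFileReader`] as a
/// [`ParquetRecordBatchStream`] of [`RecordBatch`]es.
///
/// A minimal usage sketch; the file path is hypothetical, errors are unwrapped
/// for brevity, and the import path assumes the standard `parquet` crate layout:
///
/// ```no_run
/// # use futures::TryStreamExt;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # async fn example() {
/// // Any `AsyncRead + AsyncSeek + Unpin + Send` input works, e.g. a tokio File
/// let file = tokio::fs::File::open("data.parquet").await.unwrap();
/// // Creating the builder fetches and parses the footer metadata
/// let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
/// // Configure the stream and collect the decoded batches
/// let stream = builder.with_batch_size(1024).build().unwrap();
/// let batches: Vec<_> = stream.try_collect().await.unwrap();
/// # drop(batches);
/// # }
/// ```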
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
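    /// Creates a new [`ParquetRecordBatchStreamBuilder`] for the given input,
    /// fetching and parsing the footer metadata with default options.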
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}
pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
Ok(Self::new_with_metadata(input, metadata))
}
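    /// Creates a new builder from already loaded [`ArrowReaderMetadata`],
    /// allowing the footer to be read and parsed once and then reused to
    /// construct multiple readers.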
pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
Self::new_builder(AsyncReader(input), metadata)
}
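    /// Reads the bloom filter for the column at `column_idx` in the row group
    /// at `row_group_idx`, returning `Ok(None)` if no bloom filter is present.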
pub async fn get_row_group_column_bloom_filter(
&mut self,
row_group_idx: usize,
column_idx: usize,
) -> Result<Option<Sbbf>> {
let metadata = self.metadata.row_group(row_group_idx);
let column_metadata = metadata.column(column_idx);
let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
offset
.try_into()
.map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
} else {
return Ok(None);
};
let buffer = match column_metadata.bloom_filter_length() {
Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
None => self
.input
.0
.get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
}
.await?;
let (header, bitset_offset) =
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future proof the singleton hash enum
            }
        }
let bitset = match column_metadata.bloom_filter_length() {
Some(_) => buffer.slice(
(TryInto::<usize>::try_into(bitset_offset).unwrap()
- TryInto::<usize>::try_into(offset).unwrap())..,
),
None => {
let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
self.input
.0
.get_bytes(bitset_offset..bitset_offset + bitset_length)
.await?
}
};
Ok(Some(Sbbf::new(&bitset)))
}
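    /// Builds a [`ParquetRecordBatchStream`], consuming the builder.
    ///
    /// The configured options are handed to a [`ParquetPushDecoder`], and the
    /// async input is retained so the stream can satisfy the decoder's data
    /// requests as they arise.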
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let Self {
input,
metadata,
schema,
fields,
batch_size,
row_groups,
projection,
filter,
selection,
row_selection_policy: selection_strategy,
limit,
offset,
metrics,
max_predicate_cache_size,
} = self;
let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
let projected_fields = schema
.fields
.filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
let projected_schema = Arc::new(Schema::new(projected_fields));
let decoder = ParquetPushDecoderBuilder {
input: NoInput,
metadata,
schema,
fields,
projection,
filter,
selection,
row_selection_policy: selection_strategy,
batch_size,
row_groups,
limit,
offset,
metrics,
max_predicate_cache_size,
}
.build()?;
let request_state = RequestState::None { input: input.0 };
Ok(ParquetRecordBatchStream {
schema: projected_schema,
decoder,
request_state,
})
}
}
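/// Tracks the single outstanding I/O request, if any, issued on behalf of the
/// decoder.
///
/// * `None`: no request in flight; holds the idle input
/// * `Outstanding`: a request for `ranges` is in flight; holds the future that
///   resolves to the input and the fetched bytes
/// * `Done`: the stream has finished or encountered an error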
enum RequestState<T> {
None {
input: T,
},
Outstanding {
ranges: Vec<Range<u64>>,
future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
},
Done,
}
impl<T> RequestState<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
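    /// Issues an asynchronous request for `ranges`, moving `input` into the
    /// future until it completes. The ranges are also kept alongside the
    /// future so the fetched bytes can later be pushed back to the decoder.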
fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
let ranges_captured = ranges.clone();
let future = async move {
let data = input.get_byte_ranges(ranges_captured).await?;
Ok((input, data))
}
.boxed();
RequestState::Outstanding { ranges, future }
}
}
impl<T> std::fmt::Debug for RequestState<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
RequestState::None { input: _ } => f
.debug_struct("RequestState::None")
.field("input", &"...")
.finish(),
RequestState::Outstanding { ranges, .. } => f
.debug_struct("RequestState::Outstanding")
.field("ranges", &ranges)
.finish(),
RequestState::Done => {
write!(f, "RequestState::Done")
}
}
}
}
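/// An asynchronous [`Stream`] of [`RecordBatch`]es decoded from a parquet file.
///
/// The stream drives a [`ParquetPushDecoder`], fetching whatever byte ranges
/// the decoder requests via the wrapped [`AsyncFileReader`].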
pub struct ParquetRecordBatchStream<T> {
schema: SchemaRef,
request_state: RequestState<T>,
decoder: ParquetPushDecoder,
}
impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ParquetRecordBatchStream")
.field("request_state", &self.request_state)
.finish()
}
}
impl<T> ParquetRecordBatchStream<T> {
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
}
impl<T> ParquetRecordBatchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
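    /// Fetches the data for the next row group and returns a synchronous
    /// [`ParquetRecordBatchReader`] over it, or `Ok(None)` once all row groups
    /// have been read.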
pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
loop {
let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
match request_state {
RequestState::None { input } => {
match self.decoder.try_next_reader()? {
DecodeResult::NeedsData(ranges) => {
self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
DecodeResult::Data(reader) => {
self.request_state = RequestState::None { input };
return Ok(Some(reader));
}
DecodeResult::Finished => return Ok(None),
}
}
RequestState::Outstanding { ranges, future } => {
let (input, data) = future.await?;
self.decoder.push_ranges(ranges, data)?;
self.request_state = RequestState::None { input };
                    continue;
                }
RequestState::Done => {
self.request_state = RequestState::Done;
return Ok(None);
}
}
}
}
}
impl<T> Stream for ParquetRecordBatchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.poll_next_inner(cx) {
            Ok(poll) => {
                // Convert Poll<Option<RecordBatch>> into Poll<Option<Result<RecordBatch>>>
                poll.map(|maybe_batch| Ok(maybe_batch).transpose())
            }
            Err(e) => {
                // Errors are terminal: mark the stream as done before surfacing the error
                self.request_state = RequestState::Done;
                Poll::Ready(Some(Err(e)))
            }
}
}
}
impl<T> ParquetRecordBatchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
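    /// Drives the request/decode state machine: asks the decoder for the next
    /// batch, issuing a byte range request whenever the decoder reports that it
    /// needs more data and pushing the fetched bytes back before retrying.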
fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
loop {
let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
match request_state {
RequestState::None { input } => {
match self.decoder.try_decode()? {
DecodeResult::NeedsData(ranges) => {
self.request_state = RequestState::begin_request(input, ranges);
                            continue;
                        }
DecodeResult::Data(batch) => {
self.request_state = RequestState::None { input };
return Ok(Poll::Ready(Some(batch)));
}
DecodeResult::Finished => {
self.request_state = RequestState::Done;
return Ok(Poll::Ready(None));
}
}
}
RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
Poll::Ready(result) => {
let (input, data) = result?;
self.decoder.push_ranges(ranges, data)?;
self.request_state = RequestState::None { input };
                        continue;
                    }
Poll::Pending => {
self.request_state = RequestState::Outstanding { ranges, future };
return Ok(Poll::Pending);
}
},
RequestState::Done => {
self.request_state = RequestState::Done;
return Ok(Poll::Ready(None));
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use crate::arrow::schema::virtual_type::RowNumber;
use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
use crate::file::metadata::PageIndexPolicy;
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{
Array, ArrayRef, BooleanArray, Int32Array, RecordBatchReader, Scalar, StringArray,
StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use futures::{StreamExt, TryStreamExt};
use rand::{Rng, rng};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tempfile::tempfile;
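    /// An in-memory [`AsyncFileReader`] over [`Bytes`] that records every
    /// requested byte range so tests can assert exactly which ranges were read.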
#[derive(Clone)]
struct TestReader {
data: Bytes,
metadata: Option<Arc<ParquetMetaData>>,
requests: Arc<Mutex<Vec<Range<usize>>>>,
}
impl TestReader {
fn new(data: Bytes) -> Self {
Self {
data,
metadata: Default::default(),
requests: Default::default(),
}
}
}
impl AsyncFileReader for TestReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
let range = range.clone();
self.requests
.lock()
.unwrap()
.push(range.start as usize..range.end as usize);
futures::future::ready(Ok(self
.data
.slice(range.start as usize..range.end as usize)))
.boxed()
}
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
let mut metadata_reader = ParquetMetaDataReader::new();
if let Some(opts) = options {
metadata_reader = metadata_reader
.with_column_index_policy(opts.column_index_policy())
.with_offset_index_policy(opts.offset_index_policy());
}
self.metadata = Some(Arc::new(
metadata_reader.parse_and_finish(&self.data).unwrap(),
));
futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
}
}
#[tokio::test]
async fn test_async_reader() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
let metadata = builder.metadata().clone();
assert_eq!(metadata.num_row_groups(), 1);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(104)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
assert_eq!(
&requests[..],
&[
offset_1 as usize..(offset_1 + length_1) as usize,
offset_2 as usize..(offset_2 + length_2) as usize
]
);
}
#[tokio::test]
async fn test_async_reader_with_next_row_group() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
let metadata = builder.metadata().clone();
assert_eq!(metadata.num_row_groups(), 1);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let mut stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();
let mut readers = vec![];
while let Some(reader) = stream.next_row_group().await.unwrap() {
readers.push(reader);
}
let async_batches: Vec<_> = readers
.into_iter()
.flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
.collect();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(104)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
assert_eq!(
&requests[..],
&[
offset_1 as usize..(offset_1 + length_1) as usize,
offset_2 as usize..(offset_2 + length_2) as usize
]
);
}
#[tokio::test]
async fn test_async_reader_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();
let metadata_with_index = builder.metadata();
assert_eq!(metadata_with_index.num_row_groups(), 1);
let offset_index = metadata_with_index.offset_index().unwrap();
let column_index = metadata_with_index.column_index().unwrap();
assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
let num_columns = metadata_with_index
.file_metadata()
.schema_descr()
.num_columns();
offset_index
.iter()
.for_each(|x| assert_eq!(x.len(), num_columns));
column_index
.iter()
.for_each(|x| assert_eq!(x.len(), num_columns));
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(1024)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
}
#[tokio::test]
async fn test_async_reader_with_limit() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);
assert_eq!(metadata.num_row_groups(), 1);
let async_reader = TestReader::new(data.clone());
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
assert_eq!(builder.metadata().num_row_groups(), 1);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.with_limit(1)
.build()
.unwrap();
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(1024)
.with_limit(1)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
}
#[tokio::test]
async fn test_async_reader_skip_pages() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();
assert_eq!(builder.metadata().num_row_groups(), 1);
let selection = RowSelection::from(vec![
            RowSelector::skip(21),
            RowSelector::select(21),
            RowSelector::skip(41),
            RowSelector::select(41),
            RowSelector::skip(25),
            RowSelector::select(25),
            RowSelector::skip(7116),
            RowSelector::select(10),
        ]);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
let stream = builder
.with_projection(mask.clone())
.with_row_selection(selection.clone())
.build()
.expect("building stream");
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(1024)
.with_row_selection(selection)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(async_batches, sync_batches);
}
#[tokio::test]
async fn test_fuzz_async_reader_selection() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let mut rand = rng();
for _ in 0..100 {
let mut expected_rows = 0;
let mut total_rows = 0;
let mut skip = false;
let mut selectors = vec![];
while total_rows < 7300 {
let row_count: usize = rand.random_range(1..100);
let row_count = row_count.min(7300 - total_rows);
selectors.push(RowSelector { row_count, skip });
total_rows += row_count;
if !skip {
expected_rows += row_count;
}
skip = !skip;
}
let selection = RowSelection::from(selectors);
let async_reader = TestReader::new(data.clone());
let options =
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();
assert_eq!(builder.metadata().num_row_groups(), 1);
let col_idx: usize = rand.random_range(0..13);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
let stream = builder
.with_projection(mask.clone())
.with_row_selection(selection.clone())
.build()
.expect("building stream");
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
assert_eq!(actual_rows, expected_rows);
}
}
#[tokio::test]
async fn test_async_reader_zero_row_selector() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let mut rand = rng();
let mut expected_rows = 0;
let mut total_rows = 0;
let mut skip = false;
let mut selectors = vec![];
selectors.push(RowSelector {
row_count: 0,
skip: false,
});
while total_rows < 7300 {
let row_count: usize = rand.random_range(1..100);
let row_count = row_count.min(7300 - total_rows);
selectors.push(RowSelector { row_count, skip });
total_rows += row_count;
if !skip {
expected_rows += row_count;
}
skip = !skip;
}
let selection = RowSelection::from(selectors);
let async_reader = TestReader::new(data.clone());
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();
assert_eq!(builder.metadata().num_row_groups(), 1);
let col_idx: usize = rand.random_range(0..13);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
let stream = builder
.with_projection(mask.clone())
.with_row_selection(selection.clone())
.build()
.expect("building stream");
let async_batches: Vec<_> = stream.try_collect().await.unwrap();
let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
assert_eq!(actual_rows, expected_rows);
}
#[tokio::test]
async fn test_limit_multiple_row_groups() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
let c = Int32Array::from_iter(0..6);
let data = RecordBatch::try_from_iter([
("a", Arc::new(a) as ArrayRef),
("b", Arc::new(b) as ArrayRef),
("c", Arc::new(c) as ArrayRef),
])
.unwrap();
let mut buf = Vec::with_capacity(1024);
let props = WriterProperties::builder()
.set_max_row_group_row_count(Some(3))
.build();
let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
writer.write(&data).unwrap();
writer.close().unwrap();
let data: Bytes = buf.into();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
assert_eq!(metadata.num_row_groups(), 2);
let test = TestReader::new(data);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_batch_size(1024)
.with_limit(4)
.build()
.unwrap();
let batches: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[0, 1, 2]);
let batch = &batches[1];
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[3]);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_offset(2)
.with_limit(3)
.build()
.unwrap();
let batches: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[2]);
let batch = &batches[1];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[3, 4]);
let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_offset(4)
.with_limit(20)
.build()
.unwrap();
let batches: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 1);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 3);
let col2 = batch.column(2).as_primitive::<Int32Type>();
assert_eq!(col2.values(), &[4, 5]);
}
#[tokio::test]
async fn test_batch_size_overallocate() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data.clone());
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
let file_rows = builder.metadata().file_metadata().num_rows() as usize;
let builder = builder
.with_projection(ProjectionMask::all())
.with_batch_size(1024);
assert_ne!(1024, file_rows);
assert_eq!(builder.batch_size, file_rows);
let _stream = builder.build().unwrap();
}
#[tokio::test]
async fn test_parquet_record_batch_stream_schema() {
fn get_all_field_names(schema: &Schema) -> Vec<&String> {
schema.flattened_fields().iter().map(|f| f.name()).collect()
}
let mut metadata = HashMap::with_capacity(1);
metadata.insert("key".to_string(), "value".to_string());
let nested_struct_array = StructArray::from(vec![
(
Arc::new(Field::new("d", DataType::Utf8, true)),
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
),
(
Arc::new(Field::new("e", DataType::Utf8, true)),
Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
),
]);
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("a", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
),
(
Arc::new(Field::new("b", DataType::UInt64, true)),
Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
),
(
Arc::new(Field::new(
"c",
nested_struct_array.data_type().clone(),
true,
)),
Arc::new(nested_struct_array) as ArrayRef,
),
]);
let schema =
Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
let record_batch = RecordBatch::from(struct_array)
.with_schema(schema.clone())
.unwrap();
let mut file = tempfile().unwrap();
let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
writer.write(&record_batch).unwrap();
writer.close().unwrap();
let all_fields = ["a", "b", "c", "d", "e"];
let projections = [
(vec![], vec![]),
(vec![0], vec!["a"]),
(vec![0, 1], vec!["a", "b"]),
(vec![0, 1, 2], vec!["a", "b", "c", "d"]),
(vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
];
for (indices, expected_projected_names) in projections {
let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
assert_eq!(get_all_field_names(&builder), all_fields);
assert_eq!(builder.metadata, metadata);
assert_eq!(get_all_field_names(&reader), expected_projected_names);
assert_eq!(reader.metadata, HashMap::default());
assert_eq!(get_all_field_names(&batch), expected_projected_names);
assert_eq!(batch.metadata, HashMap::default());
};
let builder =
ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
let sync_builder_schema = builder.schema().clone();
let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
let mut reader = builder.with_projection(mask).build().unwrap();
let sync_reader_schema = reader.schema();
let batch = reader.next().unwrap().unwrap();
let sync_batch_schema = batch.schema();
assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
let file = tokio::fs::File::from(file.try_clone().unwrap());
let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
let async_builder_schema = builder.schema().clone();
let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
let mut reader = builder.with_projection(mask).build().unwrap();
let async_reader_schema = reader.schema().clone();
let batch = reader.next().await.unwrap().unwrap();
let async_batch_schema = batch.schema();
assert_schemas(
async_builder_schema,
async_reader_schema,
async_batch_schema,
);
}
}
#[tokio::test]
async fn test_nested_skip() {
let schema = Arc::new(Schema::new(vec![
Field::new("col_1", DataType::UInt64, false),
Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
]));
let props = WriterProperties::builder()
.set_data_page_row_count_limit(256)
.set_write_batch_size(256)
.set_max_row_group_row_count(Some(1024));
let mut file = tempfile().unwrap();
let mut writer =
ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
let mut builder = ListBuilder::new(StringBuilder::new());
for id in 0..1024 {
match id % 3 {
0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1 => builder.append_value([Some(format!("id_{id}"))]),
_ => builder.append_null(),
}
}
let refs = vec![
Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
Arc::new(builder.finish()) as ArrayRef,
];
let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let selections = [
RowSelection::from(vec![
RowSelector::skip(313),
RowSelector::select(1),
RowSelector::skip(709),
RowSelector::select(1),
]),
RowSelection::from(vec![
RowSelector::skip(255),
RowSelector::select(1),
RowSelector::skip(767),
RowSelector::select(1),
]),
RowSelection::from(vec![
RowSelector::select(255),
RowSelector::skip(1),
RowSelector::select(767),
RowSelector::skip(1),
]),
RowSelection::from(vec![
RowSelector::skip(254),
RowSelector::select(1),
RowSelector::select(1),
RowSelector::skip(767),
RowSelector::select(1),
]),
];
for selection in selections {
let expected = selection.row_count();
let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
tokio::fs::File::from_std(file.try_clone().unwrap()),
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
)
.await
.unwrap();
reader = reader.with_row_selection(selection);
let mut stream = reader.build().unwrap();
let mut total_rows = 0;
while let Some(rb) = stream.next().await {
let rb = rb.unwrap();
total_rows += rb.num_rows();
}
assert_eq!(total_rows, expected);
}
}
#[tokio::test]
#[allow(deprecated)]
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
use tokio::fs::File;
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let mut file = File::open(&path).await.unwrap();
let file_size = file.metadata().await.unwrap().len();
let mut metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut file, file_size)
.await
.unwrap();
metadata.set_offset_index(Some(vec![]));
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader =
ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
.build()
.unwrap();
let result = reader.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(result.len(), 1);
}
#[tokio::test]
#[allow(deprecated)]
async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
use tokio::fs::File;
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
let mut file = File::open(&path).await.unwrap();
let file_size = file.metadata().await.unwrap().len();
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut file, file_size)
.await
.unwrap();
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader =
ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
.build()
.unwrap();
let result = reader.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(result.len(), 8);
}
#[tokio::test]
#[allow(deprecated)]
async fn empty_offset_index_doesnt_panic_in_column_chunks() {
use tempfile::TempDir;
use tokio::fs::File;
fn write_metadata_to_local_file(
metadata: ParquetMetaData,
file: impl AsRef<std::path::Path>,
) {
use crate::file::metadata::ParquetMetaDataWriter;
use std::fs::File;
let file = File::create(file).unwrap();
ParquetMetaDataWriter::new(file, &metadata)
.finish()
.unwrap()
}
fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
use std::fs::File;
let file = File::open(file).unwrap();
ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&file)
.unwrap()
}
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_plain.parquet");
let mut file = File::open(&path).await.unwrap();
let file_size = file.metadata().await.unwrap().len();
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut file, file_size)
.await
.unwrap();
let tempdir = TempDir::new().unwrap();
let metadata_path = tempdir.path().join("thrift_metadata.dat");
write_metadata_to_local_file(metadata, &metadata_path);
let metadata = read_metadata_from_local_file(&metadata_path);
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader =
ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
.build()
.unwrap();
let result = reader.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(result.len(), 1);
}
#[tokio::test]
async fn test_cached_array_reader_sparse_offset_error() {
use futures::TryStreamExt;
use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
use arrow_array::{BooleanArray, RecordBatch};
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let async_reader = TestReader::new(data);
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();
let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
let parquet_schema = builder.parquet_schema();
let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
Ok(BooleanArray::from(vec![true; batch.num_rows()]))
});
let filter = RowFilter::new(vec![Box::new(always_true)]);
let stream = builder
.with_batch_size(8)
.with_projection(proj)
.with_row_selection(selection)
.with_row_filter(filter)
.build()
.unwrap();
let _result: Vec<_> = stream.try_collect().await.unwrap();
}
#[tokio::test]
async fn test_predicate_cache_disabled() {
let k = Int32Array::from_iter_values(0..10);
let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
let mut buf = Vec::new();
let props = WriterProperties::builder()
.set_data_page_row_count_limit(1)
.set_write_batch_size(1)
.set_max_row_group_row_count(Some(10))
.set_write_page_header_statistics(true)
.build();
let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
writer.write(&data).unwrap();
writer.close().unwrap();
let data = Bytes::from(buf);
let metadata = ParquetMetaDataReader::new()
.with_page_index_policy(PageIndexPolicy::Required)
.parse_and_finish(&data)
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
let build_filter = || {
let scalar = Int32Array::from_iter_values([5]);
let predicate = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![0]),
move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
);
RowFilter::new(vec![Box::new(predicate)])
};
let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader_with_cache = TestReader::new(data.clone());
let requests_with_cache = reader_with_cache.requests.clone();
let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader_with_cache,
reader_metadata.clone(),
)
.with_batch_size(1000)
.with_row_selection(selection.clone())
.with_row_filter(build_filter())
.build()
.unwrap();
let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
let reader_without_cache = TestReader::new(data);
let requests_without_cache = reader_without_cache.requests.clone();
let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader_without_cache,
reader_metadata,
)
.with_batch_size(1000)
.with_row_selection(selection)
.with_row_filter(build_filter())
        // disable the predicate cache to compare I/O behavior
        .with_max_predicate_cache_size(0)
        .build()
.unwrap();
let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(batches_with_cache, batches_without_cache);
let requests_with_cache = requests_with_cache.lock().unwrap();
let requests_without_cache = requests_without_cache.lock().unwrap();
assert_eq!(requests_with_cache.len(), 11);
assert_eq!(requests_without_cache.len(), 2);
assert_eq!(
requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
433
);
assert_eq!(
requests_without_cache
.iter()
.map(|r| r.len())
.sum::<usize>(),
92
);
}
#[test]
fn test_row_numbers_with_multiple_row_groups() {
test_row_numbers_with_multiple_row_groups_helper(
false,
|path, selection, _row_filter, batch_size| {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Could not create runtime");
runtime.block_on(async move {
let file = tokio::fs::File::open(path).await.unwrap();
let row_number_field = Arc::new(
Field::new("row_number", DataType::Int64, false)
.with_extension_type(RowNumber),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_number_field])
.unwrap();
let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
.await
.unwrap()
.with_row_selection(selection)
.with_batch_size(batch_size)
.build()
.expect("Could not create reader");
reader.try_collect::<Vec<_>>().await.unwrap()
})
},
);
}
#[test]
fn test_row_numbers_with_multiple_row_groups_and_filter() {
test_row_numbers_with_multiple_row_groups_helper(
true,
|path, selection, row_filter, batch_size| {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Could not create runtime");
runtime.block_on(async move {
let file = tokio::fs::File::open(path).await.unwrap();
let row_number_field = Arc::new(
Field::new("row_number", DataType::Int64, false)
.with_extension_type(RowNumber),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_number_field])
.unwrap();
let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
.await
.unwrap()
.with_row_selection(selection)
.with_row_filter(row_filter.expect("No row filter"))
.with_batch_size(batch_size)
.build()
.expect("Could not create reader");
reader.try_collect::<Vec<_>>().await.unwrap()
})
},
);
}
#[tokio::test]
async fn test_nested_lists() -> Result<()> {
let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
let table_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("vector", DataType::List(list_inner_field.clone()), true),
]));
let mut list_builder =
ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
list_builder.append(true);
list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
list_builder.append(true);
list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
list_builder.append(true);
list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
list_builder.append(true);
let list_array = list_builder.finish();
let data = vec![RecordBatch::try_new(
table_schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
Arc::new(list_array),
],
)?];
let mut buffer = Vec::new();
let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
for batch in data {
writer.write(&batch).await?;
}
writer.close().await?;
let reader = TestReader::new(Bytes::from(buffer));
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
Ok(BooleanArray::from(vec![true; batch.num_rows()]))
});
let projection_mask = ProjectionMask::all();
let mut stream = builder
.with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
.with_projection(projection_mask)
.build()?;
while let Some(batch) = stream.next().await {
            let _ = batch.unwrap();
        }
Ok(())
}
}