use arrow::array::ArrayRef;
use arrow::array::Int64Array;
use arrow::compute::and;
use arrow::compute::kernels::cmp::{gt, lt};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{RecordBatch, StringViewArray};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use parquet::file::properties::WriterProperties;
use std::ops::Range;
use std::sync::Arc;
use std::sync::LazyLock;
/// Reads the whole file with no row filter through both the sync and async
/// readers, expecting zero records to be reported as read from the predicate cache.
#[tokio::test]
async fn test_default_read() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
    // Sync path first, then async, each with a freshly built reader.
    test.run_sync(test.sync_builder());
    test.run_async(test.async_builder().await).await;
}
/// Async reader with a projection of `a`,`b` and a filter on `b`: the metrics
/// must report 49 records read from the predicate cache.
#[tokio::test]
async fn test_async_cache_with_filters() {
    // The filter keeps 575 < b < 625, i.e. b = 576..=624 (49 rows), and `b`
    // is both the filter column and part of the output projection.
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49);
    let builder = test.add_project_ab_and_filter_b(test.async_builder().await);
    test.run_async(builder).await;
}
/// Sync reader with the same projection and filter as the async test: the
/// metrics are expected to report zero records read from the cache.
// NOTE(review): presumably the predicate cache is only used by the async
// reader — confirm against the reader implementation.
#[tokio::test]
async fn test_sync_cache_with_filters() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
    test.run_sync(test.add_project_ab_and_filter_b(test.sync_builder()));
}
/// With `max_predicate_cache_size(0)` on both readers, a filtered read is
/// expected to report zero records read from the predicate cache.
#[tokio::test]
async fn test_cache_disabled_with_filters() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);

    let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
    test.run_sync(test.add_project_ab_and_filter_b(sync_builder));

    let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
    test.run_async(test.add_project_ab_and_filter_b(async_builder))
        .await;
}
/// Harness that runs a Parquet read and checks the predicate-cache metrics.
struct ParquetPredicateCacheTest {
    /// Value `records_read_from_cache` must equal once a read finishes.
    expected_records_read_from_cache: usize,
    /// Encoded Parquet file the readers are built over.
    bytes: Bytes,
}
impl ParquetPredicateCacheTest {
fn new() -> Self {
Self {
bytes: TEST_FILE_DATA.clone(),
expected_records_read_from_cache: 0,
}
}
fn with_expected_records_read_from_cache(
mut self,
expected_records_read_from_cache: usize,
) -> Self {
self.expected_records_read_from_cache = expected_records_read_from_cache;
self
}
fn sync_builder(&self) -> ParquetRecordBatchReaderBuilder<Bytes> {
let reader = self.bytes.clone();
ParquetRecordBatchReaderBuilder::try_new_with_options(reader, ArrowReaderOptions::default())
.expect("ParquetRecordBatchReaderBuilder")
}
async fn async_builder(&self) -> ParquetRecordBatchStreamBuilder<TestReader> {
let reader = TestReader::new(self.bytes.clone());
ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
.await
.unwrap()
}
fn add_project_ab_and_filter_b<T>(
&self,
builder: ArrowReaderBuilder<T>,
) -> ArrowReaderBuilder<T> {
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let row_filter = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["b"]),
|batch: RecordBatch| {
let scalar_575 = Int64Array::new_scalar(575);
let scalar_625 = Int64Array::new_scalar(625);
let column = batch.column(0).as_primitive::<Int64Type>();
and(>(column, &scalar_575)?, <(column, &scalar_625)?)
},
);
builder
.with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
}
fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
let metrics = ArrowReaderMetrics::enabled();
let reader = builder.with_metrics(metrics.clone()).build().unwrap();
for batch in reader {
match batch {
Ok(_) => {}
Err(e) => panic!("Error reading batch: {e}"),
}
}
self.verify_metrics(metrics)
}
async fn run_async(&self, builder: ParquetRecordBatchStreamBuilder<TestReader>) {
let metrics = ArrowReaderMetrics::enabled();
let mut stream = builder.with_metrics(metrics.clone()).build().unwrap();
while let Some(batch) = stream.next().await {
match batch {
Ok(_) => {}
Err(e) => panic!("Error reading batch: {e}"),
}
}
self.verify_metrics(metrics)
}
fn verify_metrics(&self, metrics: ArrowReaderMetrics) {
let Self {
bytes: _,
expected_records_read_from_cache,
} = self;
let read_from_cache = metrics
.records_read_from_cache()
.expect("Metrics enabled, so should have metrics");
assert_eq!(
&read_from_cache, expected_records_read_from_cache,
"Expected {expected_records_read_from_cache} records read from cache, but got {read_from_cache}"
);
}
}
/// Parquet file shared by every test, built once on first use.
///
/// Contents: 400 rows with columns `a` (0..400), `b` (400..800) and `c`, a
/// `StringViewArray` alternating short and long strings. The writer uses
/// row groups of at most 200 rows and a data-page row count limit of 100.
static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
    let col_a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
    let col_b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
    // Even rows get a short string, odd rows one longer than 12 bytes, so the
    // view array holds both inlined and non-inlined values.
    let col_c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(
        |i| match i % 2 {
            0 => format!("string_{i}"),
            _ => format!("A string larger than 12 bytes and thus not inlined {i}"),
        },
    )));
    let input_batch =
        RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b), ("c", col_c)]).unwrap();

    let props = WriterProperties::builder()
        .set_max_row_group_size(200)
        .set_data_page_row_count_limit(100)
        .build();
    let mut buffer = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut buffer, input_batch.schema(), Some(props)).unwrap();

    // Hand the writer 50 rows per `write` call.
    // NOTE(review): presumably so the 100-row page limit can take effect at
    // write-batch boundaries — confirm.
    let total_rows = input_batch.num_rows();
    for offset in (0..total_rows).step_by(50) {
        let len = (total_rows - offset).min(50);
        writer.write(&input_batch.slice(offset, len)).unwrap();
    }
    writer.close().unwrap();
    Bytes::from(buffer)
});
/// In-memory `AsyncFileReader` used to drive the async Parquet reader in tests.
#[derive(Clone)]
struct TestReader {
    /// Complete encoded Parquet file.
    data: Bytes,
    /// Metadata stored by the most recent `get_metadata` call; `None` until then.
    metadata: Option<Arc<ParquetMetaData>>,
}
impl TestReader {
    /// Wraps `data`; no metadata is parsed until it is requested.
    fn new(data: Bytes) -> Self {
        let metadata = None;
        Self { data, metadata }
    }
}
impl AsyncFileReader for TestReader {
    /// Returns `range` of the in-memory file as a zero-copy `Bytes` slice.
    ///
    /// An inverted range, or one that extends past the end of the file, is
    /// returned as an error instead of panicking inside `Bytes::slice`.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let file_len = self.data.len() as u64;
        let result = if range.start <= range.end && range.end <= file_len {
            // Both bounds are <= file_len, which came from a `usize`, so these
            // casts cannot truncate.
            Ok(self.data.slice(range.start as usize..range.end as usize))
        } else {
            Err(parquet::errors::ParquetError::General(format!(
                "requested byte range {range:?} is outside the {file_len}-byte test file"
            )))
        };
        futures::future::ready(result).boxed()
    }

    /// Parses the footer metadata from the in-memory file and stores it in `self.metadata`.
    ///
    /// The page index is loaded only when `options` is provided and requests
    /// it via `page_index()`. Parse failures are returned as errors rather
    /// than panicking.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let page_index_policy = PageIndexPolicy::from(options.is_some_and(|o| o.page_index()));
        let parsed = ParquetMetaDataReader::new()
            .with_page_index_policy(page_index_policy)
            .parse_and_finish(&self.data);
        let metadata = match parsed {
            Ok(metadata) => Arc::new(metadata),
            Err(e) => return futures::future::ready(Err(e)).boxed(),
        };
        self.metadata = Some(Arc::clone(&metadata));
        futures::future::ready(Ok(metadata)).boxed()
    }
}