use super::io::TestReader;
use arrow::array::ArrayRef;
use arrow::array::Int64Array;
use arrow::compute::and;
use arrow::compute::kernels::cmp::{gt, lt};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
use arrow_schema::{DataType, Field};
use bytes::Bytes;
use futures::StreamExt;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::properties::WriterProperties;
use std::sync::Arc;
use std::sync::LazyLock;
/// Reads the test file with no row filter through both the sync and async
/// builders and checks that no records are reported as read from the
/// predicate cache.
#[tokio::test]
async fn test_default_read() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
    test.run_sync(test.sync_builder());
    test.run_async(test.async_builder().await).await;
}
/// Projects `a` and `b` with a row filter on `b` through the async reader.
///
/// 49 is the number of rows satisfying `575 < b < 625` (b = 576..=624); the
/// async reader is expected to report exactly that many records served from
/// the predicate cache.
#[tokio::test]
async fn test_async_cache_with_filters() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49);
    let filtered = test.async_builder().await.add_project_ab_and_filter_b();
    test.run_async(filtered).await;
}
/// Projects `a` and `b` with a row filter on `b` through the synchronous
/// reader, which this test expects to report zero records read from the
/// predicate cache.
///
/// Nothing in this test is awaited, so it is a plain `#[test]`; there is no
/// need to start a Tokio runtime for a purely synchronous read.
#[test]
fn test_sync_cache_with_filters() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
    let sync_builder = test.sync_builder().add_project_ab_and_filter_b();
    test.run_sync(sync_builder);
}
/// Runs the same filtered projection as the cache tests, but with the maximum
/// predicate cache size set to 0. Both the sync and async readers must then
/// report zero records read from the cache.
#[tokio::test]
async fn test_cache_disabled_with_filters() {
    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);

    test.run_sync(
        test.sync_builder()
            .with_max_predicate_cache_size(0)
            .add_project_ab_and_filter_b(),
    );

    let no_cache_async = test
        .async_builder()
        .await
        .with_max_predicate_cache_size(0)
        .add_project_ab_and_filter_b();
    test.run_async(no_cache_async).await;
}
/// Filters and projects on a single leaf inside the `b` struct column of the
/// nested test file and checks that no records are reported as read from the
/// predicate cache, for both the sync and async readers.
#[tokio::test]
async fn test_cache_projection_excludes_nested_columns() {
    let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
    test.run_sync(test.sync_builder().add_nested_filter());
    let nested_async = test.async_builder().await.add_nested_filter();
    test.run_async(nested_async).await;
}
/// Test harness: an in-memory parquet file plus the number of records the
/// reader is expected to report as read from the predicate cache.
struct ParquetPredicateCacheTest {
    /// Encoded parquet file shared by the sync and async builders.
    bytes: Bytes,
    /// Value the `records_read_from_cache` metric must equal after a run.
    expected_records_read_from_cache: usize,
}

impl ParquetPredicateCacheTest {
    /// Shared constructor: wraps `bytes` with an expectation of 0 cached records.
    fn from_bytes(bytes: Bytes) -> Self {
        Self {
            bytes,
            expected_records_read_from_cache: 0,
        }
    }

    /// Harness over the flat three-column file (`a`, `b`, `c`).
    fn new() -> Self {
        Self::from_bytes(TEST_FILE_DATA.clone())
    }

    /// Harness over the file containing the `b { aa, bb }` struct column.
    fn new_nested() -> Self {
        Self::from_bytes(NESTED_TEST_FILE_DATA.clone())
    }

    /// Builder-style setter for the expected cache-read count.
    fn with_expected_records_read_from_cache(
        mut self,
        expected_records_read_from_cache: usize,
    ) -> Self {
        self.expected_records_read_from_cache = expected_records_read_from_cache;
        self
    }

    /// Synchronous reader builder over a clone of the file bytes.
    fn sync_builder(&self) -> ParquetRecordBatchReaderBuilder<Bytes> {
        ParquetRecordBatchReaderBuilder::try_new_with_options(
            self.bytes.clone(),
            ArrowReaderOptions::default(),
        )
        .expect("ParquetRecordBatchReaderBuilder")
    }

    /// Async stream builder over a `TestReader` wrapping a clone of the bytes.
    async fn async_builder(&self) -> ParquetRecordBatchStreamBuilder<TestReader> {
        let source = TestReader::new(self.bytes.clone());
        ParquetRecordBatchStreamBuilder::new_with_options(source, ArrowReaderOptions::default())
            .await
            .unwrap()
    }

    /// Drains the sync reader with metrics enabled, panicking on any read
    /// error, then checks the cache metric.
    fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
        let metrics = ArrowReaderMetrics::enabled();
        let reader = builder.with_metrics(metrics.clone()).build().unwrap();
        // Batch contents are irrelevant; only the metrics side effect matters.
        reader.for_each(|batch| {
            if let Err(e) = batch {
                panic!("Error reading batch: {e}");
            }
        });
        self.verify_metrics(metrics)
    }

    /// Async counterpart of [`Self::run_sync`].
    async fn run_async(&self, builder: ParquetRecordBatchStreamBuilder<TestReader>) {
        let metrics = ArrowReaderMetrics::enabled();
        let mut stream = builder.with_metrics(metrics.clone()).build().unwrap();
        while let Some(batch) = stream.next().await {
            if let Err(e) = batch {
                panic!("Error reading batch: {e}");
            }
        }
        self.verify_metrics(metrics)
    }

    /// Asserts that the recorded cache-read count equals the expectation.
    fn verify_metrics(&self, metrics: ArrowReaderMetrics) {
        // Exhaustive destructure: adding a field to the struct forces this
        // check to be revisited.
        let Self {
            bytes: _,
            expected_records_read_from_cache,
        } = self;
        let read_from_cache = metrics
            .records_read_from_cache()
            .expect("Metrics enabled, so should have metrics");
        assert_eq!(
            &read_from_cache, expected_records_read_from_cache,
            "Expected {expected_records_read_from_cache} records read from cache, but got {read_from_cache}"
        );
    }
}
/// 400-row parquet file with columns `a` (0..400), `b` (400..800) and a
/// `StringViewArray` column `c`.
///
/// The writer is configured with at most 200 rows per row group and a data
/// page row-count limit of 100, and it is fed in 50-row slices.
static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
    const WRITE_CHUNK_ROWS: usize = 50;

    let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
    let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
    // Even rows get short values and odd rows get values longer than 12 bytes,
    // so the view array mixes both kinds of string.
    let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
        match i % 2 {
            0 => format!("string_{i}"),
            _ => format!("A string larger than 12 bytes and thus not inlined {i}"),
        }
    })));
    let input_batch = RecordBatch::try_from_iter([("a", a), ("b", b), ("c", c)]).unwrap();

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .set_data_page_row_count_limit(100)
        .build();
    let mut output = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut output, input_batch.schema(), Some(props)).unwrap();

    let total_rows = input_batch.num_rows();
    for offset in (0..total_rows).step_by(WRITE_CHUNK_ROWS) {
        let len = WRITE_CHUNK_ROWS.min(total_rows - offset);
        writer.write(&input_batch.slice(offset, len)).unwrap();
    }
    writer.close().unwrap();
    Bytes::from(output)
});
/// 100-row parquet file with a flat Utf8 column `a` and a struct column
/// `b { aa: Utf8, bb: Utf8 }`, written as a single batch with default writer
/// properties.
static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
    const NUM_ROWS: usize = 100;

    // Utf8 column with every value populated: `{prefix}0`, `{prefix}1`, ...
    let labelled = |prefix: &str| -> StringArray {
        (0..NUM_ROWS).map(|i| Some(format!("{prefix}{i}"))).collect()
    };

    let nested = StructArray::from(vec![
        (
            Arc::new(Field::new("aa", DataType::Utf8, true)),
            Arc::new(labelled("v")) as ArrayRef,
        ),
        (
            Arc::new(Field::new("bb", DataType::Utf8, true)),
            Arc::new(labelled("w")) as ArrayRef,
        ),
    ]);
    let input_batch = RecordBatch::try_from_iter([
        ("a", Arc::new(labelled("r")) as ArrayRef),
        ("b", Arc::new(nested) as ArrayRef),
    ])
    .unwrap();

    let mut output = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
    writer.write(&input_batch).unwrap();
    writer.close().unwrap();
    Bytes::from(output)
});
trait ArrowReaderBuilderExt {
fn add_project_ab_and_filter_b(self) -> Self;
fn add_nested_filter(self) -> Self;
}
impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
fn add_project_ab_and_filter_b(self) -> Self {
let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
let row_filter = ArrowPredicateFn::new(
ProjectionMask::columns(&schema_descr, ["b"]),
|batch: RecordBatch| {
let scalar_575 = Int64Array::new_scalar(575);
let scalar_625 = Int64Array::new_scalar(625);
let column = batch.column(0).as_primitive::<Int64Type>();
and(>(column, &scalar_575)?, <(column, &scalar_625)?)
},
);
self.with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
}
fn add_nested_filter(self) -> Self {
let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
Ok(arrow_array::BooleanArray::from(vec![
true;
batch.num_rows()
]))
});
let row_filter = RowFilter::new(vec![Box::new(always_true)]);
self.with_projection(nested_leaf_mask)
.with_row_filter(row_filter)
}
}