use std::sync::Arc;
use aisle::PruneRequest;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use datafusion_expr::{col, lit};
use parquet::{
arrow::ArrowWriter,
file::{
metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader},
properties::{EnabledStatistics, WriterProperties},
},
};
/// Builds a single-column `RecordBatch` from optional `i32` values.
///
/// The given `schema` is cloned into an `Arc` and paired with one `Int32Array`
/// built from `values`. Panics if `RecordBatch::try_new` rejects the combination.
fn make_batch_nullable(schema: &Schema, values: Vec<Option<i32>>) -> RecordBatch {
    let shared_schema = Arc::new(schema.clone());
    let column = Int32Array::from(values);
    RecordBatch::try_new(shared_schema, vec![Arc::new(column)]).unwrap()
}
/// Serializes `batches` into an in-memory Parquet file and returns its bytes.
///
/// The schema is taken from the first batch, and every batch is written through
/// a single `ArrowWriter` configured with `props`.
///
/// # Panics
///
/// Panics with an explicit message if `batches` is empty (there is no schema to
/// write), and if creating the writer, writing a batch, or closing the writer fails.
fn write_parquet(batches: &[RecordBatch], props: WriterProperties) -> Vec<u8> {
    // Fail with a readable message instead of an index-out-of-bounds panic.
    let schema = batches
        .first()
        .expect("write_parquet requires at least one batch to derive the schema")
        .schema();
    let mut buffer: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    // Closing the writer finishes the file before the buffer is returned.
    writer.close().unwrap();
    buffer
}
/// Parses Parquet metadata from `bytes` with the page index policy set to
/// `PageIndexPolicy::Required`.
///
/// The input slice is copied into a `Bytes` buffer first. Panics if parsing fails.
fn load_metadata(bytes: &[u8]) -> ParquetMetaData {
    let owned = Bytes::copy_from_slice(bytes);
    let reader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
    reader.parse_and_finish(&owned).unwrap()
}
/// Parses Parquet metadata from `bytes` with the page index policy set to
/// `PageIndexPolicy::Skip`.
///
/// The input slice is copied into a `Bytes` buffer first. Panics if parsing fails.
fn load_metadata_without_page_index(bytes: &[u8]) -> ParquetMetaData {
    let owned = Bytes::copy_from_slice(bytes);
    let reader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Skip);
    reader.parse_and_finish(&owned).unwrap()
}
/// `a IS NULL` over a chunk holding `[1, 2, 3]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects `row_groups()` to be empty.
#[test]
fn is_null_with_zero_nulls() {
    let predicate = col("a").is_null();
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[] as &[usize]);
}
/// `a IS NULL` over a chunk holding only nulls, with chunk-level statistics and
/// the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn is_null_with_all_nulls() {
    let predicate = col("a").is_null();
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![None, None, None]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a IS NULL` over a chunk holding `[1, NULL, 3]`, with chunk-level statistics
/// and the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn is_null_with_partial_nulls() {
    let predicate = col("a").is_null();
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), None, Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a IS NOT NULL` over a chunk holding `[1, 2, 3]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn is_not_null_with_zero_nulls() {
    let predicate = col("a").is_not_null();
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a IS NOT NULL` over a chunk holding only nulls, with chunk-level statistics
/// and the page index disabled: expects `row_groups()` to be empty.
#[test]
fn is_not_null_with_all_nulls() {
    let predicate = col("a").is_not_null();
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![None, None, None]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[] as &[usize]);
}
/// `a IS NOT NULL` over a chunk holding `[1, NULL, 3]`, with chunk-level
/// statistics and the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn is_not_null_with_partial_nulls() {
    let predicate = col("a").is_not_null();
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), None, Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a = 2` over a chunk holding `[1, 2, 3]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn eq_with_zero_nulls_matching() {
    let predicate = col("a").eq(lit(2));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a = 2` over a chunk holding only nulls, with chunk-level statistics and the
/// page index disabled: expects row group `0` to remain in `row_groups()`.
///
/// NOTE(review): an all-null column cannot satisfy an equality predicate, so
/// keeping the row group looks like a conservative outcome rather than the
/// tightest possible one. Confirm this is the intended behavior to pin.
#[test]
fn eq_with_all_nulls() {
    let predicate = col("a").eq(lit(2));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![None, None, None]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a = 2` over a chunk holding `[1, NULL, 3]`, with chunk-level statistics and
/// the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn eq_with_partial_nulls_in_range() {
    let predicate = col("a").eq(lit(2));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), None, Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a != 2` over a chunk holding `[2, 2, 2]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects `row_groups()` to be empty.
#[test]
fn not_eq_with_zero_nulls() {
    let predicate = col("a").not_eq(lit(2));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(2), Some(2), Some(2)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[] as &[usize]);
}
/// `a != 2` over a chunk holding `[2, NULL, 2]`, with chunk-level statistics and
/// the page index disabled: expects `row_groups()` to be empty.
#[test]
fn not_eq_with_nulls() {
    let predicate = col("a").not_eq(lit(2));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(2), None, Some(2)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[] as &[usize]);
}
/// `a < 5` over a chunk holding `[5, 6, 7]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects `row_groups()` to be empty.
#[test]
fn lt_with_zero_nulls_can_prune() {
    let predicate = col("a").lt(lit(5));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(5), Some(6), Some(7)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[] as &[usize]);
}
/// `a < 5` over a chunk holding `[1, 2, 3]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn lt_with_zero_nulls_keeps_all() {
    let predicate = col("a").lt(lit(5));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a < 5` over a chunk holding `[1, NULL, 3]`, with chunk-level statistics and
/// the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn lt_with_nulls() {
    let predicate = col("a").lt(lit(5));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), None, Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a IN (2, 5)` over a chunk holding `[1, 2, 3]` (no nulls), with chunk-level
/// statistics and the page index disabled: expects row group `0` in `row_groups()`.
#[test]
fn in_list_with_zero_nulls() {
    let candidates = vec![lit(2), lit(5)];
    let predicate = col("a").in_list(candidates, false);
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a IN (1, 2)` over a chunk holding only nulls, with chunk-level statistics and
/// the page index disabled: expects row group `0` to remain in `row_groups()`.
///
/// NOTE(review): an all-null column cannot match any list member, so keeping
/// the row group looks like a conservative outcome rather than the tightest
/// possible one. Confirm this is the intended behavior to pin.
#[test]
fn in_list_with_all_nulls() {
    let candidates = vec![lit(1), lit(2)];
    let predicate = col("a").in_list(candidates, false);
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![None, None, None]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}
/// `a IS NULL` with page-level statistics and the page index enabled.
///
/// Two batches are written, `[NULL, NULL]` followed by `[1, 2]`, with a data
/// page row count limit of 2 and a max row group size of 100. The test expects
/// row group `0` in `row_groups()` and, when a row selection is produced, that
/// it selects at least one row.
#[test]
fn page_level_is_null_with_mixed_pages() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let batch1 = make_batch_nullable(&schema, vec![None, None]);
    let batch2 = make_batch_nullable(&schema, vec![Some(1), Some(2)]);
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_data_page_row_count_limit(2)
        .set_max_row_group_size(100)
        .build();
    let bytes = write_parquet(&[batch1, batch2], props);
    let metadata = load_metadata(&bytes);
    let expr = col("a").is_null();
    let result = PruneRequest::new(&metadata, &schema)
        .with_predicate(&expr)
        .enable_page_index(true)
        .prune();
    assert_eq!(result.row_groups(), &[0]);
    // A missing selection is tolerated; only a present one is checked.
    if let Some(selection) = result.row_selection() {
        assert!(selection.selects_any());
    }
}
/// `a > 5` with page-level statistics and the page index enabled.
///
/// One batch `[1, 2, 10, 11]` (no nulls) is written with a data page row count
/// limit of 2 and a max row group size of 100. The test expects row group `0`
/// in `row_groups()` and, when a row selection is produced, that it selects at
/// least one row.
#[test]
fn page_level_comparison_with_zero_null_count() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let batch = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(10), Some(11)]);
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_data_page_row_count_limit(2)
        .set_max_row_group_size(100)
        .build();
    let bytes = write_parquet(&[batch], props);
    let metadata = load_metadata(&bytes);
    let expr = col("a").gt(lit(5));
    let result = PruneRequest::new(&metadata, &schema)
        .with_predicate(&expr)
        .enable_page_index(true)
        .prune();
    assert_eq!(result.row_groups(), &[0]);
    // A missing selection is tolerated; only a present one is checked.
    if let Some(selection) = result.row_selection() {
        assert!(selection.selects_any());
    }
}
/// `a = 100` over a chunk holding `[1, 2, 3]` written with statistics disabled
/// (`EnabledStatistics::None`) and the page index disabled: expects row group
/// `0` to remain in `row_groups()`.
#[test]
fn missing_stats_returns_unknown() {
    let predicate = col("a").eq(lit(100));
    let column = Field::new("a", DataType::Int32, true);
    let schema = Schema::new(vec![column]);
    let rows = make_batch_nullable(&schema, vec![Some(1), Some(2), Some(3)]);
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    let file = write_parquet(&[rows], writer_props);
    let footer = load_metadata_without_page_index(&file);
    let outcome = PruneRequest::new(&footer, &schema)
        .with_predicate(&predicate)
        .enable_page_index(false)
        .prune();
    assert_eq!(outcome.row_groups(), &[0]);
}