use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::new_empty_array;
use arrow_schema::{FieldRef, Schema};
use datafusion_common::{Result, ScalarValue};
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
/// Interprets `b` as a big-endian two's-complement integer and returns it as
/// an `i128`.
///
/// The bytes are first widened to 16 bytes by `sign_extend_be`, which asserts
/// that `b` holds at most 16 bytes.
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
    let widened: [u8; 16] = sign_extend_be(b);
    i128::from_be_bytes(widened)
}
/// Sign-extends a big-endian two's-complement byte string to exactly 16 bytes.
///
/// The input bytes are placed in the low-order (trailing) positions of the
/// result; the leading positions are filled with `0xFF` when the most
/// significant input bit is set and with `0x00` otherwise. An empty slice has
/// no sign bit and yields all zeroes.
///
/// # Panics
///
/// Panics if `b` is longer than 16 bytes.
fn sign_extend_be(b: &[u8]) -> [u8; 16] {
    assert!(b.len() <= 16, "Array too large, expected less than 16");
    // `first()` rather than `b[0]`: an empty slice must not trigger an
    // index-out-of-bounds panic; it is simply treated as non-negative.
    let is_negative = b.first().map_or(false, |msb| msb & 0x80 != 0);
    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
    // The destination tail has exactly `b.len()` bytes, as `copy_from_slice`
    // requires.
    result[16 - b.len()..].copy_from_slice(b);
    result
}
/// Extracts a single statistic (min or max) from a parquet `Statistics` value
/// and converts it to an `Option<ScalarValue>`.
///
/// Arguments:
/// * `$column_statistics` - expression matched against the `ParquetStatistics`
///   variants
/// * `$func` - accessor for the typed value (`min` or `max`)
/// * `$bytes_func` - accessor for the raw bytes (`min_bytes` or `max_bytes`)
/// * `$target_arrow_type` - an `Option` holding the desired arrow `DataType`;
///   it only influences the result for `Decimal128` and `FixedSizeBinary`
///
/// Note: the expansion contains `return None`, so the macro must be invoked
/// inside a function or closure that returns `Option<ScalarValue>`.
macro_rules! get_statistic {
    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
        // Without min/max there is nothing to extract.
        if !$column_statistics.has_min_max_set() {
            return None;
        }
        match $column_statistics {
            ParquetStatistics::Boolean(stats) => {
                Some(ScalarValue::Boolean(Some(*stats.$func())))
            }
            // Integer physical types may back a decimal logical type.
            ParquetStatistics::Int32(stats) => {
                let raw = *stats.$func();
                if let Some(DataType::Decimal128(precision, scale)) = $target_arrow_type {
                    Some(ScalarValue::Decimal128(Some(raw as i128), *precision, *scale))
                } else {
                    Some(ScalarValue::Int32(Some(raw)))
                }
            }
            ParquetStatistics::Int64(stats) => {
                let raw = *stats.$func();
                if let Some(DataType::Decimal128(precision, scale)) = $target_arrow_type {
                    Some(ScalarValue::Decimal128(Some(raw as i128), *precision, *scale))
                } else {
                    Some(ScalarValue::Int64(Some(raw)))
                }
            }
            // Int96 statistics are not converted.
            ParquetStatistics::Int96(_) => None,
            ParquetStatistics::Float(stats) => {
                Some(ScalarValue::Float32(Some(*stats.$func())))
            }
            ParquetStatistics::Double(stats) => {
                Some(ScalarValue::Float64(Some(*stats.$func())))
            }
            ParquetStatistics::ByteArray(stats) => {
                if let Some(DataType::Decimal128(precision, scale)) = $target_arrow_type {
                    let unscaled = from_bytes_to_i128(stats.$bytes_func());
                    Some(ScalarValue::Decimal128(Some(unscaled), *precision, *scale))
                } else {
                    // Every non-decimal target is treated as UTF-8; invalid
                    // bytes become a null Utf8 scalar rather than an error.
                    let text = match std::str::from_utf8(stats.$bytes_func()) {
                        Ok(valid) => Some(valid.to_string()),
                        Err(_) => {
                            log::debug!(
                                "Utf8 statistics is a non-UTF8 value, ignoring it."
                            );
                            None
                        }
                    };
                    Some(ScalarValue::Utf8(text))
                }
            }
            ParquetStatistics::FixedLenByteArray(stats) => match $target_arrow_type {
                Some(DataType::Decimal128(precision, scale)) => {
                    let unscaled = from_bytes_to_i128(stats.$bytes_func());
                    Some(ScalarValue::Decimal128(Some(unscaled), *precision, *scale))
                }
                Some(DataType::FixedSizeBinary(size)) => {
                    let bytes = stats.$bytes_func().to_vec();
                    // A statistic whose width disagrees with the declared size
                    // is replaced by a null of the right type.
                    let checked = if bytes.len().try_into() == Ok(*size) {
                        Some(bytes)
                    } else {
                        log::debug!(
                            "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
                            size,
                            bytes.len(),
                        );
                        None
                    };
                    Some(ScalarValue::FixedSizeBinary(*size, checked))
                }
                _ => None,
            },
        }
    }};
}
/// Looks up the parquet leaf column that corresponds to the top-level arrow
/// field called `name`.
///
/// Returns the parquet column index together with the arrow field, or `None`
/// when:
/// * `arrow_schema` has no field named `name`,
/// * the field has a nested data type, or
/// * no parquet column reports the field's position as its root index.
pub(crate) fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema.fields.find(name)?;

    if field.data_type().is_nested() {
        None
    } else {
        // First leaf column whose root is the arrow field we found.
        let leaf_count = parquet_schema.columns().len();
        (0..leaf_count)
            .find(|&leaf| parquet_schema.get_column_root_idx(leaf) == root_idx)
            .map(|leaf| (leaf, field))
    }
}
pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
let scalars = iterator
.map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))));
collect_scalars(data_type, scalars)
}
pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
let scalars = iterator
.map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, Some(data_type))));
collect_scalars(data_type, scalars)
}
/// Assembles optional scalars into a single arrow array of type `data_type`.
///
/// An exhausted iterator produces an empty array of `data_type`. Otherwise
/// every `None` is replaced by the null scalar for `data_type` before the
/// values are handed to `ScalarValue::iter_to_array`.
///
/// # Errors
///
/// Propagates errors from `ScalarValue::try_from(data_type)` and from
/// `ScalarValue::iter_to_array`.
fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
    data_type: &DataType,
    iterator: I,
) -> Result<ArrayRef> {
    let mut pending = iterator.peekable();

    // Nothing to convert: return the typed empty array directly.
    if pending.peek().is_none() {
        return Ok(new_empty_array(data_type));
    }

    let typed_null = ScalarValue::try_from(data_type)?;
    let filled = pending.map(|maybe_value| match maybe_value {
        Some(value) => value,
        None => typed_null.clone(),
    });
    ScalarValue::iter_to_array(filled)
}
// Tests for the statistics conversion functions in this file.
//
// `roundtrip_*` tests write an arrow array to an in-memory parquet file and
// check the min/max arrays extracted from the resulting row group metadata.
// The remaining tests read existing files from the parquet test data
// directory (see `TestFile`).
#[cfg(test)]
mod test {
use super::*;
use arrow_array::{
new_null_array, Array, BinaryArray, BooleanArray, Decimal128Array, Float32Array,
Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
TimestampNanosecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
use datafusion_common::test_util::parquet_test_data;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::path::PathBuf;
use std::sync::Arc;
// An empty input is expected to produce empty (zero-length) min and max arrays.
#[test]
fn roundtrip_empty() {
let empty_bool_array = new_empty_array(&DataType::Boolean);
Test {
input: empty_bool_array.clone(),
expected_min: empty_bool_array.clone(),
expected_max: empty_bool_array.clone(),
}
.run()
}
// The 9-element inputs used by the roundtrip tests are written with at most
// `ROWS_PER_ROW_GROUP` (3) rows per row group, hence three expected statistics.
#[test]
fn roundtrip_bool() {
Test {
input: bool_array([
// row group 1
Some(true),
None,
Some(true),
// row group 2
Some(true),
Some(false),
None,
// row group 3
None,
None,
None,
]),
expected_min: bool_array([Some(true), Some(false), None]),
expected_max: bool_array([Some(true), Some(true), None]),
}
.run()
}
#[test]
fn roundtrip_int32() {
Test {
input: i32_array([
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(0),
Some(5),
None,
// row group 3
None,
None,
None,
]),
expected_min: i32_array([Some(1), Some(0), None]),
expected_max: i32_array([Some(3), Some(5), None]),
}
.run()
}
#[test]
fn roundtrip_int64() {
Test {
input: i64_array([
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(0),
Some(5),
None,
// row group 3
None,
None,
None,
]),
expected_min: i64_array([Some(1), Some(0), None]),
expected_max: i64_array(vec![Some(3), Some(5), None]),
}
.run()
}
#[test]
fn roundtrip_f32() {
Test {
input: f32_array([
// row group 1
Some(1.0),
None,
Some(3.0),
// row group 2
Some(-1.0),
Some(5.0),
None,
// row group 3
None,
None,
None,
]),
expected_min: f32_array([Some(1.0), Some(-1.0), None]),
expected_max: f32_array([Some(3.0), Some(5.0), None]),
}
.run()
}
#[test]
fn roundtrip_f64() {
Test {
input: f64_array([
// row group 1
Some(1.0),
None,
Some(3.0),
// row group 2
Some(-1.0),
Some(5.0),
None,
// row group 3
None,
None,
None,
]),
expected_min: f64_array([Some(1.0), Some(-1.0), None]),
expected_max: f64_array([Some(3.0), Some(5.0), None]),
}
.run()
}
// Documents a current limitation: the `Int64` arm of `get_statistic!` only
// special-cases `Decimal128`, so timestamp statistics come back as `Int64`
// scalars, which cannot be combined with the timestamp-typed null.
#[test]
#[should_panic(
expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Int64, got TimestampNanosecond(NULL, None)"
)]
fn roundtrip_timestamp() {
Test {
input: timestamp_array([
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
]),
expected_min: timestamp_array([Some(1), Some(5), None]),
expected_max: timestamp_array([Some(3), Some(9), None]),
}
.run()
}
#[test]
fn roundtrip_decimal() {
Test {
input: Arc::new(
Decimal128Array::from(vec![
// row group 1
Some(100),
None,
Some(22000),
// row group 2
Some(500000),
Some(330000),
None,
// row group 3
None,
None,
None,
])
.with_precision_and_scale(9, 2)
.unwrap(),
),
expected_min: Arc::new(
Decimal128Array::from(vec![Some(100), Some(330000), None])
.with_precision_and_scale(9, 2)
.unwrap(),
),
expected_max: Arc::new(
Decimal128Array::from(vec![Some(22000), Some(500000), None])
.with_precision_and_scale(9, 2)
.unwrap(),
),
}
.run()
}
#[test]
fn roundtrip_utf8() {
Test {
input: utf8_array([
// row group 1
Some("A"),
None,
Some("Q"),
// row group 2
Some("ZZ"),
Some("AA"),
None,
// row group 3
None,
None,
None,
]),
expected_min: utf8_array([Some("A"), Some("AA"), None]),
expected_max: utf8_array([Some("Q"), Some("ZZ"), None]),
}
.run()
}
// Struct columns are nested, so `parquet_column` returns `None` for them and
// `Test::run` only asserts that lookup result; the expected arrays below are
// never compared against extracted statistics.
#[test]
fn roundtrip_struct() {
let mut test = Test {
input: struct_array(vec![
// row group 1
(Some(true), Some(1)),
(None, None),
(Some(true), Some(3)),
// row group 2
(Some(true), Some(0)),
(Some(false), Some(5)),
(None, None),
// row group 3
(None, None),
(None, None),
(None, None),
]),
expected_min: struct_array(vec![
(Some(true), Some(1)),
(Some(true), Some(0)),
(None, None),
]),
expected_max: struct_array(vec![
(Some(true), Some(3)),
(Some(true), Some(0)),
(None, None),
]),
};
// Replace the expectations with all-null arrays of the struct type.
test.expected_min =
new_null_array(test.input.data_type(), test.expected_min.len());
// NOTE(review): this sizes `expected_max` from `expected_min.len()`;
// presumably `expected_max.len()` was intended. Both are 3 here, so the
// result is the same.
test.expected_max =
new_null_array(test.input.data_type(), test.expected_min.len());
test.run()
}
// Documents a current limitation: the `ByteArray` arm of `get_statistic!`
// yields `Utf8` scalars for every non-decimal target type, which cannot be
// combined with the `Binary`-typed null.
#[test]
#[should_panic(
expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Utf8, got Binary(NULL)"
)]
fn roundtrip_binary() {
Test {
input: Arc::new(BinaryArray::from_opt_vec(vec![
// row group 1
Some(b"A"),
None,
Some(b"Q"),
// row group 2
Some(b"ZZ"),
Some(b"AA"),
None,
// row group 3
None,
None,
None,
])),
expected_min: Arc::new(BinaryArray::from_opt_vec(vec![
Some(b"A"),
Some(b"AA"),
None,
])),
expected_max: Arc::new(BinaryArray::from_opt_vec(vec![
Some(b"Q"),
Some(b"ZZ"),
None,
])),
}
.run()
}
// Checks that a top-level column is resolved correctly when it follows a
// struct column that contains a child field with the same name.
#[test]
fn struct_and_non_struct() {
let struct_col = struct_array(vec![
(Some(true), Some(1)),
(None, None),
(Some(true), Some(3)),
]);
let int_col = i32_array([Some(100), Some(200), Some(300)]);
let expected_min = i32_array([Some(100)]);
let expected_max = i32_array(vec![Some(300)]);
// Sanity check: the struct's second child shares the name "int_col" with
// the top-level column looked up below.
match struct_col.data_type() {
DataType::Struct(fields) => {
assert_eq!(fields.get(1).unwrap().name(), "int_col")
}
_ => panic!("unexpected data type for struct column"),
};
let input_batch = RecordBatch::try_from_iter([
("struct_col", struct_col),
("int_col", int_col),
])
.unwrap();
let schema = input_batch.schema();
let metadata = parquet_metadata(schema.clone(), input_batch);
let parquet_schema = metadata.file_metadata().schema_descr();
let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap();
// Expected parquet index is 2; presumably the struct's two children occupy
// leaf columns 0 and 1.
assert_eq!(idx, 2);
let row_groups = metadata.row_groups();
let iter = row_groups.iter().map(|x| x.column(idx).statistics());
let min = min_statistics(&DataType::Int32, iter.clone()).unwrap();
assert_eq!(
&min,
&expected_min,
"Min. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
);
let max = max_statistics(&DataType::Int32, iter).unwrap();
assert_eq!(
&max,
&expected_max,
"Max. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
);
}
// Reads `nan_in_stats.parquet` from the parquet test data directory and
// expects a NaN maximum for column "x".
#[test]
fn nan_in_stats() {
TestFile::new("nan_in_stats.parquet")
.with_column(ExpectedColumn {
name: "x",
expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])),
expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])),
})
.run();
}
// Expects null min/max for the checked columns of `alltypes_plain.parquet`
// (presumably the file carries no statistics for them -- TODO confirm).
#[test]
fn alltypes_plain() {
TestFile::new("alltypes_plain.parquet")
.with_column(ExpectedColumn {
name: "id",
expected_min: i32_array([None]),
expected_max: i32_array([None]),
})
.with_column(ExpectedColumn {
name: "bool_col",
expected_min: bool_array([None]),
expected_max: bool_array([None]),
})
.run();
}
// Checks one statistic per column of `alltypes_tiny_pages.parquet`.
#[test]
fn alltypes_tiny_pages() {
TestFile::new("alltypes_tiny_pages.parquet")
.with_column(ExpectedColumn {
name: "id",
expected_min: i32_array([Some(0)]),
expected_max: i32_array([Some(7299)]),
})
.with_column(ExpectedColumn {
name: "bool_col",
expected_min: bool_array([Some(false)]),
expected_max: bool_array([Some(true)]),
})
.with_column(ExpectedColumn {
name: "tinyint_col",
expected_min: i32_array([Some(0)]),
expected_max: i32_array([Some(9)]),
})
.with_column(ExpectedColumn {
name: "smallint_col",
expected_min: i32_array([Some(0)]),
expected_max: i32_array([Some(9)]),
})
.with_column(ExpectedColumn {
name: "int_col",
expected_min: i32_array([Some(0)]),
expected_max: i32_array([Some(9)]),
})
.with_column(ExpectedColumn {
name: "bigint_col",
expected_min: i64_array([Some(0)]),
expected_max: i64_array([Some(90)]),
})
.with_column(ExpectedColumn {
name: "float_col",
expected_min: f32_array([Some(0.0)]),
expected_max: f32_array([Some(9.9)]),
})
.with_column(ExpectedColumn {
name: "double_col",
expected_min: f64_array([Some(0.0)]),
expected_max: f64_array([Some(90.89999999999999)]),
})
.with_column(ExpectedColumn {
name: "date_string_col",
expected_min: utf8_array([Some("01/01/09")]),
expected_max: utf8_array([Some("12/31/10")]),
})
.with_column(ExpectedColumn {
name: "string_col",
expected_min: utf8_array([Some("0")]),
expected_max: utf8_array([Some("9")]),
})
// Timestamp statistics are expected to be null here (presumably the column
// is stored as Int96, which `get_statistic!` does not convert -- TODO confirm).
.with_column(ExpectedColumn {
name: "timestamp_col",
expected_min: timestamp_array([None]),
expected_max: timestamp_array([None]),
})
.with_column(ExpectedColumn {
name: "year",
expected_min: i32_array([Some(2009)]),
expected_max: i32_array([Some(2010)]),
})
.with_column(ExpectedColumn {
name: "month",
expected_min: i32_array([Some(1)]),
expected_max: i32_array([Some(12)]),
})
.run();
}
// Checks decimal statistics read from `fixed_length_decimal_legacy.parquet`.
#[test]
fn fixed_length_decimal_legacy() {
TestFile::new("fixed_length_decimal_legacy.parquet")
.with_column(ExpectedColumn {
name: "value",
expected_min: Arc::new(
Decimal128Array::from(vec![Some(200)])
.with_precision_and_scale(13, 2)
.unwrap(),
),
expected_max: Arc::new(
Decimal128Array::from(vec![Some(2400)])
.with_precision_and_scale(13, 2)
.unwrap(),
),
})
.run();
}
/// Maximum row group size passed to the parquet writer in `parquet_metadata`.
const ROWS_PER_ROW_GROUP: usize = 3;
/// A roundtrip test case: `input` is written to parquet as column "c1" and the
/// extracted statistics are compared with `expected_min` / `expected_max`.
struct Test {
input: ArrayRef,
expected_min: ArrayRef,
expected_max: ArrayRef,
}
impl Test {
/// Writes the input to an in-memory parquet file, extracts min/max
/// statistics for each field and asserts they match the expectations.
fn run(self) {
let Self {
input,
expected_min,
expected_max,
} = self;
let input_batch = RecordBatch::try_from_iter([("c1", input)]).unwrap();
let schema = input_batch.schema();
let metadata = parquet_metadata(schema.clone(), input_batch);
let parquet_schema = metadata.file_metadata().schema_descr();
let row_groups = metadata.row_groups();
for field in schema.fields() {
// Nested fields: only verify that the column lookup yields `None`.
if field.data_type().is_nested() {
let lookup = parquet_column(parquet_schema, &schema, field.name());
assert_eq!(lookup, None);
continue;
}
let (idx, f) =
parquet_column(parquet_schema, &schema, field.name()).unwrap();
assert_eq!(f, field);
// The iterator is cloned so it can be consumed once for min and once for max.
let iter = row_groups.iter().map(|x| x.column(idx).statistics());
let min = min_statistics(f.data_type(), iter.clone()).unwrap();
assert_eq!(
&min,
&expected_min,
"Min. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
);
let max = max_statistics(f.data_type(), iter).unwrap();
assert_eq!(
&max,
&expected_max,
"Max. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
);
}
}
}
/// Writes `batch` to an in-memory parquet file with chunk-level statistics
/// enabled and a maximum row group size of `ROWS_PER_ROW_GROUP`, then returns
/// the metadata obtained by opening that buffer with `ArrowReaderBuilder`.
fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> {
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Chunk)
.set_max_row_group_size(ROWS_PER_ROW_GROUP)
.build();
let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
reader.metadata().clone()
}
/// Display wrapper that prints the statistics of every column of every row
/// group; used to enrich assertion failure messages.
struct DisplayStats<'a>(&'a [RowGroupMetaData]);
impl<'a> std::fmt::Display for DisplayStats<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let row_groups = self.0;
writeln!(f, " row_groups: {}", row_groups.len())?;
for rg in row_groups {
for col in rg.columns() {
// Columns without statistics are skipped.
if let Some(statistics) = col.statistics() {
writeln!(f, " {}: {:?}", col.column_path(), statistics)?;
}
}
}
Ok(())
}
}
/// Expected min/max statistics arrays for the column called `name`.
struct ExpectedColumn {
name: &'static str,
expected_min: ArrayRef,
expected_max: ArrayRef,
}
/// A test case that reads a file from the parquet test data directory and
/// checks the statistics of the registered columns.
struct TestFile {
file_name: &'static str,
expected_columns: Vec<ExpectedColumn>,
}
impl TestFile {
/// Creates a test case for `file_name` with no expected columns yet.
fn new(file_name: &'static str) -> Self {
Self {
file_name,
expected_columns: Vec::new(),
}
}
/// Registers an additional column to check (builder style).
fn with_column(mut self, column: ExpectedColumn) -> Self {
self.expected_columns.push(column);
self
}
/// Opens the file, extracts min/max statistics for every expected column
/// and asserts that they match.
fn run(self) {
let path = PathBuf::from(parquet_test_data()).join(self.file_name);
let file = std::fs::File::open(path).unwrap();
let reader = ArrowReaderBuilder::try_new(file).unwrap();
let arrow_schema = reader.schema();
let metadata = reader.metadata();
let row_groups = metadata.row_groups();
let parquet_schema = metadata.file_metadata().schema_descr();
for expected_column in self.expected_columns {
let ExpectedColumn {
name,
expected_min,
expected_max,
} = expected_column;
let (idx, field) =
parquet_column(parquet_schema, arrow_schema, name).unwrap();
let iter = row_groups.iter().map(|x| x.column(idx).statistics());
let actual_min = min_statistics(field.data_type(), iter.clone()).unwrap();
assert_eq!(&expected_min, &actual_min, "column {name}");
let actual_max = max_statistics(field.data_type(), iter).unwrap();
assert_eq!(&expected_max, &actual_max, "column {name}");
}
}
}
// Helpers that build an `ArrayRef` of the named type from optional values.
fn bool_array(input: impl IntoIterator<Item = Option<bool>>) -> ArrayRef {
let array: BooleanArray = input.into_iter().collect();
Arc::new(array)
}
fn i32_array(input: impl IntoIterator<Item = Option<i32>>) -> ArrayRef {
let array: Int32Array = input.into_iter().collect();
Arc::new(array)
}
fn i64_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
let array: Int64Array = input.into_iter().collect();
Arc::new(array)
}
fn f32_array(input: impl IntoIterator<Item = Option<f32>>) -> ArrayRef {
let array: Float32Array = input.into_iter().collect();
Arc::new(array)
}
fn f64_array(input: impl IntoIterator<Item = Option<f64>>) -> ArrayRef {
let array: Float64Array = input.into_iter().collect();
Arc::new(array)
}
fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
let array: TimestampNanosecondArray = input.into_iter().collect();
Arc::new(array)
}
fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef {
let array: StringArray = input
.into_iter()
.map(|s| s.map(|s| s.to_string()))
.collect();
Arc::new(array)
}
/// Builds a struct array with two nullable children, "bool_col" (Boolean) and
/// "int_col" (Int32), from the given pairs.
fn struct_array(input: Vec<(Option<bool>, Option<i32>)>) -> ArrayRef {
let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect();
let int: Int32Array = input.iter().map(|(_b, i)| i).collect();
let nullable = true;
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("bool_col", DataType::Boolean, nullable)),
Arc::new(boolean) as ArrayRef,
),
(
Arc::new(Field::new("int_col", DataType::Int32, nullable)),
Arc::new(int) as ArrayRef,
),
]);
Arc::new(struct_array)
}
}