use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::parquet_column;
use crate::basic::Type as PhysicalType;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::statistics::Statistics as ParquetStatistics;
use crate::schema::types::SchemaDescriptor;
use arrow_array::builder::{
BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date32Builder, Date64Builder,
Decimal32Builder, Decimal64Builder, FixedSizeBinaryBuilder, Float16Builder, Float32Builder,
Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
LargeStringBuilder, StringBuilder, StringViewBuilder, Time32MillisecondBuilder,
Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
};
use arrow_array::{
ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int8Array,
Int16Array, Int32Array, Int64Array, LargeBinaryArray, Time32MillisecondArray,
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
UInt16Array, UInt32Array, UInt64Array, new_null_array,
};
use arrow_buffer::{NullBufferBuilder, i256};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use half::f16;
use paste::paste;
use std::sync::Arc;
/// Decodes a big-endian byte slice into an `i32`.
///
/// The slice is first widened to exactly four bytes by [`sign_extend_be`] and the
/// result is then interpreted as a big-endian integer.
// NOTE(review): what happens for slices longer than 4 bytes is decided by
// `sign_extend_be`, which is not visible here — confirm before feeding untrusted input.
pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
    let widened: [u8; 4] = sign_extend_be::<4>(b);
    i32::from_be_bytes(widened)
}
/// Decodes a big-endian byte slice into an `i64`.
///
/// The slice is first widened to exactly eight bytes by [`sign_extend_be`] and the
/// result is then interpreted as a big-endian integer.
// NOTE(review): what happens for slices longer than 8 bytes is decided by
// `sign_extend_be`, which is not visible here — confirm before feeding untrusted input.
pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
    let widened: [u8; 8] = sign_extend_be::<8>(b);
    i64::from_be_bytes(widened)
}
/// Decodes a big-endian byte slice into an `i128`.
///
/// The slice is first widened to exactly sixteen bytes by [`sign_extend_be`] and the
/// result is then interpreted as a big-endian integer.
// NOTE(review): what happens for slices longer than 16 bytes is decided by
// `sign_extend_be`, which is not visible here — confirm before feeding untrusted input.
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
    let widened: [u8; 16] = sign_extend_be::<16>(b);
    i128::from_be_bytes(widened)
}
/// Decodes a big-endian byte slice into an [`i256`].
///
/// The slice is first widened to exactly thirty-two bytes by [`sign_extend_be`] and
/// the result is then interpreted as a big-endian integer.
// NOTE(review): what happens for slices longer than 32 bytes is decided by
// `sign_extend_be`, which is not visible here — confirm before feeding untrusted input.
pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
    let widened: [u8; 32] = sign_extend_be::<32>(b);
    i256::from_be_bytes(widened)
}
/// Decodes a two-byte slice into an [`f16`].
///
/// The first byte is treated as the low-order byte and the second as the
/// high-order byte, i.e. the input is read as little-endian.
///
/// Returns `None` when `b` is not exactly two bytes long.
pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
    let &[low, high] = b else {
        return None;
    };
    Some(f16::from_le_bytes([low, high]))
}
/// Defines an iterator adaptor that extracts one typed statistic value from a
/// stream of optional [`ParquetStatistics`].
///
/// Parameters:
/// * `$iterator_type` - name of the generated struct
/// * `$func` - accessor called on the typed statistics (e.g. `min_opt`)
/// * `$parquet_statistics_type` - the `ParquetStatistics` variant to accept
/// * `$stat_value_type` - the value type the accessor returns a reference to
///
/// The generated iterator yields exactly one item per input item: `None` when
/// the input is `None`, when the statistics are a different variant, or when
/// the accessor itself returns `None`.
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        /// Adaptor over an iterator of optional `ParquetStatistics`; see
        /// `make_stats_iterator!` for the extraction rules.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` without consuming any of its items.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration only when the underlying iterator is exhausted.
                let stats = self.iter.next()?;
                Some(match stats {
                    Some($parquet_statistics_type(typed)) => typed.$func(),
                    // Missing statistics or a different physical type.
                    _ => None,
                })
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                // One output per input, so the inner hint is exact for us too.
                self.iter.size_hint()
            }
        }
    };
}
// Min/max statistics iterators, one pair per `ParquetStatistics` variant.
// Each generated iterator yields `None` for an entry whose statistics are
// missing or belong to a different variant than the one named here.

// Boolean statistics.
make_stats_iterator!(
MinBooleanStatsIterator,
min_opt,
ParquetStatistics::Boolean,
bool
);
make_stats_iterator!(
MaxBooleanStatsIterator,
max_opt,
ParquetStatistics::Boolean,
bool
);
// Int32 statistics.
make_stats_iterator!(
MinInt32StatsIterator,
min_opt,
ParquetStatistics::Int32,
i32
);
make_stats_iterator!(
MaxInt32StatsIterator,
max_opt,
ParquetStatistics::Int32,
i32
);
// Int64 statistics.
make_stats_iterator!(
MinInt64StatsIterator,
min_opt,
ParquetStatistics::Int64,
i64
);
make_stats_iterator!(
MaxInt64StatsIterator,
max_opt,
ParquetStatistics::Int64,
i64
);
// Float statistics.
make_stats_iterator!(
MinFloatStatsIterator,
min_opt,
ParquetStatistics::Float,
f32
);
make_stats_iterator!(
MaxFloatStatsIterator,
max_opt,
ParquetStatistics::Float,
f32
);
// Double statistics.
make_stats_iterator!(
MinDoubleStatsIterator,
min_opt,
ParquetStatistics::Double,
f64
);
make_stats_iterator!(
MaxDoubleStatsIterator,
max_opt,
ParquetStatistics::Double,
f64
);
// ByteArray statistics, exposed as raw byte slices via the `*_bytes_opt` accessors.
make_stats_iterator!(
MinByteArrayStatsIterator,
min_bytes_opt,
ParquetStatistics::ByteArray,
[u8]
);
make_stats_iterator!(
MaxByteArrayStatsIterator,
max_bytes_opt,
ParquetStatistics::ByteArray,
[u8]
);
// FixedLenByteArray statistics, exposed as raw byte slices via the `*_bytes_opt` accessors.
make_stats_iterator!(
MinFixedLenByteArrayStatsIterator,
min_bytes_opt,
ParquetStatistics::FixedLenByteArray,
[u8]
);
make_stats_iterator!(
MaxFixedLenByteArrayStatsIterator,
max_bytes_opt,
ParquetStatistics::FixedLenByteArray,
[u8]
);
/// Defines an iterator adaptor that extracts a decimal statistic value from a
/// stream of optional [`ParquetStatistics`].
///
/// Unlike `make_stats_iterator!`, the generated iterator accepts several
/// statistics variants and yields owned values:
/// * `Int32` values are widened with `$stat_value_type::from`
/// * `Int64` values are converted with `$stat_value_type::try_from`; a value
///   that does not fit yields `None`
/// * `ByteArray` / `FixedLenByteArray` values are decoded with `$convert_func`
/// * any other variant, or missing statistics, yields `None`
///
/// Parameters:
/// * `$iterator_type` - name of the generated struct
/// * `$func` - accessor for the integer-backed variants (e.g. `min_opt`)
/// * `$bytes_func` - accessor for the byte-backed variants (e.g. `min_bytes_opt`)
/// * `$stat_value_type` - native integer type of the decimal
/// * `$convert_func` - function decoding `&[u8]` into `$stat_value_type`
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        /// Adaptor over an iterator of optional `ParquetStatistics`; see
        /// `make_decimal_stats_iterator!` for the conversion rules.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` without consuming any of its items.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration only when the underlying iterator is exhausted.
                let stats = self.iter.next()?;
                let value = match stats {
                    Some(ParquetStatistics::Int32(typed)) => {
                        typed.$func().map(|v| $stat_value_type::from(*v))
                    }
                    // An out-of-range 64-bit value becomes a null rather than wrapping.
                    Some(ParquetStatistics::Int64(typed)) => typed
                        .$func()
                        .and_then(|v| $stat_value_type::try_from(*v).ok()),
                    Some(ParquetStatistics::ByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    Some(ParquetStatistics::FixedLenByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                // One output per input, so the inner hint is exact for us too.
                self.iter.size_hint()
            }
        }
    };
}
// Min/max decimal statistics iterators, one pair per native decimal width.
// Each accepts Int32, Int64, ByteArray and FixedLenByteArray statistics and
// decodes byte-backed values with the matching `from_bytes_to_*` helper.

// 32-bit decimals.
make_decimal_stats_iterator!(
MinDecimal32StatsIterator,
min_opt,
min_bytes_opt,
i32,
from_bytes_to_i32
);
make_decimal_stats_iterator!(
MaxDecimal32StatsIterator,
max_opt,
max_bytes_opt,
i32,
from_bytes_to_i32
);
// 64-bit decimals.
make_decimal_stats_iterator!(
MinDecimal64StatsIterator,
min_opt,
min_bytes_opt,
i64,
from_bytes_to_i64
);
make_decimal_stats_iterator!(
MaxDecimal64StatsIterator,
max_opt,
max_bytes_opt,
i64,
from_bytes_to_i64
);
// 128-bit decimals.
make_decimal_stats_iterator!(
MinDecimal128StatsIterator,
min_opt,
min_bytes_opt,
i128,
from_bytes_to_i128
);
make_decimal_stats_iterator!(
MaxDecimal128StatsIterator,
max_opt,
max_bytes_opt,
i128,
from_bytes_to_i128
);
// 256-bit decimals.
make_decimal_stats_iterator!(
MinDecimal256StatsIterator,
min_opt,
min_bytes_opt,
i256,
from_bytes_to_i256
);
make_decimal_stats_iterator!(
MaxDecimal256StatsIterator,
max_opt,
max_bytes_opt,
i256,
from_bytes_to_i256
);
/// Fills `$builder` (a string builder expression) from `$stats`, an iterator of
/// `Option<&[u8]>`, and evaluates to `Ok(Arc::new(<finished array>))`.
///
/// Missing statistics and byte sequences that are not valid UTF-8 are both
/// appended as nulls.
macro_rules! utf8_stats_to_array {
    ($builder:expr, $stats:expr) => {{
        let mut builder = $builder;
        for bytes in $stats {
            match bytes.and_then(|raw| std::str::from_utf8(raw).ok()) {
                Some(text) => builder.append_value(text),
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()))
    }};
}

/// Converts an iterator of optional [`ParquetStatistics`] into an Arrow array
/// of the requested [`DataType`], producing one element per input item.
///
/// Parameters (all identifiers):
/// * `$stat_type_prefix` - `Min` or `Max`; selects the `Min*`/`Max*StatsIterator`
///   adaptor and, for dictionaries, the `min_statistics`/`max_statistics` function
/// * `$data_type` - a `&DataType` binding naming the Arrow type to produce
/// * `$iterator` - an iterator of `Option<&ParquetStatistics>`
/// * `$physical_type` - an `Option<PhysicalType>`; only consulted for `Date64`
///
/// Values that are missing, stored under an unexpected statistics variant, or
/// not representable in the target type become nulls. Types for which no
/// conversion exists produce an all-null array with one slot per input item.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => {
                let stats = [<$stat_type_prefix BooleanStatsIterator>]::new($iterator);
                Ok(Arc::new(BooleanArray::from_iter(stats.map(|v| v.copied()))))
            }
            // Narrow integer types are read from Int32 statistics; a value
            // outside the target range becomes null.
            DataType::Int8 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(Int8Array::from_iter(
                    stats.map(|v| v.and_then(|&v| i8::try_from(v).ok())),
                )))
            }
            DataType::Int16 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(Int16Array::from_iter(
                    stats.map(|v| v.and_then(|&v| i16::try_from(v).ok())),
                )))
            }
            DataType::Int32 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(Int32Array::from_iter(stats.map(|v| v.copied()))))
            }
            DataType::Int64 => {
                let stats = [<$stat_type_prefix Int64StatsIterator>]::new($iterator);
                Ok(Arc::new(Int64Array::from_iter(stats.map(|v| v.copied()))))
            }
            DataType::UInt8 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(UInt8Array::from_iter(
                    stats.map(|v| v.and_then(|&v| u8::try_from(v).ok())),
                )))
            }
            DataType::UInt16 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(UInt16Array::from_iter(
                    stats.map(|v| v.and_then(|&v| u16::try_from(v).ok())),
                )))
            }
            // Full-width unsigned types reinterpret the signed bits with `as`.
            DataType::UInt32 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(UInt32Array::from_iter(stats.map(|v| v.map(|&v| v as u32)))))
            }
            DataType::UInt64 => {
                let stats = [<$stat_type_prefix Int64StatsIterator>]::new($iterator);
                Ok(Arc::new(UInt64Array::from_iter(stats.map(|v| v.map(|&v| v as u64)))))
            }
            // Float16 is read from two-byte FixedLenByteArray statistics.
            DataType::Float16 => {
                let stats = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
                Ok(Arc::new(Float16Array::from_iter(
                    stats.map(|v| v.and_then(from_bytes_to_f16)),
                )))
            }
            DataType::Float32 => {
                let stats = [<$stat_type_prefix FloatStatsIterator>]::new($iterator);
                Ok(Arc::new(Float32Array::from_iter(stats.map(|v| v.copied()))))
            }
            DataType::Float64 => {
                let stats = [<$stat_type_prefix DoubleStatsIterator>]::new($iterator);
                Ok(Arc::new(Float64Array::from_iter(stats.map(|v| v.copied()))))
            }
            DataType::Date32 => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(Date32Array::from_iter(stats.map(|v| v.copied()))))
            }
            // Date64 backed by INT32: scale the stored value by the number of
            // milliseconds in a day.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => {
                let stats = [<$stat_type_prefix Int32StatsIterator>]::new($iterator);
                Ok(Arc::new(Date64Array::from_iter(
                    stats.map(|v| v.map(|&days| i64::from(days) * 24 * 60 * 60 * 1000)),
                )))
            }
            // Date64 backed by INT64: values are used unchanged.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
                let stats = [<$stat_type_prefix Int64StatsIterator>]::new($iterator);
                Ok(Arc::new(Date64Array::from_iter(stats.map(|v| v.copied()))))
            }
            DataType::Timestamp(unit, timezone) => {
                let values = [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                    .map(|v| v.copied());
                let tz = timezone.clone();
                Ok(match unit {
                    TimeUnit::Second => {
                        Arc::new(TimestampSecondArray::from_iter(values).with_timezone_opt(tz))
                    }
                    TimeUnit::Millisecond => {
                        Arc::new(TimestampMillisecondArray::from_iter(values).with_timezone_opt(tz))
                    }
                    TimeUnit::Microsecond => {
                        Arc::new(TimestampMicrosecondArray::from_iter(values).with_timezone_opt(tz))
                    }
                    TimeUnit::Nanosecond => {
                        Arc::new(TimestampNanosecondArray::from_iter(values).with_timezone_opt(tz))
                    }
                })
            }
            DataType::Time32(unit) => Ok(match unit {
                TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                    [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|v| v.copied()),
                )),
                TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                    [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|v| v.copied()),
                )),
                // No conversion for the remaining units: all-null array.
                _ => new_null_array($data_type, $iterator.count()),
            }),
            DataType::Time64(unit) => Ok(match unit {
                TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                    [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|v| v.copied()),
                )),
                TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                    [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|v| v.copied()),
                )),
                // No conversion for the remaining units: all-null array.
                _ => new_null_array($data_type, $iterator.count()),
            }),
            DataType::Binary => {
                let stats = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                Ok(Arc::new(BinaryArray::from_iter(stats)))
            }
            DataType::LargeBinary => {
                let stats = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                Ok(Arc::new(LargeBinaryArray::from_iter(stats)))
            }
            DataType::Utf8 => utf8_stats_to_array!(
                StringBuilder::new(),
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ),
            DataType::LargeUtf8 => utf8_stats_to_array!(
                LargeStringBuilder::new(),
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ),
            DataType::FixedSizeBinary(size) => {
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for bytes in [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator) {
                    match bytes {
                        // Only values of exactly the declared width are kept.
                        Some(raw) if raw.len().try_into() == Ok(*size) => {
                            builder.append_value(raw).expect("ensure to append successfully here, because size have been checked before");
                        }
                        _ => {
                            builder.append_null();
                        }
                    }
                }
                Ok(Arc::new(builder.finish()))
            }
            DataType::Decimal32(precision, scale) => {
                let stats = [<$stat_type_prefix Decimal32StatsIterator>]::new($iterator);
                let array = Decimal32Array::from_iter(stats)
                    .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            DataType::Decimal64(precision, scale) => {
                let stats = [<$stat_type_prefix Decimal64StatsIterator>]::new($iterator);
                let array = Decimal64Array::from_iter(stats)
                    .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            DataType::Decimal128(precision, scale) => {
                let stats = [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator);
                let array = Decimal128Array::from_iter(stats)
                    .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            DataType::Decimal256(precision, scale) => {
                let stats = [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator);
                let array = Decimal256Array::from_iter(stats)
                    .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            // Dictionaries report the statistics of their value type.
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            }
            DataType::Utf8View => utf8_stats_to_array!(
                StringViewBuilder::new(),
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ),
            DataType::BinaryView => {
                let mut builder = BinaryViewBuilder::new();
                for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                    match bytes {
                        Some(raw) => builder.append_value(raw),
                        None => builder.append_null(),
                    }
                }
                Ok(Arc::new(builder.finish()))
            }
            // No conversion available (this also catches Date64 with any other
            // physical type): all-null array with one slot per input item.
            DataType::Map(_,_) |
            DataType::Duration(_) |
            DataType::Interval(_) |
            DataType::Date64 | DataType::Null |
            DataType::List(_) |
            DataType::ListView(_) |
            DataType::FixedSizeList(_, _) |
            DataType::LargeList(_) |
            DataType::LargeListView(_) |
            DataType::Struct(_) |
            DataType::Union(_, _) |
            DataType::RunEndEncoded(_, _) => {
                let slots = $iterator.count();
                Ok(new_null_array($data_type, slots))
            }
        }}}
}
/// Drains `$chunks` (a `Vec<(usize, &ColumnIndexMetaData)>` binding) into the
/// builder bound to `$builder`.
///
/// For every chunk whose index is one of the listed `ColumnIndexMetaData`
/// variants, the values returned by `index.$values_iter()` are mapped through
/// the paired conversion (`Option<&native>` to `Option<target>`) and passed to
/// `extend_from_iter_option`. For any other variant, `len` nulls are appended,
/// where `len` is the first element of the chunk tuple.
macro_rules! extend_from_page_index {
    (
        $builder:ident, $chunks:ident, $values_iter:ident,
        { $($variant:ident => $convert:expr),+ $(,)? }
    ) => {
        for (len, index) in $chunks {
            match index {
                $(
                    ColumnIndexMetaData::$variant(index) => {
                        $builder.extend_from_iter_option(index.$values_iter().map($convert));
                    }
                )+
                _ => $builder.append_nulls(len),
            }
        }
    };
}

/// Drains `$chunks` one value at a time, for builders that are filled with
/// per-value `append_*` calls.
///
/// The two trailing arguments use closure-like syntax but are expanded inline
/// (so they may use `?` and mutate the builder freely):
/// * `|val| expr` runs for every item of `index.$values_iter()` when the chunk's
///   index is `ColumnIndexMetaData::$variant`
/// * `|len| expr` runs once for a chunk with any other variant, with `len`
///   bound to the first element of the chunk tuple
macro_rules! for_each_page_index_value {
    (
        $chunks:ident, $values_iter:ident, $variant:ident,
        |$val:ident| $on_value:expr,
        |$len:ident| $on_other:expr $(,)?
    ) => {
        for ($len, index) in $chunks {
            match index {
                ColumnIndexMetaData::$variant(index) => {
                    for $val in index.$values_iter() {
                        $on_value;
                    }
                }
                _ => {
                    $on_other;
                }
            }
        }
    };
}

/// Appends `$val` (an `Option` of bytes) to the string builder `$builder`:
/// valid UTF-8 is appended as a value; `None` and invalid UTF-8 become nulls.
macro_rules! append_utf8_or_null {
    ($builder:ident, $val:ident) => {
        match $val {
            Some(bytes) => match std::str::from_utf8(bytes.as_ref()) {
                Ok(text) => $builder.append_value(text),
                _ => $builder.append_null(),
            },
            None => $builder.append_null(),
        }
    };
}

/// Converts page-level (column index) statistics into an Arrow array of the
/// requested [`DataType`].
///
/// Parameters (all identifiers):
/// * `$stat_type_prefix` - `Min` or `Max`; selects `min_values_iter` /
///   `max_values_iter` on the index and, for dictionaries, the
///   `min_page_statistics` / `max_page_statistics` function
/// * `$data_type` - a `&DataType` binding naming the Arrow type to produce
/// * `$iterator` - an iterator of `(usize, &ColumnIndexMetaData)`; the `usize`
///   is the number of nulls appended when the index has an unexpected variant
///   (presumably the page count of that column chunk; confirm against callers)
/// * `$physical_type` - an `Option<PhysicalType>`; only consulted for `Date64`
///
/// The iterator is collected up front so the summed lengths can be used as the
/// builder capacity. Types with no conversion produce an all-null array of that
/// summed length.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        {
            let chunks: Vec<(usize, &ColumnIndexMetaData)> = $iterator.collect();
            let capacity: usize = chunks.iter().map(|c| c.0).sum();
            paste! {
                match $data_type {
                    DataType::Boolean => {
                        let mut b = BooleanBuilder::with_capacity(capacity);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BOOLEAN,
                            |val| b.append_option(val.copied()),
                            |len| b.append_nulls(len),
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    // Narrow integer types are read from INT32 indexes; a value
                    // outside the target range becomes null.
                    DataType::UInt8 => {
                        let mut b = UInt8Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.and_then(|&x| u8::try_from(x).ok()),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::UInt16 => {
                        let mut b = UInt16Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.and_then(|&x| u16::try_from(x).ok()),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    // Full-width unsigned types reinterpret the signed bits with `as`.
                    DataType::UInt32 => {
                        let mut b = UInt32Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.map(|&x| x as u32),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::UInt64 => {
                        let mut b = UInt64Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT64 => |val| val.map(|&x| x as u64),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Int8 => {
                        let mut b = Int8Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.and_then(|&x| i8::try_from(x).ok()),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Int16 => {
                        let mut b = Int16Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.and_then(|&x| i16::try_from(x).ok()),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Int32 => {
                        let mut b = Int32Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.copied(),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Int64 => {
                        let mut b = Int64Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT64 => |val| val.copied(),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    // Float16 is read from two-byte FIXED_LEN_BYTE_ARRAY values.
                    DataType::Float16 => {
                        let mut b = Float16Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            FIXED_LEN_BYTE_ARRAY => |val| val.and_then(|x| from_bytes_to_f16(x)),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Float32 => {
                        let mut b = Float32Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            FLOAT => |val| val.copied(),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Float64 => {
                        let mut b = Float64Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            DOUBLE => |val| val.copied(),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    // `capacity * 10` is the original guess for the total byte
                    // size of the values in the byte/string builders below.
                    DataType::Binary => {
                        let mut b = BinaryBuilder::with_capacity(capacity, capacity * 10);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BYTE_ARRAY,
                            |val| b.append_option(val.map(|x| x.as_ref())),
                            |len| b.append_nulls(len),
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::LargeBinary => {
                        let mut b = LargeBinaryBuilder::with_capacity(capacity, capacity * 10);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BYTE_ARRAY,
                            |val| b.append_option(val.map(|x| x.as_ref())),
                            |len| b.append_nulls(len),
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::Utf8 => {
                        let mut b = StringBuilder::with_capacity(capacity, capacity * 10);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BYTE_ARRAY,
                            |val| append_utf8_or_null!(b, val),
                            |len| b.append_nulls(len),
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::LargeUtf8 => {
                        let mut b = LargeStringBuilder::with_capacity(capacity, capacity * 10);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BYTE_ARRAY,
                            |val| append_utf8_or_null!(b, val),
                            |len| b.append_nulls(len),
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    // Dictionaries report the statistics of their value type.
                    DataType::Dictionary(_, value_type) => {
                        [<$stat_type_prefix:lower _ page_statistics>](value_type, chunks.into_iter(), $physical_type)
                    },
                    DataType::Timestamp(unit, timezone) => {
                        match unit {
                            TimeUnit::Second => {
                                let mut b = TimestampSecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT64 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
                            }
                            TimeUnit::Millisecond => {
                                let mut b = TimestampMillisecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT64 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
                            }
                            TimeUnit::Microsecond => {
                                let mut b = TimestampMicrosecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT64 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
                            }
                            TimeUnit::Nanosecond => {
                                let mut b = TimestampNanosecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT64 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
                            }
                        }
                    },
                    DataType::Date32 => {
                        let mut b = Date32Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.copied(),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    // Date64 backed by INT32: scale the stored value by the
                    // number of milliseconds in a day.
                    DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => {
                        let mut b = Date64Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    // Date64 backed by INT64: values are used unchanged.
                    DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
                        let mut b = Date64Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT64 => |val| val.copied(),
                        });
                        Ok(Arc::new(b.finish()))
                    },
                    // Decimals accept integer- and byte-backed indexes.
                    DataType::Decimal32(precision, scale) => {
                        let mut b = Decimal32Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.copied(),
                            // A 64-bit value that does not fit in 32 bits becomes null.
                            INT64 => |val| val.and_then(|&x| i32::try_from(x).ok()),
                            BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i32(x.as_ref())),
                            FIXED_LEN_BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i32(x.as_ref())),
                        });
                        Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
                    },
                    DataType::Decimal64(precision, scale) => {
                        let mut b = Decimal64Builder::with_capacity(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.map(|x| *x as i64),
                            INT64 => |val| val.copied(),
                            BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i64(x.as_ref())),
                            FIXED_LEN_BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i64(x.as_ref())),
                        });
                        Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
                    },
                    DataType::Decimal128(precision, scale) => {
                        let mut b = Decimal128Array::builder(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.map(|x| *x as i128),
                            INT64 => |val| val.map(|x| *x as i128),
                            BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i128(x.as_ref())),
                            FIXED_LEN_BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i128(x.as_ref())),
                        });
                        Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
                    },
                    DataType::Decimal256(precision, scale) => {
                        let mut b = Decimal256Array::builder(capacity);
                        extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                            INT32 => |val| val.map(|x| i256::from_i128(*x as i128)),
                            INT64 => |val| val.map(|x| i256::from_i128(*x as i128)),
                            BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i256(x.as_ref())),
                            FIXED_LEN_BYTE_ARRAY => |val| val.map(|x| from_bytes_to_i256(x.as_ref())),
                        });
                        Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
                    },
                    DataType::Time32(unit) => {
                        match unit {
                            TimeUnit::Second => {
                                let mut b = Time32SecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT32 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish()))
                            }
                            TimeUnit::Millisecond => {
                                let mut b = Time32MillisecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT32 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish()))
                            }
                            // No conversion for the remaining units: all-null array.
                            _ => {
                                Ok(new_null_array($data_type, capacity))
                            }
                        }
                    }
                    DataType::Time64(unit) => {
                        match unit {
                            TimeUnit::Microsecond => {
                                let mut b = Time64MicrosecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT64 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish()))
                            }
                            TimeUnit::Nanosecond => {
                                let mut b = Time64NanosecondBuilder::with_capacity(capacity);
                                extend_from_page_index!(b, chunks, [<$stat_type_prefix:lower _values_iter>], {
                                    INT64 => |val| val.copied(),
                                });
                                Ok(Arc::new(b.finish()))
                            }
                            // No conversion for the remaining units: all-null array.
                            _ => {
                                Ok(new_null_array($data_type, capacity))
                            }
                        }
                    },
                    DataType::FixedSizeBinary(size) => {
                        let mut b = FixedSizeBinaryBuilder::with_capacity(capacity, *size);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], FIXED_LEN_BYTE_ARRAY,
                            |val| match val {
                                // Only values of exactly the declared width are kept.
                                Some(v) if v.len() == *size as usize => {
                                    b.append_value(v.as_ref())?;
                                }
                                _ => {
                                    b.append_null();
                                }
                            },
                            |len| b.append_nulls(len),
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    // The view builders are padded one null at a time, as in the
                    // original code.
                    DataType::Utf8View => {
                        let mut b = StringViewBuilder::with_capacity(capacity);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BYTE_ARRAY,
                            |val| append_utf8_or_null!(b, val),
                            |len| for _ in 0..len { b.append_null(); },
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    DataType::BinaryView => {
                        let mut b = BinaryViewBuilder::with_capacity(capacity);
                        for_each_page_index_value!(
                            chunks, [<$stat_type_prefix:lower _values_iter>], BYTE_ARRAY,
                            |val| match val {
                                Some(v) => b.append_value(v.as_ref()),
                                None => b.append_null(),
                            },
                            |len| for _ in 0..len { b.append_null(); },
                        );
                        Ok(Arc::new(b.finish()))
                    },
                    // No conversion available (this also catches Date64 with any
                    // other physical type): all-null array of the summed length.
                    DataType::Date64 | DataType::Null |
                    DataType::Duration(_) |
                    DataType::Interval(_) |
                    DataType::List(_) |
                    DataType::ListView(_) |
                    DataType::FixedSizeList(_, _) |
                    DataType::LargeList(_) |
                    DataType::LargeListView(_) |
                    DataType::Struct(_) |
                    DataType::Union(_, _) |
                    DataType::Map(_, _) |
                    DataType::RunEndEncoded(_, _) => {
                        Ok(new_null_array($data_type, capacity))
                    },
                }
            }
        }
    }
}
/// Extracts the minimum value from each item of `iterator` and returns them as
/// an Arrow array of type `data_type`, with one element per input item.
///
/// `physical_type` is forwarded to `get_statistics!`, where it selects the
/// `Date64` conversion. The actual per-type logic lives in that macro.
///
/// # Errors
/// Propagates any error raised inside `get_statistics!` (the `?` on
/// `with_precision_and_scale` in the decimal arms).
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
get_statistics!(Min, data_type, iterator, physical_type)
}
/// Extracts the maximum value from each item of `iterator` and returns them as
/// an Arrow array of type `data_type`, with one element per input item.
///
/// `physical_type` is forwarded to `get_statistics!`, where it selects the
/// `Date64` conversion. The actual per-type logic lives in that macro.
///
/// # Errors
/// Propagates any error raised inside `get_statistics!` (the `?` on
/// `with_precision_and_scale` in the decimal arms).
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
get_statistics!(Max, data_type, iterator, physical_type)
}
/// Extracts the minimum values stored in each column index yielded by
/// `iterator` and returns them as a single Arrow array of type `data_type`.
///
/// Each item pairs a length with a `ColumnIndexMetaData`; the length is the
/// number of nulls emitted when the index has a variant that cannot be
/// converted to `data_type`. `physical_type` selects the `Date64` conversion.
/// The per-type logic lives in `get_data_page_statistics!`.
///
/// # Errors
/// Propagates any error raised inside `get_data_page_statistics!`.
pub(crate) fn min_page_statistics<'a, I>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
{
get_data_page_statistics!(Min, data_type, iterator, physical_type)
}
/// Extracts the maximum values stored in each column index yielded by
/// `iterator` and returns them as a single Arrow array of type `data_type`.
///
/// Each item pairs a length with a `ColumnIndexMetaData`; the length is the
/// number of nulls emitted when the index has a variant that cannot be
/// converted to `data_type`. `physical_type` selects the `Date64` conversion.
/// The per-type logic lives in `get_data_page_statistics!`.
///
/// # Errors
/// Propagates any error raised inside `get_data_page_statistics!`.
pub(crate) fn max_page_statistics<'a, I>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
{
get_data_page_statistics!(Max, data_type, iterator, physical_type)
}
/// Collects the per-page null counts of every column index yielded by
/// `iterator` into a single [`UInt64Array`].
///
/// Each item pairs a length with a `ColumnIndexMetaData`. When the index
/// exposes null counts they are copied (cast to `u64`) and `len` slots are
/// marked valid; otherwise `len` zero placeholders are pushed and marked null.
// NOTE(review): this relies on `null_counts()` returning exactly `len` entries;
// a mismatch would make the value and validity buffers disagree. Confirm the
// callers guarantee it.
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
where
    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
{
    // Materialise first so the total length is known before allocating.
    let chunks: Vec<(usize, &ColumnIndexMetaData)> = iterator.collect();
    let total: usize = chunks.iter().map(|&(len, _)| len).sum();

    let mut values: Vec<u64> = Vec::with_capacity(total);
    let mut validity = NullBufferBuilder::new(total);

    for (len, index) in chunks {
        if let Some(counts) = index.null_counts() {
            values.extend(counts.iter().map(|&count| count as u64));
            validity.append_n_non_nulls(len);
        } else {
            // Placeholder values hidden behind null validity bits.
            values.extend(std::iter::repeat(0u64).take(len));
            validity.append_n_nulls(len);
        }
    }

    Ok(UInt64Array::new(values.into(), validity.build()))
}
/// Converts Parquet statistics for a single column into Arrow arrays.
///
/// Built with `try_new`, which resolves the column by name in both the Arrow
/// and the Parquet schema.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// Index of the matching column in the Parquet schema, or `None` when
/// `try_new` found no Parquet column for the Arrow field.
parquet_column_index: Option<usize>,
/// The Arrow field whose data type determines the output array type.
arrow_field: &'a Field,
/// Initialised to `true` by `try_new`; changed via
/// `with_missing_null_counts_as_zero`.
// NOTE(review): the code that reads this flag is outside this chunk; it
// presumably makes missing null counts report as zero instead of null.
missing_null_counts_as_zero: bool,
/// Physical type of the Parquet column, `None` when there is no matching
/// Parquet column.
physical_type: Option<PhysicalType>,
}
impl<'a> StatisticsConverter<'a> {
/// Returns the index of the matched column in the Parquet schema, or `None`
/// if the Arrow column has no counterpart in the Parquet schema.
pub fn parquet_column_index(&self) -> Option<usize> {
self.parquet_column_index
}
/// Returns the Arrow field this converter was created for.
pub fn arrow_field(&self) -> &'a Field {
self.arrow_field
}
pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
self.missing_null_counts_as_zero = missing_null_counts_as_zero;
self
}
/// Returns the number of rows in each of the given row groups.
///
/// # Returns
/// * `Ok(None)` when the column has no counterpart in the Parquet schema
/// * `Ok(Some(array))` otherwise, with one non-null `u64` per row group, in
///   iteration order
///
/// # Errors
/// Returns an error if a row count cannot be converted to `u64`.
// NOTE(review): the message says "too large", but if `num_rows()` returns a
// signed integer the conversion can only fail for negative values. Confirm
// and reword if so.
pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    if self.parquet_column_index.is_none() {
        return Ok(None);
    }

    let metadatas = metadatas.into_iter();
    // Size the builder from the iterator instead of a fixed guess.
    let mut builder = UInt64Array::builder(metadatas.size_hint().0);
    for metadata in metadatas {
        let row_count = metadata.num_rows();
        let row_count: u64 = row_count.try_into().map_err(|e| {
            arrow_err!(format!(
                "Parquet row count {row_count} too large to convert to u64: {e}"
            ))
        })?;
        builder.append_value(row_count);
    }
    Ok(Some(builder.finish()))
}
/// Creates a converter for the column named `column_name`.
///
/// The column must exist in `arrow_schema`. If `parquet_column` finds no
/// counterpart in `parquet_schema`, the converter is still created, with no
/// parquet column index and no physical type. Missing null counts are treated
/// as zero by default.
///
/// # Errors
///
/// Returns an error if the column is not in `arrow_schema`, or if the field
/// matched through the parquet schema differs from the arrow field.
pub fn try_new<'b>(
    column_name: &'b str,
    arrow_schema: &'a Schema,
    parquet_schema: &'a SchemaDescriptor,
) -> Result<Self> {
    // The requested column has to be present in the arrow schema.
    let (_, arrow_field) = arrow_schema.column_with_name(column_name).ok_or_else(|| {
        arrow_err!(format!(
            "Column '{}' not found in schema for statistics conversion",
            column_name
        ))
    })?;

    // Look the column up in the parquet schema; having no match is not an error.
    let parquet_column_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
        None => None,
        Some((parquet_idx, matched_field)) if matched_field.as_ref() == arrow_field => {
            Some(parquet_idx)
        }
        // Sanity check failed: the lookup resolved to a different field.
        Some((_, matched_field)) => {
            return Err(arrow_err!(format!(
                "Matched column '{:?}' does not match original matched column '{:?}'",
                matched_field, arrow_field
            )));
        }
    };

    let physical_type =
        parquet_column_index.map(|idx| parquet_schema.column(idx).physical_type());

    Ok(Self {
        parquet_column_index,
        arrow_field,
        missing_null_counts_as_zero: true,
        physical_type,
    })
}
/// Extracts the minimum statistics of this column from each row group in
/// `metadatas`, as an array of the arrow field's data type.
///
/// When the column has no match in the parquet schema, the result is an
/// all-null array with one entry per row group.
pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, metadatas)),
        Some(parquet_index) => {
            let column_stats = metadatas
                .into_iter()
                .map(|row_group| row_group.column(parquet_index).statistics());
            min_statistics(data_type, column_stats, self.physical_type)
        }
    }
}
/// Extracts the maximum statistics of this column from each row group in
/// `metadatas`, as an array of the arrow field's data type.
///
/// When the column has no match in the parquet schema, the result is an
/// all-null array with one entry per row group.
pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, metadatas)),
        Some(parquet_index) => {
            let column_stats = metadatas
                .into_iter()
                .map(|row_group| row_group.column(parquet_index).statistics());
            max_statistics(data_type, column_stats, self.physical_type)
        }
    }
}
/// Reports, for each row group in `metadatas`, whether the maximum statistic
/// of this column is flagged as exact.
///
/// An entry is null when the row group has no statistics for the column; all
/// entries are null when the column has no match in the parquet schema.
pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    match self.parquet_column_index {
        Some(parquet_index) => {
            let exact_flags = metadatas.into_iter().map(|row_group| {
                row_group
                    .column(parquet_index)
                    .statistics()
                    .map(|stats| stats.max_is_exact())
            });
            Ok(exact_flags.collect::<BooleanArray>())
        }
        None => {
            let row_group_count = metadatas.into_iter().count();
            let all_null = std::iter::repeat_n(None::<bool>, row_group_count);
            Ok(all_null.collect::<BooleanArray>())
        }
    }
}
/// Reports, for each row group in `metadatas`, whether the minimum statistic
/// of this column is flagged as exact.
///
/// An entry is null when the row group has no statistics for the column; all
/// entries are null when the column has no match in the parquet schema.
pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    match self.parquet_column_index {
        Some(parquet_index) => {
            let exact_flags = metadatas.into_iter().map(|row_group| {
                row_group
                    .column(parquet_index)
                    .statistics()
                    .map(|stats| stats.min_is_exact())
            });
            Ok(exact_flags.collect::<BooleanArray>())
        }
        None => {
            let row_group_count = metadatas.into_iter().count();
            let all_null = std::iter::repeat_n(None::<bool>, row_group_count);
            Ok(all_null.collect::<BooleanArray>())
        }
    }
}
/// Extracts the null count statistic of this column from each row group in
/// `metadatas`.
///
/// An entry is null when the row group has no statistics for the column. When
/// statistics exist but carry no null count, the entry is `0` if
/// `missing_null_counts_as_zero` is set and null otherwise. All entries are
/// null when the column has no match in the parquet schema.
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    match self.parquet_column_index {
        None => {
            let row_group_count = metadatas.into_iter().count();
            let all_null = std::iter::repeat_n(None::<u64>, row_group_count);
            Ok(all_null.collect::<UInt64Array>())
        }
        Some(parquet_index) => {
            // Substitute for a missing null count: `Some(0)` or nothing at all.
            let fallback = self.missing_null_counts_as_zero.then_some(0u64);
            let null_counts = metadatas.into_iter().map(|row_group| {
                row_group
                    .column(parquet_index)
                    .statistics()
                    .and_then(|stats| stats.null_count_opt().or(fallback))
            });
            Ok(null_counts.collect::<UInt64Array>())
        }
    }
}
/// Extracts the page-level minimum statistics of this column for the row
/// groups selected by `row_group_indices`.
///
/// For every selected row group, the column's page index is paired with the
/// number of page locations in its offset index and handed to
/// `min_page_statistics`. When the column has no match in the parquet schema,
/// the result is an all-null array with one entry per selected row group.
///
/// # Panics
///
/// Panics if a row group index or the column index is out of bounds for
/// `column_page_index` or `column_offset_index`.
pub fn data_page_mins<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, row_group_indices)),
        Some(parquet_index) => {
            let pages = row_group_indices.into_iter().map(|&rg_index| {
                let column_index = &column_page_index[rg_index][parquet_index];
                let page_count = column_offset_index[rg_index][parquet_index]
                    .page_locations()
                    .len();
                (page_count, column_index)
            });
            min_page_statistics(data_type, pages, self.physical_type)
        }
    }
}
/// Extracts the page-level maximum statistics of this column for the row
/// groups selected by `row_group_indices`.
///
/// For every selected row group, the column's page index is paired with the
/// number of page locations in its offset index and handed to
/// `max_page_statistics`. When the column has no match in the parquet schema,
/// the result is an all-null array with one entry per selected row group.
///
/// # Panics
///
/// Panics if a row group index or the column index is out of bounds for
/// `column_page_index` or `column_offset_index`.
pub fn data_page_maxes<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, row_group_indices)),
        Some(parquet_index) => {
            let pages = row_group_indices.into_iter().map(|&rg_index| {
                let column_index = &column_page_index[rg_index][parquet_index];
                let page_count = column_offset_index[rg_index][parquet_index]
                    .page_locations()
                    .len();
                (page_count, column_index)
            });
            max_page_statistics(data_type, pages, self.physical_type)
        }
    }
}
/// Extracts the page-level null counts of this column for the row groups
/// selected by `row_group_indices`.
///
/// For every selected row group, the column's page index is paired with the
/// number of page locations in its offset index and handed to
/// `null_counts_page_statistics`. When the column has no match in the parquet
/// schema, the result is an all-null array with one entry per selected row
/// group.
///
/// # Panics
///
/// Panics if a row group index or the column index is out of bounds for
/// `column_page_index` or `column_offset_index`.
pub fn data_page_null_counts<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<UInt64Array>
where
    I: IntoIterator<Item = &'a usize>,
{
    match self.parquet_column_index {
        None => {
            let row_group_count = row_group_indices.into_iter().count();
            Ok(UInt64Array::new_null(row_group_count))
        }
        Some(parquet_index) => {
            let pages = row_group_indices.into_iter().map(|&rg_index| {
                let column_index = &column_page_index[rg_index][parquet_index];
                let page_count = column_offset_index[rg_index][parquet_index]
                    .page_locations()
                    .len();
                (page_count, column_index)
            });
            null_counts_page_statistics(pages)
        }
    }
}
/// Returns a [`UInt64Array`] with the number of rows in each data page of
/// this column, for the row groups selected by `row_group_indices`.
///
/// A page's row count is the difference between the `first_row_index` of the
/// next page and its own; for the last page of a row group it is the row
/// group's `num_rows` minus the page's `first_row_index`.
///
/// A row group whose offset index lists no page locations contributes no
/// entries (rather than panicking). Every produced entry is non-null.
///
/// Returns `Ok(None)` when the column has no match in the parquet schema.
///
/// # Panics
///
/// Panics if a row group index or the column index is out of bounds for
/// `column_offset_index` or `row_group_metadatas`.
pub fn data_page_row_counts<I>(
    &self,
    column_offset_index: &ParquetOffsetIndex,
    row_group_metadatas: &'a [RowGroupMetaData],
    row_group_indices: I,
) -> Result<Option<UInt64Array>>
where
    I: IntoIterator<Item = &'a usize>,
{
    let Some(parquet_index) = self.parquet_column_index else {
        return Ok(None);
    };

    let mut row_counts: Vec<u64> = Vec::new();
    for rg_idx in row_group_indices {
        let page_locations = column_offset_index[*rg_idx][parquet_index].page_locations();
        let num_rows_in_row_group = row_group_metadatas[*rg_idx].num_rows();

        // No pages recorded for this column chunk: there is nothing to report.
        let Some(last_page) = page_locations.last() else {
            continue;
        };

        // Every page except the last ends where its successor begins.
        // NOTE(review): the `as u64` casts assume non-negative, ascending row
        // indices in the metadata — confirm for untrusted files.
        row_counts.extend(
            page_locations
                .windows(2)
                .map(|loc| loc[1].first_row_index as u64 - loc[0].first_row_index as u64),
        );
        // The last page extends to the end of the row group.
        row_counts.push(num_rows_in_row_group as u64 - last_page.first_row_index as u64);
    }

    Ok(Some(UInt64Array::new(row_counts.into(), None)))
}
/// Builds an all-null array of `data_type` with one entry per item yielded by
/// `metadatas`; the items themselves are only counted.
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
where
    I: IntoIterator<Item = A>,
{
    new_null_array(data_type, metadatas.into_iter().count())
}
}