use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::parquet_column;
use crate::basic::Type as PhysicalType;
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
use crate::file::page_index::index::{Index, PageIndex};
use crate::file::statistics::Statistics as ParquetStatistics;
use crate::schema::types::SchemaDescriptor;
use arrow_array::builder::{
BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
StringViewBuilder,
};
use arrow_array::{
new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_buffer::i256;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use half::f16;
use paste::paste;
use std::sync::Arc;
/// Decodes `b` as a big-endian integer, widened to 4 bytes by `sign_extend_be`.
///
/// NOTE(review): how empty input or input wider than 4 bytes is handled is
/// decided entirely by `sign_extend_be` — confirm before relying on it.
pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
    let widened = sign_extend_be::<4>(b);
    i32::from_be_bytes(widened)
}
/// Decodes `b` as a big-endian integer, widened to 8 bytes by `sign_extend_be`.
///
/// NOTE(review): edge-case handling (empty / over-wide input) is delegated to
/// `sign_extend_be`.
pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
    let widened = sign_extend_be::<8>(b);
    i64::from_be_bytes(widened)
}
/// Decodes `b` as a big-endian integer, widened to 16 bytes by `sign_extend_be`.
///
/// NOTE(review): edge-case handling (empty / over-wide input) is delegated to
/// `sign_extend_be`.
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
    let widened = sign_extend_be::<16>(b);
    i128::from_be_bytes(widened)
}
/// Decodes `b` as a big-endian integer, widened to 32 bytes by `sign_extend_be`.
///
/// NOTE(review): edge-case handling (empty / over-wide input) is delegated to
/// `sign_extend_be`.
pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
    let widened = sign_extend_be::<32>(b);
    i256::from_be_bytes(widened)
}
/// Decodes a 2-byte little-endian buffer into an `f16`.
///
/// Returns `None` when `b` is not exactly two bytes long.
pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
    // `[low, high]` read as little-endian is the same value as `[high, low]`
    // read as big-endian.
    let bytes: [u8; 2] = b.try_into().ok()?;
    Some(f16::from_le_bytes(bytes))
}
/// Declares `$iterator_type`, an adapter over one `Option<&ParquetStatistics>`
/// per row group.
///
/// For each row group it yields `Some(value)` where `value` is the result of
/// `$func` when the statistics are the `$parquet_statistics_type` variant, and
/// `None` when the row group has no statistics or they are another variant.
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration once the underlying iterator is exhausted.
                let maybe_stats = self.iter.next()?;
                let value = match maybe_stats {
                    Some($parquet_statistics_type(typed)) => typed.$func(),
                    // Missing statistics or a different physical type.
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
// Row-group statistics iterators, one Min/Max pair per physical type.
// Fixed-width types yield `&T`; byte-array types yield the raw `&[u8]`.
make_stats_iterator!(MinBooleanStatsIterator, min_opt, ParquetStatistics::Boolean, bool);
make_stats_iterator!(MaxBooleanStatsIterator, max_opt, ParquetStatistics::Boolean, bool);

make_stats_iterator!(MinInt32StatsIterator, min_opt, ParquetStatistics::Int32, i32);
make_stats_iterator!(MaxInt32StatsIterator, max_opt, ParquetStatistics::Int32, i32);

make_stats_iterator!(MinInt64StatsIterator, min_opt, ParquetStatistics::Int64, i64);
make_stats_iterator!(MaxInt64StatsIterator, max_opt, ParquetStatistics::Int64, i64);

make_stats_iterator!(MinFloatStatsIterator, min_opt, ParquetStatistics::Float, f32);
make_stats_iterator!(MaxFloatStatsIterator, max_opt, ParquetStatistics::Float, f32);

make_stats_iterator!(MinDoubleStatsIterator, min_opt, ParquetStatistics::Double, f64);
make_stats_iterator!(MaxDoubleStatsIterator, max_opt, ParquetStatistics::Double, f64);

make_stats_iterator!(
    MinByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MaxByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);

make_stats_iterator!(
    MinFixedLenByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
make_stats_iterator!(
    MaxFixedLenByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
/// Declares `$iterator_type`, which extracts a decimal min/max per row group as
/// `$stat_value_type`. The value may be stored as any of these physical types:
///
/// * `Int32`: widened with `From`.
/// * `Int64`: narrowed with `TryFrom`; values that do not fit become `None`.
/// * `ByteArray` / `FixedLenByteArray`: decoded with `$convert_func`.
///
/// Any other statistics variant, or missing statistics, yields `None`.
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                let maybe_stats = self.iter.next()?;
                let value = maybe_stats.and_then(|stats| match stats {
                    ParquetStatistics::Int32(typed) => {
                        typed.$func().map(|v| $stat_value_type::from(*v))
                    }
                    ParquetStatistics::Int64(typed) => typed
                        .$func()
                        .and_then(|v| $stat_value_type::try_from(*v).ok()),
                    ParquetStatistics::ByteArray(typed) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    ParquetStatistics::FixedLenByteArray(typed) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    _ => None,
                });
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
// Row-group decimal statistics iterators, one Min/Max pair per decimal width.
// The byte decoder must match the target integer width.
make_decimal_stats_iterator!(MinDecimal32StatsIterator, min_opt, min_bytes_opt, i32, from_bytes_to_i32);
make_decimal_stats_iterator!(MaxDecimal32StatsIterator, max_opt, max_bytes_opt, i32, from_bytes_to_i32);

make_decimal_stats_iterator!(MinDecimal64StatsIterator, min_opt, min_bytes_opt, i64, from_bytes_to_i64);
make_decimal_stats_iterator!(MaxDecimal64StatsIterator, max_opt, max_bytes_opt, i64, from_bytes_to_i64);

make_decimal_stats_iterator!(
    MinDecimal128StatsIterator,
    min_opt,
    min_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MaxDecimal128StatsIterator,
    max_opt,
    max_bytes_opt,
    i128,
    from_bytes_to_i128
);

make_decimal_stats_iterator!(
    MinDecimal256StatsIterator,
    min_opt,
    min_bytes_opt,
    i256,
    from_bytes_to_i256
);
make_decimal_stats_iterator!(
    MaxDecimal256StatsIterator,
    max_opt,
    max_bytes_opt,
    i256,
    from_bytes_to_i256
);
/// Expands to a `match` over `$data_type` that converts per-row-group parquet
/// statistics into an Arrow array with one element per row group.
///
/// * `$stat_type_prefix`: `Min` or `Max`. Selects the `*StatsIterator` types and,
///   for dictionaries, the recursive `min_statistics` / `max_statistics` call.
/// * `$data_type`: the target `&DataType`.
/// * `$iterator`: yields one `Option<&ParquetStatistics>` per row group.
/// * `$physical_type`: the parquet physical type, used to tell the two `Date64`
///   encodings apart.
///
/// A value that is missing, of a mismatched variant, or not convertible to the
/// target type becomes a null.
macro_rules! get_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
paste! {
match $data_type {
DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
[<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
))),
// Narrow integer types are read from INT32 statistics; out-of-range values become null.
DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| i8::try_from(*x).ok())
}),
))),
DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| i16::try_from(*x).ok())
}),
))),
DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| u8::try_from(*x).ok())
}),
))),
DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| u16::try_from(*x).ok())
}),
))),
// UInt32/UInt64 use an `as` cast, which reinterprets the signed bits rather
// than range-checking them (unlike the UInt8/UInt16 arms above).
DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
))),
DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
))),
// Float16 statistics arrive as fixed-length byte arrays; see `from_bytes_to_f16`.
DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
from_bytes_to_f16(x)
})),
))),
DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
[<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
[<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
// Date64 from INT32: the value is multiplied by 86_400_000 (ms per day).
DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator)
.map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
DataType::Timestamp(unit, timezone) =>{
let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
Ok(match unit {
TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
})
},
DataType::Time32(unit) => {
Ok(match unit {
TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
)),
TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
)),
// Other units: statistics cannot be extracted, so return one null per row group.
_ => {
let len = $iterator.count();
new_null_array($data_type, len)
}
})
},
DataType::Time64(unit) => {
Ok(match unit {
TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
)),
TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
)),
// Other units: statistics cannot be extracted, so return one null per row group.
_ => {
let len = $iterator.count();
new_null_array($data_type, len)
}
})
},
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
))),
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
))),
// String arms: values that are not valid UTF-8 become null instead of an error.
DataType::Utf8 => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); continue;
};
let Ok(x) = std::str::from_utf8(x) else {
builder.append_null();
continue;
};
builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::LargeUtf8 => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = LargeStringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); continue;
};
let Ok(x) = std::str::from_utf8(x) else {
builder.append_null();
continue;
};
builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::FixedSizeBinary(size) => {
let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
let mut builder = FixedSizeBinaryBuilder::new(*size);
for x in iterator {
let Some(x) = x else {
builder.append_null(); continue;
};
// A length mismatch becomes null; this check is what makes the `expect` below safe.
if x.len().try_into() != Ok(*size){
builder.append_null();
continue;
}
builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
}
Ok(Arc::new(builder.finish()))
},
// Decimal arms: `?` propagates a precision/scale validation error to the caller.
DataType::Decimal32(precision, scale) => {
let arr = Decimal32Array::from_iter(
[<$stat_type_prefix Decimal32StatsIterator>]::new($iterator)
).with_precision_and_scale(*precision, *scale)?;
Ok(Arc::new(arr))
},
DataType::Decimal64(precision, scale) => {
let arr = Decimal64Array::from_iter(
[<$stat_type_prefix Decimal64StatsIterator>]::new($iterator)
).with_precision_and_scale(*precision, *scale)?;
Ok(Arc::new(arr))
},
DataType::Decimal128(precision, scale) => {
let arr = Decimal128Array::from_iter(
[<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
).with_precision_and_scale(*precision, *scale)?;
Ok(Arc::new(arr))
},
DataType::Decimal256(precision, scale) => {
let arr = Decimal256Array::from_iter(
[<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
).with_precision_and_scale(*precision, *scale)?;
Ok(Arc::new(arr))
},
// Dictionaries: statistics are computed for the value type, by recursing into
// `min_statistics` / `max_statistics`.
DataType::Dictionary(_, value_type) => {
[<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
},
DataType::Utf8View => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringViewBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); continue;
};
let Ok(x) = std::str::from_utf8(x) else {
builder.append_null();
continue;
};
builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::BinaryView => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = BinaryViewBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); continue;
};
builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
}
// Unsupported types return one null per row group. The bare `Date64` pattern
// catches the physical types not matched by the guarded `Date64` arms above.
DataType::Map(_,_) |
DataType::Duration(_) |
DataType::Interval(_) |
DataType::Date64 | DataType::Null |
DataType::List(_) |
DataType::ListView(_) |
DataType::FixedSizeList(_, _) |
DataType::LargeList(_) |
DataType::LargeListView(_) |
DataType::Struct(_) |
DataType::Union(_, _) |
DataType::RunEndEncoded(_, _) => {
let len = $iterator.count();
Ok(new_null_array($data_type, len))
}
}}}
}
/// Declares `$iterator_type`, an adapter over `(num_pages, &Index)` pairs, one
/// pair per row group.
///
/// For each row group it yields a `Vec` with one entry per page:
/// * If the index is the `$index_type` variant, each entry is `$func` applied to
///   that page's `PageIndex`.
/// * Otherwise the `Vec` holds `num_pages` `None`s, so the flattened output
///   still has one entry per page.
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let page_values = if let $index_type(native_index) = index {
                    native_index.indexes.iter().map($func).collect::<Vec<_>>()
                } else {
                    // No usable index for this type: one unknown value per page.
                    vec![None; len]
                };
                Some(page_values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
// Data-page (column index) statistics iterators, one Min/Max pair per type.
// Copy types are read directly; byte-array types are cloned out of the index.
make_data_page_stats_iterator!(
    MinBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.min },
    Index::BOOLEAN,
    bool
);
make_data_page_stats_iterator!(
    MaxBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.max },
    Index::BOOLEAN,
    bool
);

make_data_page_stats_iterator!(
    MinInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.min },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MaxInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.max },
    Index::INT32,
    i32
);

make_data_page_stats_iterator!(
    MinInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.min },
    Index::INT64,
    i64
);
make_data_page_stats_iterator!(
    MaxInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.max },
    Index::INT64,
    i64
);

// Float16 page statistics are stored as fixed-length byte arrays.
make_data_page_stats_iterator!(
    MinFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MaxFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);

make_data_page_stats_iterator!(
    MinFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.min },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MaxFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.max },
    Index::FLOAT,
    f32
);

make_data_page_stats_iterator!(
    MinFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.min },
    Index::DOUBLE,
    f64
);
make_data_page_stats_iterator!(
    MaxFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.max },
    Index::DOUBLE,
    f64
);

make_data_page_stats_iterator!(
    MinByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.min.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
make_data_page_stats_iterator!(
    MaxByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.max.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);

make_data_page_stats_iterator!(
    MinFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MaxFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
/// Declares `$iterator_type`, which extracts decimal page statistics (the `$func`
/// field, `min` or `max`, of each `PageIndex`) as `$stat_value_type`.
///
/// The iterator consumes `(num_pages, &Index)` pairs, one per row group, and
/// yields a `Vec` with one entry per page. The index may use any of these
/// physical types:
///
/// * `INT32`: widened with `From`.
/// * `INT64`: narrowed with `TryFrom`; values that do not fit become `None`.
/// * `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY`: decoded with `$convert_func`.
///
/// Any other index variant yields `num_pages` `None`s.
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let next = self.iter.next();
                match next {
                    Some((len, index)) => match index {
                        Index::INT32(native_index) => Some(
                            native_index
                                .indexes
                                .iter()
                                .map(|x| x.$func.map(|v| $stat_value_type::from(v)))
                                .collect::<Vec<_>>(),
                        ),
                        Index::INT64(native_index) => Some(
                            native_index
                                .indexes
                                .iter()
                                .map(|x| x.$func.and_then(|v| $stat_value_type::try_from(v).ok()))
                                .collect::<Vec<_>>(),
                        ),
                        // Borrow the min/max buffer instead of cloning the whole
                        // `PageIndex` (which would copy both the min and the max
                        // byte buffers) just to decode one of them.
                        Index::BYTE_ARRAY(native_index) => Some(
                            native_index
                                .indexes
                                .iter()
                                .map(|x| x.$func.as_ref().map(|v| $convert_func(v.data())))
                                .collect::<Vec<_>>(),
                        ),
                        Index::FIXED_LEN_BYTE_ARRAY(native_index) => Some(
                            native_index
                                .indexes
                                .iter()
                                .map(|x| x.$func.as_ref().map(|v| $convert_func(v.data())))
                                .collect::<Vec<_>>(),
                        ),
                        // No usable index: one unknown value per page.
                        _ => Some(vec![None; len]),
                    },
                    _ => None,
                }
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
// Data-page decimal statistics iterators, one Min/Max pair per decimal width.
// `min` / `max` name the `PageIndex` field that is read.
get_decimal_page_stats_iterator!(MinDecimal32DataPageStatsIterator, min, i32, from_bytes_to_i32);
get_decimal_page_stats_iterator!(MaxDecimal32DataPageStatsIterator, max, i32, from_bytes_to_i32);

get_decimal_page_stats_iterator!(MinDecimal64DataPageStatsIterator, min, i64, from_bytes_to_i64);
get_decimal_page_stats_iterator!(MaxDecimal64DataPageStatsIterator, max, i64, from_bytes_to_i64);

get_decimal_page_stats_iterator!(
    MinDecimal128DataPageStatsIterator,
    min,
    i128,
    from_bytes_to_i128
);
get_decimal_page_stats_iterator!(
    MaxDecimal128DataPageStatsIterator,
    max,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MinDecimal256DataPageStatsIterator,
    min,
    i256,
    from_bytes_to_i256
);
get_decimal_page_stats_iterator!(
    MaxDecimal256DataPageStatsIterator,
    max,
    i256,
    from_bytes_to_i256
);
/// Expands to a `match` over `$data_type` that converts data-page (column index)
/// statistics into an Arrow array with one element per data page. The pages of
/// all row groups are concatenated in order.
///
/// * `$stat_type_prefix`: `Min` or `Max`. Selects the `*DataPageStatsIterator`
///   types and, for dictionaries, the recursive `min_page_statistics` /
///   `max_page_statistics` call.
/// * `$data_type`: the target `&DataType`.
/// * `$iterator`: yields one `(num_pages, &Index)` pair per row group.
/// * `$physical_type`: the parquet physical type, used to tell the two `Date64`
///   encodings apart.
///
/// Values that are missing or not convertible become nulls.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => {
                    let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
                    let mut builder = BooleanBuilder::new();
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null();
                                continue;
                            };
                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // Narrow integer types are read from INT32 statistics; out-of-range values become null.
                DataType::UInt8 => Ok(Arc::new(
                    UInt8Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| x.and_then(|x| u8::try_from(x).ok()))
                            })
                    )
                )),
                DataType::UInt16 => Ok(Arc::new(
                    UInt16Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| x.and_then(|x| u16::try_from(x).ok()))
                            })
                    )
                )),
                // UInt32/UInt64 reinterpret the signed bits with `as`, matching the
                // row-group statistics path.
                DataType::UInt32 => Ok(Arc::new(
                    UInt32Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| x.into_iter().map(|x| x.map(|x| x as u32)))
                    )
                )),
                DataType::UInt64 => Ok(Arc::new(
                    UInt64Array::from_iter(
                        [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| x.into_iter().map(|x| x.map(|x| x as u64)))
                    )
                )),
                DataType::Int8 => Ok(Arc::new(
                    Int8Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| x.and_then(|x| i8::try_from(x).ok()))
                            })
                    )
                )),
                DataType::Int16 => Ok(Arc::new(
                    Int16Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| x.and_then(|x| i16::try_from(x).ok()))
                            })
                    )
                )),
                DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()
                ))),
                DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                    [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()
                ))),
                DataType::Float16 => Ok(Arc::new(
                    Float16Array::from_iter(
                        [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| x.and_then(|x| from_bytes_to_f16(x.data())))
                            })
                    )
                )),
                DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                    [<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()
                ))),
                DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                    [<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()
                ))),
                DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                    [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()
                ))),
                DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                    [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()
                ))),
                // String arms: values that are not valid UTF-8 become null.
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null();
                                continue;
                            };
                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };
                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null();
                                continue;
                            };
                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };
                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // Dictionaries: statistics are computed for the value type.
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                },
                DataType::Timestamp(unit, timezone) => {
                    let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    })
                },
                DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()
                ))),
                // Date64 from INT32: the value is multiplied by 86_400_000 (ms per day).
                // The same conversion is used for row-group statistics.
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(
                    Date64Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter()
                                    .map(|x| x.map(|x| i64::from(x) * 24 * 60 * 60 * 1000))
                            })
                    )
                )),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(
                    Date64Array::from_iter(
                        [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()
                    )
                )),
                // Decimal arms: `?` propagates a precision/scale validation error.
                DataType::Decimal32(precision, scale) => Ok(Arc::new(
                    Decimal32Array::from_iter(
                        [<$stat_type_prefix Decimal32DataPageStatsIterator>]::new($iterator).flatten()
                    ).with_precision_and_scale(*precision, *scale)?
                )),
                DataType::Decimal64(precision, scale) => Ok(Arc::new(
                    Decimal64Array::from_iter(
                        [<$stat_type_prefix Decimal64DataPageStatsIterator>]::new($iterator).flatten()
                    ).with_precision_and_scale(*precision, *scale)?
                )),
                DataType::Decimal128(precision, scale) => Ok(Arc::new(
                    Decimal128Array::from_iter(
                        [<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()
                    ).with_precision_and_scale(*precision, *scale)?
                )),
                DataType::Decimal256(precision, scale) => Ok(Arc::new(
                    Decimal256Array::from_iter(
                        [<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()
                    ).with_precision_and_scale(*precision, *scale)?
                )),
                DataType::Time32(unit) => {
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            new_empty_array(&DataType::Time32(unit.clone()))
                        }
                    })
                }
                DataType::Time64(unit) => {
                    Ok(match unit {
                        TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            new_empty_array(&DataType::Time64(unit.clone()))
                        }
                    })
                },
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null();
                                continue;
                            };
                            // A length mismatch becomes null; after this check the append
                            // cannot fail on length.
                            if x.len() == *size as usize {
                                let _ = builder.append_value(x.data());
                            } else {
                                builder.append_null();
                            }
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null();
                                continue;
                            };
                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };
                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null();
                                continue;
                            };
                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // The bare `Date64` pattern catches the physical types not matched by
                // the guarded `Date64` arms above.
                DataType::Date64 | DataType::Null |
                DataType::Duration(_) |
                DataType::Interval(_) |
                DataType::List(_) |
                DataType::ListView(_) |
                DataType::FixedSizeList(_, _) |
                DataType::LargeList(_) |
                DataType::LargeListView(_) |
                DataType::Struct(_) |
                DataType::Union(_, _) |
                DataType::Map(_, _) |
                DataType::RunEndEncoded(_, _) => {
                    // Statistics cannot be extracted for these types, so return nulls.
                    // Every other arm emits one entry per data page, so the null array
                    // must be as long as the total page count (the sum of the per-row-group
                    // page counts), not the number of row groups.
                    let len = $iterator.map(|(num_pages, _)| num_pages).sum::<usize>();
                    Ok(new_null_array($data_type, len))
                },
            }
        }
    }
}
/// Builds an Arrow array of `data_type` holding the row-group minimums, one
/// element per item of `iterator` (one `Option<&ParquetStatistics>` per row group).
///
/// `physical_type` is forwarded to `get_statistics!`, which uses it to choose
/// the `Date64` conversion.
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
get_statistics!(Min, data_type, iterator, physical_type)
}
/// Builds an Arrow array of `data_type` holding the row-group maximums, one
/// element per item of `iterator` (one `Option<&ParquetStatistics>` per row group).
///
/// `physical_type` is forwarded to `get_statistics!`, which uses it to choose
/// the `Date64` conversion.
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
get_statistics!(Max, data_type, iterator, physical_type)
}
/// Builds an Arrow array of `data_type` holding per-data-page minimums.
///
/// `iterator` yields one `(num_pages, &Index)` pair per row group. The pages of
/// all row groups are concatenated in iteration order.
pub(crate) fn min_page_statistics<'a, I>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
get_data_page_statistics!(Min, data_type, iterator, physical_type)
}
/// Builds an Arrow array of `data_type` holding per-data-page maximums.
///
/// `iterator` yields one `(num_pages, &Index)` pair per row group. The pages of
/// all row groups are concatenated in iteration order.
pub(crate) fn max_page_statistics<'a, I>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
get_data_page_statistics!(Max, data_type, iterator, physical_type)
}
/// Extracts per-page null counts into a `UInt64Array`.
///
/// `iterator` yields one `(num_pages, &Index)` pair per row group. The output
/// concatenates every row group's pages in order. A page whose null count is
/// not recorded becomes a null. A row group whose index is `Index::NONE`, or a
/// variant not handled explicitly here, contributes `num_pages` nulls, so the
/// output length always matches the total page count.
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    // Every typed native index exposes `indexes`, and each entry carries an
    // optional `null_count`. This local macro avoids repeating the extraction
    // for every physical type.
    macro_rules! page_null_counts {
        ($native_index:expr) => {
            $native_index
                .indexes
                .iter()
                .map(|page| page.null_count.map(|count| count as u64))
                .collect::<Vec<_>>()
        };
    }

    let iter = iterator.flat_map(|(len, index)| match index {
        Index::NONE => vec![None; len],
        Index::BOOLEAN(native_index) => page_null_counts!(native_index),
        Index::INT32(native_index) => page_null_counts!(native_index),
        Index::INT64(native_index) => page_null_counts!(native_index),
        Index::FLOAT(native_index) => page_null_counts!(native_index),
        Index::DOUBLE(native_index) => page_null_counts!(native_index),
        Index::FIXED_LEN_BYTE_ARRAY(native_index) => page_null_counts!(native_index),
        Index::BYTE_ARRAY(native_index) => page_null_counts!(native_index),
        // Any other index variant: report the null counts as unknown rather than
        // panicking. This matches the fallback of the min/max page iterators.
        _ => vec![None; len],
    });
    Ok(UInt64Array::from_iter(iter))
}
/// Converts parquet statistics for a single arrow column into Arrow arrays.
///
/// Covers both row-group statistics and data-page (column index) statistics.
/// Construct it with `StatisticsConverter::try_new`.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
// Index of the matching parquet leaf column, or `None` if `try_new` found no
// parquet column for the arrow field.
parquet_column_index: Option<usize>,
// The arrow field whose statistics are extracted; its data type selects the
// output array type.
arrow_field: &'a Field,
// When true (the default set by `try_new`), a row group with statistics but
// no recorded null count reports 0 instead of null.
missing_null_counts_as_zero: bool,
// Physical type of the matched parquet column; `None` when there is no match.
physical_type: Option<PhysicalType>,
}
impl<'a> StatisticsConverter<'a> {
pub fn parquet_column_index(&self) -> Option<usize> {
self.parquet_column_index
}
pub fn arrow_field(&self) -> &'a Field {
self.arrow_field
}
pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
self.missing_null_counts_as_zero = missing_null_counts_as_zero;
self
}
/// Returns the row count of each row group in `metadatas`, or `Ok(None)` if
/// the arrow field has no corresponding parquet column.
///
/// # Errors
///
/// Returns an error if a row group's row count cannot be converted to `u64`.
pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    if self.parquet_column_index.is_none() {
        return Ok(None);
    }
    let metadatas = metadatas.into_iter();
    // Size the builder from the iterator rather than a fixed guess.
    let mut builder = UInt64Array::builder(metadatas.size_hint().0);
    for metadata in metadatas {
        let row_count = metadata.num_rows();
        // The conversion can fail for values below zero as well as above the
        // `u64` range, so the message says "out of range", not "too large".
        let row_count: u64 = row_count.try_into().map_err(|e| {
            arrow_err!(format!(
                "Parquet row count {row_count} is out of range for u64: {e}"
            ))
        })?;
        builder.append_value(row_count);
    }
    Ok(Some(builder.finish()))
}
/// Creates a converter for the arrow column named `column_name`.
///
/// The column must exist in `arrow_schema`. If `parquet_column` also finds a
/// parquet leaf column for it, the matched arrow field must equal the field
/// found in `arrow_schema`. If no parquet column is found, the converter is
/// still created, and every statistic it reports will be null.
///
/// # Errors
///
/// * `column_name` is not in `arrow_schema`.
/// * The field matched by `parquet_column` differs from the schema's field.
pub fn try_new<'b>(
    column_name: &'b str,
    arrow_schema: &'a Schema,
    parquet_schema: &'a SchemaDescriptor,
) -> Result<Self> {
    let arrow_field = match arrow_schema.column_with_name(column_name) {
        Some((_, field)) => field,
        None => {
            return Err(arrow_err!(format!(
                "Column '{}' not found in schema for statistics conversion",
                column_name
            )));
        }
    };

    let mut parquet_column_index = None;
    if let Some((parquet_idx, matched_field)) =
        parquet_column(parquet_schema, arrow_schema, column_name)
    {
        if matched_field.as_ref() != arrow_field {
            return Err(arrow_err!(format!(
                "Matched column '{:?}' does not match original matched column '{:?}'",
                matched_field, arrow_field
            )));
        }
        parquet_column_index = Some(parquet_idx);
    }

    let physical_type =
        parquet_column_index.map(|idx| parquet_schema.column(idx).physical_type());

    Ok(Self {
        parquet_column_index,
        arrow_field,
        missing_null_counts_as_zero: true,
        physical_type,
    })
}
/// Returns the minimum value of this column in each row group, as an array of
/// the arrow field's data type with one element per row group.
///
/// When there is no corresponding parquet column, the result comes from
/// `make_null_array`.
pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, metadatas)),
        Some(parquet_index) => {
            let column_stats = metadatas
                .into_iter()
                .map(|row_group| row_group.column(parquet_index).statistics());
            min_statistics(data_type, column_stats, self.physical_type)
        }
    }
}
/// Returns the maximum value of this column in each row group, as an array of
/// the arrow field's data type with one element per row group.
///
/// When there is no corresponding parquet column, the result comes from
/// `make_null_array`.
pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, metadatas)),
        Some(parquet_index) => {
            let column_stats = metadatas
                .into_iter()
                .map(|row_group| row_group.column(parquet_index).statistics());
            max_statistics(data_type, column_stats, self.physical_type)
        }
    }
}
/// For each row group, reports whether the stored maximum is exact.
///
/// The element is null when the row group has no statistics for this column.
/// Every element is null when the column has no parquet counterpart.
pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let row_groups = metadatas.into_iter();
    let flags = match self.parquet_column_index {
        Some(parquet_index) => BooleanArray::from_iter(row_groups.map(|row_group| {
            row_group
                .column(parquet_index)
                .statistics()
                .map(|stats| stats.max_is_exact())
        })),
        None => {
            let num_row_groups = row_groups.count();
            BooleanArray::from_iter(std::iter::repeat_n(None, num_row_groups))
        }
    };
    Ok(flags)
}
/// Reports, per row group, whether the minimum in this column's statistics is exact.
///
/// An entry is `min_is_exact()` of the column chunk's statistics, or null when that
/// chunk has no statistics. If the column has no parquet leaf, every entry is null
/// (one per row group in `metadatas`).
pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let row_groups = metadatas.into_iter();
    let flags = match self.parquet_column_index {
        Some(leaf_idx) => BooleanArray::from_iter(row_groups.map(|row_group| {
            row_group
                .column(leaf_idx)
                .statistics()
                .map(|stats| stats.min_is_exact())
        })),
        None => BooleanArray::from_iter(std::iter::repeat_n(None, row_groups.count())),
    };
    Ok(flags)
}
/// Returns the null count of this column for each row group.
///
/// Entry values:
/// - A row group whose column chunk has no statistics yields null.
/// - When the statistics lack a null count, the entry is `0` if
///   `missing_null_counts_as_zero` is set, and null otherwise.
/// - If the column has no parquet leaf, every entry is null (one per row group in
///   `metadatas`).
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    let row_groups = metadatas.into_iter();
    let Some(leaf_idx) = self.parquet_column_index else {
        return Ok(UInt64Array::from_iter(std::iter::repeat_n(
            None,
            row_groups.count(),
        )));
    };

    let treat_missing_as_zero = self.missing_null_counts_as_zero;
    let counts = row_groups.map(|row_group| {
        // No statistics at all -> null, regardless of the flag.
        let stats = row_group.column(leaf_idx).statistics()?;
        match stats.null_count_opt() {
            None if treat_missing_as_zero => Some(0),
            other => other,
        }
    });
    Ok(UInt64Array::from_iter(counts))
}
/// Extracts the per-data-page minimum values of this column from the page index.
///
/// For each row group in `row_group_indices`, two things are paired and handed to
/// `min_page_statistics`:
/// - the column index entry `column_page_index[rg][leaf]`;
/// - the page count `column_offset_index[rg][leaf].page_locations().len()`.
///
/// If the column has no parquet leaf, an all-null array is returned with one slot per
/// row group index (not per page).
///
/// Row group indices and the column's leaf index are used directly for indexing, so
/// they are expected to be in range for both page-index structures.
pub fn data_page_mins<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, row_group_indices)),
        Some(leaf_idx) => {
            let pages_per_row_group = row_group_indices.into_iter().map(|&rg| {
                let column_index = &column_page_index[rg][leaf_idx];
                let page_count = column_offset_index[rg][leaf_idx].page_locations().len();
                (page_count, column_index)
            });
            min_page_statistics(data_type, pages_per_row_group, self.physical_type)
        }
    }
}
/// Extracts the per-data-page maximum values of this column from the page index.
///
/// For each row group in `row_group_indices`, two things are paired and handed to
/// `max_page_statistics`:
/// - the column index entry `column_page_index[rg][leaf]`;
/// - the page count `column_offset_index[rg][leaf].page_locations().len()`.
///
/// If the column has no parquet leaf, an all-null array is returned with one slot per
/// row group index (not per page).
///
/// Row group indices and the column's leaf index are used directly for indexing, so
/// they are expected to be in range for both page-index structures.
pub fn data_page_maxes<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();
    match self.parquet_column_index {
        None => Ok(self.make_null_array(data_type, row_group_indices)),
        Some(leaf_idx) => {
            let pages_per_row_group = row_group_indices.into_iter().map(|&rg| {
                let column_index = &column_page_index[rg][leaf_idx];
                let page_count = column_offset_index[rg][leaf_idx].page_locations().len();
                (page_count, column_index)
            });
            max_page_statistics(data_type, pages_per_row_group, self.physical_type)
        }
    }
}
/// Extracts the per-data-page null counts of this column from the page index.
///
/// For each row group in `row_group_indices`, two things are paired and handed to
/// `null_counts_page_statistics`:
/// - the column index entry `column_page_index[rg][leaf]`;
/// - the page count `column_offset_index[rg][leaf].page_locations().len()`.
///
/// If the column has no parquet leaf, an all-null array is returned with one slot per
/// row group index (not per page).
pub fn data_page_null_counts<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<UInt64Array>
where
    I: IntoIterator<Item = &'a usize>,
{
    let rg_indices = row_group_indices.into_iter();
    match self.parquet_column_index {
        None => Ok(UInt64Array::from_iter(std::iter::repeat_n(
            None,
            rg_indices.count(),
        ))),
        Some(leaf_idx) => {
            let pages_per_row_group = rg_indices.map(|&rg| {
                let column_index = &column_page_index[rg][leaf_idx];
                let page_count = column_offset_index[rg][leaf_idx].page_locations().len();
                (page_count, column_index)
            });
            null_counts_page_statistics(pages_per_row_group)
        }
    }
}
pub fn data_page_row_counts<I>(
&self,
column_offset_index: &ParquetOffsetIndex,
row_group_metadatas: &'a [RowGroupMetaData],
row_group_indices: I,
) -> Result<Option<UInt64Array>>
where
I: IntoIterator<Item = &'a usize>,
{
let Some(parquet_index) = self.parquet_column_index else {
return Ok(None);
};
let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
let row_count_per_page = page_locations
.windows(2)
.map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
let row_count_per_page = row_count_per_page
.chain(std::iter::once(Some(
*num_rows_in_row_group as u64
- page_locations.last().unwrap().first_row_index as u64,
)))
.collect::<Vec<_>>();
row_count_total.extend(row_count_per_page);
}
Ok(Some(UInt64Array::from_iter(row_count_total)))
}
/// Builds an all-null array of `data_type` with one slot per item yielded by `metadatas`.
///
/// Only the number of items matters; the items themselves are discarded.
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
where
    I: IntoIterator<Item = A>,
{
    new_null_array(data_type, metadatas.into_iter().count())
}
}