use parquet2::error::Error as ParquetError;
use parquet2::indexes::{
select_pages, BooleanIndex, ByteIndex, FixedLenByteIndex, Index as ParquetIndex, NativeIndex,
PageLocation,
};
use parquet2::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet2::read::{read_columns_indexes as _read_columns_indexes, read_pages_locations};
use parquet2::schema::types::PhysicalType as ParquetPhysicalType;
mod binary;
mod boolean;
mod fixed_len_binary;
mod primitive;
use std::collections::VecDeque;
use std::io::{Read, Seek};
use crate::array::UInt64Array;
use crate::datatypes::{Field, PrimitiveType};
use crate::{
array::Array,
datatypes::{DataType, PhysicalType},
error::Error,
};
use super::get_field_pages;
pub use parquet2::indexes::{FilteredPage, Interval};
/// Page-level statistics of a single arrow [`Field`].
///
/// A flat field is backed by exactly one parquet leaf column (`Single`);
/// a nested field (e.g. a struct) is backed by several leaf columns and
/// carries one entry per child (`Multiple`), in declaration order.
#[derive(Debug, PartialEq)]
pub enum FieldPageStatistics {
/// Statistics of the single leaf column backing a flat field.
Single(ColumnPageStatistics),
/// Statistics of each child of a nested field.
Multiple(Vec<FieldPageStatistics>),
}
impl From<ColumnPageStatistics> for FieldPageStatistics {
fn from(column: ColumnPageStatistics) -> Self {
Self::Single(column)
}
}
/// Page-level statistics of a single parquet leaf column, deserialized into
/// arrow arrays (presumably one entry per data page — the per-type
/// deserializers in the sibling modules define the exact layout).
#[derive(Debug, PartialEq)]
pub struct ColumnPageStatistics {
/// Minimum value of each page.
pub min: Box<dyn Array>,
/// Maximum value of each page.
pub max: Box<dyn Array>,
/// Number of null values in each page.
pub null_count: UInt64Array,
}
fn deserialize(
indexes: &mut VecDeque<&Box<dyn ParquetIndex>>,
data_type: DataType,
) -> Result<FieldPageStatistics, Error> {
match data_type.to_physical_type() {
PhysicalType::Boolean => {
let index = indexes
.pop_front()
.unwrap()
.as_any()
.downcast_ref::<BooleanIndex>()
.unwrap();
Ok(boolean::deserialize(&index.indexes).into())
}
PhysicalType::Primitive(PrimitiveType::Int128) => {
let index = indexes.pop_front().unwrap();
match index.physical_type() {
ParquetPhysicalType::Int32 => {
let index = index.as_any().downcast_ref::<NativeIndex<i32>>().unwrap();
Ok(primitive::deserialize_i32(&index.indexes, data_type).into())
}
parquet2::schema::types::PhysicalType::Int64 => {
let index = index.as_any().downcast_ref::<NativeIndex<i64>>().unwrap();
Ok(
primitive::deserialize_i64(
&index.indexes,
&index.primitive_type,
data_type,
)
.into(),
)
}
parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => {
let index = index.as_any().downcast_ref::<FixedLenByteIndex>().unwrap();
Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into())
}
other => Err(Error::nyi(format!(
"Deserialize {other:?} to arrow's int64"
))),
}
}
PhysicalType::Primitive(PrimitiveType::Int256) => {
let index = indexes.pop_front().unwrap();
match index.physical_type() {
ParquetPhysicalType::Int32 => {
let index = index.as_any().downcast_ref::<NativeIndex<i32>>().unwrap();
Ok(primitive::deserialize_i32(&index.indexes, data_type).into())
}
parquet2::schema::types::PhysicalType::Int64 => {
let index = index.as_any().downcast_ref::<NativeIndex<i64>>().unwrap();
Ok(
primitive::deserialize_i64(
&index.indexes,
&index.primitive_type,
data_type,
)
.into(),
)
}
parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => {
let index = index.as_any().downcast_ref::<FixedLenByteIndex>().unwrap();
Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into())
}
other => Err(Error::nyi(format!(
"Deserialize {other:?} to arrow's int64"
))),
}
}
PhysicalType::Primitive(PrimitiveType::UInt8)
| PhysicalType::Primitive(PrimitiveType::UInt16)
| PhysicalType::Primitive(PrimitiveType::UInt32)
| PhysicalType::Primitive(PrimitiveType::Int32) => {
let index = indexes
.pop_front()
.unwrap()
.as_any()
.downcast_ref::<NativeIndex<i32>>()
.unwrap();
Ok(primitive::deserialize_i32(&index.indexes, data_type).into())
}
PhysicalType::Primitive(PrimitiveType::UInt64)
| PhysicalType::Primitive(PrimitiveType::Int64) => {
let index = indexes.pop_front().unwrap();
match index.physical_type() {
ParquetPhysicalType::Int64 => {
let index = index.as_any().downcast_ref::<NativeIndex<i64>>().unwrap();
Ok(
primitive::deserialize_i64(
&index.indexes,
&index.primitive_type,
data_type,
)
.into(),
)
}
parquet2::schema::types::PhysicalType::Int96 => {
let index = index
.as_any()
.downcast_ref::<NativeIndex<[u32; 3]>>()
.unwrap();
Ok(primitive::deserialize_i96(&index.indexes, data_type).into())
}
other => Err(Error::nyi(format!(
"Deserialize {other:?} to arrow's int64"
))),
}
}
PhysicalType::Primitive(PrimitiveType::Float32) => {
let index = indexes
.pop_front()
.unwrap()
.as_any()
.downcast_ref::<NativeIndex<f32>>()
.unwrap();
Ok(primitive::deserialize_id(&index.indexes, data_type).into())
}
PhysicalType::Primitive(PrimitiveType::Float64) => {
let index = indexes
.pop_front()
.unwrap()
.as_any()
.downcast_ref::<NativeIndex<f64>>()
.unwrap();
Ok(primitive::deserialize_id(&index.indexes, data_type).into())
}
PhysicalType::Binary
| PhysicalType::LargeBinary
| PhysicalType::Utf8
| PhysicalType::LargeUtf8 => {
let index = indexes
.pop_front()
.unwrap()
.as_any()
.downcast_ref::<ByteIndex>()
.unwrap();
binary::deserialize(&index.indexes, &data_type).map(|x| x.into())
}
PhysicalType::FixedSizeBinary => {
let index = indexes
.pop_front()
.unwrap()
.as_any()
.downcast_ref::<FixedLenByteIndex>()
.unwrap();
Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into())
}
PhysicalType::Dictionary(_) => {
if let DataType::Dictionary(_, inner, _) = data_type.to_logical_type() {
deserialize(indexes, (**inner).clone())
} else {
unreachable!()
}
}
PhysicalType::List => {
if let DataType::List(inner) = data_type.to_logical_type() {
deserialize(indexes, inner.data_type.clone())
} else {
unreachable!()
}
}
PhysicalType::LargeList => {
if let DataType::LargeList(inner) = data_type.to_logical_type() {
deserialize(indexes, inner.data_type.clone())
} else {
unreachable!()
}
}
PhysicalType::Map => {
if let DataType::Map(inner, _) = data_type.to_logical_type() {
deserialize(indexes, inner.data_type.clone())
} else {
unreachable!()
}
}
PhysicalType::Struct => {
let children_fields = if let DataType::Struct(children) = data_type.to_logical_type() {
children
} else {
unreachable!()
};
let children = children_fields
.iter()
.map(|child| deserialize(indexes, child.data_type.clone()))
.collect::<Result<Vec<_>, Error>>()?;
Ok(FieldPageStatistics::Multiple(children))
}
other => Err(Error::nyi(format!(
"Deserialize into arrow's {other:?} page index"
))),
}
}
/// Returns whether *every* column chunk of `row_group` has a column index
/// offset, i.e. whether page-index statistics can be read for all columns.
pub fn has_indexes(row_group: &RowGroupMetaData) -> bool {
    !row_group
        .columns()
        .iter()
        .any(|chunk| chunk.column_chunk().column_index_offset.is_none())
}
pub fn read_columns_indexes<R: Read + Seek>(
reader: &mut R,
chunks: &[ColumnChunkMetaData],
fields: &[Field],
) -> Result<Vec<FieldPageStatistics>, Error> {
let indexes = _read_columns_indexes(reader, chunks)?;
fields
.iter()
.map(|field| {
let indexes = get_field_pages(chunks, &indexes, &field.name);
let mut indexes = indexes.into_iter().collect();
deserialize(&mut indexes, field.data_type.clone())
})
.collect()
}
/// Computes the row [`Interval`] covered by each page in `locations`, given
/// the total number of rows of the row group.
///
/// Every page ends where its successor begins; the trailing page extends to
/// `num_rows`.
pub fn compute_page_row_intervals(
    locations: &[PageLocation],
    num_rows: usize,
) -> Result<Vec<Interval>, ParquetError> {
    if locations.is_empty() {
        return Ok(vec![]);
    };

    // Trailing page interval, computed eagerly (its start is the last
    // recorded first-row index; its length runs to the end of the group).
    let trailing = (|| {
        let start: usize = locations.last().unwrap().first_row_index.try_into()?;
        Result::<_, ParquetError>::Ok(Interval::new(start, num_rows - start))
    })();

    locations
        .windows(2)
        .map(|pair| {
            let start = usize::try_from(pair[0].first_row_index)?;
            let length = usize::try_from(pair[1].first_row_index - pair[0].first_row_index)?;
            Ok(Interval::new(start, length))
        })
        .chain(std::iter::once(trailing))
        .collect()
}
/// Reads the page locations and page statistics of `row_group` for `fields`,
/// applies `predicate` to select surviving row intervals, and returns the
/// pages to read per field and per leaf column.
///
/// The returned structure is `[field][leaf column][page]`.
pub fn read_filtered_pages<
    R: Read + Seek,
    F: Fn(&[FieldPageStatistics], &[Vec<Vec<Interval>>]) -> Vec<Interval>,
>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    fields: &[Field],
    predicate: F,
) -> Result<Vec<Vec<Vec<FilteredPage>>>, Error> {
    let num_rows = row_group.num_rows();

    // Page locations of every leaf column, grouped by requested field.
    let locations = read_pages_locations(reader, row_group.columns())?;
    let locations = fields
        .iter()
        .map(|field| get_field_pages(row_group.columns(), &locations, &field.name))
        .collect::<Vec<_>>();

    // Page statistics of the same fields, in the same order.
    let indexes = read_columns_indexes(reader, row_group.columns(), fields)?;

    // Row intervals covered by each page of each leaf column.
    let mut intervals = Vec::with_capacity(locations.len());
    for field_locations in &locations {
        let mut field_intervals = Vec::with_capacity(field_locations.len());
        for column_locations in field_locations {
            field_intervals.push(compute_page_row_intervals(column_locations, num_rows)?);
        }
        intervals.push(field_intervals);
    }

    // Let the caller decide which row intervals survive...
    let selected = predicate(&indexes, &intervals);

    // ...and map the surviving intervals back to pages to read.
    let mut filtered = Vec::with_capacity(locations.len());
    for field_locations in locations {
        let mut field_pages = Vec::with_capacity(field_locations.len());
        for column_locations in field_locations {
            field_pages.push(select_pages(&selected, column_locations, num_rows)?);
        }
        filtered.push(field_pages);
    }
    Ok(filtered)
}