use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96};
use crate::errors::ParquetError;
use crate::file::metadata::LevelHistogram;
use crate::format::{BoundaryOrder, ColumnIndex};
use std::fmt::Debug;
/// Statistics describing a single page, as held in a column's page index.
///
/// `T` is the Rust type used to represent the page's `min` and `max` values.
/// Every field is optional, so a page may carry any subset of these statistics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageIndex<T> {
/// Smallest value recorded for the page, or `None` when no minimum is stored.
pub min: Option<T>,
/// Largest value recorded for the page, or `None` when no maximum is stored.
pub max: Option<T>,
/// Number of null values in the page, or `None` when the count is unavailable.
pub null_count: Option<i64>,
/// Histogram over repetition levels for the page, if one is present.
pub repetition_level_histogram: Option<LevelHistogram>,
/// Histogram over definition levels for the page, if one is present.
pub definition_level_histogram: Option<LevelHistogram>,
}
impl<T> PageIndex<T> {
    /// Borrows the page minimum, or yields `None` when no minimum is stored.
    pub fn min(&self) -> Option<&T> {
        Option::as_ref(&self.min)
    }

    /// Borrows the page maximum, or yields `None` when no maximum is stored.
    pub fn max(&self) -> Option<&T> {
        Option::as_ref(&self.max)
    }

    /// Copies out the stored null count, if any.
    pub fn null_count(&self) -> Option<i64> {
        let Self { null_count, .. } = self;
        *null_count
    }

    /// Borrows the repetition-level histogram, if one is stored.
    pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
        Option::as_ref(&self.repetition_level_histogram)
    }

    /// Borrows the definition-level histogram, if one is stored.
    pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
        Option::as_ref(&self.definition_level_histogram)
    }
}
impl<T> PageIndex<T>
where
    T: AsBytes,
{
    /// Byte view of the page maximum (via [`AsBytes::as_bytes`]), or `None`
    /// when no maximum is stored.
    pub fn max_bytes(&self) -> Option<&[u8]> {
        let largest = self.max.as_ref();
        largest.map(T::as_bytes)
    }

    /// Byte view of the page minimum (via [`AsBytes::as_bytes`]), or `None`
    /// when no minimum is stored.
    pub fn min_bytes(&self) -> Option<&[u8]> {
        let smallest = self.min.as_ref();
        smallest.map(T::as_bytes)
    }
}
/// Page index of a column, tagged by the Rust value type of its statistics.
///
/// Each data-carrying variant wraps a [`NativeIndex`] specialised for one
/// value type; `NONE` carries no index data at all.
#[derive(Debug, Clone, PartialEq)]
// Variant names are upper-case — presumably mirroring the Parquet physical
// type names (TODO confirm) — which is why the naming lint is silenced here.
#[allow(non_camel_case_types)]
pub enum Index {
/// No index data is available.
NONE,
/// Index whose min/max values are `bool`.
BOOLEAN(NativeIndex<bool>),
/// Index whose min/max values are `i32`.
INT32(NativeIndex<i32>),
/// Index whose min/max values are `i64`.
INT64(NativeIndex<i64>),
/// Index whose min/max values are [`Int96`].
INT96(NativeIndex<Int96>),
/// Index whose min/max values are `f32`.
FLOAT(NativeIndex<f32>),
/// Index whose min/max values are `f64`.
DOUBLE(NativeIndex<f64>),
/// Index whose min/max values are [`ByteArray`].
BYTE_ARRAY(NativeIndex<ByteArray>),
/// Index whose min/max values are [`FixedLenByteArray`].
FIXED_LEN_BYTE_ARRAY(NativeIndex<FixedLenByteArray>),
}
impl Index {
    /// Reports whether the index declares an ordering of its pages.
    ///
    /// Returns `true` only when a boundary order is present and its raw value
    /// is greater than that of `BoundaryOrder::UNORDERED`; an [`Index::NONE`]
    /// is never sorted.
    pub fn is_sorted(&self) -> bool {
        match self.get_boundary_order() {
            Some(order) => order.0 > (BoundaryOrder::UNORDERED.0),
            None => false,
        }
    }

    /// Returns the boundary order stored in the wrapped [`NativeIndex`], or
    /// `None` for [`Index::NONE`].
    pub fn get_boundary_order(&self) -> Option<BoundaryOrder> {
        let order = match self {
            // Nothing is wrapped, so there is no order to report.
            Index::NONE => return None,
            Index::BOOLEAN(native) => native.boundary_order,
            Index::INT32(native) => native.boundary_order,
            Index::INT64(native) => native.boundary_order,
            Index::INT96(native) => native.boundary_order,
            Index::FLOAT(native) => native.boundary_order,
            Index::DOUBLE(native) => native.boundary_order,
            Index::BYTE_ARRAY(native) => native.boundary_order,
            Index::FIXED_LEN_BYTE_ARRAY(native) => native.boundary_order,
        };
        Some(order)
    }
}
/// Per-page statistics for a column whose values are represented by `T`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NativeIndex<T: ParquetValueType> {
/// One [`PageIndex`] per page, in the order the pages were listed in the
/// Thrift `ColumnIndex` this value was built from.
pub indexes: Vec<PageIndex<T>>,
/// Ordering of the pages' min/max values; copied verbatim from / to the
/// Thrift `ColumnIndex`.
pub boundary_order: BoundaryOrder,
}
impl<T: ParquetValueType> NativeIndex<T> {
    /// The physical type of the values covered by this index, taken from `T`.
    pub const PHYSICAL_TYPE: Type = T::PHYSICAL_TYPE;

    /// Builds a typed index from a Thrift [`ColumnIndex`].
    ///
    /// The number of pages is taken from `index.min_values`. For each page:
    ///
    /// * `min` / `max` are `None` when the matching `null_pages` flag is set;
    ///   otherwise both are decoded with `T::try_from_le_slice`.
    /// * `null_count` is `None` for every page when `index.null_counts` is
    ///   absent.
    /// * The flat repetition / definition histogram lists are split into
    ///   equally sized per-page chunks (`list.len() / pages` entries each).
    ///   Any trailing values that do not fill a whole chunk are ignored.
    ///
    /// The per-page lists are combined with `zip`, so if they differ in
    /// length the result is truncated to the shortest one.
    ///
    /// # Errors
    ///
    /// Returns the [`ParquetError`] produced by `T::try_from_le_slice` when a
    /// non-null page has a min or max value that cannot be decoded as `T`.
    pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
        let len = index.min_values.len();

        let null_counts = index
            .null_counts
            .map(|counts| counts.into_iter().map(Some).collect::<Vec<_>>())
            .unwrap_or_else(|| vec![None; len]);

        // Splits a flat, concatenated histogram list into one histogram per page.
        let to_page_histograms = |opt_hist: Option<Vec<i64>>| -> Vec<Option<LevelHistogram>> {
            match opt_hist {
                // The `len > 0` guard is essential: with zero pages the
                // division below would panic with a divide-by-zero, which a
                // malformed (or simply empty) column index must not trigger.
                Some(hist) if len > 0 => {
                    let num_levels = hist.len() / len;
                    (0..len)
                        .map(|page| {
                            let start = page * num_levels;
                            let page_hist = hist[start..start + num_levels].to_vec();
                            Some(LevelHistogram::from(page_hist))
                        })
                        .collect()
                }
                // No histogram data, or no pages: nothing to split. With
                // `len == 0` this yields an empty vector.
                _ => vec![None; len],
            }
        };

        let rep_hists: Vec<Option<LevelHistogram>> =
            to_page_histograms(index.repetition_level_histograms);
        let def_hists: Vec<Option<LevelHistogram>> =
            to_page_histograms(index.definition_level_histograms);

        let indexes = index
            .min_values
            .iter()
            .zip(index.max_values.iter())
            .zip(index.null_pages)
            .zip(null_counts)
            .zip(rep_hists)
            .zip(def_hists)
            .map(
                |(
                    ((((min, max), is_null), null_count), repetition_level_histogram),
                    definition_level_histogram,
                )| {
                    // An all-null page has no meaningful min/max bytes, so
                    // they are not decoded at all.
                    let (min, max) = if is_null {
                        (None, None)
                    } else {
                        (
                            Some(T::try_from_le_slice(min)?),
                            Some(T::try_from_le_slice(max)?),
                        )
                    };
                    Ok(PageIndex {
                        min,
                        max,
                        null_count,
                        repetition_level_histogram,
                        definition_level_histogram,
                    })
                },
            )
            .collect::<Result<Vec<_>, ParquetError>>()?;

        Ok(Self {
            indexes,
            boundary_order: index.boundary_order,
        })
    }

    /// Converts this index back into its Thrift [`ColumnIndex`] form.
    ///
    /// * A page is reported as a null page when it has no `min`.
    /// * Missing min / max values are written as empty byte vectors.
    /// * `null_counts` is emitted only when *every* page has a null count.
    /// * Each histogram list is emitted only when *every* page has that
    ///   histogram; the per-page values are concatenated in page order.
    pub(crate) fn to_thrift(&self) -> ColumnIndex {
        let null_pages = self.indexes.iter().map(|x| x.min().is_none()).collect();

        let min_values = self
            .indexes
            .iter()
            .map(|x| x.min_bytes().unwrap_or(&[]).to_vec())
            .collect::<Vec<_>>();

        let max_values = self
            .indexes
            .iter()
            .map(|x| x.max_bytes().unwrap_or(&[]).to_vec())
            .collect::<Vec<_>>();

        // Collecting into `Option<Vec<_>>` yields `None` as soon as a single
        // page lacks the value, so partial data is never written.
        let null_counts = self
            .indexes
            .iter()
            .map(|x| x.null_count())
            .collect::<Option<Vec<_>>>();

        let repetition_level_histograms = self
            .indexes
            .iter()
            .map(|x| x.repetition_level_histogram().map(|v| v.values()))
            .collect::<Option<Vec<&[i64]>>>()
            .map(|hists| hists.concat());

        let definition_level_histograms = self
            .indexes
            .iter()
            .map(|x| x.definition_level_histogram().map(|v| v.values()))
            .collect::<Option<Vec<&[i64]>>>()
            .map(|hists| hists.concat());

        ColumnIndex::new(
            null_pages,
            min_values,
            max_values,
            self.boundary_order,
            null_counts,
            repetition_level_histograms,
            definition_level_histograms,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every accessor of a fully populated `PageIndex` returns the stored value.
    #[test]
    fn test_page_index_min_max_null() {
        let (lowest, highest): (i32, i32) = (-123, 234);
        let rep_levels: Vec<i64> = vec![1, 2];
        let def_levels: Vec<i64> = vec![1, 2, 3];

        let populated = PageIndex {
            min: Some(lowest),
            max: Some(highest),
            null_count: Some(0),
            repetition_level_histogram: Some(LevelHistogram::from(rep_levels.clone())),
            definition_level_histogram: Some(LevelHistogram::from(def_levels.clone())),
        };

        // Typed accessors.
        assert_eq!(populated.min().unwrap(), &lowest);
        assert_eq!(populated.max().unwrap(), &highest);

        // Byte-level accessors agree with `AsBytes` on the raw values.
        assert_eq!(populated.min_bytes().unwrap(), lowest.as_bytes());
        assert_eq!(populated.max_bytes().unwrap(), highest.as_bytes());

        assert_eq!(populated.null_count().unwrap(), 0);

        let stored_rep = populated.repetition_level_histogram().unwrap();
        assert_eq!(stored_rep.values(), &rep_levels);

        let stored_def = populated.definition_level_histogram().unwrap();
        assert_eq!(stored_def.values(), &def_levels);
    }

    /// Every accessor of an empty `PageIndex` returns `None`.
    #[test]
    fn test_page_index_min_max_null_none() {
        let blank: PageIndex<i32> = PageIndex {
            min: None,
            max: None,
            null_count: None,
            repetition_level_histogram: None,
            definition_level_histogram: None,
        };

        assert_eq!(blank.min(), None);
        assert_eq!(blank.max(), None);
        assert_eq!(blank.min_bytes(), None);
        assert_eq!(blank.max_bytes(), None);
        assert_eq!(blank.null_count(), None);
        assert_eq!(blank.repetition_level_histogram(), None);
        assert_eq!(blank.definition_level_histogram(), None);
    }

    /// A non-null page with zero-length min/max bytes cannot be decoded as
    /// `i32`, so construction must fail with a descriptive error.
    #[test]
    fn test_invalid_column_index() {
        // Two pages; the second is not a null page yet has empty min/max bytes.
        let empty_values: Vec<Vec<u8>> = vec![vec![], vec![]];
        let malformed = ColumnIndex {
            null_pages: vec![true, false],
            min_values: empty_values.clone(),
            max_values: empty_values,
            null_counts: None,
            repetition_level_histograms: None,
            definition_level_histograms: None,
            boundary_order: BoundaryOrder::UNORDERED,
        };

        let outcome = NativeIndex::<i32>::try_new(malformed);
        let message = outcome.unwrap_err().to_string();
        assert_eq!(
            message,
            "Parquet error: error converting value, expected 4 bytes got 0"
        );
    }
}