use std::collections::HashSet;
use std::sync::Arc;
use crate::schema::types::SchemaDescPtr;
/// Per-column policy deciding whether a category of statistics is skipped.
///
/// The policy is queried with a column index; see the variants for how each
/// one answers. The default policy is [`ParquetStatisticsPolicy::KeepAll`].
#[derive(Default, Debug, Clone)]
pub enum ParquetStatisticsPolicy {
    /// Never skip: no column index is reported as skipped.
    #[default]
    KeepAll,
    /// Always skip: every column index is reported as skipped.
    SkipAll,
    /// Skip every column index that is *not* contained in the set.
    ///
    /// The set is held behind an [`Arc`], so cloning the policy shares the
    /// set rather than copying it.
    SkipExcept(Arc<HashSet<usize>>),
}

impl ParquetStatisticsPolicy {
    /// Builds a policy that skips every column except those listed in `keep`.
    ///
    /// Duplicate indices in `keep` are collapsed. An empty `keep` slice
    /// yields [`Self::SkipAll`] instead of a `SkipExcept` holding an empty set.
    pub fn skip_except(keep: &[usize]) -> Self {
        // Nothing to retain: collapse to the cheaper "skip everything" variant.
        if keep.is_empty() {
            return Self::SkipAll;
        }

        let retained: HashSet<usize> = keep.iter().copied().collect();
        Self::SkipExcept(Arc::new(retained))
    }

    /// Returns `true` when this policy says the column at `col_index` is
    /// skipped, `false` when it is kept.
    pub(crate) fn is_skip(&self, col_index: usize) -> bool {
        match self {
            Self::SkipExcept(retained) => !retained.contains(&col_index),
            Self::SkipAll => true,
            Self::KeepAll => false,
        }
    }
}
/// Options bundle consulted while decoding Parquet metadata.
///
/// All fields are private; they are read and written through the accessor,
/// `set_*` and `with_*` methods. The derived `Default` leaves the schema
/// unset, the mask flag `false`, and the encoding-stats policy at its own
/// default.
#[derive(Default, Debug, Clone)]
pub struct ParquetMetaDataOptions {
// Optional caller-supplied schema descriptor; `None` until `set_schema` /
// `with_schema` is called.
schema_descr: Option<SchemaDescPtr>,
// Flag exposed via `encoding_stats_as_mask()`.
// NOTE(review): presumably selects a bitmask representation for encoding
// stats — confirm against the decoder that reads this option.
encoding_stats_as_mask: bool,
// Per-column policy backing `skip_encoding_stats(col_index)`.
encoding_stats_policy: ParquetStatisticsPolicy,
}
impl ParquetMetaDataOptions {
    /// Creates an options value with every setting at its default.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the schema descriptor stored in these options, if one was set.
    pub fn schema(&self) -> Option<&SchemaDescPtr> {
        self.schema_descr.as_ref()
    }

    /// Stores `val` as the schema descriptor, replacing any previous one.
    pub fn set_schema(&mut self, val: SchemaDescPtr) {
        self.schema_descr = Some(val);
    }

    /// Builder-style variant of [`Self::set_schema`]: consumes `self`, stores
    /// `val` as the schema descriptor and returns the updated options.
    pub fn with_schema(mut self, val: SchemaDescPtr) -> Self {
        self.schema_descr = Some(val);
        self
    }

    /// Returns the current value of the `encoding_stats_as_mask` flag.
    pub fn encoding_stats_as_mask(&self) -> bool {
        self.encoding_stats_as_mask
    }

    /// Sets the `encoding_stats_as_mask` flag to `val`.
    pub fn set_encoding_stats_as_mask(&mut self, val: bool) {
        self.encoding_stats_as_mask = val;
    }

    /// Builder-style variant of [`Self::set_encoding_stats_as_mask`]:
    /// consumes `self`, sets the flag to `val` and returns the updated options.
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.encoding_stats_as_mask = val;
        self
    }

    /// Returns `true` when the configured encoding-stats policy says the
    /// column at `col_index` is skipped.
    pub fn skip_encoding_stats(&self, col_index: usize) -> bool {
        let policy = &self.encoding_stats_policy;
        policy.is_skip(col_index)
    }

    /// Replaces the encoding-stats policy with `policy`.
    pub fn set_encoding_stats_policy(&mut self, policy: ParquetStatisticsPolicy) {
        self.encoding_stats_policy = policy;
    }

    /// Builder-style variant of [`Self::set_encoding_stats_policy`]: consumes
    /// `self`, installs `policy` and returns the updated options.
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.encoding_stats_policy = policy;
        self
    }
}
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use crate::{
        DecodeResult,
        file::metadata::{ParquetMetaDataOptions, ParquetMetaDataPushDecoder},
        util::test_common::file_util::get_test_file,
    };
    use std::{io::Read, sync::Arc};

    /// Decodes a file's metadata once without options, then again with the
    /// first decode's schema supplied through `ParquetMetaDataOptions`, and
    /// checks that the second result is equal and shares the same schema
    /// allocation.
    #[test]
    fn test_provide_schema() {
        // Load the whole test file into memory.
        let mut raw: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut raw)
            .unwrap();
        let data = Bytes::from(raw);
        let file_len = data.len() as u64;

        // Both passes finish the same way: decode, and require actual data.
        let finish = |mut decoder: ParquetMetaDataPushDecoder| match decoder.try_decode().unwrap() {
            DecodeResult::Data(m) => m,
            _ => panic!("could not parse metadata"),
        };

        // First pass: no options, schema comes from the file itself.
        let mut plain_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
        plain_decoder
            .push_range(0..file_len, data.clone())
            .unwrap();
        let expected = finish(plain_decoder);

        // Second pass: hand the first pass's schema to the decoder via options.
        let expected_schema = expected.file_metadata().schema_descr_ptr();
        let options = Arc::new(ParquetMetaDataOptions::new().with_schema(expected_schema));
        let mut schema_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
            .unwrap()
            .with_metadata_options(Some(options));
        schema_decoder.push_range(0..file_len, data).unwrap();
        let metadata = finish(schema_decoder);

        assert_eq!(expected, metadata);
        // The supplied schema must be reused, not merely equal.
        assert!(Arc::ptr_eq(
            &expected.file_metadata().schema_descr_ptr(),
            &metadata.file_metadata().schema_descr_ptr()
        ));
    }
}