mod footer_tail;
mod memory;
mod options;
mod parser;
mod push_decoder;
pub(crate) mod reader;
pub(crate) mod thrift;
mod writer;
use crate::basic::{EncodingMask, PageType};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptor;
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
pub(crate) use crate::file::metadata::memory::HeapSize;
#[cfg(feature = "encryption")]
use crate::file::metadata::thrift::encryption::EncryptionAlgorithm;
use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
use crate::file::statistics::Statistics;
use crate::geospatial::statistics as geo_statistics;
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
Type as SchemaType,
};
use crate::thrift_struct;
use crate::{
basic::BoundaryOrder,
errors::{ParquetError, Result},
};
use crate::{
basic::{ColumnOrder, Compression, Encoding, Type},
parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
},
};
use crate::{
data_type::private::ParquetValueType, file::page_index::offset_index::OffsetIndexMetaData,
};
pub use footer_tail::FooterTail;
pub use options::{ParquetMetaDataOptions, ParquetStatisticsPolicy};
pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
use std::io::Write;
use std::ops::Range;
use std::sync::Arc;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;
/// Column index metadata for a whole file, held as a nested `Vec` of [`ColumnIndexMetaData`].
///
/// NOTE(review): presumably the outer `Vec` is indexed by row group and the inner `Vec` by
/// column chunk -- confirm against the code that populates it.
pub type ParquetColumnIndex = Vec<Vec<ColumnIndexMetaData>>;
/// Offset index metadata for a whole file, held as a nested `Vec` of [`OffsetIndexMetaData`].
///
/// NOTE(review): presumably the outer `Vec` is indexed by row group and the inner `Vec` by
/// column chunk -- confirm against the code that populates it.
pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
column_index: Option<ParquetColumnIndex>,
offset_index: Option<ParquetOffsetIndex>,
#[cfg(feature = "encryption")]
file_decryptor: Option<Box<FileDecryptor>>,
}
impl ParquetMetaData {
pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
column_index: None,
offset_index: None,
#[cfg(feature = "encryption")]
file_decryptor: None,
}
}
#[cfg(feature = "encryption")]
pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
self.file_decryptor = file_decryptor.map(Box::new);
}
pub fn into_builder(self) -> ParquetMetaDataBuilder {
self.into()
}
pub fn file_metadata(&self) -> &FileMetaData {
&self.file_metadata
}
#[cfg(feature = "encryption")]
pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
self.file_decryptor.as_deref()
}
pub fn num_row_groups(&self) -> usize {
self.row_groups.len()
}
pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
&self.row_groups[i]
}
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.row_groups
}
pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
self.column_index.as_ref()
}
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.offset_index.as_ref()
}
pub fn memory_size(&self) -> usize {
#[cfg(feature = "encryption")]
let encryption_size = self.file_decryptor.heap_size();
#[cfg(not(feature = "encryption"))]
let encryption_size = 0usize;
std::mem::size_of::<Self>()
+ self.file_metadata.heap_size()
+ self.row_groups.heap_size()
+ self.column_index.heap_size()
+ self.offset_index.heap_size()
+ encryption_size
}
pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
self.column_index = index;
}
pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
self.offset_index = index;
}
}
/// Builder for [`ParquetMetaData`].
///
/// Wraps a [`ParquetMetaData`] value and updates it in place; [`Self::build`] hands the
/// wrapped value back.
pub struct ParquetMetaDataBuilder(ParquetMetaData);

impl ParquetMetaDataBuilder {
    /// Creates a builder from file metadata, with no row groups and no indexes.
    pub fn new(file_meta_data: FileMetaData) -> Self {
        Self(ParquetMetaData::new(file_meta_data, Vec::new()))
    }

    /// Creates a builder that starts from existing metadata.
    pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
        Self(metadata)
    }

    /// Appends a row group.
    pub fn add_row_group(self, row_group: RowGroupMetaData) -> Self {
        let Self(mut metadata) = self;
        metadata.row_groups.push(row_group);
        Self(metadata)
    }

    /// Replaces all row groups.
    pub fn set_row_groups(self, row_groups: Vec<RowGroupMetaData>) -> Self {
        let Self(mut metadata) = self;
        metadata.row_groups = row_groups;
        Self(metadata)
    }

    /// Removes and returns the row groups, leaving an empty list in the builder.
    pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
        let metadata = &mut self.0;
        std::mem::take(&mut metadata.row_groups)
    }

    /// Returns the row groups currently held by the builder.
    pub fn row_groups(&self) -> &[RowGroupMetaData] {
        self.0.row_groups.as_slice()
    }

    /// Sets (or clears) the column index.
    pub fn set_column_index(self, column_index: Option<ParquetColumnIndex>) -> Self {
        let Self(mut metadata) = self;
        metadata.column_index = column_index;
        Self(metadata)
    }

    /// Removes and returns the column index, leaving `None` in the builder.
    pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
        self.0.column_index.take()
    }

    /// Returns the column index currently held by the builder, if any.
    pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
        self.0.column_index.as_ref()
    }

    /// Sets (or clears) the offset index.
    pub fn set_offset_index(self, offset_index: Option<ParquetOffsetIndex>) -> Self {
        let Self(mut metadata) = self;
        metadata.offset_index = offset_index;
        Self(metadata)
    }

    /// Removes and returns the offset index, leaving `None` in the builder.
    pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
        self.0.offset_index.take()
    }

    /// Returns the offset index currently held by the builder, if any.
    pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
        self.0.offset_index.as_ref()
    }

    /// Sets (or clears) the file decryptor.
    #[cfg(feature = "encryption")]
    pub(crate) fn set_file_decryptor(self, file_decryptor: Option<FileDecryptor>) -> Self {
        let Self(mut metadata) = self;
        metadata.with_file_decryptor(file_decryptor);
        Self(metadata)
    }

    /// Consumes the builder and returns the [`ParquetMetaData`].
    pub fn build(self) -> ParquetMetaData {
        self.0
    }
}

impl From<ParquetMetaData> for ParquetMetaDataBuilder {
    /// Wraps existing metadata in a builder.
    fn from(meta_data: ParquetMetaData) -> Self {
        Self::new_from_metadata(meta_data)
    }
}
// Key/value pair declared through the `thrift_struct!` macro: a required string `key`
// (field id 1) and an optional string `value` (field id 2).
thrift_struct!(
pub struct KeyValue {
1: required string key
2: optional string value
}
);
impl KeyValue {
pub fn new<F2>(key: String, value: F2) -> KeyValue
where
F2: Into<Option<String>>,
{
KeyValue {
key,
value: value.into(),
}
}
}
// Per-page-type encoding statistic declared through the `thrift_struct!` macro: a required
// `page_type` (field id 1), a required `encoding` (field id 2) and a required `count`
// (field id 3).
// NOTE(review): `count` is presumably the number of pages with this type/encoding pair --
// confirm against the Parquet format definition.
thrift_struct!(
pub struct PageEncodingStats {
1: required PageType page_type;
2: required Encoding encoding;
3: required i32 count;
}
);
/// Internal storage for the page encoding statistics of a [`ColumnChunkMetaData`].
///
/// Exactly one representation is held at a time; the accessors
/// `ColumnChunkMetaData::page_encoding_stats` and
/// `ColumnChunkMetaData::page_encoding_stats_mask` each return `Some` only for their own
/// variant.
#[derive(Debug, Clone, PartialEq)]
enum ParquetPageEncodingStats {
/// The full list of [`PageEncodingStats`] entries.
Full(Vec<PageEncodingStats>),
/// The statistics reduced to an [`EncodingMask`].
Mask(EncodingMask),
}
pub type FileMetaDataPtr = Arc<FileMetaData>;
#[derive(Debug, Clone, PartialEq)]
pub struct FileMetaData {
version: i32,
num_rows: i64,
created_by: Option<String>,
key_value_metadata: Option<Vec<KeyValue>>,
schema_descr: SchemaDescPtr,
column_orders: Option<Vec<ColumnOrder>>,
#[cfg(feature = "encryption")]
encryption_algorithm: Option<Box<EncryptionAlgorithm>>,
#[cfg(feature = "encryption")]
footer_signing_key_metadata: Option<Vec<u8>>,
}
impl FileMetaData {
pub fn new(
version: i32,
num_rows: i64,
created_by: Option<String>,
key_value_metadata: Option<Vec<KeyValue>>,
schema_descr: SchemaDescPtr,
column_orders: Option<Vec<ColumnOrder>>,
) -> Self {
FileMetaData {
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
#[cfg(feature = "encryption")]
encryption_algorithm: None,
#[cfg(feature = "encryption")]
footer_signing_key_metadata: None,
}
}
#[cfg(feature = "encryption")]
pub(crate) fn with_encryption_algorithm(
mut self,
encryption_algorithm: Option<EncryptionAlgorithm>,
) -> Self {
self.encryption_algorithm = encryption_algorithm.map(Box::new);
self
}
#[cfg(feature = "encryption")]
pub(crate) fn with_footer_signing_key_metadata(
mut self,
footer_signing_key_metadata: Option<Vec<u8>>,
) -> Self {
self.footer_signing_key_metadata = footer_signing_key_metadata;
self
}
pub fn version(&self) -> i32 {
self.version
}
pub fn num_rows(&self) -> i64 {
self.num_rows
}
pub fn created_by(&self) -> Option<&str> {
self.created_by.as_deref()
}
pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
self.key_value_metadata.as_ref()
}
pub fn schema(&self) -> &SchemaType {
self.schema_descr.root_schema()
}
pub fn schema_descr(&self) -> &SchemaDescriptor {
&self.schema_descr
}
pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
self.schema_descr.clone()
}
pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> {
self.column_orders.as_ref()
}
pub fn column_order(&self, i: usize) -> ColumnOrder {
self.column_orders
.as_ref()
.map(|data| data[i])
.unwrap_or(ColumnOrder::UNDEFINED)
}
}
// Sort description for one column, declared through the `thrift_struct!` macro: a required
// `column_idx` (field id 1), a required `descending` flag (field id 2) and a required
// `nulls_first` flag (field id 3).
// NOTE(review): `column_idx` is presumably an index into the row group's columns -- confirm
// against the Parquet format definition.
thrift_struct!(
pub struct SortingColumn {
1: required i32 column_idx
2: required bool descending
3: required bool nulls_first
}
);
/// Reference-counted pointer to [`RowGroupMetaData`].
pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;

/// Metadata of a single row group.
///
/// Create instances through [`RowGroupMetaData::builder`].
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupMetaData {
    /// Metadata of the column chunks in this row group.
    columns: Vec<ColumnChunkMetaData>,
    /// Number of rows in this row group.
    num_rows: i64,
    /// Optional sorting columns.
    sorting_columns: Option<Vec<SortingColumn>>,
    /// Total byte size value of this row group.
    total_byte_size: i64,
    /// Schema descriptor this row group was created with.
    schema_descr: SchemaDescPtr,
    /// Optional file offset of this row group.
    file_offset: Option<i64>,
    /// Optional ordinal of this row group.
    ordinal: Option<i16>,
}

impl RowGroupMetaData {
    /// Returns a builder for row group metadata using `schema_descr`.
    pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder::new(schema_descr)
    }

    /// Returns the number of column chunks in this row group.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    /// Returns the metadata of column chunk `i`.
    ///
    /// # Panics
    ///
    /// Panics if `i` is not less than [`Self::num_columns`].
    pub fn column(&self, i: usize) -> &ColumnChunkMetaData {
        &self.columns[i]
    }

    /// Returns the metadata of all column chunks as a slice.
    pub fn columns(&self) -> &[ColumnChunkMetaData] {
        self.columns.as_slice()
    }

    /// Returns the metadata of all column chunks as a mutable slice.
    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
        self.columns.as_mut_slice()
    }

    /// Returns the number of rows in this row group.
    pub fn num_rows(&self) -> i64 {
        self.num_rows
    }

    /// Returns the sorting columns, if present.
    pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
        self.sorting_columns.as_ref()
    }

    /// Returns the stored total byte size value.
    pub fn total_byte_size(&self) -> i64 {
        self.total_byte_size
    }

    /// Returns the sum of `total_compressed_size` over all column chunks.
    pub fn compressed_size(&self) -> i64 {
        self.columns()
            .iter()
            .map(|chunk| chunk.total_compressed_size)
            .sum()
    }

    /// Returns a reference to the schema descriptor.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.schema_descr
    }

    /// Returns a clone of the schema descriptor pointer.
    pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
        self.schema_descr.clone()
    }

    /// Returns the ordinal of this row group, if set.
    #[inline(always)]
    pub fn ordinal(&self) -> Option<i16> {
        self.ordinal
    }

    /// Returns the file offset of this row group, if set.
    #[inline(always)]
    pub fn file_offset(&self) -> Option<i64> {
        self.file_offset
    }

    /// Converts this metadata into a [`RowGroupMetaDataBuilder`].
    pub fn into_builder(self) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder(self)
    }
}
/// Builder for [`RowGroupMetaData`].
///
/// Wraps a [`RowGroupMetaData`] value and updates it in place.
pub struct RowGroupMetaDataBuilder(RowGroupMetaData);

impl RowGroupMetaDataBuilder {
    /// Creates a builder with no columns, zero rows / byte size, and every optional field
    /// unset. The column list is pre-sized to the schema's column count.
    fn new(schema_descr: SchemaDescPtr) -> Self {
        let columns = Vec::with_capacity(schema_descr.num_columns());
        Self(RowGroupMetaData {
            columns,
            num_rows: 0,
            sorting_columns: None,
            total_byte_size: 0,
            schema_descr,
            file_offset: None,
            ordinal: None,
        })
    }

    /// Sets the number of rows.
    pub fn set_num_rows(self, value: i64) -> Self {
        let Self(mut row_group) = self;
        row_group.num_rows = value;
        Self(row_group)
    }

    /// Sets (or clears) the sorting columns.
    pub fn set_sorting_columns(self, value: Option<Vec<SortingColumn>>) -> Self {
        let Self(mut row_group) = self;
        row_group.sorting_columns = value;
        Self(row_group)
    }

    /// Sets the total byte size value.
    pub fn set_total_byte_size(self, value: i64) -> Self {
        let Self(mut row_group) = self;
        row_group.total_byte_size = value;
        Self(row_group)
    }

    /// Removes and returns the column metadata, leaving an empty list in the builder.
    pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
        let row_group = &mut self.0;
        std::mem::take(&mut row_group.columns)
    }

    /// Replaces the column metadata.
    pub fn set_column_metadata(self, value: Vec<ColumnChunkMetaData>) -> Self {
        let Self(mut row_group) = self;
        row_group.columns = value;
        Self(row_group)
    }

    /// Appends the metadata of one column chunk.
    pub fn add_column_metadata(self, value: ColumnChunkMetaData) -> Self {
        let Self(mut row_group) = self;
        row_group.columns.push(value);
        Self(row_group)
    }

    /// Sets the ordinal.
    pub fn set_ordinal(self, value: i16) -> Self {
        let Self(mut row_group) = self;
        row_group.ordinal = Some(value);
        Self(row_group)
    }

    /// Sets the file offset.
    pub fn set_file_offset(self, value: i64) -> Self {
        let Self(mut row_group) = self;
        row_group.file_offset = Some(value);
        Self(row_group)
    }

    /// Builds the row group metadata.
    ///
    /// # Errors
    ///
    /// Returns an error when the number of column chunks differs from the number of columns
    /// in the schema descriptor.
    pub fn build(self) -> Result<RowGroupMetaData> {
        let Self(row_group) = self;
        let expected = row_group.schema_descr.num_columns();
        let actual = row_group.columns.len();
        if expected == actual {
            Ok(row_group)
        } else {
            Err(general_err!(
                "Column length mismatch: {} != {}",
                expected,
                actual
            ))
        }
    }

    /// Builds the row group metadata without checking the column count against the schema.
    pub(super) fn build_unchecked(self) -> RowGroupMetaData {
        let Self(row_group) = self;
        row_group
    }
}
/// Metadata of a single column chunk.
///
/// Fields are read through the accessor methods on this type and populated through
/// [`ColumnChunkMetaDataBuilder`].
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
/// Descriptor of the column this chunk belongs to.
column_descr: ColumnDescPtr,
/// Encodings of this chunk, stored as a mask.
encodings: EncodingMask,
/// Optional file path.
file_path: Option<String>,
/// File offset value of this chunk.
file_offset: i64,
/// Number of values in this chunk.
num_values: i64,
/// Compression of this chunk.
compression: Compression,
/// Total compressed size; used as the length returned by `byte_range`.
total_compressed_size: i64,
/// Total uncompressed size.
total_uncompressed_size: i64,
/// Data page offset; used as the start of `byte_range` when there is no dictionary page
/// offset.
data_page_offset: i64,
/// Optional index page offset.
index_page_offset: Option<i64>,
/// Optional dictionary page offset; when set it is the start of `byte_range`.
dictionary_page_offset: Option<i64>,
/// Optional statistics.
statistics: Option<Statistics>,
/// Optional geospatial statistics, boxed.
geo_statistics: Option<Box<geo_statistics::GeospatialStatistics>>,
/// Optional page encoding statistics, either as a full list or as a mask.
encoding_stats: Option<ParquetPageEncodingStats>,
/// Optional bloom filter offset.
bloom_filter_offset: Option<i64>,
/// Optional bloom filter length.
bloom_filter_length: Option<i32>,
/// Optional offset index offset.
offset_index_offset: Option<i64>,
/// Optional offset index length.
offset_index_length: Option<i32>,
/// Optional column index offset.
column_index_offset: Option<i64>,
/// Optional column index length.
column_index_length: Option<i32>,
/// Optional count of unencoded byte array data bytes.
unencoded_byte_array_data_bytes: Option<i64>,
/// Optional repetition level histogram.
repetition_level_histogram: Option<LevelHistogram>,
/// Optional definition level histogram.
definition_level_histogram: Option<LevelHistogram>,
/// Optional column crypto metadata, boxed.
#[cfg(feature = "encryption")]
column_crypto_metadata: Option<Box<ColumnCryptoMetaData>>,
/// Optional encrypted column metadata bytes.
#[cfg(feature = "encryption")]
encrypted_column_metadata: Option<Vec<u8>>,
/// Plaintext footer mode flag; the builder initializes it to `false`.
// NOTE(review): nothing in this part of the file sets this to `true` -- confirm where it is
// assigned.
#[cfg(feature = "encryption")]
plaintext_footer_mode: bool,
}
/// Histogram of level counts.
///
/// Stores one `i64` counter per level; the counter for level `n` lives at index `n`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct LevelHistogram {
    inner: Vec<i64>,
}

impl LevelHistogram {
    /// Creates a zeroed histogram with `max_level + 1` counters.
    ///
    /// Returns `None` when `max_level` is zero or negative.
    pub fn try_new(max_level: i16) -> Option<Self> {
        if max_level <= 0 {
            return None;
        }
        let bucket_count = max_level as usize + 1;
        Some(Self {
            inner: vec![0; bucket_count],
        })
    }

    /// Returns the counters as a slice.
    pub fn values(&self) -> &[i64] {
        self.inner.as_slice()
    }

    /// Consumes the histogram and returns the underlying counters.
    pub fn into_inner(self) -> Vec<i64> {
        let Self { inner } = self;
        inner
    }

    /// Returns the counter at `index`, or `None` when `index` is out of range.
    pub fn get(&self, index: usize) -> Option<i64> {
        self.inner.get(index).copied()
    }

    /// Adds the counters of `other` to this histogram, element by element.
    ///
    /// # Panics
    ///
    /// Panics if the two histograms have different lengths.
    pub fn add(&mut self, other: &Self) {
        assert_eq!(self.len(), other.len());
        self.inner
            .iter_mut()
            .zip(other.values())
            .for_each(|(total, &delta)| *total += delta);
    }

    /// Returns the number of counters.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when the histogram has no counters.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Sets every counter back to zero, keeping the length.
    pub fn reset(&mut self) {
        self.inner.fill(0);
    }

    /// Adds `count` to the counter for `level`.
    ///
    /// # Panics
    ///
    /// Panics if `level` does not index an existing counter.
    #[inline]
    pub fn increment_by(&mut self, level: i16, count: i64) {
        let slot = &mut self.inner[level as usize];
        *slot += count;
    }

    /// Increments the counter of every level in `levels` by one.
    ///
    /// # Panics
    ///
    /// Panics if any level does not index an existing counter.
    #[deprecated(since = "58.2.0", note = "Use `increment_by` instead")]
    pub fn update_from_levels(&mut self, levels: &[i16]) {
        levels
            .iter()
            .for_each(|&level| self.increment_by(level, 1));
    }
}

impl From<Vec<i64>> for LevelHistogram {
    /// Wraps existing counters without validation.
    fn from(inner: Vec<i64>) -> Self {
        LevelHistogram { inner }
    }
}

impl From<LevelHistogram> for Vec<i64> {
    /// Unwraps the histogram into its counters.
    fn from(value: LevelHistogram) -> Self {
        value.inner
    }
}
// Heap accounting for `LevelHistogram`: delegates to the `heap_size` of the inner `Vec<i64>`,
// which is the only field of the struct.
impl HeapSize for LevelHistogram {
fn heap_size(&self) -> usize {
self.inner.heap_size()
}
}
impl ColumnChunkMetaData {
pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
ColumnChunkMetaDataBuilder::new(column_descr)
}
pub fn file_path(&self) -> Option<&str> {
self.file_path.as_deref()
}
pub fn file_offset(&self) -> i64 {
self.file_offset
}
pub fn column_type(&self) -> Type {
self.column_descr.physical_type()
}
pub fn column_path(&self) -> &ColumnPath {
self.column_descr.path()
}
pub fn column_descr(&self) -> &ColumnDescriptor {
self.column_descr.as_ref()
}
pub fn column_descr_ptr(&self) -> ColumnDescPtr {
self.column_descr.clone()
}
pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
self.encodings.encodings()
}
pub fn encodings_mask(&self) -> &EncodingMask {
&self.encodings
}
pub fn num_values(&self) -> i64 {
self.num_values
}
pub fn compression(&self) -> Compression {
self.compression
}
pub fn compressed_size(&self) -> i64 {
self.total_compressed_size
}
pub fn uncompressed_size(&self) -> i64 {
self.total_uncompressed_size
}
pub fn data_page_offset(&self) -> i64 {
self.data_page_offset
}
pub fn index_page_offset(&self) -> Option<i64> {
self.index_page_offset
}
pub fn dictionary_page_offset(&self) -> Option<i64> {
self.dictionary_page_offset
}
pub fn byte_range(&self) -> (u64, u64) {
let col_start = match self.dictionary_page_offset() {
Some(dictionary_page_offset) => dictionary_page_offset,
None => self.data_page_offset(),
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
}
pub fn statistics(&self) -> Option<&Statistics> {
self.statistics.as_ref()
}
pub fn geo_statistics(&self) -> Option<&geo_statistics::GeospatialStatistics> {
self.geo_statistics.as_deref()
}
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
match self.encoding_stats.as_ref() {
Some(ParquetPageEncodingStats::Full(stats)) => Some(stats),
_ => None,
}
}
pub fn page_encoding_stats_mask(&self) -> Option<&EncodingMask> {
match self.encoding_stats.as_ref() {
Some(ParquetPageEncodingStats::Mask(stats)) => Some(stats),
_ => None,
}
}
pub fn bloom_filter_offset(&self) -> Option<i64> {
self.bloom_filter_offset
}
pub fn bloom_filter_length(&self) -> Option<i32> {
self.bloom_filter_length
}
pub fn column_index_offset(&self) -> Option<i64> {
self.column_index_offset
}
pub fn column_index_length(&self) -> Option<i32> {
self.column_index_length
}
pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
let offset = u64::try_from(self.column_index_offset?).ok()?;
let length = u64::try_from(self.column_index_length?).ok()?;
Some(offset..(offset + length))
}
pub fn offset_index_offset(&self) -> Option<i64> {
self.offset_index_offset
}
pub fn offset_index_length(&self) -> Option<i32> {
self.offset_index_length
}
pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
let offset = u64::try_from(self.offset_index_offset?).ok()?;
let length = u64::try_from(self.offset_index_length?).ok()?;
Some(offset..(offset + length))
}
pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
self.unencoded_byte_array_data_bytes
}
pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
self.repetition_level_histogram.as_ref()
}
pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
self.definition_level_histogram.as_ref()
}
#[cfg(feature = "encryption")]
pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
self.column_crypto_metadata.as_deref()
}
pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
ColumnChunkMetaDataBuilder::from(self)
}
}
/// Builder for [`ColumnChunkMetaData`].
///
/// Wraps a [`ColumnChunkMetaData`] value; every setter consumes the builder and returns it
/// with one field updated.
pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);

impl ColumnChunkMetaDataBuilder {
    /// Creates a builder for the column described by `column_descr`.
    ///
    /// Numeric fields start at zero, optional fields at `None`, the compression at
    /// `Compression::UNCOMPRESSED` and the encodings at the default mask.
    fn new(column_descr: ColumnDescPtr) -> Self {
        let chunk = ColumnChunkMetaData {
            column_descr,
            encodings: Default::default(),
            file_path: None,
            file_offset: 0,
            num_values: 0,
            compression: Compression::UNCOMPRESSED,
            total_compressed_size: 0,
            total_uncompressed_size: 0,
            data_page_offset: 0,
            index_page_offset: None,
            dictionary_page_offset: None,
            statistics: None,
            geo_statistics: None,
            encoding_stats: None,
            bloom_filter_offset: None,
            bloom_filter_length: None,
            offset_index_offset: None,
            offset_index_length: None,
            column_index_offset: None,
            column_index_length: None,
            unencoded_byte_array_data_bytes: None,
            repetition_level_histogram: None,
            definition_level_histogram: None,
            #[cfg(feature = "encryption")]
            column_crypto_metadata: None,
            #[cfg(feature = "encryption")]
            encrypted_column_metadata: None,
            #[cfg(feature = "encryption")]
            plaintext_footer_mode: false,
        };
        Self(chunk)
    }

    /// Sets the encodings from a list, converting it into a mask.
    pub fn set_encodings(self, encodings: Vec<Encoding>) -> Self {
        let Self(mut chunk) = self;
        chunk.encodings = EncodingMask::new_from_encodings(encodings.iter());
        Self(chunk)
    }

    /// Sets the encodings from an existing mask.
    pub fn set_encodings_mask(self, encodings: EncodingMask) -> Self {
        let Self(mut chunk) = self;
        chunk.encodings = encodings;
        Self(chunk)
    }

    /// Sets the file path.
    pub fn set_file_path(self, value: String) -> Self {
        let Self(mut chunk) = self;
        chunk.file_path = Some(value);
        Self(chunk)
    }

    /// Sets the number of values.
    pub fn set_num_values(self, value: i64) -> Self {
        let Self(mut chunk) = self;
        chunk.num_values = value;
        Self(chunk)
    }

    /// Sets the compression.
    pub fn set_compression(self, value: Compression) -> Self {
        let Self(mut chunk) = self;
        chunk.compression = value;
        Self(chunk)
    }

    /// Sets the total compressed size.
    pub fn set_total_compressed_size(self, value: i64) -> Self {
        let Self(mut chunk) = self;
        chunk.total_compressed_size = value;
        Self(chunk)
    }

    /// Sets the total uncompressed size.
    pub fn set_total_uncompressed_size(self, value: i64) -> Self {
        let Self(mut chunk) = self;
        chunk.total_uncompressed_size = value;
        Self(chunk)
    }

    /// Sets the data page offset.
    pub fn set_data_page_offset(self, value: i64) -> Self {
        let Self(mut chunk) = self;
        chunk.data_page_offset = value;
        Self(chunk)
    }

    /// Sets (or clears) the dictionary page offset.
    pub fn set_dictionary_page_offset(self, value: Option<i64>) -> Self {
        let Self(mut chunk) = self;
        chunk.dictionary_page_offset = value;
        Self(chunk)
    }

    /// Sets (or clears) the index page offset.
    pub fn set_index_page_offset(self, value: Option<i64>) -> Self {
        let Self(mut chunk) = self;
        chunk.index_page_offset = value;
        Self(chunk)
    }

    /// Sets the statistics.
    pub fn set_statistics(self, value: Statistics) -> Self {
        let Self(mut chunk) = self;
        chunk.statistics = Some(value);
        Self(chunk)
    }

    /// Sets the geospatial statistics.
    pub fn set_geo_statistics(self, value: Box<geo_statistics::GeospatialStatistics>) -> Self {
        let Self(mut chunk) = self;
        chunk.geo_statistics = Some(value);
        Self(chunk)
    }

    /// Clears the statistics.
    pub fn clear_statistics(self) -> Self {
        let Self(mut chunk) = self;
        chunk.statistics = None;
        Self(chunk)
    }

    /// Sets the page encoding statistics as a full list, replacing any stored mask.
    pub fn set_page_encoding_stats(self, value: Vec<PageEncodingStats>) -> Self {
        let Self(mut chunk) = self;
        chunk.encoding_stats = Some(ParquetPageEncodingStats::Full(value));
        Self(chunk)
    }

    /// Sets the page encoding statistics as a mask, replacing any stored full list.
    pub fn set_page_encoding_stats_mask(self, value: EncodingMask) -> Self {
        let Self(mut chunk) = self;
        chunk.encoding_stats = Some(ParquetPageEncodingStats::Mask(value));
        Self(chunk)
    }

    /// Clears the page encoding statistics.
    pub fn clear_page_encoding_stats(self) -> Self {
        let Self(mut chunk) = self;
        chunk.encoding_stats = None;
        Self(chunk)
    }

    /// Sets (or clears) the bloom filter offset.
    pub fn set_bloom_filter_offset(self, value: Option<i64>) -> Self {
        let Self(mut chunk) = self;
        chunk.bloom_filter_offset = value;
        Self(chunk)
    }

    /// Sets (or clears) the bloom filter length.
    pub fn set_bloom_filter_length(self, value: Option<i32>) -> Self {
        let Self(mut chunk) = self;
        chunk.bloom_filter_length = value;
        Self(chunk)
    }

    /// Sets (or clears) the offset index offset.
    pub fn set_offset_index_offset(self, value: Option<i64>) -> Self {
        let Self(mut chunk) = self;
        chunk.offset_index_offset = value;
        Self(chunk)
    }

    /// Sets (or clears) the offset index length.
    pub fn set_offset_index_length(self, value: Option<i32>) -> Self {
        let Self(mut chunk) = self;
        chunk.offset_index_length = value;
        Self(chunk)
    }

    /// Sets (or clears) the column index offset.
    pub fn set_column_index_offset(self, value: Option<i64>) -> Self {
        let Self(mut chunk) = self;
        chunk.column_index_offset = value;
        Self(chunk)
    }

    /// Sets (or clears) the column index length.
    pub fn set_column_index_length(self, value: Option<i32>) -> Self {
        let Self(mut chunk) = self;
        chunk.column_index_length = value;
        Self(chunk)
    }

    /// Sets (or clears) the count of unencoded byte array data bytes.
    pub fn set_unencoded_byte_array_data_bytes(self, value: Option<i64>) -> Self {
        let Self(mut chunk) = self;
        chunk.unencoded_byte_array_data_bytes = value;
        Self(chunk)
    }

    /// Sets (or clears) the repetition level histogram.
    pub fn set_repetition_level_histogram(self, value: Option<LevelHistogram>) -> Self {
        let Self(mut chunk) = self;
        chunk.repetition_level_histogram = value;
        Self(chunk)
    }

    /// Sets (or clears) the definition level histogram.
    pub fn set_definition_level_histogram(self, value: Option<LevelHistogram>) -> Self {
        let Self(mut chunk) = self;
        chunk.definition_level_histogram = value;
        Self(chunk)
    }

    /// Sets (or clears) the column crypto metadata.
    #[cfg(feature = "encryption")]
    pub fn set_column_crypto_metadata(self, value: Option<ColumnCryptoMetaData>) -> Self {
        let Self(mut chunk) = self;
        chunk.column_crypto_metadata = value.map(Box::new);
        Self(chunk)
    }

    /// Sets (or clears) the encrypted column metadata bytes.
    #[cfg(feature = "encryption")]
    pub fn set_encrypted_column_metadata(self, value: Option<Vec<u8>>) -> Self {
        let Self(mut chunk) = self;
        chunk.encrypted_column_metadata = value;
        Self(chunk)
    }

    /// Consumes the builder and returns the column chunk metadata.
    ///
    /// This implementation performs no validation and always returns `Ok`.
    pub fn build(self) -> Result<ColumnChunkMetaData> {
        let Self(chunk) = self;
        Ok(chunk)
    }
}
/// Builder for [`ColumnIndexMetaData`].
///
/// Collects one entry per call to [`Self::append`] and, optionally, level histograms through
/// [`Self::append_histograms`]. [`Self::build`] converts the collected data into the variant
/// matching the column's physical type.
pub struct ColumnIndexBuilder {
    /// Physical type of the column; selects the variant produced by `build`.
    column_type: Type,
    /// Null-page flag of every appended entry.
    null_pages: Vec<bool>,
    /// Minimum value bytes of every appended entry.
    min_values: Vec<Vec<u8>>,
    /// Maximum value bytes of every appended entry.
    max_values: Vec<Vec<u8>>,
    /// Null count of every appended entry.
    null_counts: Vec<i64>,
    /// Boundary order passed on to the built index.
    boundary_order: BoundaryOrder,
    /// Concatenated repetition level histograms; `None` until one is appended.
    repetition_level_histograms: Option<Vec<i64>>,
    /// Concatenated definition level histograms; `None` until one is appended.
    definition_level_histograms: Option<Vec<i64>>,
    /// Validity flag; starts `true` and is cleared by `to_invalid`.
    valid: bool,
}

impl ColumnIndexBuilder {
    /// Creates an empty, valid builder for a column of `column_type` with
    /// `BoundaryOrder::UNORDERED`.
    pub fn new(column_type: Type) -> Self {
        Self {
            column_type,
            null_pages: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            null_counts: Vec::new(),
            boundary_order: BoundaryOrder::UNORDERED,
            repetition_level_histograms: None,
            definition_level_histograms: None,
            valid: true,
        }
    }

    /// Appends the statistics of one entry.
    pub fn append(
        &mut self,
        null_page: bool,
        min_value: Vec<u8>,
        max_value: Vec<u8>,
        null_count: i64,
    ) {
        self.null_counts.push(null_count);
        self.max_values.push(max_value);
        self.min_values.push(min_value);
        self.null_pages.push(null_page);
    }

    /// Appends the values of the given level histograms to the collected histograms.
    ///
    /// Does nothing when the builder has been marked invalid. A histogram that is `None` is
    /// skipped.
    pub fn append_histograms(
        &mut self,
        repetition_level_histogram: &Option<LevelHistogram>,
        definition_level_histogram: &Option<LevelHistogram>,
    ) {
        if !self.valid {
            return;
        }
        if let Some(rep_lvl_hist) = repetition_level_histogram {
            // `extend` from a slice reserves the needed capacity itself.
            self.repetition_level_histograms
                .get_or_insert_with(Vec::new)
                .extend(rep_lvl_hist.values());
        }
        if let Some(def_lvl_hist) = definition_level_histogram {
            self.definition_level_histograms
                .get_or_insert_with(Vec::new)
                .extend(def_lvl_hist.values());
        }
    }

    /// Sets the boundary order.
    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
        self.boundary_order = boundary_order;
    }

    /// Marks the builder as invalid.
    pub fn to_invalid(&mut self) {
        self.valid = false;
    }

    /// Returns whether the builder is still valid.
    pub fn valid(&self) -> bool {
        self.valid
    }

    /// Builds the column index variant matching the column's physical type.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing the underlying index.
    pub fn build(self) -> Result<ColumnIndexMetaData> {
        match self.column_type {
            Type::BOOLEAN => self.build_page_index().map(ColumnIndexMetaData::BOOLEAN),
            Type::INT32 => self.build_page_index().map(ColumnIndexMetaData::INT32),
            Type::INT64 => self.build_page_index().map(ColumnIndexMetaData::INT64),
            Type::INT96 => self.build_page_index().map(ColumnIndexMetaData::INT96),
            Type::FLOAT => self.build_page_index().map(ColumnIndexMetaData::FLOAT),
            Type::DOUBLE => self.build_page_index().map(ColumnIndexMetaData::DOUBLE),
            Type::BYTE_ARRAY => self
                .build_byte_array_index()
                .map(ColumnIndexMetaData::BYTE_ARRAY),
            Type::FIXED_LEN_BYTE_ARRAY => self
                .build_byte_array_index()
                .map(ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY),
        }
    }

    /// Builds a [`PrimitiveColumnIndex`] from the collected data.
    fn build_page_index<T>(self) -> Result<PrimitiveColumnIndex<T>>
    where
        T: ParquetValueType,
    {
        let Self {
            null_pages,
            min_values,
            max_values,
            null_counts,
            boundary_order,
            repetition_level_histograms,
            definition_level_histograms,
            ..
        } = self;
        // The constructor takes borrowed byte slices rather than owned vectors.
        let min_slices: Vec<&[u8]> = min_values.iter().map(Vec::as_slice).collect();
        let max_slices: Vec<&[u8]> = max_values.iter().map(Vec::as_slice).collect();
        PrimitiveColumnIndex::try_new(
            null_pages,
            boundary_order,
            Some(null_counts),
            repetition_level_histograms,
            definition_level_histograms,
            min_slices,
            max_slices,
        )
    }

    /// Builds a [`ByteArrayColumnIndex`] from the collected data.
    fn build_byte_array_index(self) -> Result<ByteArrayColumnIndex> {
        let Self {
            null_pages,
            min_values,
            max_values,
            null_counts,
            boundary_order,
            repetition_level_histograms,
            definition_level_histograms,
            ..
        } = self;
        // The constructor takes borrowed byte slices rather than owned vectors.
        let min_slices: Vec<&[u8]> = min_values.iter().map(Vec::as_slice).collect();
        let max_slices: Vec<&[u8]> = max_values.iter().map(Vec::as_slice).collect();
        ByteArrayColumnIndex::try_new(
            null_pages,
            boundary_order,
            Some(null_counts),
            repetition_level_histograms,
            definition_level_histograms,
            min_slices,
            max_slices,
        )
    }
}
impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder {
fn from(value: ColumnChunkMetaData) -> Self {
ColumnChunkMetaDataBuilder(value)
}
}
/// Builder for [`OffsetIndexMetaData`].
///
/// Offsets/sizes and row counts are appended separately and combined position by position in
/// [`Self::build`].
pub struct OffsetIndexBuilder {
    /// Offset of every appended entry.
    offset_array: Vec<i64>,
    /// Compressed page size of every appended entry.
    compressed_page_size_array: Vec<i32>,
    /// First row index of every appended entry.
    first_row_index_array: Vec<i64>,
    /// Unencoded byte array data bytes; `None` until a value is appended.
    unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
    /// Running row total; becomes the first row index of the next appended row count.
    current_first_row_index: i64,
}

impl Default for OffsetIndexBuilder {
    /// Equivalent to [`OffsetIndexBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl OffsetIndexBuilder {
    /// Creates an empty builder whose first row index starts at zero.
    pub fn new() -> Self {
        Self {
            offset_array: Vec::new(),
            compressed_page_size_array: Vec::new(),
            first_row_index_array: Vec::new(),
            unencoded_byte_array_data_bytes_array: None,
            current_first_row_index: 0,
        }
    }

    /// Records the current running row total as a first row index, then advances the running
    /// total by `row_count`.
    pub fn append_row_count(&mut self, row_count: i64) {
        self.first_row_index_array
            .push(self.current_first_row_index);
        self.current_first_row_index += row_count;
    }

    /// Records the offset and compressed size of one entry.
    pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
        self.compressed_page_size_array.push(compressed_page_size);
        self.offset_array.push(offset);
    }

    /// Records an unencoded byte array data byte count; `None` is ignored.
    pub fn append_unencoded_byte_array_data_bytes(
        &mut self,
        unencoded_byte_array_data_bytes: Option<i64>,
    ) {
        if let Some(bytes) = unencoded_byte_array_data_bytes {
            self.unencoded_byte_array_data_bytes_array
                .get_or_insert_with(Vec::new)
                .push(bytes);
        }
    }

    /// Consumes the builder and returns the offset index.
    ///
    /// Page locations are formed by zipping the three recorded arrays, so the result has as
    /// many locations as the shortest of them.
    pub fn build(self) -> OffsetIndexMetaData {
        let Self {
            offset_array,
            compressed_page_size_array,
            first_row_index_array,
            unencoded_byte_array_data_bytes_array,
            ..
        } = self;
        let page_locations = offset_array
            .into_iter()
            .zip(compressed_page_size_array)
            .zip(first_row_index_array)
            .map(
                |((offset, compressed_page_size), first_row_index)| PageLocation {
                    offset,
                    compressed_page_size,
                    first_row_index,
                },
            )
            .collect::<Vec<_>>();
        OffsetIndexMetaData {
            page_locations,
            unencoded_byte_array_data_bytes: unencoded_byte_array_data_bytes_array,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::basic::{PageType, SortOrder};
use crate::file::metadata::thrift::tests::{
read_column_chunk, read_column_chunk_with_options, read_row_group,
};
#[test]
#[allow(deprecated)]
fn test_level_histogram_update_from_levels_compat() {
    // The deprecated bulk update must still count each level once per occurrence:
    // level 0 once, level 1 once, level 2 three times.
    let levels: [i16; 5] = [0, 2, 1, 2, 2];
    let mut histogram = LevelHistogram::try_new(2).unwrap();
    histogram.update_from_levels(&levels);
    assert_eq!(histogram.values(), &[1, 1, 3]);
}
#[test]
fn test_row_group_metadata_thrift_conversion() {
    let schema_descr = get_test_schema_descr();

    // One default column chunk per column in the schema.
    let mut chunk_metadata = Vec::new();
    for column_descr in schema_descr.columns() {
        chunk_metadata.push(
            ColumnChunkMetaData::builder(column_descr.clone())
                .build()
                .unwrap(),
        );
    }

    let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
        .set_num_rows(1000)
        .set_total_byte_size(2000)
        .set_column_metadata(chunk_metadata)
        .set_ordinal(1)
        .build()
        .unwrap();

    // Serialize, then read back and compare with the original.
    let mut buf = Vec::new();
    {
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
        row_group_meta.write_thrift(&mut writer).unwrap();
    }
    let row_group_res = read_row_group(&mut buf, schema_descr).unwrap();

    assert_eq!(row_group_res, row_group_meta);
}
#[test]
fn test_row_group_metadata_thrift_conversion_empty() {
    // Building without any column metadata must fail the column count check.
    let schema_descr = get_test_schema_descr();
    let err = RowGroupMetaData::builder(schema_descr)
        .build()
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Column length mismatch: 2 != 0"
    );
}
#[test]
fn test_row_group_metadata_thrift_corrupted() {
    // Builds an INT32 primitive field with the given name.
    let int32_field = |name: &str| {
        Arc::new(
            SchemaType::primitive_type_builder(name, Type::INT32)
                .build()
                .unwrap(),
        )
    };
    // Wraps the given fields in a group named "schema" and builds a schema descriptor.
    let schema_with = |fields| {
        Arc::new(SchemaDescriptor::new(Arc::new(
            SchemaType::group_type_builder("schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        )))
    };

    let schema_descr_2cols = schema_with(vec![int32_field("a"), int32_field("b")]);
    let schema_descr_3cols =
        schema_with(vec![int32_field("a"), int32_field("b"), int32_field("c")]);

    let columns_2cols: Vec<_> = (0..2)
        .map(|idx| {
            ColumnChunkMetaData::builder(schema_descr_2cols.column(idx))
                .build()
                .unwrap()
        })
        .collect();
    let row_group_meta_2cols = RowGroupMetaData::builder(schema_descr_2cols.clone())
        .set_num_rows(1000)
        .set_total_byte_size(2000)
        .set_column_metadata(columns_2cols)
        .set_ordinal(1)
        .build()
        .unwrap();

    let mut buf = Vec::new();
    {
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
        row_group_meta_2cols.write_thrift(&mut writer).unwrap();
    }

    // Reading a two-column row group against a three-column schema must be rejected.
    let err = read_row_group(&mut buf, schema_descr_3cols)
        .unwrap_err()
        .to_string();
    assert_eq!(
        err,
        "Parquet error: Column count mismatch. Schema has 3 columns while Row Group has 2"
    );
}
#[test]
fn test_column_chunk_metadata_thrift_conversion() {
    let column_descr = get_test_schema_descr().column(0);

    // Everything the written and the expected metadata have in common; only the page
    // encoding statistics differ between the two.
    let base = ColumnChunkMetaData::builder(column_descr.clone())
        .set_encodings_mask(EncodingMask::new_from_encodings(
            [Encoding::PLAIN, Encoding::RLE].iter(),
        ))
        .set_file_path("file_path".to_owned())
        .set_num_values(1000)
        .set_compression(Compression::SNAPPY)
        .set_total_compressed_size(2000)
        .set_total_uncompressed_size(3000)
        .set_data_page_offset(4000)
        .set_dictionary_page_offset(Some(5000))
        .set_bloom_filter_offset(Some(6000))
        .set_bloom_filter_length(Some(25))
        .set_offset_index_offset(Some(7000))
        .set_offset_index_length(Some(25))
        .set_column_index_offset(Some(8000))
        .set_column_index_length(Some(25))
        .set_unencoded_byte_array_data_bytes(Some(2000))
        .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
        .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
        .build()
        .unwrap();

    // Written with the full list of page encoding statistics ...
    let col_metadata = base
        .clone()
        .into_builder()
        .set_page_encoding_stats(vec![
            PageEncodingStats {
                page_type: PageType::DATA_PAGE,
                encoding: Encoding::PLAIN,
                count: 3,
            },
            PageEncodingStats {
                page_type: PageType::DATA_PAGE,
                encoding: Encoding::RLE,
                count: 5,
            },
        ])
        .build()
        .unwrap();

    // ... but expected back as a mask of the encodings.
    let expected_metadata = base
        .into_builder()
        .set_page_encoding_stats_mask(EncodingMask::new_from_encodings(
            [Encoding::PLAIN, Encoding::RLE].iter(),
        ))
        .build()
        .unwrap();

    let mut buf = Vec::new();
    {
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
        col_metadata.write_thrift(&mut writer).unwrap();
    }
    let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap();

    assert_eq!(col_chunk_res, expected_metadata);
}
#[test]
fn test_column_chunk_metadata_thrift_conversion_full_stats() {
    let column_descr = get_test_schema_descr().column(0);

    let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
        .set_encodings_mask(EncodingMask::new_from_encodings(
            [Encoding::PLAIN, Encoding::RLE].iter(),
        ))
        .set_num_values(1000)
        .set_compression(Compression::SNAPPY)
        .set_total_compressed_size(2000)
        .set_total_uncompressed_size(3000)
        .set_data_page_offset(4000)
        .set_page_encoding_stats(vec![
            PageEncodingStats {
                page_type: PageType::DATA_PAGE,
                encoding: Encoding::PLAIN,
                count: 3,
            },
            PageEncodingStats {
                page_type: PageType::DATA_PAGE,
                encoding: Encoding::RLE,
                count: 5,
            },
        ])
        .build()
        .unwrap();

    let mut buf = Vec::new();
    {
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
        col_metadata.write_thrift(&mut writer).unwrap();
    }

    // With mask conversion disabled, the full statistics list must round-trip unchanged.
    let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
    let col_chunk_res =
        read_column_chunk_with_options(&mut buf, column_descr, Some(&options)).unwrap();

    assert_eq!(col_chunk_res, col_metadata);
}
// A column chunk built with nothing but its descriptor must come back equal
// after being written to and read from the Thrift compact protocol.
#[test]
fn test_column_chunk_metadata_thrift_conversion_empty() {
    let descr = get_test_schema_descr().column(0);
    let original = ColumnChunkMetaData::builder(descr.clone()).build().unwrap();

    // Serialize into an in-memory buffer; the writer borrow ends with this scope.
    let mut encoded = Vec::new();
    {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut encoded);
        original.write_thrift(&mut protocol).unwrap();
    }

    let decoded = read_column_chunk(&mut encoded, descr).unwrap();
    assert_eq!(decoded, original);
}
// Every column chunk of the test schema reports 500 compressed bytes; the row
// group built from them is expected to report 1000 from `compressed_size()`.
#[test]
fn test_compressed_size() {
    let schema = get_test_schema_descr();

    // One chunk per schema column, all with identical size figures.
    let chunks: Vec<_> = schema
        .columns()
        .iter()
        .map(|descr| {
            ColumnChunkMetaData::builder(descr.clone())
                .set_total_compressed_size(500)
                .set_total_uncompressed_size(700)
                .build()
                .unwrap()
        })
        .collect();

    let row_group = RowGroupMetaData::builder(schema)
        .set_num_rows(1000)
        .set_column_metadata(chunks)
        .build()
        .unwrap();

    let expected: i64 = 1000;
    let actual: i64 = row_group.compressed_size();
    assert_eq!(actual, expected);
}
// Pins the exact value returned by `ParquetMetaData::memory_size()` for two
// configurations: metadata whose column statistics carry min/max values, and
// metadata that additionally carries a column index and an offset index.
// The expected totals are hard-coded and differ when the `encryption` feature
// is enabled. Containers are deliberately built with `vec!` / `String::from`
// so the objects being measured are constructed the same way on every run.
#[test]
fn test_memory_size() {
    let schema = get_test_schema_descr();

    // Row group whose column statistics hold no min/max values; it is paired
    // with the page indexes further down.
    let plain_chunks = schema
        .columns()
        .iter()
        .map(|descr| {
            let empty_stats = Statistics::new::<i32>(None, None, None, None, false);
            ColumnChunkMetaData::builder(descr.clone())
                .set_statistics(empty_stats)
                .build()
        })
        .collect::<Result<Vec<_>>>()
        .unwrap();
    let plain_row_groups = vec![
        RowGroupMetaData::builder(schema.clone())
            .set_num_rows(1000)
            .set_column_metadata(plain_chunks)
            .build()
            .unwrap(),
    ];

    // File-level metadata shared by both measurements.
    let key_value_metadata = Some(vec![KeyValue::new(
        String::from("Foo"),
        Some(String::from("bar")),
    )]);
    let column_orders = Some(vec![
        ColumnOrder::UNDEFINED,
        ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
    ]);
    let file_metadata = FileMetaData::new(
        2,    // version
        1000, // num_rows
        Some(String::from("test harness")),
        key_value_metadata,
        schema.clone(),
        column_orders,
    );

    // Row group whose column statistics carry min = 0 and max = 100.
    let stats_chunks = schema
        .columns()
        .iter()
        .map(|descr| {
            let min_max_stats = Statistics::new::<i32>(Some(0), Some(100), None, None, false);
            ColumnChunkMetaData::builder(descr.clone())
                .set_statistics(min_max_stats)
                .build()
        })
        .collect::<Result<Vec<_>>>()
        .unwrap();
    let stats_row_groups = vec![
        RowGroupMetaData::builder(schema)
            .set_num_rows(1000)
            .set_column_metadata(stats_chunks)
            .build()
            .unwrap(),
    ];

    // First measurement: statistics only, no page indexes.
    let meta_with_stats = ParquetMetaDataBuilder::new(file_metadata.clone())
        .set_row_groups(stats_row_groups)
        .build();
    let base_expected_size = if cfg!(feature = "encryption") {
        2934
    } else {
        2766
    };
    assert_eq!(meta_with_stats.memory_size(), base_expected_size);

    // Column index with a single appended entry, unwrapped to its BOOLEAN payload.
    let mut column_index_builder = ColumnIndexBuilder::new(Type::BOOLEAN);
    column_index_builder.append(false, vec![1u8], vec![2u8, 3u8], 4);
    let native_index = match column_index_builder.build().unwrap() {
        ColumnIndexMetaData::BOOLEAN(index) => index,
        _ => panic!("wrong type of column index"),
    };

    // Offset index with two identical page entries.
    let mut offset_index_builder = OffsetIndexBuilder::new();
    for _ in 0..2 {
        offset_index_builder.append_row_count(1);
        offset_index_builder.append_offset_and_size(2, 3);
        offset_index_builder.append_unencoded_byte_array_data_bytes(Some(10));
    }
    let offset_index = offset_index_builder.build();

    // Second measurement: the stats-less row group plus both page indexes.
    let meta_with_indexes = ParquetMetaDataBuilder::new(file_metadata)
        .set_row_groups(plain_row_groups)
        .set_column_index(Some(vec![vec![ColumnIndexMetaData::BOOLEAN(native_index)]]))
        .set_offset_index(Some(vec![vec![offset_index]]))
        .build();
    let bigger_expected_size = if cfg!(feature = "encryption") {
        3360
    } else {
        3192
    };
    assert!(bigger_expected_size > base_expected_size);
    assert_eq!(meta_with_indexes.memory_size(), bigger_expected_size);
}
// Pins the value returned by `ParquetMetaData::memory_size()` for metadata that
// carries an encryption algorithm and footer signing key metadata, first
// without and then with a `FileDecryptor` attached. Only compiled when the
// `encryption` feature is enabled.
// NOTE(review): both expected totals are hard-coded; they presumably depend on
// how the values below are allocated (clones vs. moves, vector lengths), so
// re-derive them if this setup changes -- confirm against the `HeapSize` impls.
#[test]
#[cfg(feature = "encryption")]
fn test_memory_size_with_decryptor() {
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::file::metadata::thrift::encryption::AesGcmV1;
let schema_descr = get_test_schema_descr();
// One default column chunk per schema column (no statistics, no sizes set).
let columns = schema_descr
.columns()
.iter()
.map(|column_descr| ColumnChunkMetaData::builder(column_descr.clone()).build())
.collect::<Result<Vec<_>>>()
.unwrap();
let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
.set_column_metadata(columns)
.build()
.unwrap();
let row_group_meta = vec![row_group_meta];
let version = 2;
let num_rows = 1000;
// 8-byte AAD values; clones go into the algorithm description, the originals
// are moved into `FileDecryptor::new` further down.
let aad_file_unique = vec![1u8; 8];
let aad_prefix = vec![2u8; 8];
let encryption_algorithm = EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
aad_prefix: Some(aad_prefix.clone()),
aad_file_unique: Some(aad_file_unique.clone()),
supply_aad_prefix: Some(true),
});
let footer_key_metadata = Some(vec![3u8; 8]);
let file_metadata =
FileMetaData::new(version, num_rows, None, None, schema_descr.clone(), None)
.with_encryption_algorithm(Some(encryption_algorithm))
.with_footer_signing_key_metadata(footer_key_metadata.clone());
// First measurement: encryption metadata present, but no decryptor attached.
let parquet_meta_data = ParquetMetaDataBuilder::new(file_metadata.clone())
.set_row_groups(row_group_meta.clone())
.build();
let base_expected_size = 2058;
assert_eq!(parquet_meta_data.memory_size(), base_expected_size);
// 16-byte footer and column keys used to build the decryption properties.
let footer_key = "0123456789012345".as_bytes();
let column_key = "1234567890123450".as_bytes();
let mut decryption_properties_builder =
FileDecryptionProperties::builder(footer_key.to_vec())
.with_aad_prefix(aad_prefix.clone());
// Register the same column key for every column of the test schema.
for column in schema_descr.columns() {
decryption_properties_builder = decryption_properties_builder
.with_column_key(&column.path().string(), column_key.to_vec());
}
let decryption_properties = decryption_properties_builder.build().unwrap();
// `aad_file_unique` and `aad_prefix` are consumed here.
let decryptor = FileDecryptor::new(
&decryption_properties,
footer_key_metadata.as_deref(),
aad_file_unique,
aad_prefix,
)
.unwrap();
// Second measurement: identical metadata with the decryptor attached.
let parquet_meta_data = ParquetMetaDataBuilder::new(file_metadata.clone())
.set_row_groups(row_group_meta.clone())
.set_file_decryptor(Some(decryptor))
.build();
let expected_size_with_decryptor = 3072;
// Sanity check on the constants themselves: attaching a decryptor must
// increase the expected total.
assert!(expected_size_with_decryptor > base_expected_size);
assert_eq!(
parquet_meta_data.memory_size(),
expected_size_with_decryptor
);
}
// Builds the schema descriptor used by the tests in this module: a group named
// "schema" containing two INT32 primitive fields, "a" and "b".
fn get_test_schema_descr() -> SchemaDescPtr {
    // Each field is a default-configured INT32 primitive wrapped in an `Arc`.
    let int32_field = |name: &str| {
        Arc::new(
            SchemaType::primitive_type_builder(name, Type::INT32)
                .build()
                .unwrap(),
        )
    };

    let root = SchemaType::group_type_builder("schema")
        .with_fields(vec![int32_field("a"), int32_field("b")])
        .build()
        .unwrap();

    Arc::new(SchemaDescriptor::new(Arc::new(root)))
}
}