#![cfg_attr(feature = "encryption", doc = "```rust")]
#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;
#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;
pub mod push_decoder;
mod in_memory_row_group;
mod record_reader;
experimental!(mod schema);
use std::fmt::Debug;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};
pub use self::schema::{
ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema,
parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual,
parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*,
};
/// Metadata key with the literal value `"ARROW:schema"`.
///
/// NOTE(review): presumably the key under which the encoded Arrow schema is
/// stored in Parquet key-value metadata (cf. `encode_arrow_schema` and
/// `add_encoded_arrow_schema_to_metadata` re-exported by this module) — confirm
/// against the schema module.
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
/// Metadata key with the literal value `"PARQUET:field_id"`.
///
/// NOTE(review): presumably read from / written to Arrow field metadata to carry
/// a Parquet field id — confirm against the schema conversion code.
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
/// A selection of leaf columns, addressed by leaf index.
///
/// Equality is structural: a mask built with `ProjectionMask::all()` is not
/// equal to a mask that explicitly flags every leaf, even though both include
/// every leaf.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
// `None` means every leaf is included. `Some(flags)` includes leaf `i` exactly
// when `flags[i]` is `true`; the constructors that take a schema size the
// vector to `schema.num_columns()`.
mask: Option<Vec<bool>>,
}
impl ProjectionMask {
    /// Creates a mask that includes every leaf column.
    ///
    /// No per-leaf flags are stored, so [`Self::leaf_included`] answers `true`
    /// for any index.
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Creates a mask over `len` leaf columns with none of them included.
    pub fn none(len: usize) -> Self {
        let flags = vec![false; len];
        Self { mask: Some(flags) }
    }

    /// Creates a mask including the leaf columns of `schema` listed in `indices`.
    ///
    /// Repeated or unordered indices are harmless: each one only sets a flag.
    ///
    /// # Panics
    ///
    /// Panics if an index is not smaller than `schema.num_columns()`.
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut flags = vec![false; schema.num_columns()];
        indices.into_iter().for_each(|leaf| flags[leaf] = true);
        Self { mask: Some(flags) }
    }

    /// Creates a mask including every leaf that belongs to one of the root
    /// fields of `schema` listed in `indices`.
    ///
    /// # Panics
    ///
    /// Panics if an index is not smaller than the number of fields of the
    /// root schema.
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let root_count = schema.root_schema().get_fields().len();
        let mut selected_roots = vec![false; root_count];
        indices
            .into_iter()
            .for_each(|root| selected_roots[root] = true);

        // A leaf is included exactly when the root it descends from was selected.
        let flags: Vec<bool> = (0..schema.num_columns())
            .map(|leaf| schema.get_column_root_idx(leaf))
            .map(|root| selected_roots[root])
            .collect();
        Self { mask: Some(flags) }
    }

    /// Creates a mask from dot-separated column names.
    ///
    /// Each name is split on `'.'`. A leaf is included when the resulting
    /// components are a prefix of (or equal to) the leaf's column path, so
    /// naming a group includes all leaves below it. Names that match nothing
    /// are ignored.
    pub fn columns<'a>(
        schema: &SchemaDescriptor,
        names: impl IntoIterator<Item = &'a str>,
    ) -> Self {
        let mut flags = vec![false; schema.num_columns()];
        for name in names {
            let wanted: Vec<&str> = name.split('.').collect();
            for (idx, column) in schema.columns().iter().enumerate() {
                let parts = column.path().parts();
                // A name can only be a prefix of a path that is at least as long.
                let is_prefix = wanted.len() <= parts.len()
                    && wanted
                        .iter()
                        .zip(parts.iter())
                        .all(|(want, have)| want == have);
                if is_prefix {
                    flags[idx] = true;
                }
            }
        }
        Self { mask: Some(flags) }
    }

    /// Returns `true` if the leaf column `leaf_idx` is included in this mask.
    ///
    /// # Panics
    ///
    /// Panics if the mask stores explicit flags and `leaf_idx` is out of range.
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        match &self.mask {
            Some(flags) => flags[leaf_idx],
            None => true,
        }
    }

    /// Replaces `self` with the union of `self` and `other`.
    ///
    /// If either side includes every leaf, the result includes every leaf.
    /// Otherwise the flags are OR-ed pairwise; equal lengths are only checked
    /// in debug builds.
    pub fn union(&mut self, other: &Self) {
        self.mask = match (&self.mask, &other.mask) {
            (Some(mine), Some(theirs)) => {
                debug_assert_eq!(mine.len(), theirs.len());
                Some(
                    mine.iter()
                        .zip(theirs)
                        .map(|(&lhs, &rhs)| lhs || rhs)
                        .collect(),
                )
            }
            _ => None,
        };
    }

    /// Replaces `self` with the intersection of `self` and `other`.
    ///
    /// An include-everything mask is the identity: if `other` is one, `self`
    /// is left untouched; if `self` is one, it becomes a copy of `other`.
    /// Otherwise the flags are AND-ed pairwise; equal lengths are only checked
    /// in debug builds.
    pub fn intersect(&mut self, other: &Self) {
        if let Some(theirs) = &other.mask {
            let narrowed = match &self.mask {
                None => theirs.clone(),
                Some(mine) => {
                    debug_assert_eq!(mine.len(), theirs.len());
                    mine.iter()
                        .zip(theirs)
                        .map(|(&lhs, &rhs)| lhs && rhs)
                        .collect()
                }
            };
            self.mask = Some(narrowed);
        }
    }

    /// Returns a copy of this mask restricted to leaves whose root field has
    /// exactly one leaf and is not a list, or `None` if no leaf qualifies.
    ///
    /// NOTE(review): a non-LIST group root with a single leaf also passes the
    /// "exactly one leaf" check — confirm whether that is intended.
    pub(crate) fn without_nested_types(&self, schema: &SchemaDescriptor) -> Option<Self> {
        let leaf_count = schema.num_columns();
        let root_count = schema.root_schema().get_fields().len();

        // Number of leaves underneath each root field.
        let mut leaves_per_root = vec![0usize; root_count];
        (0..leaf_count)
            .map(|leaf| schema.get_column_root_idx(leaf))
            .for_each(|root| leaves_per_root[root] += 1);

        let flat_leaves: Vec<usize> = (0..leaf_count)
            .filter(|&leaf| self.leaf_included(leaf))
            .filter(|&leaf| {
                let root = schema.get_column_root(leaf);
                let root_idx = schema.get_column_root_idx(leaf);
                leaves_per_root[root_idx] == 1 && !root.is_list()
            })
            .collect();

        (!flat_leaves.is_empty()).then(|| ProjectionMask::leaves(schema, flat_leaves))
    }
}
/// Looks up the Arrow field called `name` and the index of the first Parquet
/// leaf column whose root field index equals the Arrow field's position.
///
/// Returns `None` when `arrow_schema` has no field with that name, when the
/// field's data type is nested, or when no Parquet leaf maps to that root.
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    match arrow_schema.fields.find(name) {
        // Nested fields are rejected outright.
        Some((root_idx, field)) if !field.data_type().is_nested() => {
            let leaf_count = parquet_schema.columns().len();
            (0..leaf_count)
                .find(|&leaf| parquet_schema.get_column_root_idx(leaf) == root_idx)
                .map(|leaf| (leaf, field))
        }
        _ => None,
    }
}
#[cfg(test)]
mod test {
use crate::arrow::ArrowWriter;
use crate::file::metadata::{
ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetMetaDataWriter,
};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;
use super::ProjectionMask;
/// Metadata parsed without page indexes and then re-serialized on its own must
/// fail to load when page indexes are requested from the re-serialized bytes.
#[test]
#[allow(deprecated)]
fn test_metadata_read_write_partial_offset() {
    let reader_options = || ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);

    let file_bytes = create_parquet_file();
    // First pass: parse the file's metadata without asking for page indexes.
    let parsed = ParquetMetaDataReader::new()
        .with_metadata_options(Some(reader_options()))
        .parse_and_finish(&file_bytes)
        .unwrap();

    // Second pass: the standalone metadata bytes are read back with page
    // indexes enabled, which is expected to be rejected.
    let standalone = metadata_to_bytes(&parsed);
    let failure = ParquetMetaDataReader::new()
        .with_metadata_options(Some(reader_options()))
        .with_page_indexes(true)
        .parse_and_finish(&standalone)
        .err()
        .unwrap();

    assert_eq!(
        failure.to_string(),
        "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..357"
    );
}
/// Metadata written out on its own and parsed again must equal the metadata
/// parsed from the full file.
#[test]
fn test_metadata_read_write_roundtrip() {
    let reader_options = || ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);

    let file_bytes = create_parquet_file();
    let from_file = ParquetMetaDataReader::new()
        .with_metadata_options(Some(reader_options()))
        .parse_and_finish(&file_bytes)
        .unwrap();

    let standalone = metadata_to_bytes(&from_file);
    // Guard against accidentally comparing the file with itself.
    assert_ne!(
        standalone.len(),
        file_bytes.len(),
        "metadata is subset of parquet"
    );

    let from_standalone = ParquetMetaDataReader::new()
        .with_metadata_options(Some(reader_options()))
        .parse_and_finish(&standalone)
        .unwrap();

    assert_eq!(from_file, from_standalone);
}
/// Same round trip as the plain metadata test, but with page indexes loaded on
/// both sides; offset fields are cleared before the comparison.
#[test]
#[allow(deprecated)]
fn test_metadata_read_write_roundtrip_page_index() {
    let reader_options = || ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);

    let file_bytes = create_parquet_file();
    let from_file = ParquetMetaDataReader::new()
        .with_metadata_options(Some(reader_options()))
        .with_page_indexes(true)
        .parse_and_finish(&file_bytes)
        .unwrap();

    let standalone = metadata_to_bytes(&from_file);
    let from_standalone = ParquetMetaDataReader::new()
        .with_metadata_options(Some(reader_options()))
        .with_page_indexes(true)
        .parse_and_finish(&standalone)
        .unwrap();

    // Index offsets are cleared on both sides so they cannot affect equality.
    let from_file = normalize_locations(from_file);
    let from_standalone = normalize_locations(from_standalone);

    // The debug rendering is compared first, then the values themselves.
    assert_eq!(
        format!("{from_file:#?}"),
        format!("{from_standalone:#?}")
    );
    assert_eq!(from_file, from_standalone);
}
/// Rebuilds `metadata` with the offset-index offset, index-page offset and
/// column-index offset of every column chunk set to `None`.
fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
    let mut file_builder = metadata.into_builder();
    for row_group in file_builder.take_row_groups() {
        let mut group_builder = row_group.into_builder();
        for column in group_builder.take_columns() {
            let stripped = column
                .into_builder()
                .set_offset_index_offset(None)
                .set_index_page_offset(None)
                .set_column_index_offset(None)
                .build()
                .unwrap();
            group_builder = group_builder.add_column_metadata(stripped);
        }
        file_builder = file_builder.add_row_group(group_builder.build().unwrap());
    }
    file_builder.build()
}
/// Writes a single-batch Parquet file with one `Int32` column named `id` into
/// memory, with page-level statistics and page-header statistics enabled.
fn create_parquet_file() -> Bytes {
    let values: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33]));
    let batch = RecordBatch::try_from_iter(vec![("id", values)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_write_page_header_statistics(true)
        .build();

    let mut buf = Vec::new();
    {
        // The writer borrows `buf`; the scope ends that borrow before `buf`
        // is moved into `Bytes`.
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }
    Bytes::from(buf)
}
/// Serializes `metadata` with `ParquetMetaDataWriter` into a fresh buffer and
/// returns the written bytes.
fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
    let mut encoded = Vec::new();
    ParquetMetaDataWriter::new(&mut encoded, metadata)
        .finish()
        .unwrap();
    Bytes::from(encoded)
}
/// Exercises `ProjectionMask::columns` against map, list, flat, duplicate-name
/// and shared-prefix schemas.
#[test]
fn test_mask_from_column_names() {
    /// Builds a mask from `names` and returns its per-leaf flags.
    fn selected<const N: usize>(schema: &SchemaDescriptor, names: [&str; N]) -> Vec<bool> {
        ProjectionMask::columns(schema, names).mask.unwrap()
    }

    // Map of maps followed by two primitive columns: five leaves in total.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL group a (MAP) {
REPEATED group key_value {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
}
}
}
REQUIRED INT32 b;
REQUIRED DOUBLE c;
}
",
    );
    // Unknown names and an empty name list select nothing.
    assert_eq!(selected(&schema, ["foo", "bar"]), vec![false; 5]);
    assert_eq!(selected(&schema, []), vec![false; 5]);
    // Naming a group selects every leaf underneath it.
    assert_eq!(
        selected(&schema, ["a", "c"]),
        [true, true, true, false, true]
    );
    assert_eq!(
        selected(&schema, ["a.key_value.key", "c"]),
        [true, false, false, false, true]
    );
    assert_eq!(
        selected(&schema, ["a.key_value.value", "b"]),
        [false, true, true, true, false]
    );

    // Triple-nested list with a single leaf, plus one primitive column.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL group a (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL BYTE_ARRAY element (UTF8);
}
}
}
}
}
}
REQUIRED INT32 b;
}
",
    );
    assert_eq!(selected(&schema, ["a", "b"]), [true, true]);
    assert_eq!(selected(&schema, ["a.list.element", "b"]), [true, true]);
    assert_eq!(
        selected(&schema, ["a.list.element.list.element.list.element", "b"]),
        [true, true]
    );
    assert_eq!(selected(&schema, ["b"]), [false, true]);

    // Flat schema: order and repetition of the names do not matter.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL INT32 b;
OPTIONAL INT32 c;
OPTIONAL INT32 d;
OPTIONAL INT32 e;
}
",
    );
    assert_eq!(
        selected(&schema, ["a", "b"]),
        [true, true, false, false, false]
    );
    assert_eq!(
        selected(&schema, ["d", "b", "d"]),
        [false, true, false, true, false]
    );

    // Two columns share the name `a`: both are selected.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL INT32 b;
OPTIONAL INT32 a;
OPTIONAL INT32 d;
OPTIONAL INT32 e;
}
",
    );
    assert_eq!(
        selected(&schema, ["a", "e"]),
        [true, false, true, false, true]
    );

    // Matching is per path component, so `a` must not select `aa`.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL INT32 aa;
}
",
    );
    assert_eq!(selected(&schema, ["a"]), [true, false]);
}
/// `union` ORs explicit masks and collapses to "everything" as soon as either
/// side is an include-everything mask.
#[test]
fn test_projection_mask_union() {
    let explicit = |flags: [bool; 3]| ProjectionMask {
        mask: Some(flags.to_vec()),
    };
    let everything = || ProjectionMask { mask: None };

    // explicit ∪ explicit
    let mut lhs = explicit([true, false, true]);
    let rhs = explicit([false, true, true]);
    lhs.union(&rhs);
    assert_eq!(lhs.mask, Some(vec![true, true, true]));

    // everything ∪ explicit
    let mut lhs = everything();
    let rhs = explicit([false, true, true]);
    lhs.union(&rhs);
    assert_eq!(lhs.mask, None);

    // explicit ∪ everything
    let mut lhs = explicit([true, false, true]);
    let rhs = everything();
    lhs.union(&rhs);
    assert_eq!(lhs.mask, None);

    // everything ∪ everything
    let mut lhs = everything();
    let rhs = everything();
    lhs.union(&rhs);
    assert_eq!(lhs.mask, None);
}
/// `intersect` ANDs explicit masks and treats an include-everything mask as
/// the identity on either side.
#[test]
fn test_projection_mask_intersect() {
    let explicit = |flags: [bool; 3]| ProjectionMask {
        mask: Some(flags.to_vec()),
    };
    let everything = || ProjectionMask { mask: None };

    // explicit ∩ explicit
    let mut lhs = explicit([true, false, true]);
    let rhs = explicit([false, true, true]);
    lhs.intersect(&rhs);
    assert_eq!(lhs.mask, Some(vec![false, false, true]));

    // everything ∩ explicit
    let mut lhs = everything();
    let rhs = explicit([false, true, true]);
    lhs.intersect(&rhs);
    assert_eq!(lhs.mask, Some(vec![false, true, true]));

    // explicit ∩ everything
    let mut lhs = explicit([true, false, true]);
    let rhs = everything();
    lhs.intersect(&rhs);
    assert_eq!(lhs.mask, Some(vec![true, false, true]));

    // everything ∩ everything
    let mut lhs = everything();
    let rhs = everything();
    lhs.intersect(&rhs);
    assert_eq!(lhs.mask, None);
}
/// With only primitive root columns, `without_nested_types` keeps every
/// included leaf.
#[test]
fn test_projection_mask_without_nested_no_nested() {
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL INT32 b;
REQUIRED DOUBLE d;
}
",
    );
    let pick = |leaves: &[usize]| ProjectionMask::leaves(&schema, leaves.iter().copied());
    let strip = |mask: &ProjectionMask| mask.without_nested_types(&schema);

    // An include-everything mask becomes an explicit mask over all leaves.
    assert_eq!(Some(pick(&[0, 1, 2])), strip(&ProjectionMask::all()));

    // An explicit mask comes back unchanged.
    let partial = pick(&[1, 2]);
    assert_eq!(Some(partial.clone()), strip(&partial));
}
/// Leaves under a multi-field group or a LIST are dropped; primitive root
/// columns survive.
#[test]
fn test_projection_mask_without_nested_nested() {
    // Leaves: a = 0, b.b1 = 1, b.b2 = 2, c (list element) = 3, d = 4.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL group b {
REQUIRED INT32 b1;
OPTIONAL INT64 b2;
}
OPTIONAL group c (LIST) {
REPEATED group list {
OPTIONAL INT32 element;
}
}
REQUIRED DOUBLE d;
}
",
    );
    let pick = |leaves: &[usize]| ProjectionMask::leaves(&schema, leaves.iter().copied());
    let strip = |mask: &ProjectionMask| mask.without_nested_types(&schema);

    // Everything selected: only the two primitive roots remain.
    assert_eq!(Some(pick(&[0, 4])), strip(&ProjectionMask::all()));

    // Only a struct leaf selected: nothing remains.
    assert_eq!(None, strip(&pick(&[1])));

    // Struct leaf plus primitive root: only the primitive remains.
    assert_eq!(Some(pick(&[4])), strip(&pick(&[1, 4])));

    // Only the list leaf selected: nothing remains.
    assert_eq!(None, strip(&pick(&[3])));
}
/// A schema consisting solely of a MAP column leaves nothing after nested
/// types are removed.
#[test]
fn test_projection_mask_without_nested_map_only() {
    let schema = parse_schema(
        "
message test_schema {
required group my_map (MAP) {
repeated group key_value {
required binary key (STRING);
optional int32 value;
}
}
}
",
    );
    let pick = |leaves: &[usize]| ProjectionMask::leaves(&schema, leaves.iter().copied());
    let strip = |mask: &ProjectionMask| mask.without_nested_types(&schema);

    // Whole map, key only, value only: each is rejected.
    assert_eq!(None, strip(&ProjectionMask::all()));
    assert_eq!(None, strip(&pick(&[0])));
    assert_eq!(None, strip(&pick(&[1])));
}
/// A MAP column between two primitive columns: only the primitives survive.
#[test]
fn test_projection_mask_without_nested_map_with_non_nested() {
    // Leaves: a = 0, my_map key = 1, my_map value = 2, b = 3.
    let schema = parse_schema(
        "
message test_schema {
REQUIRED INT32 a;
required group my_map (MAP) {
repeated group key_value {
required binary key (STRING);
optional int32 value;
}
}
REQUIRED INT32 b;
}
",
    );
    let pick = |leaves: &[usize]| ProjectionMask::leaves(&schema, leaves.iter().copied());
    let strip = |mask: &ProjectionMask| mask.without_nested_types(&schema);

    // Everything selected: both primitives remain.
    assert_eq!(Some(pick(&[0, 3])), strip(&ProjectionMask::all()));

    // Map leaves plus one primitive: only the primitive remains.
    assert_eq!(Some(pick(&[3])), strip(&pick(&[1, 2, 3])));

    // Map leaves only: nothing remains.
    assert_eq!(None, strip(&pick(&[1, 2])));
}
/// A map-of-maps column followed by two primitive columns: only the primitive
/// columns survive `without_nested_types`.
#[test]
fn test_projection_mask_without_nested_deeply_nested() {
    // Leaves: a key = 0, a inner key = 1, a inner value = 2, b = 3, c = 4.
    // The message is closed with its final `}` so the fixture is a well-formed
    // schema and does not depend on the parser accepting truncated input.
    let schema = parse_schema(
        "
message test_schema {
OPTIONAL group a (MAP) {
REPEATED group key_value {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
}
}
}
REQUIRED INT32 b;
REQUIRED DOUBLE c;
}
",
    );

    // Everything selected: only `b` and `c` remain.
    let mask = ProjectionMask::all();
    assert_eq!(
        Some(ProjectionMask::leaves(&schema, [3, 4])),
        mask.without_nested_types(&schema)
    );

    // One map leaf plus `c`: only `c` remains.
    let mask = ProjectionMask::leaves(&schema, [0, 4]);
    assert_eq!(
        Some(ProjectionMask::leaves(&schema, [4])),
        mask.without_nested_types(&schema)
    );

    // Inner map leaves plus `b`: only `b` remains.
    let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
    assert_eq!(
        Some(ProjectionMask::leaves(&schema, [3])),
        mask.without_nested_types(&schema)
    );

    // A map leaf on its own: nothing remains.
    let mask = ProjectionMask::leaves(&schema, [0]);
    assert_eq!(None, mask.without_nested_types(&schema));
}
/// A LIST column has a single leaf but is still treated as nested and dropped.
#[test]
fn test_projection_mask_without_nested_list() {
    // Leaves: my_list element = 0, b = 1.
    let schema = parse_schema(
        "
message test_schema {
required group my_list (LIST) {
repeated group list {
optional binary element (STRING);
}
}
REQUIRED INT32 b;
}
",
    );
    let pick = |leaves: &[usize]| ProjectionMask::leaves(&schema, leaves.iter().copied());
    let strip = |mask: &ProjectionMask| mask.without_nested_types(&schema);

    // Everything selected: only `b` remains.
    assert_eq!(Some(pick(&[1])), strip(&ProjectionMask::all()));

    // List leaf only: nothing remains.
    assert_eq!(None, strip(&pick(&[0])));

    // List leaf plus `b`: only `b` remains.
    assert_eq!(Some(pick(&[1])), strip(&pick(&[0, 1])));
}
/// Parses a Parquet message-type string into a `SchemaDescriptor`.
///
/// Panics if the text cannot be parsed.
fn parse_schema(schema: &str) -> SchemaDescriptor {
    let root = parse_message_type(schema).map(Arc::new).unwrap();
    SchemaDescriptor::new(root)
}
}