#![cfg_attr(feature = "encryption", doc = "```rust")]
#![cfg_attr(not(feature = "encryption"), doc = "```ignore")]
experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
mod decoder;
#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;
mod record_reader;
experimental!(mod schema);
use std::sync::Arc;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
#[cfg(feature = "async")]
pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::{SchemaDescriptor, Type};
use arrow_schema::{FieldRef, Schema};
pub use self::schema::{
add_encoded_arrow_schema_to_metadata, encode_arrow_schema, parquet_to_arrow_field_levels,
parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, ArrowSchemaConverter, FieldLevels,
};
/// Metadata key under which a serialized Arrow schema is stored (value `"ARROW:schema"`).
///
/// NOTE(review): presumably the key used by `add_encoded_arrow_schema_to_metadata` /
/// `encode_arrow_schema` when embedding the Arrow schema in parquet key-value
/// metadata — confirm against `self::schema`.
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
/// Arrow field-metadata key carrying a parquet field id (value `"PARQUET:field_id"`).
///
/// NOTE(review): presumably consulted by the schema conversion in `self::schema` to map
/// arrow field metadata to/from parquet field ids — confirm.
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
/// A [`ProjectionMask`] identifies a set of leaf columns within a potentially nested
/// parquet schema to project.
///
/// Selection is expressed per parquet leaf (primitive) column index, i.e. indices in
/// `0..schema.num_columns()` of a [`SchemaDescriptor`]. The special mask returned by
/// [`ProjectionMask::all`] selects every leaf regardless of how many there are.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMask {
    /// `Some(mask)`: leaf column `i` is selected iff `mask[i]` is `true`.
    ///
    /// `None`: every leaf column is selected.
    mask: Option<Vec<bool>>,
}

impl ProjectionMask {
    /// Create a [`ProjectionMask`] that selects every leaf column.
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Create a [`ProjectionMask`] over `len` leaf columns that selects none of them.
    pub fn none(len: usize) -> Self {
        Self {
            mask: Some(vec![false; len]),
        }
    }

    /// Create a [`ProjectionMask`] selecting the leaf columns at `indices`.
    ///
    /// Indices refer to leaf (primitive) columns of `schema`; repeated indices are
    /// harmless.
    ///
    /// # Panics
    ///
    /// Panics if any index is `>= schema.num_columns()`.
    pub fn leaves(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; schema.num_columns()];
        for leaf_idx in indices {
            mask[leaf_idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Create a [`ProjectionMask`] selecting every leaf column that lies beneath one of
    /// the root (top-level) fields at `indices`.
    ///
    /// # Panics
    ///
    /// Panics if any index is `>=` the number of root fields of `schema`.
    pub fn roots(schema: &SchemaDescriptor, indices: impl IntoIterator<Item = usize>) -> Self {
        let num_root_columns = schema.root_schema().get_fields().len();
        let mut root_mask = vec![false; num_root_columns];
        for root_idx in indices {
            root_mask[root_idx] = true;
        }
        // Expand the root-level selection to leaf granularity: a leaf is selected
        // exactly when the root field it belongs to is selected.
        let mask = (0..schema.num_columns())
            .map(|leaf_idx| root_mask[schema.get_column_root_idx(leaf_idx)])
            .collect();
        Self { mask: Some(mask) }
    }

    /// Append the dot-joined path of every leaf below `root` to `paths`, in the order
    /// the tree is walked (children in field order, depth first).
    ///
    /// `parent` is the dotted path of `root`'s parent, or `None` when `root` is a root
    /// field of the schema.
    fn find_leaves(root: &Arc<Type>, parent: Option<&str>, paths: &mut Vec<String>) {
        let path = match parent {
            Some(parent) => format!("{parent}.{}", root.name()),
            None => root.name().to_string(),
        };
        if root.is_group() {
            for child in root.get_fields() {
                Self::find_leaves(child, Some(path.as_str()), paths);
            }
        } else {
            paths.push(path);
        }
    }

    /// Create a [`ProjectionMask`] selecting the leaf columns identified by `names`.
    ///
    /// Each name is a dot-separated path starting at a root field, e.g.
    /// `"a.key_value.key"`. A name selects the leaf whose full path equals it plus every
    /// leaf nested below it. For example, `"a"` selects `"a.b"` and `"a.c.d"`, but not a
    /// sibling column such as `"ab"`. Names that match nothing are ignored. If the
    /// schema contains several leaves with the same path, all of them are selected.
    ///
    /// Matching is done on the joined path string, so a field name that itself
    /// contains `.` cannot be distinguished from a nested path.
    ///
    /// # Panics
    ///
    /// Panics if the number of leaves found by walking the schema tree differs from
    /// `schema.num_columns()`.
    pub fn columns<'a>(
        schema: &SchemaDescriptor,
        names: impl IntoIterator<Item = &'a str>,
    ) -> Self {
        // Dotted path of every leaf column, positioned by leaf index.
        let mut paths: Vec<String> = Vec::with_capacity(schema.num_columns());
        for root in schema.root_schema().get_fields() {
            Self::find_leaves(root, None, &mut paths);
        }
        assert_eq!(paths.len(), schema.num_columns());

        let mut mask = vec![false; schema.num_columns()];
        for name in names {
            for (selected, path) in mask.iter_mut().zip(&paths) {
                if Self::path_matches(path, name) {
                    *selected = true;
                }
            }
        }
        Self { mask: Some(mask) }
    }

    /// Returns `true` if `name` is `path` itself or a dotted ancestor of `path`.
    ///
    /// A bare `starts_with` is not enough: it would let `"a"` match the unrelated
    /// column `"ab"`, so the match must end at a `.` separator or at the end of `path`.
    fn path_matches(path: &str, name: &str) -> bool {
        matches!(
            path.strip_prefix(name),
            Some(rest) if rest.is_empty() || rest.starts_with('.')
        )
    }

    /// Returns whether the leaf column at `leaf_idx` is selected by this mask.
    ///
    /// # Panics
    ///
    /// Panics if the mask is explicit (i.e. not [`ProjectionMask::all`]) and `leaf_idx`
    /// is not less than its length.
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        match &self.mask {
            Some(mask) => mask[leaf_idx],
            None => true,
        }
    }

    /// Union this mask with `other` in place: a leaf is selected if either mask selects
    /// it. If either side selects all columns, the result selects all columns.
    ///
    /// Both explicit masks are expected to have the same length. In debug builds a
    /// mismatch panics; in release builds the result is truncated to the shorter one.
    pub fn union(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) | (_, None) => self.mask = None,
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a || b).collect();
                self.mask = Some(mask);
            }
        }
    }

    /// Intersect this mask with `other` in place: a leaf is selected only if both masks
    /// select it. A side that selects all columns leaves the other side unchanged.
    ///
    /// Both explicit masks are expected to have the same length. In debug builds a
    /// mismatch panics; in release builds the result is truncated to the shorter one.
    pub fn intersect(&mut self, other: &Self) {
        match (self.mask.as_ref(), other.mask.as_ref()) {
            (None, _) => self.mask = other.mask.clone(),
            (_, None) => {}
            (Some(a), Some(b)) => {
                debug_assert_eq!(a.len(), b.len());
                let mask = a.iter().zip(b.iter()).map(|(&a, &b)| a && b).collect();
                self.mask = Some(mask);
            }
        }
    }
}
/// Look up the parquet leaf column index for the top-level arrow field called `name`.
///
/// Returns the leaf index together with the matching arrow field. Returns `None` in
/// three cases:
/// - `arrow_schema` has no field named `name`;
/// - that field has a nested data type;
/// - no parquet leaf belongs to the corresponding root.
///
/// The arrow field's position within `arrow_schema` is compared directly against parquet
/// root indices. This assumes both schemas list their top-level fields in the same order.
pub fn parquet_column<'a>(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &'a Schema,
    name: &str,
) -> Option<(usize, &'a FieldRef)> {
    let (root_idx, field) = arrow_schema
        .fields
        .find(name)
        .filter(|(_, candidate)| !candidate.data_type().is_nested())?;

    // The first leaf whose root is the matched top-level field.
    (0..parquet_schema.columns().len())
        .position(|leaf_idx| parquet_schema.get_column_root_idx(leaf_idx) == root_idx)
        .map(|leaf_idx| (leaf_idx, field))
}
// Unit tests for metadata read/write round-tripping and for `ProjectionMask`
// construction (`columns`) and combination (`union` / `intersect`).
#[cfg(test)]
mod test {
use crate::arrow::ArrowWriter;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;
use super::ProjectionMask;
// Serializing only the footer metadata and then asking to read page indexes must fail:
// the index offsets recorded in the metadata still point into the original file
// (range 82..115), which overlaps the metadata-only buffer (0..357).
// The exact byte ranges depend on the file produced by `create_parquet_file`; changing
// its data or writer properties will change the expected message.
#[test]
// NOTE(review): presumably silences a deprecation warning on `with_page_indexes` — confirm.
#[allow(deprecated)]
fn test_metadata_read_write_partial_offset() {
let parquet_bytes = create_parquet_file();
let original_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&parquet_bytes)
.unwrap();
let metadata_bytes = metadata_to_bytes(&original_metadata);
let err = ParquetMetaDataReader::new()
.with_page_indexes(true) .parse_and_finish(&metadata_bytes)
.err()
.unwrap();
assert_eq!(
err.to_string(),
"EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..357"
);
}
// Metadata written on its own by `ParquetMetaDataWriter` must parse back to metadata
// equal to the original (page indexes not requested).
#[test]
fn test_metadata_read_write_roundtrip() {
let parquet_bytes = create_parquet_file();
let original_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&parquet_bytes)
.unwrap();
let metadata_bytes = metadata_to_bytes(&original_metadata);
// Sanity check that the metadata-only buffer is not just the whole file again.
assert_ne!(
metadata_bytes.len(),
parquet_bytes.len(),
"metadata is subset of parquet"
);
let roundtrip_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&metadata_bytes)
.unwrap();
assert_eq!(original_metadata, roundtrip_metadata);
}
// Same round trip with page indexes loaded on both sides. Index offsets differ between
// the original file and the metadata-only buffer, so they are cleared by
// `normalize_locations` before comparing.
#[test]
// NOTE(review): presumably silences a deprecation warning on `with_page_indexes` — confirm.
#[allow(deprecated)]
fn test_metadata_read_write_roundtrip_page_index() {
let parquet_bytes = create_parquet_file();
let original_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&parquet_bytes)
.unwrap();
let metadata_bytes = metadata_to_bytes(&original_metadata);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&metadata_bytes)
.unwrap();
let original_metadata = normalize_locations(original_metadata);
let roundtrip_metadata = normalize_locations(roundtrip_metadata);
// Compare the pretty Debug output first so a failure shows a readable diff,
// then compare the values themselves.
assert_eq!(
format!("{original_metadata:#?}"),
format!("{roundtrip_metadata:#?}")
);
assert_eq!(original_metadata, roundtrip_metadata);
}
// Rebuild `metadata` with `offset_index_offset`, `index_page_offset` and
// `column_index_offset` set to `None` on every column chunk of every row group, so that
// metadata read from different byte layouts can be compared for equality.
fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
let mut metadata_builder = metadata.into_builder();
for rg in metadata_builder.take_row_groups() {
let mut rg_builder = rg.into_builder();
for col in rg_builder.take_columns() {
rg_builder = rg_builder.add_column_metadata(
col.into_builder()
.set_offset_index_offset(None)
.set_index_page_offset(None)
.set_column_index_offset(None)
.build()
.unwrap(),
);
}
let rg = rg_builder.build().unwrap();
metadata_builder = metadata_builder.add_row_group(rg);
}
metadata_builder.build()
}
// Write a small in-memory parquet file: one Int32 column "id" with six values,
// page-level statistics enabled and statistics written into page headers.
fn create_parquet_file() -> Bytes {
let mut buf = vec![];
let data = vec![100, 200, 201, 300, 102, 33];
let array: ArrayRef = Arc::new(Int32Array::from(data));
let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_write_page_header_statistics(true)
.build();
let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
// The writer holds `&mut buf`; release it before moving `buf` into `Bytes`.
drop(writer);
Bytes::from(buf)
}
// Serialize just `metadata` (no column data) with `ParquetMetaDataWriter`.
fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
let mut buf = vec![];
ParquetMetaDataWriter::new(&mut buf, metadata)
.finish()
.unwrap();
Bytes::from(buf)
}
// `ProjectionMask::columns` against nested maps, nested lists, flat schemas and a
// schema with a repeated column name. The expected vectors are indexed by leaf column.
#[test]
fn test_mask_from_column_names() {
// Leaves: 0 a.key_value.key, 1 a.key_value.value.key_value.key,
// 2 a.key_value.value.key_value.value, 3 b, 4 c
let message_type = "
message test_schema {
OPTIONAL group a (MAP) {
REPEATED group key_value {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
}
}
}
REQUIRED INT32 b;
REQUIRED DOUBLE c;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
// Unknown names and an empty name list select nothing.
let mask = ProjectionMask::columns(&schema, ["foo", "bar"]);
assert_eq!(mask.mask.unwrap(), vec![false; 5]);
let mask = ProjectionMask::columns(&schema, []);
assert_eq!(mask.mask.unwrap(), vec![false; 5]);
// A root name selects every leaf beneath it.
let mask = ProjectionMask::columns(&schema, ["a", "c"]);
assert_eq!(mask.mask.unwrap(), [true, true, true, false, true]);
// A full leaf path selects only that leaf.
let mask = ProjectionMask::columns(&schema, ["a.key_value.key", "c"]);
assert_eq!(mask.mask.unwrap(), [true, false, false, false, true]);
// An intermediate group path selects the leaves nested under it.
let mask = ProjectionMask::columns(&schema, ["a.key_value.value", "b"]);
assert_eq!(mask.mask.unwrap(), [false, true, true, true, false]);
// Triply nested list: leaf 0 is a.list.element.list.element.list.element, leaf 1 is b.
let message_type = "
message test_schema {
OPTIONAL group a (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL BYTE_ARRAY element (UTF8);
}
}
}
}
}
}
REQUIRED INT32 b;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let mask = ProjectionMask::columns(&schema, ["a", "b"]);
assert_eq!(mask.mask.unwrap(), [true, true]);
let mask = ProjectionMask::columns(&schema, ["a.list.element", "b"]);
assert_eq!(mask.mask.unwrap(), [true, true]);
let mask =
ProjectionMask::columns(&schema, ["a.list.element.list.element.list.element", "b"]);
assert_eq!(mask.mask.unwrap(), [true, true]);
let mask = ProjectionMask::columns(&schema, ["b"]);
assert_eq!(mask.mask.unwrap(), [false, true]);
// Flat schema: request order and repeated names do not matter.
let message_type = "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL INT32 b;
OPTIONAL INT32 c;
OPTIONAL INT32 d;
OPTIONAL INT32 e;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let mask = ProjectionMask::columns(&schema, ["a", "b"]);
assert_eq!(mask.mask.unwrap(), [true, true, false, false, false]);
let mask = ProjectionMask::columns(&schema, ["d", "b", "d"]);
assert_eq!(mask.mask.unwrap(), [false, true, false, true, false]);
// Schema with two columns both named "a": both are selected.
let message_type = "
message test_schema {
OPTIONAL INT32 a;
OPTIONAL INT32 b;
OPTIONAL INT32 a;
OPTIONAL INT32 d;
OPTIONAL INT32 e;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let mask = ProjectionMask::columns(&schema, ["a", "e"]);
assert_eq!(mask.mask.unwrap(), [true, false, true, false, true]);
}
// `union`: element-wise OR of explicit masks; `None` (select all) on either side wins.
#[test]
fn test_projection_mask_union() {
let mut mask1 = ProjectionMask {
mask: Some(vec![true, false, true]),
};
let mask2 = ProjectionMask {
mask: Some(vec![false, true, true]),
};
mask1.union(&mask2);
assert_eq!(mask1.mask, Some(vec![true, true, true]));
let mut mask1 = ProjectionMask { mask: None };
let mask2 = ProjectionMask {
mask: Some(vec![false, true, true]),
};
mask1.union(&mask2);
assert_eq!(mask1.mask, None);
let mut mask1 = ProjectionMask {
mask: Some(vec![true, false, true]),
};
let mask2 = ProjectionMask { mask: None };
mask1.union(&mask2);
assert_eq!(mask1.mask, None);
let mut mask1 = ProjectionMask { mask: None };
let mask2 = ProjectionMask { mask: None };
mask1.union(&mask2);
assert_eq!(mask1.mask, None);
}
// `intersect`: element-wise AND of explicit masks; a `None` (select all) side leaves
// the other side's selection unchanged.
#[test]
fn test_projection_mask_intersect() {
let mut mask1 = ProjectionMask {
mask: Some(vec![true, false, true]),
};
let mask2 = ProjectionMask {
mask: Some(vec![false, true, true]),
};
mask1.intersect(&mask2);
assert_eq!(mask1.mask, Some(vec![false, false, true]));
let mut mask1 = ProjectionMask { mask: None };
let mask2 = ProjectionMask {
mask: Some(vec![false, true, true]),
};
mask1.intersect(&mask2);
assert_eq!(mask1.mask, Some(vec![false, true, true]));
let mut mask1 = ProjectionMask {
mask: Some(vec![true, false, true]),
};
let mask2 = ProjectionMask { mask: None };
mask1.intersect(&mask2);
assert_eq!(mask1.mask, Some(vec![true, false, true]));
let mut mask1 = ProjectionMask { mask: None };
let mask2 = ProjectionMask { mask: None };
mask1.intersect(&mask2);
assert_eq!(mask1.mask, None);
}
}