mod binary;
mod boolean;
mod dictionary;
mod file;
mod fixed_len_bytes;
mod nested;
mod pages;
mod primitive;
mod row_group;
mod schema;
mod sink;
mod utf8;
mod utils;
use crate::array::*;
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::types::days_ms;
use crate::types::i256;
use crate::types::NativeType;
pub use nested::{num_values, write_rep_and_def};
pub use pages::{to_leaves, to_nested, to_parquet_leaves};
use parquet2::schema::types::PrimitiveType as ParquetPrimitiveType;
pub use parquet2::{
compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel},
encoding::Encoding,
fallible_streaming_iterator,
metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
page::{CompressedDataPage, CompressedPage, Page},
schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
write::{
compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
Version,
},
FallibleStreamingIterator,
};
pub use utils::write_def_levels;
/// Options controlling how arrow arrays are turned into parquet pages.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WriteOptions {
    /// When `true`, page writers build and attach statistics for each page.
    pub write_statistics: bool,
    /// The parquet2 writer `Version` (presumably the data page format version — confirm).
    pub version: Version,
    /// Compression settings; not read directly in this module.
    pub compression: CompressionOptions,
    /// Requested upper bound, in bytes, for a data page. `None` selects a 1 MiB default;
    /// either way the value is clamped to at most `2^31 - 2^25` bytes.
    pub data_pagesize_limit: Option<usize>,
}
use crate::compute::aggregate::estimated_bytes_size;
pub use file::FileWriter;
pub use row_group::{row_group_iter, RowGroupIterator};
pub use schema::to_parquet_type;
pub use sink::FileSink;
pub use pages::array_to_columns;
pub use pages::Nested;
/// Returns the `(offset, length)` window of leaf values addressed by `nested`.
///
/// When `nested` contains a list level, the last such level (the innermost one) decides the
/// window: it starts at that level's first offset and spans up to its last offset. Without
/// any list level the window is `(0, len)`, where `len` is the length stored in the first
/// `Primitive` level, or `(0, 0)` when there is no `Primitive` level either.
pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
    let innermost_list = nested.iter().rev().find_map(|level| match level {
        Nested::LargeList(list) => {
            let first = *list.offsets.first();
            let last = *list.offsets.last();
            Some((first as usize, (last - first) as usize))
        }
        Nested::List(list) => {
            let first = *list.offsets.first();
            let last = *list.offsets.last();
            Some((first as usize, (last - first) as usize))
        }
        _ => None,
    });

    innermost_list.unwrap_or_else(|| {
        nested
            .iter()
            .find_map(|level| match level {
                Nested::Primitive(_, _, len) => Some((0, *len)),
                _ => None,
            })
            .unwrap_or((0, 0))
    })
}
/// Number of bytes needed to store a signed decimal with `precision` decimal digits as a
/// fixed-length big-endian byte array.
///
/// Computed as `ceil((log2(10^precision + 1) + 1) / 8)`: the bits needed for the largest
/// magnitude, plus one sign bit, rounded up to whole bytes.
pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
    let largest_magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let bits_with_sign = largest_magnitude.log2() + 1.0;
    (bits_with_sign / 8.0).ceil() as usize
}
pub fn to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
let parquet_types = schema
.fields
.iter()
.map(to_parquet_type)
.collect::<Result<Vec<_>>>()?;
Ok(SchemaDescriptor::new("root".to_string(), parquet_types))
}
/// Returns whether `encoding` can be used to write a non-nested column of `data_type`.
///
/// `Encoding::Plain` is accepted for every type. Other encodings are accepted only where a
/// page writer supports them:
/// * `DeltaLengthByteArray`: binary and utf8 (both offset widths).
/// * `RleDictionary` / `PlainDictionary`: dictionary arrays.
/// * `DeltaBinaryPacked`: the integer-backed types listed below. Also `Decimal` and
///   `Decimal256` with precision at most 18, because those are written as `i32`/`i64`
///   integer pages. Wider decimals are written as fixed-length byte arrays instead.
pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
    let logical = data_type.to_logical_type();

    // Both decimal widths use the same integer page writer when precision <= 18, so
    // they must agree on delta support.
    if let (
        Encoding::DeltaBinaryPacked,
        DataType::Decimal(precision, _) | DataType::Decimal256(precision, _),
    ) = (encoding, logical)
    {
        return *precision <= 18;
    }

    matches!(
        (encoding, logical),
        (Encoding::Plain, _)
            | (
                Encoding::DeltaLengthByteArray,
                DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8,
            )
            | (Encoding::RleDictionary, DataType::Dictionary(_, _, _))
            | (Encoding::PlainDictionary, DataType::Dictionary(_, _, _))
            | (
                Encoding::DeltaBinaryPacked,
                DataType::Null
                    | DataType::UInt8
                    | DataType::UInt16
                    | DataType::UInt32
                    | DataType::UInt64
                    | DataType::Int8
                    | DataType::Int16
                    | DataType::Int32
                    | DataType::Date32
                    | DataType::Time32(_)
                    | DataType::Int64
                    | DataType::Date64
                    | DataType::Time64(_)
                    | DataType::Timestamp(_, _)
                    | DataType::Duration(_)
            )
    )
}
/// Slices `primitive_array` and every level of `nested` in place, restricting them to the
/// window `[current_offset, current_offset + current_length)` of the outermost level.
///
/// Levels are visited from first to last:
/// * A list level slices its offsets and validity to the current window. The window then
///   narrows to the child range spanned by the sliced offsets.
/// * A struct level stores the window length and slices its validity.
/// * A primitive level does the same and also slices `primitive_array`.
pub fn slice_parquet_array(
    primitive_array: &mut dyn Array,
    nested: &mut [Nested],
    mut current_offset: usize,
    mut current_length: usize,
) {
    for level in nested.iter_mut() {
        match level {
            Nested::LargeList(list) => {
                // `n` rows are delimited by `n + 1` offsets.
                list.offsets.slice(current_offset, current_length + 1);
                if let Some(bitmap) = list.validity.as_mut() {
                    bitmap.slice(current_offset, current_length);
                }
                // Descend: the child window is what the sliced offsets cover.
                current_offset = *list.offsets.first() as usize;
                current_length = list.offsets.range() as usize;
            }
            Nested::List(list) => {
                list.offsets.slice(current_offset, current_length + 1);
                if let Some(bitmap) = list.validity.as_mut() {
                    bitmap.slice(current_offset, current_length);
                }
                current_offset = *list.offsets.first() as usize;
                current_length = list.offsets.range() as usize;
            }
            Nested::Struct(validity, _, length) => {
                *length = current_length;
                if let Some(bitmap) = validity.as_mut() {
                    bitmap.slice(current_offset, current_length);
                }
            }
            Nested::Primitive(validity, _, length) => {
                *length = current_length;
                if let Some(bitmap) = validity.as_mut() {
                    bitmap.slice(current_offset, current_length);
                }
                primitive_array.slice(current_offset, current_length);
            }
        }
    }
}
/// Sums `offsets.range()` over every list level (`List` or `LargeList`) in `nested`.
/// Struct and primitive levels contribute nothing.
pub fn get_max_length(nested: &[Nested]) -> usize {
    nested
        .iter()
        .map(|level| match level {
            Nested::LargeList(list) => list.offsets.range() as usize,
            Nested::List(list) => list.offsets.range() as usize,
            _ => 0,
        })
        .sum()
}
/// Splits `primitive_array` (with its `nested` levels) into a lazily evaluated sequence of
/// parquet data pages.
///
/// Dictionary-typed arrays are delegated to `dictionary::array_to_pages`. For every other
/// type, the page size is taken from `options.data_pagesize_limit`. It defaults to 1 MiB
/// and is clamped to at most `2^31 - 2^25` bytes. The array's estimated byte size divided
/// by its top-level row count gives a per-row estimate, which decides how many rows each
/// page holds (at least one).
///
/// Each page is built from fresh copies of the array and its nesting. Those copies are
/// narrowed with [`slice_parquet_array`] and encoded with [`array_to_page`].
///
/// # Errors
/// Errors from the dictionary path are returned directly. Per-page encoding errors are
/// yielded by the returned iterator.
pub fn array_to_pages(
    primitive_array: &dyn Array,
    type_: ParquetPrimitiveType,
    nested: &[Nested],
    options: WriteOptions,
    encoding: Encoding,
) -> Result<DynIter<'static, Result<Page>>> {
    if let DataType::Dictionary(key_type, _, _) = primitive_array.data_type().to_logical_type() {
        return match_integer_type!(key_type, |$T| {
            dictionary::array_to_pages::<$T>(
                primitive_array.as_any().downcast_ref().unwrap(),
                type_,
                nested,
                options,
                encoding,
            )
        });
    }

    // Owned copies so the returned iterator can be `'static`.
    let nested = nested.to_vec();
    let primitive_array = primitive_array.to_boxed();

    // Pages are cut along rows of the outermost nesting level.
    let number_of_rows = nested[0].len();
    let byte_size = estimated_bytes_size(primitive_array.as_ref());

    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
    let max_page_size = options
        .data_pagesize_limit
        .unwrap_or(DEFAULT_PAGE_SIZE)
        .min(2usize.pow(31) - 2usize.pow(25));

    let bytes_per_row = match number_of_rows {
        0 => 0,
        rows => (byte_size as f64 / rows as f64) as usize,
    };
    // `+ 1` keeps the divisor non-zero; every page holds at least one row.
    let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);

    let pages = (0..number_of_rows)
        .step_by(rows_per_page)
        .map(move |offset| {
            // The last page takes whatever rows remain.
            let length = rows_per_page.min(number_of_rows - offset);

            let mut page_array = primitive_array.clone();
            let mut page_nested = nested.clone();
            slice_parquet_array(page_array.as_mut(), &mut page_nested, offset, length);

            array_to_page(
                page_array.as_ref(),
                type_.clone(),
                &page_nested,
                options,
                encoding,
            )
        });

    Ok(DynIter::new(pages))
}
/// Encodes `array` into a single parquet data page.
///
/// A single-level `nested` (the column is not nested) is handled by
/// [`array_to_page_simple`] with the requested `encoding`. Deeper nestings are handled by
/// `array_to_page_nested`.
pub fn array_to_page(
    array: &dyn Array,
    type_: ParquetPrimitiveType,
    nested: &[Nested],
    options: WriteOptions,
    encoding: Encoding,
) -> Result<Page> {
    match nested.len() {
        1 => array_to_page_simple(array, type_, options, encoding),
        _ => array_to_page_nested(array, type_, nested, options, encoding),
    }
}
/// Encodes a non-nested `array` into a single parquet data page.
///
/// The page writer is selected from the array's logical type.
///
/// `encoding` is validated with [`can_encode`] up front. It is only forwarded to the
/// integer, utf8 and binary page writers. The boolean, float, null, interval,
/// fixed-size-binary and fixed-length decimal paths do not receive it.
///
/// Some logical types are converted before being written:
/// * `Null` is written as an all-null `Int32` array.
/// * `Interval(YearMonth)` becomes 12-byte fixed-size binary values: the 4 little-endian
///   bytes of the `i32` value followed by 8 zero bytes.
/// * `Interval(DayTime)` becomes 12-byte fixed-size binary values: 4 zero bytes followed by
///   the little-endian bytes of the `days_ms` value.
/// * `Decimal` (backed by `i128`) is written as `i32` when `precision <= 9` and as `i64`
///   when `precision <= 18`. Otherwise it is written as fixed-size binary of
///   `decimal_length_from_precision(precision)` bytes, taken from the low end of the
///   big-endian `i128` bytes.
/// * `Decimal256` (backed by `i256`) follows the same `i32`/`i64` rules. When
///   `precision <= 38` it writes the `low()` 128-bit part as fixed-size binary of
///   `decimal_length_from_precision(precision)` bytes. Above that it writes the full
///   32-byte big-endian value.
///
/// # Errors
/// * `Error::InvalidArgumentError` when `can_encode(data_type, encoding)` is false.
/// * `Error::NotYetImplemented` for logical types not handled below.
/// * Any error returned by the selected page writer.
///
/// # Panics
/// Panics if `array` is not the concrete array type its logical type implies, because
/// every downcast is `unwrap`ped.
pub fn array_to_page_simple(
    array: &dyn Array,
    type_: ParquetPrimitiveType,
    options: WriteOptions,
    encoding: Encoding,
) -> Result<Page> {
    let data_type = array.data_type();
    // Reject encodings the selected page writer cannot produce for this type.
    if !can_encode(data_type, encoding) {
        return Err(Error::InvalidArgumentError(format!(
            "The datatype {data_type:?} cannot be encoded by {encoding:?}"
        )));
    }
    match data_type.to_logical_type() {
        DataType::Boolean => {
            boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, type_)
        }
        // Integer-like types. The second type parameter is presumably the parquet physical
        // integer the values are written as (i32 or i64) — confirm in `primitive`.
        DataType::UInt8 => primitive::array_to_page_integer::<u8, i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::UInt16 => primitive::array_to_page_integer::<u16, i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::UInt32 => primitive::array_to_page_integer::<u32, i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::UInt64 => primitive::array_to_page_integer::<u64, i64>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::Int8 => primitive::array_to_page_integer::<i8, i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::Int16 => primitive::array_to_page_integer::<i16, i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
            primitive::array_to_page_integer::<i32, i32>(
                array.as_any().downcast_ref().unwrap(),
                options,
                type_,
                encoding,
            )
        }
        DataType::Int64
        | DataType::Date64
        | DataType::Time64(_)
        | DataType::Timestamp(_, _)
        | DataType::Duration(_) => primitive::array_to_page_integer::<i64, i64>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        // Floats are always written with the plain writer.
        DataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
        ),
        DataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
        ),
        DataType::Utf8 => utf8::array_to_page::<i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::LargeUtf8 => utf8::array_to_page::<i64>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::Binary => binary::array_to_page::<i32>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        DataType::LargeBinary => binary::array_to_page::<i64>(
            array.as_any().downcast_ref().unwrap(),
            options,
            type_,
            encoding,
        ),
        // A null column is written as an all-null Int32 column of the same length.
        DataType::Null => {
            let array = Int32Array::new_null(DataType::Int32, array.len());
            primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
        }
        DataType::Interval(IntervalUnit::YearMonth) => {
            let type_ = type_;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i32>>()
                .unwrap();
            // 12 bytes per value: the 4 LE bytes of the i32, then 8 zero bytes.
            // Presumably this matches parquet's INTERVAL layout (months, days, millis) — confirm.
            let mut values = Vec::<u8>::with_capacity(12 * array.len());
            array.values().iter().for_each(|x| {
                let bytes = &x.to_le_bytes();
                values.extend_from_slice(bytes);
                values.extend_from_slice(&[0; 8]);
            });
            let array = FixedSizeBinaryArray::new(
                DataType::FixedSizeBinary(12),
                values.into(),
                array.validity().cloned(),
            );
            let statistics = if options.write_statistics {
                Some(fixed_len_bytes::build_statistics(&array, type_.clone()))
            } else {
                None
            };
            fixed_len_bytes::array_to_page(&array, options, type_, statistics)
        }
        DataType::Interval(IntervalUnit::DayTime) => {
            let type_ = type_;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<days_ms>>()
                .unwrap();
            let mut values = Vec::<u8>::with_capacity(12 * array.len());
            array.values().iter().for_each(|x| {
                let bytes = &x.to_le_bytes();
                // 4 zero bytes first, then the little-endian bytes of the `days_ms` value.
                values.extend_from_slice(&[0; 4]);
                values.extend_from_slice(bytes);
            });
            let array = FixedSizeBinaryArray::new(
                DataType::FixedSizeBinary(12),
                values.into(),
                array.validity().cloned(),
            );
            let statistics = if options.write_statistics {
                Some(fixed_len_bytes::build_statistics(&array, type_.clone()))
            } else {
                None
            };
            fixed_len_bytes::array_to_page(&array, options, type_, statistics)
        }
        DataType::FixedSizeBinary(_) => {
            let type_ = type_;
            let array = array.as_any().downcast_ref().unwrap();
            let statistics = if options.write_statistics {
                Some(fixed_len_bytes::build_statistics(array, type_.clone()))
            } else {
                None
            };
            fixed_len_bytes::array_to_page(array, options, type_, statistics)
        }
        DataType::Decimal256(precision, _) => {
            let type_ = type_;
            let precision = *precision;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i256>>()
                .unwrap();
            if precision <= 9 {
                // Narrowed to i32; presumably values of this precision fit — confirm.
                let values = array
                    .values()
                    .iter()
                    .map(|x| x.0.as_i32())
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
                primitive::array_to_page_integer::<i32, i32>(&array, options, type_, encoding)
            } else if precision <= 18 {
                let values = array
                    .values()
                    .iter()
                    .map(|x| x.0.as_i64())
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
                primitive::array_to_page_integer::<i64, i64>(&array, options, type_, encoding)
            } else if precision <= 38 {
                let size = decimal_length_from_precision(precision);
                let statistics = if options.write_statistics {
                    let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
                        array,
                        type_.clone(),
                        size,
                    );
                    Some(stats)
                } else {
                    None
                };
                // Keep the last `size` big-endian bytes of the 128-bit `low()` part.
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    DataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
            } else {
                // Full-width 256-bit values: 32 big-endian bytes each.
                let size = 32;
                // Re-downcasts the same array; shadows the binding above with an equal reference.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<i256>>()
                    .unwrap();
                let statistics = if options.write_statistics {
                    let stats =
                        fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size);
                    Some(stats)
                } else {
                    None
                };
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.to_be_bytes();
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    DataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
            }
        }
        DataType::Decimal(precision, _) => {
            let type_ = type_;
            let precision = *precision;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i128>>()
                .unwrap();
            if precision <= 9 {
                // `as i32` truncates; presumably values of this precision fit — confirm.
                let values = array
                    .values()
                    .iter()
                    .map(|x| *x as i32)
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
                primitive::array_to_page_integer::<i32, i32>(&array, options, type_, encoding)
            } else if precision <= 18 {
                let values = array
                    .values()
                    .iter()
                    .map(|x| *x as i64)
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
                primitive::array_to_page_integer::<i64, i64>(&array, options, type_, encoding)
            } else {
                let size = decimal_length_from_precision(precision);
                let statistics = if options.write_statistics {
                    let stats =
                        fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size);
                    Some(stats)
                } else {
                    None
                };
                // Keep the last `size` bytes of the 16-byte big-endian representation.
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.to_be_bytes()[16 - size..];
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    DataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
            }
        }
        other => Err(Error::NotYetImplemented(format!(
            "Writing parquet pages for data type {other:?}"
        ))),
    }
    .map(Page::Data)
}
/// Encodes `array`, described by its `nested` levels, into a single parquet data page.
///
/// Each supported logical type is dispatched to the matching `nested_array_to_page`
/// writer, which receives `nested`. Decimal types use the same physical conversions as the
/// non-nested path:
/// * `i32` for precision <= 9;
/// * `i64` for precision <= 18;
/// * fixed-size binary above that.
///
/// Interval and fixed-size-binary types have no arm here, so they return
/// `Error::NotYetImplemented`.
///
/// `_encoding` is unused: no branch forwards an encoding to its page writer.
///
/// NOTE(review): the fixed-size-binary decimal branches call
/// `fixed_len_bytes::array_to_page`, which is not given `nested`. The resulting page
/// therefore carries no information about the enclosing list/struct levels. This looks like
/// it would write incorrect repetition/definition levels for nested wide decimals — confirm
/// against `fixed_len_bytes`.
///
/// # Errors
/// `Error::NotYetImplemented` for unhandled logical types, or any page writer error.
///
/// # Panics
/// Panics if `array` is not the concrete array type its logical type implies (downcasts are
/// `unwrap`ped).
fn array_to_page_nested(
    array: &dyn Array,
    type_: ParquetPrimitiveType,
    nested: &[Nested],
    options: WriteOptions,
    _encoding: Encoding,
) -> Result<Page> {
    use DataType::*;
    match array.data_type().to_logical_type() {
        // A null column is written as an all-null Int32 column of the same length.
        Null => {
            let array = Int32Array::new_null(DataType::Int32, array.len());
            primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
        }
        Boolean => {
            let array = array.as_any().downcast_ref().unwrap();
            boolean::nested_array_to_page(array, options, type_, nested)
        }
        Utf8 => {
            let array = array.as_any().downcast_ref().unwrap();
            utf8::nested_array_to_page::<i32>(array, options, type_, nested)
        }
        LargeUtf8 => {
            let array = array.as_any().downcast_ref().unwrap();
            utf8::nested_array_to_page::<i64>(array, options, type_, nested)
        }
        Binary => {
            let array = array.as_any().downcast_ref().unwrap();
            binary::nested_array_to_page::<i32>(array, options, type_, nested)
        }
        LargeBinary => {
            let array = array.as_any().downcast_ref().unwrap();
            binary::nested_array_to_page::<i64>(array, options, type_, nested)
        }
        UInt8 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
        }
        UInt16 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
        }
        UInt32 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
        }
        UInt64 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
        }
        Int8 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
        }
        Int16 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
        }
        Int32 | Date32 | Time32(_) => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
        }
        Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
        }
        Float32 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
        }
        Float64 => {
            let array = array.as_any().downcast_ref().unwrap();
            primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
        }
        Decimal(precision, _) => {
            let type_ = type_;
            let precision = *precision;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i128>>()
                .unwrap();
            if precision <= 9 {
                // `as i32` truncates; presumably values of this precision fit — confirm.
                let values = array
                    .values()
                    .iter()
                    .map(|x| *x as i32)
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
            } else if precision <= 18 {
                let values = array
                    .values()
                    .iter()
                    .map(|x| *x as i64)
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
            } else {
                let size = decimal_length_from_precision(precision);
                let statistics = if options.write_statistics {
                    let stats =
                        fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size);
                    Some(stats)
                } else {
                    None
                };
                // Keep the last `size` bytes of the 16-byte big-endian representation.
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.to_be_bytes()[16 - size..];
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    DataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                // NOTE(review): `nested` is not passed here; see the function docs.
                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
            }
        }
        Decimal256(precision, _) => {
            let type_ = type_;
            let precision = *precision;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<i256>>()
                .unwrap();
            if precision <= 9 {
                let values = array
                    .values()
                    .iter()
                    .map(|x| x.0.as_i32())
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i32>::new(DataType::Int32, values, array.validity().cloned());
                primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
            } else if precision <= 18 {
                let values = array
                    .values()
                    .iter()
                    .map(|x| x.0.as_i64())
                    .collect::<Vec<_>>()
                    .into();
                let array =
                    PrimitiveArray::<i64>::new(DataType::Int64, values, array.validity().cloned());
                primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
            } else if precision <= 38 {
                let size = decimal_length_from_precision(precision);
                let statistics = if options.write_statistics {
                    let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
                        array,
                        type_.clone(),
                        size,
                    );
                    Some(stats)
                } else {
                    None
                };
                // Keep the last `size` big-endian bytes of the 128-bit `low()` part.
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.0.low().to_be_bytes()[16 - size..];
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    DataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                // NOTE(review): `nested` is not passed here; see the function docs.
                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
            } else {
                // Full-width 256-bit values: 32 big-endian bytes each.
                let size = 32;
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<i256>>()
                    .unwrap();
                let statistics = if options.write_statistics {
                    let stats =
                        fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size);
                    Some(stats)
                } else {
                    None
                };
                let mut values = Vec::<u8>::with_capacity(size * array.len());
                array.values().iter().for_each(|x| {
                    let bytes = &x.to_be_bytes();
                    values.extend_from_slice(bytes)
                });
                let array = FixedSizeBinaryArray::new(
                    DataType::FixedSizeBinary(size),
                    values.into(),
                    array.validity().cloned(),
                );
                // NOTE(review): `nested` is not passed here; see the function docs.
                fixed_len_bytes::array_to_page(&array, options, type_, statistics)
            }
        }
        other => Err(Error::NotYetImplemented(format!(
            "Writing nested parquet pages for data type {other:?}"
        ))),
    }
    .map(Page::Data)
}
/// Depth-first walk of `data_type` that pushes `map(leaf)` onto `encodings` for every leaf
/// type, in field order.
///
/// Leaves are null, boolean, primitive, binary/utf8 (all widths), fixed-size binary and
/// dictionary types. List types recurse into their child field. Struct types recurse into
/// each field, and map types recurse into each field of their entries struct.
///
/// # Panics
/// Panics (via `todo!`) on union types.
fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
    data_type: &DataType,
    map: F,
    encodings: &mut Vec<T>,
) {
    use crate::datatypes::PhysicalType::*;
    match data_type.to_physical_type() {
        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
        | Dictionary(_) | LargeUtf8 => encodings.push(map(data_type)),
        List | FixedSizeList | LargeList => match data_type.to_logical_type() {
            DataType::List(child)
            | DataType::LargeList(child)
            | DataType::FixedSizeList(child, _) => {
                transverse_recursive(&child.data_type, map, encodings)
            }
            _ => unreachable!(),
        },
        Struct => match data_type.to_logical_type() {
            DataType::Struct(fields) => {
                for field in fields {
                    transverse_recursive(&field.data_type, map.clone(), encodings);
                }
            }
            _ => unreachable!(),
        },
        Map => match data_type.to_logical_type() {
            DataType::Map(entries, _) => match entries.data_type.to_logical_type() {
                DataType::Struct(fields) => {
                    for field in fields {
                        transverse_recursive(&field.data_type, map.clone(), encodings);
                    }
                }
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Union => todo!(),
    }
}
/// Applies `map` to every leaf type of `data_type`, depth-first in field order, and returns
/// the results (one entry per leaf).
///
/// # Panics
/// Panics if `data_type` contains a union type.
pub fn transverse<T, F: Fn(&DataType) -> T + Clone>(data_type: &DataType, map: F) -> Vec<T> {
    let mut leaves = Vec::new();
    transverse_recursive(data_type, map, &mut leaves);
    leaves
}