use std::path::Path;
use std::sync::Arc;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::schema::types::ColumnPath;
use crate::ColumnarError;
/// Recursively verifies that `dt` contains no type this writer refuses to serialise.
///
/// The only rejected type is [`DataType::Union`]. Container types are walked so a
/// union hidden inside them is also caught:
///
/// * list-like types, maps and run-end-encoded arrays are checked through their
///   single inspected child field (for run-end-encoded data that is the values
///   field; the run-ends field is not inspected),
/// * dictionaries are checked through their value type only,
/// * structs are checked member by member, stopping at the first failure.
///
/// # Errors
///
/// Returns [`ColumnarError::UnsupportedType`] carrying the `Debug` rendering of the
/// offending union type.
fn check_datatype(dt: &DataType) -> Result<(), ColumnarError> {
    match dt {
        // Wrappers whose validity depends on exactly one child field.
        DataType::List(child)
        | DataType::LargeList(child)
        | DataType::ListView(child)
        | DataType::LargeListView(child)
        | DataType::FixedSizeList(child, _)
        | DataType::Map(child, _)
        | DataType::RunEndEncoded(_, child) => check_datatype(child.data_type()),

        // The dictionary key type is not inspected, only the values.
        DataType::Dictionary(_, values) => check_datatype(values),

        // Every struct member must pass; the first error short-circuits.
        DataType::Struct(members) => members
            .iter()
            .try_for_each(|member| check_datatype(member.data_type())),

        DataType::Union(_, _) => Err(ColumnarError::UnsupportedType(format!(
            "{:?} is not supported for Parquet serialisation",
            dt
        ))),

        // Everything else is accepted as-is.
        _ => Ok(()),
    }
}
/// Checks every top-level field of `schema` with [`check_datatype`].
///
/// # Errors
///
/// Propagates the first error reported for any field; later fields are not
/// examined once one fails.
pub(crate) fn validate_schema_for_write(schema: &Schema) -> Result<(), ColumnarError> {
    schema
        .fields()
        .iter()
        .try_for_each(|column| check_datatype(column.data_type()))
}
/// Caller-tunable options applied when building Parquet writer properties.
///
/// The derived `Default` leaves every option unset.
#[derive(Debug, Clone, Default)]
pub struct WriterConfig {
/// Optional row-count limit per row group. When `Some`, the value is forwarded
/// to the writer-properties builder by `build_writer_props_with_config`; when
/// `None`, the builder's own default is left untouched.
pub max_row_group_size: Option<usize>,
}
/// Builds writer properties for `schema` using a default [`WriterConfig`].
///
/// Equivalent to calling [`build_writer_props_with_config`] with
/// `WriterConfig::default()`.
pub(crate) fn build_writer_props(schema: &Schema) -> WriterProperties {
    let defaults = WriterConfig::default();
    build_writer_props_with_config(schema, &defaults)
}
/// Builds the Parquet [`WriterProperties`] used by this module's writers.
///
/// Global settings: compression is `UNCOMPRESSED`, dictionary encoding is enabled
/// and statistics are collected at page level. If `config.max_row_group_size` is
/// set, it is applied as the maximum row count per row group.
///
/// Per-column encodings are then chosen from each *top-level* field's data type,
/// using the field name as the column path:
///
/// * integer, date, time, timestamp and duration types get `DELTA_BINARY_PACKED`,
/// * string and binary types (including the large and view variants) get
///   `DELTA_LENGTH_BYTE_ARRAY`,
/// * every other type, nested types included, keeps the builder default.
///
/// NOTE(review): a `Some(0)` row-group limit is forwarded unchanged to the parquet
/// builder; confirm how the builder treats a zero limit.
pub(crate) fn build_writer_props_with_config(
    schema: &Schema,
    config: &WriterConfig,
) -> WriterProperties {
    let base = WriterProperties::builder()
        .set_compression(Compression::UNCOMPRESSED)
        .set_dictionary_enabled(true)
        .set_statistics_enabled(EnabledStatistics::Page);

    // Only override the row-group limit when the caller supplied one.
    let base = match config.max_row_group_size {
        Some(rows) => base.set_max_row_group_row_count(Some(rows)),
        None => base,
    };

    schema
        .fields()
        .iter()
        .fold(base, |acc, column| {
            let chosen = match column.data_type() {
                DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::UInt8
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::Date32
                | DataType::Date64
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::Duration(_) => Some(Encoding::DELTA_BINARY_PACKED),
                DataType::Utf8
                | DataType::LargeUtf8
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8View
                | DataType::BinaryView => Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
                _ => None,
            };

            match chosen {
                Some(encoding) => acc.set_column_encoding(
                    ColumnPath::from(column.name().as_str()),
                    encoding,
                ),
                None => acc,
            }
        })
        .build()
}
/// Serialises `batches` into an in-memory Parquet file and returns its bytes.
///
/// The schema is validated first, then writer properties are derived from
/// `schema` and `config`, and each batch is written in slice order before the
/// writer is closed.
///
/// # Errors
///
/// Returns an error if the schema fails [`validate_schema_for_write`], or if
/// creating the writer, writing a batch, or closing the writer fails.
pub(crate) fn write_batches_to_bytes_with_config(
    schema: Arc<Schema>,
    batches: &[RecordBatch],
    config: &WriterConfig,
) -> Result<Vec<u8>, ColumnarError> {
    validate_schema_for_write(&schema)?;
    let properties = build_writer_props_with_config(&schema, config);

    let mut encoded = Vec::new();
    let mut sink = ArrowWriter::try_new(&mut encoded, schema, Some(properties))?;
    batches.iter().try_for_each(|chunk| sink.write(chunk))?;
    // Closing consumes the writer, which releases the borrow on `encoded`.
    sink.close()?;

    Ok(encoded)
}
/// Writes `batches` as a Parquet file at `path`.
///
/// The schema is validated and the writer properties are built *before* the file
/// is created, so a rejected schema never touches the filesystem. Batches are
/// written in slice order and the writer is closed at the end.
///
/// # Errors
///
/// Returns an error if the schema fails [`validate_schema_for_write`], if the
/// file cannot be created, or if creating the writer, writing a batch, or
/// closing the writer fails.
pub(crate) fn write_batches_with_config(
    path: &Path,
    schema: Arc<Schema>,
    batches: &[RecordBatch],
    config: &WriterConfig,
) -> Result<(), ColumnarError> {
    validate_schema_for_write(&schema)?;
    let properties = build_writer_props_with_config(&schema, config);

    let target = std::fs::File::create(path)?;
    let mut sink = ArrowWriter::try_new(target, schema, Some(properties))?;
    batches.iter().try_for_each(|chunk| sink.write(chunk))?;
    sink.close()?;

    Ok(())
}
pub(crate) fn write_batches(
path: &Path,
schema: Arc<Schema>,
batches: &[RecordBatch],
) -> Result<(), ColumnarError> {
validate_schema_for_write(&schema)?;
let props = build_writer_props(&schema);
let file = std::fs::File::create(path)?;
let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(())
}
pub(crate) fn write_batches_to_bytes(
schema: Arc<Schema>,
batches: &[RecordBatch],
) -> Result<Vec<u8>, ColumnarError> {
validate_schema_for_write(&schema)?;
let props = build_writer_props(&schema);
let mut buf = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(buf)
}