use serde::{Deserialize, Serialize};
/// Compression codec applied to Parquet column chunks.
///
/// Converted to the `parquet` crate's representation by
/// [`CompressionCodec::to_parquet_compression`]. `Snappy` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CompressionCodec {
/// No compression.
Uncompressed,
/// Snappy compression (the default for this crate).
#[default]
Snappy,
/// Gzip; the conversion uses the parquet crate's default gzip level.
Gzip,
/// LZ4 (the parquet `LZ4` codec; distinct from `Lz4Raw` below).
Lz4,
/// Zstandard; the conversion uses the parquet crate's default level.
Zstd,
/// Brotli; the conversion uses the parquet crate's default level.
Brotli,
/// Raw LZ4 block format (parquet `LZ4_RAW` codec).
Lz4Raw,
}
impl CompressionCodec {
    /// Translates this codec into the `parquet` crate's compression setting.
    ///
    /// Codecs that carry a level (gzip, zstd, brotli) are constructed with
    /// the crate's default level via `Default::default()`.
    pub fn to_parquet_compression(self) -> parquet::basic::Compression {
        use parquet::basic::Compression as C;
        match self {
            Self::Uncompressed => C::UNCOMPRESSED,
            Self::Snappy => C::SNAPPY,
            Self::Gzip => C::GZIP(Default::default()),
            Self::Lz4 => C::LZ4,
            Self::Zstd => C::ZSTD(Default::default()),
            Self::Brotli => C::BROTLI(Default::default()),
            Self::Lz4Raw => C::LZ4_RAW,
        }
    }
}
/// Parquet writer format version.
///
/// Mapped onto the parquet crate's `WriterVersion` (`PARQUET_1_0` /
/// `PARQUET_2_0`) by [`ParquetVersion::to_parquet_version`]. `V2` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ParquetVersion {
/// Parquet format version 1.0.
V1,
/// Parquet format version 2.0 (the default).
#[default]
V2,
}
impl ParquetVersion {
    /// Converts this version into the `parquet` crate's writer-version enum.
    pub fn to_parquet_version(self) -> parquet::file::properties::WriterVersion {
        use parquet::file::properties::WriterVersion;
        // Only two variants exist, so a simple equality test suffices.
        if self == Self::V1 {
            WriterVersion::PARQUET_1_0
        } else {
            WriterVersion::PARQUET_2_0
        }
    }
}
/// Tunable options for writing Parquet files.
///
/// Construct via [`Default`] or [`ParquetWriteOptions::with_compression`],
/// refine with the `with_*` builder methods, then convert into writer
/// properties with [`ParquetWriteOptions::to_writer_properties`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParquetWriteOptions {
/// Compression codec for column chunks (default: Snappy).
pub compression: CompressionCodec,
/// Row-group size limit (default: 1024 * 1024). Fed to the writer's
/// max-row-group setting — appears to be a row count, not bytes;
/// TODO confirm against the parquet crate version in use.
pub row_group_size: usize,
/// Data page size limit passed to `set_data_page_size_limit`
/// (default: 1024 * 1024; presumably bytes — verify with parquet docs).
pub data_page_size: usize,
/// Whether dictionary encoding is enabled (default: true).
pub enable_dictionary: bool,
/// Whether page-level statistics are collected (default: true).
pub enable_statistics: bool,
/// Parquet writer format version (default: V2).
pub version: ParquetVersion,
/// Number of rows per write batch passed to `set_write_batch_size`
/// (default: 1024).
pub write_batch_size: usize,
}
impl Default for ParquetWriteOptions {
    /// Default writer configuration: Snappy compression, Parquet v2,
    /// dictionary encoding and statistics enabled, 1024 * 1024 row-group
    /// and data-page limits, 1024-row write batches.
    fn default() -> Self {
        // Shared limit used for both row-group and data-page settings.
        const LIMIT: usize = 1024 * 1024;
        Self {
            compression: CompressionCodec::default(),
            row_group_size: LIMIT,
            data_page_size: LIMIT,
            enable_dictionary: true,
            enable_statistics: true,
            version: ParquetVersion::default(),
            write_batch_size: 1024,
        }
    }
}
impl ParquetWriteOptions {
    /// Creates options with the given compression codec and defaults for
    /// every other field.
    pub fn with_compression(compression: CompressionCodec) -> Self {
        Self {
            compression,
            ..Default::default()
        }
    }

    /// Sets the row-group size limit (consumed by the writer's
    /// max-row-group setting in [`Self::to_writer_properties`]).
    pub fn with_row_group_size(mut self, size: usize) -> Self {
        self.row_group_size = size;
        self
    }

    /// Sets the data page size limit.
    pub fn with_data_page_size(mut self, size: usize) -> Self {
        self.data_page_size = size;
        self
    }

    /// Enables or disables dictionary encoding.
    pub fn with_dictionary(mut self, enable: bool) -> Self {
        self.enable_dictionary = enable;
        self
    }

    /// Enables or disables statistics collection.
    pub fn with_statistics(mut self, enable: bool) -> Self {
        self.enable_statistics = enable;
        self
    }

    /// Sets the Parquet writer format version.
    pub fn with_version(mut self, version: ParquetVersion) -> Self {
        self.version = version;
        self
    }

    /// Sets the number of rows per write batch. Added for consistency:
    /// every other field already has a `with_*` builder.
    pub fn with_write_batch_size(mut self, size: usize) -> Self {
        self.write_batch_size = size;
        self
    }

    /// Builds parquet `WriterProperties` reflecting these options.
    ///
    /// Statistics are collected at page granularity when enabled, and
    /// disabled entirely otherwise.
    pub fn to_writer_properties(&self) -> parquet::file::properties::WriterProperties {
        use parquet::file::properties::{EnabledStatistics, WriterProperties};
        let statistics = if self.enable_statistics {
            EnabledStatistics::Page
        } else {
            EnabledStatistics::None
        };
        WriterProperties::builder()
            .set_compression(self.compression.to_parquet_compression())
            // FIX: `WriterPropertiesBuilder` exposes `set_max_row_group_size(usize)`
            // (a row count), not `set_max_row_group_row_count(Option<usize>)` as the
            // original called. NOTE(review): if this crate pins a parquet fork that
            // does provide the latter, revert this line.
            .set_max_row_group_size(self.row_group_size)
            .set_data_page_size_limit(self.data_page_size)
            .set_dictionary_enabled(self.enable_dictionary)
            .set_statistics_enabled(statistics)
            .set_writer_version(self.version.to_parquet_version())
            .set_write_batch_size(self.write_batch_size)
            .build()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every codec variant must map to its exact parquet counterpart.
    ///
    /// The original test only covered five of the seven variants (missing
    /// `Zstd` and `Lz4Raw`) and matched against a union of patterns, which
    /// would pass even if variants were cross-mapped. Assert each mapping
    /// individually instead.
    #[test]
    fn test_compression_codec_conversion() {
        use parquet::basic::Compression;
        assert!(matches!(
            CompressionCodec::Uncompressed.to_parquet_compression(),
            Compression::UNCOMPRESSED
        ));
        assert!(matches!(
            CompressionCodec::Snappy.to_parquet_compression(),
            Compression::SNAPPY
        ));
        assert!(matches!(
            CompressionCodec::Gzip.to_parquet_compression(),
            Compression::GZIP(_)
        ));
        assert!(matches!(
            CompressionCodec::Lz4.to_parquet_compression(),
            Compression::LZ4
        ));
        assert!(matches!(
            CompressionCodec::Zstd.to_parquet_compression(),
            Compression::ZSTD(_)
        ));
        assert!(matches!(
            CompressionCodec::Brotli.to_parquet_compression(),
            Compression::BROTLI(_)
        ));
        assert!(matches!(
            CompressionCodec::Lz4Raw.to_parquet_compression(),
            Compression::LZ4_RAW
        ));
    }

    /// Defaults: Snappy, v2, dictionary + statistics on, 1024-row batches.
    #[test]
    fn test_default_options() {
        let options = ParquetWriteOptions::default();
        assert_eq!(options.compression, CompressionCodec::Snappy);
        assert_eq!(options.version, ParquetVersion::V2);
        assert!(options.enable_dictionary);
        assert!(options.enable_statistics);
        assert_eq!(options.write_batch_size, 1024);
    }

    /// Builder methods overwrite only their field, leaving others at defaults.
    #[test]
    fn test_builder_pattern() {
        let options = ParquetWriteOptions::with_compression(CompressionCodec::Brotli)
            .with_row_group_size(50000)
            .with_dictionary(false);
        assert_eq!(options.compression, CompressionCodec::Brotli);
        assert_eq!(options.row_group_size, 50000);
        assert!(!options.enable_dictionary);
        // Untouched fields keep their defaults.
        assert!(options.enable_statistics);
        assert_eq!(options.version, ParquetVersion::V2);
    }
}