use parquet::basic::Compression;
/// Compression algorithm choices offered by this module.
///
/// Each variant is translated to a `parquet::basic::Compression` value by
/// `CompressionConfig::to_parquet_compression`, and has a human-readable name
/// through its `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionAlgorithm {
/// No compression; translated to `Compression::UNCOMPRESSED`.
None,
/// Snappy; translated to `Compression::SNAPPY`. This is the algorithm used by
/// `CompressionConfig::default()`.
Snappy,
/// LZ4; translated to `Compression::LZ4`.
Lz4,
/// Zstandard; translated to `Compression::ZSTD` with an optional level.
Zstd,
/// Brotli; translated to `Compression::BROTLI` with an optional level.
Brotli,
}
impl std::fmt::Display for CompressionAlgorithm {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::None => write!(f, "None"),
Self::Snappy => write!(f, "Snappy"),
Self::Lz4 => write!(f, "LZ4"),
Self::Zstd => write!(f, "ZSTD"),
Self::Brotli => write!(f, "Brotli"),
}
}
}
impl CompressionAlgorithm {
pub const ALL: &'static [CompressionAlgorithm] = &[
CompressionAlgorithm::None,
CompressionAlgorithm::Snappy,
CompressionAlgorithm::Lz4,
CompressionAlgorithm::Zstd,
CompressionAlgorithm::Brotli,
];
}
/// A compression algorithm paired with an optional compression level.
///
/// Built with `CompressionConfig::new` (or one of the per-algorithm shortcuts)
/// and converted to a `parquet::basic::Compression` with
/// `to_parquet_compression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompressionConfig {
// Which algorithm to use.
algorithm: CompressionAlgorithm,
// Requested level, if any. `to_parquet_compression` only reads this for
// `Zstd` and `Brotli`; `None` selects the parquet level type's default.
level: Option<u32>,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
algorithm: CompressionAlgorithm::Snappy,
level: None,
}
}
}
impl CompressionConfig {
    /// Creates a configuration for `algorithm` with no explicit level.
    pub fn new(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            level: None,
        }
    }

    /// Returns this configuration with the compression level set to `level`.
    ///
    /// The value is stored as given. It is clamped only when converted by
    /// [`to_parquet_compression`](Self::to_parquet_compression), and it is
    /// ignored there for algorithms other than Zstd and Brotli.
    ///
    /// This consumes and returns the configuration, so the result must be
    /// used; calling it as a bare statement has no effect.
    #[must_use = "with_level returns the updated configuration instead of mutating in place"]
    pub fn with_level(mut self, level: u32) -> Self {
        self.level = Some(level);
        self
    }

    /// The configured algorithm.
    pub fn algorithm(&self) -> CompressionAlgorithm {
        self.algorithm
    }

    /// The explicitly requested level, or `None` if none was set.
    pub fn level(&self) -> Option<u32> {
        self.level
    }

    /// Converts this configuration into a `parquet::basic::Compression`.
    ///
    /// * `None`, `Snappy` and `Lz4` map to `UNCOMPRESSED`, `SNAPPY` and `LZ4`;
    ///   any configured level is ignored for these.
    /// * `Zstd` uses the configured level clamped to `1..=22`, or
    ///   `ZstdLevel::default()` when no level is set.
    /// * `Brotli` uses the configured level clamped to `0..=11`, or
    ///   `BrotliLevel::default()` when no level is set.
    ///
    /// # Panics
    ///
    /// Panics if the parquet level constructor rejects an already-clamped
    /// level. The clamp ranges are intended to make that impossible.
    pub fn to_parquet_compression(&self) -> Compression {
        match self.algorithm {
            CompressionAlgorithm::None => Compression::UNCOMPRESSED,
            CompressionAlgorithm::Snappy => Compression::SNAPPY,
            // NOTE(review): parquet also exposes an `LZ4_RAW` codec; confirm that
            // the plain `LZ4` codec is the one intended here.
            CompressionAlgorithm::Lz4 => Compression::LZ4,
            CompressionAlgorithm::Zstd => {
                let level = match self.level {
                    // After clamping the value is at most 22, so the cast to
                    // `i32` cannot truncate.
                    Some(requested) => {
                        parquet::basic::ZstdLevel::try_new(requested.clamp(1, 22) as i32)
                            .expect("zstd level clamped to 1..=22 must be accepted by ZstdLevel")
                    }
                    None => parquet::basic::ZstdLevel::default(),
                };
                Compression::ZSTD(level)
            }
            CompressionAlgorithm::Brotli => {
                let level = match self.level {
                    Some(requested) => {
                        parquet::basic::BrotliLevel::try_new(requested.clamp(0, 11))
                            .expect("brotli level clamped to 0..=11 must be accepted by BrotliLevel")
                    }
                    None => parquet::basic::BrotliLevel::default(),
                };
                Compression::BROTLI(level)
            }
        }
    }

    /// Shortcut for `new(CompressionAlgorithm::None)`.
    pub fn none() -> Self {
        Self::new(CompressionAlgorithm::None)
    }

    /// Shortcut for `new(CompressionAlgorithm::Snappy)`.
    pub fn snappy() -> Self {
        Self::new(CompressionAlgorithm::Snappy)
    }

    /// Shortcut for `new(CompressionAlgorithm::Lz4)`.
    pub fn lz4() -> Self {
        Self::new(CompressionAlgorithm::Lz4)
    }

    /// Shortcut for `new(CompressionAlgorithm::Zstd)` (no explicit level).
    pub fn zstd() -> Self {
        Self::new(CompressionAlgorithm::Zstd)
    }

    /// Zstd with an explicit level; see
    /// [`to_parquet_compression`](Self::to_parquet_compression) for clamping.
    pub fn zstd_level(level: u32) -> Self {
        Self::new(CompressionAlgorithm::Zstd).with_level(level)
    }

    /// Shortcut for `new(CompressionAlgorithm::Brotli)` (no explicit level).
    pub fn brotli() -> Self {
        Self::new(CompressionAlgorithm::Brotli)
    }

    /// Brotli with an explicit level; see
    /// [`to_parquet_compression`](Self::to_parquet_compression) for clamping.
    pub fn brotli_level(level: u32) -> Self {
        Self::new(CompressionAlgorithm::Brotli).with_level(level)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use parquet::basic::{BrotliLevel, ZstdLevel};

    /// `Default` must yield Snappy with no explicit level.
    #[test]
    fn default_is_snappy() {
        let config = CompressionConfig::default();
        assert_eq!(config.algorithm(), CompressionAlgorithm::Snappy);
        assert_eq!(config.level(), None);
    }

    /// Every algorithm maps to the expected parquet codec, and in-range levels
    /// are passed through unchanged.
    #[test]
    fn parquet_compression_mapping() {
        assert_eq!(
            CompressionConfig::none().to_parquet_compression(),
            Compression::UNCOMPRESSED
        );
        assert_eq!(
            CompressionConfig::snappy().to_parquet_compression(),
            Compression::SNAPPY
        );
        assert_eq!(
            CompressionConfig::lz4().to_parquet_compression(),
            Compression::LZ4
        );
        // A bare `matches!(..);` statement discards its result and checks
        // nothing, so the codec *and* its level are compared explicitly.
        assert_eq!(
            CompressionConfig::zstd().to_parquet_compression(),
            Compression::ZSTD(ZstdLevel::default())
        );
        assert_eq!(
            CompressionConfig::zstd_level(9).to_parquet_compression(),
            Compression::ZSTD(ZstdLevel::try_new(9).unwrap())
        );
        assert_eq!(
            CompressionConfig::brotli().to_parquet_compression(),
            Compression::BROTLI(BrotliLevel::default())
        );
        assert_eq!(
            CompressionConfig::brotli_level(11).to_parquet_compression(),
            Compression::BROTLI(BrotliLevel::try_new(11).unwrap())
        );
    }

    /// Out-of-range levels are clamped to the nearest supported bound instead
    /// of panicking.
    #[test]
    fn level_clamping() {
        assert_eq!(
            CompressionConfig::zstd_level(100).to_parquet_compression(),
            Compression::ZSTD(ZstdLevel::try_new(22).unwrap())
        );
        assert_eq!(
            CompressionConfig::zstd_level(0).to_parquet_compression(),
            Compression::ZSTD(ZstdLevel::try_new(1).unwrap())
        );
        assert_eq!(
            CompressionConfig::brotli_level(99).to_parquet_compression(),
            Compression::BROTLI(BrotliLevel::try_new(11).unwrap())
        );
    }

    /// `Display` output for each algorithm.
    #[test]
    fn display_names() {
        assert_eq!(CompressionAlgorithm::None.to_string(), "None");
        assert_eq!(CompressionAlgorithm::Snappy.to_string(), "Snappy");
        assert_eq!(CompressionAlgorithm::Lz4.to_string(), "LZ4");
        assert_eq!(CompressionAlgorithm::Zstd.to_string(), "ZSTD");
        assert_eq!(CompressionAlgorithm::Brotli.to_string(), "Brotli");
    }

    /// `ALL` must list every variant.
    #[test]
    fn all_algorithms_count() {
        assert_eq!(CompressionAlgorithm::ALL.len(), 5);
    }
}