use crate::errors::AvroError;
use arrow_schema::ArrowError;
#[cfg(any(
feature = "deflate",
feature = "zstd",
feature = "bzip2",
feature = "xz"
))]
use std::io::{Read, Write};
/// Metadata key `avro.codec`, the key the Avro object container file format
/// uses to name the compression codec applied to data blocks.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
/// Compression codecs that can be applied to a block of bytes.
///
/// Each variant is backed by an optional cargo feature; when that feature is
/// disabled, compressing or decompressing with the variant returns an error
/// instead of failing to compile.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Raw DEFLATE stream (no zlib/gzip wrapper); needs the `deflate` feature.
    Deflate,
    /// Raw Snappy data followed by a 4-byte big-endian CRC32 of the
    /// uncompressed bytes; needs the `snappy` feature.
    Snappy,
    /// Zstandard stream; needs the `zstd` feature.
    ZStandard,
    /// Bzip2 stream; needs the `bzip2` feature.
    Bzip2,
    /// XZ stream; needs the `xz` feature.
    Xz,
}
impl CompressionCodec {
    /// Decompresses one compressed `block` with this codec and returns the
    /// decoded bytes.
    ///
    /// # Errors
    ///
    /// * `AvroError::ParseError` when the cargo feature backing this codec is
    ///   disabled, when a Snappy block is too short to hold its 4-byte CRC32
    ///   trailer, or when the Snappy CRC32 does not match the decoded data.
    /// * Failures reported by the underlying decoder are propagated, either
    ///   through `?` or wrapped in `AvroError::External`.
    // `block` is unused when every codec feature is disabled.
    #[allow(unused_variables)]
    pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, AvroError> {
        match self {
            #[cfg(feature = "deflate")]
            CompressionCodec::Deflate => {
                let mut decoder = flate2::read::DeflateDecoder::new(block);
                let mut out = Vec::new();
                decoder.read_to_end(&mut out)?;
                Ok(out)
            }
            #[cfg(not(feature = "deflate"))]
            CompressionCodec::Deflate => Err(AvroError::ParseError(
                "Deflate codec requires deflate feature".to_string(),
            )),
            #[cfg(feature = "snappy")]
            CompressionCodec::Snappy => {
                // The block is the raw Snappy payload followed by a 4-byte
                // big-endian CRC32 of the *uncompressed* data. Reject blocks
                // that cannot even hold the trailer instead of panicking on
                // an out-of-range slice.
                let payload_len = block.len().checked_sub(4).ok_or_else(|| {
                    AvroError::ParseError(format!(
                        "Snappy block too short for CRC32 trailer: {} bytes",
                        block.len()
                    ))
                })?;
                let (payload, trailer) = block.split_at(payload_len);
                // `split_at(len - 4)` guarantees `trailer` is exactly 4 bytes.
                let expected =
                    u32::from_be_bytes([trailer[0], trailer[1], trailer[2], trailer[3]]);
                let mut decoder = snap::raw::Decoder::new();
                let decoded = decoder
                    .decompress_vec(payload)
                    .map_err(|e| AvroError::External(Box::new(e)))?;
                let actual = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
                if actual != expected {
                    return Err(AvroError::ParseError("Snappy CRC mismatch".to_string()));
                }
                Ok(decoded)
            }
            #[cfg(not(feature = "snappy"))]
            CompressionCodec::Snappy => Err(AvroError::ParseError(
                "Snappy codec requires snappy feature".to_string(),
            )),
            #[cfg(feature = "zstd")]
            CompressionCodec::ZStandard => {
                let mut decoder = zstd::Decoder::new(block)?;
                let mut out = Vec::new();
                decoder
                    .read_to_end(&mut out)
                    .map_err(|e| AvroError::External(Box::new(e)))?;
                Ok(out)
            }
            #[cfg(not(feature = "zstd"))]
            CompressionCodec::ZStandard => Err(AvroError::ParseError(
                "ZStandard codec requires zstd feature".to_string(),
            )),
            #[cfg(feature = "bzip2")]
            CompressionCodec::Bzip2 => {
                let mut decoder = bzip2::read::BzDecoder::new(block);
                let mut out = Vec::new();
                decoder
                    .read_to_end(&mut out)
                    .map_err(|e| AvroError::External(Box::new(e)))?;
                Ok(out)
            }
            #[cfg(not(feature = "bzip2"))]
            CompressionCodec::Bzip2 => Err(AvroError::ParseError(
                "Bzip2 codec requires bzip2 feature".to_string(),
            )),
            #[cfg(feature = "xz")]
            CompressionCodec::Xz => {
                let mut decoder = xz::read::XzDecoder::new(block);
                let mut out = Vec::new();
                decoder
                    .read_to_end(&mut out)
                    .map_err(|e| AvroError::External(Box::new(e)))?;
                Ok(out)
            }
            #[cfg(not(feature = "xz"))]
            CompressionCodec::Xz => Err(AvroError::ParseError(
                "XZ codec requires xz feature".to_string(),
            )),
        }
    }

    /// Compresses `data` with this codec and returns the compressed bytes.
    ///
    /// For Snappy, a 4-byte big-endian CRC32 of the uncompressed `data` is
    /// appended after the compressed payload, matching what `decompress`
    /// expects.
    ///
    /// # Errors
    ///
    /// * `ArrowError::ParseError` when the cargo feature backing this codec is
    ///   disabled.
    /// * Failures reported by the underlying encoder are propagated, either
    ///   through `?` or wrapped in `ArrowError::ExternalError`.
    // `data` is unused when every codec feature is disabled.
    #[allow(unused_variables)]
    pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
        match self {
            #[cfg(feature = "deflate")]
            CompressionCodec::Deflate => {
                let mut encoder =
                    flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
                encoder.write_all(data)?;
                Ok(encoder.finish()?)
            }
            #[cfg(not(feature = "deflate"))]
            CompressionCodec::Deflate => Err(ArrowError::ParseError(
                "Deflate codec requires deflate feature".to_string(),
            )),
            #[cfg(feature = "snappy")]
            CompressionCodec::Snappy => {
                let mut encoder = snap::raw::Encoder::new();
                let mut compressed = encoder
                    .compress_vec(data)
                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
                // The checksum covers the uncompressed input, not the payload.
                let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
                compressed.extend_from_slice(&crc_val.to_be_bytes());
                Ok(compressed)
            }
            #[cfg(not(feature = "snappy"))]
            CompressionCodec::Snappy => Err(ArrowError::ParseError(
                "Snappy codec requires snappy feature".to_string(),
            )),
            #[cfg(feature = "zstd")]
            CompressionCodec::ZStandard => {
                // Level 0 asks zstd for its default compression level.
                let mut encoder = zstd::Encoder::new(Vec::new(), 0)
                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
                encoder.write_all(data)?;
                encoder
                    .finish()
                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))
            }
            #[cfg(not(feature = "zstd"))]
            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
                "ZStandard codec requires zstd feature".to_string(),
            )),
            #[cfg(feature = "bzip2")]
            CompressionCodec::Bzip2 => {
                let mut encoder =
                    bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
                encoder.write_all(data)?;
                Ok(encoder.finish()?)
            }
            #[cfg(not(feature = "bzip2"))]
            CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
                "Bzip2 codec requires bzip2 feature".to_string(),
            )),
            #[cfg(feature = "xz")]
            CompressionCodec::Xz => {
                // Fixed xz preset level 6.
                let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
                encoder.write_all(data)?;
                Ok(encoder.finish()?)
            }
            #[cfg(not(feature = "xz"))]
            CompressionCodec::Xz => Err(ArrowError::ParseError(
                "XZ codec requires xz feature".to_string(),
            )),
        }
    }
}