use crate::basic::Compression as CodecType;
use crate::errors::{ParquetError, Result};
/// Interface implemented by every compression codec in this module.
///
/// Both methods take `&mut self` so that an implementation may keep reusable
/// encoder/decoder state between calls.
pub trait Codec {
/// Compresses the bytes in `input_buf` and writes the compressed bytes into
/// `output_buf`.
///
/// The implementations in this file write after any bytes already present in
/// `output_buf`, so call `clear()` on it before reusing the same buffer for an
/// unrelated `compress` call.
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
/// Decompresses the bytes in `input_buf` and writes the decompressed bytes into
/// `output_buf`.
///
/// Returns the number of decompressed bytes produced by this call.
///
/// NOTE(review): whether bytes already present in `output_buf` are preserved is
/// up to the implementation; pass an empty buffer unless the concrete codec is
/// known to append.
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>)
-> Result<usize>;
}
/// Builds the codec that handles the compression type `codec`.
///
/// Returns `Ok(None)` for `CodecType::UNCOMPRESSED` (no codec is needed), and a
/// "not yet implemented" error for any compression type that has no arm here,
/// which includes codecs whose cargo feature is disabled outside of test builds.
pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
    // Resolve the optional codec first; only the unsupported case bails out early.
    let selected: Option<Box<dyn Codec>> = match codec {
        #[cfg(any(feature = "brotli", test))]
        CodecType::BROTLI => Some(Box::new(BrotliCodec::new())),
        #[cfg(any(feature = "flate2", test))]
        CodecType::GZIP => Some(Box::new(GZipCodec::new())),
        #[cfg(any(feature = "snap", test))]
        CodecType::SNAPPY => Some(Box::new(SnappyCodec::new())),
        #[cfg(any(feature = "lz4", test))]
        CodecType::LZ4 => Some(Box::new(LZ4Codec::new())),
        #[cfg(any(feature = "zstd", test))]
        CodecType::ZSTD => Some(Box::new(ZSTDCodec::new())),
        CodecType::UNCOMPRESSED => None,
        _ => {
            return Err(nyi_err!("The codec type {} is not supported yet", codec));
        }
    };
    Ok(selected)
}
#[cfg(any(feature = "snap", test))]
mod snappy_codec {
    use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for the Snappy raw (block) compression format.
    ///
    /// Holds one `snap` decoder and one encoder that are reused across calls.
    pub struct SnappyCodec {
        decoder: Decoder,
        encoder: Encoder,
    }

    impl SnappyCodec {
        /// Creates a new Snappy codec with fresh encoder and decoder state.
        pub(crate) fn new() -> Self {
            Self {
                decoder: Decoder::new(),
                encoder: Encoder::new(),
            }
        }
    }

    impl Codec for SnappyCodec {
        /// Decompresses `input_buf` and appends the result to `output_buf`.
        ///
        /// Bytes already present in `output_buf` are left untouched, matching
        /// `compress` below and the stream-based codecs in this file. Returns the
        /// number of bytes appended, or an error if the input is not valid Snappy
        /// data.
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<usize> {
            // The raw Snappy format stores the uncompressed length up front, so
            // the exact amount of extra space can be reserved before decoding.
            let len = decompress_len(input_buf)?;
            let offset = output_buf.len();
            output_buf.resize(offset + len, 0);
            // Decode into the freshly added tail only, never over existing bytes.
            self.decoder
                .decompress(input_buf, &mut output_buf[offset..])
                .map_err(|e| e.into())
        }

        /// Compresses `input_buf` and appends the result to `output_buf`.
        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let output_buf_len = output_buf.len();
            // Grow by the worst-case compressed size, then trim to what was
            // actually written.
            let required_len = max_compress_len(input_buf.len());
            output_buf.resize(output_buf_len + required_len, 0);
            let n = self
                .encoder
                .compress(input_buf, &mut output_buf[output_buf_len..])?;
            output_buf.truncate(output_buf_len + n);
            Ok(())
        }
    }
}
#[cfg(any(feature = "snap", test))]
pub use snappy_codec::*;
#[cfg(any(feature = "flate2", test))]
mod gzip_codec {
    use std::io::{Read, Write};

    use flate2::{read, write, Compression};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for the GZIP compression format, backed by `flate2`.
    pub struct GZipCodec {}

    impl GZipCodec {
        /// Creates a new GZIP codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for GZipCodec {
        /// Decodes the gzip stream in `input_buf`, appending the decoded bytes to
        /// `output_buf`, and returns how many bytes were read out of the decoder.
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<usize> {
            let bytes_read = read::GzDecoder::new(input_buf).read_to_end(output_buf)?;
            Ok(bytes_read)
        }

        /// Encodes `input_buf` as a gzip stream written into `output_buf`, using
        /// the `flate2` default compression level.
        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut gz_writer = write::GzEncoder::new(output_buf, Compression::default());
            gz_writer.write_all(input_buf)?;
            // Finish explicitly so that an error while finalizing is reported.
            gz_writer.try_finish()?;
            Ok(())
        }
    }
}
#[cfg(any(feature = "flate2", test))]
pub use gzip_codec::*;
#[cfg(any(feature = "brotli", test))]
mod brotli_codec {
    use std::io::{Read, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Internal buffer size handed to the brotli reader and writer.
    const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
    /// Quality setting handed to the brotli compressor.
    const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1;
    /// Log2 window size handed to the brotli compressor.
    const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22;

    /// Codec for the Brotli compression format.
    pub struct BrotliCodec {}

    impl BrotliCodec {
        /// Creates a new Brotli codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for BrotliCodec {
        /// Decodes the brotli stream in `input_buf`, appending the decoded bytes
        /// to `output_buf`, and returns how many bytes were read out of the
        /// decompressor.
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<usize> {
            let mut reader =
                brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE);
            let bytes_read = reader.read_to_end(output_buf)?;
            Ok(bytes_read)
        }

        /// Encodes `input_buf` as a brotli stream written into `output_buf`.
        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut writer = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                BROTLI_DEFAULT_COMPRESSION_QUALITY,
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            writer.write_all(input_buf)?;
            // NOTE(review): only `flush` is called explicitly; the stream is
            // presumably finalized when `writer` is dropped at the end of this
            // function - confirm against the brotli crate documentation.
            writer.flush()?;
            Ok(())
        }
    }
}
#[cfg(any(feature = "brotli", test))]
pub use brotli_codec::*;
#[cfg(any(feature = "lz4", test))]
mod lz4_codec {
    use std::io::{Read, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Chunk size used both when draining the decoder and when feeding the encoder.
    const LZ4_BUFFER_SIZE: usize = 4096;

    /// Codec for the LZ4 compression format.
    pub struct LZ4Codec {}

    impl LZ4Codec {
        /// Creates a new LZ4 codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for LZ4Codec {
        /// Decodes the LZ4 stream in `input_buf`, appending the decoded bytes to
        /// `output_buf`, and returns the total number of bytes appended.
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<usize> {
            let mut reader = lz4::Decoder::new(input_buf)?;
            let mut scratch = [0u8; LZ4_BUFFER_SIZE];
            let mut written = 0;
            // Drain the decoder one scratch buffer at a time; a zero-length read
            // marks the end of the stream.
            loop {
                match reader.read(&mut scratch)? {
                    0 => return Ok(written),
                    n => {
                        written += n;
                        output_buf.write_all(&scratch[..n])?;
                    }
                }
            }
        }

        /// Encodes `input_buf` as an LZ4 stream written into `output_buf`.
        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut writer = lz4::EncoderBuilder::new().build(output_buf)?;
            // Feed the encoder in pieces of at most LZ4_BUFFER_SIZE bytes.
            for piece in input_buf.chunks(LZ4_BUFFER_SIZE) {
                writer.write_all(piece)?;
            }
            // `finish` hands back the underlying writer together with the final
            // status; only the status matters here.
            let (_sink, status) = writer.finish();
            status?;
            Ok(())
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_codec::*;
#[cfg(any(feature = "zstd", test))]
mod zstd_codec {
    use std::io::{self, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Compression level handed to the zstd encoder.
    const ZSTD_COMPRESSION_LEVEL: i32 = 1;

    /// Codec for the Zstandard compression format.
    pub struct ZSTDCodec {}

    impl ZSTDCodec {
        /// Creates a new Zstandard codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for ZSTDCodec {
        /// Decodes the zstd stream in `input_buf`, appending the decoded bytes to
        /// `output_buf`, and returns the number of bytes copied.
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<usize> {
            let mut reader = zstd::Decoder::new(input_buf)?;
            let copied = io::copy(&mut reader, output_buf)?;
            // `io::copy` reports a u64 byte count; the trait returns usize.
            Ok(copied as usize)
        }

        /// Encodes `input_buf` as a zstd stream written into `output_buf`.
        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut writer = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
            writer.write_all(input_buf)?;
            // Finish explicitly so that an error while finalizing is reported.
            writer.finish()?;
            Ok(())
        }
    }
}
#[cfg(any(feature = "zstd", test))]
pub use zstd_codec::*;
#[cfg(test)]
mod tests {
    use super::*;

    use crate::util::test_common::*;

    /// Round-trips `data` through two independent instances of codec `c`, once
    /// in each direction (compress with one, decompress with the other).
    ///
    /// Both scratch buffers are cleared between the two passes. Without that,
    /// a codec that appends to its output would leave the first pass's bytes at
    /// the front of `decompressed`, and the second comparison would only be
    /// re-checking that stale copy.
    fn test_roundtrip(c: CodecType, data: &[u8]) {
        let mut c1 = create_codec(c).unwrap().unwrap();
        let mut c2 = create_codec(c).unwrap().unwrap();

        let mut compressed = Vec::new();
        let mut decompressed = Vec::new();

        // Compress with c1, decompress with c2.
        c1.compress(data, &mut compressed)
            .expect("Error when compressing");
        let decompressed_size = c2
            .decompress(compressed.as_slice(), &mut decompressed)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        // Comparing the whole buffer also checks that nothing extra was written.
        assert_eq!(data, decompressed.as_slice());

        compressed.clear();
        decompressed.clear();

        // Compress with c2, decompress with c1.
        c2.compress(data, &mut compressed)
            .expect("Error when compressing");
        let decompressed_size = c1
            .decompress(compressed.as_slice(), &mut decompressed)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, decompressed.as_slice());
    }

    /// Runs the round-trip check on random inputs of several sizes.
    fn test_codec(c: CodecType) {
        for &size in &[100, 10000, 100000] {
            let data = random_bytes(size);
            test_roundtrip(c, &data);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec(CodecType::SNAPPY);
    }

    #[test]
    fn test_codec_gzip() {
        test_codec(CodecType::GZIP);
    }

    #[test]
    fn test_codec_brotli() {
        test_codec(CodecType::BROTLI);
    }

    #[test]
    fn test_codec_lz4() {
        test_codec(CodecType::LZ4);
    }

    #[test]
    fn test_codec_zstd() {
        test_codec(CodecType::ZSTD);
    }
}