#[cfg_attr(
feature = "experimental",
doc = r##"
# Example
```no_run
use parquet::{basic::Compression, compression::{create_codec, CodecOptionsBuilder}};
let codec_options = CodecOptionsBuilder::default()
.set_backward_compatible_lz4(false)
.build();
let mut codec = match create_codec(Compression::SNAPPY, &codec_options) {
Ok(Some(codec)) => codec,
_ => panic!(),
};
let data = vec![b'p', b'a', b'r', b'q', b'u', b'e', b't'];
let mut compressed = vec![];
codec.compress(&data[..], &mut compressed).unwrap();
let mut output = vec![];
codec.decompress(&compressed[..], &mut output, None).unwrap();
assert_eq!(output, data);
```
"##
)]
use crate::basic::Compression as CodecType;
use crate::errors::{ParquetError, Result};
/// Trait implemented by all compression codecs.
///
/// Both operations *append* to `output_buf`; they do not clear it first, which
/// lets callers compose output after an existing prefix.
pub trait Codec: Send {
    /// Compresses data stored in slice `input_buf` and appends the compressed
    /// result to `output_buf`.
    ///
    /// Note: callers must `clear()` `output_buf` themselves when reusing the
    /// same buffer across calls.
    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;

    /// Decompresses data stored in slice `input_buf` and appends the output to
    /// `output_buf`.
    ///
    /// If `uncompress_size` is provided, implementations may use it to size the
    /// output buffer up front (and some implementations require it).
    /// Returns the number of bytes decompressed, as reported by the underlying
    /// codec.
    fn decompress(
        &mut self,
        input_buf: &[u8],
        output_buf: &mut Vec<u8>,
        uncompress_size: Option<usize>,
    ) -> Result<usize>;
}
/// Options affecting codec behaviour, created via [`CodecOptionsBuilder`].
#[derive(Debug, PartialEq, Eq)]
pub struct CodecOptions {
    // When true, the LZ4 codec falls back to legacy (non-Hadoop) LZ4 framings
    // if decoding the Hadoop framing fails. See `LZ4HadoopCodec`.
    backward_compatible_lz4: bool,
}

impl Default for CodecOptions {
    fn default() -> Self {
        // Delegate to the builder so the default values live in one place.
        CodecOptionsBuilder::default().build()
    }
}
/// Builder for [`CodecOptions`].
pub struct CodecOptionsBuilder {
    // Mirrors `CodecOptions::backward_compatible_lz4`.
    backward_compatible_lz4: bool,
}

impl Default for CodecOptionsBuilder {
    fn default() -> Self {
        Self {
            // Backward-compatible LZ4 decoding is enabled by default.
            backward_compatible_lz4: true,
        }
    }
}
impl CodecOptionsBuilder {
pub fn set_backward_compatible_lz4(mut self, value: bool) -> CodecOptionsBuilder {
self.backward_compatible_lz4 = value;
self
}
pub fn build(self) -> CodecOptions {
CodecOptions {
backward_compatible_lz4: self.backward_compatible_lz4,
}
}
}
/// Given the compression type `codec` and codec `_options`, returns a codec
/// used to compress and decompress bytes.
///
/// Returns `Ok(None)` for `UNCOMPRESSED` (no codec needed), and a
/// not-yet-implemented error when the requested codec was not compiled in via
/// its cargo feature.
pub fn create_codec(
    codec: CodecType,
    _options: &CodecOptions,
) -> Result<Option<Box<dyn Codec>>> {
    match codec {
        #[cfg(any(feature = "brotli", test))]
        CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
        #[cfg(any(feature = "flate2", test))]
        CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
        #[cfg(any(feature = "snap", test))]
        CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
        #[cfg(any(feature = "lz4", test))]
        CodecType::LZ4 => Ok(Some(Box::new(LZ4HadoopCodec::new(
            // LZ4 is the only codec that consults the options today.
            _options.backward_compatible_lz4,
        )))),
        #[cfg(any(feature = "zstd", test))]
        CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
        #[cfg(any(feature = "lz4", test))]
        CodecType::LZ4_RAW => Ok(Some(Box::new(LZ4RawCodec::new()))),
        CodecType::UNCOMPRESSED => Ok(None),
        // Only reachable when the matching feature (and `test`) are disabled.
        _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
    }
}
#[cfg(any(feature = "snap", test))]
mod snappy_codec {
    use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for the Snappy compression format.
    pub struct SnappyCodec {
        decoder: Decoder,
        encoder: Encoder,
    }

    impl SnappyCodec {
        /// Creates a new Snappy compression codec.
        pub(crate) fn new() -> Self {
            Self {
                decoder: Decoder::new(),
                encoder: Encoder::new(),
            }
        }
    }

    impl Codec for SnappyCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            // Upper bound on the decompressed size: the caller-provided hint,
            // or the size recorded in the Snappy stream header.
            let len = match uncompress_size {
                Some(size) => size,
                None => decompress_len(input_buf)?,
            };
            let offset = output_buf.len();
            output_buf.resize(offset + len, 0);
            let decompressed_size = self
                .decoder
                .decompress(input_buf, &mut output_buf[offset..])?;
            // BUG FIX: the actual decompressed size may be smaller than the
            // reserved `len` (e.g. when `uncompress_size` over-estimates).
            // Truncate so `output_buf` holds exactly the decompressed bytes
            // instead of keeping trailing zero padding.
            output_buf.truncate(offset + decompressed_size);
            Ok(decompressed_size)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let output_buf_len = output_buf.len();
            // Reserve the worst-case compressed size, then shrink to fit.
            let required_len = max_compress_len(input_buf.len());
            output_buf.resize(output_buf_len + required_len, 0);
            let n = self
                .encoder
                .compress(input_buf, &mut output_buf[output_buf_len..])?;
            output_buf.truncate(output_buf_len + n);
            Ok(())
        }
    }
}
#[cfg(any(feature = "snap", test))]
pub use snappy_codec::*;
#[cfg(any(feature = "flate2", test))]
mod gzip_codec {
    use std::io::{Read, Write};

    use flate2::{read, write, Compression};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for the GZIP compression format.
    pub struct GZipCodec {}

    impl GZipCodec {
        /// Creates a new GZIP compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for GZipCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            _uncompress_size: Option<usize>,
        ) -> Result<usize> {
            // The gzip stream is self-describing, so the size hint is unused.
            let bytes_read = read::GzDecoder::new(input_buf).read_to_end(output_buf)?;
            Ok(bytes_read)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = write::GzEncoder::new(output_buf, Compression::default());
            encoder.write_all(input_buf)?;
            // Finish the gzip stream (writes the trailer) without consuming
            // the underlying writer.
            Ok(encoder.try_finish()?)
        }
    }
}
#[cfg(any(feature = "flate2", test))]
pub use gzip_codec::*;
#[cfg(any(feature = "brotli", test))]
mod brotli_codec {
    use std::io::{Read, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    // Internal decompressor buffer size when no size hint is available.
    const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
    // Favor speed over ratio: quality 1 with a 22-bit window.
    const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1;
    const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22;

    /// Codec for the Brotli compression format.
    pub struct BrotliCodec {}

    impl BrotliCodec {
        /// Creates a new Brotli compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for BrotliCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            // Use the caller's size hint for the internal buffer when present.
            let capacity = uncompress_size.unwrap_or(BROTLI_DEFAULT_BUFFER_SIZE);
            let mut decompressor = brotli::Decompressor::new(input_buf, capacity);
            let bytes_read = decompressor.read_to_end(output_buf)?;
            Ok(bytes_read)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut writer = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                BROTLI_DEFAULT_COMPRESSION_QUALITY,
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            writer.write_all(input_buf)?;
            // Flush pushes any buffered compressed bytes into `output_buf`.
            Ok(writer.flush()?)
        }
    }
}
#[cfg(any(feature = "brotli", test))]
pub use brotli_codec::*;
#[cfg(any(feature = "lz4", test))]
mod lz4_codec {
    use std::io::{Read, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    // Chunk size used when streaming data through the LZ4 frame codec.
    const LZ4_BUFFER_SIZE: usize = 4096;

    /// Codec for the LZ4 frame ("LZ4F") compression format.
    pub struct LZ4Codec {}

    impl LZ4Codec {
        /// Creates a new LZ4 frame compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for LZ4Codec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            _uncompress_size: Option<usize>,
        ) -> Result<usize> {
            // Stream-decode in fixed-size chunks; the frame format is
            // self-terminating, so the size hint is unused.
            let mut decoder = lz4::Decoder::new(input_buf)?;
            let mut chunk = [0u8; LZ4_BUFFER_SIZE];
            let mut written = 0;
            loop {
                let n = decoder.read(&mut chunk)?;
                if n == 0 {
                    break;
                }
                written += n;
                output_buf.write_all(&chunk[..n])?;
            }
            Ok(written)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
            // Feed the input through in bounded chunks.
            for chunk in input_buf.chunks(LZ4_BUFFER_SIZE) {
                encoder.write_all(chunk)?;
            }
            // `finish` returns the writer plus the final result; we only need
            // to surface the result.
            let (_writer, result) = encoder.finish();
            result.map_err(|e| e.into())
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_codec::*;
#[cfg(any(feature = "zstd", test))]
mod zstd_codec {
    use std::io::{self, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for the Zstandard compression format.
    pub struct ZSTDCodec {}

    impl ZSTDCodec {
        /// Creates a new Zstandard compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    // Favor speed: the lowest standard compression level.
    const ZSTD_COMPRESSION_LEVEL: i32 = 1;

    impl Codec for ZSTDCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            _uncompress_size: Option<usize>,
        ) -> Result<usize> {
            // The zstd stream is self-describing; the size hint is unused.
            let mut decoder = zstd::Decoder::new(input_buf)?;
            let copied = io::copy(&mut decoder, output_buf)?;
            Ok(copied as usize)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
            encoder.write_all(input_buf)?;
            // Finalize the frame; the returned writer is no longer needed.
            encoder.finish()?;
            Ok(())
        }
    }
}
#[cfg(any(feature = "zstd", test))]
pub use zstd_codec::*;
#[cfg(any(feature = "lz4", test))]
mod lz4_raw_codec {
    use crate::compression::Codec;
    use crate::errors::ParquetError;
    use crate::errors::Result;

    /// Codec for the LZ4_RAW (raw LZ4 block, no framing) compression format.
    pub struct LZ4RawCodec {}

    impl LZ4RawCodec {
        /// Creates a new LZ4_RAW compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for LZ4RawCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let offset = output_buf.len();
            // Raw LZ4 blocks carry no length header, so the caller must say
            // how large the decompressed payload is.
            let required_len = uncompress_size.ok_or_else(|| {
                ParquetError::General(
                    "LZ4RawCodec unsupported without uncompress_size".into(),
                )
            })?;
            output_buf.resize(offset + required_len, 0);
            let n = lz4::block::decompress_to_buffer(
                input_buf,
                Some(required_len.try_into().unwrap()),
                &mut output_buf[offset..],
            )?;
            // A size mismatch means the stream or the caller's hint is wrong.
            if n != required_len {
                return Err(ParquetError::General(
                    "LZ4RawCodec uncompress_size is not the expected one".into(),
                ));
            }
            Ok(n)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let offset = output_buf.len();
            // Reserve the worst-case compressed size, then shrink to fit.
            let required_len = lz4::block::compress_bound(input_buf.len())?;
            output_buf.resize(offset + required_len, 0);
            let n = lz4::block::compress_to_buffer(
                input_buf,
                None,
                false,
                &mut output_buf[offset..],
            )?;
            output_buf.truncate(offset + n);
            Ok(())
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_raw_codec::*;
#[cfg(any(feature = "lz4", test))]
mod lz4_hadoop_codec {
    use crate::compression::lz4_codec::LZ4Codec;
    use crate::compression::lz4_raw_codec::LZ4RawCodec;
    use crate::compression::Codec;
    use crate::errors::{ParquetError, Result};
    use std::io;

    // Size of a u32 in bytes.
    const SIZE_U32: usize = std::mem::size_of::<u32>();
    // Each Hadoop LZ4 frame is prefixed by two big-endian u32s:
    // [decompressed size][compressed size].
    const PREFIX_LEN: usize = SIZE_U32 * 2;

    /// Codec for the Hadoop variant of LZ4: a sequence of length-prefixed raw
    /// LZ4 blocks, optionally falling back to other LZ4 framings on decode
    /// failure for compatibility with legacy writers.
    pub struct LZ4HadoopCodec {
        // If true, failed Hadoop-framed decodes retry with the LZ4 frame
        // codec and then the raw LZ4 block codec.
        backward_compatible_lz4: bool,
    }

    impl LZ4HadoopCodec {
        /// Creates a new Hadoop LZ4 compression codec.
        pub(crate) fn new(backward_compatible_lz4: bool) -> Self {
            Self {
                backward_compatible_lz4,
            }
        }
    }

    /// Attempts to decompress `input_buf` as Hadoop-framed LZ4 into
    /// `output_buf`, which must be sized to exactly the expected decompressed
    /// length. Returns the number of bytes written, or an `io::Error` if the
    /// input does not parse as that framing (the caller may then fall back to
    /// other LZ4 variants).
    fn try_decompress_hadoop(
        input_buf: &[u8],
        output_buf: &mut [u8],
    ) -> io::Result<usize> {
        let mut input_len = input_buf.len();
        let mut input = input_buf;
        let mut read_bytes = 0;
        let mut output_len = output_buf.len();
        let mut output: &mut [u8] = output_buf;
        // Walk frame by frame while at least a full 8-byte prefix remains.
        while input_len >= PREFIX_LEN {
            // Big-endian u32 at offset 0: advertised decompressed size.
            let mut bytes = [0; SIZE_U32];
            bytes.copy_from_slice(&input[0..4]);
            let expected_decompressed_size = u32::from_be_bytes(bytes);
            // Big-endian u32 at offset 4: advertised compressed size.
            let mut bytes = [0; SIZE_U32];
            bytes.copy_from_slice(&input[4..8]);
            let expected_compressed_size = u32::from_be_bytes(bytes);
            input = &input[PREFIX_LEN..];
            input_len -= PREFIX_LEN;
            // Validate the advertised sizes against what actually remains.
            if input_len < expected_compressed_size as usize {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Not enough bytes for Hadoop frame",
                ));
            }
            if output_len < expected_decompressed_size as usize {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Not enough bytes to hold advertised output",
                ));
            }
            let decompressed_size = lz4::block::decompress_to_buffer(
                &input[..expected_compressed_size as usize],
                Some(output_len as i32),
                output,
            )?;
            if decompressed_size != expected_decompressed_size as usize {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Unexpected decompressed size",
                ));
            }
            input_len -= expected_compressed_size as usize;
            output_len -= expected_decompressed_size as usize;
            read_bytes += expected_decompressed_size as usize;
            // Advance to the next frame only if more input than this frame's
            // size remains; otherwise stop and let the final check decide.
            // NOTE(review): comparing the remainder against the *previous*
            // frame's compressed size looks odd but mirrors the reference
            // implementation — confirm before changing.
            if input_len > expected_compressed_size as usize {
                input = &input[expected_compressed_size as usize..];
                output = &mut output[expected_decompressed_size as usize..];
            } else {
                break;
            }
        }
        // Success only if the input was consumed exactly.
        if input_len == 0 {
            Ok(read_bytes)
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Not all input are consumed",
            ))
        }
    }

    impl Codec for LZ4HadoopCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let output_len = output_buf.len();
            // The Hadoop framing check needs an exactly-sized output buffer,
            // so the caller must supply the decompressed size.
            let required_len = match uncompress_size {
                Some(n) => n,
                None => {
                    return Err(ParquetError::General(
                        "LZ4HadoopCodec unsupported without uncompress_size".into(),
                    ))
                }
            };
            output_buf.resize(output_len + required_len, 0);
            match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
                Ok(n) => {
                    if n != required_len {
                        return Err(ParquetError::General(
                            "LZ4HadoopCodec uncompress_size is not the expected one"
                                .into(),
                        ));
                    }
                    Ok(n)
                }
                // Without backward compatibility, a Hadoop-framing failure is
                // final.
                Err(e) if !self.backward_compatible_lz4 => Err(e.into()),
                // Fall back: LZ4 frame format first, then raw LZ4 block,
                // resetting the output buffer before each attempt.
                Err(_) => {
                    output_buf.truncate(output_len);
                    match LZ4Codec::new().decompress(
                        input_buf,
                        output_buf,
                        uncompress_size,
                    ) {
                        Ok(n) => Ok(n),
                        Err(_) => {
                            output_buf.truncate(output_len);
                            LZ4RawCodec::new().decompress(
                                input_buf,
                                output_buf,
                                uncompress_size,
                            )
                        }
                    }
                }
            }
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            // Reserve room for the 8-byte prefix, write the raw LZ4 payload
            // after it, then backfill the two big-endian size fields.
            let offset = output_buf.len();
            output_buf.resize(offset + PREFIX_LEN, 0);
            LZ4RawCodec::new().compress(input_buf, output_buf)?;
            let output_buf = &mut output_buf[offset..];
            let compressed_size = output_buf.len() - PREFIX_LEN;
            let compressed_size = compressed_size as u32;
            let uncompressed_size = input_buf.len() as u32;
            output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes());
            output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes());
            Ok(())
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_hadoop_codec::*;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::test_common::rand_gen::random_bytes;

    /// Round-trips `data` through codec `c` three times: once per codec
    /// instance in each direction, then once with both buffers pre-seeded
    /// with a prefix to verify compress/decompress append rather than
    /// overwrite.
    fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
        let codec_options = CodecOptionsBuilder::default()
            .set_backward_compatible_lz4(false)
            .build();
        let mut c1 = create_codec(c, &codec_options).unwrap().unwrap();
        let mut c2 = create_codec(c, &codec_options).unwrap().unwrap();

        // Compress with c1, decompress with c2.
        let mut compressed = Vec::new();
        let mut decompressed = Vec::new();
        c1.compress(data, &mut compressed)
            .expect("Error when compressing");
        let decompressed_size = c2
            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, decompressed.as_slice());

        decompressed.clear();
        compressed.clear();

        // Compress with c2, decompress with c1 (codecs are symmetric).
        c2.compress(data, &mut compressed)
            .expect("Error when compressing");
        let decompressed_size = c1
            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, decompressed.as_slice());

        decompressed.clear();
        compressed.clear();

        // Both buffers start with a 4-byte prefix that must survive intact:
        // codecs append, they must not clobber existing bytes.
        let prefix = &[0xDE, 0xAD, 0xBE, 0xEF];
        decompressed.extend_from_slice(prefix);
        compressed.extend_from_slice(prefix);
        c2.compress(data, &mut compressed)
            .expect("Error when compressing");
        assert_eq!(&compressed[..4], prefix);
        let decompressed_size = c2
            .decompress(&compressed[4..], &mut decompressed, uncompress_size)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, &decompressed[4..]);
        assert_eq!(&decompressed[..4], prefix);
    }

    // Round-trip with the exact uncompressed size supplied as a hint.
    fn test_codec_with_size(c: CodecType) {
        let sizes = vec![100, 10000, 100000];
        for size in sizes {
            let data = random_bytes(size);
            test_roundtrip(c, &data, Some(data.len()));
        }
    }

    // Round-trip without a size hint (only for self-describing codecs).
    fn test_codec_without_size(c: CodecType) {
        let sizes = vec![100, 10000, 100000];
        for size in sizes {
            let data = random_bytes(size);
            test_roundtrip(c, &data, None);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec_with_size(CodecType::SNAPPY);
        test_codec_without_size(CodecType::SNAPPY);
    }

    #[test]
    fn test_codec_gzip() {
        test_codec_with_size(CodecType::GZIP);
        test_codec_without_size(CodecType::GZIP);
    }

    #[test]
    fn test_codec_brotli() {
        test_codec_with_size(CodecType::BROTLI);
        test_codec_without_size(CodecType::BROTLI);
    }

    // LZ4 (Hadoop) and LZ4_RAW require the size hint, so no "without size"
    // variant is run for them.
    #[test]
    fn test_codec_lz4() {
        test_codec_with_size(CodecType::LZ4);
    }

    #[test]
    fn test_codec_zstd() {
        test_codec_with_size(CodecType::ZSTD);
        test_codec_without_size(CodecType::ZSTD);
    }

    #[test]
    fn test_codec_lz4_raw() {
        test_codec_with_size(CodecType::LZ4_RAW);
    }
}