use crate::error::ActiveStorageError;
use crate::models;
use axum::body::Bytes;
use blusc::{blosc2_cbuffer_sizes, blosc2_decompress};
use flate2::read::GzDecoder;
use std::io::Read;
use zune_inflate::{DeflateDecoder, DeflateOptions};
pub fn decompress(
compression: models::Compression,
data: &Bytes,
) -> Result<Bytes, ActiveStorageError> {
match compression {
models::Compression::Blosc2 => decompress_blusc_blosc2(data),
models::Compression::Gzip => decompress_flate2_gzip(data),
models::Compression::Zlib => decompress_zune_zlib(data),
}
}
fn decompress_blusc_blosc2(data: &Bytes) -> Result<Bytes, ActiveStorageError> {
let (nbytes, _cbytes, _block_size) = blosc2_cbuffer_sizes(data);
if nbytes == 0 {
return Err(ActiveStorageError::DecompressionBlosc2 {
error: "invalid blosc2 compressed data",
});
}
let mut output = maligned::align_first::<u8, maligned::A8>(nbytes);
output.resize(nbytes, 0u8);
let written = blosc2_decompress(data, &mut output);
if written <= 0 {
return Err(ActiveStorageError::DecompressionBlosc2 {
error: "failed to decompress blosc2 data",
});
}
if written as usize != nbytes {
return Err(ActiveStorageError::DecompressionBlosc2 {
error: "unexpected decompressed size for blosc2 data",
});
}
Ok(output.into())
}
fn decompress_flate2_gzip(data: &Bytes) -> Result<Bytes, ActiveStorageError> {
let mut decoder = GzDecoder::<&[u8]>::new(data);
let mut buf = maligned::align_first::<u8, maligned::A8>(data.len());
decoder.read_to_end(&mut buf)?;
buf.shrink_to(0);
Ok(buf.into())
}
fn decompress_zune_zlib(data: &Bytes) -> Result<Bytes, ActiveStorageError> {
let options = DeflateOptions::default().set_size_hint(data.len());
let mut decoder = DeflateDecoder::new_with_options(data, options);
let data = decoder.decode_zlib()?;
Ok(data.into())
}
#[cfg(test)]
mod tests {
    use super::*;
    use blusc::{blosc2_compress, BLOSC2_MAX_OVERHEAD, BLOSC_SHUFFLE};
    use flate2::read::{GzEncoder, ZlibEncoder};
    use flate2::Compression;
    use zune_inflate::errors::DecodeErrorStatus;

    /// Plaintext that every round-trip test compresses and expects back.
    const PLAINTEXT: &[u8] = b"hello world";

    /// Input fed to each codec in the invalid-data tests.
    const GARBAGE: &[u8] = b"invalid format";

    /// Reads `encoder` until EOF and returns everything it produced.
    fn drain(mut encoder: impl Read) -> Vec<u8> {
        let mut encoded = Vec::<u8>::new();
        encoder.read_to_end(&mut encoded).unwrap();
        encoded
    }

    /// Compresses `PLAINTEXT` with Blosc2 and returns exactly the bytes written.
    fn compress_blosc() -> Vec<u8> {
        let mut dest = vec![0u8; PLAINTEXT.len() + BLOSC2_MAX_OVERHEAD];
        let cbytes = blosc2_compress(5, BLOSC_SHUFFLE as i32, 1, PLAINTEXT, &mut dest);
        assert!(cbytes > 0, "blosc2_compress failed, returned {}", cbytes);
        assert!(
            cbytes as usize <= dest.len(),
            "blosc2_compress returned size {cbytes} larger than destination buffer {}",
            dest.len()
        );
        // Keep only the bytes the compressor reported writing.
        dest.truncate(cbytes as usize);
        dest
    }

    /// Compresses `PLAINTEXT` as a gzip stream.
    fn compress_gzip() -> Vec<u8> {
        drain(GzEncoder::new(PLAINTEXT, Compression::fast()))
    }

    /// Compresses `PLAINTEXT` as a zlib stream.
    fn compress_zlib() -> Vec<u8> {
        drain(ZlibEncoder::new(PLAINTEXT, Compression::fast()))
    }

    /// Decompresses `compressed` with `codec` and checks both the payload and
    /// that the returned buffer starts on an 8-byte boundary.
    fn assert_round_trip(codec: models::Compression, compressed: Vec<u8>) {
        let result = decompress(codec, &Bytes::from(compressed)).unwrap();
        assert_eq!(result, PLAINTEXT);
        assert_eq!(result.as_ptr().align_offset(8), 0);
    }

    #[test]
    fn test_decompress_blosc() {
        assert_round_trip(models::Compression::Blosc2, compress_blosc());
    }

    #[test]
    fn test_decompress_gzip() {
        assert_round_trip(models::Compression::Gzip, compress_gzip());
    }

    #[test]
    fn test_decompress_zlib() {
        assert_round_trip(models::Compression::Zlib, compress_zlib());
    }

    #[test]
    fn test_decompress_invalid_blosc() {
        let failure =
            decompress(models::Compression::Blosc2, &Bytes::from(GARBAGE)).unwrap_err();
        match failure {
            ActiveStorageError::DecompressionBlosc2 { error } => {
                assert_eq!(error, "invalid blosc2 compressed data");
            }
            err => panic!("unexpected error {err}"),
        }
    }

    #[test]
    fn test_decompress_invalid_gzip() {
        let failure = decompress(models::Compression::Gzip, &Bytes::from(GARBAGE)).unwrap_err();
        match failure {
            ActiveStorageError::DecompressionFlate2(io_err) => {
                assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidInput);
                assert_eq!(io_err.to_string(), "invalid gzip header");
            }
            err => panic!("unexpected error {err}"),
        }
    }

    #[test]
    fn test_decompress_invalid_zlib() {
        let failure = decompress(models::Compression::Zlib, &Bytes::from(GARBAGE)).unwrap_err();
        match failure {
            ActiveStorageError::DecompressionZune(zune_err) => match zune_err.error {
                DecodeErrorStatus::GenericStr(message) => {
                    assert_eq!(message, "Unknown zlib compression method 9");
                }
                err => panic!("unexpected zune error {err:?}"),
            },
            err => panic!("unexpected error {err}"),
        }
    }
}