#![cfg_attr(deny_warnings, deny(warnings))]
#![cfg_attr(deny_warnings, deny(missing_docs))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use std::borrow::Cow;
use std::ffi::{CStr, CString};
use std::io::Read;
use std::mem::MaybeUninit;
use std::num::NonZeroUsize;
/// The version string of the underlying C-blosc library.
///
/// Evaluated at compile time from `blosc_sys::BLOSC_VERSION_STRING`. The
/// `unreachable!()` arms cannot fire as long as the C library ships a
/// NUL-terminated, valid-UTF-8 version string (presumably plain ASCII like
/// `"1.21.5"` — a malformed string would fail the build, not panic at runtime).
pub const BLOSC_C_VERSION: &str = {
    // Strip the trailing NUL (and verify one exists).
    let version = match CStr::from_bytes_until_nul(blosc_sys::BLOSC_VERSION_STRING) {
        Ok(v) => v,
        Err(_) => unreachable!(),
    };
    // Reject non-UTF-8 bytes; a version string is expected to be ASCII.
    match version.to_str() {
        Ok(s) => s,
        Err(_) => unreachable!(),
    }
};
/// A configurable blosc compressor.
///
/// Create one with [`Encoder::new`] (or [`Default`]), adjust settings via the
/// chainable setters, then call [`Encoder::compress`] or
/// [`Encoder::compress_into`].
pub struct Encoder {
    // Compression level in `0..=9` (enforced by `Level`).
    level: Level,
    // Shuffle filter applied to the data before compression.
    shuffle: Shuffle,
    // Size in bytes of the items the input consists of (used by shuffling).
    typesize: usize,
    // Codec blosc uses internally.
    compressor: CompressAlgo,
    // Internal blocksize; `None` lets blosc choose automatically (0 in FFI).
    blocksize: Option<NonZeroUsize>,
    // Number of threads blosc may use internally.
    numinternalthreads: u32,
}
impl Default for Encoder {
    /// Returns an encoder at the maximum compression level (9) with all
    /// other settings as produced by [`Encoder::new`].
    fn default() -> Self {
        let level = Level::new(9).expect("9 is a valid compression level");
        Self::new(level)
    }
}
impl Encoder {
    /// Creates an encoder with the given compression level and defaults for
    /// everything else: byte shuffle, typesize 1, the blosclz codec, an
    /// automatically chosen blocksize, and a single internal thread.
    pub fn new(level: Level) -> Self {
        Self {
            level,
            shuffle: Shuffle::Byte,
            typesize: 1,
            compressor: CompressAlgo::Blosclz,
            blocksize: None,
            numinternalthreads: 1,
        }
    }
    /// Sets the compression level.
    pub fn level(&mut self, level: Level) -> &mut Self {
        self.level = level;
        self
    }
    /// Sets the shuffle filter applied before compression.
    pub fn shuffle(&mut self, shuffle: Shuffle) -> &mut Self {
        self.shuffle = shuffle;
        self
    }
    /// Sets the size in bytes of the items the input consists of (used by
    /// the shuffle filter).
    pub fn typesize(&mut self, typesize: usize) -> &mut Self {
        self.typesize = typesize;
        self
    }
    /// Sets the compression codec.
    pub fn compressor(&mut self, compressor: CompressAlgo) -> &mut Self {
        self.compressor = compressor;
        self
    }
    /// Sets the internal blocksize; `None` lets blosc choose automatically.
    pub fn blocksize(&mut self, blocksize: Option<NonZeroUsize>) -> &mut Self {
        self.blocksize = blocksize;
        self
    }
    /// Sets the number of threads blosc may use internally.
    pub fn numinternalthreads(&mut self, numinternalthreads: u32) -> &mut Self {
        self.numinternalthreads = numinternalthreads;
        self
    }
    /// Compresses `src` into a newly allocated `Vec<u8>`.
    ///
    /// # Errors
    /// Propagates any error from [`Encoder::compress_into`]; the destination
    /// is sized for the worst case, so `DestinationBufferTooSmall` is not
    /// expected here.
    pub fn compress(&self, src: &[u8]) -> Result<Vec<u8>, CompressError> {
        // Worst case: incompressible input plus the blosc header overhead.
        let dst_max_len = src.len() + blosc_sys::BLOSC_MAX_OVERHEAD as usize;
        let mut dst = Vec::<u8>::with_capacity(dst_max_len);
        // Write into the spare capacity directly. This replaces the previous
        // `Vec<MaybeUninit<u8>> -> Vec<u8>` transmute, whose layout
        // equivalence `Vec` does not guarantee.
        let len = self.compress_into(src, dst.spare_capacity_mut())?;
        debug_assert!(len <= dst.capacity());
        // SAFETY: `compress_into` initialized the first `len` bytes.
        unsafe { dst.set_len(len) };
        Ok(dst)
    }
    /// Compresses `src` into `dst`, returning the number of bytes written.
    /// On success the first `len` returned bytes of `dst` are initialized.
    ///
    /// # Errors
    /// Returns [`CompressError::DestinationBufferTooSmall`] when `dst`
    /// cannot hold the compressed output, or
    /// [`CompressError::InternalError`] with blosc's negative status code on
    /// any other failure.
    pub fn compress_into(
        &self,
        src: &[u8],
        dst: &mut [MaybeUninit<u8>],
    ) -> Result<usize, CompressError> {
        // SAFETY: all pointers/lengths come from valid slices, the codec
        // name is a NUL-terminated C string, and blosc never writes more
        // than `dst.len()` bytes into `dst`.
        let status = unsafe {
            blosc_sys::blosc_compress_ctx(
                self.level.0 as std::ffi::c_int,
                self.shuffle as std::ffi::c_int,
                self.typesize,
                src.len(),
                src.as_ptr() as *const std::ffi::c_void,
                dst.as_mut_ptr() as *mut std::ffi::c_void,
                dst.len(),
                self.compressor.as_ref().as_ptr(),
                // 0 tells blosc to pick a blocksize automatically.
                self.blocksize.map_or(0, NonZeroUsize::get),
                self.numinternalthreads as std::ffi::c_int,
            )
        };
        match status {
            // Positive status is the compressed size in bytes.
            len if len > 0 => {
                assert!(len as usize <= dst.len());
                Ok(len as usize)
            }
            // Zero means the output did not fit in `dst`.
            0 => Err(CompressError::DestinationBufferTooSmall),
            // Negative status codes are internal blosc failures.
            _ => {
                debug_assert!(status < 0);
                Err(CompressError::InternalError(status))
            }
        }
    }
}
/// Errors produced while compressing data.
#[derive(Debug)]
pub enum CompressError {
    /// The destination buffer cannot hold the compressed output.
    DestinationBufferTooSmall,
    /// blosc reported an internal failure (the negative status code).
    InternalError(i32),
}
impl std::fmt::Display for CompressError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DestinationBufferTooSmall => write!(f, "destination buffer is too small"),
            Self::InternalError(status) => write!(f, "blosc internal error: {status}"),
        }
    }
}
impl std::error::Error for CompressError {}
/// A blosc compression level, guaranteed to be in `0..=9`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Level(u32);
impl Level {
    /// Creates a compression level; returns `None` unless `level <= 9`.
    pub fn new(level: u32) -> Option<Self> {
        if level <= 9 {
            Some(Self(level))
        } else {
            None
        }
    }
}
impl TryFrom<u32> for Level {
    type Error = ();
    /// Fallible conversion with the same range check as [`Level::new`].
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        match Self::new(value) {
            Some(level) => Ok(level),
            None => Err(()),
        }
    }
}
impl From<Level> for u32 {
    /// Returns the raw numeric level.
    fn from(level: Level) -> Self {
        level.0
    }
}
/// The shuffle filter applied to the data before compression.
///
/// Discriminants mirror the constants blosc expects in `blosc_compress_ctx`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u32)]
pub enum Shuffle {
    /// No shuffling.
    None = blosc_sys::BLOSC_NOSHUFFLE,
    /// Byte-level shuffle (blosc's default filter).
    Byte = blosc_sys::BLOSC_SHUFFLE,
    /// Bit-level shuffle.
    Bit = blosc_sys::BLOSC_BITSHUFFLE,
}
/// The compression codec blosc uses internally.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[allow(missing_docs)]
pub enum CompressAlgo {
    Blosclz,
    Lz4,
    Lz4hc,
    Zlib,
    Zstd,
    /// Any other codec name understood by the linked blosc build.
    Other(CString),
}
impl AsRef<CStr> for CompressAlgo {
    /// Returns the NUL-terminated codec name in the form blosc expects.
    fn as_ref(&self) -> &CStr {
        match self {
            Self::Blosclz => c"blosclz",
            Self::Lz4 => c"lz4",
            Self::Lz4hc => c"lz4hc",
            Self::Zlib => c"zlib",
            Self::Zstd => c"zstd",
            Self::Other(name) => name.as_c_str(),
        }
    }
}
/// A validated blosc frame together with the metadata needed to decompress
/// it, read from the frame's header.
pub struct Decoder<'a> {
    // The compressed frame, borrowed or owned.
    src: Cow<'a, [u8]>,
    // Size in bytes of one item, as recorded in the frame header.
    typesize: usize,
    // Total decompressed size in bytes, as recorded in the frame header.
    dst_len: usize,
}
impl<'a> Decoder<'a> {
    /// Reads one complete blosc frame from `reader` and builds a decoder
    /// over it.
    ///
    /// # Errors
    /// Returns [`DecompressError::IoError`] when reading fails, or
    /// [`DecompressError::DecompressingError`] when the header advertises an
    /// impossible size or the frame fails validation.
    pub fn from_reader(reader: &mut impl Read) -> Result<Self, DecompressError> {
        const HEADER_LEN: usize = blosc_sys::BLOSC_MIN_HEADER_LENGTH as usize;
        // Zero-initialize the buffers instead of transmuting `MaybeUninit`
        // storage to `&mut [u8]`: exposing uninitialized memory to an
        // arbitrary `Read` implementation is undefined behavior.
        let mut header = [0u8; HEADER_LEN];
        reader.read_exact(&mut header)?;
        let mut nbytes = MaybeUninit::uninit();
        let mut cbytes = MaybeUninit::uninit();
        let mut blocksize = MaybeUninit::uninit();
        // SAFETY: `header` holds a complete minimal header and the three
        // out-pointers are valid for writes.
        unsafe {
            blosc_sys::blosc_cbuffer_sizes(
                header.as_ptr() as *const std::ffi::c_void,
                nbytes.as_mut_ptr(),
                cbytes.as_mut_ptr(),
                blocksize.as_mut_ptr(),
            )
        };
        // SAFETY: `blosc_cbuffer_sizes` wrote all three out-parameters.
        let cbytes = unsafe { cbytes.assume_init() };
        // A frame can never be smaller than its own header. This subsumes
        // the old `cbytes == 0` check and also keeps the slicing below from
        // panicking when a corrupt header reports `0 < cbytes < HEADER_LEN`.
        if cbytes < HEADER_LEN {
            return Err(DecompressError::DecompressingError);
        }
        let mut src = vec![0u8; cbytes];
        src[..HEADER_LEN].copy_from_slice(&header);
        reader.read_exact(&mut src[HEADER_LEN..])?;
        Self::new(src)
    }
    /// Validates `src` as a blosc frame and reads its metadata.
    ///
    /// # Errors
    /// Returns [`DecompressError::DecompressingError`] when `src` is not a
    /// valid blosc frame.
    pub fn new(src: impl Into<Cow<'a, [u8]>>) -> Result<Self, DecompressError> {
        let src: Cow<'a, [u8]> = src.into();
        let mut dst_len = 0;
        // SAFETY: pointer and length come from a valid slice;
        // `blosc_cbuffer_validate` only reads from them.
        let status = unsafe {
            blosc_sys::blosc_cbuffer_validate(
                src.as_ptr() as *const std::ffi::c_void,
                src.len(),
                &mut dst_len,
            )
        };
        if status < 0 {
            return Err(DecompressError::DecompressingError);
        }
        let mut typesize = MaybeUninit::<usize>::uninit();
        let mut flags = MaybeUninit::<std::ffi::c_int>::uninit();
        // SAFETY: `src` was just validated and both out-pointers are valid
        // for writes.
        unsafe {
            blosc_sys::blosc_cbuffer_metainfo(
                src.as_ptr() as *const std::ffi::c_void,
                typesize.as_mut_ptr(),
                flags.as_mut_ptr(),
            )
        };
        // SAFETY: `blosc_cbuffer_metainfo` wrote both out-parameters.
        let typesize = unsafe { typesize.assume_init() };
        Ok(Self {
            src,
            typesize,
            dst_len,
        })
    }
    /// Decompresses the whole frame into a newly allocated `Vec<u8>`.
    ///
    /// # Errors
    /// Propagates any error from [`Decoder::decompress_into`].
    pub fn decompress(&self, numinternalthreads: u32) -> Result<Vec<u8>, DecompressError> {
        let mut dst = Vec::<u8>::with_capacity(self.dst_len);
        // Write into the spare capacity directly. This replaces the previous
        // `Vec<MaybeUninit<u8>> -> Vec<u8>` transmute, whose layout
        // equivalence `Vec` does not guarantee.
        let len = self.decompress_into(dst.spare_capacity_mut(), numinternalthreads)?;
        debug_assert!(len <= dst.capacity());
        // SAFETY: `decompress_into` initialized the first `len` bytes.
        unsafe { dst.set_len(len) };
        Ok(dst)
    }
    /// Decompresses the frame into `dst`, returning the number of bytes
    /// written. On success the first `len` returned bytes of `dst` are
    /// initialized.
    ///
    /// # Errors
    /// Returns [`DecompressError::DestinationBufferTooSmall`] when `dst` is
    /// smaller than the frame's decompressed size, or
    /// [`DecompressError::InternalError`] when blosc fails.
    pub fn decompress_into(
        &self,
        dst: &mut [MaybeUninit<u8>],
        numinternalthreads: u32,
    ) -> Result<usize, DecompressError> {
        if dst.len() < self.dst_len {
            return Err(DecompressError::DestinationBufferTooSmall);
        }
        // SAFETY: `src` was validated in `new`, `dst` is large enough for
        // the advertised decompressed size, and blosc never writes past
        // `dst.len()`.
        let status = unsafe {
            blosc_sys::blosc_decompress_ctx(
                self.src.as_ptr() as *const std::ffi::c_void,
                dst.as_mut_ptr() as *mut std::ffi::c_void,
                dst.len(),
                numinternalthreads as std::ffi::c_int,
            )
        };
        match status {
            // Non-negative status is the number of decompressed bytes.
            len if len >= 0 => {
                assert!(len as usize <= self.dst_len);
                Ok(len as usize)
            }
            _ => Err(DecompressError::InternalError(status)),
        }
    }
    /// Returns the raw compressed frame.
    pub fn as_buf(&self) -> &[u8] {
        &self.src
    }
    /// Consumes the decoder and returns the compressed frame.
    pub fn into_buf(self) -> Cow<'a, [u8]> {
        self.src
    }
    /// Decompresses the single item at `idx` (indices are in units of the
    /// frame's typesize).
    pub fn item(&self, idx: usize) -> Result<Vec<u8>, DecompressError> {
        self.items(idx..idx + 1)
    }
    /// Decompresses the single item at `idx` into `dst`, returning the
    /// number of bytes written.
    pub fn item_into(
        &self,
        idx: usize,
        dst: &mut [MaybeUninit<u8>],
    ) -> Result<usize, DecompressError> {
        self.items_into(idx..idx + 1, dst)
    }
    /// Decompresses the item range `idx` into a newly allocated `Vec<u8>`.
    pub fn items(&self, idx: std::ops::Range<usize>) -> Result<Vec<u8>, DecompressError> {
        let mut dst = Vec::<u8>::with_capacity(self.typesize * idx.len());
        // As in `decompress`: fill the spare capacity, no `Vec` transmute.
        let len = self.items_into(idx, dst.spare_capacity_mut())?;
        // SAFETY: `items_into` initialized the first `len` bytes.
        unsafe { dst.set_len(len) };
        Ok(dst)
    }
    /// Decompresses the item range `idx` into `dst`, returning the number of
    /// bytes written (always `typesize * idx.len()` on success).
    ///
    /// # Errors
    /// Returns [`DecompressError::DestinationBufferTooSmall`] when `dst`
    /// cannot hold the requested items, or
    /// [`DecompressError::DecompressingError`] when blosc fails or copies an
    /// unexpected number of bytes.
    pub fn items_into(
        &self,
        idx: std::ops::Range<usize>,
        dst: &mut [MaybeUninit<u8>],
    ) -> Result<usize, DecompressError> {
        let required_len = self.typesize * idx.len();
        if dst.len() < required_len {
            return Err(DecompressError::DestinationBufferTooSmall);
        }
        // SAFETY: `src` was validated in `new` and `dst` can hold the
        // requested items.
        let status = unsafe {
            blosc_sys::blosc_getitem(
                self.src.as_ptr() as *const std::ffi::c_void,
                idx.start as std::ffi::c_int,
                idx.len() as std::ffi::c_int,
                dst.as_mut_ptr() as *mut std::ffi::c_void,
            )
        };
        if status < 0 {
            return Err(DecompressError::DecompressingError);
        }
        let dst_len = status as usize;
        // blosc reports how many bytes it copied; anything other than the
        // requested amount indicates a corrupt frame.
        if dst_len != required_len {
            return Err(DecompressError::DecompressingError);
        }
        Ok(dst_len)
    }
}
/// Errors produced while constructing a [`Decoder`] or decompressing data.
#[derive(Debug)]
pub enum DecompressError {
    /// The destination buffer cannot hold the decompressed output.
    DestinationBufferTooSmall,
    /// The compressed buffer is invalid or corrupt.
    DecompressingError,
    /// blosc reported an internal failure (the negative status code).
    InternalError(i32),
    /// Reading the compressed stream failed.
    IoError(std::io::Error),
}
impl From<std::io::Error> for DecompressError {
    fn from(err: std::io::Error) -> Self {
        DecompressError::IoError(err)
    }
}
impl std::fmt::Display for DecompressError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DecompressError::DestinationBufferTooSmall => {
                f.write_str("destination buffer is too small")
            }
            DecompressError::DecompressingError => f.write_str("failed to decompress the data"),
            DecompressError::InternalError(status) => write!(f, "blosc internal error: {status}"),
            DecompressError::IoError(err) => write!(f, "I/O error: {err}"),
        }
    }
}
impl std::error::Error for DecompressError {
    /// Exposes the wrapped I/O error so callers can walk the error chain;
    /// previously `source()` always returned `None` despite `IoError`
    /// carrying a cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DecompressError::IoError(err) => Some(err),
            _ => None,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;
    use rand::rngs::StdRng;
    use rand::{Rng, SeedableRng};
    use crate::{CompressAlgo, Level, Shuffle};
    /// Randomized round-trip: compress with randomly drawn settings, then
    /// verify that single-item reads, range reads, and full decompression
    /// all reproduce the original input.
    #[test]
    fn round_trip() {
        // Fixed seed keeps the test deterministic and failures reproducible.
        let mut rand = StdRng::seed_from_u64(0xb1ba0c326dc4dbba);
        for _ in 0..100 {
            // Upper bound for the input length, spanning several orders of
            // magnitude so both tiny and large buffers are exercised.
            let src_len = {
                let max_lens = [0x1, 0x10, 0x100, 0x1000, 0x10000, 0x100000];
                let max_len = max_lens[rand.random_range(0..max_lens.len())];
                rand.random_range(0..=max_len)
            };
            // Random payload of random length up to `src_len`.
            let src = (0..rand.random_range(0..=src_len))
                .map(|_| rand.random_range(0..=255) as u8)
                .collect::<Vec<u8>>();
            let clevel: Level = rand.random_range(0..=9).try_into().unwrap();
            let shuffle = {
                let shuffles = [Shuffle::None, Shuffle::Byte, Shuffle::Bit];
                shuffles[rand.random_range(0..shuffles.len())]
            };
            // Pick a random typesize that evenly divides the input length.
            // The final candidate (i == 8) draws from `1..=1`, i.e. 1, which
            // always divides, so `find` cannot fail and `unwrap` is safe.
            let typesize = (1..=8)
                .map(|i| rand.random_range(1..=(1 << (8 - i))))
                .find(|&ts| src.len() % ts == 0)
                .unwrap();
            let compressor = {
                let compressors = [
                    CompressAlgo::Blosclz,
                    CompressAlgo::Lz4,
                    CompressAlgo::Lz4hc,
                    CompressAlgo::Zlib,
                    CompressAlgo::Zstd,
                ];
                compressors[rand.random_range(0..compressors.len())].clone()
            };
            // Mix of automatic (`None`), fixed, and random blocksizes.
            let blocksize = {
                let blocksizes = [
                    Option::<NonZeroUsize>::None,
                    Some(1.try_into().unwrap()),
                    Some(64.try_into().unwrap()),
                    Some(4096.try_into().unwrap()),
                    Some(262144.try_into().unwrap()),
                    Some(rand.random_range(1..4096).try_into().unwrap()),
                ];
                blocksizes[rand.random_range(0..blocksizes.len())]
            };
            let numinternalthreads = rand.random_range(1..=16);
            let compressed = crate::Encoder::new(clevel)
                .shuffle(shuffle)
                .typesize(typesize)
                .compressor(compressor)
                .blocksize(blocksize)
                .numinternalthreads(numinternalthreads)
                .compress(&src)
                .unwrap();
            let decoder = crate::Decoder::new(&compressed).unwrap();
            let items_num = src.len() / typesize;
            if items_num > 0 {
                // Spot-check random single items against the source slice.
                for _ in 0..10 {
                    let idx = rand.random_range(0..items_num);
                    let item = decoder.item(idx).unwrap();
                    assert_eq!(item, src[idx * typesize..(idx + 1) * typesize]);
                }
                // Spot-check random (possibly empty) item ranges.
                for _ in 0..10 {
                    let start = rand.random_range(0..items_num);
                    let end = rand.random_range(start..items_num);
                    let items = decoder.items(start..end).unwrap();
                    assert_eq!(items, src[start * typesize..end * typesize]);
                }
            }
            // Full decompression must reproduce the input exactly.
            let decompressed = decoder.decompress(numinternalthreads).unwrap();
            assert_eq!(src, decompressed);
        }
    }
}