use std::sync::Once;
use blosc2::chunk::SChunk;
use blosc2::{CParams, CompressAlgo, DParams};
use super::{CompressResult, CompressionError, Compressor};
use crate::pipeline::Blosc2Codec;
/// Chunk size (256 MiB) used by `Compressor::compress` when splitting input
/// into blosc2 super-chunk chunks.
pub const DEFAULT_BLOSC2_CHUNK_BYTES: usize = 256 << 20;
/// Largest buffer handed to a single blosc2 append: `i32::MAX` minus 32 bytes
/// of headroom (intended to mirror C-Blosc2's `BLOSC2_MAX_BUFFERSIZE`).
const BLOSC2_MAX_BUFFERSIZE: usize = (i32::MAX - 32) as usize;
/// Runs `blosc2_init` exactly once per process before any blosc2 work.
fn ensure_blosc2_init() {
    static BLOSC2_READY: Once = Once::new();
    BLOSC2_READY.call_once(|| {
        // SAFETY: `blosc2_init` takes no arguments, and the surrounding `Once`
        // guarantees this call happens at most once in the process.
        unsafe {
            blosc2_sys::blosc2_init();
        }
    });
}
/// Wraps a blosc2 crate error as `CompressionError::Blosc2`, using its
/// `Debug` rendering as the message.
fn map_err(e: blosc2::Error) -> CompressionError {
    let message = format!("{:?}", e);
    CompressionError::Blosc2(message)
}
/// Translates the pipeline's codec choice into the blosc2 crate's
/// `CompressAlgo`; every variant maps to the identically named algorithm.
pub(crate) fn codec_to_algo(codec: &Blosc2Codec) -> CompressAlgo {
    let algo = match *codec {
        Blosc2Codec::Blosclz => CompressAlgo::Blosclz,
        Blosc2Codec::Lz4 => CompressAlgo::Lz4,
        Blosc2Codec::Lz4hc => CompressAlgo::Lz4hc,
        Blosc2Codec::Zlib => CompressAlgo::Zlib,
        Blosc2Codec::Zstd => CompressAlgo::Zstd,
    };
    algo
}
/// [`Compressor`] that stores data as a serialized blosc2 super-chunk.
///
/// Only the compression path reads these settings; decompression takes
/// everything it needs from the compressed stream itself.
pub struct Blosc2Compressor {
    /// Codec used for newly written chunks (translated by `codec_to_algo`).
    pub codec: Blosc2Codec,
    /// Compression level forwarded to blosc2's compression parameters.
    pub clevel: i32,
    /// Item width in bytes; forwarded to blosc2 and used to align chunk sizes.
    pub typesize: usize,
    /// Number of blosc2 worker threads; `0` is treated as `1`.
    pub nthreads: u32,
}
/// Converts the configured thread count for blosc2, mapping `0` to a single
/// thread and leaving every other value unchanged.
#[inline]
fn blosc2_nthreads(n: u32) -> usize {
    n.max(1) as usize
}
/// Normalises a requested chunk size for blosc2.
///
/// The request is capped at [`BLOSC2_MAX_BUFFERSIZE`], then rounded down to a
/// whole number of `typesize`-byte items. The result is never below one item,
/// unless a single item is itself wider than the cap. A `typesize` of `0` is
/// treated as `1`.
#[inline]
fn effective_chunk_bytes(requested: usize, typesize: usize) -> usize {
    let item_width = if typesize == 0 { 1 } else { typesize };
    let whole_items = requested.min(BLOSC2_MAX_BUFFERSIZE) / item_width;
    let aligned = whole_items.max(1) * item_width;
    // Only bites when one item alone is wider than the cap.
    aligned.min(BLOSC2_MAX_BUFFERSIZE)
}
impl Blosc2Compressor {
fn build_cparams(&self) -> Result<CParams, CompressionError> {
let algo = codec_to_algo(&self.codec);
let mut cparams = CParams::default();
cparams
.compressor(algo)
.clevel(self.clevel as u32)
.typesize(self.typesize)
.map_err(map_err)?;
cparams.nthreads(blosc2_nthreads(self.nthreads));
Ok(cparams)
}
fn build_dparams(&self) -> DParams {
let mut dparams = DParams::default();
dparams.nthreads(blosc2_nthreads(self.nthreads));
dparams
}
fn compress_with_chunk_bytes(
&self,
data: &[u8],
requested_chunk_bytes: usize,
) -> Result<CompressResult, CompressionError> {
ensure_blosc2_init();
let cparams = self.build_cparams()?;
let dparams = self.build_dparams();
let mut schunk = SChunk::new(cparams, dparams).map_err(map_err)?;
let chunk_bytes = effective_chunk_bytes(requested_chunk_bytes, self.typesize);
if data.len() <= chunk_bytes {
schunk.append(data).map_err(map_err)?;
} else {
for slice in data.chunks(chunk_bytes) {
schunk.append(slice).map_err(map_err)?;
}
}
let buf = schunk.to_buffer().map_err(map_err)?;
Ok(CompressResult {
data: buf.as_slice().to_vec(),
block_offsets: None,
})
}
}
impl Compressor for Blosc2Compressor {
    /// Compresses `data` into a blosc2 super-chunk. The chunk size is
    /// [`DEFAULT_BLOSC2_CHUNK_BYTES`], aligned to `typesize`.
    fn compress(&self, data: &[u8]) -> Result<CompressResult, CompressionError> {
        self.compress_with_chunk_bytes(data, DEFAULT_BLOSC2_CHUNK_BYTES)
    }

    /// Decompresses a serialized super-chunk by decoding its chunks in order and
    /// concatenating the results.
    ///
    /// `self` is not consulted: the stream carries its own parameters.
    /// `_expected_size` is ignored.
    ///
    /// # Errors
    ///
    /// Returns [`CompressionError::Blosc2`] in three cases: the buffer cannot be
    /// parsed, a chunk fails to decode, or the output buffer cannot be grown.
    fn decompress(&self, data: &[u8], _expected_size: usize) -> Result<Vec<u8>, CompressionError> {
        ensure_blosc2_init();
        let mut schunk = SChunk::from_buffer(data.into()).map_err(map_err)?;
        let num_chunks = schunk.num_chunks();
        if num_chunks == 0 {
            return Ok(Vec::new());
        }
        let mut out: Vec<u8> = Vec::new();
        for idx in 0..num_chunks {
            let chunk = schunk.get_chunk(idx).map_err(map_err)?;
            let bytes = chunk.decompress().map_err(map_err)?;
            // A fallible reservation turns allocation failure into an error
            // rather than an abort.
            out.try_reserve(bytes.len()).map_err(|e| {
                CompressionError::Blosc2(format!(
                    "failed to reserve {} bytes for decompressed chunk {idx}: {e}",
                    bytes.len(),
                ))
            })?;
            out.extend_from_slice(&bytes);
        }
        Ok(out)
    }

    /// Decompresses only bytes `byte_pos..byte_pos + byte_size` of the original
    /// input.
    ///
    /// The byte range is widened to whole items of the stream's `typesize`,
    /// decoded with `SChunk::items`, and then trimmed back to the requested
    /// bytes. `_block_offsets` is unused; the range is resolved from the
    /// super-chunk alone. A zero-length request returns an empty vector once the
    /// buffer header has been parsed.
    ///
    /// # Errors
    ///
    /// Returns [`CompressionError::Blosc2`] in any of these cases:
    /// - the buffer cannot be parsed, or reports a zero typesize;
    /// - the range arithmetic overflows;
    /// - blosc2 rejects the item range;
    /// - fewer bytes were decoded than requested.
    fn decompress_range(
        &self,
        data: &[u8],
        _block_offsets: &[u64],
        byte_pos: usize,
        byte_size: usize,
    ) -> Result<Vec<u8>, CompressionError> {
        ensure_blosc2_init();
        let schunk = SChunk::from_buffer(data.into()).map_err(map_err)?;
        let ts = schunk.typesize();
        if ts == 0 {
            return Err(CompressionError::Blosc2("typesize is 0".to_string()));
        }
        // An empty request needs no decoding. Returning here also keeps an empty
        // item range (byte_pos on an item boundary) away from `items()`, whose
        // handling of empty ranges is not relied upon.
        if byte_size == 0 {
            return Ok(Vec::new());
        }
        let item_start = byte_pos / ts;
        let byte_end = byte_pos
            .checked_add(byte_size)
            .ok_or_else(|| CompressionError::Blosc2("byte range overflow".to_string()))?;
        // Round up so a range ending mid-item still decodes that whole item.
        // NOTE(review): if the original length is not a multiple of `ts`, a range
        // touching the trailing partial item rounds past the last whole item;
        // confirm `items()` accepts that.
        let item_end = byte_end.div_ceil(ts);
        let items = schunk.items(item_start..item_end).map_err(map_err)?;
        // Position of `byte_pos` within the first decoded item.
        let offset_in_items = byte_pos % ts;
        let end = offset_in_items
            .checked_add(byte_size)
            .ok_or_else(|| CompressionError::Blosc2("range overflow".to_string()))?;
        if end > items.len() {
            return Err(CompressionError::Blosc2(format!(
                "range exceeds decompressed data: need {end} bytes, got {}",
                items.len()
            )));
        }
        let slice = &items[offset_in_items..end];
        let mut out: Vec<u8> = Vec::new();
        out.try_reserve_exact(slice.len()).map_err(|e| {
            CompressionError::Blosc2(format!(
                "failed to reserve {} bytes for blosc2 range output: {e}",
                slice.len()
            ))
        })?;
        out.extend_from_slice(slice);
        Ok(out)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor so each test states only the knobs it cares about.
    fn make(codec: Blosc2Codec, clevel: i32, typesize: usize, nthreads: u32) -> Blosc2Compressor {
        Blosc2Compressor {
            codec,
            clevel,
            typesize,
            nthreads,
        }
    }

    /// Byte-oriented LZ4 compressor shared by the explicit chunk-size tests.
    fn small_chunk_compressor() -> Blosc2Compressor {
        make(Blosc2Codec::Lz4, 5, 1, 0)
    }

    /// `len` bytes of the repeating pattern `0, 1, ..., modulus - 1, 0, ...`.
    fn ramp(len: usize, modulus: usize) -> Vec<u8> {
        (0..len).map(|i| (i % modulus) as u8).collect()
    }

    /// Compresses with an explicit chunk size and returns the serialized bytes.
    fn pack(compressor: &Blosc2Compressor, data: &[u8], chunk_bytes: usize) -> Vec<u8> {
        compressor
            .compress_with_chunk_bytes(data, chunk_bytes)
            .unwrap()
            .data
    }

    /// Compresses with the default chunk size and checks the round trip is lossless.
    #[track_caller]
    fn assert_round_trip(compressor: &Blosc2Compressor, data: &[u8]) {
        let packed = compressor.compress(data).unwrap().data;
        let unpacked = compressor.decompress(&packed, data.len()).unwrap();
        assert_eq!(unpacked.as_slice(), data);
    }

    #[test]
    fn blosc2_round_trip() {
        assert_round_trip(&make(Blosc2Codec::Lz4, 5, 1, 0), &ramp(4096, 256));
    }

    #[test]
    fn blosc2_round_trip_4byte() {
        let data: Vec<u8> = (0..4000u32).flat_map(u32::to_ne_bytes).collect();
        assert_round_trip(&make(Blosc2Codec::Blosclz, 5, 4, 0), &data);
    }

    #[test]
    fn blosc2_range_decode() {
        let data = ramp(8192, 256);
        let compressor = make(Blosc2Codec::Lz4, 5, 1, 0);
        let packed = compressor.compress(&data).unwrap().data;
        let window = compressor.decompress_range(&packed, &[], 200, 500).unwrap();
        assert_eq!(window.len(), 500);
        assert_eq!(&window[..], &data[200..700]);
    }

    #[test]
    fn blosc2_round_trip_zstd() {
        assert_round_trip(&make(Blosc2Codec::Zstd, 3, 1, 0), &ramp(4096, 256));
    }

    #[test]
    fn blosc2_round_trip_zlib() {
        assert_round_trip(&make(Blosc2Codec::Zlib, 5, 1, 0), &ramp(4096, 256));
    }

    #[test]
    fn blosc2_nthreads_round_trip_lossless() {
        let data: Vec<u8> = (0..256 * 1024).map(|i| ((i * 31) % 256) as u8).collect();
        let single = make(Blosc2Codec::Zstd, 3, 4, 0);
        let single_bytes = single.compress(&data).unwrap().data;
        assert_eq!(single.decompress(&single_bytes, data.len()).unwrap(), data);
        for n in [1u32, 2, 4, 8] {
            let multi = make(Blosc2Codec::Zstd, 3, 4, n);
            let multi_bytes = multi.compress(&data).unwrap().data;
            let multi_rt = multi.decompress(&multi_bytes, data.len()).unwrap();
            assert_eq!(
                multi_rt, data,
                "blosc2 nthreads={n} round-trip must match original"
            );
            // Streams written with threads must also decode single-threaded.
            assert_eq!(single.decompress(&multi_bytes, data.len()).unwrap(), data);
        }
    }

    #[test]
    fn blosc2_nthreads_zero_is_deterministic_across_runs() {
        let data: Vec<u8> = (0..64 * 1024).map(|i| ((i * 17) % 256) as u8).collect();
        let compressor = make(Blosc2Codec::Zstd, 3, 4, 0);
        let first = compressor.compress(&data).unwrap().data;
        let second = compressor.compress(&data).unwrap().data;
        assert_eq!(first, second, "blosc2 nthreads=0 must be deterministic");
    }

    #[test]
    fn blosc2_decompress_path() {
        let data = ramp(4096, 256);
        let packed = make(Blosc2Codec::Lz4, 5, 1, 0).compress(&data).unwrap().data;
        // A decoder configured with a different clevel must still read the stream.
        let decoder = make(Blosc2Codec::Lz4, 0, 1, 0);
        assert_eq!(decoder.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn blosc2_multi_chunk_round_trip_short_tail() {
        let chunk_bytes = 4096;
        let data = ramp(3 * chunk_bytes + 777, 251);
        let compressor = small_chunk_compressor();
        let packed = pack(&compressor, &data, chunk_bytes);
        let unpacked = compressor.decompress(&packed, data.len()).unwrap();
        assert_eq!(unpacked.len(), data.len());
        assert_eq!(unpacked, data);
        let schunk = blosc2::chunk::SChunk::from_buffer(packed.as_slice().into()).unwrap();
        assert!(
            schunk.num_chunks() >= 2,
            "multi-chunk path not exercised: num_chunks = {}",
            schunk.num_chunks()
        );
    }

    #[test]
    fn blosc2_multi_chunk_round_trip_exact_multiple() {
        let chunk_bytes = 4096;
        let data = ramp(4 * chunk_bytes, 251);
        let compressor = small_chunk_compressor();
        let packed = pack(&compressor, &data, chunk_bytes);
        assert_eq!(compressor.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn blosc2_range_decode_spans_chunk_boundary() {
        let chunk_bytes = 4096;
        let data = ramp(3 * chunk_bytes + 500, 251);
        let compressor = small_chunk_compressor();
        let packed = pack(&compressor, &data, chunk_bytes);
        // Start 200 bytes before the first chunk boundary and read across it.
        let start = chunk_bytes - 200;
        let width = 500;
        let window = compressor
            .decompress_range(&packed, &[], start, width)
            .unwrap();
        assert_eq!(window.len(), width);
        assert_eq!(&window[..], &data[start..start + width]);
    }

    #[test]
    fn blosc2_small_input_is_single_chunk() {
        let data = ramp(8192, 251);
        let compressor = make(Blosc2Codec::Lz4, 5, 1, 0);
        let packed = compressor.compress(&data).unwrap().data;
        let schunk = blosc2::chunk::SChunk::from_buffer(packed.as_slice().into()).unwrap();
        assert_eq!(
            schunk.num_chunks(),
            1,
            "small input should stay on single-append fast path"
        );
        assert_eq!(compressor.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn blosc2_input_equal_to_chunk_bytes_stays_single_chunk() {
        let chunk_bytes = 4096;
        let data = ramp(chunk_bytes, 251);
        let compressor = small_chunk_compressor();
        let packed = pack(&compressor, &data, chunk_bytes);
        let schunk = blosc2::chunk::SChunk::from_buffer(packed.as_slice().into()).unwrap();
        assert_eq!(schunk.num_chunks(), 1);
        assert_eq!(compressor.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn blosc2_multi_chunk_typesize_alignment() {
        let chunk_bytes = 4096;
        let num_values: usize = 2 * chunk_bytes + 37;
        let data: Vec<u8> = (0..num_values)
            .flat_map(|i| (i as u32).to_ne_bytes())
            .collect();
        assert_eq!(data.len() % 4, 0);
        let compressor = make(Blosc2Codec::Blosclz, 5, 4, 0);
        let packed = pack(&compressor, &data, chunk_bytes);
        assert_eq!(compressor.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn blosc2_multi_chunk_non_typesize_aligned_tail() {
        let chunk_bytes = 4096;
        let len = 2 * chunk_bytes + 13;
        assert_ne!(len % 4, 0, "test setup: length must not be 4-aligned");
        let data = ramp(len, 251);
        let compressor = make(Blosc2Codec::Blosclz, 5, 4, 0);
        let packed = pack(&compressor, &data, chunk_bytes);
        assert_eq!(compressor.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn blosc2_oversized_chunk_bytes_request_round_trips() {
        let data = ramp(8192, 251);
        let compressor = small_chunk_compressor();
        let packed = pack(&compressor, &data, BLOSC2_MAX_BUFFERSIZE + (1 << 30));
        assert_eq!(compressor.decompress(&packed, data.len()).unwrap(), data);
    }

    #[test]
    fn effective_chunk_bytes_never_exceeds_blosc2_max() {
        let requests = [
            0usize,
            1,
            BLOSC2_MAX_BUFFERSIZE / 2,
            BLOSC2_MAX_BUFFERSIZE - 1,
            BLOSC2_MAX_BUFFERSIZE,
            BLOSC2_MAX_BUFFERSIZE + 1,
            BLOSC2_MAX_BUFFERSIZE + (1 << 30),
            usize::MAX,
        ];
        for ts in [1usize, 2, 4, 8, 16, 32, 64, 128, 255] {
            for req in requests {
                let got = effective_chunk_bytes(req, ts);
                assert!(
                    got <= BLOSC2_MAX_BUFFERSIZE,
                    "effective_chunk_bytes({req}, {ts}) = {got} exceeds BLOSC2_MAX_BUFFERSIZE = {BLOSC2_MAX_BUFFERSIZE}",
                );
            }
        }
    }

    #[test]
    fn effective_chunk_bytes_is_typesize_aligned() {
        let requests = [0usize, 1, 7, 8, 9, 4095, 4096, 4097, 1 << 20, 1 << 30];
        for ts in [1usize, 2, 3, 4, 7, 8, 16, 32, 64, 128, 255] {
            for req in requests {
                let got = effective_chunk_bytes(req, ts);
                assert_eq!(
                    got % ts,
                    0,
                    "effective_chunk_bytes({req}, {ts}) = {got} not a multiple of {ts}",
                );
                assert!(
                    got >= ts,
                    "effective_chunk_bytes({req}, {ts}) = {got} below typesize",
                );
            }
        }
    }

    #[test]
    fn effective_chunk_bytes_rounds_up_below_typesize() {
        for req in [0usize, 1, 7, 8] {
            assert_eq!(effective_chunk_bytes(req, 8), 8);
        }
    }

    #[test]
    fn effective_chunk_bytes_zero_typesize_is_defensive() {
        assert_eq!(effective_chunk_bytes(0, 0), 1);
        assert_eq!(effective_chunk_bytes(4096, 0), 4096);
        assert_eq!(
            effective_chunk_bytes(BLOSC2_MAX_BUFFERSIZE + 1, 0),
            BLOSC2_MAX_BUFFERSIZE
        );
    }
}