use std::cell::RefCell;
use zstd::block::Decompressor;
/// Size in bytes (4 MiB) of the buffer every `DecompressionState` allocates up front.
/// Requests whose computed capacity does not exceed this value are served from that buffer.
const MIN_CAPACITY: usize = 1024 * 1024 * 4;
/// Largest output buffer in bytes (16 MiB) that will ever be used for one decompression.
/// Also used as the capacity when `Decompressor::upper_bound` yields no value.
const MAX_CAPACITY: usize = 1024 * 1024 * 16;
/// Reusable decompression context: a zstd decompressor paired with a
/// pre-allocated output buffer so that small payloads need no fresh allocation.
pub(crate) struct DecompressionState {
/// The zstd decompressor that is reused across calls.
decompressor: Decompressor,
/// Scratch output buffer; created with `MIN_CAPACITY` zeroed bytes in `new`.
buffer: Vec<u8>,
}
impl DecompressionState {
    /// Builds a state with a fresh decompressor and a zero-filled scratch
    /// buffer of `MIN_CAPACITY` bytes.
    fn new() -> Self {
        let decompressor = Decompressor::new();
        let buffer = vec![0u8; MIN_CAPACITY];
        Self {
            decompressor,
            buffer,
        }
    }

    /// Decompresses `compressed` into a scratch buffer and hands the
    /// decompressed bytes to `f`.
    ///
    /// The buffer size is the value reported by `Decompressor::upper_bound`,
    /// clamped to `MAX_CAPACITY` (and `MAX_CAPACITY` when no value is
    /// reported). Sizes up to `MIN_CAPACITY` use the state's own buffer;
    /// larger ones use a temporary allocation that is dropped on return.
    ///
    /// Returns the number of decompressed bytes together with whatever `f`
    /// returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the decompressor; `f` is not called
    /// in that case.
    fn decompress_and_transform<F, R>(
        &mut self,
        compressed: &[u8],
        f: &mut F,
    ) -> std::io::Result<(usize, R)>
    where
        F: FnMut(&[u8]) -> R,
    {
        // Clamp the requested size so a single call never uses more than MAX_CAPACITY.
        let wanted = match Decompressor::upper_bound(compressed) {
            Some(bound) if bound < MAX_CAPACITY => bound,
            _ => MAX_CAPACITY,
        };

        // Declared outside the branch so the slice borrowed from it outlives the `if`.
        let mut overflow = Vec::new();
        let target: &mut [u8] = if wanted > MIN_CAPACITY {
            overflow.resize(wanted, 0u8);
            overflow.as_mut_slice()
        } else {
            self.buffer.as_mut_slice()
        };

        let span = tracing::trace_span!("decompress_and_transform");
        let _guard = span.enter();

        let written = self.decompressor.decompress_to_buffer(compressed, target)?;
        let output = f(&target[..written]);
        Ok((written, output))
    }
}
// One `DecompressionState` per thread, created lazily on first use. The `RefCell`
// provides the mutable access needed by the public `decompress_and_transform`.
thread_local!(static DECOMPRESSOR: RefCell<DecompressionState> = RefCell::new(DecompressionState::new()));
pub fn decompress_and_transform<F, R>(compressed: &[u8], f: &mut F) -> std::io::Result<(usize, R)>
where
F: FnMut(&[u8]) -> R,
{
DECOMPRESSOR.with(|d| d.borrow_mut().decompress_and_transform(compressed, f))
}
#[cfg(test)]
mod tests {
    use super::*;
    use quickcheck::quickcheck;
    use std::io::Cursor;

    /// Property: compressing arbitrary bytes with zstd and feeding them
    /// through `decompress_and_transform` gives back the original bytes, and
    /// the reported length equals the length of the data handed to the callback.
    #[quickcheck]
    fn thread_local_compression_decompression(data: Vec<u8>) -> anyhow::Result<bool> {
        let compressed = zstd::encode_all(Cursor::new(&data), 0)?;
        let (reported_len, round_tripped) =
            decompress_and_transform(&compressed, &mut |bytes| bytes.to_vec())?;
        let length_matches = reported_len == round_tripped.len();
        let content_matches = data == round_tripped;
        Ok(length_matches && content_matches)
    }
}