//! znippy-common 0.7.3
//!
//! Core logic and data structures for Znippy, a parallel chunked compression system.
//!
//! Codec layer: OpenZL compression/decompression.
//! OpenZL is znippy's codec — it wraps zstd+lz4 with improved framing.

use anyhow::{Result, anyhow};

// ─── Compression Context ────────────────────────────────────────────

/// Reusable OpenZL compression context.
///
/// Pairs the underlying `openzl_sys_rs::ZlCCtx` handle with the compression
/// level that was passed to [`CompressCtx::new`].
pub struct CompressCtx {
    /// Underlying OpenZL compression context handle.
    cctx: openzl_sys_rs::ZlCCtx,
    /// Compression level supplied at construction time.
    level: i32,
}

// SAFETY: NOTE(review) — this asserts that a `CompressCtx` (and therefore the
// wrapped `ZlCCtx`) may be moved to another thread. Nothing in this file proves
// that; confirm it against the OpenZL threading guarantees. `Sync` is not
// implemented here, and the compress methods take `&mut self`.
unsafe impl Send for CompressCtx {}

impl CompressCtx {
    /// Creates a compression context configured for `compression_level`.
    ///
    /// Three parameters are set on the new context, in this order:
    /// sticky parameters (so the settings survive across compress calls and
    /// the context can be reused), the format version reported by
    /// `ZL_getDefaultEncodingVersion`, and the compression level.
    ///
    /// # Errors
    ///
    /// Returns an error if the context cannot be created or if any of the
    /// parameters is rejected.
    pub fn new(compression_level: i32) -> Result<Self> {
        use openzl_sys_rs::*;
        let mut cctx = ZlCCtx::new().ok_or_else(|| anyhow!("ZL_CCtx_create failed"))?;
        // SAFETY: the call takes no arguments and only returns a value.
        // NOTE(review): presumably a pure version query with no preconditions —
        // confirm against the OpenZL headers.
        let version = unsafe { ZL_getDefaultEncodingVersion() } as i32;
        // stickyParameters=1 keeps params across compress calls (reuse ctx)
        cctx.set_parameter(ZL_CParam_ZL_CParam_stickyParameters, 1)
            .map_err(|e| anyhow!(e))?;
        cctx.set_parameter(ZL_CParam_ZL_CParam_formatVersion, version)
            .map_err(|e| anyhow!(e))?;
        cctx.set_parameter(ZL_CParam_ZL_CParam_compressionLevel, compression_level)
            .map_err(|e| anyhow!(e))?;
        Ok(Self { cctx, level: compression_level })
    }

    /// Returns the compression level this context was created with.
    pub fn level(&self) -> i32 {
        self.level
    }

    /// Compresses `input` into a freshly allocated buffer.
    ///
    /// The buffer is sized to `zl_compress_bound(input.len())` and then
    /// truncated to the number of bytes the compressor reports as written.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying compress call fails.
    pub fn compress(&mut self, input: &[u8]) -> Result<Vec<u8>> {
        use openzl_sys_rs::zl_compress_bound;
        let bound = zl_compress_bound(input.len());
        let mut output = vec![0u8; bound];
        let compressed_size = self
            .cctx
            .compress(&mut output, input)
            .map_err(|e| anyhow!(e))?;
        output.truncate(compressed_size);
        Ok(output)
    }

    /// Compress into a reusable buffer; returns the number of bytes written
    /// (`out` is truncated to that). Reuse the same `out` across slices to keep
    /// the compress path allocation-free (the no-hot-path-alloc rule).
    ///
    /// `out` is grown (zero-filled) to `zl_compress_bound(input.len())` when it
    /// is shorter than that; it is never shrunk before compressing.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying compress call fails.
    pub fn compress_into(&mut self, input: &[u8], out: &mut Vec<u8>) -> Result<usize> {
        use openzl_sys_rs::zl_compress_bound;
        let bound = zl_compress_bound(input.len());
        if out.len() < bound {
            out.resize(bound, 0);
        }
        let compressed_size = self
            .cctx
            .compress(out.as_mut_slice(), input)
            .map_err(|e| anyhow!(e))?;
        // Drop the unused tail so `out` holds exactly the compressed frame.
        out.truncate(compressed_size);
        Ok(compressed_size)
    }
}

/// Decompresses a single OpenZL frame into a newly allocated `Vec<u8>`.
///
/// Convenience wrapper around [`decompress_into`] that starts from an empty
/// buffer and hands it back on success.
pub fn decompress_frame(compressed: &[u8]) -> Result<Vec<u8>> {
    let mut frame = Vec::new();
    decompress_into(compressed, &mut frame).map(|_written| frame)
}

/// Decompresses `compressed` into the caller-supplied buffer `out`.
///
/// Returns the number of bytes written; on success `out` is truncated to
/// exactly that length. Passing the same `out` for successive chunks keeps the
/// decompress path allocation-free (the no-hot-path-alloc rule).
///
/// The size reported by `zl_get_decompressed_size` decides how large `out`
/// must be; the buffer is grown (zero-filled) when shorter and never shrunk
/// before decompressing.
pub fn decompress_into(compressed: &[u8], out: &mut Vec<u8>) -> Result<usize> {
    use openzl_sys_rs::*;

    let expected_len = match zl_get_decompressed_size(compressed) {
        Ok(size) => size,
        Err(e) => return Err(anyhow!("OpenZL getDecompressedSize: {}", e)),
    };

    // Grow-only: resizing to the current length is a no-op when `out` is
    // already large enough.
    let target_len = out.len().max(expected_len);
    out.resize(target_len, 0);

    let destination = &mut out[..expected_len];
    match zl_decompress(destination, compressed) {
        Ok(written) => {
            out.truncate(written);
            Ok(written)
        }
        Err(e) => Err(anyhow!("OpenZL decompress: {}", e)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `len` bytes following the pattern `(index + shift) % 251`.
    fn patterned_bytes(len: usize, shift: usize) -> Vec<u8> {
        (0..len).map(|index| ((index + shift) % 251) as u8).collect()
    }

    /// Compresses `payload` with `ctx` and decompresses the resulting frame.
    fn roundtrip(ctx: &mut CompressCtx, payload: &[u8]) -> Vec<u8> {
        let frame = ctx.compress(payload).unwrap();
        decompress_frame(&frame).unwrap()
    }

    /// A single compress/decompress cycle returns the original bytes.
    #[test]
    fn test_roundtrip() {
        let input: &[u8] = b"Hello world! This is a test of compression roundtrip. Repeated data helps compression. Repeated data helps compression. Repeated data helps compression.";
        let mut ctx = CompressCtx::new(3).unwrap();
        let compressed = ctx.compress(input).unwrap();
        println!("Compressed {} -> {} bytes", input.len(), compressed.len());
        let decompressed = decompress_frame(&compressed).unwrap();
        assert_eq!(decompressed.as_slice(), input);
    }

    /// One context can be reused for several consecutive compress calls.
    #[test]
    fn test_multi_compress_same_ctx() {
        let mut ctx = CompressCtx::new(3).unwrap();
        for i in 0..10usize {
            let input = patterned_bytes(4096, i);
            let decompressed = roundtrip(&mut ctx, &input);
            assert_eq!(decompressed, input, "Failed at iteration {}", i);
        }
        println!("10 sequential compress calls OK");
    }

    /// Independent contexts can run on separate threads at the same time.
    #[test]
    fn test_parallel_contexts() {
        let mut workers = Vec::new();
        for t in 0..8usize {
            workers.push(std::thread::spawn(move || {
                let mut ctx = CompressCtx::new(3).unwrap();
                for i in 0..5usize {
                    let input = patterned_bytes(8192, i + t * 100);
                    let decompressed = roundtrip(&mut ctx, &input);
                    assert_eq!(decompressed, input);
                }
            }));
        }
        // Every thread is spawned before the first join, as in a collect-then-join.
        for worker in workers {
            worker.join().unwrap();
        }
        println!("8 parallel contexts x 5 calls each OK");
    }
}