Skip to main content

crabka_compression/
lib.rs

1//! Kafka wire-protocol compression codecs.
2//!
3//! Kafka uses four codecs on the wire — gzip, snappy, lz4, zstd — each with
4//! specific framing conventions:
5//!
6//! - **gzip**: standard RFC-1952 gzip via `flate2` (pure-Rust `miniz_oxide`
7//!   backend).
8//! - **snappy**: xerial-snappy framing over `snap` raw blocks. Kafka does not
9//!   use the standard Google Snappy stream format; it uses the xerial framing
10//!   (8-byte magic header, two 4-byte version fields, then a sequence of
11//!   `u32-BE` length-prefixed raw snappy chunks).
12//! - **lz4**: LZ4 frame format (magic `0x04 22 4D 18`) with independent blocks
13//!   and 64 KiB block size, matching `KafkaLZ4BlockOutputStream`'s defaults.
14//! - **zstd**: plain zstd at compression level 3 (Kafka's default).
15//!
16//! Each codec is behind a Cargo feature (`gzip`, `snappy`, `lz4`, `zstd`), all
17//! enabled by default. Disabling a feature leaves the API stable but returns
18//! `Err(`[`CompressionError::FeatureDisabled`]`)` at runtime.
19//!
20//! ## Compress and decompress a record payload
21//!
22//! ```rust
23//! use crabka_compression::{CompressionType, compress, decompress};
24//!
25//! # fn run() -> Result<(), Box<dyn std::error::Error>> {
26//! let compressed = compress(CompressionType::Lz4, b"order-created")?;
27//! let plain = decompress(CompressionType::Lz4, &compressed, 1024)?;
28//! assert_eq!(plain.as_ref(), b"order-created");
29//! # Ok(())
30//! # }
31//! ```
32
33mod codec_type;
34mod error;
35
36pub use codec_type::CompressionType;
37pub use error::CompressionError;
38
39use bytes::Bytes;
40
41/// Compress `data` using the codec identified by `ct`.
42///
43/// For `CompressionType::None`, returns the input unchanged (wrapped in a
44/// new `Bytes`). For other codecs, dispatches to the per-codec module.
45/// If the codec's Cargo feature is not enabled, returns
46/// `Err(CompressionError::FeatureDisabled(_))`.
47pub fn compress(ct: CompressionType, data: &[u8]) -> Result<Bytes, CompressionError> {
48    match ct {
49        CompressionType::None => Ok(Bytes::copy_from_slice(data)),
50        CompressionType::Gzip => gzip_compress(data),
51        CompressionType::Snappy => snappy_compress(data),
52        CompressionType::Lz4 => lz4_compress(data),
53        CompressionType::Zstd => zstd_compress(data),
54    }
55}
56
57/// Decompress `data` using the codec identified by `ct`. See `compress`.
58///
59/// `max_output` bounds the size of the decompressed output: if decompression
60/// would produce more than `max_output` bytes, returns
61/// `Err(CompressionError::TooLarge { .. })` without materializing the oversized
62/// buffer. This guards against decompression bombs on the untrusted decode
63/// path. Callers that handle wire input should derive `max_output` from the
64/// compressed length (e.g. a bounded ratio plus an absolute ceiling).
65pub fn decompress(
66    ct: CompressionType,
67    data: &[u8],
68    max_output: usize,
69) -> Result<Bytes, CompressionError> {
70    match ct {
71        CompressionType::None => {
72            if data.len() > max_output {
73                Err(CompressionError::TooLarge { limit: max_output })
74            } else {
75                Ok(Bytes::copy_from_slice(data))
76            }
77        }
78        CompressionType::Gzip => gzip_decompress(data, max_output),
79        CompressionType::Snappy => snappy_decompress(data, max_output),
80        CompressionType::Lz4 => lz4_decompress(data, max_output),
81        CompressionType::Zstd => zstd_decompress(data, max_output),
82    }
83}
84
85// --- per-codec dispatch, with feature-gated stubs ------------------------
86
87#[cfg(feature = "gzip")]
88mod gzip;
89#[cfg(feature = "gzip")]
90use crate::gzip::{compress as gzip_compress, decompress as gzip_decompress};
91#[cfg(not(feature = "gzip"))]
92fn gzip_compress(_: &[u8]) -> Result<Bytes, CompressionError> {
93    Err(CompressionError::FeatureDisabled("gzip"))
94}
95#[cfg(not(feature = "gzip"))]
96fn gzip_decompress(_: &[u8], _: usize) -> Result<Bytes, CompressionError> {
97    Err(CompressionError::FeatureDisabled("gzip"))
98}
99
100#[cfg(feature = "snappy")]
101mod snappy;
102#[cfg(feature = "snappy")]
103use crate::snappy::{compress as snappy_compress, decompress as snappy_decompress};
104#[cfg(not(feature = "snappy"))]
105fn snappy_compress(_: &[u8]) -> Result<Bytes, CompressionError> {
106    Err(CompressionError::FeatureDisabled("snappy"))
107}
108#[cfg(not(feature = "snappy"))]
109fn snappy_decompress(_: &[u8], _: usize) -> Result<Bytes, CompressionError> {
110    Err(CompressionError::FeatureDisabled("snappy"))
111}
112
113#[cfg(feature = "lz4")]
114mod lz4;
115#[cfg(feature = "lz4")]
116use crate::lz4::{compress as lz4_compress, decompress as lz4_decompress};
117#[cfg(not(feature = "lz4"))]
118fn lz4_compress(_: &[u8]) -> Result<Bytes, CompressionError> {
119    Err(CompressionError::FeatureDisabled("lz4"))
120}
121#[cfg(not(feature = "lz4"))]
122fn lz4_decompress(_: &[u8], _: usize) -> Result<Bytes, CompressionError> {
123    Err(CompressionError::FeatureDisabled("lz4"))
124}
125
126#[cfg(feature = "zstd")]
127mod zstd;
128#[cfg(feature = "zstd")]
129use crate::zstd::{compress as zstd_compress, decompress as zstd_decompress};
130#[cfg(not(feature = "zstd"))]
131fn zstd_compress(_: &[u8]) -> Result<Bytes, CompressionError> {
132    Err(CompressionError::FeatureDisabled("zstd"))
133}
134#[cfg(not(feature = "zstd"))]
135fn zstd_decompress(_: &[u8], _: usize) -> Result<Bytes, CompressionError> {
136    Err(CompressionError::FeatureDisabled("zstd"))
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Six-byte payload shared by the `CompressionType::None` passthrough tests.
    const PAYLOAD: &[u8] = b"abcdef";

    /// Compressing with `None` returns the input bytes verbatim.
    #[test]
    fn passthrough_none_compress() {
        let encoded = compress(CompressionType::None, PAYLOAD).unwrap();
        assert!(encoded.as_ref() == PAYLOAD);
    }

    /// Decompressing with `None` under a generous cap returns the input bytes verbatim.
    #[test]
    fn passthrough_none_decompress() {
        let decoded = decompress(CompressionType::None, PAYLOAD, 1024).unwrap();
        assert!(decoded.as_ref() == PAYLOAD);
    }

    /// The output cap applies to the `None` passthrough too: six bytes do not
    /// fit in a limit of three.
    #[test]
    fn passthrough_none_decompress_respects_cap() {
        let outcome = decompress(CompressionType::None, PAYLOAD, 3);
        assert!(matches!(outcome, Err(CompressionError::TooLarge { limit: 3 })));
    }
}
164}