Skip to main content

reddb_server/storage/cache/
compressor.rs

1//! L2 Blob Compressor.
2//!
3//! Stateless, deep module encapsulating compression of blob payloads spilled to
4//! the L2 (on-disk) tier of [`BlobCache`]. The purpose is to shrink durable blob
5//! footprints without harming hot-path latency or wasting CPU on payloads that
6//! are already incompressible.
7//!
8//! # Design
9//!
10//! The module is intentionally small and side-effect free:
11//!
12//! - All operations are static — no internal state, no allocator pinning, no
13//!   threading concerns. Inputs are `&[u8]` slices, outputs are owned `Vec<u8>`.
14//! - Compression is best-effort: when the input is small, when the content type
15//!   already represents a compressed media format, or when the `zstd` output
16//!   fails to shrink the input meaningfully, the original bytes are returned
17//!   inside [`Compressed::Raw`] and no encode is performed (or its result is
18//!   discarded).
19//! - The compressed variant carries the original byte length so decompression
20//!   can pre-allocate exactly and verify the decoded length after decoding.
21//!
22//! Wiring into [`BlobCache`] L2 `put`/`get` is performed in a follow-up slice;
23//! this module is additive and has no callers in this commit.
24
25use std::fmt;
26
/// Default `zstd` level. Level 1 trades ratio for encode speed: the L2 tier
/// exists to stretch capacity, not to be the smallest possible store.
pub const DEFAULT_ZSTD_LEVEL: i32 = 1;

/// Default size floor for compression, in bytes. Payloads under one kilobyte
/// seldom shrink enough to pay for the framing overhead.
pub const DEFAULT_MIN_BYTES: usize = 1024;

/// Default keep/discard ratio for encoded output. At `0.95` the encoded form
/// is kept only when it is more than 5% smaller than the input.
pub const DEFAULT_MAX_RATIO: f64 = 0.95;

/// Tuning parameters consumed by [`L2BlobCompressor::compress`].
#[derive(Clone, Copy, Debug)]
pub struct CompressOpts {
    /// Level handed to the `zstd` encoder; larger values are slower but
    /// produce smaller output.
    pub level: i32,
    /// Size floor: inputs with fewer bytes than this are returned raw.
    pub min_bytes: usize,
    /// The encoded form is dropped when
    /// `compressed.len() >= input.len() * max_ratio`.
    pub max_ratio: f64,
}

impl Default for CompressOpts {
    /// Builds the options from the `DEFAULT_*` constants declared above.
    fn default() -> Self {
        CompressOpts {
            max_ratio: DEFAULT_MAX_RATIO,
            min_bytes: DEFAULT_MIN_BYTES,
            level: DEFAULT_ZSTD_LEVEL,
        }
    }
}
59
/// What the compressor hands back for storage. `Raw` holds the input bytes
/// unchanged; `Zstd` holds the encoded stream together with the byte length
/// of the input it was produced from, so a decoder can check its output.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Compressed {
    /// Input kept verbatim — a skip rule applied or encoding did not help.
    Raw(Vec<u8>),
    /// A `zstd` stream. Decoding it must yield exactly `original_len` bytes.
    Zstd { bytes: Vec<u8>, original_len: u32 },
}

impl Compressed {
    /// Number of bytes this value holds for storage: the untouched bytes for
    /// `Raw`, the encoded stream for `Zstd`.
    pub fn stored_len(&self) -> usize {
        match self {
            Self::Raw(payload) | Self::Zstd { bytes: payload, .. } => payload.len(),
        }
    }

    /// Number of bytes the payload occupies once decoded. For `Raw` this is
    /// the stored length itself; for `Zstd` it is the recorded `original_len`.
    pub fn original_len(&self) -> usize {
        match *self {
            Self::Zstd { original_len, .. } => original_len as usize,
            Self::Raw(ref raw) => raw.len(),
        }
    }

    /// Reports whether this value is the `Zstd` variant.
    pub fn is_compressed(&self) -> bool {
        match self {
            Self::Zstd { .. } => true,
            Self::Raw(_) => false,
        }
    }
}
96
/// Failure modes reported by the compressor.
#[derive(Debug)]
pub enum CompressError {
    /// The `zstd` encoder rejected the payload. Carries the encoder's error
    /// text for diagnostics.
    ZstdEncode(String),
    /// The `zstd` decoder rejected the payload, or decoding produced a length
    /// other than the recorded `original_len`.
    ZstdDecode(String),
    /// A persisted [`Compressed`] value carried an unrecognised format tag.
    /// Never produced by this module; kept for forward-compatible callers.
    UnknownFormat,
    /// The input is longer than `u32::MAX` bytes, so the original-length
    /// field cannot hold its size. Carries the offending length.
    OversizeOriginal(usize),
}

impl fmt::Display for CompressError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnknownFormat => f.write_str("unknown compressed format"),
            Self::OversizeOriginal(n) => write!(
                f,
                "payload of {n} bytes exceeds u32::MAX original-length cap"
            ),
            Self::ZstdEncode(msg) => write!(f, "zstd encode failed: {msg}"),
            Self::ZstdDecode(msg) => write!(f, "zstd decode failed: {msg}"),
        }
    }
}

impl std::error::Error for CompressError {}
131
132/// Stateless compressor for L2 blob payloads.
133///
134/// All methods are associated functions; the type carries no data and is
135/// unconstructible at runtime. It exists purely as a namespace and to keep the
136/// public surface in one place.
137pub struct L2BlobCompressor;
138
139impl L2BlobCompressor {
140    /// Compress `bytes`, honouring the skip rules in [`CompressOpts`] and the
141    /// `content_type` hint.
142    ///
143    /// # Skip rules
144    ///
145    /// 1. `bytes.len() < opts.min_bytes` → returns [`Compressed::Raw`].
146    /// 2. `content_type` matches a pre-compressed media bucket
147    ///    (see [`is_precompressed_media`]) → returns [`Compressed::Raw`].
148    /// 3. The encoded payload would be `>= bytes.len() * opts.max_ratio` →
149    ///    returns [`Compressed::Raw`].
150    ///
151    /// # Errors
152    ///
153    /// - [`CompressError::OversizeOriginal`] if `bytes.len() > u32::MAX`.
154    /// - [`CompressError::ZstdEncode`] if the underlying encoder fails.
155    pub fn compress(
156        bytes: &[u8],
157        content_type: Option<&str>,
158        opts: &CompressOpts,
159    ) -> Result<Compressed, CompressError> {
160        // Reject payloads that cannot be represented in the on-disk header.
161        if bytes.len() > u32::MAX as usize {
162            return Err(CompressError::OversizeOriginal(bytes.len()));
163        }
164
165        // Skip rule 1: small payload — framing overhead dominates.
166        if bytes.len() < opts.min_bytes {
167            return Ok(Compressed::Raw(bytes.to_vec()));
168        }
169
170        // Skip rule 2: content type already represents compressed media.
171        if let Some(ct) = content_type {
172            if is_precompressed_media(ct) {
173                return Ok(Compressed::Raw(bytes.to_vec()));
174            }
175        }
176
177        // Encode via zstd. Errors here are surfaced — the caller can choose to
178        // fall back to a raw write rather than failing the L2 put outright.
179        let encoded = zstd::stream::encode_all(bytes, opts.level)
180            .map_err(|e| CompressError::ZstdEncode(e.to_string()))?;
181
182        // Skip rule 3: no meaningful shrinkage. Comparing as f64 keeps the
183        // ratio knob expressive (e.g. 0.5 to require 2x reduction).
184        let cutoff = (bytes.len() as f64) * opts.max_ratio;
185        if (encoded.len() as f64) >= cutoff {
186            return Ok(Compressed::Raw(bytes.to_vec()));
187        }
188
189        Ok(Compressed::Zstd {
190            bytes: encoded,
191            original_len: bytes.len() as u32,
192        })
193    }
194
195    /// Decompress a previously-stored [`Compressed`] payload back to the
196    /// original byte slice.
197    ///
198    /// # Errors
199    ///
200    /// - [`CompressError::ZstdDecode`] if the encoded bytes are malformed or if
201    ///   the decoded length does not match the recorded `original_len`.
202    pub fn decompress(c: &Compressed) -> Result<Vec<u8>, CompressError> {
203        match c {
204            Compressed::Raw(b) => Ok(b.clone()),
205            Compressed::Zstd {
206                bytes,
207                original_len,
208            } => {
209                let mut out: Vec<u8> = Vec::with_capacity(*original_len as usize);
210                let written = {
211                    let mut decoder = zstd::stream::Decoder::new(bytes.as_slice())
212                        .map_err(|e| CompressError::ZstdDecode(e.to_string()))?;
213                    std::io::copy(&mut decoder, &mut out)
214                        .map_err(|e| CompressError::ZstdDecode(e.to_string()))?
215                };
216                if written as usize != *original_len as usize {
217                    return Err(CompressError::ZstdDecode(format!(
218                        "decoded {written} bytes, expected {original_len}"
219                    )));
220                }
221                Ok(out)
222            }
223        }
224    }
225}
226
/// Decides whether a MIME-style content type names a format that is already
/// compressed, for which re-encoding would be wasted work.
///
/// Anything after the first `;` is dropped, surrounding whitespace is trimmed
/// and the comparison ignores ASCII case. Every `image/*`, `video/*` and
/// `audio/*` type counts as pre-compressed, with these exceptions:
///
/// - `image/svg+xml` is treated as XML text and remains eligible.
/// - `audio/wav` and `audio/x-wav` are treated as uncompressed PCM and remain
///   eligible.
///
/// Outside those families only `application/zip`, `application/gzip`,
/// `application/x-brotli`, `application/x-zstd` and
/// `application/octet-stream` match.
fn is_precompressed_media(content_type: &str) -> bool {
    // Keep only the `type/subtype` part and normalise its case.
    let media_type = content_type
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();

    // Split at the first `/` and decide per top-level family.
    match media_type.split_once('/') {
        // SVG is text-based, so it is the one image type left eligible.
        Some(("image", subtype)) => subtype != "svg+xml",
        Some(("video", _)) => true,
        // WAV is left eligible for compression.
        Some(("audio", subtype)) => subtype != "wav" && subtype != "x-wav",
        Some(("application", subtype)) => matches!(
            subtype,
            "zip" | "gzip" | "x-brotli" | "x-zstd" | "octet-stream"
        ),
        _ => false,
    }
}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Tiny pseudo-random byte generator. We deliberately avoid pulling in a
    /// crate dependency for tests — a linear congruential generator gives us
    /// reproducible "random-looking" bytes that round-trip exactly.
    fn pseudo_random(seed: u64, len: usize) -> Vec<u8> {
        let mut state = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(1);
        let mut out = Vec::with_capacity(len);
        for _ in 0..len {
            state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            out.push((state >> 33) as u8);
        }
        out
    }

    /// Exactly 4096 bytes of repeated lorem-ipsum text.
    fn lorem_4kb() -> Vec<u8> {
        let unit = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. \
                     Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. \
                     Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris \
                     nisi ut aliquip ex ea commodo consequat. ";
        let mut out = Vec::with_capacity(4096 + unit.len());
        while out.len() < 4096 {
            out.extend_from_slice(unit);
        }
        out.truncate(4096);
        out
    }

    #[test]
    fn round_trip_property_across_sizes() {
        // Sample sizes from 0 to 16384, each with its own seed. Every output
        // must decompress back to the exact input regardless of which branch
        // (Raw / Zstd) the compressor chose.
        let opts = CompressOpts::default();
        let sizes = [
            0usize, 1, 16, 64, 255, 511, 1023, 1024, 1025, 2048, 4096, 8192, 12345, 16384,
        ];
        for (i, &len) in sizes.iter().enumerate() {
            let input = pseudo_random(0xDEAD_BEEF ^ (i as u64), len);
            let compressed = L2BlobCompressor::compress(&input, None, &opts)
                .expect("compress should not fail on in-memory input");
            let decoded = L2BlobCompressor::decompress(&compressed)
                .expect("decompress should not fail on freshly-encoded input");
            assert_eq!(decoded, input, "round-trip mismatch at len={len}");
            assert_eq!(compressed.original_len(), input.len());
        }
    }

    #[test]
    fn text_payload_shrinks_at_least_thirty_percent() {
        let input = lorem_4kb();
        let opts = CompressOpts::default();
        let compressed = L2BlobCompressor::compress(&input, Some("text/plain"), &opts)
            .expect("compress text payload");
        match compressed {
            Compressed::Zstd {
                bytes,
                original_len,
            } => {
                assert_eq!(original_len as usize, input.len());
                let ratio = bytes.len() as f64 / input.len() as f64;
                assert!(
                    ratio <= 0.70,
                    "expected >=30% reduction, got ratio {ratio} ({}/{})",
                    bytes.len(),
                    input.len()
                );
            }
            other => panic!("expected Zstd variant for repetitive text, got {other:?}"),
        }
    }

    #[test]
    fn tiny_payload_returns_raw() {
        let input = vec![0xABu8; 64]; // below default min_bytes (1024)
        let opts = CompressOpts::default();
        let out = L2BlobCompressor::compress(&input, None, &opts).unwrap();
        match out {
            Compressed::Raw(bytes) => assert_eq!(bytes, input),
            other => panic!("expected Raw for tiny payload, got {other:?}"),
        }
    }

    #[test]
    fn image_png_content_type_returns_raw_even_when_large() {
        // Highly compressible payload, but the content-type rule short-circuits.
        let input = vec![0u8; 8 * 1024];
        let opts = CompressOpts::default();
        let out = L2BlobCompressor::compress(&input, Some("image/png"), &opts).unwrap();
        assert!(matches!(out, Compressed::Raw(_)), "PNG must be Raw");
    }

    #[test]
    fn image_svg_is_compressed_as_exception() {
        // SVG is XML text and should be eligible for compression.
        let mut input = Vec::new();
        let chunk =
            b"<svg xmlns='http://www.w3.org/2000/svg'><rect width='10' height='10'/></svg>\n";
        while input.len() < 4096 {
            input.extend_from_slice(chunk);
        }
        let opts = CompressOpts::default();
        let out = L2BlobCompressor::compress(&input, Some("image/svg+xml"), &opts).unwrap();
        assert!(out.is_compressed(), "image/svg+xml should be compressed");
    }

    #[test]
    fn high_entropy_payload_returns_raw_via_max_ratio_gate() {
        // Pseudo-random bytes have ~no redundancy; zstd cannot meaningfully
        // shrink them and the max_ratio gate must reject the encoded form.
        let input = pseudo_random(0xCAFE_F00D, 8 * 1024);
        let opts = CompressOpts::default();
        let out = L2BlobCompressor::compress(&input, None, &opts).unwrap();
        match out {
            Compressed::Raw(bytes) => assert_eq!(bytes, input),
            Compressed::Zstd { bytes, .. } => {
                panic!(
                    "high-entropy input was kept as Zstd ({} bytes vs {} original) — \
                     max_ratio gate failed",
                    bytes.len(),
                    input.len()
                );
            }
        }
    }

    #[test]
    fn malformed_zstd_bytes_yield_decode_error() {
        let bogus = Compressed::Zstd {
            bytes: vec![0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE, 0xFD, 0xFC],
            original_len: 4096,
        };
        let err = L2BlobCompressor::decompress(&bogus).expect_err("must fail to decode");
        assert!(
            matches!(err, CompressError::ZstdDecode(_)),
            "expected ZstdDecode, got {err:?}"
        );
    }

    #[test]
    fn decoded_length_mismatch_yields_decode_error() {
        // Encode a 4 KiB payload but lie about the original length so the
        // post-decode verification step trips.
        let input = lorem_4kb();
        let truthful =
            L2BlobCompressor::compress(&input, Some("text/plain"), &CompressOpts::default())
                .unwrap();
        let lying = match truthful {
            Compressed::Zstd { bytes, .. } => Compressed::Zstd {
                bytes,
                original_len: (input.len() as u32) + 1,
            },
            other => panic!("expected Zstd, got {other:?}"),
        };
        let err = L2BlobCompressor::decompress(&lying).expect_err("must fail length check");
        assert!(matches!(err, CompressError::ZstdDecode(_)));
    }

    /// Oversize check against a genuine allocation of `u32::MAX + 1` bytes.
    ///
    /// A slice whose length exceeds its backing allocation violates the
    /// safety contract of `slice::from_raw_parts` even if it is never read,
    /// so the buffer here is really that large. It is requested
    /// zero-initialised and never touched: `compress` rejects by length
    /// inspection alone. If the memory cannot be reserved the test is skipped
    /// rather than failed.
    ///
    /// Only built on 64-bit targets: on narrower targets `u32::MAX + 1` does
    /// not fit in `usize`, so no such slice can exist.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn oversize_input_returns_oversize_error() {
        use std::alloc::{alloc_zeroed, dealloc, Layout};

        let oversize_len = (u32::MAX as usize) + 1;
        let layout =
            Layout::array::<u8>(oversize_len).expect("4 GiB layout is valid on 64-bit targets");

        // SAFETY: `layout` has a non-zero size.
        let ptr = unsafe { alloc_zeroed(layout) };
        if ptr.is_null() {
            eprintln!("skipping oversize test: could not reserve {oversize_len} bytes");
            return;
        }

        let result = {
            // SAFETY: `ptr` is non-null and points to `oversize_len`
            // zero-initialised bytes inside one live allocation; nothing
            // writes to that memory while the slice exists, and `Layout`
            // has already checked that the size does not exceed `isize::MAX`.
            let oversized: &[u8] = unsafe { std::slice::from_raw_parts(ptr, oversize_len) };
            L2BlobCompressor::compress(oversized, None, &CompressOpts::default())
        };

        // SAFETY: `ptr` was returned by `alloc_zeroed` with this exact
        // `layout`, and the slice borrowing it went out of scope above.
        unsafe { dealloc(ptr, layout) };

        match result.expect_err("must reject oversize input") {
            CompressError::OversizeOriginal(n) => assert_eq!(n, oversize_len),
            other => panic!("expected OversizeOriginal, got {other:?}"),
        }
    }

    #[test]
    fn precompressed_media_classifier_handles_known_buckets() {
        assert!(is_precompressed_media("image/png"));
        assert!(is_precompressed_media("image/jpeg"));
        assert!(!is_precompressed_media("image/svg+xml"));
        assert!(is_precompressed_media("video/mp4"));
        assert!(is_precompressed_media("video/webm"));
        assert!(is_precompressed_media("audio/mpeg"));
        assert!(!is_precompressed_media("audio/wav"));
        assert!(!is_precompressed_media("audio/x-wav"));
        assert!(is_precompressed_media("application/zip"));
        assert!(is_precompressed_media("application/gzip"));
        assert!(is_precompressed_media("application/x-brotli"));
        assert!(is_precompressed_media("application/x-zstd"));
        assert!(is_precompressed_media("application/octet-stream"));
        assert!(!is_precompressed_media("text/plain"));
        assert!(!is_precompressed_media("application/json"));
        // Parameter handling + casing.
        assert!(is_precompressed_media("Image/PNG; foo=bar"));
        assert!(!is_precompressed_media("Text/Plain; charset=utf-8"));
    }
}