Skip to main content

s4_codec/
cpu_gzip.rs

1//! RFC 1952 gzip codec via `flate2` (v0.4 #26).
2//!
3//! Why CPU and not GPU: nvCOMP's GDeflate produces a multi-stream
4//! parallel-decode-friendly format that is **not** a single valid DEFLATE
5//! stream — wrapping it with a gzip header doesn't make stock `gunzip`
6//! decode it. To deliver the actual user-facing value of issue #26 (= "an
7//! S3 object S4 stored that any browser / curl / standard library can
8//! decompress without knowing about S4"), the codec has to emit a real
9//! gzip stream. CPU `flate2` is the right tool.
10//!
11//! Trade-off: no GPU acceleration on this codec. For wire-compat against
12//! gunzip-aware clients use `cpu-gzip`; for raw GPU throughput where
13//! everyone speaks S4 use `nvcomp-zstd` / `nvcomp-bitcomp`.
14//!
15//! Default compression level is 6 — `flate2`'s default and the same level
16//! `gzip(1)` uses out of the box. Range 0..=9 (= flate2::Compression range).
17
18use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{
26    ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY,
27    validate_decompress_manifest,
28};
29
/// CPU-side RFC 1952 gzip codec backed by `flate2`.
///
/// The compression level is always stored already clamped to `0..=9`.
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// gzip/DEFLATE level handed to `flate2::Compression::new`.
    level: u32,
}
35
36impl CpuGzip {
37    pub const DEFAULT_LEVEL: u32 = 6;
38
39    pub fn new(level: u32) -> Self {
40        Self {
41            level: level.min(9),
42        }
43    }
44}
45
46impl Default for CpuGzip {
47    fn default() -> Self {
48        Self::new(Self::DEFAULT_LEVEL)
49    }
50}
51
52/// Sync, runtime-free decompress used by `s4-codec-wasm` (browser / WASM has
53/// no tokio runtime). Same checks as the trait implementation: codec/size
54/// match, decompression-bomb cap at `manifest.original_size + 1024`, crc32c
55/// verify after.
56pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
57    if manifest.codec != CodecKind::CpuGzip {
58        return Err(CodecError::CodecMismatch {
59            expected: CodecKind::CpuGzip,
60            got: manifest.codec,
61        });
62    }
63    // v0.8.6 #89: pre-allocation guard. Reject `original_size > 5 GiB` and
64    // confirm `compressed_size` matches the actual payload length BEFORE
65    // the `Vec::with_capacity` below — same shape applied to cpu_zstd.
66    let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
67    let limit = manifest.original_size.saturating_add(1024);
68    // v0.8.6 #89: bootstrap-capped initial alloc — see lib.rs
69    // `DECOMPRESS_BOOTSTRAP_CAPACITY` doc.
70    let mut buf = Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
71    let mut decoder = GzDecoder::new(input);
72    {
73        let mut limited = (&mut decoder).take(limit);
74        limited.read_to_end(&mut buf).map_err(CodecError::Io)?;
75    }
76    // v0.8.16 F-1: same dead-code fix as cpu_zstd.rs. Probe via the
77    // inner decoder after dropping the `Take` wrapper so the budget
78    // is unbounded and the peek actually surfaces "decoder had more
79    // bytes available".
80    if buf.len() as u64 > manifest.original_size {
81        let mut peek = [0u8; 1];
82        let more_available = decoder.read(&mut peek).map(|n| n > 0).unwrap_or(false);
83        return Err(CodecError::Io(std::io::Error::other(format!(
84            "gzip decompression bomb detected: produced at least {} bytes \
85             (truncated at cap = manifest.original_size + 1024 = {}); \
86             manifest claimed {}{}",
87            buf.len(),
88            limit,
89            manifest.original_size,
90            if more_available {
91                "; decoder had more bytes available beyond the cap"
92            } else {
93                ""
94            },
95        ))));
96    }
97    if buf.len() as u64 != manifest.original_size {
98        return Err(CodecError::SizeMismatch {
99            expected: manifest.original_size,
100            got: buf.len() as u64,
101        });
102    }
103    let actual_crc = crc32c::crc32c(&buf);
104    if actual_crc != manifest.crc32c {
105        return Err(CodecError::CrcMismatch {
106            expected: manifest.crc32c,
107            got: actual_crc,
108        });
109    }
110    Ok(buf)
111}
112
113/// Sync compress sibling of `decompress_blocking`. Provided for symmetry.
114pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
115    let level = level.min(9);
116    let original_size = input.len() as u64;
117    let original_crc = crc32c::crc32c(input);
118    let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
119    encoder.write_all(input).map_err(CodecError::Io)?;
120    let compressed = encoder.finish().map_err(CodecError::Io)?;
121    Ok((
122        compressed.clone(),
123        ChunkManifest {
124            codec: CodecKind::CpuGzip,
125            original_size,
126            compressed_size: compressed.len() as u64,
127            crc32c: original_crc,
128        },
129    ))
130}
131
132#[async_trait::async_trait]
133impl Codec for CpuGzip {
134    fn kind(&self) -> CodecKind {
135        CodecKind::CpuGzip
136    }
137
138    async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
139        let level = self.level;
140        let original_size = input.len() as u64;
141        let original_crc = crc32c::crc32c(&input);
142
143        let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
144            let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
145            encoder.write_all(input.as_ref())?;
146            encoder.finish()
147        })
148        .await??;
149
150        let manifest = ChunkManifest {
151            codec: CodecKind::CpuGzip,
152            original_size,
153            compressed_size: compressed.len() as u64,
154            crc32c: original_crc,
155        };
156        Ok((Bytes::from(compressed), manifest))
157    }
158
159    async fn decompress(
160        &self,
161        input: Bytes,
162        manifest: &ChunkManifest,
163    ) -> Result<Bytes, CodecError> {
164        if manifest.codec != CodecKind::CpuGzip {
165            return Err(CodecError::CodecMismatch {
166                expected: CodecKind::CpuGzip,
167                got: manifest.codec,
168            });
169        }
170        // v0.8.6 #89: pre-allocation guard — same shape as cpu_zstd. The
171        // CpuZstd OOM (issue #89) was caught by the fuzz farm in seconds;
172        // CpuGzip has the identical `Vec::with_capacity(original_size)`
173        // shape and is just as vulnerable to a forged manifest.
174        let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
175
176        let expected_crc = manifest.crc32c;
177        let expected_orig_size = manifest.original_size;
178
179        let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
180            use std::io::Read;
181            // Decompression-bomb hardening: cap at expected_orig_size + small
182            // margin (same pattern cpu_zstd uses). A malicious sidecar could
183            // claim a tiny original_size while the gzip footer says otherwise;
184            // we trust the manifest and detect inflation past it as bomb.
185            let limit = expected_orig_size.saturating_add(1024);
186            // v0.8.6 #89: bootstrap-capped initial alloc — see lib.rs
187            // `DECOMPRESS_BOOTSTRAP_CAPACITY` doc.
188            let mut buf =
189                Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
190            let mut decoder = GzDecoder::new(input.as_ref());
191            {
192                let mut limited = (&mut decoder).take(limit);
193                limited.read_to_end(&mut buf)?;
194            }
195            // v0.8.16 F-1: mirror the dead-code fix from the free-fn
196            // `decompress_cpu_gzip` + the CpuZstd async path. The
197            // v0.8.15 #144 fix never reached the async `impl Codec`
198            // path, which is the one server-side multipart GET
199            // actually invokes — every operator triage log so far
200            // has been the old "produced N bytes" message because of
201            // that miss.
202            if (buf.len() as u64) > expected_orig_size {
203                let mut peek = [0u8; 1];
204                let more_available = decoder.read(&mut peek).map(|n| n > 0).unwrap_or(false);
205                return Err(std::io::Error::other(format!(
206                    "gzip decompression bomb detected: produced at least {} bytes \
207                     (truncated at cap = manifest.original_size + 1024 = {}); \
208                     manifest claimed {}{}",
209                    buf.len(),
210                    limit,
211                    expected_orig_size,
212                    if more_available {
213                        "; decoder had more bytes available beyond the cap"
214                    } else {
215                        ""
216                    },
217                )));
218            }
219            Ok(buf)
220        })
221        .await??;
222
223        if decompressed.len() as u64 != expected_orig_size {
224            return Err(CodecError::SizeMismatch {
225                expected: expected_orig_size,
226                got: decompressed.len() as u64,
227            });
228        }
229        let actual_crc = crc32c::crc32c(&decompressed);
230        if actual_crc != expected_crc {
231            return Err(CodecError::CrcMismatch {
232                expected: expected_crc,
233                got: actual_crc,
234            });
235        }
236        Ok(Bytes::from(decompressed))
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    #[tokio::test]
    async fn roundtrip_small() {
        let codec = CpuGzip::default();
        let input = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ".as_slice());
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, input.len() as u64);
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    async fn roundtrip_compressible() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 1024 * 1024]);
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        // 1 MiB of 'x' bytes should gzip to well under 2 KiB.
        assert!(
            compressed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            compressed.len()
        );
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    /// The whole point of this codec: stock `gunzip` (= flate2's
    /// GzDecoder, the same library every Linux distro ships) decodes
    /// the output. This is the wire-compat property the issue requires.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"hello squished world\n".repeat(100));
        let (compressed, _manifest) = codec.compress(input.clone()).await.unwrap();

        // First two bytes must be the gzip magic (0x1f 0x8b) per RFC 1952.
        assert_eq!(
            &compressed[..2],
            &[0x1f, 0x8b],
            "must start with gzip magic"
        );

        // Decode with a fresh GzDecoder instance — different code path
        // from our decompress (which goes via the manifest); represents
        // what a downstream client / browser / curl would do.
        let mut buf = Vec::new();
        flate2::read::GzDecoder::new(compressed.as_ref())
            .read_to_end(&mut buf)
            .unwrap();
        assert_eq!(buf, input.as_ref());
    }

    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let codec = CpuGzip::default();
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let err = codec
            .decompress(Bytes::from_static(b"0123456789"), &manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    /// Tampering with the manifest crc32c must surface as `CrcMismatch`
    /// carrying both the claimed and the actual checksum.
    #[tokio::test]
    async fn rejects_crc_mismatch() {
        let codec = CpuGzip::default();
        let input = Bytes::from_static(b"integrity matters");
        let (compressed, manifest) = codec.compress(input).await.unwrap();
        let tampered = ChunkManifest {
            codec: manifest.codec,
            original_size: manifest.original_size,
            compressed_size: manifest.compressed_size,
            crc32c: manifest.crc32c ^ 1,
        };
        match codec.decompress(compressed, &tampered).await.unwrap_err() {
            CodecError::CrcMismatch { expected, got } => {
                assert_eq!(expected, manifest.crc32c ^ 1);
                assert_eq!(got, manifest.crc32c);
            }
            other => panic!("expected CrcMismatch, got {other:?}"),
        }
    }

    /// v0.8.16 F-1 regression: a manifest that under-reports
    /// `original_size` must be rejected as a decompression bomb on the
    /// async path, and the post-cap peek must report that the decoder
    /// still had data.
    #[tokio::test]
    async fn detects_decompression_bomb_async() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 64 * 1024]);
        let (compressed, manifest) = codec.compress(input).await.unwrap();
        let forged = ChunkManifest {
            codec: manifest.codec,
            original_size: 4096,
            compressed_size: manifest.compressed_size,
            crc32c: manifest.crc32c,
        };
        match codec.decompress(compressed, &forged).await.unwrap_err() {
            CodecError::Io(e) => {
                let msg = e.to_string();
                assert!(msg.contains("gzip decompression bomb detected"), "{msg}");
                assert!(
                    msg.contains("decoder had more bytes available beyond the cap"),
                    "{msg}"
                );
            }
            other => panic!("expected Io bomb error, got {other:?}"),
        }
    }

    /// v0.8.6 #89 — CpuGzip has the identical
    /// `Vec::with_capacity(manifest.original_size)` shape as CpuZstd
    /// did. (1) reject manifests over the 5 GiB ceiling, (2) bootstrap-
    /// cap the initial alloc so a sub-5-GiB-but-still-huge claim
    /// (e.g. 4 GiB) doesn't drive the address space.
    #[tokio::test]
    async fn issue_89_rejects_manifest_over_5gib() {
        let codec = CpuGzip::default();
        let body = Bytes::from_static(&[0x1f, 0x8b]);
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: crate::MAX_DECOMPRESSED_BYTES + 1,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = codec.decompress(body, &manifest).await.unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let codec = CpuGzip::default();
        let body = Bytes::from_static(&[0x1f, 0x8b]);
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: u32::MAX as u64,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = codec.decompress(body, &manifest).await.unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }

    /// `decompress_blocking` (used by `s4-codec-wasm`) round-trips through
    /// `compress_blocking`. Output must still start with the gzip magic so
    /// stock browsers can decode it via `DecompressionStream("gzip")`.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let input = b"hello squished world\n".repeat(100);
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&compressed[..2], &[0x1f, 0x8b]);
        let decompressed = decompress_blocking(&compressed, &manifest).unwrap();
        assert_eq!(decompressed, input);
    }

    /// Blocking counterpart of `detects_decompression_bomb_async`.
    #[test]
    fn blocking_detects_decompression_bomb() {
        let input = vec![b'x'; 64 * 1024];
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        let forged = ChunkManifest {
            codec: manifest.codec,
            original_size: 4096,
            compressed_size: manifest.compressed_size,
            crc32c: manifest.crc32c,
        };
        match decompress_blocking(&compressed, &forged).unwrap_err() {
            CodecError::Io(e) => {
                let msg = e.to_string();
                assert!(msg.contains("gzip decompression bomb detected"), "{msg}");
                assert!(
                    msg.contains("decoder had more bytes available beyond the cap"),
                    "{msg}"
                );
            }
            other => panic!("expected Io bomb error, got {other:?}"),
        }
    }

    /// A manifest that over-reports `original_size` (the stream ends early)
    /// is a `SizeMismatch`, not a bomb.
    #[test]
    fn blocking_rejects_size_mismatch() {
        let input = b"short payload".to_vec();
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        let forged = ChunkManifest {
            codec: manifest.codec,
            original_size: manifest.original_size + 1,
            compressed_size: manifest.compressed_size,
            crc32c: manifest.crc32c,
        };
        match decompress_blocking(&compressed, &forged).unwrap_err() {
            CodecError::SizeMismatch { expected, got } => {
                assert_eq!(expected, input.len() as u64 + 1);
                assert_eq!(got, input.len() as u64);
            }
            other => panic!("expected SizeMismatch, got {other:?}"),
        }
    }

    /// v0.8.7 (Codex review LOW) — blocking variants of the issue #89
    /// regression tests, mirroring the cpu_zstd additions.
    #[test]
    fn issue_89_blocking_rejects_manifest_over_5gib() {
        let body = &[0x1f, 0x8b];
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: crate::MAX_DECOMPRESSED_BYTES + 1,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = decompress_blocking(body, &manifest).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[test]
    fn issue_89_blocking_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let body = &[0x1f, 0x8b];
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: u32::MAX as u64,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = decompress_blocking(body, &manifest).unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }
}