Skip to main content

s4_codec/
cpu_gzip.rs

1//! RFC 1952 gzip codec via `flate2` (v0.4 #26).
2//!
3//! Why CPU and not GPU: nvCOMP's GDeflate produces a multi-stream
4//! parallel-decode-friendly format that is **not** a single valid DEFLATE
5//! stream — wrapping it with a gzip header doesn't make stock `gunzip`
6//! decode it. To deliver the actual user-facing value of issue #26 (= "an
7//! S3 object S4 stored that any browser / curl / standard library can
8//! decompress without knowing about S4"), the codec has to emit a real
9//! gzip stream. CPU `flate2` is the right tool.
10//!
11//! Trade-off: no GPU acceleration on this codec. For wire-compat against
12//! gunzip-aware clients use `cpu-gzip`; for raw GPU throughput where
13//! everyone speaks S4 use `nvcomp-zstd` / `nvcomp-bitcomp`.
14//!
15//! Default compression level is 6 — `flate2`'s default and the same level
16//! `gzip(1)` uses out of the box. Range 0..=9 (= flate2::Compression range).
17
18use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{
26    ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY,
27    validate_decompress_manifest,
28};
29
/// Gzip (RFC 1952) codec that runs on the CPU.
///
/// The stored `level` stays within `0..=9`; [`CpuGzip::new`] applies the clamp.
#[derive(Clone, Debug)]
pub struct CpuGzip {
    /// Compression level handed to `flate2::Compression::new` when compressing.
    level: u32,
}
35
36impl CpuGzip {
37    pub const DEFAULT_LEVEL: u32 = 6;
38
39    pub fn new(level: u32) -> Self {
40        Self {
41            level: level.min(9),
42        }
43    }
44}
45
46impl Default for CpuGzip {
47    fn default() -> Self {
48        Self::new(Self::DEFAULT_LEVEL)
49    }
50}
51
52/// Sync, runtime-free decompress used by `s4-codec-wasm` (browser / WASM has
53/// no tokio runtime). Same checks as the trait implementation: codec/size
54/// match, decompression-bomb cap at `manifest.original_size + 1024`, crc32c
55/// verify after.
56pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
57    if manifest.codec != CodecKind::CpuGzip {
58        return Err(CodecError::CodecMismatch {
59            expected: CodecKind::CpuGzip,
60            got: manifest.codec,
61        });
62    }
63    // v0.8.6 #89: pre-allocation guard. Reject `original_size > 5 GiB` and
64    // confirm `compressed_size` matches the actual payload length BEFORE
65    // the `Vec::with_capacity` below — same shape applied to cpu_zstd.
66    let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
67    let limit = manifest.original_size.saturating_add(1024);
68    // v0.8.6 #89: bootstrap-capped initial alloc — see lib.rs
69    // `DECOMPRESS_BOOTSTRAP_CAPACITY` doc.
70    let mut buf = Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
71    let mut decoder = GzDecoder::new(input);
72    {
73        let mut limited = (&mut decoder).take(limit);
74        limited.read_to_end(&mut buf).map_err(CodecError::Io)?;
75        // v0.8.15 M-9: see cpu_zstd.rs for the rationale. Probe one
76        // byte past `limit` to distinguish a truncated bomb from a
77        // legitimate overshoot inside the 1024-byte zstd buffer
78        // flush window the cap was tuned for.
79        if buf.len() as u64 > manifest.original_size {
80            let mut peek = [0u8; 1];
81            let more_available = limited.read(&mut peek).map(|n| n > 0).unwrap_or(false);
82            return Err(CodecError::Io(std::io::Error::other(format!(
83                "gzip decompression bomb detected: produced at least {} bytes \
84                 (truncated at cap = manifest.original_size + 1024 = {}); \
85                 manifest claimed {}{}",
86                buf.len(),
87                limit,
88                manifest.original_size,
89                if more_available {
90                    "; decoder had more bytes available beyond the cap"
91                } else {
92                    ""
93                },
94            ))));
95        }
96    }
97    if buf.len() as u64 != manifest.original_size {
98        return Err(CodecError::SizeMismatch {
99            expected: manifest.original_size,
100            got: buf.len() as u64,
101        });
102    }
103    let actual_crc = crc32c::crc32c(&buf);
104    if actual_crc != manifest.crc32c {
105        return Err(CodecError::CrcMismatch {
106            expected: manifest.crc32c,
107            got: actual_crc,
108        });
109    }
110    Ok(buf)
111}
112
113/// Sync compress sibling of `decompress_blocking`. Provided for symmetry.
114pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
115    let level = level.min(9);
116    let original_size = input.len() as u64;
117    let original_crc = crc32c::crc32c(input);
118    let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
119    encoder.write_all(input).map_err(CodecError::Io)?;
120    let compressed = encoder.finish().map_err(CodecError::Io)?;
121    Ok((
122        compressed.clone(),
123        ChunkManifest {
124            codec: CodecKind::CpuGzip,
125            original_size,
126            compressed_size: compressed.len() as u64,
127            crc32c: original_crc,
128        },
129    ))
130}
131
132#[async_trait::async_trait]
133impl Codec for CpuGzip {
134    fn kind(&self) -> CodecKind {
135        CodecKind::CpuGzip
136    }
137
138    async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
139        let level = self.level;
140        let original_size = input.len() as u64;
141        let original_crc = crc32c::crc32c(&input);
142
143        let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
144            let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
145            encoder.write_all(input.as_ref())?;
146            encoder.finish()
147        })
148        .await??;
149
150        let manifest = ChunkManifest {
151            codec: CodecKind::CpuGzip,
152            original_size,
153            compressed_size: compressed.len() as u64,
154            crc32c: original_crc,
155        };
156        Ok((Bytes::from(compressed), manifest))
157    }
158
159    async fn decompress(
160        &self,
161        input: Bytes,
162        manifest: &ChunkManifest,
163    ) -> Result<Bytes, CodecError> {
164        if manifest.codec != CodecKind::CpuGzip {
165            return Err(CodecError::CodecMismatch {
166                expected: CodecKind::CpuGzip,
167                got: manifest.codec,
168            });
169        }
170        // v0.8.6 #89: pre-allocation guard — same shape as cpu_zstd. The
171        // CpuZstd OOM (issue #89) was caught by the fuzz farm in seconds;
172        // CpuGzip has the identical `Vec::with_capacity(original_size)`
173        // shape and is just as vulnerable to a forged manifest.
174        let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
175
176        let expected_crc = manifest.crc32c;
177        let expected_orig_size = manifest.original_size;
178
179        let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
180            // Decompression-bomb hardening: cap at expected_orig_size + small
181            // margin (same pattern cpu_zstd uses). A malicious sidecar could
182            // claim a tiny original_size while the gzip footer says otherwise;
183            // we trust the manifest and detect inflation past it as bomb.
184            let limit = expected_orig_size.saturating_add(1024);
185            // v0.8.6 #89: bootstrap-capped initial alloc — see lib.rs
186            // `DECOMPRESS_BOOTSTRAP_CAPACITY` doc.
187            let mut buf =
188                Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
189            let mut decoder = GzDecoder::new(input.as_ref());
190            (&mut decoder).take(limit).read_to_end(&mut buf)?;
191            if (buf.len() as u64) > expected_orig_size {
192                return Err(std::io::Error::other(format!(
193                    "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
194                    buf.len(),
195                    expected_orig_size
196                )));
197            }
198            Ok(buf)
199        })
200        .await??;
201
202        if decompressed.len() as u64 != expected_orig_size {
203            return Err(CodecError::SizeMismatch {
204                expected: expected_orig_size,
205                got: decompressed.len() as u64,
206            });
207        }
208        let actual_crc = crc32c::crc32c(&decompressed);
209        if actual_crc != expected_crc {
210            return Err(CodecError::CrcMismatch {
211                expected: expected_crc,
212                got: actual_crc,
213            });
214        }
215        Ok(Bytes::from(decompressed))
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    /// The two gzip magic bytes (RFC 1952). Also used as a deliberately
    /// incomplete payload for the forged-manifest tests.
    const GZIP_MAGIC: &[u8] = &[0x1f, 0x8b];

    /// Manifest for a `GZIP_MAGIC` payload that claims `original_size`
    /// decompressed bytes.
    fn forged_manifest(original_size: u64) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size,
            compressed_size: GZIP_MAGIC.len() as u64,
            crc32c: 0,
        }
    }

    /// Asserts that `err` is the over-the-ceiling rejection for a manifest
    /// claiming `MAX_DECOMPRESSED_BYTES + 1` bytes.
    fn assert_exceeds_limit(err: CodecError) {
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    /// Asserts that `err` is one of the two outcomes accepted for a huge but
    /// sub-ceiling size claim.
    fn assert_io_or_size_mismatch(err: CodecError) {
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }

    /// A short literal survives a compress → decompress cycle unchanged.
    #[tokio::test]
    async fn roundtrip_small() {
        let input = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ".as_slice());
        let codec = CpuGzip::default();
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, input.len() as u64);
        let restored = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(restored, input);
    }

    /// Highly repetitive input shrinks dramatically and still round-trips.
    #[tokio::test]
    async fn roundtrip_compressible() {
        let input = Bytes::from(vec![b'x'; 1024 * 1024]);
        let codec = CpuGzip::default();
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        // 1 MiB of a single repeated byte must come out under 2 KiB.
        assert!(
            compressed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            compressed.len()
        );
        let restored = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(restored, input);
    }

    /// Wire-compat property: the output is a plain gzip stream that a
    /// manifest-unaware `GzDecoder` can decode on its own.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let input = Bytes::from(b"hello squished world\n".repeat(100));
        let codec = CpuGzip::default();
        let (compressed, _manifest) = codec.compress(input.clone()).await.unwrap();

        // RFC 1952: a gzip member starts with the bytes 0x1f 0x8b.
        assert_eq!(&compressed[..2], GZIP_MAGIC, "must start with gzip magic");

        // Decode without going through `decompress` or the manifest, the way
        // a downstream client would.
        let mut decoded = Vec::new();
        let mut stock_decoder = flate2::read::GzDecoder::new(compressed.as_ref());
        stock_decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, input.as_ref());
    }

    /// A manifest that names a different codec is refused up front.
    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let payload = Bytes::from_static(b"0123456789");
        let err = CpuGzip::default()
            .decompress(payload, &manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    /// v0.8.6 #89 — a manifest claiming more than the decompressed-size
    /// ceiling is rejected before anything is allocated.
    #[tokio::test]
    async fn issue_89_rejects_manifest_over_5gib() {
        let manifest = forged_manifest(crate::MAX_DECOMPRESSED_BYTES + 1);
        let err = CpuGzip::default()
            .decompress(Bytes::from_static(GZIP_MAGIC), &manifest)
            .await
            .unwrap_err();
        assert_exceeds_limit(err);
    }

    /// v0.8.6 #89 — a claim just under 4 GiB passes the ceiling check but
    /// must fail cleanly instead of driving a huge up-front allocation.
    #[tokio::test]
    async fn issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let manifest = forged_manifest(u32::MAX as u64);
        let err = CpuGzip::default()
            .decompress(Bytes::from_static(GZIP_MAGIC), &manifest)
            .await
            .unwrap_err();
        assert_io_or_size_mismatch(err);
    }

    /// `decompress_blocking` (used by `s4-codec-wasm`) inverts
    /// `compress_blocking`, and the compressed bytes begin with the gzip
    /// magic so `DecompressionStream("gzip")` in a browser can read them.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let input = b"hello squished world\n".repeat(100);
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&compressed[..2], GZIP_MAGIC);
        let restored = decompress_blocking(&compressed, &manifest).unwrap();
        assert_eq!(restored, input);
    }

    /// v0.8.7 (Codex review LOW) — blocking twin of
    /// `issue_89_rejects_manifest_over_5gib`.
    #[test]
    fn issue_89_blocking_rejects_manifest_over_5gib() {
        let manifest = forged_manifest(crate::MAX_DECOMPRESSED_BYTES + 1);
        let err = decompress_blocking(GZIP_MAGIC, &manifest).unwrap_err();
        assert_exceeds_limit(err);
    }

    /// v0.8.7 (Codex review LOW) — blocking twin of
    /// `issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe`.
    #[test]
    fn issue_89_blocking_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let manifest = forged_manifest(u32::MAX as u64);
        let err = decompress_blocking(GZIP_MAGIC, &manifest).unwrap_err();
        assert_io_or_size_mismatch(err);
    }
}