Skip to main content

s4_codec/
cpu_gzip.rs

1//! RFC 1952 gzip codec via `flate2` (v0.4 #26).
2//!
3//! Why CPU and not GPU: nvCOMP's GDeflate produces a multi-stream
4//! parallel-decode-friendly format that is **not** a single valid DEFLATE
5//! stream — wrapping it with a gzip header doesn't make stock `gunzip`
6//! decode it. To deliver the actual user-facing value of issue #26 (= "an
7//! S3 object S4 stored that any browser / curl / standard library can
8//! decompress without knowing about S4"), the codec has to emit a real
9//! gzip stream. CPU `flate2` is the right tool.
10//!
11//! Trade-off: no GPU acceleration on this codec. For wire-compat against
12//! gunzip-aware clients use `cpu-gzip`; for raw GPU throughput where
13//! everyone speaks S4 use `nvcomp-zstd` / `nvcomp-bitcomp`.
14//!
15//! Default compression level is 6 — `flate2`'s default and the same level
16//! `gzip(1)` uses out of the box. Range 0..=9 (= flate2::Compression range).
17
18use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{
26    ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY,
27    validate_decompress_manifest,
28};
29
/// CPU gzip codec (RFC 1952).
///
/// The only state is the compression level. Values built through
/// [`CpuGzip::new`] are clamped into `0..=CpuGzip::MAX_LEVEL`.
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// Compression level later handed to `flate2::Compression::new`.
    level: u32,
}

impl CpuGzip {
    /// Level used by the [`Default`] implementation.
    pub const DEFAULT_LEVEL: u32 = 6;

    /// Highest level accepted by [`CpuGzip::new`]; larger requests are
    /// clamped down to this value instead of being rejected.
    pub const MAX_LEVEL: u32 = 9;

    /// Builds a codec with the given compression level.
    ///
    /// `level` values above [`Self::MAX_LEVEL`] are silently clamped to it,
    /// so this constructor never fails.
    #[must_use]
    pub fn new(level: u32) -> Self {
        Self {
            level: level.min(Self::MAX_LEVEL),
        }
    }
}

impl Default for CpuGzip {
    /// Equivalent to `CpuGzip::new(CpuGzip::DEFAULT_LEVEL)`.
    fn default() -> Self {
        Self::new(Self::DEFAULT_LEVEL)
    }
}
51
52/// Sync, runtime-free decompress used by `s4-codec-wasm` (browser / WASM has
53/// no tokio runtime). Same checks as the trait implementation: codec/size
54/// match, decompression-bomb cap at `manifest.original_size + 1024`, crc32c
55/// verify after.
56pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
57    if manifest.codec != CodecKind::CpuGzip {
58        return Err(CodecError::CodecMismatch {
59            expected: CodecKind::CpuGzip,
60            got: manifest.codec,
61        });
62    }
63    // v0.8.6 #89: pre-allocation guard. Reject `original_size > 5 GiB` and
64    // confirm `compressed_size` matches the actual payload length BEFORE
65    // the `Vec::with_capacity` below — same shape applied to cpu_zstd.
66    let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
67    let limit = manifest.original_size.saturating_add(1024);
68    // v0.8.6 #89: bootstrap-capped initial alloc — see lib.rs
69    // `DECOMPRESS_BOOTSTRAP_CAPACITY` doc.
70    let mut buf = Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
71    let mut decoder = GzDecoder::new(input);
72    (&mut decoder)
73        .take(limit)
74        .read_to_end(&mut buf)
75        .map_err(CodecError::Io)?;
76    if (buf.len() as u64) > manifest.original_size {
77        return Err(CodecError::Io(std::io::Error::other(format!(
78            "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
79            buf.len(),
80            manifest.original_size
81        ))));
82    }
83    if buf.len() as u64 != manifest.original_size {
84        return Err(CodecError::SizeMismatch {
85            expected: manifest.original_size,
86            got: buf.len() as u64,
87        });
88    }
89    let actual_crc = crc32c::crc32c(&buf);
90    if actual_crc != manifest.crc32c {
91        return Err(CodecError::CrcMismatch {
92            expected: manifest.crc32c,
93            got: actual_crc,
94        });
95    }
96    Ok(buf)
97}
98
99/// Sync compress sibling of `decompress_blocking`. Provided for symmetry.
100pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
101    let level = level.min(9);
102    let original_size = input.len() as u64;
103    let original_crc = crc32c::crc32c(input);
104    let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
105    encoder.write_all(input).map_err(CodecError::Io)?;
106    let compressed = encoder.finish().map_err(CodecError::Io)?;
107    Ok((
108        compressed.clone(),
109        ChunkManifest {
110            codec: CodecKind::CpuGzip,
111            original_size,
112            compressed_size: compressed.len() as u64,
113            crc32c: original_crc,
114        },
115    ))
116}
117
118#[async_trait::async_trait]
119impl Codec for CpuGzip {
120    fn kind(&self) -> CodecKind {
121        CodecKind::CpuGzip
122    }
123
124    async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
125        let level = self.level;
126        let original_size = input.len() as u64;
127        let original_crc = crc32c::crc32c(&input);
128
129        let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
130            let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
131            encoder.write_all(input.as_ref())?;
132            encoder.finish()
133        })
134        .await??;
135
136        let manifest = ChunkManifest {
137            codec: CodecKind::CpuGzip,
138            original_size,
139            compressed_size: compressed.len() as u64,
140            crc32c: original_crc,
141        };
142        Ok((Bytes::from(compressed), manifest))
143    }
144
145    async fn decompress(
146        &self,
147        input: Bytes,
148        manifest: &ChunkManifest,
149    ) -> Result<Bytes, CodecError> {
150        if manifest.codec != CodecKind::CpuGzip {
151            return Err(CodecError::CodecMismatch {
152                expected: CodecKind::CpuGzip,
153                got: manifest.codec,
154            });
155        }
156        // v0.8.6 #89: pre-allocation guard — same shape as cpu_zstd. The
157        // CpuZstd OOM (issue #89) was caught by the fuzz farm in seconds;
158        // CpuGzip has the identical `Vec::with_capacity(original_size)`
159        // shape and is just as vulnerable to a forged manifest.
160        let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
161
162        let expected_crc = manifest.crc32c;
163        let expected_orig_size = manifest.original_size;
164
165        let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
166            // Decompression-bomb hardening: cap at expected_orig_size + small
167            // margin (same pattern cpu_zstd uses). A malicious sidecar could
168            // claim a tiny original_size while the gzip footer says otherwise;
169            // we trust the manifest and detect inflation past it as bomb.
170            let limit = expected_orig_size.saturating_add(1024);
171            // v0.8.6 #89: bootstrap-capped initial alloc — see lib.rs
172            // `DECOMPRESS_BOOTSTRAP_CAPACITY` doc.
173            let mut buf =
174                Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
175            let mut decoder = GzDecoder::new(input.as_ref());
176            (&mut decoder).take(limit).read_to_end(&mut buf)?;
177            if (buf.len() as u64) > expected_orig_size {
178                return Err(std::io::Error::other(format!(
179                    "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
180                    buf.len(),
181                    expected_orig_size
182                )));
183            }
184            Ok(buf)
185        })
186        .await??;
187
188        if decompressed.len() as u64 != expected_orig_size {
189            return Err(CodecError::SizeMismatch {
190                expected: expected_orig_size,
191                got: decompressed.len() as u64,
192            });
193        }
194        let actual_crc = crc32c::crc32c(&decompressed);
195        if actual_crc != expected_crc {
196            return Err(CodecError::CrcMismatch {
197                expected: expected_crc,
198                got: actual_crc,
199            });
200        }
201        Ok(Bytes::from(decompressed))
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    /// RFC 1952 magic bytes that open every gzip stream.
    const GZIP_MAGIC: &[u8] = &[0x1f, 0x8b];

    /// A short literal survives compress -> decompress, and the manifest
    /// records the codec kind and the original length.
    #[tokio::test]
    async fn roundtrip_small() {
        let gzip = CpuGzip::default();
        let plain = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ");
        let (packed, manifest) = gzip.compress(plain.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, plain.len() as u64);
        let restored = gzip.decompress(packed, &manifest).await.unwrap();
        assert_eq!(restored, plain);
    }

    /// Highly repetitive input shrinks dramatically and still round-trips.
    #[tokio::test]
    async fn roundtrip_compressible() {
        let gzip = CpuGzip::default();
        let plain = Bytes::from(vec![b'x'; 1024 * 1024]);
        let (packed, manifest) = gzip.compress(plain.clone()).await.unwrap();
        // 1 MiB of 'x' bytes must gzip to under 2 KiB.
        assert!(
            packed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            packed.len()
        );
        let restored = gzip.decompress(packed, &manifest).await.unwrap();
        assert_eq!(restored, plain);
    }

    /// The whole point of this codec: a plain gzip decoder that knows
    /// nothing about the S4 manifest decodes the output. This is the
    /// wire-compat property the issue requires.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let gzip = CpuGzip::default();
        let plain = Bytes::from(b"hello squished world\n".repeat(100));
        let (packed, _manifest) = gzip.compress(plain.clone()).await.unwrap();

        // The stream must open with the gzip magic (0x1f 0x8b) per RFC 1952.
        assert_eq!(&packed[..2], GZIP_MAGIC, "must start with gzip magic");

        // Decode through a fresh `GzDecoder` rather than our manifest-driven
        // `decompress`; this stands in for a downstream client / browser /
        // curl.
        let mut decoded = Vec::new();
        let mut stock_decoder = flate2::read::GzDecoder::new(packed.as_ref());
        stock_decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, plain.as_ref());
    }

    /// A manifest naming a different codec is refused up front.
    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let gzip = CpuGzip::default();
        let wrong_codec_manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let payload = Bytes::from_static(b"0123456789");
        let err = gzip
            .decompress(payload, &wrong_codec_manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    /// v0.8.6 #89 — CpuGzip had the same
    /// `Vec::with_capacity(manifest.original_size)` shape as CpuZstd.
    /// Part (1): a manifest claiming more than the `MAX_DECOMPRESSED_BYTES`
    /// ceiling is rejected with `ManifestSizeExceedsLimit`.
    #[tokio::test]
    async fn issue_89_rejects_manifest_over_5gib() {
        let gzip = CpuGzip::default();
        let payload = Bytes::from_static(GZIP_MAGIC);
        let oversized = crate::MAX_DECOMPRESSED_BYTES + 1;
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: oversized,
            compressed_size: payload.len() as u64,
            crc32c: 0,
        };
        let err = gzip.decompress(payload, &manifest).await.unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    /// v0.8.6 #89, part (2): a huge-but-under-the-ceiling claim
    /// (`u32::MAX` bytes) with a truncated body must fail with an ordinary
    /// error (`Io` or `SizeMismatch`) thanks to the bootstrap-capped
    /// initial allocation.
    #[tokio::test]
    async fn issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let gzip = CpuGzip::default();
        let payload = Bytes::from_static(GZIP_MAGIC);
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: u32::MAX as u64,
            compressed_size: payload.len() as u64,
            crc32c: 0,
        };
        let err = gzip.decompress(payload, &manifest).await.unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }

    /// `decompress_blocking` (used by `s4-codec-wasm`) round-trips through
    /// `compress_blocking`. Output must still start with the gzip magic so
    /// stock browsers can decode it via `DecompressionStream("gzip")`.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let plain = b"hello squished world\n".repeat(100);
        let (packed, manifest) = compress_blocking(&plain, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&packed[..2], GZIP_MAGIC);
        let restored = decompress_blocking(&packed, &manifest).unwrap();
        assert_eq!(restored, plain);
    }
}
331}