Skip to main content

s4_codec/
cpu_gzip.rs

//! RFC 1952 gzip codec via `flate2` (v0.4 #26).
//!
//! Why CPU and not GPU: nvCOMP's GDeflate produces a multi-stream,
//! parallel-decode-friendly format that is **not** a single valid DEFLATE
//! stream, so wrapping it in a gzip header does not make stock `gunzip`
//! able to decode it. To deliver the actual user-facing value of issue #26
//! (an S3 object stored by S4 that any browser, curl, or standard library
//! can decompress without knowing about S4), the codec has to emit a real
//! gzip stream, and CPU `flate2` is the right tool for that.
//!
//! Trade-off: this codec gets no GPU acceleration. For wire compatibility
//! with gunzip-aware clients use `cpu-gzip`; for raw GPU throughput where
//! every reader speaks S4 use `nvcomp-zstd` / `nvcomp-bitcomp`.
//!
//! The default compression level is 6, which is `flate2`'s default and the
//! level `gzip(1)` uses out of the box. Valid levels are 0..=9 (the
//! `flate2::Compression` range); larger values are clamped to 9.
17
18use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{ChunkManifest, Codec, CodecError, CodecKind};
26
/// Gzip (RFC 1952) codec that runs on the CPU through `flate2`.
///
/// The stored level is always within `0..=9`: anything larger handed to
/// [`CpuGzip::new`] is saturated down to 9.
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// Compression level passed to `flate2::Compression::new`.
    level: u32,
}

impl CpuGzip {
    /// Level used by [`Default`]; the same level `gzip(1)` picks by default.
    pub const DEFAULT_LEVEL: u32 = 6;

    /// Builds a codec at `level`, saturating values above 9 to 9.
    pub fn new(level: u32) -> Self {
        const MAX_LEVEL: u32 = 9;
        let level = std::cmp::min(level, MAX_LEVEL);
        Self { level }
    }
}

impl Default for CpuGzip {
    /// Same as `CpuGzip::new(CpuGzip::DEFAULT_LEVEL)`.
    fn default() -> Self {
        CpuGzip::new(CpuGzip::DEFAULT_LEVEL)
    }
}
48
49/// Sync, runtime-free decompress used by `s4-codec-wasm` (browser / WASM has
50/// no tokio runtime). Same checks as the trait implementation: codec/size
51/// match, decompression-bomb cap at `manifest.original_size + 1024`, crc32c
52/// verify after.
53pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
54    if manifest.codec != CodecKind::CpuGzip {
55        return Err(CodecError::CodecMismatch {
56            expected: CodecKind::CpuGzip,
57            got: manifest.codec,
58        });
59    }
60    if input.len() as u64 != manifest.compressed_size {
61        return Err(CodecError::SizeMismatch {
62            expected: manifest.compressed_size,
63            got: input.len() as u64,
64        });
65    }
66    let limit = manifest.original_size.saturating_add(1024);
67    let mut buf = Vec::with_capacity(manifest.original_size as usize);
68    let mut decoder = GzDecoder::new(input);
69    (&mut decoder)
70        .take(limit)
71        .read_to_end(&mut buf)
72        .map_err(CodecError::Io)?;
73    if (buf.len() as u64) > manifest.original_size {
74        return Err(CodecError::Io(std::io::Error::other(format!(
75            "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
76            buf.len(),
77            manifest.original_size
78        ))));
79    }
80    if buf.len() as u64 != manifest.original_size {
81        return Err(CodecError::SizeMismatch {
82            expected: manifest.original_size,
83            got: buf.len() as u64,
84        });
85    }
86    let actual_crc = crc32c::crc32c(&buf);
87    if actual_crc != manifest.crc32c {
88        return Err(CodecError::CrcMismatch {
89            expected: manifest.crc32c,
90            got: actual_crc,
91        });
92    }
93    Ok(buf)
94}
95
96/// Sync compress sibling of `decompress_blocking`. Provided for symmetry.
97pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
98    let level = level.min(9);
99    let original_size = input.len() as u64;
100    let original_crc = crc32c::crc32c(input);
101    let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
102    encoder.write_all(input).map_err(CodecError::Io)?;
103    let compressed = encoder.finish().map_err(CodecError::Io)?;
104    Ok((
105        compressed.clone(),
106        ChunkManifest {
107            codec: CodecKind::CpuGzip,
108            original_size,
109            compressed_size: compressed.len() as u64,
110            crc32c: original_crc,
111        },
112    ))
113}
114
115#[async_trait::async_trait]
116impl Codec for CpuGzip {
117    fn kind(&self) -> CodecKind {
118        CodecKind::CpuGzip
119    }
120
121    async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
122        let level = self.level;
123        let original_size = input.len() as u64;
124        let original_crc = crc32c::crc32c(&input);
125
126        let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
127            let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
128            encoder.write_all(input.as_ref())?;
129            encoder.finish()
130        })
131        .await??;
132
133        let manifest = ChunkManifest {
134            codec: CodecKind::CpuGzip,
135            original_size,
136            compressed_size: compressed.len() as u64,
137            crc32c: original_crc,
138        };
139        Ok((Bytes::from(compressed), manifest))
140    }
141
142    async fn decompress(
143        &self,
144        input: Bytes,
145        manifest: &ChunkManifest,
146    ) -> Result<Bytes, CodecError> {
147        if manifest.codec != CodecKind::CpuGzip {
148            return Err(CodecError::CodecMismatch {
149                expected: CodecKind::CpuGzip,
150                got: manifest.codec,
151            });
152        }
153        if input.len() as u64 != manifest.compressed_size {
154            return Err(CodecError::SizeMismatch {
155                expected: manifest.compressed_size,
156                got: input.len() as u64,
157            });
158        }
159        let expected_crc = manifest.crc32c;
160        let expected_orig_size = manifest.original_size;
161
162        let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
163            // Decompression-bomb hardening: cap at expected_orig_size + small
164            // margin (same pattern cpu_zstd uses). A malicious sidecar could
165            // claim a tiny original_size while the gzip footer says otherwise;
166            // we trust the manifest and detect inflation past it as bomb.
167            let limit = expected_orig_size.saturating_add(1024);
168            let mut buf = Vec::with_capacity(expected_orig_size as usize);
169            let mut decoder = GzDecoder::new(input.as_ref());
170            (&mut decoder).take(limit).read_to_end(&mut buf)?;
171            if (buf.len() as u64) > expected_orig_size {
172                return Err(std::io::Error::other(format!(
173                    "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
174                    buf.len(),
175                    expected_orig_size
176                )));
177            }
178            Ok(buf)
179        })
180        .await??;
181
182        if decompressed.len() as u64 != expected_orig_size {
183            return Err(CodecError::SizeMismatch {
184                expected: expected_orig_size,
185                got: decompressed.len() as u64,
186            });
187        }
188        let actual_crc = crc32c::crc32c(&decompressed);
189        if actual_crc != expected_crc {
190            return Err(CodecError::CrcMismatch {
191                expected: expected_crc,
192                got: actual_crc,
193            });
194        }
195        Ok(Bytes::from(decompressed))
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    #[tokio::test]
    async fn roundtrip_small() {
        let codec = CpuGzip::default();
        let input = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ".as_slice());
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, input.len() as u64);
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    async fn roundtrip_empty() {
        let codec = CpuGzip::default();
        let input = Bytes::new();
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert_eq!(manifest.original_size, 0);
        // Even an empty payload is framed by a gzip header and trailer.
        assert_eq!(&compressed[..2], &[0x1f, 0x8b]);
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert!(decompressed.is_empty());
    }

    #[tokio::test]
    async fn roundtrip_compressible() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 1024 * 1024]);
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        // DEFLATE tops out at roughly 1032:1 (a 258-byte match costs at least
        // 2 bits), so 1 MiB of a single byte lands just above 1 KiB; 2 KiB
        // leaves headroom for block headers and the gzip framing.
        assert!(
            compressed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            compressed.len()
        );
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    /// The whole point of this codec: a stock gzip decoder (GNU `gunzip`, a
    /// browser's `DecompressionStream("gzip")`, `curl --compressed`) must be
    /// able to decode the output with no S4 metadata at all. This is the
    /// wire-compat property issue #26 requires.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"hello squished world\n".repeat(100));
        let (compressed, _manifest) = codec.compress(input.clone()).await.unwrap();

        // First two bytes must be the gzip magic (0x1f 0x8b) per RFC 1952.
        assert_eq!(
            &compressed[..2],
            &[0x1f, 0x8b],
            "must start with gzip magic"
        );

        // Decode with a fresh, manifest-unaware GzDecoder. This is a different
        // code path from our `decompress` and stands in for what a downstream
        // client would do.
        let mut buf = Vec::new();
        flate2::read::GzDecoder::new(compressed.as_ref())
            .read_to_end(&mut buf)
            .unwrap();
        assert_eq!(buf, input.as_ref());
    }

    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let codec = CpuGzip::default();
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let err = codec
            .decompress(Bytes::from_static(b"0123456789"), &manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    #[tokio::test]
    async fn rejects_compressed_size_mismatch() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"size check\n".repeat(20));
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        manifest.compressed_size += 1;
        let err = codec.decompress(compressed, &manifest).await.unwrap_err();
        assert!(matches!(err, CodecError::SizeMismatch { .. }));
    }

    /// The manifest claims more bytes than the stream actually inflates to.
    #[tokio::test]
    async fn rejects_short_output() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"short output\n".repeat(20));
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        manifest.original_size += 5;
        let err = codec.decompress(compressed, &manifest).await.unwrap_err();
        assert!(matches!(err, CodecError::SizeMismatch { .. }));
    }

    #[tokio::test]
    async fn rejects_crc_mismatch() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"crc check\n".repeat(20));
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        manifest.crc32c ^= 1;
        let err = codec.decompress(compressed, &manifest).await.unwrap_err();
        assert!(matches!(err, CodecError::CrcMismatch { .. }));
    }

    /// The manifest claims far fewer bytes than the stream inflates to; the
    /// bounded read must stop and refuse instead of inflating everything.
    #[tokio::test]
    async fn detects_decompression_bomb() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 64 * 1024]);
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        manifest.original_size = 16;
        assert!(codec.decompress(compressed, &manifest).await.is_err());
    }

    #[test]
    fn blocking_detects_decompression_bomb() {
        let input = vec![b'x'; 64 * 1024];
        let (compressed, mut manifest) =
            compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        manifest.original_size = 16;
        let err = decompress_blocking(&compressed, &manifest).unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_)),
            "expected the bomb guard to report an Io error, got {err:?}"
        );
    }

    #[test]
    fn new_clamps_level_to_nine() {
        assert_eq!(CpuGzip::new(0).level, 0);
        assert_eq!(CpuGzip::new(9).level, 9);
        assert_eq!(CpuGzip::new(u32::MAX).level, 9);
        assert_eq!(CpuGzip::default().level, CpuGzip::DEFAULT_LEVEL);
    }

    /// Out-of-range levels are clamped rather than rejected, and level 0
    /// (stored blocks) still produces a valid stream.
    #[test]
    fn blocking_roundtrip_at_level_extremes() {
        let input = b"level clamp\n".repeat(50);
        for level in [0, 99] {
            let (compressed, manifest) = compress_blocking(&input, level).unwrap();
            assert_eq!(decompress_blocking(&compressed, &manifest).unwrap(), input);
        }
    }

    /// `decompress_blocking` (used by `s4-codec-wasm`) round-trips through
    /// `compress_blocking`. Output must still start with the gzip magic so
    /// stock browsers can decode it via `DecompressionStream("gzip")`.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let input = b"hello squished world\n".repeat(100);
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&compressed[..2], &[0x1f, 0x8b]);
        let decompressed = decompress_blocking(&compressed, &manifest).unwrap();
        assert_eq!(decompressed, input);
    }
}