1use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{
26 ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY,
27 validate_decompress_manifest,
28};
29
/// CPU-side gzip codec configured with a fixed compression level.
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// Compression level handed to the gzip encoder; never greater than 9.
    level: u32,
}

impl CpuGzip {
    /// Level used by [`CpuGzip::default`].
    pub const DEFAULT_LEVEL: u32 = 6;

    /// Builds a codec for `level`, clamping anything above 9 down to 9.
    pub fn new(level: u32) -> Self {
        const MAX_LEVEL: u32 = 9;
        let level = u32::min(level, MAX_LEVEL);
        Self { level }
    }
}

impl Default for CpuGzip {
    /// Equivalent to `CpuGzip::new(CpuGzip::DEFAULT_LEVEL)`.
    fn default() -> Self {
        CpuGzip::new(CpuGzip::DEFAULT_LEVEL)
    }
}
51
52pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
57 if manifest.codec != CodecKind::CpuGzip {
58 return Err(CodecError::CodecMismatch {
59 expected: CodecKind::CpuGzip,
60 got: manifest.codec,
61 });
62 }
63 let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
67 let limit = manifest.original_size.saturating_add(1024);
68 let mut buf = Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
71 let mut decoder = GzDecoder::new(input);
72 {
73 let mut limited = (&mut decoder).take(limit);
74 limited.read_to_end(&mut buf).map_err(CodecError::Io)?;
75 if buf.len() as u64 > manifest.original_size {
80 let mut peek = [0u8; 1];
81 let more_available = limited.read(&mut peek).map(|n| n > 0).unwrap_or(false);
82 return Err(CodecError::Io(std::io::Error::other(format!(
83 "gzip decompression bomb detected: produced at least {} bytes \
84 (truncated at cap = manifest.original_size + 1024 = {}); \
85 manifest claimed {}{}",
86 buf.len(),
87 limit,
88 manifest.original_size,
89 if more_available {
90 "; decoder had more bytes available beyond the cap"
91 } else {
92 ""
93 },
94 ))));
95 }
96 }
97 if buf.len() as u64 != manifest.original_size {
98 return Err(CodecError::SizeMismatch {
99 expected: manifest.original_size,
100 got: buf.len() as u64,
101 });
102 }
103 let actual_crc = crc32c::crc32c(&buf);
104 if actual_crc != manifest.crc32c {
105 return Err(CodecError::CrcMismatch {
106 expected: manifest.crc32c,
107 got: actual_crc,
108 });
109 }
110 Ok(buf)
111}
112
113pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
115 let level = level.min(9);
116 let original_size = input.len() as u64;
117 let original_crc = crc32c::crc32c(input);
118 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
119 encoder.write_all(input).map_err(CodecError::Io)?;
120 let compressed = encoder.finish().map_err(CodecError::Io)?;
121 Ok((
122 compressed.clone(),
123 ChunkManifest {
124 codec: CodecKind::CpuGzip,
125 original_size,
126 compressed_size: compressed.len() as u64,
127 crc32c: original_crc,
128 },
129 ))
130}
131
132#[async_trait::async_trait]
133impl Codec for CpuGzip {
134 fn kind(&self) -> CodecKind {
135 CodecKind::CpuGzip
136 }
137
138 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
139 let level = self.level;
140 let original_size = input.len() as u64;
141 let original_crc = crc32c::crc32c(&input);
142
143 let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
144 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
145 encoder.write_all(input.as_ref())?;
146 encoder.finish()
147 })
148 .await??;
149
150 let manifest = ChunkManifest {
151 codec: CodecKind::CpuGzip,
152 original_size,
153 compressed_size: compressed.len() as u64,
154 crc32c: original_crc,
155 };
156 Ok((Bytes::from(compressed), manifest))
157 }
158
159 async fn decompress(
160 &self,
161 input: Bytes,
162 manifest: &ChunkManifest,
163 ) -> Result<Bytes, CodecError> {
164 if manifest.codec != CodecKind::CpuGzip {
165 return Err(CodecError::CodecMismatch {
166 expected: CodecKind::CpuGzip,
167 got: manifest.codec,
168 });
169 }
170 let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
175
176 let expected_crc = manifest.crc32c;
177 let expected_orig_size = manifest.original_size;
178
179 let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
180 let limit = expected_orig_size.saturating_add(1024);
185 let mut buf =
188 Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
189 let mut decoder = GzDecoder::new(input.as_ref());
190 (&mut decoder).take(limit).read_to_end(&mut buf)?;
191 if (buf.len() as u64) > expected_orig_size {
192 return Err(std::io::Error::other(format!(
193 "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
194 buf.len(),
195 expected_orig_size
196 )));
197 }
198 Ok(buf)
199 })
200 .await??;
201
202 if decompressed.len() as u64 != expected_orig_size {
203 return Err(CodecError::SizeMismatch {
204 expected: expected_orig_size,
205 got: decompressed.len() as u64,
206 });
207 }
208 let actual_crc = crc32c::crc32c(&decompressed);
209 if actual_crc != expected_crc {
210 return Err(CodecError::CrcMismatch {
211 expected: expected_crc,
212 got: actual_crc,
213 });
214 }
215 Ok(Bytes::from(decompressed))
216 }
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    /// The two-byte gzip magic number, used both as an expected prefix and as a stub body.
    const GZIP_MAGIC: &[u8] = &[0x1f, 0x8b];

    /// Builds a `CpuGzip` manifest with a zero checksum that claims `original_size`
    /// uncompressed bytes for a body of `body_len` bytes.
    fn gzip_manifest_claiming(original_size: u64, body_len: usize) -> ChunkManifest {
        ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size,
            compressed_size: body_len as u64,
            crc32c: 0,
        }
    }

    /// Asserts that `err` is `ManifestSizeExceedsLimit` for a claim one byte over the limit.
    #[track_caller]
    fn assert_one_over_limit(err: CodecError) {
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    /// Asserts that `err` is either an I/O failure or a size mismatch.
    #[track_caller]
    fn assert_io_or_size_mismatch(err: CodecError) {
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }

    /// A short payload survives an async compress/decompress round trip.
    #[tokio::test]
    async fn roundtrip_small() {
        let gzip = CpuGzip::default();
        let payload = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ");
        let (packed, manifest) = gzip.compress(payload.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, payload.len() as u64);
        let unpacked = gzip.decompress(packed, &manifest).await.unwrap();
        assert_eq!(unpacked, payload);
    }

    /// 1 MiB of a single repeated byte shrinks below 2 KiB and round-trips intact.
    #[tokio::test]
    async fn roundtrip_compressible() {
        let gzip = CpuGzip::default();
        let payload = Bytes::from(vec![b'x'; 1024 * 1024]);
        let (packed, manifest) = gzip.compress(payload.clone()).await.unwrap();
        assert!(
            packed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            packed.len()
        );
        let unpacked = gzip.decompress(packed, &manifest).await.unwrap();
        assert_eq!(unpacked, payload);
    }

    /// The codec's output starts with the gzip magic and decodes with a plain `GzDecoder`.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let gzip = CpuGzip::default();
        let payload = Bytes::from(b"hello squished world\n".repeat(100));
        let (packed, _manifest) = gzip.compress(payload.clone()).await.unwrap();

        assert_eq!(&packed[..2], GZIP_MAGIC, "must start with gzip magic");

        let mut decoded = Vec::new();
        let mut stock = flate2::read::GzDecoder::new(packed.as_ref());
        stock.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload.as_ref());
    }

    /// A manifest naming a different codec is refused before any decoding happens.
    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let gzip = CpuGzip::default();
        let foreign = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let outcome = gzip
            .decompress(Bytes::from_static(b"0123456789"), &foreign)
            .await;
        let err = outcome.unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    /// A claimed size one byte over `MAX_DECOMPRESSED_BYTES` is rejected (async path).
    #[tokio::test]
    async fn issue_89_rejects_manifest_over_5gib() {
        let gzip = CpuGzip::default();
        let body = Bytes::from_static(GZIP_MAGIC);
        let manifest = gzip_manifest_claiming(crate::MAX_DECOMPRESSED_BYTES + 1, body.len());
        let err = gzip.decompress(body, &manifest).await.unwrap_err();
        assert_one_over_limit(err);
    }

    /// A `u32::MAX` claim over a stub body fails with an error rather than succeeding
    /// (async path).
    #[tokio::test]
    async fn issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let gzip = CpuGzip::default();
        let body = Bytes::from_static(GZIP_MAGIC);
        let manifest = gzip_manifest_claiming(u32::MAX as u64, body.len());
        let err = gzip.decompress(body, &manifest).await.unwrap_err();
        assert_io_or_size_mismatch(err);
    }

    /// The synchronous helpers round-trip a payload and emit the gzip magic.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let payload = b"hello squished world\n".repeat(100);
        let (packed, manifest) = compress_blocking(&payload, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&packed[..2], GZIP_MAGIC);
        let unpacked = decompress_blocking(&packed, &manifest).unwrap();
        assert_eq!(unpacked, payload);
    }

    /// A claimed size one byte over `MAX_DECOMPRESSED_BYTES` is rejected (blocking path).
    #[test]
    fn issue_89_blocking_rejects_manifest_over_5gib() {
        let manifest =
            gzip_manifest_claiming(crate::MAX_DECOMPRESSED_BYTES + 1, GZIP_MAGIC.len());
        let err = decompress_blocking(GZIP_MAGIC, &manifest).unwrap_err();
        assert_one_over_limit(err);
    }

    /// A `u32::MAX` claim over a stub body fails with an error rather than succeeding
    /// (blocking path).
    #[test]
    fn issue_89_blocking_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let manifest = gzip_manifest_claiming(u32::MAX as u64, GZIP_MAGIC.len());
        let err = decompress_blocking(GZIP_MAGIC, &manifest).unwrap_err();
        assert_io_or_size_mismatch(err);
    }
}