1use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{
26 ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY,
27 validate_decompress_manifest,
28};
29
/// Gzip codec that compresses and decompresses on the CPU via `flate2`.
///
/// The compression level is clamped to `0..=CpuGzip::MAX_LEVEL` when the codec
/// is constructed, so every stored level is valid for [`Compression::new`].
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// Deflate level handed to [`Compression::new`]; always `<= CpuGzip::MAX_LEVEL`.
    level: u32,
}

impl CpuGzip {
    /// Level used by [`Default`]; 6 matches zlib's default speed/ratio trade-off.
    pub const DEFAULT_LEVEL: u32 = 6;

    /// Highest level this codec accepts; larger requests are clamped down to it.
    pub const MAX_LEVEL: u32 = 9;

    /// Creates a codec that compresses at `level`.
    ///
    /// Values above [`Self::MAX_LEVEL`] are silently clamped rather than rejected.
    #[must_use]
    pub const fn new(level: u32) -> Self {
        // `u32::min` is not const-callable, so clamp with a plain comparison.
        let level = if level > Self::MAX_LEVEL {
            Self::MAX_LEVEL
        } else {
            level
        };
        Self { level }
    }

    /// Returns the (already clamped) compression level this codec uses.
    #[must_use]
    pub const fn level(&self) -> u32 {
        self.level
    }
}

impl Default for CpuGzip {
    /// Equivalent to `CpuGzip::new(CpuGzip::DEFAULT_LEVEL)`.
    fn default() -> Self {
        Self::new(Self::DEFAULT_LEVEL)
    }
}
51
52pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
57 if manifest.codec != CodecKind::CpuGzip {
58 return Err(CodecError::CodecMismatch {
59 expected: CodecKind::CpuGzip,
60 got: manifest.codec,
61 });
62 }
63 let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
67 let limit = manifest.original_size.saturating_add(1024);
68 let mut buf = Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
71 let mut decoder = GzDecoder::new(input);
72 {
73 let mut limited = (&mut decoder).take(limit);
74 limited.read_to_end(&mut buf).map_err(CodecError::Io)?;
75 }
76 if buf.len() as u64 > manifest.original_size {
81 let mut peek = [0u8; 1];
82 let more_available = decoder.read(&mut peek).map(|n| n > 0).unwrap_or(false);
83 return Err(CodecError::Io(std::io::Error::other(format!(
84 "gzip decompression bomb detected: produced at least {} bytes \
85 (truncated at cap = manifest.original_size + 1024 = {}); \
86 manifest claimed {}{}",
87 buf.len(),
88 limit,
89 manifest.original_size,
90 if more_available {
91 "; decoder had more bytes available beyond the cap"
92 } else {
93 ""
94 },
95 ))));
96 }
97 if buf.len() as u64 != manifest.original_size {
98 return Err(CodecError::SizeMismatch {
99 expected: manifest.original_size,
100 got: buf.len() as u64,
101 });
102 }
103 let actual_crc = crc32c::crc32c(&buf);
104 if actual_crc != manifest.crc32c {
105 return Err(CodecError::CrcMismatch {
106 expected: manifest.crc32c,
107 got: actual_crc,
108 });
109 }
110 Ok(buf)
111}
112
113pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
115 let level = level.min(9);
116 let original_size = input.len() as u64;
117 let original_crc = crc32c::crc32c(input);
118 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
119 encoder.write_all(input).map_err(CodecError::Io)?;
120 let compressed = encoder.finish().map_err(CodecError::Io)?;
121 Ok((
122 compressed.clone(),
123 ChunkManifest {
124 codec: CodecKind::CpuGzip,
125 original_size,
126 compressed_size: compressed.len() as u64,
127 crc32c: original_crc,
128 },
129 ))
130}
131
132#[async_trait::async_trait]
133impl Codec for CpuGzip {
134 fn kind(&self) -> CodecKind {
135 CodecKind::CpuGzip
136 }
137
138 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
139 let level = self.level;
140 let original_size = input.len() as u64;
141 let original_crc = crc32c::crc32c(&input);
142
143 let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
144 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
145 encoder.write_all(input.as_ref())?;
146 encoder.finish()
147 })
148 .await??;
149
150 let manifest = ChunkManifest {
151 codec: CodecKind::CpuGzip,
152 original_size,
153 compressed_size: compressed.len() as u64,
154 crc32c: original_crc,
155 };
156 Ok((Bytes::from(compressed), manifest))
157 }
158
159 async fn decompress(
160 &self,
161 input: Bytes,
162 manifest: &ChunkManifest,
163 ) -> Result<Bytes, CodecError> {
164 if manifest.codec != CodecKind::CpuGzip {
165 return Err(CodecError::CodecMismatch {
166 expected: CodecKind::CpuGzip,
167 got: manifest.codec,
168 });
169 }
170 let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
175
176 let expected_crc = manifest.crc32c;
177 let expected_orig_size = manifest.original_size;
178
179 let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
180 use std::io::Read;
181 let limit = expected_orig_size.saturating_add(1024);
186 let mut buf =
189 Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
190 let mut decoder = GzDecoder::new(input.as_ref());
191 {
192 let mut limited = (&mut decoder).take(limit);
193 limited.read_to_end(&mut buf)?;
194 }
195 if (buf.len() as u64) > expected_orig_size {
203 let mut peek = [0u8; 1];
204 let more_available = decoder.read(&mut peek).map(|n| n > 0).unwrap_or(false);
205 return Err(std::io::Error::other(format!(
206 "gzip decompression bomb detected: produced at least {} bytes \
207 (truncated at cap = manifest.original_size + 1024 = {}); \
208 manifest claimed {}{}",
209 buf.len(),
210 limit,
211 expected_orig_size,
212 if more_available {
213 "; decoder had more bytes available beyond the cap"
214 } else {
215 ""
216 },
217 )));
218 }
219 Ok(buf)
220 })
221 .await??;
222
223 if decompressed.len() as u64 != expected_orig_size {
224 return Err(CodecError::SizeMismatch {
225 expected: expected_orig_size,
226 got: decompressed.len() as u64,
227 });
228 }
229 let actual_crc = crc32c::crc32c(&decompressed);
230 if actual_crc != expected_crc {
231 return Err(CodecError::CrcMismatch {
232 expected: expected_crc,
233 got: actual_crc,
234 });
235 }
236 Ok(Bytes::from(decompressed))
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    #[tokio::test]
    async fn roundtrip_small() {
        let codec = CpuGzip::default();
        let input = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ".as_slice());
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, input.len() as u64);
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    async fn roundtrip_compressible() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 1024 * 1024]);
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert!(
            compressed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            compressed.len()
        );
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"hello squished world\n".repeat(100));
        let (compressed, _manifest) = codec.compress(input.clone()).await.unwrap();

        assert_eq!(
            &compressed[..2],
            &[0x1f, 0x8b],
            "must start with gzip magic"
        );

        let mut buf = Vec::new();
        flate2::read::GzDecoder::new(compressed.as_ref())
            .read_to_end(&mut buf)
            .unwrap();
        assert_eq!(buf, input.as_ref());
    }

    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let codec = CpuGzip::default();
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let err = codec
            .decompress(Bytes::from_static(b"0123456789"), &manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    #[tokio::test]
    async fn issue_89_rejects_manifest_over_5gib() {
        let codec = CpuGzip::default();
        let body = Bytes::from_static(&[0x1f, 0x8b]);
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: crate::MAX_DECOMPRESSED_BYTES + 1,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = codec.decompress(body, &manifest).await.unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let codec = CpuGzip::default();
        let body = Bytes::from_static(&[0x1f, 0x8b]);
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: u32::MAX as u64,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = codec.decompress(body, &manifest).await.unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }

    /// A manifest that under-reports the decompressed size must trip the
    /// decompression-bomb guard instead of returning (or buffering) the excess.
    #[tokio::test]
    async fn rejects_decompression_bomb() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 64 * 1024]);
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        // Real output is 64 KiB; claiming 32 KiB puts it well past claim + 1024.
        manifest.original_size = 32 * 1024;
        let err = codec.decompress(compressed, &manifest).await.unwrap_err();
        match err {
            CodecError::Io(e) => assert!(
                e.to_string().contains("decompression bomb"),
                "unexpected Io error: {e}"
            ),
            other => panic!("expected Io decompression-bomb error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn rejects_crc_mismatch() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"hello squished world\n".repeat(10));
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        manifest.crc32c ^= 1;
        let err = codec.decompress(compressed, &manifest).await.unwrap_err();
        assert!(
            matches!(err, CodecError::CrcMismatch { .. }),
            "expected CrcMismatch, got {err:?}"
        );
    }

    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let input = b"hello squished world\n".repeat(100);
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&compressed[..2], &[0x1f, 0x8b]);
        let decompressed = decompress_blocking(&compressed, &manifest).unwrap();
        assert_eq!(decompressed, input);
    }

    #[test]
    fn blocking_rejects_decompression_bomb() {
        let input = vec![b'x'; 64 * 1024];
        let (compressed, mut manifest) =
            compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        manifest.original_size = 32 * 1024;
        let err = decompress_blocking(&compressed, &manifest).unwrap_err();
        assert!(
            matches!(&err, CodecError::Io(e) if e.to_string().contains("decompression bomb")),
            "expected Io decompression-bomb error, got {err:?}"
        );
    }

    #[test]
    fn blocking_rejects_short_output() {
        let input = b"hello squished world\n".repeat(10);
        let (compressed, mut manifest) =
            compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        // Claim more bytes than the stream actually holds.
        manifest.original_size += 10;
        let err = decompress_blocking(&compressed, &manifest).unwrap_err();
        match err {
            CodecError::SizeMismatch { expected, got } => {
                assert_eq!(expected, input.len() as u64 + 10);
                assert_eq!(got, input.len() as u64);
            }
            other => panic!("expected SizeMismatch, got {other:?}"),
        }
    }

    #[test]
    fn issue_89_blocking_rejects_manifest_over_5gib() {
        let body = &[0x1f, 0x8b];
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: crate::MAX_DECOMPRESSED_BYTES + 1,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = decompress_blocking(body, &manifest).unwrap_err();
        match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => {
                assert_eq!(requested, crate::MAX_DECOMPRESSED_BYTES + 1);
                assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
            }
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        }
    }

    #[test]
    fn issue_89_blocking_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let body = &[0x1f, 0x8b];
        let manifest = ChunkManifest {
            codec: CodecKind::CpuGzip,
            original_size: u32::MAX as u64,
            compressed_size: body.len() as u64,
            crc32c: 0,
        };
        let err = decompress_blocking(body, &manifest).unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }
}