1use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{
26 ChunkManifest, Codec, CodecError, CodecKind, DECOMPRESS_BOOTSTRAP_CAPACITY,
27 validate_decompress_manifest,
28};
29
/// Gzip codec that runs on the CPU.
///
/// The stored level is what the [`Codec`] implementation hands to
/// `Compression::new` when it builds an encoder.
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// Compression level; [`CpuGzip::new`] never stores a value above 9.
    level: u32,
}

impl CpuGzip {
    /// Level selected by [`Default::default`].
    pub const DEFAULT_LEVEL: u32 = 6;

    /// Builds a codec for `level`, clamping anything above 9 down to 9.
    pub fn new(level: u32) -> Self {
        let clamped = std::cmp::min(level, 9);
        Self { level: clamped }
    }
}

impl Default for CpuGzip {
    /// Equivalent to `CpuGzip::new(CpuGzip::DEFAULT_LEVEL)`.
    fn default() -> Self {
        CpuGzip::new(CpuGzip::DEFAULT_LEVEL)
    }
}
51
52pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
57 if manifest.codec != CodecKind::CpuGzip {
58 return Err(CodecError::CodecMismatch {
59 expected: CodecKind::CpuGzip,
60 got: manifest.codec,
61 });
62 }
63 let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
67 let limit = manifest.original_size.saturating_add(1024);
68 let mut buf = Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
71 let mut decoder = GzDecoder::new(input);
72 (&mut decoder)
73 .take(limit)
74 .read_to_end(&mut buf)
75 .map_err(CodecError::Io)?;
76 if (buf.len() as u64) > manifest.original_size {
77 return Err(CodecError::Io(std::io::Error::other(format!(
78 "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
79 buf.len(),
80 manifest.original_size
81 ))));
82 }
83 if buf.len() as u64 != manifest.original_size {
84 return Err(CodecError::SizeMismatch {
85 expected: manifest.original_size,
86 got: buf.len() as u64,
87 });
88 }
89 let actual_crc = crc32c::crc32c(&buf);
90 if actual_crc != manifest.crc32c {
91 return Err(CodecError::CrcMismatch {
92 expected: manifest.crc32c,
93 got: actual_crc,
94 });
95 }
96 Ok(buf)
97}
98
99pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
101 let level = level.min(9);
102 let original_size = input.len() as u64;
103 let original_crc = crc32c::crc32c(input);
104 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
105 encoder.write_all(input).map_err(CodecError::Io)?;
106 let compressed = encoder.finish().map_err(CodecError::Io)?;
107 Ok((
108 compressed.clone(),
109 ChunkManifest {
110 codec: CodecKind::CpuGzip,
111 original_size,
112 compressed_size: compressed.len() as u64,
113 crc32c: original_crc,
114 },
115 ))
116}
117
118#[async_trait::async_trait]
119impl Codec for CpuGzip {
120 fn kind(&self) -> CodecKind {
121 CodecKind::CpuGzip
122 }
123
124 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
125 let level = self.level;
126 let original_size = input.len() as u64;
127 let original_crc = crc32c::crc32c(&input);
128
129 let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
130 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
131 encoder.write_all(input.as_ref())?;
132 encoder.finish()
133 })
134 .await??;
135
136 let manifest = ChunkManifest {
137 codec: CodecKind::CpuGzip,
138 original_size,
139 compressed_size: compressed.len() as u64,
140 crc32c: original_crc,
141 };
142 Ok((Bytes::from(compressed), manifest))
143 }
144
145 async fn decompress(
146 &self,
147 input: Bytes,
148 manifest: &ChunkManifest,
149 ) -> Result<Bytes, CodecError> {
150 if manifest.codec != CodecKind::CpuGzip {
151 return Err(CodecError::CodecMismatch {
152 expected: CodecKind::CpuGzip,
153 got: manifest.codec,
154 });
155 }
156 let allocated_orig_size = validate_decompress_manifest(manifest, input.len())?;
161
162 let expected_crc = manifest.crc32c;
163 let expected_orig_size = manifest.original_size;
164
165 let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
166 let limit = expected_orig_size.saturating_add(1024);
171 let mut buf =
174 Vec::with_capacity(allocated_orig_size.min(DECOMPRESS_BOOTSTRAP_CAPACITY));
175 let mut decoder = GzDecoder::new(input.as_ref());
176 (&mut decoder).take(limit).read_to_end(&mut buf)?;
177 if (buf.len() as u64) > expected_orig_size {
178 return Err(std::io::Error::other(format!(
179 "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
180 buf.len(),
181 expected_orig_size
182 )));
183 }
184 Ok(buf)
185 })
186 .await??;
187
188 if decompressed.len() as u64 != expected_orig_size {
189 return Err(CodecError::SizeMismatch {
190 expected: expected_orig_size,
191 got: decompressed.len() as u64,
192 });
193 }
194 let actual_crc = crc32c::crc32c(&decompressed);
195 if actual_crc != expected_crc {
196 return Err(CodecError::CrcMismatch {
197 expected: expected_crc,
198 got: actual_crc,
199 });
200 }
201 Ok(Bytes::from(decompressed))
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    /// The two leading bytes the tests expect on every gzip stream.
    static GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];

    /// Builds a manifest with a zero checksum for tests that fail before the
    /// CRC comparison is reached.
    fn manifest_claiming(codec: CodecKind, original_size: u64, compressed_size: u64) -> ChunkManifest {
        ChunkManifest {
            codec,
            original_size,
            compressed_size,
            crc32c: 0,
        }
    }

    /// A short payload survives compress followed by decompress.
    #[tokio::test]
    async fn roundtrip_small() {
        let gzip = CpuGzip::default();
        let plain = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ");
        let (packed, manifest) = gzip.compress(plain.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, plain.len() as u64);
        let restored = gzip.decompress(packed, &manifest).await.unwrap();
        assert_eq!(restored, plain);
    }

    /// 1 MiB of a repeated byte shrinks below 2 KiB and still round-trips.
    #[tokio::test]
    async fn roundtrip_compressible() {
        let gzip = CpuGzip::default();
        let one_mib = 1024 * 1024;
        let plain = Bytes::from(vec![b'x'; one_mib]);
        let (packed, manifest) = gzip.compress(plain.clone()).await.unwrap();
        assert!(
            packed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            packed.len()
        );
        let restored = gzip.decompress(packed, &manifest).await.unwrap();
        assert_eq!(restored, plain);
    }

    /// The compressed bytes are a plain gzip stream that flate2's own decoder
    /// can read without consulting the manifest.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let gzip = CpuGzip::default();
        let plain = Bytes::from(b"hello squished world\n".repeat(100));
        let (packed, _manifest) = gzip.compress(plain.clone()).await.unwrap();

        assert_eq!(&packed[..2], &GZIP_MAGIC, "must start with gzip magic");

        let mut inflated = Vec::new();
        let mut stock_decoder = flate2::read::GzDecoder::new(packed.as_ref());
        stock_decoder.read_to_end(&mut inflated).unwrap();
        assert_eq!(inflated, plain.as_ref());
    }

    /// A manifest naming another codec is refused.
    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let gzip = CpuGzip::default();
        let manifest = manifest_claiming(CodecKind::CpuZstd, 10, 10);
        let body = Bytes::from_static(b"0123456789");
        let err = gzip.decompress(body, &manifest).await.unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    /// A claimed size one byte over `MAX_DECOMPRESSED_BYTES` is rejected with
    /// the requested size and the limit reported back.
    #[tokio::test]
    async fn issue_89_rejects_manifest_over_5gib() {
        let gzip = CpuGzip::default();
        let over_limit = crate::MAX_DECOMPRESSED_BYTES + 1;
        let body = Bytes::from_static(&GZIP_MAGIC);
        let manifest = manifest_claiming(CodecKind::CpuGzip, over_limit, body.len() as u64);

        let err = gzip.decompress(body, &manifest).await.unwrap_err();
        let (requested, limit) = match err {
            CodecError::ManifestSizeExceedsLimit { requested, limit } => (requested, limit),
            other => panic!("expected ManifestSizeExceedsLimit, got {other:?}"),
        };
        assert_eq!(requested, over_limit);
        assert_eq!(limit, crate::MAX_DECOMPRESSED_BYTES);
    }

    /// A manifest claiming `u32::MAX` bytes over a two-byte body must fail
    /// with an I/O or size error rather than succeed.
    #[tokio::test]
    async fn issue_89_bootstrap_cap_keeps_4gib_claim_alloc_safe() {
        let gzip = CpuGzip::default();
        let body = Bytes::from_static(&GZIP_MAGIC);
        let manifest =
            manifest_claiming(CodecKind::CpuGzip, u32::MAX as u64, body.len() as u64);

        let err = gzip.decompress(body, &manifest).await.unwrap_err();
        assert!(
            matches!(err, CodecError::Io(_) | CodecError::SizeMismatch { .. }),
            "expected Io or SizeMismatch, got {err:?}"
        );
    }

    /// The synchronous helpers round-trip and emit the gzip magic.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let plain = b"hello squished world\n".repeat(100);
        let (packed, manifest) = compress_blocking(&plain, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&packed[..2], &GZIP_MAGIC);
        let restored = decompress_blocking(&packed, &manifest).unwrap();
        assert_eq!(restored, plain);
    }
}