1use std::io::{Read, Write};
19
20use bytes::Bytes;
21use flate2::Compression;
22use flate2::read::GzDecoder;
23use flate2::write::GzEncoder;
24
25use crate::{ChunkManifest, Codec, CodecError, CodecKind};
26
/// Gzip implementation of [`Codec`] backed by `flate2`.
///
/// The only state is the compression level handed to the encoder. Build one
/// with [`CpuGzip::new`] or [`Default::default`].
#[derive(Debug, Clone)]
pub struct CpuGzip {
    /// Level passed to `flate2::Compression::new`; [`CpuGzip::new`] caps it at 9.
    level: u32,
}
32
33impl CpuGzip {
34 pub const DEFAULT_LEVEL: u32 = 6;
35
36 pub fn new(level: u32) -> Self {
37 Self {
38 level: level.min(9),
39 }
40 }
41}
42
43impl Default for CpuGzip {
44 fn default() -> Self {
45 Self::new(Self::DEFAULT_LEVEL)
46 }
47}
48
49pub fn decompress_blocking(input: &[u8], manifest: &ChunkManifest) -> Result<Vec<u8>, CodecError> {
54 if manifest.codec != CodecKind::CpuGzip {
55 return Err(CodecError::CodecMismatch {
56 expected: CodecKind::CpuGzip,
57 got: manifest.codec,
58 });
59 }
60 if input.len() as u64 != manifest.compressed_size {
61 return Err(CodecError::SizeMismatch {
62 expected: manifest.compressed_size,
63 got: input.len() as u64,
64 });
65 }
66 let limit = manifest.original_size.saturating_add(1024);
67 let mut buf = Vec::with_capacity(manifest.original_size as usize);
68 let mut decoder = GzDecoder::new(input);
69 (&mut decoder)
70 .take(limit)
71 .read_to_end(&mut buf)
72 .map_err(CodecError::Io)?;
73 if (buf.len() as u64) > manifest.original_size {
74 return Err(CodecError::Io(std::io::Error::other(format!(
75 "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
76 buf.len(),
77 manifest.original_size
78 ))));
79 }
80 if buf.len() as u64 != manifest.original_size {
81 return Err(CodecError::SizeMismatch {
82 expected: manifest.original_size,
83 got: buf.len() as u64,
84 });
85 }
86 let actual_crc = crc32c::crc32c(&buf);
87 if actual_crc != manifest.crc32c {
88 return Err(CodecError::CrcMismatch {
89 expected: manifest.crc32c,
90 got: actual_crc,
91 });
92 }
93 Ok(buf)
94}
95
96pub fn compress_blocking(input: &[u8], level: u32) -> Result<(Vec<u8>, ChunkManifest), CodecError> {
98 let level = level.min(9);
99 let original_size = input.len() as u64;
100 let original_crc = crc32c::crc32c(input);
101 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
102 encoder.write_all(input).map_err(CodecError::Io)?;
103 let compressed = encoder.finish().map_err(CodecError::Io)?;
104 Ok((
105 compressed.clone(),
106 ChunkManifest {
107 codec: CodecKind::CpuGzip,
108 original_size,
109 compressed_size: compressed.len() as u64,
110 crc32c: original_crc,
111 },
112 ))
113}
114
115#[async_trait::async_trait]
116impl Codec for CpuGzip {
117 fn kind(&self) -> CodecKind {
118 CodecKind::CpuGzip
119 }
120
121 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
122 let level = self.level;
123 let original_size = input.len() as u64;
124 let original_crc = crc32c::crc32c(&input);
125
126 let compressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
127 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level));
128 encoder.write_all(input.as_ref())?;
129 encoder.finish()
130 })
131 .await??;
132
133 let manifest = ChunkManifest {
134 codec: CodecKind::CpuGzip,
135 original_size,
136 compressed_size: compressed.len() as u64,
137 crc32c: original_crc,
138 };
139 Ok((Bytes::from(compressed), manifest))
140 }
141
142 async fn decompress(
143 &self,
144 input: Bytes,
145 manifest: &ChunkManifest,
146 ) -> Result<Bytes, CodecError> {
147 if manifest.codec != CodecKind::CpuGzip {
148 return Err(CodecError::CodecMismatch {
149 expected: CodecKind::CpuGzip,
150 got: manifest.codec,
151 });
152 }
153 if input.len() as u64 != manifest.compressed_size {
154 return Err(CodecError::SizeMismatch {
155 expected: manifest.compressed_size,
156 got: input.len() as u64,
157 });
158 }
159 let expected_crc = manifest.crc32c;
160 let expected_orig_size = manifest.original_size;
161
162 let decompressed = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
163 let limit = expected_orig_size.saturating_add(1024);
168 let mut buf = Vec::with_capacity(expected_orig_size as usize);
169 let mut decoder = GzDecoder::new(input.as_ref());
170 (&mut decoder).take(limit).read_to_end(&mut buf)?;
171 if (buf.len() as u64) > expected_orig_size {
172 return Err(std::io::Error::other(format!(
173 "gzip decompression bomb detected: produced {} bytes, manifest claimed {}",
174 buf.len(),
175 expected_orig_size
176 )));
177 }
178 Ok(buf)
179 })
180 .await??;
181
182 if decompressed.len() as u64 != expected_orig_size {
183 return Err(CodecError::SizeMismatch {
184 expected: expected_orig_size,
185 got: decompressed.len() as u64,
186 });
187 }
188 let actual_crc = crc32c::crc32c(&decompressed);
189 if actual_crc != expected_crc {
190 return Err(CodecError::CrcMismatch {
191 expected: expected_crc,
192 got: actual_crc,
193 });
194 }
195 Ok(Bytes::from(decompressed))
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    /// A short input survives an async compress/decompress round trip.
    #[tokio::test]
    async fn roundtrip_small() {
        let codec = CpuGzip::default();
        let input = Bytes::from_static(b"the quick brown fox jumps over the lazy dog ".as_slice());
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert_eq!(manifest.codec, CodecKind::CpuGzip);
        assert_eq!(manifest.original_size, input.len() as u64);
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    /// Highly repetitive input shrinks substantially and still round-trips.
    #[tokio::test]
    async fn roundtrip_compressible() {
        let codec = CpuGzip::default();
        let input = Bytes::from(vec![b'x'; 1024 * 1024]);
        let (compressed, manifest) = codec.compress(input.clone()).await.unwrap();
        assert!(
            compressed.len() < 2048,
            "expected gzip to compress 1 MiB of x bytes well, got {} bytes",
            compressed.len()
        );
        let decompressed = codec.decompress(compressed, &manifest).await.unwrap();
        assert_eq!(decompressed, input);
    }

    /// The output is a plain gzip stream that a bare `GzDecoder` can read.
    #[tokio::test]
    async fn output_is_decodable_by_stock_gunzip() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"hello squished world\n".repeat(100));
        let (compressed, _manifest) = codec.compress(input.clone()).await.unwrap();

        assert_eq!(
            &compressed[..2],
            &[0x1f, 0x8b],
            "must start with gzip magic"
        );

        let mut buf = Vec::new();
        flate2::read::GzDecoder::new(compressed.as_ref())
            .read_to_end(&mut buf)
            .unwrap();
        assert_eq!(buf, input.as_ref());
    }

    /// A manifest naming a different codec is rejected before decoding.
    #[tokio::test]
    async fn rejects_codec_mismatch() {
        let codec = CpuGzip::default();
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 10,
            crc32c: 0,
        };
        let err = codec
            .decompress(Bytes::from_static(b"0123456789"), &manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CodecMismatch { .. }));
    }

    /// A manifest whose checksum does not match the decoded bytes is rejected.
    #[tokio::test]
    async fn rejects_crc_mismatch() {
        let codec = CpuGzip::default();
        let input = Bytes::from(b"hello squished world\n".repeat(10));
        let (compressed, mut manifest) = codec.compress(input).await.unwrap();
        manifest.crc32c = !manifest.crc32c;
        let err = codec
            .decompress(compressed, &manifest)
            .await
            .unwrap_err();
        assert!(matches!(err, CodecError::CrcMismatch { .. }));
    }

    /// The synchronous helpers round-trip and emit the gzip magic bytes.
    #[test]
    fn blocking_roundtrip_and_gzip_magic() {
        let input = b"hello squished world\n".repeat(100);
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        assert_eq!(&compressed[..2], &[0x1f, 0x8b]);
        let decompressed = decompress_blocking(&compressed, &manifest).unwrap();
        assert_eq!(decompressed, input);
    }

    /// Input whose length differs from `compressed_size` is rejected up front.
    #[test]
    fn blocking_rejects_compressed_size_mismatch() {
        let input = b"hello squished world\n".repeat(10);
        let (compressed, manifest) = compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        let truncated = &compressed[..compressed.len() - 1];
        let err = decompress_blocking(truncated, &manifest).unwrap_err();
        assert!(matches!(err, CodecError::SizeMismatch { .. }));
    }

    /// A stream that inflates past the manifest's claimed size trips the bomb guard.
    #[test]
    fn blocking_rejects_output_larger_than_manifest() {
        let input = vec![b'x'; 64 * 1024];
        let (compressed, mut manifest) =
            compress_blocking(&input, CpuGzip::DEFAULT_LEVEL).unwrap();
        manifest.original_size = 16;
        let err = decompress_blocking(&compressed, &manifest).unwrap_err();
        assert!(matches!(err, CodecError::Io(_)));
    }
}