1pub use super::parquet_bridge::{
3 BrotliLevel, Compression, CompressionOptions, GzipLevel, ZstdLevel,
4};
5use crate::parquet::error::{ParquetError, ParquetResult};
6
#[cfg(any(feature = "snappy", feature = "lz4"))]
/// Appends the compressed form of `input` to `output`.
///
/// `get_length` returns an upper bound on the compressed size for a given
/// input length; `compress` writes into the provided scratch region and
/// returns the number of bytes actually produced. Bytes already present in
/// `output` are left untouched.
fn inner_compress<
    G: Fn(usize) -> ParquetResult<usize>,
    F: Fn(&[u8], &mut [u8]) -> ParquetResult<usize>,
>(
    input: &[u8],
    output: &mut Vec<u8>,
    get_length: G,
    compress: F,
) -> ParquetResult<()> {
    // Everything before `start` is pre-existing content we must preserve.
    let start = output.len();
    // Worst-case bound for this input; we trim back down afterwards.
    let bound = get_length(input.len())?;

    output.resize(start + bound, 0);
    let written = compress(input, &mut output[start..])?;

    // Drop the unused tail of the over-allocated scratch space.
    output.truncate(start + written);
    Ok(())
}
26
/// Compresses `input_buf` with the codec selected by `compression` and
/// appends the result to `output_buf` (existing bytes are preserved).
///
/// # Errors
/// - `ParquetError::FeatureNotActive` when the matching cargo feature is
///   disabled at build time.
/// - `ParquetError::InvalidParameter` for `Uncompressed`.
/// - `ParquetError::FeatureNotSupported` for any variant without an arm below.
/// - Any error surfaced by the underlying codec.
#[allow(unused_variables)]
pub fn compress(
    compression: CompressionOptions,
    input_buf: &[u8],
    #[allow(clippy::ptr_arg)] output_buf: &mut Vec<u8>,
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        CompressionOptions::Brotli(level) => {
            use std::io::Write;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            // NOTE(review): two statements on one line — split when next touching this code.
            const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; let q = level.unwrap_or_default();
            let mut encoder = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                q.compression_level(),
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            encoder.write_all(input_buf)?;
            encoder.flush().map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        CompressionOptions::Brotli(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "compress to brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        CompressionOptions::Gzip(level) => {
            use std::io::Write;
            let level = level.unwrap_or_default();
            let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into());
            encoder.write_all(input_buf)?;
            // `try_finish` flushes and writes the gzip trailer without consuming the writer.
            encoder.try_finish().map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        CompressionOptions::Gzip(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "compress to gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        CompressionOptions::Snappy => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(snap::raw::max_compress_len(len)),
            |input, output| Ok(snap::raw::Encoder::new().compress(input, output)?),
        ),
        #[cfg(not(feature = "snappy"))]
        CompressionOptions::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "compress to snappy".to_string(),
        )),
        #[cfg(feature = "lz4")]
        CompressionOptions::Lz4Raw => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(lz4::block::compress_bound(len)?),
            |input, output| {
                let compressed_size = lz4::block::compress_to_buffer(input, None, false, output)?;
                Ok(compressed_size)
            },
        ),
        // NOTE(review): with only `lz4_flex` enabled (no `lz4`), neither Lz4Raw arm
        // matches and compression falls through to the `_` FeatureNotSupported arm,
        // even though decompression supports lz4_flex — confirm this is intended.
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        CompressionOptions::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "compress to lz4".to_string(),
        )),
        #[cfg(feature = "zstd")]
        CompressionOptions::Zstd(level) => {
            let level = level.map(|v| v.compression_level()).unwrap_or_default();
            let old_len = output_buf.len();
            // Reserve the worst-case output size, then trim back after compressing.
            output_buf.resize(
                old_len + zstd::zstd_safe::compress_bound(input_buf.len()),
                0,
            );
            match zstd::bulk::compress_to_buffer(input_buf, &mut output_buf[old_len..], level) {
                Ok(written_size) => {
                    output_buf.truncate(old_len + written_size);
                    Ok(())
                },
                Err(e) => Err(e.into()),
            }
        },
        #[cfg(not(feature = "zstd"))]
        CompressionOptions::Zstd(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "compress to zstd".to_string(),
        )),
        CompressionOptions::Uncompressed => Err(ParquetError::InvalidParameter(
            "Compressing uncompressed".to_string(),
        )),
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {:?} is not supported",
            compression,
        ))),
    }
}
132
/// Decompresses `input_buf` into `output_buf`.
///
/// `output_buf` is expected to be pre-sized to the decompressed length: the
/// brotli/gzip/zstd arms use `read_exact` to fill it completely, and the lz4
/// arms pass `output_buf.len()` as the expected size.
///
/// # Errors
/// - `ParquetError::FeatureNotActive` when the matching cargo feature is
///   disabled at build time.
/// - `ParquetError::InvalidParameter` for `Uncompressed`.
/// - Out-of-spec or codec errors from the underlying decoders.
#[allow(unused_variables)]
pub fn decompress(
    compression: Compression,
    input_buf: &[u8],
    output_buf: &mut [u8],
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        Compression::Brotli => {
            use std::io::Read;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
                .read_exact(output_buf)
                .map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        Compression::Brotli => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "decompress with brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        Compression::Gzip => {
            use std::io::Read;
            let mut decoder = flate2::read::GzDecoder::new(input_buf);
            decoder.read_exact(output_buf).map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        Compression::Gzip => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "decompress with gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        Compression::Snappy => {
            use snap::raw::{decompress_len, Decoder};

            // Reject frames whose advertised decompressed size exceeds the
            // destination buffer before handing them to the decoder.
            let len = decompress_len(input_buf)?;
            if len > output_buf.len() {
                return Err(ParquetError::oos("snappy header out of spec"));
            }
            Decoder::new()
                .decompress(input_buf, output_buf)
                .map_err(|e| e.into())
                .map(|_| ())
        },
        #[cfg(not(feature = "snappy"))]
        Compression::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "decompress with snappy".to_string(),
        )),
        // lz4_flex backend is used only when the (faster, C-based) `lz4` crate is absent.
        #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
        Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)
            .map(|_| {})
            .map_err(|e| e.into()),
        #[cfg(feature = "lz4")]
        Compression::Lz4Raw => {
            lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
                .map_err(|e| e.into())
        },
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "decompress with lz4".to_string(),
        )),

        // Legacy LZ4: some writers emit Hadoop-framed data, others raw blocks.
        // Try the Hadoop frame layout first and fall back to a raw block.
        #[cfg(any(feature = "lz4_flex", feature = "lz4"))]
        Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {
            lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
        }),

        #[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]
        Compression::Lz4 => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "decompress with legacy lz4".to_string(),
        )),

        #[cfg(feature = "zstd")]
        Compression::Zstd => {
            use std::io::Read;
            let mut decoder = zstd::Decoder::with_buffer(input_buf)?;
            decoder.read_exact(output_buf).map_err(|e| e.into())
        },
        #[cfg(not(feature = "zstd"))]
        Compression::Zstd => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "decompress with zstd".to_string(),
        )),
        // NOTE(review): message says "Compressing uncompressed" in the
        // decompress path — likely copy-pasted from `compress`; confirm before changing.
        Compression::Uncompressed => Err(ParquetError::InvalidParameter(
            "Compressing uncompressed".to_string(),
        )),
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {:?} is not supported",
            compression,
        ))),
    }
}
232
/// Decompresses Hadoop-framed LZ4 data into `output_buf`.
///
/// The Hadoop layout is a sequence of frames, each prefixed by two big-endian
/// `u32` values: the decompressed size followed by the compressed size of the
/// frame. All frames must be consumed and the whole input must be used up.
///
/// # Errors
/// Returns an out-of-spec error when a frame header advertises more bytes than
/// remain in the input or output, when a frame decompresses to a size other
/// than advertised, or when trailing input bytes are left unconsumed.
#[cfg(any(feature = "lz4", feature = "lz4_flex"))]
fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> ParquetResult<()> {
    const SIZE_U32: usize = size_of::<u32>();
    const PREFIX_LEN: usize = SIZE_U32 * 2;
    let mut input_len = input_buf.len();
    let mut input = input_buf;
    let mut output_len = output_buf.len();
    let mut output: &mut [u8] = output_buf;
    while input_len >= PREFIX_LEN {
        // Frame header: <decompressed size, compressed size>, both big-endian u32.
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[0..4]);
        let expected_decompressed_size = u32::from_be_bytes(bytes);
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[4..8]);
        let expected_compressed_size = u32::from_be_bytes(bytes);
        input = &input[PREFIX_LEN..];
        input_len -= PREFIX_LEN;

        if input_len < expected_compressed_size as usize {
            return Err(ParquetError::oos("Not enough bytes for Hadoop frame"));
        }

        if output_len < expected_decompressed_size as usize {
            return Err(ParquetError::oos(
                "Not enough bytes to hold advertised output",
            ));
        }
        let decompressed_size = lz4_decompress_to_buffer(
            &input[..expected_compressed_size as usize],
            Some(output_len as i32),
            output,
        )?;
        if decompressed_size != expected_decompressed_size as usize {
            return Err(ParquetError::oos("unexpected decompressed size"));
        }
        input_len -= expected_compressed_size as usize;
        output_len -= expected_decompressed_size as usize;
        // BUGFIX: always advance past the frame just consumed. The previous
        // code only advanced when `input_len > expected_compressed_size`,
        // comparing the *remaining* input against the *current* frame's size;
        // whenever a later frame region was no larger than the current frame's
        // compressed size, the loop broke with unread input and spuriously
        // reported "Not all input are consumed". The `while` guard and the
        // final `input_len == 0` check already handle termination.
        input = &input[expected_compressed_size as usize..];
        output = &mut output[expected_decompressed_size as usize..];
    }
    if input_len == 0 {
        Ok(())
    } else {
        Err(ParquetError::oos("Not all input are consumed"))
    }
}
296
/// Thin wrapper over `lz4::block::decompress_to_buffer` that lifts the lz4
/// error into a `ParquetError`. Returns the number of bytes written.
#[cfg(feature = "lz4")]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> ParquetResult<usize> {
    lz4::block::decompress_to_buffer(src, uncompressed_size, buffer).map_err(Into::into)
}
307
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips `data` through `compress`/`decompress` with codec `c`.
    ///
    /// The destination vector is pre-filled with sentinel bytes so the test
    /// also checks that `compress` appends rather than clobbering.
    fn test_roundtrip(c: CompressionOptions, data: &[u8]) {
        const OFFSET: usize = 2048;

        let mut compressed = vec![2u8; OFFSET];
        compress(c, data, &mut compressed).expect("Error when compressing");

        // The patterned payload is highly compressible; every codec must shrink it.
        assert!(compressed.len() - OFFSET < data.len());

        let mut round_tripped = vec![0u8; data.len()];
        decompress(c.into(), &compressed[OFFSET..], &mut round_tripped)
            .expect("Error when decompressing");
        assert_eq!(data, round_tripped.as_slice());
    }

    /// Runs the round-trip over a few payload sizes.
    fn test_codec(c: CompressionOptions) {
        for size in [1000usize, 10000, 100000] {
            let data: Vec<u8> = (0..size).map(|x| (x % 255) as u8).collect();
            test_roundtrip(c, &data);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec(CompressionOptions::Snappy);
    }

    #[test]
    fn test_codec_gzip_default() {
        test_codec(CompressionOptions::Gzip(None));
    }

    #[test]
    fn test_codec_gzip_low_compression() {
        test_codec(CompressionOptions::Gzip(Some(
            GzipLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_default() {
        test_codec(CompressionOptions::Brotli(None));
    }

    #[test]
    fn test_codec_brotli_low_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_high_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(11).unwrap(),
        )));
    }

    #[test]
    fn test_codec_lz4_raw() {
        test_codec(CompressionOptions::Lz4Raw);
    }

    #[test]
    fn test_codec_zstd_default() {
        test_codec(CompressionOptions::Zstd(None));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_low_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(1).unwrap(),
        )));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_high_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(21).unwrap(),
        )));
    }
}