polars_parquet/parquet/compression.rs

//! Functionality to compress and decompress data according to the parquet specification
pub use super::parquet_bridge::{
    BrotliLevel, Compression, CompressionOptions, GzipLevel, ZstdLevel,
};
use crate::parquet::error::{ParquetError, ParquetResult};

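/// Shared helper for block codecs (Snappy, LZ4 raw): `get_length` computes the
/// codec's worst-case compressed size, the output buffer is grown by that
/// amount, `compress` writes into the spare capacity, and the buffer is then
/// truncated to the bytes actually written. Pre-existing bytes in `output` are
/// left untouched, so callers can append to a partially filled buffer.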
#[cfg(any(feature = "snappy", feature = "lz4"))]
fn inner_compress<
    G: Fn(usize) -> ParquetResult<usize>,
    F: Fn(&[u8], &mut [u8]) -> ParquetResult<usize>,
>(
    input: &[u8],
    output: &mut Vec<u8>,
    get_length: G,
    compress: F,
) -> ParquetResult<()> {
    let original_length = output.len();
    let max_required_length = get_length(input.len())?;

    output.resize(original_length + max_required_length, 0);
    let compressed_size = compress(input, &mut output[original_length..])?;

    output.truncate(original_length + compressed_size);
    Ok(())
}

/// Compresses data stored in slice `input_buf` and writes the compressed result
/// to `output_buf`.
///
/// Note that you'll need to call `clear()` before reusing the same `output_buf`
/// across different `compress` calls.
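///
/// A minimal usage sketch with hypothetical data (assuming the `snappy`
/// feature is enabled):
///
/// ```ignore
/// let data = vec![1u8; 1024];
/// let mut compressed = Vec::new();
/// compress(CompressionOptions::Snappy, &data, &mut compressed)?;
/// // Reuse requires clearing first, as noted above; otherwise the second
/// // result is appended after the first.
/// compressed.clear();
/// compress(CompressionOptions::Snappy, &data, &mut compressed)?;
/// ```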
#[allow(unused_variables)]
pub fn compress(
    compression: CompressionOptions,
    input_buf: &[u8],
    #[allow(clippy::ptr_arg)] output_buf: &mut Vec<u8>,
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        CompressionOptions::Brotli(level) => {
            use std::io::Write;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

            let q = level.unwrap_or_default();
            let mut encoder = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                q.compression_level(),
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            encoder.write_all(input_buf)?;
            encoder.flush().map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        CompressionOptions::Brotli(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "compress to brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        CompressionOptions::Gzip(level) => {
            use std::io::Write;
            let level = level.unwrap_or_default();
            let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into());
            encoder.write_all(input_buf)?;
            encoder.try_finish().map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        CompressionOptions::Gzip(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "compress to gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        CompressionOptions::Snappy => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(snap::raw::max_compress_len(len)),
            |input, output| Ok(snap::raw::Encoder::new().compress(input, output)?),
        ),
        #[cfg(not(feature = "snappy"))]
        CompressionOptions::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "compress to snappy".to_string(),
        )),
        #[cfg(feature = "lz4")]
        CompressionOptions::Lz4Raw => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(lz4::block::compress_bound(len)?),
            |input, output| {
                let compressed_size = lz4::block::compress_to_buffer(input, None, false, output)?;
                Ok(compressed_size)
            },
        ),
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        CompressionOptions::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "compress to lz4".to_string(),
        )),
        #[cfg(feature = "zstd")]
        CompressionOptions::Zstd(level) => {
            let level = level.map(|v| v.compression_level()).unwrap_or_default();
103            // Make sure the buffer is large enough; the interface assumption is
104            // that decompressed data is appended to the output buffer.
            let old_len = output_buf.len();
            output_buf.resize(
                old_len + zstd::zstd_safe::compress_bound(input_buf.len()),
                0,
            );
            match zstd::bulk::compress_to_buffer(input_buf, &mut output_buf[old_len..], level) {
                Ok(written_size) => {
                    output_buf.truncate(old_len + written_size);
                    Ok(())
                },
                Err(e) => Err(e.into()),
            }
        },
        #[cfg(not(feature = "zstd"))]
        CompressionOptions::Zstd(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "compress to zstd".to_string(),
        )),
        CompressionOptions::Uncompressed => Err(ParquetError::InvalidParameter(
            "Compressing uncompressed".to_string(),
        )),
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {:?} is not supported",
            compression,
        ))),
    }
}

/// Decompresses data stored in slice `input_buf` and writes the uncompressed
/// result to `output_buf`. `output_buf` must be pre-allocated with exactly the
/// decompressed size, which parquet stores in the page header.
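///
/// A minimal sketch with hypothetical inputs (the decompressed size normally
/// comes from the page header):
///
/// ```ignore
/// // `page_bytes`: the compressed bytes of a snappy-compressed parquet page.
/// let uncompressed_size = 1024; // read from the page header
/// let mut decompressed = vec![0u8; uncompressed_size];
/// decompress(Compression::Snappy, &page_bytes, &mut decompressed)?;
/// ```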
#[allow(unused_variables)]
pub fn decompress(
    compression: Compression,
    input_buf: &[u8],
    output_buf: &mut [u8],
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        Compression::Brotli => {
            use std::io::Read;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
                .read_exact(output_buf)
                .map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        Compression::Brotli => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "decompress with brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        Compression::Gzip => {
            use std::io::Read;
            let mut decoder = flate2::read::GzDecoder::new(input_buf);
            decoder.read_exact(output_buf).map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        Compression::Gzip => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "decompress with gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        Compression::Snappy => {
            use snap::raw::{decompress_len, Decoder};

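            // snap stores the decompressed length in the stream header; reject
            // frames that advertise more than the caller's buffer can hold.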
            let len = decompress_len(input_buf)?;
            if len > output_buf.len() {
                return Err(ParquetError::oos("snappy header out of spec"));
            }
            Decoder::new()
                .decompress(input_buf, output_buf)
                .map_err(|e| e.into())
                .map(|_| ())
        },
        #[cfg(not(feature = "snappy"))]
        Compression::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "decompress with snappy".to_string(),
        )),
        #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
        Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)
            .map(|_| {})
            .map_err(|e| e.into()),
        #[cfg(feature = "lz4")]
        Compression::Lz4Raw => {
            lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
                .map_err(|e| e.into())
        },
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "decompress with lz4".to_string(),
        )),

        #[cfg(any(feature = "lz4_flex", feature = "lz4"))]
        Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {
            lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
        }),

        #[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]
        Compression::Lz4 => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "decompress with legacy lz4".to_string(),
        )),

        #[cfg(feature = "zstd")]
        Compression::Zstd => {
            use std::io::Read;
            let mut decoder = zstd::Decoder::with_buffer(input_buf)?;
            decoder.read_exact(output_buf).map_err(|e| e.into())
        },
        #[cfg(not(feature = "zstd"))]
        Compression::Zstd => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "decompress with zstd".to_string(),
        )),
        Compression::Uncompressed => Err(ParquetError::InvalidParameter(
            "Decompressing uncompressed".to_string(),
        )),
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {:?} is not supported",
            compression,
        ))),
    }
}

/// Tries to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.
/// Translated from the Apache Arrow C++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).
/// Returns an error if decompression fails.
#[cfg(any(feature = "lz4", feature = "lz4_flex"))]
fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> ParquetResult<()> {
    // Parquet files written with the Hadoop Lz4Codec use their own framing.
    // The input buffer can contain an arbitrary number of "frames", each
    // with the following structure:
    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
    // - bytes 8...: frame compressed data
    //
    // The Hadoop Lz4Codec source code can be found here:
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
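    //
    // As an illustration (sizes made up), a single frame whose payload
    // decompresses to 100 bytes from 60 compressed bytes is laid out as:
    //
    //   00 00 00 64   <- decompressed size = 100 (big-endian u32)
    //   00 00 00 3c   <- compressed size   =  60 (big-endian u32)
    //   <60 bytes>    <- raw LZ4 block data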

    const SIZE_U32: usize = size_of::<u32>();
    const PREFIX_LEN: usize = SIZE_U32 * 2;
    let mut input_len = input_buf.len();
    let mut input = input_buf;
    let mut output_len = output_buf.len();
    let mut output: &mut [u8] = output_buf;
    while input_len >= PREFIX_LEN {
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[0..4]);
        let expected_decompressed_size = u32::from_be_bytes(bytes);
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[4..8]);
        let expected_compressed_size = u32::from_be_bytes(bytes);
        input = &input[PREFIX_LEN..];
        input_len -= PREFIX_LEN;

        if input_len < expected_compressed_size as usize {
            return Err(ParquetError::oos("Not enough bytes for Hadoop frame"));
        }

        if output_len < expected_decompressed_size as usize {
            return Err(ParquetError::oos(
                "Not enough bytes to hold advertised output",
            ));
        }
        let decompressed_size = lz4_decompress_to_buffer(
            &input[..expected_compressed_size as usize],
            Some(output_len as i32),
            output,
        )?;
        if decompressed_size != expected_decompressed_size as usize {
            return Err(ParquetError::oos("unexpected decompressed size"));
        }
        input_len -= expected_compressed_size as usize;
        output_len -= expected_decompressed_size as usize;
        if input_len > expected_compressed_size as usize {
            input = &input[expected_compressed_size as usize..];
            output = &mut output[expected_decompressed_size as usize..];
        } else {
            break;
        }
    }
    if input_len == 0 {
        Ok(())
    } else {
        Err(ParquetError::oos("Not all input was consumed"))
    }
}

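/// Thin wrapper used by the legacy LZ4 paths above: decompresses a single LZ4
/// block into `buffer`, converting the backend's error into a `ParquetError`.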
#[cfg(feature = "lz4")]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> ParquetResult<usize> {
    let size = lz4::block::decompress_to_buffer(src, uncompressed_size, buffer)?;
    Ok(size)
}
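
// With `lz4_flex` as the only LZ4 backend, the callers above still need this
// helper, so fall back to the pure-Rust block decoder. A minimal sketch:
// `lz4_flex::block::decompress_into` (already used for `Lz4Raw`) derives the
// length from the output buffer, so the explicit size hint goes unused.
#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    _uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> ParquetResult<usize> {
    let size = lz4_flex::block::decompress_into(src, buffer)?;
    Ok(size)
}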

#[cfg(test)]
mod tests {
    use super::*;

    fn test_roundtrip(c: CompressionOptions, data: &[u8]) {
        let offset = 2048;

        // Compressing into a buffer that already contains data is possible;
        // the compressed bytes are appended after the existing content.
        let mut compressed = vec![2; offset];
        compress(c, data, &mut compressed).expect("Error when compressing");

        // data is compressed...
        assert!(compressed.len() - offset < data.len());

        let mut decompressed = vec![0; data.len()];
        decompress(c.into(), &compressed[offset..], &mut decompressed)
            .expect("Error when decompressing");
        assert_eq!(data, decompressed.as_slice());
    }

    fn test_codec(c: CompressionOptions) {
        let sizes = vec![1000, 10000, 100000];
        for size in sizes {
            let data = (0..size).map(|x| (x % 255) as u8).collect::<Vec<_>>();
            test_roundtrip(c, &data);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec(CompressionOptions::Snappy);
    }

    #[test]
    fn test_codec_gzip_default() {
        test_codec(CompressionOptions::Gzip(None));
    }

    #[test]
    fn test_codec_gzip_low_compression() {
        test_codec(CompressionOptions::Gzip(Some(
            GzipLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_default() {
        test_codec(CompressionOptions::Brotli(None));
    }

    #[test]
    fn test_codec_brotli_low_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_high_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(11).unwrap(),
        )));
    }

    #[test]
    fn test_codec_lz4_raw() {
        test_codec(CompressionOptions::Lz4Raw);
    }
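
    // A hedged round-trip sketch for the Hadoop LZ4 framing handled by
    // `try_decompress_hadoop`: build a single frame by hand from a raw LZ4
    // block and check that it decompresses. Data and sizes are illustrative.
    #[cfg(feature = "lz4")]
    #[test]
    fn test_try_decompress_hadoop_single_frame() {
        let data = (0..1000).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        // A raw LZ4 block, without the length prefix the `lz4` crate can prepend.
        let block = lz4::block::compress(&data, None, false).unwrap();

        // One Hadoop frame: big-endian decompressed size, big-endian compressed
        // size, then the raw block bytes.
        let mut framed = Vec::new();
        framed.extend_from_slice(&(data.len() as u32).to_be_bytes());
        framed.extend_from_slice(&(block.len() as u32).to_be_bytes());
        framed.extend_from_slice(&block);

        let mut decompressed = vec![0u8; data.len()];
        try_decompress_hadoop(&framed, &mut decompressed).unwrap();
        assert_eq!(data, decompressed);
    }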

    #[test]
    fn test_codec_zstd_default() {
        test_codec(CompressionOptions::Zstd(None));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_low_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(1).unwrap(),
        )));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_high_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(21).unwrap(),
        )));
    }
}