apache_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for all supported compression codecs in Avro.
19use crate::{types::Value, AvroResult, Error};
20use libflate::deflate::{Decoder, Encoder};
21use std::io::{Read, Write};
22use strum_macros::{EnumIter, EnumString, IntoStaticStr};
23
24#[cfg(feature = "bzip")]
25use bzip2::{
26    read::{BzDecoder, BzEncoder},
27    Compression,
28};
29#[cfg(feature = "snappy")]
30extern crate crc32fast;
31#[cfg(feature = "snappy")]
32use crc32fast::Hasher;
33#[cfg(feature = "xz")]
34use xz2::read::{XzDecoder, XzEncoder};
35
36/// The compression codec used to compress blocks.
37#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
38#[strum(serialize_all = "kebab_case")]
39pub enum Codec {
40    /// The `Null` codec simply passes through data uncompressed.
41    Null,
42    /// The `Deflate` codec writes the data block using the deflate algorithm
43    /// as specified in RFC 1951, and typically implemented using the zlib library.
44    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
45    Deflate,
46    #[cfg(feature = "snappy")]
47    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
48    /// compression library. Each compressed block is followed by the 4-byte, big-endian
49    /// CRC32 checksum of the uncompressed data in the block.
50    Snappy,
51    #[cfg(feature = "zstandard")]
52    Zstandard,
53    #[cfg(feature = "bzip")]
54    /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
55    /// compression library.
56    Bzip2,
57    #[cfg(feature = "xz")]
58    /// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/)
59    /// compression library.
60    Xz,
61}
62
63impl From<Codec> for Value {
64    fn from(value: Codec) -> Self {
65        Self::Bytes(<&str>::from(value).as_bytes().to_vec())
66    }
67}
68
69impl Codec {
70    /// Compress a stream of bytes in-place.
71    pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
72        match self {
73            Codec::Null => (),
74            Codec::Deflate => {
75                let mut encoder = Encoder::new(Vec::new());
76                encoder.write_all(stream).map_err(Error::DeflateCompress)?;
77                // Deflate errors seem to just be io::Error
78                *stream = encoder
79                    .finish()
80                    .into_result()
81                    .map_err(Error::DeflateCompressFinish)?;
82            }
83            #[cfg(feature = "snappy")]
84            Codec::Snappy => {
85                let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
86                let compressed_size = snap::raw::Encoder::new()
87                    .compress(&stream[..], &mut encoded[..])
88                    .map_err(Error::SnappyCompress)?;
89
90                let mut hasher = Hasher::new();
91                hasher.update(&stream[..]);
92                let checksum = hasher.finalize();
93                let checksum_as_bytes = checksum.to_be_bytes();
94                let checksum_len = checksum_as_bytes.len();
95                encoded.truncate(compressed_size + checksum_len);
96                encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
97
98                *stream = encoded;
99            }
100            #[cfg(feature = "zstandard")]
101            Codec::Zstandard => {
102                let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
103                encoder.write_all(stream).map_err(Error::ZstdCompress)?;
104                *stream = encoder.finish().unwrap();
105            }
106            #[cfg(feature = "bzip")]
107            Codec::Bzip2 => {
108                let mut encoder = BzEncoder::new(&stream[..], Compression::best());
109                let mut buffer = Vec::new();
110                encoder.read_to_end(&mut buffer).unwrap();
111                *stream = buffer;
112            }
113            #[cfg(feature = "xz")]
114            Codec::Xz => {
115                let compression_level = 9;
116                let mut encoder = XzEncoder::new(&stream[..], compression_level);
117                let mut buffer = Vec::new();
118                encoder.read_to_end(&mut buffer).unwrap();
119                *stream = buffer;
120            }
121        };
122
123        Ok(())
124    }
125
126    /// Decompress a stream of bytes in-place.
127    pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
128        *stream = match self {
129            Codec::Null => return Ok(()),
130            Codec::Deflate => {
131                let mut decoded = Vec::new();
132                let mut decoder = Decoder::new(&stream[..]);
133                decoder
134                    .read_to_end(&mut decoded)
135                    .map_err(Error::DeflateDecompress)?;
136                decoded
137            }
138            #[cfg(feature = "snappy")]
139            Codec::Snappy => {
140                let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
141                    .map_err(Error::GetSnappyDecompressLen)?;
142                let mut decoded = vec![0; decompressed_size];
143                snap::raw::Decoder::new()
144                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
145                    .map_err(Error::SnappyDecompress)?;
146
147                let mut last_four: [u8; 4] = [0; 4];
148                last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
149                let expected: u32 = u32::from_be_bytes(last_four);
150
151                let mut hasher = Hasher::new();
152                hasher.update(&decoded);
153                let actual = hasher.finalize();
154
155                if expected != actual {
156                    return Err(Error::SnappyCrc32 { expected, actual });
157                }
158                decoded
159            }
160            #[cfg(feature = "zstandard")]
161            Codec::Zstandard => {
162                let mut decoded = Vec::new();
163                let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
164                std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
165                decoded
166            }
167            #[cfg(feature = "bzip")]
168            Codec::Bzip2 => {
169                let mut decoder = BzDecoder::new(&stream[..]);
170                let mut decoded = Vec::new();
171                decoder.read_to_end(&mut decoded).unwrap();
172                decoded
173            }
174            #[cfg(feature = "xz")]
175            Codec::Xz => {
176                let mut decoder = XzDecoder::new(&stream[..]);
177                let mut decoded: Vec<u8> = Vec::new();
178                decoder.read_to_end(&mut decoded).unwrap();
179                decoded
180            }
181        };
182        Ok(())
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Highly repetitive payload, so every real codec is expected to shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Pairs every codec compiled into this build with its expected string name.
    fn named_codecs() -> Vec<(Codec, &'static str)> {
        [
            (Codec::Null, "null"),
            (Codec::Deflate, "deflate"),
            #[cfg(feature = "snappy")]
            (Codec::Snappy, "snappy"),
            #[cfg(feature = "zstandard")]
            (Codec::Zstandard, "zstandard"),
            #[cfg(feature = "bzip")]
            (Codec::Bzip2, "bzip2"),
            #[cfg(feature = "xz")]
            (Codec::Xz, "xz"),
        ]
        .to_vec()
    }

    /// Compresses `INPUT` with `codec` and checks the output is different and
    /// strictly smaller. Then checks that decompressing restores the original bytes.
    fn assert_round_trip(codec: Codec) -> TestResult {
        let mut buffer = INPUT.to_vec();

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, buffer.as_slice());
        assert!(buffer.len() < INPUT.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        Ok(())
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        // The null codec must be the identity in both directions.
        let mut buffer = INPUT.to_vec();
        Codec::Null.compress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        Codec::Null.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Deflate)
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Zstandard)
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Bzip2)
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Xz)
    }

    #[test]
    fn codec_to_str() {
        for (codec, name) in named_codecs() {
            assert_eq!(<&str>::from(codec), name);
        }
    }

    #[test]
    fn codec_from_str() {
        for (codec, name) in named_codecs() {
            assert_eq!(name.parse::<Codec>().unwrap(), codec);
        }
        assert!("not a codec".parse::<Codec>().is_err());
    }
}