1use crate::{types::Value, AvroResult, Error};
20use libflate::deflate::{Decoder, Encoder};
21use std::io::{Read, Write};
22use strum_macros::{EnumIter, EnumString, IntoStaticStr};
23
24#[cfg(feature = "bzip")]
25use bzip2::{
26 read::{BzDecoder, BzEncoder},
27 Compression,
28};
29#[cfg(feature = "snappy")]
30extern crate crc32fast;
31#[cfg(feature = "snappy")]
32use crc32fast::Hasher;
33#[cfg(feature = "xz")]
34use xz2::read::{XzDecoder, XzEncoder};
35
36#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
38#[strum(serialize_all = "kebab_case")]
39pub enum Codec {
40 Null,
42 Deflate,
46 #[cfg(feature = "snappy")]
47 Snappy,
51 #[cfg(feature = "zstandard")]
52 Zstandard,
53 #[cfg(feature = "bzip")]
54 Bzip2,
57 #[cfg(feature = "xz")]
58 Xz,
61}
62
63impl From<Codec> for Value {
64 fn from(value: Codec) -> Self {
65 Self::Bytes(<&str>::from(value).as_bytes().to_vec())
66 }
67}
68
69impl Codec {
70 pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
72 match self {
73 Codec::Null => (),
74 Codec::Deflate => {
75 let mut encoder = Encoder::new(Vec::new());
76 encoder.write_all(stream).map_err(Error::DeflateCompress)?;
77 *stream = encoder
79 .finish()
80 .into_result()
81 .map_err(Error::DeflateCompressFinish)?;
82 }
83 #[cfg(feature = "snappy")]
84 Codec::Snappy => {
85 let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
86 let compressed_size = snap::raw::Encoder::new()
87 .compress(&stream[..], &mut encoded[..])
88 .map_err(Error::SnappyCompress)?;
89
90 let mut hasher = Hasher::new();
91 hasher.update(&stream[..]);
92 let checksum = hasher.finalize();
93 let checksum_as_bytes = checksum.to_be_bytes();
94 let checksum_len = checksum_as_bytes.len();
95 encoded.truncate(compressed_size + checksum_len);
96 encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
97
98 *stream = encoded;
99 }
100 #[cfg(feature = "zstandard")]
101 Codec::Zstandard => {
102 let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
103 encoder.write_all(stream).map_err(Error::ZstdCompress)?;
104 *stream = encoder.finish().unwrap();
105 }
106 #[cfg(feature = "bzip")]
107 Codec::Bzip2 => {
108 let mut encoder = BzEncoder::new(&stream[..], Compression::best());
109 let mut buffer = Vec::new();
110 encoder.read_to_end(&mut buffer).unwrap();
111 *stream = buffer;
112 }
113 #[cfg(feature = "xz")]
114 Codec::Xz => {
115 let compression_level = 9;
116 let mut encoder = XzEncoder::new(&stream[..], compression_level);
117 let mut buffer = Vec::new();
118 encoder.read_to_end(&mut buffer).unwrap();
119 *stream = buffer;
120 }
121 };
122
123 Ok(())
124 }
125
126 pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
128 *stream = match self {
129 Codec::Null => return Ok(()),
130 Codec::Deflate => {
131 let mut decoded = Vec::new();
132 let mut decoder = Decoder::new(&stream[..]);
133 decoder
134 .read_to_end(&mut decoded)
135 .map_err(Error::DeflateDecompress)?;
136 decoded
137 }
138 #[cfg(feature = "snappy")]
139 Codec::Snappy => {
140 let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
141 .map_err(Error::GetSnappyDecompressLen)?;
142 let mut decoded = vec![0; decompressed_size];
143 snap::raw::Decoder::new()
144 .decompress(&stream[..stream.len() - 4], &mut decoded[..])
145 .map_err(Error::SnappyDecompress)?;
146
147 let mut last_four: [u8; 4] = [0; 4];
148 last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
149 let expected: u32 = u32::from_be_bytes(last_four);
150
151 let mut hasher = Hasher::new();
152 hasher.update(&decoded);
153 let actual = hasher.finalize();
154
155 if expected != actual {
156 return Err(Error::SnappyCrc32 { expected, actual });
157 }
158 decoded
159 }
160 #[cfg(feature = "zstandard")]
161 Codec::Zstandard => {
162 let mut decoded = Vec::new();
163 let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
164 std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
165 decoded
166 }
167 #[cfg(feature = "bzip")]
168 Codec::Bzip2 => {
169 let mut decoder = BzDecoder::new(&stream[..]);
170 let mut decoded = Vec::new();
171 decoder.read_to_end(&mut decoded).unwrap();
172 decoded
173 }
174 #[cfg(feature = "xz")]
175 Codec::Xz => {
176 let mut decoder = XzDecoder::new(&stream[..]);
177 let mut decoded: Vec<u8> = Vec::new();
178 decoder.read_to_end(&mut decoded).unwrap();
179 decoded
180 }
181 };
182 Ok(())
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Repetitive sample payload; the round-trip helper expects every real
    /// codec to make it shorter.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Round-trips `INPUT` through `codec`, checking that compression changes
    /// and shrinks the data and that decompression restores it exactly.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, buffer.as_slice());
        assert!(INPUT.len() > buffer.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    /// The null codec must leave the data untouched in both directions.
    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let mut buffer = Vec::from(INPUT);

        Codec::Null.compress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Codec::Null.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Deflate)
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Zstandard)
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Bzip2)
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Xz)
    }

    /// Every enabled codec converts to its kebab-case name.
    #[test]
    fn codec_to_str() {
        let name_of = |codec: Codec| -> &'static str { codec.into() };

        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(name_of(Codec::Deflate), "deflate");

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(name_of(Codec::Zstandard), "zstandard");

        #[cfg(feature = "bzip")]
        assert_eq!(name_of(Codec::Bzip2), "bzip2");

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz), "xz");
    }

    /// Every enabled codec parses from its kebab-case name; unknown names fail.
    #[test]
    fn codec_from_str() {
        assert_eq!("null".parse::<Codec>().unwrap(), Codec::Null);
        assert_eq!("deflate".parse::<Codec>().unwrap(), Codec::Deflate);

        #[cfg(feature = "snappy")]
        assert_eq!("snappy".parse::<Codec>().unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!("zstandard".parse::<Codec>().unwrap(), Codec::Zstandard);

        #[cfg(feature = "bzip")]
        assert_eq!("bzip2".parse::<Codec>().unwrap(), Codec::Bzip2);

        #[cfg(feature = "xz")]
        assert_eq!("xz".parse::<Codec>().unwrap(), Codec::Xz);

        assert!("not a codec".parse::<Codec>().is_err());
    }
}