cdrs_async/
compressor.rs

1//!CDRS support traffic decompression as it is described in [Apache
2//!Cassandra protocol](
3//!https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec#L790)
4//!
5//!Before being used, client and server must agree on a compression algorithm to
6//!use, which is done in the STARTUP message. As a consequence, a STARTUP message
7//!must never be compressed.  However, once the STARTUP frame has been received
8//!by the server, messages can be compressed (including the response to the STARTUP
9//!request).
10
11use std::{convert::From, error::Error, fmt, io, result};
12
13use cassandra_proto::compression::Compressor;
14use lz4_compress as lz4;
15use snap;
16
17type Result<T> = result::Result<T, CompressionError>;
18
19pub const LZ4: &'static str = "lz4";
20pub const SNAPPY: &'static str = "snappy";
21
22/// It's an error which may occure during encoding or deconding
23/// frame body. As there are only two types of compressors it
24/// contains two related enum options.
25#[derive(Debug)]
26pub enum CompressionError {
27  /// Snappy error.
28  Snappy(snap::Error),
29  /// Lz4 error.
30  Lz4(io::Error),
31}
32
33impl fmt::Display for CompressionError {
34  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35    match *self {
36      CompressionError::Snappy(ref err) => write!(f, "Snappy Error: {:?}", err),
37      CompressionError::Lz4(ref err) => write!(f, "Lz4 Error: {:?}", err),
38    }
39  }
40}
41
42impl Error for CompressionError {
43  fn description(&self) -> &str {
44    match *self {
45      CompressionError::Snappy(ref err) => err.description(),
46      CompressionError::Lz4(ref err) => err.description(),
47    }
48  }
49}
50
51/// Enum which represents a type of compression. Only non-startup frame's body can be compressed.
52///
53/// It encodes `bytes` basing on type of `Compression`.
54///
55/// # Examples
56///
57/// ```
58///   use cdrs::compression::Compression;
59///
60///   let snappy_compression = Compression::Snappy;
61///   let bytes = String::from("Hello World").into_bytes().to_vec();
62///   let encoded = snappy_compression.encode(bytes.clone()).unwrap();
63///   assert_eq!(snappy_compression.decode(encoded).unwrap(), bytes);
64///
65/// ```
66///
67/// It decodes `bytes` basing on type of compression.
68///
69/// # Examples
70///
71/// ```
72///    use cdrs::compression::Compression;
73///    let lz4_compression = Compression::Lz4;
74///    let bytes = String::from("Hello World").into_bytes().to_vec();
75///    let encoded = lz4_compression.encode(bytes.clone()).unwrap();
76///    let len = encoded.len() as u8;
77///    let mut input = vec![0, 0, 0, len];
78///    input.extend_from_slice(encoded.as_slice());
79///    assert_eq!(lz4_compression.decode(input).unwrap(), bytes);
80/// ```
81#[derive(Debug, PartialEq, Clone, Copy, Eq, Ord, PartialOrd)]
82pub enum Compression {
83  /// [lz4](https://code.google.com/p/lz4/) compression.
84  Lz4,
85  /// [snappy](https://code.google.com/p/snappy/) compression.
86  Snappy,
87  /// Without compression.
88  None,
89}
90
91impl Compression {
92  /// It transforms compression method into a `&str`.
93  pub fn as_str(&self) -> Option<&'static str> {
94    match *self {
95      Compression::Lz4 => Some(LZ4),
96      Compression::Snappy => Some(SNAPPY),
97      Compression::None => None,
98    }
99  }
100
101  fn encode_snappy(bytes: Vec<u8>) -> Result<Vec<u8>> {
102    let mut encoder = snap::Encoder::new();
103    encoder
104      .compress_vec(bytes.as_slice())
105      .map_err(CompressionError::Snappy)
106  }
107
108  fn decode_snappy(bytes: Vec<u8>) -> Result<Vec<u8>> {
109    let mut decoder = snap::Decoder::new();
110    decoder
111      .decompress_vec(bytes.as_slice())
112      .map_err(CompressionError::Snappy)
113  }
114
115  fn encode_lz4(bytes: Vec<u8>) -> Result<Vec<u8>> {
116    Ok(lz4::compress(bytes.as_slice()))
117  }
118
119  fn decode_lz4(bytes: Vec<u8>) -> Result<Vec<u8>> {
120    // skip first 4 bytes in accordance to
121    // https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec#L805
122    lz4::decompress(&bytes[4..]).map_err(CompressionError::Lz4)
123  }
124}
125
126impl Compressor for Compression {
127  type CompressorError = CompressionError;
128
129  fn encode(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
130    match *self {
131      Compression::Lz4 => Compression::encode_lz4(bytes),
132      Compression::Snappy => Compression::encode_snappy(bytes),
133      Compression::None => Ok(bytes),
134    }
135  }
136
137  fn decode(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
138    match *self {
139      Compression::Lz4 => Compression::decode_lz4(bytes),
140      Compression::Snappy => Compression::decode_snappy(bytes),
141      Compression::None => Ok(bytes),
142    }
143  }
144
145  fn into_string(&self) -> Option<String> {
146    match *self {
147      Compression::Lz4 => Some(LZ4.into()),
148      Compression::Snappy => Some(SNAPPY.into()),
149      Compression::None => None,
150    }
151  }
152}
153
154impl From<String> for Compression {
155  /// It converts `String` into `Compression`. If string is neither `lz4` nor `snappy` then
156  /// `Compression::None` will be returned
157  fn from(compression_string: String) -> Compression {
158    Compression::from(compression_string.as_str())
159  }
160}
161
162impl<'a> From<&'a str> for Compression {
163  /// It converts `str` into `Compression`. If string is neither `lz4` nor `snappy` then
164  /// `Compression::None` will be returned
165  fn from(compression_str: &'a str) -> Compression {
166    match compression_str {
167      LZ4 => Compression::Lz4,
168      SNAPPY => Compression::Snappy,
169      _ => Compression::None,
170    }
171  }
172}
173
174#[cfg(test)]
175mod tests {
176  use super::*;
177  #[test]
178  fn test_compression_from_str() {
179    let lz4 = "lz4";
180    assert_eq!(Compression::from(lz4), Compression::Lz4);
181    let snappy = "snappy";
182    assert_eq!(Compression::from(snappy), Compression::Snappy);
183    let none = "x";
184    assert_eq!(Compression::from(none), Compression::None);
185  }
186
187  #[test]
188  fn test_compression_from_string() {
189    let lz4 = "lz4".to_string();
190    assert_eq!(Compression::from(lz4), Compression::Lz4);
191    let snappy = "snappy".to_string();
192    assert_eq!(Compression::from(snappy), Compression::Snappy);
193    let none = "x".to_string();
194    assert_eq!(Compression::from(none), Compression::None);
195  }
196
197  #[test]
198  fn test_compression_encode_snappy() {
199    let snappy_compression = Compression::Snappy;
200    let bytes = String::from("Hello World").into_bytes().to_vec();
201    snappy_compression
202      .encode(bytes.clone())
203      .expect("Should work without exceptions");
204  }
205
206  #[test]
207  fn test_compression_decode_snappy() {
208    let snappy_compression = Compression::Snappy;
209    let bytes = String::from("Hello World").into_bytes().to_vec();
210    let encoded = snappy_compression.encode(bytes.clone()).unwrap();
211    assert_eq!(snappy_compression.decode(encoded).unwrap(), bytes);
212  }
213
214  #[test]
215  fn test_compression_encode_lz4() {
216    let snappy_compression = Compression::Lz4;
217    let bytes = String::from("Hello World").into_bytes().to_vec();
218    snappy_compression
219      .encode(bytes.clone())
220      .expect("Should work without exceptions");
221  }
222
223  #[test]
224  fn test_compression_decode_lz4() {
225    let lz4_compression = Compression::Lz4;
226    let bytes = String::from("Hello World").into_bytes().to_vec();
227    let encoded = lz4_compression.encode(bytes.clone()).unwrap();
228    let len = encoded.len() as u8;
229    let mut input = vec![0, 0, 0, len];
230    input.extend_from_slice(encoded.as_slice());
231    assert_eq!(lz4_compression.decode(input).unwrap(), bytes);
232  }
233
234  #[test]
235  fn test_compression_encode_none() {
236    let none_compression = Compression::None;
237    let bytes = String::from("Hello World").into_bytes().to_vec();
238    none_compression
239      .encode(bytes.clone())
240      .expect("Should work without exceptions");
241  }
242
243  #[test]
244  fn test_compression_decode_none() {
245    let none_compression = Compression::None;
246    let bytes = String::from("Hello World").into_bytes().to_vec();
247    let encoded = none_compression.encode(bytes.clone()).unwrap();
248    assert_eq!(none_compression.decode(encoded).unwrap(), bytes);
249  }
250
251  #[test]
252  fn test_compression_encode_lz4_with_invalid_input() {
253    let lz4_compression = Compression::Lz4;
254    let bytes: Vec<u8> = vec![0x7f, 0x7f, 0x7f, 0x7f, 0x7f];
255    let encoded = lz4_compression.encode(bytes.clone()).unwrap();
256    let decode = lz4_compression.decode(encoded);
257    assert_eq!(decode.is_err(), true);
258  }
259
260  #[test]
261  fn test_compression_encode_snappy_with_non_utf8() {
262    let snappy_compression = Compression::Snappy;
263    let v = vec![0xff, 0xff];
264    let encoded = snappy_compression
265      .encode(v.clone())
266      .expect("Should work without exceptions");
267    assert_eq!(snappy_compression.decode(encoded).unwrap(), v);
268  }
269}