use std::io::{Read, Write};
use std::str::FromStr;
use failure::Error;
use libflate::deflate::{Decoder, Encoder};
#[cfg(feature = "snappy")]
use snap::{Reader, Writer};
use types::{ToAvro, Value};
/// Compression codec that can be applied to a block of bytes.
///
/// Each variant maps to a textual name (`"null"`, `"deflate"`, `"snappy"`)
/// through the `ToAvro` and `FromStr` implementations in this file, and to a
/// concrete algorithm through `Codec::compress` / `Codec::decompress`.
///
/// Equality between codecs is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Codec {
    /// Pass-through codec: the data is left exactly as it is.
    Null,
    /// Deflate compression, implemented with `libflate`.
    Deflate,
    /// Snappy compression, implemented with `snap`.
    ///
    /// Only available when the crate is built with the `snappy` feature.
    #[cfg(feature = "snappy")]
    Snappy,
}
impl ToAvro for Codec {
    /// Converts the codec into a `Value::Bytes` holding the UTF-8 bytes of
    /// its name: `"null"`, `"deflate"` or (with the `snappy` feature)
    /// `"snappy"`.
    fn avro(self) -> Value {
        let name: &str = match self {
            Codec::Null => "null",
            Codec::Deflate => "deflate",
            #[cfg(feature = "snappy")]
            Codec::Snappy => "snappy",
        };

        Value::Bytes(name.as_bytes().to_vec())
    }
}
impl FromStr for Codec {
    /// Parsing carries no diagnostic: an unrecognised name yields `Err(())`.
    type Err = ();

    /// Parses a codec from its exact, case-sensitive name.
    ///
    /// Accepted names are `"null"`, `"deflate"` and, when the `snappy`
    /// feature is enabled, `"snappy"`. Any other input is rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let codec = match s {
            "null" => Codec::Null,
            "deflate" => Codec::Deflate,
            #[cfg(feature = "snappy")]
            "snappy" => Codec::Snappy,
            // Unknown name: bail out early with the unit error.
            _ => return Err(()),
        };

        Ok(codec)
    }
}
impl Codec {
    /// Compresses `stream` in place, replacing its contents with the
    /// encoded bytes.
    ///
    /// `Codec::Null` leaves `stream` untouched.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to the underlying encoder or finalising
    /// it fails. In that case `stream` still holds the original data.
    pub fn compress(&self, stream: &mut Vec<u8>) -> Result<(), Error> {
        match *self {
            Codec::Null => {},
            Codec::Deflate => {
                let mut deflater = Encoder::new(Vec::new());
                deflater.write_all(stream.as_slice())?;
                let compressed = deflater.finish().into_result()?;
                *stream = compressed;
            },
            #[cfg(feature = "snappy")]
            Codec::Snappy => {
                // NOTE(review): `snap::Writer`/`snap::Reader` appear to use the
                // Snappy *frame* format, while the Avro specification describes
                // snappy blocks as raw Snappy data followed by a CRC32 checksum.
                // Confirm interoperability with other Avro implementations.
                let mut snappy_out = Writer::new(Vec::new());
                snappy_out.write_all(stream.as_slice())?;
                let compressed = snappy_out.into_inner()?;
                *stream = compressed;
            },
        }

        Ok(())
    }

    /// Decompresses `stream` in place, replacing its contents with the
    /// decoded bytes.
    ///
    /// `Codec::Null` leaves `stream` untouched.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying decoder fails while reading. In
    /// that case `stream` still holds the original (compressed) data.
    pub fn decompress(&self, stream: &mut Vec<u8>) -> Result<(), Error> {
        // Decode into a fresh buffer first; `stream` is only overwritten once
        // decoding has fully succeeded.
        let plain = match *self {
            Codec::Null => return Ok(()),
            Codec::Deflate => {
                let mut out = Vec::new();
                Decoder::new(&stream[..]).read_to_end(&mut out)?;
                out
            },
            #[cfg(feature = "snappy")]
            Codec::Snappy => {
                let mut out = Vec::new();
                Reader::new(&stream[..]).read_to_end(&mut out)?;
                out
            },
        };

        *stream = plain;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Highly repetitive sample payload, so real codecs are expected to
    /// shrink it.
    static INPUT: &'static [u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Runs `INPUT` through `codec` and checks that compression changes and
    /// shrinks the data, and that decompression restores it exactly.
    fn assert_shrinking_round_trip(codec: Codec) {
        let mut buffer = INPUT.to_vec();

        codec.compress(&mut buffer).unwrap();
        assert_ne!(INPUT, buffer.as_slice());
        assert!(INPUT.len() > buffer.len());

        codec.decompress(&mut buffer).unwrap();
        assert_eq!(INPUT, buffer.as_slice());
    }

    /// The null codec must leave the data untouched in both directions.
    #[test]
    fn null_compress_and_decompress() {
        let passthrough = Codec::Null;
        let mut buffer = INPUT.to_vec();

        passthrough.compress(&mut buffer).unwrap();
        assert_eq!(INPUT, buffer.as_slice());

        passthrough.decompress(&mut buffer).unwrap();
        assert_eq!(INPUT, buffer.as_slice());
    }

    #[test]
    fn deflate_compress_and_decompress() {
        assert_shrinking_round_trip(Codec::Deflate);
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() {
        assert_shrinking_round_trip(Codec::Snappy);
    }
}