use std::io::{self, Read};
use byteorder::{BigEndian, ByteOrder};
use snappy;
use error::{Result, Error};
pub fn compress(src: &[u8]) -> Result<Vec<u8>> {
Ok(snappy::compress(src))
}
fn uncompress_to(src: &[u8], dst: &mut Vec<u8>) -> Result<()> {
snappy::uncompress_to(src, dst)
.map(|_| ())
.map_err(|_| Error::InvalidInputSnappy)
}
const MAGIC: &'static [u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0];
macro_rules! next_i32 {
($slice:expr) => {{
if $slice.len() < 4 {
return Err(Error::UnexpectedEOF);
}
{ let n = BigEndian::read_i32($slice);
$slice = &$slice[4..];
n
}
}}
}
fn validate_stream(mut stream: &[u8]) -> Result<&[u8]> {
if stream.len() < MAGIC.len() { return Err(Error::UnexpectedEOF); }
if &stream[..MAGIC.len()] != MAGIC { return Err(Error::InvalidInputSnappy); }
stream = &stream[MAGIC.len()..];
let version = next_i32!(stream);
if version != 1 { return Err(Error::InvalidInputSnappy); }
let compat = next_i32!(stream);
if compat != 1 { return Err(Error::InvalidInputSnappy); }
Ok(stream)
}
#[test]
fn test_validate_stream() {
let header = [0x82, 0x53, 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00, 0, 0, 0, 1, 0, 0, 0, 1, 0x56];
let rest = validate_stream(&header).unwrap();
assert_eq!(rest, &[0x56]);
}
pub struct SnappyReader<'a> {
compressed_data: &'a [u8],
uncompressed_pos: usize,
uncompressed_chunk: Vec<u8>,
}
impl<'a> SnappyReader<'a> {
pub fn new(mut stream: &[u8]) -> Result<SnappyReader> {
stream = try!(validate_stream(stream));
Ok(SnappyReader {
compressed_data: stream,
uncompressed_pos: 0,
uncompressed_chunk: Vec::new(),
})
}
fn _read(&mut self, buf: &mut [u8]) -> Result<usize> {
if self.uncompressed_pos < self.uncompressed_chunk.len() {
self.read_uncompressed(buf)
} else if try!(self.next_chunk()) {
self.read_uncompressed(buf)
} else {
Ok(0)
}
}
fn read_uncompressed(&mut self, buf: &mut [u8]) -> Result<usize> {
let n = try!((&self.uncompressed_chunk[self.uncompressed_pos..]).read(buf));
self.uncompressed_pos += n;
Ok(n)
}
fn next_chunk(&mut self) -> Result<bool> {
if self.compressed_data.is_empty() {
return Ok(false);
}
self.uncompressed_pos = 0;
let chunk_size = next_i32!(self.compressed_data);
if chunk_size <= 0 {
return Err(Error::InvalidInputSnappy);
}
let chunk_size = chunk_size as usize;
self.uncompressed_chunk.clear();
try!(uncompress_to(&self.compressed_data[..chunk_size], &mut self.uncompressed_chunk));
self.compressed_data = &self.compressed_data[chunk_size..];
Ok(true)
}
fn _read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
let init_len = buf.len();
if self.uncompressed_pos < self.uncompressed_chunk.len() {
let rest = &self.uncompressed_chunk[self.uncompressed_pos..];
buf.extend_from_slice(rest);
self.uncompressed_pos += rest.len();
}
while !self.compressed_data.is_empty() {
let chunk_size = next_i32!(self.compressed_data);
if chunk_size <= 0 {
return Err(Error::InvalidInputSnappy);
}
let (c1, c2) = self.compressed_data.split_at(chunk_size as usize);
try!(uncompress_to(c1, buf));
self.compressed_data = c2;
}
Ok(buf.len() - init_len)
}
}
macro_rules! to_io_error {
($expr:expr) => {
match $expr {
Ok(n) => Ok(n),
Err(Error::Io(io_error)) => Err(io_error),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
}
impl<'a> Read for SnappyReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
to_io_error!(self._read(buf))
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
to_io_error!(self._read_to_end(buf))
}
}
#[cfg(test)]
mod tests {
use std::io::Read;
use error::{Error, Result};
use super::{compress, uncompress_to, SnappyReader};
fn uncompress(src: &[u8]) -> Result<Vec<u8>> {
let mut v = Vec::new();
match uncompress_to(src, &mut v) {
Ok(_) => Ok(v),
Err(e) => Err(e),
}
}
#[test]
fn test_compress() {
let msg = "This is test".as_bytes();
let compressed = compress(msg).unwrap();
let expected = &[12, 44, 84, 104, 105, 115, 32, 105, 115, 32, 116, 101, 115, 116];
assert_eq!(&compressed, expected);
}
#[test]
fn test_uncompress() {
let compressed = &[12, 44, 84, 104, 105, 115, 32, 105, 115, 32, 116, 101, 115, 116];
let uncompressed = String::from_utf8(uncompress(compressed).unwrap()).unwrap();
assert_eq!(&uncompressed, "This is test");
}
#[test]
fn test_uncompress_invalid_input() {
let compressed = &[12, 42, 84, 104, 105, 115, 32, 105, 115, 32, 116, 101, 115, 116];
let uncompressed = uncompress(compressed);
assert!(uncompressed.is_err());
assert!(if let Some(Error::InvalidInputSnappy) = uncompressed.err() { true } else { false });
}
static ORIGINAL: &'static str =
include_str!("../../test-data/fetch1.txt");
static COMPRESSED: &'static [u8] =
include_bytes!("../../test-data/fetch1.snappy.chunked.4k");
#[test]
fn test_snappy_reader_read() {
let mut buf = Vec::new();
let mut r = SnappyReader::new(COMPRESSED).unwrap();
let mut tmp_buf = [0u8; 1024];
loop {
match r.read(&mut tmp_buf).unwrap() {
0 => break,
n => buf.extend_from_slice(&tmp_buf[..n]),
}
}
assert_eq!(ORIGINAL.as_bytes(), &buf[..]);
}
#[test]
fn test_snappy_reader_read_to_end() {
let mut buf = Vec::new();
let mut r = SnappyReader::new(COMPRESSED).unwrap();
r.read_to_end(&mut buf).unwrap();
assert_eq!(ORIGINAL.as_bytes(), &buf[..]);
}
}