use std::cmp;
use std::io;
use std::io::prelude::*;
#[cfg(feature = "tokio")]
use futures::Poll;
#[cfg(feature = "tokio")]
use tokio_io::{AsyncRead, AsyncWrite};
use super::bufread::{corrupt, read_gz_header};
use super::{GzBuilder, GzHeader};
use crc::{Crc, CrcWriter};
use zio;
use {Compress, Compression, Decompress, Status};
#[derive(Debug)]
pub struct GzEncoder<W: Write> {
inner: zio::Writer<W, Compress>,
crc: Crc,
crc_bytes_written: usize,
header: Vec<u8>,
}
pub fn gz_encoder<W: Write>(header: Vec<u8>, w: W, lvl: Compression) -> GzEncoder<W> {
GzEncoder {
inner: zio::Writer::new(w, Compress::new(lvl, false)),
crc: Crc::new(),
header: header,
crc_bytes_written: 0,
}
}
impl<W: Write> GzEncoder<W> {
pub fn new(w: W, level: Compression) -> GzEncoder<W> {
GzBuilder::new().write(w, level)
}
pub fn get_ref(&self) -> &W {
self.inner.get_ref()
}
pub fn get_mut(&mut self) -> &mut W {
self.inner.get_mut()
}
pub fn try_finish(&mut self) -> io::Result<()> {
self.write_header()?;
self.inner.finish()?;
while self.crc_bytes_written < 8 {
let (sum, amt) = (self.crc.sum() as u32, self.crc.amount());
let buf = [
(sum >> 0) as u8,
(sum >> 8) as u8,
(sum >> 16) as u8,
(sum >> 24) as u8,
(amt >> 0) as u8,
(amt >> 8) as u8,
(amt >> 16) as u8,
(amt >> 24) as u8,
];
let inner = self.inner.get_mut();
let n = inner.write(&buf[self.crc_bytes_written..])?;
self.crc_bytes_written += n;
}
Ok(())
}
pub fn finish(mut self) -> io::Result<W> {
self.try_finish()?;
Ok(self.inner.take_inner())
}
fn write_header(&mut self) -> io::Result<()> {
while self.header.len() > 0 {
let n = self.inner.get_mut().write(&self.header)?;
self.header.drain(..n);
}
Ok(())
}
}
impl<W: Write> Write for GzEncoder<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
assert_eq!(self.crc_bytes_written, 0);
self.write_header()?;
let n = self.inner.write(buf)?;
self.crc.update(&buf[..n]);
Ok(n)
}
fn flush(&mut self) -> io::Result<()> {
assert_eq!(self.crc_bytes_written, 0);
self.write_header()?;
self.inner.flush()
}
}
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for GzEncoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.try_finish());
self.get_mut().shutdown()
}
}
impl<R: Read + Write> Read for GzEncoder<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.get_mut().read(buf)
}
}
#[cfg(feature = "tokio")]
impl<R: AsyncRead + AsyncWrite> AsyncRead for GzEncoder<R> {}
impl<W: Write> Drop for GzEncoder<W> {
fn drop(&mut self) {
if self.inner.is_present() {
let _ = self.try_finish();
}
}
}
#[derive(Debug)]
pub struct GzDecoder<W: Write> {
inner: zio::Writer<CrcWriter<W>, Decompress>,
crc_bytes: Vec<u8>,
header: Option<GzHeader>,
header_buf: Vec<u8>,
}
const CRC_BYTES_LEN: usize = 8;
impl<W: Write> GzDecoder<W> {
pub fn new(w: W) -> GzDecoder<W> {
GzDecoder {
inner: zio::Writer::new(CrcWriter::new(w), Decompress::new(false)),
crc_bytes: Vec::with_capacity(CRC_BYTES_LEN),
header: None,
header_buf: Vec::new(),
}
}
pub fn header(&self) -> Option<&GzHeader> {
self.header.as_ref()
}
pub fn get_ref(&self) -> &W {
self.inner.get_ref().get_ref()
}
pub fn get_mut(&mut self) -> &mut W {
self.inner.get_mut().get_mut()
}
pub fn try_finish(&mut self) -> io::Result<()> {
self.finish_and_check_crc()?;
Ok(())
}
pub fn finish(mut self) -> io::Result<W> {
self.finish_and_check_crc()?;
Ok(self.inner.take_inner().into_inner())
}
fn finish_and_check_crc(&mut self) -> io::Result<()> {
self.inner.finish()?;
if self.crc_bytes.len() != 8 {
return Err(corrupt());
}
let crc = ((self.crc_bytes[0] as u32) << 0)
| ((self.crc_bytes[1] as u32) << 8)
| ((self.crc_bytes[2] as u32) << 16)
| ((self.crc_bytes[3] as u32) << 24);
let amt = ((self.crc_bytes[4] as u32) << 0)
| ((self.crc_bytes[5] as u32) << 8)
| ((self.crc_bytes[6] as u32) << 16)
| ((self.crc_bytes[7] as u32) << 24);
if crc != self.inner.get_ref().crc().sum() as u32 {
return Err(corrupt());
}
if amt != self.inner.get_ref().crc().amount() {
return Err(corrupt());
}
Ok(())
}
}
struct Counter<T: Read> {
inner: T,
pos: usize,
}
impl<T: Read> Read for Counter<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let pos = self.inner.read(buf)?;
self.pos += pos;
Ok(pos)
}
}
impl<W: Write> Write for GzDecoder<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.header.is_none() {
let (res, pos) = {
let mut counter = Counter {
inner: self.header_buf.chain(buf),
pos: 0,
};
let res = read_gz_header(&mut counter);
(res, counter.pos)
};
match res {
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
self.header_buf.extend(buf);
Ok(buf.len())
} else {
Err(err)
}
}
Ok(header) => {
self.header = Some(header);
let pos = pos - self.header_buf.len();
self.header_buf.truncate(0);
Ok(pos)
}
}
} else {
let (n, status) = self.inner.write_with_status(buf)?;
if status == Status::StreamEnd {
if n < buf.len() && self.crc_bytes.len() < 8 {
let remaining = buf.len() - n;
let crc_bytes = cmp::min(remaining, CRC_BYTES_LEN - self.crc_bytes.len());
self.crc_bytes.extend(&buf[n..n + crc_bytes]);
return Ok(n + crc_bytes);
}
}
Ok(n)
}
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for GzDecoder<W> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
try_nb!(self.try_finish());
self.inner.get_mut().get_mut().shutdown()
}
}
impl<W: Read + Write> Read for GzDecoder<W> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.get_mut().get_mut().read(buf)
}
}
#[cfg(feature = "tokio")]
impl<W: AsyncRead + AsyncWrite> AsyncRead for GzDecoder<W> {}
#[cfg(test)]
mod tests {
use super::*;
const STR: &'static str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
#[test]
fn decode_writer_one_chunk() {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
e.write(STR.as_ref()).unwrap();
let bytes = e.finish().unwrap();
let mut writer = Vec::new();
let mut decoder = GzDecoder::new(writer);
let n = decoder.write(&bytes[..]).unwrap();
decoder.write(&bytes[n..]).unwrap();
decoder.try_finish().unwrap();
writer = decoder.finish().unwrap();
let return_string = String::from_utf8(writer).expect("String parsing error");
assert_eq!(return_string, STR);
}
#[test]
fn decode_writer_partial_header() {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
e.write(STR.as_ref()).unwrap();
let bytes = e.finish().unwrap();
let mut writer = Vec::new();
let mut decoder = GzDecoder::new(writer);
assert_eq!(decoder.write(&bytes[..5]).unwrap(), 5);
let n = decoder.write(&bytes[5..]).unwrap();
if n < bytes.len() - 5 {
decoder.write(&bytes[n + 5..]).unwrap();
}
writer = decoder.finish().unwrap();
let return_string = String::from_utf8(writer).expect("String parsing error");
assert_eq!(return_string, STR);
}
#[test]
fn decode_writer_exact_header() {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
e.write(STR.as_ref()).unwrap();
let bytes = e.finish().unwrap();
let mut writer = Vec::new();
let mut decoder = GzDecoder::new(writer);
assert_eq!(decoder.write(&bytes[..10]).unwrap(), 10);
decoder.write(&bytes[10..]).unwrap();
writer = decoder.finish().unwrap();
let return_string = String::from_utf8(writer).expect("String parsing error");
assert_eq!(return_string, STR);
}
#[test]
fn decode_writer_partial_crc() {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
e.write(STR.as_ref()).unwrap();
let bytes = e.finish().unwrap();
let mut writer = Vec::new();
let mut decoder = GzDecoder::new(writer);
let l = bytes.len() - 5;
let n = decoder.write(&bytes[..l]).unwrap();
decoder.write(&bytes[n..]).unwrap();
writer = decoder.finish().unwrap();
let return_string = String::from_utf8(writer).expect("String parsing error");
assert_eq!(return_string, STR);
}
}