use std::io;
use std::io::prelude::*;
#[cfg(feature = "tokio")]
use futures::Poll;
#[cfg(feature = "tokio")]
use tokio_io::{AsyncRead, AsyncWrite};
use {Action, Compress, Compression, Decompress, Status};
/// Compressing adaptor over a wrapped stream.
///
/// When `R: BufRead`, the `Read` implementation pulls input from the wrapped
/// stream, feeds it through `Compress`, and yields the compressed bytes.
pub struct BzEncoder<R> {
// The wrapped stream; input is pulled from it via `fill_buf`/`consume`.
obj: R,
// Compression state; also the source of the `total_in`/`total_out` counters.
data: Compress,
// Set once `compress` has reported `Status::StreamEnd`; later reads return `Ok(0)`.
done: bool,
}
/// Decompressing adaptor over a wrapped stream.
///
/// When `R: BufRead`, the `Read` implementation pulls compressed input from the
/// wrapped stream, feeds it through `Decompress`, and yields the decoded bytes.
pub struct BzDecoder<R> {
// The wrapped stream; input is pulled from it via `fill_buf`/`consume`.
obj: R,
// Decompression state; replaced with a fresh `Decompress` between members in multi mode.
data: Decompress,
// Set once decoding has finished for good; later reads return `Ok(0)`.
done: bool,
// When true, a `Status::StreamEnd` with input still pending restarts decoding
// instead of finishing (set through `multi`, used by `MultiBzDecoder`).
multi: bool,
}
impl<R: BufRead> BzEncoder<R> {
    /// Creates an encoder that reads uncompressed data from `r` and produces
    /// the compressed stream when read from.
    ///
    /// `level` is forwarded unchanged to `Compress::new`.
    pub fn new(r: R, level: Compression) -> BzEncoder<R> {
        // NOTE(review): 30 is the second argument of `Compress::new`, presumably
        // the work factor; confirm against `Compress::new`.
        let state = Compress::new(level, 30);
        BzEncoder {
            obj: r,
            data: state,
            done: false,
        }
    }
}
impl<R> BzEncoder<R> {
pub fn get_ref(&self) -> &R {
&self.obj
}
pub fn get_mut(&mut self) -> &mut R {
&mut self.obj
}
pub fn into_inner(self) -> R {
self.obj
}
pub fn total_out(&self) -> u64 {
self.data.total_out()
}
pub fn total_in(&self) -> u64 {
self.data.total_in()
}
}
impl<R: BufRead> Read for BzEncoder<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.done {
return Ok(0);
}
loop {
let (read, consumed, eof, ret);
{
let input = try!(self.obj.fill_buf());
eof = input.is_empty();
let before_out = self.data.total_out();
let before_in = self.data.total_in();
let action = if eof { Action::Finish } else { Action::Run };
ret = self.data.compress(input, buf, action);
read = (self.data.total_out() - before_out) as usize;
consumed = (self.data.total_in() - before_in) as usize;
}
self.obj.consume(consumed);
let ret = ret.unwrap();
if read == 0 && !eof && buf.len() > 0 {
continue;
}
if ret == Status::StreamEnd {
self.done = true;
}
return Ok(read);
}
}
}
// Marker impl: only the trait's default methods are used.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for BzEncoder<R>
where
    R: AsyncRead + BufRead,
{
}
impl<W: Write> Write for BzEncoder<W> {
    /// Forwards the bytes untouched to the wrapped stream; nothing written
    /// here passes through the compressor.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let inner = &mut self.obj;
        inner.write(buf)
    }

    /// Flushes the wrapped stream.
    fn flush(&mut self) -> io::Result<()> {
        let inner = &mut self.obj;
        inner.flush()
    }
}
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for BzEncoder<R>
where
    R: AsyncWrite,
{
    /// Delegates shutdown to the wrapped stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let inner = &mut self.obj;
        inner.shutdown()
    }
}
impl<R: BufRead> BzDecoder<R> {
    /// Creates a decoder that reads compressed data from `r` and produces the
    /// decompressed stream when read from.
    ///
    /// The decoder starts in single-member mode (`multi` is `false`).
    pub fn new(r: R) -> BzDecoder<R> {
        // NOTE(review): the meaning of the `false` flag is defined by
        // `Decompress::new`, which is not visible here; confirm there.
        let state = Decompress::new(false);
        BzDecoder {
            obj: r,
            data: state,
            done: false,
            multi: false,
        }
    }

    /// Builder-style switch for multi-member decoding; returns the decoder
    /// with its `multi` flag set to `flag`.
    fn multi(self, flag: bool) -> BzDecoder<R> {
        BzDecoder {
            multi: flag,
            ..self
        }
    }
}
impl<R> BzDecoder<R> {
pub fn get_ref(&self) -> &R {
&self.obj
}
pub fn get_mut(&mut self) -> &mut R {
&mut self.obj
}
pub fn into_inner(self) -> R {
self.obj
}
pub fn total_in(&self) -> u64 {
self.data.total_in()
}
pub fn total_out(&self) -> u64 {
self.data.total_out()
}
}
impl<R: BufRead> Read for BzDecoder<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.done {
return Ok(0);
}
loop {
let (read, consumed, eof, ret);
{
let input = try!(self.obj.fill_buf());
eof = input.is_empty();
let before_out = self.data.total_out();
let before_in = self.data.total_in();
ret = self.data.decompress(input, buf);
read = (self.data.total_out() - before_out) as usize;
consumed = (self.data.total_in() - before_in) as usize;
}
self.obj.consume(consumed);
let ret = try!(ret.map_err(|e| { io::Error::new(io::ErrorKind::InvalidInput, e) }));
if ret == Status::StreamEnd {
if !eof && self.multi {
self.data = Decompress::new(false);
} else {
self.done = true;
}
return Ok(read);
}
if read > 0 || eof || buf.len() == 0 {
return Ok(read);
}
}
}
}
// Marker impl: only the trait's default methods are used.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for BzDecoder<R>
where
    R: AsyncRead + BufRead,
{
}
impl<W: Write> Write for BzDecoder<W> {
    /// Forwards the bytes untouched to the wrapped stream; nothing written
    /// here passes through the decompressor.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let inner = &mut self.obj;
        inner.write(buf)
    }

    /// Flushes the wrapped stream.
    fn flush(&mut self) -> io::Result<()> {
        let inner = &mut self.obj;
        inner.flush()
    }
}
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for BzDecoder<R>
where
    R: AsyncWrite,
{
    /// Delegates shutdown to the wrapped stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let inner = &mut self.obj;
        inner.shutdown()
    }
}
/// Decompressing adaptor that keeps decoding across consecutive compressed
/// members in the same input.
///
/// Thin wrapper around a `BzDecoder` constructed with its multi flag enabled.
pub struct MultiBzDecoder<R>(BzDecoder<R>);
impl<R: BufRead> MultiBzDecoder<R> {
    /// Creates a multi-member decoder that reads compressed data from `r`.
    pub fn new(r: R) -> MultiBzDecoder<R> {
        let inner = BzDecoder::new(r).multi(true);
        MultiBzDecoder(inner)
    }
}
impl<R> MultiBzDecoder<R> {
pub fn get_ref(&self) -> &R {
self.0.get_ref()
}
pub fn get_mut(&mut self) -> &mut R {
self.0.get_mut()
}
pub fn into_inner(self) -> R {
self.0.into_inner()
}
}
impl<R: BufRead> Read for MultiBzDecoder<R> {
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
self.0.read(into)
}
}
// Marker impl: only the trait's default methods are used.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for MultiBzDecoder<R>
where
    R: AsyncRead + BufRead,
{
}
// Only `Write` is required of the wrapped stream: writes are passed straight
// through and never use the buffered-read side. This matches the bound on the
// `Write` impl for `BzDecoder`.
impl<R: Write> Write for MultiBzDecoder<R> {
    /// Forwards the bytes untouched to the wrapped stream; nothing written
    /// here passes through the decompressor.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.get_mut().write(buf)
    }

    /// Flushes the wrapped stream.
    fn flush(&mut self) -> io::Result<()> {
        self.get_mut().flush()
    }
}
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for MultiBzDecoder<R>
where
    R: AsyncWrite + BufRead,
{
    /// Delegates shutdown to the wrapped stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let inner = self.0.get_mut();
        inner.shutdown()
    }
}