#[cfg(feature = "tokio")] extern crate futures;
#[cfg(feature = "tokio")] #[macro_use] extern crate tokio_io;
use std::fmt;
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter};
use std::error;
#[cfg(feature = "tokio")] use futures::Poll;
#[cfg(feature = "tokio")] use tokio_io::{AsyncRead, AsyncWrite};
// Capacity in bytes (8 KiB) that `BufStream::new` passes for both the read
// buffer and the write buffer.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// A stream wrapper that buffers both the reads from and the writes to an
/// underlying stream `S`.
///
/// Internally a `BufReader` is layered over a private wrapper around a
/// `BufWriter<S>`, so a single value owns the read buffer, the write buffer
/// and the stream itself.
#[derive(Debug)]
pub struct BufStream<S: Write> {
// Read buffer on the outside; the write buffer and the stream live inside it.
inner: BufReader<InternalBufWriter<S>>
}
/// Error produced by `BufStream::into_inner` when the buffered writer could
/// not be unwrapped.
///
/// Field 0 is the value that was being unwrapped (handed back so it is not
/// lost); field 1 is the I/O error that caused the failure.
#[derive(Debug)]
pub struct IntoInnerError<W>(W, io::Error);
impl<W> IntoInnerError<W> {
pub fn error(&self) -> &io::Error { &self.1 }
pub fn into_inner(self) -> W { self.0 }
}
impl<W> From<IntoInnerError<W>> for io::Error {
fn from(iie: IntoInnerError<W>) -> io::Error { iie.1 }
}
impl<W: fmt::Debug> error::Error for IntoInnerError<W> {
fn description(&self) -> &str {
error::Error::description(self.error())
}
}
impl<W> fmt::Display for IntoInnerError<W> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.error().fmt(f)
}
}
// Private wrapper around the `BufWriter` so that it can sit inside a
// `BufReader`. The `Option` lets `BufStream::into_inner` take the writer out
// by value and put it back if unwrapping fails; the accessors unwrap it, so it
// is expected to be `Some` whenever they are called.
struct InternalBufWriter<W: Write>(Option<BufWriter<W>>);
impl<W: Write> InternalBufWriter<W> {
fn get_ref(&self) -> &BufWriter<W> {
self.0.as_ref().unwrap()
}
fn get_mut(&mut self) -> &mut BufWriter<W> {
self.0.as_mut().unwrap()
}
}
impl<W: Read + Write> Read for InternalBufWriter<W> {
    /// Reads straight from the underlying stream; the write buffer is not
    /// consulted or flushed here.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let stream: &mut W = self.get_mut().get_mut();
        stream.read(buf)
    }
}
impl<W: Write + fmt::Debug> fmt::Debug for InternalBufWriter<W> {
    /// Renders as the wrapped `BufWriter` does, hiding the private wrapper.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self.get_ref(), f)
    }
}
impl<S: Read + Write> BufStream<S> {
    /// Wraps `inner` with a read buffer of `reader_cap` bytes and a write
    /// buffer of `writer_cap` bytes.
    pub fn with_capacities(reader_cap: usize, writer_cap: usize, inner: S)
                           -> BufStream<S> {
        let buffered_writer =
            InternalBufWriter(Some(BufWriter::with_capacity(writer_cap, inner)));
        BufStream { inner: BufReader::with_capacity(reader_cap, buffered_writer) }
    }

    /// Wraps `inner` using `DEFAULT_BUF_SIZE` for both buffers.
    pub fn new(inner: S) -> BufStream<S> {
        Self::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
    }

    /// Borrows the underlying stream.
    pub fn get_ref(&self) -> &S {
        let writer: &BufWriter<S> = self.inner.get_ref().get_ref();
        writer.get_ref()
    }

    /// Mutably borrows the underlying stream.
    ///
    /// Reading or writing through this reference bypasses both buffers.
    pub fn get_mut(&mut self) -> &mut S {
        let writer: &mut BufWriter<S> = self.inner.get_mut().get_mut();
        writer.get_mut()
    }

    /// Unwraps the stream, returning the underlying `S`.
    ///
    /// The buffered writer is unwrapped first. If that fails, the writer is
    /// put back and the still-usable `BufStream` is returned inside the error
    /// together with a copy (same kind, same message) of the I/O error. On
    /// success the read buffer is dropped along with `self`.
    pub fn into_inner(mut self) -> Result<S, IntoInnerError<BufStream<S>>> {
        // Move the writer out of its slot; the slot stays empty only on the
        // success path, where `self` is discarded anyway.
        let writer = self.inner.get_mut().0.take().unwrap();
        match writer.into_inner() {
            Ok(stream) => Ok(stream),
            Err(failure) => {
                // The original error is only borrowable here, so rebuild an
                // equivalent one before consuming `failure` for its writer.
                let cause = io::Error::new(failure.error().kind(),
                                           failure.error().to_string());
                self.inner.get_mut().0 = Some(failure.into_inner());
                Err(IntoInnerError(self, cause))
            }
        }
    }
}
// Every method forwards to the inner `BufReader`. `read_until` and `read_line`
// are forwarded explicitly as well, so the reader's own versions are used
// instead of the trait's default bodies.
impl<S: Read + Write> BufRead for BufStream<S> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        BufRead::fill_buf(&mut self.inner)
    }

    fn consume(&mut self, amt: usize) {
        BufRead::consume(&mut self.inner, amt)
    }

    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
        BufRead::read_until(&mut self.inner, byte, buf)
    }

    fn read_line(&mut self, string: &mut String) -> io::Result<usize> {
        BufRead::read_line(&mut self.inner, string)
    }
}
impl<S: Read + Write> Read for BufStream<S> {
    /// Reads through the inner `BufReader`; pending writes are not flushed first.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.inner, buf)
    }
}
// Both methods go to the `BufWriter` sitting beneath the read buffer; the
// first `get_mut` is `BufReader`'s, the second is `InternalBufWriter`'s.
impl<S: Read + Write> Write for BufStream<S> {
    /// Stores `buf` in the write buffer (or passes it on, as `BufWriter` decides).
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.get_mut().get_mut().write(buf)
    }

    /// Flushes the buffered writer.
    fn flush(&mut self) -> io::Result<()> {
        self.inner.get_mut().get_mut().flush()
    }
}
// Only compiled with the `tokio` feature. Marks `BufStream` as an `AsyncRead`
// whenever the underlying stream is both `AsyncRead` and `AsyncWrite`; the
// impl body is empty, so no trait method is overridden here.
#[cfg(feature = "tokio")]
impl<S: AsyncRead + AsyncWrite> AsyncRead for BufStream<S> {}
#[cfg(feature = "tokio")]
impl<S: AsyncRead + AsyncWrite> AsyncWrite for BufStream<S> {
    /// Flushes this stream, then calls `shutdown` on the inner buffered writer.
    ///
    /// The flush goes through `try_nb!`, so its result is handled by that
    /// macro before the shutdown call is reached.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.flush());
        self.inner.get_mut().get_mut().shutdown()
    }
}
#[cfg(test)]
mod tests {
    use std::io::prelude::*;
    use std::io;
    use super::BufStream;

    /// Smoke test over a stream that accepts every write and is always at EOF:
    /// reads report 0 bytes, a 10-byte write is accepted in full, and the
    /// stream can be flushed and unwrapped without error.
    #[test]
    fn test_buffered_stream() {
        struct S;

        impl Write for S {
            fn write(&mut self, b: &[u8]) -> io::Result<usize> { Ok(b.len()) }
            fn flush(&mut self) -> io::Result<()> { Ok(()) }
        }

        impl Read for S {
            fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { Ok(0) }
        }

        let mut stream = BufStream::new(S);
        assert_eq!(stream.read(&mut [0; 10]).unwrap(), 0);
        // Check the reported byte count instead of discarding it, so a short
        // write cannot pass unnoticed.
        assert_eq!(stream.write(&[0; 10]).unwrap(), 10);
        stream.flush().unwrap();
        // `S` is not `Debug`, so test with `is_ok` rather than `unwrap`.
        assert!(stream.into_inner().is_ok());
    }
}