use std::io::{Read, Error, ErrorKind};
use futures::{Poll, Async, Future};
use pattern::Window;
use super::AsyncIoError;
/// Extension of `Read` that turns a reader into futures performing
/// non-blocking style reads.
///
/// Every method takes the reader and the buffer by value; the resulting
/// future hands both of them back when it resolves (or inside the error
/// state when it fails).
pub trait AsyncRead: Read + Sized {
    /// Creates a future that issues a single `read` call into `buf`.
    ///
    /// The future resolves to `(reader, buf, read_size)`.
    fn async_read<B: AsMut<[u8]>>(self, buf: B) -> ReadBytes<Self, B> {
        let state = (self, buf);
        ReadBytes(Some(state))
    }

    /// Creates a future like `async_read`, except that reading zero bytes
    /// into a non-empty buffer is reported as an `UnexpectedEof` error.
    fn async_read_non_empty<B: AsMut<[u8]>>(self, buf: B) -> ReadNonEmpty<Self, B> {
        let single_read = self.async_read(buf);
        ReadNonEmpty(single_read)
    }

    /// Creates a future that keeps reading until the window over `buf`
    /// has no remaining space, then resolves to `(reader, buf)`.
    fn async_read_exact<B: AsMut<[u8]>>(self, buf: B) -> ReadExact<Self, B> {
        // The buffer is wrapped in a `Window` so that the unread part can
        // be narrowed after each partial read.
        let window = Window::new_mut(buf);
        ReadExact(self.async_read_non_empty(window))
    }
}

/// Every `Read` implementation gets the asynchronous helpers for free.
impl<R> AsyncRead for R
where
    R: Read,
{
}
/// Future that performs one `read` call into a buffer.
///
/// The reader and the buffer are stored as `Some((reader, buffer))` until
/// the future completes; after that the slot is `None`.
#[derive(Debug)]
pub struct ReadBytes<R, B>(Option<(R, B)>);

impl<R, B> ReadBytes<R, B> {
    /// Returns a shared reference to the underlying reader.
    ///
    /// # Panics
    ///
    /// Panics if the future has already completed and handed the reader
    /// back to the caller.
    pub fn reader(&self) -> &R {
        let state = self.0.as_ref().expect("ReadBytes has been consumed");
        &state.0
    }

    /// Returns a mutable reference to the underlying reader.
    ///
    /// # Panics
    ///
    /// Panics if the future has already completed and handed the reader
    /// back to the caller.
    pub fn reader_mut(&mut self) -> &mut R {
        let state = self.0.as_mut().expect("ReadBytes has been consumed");
        &mut state.0
    }
}
impl<R: Read, B: AsMut<[u8]>> Future for ReadBytes<R, B> {
    /// The reader, the buffer and the number of bytes read.
    type Item = (R, B, usize);
    /// The I/O error together with the reader and the buffer.
    type Error = AsyncIoError<(R, B)>;

    /// Attempts a single `read` into the buffer.
    ///
    /// A `WouldBlock` error puts the state back and yields `NotReady`;
    /// any other error is returned along with the reader and buffer.
    ///
    /// # Panics
    ///
    /// Panics when polled again after having resolved or failed.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Take ownership of the state so it can be moved into the result.
        let (mut reader, mut buffer) = self.0.take().expect("Cannot poll ReadBytes twice");

        let failure = match reader.read(buffer.as_mut()) {
            Ok(read_size) => return Ok(Async::Ready((reader, buffer, read_size))),
            Err(e) => e,
        };

        if failure.kind() != ErrorKind::WouldBlock {
            return Err(AsyncIoError::new((reader, buffer), failure));
        }

        // Not ready yet: restore the state so the next poll can retry.
        self.0 = Some((reader, buffer));
        Ok(Async::NotReady)
    }
}
#[derive(Debug)]
pub struct ReadNonEmpty<R, B>(ReadBytes<R, B>);
impl<R, B> ReadNonEmpty<R, B> {
pub fn reader(&self) -> &R {
self.0.reader()
}
pub fn reader_mut(&mut self) -> &mut R {
self.0.reader_mut()
}
}
impl<R: Read, B: AsMut<[u8]>> Future for ReadNonEmpty<R, B> {
    /// The reader, the buffer and the number of bytes read.
    type Item = (R, B, usize);
    /// The I/O error together with the reader and the buffer.
    type Error = AsyncIoError<(R, B)>;

    /// Polls the wrapped `ReadBytes` future.
    ///
    /// When the read completes with zero bytes although the buffer is not
    /// empty, an `UnexpectedEof` error carrying the reader and the buffer
    /// is returned instead of the result.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (reader, mut buffer, read_size) = match self.0.poll()? {
            Async::Ready(outcome) => outcome,
            Async::NotReady => return Ok(Async::NotReady),
        };

        // Either some bytes arrived, or nothing was requested at all.
        if read_size != 0 || buffer.as_mut().is_empty() {
            return Ok(Async::Ready((reader, buffer, read_size)));
        }

        let required = buffer.as_mut().len();
        let eof = Error::new(
            ErrorKind::UnexpectedEof,
            format!("Unexpected Eof ({} bytes are required)", required),
        );
        Err(AsyncIoError::new((reader, buffer), eof))
    }
}
#[derive(Debug)]
pub struct ReadExact<R, B>(ReadNonEmpty<R, Window<B>>);
impl<R, B> ReadExact<R, B> {
pub fn reader(&self) -> &R {
self.0.reader()
}
pub fn reader_mut(&mut self) -> &mut R {
self.0.reader_mut()
}
}
impl<R, B> Future for ReadExact<R, B>
where
    R: Read,
    B: AsMut<[u8]>,
{
    /// The reader and the original (unwrapped) buffer.
    type Item = (R, B);
    /// The I/O error together with the reader and the unwrapped buffer.
    type Error = AsyncIoError<(R, B)>;

    /// Drives the inner read future until the window has no space left.
    ///
    /// After each completed read the window is advanced by the number of
    /// bytes read (via `Window::skip`); while any space remains, a new
    /// inner read is started. Errors from the inner future are forwarded
    /// with the `Window` unwrapped back into the caller's buffer.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            let polled = self
                .0
                .poll()
                .map_err(|e| e.map_state(|(r, w)| (r, w.into_inner())))?;

            let (reader, window, read_size) = match polled {
                Async::Ready(progress) => progress,
                Async::NotReady => return Ok(Async::NotReady),
            };

            let mut rest = window.skip(read_size);
            if !rest.as_mut().is_empty() {
                // Space remains: start another read over the rest.
                self.0 = reader.async_read_non_empty(rest);
                continue;
            }

            return Ok(Async::Ready((reader, rest.into_inner())));
        }
    }
}