use std::io::{Write, Error, ErrorKind};
use futures::{Poll, Async, Future};
use pattern::Window;
use super::AsyncIoError;
/// Extension of [`Write`] whose methods consume the writer and return futures.
///
/// Each method takes `self` by value; the returned future owns the writer
/// until it resolves.
pub trait AsyncWrite: Write + Sized {
    /// Builds a [`WriteBytes`] future that owns this writer together with `buf`.
    ///
    /// When polled, that future issues a single `write` call with the bytes
    /// of `buf`.
    fn async_write<B>(self, buf: B) -> WriteBytes<Self, B>
    where
        B: AsRef<[u8]>,
    {
        let state = (self, buf);
        WriteBytes(Some(state))
    }

    /// Builds a [`WriteAll`] future that keeps writing until `buf` is exhausted.
    ///
    /// The buffer is wrapped with `Window::new_ref` before being handed to
    /// [`async_write`](AsyncWrite::async_write).
    fn async_write_all<B>(self, buf: B) -> WriteAll<Self, B>
    where
        B: AsRef<[u8]>,
    {
        let window = Window::new_ref(buf);
        let single_write = self.async_write(window);
        WriteAll(single_write)
    }

    /// Builds a [`Flush`] future that owns this writer and flushes it when polled.
    fn async_flush(self) -> Flush<Self> {
        let writer = Some(self);
        Flush(writer)
    }
}
/// Blanket implementation: every sized [`Write`] type gets the `AsyncWrite`
/// methods through their default bodies.
impl<W> AsyncWrite for W
where
    W: Write,
{
}
/// State for a pending single-write operation.
///
/// Holds the writer and the buffer as a pair; the slot is `None` once the
/// pair has been taken out.
#[derive(Debug)]
pub struct WriteBytes<W, B>(Option<(W, B)>);

impl<W, B> WriteBytes<W, B> {
    /// Returns a shared reference to the held writer.
    ///
    /// # Panics
    ///
    /// Panics if the internal state has already been taken.
    pub fn writer(&self) -> &W {
        self.0
            .as_ref()
            .map(|&(ref writer, _)| writer)
            .expect("WriteBytes has been consumed")
    }

    /// Returns an exclusive reference to the held writer.
    ///
    /// # Panics
    ///
    /// Panics if the internal state has already been taken.
    pub fn writer_mut(&mut self) -> &mut W {
        self.0
            .as_mut()
            .map(|&mut (ref mut writer, _)| writer)
            .expect("WriteBytes has been consumed")
    }
}
// Performs one `write` call per poll and resolves as soon as that call
// returns anything other than `WouldBlock`.
impl<W: Write, B: AsRef<[u8]>> Future for WriteBytes<W, B> {
    // Writer, buffer, and the byte count reported by `write`.
    type Item = (W, B, usize);
    // The writer and buffer are returned together with the I/O error.
    type Error = AsyncIoError<(W, B)>;

    // Panics with "Cannot poll WriteBytes twice" when the state has already
    // been taken by an earlier poll that resolved or failed.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (mut writer, buf) = self.0.take().expect("Cannot poll WriteBytes twice");
        match writer.write(buf.as_ref()) {
            Ok(written) => Ok(Async::Ready((writer, buf, written))),
            Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
                // Not writable yet: put the state back so a later poll can retry.
                self.0 = Some((writer, buf));
                Ok(Async::NotReady)
            }
            Err(err) => Err(AsyncIoError::new((writer, buf), err)),
        }
    }
}
/// State for a pending write-everything operation.
///
/// Wraps a [`WriteBytes`] future whose buffer is a `Window` over the
/// caller's buffer.
#[derive(Debug)]
pub struct WriteAll<W, B>(WriteBytes<W, Window<B>>);

impl<W, B> WriteAll<W, B> {
    /// Returns a shared reference to the writer held by the inner future.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`WriteBytes::writer`].
    pub fn writer(&self) -> &W {
        let inner = &self.0;
        inner.writer()
    }

    /// Returns an exclusive reference to the writer held by the inner future.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`WriteBytes::writer_mut`].
    pub fn writer_mut(&mut self) -> &mut W {
        let inner = &mut self.0;
        inner.writer_mut()
    }
}
// Repeatedly drives the inner single-write future until the whole buffer
// has been written, a write reports zero bytes, or an I/O error occurs.
impl<W: Write, B: AsRef<[u8]>> Future for WriteAll<W, B> {
    // On success the writer and the unwrapped buffer are handed back.
    type Item = (W, B);
    // On failure the writer and the unwrapped buffer accompany the I/O error.
    type Error = AsyncIoError<(W, B)>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Each completed inner write yields the writer, the windowed buffer
        // and the number of bytes written. Errors from the inner future have
        // their state converted from `(W, Window<B>)` to `(W, B)` before `?`
        // propagates them.
        while let Async::Ready((w, b, size)) =
            self.0.poll().map_err(
                |e| e.map_state(|(w, b)| (w, b.into_inner())),
            )?
        {
            // Skip the bytes just written; presumably this narrows the window
            // to the still-unwritten tail (verify against `Window::skip`).
            let b = b.skip(size);
            if b.as_ref().is_empty() {
                // Nothing left to write: finished.
                return Ok(Async::Ready((w, b.into_inner())));
            } else if size == 0 {
                // A zero-length write while bytes remain would loop forever,
                // so it is reported as an error instead. The error kind is
                // kept as `UnexpectedEof` for compatibility with existing
                // callers.
                let e = Error::new(
                    ErrorKind::UnexpectedEof,
                    format!("Unexpected EOF (remaining {} bytes)", b.as_ref().len()),
                );
                return Err(AsyncIoError::new((w, b.into_inner()), e));
            } else {
                // Partial write: start a fresh single-write for the remainder.
                self.0 = w.async_write(b);
            }
        }
        // The inner write is not ready yet.
        Ok(Async::NotReady)
    }
}
/// State for a pending flush operation.
///
/// Holds the writer; the slot is `None` once the writer has been taken out.
#[derive(Debug)]
pub struct Flush<W>(Option<W>);

impl<W> Flush<W> {
    /// Returns a shared reference to the held writer.
    ///
    /// # Panics
    ///
    /// Panics if the writer has already been taken.
    pub fn writer(&self) -> &W {
        let slot = &self.0;
        slot.as_ref().expect("Flush has been consumed")
    }

    /// Returns an exclusive reference to the held writer.
    ///
    /// # Panics
    ///
    /// Panics if the writer has already been taken.
    pub fn writer_mut(&mut self) -> &mut W {
        let slot = &mut self.0;
        slot.as_mut().expect("Flush has been consumed")
    }
}
// Calls `flush` once per poll and resolves as soon as that call returns
// anything other than `WouldBlock`.
impl<W: Write> Future for Flush<W> {
    // The writer is handed back once the flush succeeds.
    type Item = W;
    // The writer is returned together with the I/O error.
    type Error = AsyncIoError<W>;

    // Panics with "Cannot poll Flush twice" when the writer has already been
    // taken by an earlier poll that resolved or failed.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut writer = self.0.take().expect("Cannot poll Flush twice");
        match writer.flush() {
            Ok(()) => Ok(Async::Ready(writer)),
            Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
                // Not ready yet: put the writer back so a later poll can retry.
                self.0 = Some(writer);
                Ok(Async::NotReady)
            }
            Err(err) => Err(AsyncIoError::new(writer, err)),
        }
    }
}