handy_async/io/
async_write.rs

1use std::io::{Write, Error, ErrorKind};
2use futures::{Poll, Async, Future};
3
4use pattern::Window;
5use super::AsyncIoError;
6
7/// An asynchronous version of the standard `Write` trait.
8///
9/// Since this is assumed as a basic building block,
10/// it may be more convenient to use [`WriteInto`](./trait.WriteInto.html) for ordinary cases.
11///
12/// # Notice
13///
14/// For executing asynchronously, we assume the writer which implements this trait
15/// returns the `std::io::ErrorKind::WouldBlock` error
16/// if a write operation would be about to block.
17pub trait AsyncWrite: Write + Sized {
18    /// Creates a future which will write bytes asynchronously.
19    ///
20    /// # Examples
21    ///
22    /// ```
23    /// # extern crate futures;
24    /// # extern crate handy_async;
25    /// use futures::Future;
26    /// use handy_async::io::AsyncWrite;
27    ///
28    /// # fn main() {
29    /// let (output, _, _) = vec![].async_write(b"hello").wait().ok().unwrap();
30    /// assert_eq!(&output[..], b"hello");
31    ///
32    /// let mut output = [0; 3];
33    /// let (_, _, _) = (&mut output).async_write(b"hello").wait().ok().unwrap();
34    /// assert_eq!(&output[..], b"hel");
35    ///
36    /// let (_, _, written_size) = [0; 0].async_write(b"hello").wait().ok().unwrap();
37    /// assert_eq!(written_size, 0);
38    /// # }
39    /// ```
40    fn async_write<B: AsRef<[u8]>>(self, buf: B) -> WriteBytes<Self, B> {
41        WriteBytes(Some((self, buf)))
42    }
43
44    /// Creates a future which will write all bytes asynchronously.
45    ///
46    /// # Examples
47    ///
48    /// ```
49    /// # extern crate futures;
50    /// # extern crate handy_async;
51    /// use std::io::ErrorKind;
52    /// use futures::Future;
53    /// use handy_async::io::AsyncWrite;
54    ///
55    /// # fn main() {
56    /// let (output, _) = vec![].async_write_all(b"hello").wait().ok().unwrap();
57    /// assert_eq!(&output[..], b"hello");
58    ///
59    /// let mut output = [0; 3];
60    /// let e = (&mut output).async_write_all(b"hello").wait().err().unwrap();
61    /// assert_eq!(e.error_ref().kind(), ErrorKind::UnexpectedEof);
62    /// # }
63    fn async_write_all<B: AsRef<[u8]>>(self, buf: B) -> WriteAll<Self, B> {
64        WriteAll(self.async_write(Window::new_ref(buf)))
65    }
66
67    /// Creates a future which will flush the internal buffer.
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// # extern crate futures;
73    /// # extern crate handy_async;
74    /// use std::io::BufWriter;
75    /// use futures::Future;
76    /// use handy_async::io::AsyncWrite;
77    ///
78    /// # fn main() {
79    /// let writer = BufWriter::new(vec![]);
80    /// let (writer, _) = writer.async_write_all(b"hello").wait().ok().unwrap();
81    /// assert_eq!(writer.get_ref(), b"");
82    ///
83    /// let writer = writer.async_flush().wait().ok().unwrap();
84    /// assert_eq!(writer.get_ref(), b"hello");
85    /// # }
86    /// ```
87    fn async_flush(self) -> Flush<Self> {
88        Flush(Some(self))
89    }
90}
91impl<W: Write> AsyncWrite for W {}
92
/// A future which will write bytes to `W`.
///
/// Instances are obtained from the `AsyncWrite::async_write` method.
///
/// The writer/buffer pair is kept in an `Option`; it is `None` once
/// the pair has been taken out of the future.
#[derive(Debug)]
pub struct WriteBytes<W, B>(Option<(W, B)>);
impl<W, B> WriteBytes<W, B> {
    /// Returns the reference to the writer.
    ///
    /// # Panics
    ///
    /// Panics if the future no longer holds its writer.
    pub fn writer(&self) -> &W {
        let state = self.0.as_ref().expect("WriteBytes has been consumed");
        &state.0
    }

    /// Returns the mutable reference to the writer.
    ///
    /// # Panics
    ///
    /// Panics if the future no longer holds its writer.
    pub fn writer_mut(&mut self) -> &mut W {
        let state = self.0.as_mut().expect("WriteBytes has been consumed");
        &mut state.0
    }
}
109impl<W: Write, B: AsRef<[u8]>> Future for WriteBytes<W, B> {
110    type Item = (W, B, usize);
111    type Error = AsyncIoError<(W, B)>;
112    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
113        let (mut w, b) = self.0.take().expect("Cannot poll WriteBytes twice");
114        match w.write(b.as_ref()) {
115            Ok(write_size) => Ok(Async::Ready((w, b, write_size))),
116            Err(e) => {
117                if e.kind() == ErrorKind::WouldBlock {
118                    self.0 = Some((w, b));
119                    Ok(Async::NotReady)
120                } else {
121                    Err(AsyncIoError::new((w, b), e))
122                }
123            }
124        }
125    }
126}
127
128/// A future which will write all bytes to `W`.
129///
130/// This is created by calling `AsyncWrite::async_write_all` method.
131#[derive(Debug)]
132pub struct WriteAll<W, B>(WriteBytes<W, Window<B>>);
133impl<W, B> WriteAll<W, B> {
134    /// Returns the reference to the writer.
135    pub fn writer(&self) -> &W {
136        self.0.writer()
137    }
138
139    /// Returns the mutable reference to the writer.
140    pub fn writer_mut(&mut self) -> &mut W {
141        self.0.writer_mut()
142    }
143}
144impl<W: Write, B: AsRef<[u8]>> Future for WriteAll<W, B> {
145    type Item = (W, B);
146    type Error = AsyncIoError<(W, B)>;
147    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
148        while let Async::Ready((w, b, size)) =
149            self.0.poll().map_err(
150                |e| e.map_state(|(w, b)| (w, b.into_inner())),
151            )?
152        {
153            let b = b.skip(size);
154            if b.as_ref().is_empty() {
155                return Ok(Async::Ready((w, b.into_inner())));
156            } else if size == 0 {
157                let e = Error::new(
158                    ErrorKind::UnexpectedEof,
159                    format!("Unexpected EOF (remaining {} bytes", b.as_ref().len()),
160                );
161                return Err(AsyncIoError::new((w, b.into_inner()), e));
162            } else {
163                self.0 = w.async_write(b);
164            }
165        }
166        Ok(Async::NotReady)
167    }
168}
169
/// A future which will flush the internal buffer of `W`.
///
/// Instances are obtained from the `AsyncWrite::async_flush` method.
///
/// The writer is kept in an `Option`; it is `None` once the writer
/// has been taken out of the future.
#[derive(Debug)]
pub struct Flush<W>(Option<W>);
impl<W> Flush<W> {
    /// Returns the reference to the writer.
    ///
    /// # Panics
    ///
    /// Panics if the future no longer holds its writer.
    pub fn writer(&self) -> &W {
        let Flush(ref slot) = *self;
        slot.as_ref().expect("Flush has been consumed")
    }

    /// Returns the mutable reference to the writer.
    ///
    /// # Panics
    ///
    /// Panics if the future no longer holds its writer.
    pub fn writer_mut(&mut self) -> &mut W {
        let Flush(ref mut slot) = *self;
        slot.as_mut().expect("Flush has been consumed")
    }
}
186impl<W: Write> Future for Flush<W> {
187    type Item = W;
188    type Error = AsyncIoError<W>;
189    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
190        let mut w = self.0.take().expect("Cannot poll Flush twice");
191        match w.flush() {
192            Ok(()) => Ok(Async::Ready(w)),
193            Err(e) => {
194                if e.kind() == ErrorKind::WouldBlock {
195                    self.0 = Some(w);
196                    Ok(Async::NotReady)
197                } else {
198                    Err(AsyncIoError::new(w, e))
199                }
200            }
201        }
202    }
203}