handy_async/io/async_write.rs
1use std::io::{Write, Error, ErrorKind};
2use futures::{Poll, Async, Future};
3
4use pattern::Window;
5use super::AsyncIoError;
6
7/// An asynchronous version of the standard `Write` trait.
8///
9/// Since this is assumed as a basic building block,
10/// it may be more convenient to use [`WriteInto`](./trait.WriteInto.html) for ordinary cases.
11///
12/// # Notice
13///
14/// For executing asynchronously, we assume the writer which implements this trait
15/// returns the `std::io::ErrorKind::WouldBlock` error
16/// if a write operation would be about to block.
17pub trait AsyncWrite: Write + Sized {
18 /// Creates a future which will write bytes asynchronously.
19 ///
20 /// # Examples
21 ///
22 /// ```
23 /// # extern crate futures;
24 /// # extern crate handy_async;
25 /// use futures::Future;
26 /// use handy_async::io::AsyncWrite;
27 ///
28 /// # fn main() {
29 /// let (output, _, _) = vec![].async_write(b"hello").wait().ok().unwrap();
30 /// assert_eq!(&output[..], b"hello");
31 ///
32 /// let mut output = [0; 3];
33 /// let (_, _, _) = (&mut output).async_write(b"hello").wait().ok().unwrap();
34 /// assert_eq!(&output[..], b"hel");
35 ///
36 /// let (_, _, written_size) = [0; 0].async_write(b"hello").wait().ok().unwrap();
37 /// assert_eq!(written_size, 0);
38 /// # }
39 /// ```
40 fn async_write<B: AsRef<[u8]>>(self, buf: B) -> WriteBytes<Self, B> {
41 WriteBytes(Some((self, buf)))
42 }
43
44 /// Creates a future which will write all bytes asynchronously.
45 ///
46 /// # Examples
47 ///
48 /// ```
49 /// # extern crate futures;
50 /// # extern crate handy_async;
51 /// use std::io::ErrorKind;
52 /// use futures::Future;
53 /// use handy_async::io::AsyncWrite;
54 ///
55 /// # fn main() {
56 /// let (output, _) = vec![].async_write_all(b"hello").wait().ok().unwrap();
57 /// assert_eq!(&output[..], b"hello");
58 ///
59 /// let mut output = [0; 3];
60 /// let e = (&mut output).async_write_all(b"hello").wait().err().unwrap();
61 /// assert_eq!(e.error_ref().kind(), ErrorKind::UnexpectedEof);
62 /// # }
63 fn async_write_all<B: AsRef<[u8]>>(self, buf: B) -> WriteAll<Self, B> {
64 WriteAll(self.async_write(Window::new_ref(buf)))
65 }
66
67 /// Creates a future which will flush the internal buffer.
68 ///
69 /// # Examples
70 ///
71 /// ```
72 /// # extern crate futures;
73 /// # extern crate handy_async;
74 /// use std::io::BufWriter;
75 /// use futures::Future;
76 /// use handy_async::io::AsyncWrite;
77 ///
78 /// # fn main() {
79 /// let writer = BufWriter::new(vec![]);
80 /// let (writer, _) = writer.async_write_all(b"hello").wait().ok().unwrap();
81 /// assert_eq!(writer.get_ref(), b"");
82 ///
83 /// let writer = writer.async_flush().wait().ok().unwrap();
84 /// assert_eq!(writer.get_ref(), b"hello");
85 /// # }
86 /// ```
87 fn async_flush(self) -> Flush<Self> {
88 Flush(Some(self))
89 }
90}
91impl<W: Write> AsyncWrite for W {}
92
93/// A future which will write bytes to `W`.
94///
95/// This is created by calling `AsyncWrite::async_write` method.
96#[derive(Debug)]
97pub struct WriteBytes<W, B>(Option<(W, B)>);
98impl<W, B> WriteBytes<W, B> {
99 /// Returns the reference to the writer.
100 pub fn writer(&self) -> &W {
101 &self.0.as_ref().expect("WriteBytes has been consumed").0
102 }
103
104 /// Returns the mutable reference to the writer.
105 pub fn writer_mut(&mut self) -> &mut W {
106 &mut self.0.as_mut().expect("WriteBytes has been consumed").0
107 }
108}
109impl<W: Write, B: AsRef<[u8]>> Future for WriteBytes<W, B> {
110 type Item = (W, B, usize);
111 type Error = AsyncIoError<(W, B)>;
112 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
113 let (mut w, b) = self.0.take().expect("Cannot poll WriteBytes twice");
114 match w.write(b.as_ref()) {
115 Ok(write_size) => Ok(Async::Ready((w, b, write_size))),
116 Err(e) => {
117 if e.kind() == ErrorKind::WouldBlock {
118 self.0 = Some((w, b));
119 Ok(Async::NotReady)
120 } else {
121 Err(AsyncIoError::new((w, b), e))
122 }
123 }
124 }
125 }
126}
127
128/// A future which will write all bytes to `W`.
129///
130/// This is created by calling `AsyncWrite::async_write_all` method.
131#[derive(Debug)]
132pub struct WriteAll<W, B>(WriteBytes<W, Window<B>>);
133impl<W, B> WriteAll<W, B> {
134 /// Returns the reference to the writer.
135 pub fn writer(&self) -> &W {
136 self.0.writer()
137 }
138
139 /// Returns the mutable reference to the writer.
140 pub fn writer_mut(&mut self) -> &mut W {
141 self.0.writer_mut()
142 }
143}
144impl<W: Write, B: AsRef<[u8]>> Future for WriteAll<W, B> {
145 type Item = (W, B);
146 type Error = AsyncIoError<(W, B)>;
147 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
148 while let Async::Ready((w, b, size)) =
149 self.0.poll().map_err(
150 |e| e.map_state(|(w, b)| (w, b.into_inner())),
151 )?
152 {
153 let b = b.skip(size);
154 if b.as_ref().is_empty() {
155 return Ok(Async::Ready((w, b.into_inner())));
156 } else if size == 0 {
157 let e = Error::new(
158 ErrorKind::UnexpectedEof,
159 format!("Unexpected EOF (remaining {} bytes", b.as_ref().len()),
160 );
161 return Err(AsyncIoError::new((w, b.into_inner()), e));
162 } else {
163 self.0 = w.async_write(b);
164 }
165 }
166 Ok(Async::NotReady)
167 }
168}
169
170/// A future which will flush the internal buffer of `W`.
171///
172/// This is created by calling `AsyncWrite::async_flush` method.
173#[derive(Debug)]
174pub struct Flush<W>(Option<W>);
175impl<W> Flush<W> {
176 /// Returns the reference to the writer.
177 pub fn writer(&self) -> &W {
178 self.0.as_ref().expect("Flush has been consumed")
179 }
180
181 /// Returns the mutable reference to the writer.
182 pub fn writer_mut(&mut self) -> &mut W {
183 self.0.as_mut().expect("Flush has been consumed")
184 }
185}
186impl<W: Write> Future for Flush<W> {
187 type Item = W;
188 type Error = AsyncIoError<W>;
189 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
190 let mut w = self.0.take().expect("Cannot poll Flush twice");
191 match w.flush() {
192 Ok(()) => Ok(Async::Ready(w)),
193 Err(e) => {
194 if e.kind() == ErrorKind::WouldBlock {
195 self.0 = Some(w);
196 Ok(Async::NotReady)
197 } else {
198 Err(AsyncIoError::new(w, e))
199 }
200 }
201 }
202 }
203}