1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use futures::{Async, AsyncSink, Poll, Sink};
use tokio_io::AsyncWrite;
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, Write};
#[derive(Debug)]
pub struct SinkWrite<S> {
sink: S,
}
impl<S: Sink> SinkWrite<S> {
pub fn new(sink: S) -> Self {
Self { sink: sink }
}
pub fn into_inner(self) -> S {
self.sink
}
}
impl<S: Sink> Write for SinkWrite<S>
where
S::SinkItem: for<'a> From<&'a [u8]>,
IoError: From<S::SinkError>,
{
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
match self.sink.start_send(buf.into())? {
AsyncSink::NotReady(_) => return Err(IoErrorKind::WouldBlock.into()),
AsyncSink::Ready => {}
}
Ok(buf.len())
}
fn flush(&mut self) -> IoResult<()> {
match self.sink.poll_complete()? {
Async::NotReady => Err(IoErrorKind::WouldBlock.into()),
Async::Ready(_) => Ok(()),
}
}
}
impl<S: Sink> AsyncWrite for SinkWrite<S>
where
S::SinkItem: for<'a> From<&'a [u8]>,
IoError: From<S::SinkError>,
{
fn shutdown(&mut self) -> Poll<(), IoError> {
Ok(self.sink.poll_complete()?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{Future, Sink};
use std::io::ErrorKind as IoErrorKind;
use tokio_io::io as async_io;
#[test]
fn async_write_works() {
let sink = Vec::<Vec<u8>>::new().sink_map_err(|_| IoErrorKind::InvalidData);
let write = SinkWrite::new(sink);
let (write, _) = async_io::write_all(write, b"hello").wait().unwrap();
let (write, _) = async_io::write_all(write, b" world").wait().unwrap();
let write = async_io::shutdown(write).wait().unwrap();
let vec = write.into_inner().into_inner();
assert_eq!(&vec[0][..], b"hello");
assert_eq!(&vec[1][..], b" world");
}
}