1extern crate futures;
5extern crate libc;
6extern crate mio;
7extern crate tokio_core;
8
9use std::io::{Error, ErrorKind, Read, Result, Write};
10use std::os::unix::io::AsRawFd;
11
12use self::futures::{Async as FAsync, AsyncSink, Sink, StartSend, Stream, Poll as FPoll};
13use self::mio::{Evented, Poll as MPoll, PollOpt, Ready, Token};
14use self::mio::unix::EventedFd;
15use self::tokio_core::reactor::{Handle, PollEvented};
16
17use super::Iface;
18
19struct MioWrapper {
20 iface: Iface,
21}
22
23impl Evented for MioWrapper {
24 fn register(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
25 EventedFd(&self.iface.as_raw_fd()).register(poll, token, events, opts)
26 }
27 fn reregister(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
28 EventedFd(&self.iface.as_raw_fd()).reregister(poll, token, events, opts)
29 }
30 fn deregister(&self, poll: &MPoll) -> Result<()> {
31 EventedFd(&self.iface.as_raw_fd()).deregister(poll)
32 }
33}
34
35impl Read for MioWrapper {
36 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
37 self.iface.recv(buf)
38 }
39}
40
41impl Write for MioWrapper {
42 fn write(&mut self, buf: &[u8]) -> Result<usize> {
43 self.iface.send(buf)
44 }
45 fn flush(&mut self) -> Result<()> {
46 Ok(())
47 }
48}
49
50pub struct Async {
54 mio: PollEvented<MioWrapper>,
55 recv_bufsize: usize,
56}
57
58impl Async {
59 pub fn new(iface: Iface, handle: &Handle) -> Result<Self> {
90 iface.set_non_blocking()?;
91 Ok(Async {
92 mio: PollEvented::new(MioWrapper { iface }, handle)?,
93 recv_bufsize: 1542,
94 })
95 }
96 pub fn set_recv_bufsize(&mut self, bufsize: usize) {
104 self.recv_bufsize = bufsize;
105 }
106}
107
108impl Stream for Async {
109 type Item = Vec<u8>;
110 type Error = Error;
111 fn poll(&mut self) -> FPoll<Option<Self::Item>, Self::Error> {
112 let mut buffer = vec![0; self.recv_bufsize];
114 match self.mio.read(&mut buffer) {
115 Ok(size) => {
116 buffer.resize(size, 0);
117 Ok(FAsync::Ready(Some(buffer)))
118 },
119 Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(FAsync::NotReady),
120 Err(e) => Err(e),
121 }
122 }
123}
124
125impl Sink for Async {
126 type SinkItem = Vec<u8>;
127 type SinkError = Error;
128 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
129 match self.mio.write(&item) {
130 Ok(_size) => Ok(AsyncSink::Ready),
132 Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)),
133 Err(e) => Err(e),
134 }
135 }
136 fn poll_complete(&mut self) -> FPoll<(), Self::SinkError> {
137 Ok(FAsync::Ready(()))
138 }
139}