1extern crate futures;
2extern crate libc;
3extern crate mio;
4extern crate tokio;
5
6use std::io::{Error, ErrorKind, Read, Result, Write};
7use std::os::unix::io::AsRawFd;
8
9use self::futures::{Async as FAsync, AsyncSink, Sink, StartSend, Stream, Poll as FPoll};
10use self::libc::c_int;
11use self::mio::{Evented, Poll as MPoll, PollOpt, Ready, Token};
12use self::mio::unix::EventedFd;
13use self::tokio::reactor::{Handle, PollEvented2};
14
15use super::Iface;
16
17
18
19struct MioWrapper {
20 iface: Iface,
21}
22
23impl Evented for MioWrapper {
24 fn register(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
25 EventedFd(&self.iface.as_raw_fd()).register(poll, token, events, opts)
26 }
27 fn reregister(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
28 EventedFd(&self.iface.as_raw_fd()).reregister(poll, token, events, opts)
29 }
30 fn deregister(&self, poll: &MPoll) -> Result<()> {
31 EventedFd(&self.iface.as_raw_fd()).deregister(poll)
32 }
33}
34
35impl Read for MioWrapper {
36 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
37 self.iface.recv(buf)
38 }
39}
40
41impl Write for MioWrapper {
42 fn write(&mut self, buf: &[u8]) -> Result<usize> {
43 self.iface.send(buf)
44 }
45 fn flush(&mut self) -> Result<()> {
46 Ok(())
47 }
48}
49
50pub struct Async {
54 mio: PollEvented2<MioWrapper>,
55 recv_bufsize: usize,
56}
57
58impl Async {
59 pub fn new(iface: Iface) -> Result<Self> {
90 let fd = iface.as_raw_fd();
91 let mut nonblock: c_int = 1;
92 let result = unsafe { libc::ioctl(fd, libc::FIONBIO, &mut nonblock) };
93 if result == -1 {
94 Err(Error::last_os_error())
95 } else {
96 Ok(Async {
97 mio: PollEvented2::new(MioWrapper { iface }),
98 recv_bufsize: 1542,
99 })
100 }
101 }
102 pub fn set_recv_bufsize(&mut self, bufsize: usize) {
110 self.recv_bufsize = bufsize;
111 }
112}
113
114impl Stream for Async {
115 type Item = Vec<u8>;
116 type Error = Error;
117 fn poll(&mut self) -> FPoll<Option<Self::Item>, Self::Error> {
118 let mut buffer = vec![0; self.recv_bufsize];
120 match self.mio.read(&mut buffer) {
121 Ok(size) => {
122 buffer.resize(size, 0);
123 Ok(FAsync::Ready(Some(buffer)))
124 },
125 Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(FAsync::NotReady),
126 Err(e) => Err(e),
127 }
128 }
129}
130
131impl Sink for Async {
132 type SinkItem = Vec<u8>;
133 type SinkError = Error;
134 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
135 match self.mio.write(&item) {
136 Ok(_size) => Ok(AsyncSink::Ready),
138 Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)),
139 Err(e) => Err(e),
140 }
141 }
142 fn poll_complete(&mut self) -> FPoll<(), Self::SinkError> {
143 Ok(FAsync::Ready(()))
144 }
145}