tun_driver/
async.rs

1extern crate futures;
2extern crate libc;
3extern crate mio;
4extern crate tokio;
5
6use std::io::{Error, ErrorKind, Read, Result, Write};
7use std::os::unix::io::AsRawFd;
8
9use self::futures::{Async as FAsync, AsyncSink, Sink, StartSend, Stream, Poll as FPoll};
10use self::libc::c_int;
11use self::mio::{Evented, Poll as MPoll, PollOpt, Ready, Token};
12use self::mio::unix::EventedFd;
13use self::tokio::reactor::{Handle, PollEvented2};
14
15use super::Iface;
16
17
18
19struct MioWrapper {
20    iface: Iface,
21}
22
23impl Evented for MioWrapper {
24    fn register(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
25        EventedFd(&self.iface.as_raw_fd()).register(poll, token, events, opts)
26    }
27    fn reregister(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
28        EventedFd(&self.iface.as_raw_fd()).reregister(poll, token, events, opts)
29    }
30    fn deregister(&self, poll: &MPoll) -> Result<()> {
31        EventedFd(&self.iface.as_raw_fd()).deregister(poll)
32    }
33}
34
35impl Read for MioWrapper {
36    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
37        self.iface.recv(buf)
38    }
39}
40
41impl Write for MioWrapper {
42    fn write(&mut self, buf: &[u8]) -> Result<usize> {
43        self.iface.send(buf)
44    }
45    fn flush(&mut self) -> Result<()> {
46        Ok(())
47    }
48}
49
50/// A wrapper around [`Iface`](../struct.Iface.html) for use in connection with tokio.
51///
52/// This turns the synchronous `Iface` into an asynchronous `Sink + Stream` of packets.
53pub struct Async {
54    mio: PollEvented2<MioWrapper>,
55    recv_bufsize: usize,
56}
57
58impl Async {
59    /// Consumes an `Iface` and wraps it in a new `Async`.
60    ///
61    /// # Parameters
62    ///
63    /// * `iface`: The created interface to wrap. It gets consumed.
64    /// * `handle`: The handle to tokio's `Core` to run on.
65    ///
66    /// # Errors
67    ///
68    /// This fails with an error in case of low-level OS errors (they shouldn't usually happen).
69    ///
70    /// # Examples
71    ///
72    /// ```rust,no_run
73    /// # extern crate futures;
74    /// # extern crate tokio_core;
75    /// # extern crate tun_tap;
76    /// # use futures::Stream;
77    /// # use tun_tap::*;
78    /// # use tun_tap::async::*;
79    /// # use tokio_core::reactor::Core;
80    /// # fn main() {
81    /// let iface = Iface::new("mytun%d", Mode::Tun).unwrap();
82    /// let name = iface.name().to_owned();
83    /// // Bring the interface up by `ip addr add IP dev $name; ip link set up dev $name`
84    /// let core = Core::new().unwrap();
85    /// let async = Async::new(iface, &core.handle()).unwrap();
86    /// let (sink, stream) = async.split();
87    /// # }
88    /// ```
89    pub fn new(iface: Iface) -> Result<Self> {
90        let fd = iface.as_raw_fd();
91        let mut nonblock: c_int = 1;
92        let result = unsafe { libc::ioctl(fd, libc::FIONBIO, &mut nonblock) };
93        if result == -1 {
94            Err(Error::last_os_error())
95        } else {
96            Ok(Async {
97                mio: PollEvented2::new(MioWrapper { iface }),
98                recv_bufsize: 1542,
99            })
100        }
101    }
102    /// Sets the receive buffer size.
103    ///
104    /// When receiving a packet, a buffer of this size is allocated and the packet read into it.
105    /// This configures the size of the buffer.
106    ///
107    /// This needs to be called when the interface's MTU is changed from the default 1500. The
108    /// default should be enough otherwise.
109    pub fn set_recv_bufsize(&mut self, bufsize: usize) {
110        self.recv_bufsize = bufsize;
111    }
112}
113
114impl Stream for Async {
115    type Item = Vec<u8>;
116    type Error = Error;
117    fn poll(&mut self) -> FPoll<Option<Self::Item>, Self::Error> {
118        // TODO Reuse buffer?
119        let mut buffer = vec![0; self.recv_bufsize];
120        match self.mio.read(&mut buffer) {
121            Ok(size) => {
122                buffer.resize(size, 0);
123                Ok(FAsync::Ready(Some(buffer)))
124            },
125            Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(FAsync::NotReady),
126            Err(e) => Err(e),
127        }
128    }
129}
130
131impl Sink for Async {
132    type SinkItem = Vec<u8>;
133    type SinkError = Error;
134    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
135        match self.mio.write(&item) {
136            // TODO What to do about short write? Can it happen?
137            Ok(_size) => Ok(AsyncSink::Ready),
138            Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)),
139            Err(e) => Err(e),
140        }
141    }
142    fn poll_complete(&mut self) -> FPoll<(), Self::SinkError> {
143        Ok(FAsync::Ready(()))
144    }
145}