tun_tap/
async.rs

1//! Integration of TUN/TAP into tokio.
2//!
3//! See the [`Async`](struct.Async.html) structure.
4extern crate futures;
5extern crate libc;
6extern crate mio;
7extern crate tokio_core;
8
9use std::io::{Error, ErrorKind, Read, Result, Write};
10use std::os::unix::io::AsRawFd;
11
12use self::futures::{Async as FAsync, AsyncSink, Sink, StartSend, Stream, Poll as FPoll};
13use self::mio::{Evented, Poll as MPoll, PollOpt, Ready, Token};
14use self::mio::unix::EventedFd;
15use self::tokio_core::reactor::{Handle, PollEvented};
16
17use super::Iface;
18
19struct MioWrapper {
20    iface: Iface,
21}
22
23impl Evented for MioWrapper {
24    fn register(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
25        EventedFd(&self.iface.as_raw_fd()).register(poll, token, events, opts)
26    }
27    fn reregister(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
28        EventedFd(&self.iface.as_raw_fd()).reregister(poll, token, events, opts)
29    }
30    fn deregister(&self, poll: &MPoll) -> Result<()> {
31        EventedFd(&self.iface.as_raw_fd()).deregister(poll)
32    }
33}
34
35impl Read for MioWrapper {
36    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
37        self.iface.recv(buf)
38    }
39}
40
41impl Write for MioWrapper {
42    fn write(&mut self, buf: &[u8]) -> Result<usize> {
43        self.iface.send(buf)
44    }
45    fn flush(&mut self) -> Result<()> {
46        Ok(())
47    }
48}
49
50/// A wrapper around [`Iface`](../struct.Iface.html) for use in connection with tokio.
51///
52/// This turns the synchronous `Iface` into an asynchronous `Sink + Stream` of packets.
53pub struct Async {
54    mio: PollEvented<MioWrapper>,
55    recv_bufsize: usize,
56}
57
58impl Async {
59    /// Consumes an `Iface` and wraps it in a new `Async`.
60    ///
61    /// # Parameters
62    ///
63    /// * `iface`: The created interface to wrap. It gets consumed.
64    /// * `handle`: The handle to tokio's `Core` to run on.
65    ///
66    /// # Errors
67    ///
68    /// This fails with an error in case of low-level OS errors (they shouldn't usually happen).
69    ///
70    /// # Examples
71    ///
72    /// ```rust,no_run
73    /// # extern crate futures;
74    /// # extern crate tokio_core;
75    /// # extern crate tun_tap;
76    /// # use futures::Stream;
77    /// # use tun_tap::*;
78    /// # use tun_tap::async::*;
79    /// # use tokio_core::reactor::Core;
80    /// # fn main() {
81    /// let iface = Iface::new("mytun%d", Mode::Tun).unwrap();
82    /// let name = iface.name().to_owned();
83    /// // Bring the interface up by `ip addr add IP dev $name; ip link set up dev $name`
84    /// let core = Core::new().unwrap();
85    /// let async = Async::new(iface, &core.handle()).unwrap();
86    /// let (sink, stream) = async.split();
87    /// # }
88    /// ```
89    pub fn new(iface: Iface, handle: &Handle) -> Result<Self> {
90        iface.set_non_blocking()?;
91        Ok(Async {
92            mio: PollEvented::new(MioWrapper { iface }, handle)?,
93            recv_bufsize: 1542,
94        })
95    }
96    /// Sets the receive buffer size.
97    ///
98    /// When receiving a packet, a buffer of this size is allocated and the packet read into it.
99    /// This configures the size of the buffer.
100    ///
101    /// This needs to be called when the interface's MTU is changed from the default 1500. The
102    /// default should be enough otherwise.
103    pub fn set_recv_bufsize(&mut self, bufsize: usize) {
104        self.recv_bufsize = bufsize;
105    }
106}
107
108impl Stream for Async {
109    type Item = Vec<u8>;
110    type Error = Error;
111    fn poll(&mut self) -> FPoll<Option<Self::Item>, Self::Error> {
112        // TODO Reuse buffer?
113        let mut buffer = vec![0; self.recv_bufsize];
114        match self.mio.read(&mut buffer) {
115            Ok(size) => {
116                buffer.resize(size, 0);
117                Ok(FAsync::Ready(Some(buffer)))
118            },
119            Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(FAsync::NotReady),
120            Err(e) => Err(e),
121        }
122    }
123}
124
125impl Sink for Async {
126    type SinkItem = Vec<u8>;
127    type SinkError = Error;
128    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
129        match self.mio.write(&item) {
130            // TODO What to do about short write? Can it happen?
131            Ok(_size) => Ok(AsyncSink::Ready),
132            Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)),
133            Err(e) => Err(e),
134        }
135    }
136    fn poll_complete(&mut self) -> FPoll<(), Self::SinkError> {
137        Ok(FAsync::Ready(()))
138    }
139}