tokio_utun/
lib.rs

//! Tokio bindings for utun tunnel devices, built on top of `mio_utun`.
2
3#![cfg(unix)]
4#![doc(html_root_url = "https://docs.rs/tokio-utun")]
5
6extern crate futures;
7extern crate tokio_io;
8extern crate log;
9extern crate bytes;
10extern crate mio;
11extern crate mio_utun;
12extern crate tokio_codec;
13extern crate tokio_reactor;
14
15use std::io::{self, Read, Write};
16use std::os::unix::io::{FromRawFd, RawFd};
17use bytes::{Buf, BufMut};
18use futures::{Async, Poll};
19use mio::Ready;
20use tokio_reactor::{PollEvented, Handle};
21use tokio_io::{AsyncRead, AsyncWrite};
22use tokio_codec::{Framed, Encoder, Decoder};
23
24/// The primary class for this crate, a stream of tunneled traffic.
25#[derive(Debug)]
26pub struct UtunStream {
27    io: PollEvented<mio_utun::UtunStream>
28}
29
30impl UtunStream {
31    pub fn connect(name: &str) -> io::Result<UtunStream> {
32        let stream = mio_utun::UtunStream::connect(name)?;
33        let io = PollEvented::new(stream);
34        Ok(UtunStream { io })
35    }
36
37    pub fn connect_with_handle(name: &str, handle: &Handle) -> io::Result<UtunStream> {
38        let stream = mio_utun::UtunStream::connect(name)?;
39        let io = PollEvented::new_with_handle(stream, handle)?;
40        Ok(UtunStream { io })
41    }
42
43    pub fn from_fd(fd: RawFd) -> UtunStream {
44        let stream = unsafe { mio_utun::UtunStream::from_raw_fd(fd) };
45        let io = PollEvented::new(stream);
46        UtunStream { io }
47    }
48
49    pub fn from_fd_with_handle(fd: RawFd, handle: &Handle) -> io::Result<UtunStream> {
50        let stream = unsafe { mio_utun::UtunStream::from_raw_fd(fd) };
51        let io = PollEvented::new_with_handle(stream, handle)?;
52        Ok(UtunStream { io })
53    }
54
55    pub fn name(&self) -> io::Result<String> {
56        self.io.get_ref().name()
57    }
58
59    /// Provides a `Stream` and `Sink` interface for reading and writing to this
60    /// `UdpSocket` object, using the provided `UdpCodec` to read and write the
61    /// raw data.
62    ///
63    /// Raw UDP sockets work with datagrams, but higher-level code usually
64    /// wants to batch these into meaningful chunks, called "frames". This
65    /// method layers framing on top of this socket by using the `UdpCodec`
66    /// trait to handle encoding and decoding of messages frames. Note that
67    /// the incoming and outgoing frame types may be distinct.
68    ///
69    /// This function returns a *single* object that is both `Stream` and
70    /// `Sink`; grouping this into a single object is often useful for layering
71    /// things which require both read and write access to the underlying
72    /// object.
73    ///
74    /// If you want to work more directly with the streams and sink, consider
75    /// calling `split` on the `UdpFramed` returned by this method, which will
76    /// break them into separate objects, allowing them to interact more
77    /// easily.
78    pub fn framed<C: Decoder + Encoder>(self, codec: C) -> Framed<UtunStream, C> {
79        Framed::new(self, codec)
80    }
81
82    /// Test whether this socket is ready to be read or not.
83    ///
84    /// If the socket is *not* readable then the current task is scheduled to
85    /// get a notification when the socket does become readable. That is, this
86    /// is only suitable for calling in a `Future::poll` method and will
87    /// automatically handle ensuring a retry once the socket is readable again.
88    pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
89        self.io.poll_read_ready(ready)
90    }
91
92    /// Test whether this socket is ready to be written to or not.
93    ///
94    /// If the socket is *not* writable then the current task is scheduled to
95    /// get a notification when the socket does become writable. That is, this
96    /// is only suitable for calling in a `Future::poll` method and will
97    /// automatically handle ensuring a retry once the socket is writable again.
98    pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
99        self.io.poll_write_ready()
100    }
101}
102
103impl Read for UtunStream {
104    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
105        self.io.read(buf)
106    }
107}
108
109impl Write for UtunStream {
110    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
111        self.io.write(buf)
112    }
113    fn flush(&mut self) -> io::Result<()> {
114        self.io.flush()
115    }
116}
117
118impl AsyncRead for UtunStream {
119    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
120        false
121    }
122
123    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
124        if let Async::NotReady = self.poll_read_ready(Ready::readable())? {
125            return Ok(Async::NotReady);
126        }
127
128        unsafe {
129            match self.io.read(buf.bytes_mut()) {
130                Ok(n) => {
131                    buf.advance_mut(n);
132                    Ok(Async::Ready(n))
133                },
134                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
135                    self.io.clear_read_ready(Ready::readable())?;
136                    Ok(Async::NotReady)
137                },
138                Err(e) => Err(e)
139            }
140        }
141    }
142}
143
144impl AsyncWrite for UtunStream {
145    fn shutdown(&mut self) -> Poll<(), io::Error> {
146        Ok(().into())
147    }
148
149    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
150        if let Async::NotReady = self.poll_write_ready()? {
151            return Ok(Async::NotReady);
152        }
153
154        match self.io.write(buf.bytes()) {
155            Ok(n) => {
156                buf.advance(n);
157                Ok(Async::Ready(n))
158            },
159            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
160                self.io.clear_write_ready()?;
161                Ok(Async::NotReady)
162            },
163            Err(e) => Err(e)
164        }
165    }
166}