1#![cfg(unix)]
4#![doc(html_root_url = "https://docs.rs/tokio-utun")]
5
6extern crate futures;
7extern crate tokio_io;
8extern crate log;
9extern crate bytes;
10extern crate mio;
11extern crate mio_utun;
12extern crate tokio_codec;
13extern crate tokio_reactor;
14
15use std::io::{self, Read, Write};
16use std::os::unix::io::{FromRawFd, RawFd};
17use bytes::{Buf, BufMut};
18use futures::{Async, Poll};
19use mio::Ready;
20use tokio_reactor::{PollEvented, Handle};
21use tokio_io::{AsyncRead, AsyncWrite};
22use tokio_codec::{Framed, Encoder, Decoder};
23
24#[derive(Debug)]
26pub struct UtunStream {
27 io: PollEvented<mio_utun::UtunStream>
28}
29
30impl UtunStream {
31 pub fn connect(name: &str) -> io::Result<UtunStream> {
32 let stream = mio_utun::UtunStream::connect(name)?;
33 let io = PollEvented::new(stream);
34 Ok(UtunStream { io })
35 }
36
37 pub fn connect_with_handle(name: &str, handle: &Handle) -> io::Result<UtunStream> {
38 let stream = mio_utun::UtunStream::connect(name)?;
39 let io = PollEvented::new_with_handle(stream, handle)?;
40 Ok(UtunStream { io })
41 }
42
43 pub fn from_fd(fd: RawFd) -> UtunStream {
44 let stream = unsafe { mio_utun::UtunStream::from_raw_fd(fd) };
45 let io = PollEvented::new(stream);
46 UtunStream { io }
47 }
48
49 pub fn from_fd_with_handle(fd: RawFd, handle: &Handle) -> io::Result<UtunStream> {
50 let stream = unsafe { mio_utun::UtunStream::from_raw_fd(fd) };
51 let io = PollEvented::new_with_handle(stream, handle)?;
52 Ok(UtunStream { io })
53 }
54
55 pub fn name(&self) -> io::Result<String> {
56 self.io.get_ref().name()
57 }
58
59 pub fn framed<C: Decoder + Encoder>(self, codec: C) -> Framed<UtunStream, C> {
79 Framed::new(self, codec)
80 }
81
82 pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
89 self.io.poll_read_ready(ready)
90 }
91
92 pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
99 self.io.poll_write_ready()
100 }
101}
102
103impl Read for UtunStream {
104 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
105 self.io.read(buf)
106 }
107}
108
109impl Write for UtunStream {
110 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
111 self.io.write(buf)
112 }
113 fn flush(&mut self) -> io::Result<()> {
114 self.io.flush()
115 }
116}
117
118impl AsyncRead for UtunStream {
119 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
120 false
121 }
122
123 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
124 if let Async::NotReady = self.poll_read_ready(Ready::readable())? {
125 return Ok(Async::NotReady);
126 }
127
128 unsafe {
129 match self.io.read(buf.bytes_mut()) {
130 Ok(n) => {
131 buf.advance_mut(n);
132 Ok(Async::Ready(n))
133 },
134 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
135 self.io.clear_read_ready(Ready::readable())?;
136 Ok(Async::NotReady)
137 },
138 Err(e) => Err(e)
139 }
140 }
141 }
142}
143
144impl AsyncWrite for UtunStream {
145 fn shutdown(&mut self) -> Poll<(), io::Error> {
146 Ok(().into())
147 }
148
149 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
150 if let Async::NotReady = self.poll_write_ready()? {
151 return Ok(Async::NotReady);
152 }
153
154 match self.io.write(buf.bytes()) {
155 Ok(n) => {
156 buf.advance(n);
157 Ok(Async::Ready(n))
158 },
159 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
160 self.io.clear_write_ready()?;
161 Ok(Async::NotReady)
162 },
163 Err(e) => Err(e)
164 }
165 }
166}