tun_rs/async_device/unix/tokio.rs

use std::io;
use std::task::{Context, Poll};

use crate::platform::DeviceImpl;
use ::tokio::io::unix::AsyncFd as TokioAsyncFd;
use ::tokio::io::Interest;

/// An async wrapper around a Tun/Tap device.
///
/// This type does not provide a `split` method, because the same functionality can be achieved by wrapping the device in an `Arc`.
///
/// # Streams
///
/// If you need to produce a [`Stream`], you can look at [`DeviceFramed`](crate::async_framed::DeviceFramed).
///
/// **Note:** `DeviceFramed` is only available when the `async_framed` feature is enabled.
///
/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
///
/// # Examples
///
/// ```no_run
/// use tun_rs::{AsyncDevice, DeviceBuilder};
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///     // Create a TUN device with basic configuration
///     let dev = DeviceBuilder::new()
///         .name("tun0")
///         .mtu(1500)
///         .ipv4("10.0.0.1", "255.255.255.0", None)
///         .build_async()?;
///
///     // Send a simple packet (replace with a real IP packet)
///     let packet = b"[IP Packet: 10.0.0.1 -> 10.0.0.2] Hello, Async TUN!";
///     dev.send(packet).await?;
///
///     // Receive a packet
///     let mut buf = [0u8; 1500];
///     let n = dev.recv(&mut buf).await?;
///     println!("Received {} bytes: {:?}", n, &buf[..n]);
///
///     Ok(())
/// }
/// ```
pub struct AsyncDevice(pub(crate) TokioAsyncFd<DeviceImpl>);
impl AsyncDevice {
    /// Polls the I/O handle for readability.
    ///
    /// # Caveats
    ///
    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
    /// receive a wakeup.
    ///
    /// # Return value
    ///
    /// The function returns:
    ///
    /// * `Poll::Pending` if the device is not ready for reading.
    /// * `Poll::Ready(Ok(()))` if the device is ready for reading.
    /// * `Poll::Ready(Err(e))` if an error is encountered.
    ///
    /// # Errors
    ///
    /// This function may encounter any standard I/O error except `WouldBlock`.
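    ///
    /// # Examples
    ///
    /// A minimal sketch of driving `poll_readable` from async code with
    /// `std::future::poll_fn`; the device configuration below is only an
    /// assumption mirroring the type-level example:
    ///
    /// ```no_run
    /// use std::future::poll_fn;
    /// use tun_rs::DeviceBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() -> std::io::Result<()> {
    ///     let dev = DeviceBuilder::new()
    ///         .name("tun0")
    ///         .mtu(1500)
    ///         .ipv4("10.0.0.1", "255.255.255.0", None)
    ///         .build_async()?;
    ///     // Wait until the device reports read readiness.
    ///     poll_fn(|cx| dev.poll_readable(cx)).await?;
    ///     Ok(())
    /// }
    /// ```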
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.0.poll_read_ready(cx).map_ok(|_| ())
    }
    /// Attempts to receive a single packet from the device.
    ///
    /// # Caveats
    ///
    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
    /// receive a wakeup.
    ///
    /// # Return value
    ///
    /// The function returns:
    ///
    /// * `Poll::Pending` if the device is not ready to read.
    /// * `Poll::Ready(Ok(n))` if data was read into `buf`, where `n` is the number of bytes received.
    /// * `Poll::Ready(Err(e))` if an error is encountered.
    ///
    /// # Errors
    ///
    /// This function may encounter any standard I/O error except `WouldBlock`.
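    ///
    /// # Examples
    ///
    /// A minimal sketch that wraps `poll_recv` in `std::future::poll_fn` to
    /// await a single packet; the device setup is assumed, mirroring the
    /// type-level example:
    ///
    /// ```no_run
    /// use std::future::poll_fn;
    /// use tun_rs::DeviceBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() -> std::io::Result<()> {
    ///     let dev = DeviceBuilder::new()
    ///         .name("tun0")
    ///         .mtu(1500)
    ///         .ipv4("10.0.0.1", "255.255.255.0", None)
    ///         .build_async()?;
    ///     let mut buf = [0u8; 1500];
    ///     // Poll until a packet has been copied into `buf`.
    ///     let n = poll_fn(|cx| dev.poll_recv(cx, &mut buf)).await?;
    ///     println!("received {} bytes", n);
    ///     Ok(())
    /// }
    /// ```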
    pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        loop {
            return match self.0.poll_read_ready(cx) {
                Poll::Ready(Ok(mut rs)) => {
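                    // `try_io` runs the closure and, if the read would block,
                    // clears the readiness flag and returns `Err(_)`; the
                    // readiness was spurious in that case, so poll again.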
                    let n = match rs.try_io(|dev| dev.get_ref().recv(buf)) {
                        Ok(rs) => rs?,
                        Err(_) => continue,
                    };
                    Poll::Ready(Ok(n))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            };
        }
    }

    /// Polls the I/O handle for writability.
    ///
    /// # Caveats
    ///
    /// Note that on multiple calls to a `poll_*` method in the send direction,
    /// only the `Waker` from the `Context` passed to the most recent call will
    /// be scheduled to receive a wakeup.
    ///
    /// # Return value
    ///
    /// The function returns:
    ///
    /// * `Poll::Pending` if the device is not ready for writing.
    /// * `Poll::Ready(Ok(()))` if the device is ready for writing.
    /// * `Poll::Ready(Err(e))` if an error is encountered.
    ///
    /// # Errors
    ///
    /// This function may encounter any standard I/O error except `WouldBlock`.
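    ///
    /// # Examples
    ///
    /// A minimal sketch of awaiting write readiness via `std::future::poll_fn`
    /// (device setup assumed, as in the type-level example):
    ///
    /// ```no_run
    /// use std::future::poll_fn;
    /// use tun_rs::DeviceBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() -> std::io::Result<()> {
    ///     let dev = DeviceBuilder::new()
    ///         .name("tun0")
    ///         .mtu(1500)
    ///         .ipv4("10.0.0.1", "255.255.255.0", None)
    ///         .build_async()?;
    ///     // Wait until the device reports write readiness.
    ///     poll_fn(|cx| dev.poll_writable(cx)).await?;
    ///     Ok(())
    /// }
    /// ```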
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.0.poll_write_ready(cx).map_ok(|_| ())
    }
    /// Attempts to send a single packet to the device.
    ///
    /// # Caveats
    ///
    /// Note that on multiple calls to a `poll_*` method in the send direction,
    /// only the `Waker` from the `Context` passed to the most recent call will
    /// be scheduled to receive a wakeup.
    ///
    /// # Return value
    ///
    /// The function returns:
    ///
    /// * `Poll::Pending` if the device is not ready to write.
    /// * `Poll::Ready(Ok(n))` if the packet was sent, where `n` is the number of bytes written.
    /// * `Poll::Ready(Err(e))` if an error is encountered.
    ///
    /// # Errors
    ///
    /// This function may encounter any standard I/O error except `WouldBlock`.
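    ///
    /// # Examples
    ///
    /// A minimal sketch that wraps `poll_send` in `std::future::poll_fn` to
    /// send one packet; the device setup and the packet bytes are assumptions
    /// mirroring the type-level example:
    ///
    /// ```no_run
    /// use std::future::poll_fn;
    /// use tun_rs::DeviceBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() -> std::io::Result<()> {
    ///     let dev = DeviceBuilder::new()
    ///         .name("tun0")
    ///         .mtu(1500)
    ///         .ipv4("10.0.0.1", "255.255.255.0", None)
    ///         .build_async()?;
    ///     let packet = b"[IP Packet: 10.0.0.1 -> 10.0.0.2] Hello, Async TUN!";
    ///     // Poll until the packet has been handed to the device.
    ///     let n = poll_fn(|cx| dev.poll_send(cx, packet)).await?;
    ///     println!("sent {} bytes", n);
    ///     Ok(())
    /// }
    /// ```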
    pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        loop {
            return match self.0.poll_write_ready(cx) {
                Poll::Ready(Ok(mut rs)) => {
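                    // As in `poll_recv`: a `WouldBlock` from the underlying send
                    // clears readiness and yields `Err(_)`, so loop and poll again.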
                    let n = match rs.try_io(|dev| dev.get_ref().send(buf)) {
                        Ok(rs) => rs?,
                        Err(_) => continue,
                    };
                    Poll::Ready(Ok(n))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            };
        }
    }
}

impl AsyncDevice {
    pub(crate) fn new_dev(device: DeviceImpl) -> io::Result<Self> {
        device.set_nonblocking(true)?;
        Ok(Self(TokioAsyncFd::new(device)?))
    }
    pub(crate) fn into_device(self) -> io::Result<DeviceImpl> {
        Ok(self.0.into_inner())
    }

    pub(crate) async fn read_with<R>(
        &self,
        mut op: impl FnMut(&DeviceImpl) -> io::Result<R>,
    ) -> io::Result<R> {
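        // Wait for read readiness (or an error event) before running `op`;
        // `async_io` re-registers and retries if `op` returns `WouldBlock`.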
        self.0
            .async_io(Interest::READABLE.add(Interest::ERROR), |device| op(device))
            .await
    }
    pub(crate) async fn write_with<R>(
        &self,
        mut op: impl FnMut(&DeviceImpl) -> io::Result<R>,
    ) -> io::Result<R> {
        self.0
            .async_io(Interest::WRITABLE, |device| op(device))
            .await
    }

    pub(crate) fn try_read_io<R>(
        &self,
        f: impl FnOnce(&DeviceImpl) -> io::Result<R>,
    ) -> io::Result<R> {
        self.0
            .try_io(Interest::READABLE.add(Interest::ERROR), |device| f(device))
    }

    pub(crate) fn try_write_io<R>(
        &self,
        f: impl FnOnce(&DeviceImpl) -> io::Result<R>,
    ) -> io::Result<R> {
        self.0.try_io(Interest::WRITABLE, |device| f(device))
    }

    pub(crate) fn get_ref(&self) -> &DeviceImpl {
        self.0.get_ref()
    }
}