tun_rs/async_device/unix/mod.rs
1#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
2use crate::platform::offload::{handle_gro, VirtioNetHdr, VIRTIO_NET_HDR_LEN};
3use crate::platform::DeviceImpl;
4#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
5use crate::platform::GROTable;
6use crate::SyncDevice;
7use std::io;
8use std::io::{IoSlice, IoSliceMut};
9use std::ops::Deref;
10use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
11
// Async backend selection: the Tokio-based implementation takes precedence
// whenever the `async_tokio` feature is enabled.
#[cfg(feature = "async_tokio")]
mod tokio;
#[cfg(feature = "async_tokio")]
pub use self::tokio::AsyncDevice;

// Fall back to the async-io implementation only when `async_io` is enabled
// and `async_tokio` is not, so exactly one `AsyncDevice` type is exported
// even when both features are turned on.
#[cfg(all(feature = "async_io", not(feature = "async_tokio")))]
mod async_io;
#[cfg(all(feature = "async_io", not(feature = "async_tokio")))]
pub use self::async_io::AsyncDevice;
21
22impl FromRawFd for AsyncDevice {
23 unsafe fn from_raw_fd(fd: RawFd) -> Self {
24 AsyncDevice::from_fd(fd).unwrap()
25 }
26}
27impl IntoRawFd for AsyncDevice {
28 fn into_raw_fd(self) -> RawFd {
29 self.into_fd().unwrap()
30 }
31}
32impl AsRawFd for AsyncDevice {
33 fn as_raw_fd(&self) -> RawFd {
34 self.get_ref().as_raw_fd()
35 }
36}
37
/// Dereferences to the underlying [`DeviceImpl`], so synchronous
/// configuration/query methods can be called directly on an [`AsyncDevice`].
impl Deref for AsyncDevice {
    type Target = DeviceImpl;

    fn deref(&self) -> &Self::Target {
        self.get_ref()
    }
}
45
impl AsyncDevice {
    /// Wraps a [`SyncDevice`] in an async adapter, registering its fd with the
    /// async runtime selected at compile time.
    #[allow(dead_code)]
    pub fn new(device: SyncDevice) -> io::Result<AsyncDevice> {
        AsyncDevice::new_dev(device.0)
    }

    /// Constructs an [`AsyncDevice`] from an existing file descriptor,
    /// taking ownership of it (the fd is closed when the device is dropped).
    ///
    /// # Safety
    /// This method is safe if the provided fd is valid.
    pub unsafe fn from_fd(fd: RawFd) -> io::Result<AsyncDevice> {
        AsyncDevice::new_dev(DeviceImpl::from_fd(fd)?)
    }

    /// Constructs an [`AsyncDevice`] that borrows an existing file descriptor.
    ///
    /// # Safety
    /// The fd passed in must be a valid, open file descriptor.
    /// Unlike [`from_fd`](Self::from_fd), this function does **not** take ownership of `fd`,
    /// and therefore will not close it when dropped.
    /// The caller is responsible for ensuring the lifetime and eventual closure of `fd`.
    #[allow(dead_code)]
    pub(crate) unsafe fn borrow_raw(fd: RawFd) -> io::Result<Self> {
        AsyncDevice::new_dev(DeviceImpl::borrow_raw(fd)?)
    }

    /// Consumes the device and returns the underlying raw file descriptor,
    /// deregistering it from the async runtime first.
    pub fn into_fd(self) -> io::Result<RawFd> {
        Ok(self.into_device()?.into_raw_fd())
    }
    /// Waits for the device to become readable.
    ///
    /// This function is usually paired with `try_recv()` for manual readiness-based I/O.
    ///
    /// The function may complete without the device being readable. This is a
    /// false-positive and attempting a `try_recv()` will return with
    /// `io::ErrorKind::WouldBlock`.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. Once a readiness event occurs, the method
    /// will continue to return immediately until the readiness event is
    /// consumed by an attempt to read that fails with `WouldBlock` or
    /// `Poll::Pending`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # #[cfg(all(unix, any(feature = "async_io", feature = "async_tokio")))]
    /// # async fn example() -> std::io::Result<()> {
    /// use tun_rs::DeviceBuilder;
    ///
    /// let dev = DeviceBuilder::new()
    ///     .ipv4("10.0.0.1", 24, None)
    ///     .build_async()?;
    ///
    /// // Wait for the device to be readable
    /// dev.readable().await?;
    ///
    /// // Try to read (may still return WouldBlock)
    /// let mut buf = vec![0u8; 1500];
    /// match dev.try_recv(&mut buf) {
    ///     Ok(n) => println!("Read {} bytes", n),
    ///     Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
    ///         println!("False positive readiness");
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn readable(&self) -> io::Result<()> {
        self.0.readable().await.map(|_| ())
    }
    /// Waits for the device to become writable.
    ///
    /// This function is usually paired with `try_send()` for manual readiness-based I/O.
    ///
    /// The function may complete without the device being writable. This is a
    /// false-positive and attempting a `try_send()` will return with
    /// `io::ErrorKind::WouldBlock`.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. Once a readiness event occurs, the method
    /// will continue to return immediately until the readiness event is
    /// consumed by an attempt to write that fails with `WouldBlock` or
    /// `Poll::Pending`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # #[cfg(all(unix, any(feature = "async_io", feature = "async_tokio")))]
    /// # async fn example() -> std::io::Result<()> {
    /// use tun_rs::DeviceBuilder;
    ///
    /// let dev = DeviceBuilder::new()
    ///     .ipv4("10.0.0.1", 24, None)
    ///     .build_async()?;
    ///
    /// // Prepare a packet
    /// let packet = b"Hello, TUN!";
    ///
    /// // Wait for the device to be writable
    /// dev.writable().await?;
    ///
    /// // Try to send (may still return WouldBlock)
    /// match dev.try_send(packet) {
    ///     Ok(n) => println!("Sent {} bytes", n),
    ///     Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
    ///         println!("False positive writability");
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn writable(&self) -> io::Result<()> {
        self.0.writable().await.map(|_| ())
    }
    /// Receives a single packet from the device.
    /// On success, returns the number of bytes read.
    ///
    /// The function must be called with valid byte array `buf` of sufficient
    /// size to hold the message bytes. If a message is too long to fit in the
    /// supplied buffer, excess bytes may be discarded.
    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_with(|device| device.recv(buf)).await
    }
    /// Tries to receive a single packet from the device.
    /// On success, returns the number of bytes read.
    ///
    /// This method must be called with valid byte array `buf` of sufficient size
    /// to hold the message bytes. If a message is too long to fit in the
    /// supplied buffer, excess bytes may be discarded.
    ///
    /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
    /// returned. This function is usually paired with `readable()`.
    pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.try_read_io(|device| device.recv(buf))
    }

    /// Send a packet to the device
    ///
    /// # Return
    /// On success, the number of bytes sent is returned, otherwise, the encountered error is returned.
    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
        self.write_with(|device| device.send(buf)).await
    }
    /// Tries to send packet to the device.
    ///
    /// When the device buffer is full, `Err(io::ErrorKind::WouldBlock)` is
    /// returned. This function is usually paired with `writable()`.
    ///
    /// # Returns
    ///
    /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
    /// sent. If the device is not ready to send data,
    /// `Err(ErrorKind::WouldBlock)` is returned.
    pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
        self.try_write_io(|device| device.send(buf))
    }
    /// Receives a packet into multiple buffers (scatter read).
    /// **Processes single packet per call**.
    pub async fn recv_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        self.read_with(|device| device.recv_vectored(bufs)).await
    }
    /// Non-blocking version of `recv_vectored`.
    pub fn try_recv_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        self.try_read_io(|device| device.recv_vectored(bufs))
    }
    /// Sends multiple buffers as a single packet (gather write).
    pub async fn send_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        self.write_with(|device| device.send_vectored(bufs)).await
    }
    /// Non-blocking version of `send_vectored`.
    pub fn try_send_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        self.try_write_io(|device| device.send_vectored(bufs))
    }
}
222
223#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
224impl AsyncDevice {
225 /// # Prerequisites
226 /// - The `IFF_MULTI_QUEUE` flag must be enabled.
227 /// - The system must support network interface multi-queue functionality.
228 ///
229 /// # Description
230 /// When multi-queue is enabled, create a new queue by duplicating an existing one.
231 pub fn try_clone(&self) -> io::Result<Self> {
232 AsyncDevice::new_dev(self.get_ref().try_clone()?)
233 }
234 /// Recv a packet from the device.
235 /// If offload is enabled. This method can be used to obtain processed data.
236 ///
237 /// original_buffer is used to store raw data, including the VirtioNetHdr and the unsplit IP packet. The recommended size is 10 + 65535.
238 /// bufs and sizes are used to store the segmented IP packets. bufs.len == sizes.len > 65535/MTU
239 /// offset: Starting position
240 #[cfg(target_os = "linux")]
241 pub async fn recv_multiple<B: AsRef<[u8]> + AsMut<[u8]>>(
242 &self,
243 original_buffer: &mut [u8],
244 bufs: &mut [B],
245 sizes: &mut [usize],
246 offset: usize,
247 ) -> io::Result<usize> {
248 if bufs.is_empty() || bufs.len() != sizes.len() {
249 return Err(io::Error::other("bufs error"));
250 }
251 let tun = self.get_ref();
252 if tun.vnet_hdr {
253 let len = self.recv(original_buffer).await?;
254 if len <= VIRTIO_NET_HDR_LEN {
255 Err(io::Error::other(format!(
256 "length of packet ({len}) <= VIRTIO_NET_HDR_LEN ({VIRTIO_NET_HDR_LEN})",
257 )))?
258 }
259 let hdr = VirtioNetHdr::decode(&original_buffer[..VIRTIO_NET_HDR_LEN])?;
260 tun.handle_virtio_read(
261 hdr,
262 &mut original_buffer[VIRTIO_NET_HDR_LEN..len],
263 bufs,
264 sizes,
265 offset,
266 )
267 } else {
268 let len = self.recv(&mut bufs[0].as_mut()[offset..]).await?;
269 sizes[0] = len;
270 Ok(1)
271 }
272 }
273 /// send multiple fragmented data packets.
274 /// GROTable can be reused, as it is used to assist in data merging.
275 /// Offset is the starting position of the data. Need to meet offset>10.
276 #[cfg(target_os = "linux")]
277 pub async fn send_multiple<B: crate::platform::ExpandBuffer>(
278 &self,
279 gro_table: &mut GROTable,
280 bufs: &mut [B],
281 mut offset: usize,
282 ) -> io::Result<usize> {
283 gro_table.reset();
284 let tun = self.get_ref();
285 if tun.vnet_hdr {
286 handle_gro(
287 bufs,
288 offset,
289 &mut gro_table.tcp_gro_table,
290 &mut gro_table.udp_gro_table,
291 tun.udp_gso,
292 &mut gro_table.to_write,
293 )?;
294 offset -= VIRTIO_NET_HDR_LEN;
295 } else {
296 for i in 0..bufs.len() {
297 gro_table.to_write.push(i);
298 }
299 }
300
301 let mut total = 0;
302 let mut err = Ok(());
303 for buf_idx in &gro_table.to_write {
304 match self.send(&bufs[*buf_idx].as_ref()[offset..]).await {
305 Ok(n) => {
306 total += n;
307 }
308 Err(e) => {
309 if let Some(code) = e.raw_os_error() {
310 if libc::EBADFD == code {
311 return Err(e);
312 }
313 }
314 err = Err(e)
315 }
316 }
317 }
318 err?;
319 Ok(total)
320 }
321}