// tun_rs/async_device/unix/mod.rs

1#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
2use crate::platform::offload::{handle_gro, VirtioNetHdr, VIRTIO_NET_HDR_LEN};
3use crate::platform::DeviceImpl;
4#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
5use crate::platform::GROTable;
6use crate::SyncDevice;
7use std::io;
8use std::io::{IoSlice, IoSliceMut};
9use std::ops::Deref;
10use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
11
12#[cfg(feature = "async_tokio")]
13mod tokio;
14#[cfg(feature = "async_tokio")]
15pub use self::tokio::AsyncDevice;
16
17#[cfg(all(feature = "async_io", not(feature = "async_tokio")))]
18mod async_io;
19#[cfg(all(feature = "async_io", not(feature = "async_tokio")))]
20pub use self::async_io::AsyncDevice;
21
22impl FromRawFd for AsyncDevice {
23    unsafe fn from_raw_fd(fd: RawFd) -> Self {
24        AsyncDevice::from_fd(fd).unwrap()
25    }
26}
27impl IntoRawFd for AsyncDevice {
28    fn into_raw_fd(self) -> RawFd {
29        self.into_fd().unwrap()
30    }
31}
32impl AsRawFd for AsyncDevice {
33    fn as_raw_fd(&self) -> RawFd {
34        self.get_ref().as_raw_fd()
35    }
36}
37
38impl Deref for AsyncDevice {
39    type Target = DeviceImpl;
40
41    fn deref(&self) -> &Self::Target {
42        self.get_ref()
43    }
44}
45
46impl AsyncDevice {
47    #[allow(dead_code)]
48    pub fn new(device: SyncDevice) -> io::Result<AsyncDevice> {
49        AsyncDevice::new_dev(device.0)
50    }
51
52    /// # Safety
53    /// This method is safe if the provided fd is valid
54    /// Construct a AsyncDevice from an existing file descriptor
55    pub unsafe fn from_fd(fd: RawFd) -> io::Result<AsyncDevice> {
56        AsyncDevice::new_dev(DeviceImpl::from_fd(fd)?)
57    }
58    pub fn into_fd(self) -> io::Result<RawFd> {
59        Ok(self.into_device()?.into_raw_fd())
60    }
61    /// Waits for the device to become readable.
62    ///
63    /// This function is usually paired with `try_recv()`.
64    ///
65    /// The function may complete without the device being readable. This is a
66    /// false-positive and attempting a `try_recv()` will return with
67    /// `io::ErrorKind::WouldBlock`.
68    ///
69    /// # Cancel safety
70    ///
71    /// This method is cancel safe. Once a readiness event occurs, the method
72    /// will continue to return immediately until the readiness event is
73    /// consumed by an attempt to read that fails with `WouldBlock` or
74    /// `Poll::Pending`.
75    pub async fn readable(&self) -> io::Result<()> {
76        self.0.readable().await.map(|_| ())
77    }
78    /// Waits for the device to become writable.
79    ///
80    /// This function is usually paired with `try_send()`.
81    ///
82    /// The function may complete without the device being writable. This is a
83    /// false-positive and attempting a `try_send()` will return with
84    /// `io::ErrorKind::WouldBlock`.
85    ///
86    /// # Cancel safety
87    ///
88    /// This method is cancel safe. Once a readiness event occurs, the method
89    /// will continue to return immediately until the readiness event is
90    /// consumed by an attempt to write that fails with `WouldBlock` or
91    /// `Poll::Pending`.
92    pub async fn writable(&self) -> io::Result<()> {
93        self.0.writable().await.map(|_| ())
94    }
95    /// Receives a single packet from the device.
96    /// On success, returns the number of bytes read.
97    ///
98    /// The function must be called with valid byte array `buf` of sufficient
99    /// size to hold the message bytes. If a message is too long to fit in the
100    /// supplied buffer, excess bytes may be discarded.
101    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
102        self.read_with(|device| device.recv(buf)).await
103    }
104    /// Tries to receive a single packet from the device.
105    /// On success, returns the number of bytes read.
106    ///
107    /// This method must be called with valid byte array `buf` of sufficient size
108    /// to hold the message bytes. If a message is too long to fit in the
109    /// supplied buffer, excess bytes may be discarded.
110    ///
111    /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
112    /// returned. This function is usually paired with `readable()`.
113    pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
114        self.try_read_io(|device| device.recv(buf))
115    }
116
117    /// Send a packet to the device
118    ///
119    /// # Return
120    /// On success, the number of bytes sent is returned, otherwise, the encountered error is returned.
121    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
122        self.write_with(|device| device.send(buf)).await
123    }
124    /// Tries to send packet to the device.
125    ///
126    /// When the device buffer is full, `Err(io::ErrorKind::WouldBlock)` is
127    /// returned. This function is usually paired with `writable()`.
128    ///
129    /// # Returns
130    ///
131    /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
132    /// sent. If the device is not ready to send data,
133    /// `Err(ErrorKind::WouldBlock)` is returned.
134    pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
135        self.try_write_io(|device| device.send(buf))
136    }
137    /// Receives a packet into multiple buffers (scatter read).
138    /// **Processes single packet per call**.
139    pub async fn recv_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
140        self.read_with(|device| device.recv_vectored(bufs)).await
141    }
142    /// Non-blocking version of `recv_vectored`.
143    pub fn try_recv_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
144        self.try_read_io(|device| device.recv_vectored(bufs))
145    }
146    /// Sends multiple buffers as a single packet (gather write).
147    pub async fn send_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
148        self.write_with(|device| device.send_vectored(bufs)).await
149    }
150    /// Non-blocking version of `send_vectored`.
151    pub fn try_send_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
152        self.try_write_io(|device| device.send_vectored(bufs))
153    }
154}
155
#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
impl AsyncDevice {
    /// # Prerequisites
    /// - The `IFF_MULTI_QUEUE` flag must be enabled.
    /// - The system must support network interface multi-queue functionality.
    ///
    /// # Description
    /// When multi-queue is enabled, create a new queue by duplicating an existing one.
    pub fn try_clone(&self) -> io::Result<Self> {
        AsyncDevice::new_dev(self.get_ref().try_clone()?)
    }
    /// Recv a packet from the device.
    /// If offload is enabled. This method can be used to obtain processed data.
    ///
    /// `original_buffer` is used to store raw data, including the VirtioNetHdr and the
    /// unsplit IP packet. The recommended size is 10 + 65535.
    /// `bufs` and `sizes` are parallel arrays that receive the segmented IP packets and
    /// their lengths; bufs.len == sizes.len > 65535/MTU.
    /// `offset`: starting position within each buffer of `bufs`.
    ///
    /// Returns the number of packets written into `bufs`.
    #[cfg(target_os = "linux")]
    pub async fn recv_multiple<B: AsRef<[u8]> + AsMut<[u8]>>(
        &self,
        original_buffer: &mut [u8],
        bufs: &mut [B],
        sizes: &mut [usize],
        offset: usize,
    ) -> io::Result<usize> {
        // `bufs` and `sizes` are parallel arrays; reject empty or mismatched input.
        if bufs.is_empty() || bufs.len() != sizes.len() {
            return Err(io::Error::other("bufs error"));
        }
        let tun = self.get_ref();
        if tun.vnet_hdr {
            // Offload path: each read starts with a virtio-net header followed by
            // a possibly coalesced run of IP packets.
            let len = self.recv(original_buffer).await?;
            if len <= VIRTIO_NET_HDR_LEN {
                // `Err(...)?` returns immediately: nothing usable beyond the header.
                Err(io::Error::other(format!(
                    "length of packet ({len}) <= VIRTIO_NET_HDR_LEN ({VIRTIO_NET_HDR_LEN})",
                )))?
            }
            let hdr = VirtioNetHdr::decode(&original_buffer[..VIRTIO_NET_HDR_LEN])?;
            // Split the payload into individual packets; presumably each packet is
            // copied into `bufs[i][offset..]` with its length in `sizes[i]` — see
            // `handle_virtio_read` for the exact contract.
            tun.handle_virtio_read(
                hdr,
                &mut original_buffer[VIRTIO_NET_HDR_LEN..len],
                bufs,
                sizes,
                offset,
            )
        } else {
            // No offload: a single read yields exactly one packet, stored in bufs[0].
            let len = self.recv(&mut bufs[0].as_mut()[offset..]).await?;
            sizes[0] = len;
            Ok(1)
        }
    }
    /// send multiple fragmented data packets.
    /// GROTable can be reused, as it is used to assist in data merging.
    /// Offset is the starting position of the data. Need to meet offset>10
    /// (i.e. offset > VIRTIO_NET_HDR_LEN, see below).
    ///
    /// Returns the total number of bytes sent across all writes.
    #[cfg(target_os = "linux")]
    pub async fn send_multiple<B: crate::platform::ExpandBuffer>(
        &self,
        gro_table: &mut GROTable,
        bufs: &mut [B],
        mut offset: usize,
    ) -> io::Result<usize> {
        // Clear any state left from a previous call so the table can be reused.
        gro_table.reset();
        let tun = self.get_ref();
        if tun.vnet_hdr {
            // Coalesce adjacent TCP/UDP segments (GRO); fills `gro_table.to_write`
            // with the indices of the buffers that must actually be sent.
            handle_gro(
                bufs,
                offset,
                &mut gro_table.tcp_gro_table,
                &mut gro_table.udp_gro_table,
                tun.udp_gso,
                &mut gro_table.to_write,
            )?;
            // Step back so the virtio-net header region preceding the payload is
            // included in each write (presumably written by `handle_gro` — confirm).
            // Underflows unless offset >= VIRTIO_NET_HDR_LEN, hence `offset > 10`.
            offset -= VIRTIO_NET_HDR_LEN;
        } else {
            // No offload: every buffer is sent as-is, in order.
            for i in 0..bufs.len() {
                gro_table.to_write.push(i);
            }
        }

        let mut total = 0;
        let mut err = Ok(());
        for buf_idx in &gro_table.to_write {
            match self.send(&bufs[*buf_idx].as_ref()[offset..]).await {
                Ok(n) => {
                    total += n;
                }
                Err(e) => {
                    // EBADFD means the descriptor itself is unusable: abort at once.
                    if let Some(code) = e.raw_os_error() {
                        if libc::EBADFD == code {
                            return Err(e);
                        }
                    }
                    // Otherwise remember the error but keep sending the rest.
                    err = Err(e)
                }
            }
        }
        // Surface the last non-fatal error only after all sends were attempted.
        err?;
        Ok(total)
    }
}
254}