tun_rs/async_device/unix/mod.rs
1#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
2use crate::platform::offload::{handle_gro, VirtioNetHdr, VIRTIO_NET_HDR_LEN};
3use crate::platform::DeviceImpl;
4#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
5use crate::platform::GROTable;
6use crate::SyncDevice;
7use std::io;
8use std::io::{IoSlice, IoSliceMut};
9use std::ops::Deref;
10use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
11
12#[cfg(feature = "async_tokio")]
13mod tokio;
14#[cfg(feature = "async_tokio")]
15pub use self::tokio::AsyncDevice;
16
17#[cfg(all(feature = "async_io", not(feature = "async_tokio")))]
18mod async_io;
19#[cfg(all(feature = "async_io", not(feature = "async_tokio")))]
20pub use self::async_io::AsyncDevice;
21
22impl FromRawFd for AsyncDevice {
23 unsafe fn from_raw_fd(fd: RawFd) -> Self {
24 AsyncDevice::from_fd(fd).unwrap()
25 }
26}
27impl IntoRawFd for AsyncDevice {
28 fn into_raw_fd(self) -> RawFd {
29 self.into_fd().unwrap()
30 }
31}
32impl AsRawFd for AsyncDevice {
33 fn as_raw_fd(&self) -> RawFd {
34 self.get_ref().as_raw_fd()
35 }
36}
37
38impl Deref for AsyncDevice {
39 type Target = DeviceImpl;
40
41 fn deref(&self) -> &Self::Target {
42 self.get_ref()
43 }
44}
45
46impl AsyncDevice {
47 #[allow(dead_code)]
48 pub fn new(device: SyncDevice) -> io::Result<AsyncDevice> {
49 AsyncDevice::new_dev(device.0)
50 }
51
52 /// # Safety
53 /// This method is safe if the provided fd is valid
54 /// Construct a AsyncDevice from an existing file descriptor
55 pub unsafe fn from_fd(fd: RawFd) -> io::Result<AsyncDevice> {
56 AsyncDevice::new_dev(DeviceImpl::from_fd(fd)?)
57 }
58 pub fn into_fd(self) -> io::Result<RawFd> {
59 Ok(self.into_device()?.into_raw_fd())
60 }
61 /// Waits for the device to become readable.
62 ///
63 /// This function is usually paired with `try_recv()`.
64 ///
65 /// The function may complete without the device being readable. This is a
66 /// false-positive and attempting a `try_recv()` will return with
67 /// `io::ErrorKind::WouldBlock`.
68 ///
69 /// # Cancel safety
70 ///
71 /// This method is cancel safe. Once a readiness event occurs, the method
72 /// will continue to return immediately until the readiness event is
73 /// consumed by an attempt to read that fails with `WouldBlock` or
74 /// `Poll::Pending`.
75 pub async fn readable(&self) -> io::Result<()> {
76 self.0.readable().await.map(|_| ())
77 }
78 /// Waits for the device to become writable.
79 ///
80 /// This function is usually paired with `try_send()`.
81 ///
82 /// The function may complete without the device being writable. This is a
83 /// false-positive and attempting a `try_send()` will return with
84 /// `io::ErrorKind::WouldBlock`.
85 ///
86 /// # Cancel safety
87 ///
88 /// This method is cancel safe. Once a readiness event occurs, the method
89 /// will continue to return immediately until the readiness event is
90 /// consumed by an attempt to write that fails with `WouldBlock` or
91 /// `Poll::Pending`.
92 pub async fn writable(&self) -> io::Result<()> {
93 self.0.writable().await.map(|_| ())
94 }
95 /// Receives a single packet from the device.
96 /// On success, returns the number of bytes read.
97 ///
98 /// The function must be called with valid byte array `buf` of sufficient
99 /// size to hold the message bytes. If a message is too long to fit in the
100 /// supplied buffer, excess bytes may be discarded.
101 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
102 self.read_with(|device| device.recv(buf)).await
103 }
104 /// Tries to receive a single packet from the device.
105 /// On success, returns the number of bytes read.
106 ///
107 /// This method must be called with valid byte array `buf` of sufficient size
108 /// to hold the message bytes. If a message is too long to fit in the
109 /// supplied buffer, excess bytes may be discarded.
110 ///
111 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
112 /// returned. This function is usually paired with `readable()`.
113 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
114 self.try_read_io(|device| device.recv(buf))
115 }
116
117 /// Send a packet to the device
118 ///
119 /// # Return
120 /// On success, the number of bytes sent is returned, otherwise, the encountered error is returned.
121 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
122 self.write_with(|device| device.send(buf)).await
123 }
124 /// Tries to send packet to the device.
125 ///
126 /// When the device buffer is full, `Err(io::ErrorKind::WouldBlock)` is
127 /// returned. This function is usually paired with `writable()`.
128 ///
129 /// # Returns
130 ///
131 /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
132 /// sent. If the device is not ready to send data,
133 /// `Err(ErrorKind::WouldBlock)` is returned.
134 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
135 self.try_write_io(|device| device.send(buf))
136 }
137 /// Receives a packet into multiple buffers (scatter read).
138 /// **Processes single packet per call**.
139 pub async fn recv_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
140 self.read_with(|device| device.recv_vectored(bufs)).await
141 }
142 /// Non-blocking version of `recv_vectored`.
143 pub fn try_recv_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
144 self.try_read_io(|device| device.recv_vectored(bufs))
145 }
146 /// Sends multiple buffers as a single packet (gather write).
147 pub async fn send_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
148 self.write_with(|device| device.send_vectored(bufs)).await
149 }
150 /// Non-blocking version of `send_vectored`.
151 pub fn try_send_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
152 self.try_write_io(|device| device.send_vectored(bufs))
153 }
154}
155
156#[cfg(all(target_os = "linux", not(target_env = "ohos")))]
157impl AsyncDevice {
158 /// # Prerequisites
159 /// - The `IFF_MULTI_QUEUE` flag must be enabled.
160 /// - The system must support network interface multi-queue functionality.
161 ///
162 /// # Description
163 /// When multi-queue is enabled, create a new queue by duplicating an existing one.
164 pub fn try_clone(&self) -> io::Result<Self> {
165 AsyncDevice::new_dev(self.get_ref().try_clone()?)
166 }
167 /// Recv a packet from the device.
168 /// If offload is enabled. This method can be used to obtain processed data.
169 ///
170 /// original_buffer is used to store raw data, including the VirtioNetHdr and the unsplit IP packet. The recommended size is 10 + 65535.
171 /// bufs and sizes are used to store the segmented IP packets. bufs.len == sizes.len > 65535/MTU
172 /// offset: Starting position
173 #[cfg(target_os = "linux")]
174 pub async fn recv_multiple<B: AsRef<[u8]> + AsMut<[u8]>>(
175 &self,
176 original_buffer: &mut [u8],
177 bufs: &mut [B],
178 sizes: &mut [usize],
179 offset: usize,
180 ) -> io::Result<usize> {
181 if bufs.is_empty() || bufs.len() != sizes.len() {
182 return Err(io::Error::other("bufs error"));
183 }
184 let tun = self.get_ref();
185 if tun.vnet_hdr {
186 let len = self.recv(original_buffer).await?;
187 if len <= VIRTIO_NET_HDR_LEN {
188 Err(io::Error::other(format!(
189 "length of packet ({len}) <= VIRTIO_NET_HDR_LEN ({VIRTIO_NET_HDR_LEN})",
190 )))?
191 }
192 let hdr = VirtioNetHdr::decode(&original_buffer[..VIRTIO_NET_HDR_LEN])?;
193 tun.handle_virtio_read(
194 hdr,
195 &mut original_buffer[VIRTIO_NET_HDR_LEN..len],
196 bufs,
197 sizes,
198 offset,
199 )
200 } else {
201 let len = self.recv(&mut bufs[0].as_mut()[offset..]).await?;
202 sizes[0] = len;
203 Ok(1)
204 }
205 }
206 /// send multiple fragmented data packets.
207 /// GROTable can be reused, as it is used to assist in data merging.
208 /// Offset is the starting position of the data. Need to meet offset>10.
209 #[cfg(target_os = "linux")]
210 pub async fn send_multiple<B: crate::platform::ExpandBuffer>(
211 &self,
212 gro_table: &mut GROTable,
213 bufs: &mut [B],
214 mut offset: usize,
215 ) -> io::Result<usize> {
216 gro_table.reset();
217 let tun = self.get_ref();
218 if tun.vnet_hdr {
219 handle_gro(
220 bufs,
221 offset,
222 &mut gro_table.tcp_gro_table,
223 &mut gro_table.udp_gro_table,
224 tun.udp_gso,
225 &mut gro_table.to_write,
226 )?;
227 offset -= VIRTIO_NET_HDR_LEN;
228 } else {
229 for i in 0..bufs.len() {
230 gro_table.to_write.push(i);
231 }
232 }
233
234 let mut total = 0;
235 let mut err = Ok(());
236 for buf_idx in &gro_table.to_write {
237 match self.send(&bufs[*buf_idx].as_ref()[offset..]).await {
238 Ok(n) => {
239 total += n;
240 }
241 Err(e) => {
242 if let Some(code) = e.raw_os_error() {
243 if libc::EBADFD == code {
244 return Err(e);
245 }
246 }
247 err = Err(e)
248 }
249 }
250 }
251 err?;
252 Ok(total)
253 }
254}