tokio_uring/net/
udp.rs

1use crate::{
2    buf::fixed::FixedBuf,
3    buf::{BoundedBuf, BoundedBufMut},
4    io::{SharedFd, Socket},
5    UnsubmittedWrite,
6};
7use socket2::SockAddr;
8use std::{
9    io,
10    net::SocketAddr,
11    os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
12};
13
14/// A UDP socket.
15///
16/// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
17/// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
18///
19/// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
20///   and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
21/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`write`](`UdpSocket::write`)
22///   and [`read`](`UdpSocket::read`) to communicate only with that remote address
23///
24/// # Examples
25/// Bind and connect a pair of sockets and send a packet:
26///
27/// ```
28/// use tokio_uring::net::UdpSocket;
29/// use std::net::SocketAddr;
30/// fn main() -> std::io::Result<()> {
31///     tokio_uring::start(async {
32///         let first_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap();
33///         let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
34///
35///         // bind sockets
36///         let socket = UdpSocket::bind(first_addr.clone()).await?;
37///         let other_socket = UdpSocket::bind(second_addr.clone()).await?;
38///
39///         // connect sockets
40///         socket.connect(second_addr).await.unwrap();
41///         other_socket.connect(first_addr).await.unwrap();
42///
43///         let buf = vec![0; 32];
44///
45///         // write data
46///         let (result, _) = socket.write(b"hello world".as_slice()).submit().await;
47///         result.unwrap();
48///
49///         // read data
50///         let (result, buf) = other_socket.read(buf).await;
51///         let n_bytes = result.unwrap();
52///
53///         assert_eq!(b"hello world", &buf[..n_bytes]);
54///
55///         // write data using send on connected socket
56///         let (result, _) = socket.send(b"hello world via send".as_slice()).await;
57///         result.unwrap();
58///
59///         // read data
60///         let (result, buf) = other_socket.read(buf).await;
61///         let n_bytes = result.unwrap();
62///
63///         assert_eq!(b"hello world via send", &buf[..n_bytes]);
64///
65///         Ok(())
66///     })
67/// }
68/// ```
69/// Send and receive packets without connecting:
70///
71/// ```
72/// use tokio_uring::net::UdpSocket;
73/// use std::net::SocketAddr;
74/// fn main() -> std::io::Result<()> {
75///     tokio_uring::start(async {
76///         let first_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap();
77///         let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
78///
79///         // bind sockets
80///         let socket = UdpSocket::bind(first_addr.clone()).await?;
81///         let other_socket = UdpSocket::bind(second_addr.clone()).await?;
82///
83///         let buf = vec![0; 32];
84///
85///         // write data
86///         let (result, _) = socket.send_to(b"hello world".as_slice(), second_addr).await;
87///         result.unwrap();
88///
89///         // read data
90///         let (result, buf) = other_socket.recv_from(buf).await;
91///         let (n_bytes, addr) = result.unwrap();
92///
93///         assert_eq!(addr, first_addr);
94///         assert_eq!(b"hello world", &buf[..n_bytes]);
95///
96///         Ok(())
97///     })
98/// }
99/// ```
100pub struct UdpSocket {
101    pub(super) inner: Socket,
102}
103
104impl UdpSocket {
105    /// Creates a new UDP socket and attempt to bind it to the addr provided.
106    ///
107    /// Returns a new instance of [`UdpSocket`] on success,
108    /// or an [`io::Error`](std::io::Error) on failure.
109    pub async fn bind(socket_addr: SocketAddr) -> io::Result<UdpSocket> {
110        let socket = Socket::bind(socket_addr, libc::SOCK_DGRAM)?;
111        Ok(UdpSocket { inner: socket })
112    }
113
114    /// Returns the local address to which this UDP socket is bound.
115    ///
116    /// This can be useful, for example, when binding to port 0 to
117    /// figure out which port was actually bound.
118    ///
119    /// # Examples
120    ///
121    /// ```
122    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
123    /// use tokio_uring::net::UdpSocket;
124    ///
125    /// tokio_uring::start(async {
126    ///     let socket = UdpSocket::bind("127.0.0.1:8080".parse().unwrap()).await.unwrap();
127    ///     let addr = socket.local_addr().expect("Couldn't get local address");
128    ///     assert_eq!(addr, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
129    /// });
130    /// ```
131    pub fn local_addr(&self) -> io::Result<SocketAddr> {
132        let fd = self.inner.as_raw_fd();
133        // SAFETY: Our fd is the handle the kernel has given us for a UdpSocket.
134        // Create a std::net::UdpSocket long enough to call its local_addr method
135        // and then forget it so the socket is not closed here.
136        let s = unsafe { std::net::UdpSocket::from_raw_fd(fd) };
137        let local_addr = s.local_addr();
138        std::mem::forget(s);
139        local_addr
140    }
141
142    /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
143    ///
144    /// This function is intended to be used to wrap a UDP socket from the
145    /// standard library in the tokio-uring equivalent. The conversion assumes nothing
146    /// about the underlying socket; it is left up to the user to decide what socket
147    /// options are appropriate for their use case.
148    ///
149    /// This can be used in conjunction with socket2's `Socket` interface to
150    /// configure a socket before it's handed off, such as setting options like
151    /// `reuse_address` or binding to multiple addresses.
152    ///
153    /// # Example
154    ///
155    /// ```
156    /// use socket2::{Protocol, Socket, Type};
157    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
158    /// use tokio_uring::net::UdpSocket;
159    ///
160    /// fn main() -> std::io::Result<()> {
161    ///     tokio_uring::start(async {
162    ///         let std_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap();
163    ///         let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
164    ///         let sock = Socket::new(socket2::Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
165    ///         sock.set_reuse_port(true)?;
166    ///         sock.set_nonblocking(true)?;
167    ///         sock.bind(&std_addr.into())?;
168    ///
169    ///         let std_socket = UdpSocket::from_std(sock.into());
170    ///         let other_socket = UdpSocket::bind(second_addr).await?;
171    ///
172    ///         let buf = vec![0; 32];
173    ///
174    ///         // write data
175    ///         let (result, _) = std_socket
176    ///             .send_to(b"hello world".as_slice(), second_addr)
177    ///             .await;
178    ///         result.unwrap();
179    ///
180    ///         // read data
181    ///         let (result, buf) = other_socket.recv_from(buf).await;
182    ///         let (n_bytes, addr) = result.unwrap();
183    ///
184    ///         assert_eq!(addr, std_addr);
185    ///         assert_eq!(b"hello world", &buf[..n_bytes]);
186    ///
187    ///         Ok(())
188    ///     })
189    /// }
190    /// ```
191    pub fn from_std(socket: std::net::UdpSocket) -> Self {
192        let inner = Socket::from_std(socket);
193        Self { inner }
194    }
195
196    pub(crate) fn from_socket(inner: Socket) -> Self {
197        Self { inner }
198    }
199
200    /// "Connects" this UDP socket to a remote address.
201    ///
202    /// This enables `write` and `read` syscalls to be used on this instance.
203    /// It also constrains the `read` to receive data only from the specified remote peer.
204    ///
205    /// Note: UDP is connectionless, so a successful `connect` call does not execute
206    /// a handshake or validation of the remote peer of any kind.
207    /// Any errors would not be detected until the first send.
208    pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> {
209        self.inner.connect(SockAddr::from(socket_addr)).await
210    }
211
212    /// Sends data on the connected socket
213    ///
214    /// On success, returns the number of bytes written.
215    pub async fn send<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
216        self.inner.send_to(buf, None).await
217    }
218
219    /// Sends data on the socket to the given address.
220    ///
221    /// On success, returns the number of bytes written.
222    pub async fn send_to<T: BoundedBuf>(
223        &self,
224        buf: T,
225        socket_addr: SocketAddr,
226    ) -> crate::BufResult<usize, T> {
227        self.inner.send_to(buf, Some(socket_addr)).await
228    }
229
230    /// Sends data on the socket. Will attempt to do so without intermediate copies.
231    ///
232    /// On success, returns the number of bytes written.
233    ///
234    /// See the linux [kernel docs](https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html)
235    /// for a discussion on when this might be appropriate. In particular:
236    ///
237    /// > Copy avoidance is not a free lunch. As implemented, with page pinning,
238    /// > it replaces per byte copy cost with page accounting and completion
239    /// > notification overhead. As a result, zero copy is generally only effective
240    /// > at writes over around 10 KB.
241    ///
242    /// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead
243    pub async fn send_zc<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
244        self.inner.send_zc(buf).await
245    }
246
247    /// Sends a message on the socket using a msghdr.
248    ///
249    /// Returns a tuple of:
250    ///
251    /// * Result containing bytes written on success
252    /// * The original `io_slices` `Vec<T>`
253    /// * The original `msg_contol` `Option<U>`
254    ///
255    /// Consider using [`Self::sendmsg_zc`] for a zero-copy alternative.
256    pub async fn sendmsg<T: BoundedBuf, U: BoundedBuf>(
257        &self,
258        io_slices: Vec<T>,
259        socket_addr: Option<SocketAddr>,
260        msg_control: Option<U>,
261    ) -> (io::Result<usize>, Vec<T>, Option<U>) {
262        self.inner
263            .sendmsg(io_slices, socket_addr, msg_control)
264            .await
265    }
266
267    /// Sends a message on the socket using a msghdr.
268    ///
269    /// Returns a tuple of:
270    ///
271    /// * Result containing bytes written on success
272    /// * The original `io_slices` `Vec<T>`
273    /// * The original `msg_contol` `Option<U>`
274    ///
275    /// See the linux [kernel docs](https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html)
276    /// for a discussion on when this might be appropriate. In particular:
277    ///
278    /// > Copy avoidance is not a free lunch. As implemented, with page pinning,
279    /// > it replaces per byte copy cost with page accounting and completion
280    /// > notification overhead. As a result, zero copy is generally only effective
281    /// > at writes over around 10 KB.
282    ///
283    /// Can be used with socket_addr: None on connected sockets, which can have performance
284    /// benefits if multiple datagrams are sent to the same destination address.
285    pub async fn sendmsg_zc<T: BoundedBuf, U: BoundedBuf>(
286        &self,
287        io_slices: Vec<T>,
288        socket_addr: Option<SocketAddr>,
289        msg_control: Option<U>,
290    ) -> (io::Result<usize>, Vec<T>, Option<U>) {
291        self.inner
292            .sendmsg_zc(io_slices, socket_addr, msg_control)
293            .await
294    }
295
296    /// Receives a single datagram message on the socket.
297    ///
298    /// On success, returns the number of bytes read and the origin.
299    pub async fn recv_from<T: BoundedBufMut>(
300        &self,
301        buf: T,
302    ) -> crate::BufResult<(usize, SocketAddr), T> {
303        self.inner.recv_from(buf).await
304    }
305
306    /// Receives a single datagram message on the socket, into multiple buffers
307    ///
308    /// On success, returns the number of bytes read and the origin.
309    pub async fn recvmsg<T: BoundedBufMut>(
310        &self,
311        buf: Vec<T>,
312    ) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
313        self.inner.recvmsg(buf).await
314    }
315
316    /// Reads a packet of data from the socket into the buffer.
317    ///
318    /// Returns the original buffer and quantity of data read.
319    pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
320        self.inner.read(buf).await
321    }
322
323    /// Receives a single datagram message into a registered buffer.
324    ///
325    /// Like [`read`], but using a pre-mapped buffer
326    /// registered with [`FixedBufRegistry`].
327    ///
328    /// [`read`]: Self::read
329    /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
330    ///
331    /// # Errors
332    ///
333    /// In addition to errors that can be reported by `read`,
334    /// this operation fails if the buffer is not registered in the
335    /// current `tokio-uring` runtime.
336    pub async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
337    where
338        T: BoundedBufMut<BufMut = FixedBuf>,
339    {
340        self.inner.read_fixed(buf).await
341    }
342
343    /// Writes data into the socket from the specified buffer.
344    ///
345    /// Returns the original buffer and quantity of data written.
346    pub fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
347        self.inner.write(buf)
348    }
349
350    /// Writes data into the socket from a registered buffer.
351    ///
352    /// Like [`write`], but using a pre-mapped buffer
353    /// registered with [`FixedBufRegistry`].
354    ///
355    /// [`write`]: Self::write
356    /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
357    ///
358    /// # Errors
359    ///
360    /// In addition to errors that can be reported by `write`,
361    /// this operation fails if the buffer is not registered in the
362    /// current `tokio-uring` runtime.
363    pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
364    where
365        T: BoundedBuf<Buf = FixedBuf>,
366    {
367        self.inner.write_fixed(buf).await
368    }
369
370    /// Shuts down the read, write, or both halves of this connection.
371    ///
372    /// This function causes all pending and future I/O on the specified portions to return
373    /// immediately with an appropriate value.
374    pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> {
375        self.inner.shutdown(how)
376    }
377}
378
379impl FromRawFd for UdpSocket {
380    unsafe fn from_raw_fd(fd: RawFd) -> Self {
381        UdpSocket::from_socket(Socket::from_shared_fd(SharedFd::new(fd)))
382    }
383}
384
385impl AsRawFd for UdpSocket {
386    fn as_raw_fd(&self) -> RawFd {
387        self.inner.as_raw_fd()
388    }
389}