tokio_uring/net/udp.rs
1use crate::{
2 buf::fixed::FixedBuf,
3 buf::{BoundedBuf, BoundedBufMut},
4 io::{SharedFd, Socket},
5 UnsubmittedWrite,
6};
7use socket2::SockAddr;
8use std::{
9 io,
10 net::SocketAddr,
11 os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
12};
13
/// A UDP socket.
///
/// UDP is "connectionless", unlike TCP. This means that, regardless of the address it is bound
/// to, a `UdpSocket` is free to communicate with many different remotes. There are two main
/// ways to use a `UdpSocket`:
///
/// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
///   and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using
///   [`write`](`UdpSocket::write`) and [`read`](`UdpSocket::read`) to communicate only with
///   that remote address
///
/// # Examples
/// Bind and connect a pair of sockets and send a packet:
///
/// ```
/// use tokio_uring::net::UdpSocket;
/// use std::net::SocketAddr;
/// fn main() -> std::io::Result<()> {
///     tokio_uring::start(async {
///         let first_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap();
///         let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
///
///         // bind sockets
///         let socket = UdpSocket::bind(first_addr).await?;
///         let other_socket = UdpSocket::bind(second_addr).await?;
///
///         // connect sockets
///         socket.connect(second_addr).await.unwrap();
///         other_socket.connect(first_addr).await.unwrap();
///
///         let buf = vec![0; 32];
///
///         // write data
///         let (result, _) = socket.write(b"hello world".as_slice()).submit().await;
///         result.unwrap();
///
///         // read data
///         let (result, buf) = other_socket.read(buf).await;
///         let n_bytes = result.unwrap();
///
///         assert_eq!(b"hello world", &buf[..n_bytes]);
///
///         // write data using send on connected socket
///         let (result, _) = socket.send(b"hello world via send".as_slice()).await;
///         result.unwrap();
///
///         // read data
///         let (result, buf) = other_socket.read(buf).await;
///         let n_bytes = result.unwrap();
///
///         assert_eq!(b"hello world via send", &buf[..n_bytes]);
///
///         Ok(())
///     })
/// }
/// ```
/// Send and receive packets without connecting:
///
/// ```
/// use tokio_uring::net::UdpSocket;
/// use std::net::SocketAddr;
/// fn main() -> std::io::Result<()> {
///     tokio_uring::start(async {
///         let first_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap();
///         let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
///
///         // bind sockets
///         let socket = UdpSocket::bind(first_addr).await?;
///         let other_socket = UdpSocket::bind(second_addr).await?;
///
///         let buf = vec![0; 32];
///
///         // write data
///         let (result, _) = socket.send_to(b"hello world".as_slice(), second_addr).await;
///         result.unwrap();
///
///         // read data
///         let (result, buf) = other_socket.recv_from(buf).await;
///         let (n_bytes, addr) = result.unwrap();
///
///         assert_eq!(addr, first_addr);
///         assert_eq!(b"hello world", &buf[..n_bytes]);
///
///         Ok(())
///     })
/// }
/// ```
pub struct UdpSocket {
    // Crate-internal socket handle; the methods of this type delegate to it.
    pub(super) inner: Socket,
}
103
104impl UdpSocket {
105 /// Creates a new UDP socket and attempt to bind it to the addr provided.
106 ///
107 /// Returns a new instance of [`UdpSocket`] on success,
108 /// or an [`io::Error`](std::io::Error) on failure.
109 pub async fn bind(socket_addr: SocketAddr) -> io::Result<UdpSocket> {
110 let socket = Socket::bind(socket_addr, libc::SOCK_DGRAM)?;
111 Ok(UdpSocket { inner: socket })
112 }
113
114 /// Returns the local address to which this UDP socket is bound.
115 ///
116 /// This can be useful, for example, when binding to port 0 to
117 /// figure out which port was actually bound.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
123 /// use tokio_uring::net::UdpSocket;
124 ///
125 /// tokio_uring::start(async {
126 /// let socket = UdpSocket::bind("127.0.0.1:8080".parse().unwrap()).await.unwrap();
127 /// let addr = socket.local_addr().expect("Couldn't get local address");
128 /// assert_eq!(addr, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
129 /// });
130 /// ```
131 pub fn local_addr(&self) -> io::Result<SocketAddr> {
132 let fd = self.inner.as_raw_fd();
133 // SAFETY: Our fd is the handle the kernel has given us for a UdpSocket.
134 // Create a std::net::UdpSocket long enough to call its local_addr method
135 // and then forget it so the socket is not closed here.
136 let s = unsafe { std::net::UdpSocket::from_raw_fd(fd) };
137 let local_addr = s.local_addr();
138 std::mem::forget(s);
139 local_addr
140 }
141
142 /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
143 ///
144 /// This function is intended to be used to wrap a UDP socket from the
145 /// standard library in the tokio-uring equivalent. The conversion assumes nothing
146 /// about the underlying socket; it is left up to the user to decide what socket
147 /// options are appropriate for their use case.
148 ///
149 /// This can be used in conjunction with socket2's `Socket` interface to
150 /// configure a socket before it's handed off, such as setting options like
151 /// `reuse_address` or binding to multiple addresses.
152 ///
153 /// # Example
154 ///
155 /// ```
156 /// use socket2::{Protocol, Socket, Type};
157 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
158 /// use tokio_uring::net::UdpSocket;
159 ///
160 /// fn main() -> std::io::Result<()> {
161 /// tokio_uring::start(async {
162 /// let std_addr: SocketAddr = "127.0.0.1:2401".parse().unwrap();
163 /// let second_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
164 /// let sock = Socket::new(socket2::Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
165 /// sock.set_reuse_port(true)?;
166 /// sock.set_nonblocking(true)?;
167 /// sock.bind(&std_addr.into())?;
168 ///
169 /// let std_socket = UdpSocket::from_std(sock.into());
170 /// let other_socket = UdpSocket::bind(second_addr).await?;
171 ///
172 /// let buf = vec![0; 32];
173 ///
174 /// // write data
175 /// let (result, _) = std_socket
176 /// .send_to(b"hello world".as_slice(), second_addr)
177 /// .await;
178 /// result.unwrap();
179 ///
180 /// // read data
181 /// let (result, buf) = other_socket.recv_from(buf).await;
182 /// let (n_bytes, addr) = result.unwrap();
183 ///
184 /// assert_eq!(addr, std_addr);
185 /// assert_eq!(b"hello world", &buf[..n_bytes]);
186 ///
187 /// Ok(())
188 /// })
189 /// }
190 /// ```
191 pub fn from_std(socket: std::net::UdpSocket) -> Self {
192 let inner = Socket::from_std(socket);
193 Self { inner }
194 }
195
196 pub(crate) fn from_socket(inner: Socket) -> Self {
197 Self { inner }
198 }
199
200 /// "Connects" this UDP socket to a remote address.
201 ///
202 /// This enables `write` and `read` syscalls to be used on this instance.
203 /// It also constrains the `read` to receive data only from the specified remote peer.
204 ///
205 /// Note: UDP is connectionless, so a successful `connect` call does not execute
206 /// a handshake or validation of the remote peer of any kind.
207 /// Any errors would not be detected until the first send.
208 pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> {
209 self.inner.connect(SockAddr::from(socket_addr)).await
210 }
211
212 /// Sends data on the connected socket
213 ///
214 /// On success, returns the number of bytes written.
215 pub async fn send<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
216 self.inner.send_to(buf, None).await
217 }
218
219 /// Sends data on the socket to the given address.
220 ///
221 /// On success, returns the number of bytes written.
222 pub async fn send_to<T: BoundedBuf>(
223 &self,
224 buf: T,
225 socket_addr: SocketAddr,
226 ) -> crate::BufResult<usize, T> {
227 self.inner.send_to(buf, Some(socket_addr)).await
228 }
229
230 /// Sends data on the socket. Will attempt to do so without intermediate copies.
231 ///
232 /// On success, returns the number of bytes written.
233 ///
234 /// See the linux [kernel docs](https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html)
235 /// for a discussion on when this might be appropriate. In particular:
236 ///
237 /// > Copy avoidance is not a free lunch. As implemented, with page pinning,
238 /// > it replaces per byte copy cost with page accounting and completion
239 /// > notification overhead. As a result, zero copy is generally only effective
240 /// > at writes over around 10 KB.
241 ///
242 /// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead
243 pub async fn send_zc<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
244 self.inner.send_zc(buf).await
245 }
246
247 /// Sends a message on the socket using a msghdr.
248 ///
249 /// Returns a tuple of:
250 ///
251 /// * Result containing bytes written on success
252 /// * The original `io_slices` `Vec<T>`
253 /// * The original `msg_contol` `Option<U>`
254 ///
255 /// Consider using [`Self::sendmsg_zc`] for a zero-copy alternative.
256 pub async fn sendmsg<T: BoundedBuf, U: BoundedBuf>(
257 &self,
258 io_slices: Vec<T>,
259 socket_addr: Option<SocketAddr>,
260 msg_control: Option<U>,
261 ) -> (io::Result<usize>, Vec<T>, Option<U>) {
262 self.inner
263 .sendmsg(io_slices, socket_addr, msg_control)
264 .await
265 }
266
267 /// Sends a message on the socket using a msghdr.
268 ///
269 /// Returns a tuple of:
270 ///
271 /// * Result containing bytes written on success
272 /// * The original `io_slices` `Vec<T>`
273 /// * The original `msg_contol` `Option<U>`
274 ///
275 /// See the linux [kernel docs](https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html)
276 /// for a discussion on when this might be appropriate. In particular:
277 ///
278 /// > Copy avoidance is not a free lunch. As implemented, with page pinning,
279 /// > it replaces per byte copy cost with page accounting and completion
280 /// > notification overhead. As a result, zero copy is generally only effective
281 /// > at writes over around 10 KB.
282 ///
283 /// Can be used with socket_addr: None on connected sockets, which can have performance
284 /// benefits if multiple datagrams are sent to the same destination address.
285 pub async fn sendmsg_zc<T: BoundedBuf, U: BoundedBuf>(
286 &self,
287 io_slices: Vec<T>,
288 socket_addr: Option<SocketAddr>,
289 msg_control: Option<U>,
290 ) -> (io::Result<usize>, Vec<T>, Option<U>) {
291 self.inner
292 .sendmsg_zc(io_slices, socket_addr, msg_control)
293 .await
294 }
295
296 /// Receives a single datagram message on the socket.
297 ///
298 /// On success, returns the number of bytes read and the origin.
299 pub async fn recv_from<T: BoundedBufMut>(
300 &self,
301 buf: T,
302 ) -> crate::BufResult<(usize, SocketAddr), T> {
303 self.inner.recv_from(buf).await
304 }
305
306 /// Receives a single datagram message on the socket, into multiple buffers
307 ///
308 /// On success, returns the number of bytes read and the origin.
309 pub async fn recvmsg<T: BoundedBufMut>(
310 &self,
311 buf: Vec<T>,
312 ) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
313 self.inner.recvmsg(buf).await
314 }
315
316 /// Reads a packet of data from the socket into the buffer.
317 ///
318 /// Returns the original buffer and quantity of data read.
319 pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
320 self.inner.read(buf).await
321 }
322
323 /// Receives a single datagram message into a registered buffer.
324 ///
325 /// Like [`read`], but using a pre-mapped buffer
326 /// registered with [`FixedBufRegistry`].
327 ///
328 /// [`read`]: Self::read
329 /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
330 ///
331 /// # Errors
332 ///
333 /// In addition to errors that can be reported by `read`,
334 /// this operation fails if the buffer is not registered in the
335 /// current `tokio-uring` runtime.
336 pub async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
337 where
338 T: BoundedBufMut<BufMut = FixedBuf>,
339 {
340 self.inner.read_fixed(buf).await
341 }
342
343 /// Writes data into the socket from the specified buffer.
344 ///
345 /// Returns the original buffer and quantity of data written.
346 pub fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
347 self.inner.write(buf)
348 }
349
350 /// Writes data into the socket from a registered buffer.
351 ///
352 /// Like [`write`], but using a pre-mapped buffer
353 /// registered with [`FixedBufRegistry`].
354 ///
355 /// [`write`]: Self::write
356 /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
357 ///
358 /// # Errors
359 ///
360 /// In addition to errors that can be reported by `write`,
361 /// this operation fails if the buffer is not registered in the
362 /// current `tokio-uring` runtime.
363 pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
364 where
365 T: BoundedBuf<Buf = FixedBuf>,
366 {
367 self.inner.write_fixed(buf).await
368 }
369
370 /// Shuts down the read, write, or both halves of this connection.
371 ///
372 /// This function causes all pending and future I/O on the specified portions to return
373 /// immediately with an appropriate value.
374 pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> {
375 self.inner.shutdown(how)
376 }
377}
378
379impl FromRawFd for UdpSocket {
380 unsafe fn from_raw_fd(fd: RawFd) -> Self {
381 UdpSocket::from_socket(Socket::from_shared_fd(SharedFd::new(fd)))
382 }
383}
384
385impl AsRawFd for UdpSocket {
386 fn as_raw_fd(&self) -> RawFd {
387 self.inner.as_raw_fd()
388 }
389}