datagram_socket/
datagram.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use buffer_pool::RawPoolBufDatagramIo;
28use futures_util::future::poll_fn;
29use futures_util::ready;
30use futures_util::FutureExt;
31use std::future::Future;
32use std::io;
33use std::net::Ipv4Addr;
34use std::net::SocketAddr;
35use std::net::SocketAddrV4;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39use tokio::io::ReadBuf;
40use tokio::net::UdpSocket;
41
42#[cfg(unix)]
43use std::os::fd::AsFd;
44#[cfg(unix)]
45use std::os::fd::BorrowedFd;
46#[cfg(unix)]
47use std::os::fd::FromRawFd;
48#[cfg(unix)]
49use std::os::fd::IntoRawFd;
50#[cfg(unix)]
51use std::os::fd::OwnedFd;
52#[cfg(unix)]
53use tokio::net::UnixDatagram;
54
55use crate::socket_stats::AsSocketStats;
56
57// This is the largest datagram we expect to support.
58// UDP and Unix sockets can support larger datagrams than this, but we only
59// expect to support packets coming to/from the Internet.
60pub const MAX_DATAGRAM_SIZE: usize = 1500;
61
62pub trait DatagramSocketWithStats: DatagramSocket {}
63
64impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
65
66/// Describes an implementation of a connected datagram socket.
67///
68/// Rather than using Socket for datagram-oriented sockets, the DatagramSocket
69/// trait purposely does not implement AsyncRead/AsyncWrite, which are traits
70/// with stream semantics. For example, the `AsyncReadExt::read_exact` method
71/// which issues as many reads as possible to fill the buffer provided.
72///
73/// For a similar reason, [`std::net::UdpSocket`] does not implement
74/// [`io::Read`] nor does [`tokio::net::UdpSocket`] implement
75/// [`tokio::io::AsyncRead`].
76pub trait DatagramSocket:
77    DatagramSocketSend + DatagramSocketRecv + 'static
78{
79    #[cfg(unix)]
80    fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;
81
82    #[cfg(unix)]
83    fn into_fd(self) -> Option<OwnedFd>;
84
85    fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
86        None
87    }
88}
89
90/// Describes the send half of a connected datagram socket.
91pub trait DatagramSocketSend: Sync {
92    /// Attempts to send data on the socket to the remote address to which it
93    /// was previously connected.
94    ///
95    /// Note that on multiple calls to a `poll_*` method in the send direction,
96    /// only the `Waker` from the `Context` passed to the most recent call will
97    /// be scheduled to receive a wakeup.
98    ///
99    /// # Return value
100    ///
101    /// The function returns:
102    ///
103    /// * `Poll::Pending` if the socket is not available to write
104    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
105    /// * `Poll::Ready(Err(e))` if an error is encountered.
106    ///
107    /// # Errors
108    ///
109    /// This function may encounter any standard I/O error except `WouldBlock`.
110    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
111
112    /// Attempts to send data on the socket to a given address.
113    ///
114    /// If this socket only supports a single address, it should forward to
115    /// `send`. It should *not* panic or discard the data.
116    /// It's recommended that this return an error if `addr` doesn't match the
117    /// only supported address.
118    ///
119    /// Note that on multiple calls to a `poll_*` method in the send direction,
120    /// only the `Waker` from the `Context` passed to the most recent call
121    /// will be scheduled to receive a wakeup.
122    ///
123    /// # Return value
124    ///
125    /// The function returns:
126    ///
127    /// * `Poll::Pending` if the socket is not ready to write
128    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
129    /// * `Poll::Ready(Err(e))` if an error is encountered.
130    ///
131    /// # Errors
132    ///
133    /// This function may encounter any standard I/O error except `WouldBlock`.
134    fn poll_send_to(
135        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
136    ) -> Poll<io::Result<usize>>;
137
138    /// Attempts to send multiple packets of data on the socket to the remote
139    /// address to which it was previously connected.
140    ///
141    /// Note that on multiple calls to a `poll_*` method in the send direction,
142    /// only the `Waker` from the `Context` passed to the most recent call
143    /// will be scheduled to receive a wakeup.
144    ///
145    /// # Return value
146    ///
147    /// The function returns:
148    ///
149    /// * `Poll::Pending` if the socket is not ready to write
150    /// * `Poll::Ready(Ok(n))` `n` is the number of packets sent. If any packet
151    ///   was sent only partially, that information is lost.
152    /// * `Poll::Ready(Err(e))` if an error is encountered.
153    ///
154    /// # Errors
155    ///
156    /// This function may encounter any standard I/O error except `WouldBlock`.
157    fn poll_send_many(
158        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
159    ) -> Poll<io::Result<usize>> {
160        let mut sent = 0;
161
162        for buf in bufs {
163            match self.poll_send(cx, buf.filled()) {
164                Poll::Ready(Ok(_)) => sent += 1,
165                Poll::Ready(err) => {
166                    if sent == 0 {
167                        return Poll::Ready(err);
168                    }
169                    break;
170                },
171                Poll::Pending => {
172                    if sent == 0 {
173                        return Poll::Pending;
174                    }
175                    break;
176                },
177            }
178        }
179
180        Poll::Ready(Ok(sent))
181    }
182
183    /// If the underlying socket is a `UdpSocket`, return the reference to it.
184    fn as_udp_socket(&self) -> Option<&UdpSocket> {
185        None
186    }
187
188    /// Returns the socket address of the remote peer this socket was connected
189    /// to.
190    fn peer_addr(&self) -> Option<SocketAddr> {
191        None
192    }
193}
194
195/// Writes datagrams to a socket.
196///
197/// Implemented as an extension trait, adding utility methods to all
198/// [`DatagramSocketSend`] types. Callers will tend to import this trait instead
199/// of [`DatagramSocketSend`].
200///
201/// [`DatagramSocketSend`]: DatagramSocketSend
202pub trait DatagramSocketSendExt: DatagramSocketSend {
203    /// Sends data on the socket to the remote address that the socket is
204    /// connected to.
205    fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
206        poll_fn(move |cx| self.poll_send(cx, buf))
207    }
208
209    /// Sends data on the socket to the given address. On success, returns the
210    /// number of bytes written.
211    fn send_to(
212        &self, buf: &[u8], addr: SocketAddr,
213    ) -> impl Future<Output = io::Result<usize>> {
214        poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
215    }
216
217    /// Sends multiple data packets on the socket to the to the remote address
218    /// that the socket is connected to. On success, returns the number of
219    /// packets sent.
220    fn send_many(
221        &self, bufs: &[ReadBuf<'_>],
222    ) -> impl Future<Output = io::Result<usize>> {
223        poll_fn(move |cx| self.poll_send_many(cx, bufs))
224    }
225
226    fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
227        match poll_fn(|cx| self.poll_send(cx, buf)).now_or_never() {
228            Some(result) => result,
229            None => Err(io::ErrorKind::WouldBlock.into()),
230        }
231    }
232
233    fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
234        match poll_fn(|cx| self.poll_send_many(cx, bufs)).now_or_never() {
235            Some(result) => result,
236            None => Err(io::ErrorKind::WouldBlock.into()),
237        }
238    }
239}
240
241/// Describes the receive half of a connected datagram socket.
242pub trait DatagramSocketRecv: Send {
243    /// Attempts to receive a single datagram message on the socket from the
244    /// remote address to which it is `connect`ed.
245    ///
246    /// Note that on multiple calls to a `poll_*` method in the `recv`
247    /// direction, only the `Waker` from the `Context` passed to the most
248    /// recent call will be scheduled to receive a wakeup.
249    ///
250    /// # Return value
251    ///
252    /// The function returns:
253    ///
254    /// * `Poll::Pending` if the socket is not ready to read
255    /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
256    /// * `Poll::Ready(Err(e))` if an error is encountered.
257    ///
258    /// # Errors
259    ///
260    /// This function may encounter any standard I/O error except `WouldBlock`.
261    fn poll_recv(
262        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
263    ) -> Poll<io::Result<()>>;
264
265    /// Attempts to receive a single datagram on the socket.
266    ///
267    /// Note that on multiple calls to a `poll_*` method in the `recv`
268    /// direction, only the `Waker` from the `Context` passed to the most
269    /// recent call will be scheduled to receive a wakeup.
270    ///
271    /// # Return value
272    ///
273    /// The function returns:
274    ///
275    /// * `Poll::Pending` if the socket is not ready to read
276    /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the
277    ///   socket is ready
278    /// * `Poll::Ready(Err(e))` if an error is encountered.
279    ///
280    /// # Errors
281    ///
282    /// This function may encounter any standard I/O error except `WouldBlock`.
283    fn poll_recv_from(
284        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
285    ) -> Poll<io::Result<SocketAddr>> {
286        self.poll_recv(cx, buf).map_ok(|_| {
287            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
288        })
289    }
290
291    /// Attempts to receive multiple datagrams on the socket from the remote
292    /// address to which it is `connect`ed.
293    ///
294    /// Note that on multiple calls to a `poll_*` method in the `recv`
295    /// direction, only the `Waker` from the `Context` passed to the most
296    /// recent call will be scheduled to receive a wakeup.
297    ///
298    /// # Return value
299    ///
300    /// The function returns:
301    ///
302    /// * `Poll::Pending` if the socket is not ready to read
303    /// * `Poll::Ready(Ok(n))` reads data `ReadBuf` if the socket is ready `n`
304    ///   is the number of datagrams read.
305    /// * `Poll::Ready(Err(e))` if an error is encountered.
306    ///
307    /// # Errors
308    ///
309    /// This function may encounter any standard I/O error except `WouldBlock`.
310    fn poll_recv_many(
311        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
312    ) -> Poll<io::Result<usize>> {
313        let mut read = 0;
314
315        for buf in bufs {
316            match self.poll_recv(cx, buf) {
317                Poll::Ready(Ok(())) => read += 1,
318
319                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
320
321                // Only return `Poll::Ready` if at least one datagram was
322                // successfully read, otherwise block.
323                Poll::Pending if read == 0 => return Poll::Pending,
324                Poll::Pending => break,
325            }
326        }
327
328        Poll::Ready(Ok(read))
329    }
330
331    /// If the underlying socket is a `UdpSocket`, return the reference to it.
332    fn as_udp_socket(&self) -> Option<&UdpSocket> {
333        None
334    }
335}
336
337/// Reads datagrams from a socket.
338///
339/// Implemented as an extension trait, adding utility methods to all
340/// [`DatagramSocketRecv`] types. Callers will tend to import this trait instead
341/// of [`DatagramSocketRecv`].
342///
343/// [`DatagramSocketRecv`]: DatagramSocketRecv
344pub trait DatagramSocketRecvExt: DatagramSocketRecv {
345    /// Receives a single datagram message on the socket from the remote address
346    /// to which it is connected. On success, returns the number of bytes read.
347    fn recv(
348        &mut self, buf: &mut [u8],
349    ) -> impl Future<Output = io::Result<usize>> + Send {
350        poll_fn(|cx| {
351            let mut buf = ReadBuf::new(buf);
352
353            ready!(self.poll_recv(cx, &mut buf)?);
354
355            Poll::Ready(Ok(buf.filled().len()))
356        })
357    }
358
359    /// Receives a single datagram message on the socket. On success, returns
360    /// the number of bytes read and the origin.
361    fn recv_from(
362        &mut self, buf: &mut [u8],
363    ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
364        poll_fn(|cx| {
365            let mut buf = ReadBuf::new(buf);
366
367            let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
368
369            Poll::Ready(Ok((buf.filled().len(), addr)))
370        })
371    }
372
373    /// Receives multiple datagrams on the socket from the remote address
374    /// to which it is connected. Returns the number of buffers used (i.e.
375    /// number of datagrams read). Each used buffer can be read up to its
376    /// `filled().len()`.
377    fn recv_many(
378        &mut self, bufs: &mut [ReadBuf<'_>],
379    ) -> impl Future<Output = io::Result<usize>> + Send {
380        poll_fn(|cx| self.poll_recv_many(cx, bufs))
381    }
382}
383
384impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
385
386impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
387
388/// A convenience method that can be implemented for any type if it wants
389/// to forward its `DatagramSocketSend` functionality to an inner field/socket.
390/// This automatically derives `DatagramSocketSend`.
391pub trait AsDatagramSocketSend {
392    type AsSend: DatagramSocketSend + ?Sized;
393
394    fn as_datagram_socket_send(&self) -> &Self::AsSend;
395}
396
397/// A convenience method that can be implemented for any type if it wants
398/// to forward its `DatagramSocketRecv` functionality to an inner field/socket.
399/// This automatically derives `DatagramSocketRecv`.
400pub trait AsDatagramSocketRecv {
401    type AsRecv: DatagramSocketRecv + ?Sized;
402
403    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
404    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
405}
406
407impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
408    #[inline]
409    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
410        self.as_datagram_socket_send().poll_send(cx, buf)
411    }
412
413    #[inline]
414    fn poll_send_to(
415        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
416    ) -> Poll<io::Result<usize>> {
417        self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
418    }
419
420    #[inline]
421    fn poll_send_many(
422        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
423    ) -> Poll<io::Result<usize>> {
424        self.as_datagram_socket_send().poll_send_many(cx, bufs)
425    }
426
427    #[inline]
428    fn as_udp_socket(&self) -> Option<&UdpSocket> {
429        self.as_datagram_socket_send().as_udp_socket()
430    }
431
432    #[inline]
433    fn peer_addr(&self) -> Option<SocketAddr> {
434        self.as_datagram_socket_send().peer_addr()
435    }
436}
437
438impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
439    #[inline]
440    fn poll_recv(
441        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
442    ) -> Poll<io::Result<()>> {
443        self.as_datagram_socket_recv().poll_recv(cx, buf)
444    }
445
446    #[inline]
447    fn poll_recv_from(
448        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
449    ) -> Poll<io::Result<SocketAddr>> {
450        self.as_datagram_socket_recv().poll_recv_from(cx, buf)
451    }
452
453    #[inline]
454    fn poll_recv_many(
455        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
456    ) -> Poll<io::Result<usize>> {
457        self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
458    }
459
460    #[inline]
461    fn as_udp_socket(&self) -> Option<&UdpSocket> {
462        self.as_shared_datagram_socket_recv().as_udp_socket()
463    }
464}
465
466impl<T> AsDatagramSocketSend for &mut T
467where
468    T: DatagramSocketSend + Send + ?Sized,
469{
470    type AsSend = T;
471
472    fn as_datagram_socket_send(&self) -> &Self::AsSend {
473        self
474    }
475}
476
477impl<T> AsDatagramSocketSend for Box<T>
478where
479    T: DatagramSocketSend + Send + ?Sized,
480{
481    type AsSend = T;
482
483    fn as_datagram_socket_send(&self) -> &Self::AsSend {
484        self
485    }
486}
487
488impl<T> AsDatagramSocketSend for Arc<T>
489where
490    T: DatagramSocketSend + Send + ?Sized,
491{
492    type AsSend = T;
493
494    fn as_datagram_socket_send(&self) -> &Self::AsSend {
495        self
496    }
497}
498
499impl<T> AsDatagramSocketRecv for &mut T
500where
501    T: DatagramSocketRecv + Send + ?Sized,
502{
503    type AsRecv = T;
504
505    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
506        self
507    }
508
509    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
510        self
511    }
512}
513
514impl<T> AsDatagramSocketRecv for Box<T>
515where
516    T: DatagramSocketRecv + Send + ?Sized,
517{
518    type AsRecv = T;
519
520    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
521        self
522    }
523
524    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
525        self
526    }
527}
528
529impl DatagramSocket for UdpSocket {
530    #[cfg(unix)]
531    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
532        Some(self.as_fd())
533    }
534
535    #[cfg(unix)]
536    fn into_fd(self) -> Option<OwnedFd> {
537        Some(into_owned_fd(self.into_std().ok()?))
538    }
539}
540
541impl DatagramSocketSend for UdpSocket {
542    #[inline]
543    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
544        UdpSocket::poll_send(self, cx, buf)
545    }
546
547    #[inline]
548    fn poll_send_to(
549        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
550    ) -> Poll<io::Result<usize>> {
551        UdpSocket::poll_send_to(self, cx, buf, addr)
552    }
553
554    #[cfg(target_os = "linux")]
555    #[inline]
556    fn poll_send_many(
557        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
558    ) -> Poll<io::Result<usize>> {
559        crate::poll_sendmmsg!(self, cx, bufs)
560    }
561
562    fn as_udp_socket(&self) -> Option<&UdpSocket> {
563        Some(self)
564    }
565
566    fn peer_addr(&self) -> Option<SocketAddr> {
567        self.peer_addr().ok()
568    }
569}
570
571impl DatagramSocketRecv for UdpSocket {
572    #[inline]
573    fn poll_recv(
574        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
575    ) -> Poll<io::Result<()>> {
576        UdpSocket::poll_recv(self, cx, buf)
577    }
578
579    #[cfg(target_os = "linux")]
580    #[inline]
581    fn poll_recv_many(
582        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
583    ) -> Poll<io::Result<usize>> {
584        crate::poll_recvmmsg!(self, cx, bufs)
585    }
586
587    fn as_udp_socket(&self) -> Option<&UdpSocket> {
588        Some(self)
589    }
590}
591
592impl DatagramSocket for Arc<UdpSocket> {
593    #[cfg(unix)]
594    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
595        Some(self.as_fd())
596    }
597
598    #[cfg(unix)]
599    fn into_fd(self) -> Option<OwnedFd> {
600        None
601    }
602}
603
604impl DatagramSocketRecv for Arc<UdpSocket> {
605    #[inline]
606    fn poll_recv(
607        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
608    ) -> Poll<io::Result<()>> {
609        UdpSocket::poll_recv(self, cx, buf)
610    }
611
612    #[inline]
613    fn poll_recv_from(
614        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
615    ) -> Poll<io::Result<SocketAddr>> {
616        UdpSocket::poll_recv_from(self, cx, buf)
617    }
618
619    #[cfg(target_os = "linux")]
620    #[inline]
621    fn poll_recv_many(
622        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
623    ) -> Poll<io::Result<usize>> {
624        crate::poll_recvmmsg!(self, cx, bufs)
625    }
626
627    fn as_udp_socket(&self) -> Option<&UdpSocket> {
628        Some(self)
629    }
630}
631
632#[cfg(unix)]
633impl DatagramSocket for UnixDatagram {
634    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
635        Some(self.as_fd())
636    }
637
638    fn into_fd(self) -> Option<OwnedFd> {
639        Some(into_owned_fd(self.into_std().ok()?))
640    }
641}
642
643#[cfg(unix)]
644impl DatagramSocketSend for UnixDatagram {
645    #[inline]
646    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
647        UnixDatagram::poll_send(self, cx, buf)
648    }
649
650    #[inline]
651    fn poll_send_to(
652        &self, _: &mut Context, _: &[u8], _: SocketAddr,
653    ) -> Poll<io::Result<usize>> {
654        Poll::Ready(Err(io::Error::new(
655            io::ErrorKind::Unsupported,
656            "invalid address family",
657        )))
658    }
659
660    #[cfg(target_os = "linux")]
661    #[inline]
662    fn poll_send_many(
663        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
664    ) -> Poll<io::Result<usize>> {
665        crate::poll_sendmmsg!(self, cx, bufs)
666    }
667}
668
669#[cfg(unix)]
670impl DatagramSocketRecv for UnixDatagram {
671    #[inline]
672    fn poll_recv(
673        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
674    ) -> Poll<io::Result<()>> {
675        UnixDatagram::poll_recv(self, cx, buf)
676    }
677
678    #[cfg(target_os = "linux")]
679    #[inline]
680    fn poll_recv_many(
681        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
682    ) -> Poll<io::Result<usize>> {
683        crate::poll_recvmmsg!(self, cx, bufs)
684    }
685}
686
687#[cfg(unix)]
688impl DatagramSocketRecv for Arc<UnixDatagram> {
689    #[inline]
690    fn poll_recv(
691        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
692    ) -> Poll<io::Result<()>> {
693        UnixDatagram::poll_recv(self, cx, buf)
694    }
695
696    #[cfg(target_os = "linux")]
697    #[inline]
698    fn poll_recv_many(
699        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
700    ) -> Poll<io::Result<usize>> {
701        crate::poll_recvmmsg!(self, cx, bufs)
702    }
703}
704
705/// `Into<OwnedFd>::into` for types (tokio sockets etc) that don't implement
706/// `From<OwnedFd>`.
707#[cfg(unix)]
708fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
709    unsafe { OwnedFd::from_raw_fd(into_fd.into_raw_fd()) }
710}
711
712/// A cheap wrapper around a datagram socket which describes if it is connected
713/// to an explicit peer.
714///
715/// This struct essentially forwards its underlying socket's `send_to()` method
716/// to `send()` if the socket is explicitly connected to a peer. This is helpful
717/// for preventing issues on platforms that do not support `send_to` on
718/// already-connected sockets.
719///
720/// # Warning
721/// A socket's "connectedness" is determined once, when it is created. If the
722/// socket is created as connected, then later disconnected from its peer, its
723/// `send_to()` call will fail.
724///
725/// For example, MacOS errors if `send_to` is used on a socket that's already
726/// connected. Only `send` can be used. By using `MaybeConnectedSocket`, you can
727/// use the same `send` and `send_to` APIs in both client- and server-side code.
728/// Clients, usually with connected sockets, will then forward `send_to` to
729/// `send`, whereas servers, usually with unconnected sockets, will use
730/// `send_to`.
731#[derive(Clone)]
732pub struct MaybeConnectedSocket<T> {
733    inner: T,
734    peer: Option<SocketAddr>,
735}
736
737impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
738    pub fn new(inner: T) -> Self {
739        Self {
740            peer: inner.peer_addr(),
741            inner,
742        }
743    }
744
745    /// Provides access to the wrapped socket, allowing the user to override
746    /// `send_to()` behavior if required.
747    pub fn inner(&self) -> &T {
748        &self.inner
749    }
750
751    /// Consumes `self`, returning the wrapped socket.
752    pub fn into_inner(self) -> T {
753        self.inner
754    }
755}
756
757impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
758    #[inline]
759    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
760        self.inner.poll_send(cx, buf)
761    }
762
763    #[inline]
764    fn poll_send_to(
765        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
766    ) -> Poll<io::Result<usize>> {
767        if let Some(peer) = self.peer {
768            debug_assert_eq!(peer, addr);
769            self.inner.poll_send(cx, buf)
770        } else {
771            self.inner.poll_send_to(cx, buf, addr)
772        }
773    }
774
775    #[inline]
776    fn poll_send_many(
777        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
778    ) -> Poll<io::Result<usize>> {
779        self.inner.poll_send_many(cx, bufs)
780    }
781
782    #[inline]
783    fn as_udp_socket(&self) -> Option<&UdpSocket> {
784        self.inner.as_udp_socket()
785    }
786
787    #[inline]
788    fn peer_addr(&self) -> Option<SocketAddr> {
789        self.peer
790    }
791}