1use buffer_pool::RawPoolBufDatagramIo;
28use futures_util::future::poll_fn;
29use futures_util::ready;
30use futures_util::FutureExt;
31use std::future::Future;
32use std::io;
33use std::net::Ipv4Addr;
34use std::net::SocketAddr;
35use std::net::SocketAddrV4;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39use tokio::io::ReadBuf;
40use tokio::net::UdpSocket;
41
42#[cfg(unix)]
43use std::os::fd::AsFd;
44#[cfg(unix)]
45use std::os::fd::BorrowedFd;
46#[cfg(unix)]
47use std::os::fd::FromRawFd;
48#[cfg(unix)]
49use std::os::fd::IntoRawFd;
50#[cfg(unix)]
51use std::os::fd::OwnedFd;
52#[cfg(unix)]
53use tokio::net::UnixDatagram;
54
55use crate::socket_stats::AsSocketStats;
56
/// Size limit, in bytes, for a single datagram handled by this module.
///
/// NOTE(review): 1500 matches the usual Ethernet MTU, so this presumably
/// targets Internet-facing traffic rather than the largest datagram the OS
/// could deliver — confirm before reusing it for other transports.
pub const MAX_DATAGRAM_SIZE: usize = 1500;
61
62pub trait DatagramSocketWithStats: DatagramSocket {}
63
64impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
65
/// A datagram socket that can both send and receive.
///
/// Combines [`DatagramSocketSend`] and [`DatagramSocketRecv`] and adds access
/// to the underlying OS handle where the implementation has one.
pub trait DatagramSocket:
    DatagramSocketSend + DatagramSocketRecv + 'static
{
    /// Borrows the socket's file descriptor, or returns `None` if the
    /// implementation has none to expose.
    #[cfg(unix)]
    fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;

    /// Consumes the socket and returns its file descriptor as an owned
    /// handle, or `None` if the implementation cannot hand ownership out.
    #[cfg(unix)]
    fn into_fd(self) -> Option<OwnedFd>;

    /// Returns a [`RawPoolBufDatagramIo`] view of the socket.
    ///
    /// The default is `None`; implementations that support pooled-buffer I/O
    /// override it.
    fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
        None
    }
}
89
90pub trait DatagramSocketSend: Sync {
92 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
111
112 fn poll_send_to(
135 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
136 ) -> Poll<io::Result<usize>>;
137
138 fn poll_send_many(
158 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
159 ) -> Poll<io::Result<usize>> {
160 let mut sent = 0;
161
162 for buf in bufs {
163 match self.poll_send(cx, buf.filled()) {
164 Poll::Ready(Ok(_)) => sent += 1,
165 Poll::Ready(err) => {
166 if sent == 0 {
167 return Poll::Ready(err);
168 }
169 break;
170 },
171 Poll::Pending => {
172 if sent == 0 {
173 return Poll::Pending;
174 }
175 break;
176 },
177 }
178 }
179
180 Poll::Ready(Ok(sent))
181 }
182
183 fn as_udp_socket(&self) -> Option<&UdpSocket> {
185 None
186 }
187
188 fn peer_addr(&self) -> Option<SocketAddr> {
191 None
192 }
193}
194
195pub trait DatagramSocketSendExt: DatagramSocketSend {
203 fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
206 poll_fn(move |cx| self.poll_send(cx, buf))
207 }
208
209 fn send_to(
212 &self, buf: &[u8], addr: SocketAddr,
213 ) -> impl Future<Output = io::Result<usize>> {
214 poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
215 }
216
217 fn send_many(
221 &self, bufs: &[ReadBuf<'_>],
222 ) -> impl Future<Output = io::Result<usize>> {
223 poll_fn(move |cx| self.poll_send_many(cx, bufs))
224 }
225
226 fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
227 match poll_fn(|cx| self.poll_send(cx, buf)).now_or_never() {
228 Some(result) => result,
229 None => Err(io::ErrorKind::WouldBlock.into()),
230 }
231 }
232
233 fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
234 match poll_fn(|cx| self.poll_send_many(cx, bufs)).now_or_never() {
235 Some(result) => result,
236 None => Err(io::ErrorKind::WouldBlock.into()),
237 }
238 }
239}
240
241pub trait DatagramSocketRecv: Send {
243 fn poll_recv(
262 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
263 ) -> Poll<io::Result<()>>;
264
265 fn poll_recv_from(
284 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
285 ) -> Poll<io::Result<SocketAddr>> {
286 self.poll_recv(cx, buf).map_ok(|_| {
287 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
288 })
289 }
290
291 fn poll_recv_many(
311 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
312 ) -> Poll<io::Result<usize>> {
313 let mut read = 0;
314
315 for buf in bufs {
316 match self.poll_recv(cx, buf) {
317 Poll::Ready(Ok(())) => read += 1,
318
319 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
320
321 Poll::Pending if read == 0 => return Poll::Pending,
324 Poll::Pending => break,
325 }
326 }
327
328 Poll::Ready(Ok(read))
329 }
330
331 fn as_udp_socket(&self) -> Option<&UdpSocket> {
333 None
334 }
335}
336
337pub trait DatagramSocketRecvExt: DatagramSocketRecv {
345 fn recv(
348 &mut self, buf: &mut [u8],
349 ) -> impl Future<Output = io::Result<usize>> + Send {
350 poll_fn(|cx| {
351 let mut buf = ReadBuf::new(buf);
352
353 ready!(self.poll_recv(cx, &mut buf)?);
354
355 Poll::Ready(Ok(buf.filled().len()))
356 })
357 }
358
359 fn recv_from(
362 &mut self, buf: &mut [u8],
363 ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
364 poll_fn(|cx| {
365 let mut buf = ReadBuf::new(buf);
366
367 let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
368
369 Poll::Ready(Ok((buf.filled().len(), addr)))
370 })
371 }
372
373 fn recv_many(
378 &mut self, bufs: &mut [ReadBuf<'_>],
379 ) -> impl Future<Output = io::Result<usize>> + Send {
380 poll_fn(|cx| self.poll_recv_many(cx, bufs))
381 }
382}
383
384impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
385
386impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
387
/// Gives access to an inner [`DatagramSocketSend`] by shared reference.
///
/// A type that implements this trait and is `Sync` receives
/// [`DatagramSocketSend`] through the blanket impl in this file, with every
/// method forwarded to the value returned by `as_datagram_socket_send`.
pub trait AsDatagramSocketSend {
    /// The sender type being exposed; may be unsized.
    type AsSend: DatagramSocketSend + ?Sized;

    /// Borrows the inner sender.
    fn as_datagram_socket_send(&self) -> &Self::AsSend;
}
396
/// Gives access to an inner [`DatagramSocketRecv`].
///
/// A type that implements this trait and is `Send` receives
/// [`DatagramSocketRecv`] through the blanket impl in this file.
pub trait AsDatagramSocketRecv {
    /// The receiver type being exposed; may be unsized.
    type AsRecv: DatagramSocketRecv + ?Sized;

    /// Mutably borrows the inner receiver; used by the forwarded receive
    /// methods, which need `&mut self`.
    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
    /// Borrows the inner receiver by shared reference; used by the forwarded
    /// `as_udp_socket`, which only has `&self`.
    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
}
406
407impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
408 #[inline]
409 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
410 self.as_datagram_socket_send().poll_send(cx, buf)
411 }
412
413 #[inline]
414 fn poll_send_to(
415 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
416 ) -> Poll<io::Result<usize>> {
417 self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
418 }
419
420 #[inline]
421 fn poll_send_many(
422 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
423 ) -> Poll<io::Result<usize>> {
424 self.as_datagram_socket_send().poll_send_many(cx, bufs)
425 }
426
427 #[inline]
428 fn as_udp_socket(&self) -> Option<&UdpSocket> {
429 self.as_datagram_socket_send().as_udp_socket()
430 }
431
432 #[inline]
433 fn peer_addr(&self) -> Option<SocketAddr> {
434 self.as_datagram_socket_send().peer_addr()
435 }
436}
437
438impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
439 #[inline]
440 fn poll_recv(
441 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
442 ) -> Poll<io::Result<()>> {
443 self.as_datagram_socket_recv().poll_recv(cx, buf)
444 }
445
446 #[inline]
447 fn poll_recv_from(
448 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
449 ) -> Poll<io::Result<SocketAddr>> {
450 self.as_datagram_socket_recv().poll_recv_from(cx, buf)
451 }
452
453 #[inline]
454 fn poll_recv_many(
455 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
456 ) -> Poll<io::Result<usize>> {
457 self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
458 }
459
460 #[inline]
461 fn as_udp_socket(&self) -> Option<&UdpSocket> {
462 self.as_shared_datagram_socket_recv().as_udp_socket()
463 }
464}
465
466impl<T> AsDatagramSocketSend for &mut T
467where
468 T: DatagramSocketSend + Send + ?Sized,
469{
470 type AsSend = T;
471
472 fn as_datagram_socket_send(&self) -> &Self::AsSend {
473 self
474 }
475}
476
477impl<T> AsDatagramSocketSend for Box<T>
478where
479 T: DatagramSocketSend + Send + ?Sized,
480{
481 type AsSend = T;
482
483 fn as_datagram_socket_send(&self) -> &Self::AsSend {
484 self
485 }
486}
487
488impl<T> AsDatagramSocketSend for Arc<T>
489where
490 T: DatagramSocketSend + Send + ?Sized,
491{
492 type AsSend = T;
493
494 fn as_datagram_socket_send(&self) -> &Self::AsSend {
495 self
496 }
497}
498
499impl<T> AsDatagramSocketRecv for &mut T
500where
501 T: DatagramSocketRecv + Send + ?Sized,
502{
503 type AsRecv = T;
504
505 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
506 self
507 }
508
509 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
510 self
511 }
512}
513
514impl<T> AsDatagramSocketRecv for Box<T>
515where
516 T: DatagramSocketRecv + Send + ?Sized,
517{
518 type AsRecv = T;
519
520 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
521 self
522 }
523
524 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
525 self
526 }
527}
528
529impl DatagramSocket for UdpSocket {
530 #[cfg(unix)]
531 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
532 Some(self.as_fd())
533 }
534
535 #[cfg(unix)]
536 fn into_fd(self) -> Option<OwnedFd> {
537 Some(into_owned_fd(self.into_std().ok()?))
538 }
539}
540
541impl DatagramSocketSend for UdpSocket {
542 #[inline]
543 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
544 UdpSocket::poll_send(self, cx, buf)
545 }
546
547 #[inline]
548 fn poll_send_to(
549 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
550 ) -> Poll<io::Result<usize>> {
551 UdpSocket::poll_send_to(self, cx, buf, addr)
552 }
553
554 #[cfg(target_os = "linux")]
555 #[inline]
556 fn poll_send_many(
557 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
558 ) -> Poll<io::Result<usize>> {
559 crate::poll_sendmmsg!(self, cx, bufs)
560 }
561
562 fn as_udp_socket(&self) -> Option<&UdpSocket> {
563 Some(self)
564 }
565
566 fn peer_addr(&self) -> Option<SocketAddr> {
567 self.peer_addr().ok()
568 }
569}
570
571impl DatagramSocketRecv for UdpSocket {
572 #[inline]
573 fn poll_recv(
574 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
575 ) -> Poll<io::Result<()>> {
576 UdpSocket::poll_recv(self, cx, buf)
577 }
578
579 #[cfg(target_os = "linux")]
580 #[inline]
581 fn poll_recv_many(
582 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
583 ) -> Poll<io::Result<usize>> {
584 crate::poll_recvmmsg!(self, cx, bufs)
585 }
586
587 fn as_udp_socket(&self) -> Option<&UdpSocket> {
588 Some(self)
589 }
590}
591
592impl DatagramSocket for Arc<UdpSocket> {
593 #[cfg(unix)]
594 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
595 Some(self.as_fd())
596 }
597
598 #[cfg(unix)]
599 fn into_fd(self) -> Option<OwnedFd> {
600 None
601 }
602}
603
604impl DatagramSocketRecv for Arc<UdpSocket> {
605 #[inline]
606 fn poll_recv(
607 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
608 ) -> Poll<io::Result<()>> {
609 UdpSocket::poll_recv(self, cx, buf)
610 }
611
612 #[inline]
613 fn poll_recv_from(
614 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
615 ) -> Poll<io::Result<SocketAddr>> {
616 UdpSocket::poll_recv_from(self, cx, buf)
617 }
618
619 #[cfg(target_os = "linux")]
620 #[inline]
621 fn poll_recv_many(
622 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
623 ) -> Poll<io::Result<usize>> {
624 crate::poll_recvmmsg!(self, cx, bufs)
625 }
626
627 fn as_udp_socket(&self) -> Option<&UdpSocket> {
628 Some(self)
629 }
630}
631
632#[cfg(unix)]
633impl DatagramSocket for UnixDatagram {
634 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
635 Some(self.as_fd())
636 }
637
638 fn into_fd(self) -> Option<OwnedFd> {
639 Some(into_owned_fd(self.into_std().ok()?))
640 }
641}
642
643#[cfg(unix)]
644impl DatagramSocketSend for UnixDatagram {
645 #[inline]
646 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
647 UnixDatagram::poll_send(self, cx, buf)
648 }
649
650 #[inline]
651 fn poll_send_to(
652 &self, _: &mut Context, _: &[u8], _: SocketAddr,
653 ) -> Poll<io::Result<usize>> {
654 Poll::Ready(Err(io::Error::new(
655 io::ErrorKind::Unsupported,
656 "invalid address family",
657 )))
658 }
659
660 #[cfg(target_os = "linux")]
661 #[inline]
662 fn poll_send_many(
663 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
664 ) -> Poll<io::Result<usize>> {
665 crate::poll_sendmmsg!(self, cx, bufs)
666 }
667}
668
669#[cfg(unix)]
670impl DatagramSocketRecv for UnixDatagram {
671 #[inline]
672 fn poll_recv(
673 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
674 ) -> Poll<io::Result<()>> {
675 UnixDatagram::poll_recv(self, cx, buf)
676 }
677
678 #[cfg(target_os = "linux")]
679 #[inline]
680 fn poll_recv_many(
681 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
682 ) -> Poll<io::Result<usize>> {
683 crate::poll_recvmmsg!(self, cx, bufs)
684 }
685}
686
687#[cfg(unix)]
688impl DatagramSocketRecv for Arc<UnixDatagram> {
689 #[inline]
690 fn poll_recv(
691 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
692 ) -> Poll<io::Result<()>> {
693 UnixDatagram::poll_recv(self, cx, buf)
694 }
695
696 #[cfg(target_os = "linux")]
697 #[inline]
698 fn poll_recv_many(
699 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
700 ) -> Poll<io::Result<usize>> {
701 crate::poll_recvmmsg!(self, cx, bufs)
702 }
703}
704
/// Converts any value that can give up its raw file descriptor into an
/// [`OwnedFd`] holding that same descriptor.
#[cfg(unix)]
fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
    let raw = into_fd.into_raw_fd();

    // SAFETY: `into_raw_fd` transfers ownership of the descriptor to the
    // caller, so wrapping it here makes the returned `OwnedFd` its only
    // owner.
    unsafe { OwnedFd::from_raw_fd(raw) }
}
711
/// Wraps a sender together with its peer address as known at construction.
///
/// When a peer address was recorded, `poll_send_to` goes through the inner
/// socket's `poll_send` instead of its `poll_send_to`.
#[derive(Clone)]
pub struct MaybeConnectedSocket<T> {
    // The wrapped socket.
    inner: T,
    // `inner.peer_addr()` as captured by `new`; `None` if it reported no
    // peer.
    peer: Option<SocketAddr>,
}
736
737impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
738 pub fn new(inner: T) -> Self {
739 Self {
740 peer: inner.peer_addr(),
741 inner,
742 }
743 }
744
745 pub fn inner(&self) -> &T {
748 &self.inner
749 }
750
751 pub fn into_inner(self) -> T {
753 self.inner
754 }
755}
756
757impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
758 #[inline]
759 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
760 self.inner.poll_send(cx, buf)
761 }
762
763 #[inline]
764 fn poll_send_to(
765 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
766 ) -> Poll<io::Result<usize>> {
767 if let Some(peer) = self.peer {
768 debug_assert_eq!(peer, addr);
769 self.inner.poll_send(cx, buf)
770 } else {
771 self.inner.poll_send_to(cx, buf, addr)
772 }
773 }
774
775 #[inline]
776 fn poll_send_many(
777 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
778 ) -> Poll<io::Result<usize>> {
779 self.inner.poll_send_many(cx, bufs)
780 }
781
782 #[inline]
783 fn as_udp_socket(&self) -> Option<&UdpSocket> {
784 self.inner.as_udp_socket()
785 }
786
787 #[inline]
788 fn peer_addr(&self) -> Option<SocketAddr> {
789 self.peer
790 }
791}