use futures_util::future::poll_fn;
use futures_util::ready;
use futures_util::FutureExt;
use std::future::Future;
use std::io;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio::io::ReadBuf;
use tokio::net::UdpSocket;
use tokio::task::coop::unconstrained;
#[cfg(unix)]
use std::os::fd::AsFd;
#[cfg(unix)]
use std::os::fd::BorrowedFd;
#[cfg(unix)]
use std::os::fd::FromRawFd;
#[cfg(unix)]
use std::os::fd::IntoRawFd;
#[cfg(unix)]
use std::os::fd::OwnedFd;
#[cfg(unix)]
use tokio::net::UnixDatagram;
use crate::socket_stats::AsSocketStats;
/// Maximum datagram size constant (1500).
///
/// NOTE(review): presumably expressed in bytes and chosen to match the usual
/// Ethernet MTU — confirm against the users of this constant.
pub const MAX_DATAGRAM_SIZE: usize = 1500;
/// Marker trait with no methods of its own, layered on top of
/// [`DatagramSocket`].
///
/// It is never implemented by hand: the blanket impl below provides it for
/// every type that is both a `DatagramSocket` and `AsSocketStats`.
pub trait DatagramSocketWithStats: DatagramSocket {}

impl<T: DatagramSocket + AsSocketStats> DatagramSocketWithStats for T {}
/// Poll-based datagram I/O expressed in terms of `crate::DgramBuffer`
/// values instead of borrowed byte slices.
pub trait RawPoolBufDatagramIo: Send {
    /// Attempts to send the datagrams in `datagrams`.
    ///
    /// NOTE(review): the returned count is presumably the number of
    /// datagrams that were sent — confirm with the implementors.
    fn poll_send_datagrams(
        &mut self, cx: &mut Context, datagrams: &mut [crate::DgramBuffer],
    ) -> Poll<io::Result<usize>>;

    /// Attempts to receive a single datagram, returned as its own buffer.
    fn poll_recv_dgram(
        &mut self, cx: &mut Context,
    ) -> Poll<io::Result<crate::DgramBuffer>>;

    /// Receives up to `dgram_limit` datagrams by calling
    /// [`poll_recv_dgram`](Self::poll_recv_dgram) repeatedly, pushing each
    /// one onto `buffer`, and returns how many were pushed.
    ///
    /// If the very first receive fails or is pending, that outcome is
    /// returned unchanged. Once at least one datagram has been pushed, a
    /// later error or `Pending` merely ends the batch and the count so far
    /// is returned; the error itself is dropped.
    fn poll_recv_datagrams(
        &mut self, cx: &mut Context, buffer: &mut Vec<crate::DgramBuffer>,
        dgram_limit: usize,
    ) -> Poll<io::Result<usize>> {
        let mut received = 0;

        while received < dgram_limit {
            match self.poll_recv_dgram(cx) {
                Poll::Ready(Ok(dgram)) => {
                    buffer.push(dgram);
                    received += 1;
                },

                // Nothing was delivered yet: surface the outcome as-is.
                Poll::Ready(Err(err)) if received == 0 =>
                    return Poll::Ready(Err(err)),
                Poll::Pending if received == 0 => return Poll::Pending,

                // A partial batch is still a success for the caller.
                Poll::Ready(Err(_)) | Poll::Pending => break,
            }
        }

        Poll::Ready(Ok(received))
    }
}
/// A complete datagram socket: the combination of a send half
/// ([`DatagramSocketSend`]) and a receive half ([`DatagramSocketRecv`]).
pub trait DatagramSocket:
DatagramSocketSend + DatagramSocketRecv + 'static
{
/// Borrows the underlying file descriptor, or returns `None` when the
/// implementation has none to offer. Only available on Unix.
#[cfg(unix)]
fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;
/// Consumes the socket and hands out its file descriptor as an `OwnedFd`,
/// or returns `None` when that is not possible. Only available on Unix.
#[cfg(unix)]
fn into_fd(self) -> Option<OwnedFd>;
/// Returns the socket's [`RawPoolBufDatagramIo`] view, if it has one.
///
/// The default implementation returns `None`.
fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
None
}
}
/// The sending half of a datagram socket. All methods take `&self`, and
/// implementors must be `Sync`.
pub trait DatagramSocketSend: Sync {
    /// Attempts to send `buf` as a single datagram without naming a
    /// destination address.
    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;

    /// Attempts to send `buf` as a single datagram to `addr`.
    fn poll_send_to(
        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
    ) -> Poll<io::Result<usize>>;

    /// Sends the filled part of each buffer in `bufs` as one datagram each
    /// and returns the number of datagrams that were sent.
    ///
    /// The default implementation calls [`poll_send`](Self::poll_send) once
    /// per buffer. If the first send fails or is pending, that outcome is
    /// returned unchanged. After at least one successful send, a failure or
    /// `Pending` stops the batch and the count so far is returned.
    fn poll_send_many(
        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        let mut sent = 0;

        for buf in bufs {
            match self.poll_send(cx, buf.filled()) {
                Poll::Ready(Ok(_)) => sent += 1,

                // Error or `Pending` before anything went out: report it.
                stalled if sent == 0 => return stalled,

                // Otherwise report the partial batch as a success.
                _ => break,
            }
        }

        Poll::Ready(Ok(sent))
    }

    /// Returns the underlying tokio `UdpSocket`, if there is one.
    ///
    /// The default implementation returns `None`.
    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        None
    }

    /// Returns the address of the peer this socket sends to, if known.
    ///
    /// The default implementation returns `None`.
    fn peer_addr(&self) -> Option<SocketAddr> {
        None
    }
}
/// `async` and non-blocking convenience wrappers around the `poll_*`
/// methods of [`DatagramSocketSend`].
pub trait DatagramSocketSendExt: DatagramSocketSend {
    /// Future form of [`DatagramSocketSend::poll_send`].
    fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
        poll_fn(move |cx| self.poll_send(cx, buf))
    }

    /// Future form of [`DatagramSocketSend::poll_send_to`].
    fn send_to(
        &self, buf: &[u8], addr: SocketAddr,
    ) -> impl Future<Output = io::Result<usize>> {
        poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
    }

    /// Future form of [`DatagramSocketSend::poll_send_many`].
    fn send_many(
        &self, bufs: &[ReadBuf<'_>],
    ) -> impl Future<Output = io::Result<usize>> {
        poll_fn(move |cx| self.poll_send_many(cx, bufs))
    }

    /// Polls [`DatagramSocketSend::poll_send`] exactly once.
    ///
    /// Returns the send result if it was immediately ready, and an
    /// `io::ErrorKind::WouldBlock` error if the poll was pending.
    fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
        unconstrained(poll_fn(|cx| self.poll_send(cx, buf)))
            .now_or_never()
            .unwrap_or_else(|| Err(io::ErrorKind::WouldBlock.into()))
    }

    /// Polls [`DatagramSocketSend::poll_send_many`] exactly once.
    ///
    /// Returns the send result if it was immediately ready, and an
    /// `io::ErrorKind::WouldBlock` error if the poll was pending.
    fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
        unconstrained(poll_fn(|cx| self.poll_send_many(cx, bufs)))
            .now_or_never()
            .unwrap_or_else(|| Err(io::ErrorKind::WouldBlock.into()))
    }
}
/// The receiving half of a datagram socket. Receive methods take
/// `&mut self`, and implementors must be `Send`.
pub trait DatagramSocketRecv: Send {
    /// Attempts to receive a single datagram into `buf`.
    fn poll_recv(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>>;

    /// Attempts to receive a single datagram into `buf` and returns the
    /// address it came from.
    ///
    /// The default implementation receives through
    /// [`poll_recv`](Self::poll_recv) and always reports the unspecified
    /// IPv4 address with port 0 (`0.0.0.0:0`). Implementations that know
    /// the real source address should override it.
    fn poll_recv_from(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<SocketAddr>> {
        let unspecified = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);

        self.poll_recv(cx, buf)
            .map_ok(|()| SocketAddr::V4(unspecified))
    }

    /// Receives one datagram into each buffer of `bufs`, in order, and
    /// returns how many buffers were filled.
    ///
    /// The default implementation calls [`poll_recv`](Self::poll_recv) once
    /// per buffer. An error is returned immediately, even if earlier
    /// buffers were already filled. `Pending` is returned only when nothing
    /// has been read yet; otherwise it ends the batch.
    fn poll_recv_many(
        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        let mut filled = 0;

        for slot in bufs.iter_mut() {
            match self.poll_recv(cx, slot) {
                Poll::Ready(Ok(())) => filled += 1,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Pending => {
                    // Only wait when there is nothing to hand back yet.
                    if filled == 0 {
                        return Poll::Pending;
                    }
                    break;
                },
            }
        }

        Poll::Ready(Ok(filled))
    }

    /// Returns the underlying tokio `UdpSocket`, if there is one.
    ///
    /// The default implementation returns `None`.
    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        None
    }
}
/// `async` convenience wrappers around the `poll_*` methods of
/// [`DatagramSocketRecv`].
pub trait DatagramSocketRecvExt: DatagramSocketRecv {
    /// Receives a single datagram into `buf` and resolves to the number of
    /// bytes written into it.
    fn recv(
        &mut self, buf: &mut [u8],
    ) -> impl Future<Output = io::Result<usize>> + Send {
        poll_fn(|cx| {
            // A fresh `ReadBuf` is built on every poll, so the filled
            // length always describes just this attempt.
            let mut read_buf = ReadBuf::new(buf);
            let outcome = ready!(self.poll_recv(cx, &mut read_buf));

            Poll::Ready(outcome.map(|()| read_buf.filled().len()))
        })
    }

    /// Receives a single datagram into `buf` and resolves to the number of
    /// bytes written together with the address reported by
    /// [`DatagramSocketRecv::poll_recv_from`].
    fn recv_from(
        &mut self, buf: &mut [u8],
    ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
        poll_fn(|cx| {
            let mut read_buf = ReadBuf::new(buf);
            let outcome = ready!(self.poll_recv_from(cx, &mut read_buf));

            Poll::Ready(outcome.map(|addr| (read_buf.filled().len(), addr)))
        })
    }

    /// Future form of [`DatagramSocketRecv::poll_recv_many`].
    fn recv_many(
        &mut self, bufs: &mut [ReadBuf<'_>],
    ) -> impl Future<Output = io::Result<usize>> + Send {
        poll_fn(|cx| self.poll_recv_many(cx, bufs))
    }
}
// Every send half, sized or not, gets the `DatagramSocketSendExt` helpers.
impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
// Every receive half, sized or not, gets the `DatagramSocketRecvExt` helpers.
impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
/// Implemented by types that can hand out a reference to something
/// implementing [`DatagramSocketSend`].
///
/// A blanket impl in this file makes every `Sync` implementor of this trait
/// a `DatagramSocketSend` itself, by forwarding each call to the returned
/// reference.
pub trait AsDatagramSocketSend {
/// The send half being exposed; may be unsized.
type AsSend: DatagramSocketSend + ?Sized;
/// Borrows the exposed send half.
fn as_datagram_socket_send(&self) -> &Self::AsSend;
}
/// Implemented by types that can hand out a reference to something
/// implementing [`DatagramSocketRecv`].
///
/// A blanket impl in this file makes every `Send` implementor of this trait
/// a `DatagramSocketRecv` itself, by forwarding each call to the returned
/// reference.
pub trait AsDatagramSocketRecv {
/// The receive half being exposed; may be unsized.
type AsRecv: DatagramSocketRecv + ?Sized;
/// Mutably borrows the exposed receive half (used by the `poll_recv*`
/// forwarding, which needs `&mut self`).
fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
/// Immutably borrows the exposed receive half (used for `&self` methods
/// such as `as_udp_socket`).
fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
}
/// Forwards every [`DatagramSocketSend`] method to the value returned by
/// `as_datagram_socket_send()`, so wrappers pick up the send half of the
/// type they wrap.
impl<T> DatagramSocketSend for T
where
    T: AsDatagramSocketSend + Sync,
{
    #[inline]
    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        let target = self.as_datagram_socket_send();
        target.poll_send(cx, buf)
    }

    #[inline]
    fn poll_send_to(
        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
    ) -> Poll<io::Result<usize>> {
        let target = self.as_datagram_socket_send();
        target.poll_send_to(cx, buf, addr)
    }

    #[inline]
    fn poll_send_many(
        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        let target = self.as_datagram_socket_send();
        target.poll_send_many(cx, bufs)
    }

    #[inline]
    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        let target = self.as_datagram_socket_send();
        target.as_udp_socket()
    }

    #[inline]
    fn peer_addr(&self) -> Option<SocketAddr> {
        let target = self.as_datagram_socket_send();
        target.peer_addr()
    }
}
/// Forwards every [`DatagramSocketRecv`] method to the value exposed through
/// [`AsDatagramSocketRecv`], so wrappers pick up the receive half of the
/// type they wrap.
impl<T> DatagramSocketRecv for T
where
    T: AsDatagramSocketRecv + Send,
{
    #[inline]
    fn poll_recv(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let target = self.as_datagram_socket_recv();
        target.poll_recv(cx, buf)
    }

    #[inline]
    fn poll_recv_from(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<SocketAddr>> {
        let target = self.as_datagram_socket_recv();
        target.poll_recv_from(cx, buf)
    }

    #[inline]
    fn poll_recv_many(
        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        let target = self.as_datagram_socket_recv();
        target.poll_recv_many(cx, bufs)
    }

    #[inline]
    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        // Only `&self` is available here, hence the shared accessor.
        let target = self.as_shared_datagram_socket_recv();
        target.as_udp_socket()
    }
}
/// A mutable reference to a send half exposes that send half.
impl<T: DatagramSocketSend + Send + ?Sized> AsDatagramSocketSend for &mut T {
    type AsSend = T;

    fn as_datagram_socket_send(&self) -> &Self::AsSend {
        // `self` is `&&mut T`; peel both layers to reach the `T`.
        &**self
    }
}
/// A boxed send half exposes the send half it owns.
impl<T: DatagramSocketSend + Send + ?Sized> AsDatagramSocketSend for Box<T> {
    type AsSend = T;

    fn as_datagram_socket_send(&self) -> &Self::AsSend {
        // `self` is `&Box<T>`; deref through the box to reach the `T`.
        &**self
    }
}
/// A reference-counted send half exposes the send half it points to.
impl<T: DatagramSocketSend + Send + ?Sized> AsDatagramSocketSend for Arc<T> {
    type AsSend = T;

    fn as_datagram_socket_send(&self) -> &Self::AsSend {
        // `self` is `&Arc<T>`; deref through the `Arc` to reach the `T`.
        &**self
    }
}
/// A mutable reference to a receive half exposes that receive half.
impl<T: DatagramSocketRecv + Send + ?Sized> AsDatagramSocketRecv for &mut T {
    type AsRecv = T;

    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
        // `self` is `&mut &mut T`; reborrow the inner `T` mutably.
        &mut **self
    }

    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
        &**self
    }
}
/// A boxed receive half exposes the receive half it owns.
impl<T: DatagramSocketRecv + Send + ?Sized> AsDatagramSocketRecv for Box<T> {
    type AsRecv = T;

    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
        // `self` is `&mut Box<T>`; reborrow the boxed `T` mutably.
        &mut **self
    }

    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
        &**self
    }
}
impl DatagramSocket for UdpSocket {
#[cfg(unix)]
fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
Some(self.as_fd())
}
#[cfg(unix)]
fn into_fd(self) -> Option<OwnedFd> {
Some(into_owned_fd(self.into_std().ok()?))
}
}
// Send half for a tokio `UdpSocket`: each method forwards to the socket's
// own inherent method of the same name.
impl DatagramSocketSend for UdpSocket {
#[inline]
fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
// Fully qualified so the inherent `UdpSocket::poll_send` is what gets
// called.
UdpSocket::poll_send(self, cx, buf)
}
#[inline]
fn poll_send_to(
&self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
) -> Poll<io::Result<usize>> {
UdpSocket::poll_send_to(self, cx, buf, addr)
}
// Linux-only override of the one-datagram-at-a-time trait default.
// NOTE(review): presumably batches the sends through sendmmsg(2) — confirm
// in the `poll_sendmmsg!` macro definition.
#[cfg(target_os = "linux")]
#[inline]
fn poll_send_many(
&self, cx: &mut Context, bufs: &[ReadBuf<'_>],
) -> Poll<io::Result<usize>> {
crate::poll_sendmmsg!(self, cx, bufs)
}
fn as_udp_socket(&self) -> Option<&UdpSocket> {
Some(self)
}
fn peer_addr(&self) -> Option<SocketAddr> {
// Resolves to the inherent `UdpSocket::peer_addr` (inherent methods win
// over trait methods), so this does not recurse. A lookup error is
// mapped to `None`.
self.peer_addr().ok()
}
}
impl DatagramSocketRecv for UdpSocket {
#[inline]
fn poll_recv(
&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
UdpSocket::poll_recv(self, cx, buf)
}
#[cfg(target_os = "linux")]
#[inline]
fn poll_recv_many(
&mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
) -> Poll<io::Result<usize>> {
crate::poll_recvmmsg!(self, cx, bufs)
}
fn as_udp_socket(&self) -> Option<&UdpSocket> {
Some(self)
}
}
// A reference-counted tokio `UdpSocket` is also a complete `DatagramSocket`.
impl DatagramSocket for Arc<UdpSocket> {
// Borrows the socket's file descriptor; always `Some`.
#[cfg(unix)]
fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
Some(self.as_fd())
}
// Always `None`: this impl never extracts the socket from the `Arc`.
#[cfg(unix)]
fn into_fd(self) -> Option<OwnedFd> {
None
}
}
// Receive half for `Arc<UdpSocket>`: each method forwards to the inherent
// method of the `UdpSocket` behind the `Arc`.
impl DatagramSocketRecv for Arc<UdpSocket> {
#[inline]
fn poll_recv(
&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
UdpSocket::poll_recv(self, cx, buf)
}
// Overrides the trait default (which reports `0.0.0.0:0`) with the address
// returned by the socket itself.
#[inline]
fn poll_recv_from(
&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
UdpSocket::poll_recv_from(self, cx, buf)
}
// Linux-only override of the one-datagram-at-a-time trait default.
// NOTE(review): presumably batches the reads through recvmmsg(2) — confirm
// in the `poll_recvmmsg!` macro definition.
#[cfg(target_os = "linux")]
#[inline]
fn poll_recv_many(
&mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
) -> Poll<io::Result<usize>> {
crate::poll_recvmmsg!(self, cx, bufs)
}
fn as_udp_socket(&self) -> Option<&UdpSocket> {
Some(self)
}
}
#[cfg(unix)]
impl DatagramSocket for UnixDatagram {
fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
Some(self.as_fd())
}
fn into_fd(self) -> Option<OwnedFd> {
Some(into_owned_fd(self.into_std().ok()?))
}
}
// Send half for a tokio `UnixDatagram`. `as_udp_socket` and `peer_addr` are
// not overridden, so the trait defaults (`None`) apply.
#[cfg(unix)]
impl DatagramSocketSend for UnixDatagram {
#[inline]
fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
UnixDatagram::poll_send(self, cx, buf)
}
// Sending to a `std::net::SocketAddr` is not supported on this socket
// type: every call fails immediately with `ErrorKind::Unsupported` and
// nothing is sent.
#[inline]
fn poll_send_to(
&self, _: &mut Context, _: &[u8], _: SocketAddr,
) -> Poll<io::Result<usize>> {
Poll::Ready(Err(io::Error::new(
io::ErrorKind::Unsupported,
"invalid address family",
)))
}
// Linux-only override of the one-datagram-at-a-time trait default.
// NOTE(review): presumably batches the sends through sendmmsg(2) — confirm
// in the `poll_sendmmsg!` macro definition.
#[cfg(target_os = "linux")]
#[inline]
fn poll_send_many(
&self, cx: &mut Context, bufs: &[ReadBuf<'_>],
) -> Poll<io::Result<usize>> {
crate::poll_sendmmsg!(self, cx, bufs)
}
}
// Receive half for a tokio `UnixDatagram`. `poll_recv_from` is not
// overridden, so the trait default applies and the reported source address
// is always `0.0.0.0:0`.
#[cfg(unix)]
impl DatagramSocketRecv for UnixDatagram {
#[inline]
fn poll_recv(
&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
UnixDatagram::poll_recv(self, cx, buf)
}
// Linux-only override of the one-datagram-at-a-time trait default.
// NOTE(review): presumably batches the reads through recvmmsg(2) — confirm
// in the `poll_recvmmsg!` macro definition.
#[cfg(target_os = "linux")]
#[inline]
fn poll_recv_many(
&mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
) -> Poll<io::Result<usize>> {
crate::poll_recvmmsg!(self, cx, bufs)
}
}
// Receive half for `Arc<UnixDatagram>`: forwards to the `UnixDatagram`
// behind the `Arc`. `poll_recv_from` is not overridden, so the trait default
// applies and the reported source address is always `0.0.0.0:0`.
#[cfg(unix)]
impl DatagramSocketRecv for Arc<UnixDatagram> {
#[inline]
fn poll_recv(
&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
UnixDatagram::poll_recv(self, cx, buf)
}
// Linux-only override of the one-datagram-at-a-time trait default.
// NOTE(review): presumably batches the reads through recvmmsg(2) — confirm
// in the `poll_recvmmsg!` macro definition.
#[cfg(target_os = "linux")]
#[inline]
fn poll_recv_many(
&mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
) -> Poll<io::Result<usize>> {
crate::poll_recvmmsg!(self, cx, bufs)
}
}
/// Converts any value that can give up its raw file descriptor into an
/// [`OwnedFd`] holding that same descriptor.
#[cfg(unix)]
fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
    let raw = into_fd.into_raw_fd();

    // SAFETY: `into_raw_fd` consumed `into_fd` and transferred ownership of
    // the descriptor to us, so the new `OwnedFd` is its only owner.
    unsafe { OwnedFd::from_raw_fd(raw) }
}
/// Wraps a socket together with the peer address it reported when the
/// wrapper was created.
///
/// When a peer address was captured, the wrapper's `poll_send_to` sends
/// through the inner socket's `poll_send`; otherwise it forwards to the inner
/// `poll_send_to`.
#[derive(Clone)]
pub struct MaybeConnectedSocket<T> {
// The wrapped socket.
inner: T,
// Snapshot of `inner.peer_addr()` taken in `new`; `None` when the inner
// socket reported no peer.
peer: Option<SocketAddr>,
}
impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
    /// Wraps `inner`, recording the peer address it reports at this moment.
    /// The address is not refreshed afterwards.
    pub fn new(inner: T) -> Self {
        let peer = inner.peer_addr();

        Self { inner, peer }
    }

    /// Borrows the wrapped socket.
    pub fn inner(&self) -> &T {
        &self.inner
    }

    /// Unwraps and returns the wrapped socket, discarding the recorded peer
    /// address.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}
/// Send half of [`MaybeConnectedSocket`]: everything is delegated to the
/// wrapped socket, except that `poll_send_to` and `peer_addr` use the peer
/// address recorded at construction.
impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
    #[inline]
    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        self.inner.poll_send(cx, buf)
    }

    /// Sends `buf` to `addr`.
    ///
    /// If a peer was recorded, `addr` is expected to equal it (checked in
    /// debug builds only) and the datagram goes out through the inner
    /// `poll_send`. Without a recorded peer the call is forwarded to the
    /// inner `poll_send_to`.
    #[inline]
    fn poll_send_to(
        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
    ) -> Poll<io::Result<usize>> {
        match self.peer {
            Some(peer) => {
                debug_assert_eq!(peer, addr);
                self.inner.poll_send(cx, buf)
            },
            None => self.inner.poll_send_to(cx, buf, addr),
        }
    }

    #[inline]
    fn poll_send_many(
        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        self.inner.poll_send_many(cx, bufs)
    }

    #[inline]
    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        self.inner.as_udp_socket()
    }

    /// Returns the peer address recorded at construction, without asking
    /// the inner socket again.
    #[inline]
    fn peer_addr(&self) -> Option<SocketAddr> {
        self.peer
    }
}