#[cfg(unix)]
use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::prelude::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use std::{
io,
net::{SocketAddr, ToSocketAddrs},
};
use crate::{
buf::{IoBuf, IoBufMut},
driver::{op::Op, shared_fd::SharedFd},
io::{operation_canceled, CancelHandle, Split},
};
#[derive(Debug)]
pub struct UdpSocket {
fd: SharedFd,
}
unsafe impl Split for UdpSocket {}
impl UdpSocket {
pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
Self { fd }
}
#[cfg(feature = "legacy")]
fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> {
crate::driver::CURRENT.with(|x| match x {
#[cfg(all(target_os = "linux", feature = "iouring"))]
crate::driver::Inner::Uring(_) => Ok(()),
crate::driver::Inner::Legacy(_) => _socket.set_nonblocking(true),
})
}
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
let addr = addr
.to_socket_addrs()?
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?;
let domain = if addr.is_ipv6() {
socket2::Domain::IPV6
} else {
socket2::Domain::IPV4
};
let socket =
socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
#[cfg(feature = "legacy")]
Self::set_non_blocking(&socket)?;
let addr = socket2::SockAddr::from(addr);
socket.bind(&addr)?;
#[cfg(unix)]
let fd = socket.into_raw_fd();
#[cfg(windows)]
let fd = socket.into_raw_socket();
Ok(Self::from_shared_fd(SharedFd::new::<false>(fd)?))
}
pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> {
let op = Op::recv_msg(self.fd.clone(), buf).unwrap();
op.wait().await
}
pub async fn send_to<T: IoBuf>(
&self,
buf: T,
socket_addr: SocketAddr,
) -> crate::BufResult<usize, T> {
let op = Op::send_msg(self.fd.clone(), buf, Some(socket_addr)).unwrap();
op.wait().await
}
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
#[cfg(unix)]
let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
#[cfg(windows)]
let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
let addr = socket.peer_addr();
#[cfg(unix)]
let _ = socket.into_raw_fd();
#[cfg(windows)]
let _ = socket.into_raw_socket();
addr?
.as_socket()
.ok_or_else(|| io::ErrorKind::InvalidInput.into())
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
#[cfg(unix)]
let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
#[cfg(windows)]
let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
let addr = socket.local_addr();
#[cfg(unix)]
let _ = socket.into_raw_fd();
#[cfg(windows)]
let _ = socket.into_raw_socket();
addr?
.as_socket()
.ok_or_else(|| io::ErrorKind::InvalidInput.into())
}
pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> {
let op = Op::connect(self.fd.clone(), socket_addr, false)?;
let completion = op.await;
completion.meta.result?;
Ok(())
}
pub async fn send<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::send_msg(self.fd.clone(), buf, None).unwrap();
op.wait().await
}
pub async fn recv<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::recv(self.fd.clone(), buf).unwrap();
op.read().await
}
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
#[cfg(unix)]
let fd = socket.as_raw_fd();
#[cfg(windows)]
let fd = socket.as_raw_socket();
match SharedFd::new::<false>(fd) {
Ok(shared) => {
#[cfg(unix)]
let _ = socket.into_raw_fd();
#[cfg(windows)]
let _ = socket.into_raw_socket();
Ok(Self::from_shared_fd(shared))
}
Err(e) => Err(e),
}
}
#[allow(unused_variables)]
pub fn set_reuse_address(&self, reuse: bool) -> io::Result<()> {
#[cfg(unix)]
let r = {
let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
let r = socket.set_reuse_address(reuse);
let _ = socket.into_raw_fd();
r
};
#[cfg(windows)]
let r = {
let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
let _ = socket.into_raw_socket();
Ok(())
};
r
}
#[allow(unused_variables)]
pub fn set_reuse_port(&self, reuse: bool) -> io::Result<()> {
#[cfg(unix)]
let r = {
let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
let r = socket.set_reuse_port(reuse);
let _ = socket.into_raw_fd();
r
};
#[cfg(windows)]
let r = {
let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
let _ = socket.into_raw_socket();
Ok(())
};
r
}
pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
let op = Op::poll_read(&self.fd, relaxed).unwrap();
op.wait().await
}
pub async fn writable(&self, relaxed: bool) -> io::Result<()> {
let op = Op::poll_write(&self.fd, relaxed).unwrap();
op.wait().await
}
}
#[cfg(unix)]
impl AsRawFd for UdpSocket {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.fd.raw_fd()
}
}
#[cfg(windows)]
impl AsRawSocket for UdpSocket {
fn as_raw_socket(&self) -> RawSocket {
self.fd.raw_socket()
}
}
impl UdpSocket {
pub async fn cancelable_recv_from<T: IoBufMut>(
&self,
buf: T,
c: CancelHandle,
) -> crate::BufResult<(usize, SocketAddr), T> {
if c.canceled() {
return (Err(operation_canceled()), buf);
}
let op = Op::recv_msg(self.fd.clone(), buf).unwrap();
let _guard = c.associate_op(op.op_canceller());
op.wait().await
}
pub async fn cancelable_send_to<T: IoBuf>(
&self,
buf: T,
socket_addr: SocketAddr,
c: CancelHandle,
) -> crate::BufResult<usize, T> {
if c.canceled() {
return (Err(operation_canceled()), buf);
}
let op = Op::send_msg(self.fd.clone(), buf, Some(socket_addr)).unwrap();
let _guard = c.associate_op(op.op_canceller());
op.wait().await
}
pub async fn cancelable_send<T: IoBuf>(
&self,
buf: T,
c: CancelHandle,
) -> crate::BufResult<usize, T> {
if c.canceled() {
return (Err(operation_canceled()), buf);
}
let op = Op::send_msg(self.fd.clone(), buf, None).unwrap();
let _guard = c.associate_op(op.op_canceller());
op.wait().await
}
pub async fn cancelable_recv<T: IoBufMut>(
&self,
buf: T,
c: CancelHandle,
) -> crate::BufResult<usize, T> {
if c.canceled() {
return (Err(operation_canceled()), buf);
}
let op = Op::recv(self.fd.clone(), buf).unwrap();
let _guard = c.associate_op(op.op_canceller());
op.read().await
}
}