#[cfg(not(target_arch = "wasm32"))]
use crate::cx::Cx;
#[cfg(not(target_arch = "wasm32"))]
use crate::net::lookup_all;
use crate::runtime::io_driver::IoRegistration;
use crate::runtime::reactor::Interest;
use crate::stream::Stream;
use std::io;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket as StdUdpSocket};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Builds the error returned for UDP operations on `wasm32` targets.
///
/// `op` is interpolated verbatim at the start of the message; the error kind is always
/// [`io::ErrorKind::Unsupported`].
#[cfg(target_arch = "wasm32")]
#[inline]
fn browser_udp_unsupported(op: &str) -> io::Error {
    let message =
        format!("{op} is unavailable in wasm-browser profiles; use browser transport bindings");
    io::Error::new(io::ErrorKind::Unsupported, message)
}
/// Returns `Err(browser_udp_unsupported(op))` for any success type `T`.
///
/// Only compiled on `wasm32` targets.
#[cfg(target_arch = "wasm32")]
#[inline]
fn browser_udp_unsupported_result<T>(op: &str) -> io::Result<T> {
Err(browser_udp_unsupported(op))
}
/// Returns an already-ready poll result carrying `browser_udp_unsupported(op)` as its error.
///
/// Only compiled on `wasm32` targets.
#[cfg(target_arch = "wasm32")]
#[inline]
fn browser_udp_poll_unsupported<T>(op: &str) -> Poll<io::Result<T>> {
Poll::Ready(Err(browser_udp_unsupported(op)))
}
/// Builds the [`io::ErrorKind::InvalidInput`] error used when a receive-style operation is
/// handed a zero-length buffer.
///
/// `op` is the method name inserted after `UdpSocket::` in the message.
#[cfg(not(target_arch = "wasm32"))]
#[inline]
fn empty_udp_receive_buffer_error(op: &str) -> io::Error {
    let message = format!("UdpSocket::{op} requires a non-empty buffer");
    io::Error::new(io::ErrorKind::InvalidInput, message)
}
/// UDP socket wrapper around a shared `std::net::UdpSocket` plus an optional I/O-driver
/// registration used by the `poll_*` methods.
#[derive(Debug)]
pub struct UdpSocket {
// Registration created by `register_interest`; `None` until the first successful
// `driver.register` call and again whenever `register_interest` clears it.
// Declared before `inner`, so Rust drops it before the socket handle.
registration: Option<IoRegistration>,
// The underlying std socket. Constructors in this file switch it to non-blocking mode,
// except `try_clone`, which wraps the duplicated handle without touching its mode.
inner: Arc<StdUdpSocket>,
}
impl UdpSocket {
/// Binds a UDP socket to the first usable address obtained from `addr`.
///
/// Candidate addresses come from awaiting `lookup_all(addr)` and are tried in order. The first
/// candidate that both binds and switches to non-blocking mode is returned. No I/O registration
/// is created here (`registration` starts as `None`).
///
/// # Errors
///
/// * Any error returned by `lookup_all`.
/// * [`io::ErrorKind::InvalidInput`] when the lookup yields no addresses.
/// * Otherwise, the error from the last candidate that failed to bind or to enter
///   non-blocking mode.
/// * On `wasm32` targets, always [`io::ErrorKind::Unsupported`].
pub async fn bind<A: ToSocketAddrs + Send + 'static>(addr: A) -> io::Result<Self> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = addr;
        return browser_udp_unsupported_result("UdpSocket::bind");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        let addrs = lookup_all(addr).await?;
        if addrs.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "no socket addresses found",
            ));
        }
        let mut last_err = None;
        for addr in addrs {
            // Binding and entering non-blocking mode form one fallible attempt, so a failure
            // in either step is recorded and the remaining candidates are still tried.
            let attempt = StdUdpSocket::bind(addr).and_then(|socket| {
                socket.set_nonblocking(true)?;
                Ok(socket)
            });
            match attempt {
                Ok(socket) => {
                    return Ok(Self {
                        inner: Arc::new(socket),
                        registration: None,
                    });
                }
                Err(err) => last_err = Some(err),
            }
        }
        Err(last_err.unwrap_or_else(|| io::Error::other("failed to bind any address")))
    }
}
pub async fn connect<A: ToSocketAddrs + Send + 'static>(&self, addr: A) -> io::Result<()> {
#[cfg(target_arch = "wasm32")]
{
let _ = addr;
return browser_udp_unsupported_result("UdpSocket::connect");
}
#[cfg(not(target_arch = "wasm32"))]
{
let addrs = lookup_all(addr).await?;
if addrs.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no socket addresses found",
));
}
let mut last_err = None;
for addr in addrs {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled"));
}
match self.inner.connect(addr) {
Ok(()) => return Ok(()),
Err(err) => last_err = Some(err),
}
}
Err(last_err.unwrap_or_else(|| io::Error::other("failed to connect to any address")))
}
}
/// Sends `buf` as a single datagram to an address obtained from `target`.
///
/// Awaits `lookup_all(target)` and then drives `poll_send_to` over the resulting addresses
/// until it reports a result.
///
/// # Errors
///
/// * Any error returned by `lookup_all`.
/// * [`io::ErrorKind::InvalidInput`] when the lookup yields no addresses.
/// * Any error produced by `poll_send_to`.
/// * On `wasm32` targets, always [`io::ErrorKind::Unsupported`].
pub async fn send_to<A: ToSocketAddrs + Send + 'static>(
    &mut self,
    buf: &[u8],
    target: A,
) -> io::Result<usize> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = (buf, target);
        return browser_udp_unsupported_result("UdpSocket::send_to");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        let destinations = lookup_all(target).await?;
        if destinations.is_empty() {
            let no_addrs =
                io::Error::new(io::ErrorKind::InvalidInput, "no socket addresses found");
            return Err(no_addrs);
        }
        std::future::poll_fn(|task_cx| self.poll_send_to(task_cx, buf, &destinations)).await
    }
}
fn poll_send_to(
&mut self,
cx: &Context<'_>,
buf: &[u8],
addrs: &[SocketAddr],
) -> Poll<io::Result<usize>> {
#[cfg(target_arch = "wasm32")]
{
let _ = (self, cx, buf, addrs);
return browser_udp_poll_unsupported("UdpSocket::poll_send_to");
}
#[cfg(not(target_arch = "wasm32"))]
{
let mut last_err = None;
for addr in addrs {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Interrupted,
"cancelled",
)));
}
match self.inner.send_to(buf, addr) {
Ok(n) => return Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::WRITABLE) {
return Poll::Ready(Err(err));
}
return Poll::Pending;
}
Err(e) => last_err = Some(e),
}
}
Poll::Ready(Err(last_err.unwrap_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "no addresses to send to")
})))
}
}
/// Receives one datagram into `buf`, returning the byte count and the sender's address.
///
/// Drives [`UdpSocket::poll_recv_from`] until it reports a result; see that method for the
/// error conditions. On `wasm32` targets this always fails with
/// [`io::ErrorKind::Unsupported`].
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = buf;
        return browser_udp_unsupported_result("UdpSocket::recv_from");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        std::future::poll_fn(|task_cx| self.poll_recv_from(task_cx, buf)).await
    }
}
pub fn poll_recv_from(
&mut self,
cx: &Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<(usize, SocketAddr)>> {
#[cfg(target_arch = "wasm32")]
{
let _ = (self, cx, buf);
return browser_udp_poll_unsupported("UdpSocket::poll_recv_from");
}
#[cfg(not(target_arch = "wasm32"))]
if buf.is_empty() {
return Poll::Ready(Err(empty_udp_receive_buffer_error("recv_from")));
}
#[cfg(not(target_arch = "wasm32"))]
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.recv_from(buf) {
Ok(res) => Poll::Ready(Ok(res)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
/// Sends `buf` on the socket via `std::net::UdpSocket::send`, returning the byte count.
///
/// Drives [`UdpSocket::poll_send`] until it reports a result; see that method for the error
/// conditions. On `wasm32` targets this always fails with [`io::ErrorKind::Unsupported`].
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = buf;
        return browser_udp_unsupported_result("UdpSocket::send");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        std::future::poll_fn(|task_cx| self.poll_send(task_cx, buf)).await
    }
}
pub fn poll_send(&mut self, cx: &Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
#[cfg(target_arch = "wasm32")]
{
let _ = (self, cx, buf);
return browser_udp_poll_unsupported("UdpSocket::poll_send");
}
#[cfg(not(target_arch = "wasm32"))]
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.send(buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::WRITABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
/// Receives one datagram into `buf` via `std::net::UdpSocket::recv`, returning the byte count.
///
/// Drives [`UdpSocket::poll_recv`] until it reports a result; see that method for the error
/// conditions. On `wasm32` targets this always fails with [`io::ErrorKind::Unsupported`].
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = buf;
        return browser_udp_unsupported_result("UdpSocket::recv");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        std::future::poll_fn(|task_cx| self.poll_recv(task_cx, buf)).await
    }
}
pub fn poll_recv(&mut self, cx: &Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
#[cfg(target_arch = "wasm32")]
{
let _ = (self, cx, buf);
return browser_udp_poll_unsupported("UdpSocket::poll_recv");
}
#[cfg(not(target_arch = "wasm32"))]
if buf.is_empty() {
return Poll::Ready(Err(empty_udp_receive_buffer_error("recv")));
}
#[cfg(not(target_arch = "wasm32"))]
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.recv(buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
/// Reads the next datagram into `buf` via `std::net::UdpSocket::peek_from`, returning the byte
/// count and the sender's address.
///
/// Drives [`UdpSocket::poll_peek_from`] until it reports a result; see that method for the
/// error conditions. On `wasm32` targets this always fails with
/// [`io::ErrorKind::Unsupported`].
pub async fn peek_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = buf;
        return browser_udp_unsupported_result("UdpSocket::peek_from");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        std::future::poll_fn(|task_cx| self.poll_peek_from(task_cx, buf)).await
    }
}
pub fn poll_peek_from(
&mut self,
cx: &Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<(usize, SocketAddr)>> {
#[cfg(target_arch = "wasm32")]
{
let _ = (self, cx, buf);
return browser_udp_poll_unsupported("UdpSocket::poll_peek_from");
}
#[cfg(not(target_arch = "wasm32"))]
if buf.is_empty() {
return Poll::Ready(Err(empty_udp_receive_buffer_error("peek_from")));
}
#[cfg(not(target_arch = "wasm32"))]
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.peek_from(buf) {
Ok(res) => Poll::Ready(Ok(res)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
/// Returns the local address reported by the underlying std socket's `local_addr`.
#[inline]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}
/// Returns the peer address reported by the underlying std socket's `peer_addr`.
#[inline]
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.peer_addr()
}
/// Forwards `on` to the underlying std socket's `set_broadcast`.
#[inline]
pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
self.inner.set_broadcast(on)
}
/// Forwards `on` to the underlying std socket's `set_multicast_loop_v4`.
#[inline]
pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
self.inner.set_multicast_loop_v4(on)
}
/// Joins the IPv4 multicast group `multiaddr` on `interface` by delegating to the underlying
/// std socket's `join_multicast_v4`. Both addresses are taken by value and passed by reference.
#[inline]
pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
self.inner.join_multicast_v4(&multiaddr, &interface)
}
/// Leaves the IPv4 multicast group `multiaddr` on `interface` by delegating to the underlying
/// std socket's `leave_multicast_v4`.
#[inline]
pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
self.inner.leave_multicast_v4(&multiaddr, &interface)
}
/// Forwards `ttl` to the underlying std socket's `set_ttl`.
#[inline]
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.inner.set_ttl(ttl)
}
/// Joins the IPv6 multicast group `multiaddr` on the interface index `interface` by delegating
/// to the underlying std socket's `join_multicast_v6`.
#[inline]
pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
self.inner.join_multicast_v6(multiaddr, interface)
}
/// Leaves the IPv6 multicast group `multiaddr` on the interface index `interface` by
/// delegating to the underlying std socket's `leave_multicast_v6`.
#[inline]
pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
self.inner.leave_multicast_v6(multiaddr, interface)
}
/// Forwards `ttl` to the underlying std socket's `set_multicast_ttl_v4`.
#[inline]
pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
self.inner.set_multicast_ttl_v4(ttl)
}
/// Creates a [`RecvStream`] that mutably borrows this socket and receives into an internal
/// buffer of `buf_size` bytes (`RecvStream::new` raises a size of zero to one).
#[must_use]
pub fn recv_stream(&mut self, buf_size: usize) -> RecvStream<'_> {
RecvStream::new(self, buf_size)
}
/// Creates a [`SendSink`] that mutably borrows this socket for sending datagrams.
#[must_use]
pub fn send_sink(&mut self) -> SendSink<'_> {
SendSink::new(self)
}
/// Duplicates the underlying std socket with `try_clone` and wraps the duplicate in a new
/// `UdpSocket`.
///
/// The returned wrapper starts without an I/O registration.
///
/// # Errors
///
/// Returns the error from `std::net::UdpSocket::try_clone` if duplication fails.
pub fn try_clone(&self) -> io::Result<Self> {
    let duplicate = self.inner.try_clone()?;
    Ok(Self {
        inner: Arc::new(duplicate),
        registration: None,
    })
}
pub fn into_std(self) -> io::Result<StdUdpSocket> {
match Arc::try_unwrap(self.inner) {
Ok(socket) => Ok(socket),
Err(shared) => shared.try_clone(),
}
}
/// Wraps an existing `std::net::UdpSocket`, switching it to non-blocking mode first.
///
/// The returned wrapper starts without an I/O registration.
///
/// # Errors
///
/// * The error from `set_nonblocking(true)` if the mode switch fails.
/// * On `wasm32` targets, always [`io::ErrorKind::Unsupported`].
pub fn from_std(socket: StdUdpSocket) -> io::Result<Self> {
    #[cfg(target_arch = "wasm32")]
    {
        let _ = socket;
        return browser_udp_unsupported_result("UdpSocket::from_std");
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        socket.set_nonblocking(true)?;
        let inner = Arc::new(socket);
        Ok(Self {
            inner,
            registration: None,
        })
    }
}
/// `wasm32` stand-in for interest registration: ignores its arguments and always returns the
/// `Unsupported` error built by `browser_udp_unsupported_result`.
#[cfg(target_arch = "wasm32")]
fn register_interest(&self, cx: &Context<'_>, interest: Interest) -> io::Result<()> {
let _ = (cx, interest);
browser_udp_unsupported_result("UdpSocket::register_interest")
}
/// Arranges for the task behind `cx` to be woken when the socket is ready for `interest`.
///
/// An existing registration is re-armed first. If there is none, or `rearm` returns
/// `Ok(false)`, a new registration is requested from the current `Cx`'s I/O driver and stored
/// in `self.registration`.
///
/// Several situations fall back to `crate::net::tcp::stream::fallback_rewake(cx)` and return
/// `Ok(())` instead of failing: no current `Cx`, no I/O driver handle, a `NotConnected` error
/// from `rearm`, and an `Unsupported` or `NotConnected` error from `register`.
///
/// # Errors
///
/// Any other error from `rearm` or `register` is returned unchanged.
#[cfg(not(target_arch = "wasm32"))]
fn register_interest(&mut self, cx: &Context<'_>, interest: Interest) -> io::Result<()> {
    if let Some(existing) = &mut self.registration {
        match existing.rearm(interest, cx.waker()) {
            Ok(true) => return Ok(()),
            // `rearm` declined; discard the registration and create a new one below.
            Ok(false) => self.registration = None,
            Err(err) if err.kind() == io::ErrorKind::NotConnected => {
                self.registration = None;
                crate::net::tcp::stream::fallback_rewake(cx);
                return Ok(());
            }
            Err(err) => return Err(err),
        }
    }

    let Some(task) = Cx::current() else {
        crate::net::tcp::stream::fallback_rewake(cx);
        return Ok(());
    };
    let Some(driver) = task.io_driver_handle() else {
        crate::net::tcp::stream::fallback_rewake(cx);
        return Ok(());
    };

    match driver.register(&*self.inner, interest, cx.waker().clone()) {
        Ok(fresh) => {
            self.registration = Some(fresh);
            Ok(())
        }
        Err(err)
            if matches!(
                err.kind(),
                io::ErrorKind::Unsupported | io::ErrorKind::NotConnected
            ) =>
        {
            crate::net::tcp::stream::fallback_rewake(cx);
            Ok(())
        }
        Err(err) => Err(err),
    }
}
}
/// Stream adapter that yields datagrams received on a mutably borrowed [`UdpSocket`].
#[derive(Debug)]
pub struct RecvStream<'a> {
// Socket polled by `poll_next`.
socket: &'a mut UdpSocket,
// Receive buffer reused for every datagram; each item copies out the received prefix.
buf: Vec<u8>,
}
impl<'a> RecvStream<'a> {
    /// Creates a stream over `socket` with a zero-filled receive buffer of `buf_size` bytes.
    ///
    /// A `buf_size` of zero is raised to one byte.
    #[must_use]
    pub fn new(socket: &'a mut UdpSocket, buf_size: usize) -> Self {
        // `poll_recv_from` rejects empty buffers, so never allocate fewer than one byte.
        let capacity = buf_size.max(1);
        let buf = vec![0u8; capacity];
        Self { socket, buf }
    }
}
impl Stream for RecvStream<'_> {
    /// One received datagram (copied out of the internal buffer) and its sender, or the
    /// error reported by `poll_recv_from`.
    type Item = io::Result<(Vec<u8>, SocketAddr)>;

    /// Polls the socket for the next datagram.
    ///
    /// Never yields `None`: every ready result, success or error, is wrapped in `Some`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self { socket, buf } = self.get_mut();
        let outcome = match socket.poll_recv_from(cx, buf) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome,
        };
        // Copy only the bytes that were actually received.
        let item = outcome.map(|(len, peer)| (buf[..len].to_vec(), peer));
        Poll::Ready(Some(item))
    }
}
/// Sending adapter that forwards datagrams to a mutably borrowed [`UdpSocket`].
#[derive(Debug)]
pub struct SendSink<'a> {
// Socket every send is forwarded to.
socket: &'a mut UdpSocket,
}
impl<'a> SendSink<'a> {
    /// Creates a sink that sends through `socket`.
    #[must_use]
    pub fn new(socket: &'a mut UdpSocket) -> Self {
        Self { socket }
    }

    /// Sends `buf` to `target` by forwarding to [`UdpSocket::send_to`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `UdpSocket::send_to` reports.
    pub async fn send_to<A: ToSocketAddrs + Send + 'static>(
        &mut self,
        buf: &[u8],
        target: A,
    ) -> io::Result<usize> {
        let socket = &mut *self.socket;
        socket.send_to(buf, target).await
    }

    /// Sends an owned `(payload, destination)` pair by forwarding to [`UdpSocket::send_to`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `UdpSocket::send_to` reports.
    pub async fn send_datagram(&mut self, datagram: (Vec<u8>, SocketAddr)) -> io::Result<usize> {
        let (payload, destination) = datagram;
        self.socket.send_to(&payload, destination).await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::{IoDriverHandle, LabReactor};
use crate::stream::StreamExt;
use crate::types::{Budget, RegionId, TaskId};
use futures_lite::future;
#[cfg(unix)]
use nix::fcntl::{FcntlArg, OFlag, fcntl};
use std::sync::Arc;
use std::task::Waker;
// Returns an owned clone of the standard library's no-op waker for manual polling in tests.
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
// Sends one datagram with `send_to` and checks that `recv_from` returns the same payload
// together with the sender's local address.
#[test]
fn udp_send_recv_from() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let payload = b"ping";
let sent = client.send_to(payload, server_addr).await.unwrap();
assert_eq!(sent, payload.len());
let mut buf = [0u8; 16];
let (n, peer) = server.recv_from(&mut buf).await.unwrap();
assert_eq!(&buf[..n], payload);
assert_eq!(peer, client.local_addr().unwrap());
});
}
// Connects two sockets to each other and exchanges one datagram in each direction using
// `send`/`recv`.
#[test]
fn udp_connected_send_recv() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let client_addr = client.local_addr().unwrap();
server.connect(client_addr).await.unwrap();
client.connect(server_addr).await.unwrap();
let sent = client.send(b"hello").await.unwrap();
assert_eq!(sent, 5);
let mut buf = [0u8; 16];
let n = server.recv(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"hello");
let sent = server.send(b"world").await.unwrap();
assert_eq!(sent, 5);
let n = client.recv(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"world");
});
}
// Checks that `recv_stream` yields a datagram that was sent before the stream was polled.
#[test]
fn udp_recv_stream_yields_datagram() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
client.send_to(b"stream", server_addr).await.unwrap();
let mut stream = server.recv_stream(32);
let item = stream.next().await.unwrap().unwrap();
assert_eq!(item.0, b"stream");
});
}
#[test]
// A requested buffer size of 0 is raised to 1 by `RecvStream::new`, so the stream still
// yields the first byte of the datagram rather than failing on an empty buffer.
fn udp_recv_stream_zero_buffer_does_not_drop_nonempty_datagram() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
client.send_to(b"stream", server_addr).await.unwrap();
let mut stream = server.recv_stream(0);
let item = stream.next().await.unwrap().unwrap();
assert_eq!(item.0, b"s");
});
}
// Checks that `peek_from` leaves the datagram queued: a following `recv_from` sees the same
// payload.
#[test]
fn udp_peek_does_not_consume() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
client.send_to(b"peek", server_addr).await.unwrap();
let mut buf = [0u8; 16];
let (n, _) = server.peek_from(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"peek");
let (n, _) = server.recv_from(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"peek");
});
}
// An empty receive buffer must fail with `InvalidInput` before the socket is read, so the
// queued datagram is still available to the next `recv_from`.
#[test]
fn udp_recv_from_rejects_empty_buffer_without_consuming_datagram() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let client_addr = client.local_addr().unwrap();
client.send_to(b"ping", server_addr).await.unwrap();
let mut empty = [];
let err = server.recv_from(&mut empty).await.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
let mut buf = [0u8; 16];
let (n, peer) = server.recv_from(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"ping");
assert_eq!(peer, client_addr);
});
}
// Joins and leaves the 224.0.0.251 multicast group through the wrapper, then checks the
// textual form of the 224.0.0.251:5353 socket address.
#[test]
fn udp_mdns_multicast_tuple_matches_rfc6762() {
let std_socket = StdUdpSocket::bind("0.0.0.0:0").expect("bind socket");
let socket = UdpSocket::from_std(std_socket).expect("wrap socket");
let mdns_group = Ipv4Addr::new(224, 0, 0, 251);
let mdns_interface = Ipv4Addr::UNSPECIFIED;
socket
.join_multicast_v4(mdns_group, mdns_interface)
.expect("join mDNS group");
socket
.leave_multicast_v4(mdns_group, mdns_interface)
.expect("leave mDNS group");
let mdns_socket = std::net::SocketAddrV4::new(mdns_group, 5353);
assert_eq!(mdns_socket.to_string(), "224.0.0.251:5353");
}
// With a current `Cx` that carries an I/O driver, polling an idle socket must return
// `Pending` and leave a registration stored on the socket.
#[test]
fn udp_socket_registers_on_wouldblock() {
let std_server = StdUdpSocket::bind("127.0.0.1:0").expect("bind server");
std_server.set_nonblocking(true).expect("nonblocking");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriverHandle::new(reactor);
let cx = Cx::new_with_observability(
RegionId::new_for_test(0, 0),
TaskId::new_for_test(0, 0),
Budget::INFINITE,
None,
Some(driver),
None,
);
let _guard = Cx::set_current(Some(cx));
let mut socket = UdpSocket::from_std(std_server).expect("wrap socket");
let waker = noop_waker();
let cx = Context::from_waker(&waker);
let mut buf = [0u8; 8];
let poll = socket.poll_recv_from(&cx, &mut buf);
assert!(matches!(poll, Poll::Pending));
assert!(socket.registration.is_some());
}
// A clone reports the same local address and starts without an I/O registration.
#[test]
fn udp_try_clone_creates_independent_socket() {
future::block_on(async {
let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let cloned = socket.try_clone().unwrap();
assert_eq!(socket.local_addr().unwrap(), cloned.local_addr().unwrap());
assert!(cloned.registration.is_none());
});
}
// Unix only: reads the file status flags with `fcntl(F_GETFL)` and checks that `from_std`
// set `O_NONBLOCK` on a socket that was created in blocking mode.
#[cfg(unix)]
#[test]
fn udp_from_std_forces_nonblocking_mode() {
let std_socket = StdUdpSocket::bind("127.0.0.1:0").expect("bind socket");
let socket = UdpSocket::from_std(std_socket).expect("wrap socket");
let flags = fcntl(socket.inner.as_ref(), FcntlArg::F_GETFL).expect("read socket flags");
let is_nonblocking = OFlag::from_bits_truncate(flags).contains(OFlag::O_NONBLOCK);
assert!(
is_nonblocking,
"UdpSocket::from_std should force nonblocking mode"
);
}
// Round-trips an 8192-byte datagram and checks both its length and its contents.
#[test]
fn udp_large_datagram() {
future::block_on(async {
let mut server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server_addr = server.local_addr().unwrap();
let mut client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let payload = vec![0xAB; 8192];
let sent = client.send_to(&payload, server_addr).await.unwrap();
assert_eq!(sent, 8192);
let mut buf = vec![0u8; 16384];
let (n, _) = server.recv_from(&mut buf).await.unwrap();
assert_eq!(n, 8192);
assert!(buf[..n].iter().all(|&b| b == 0xAB));
});
}
// With cancellation requested on the current `Cx`, `connect` and every `poll_*` operation
// must fail with `Interrupted` and must not leave an I/O registration behind.
#[test]
fn udp_cancelled_operations_return_interrupted_without_registration() {
future::block_on(async {
// All sockets are bound and connected before cancellation is requested.
let mut poll_recv_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let poll_recv_addr = poll_recv_socket.local_addr().unwrap();
let mut poll_send_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let poll_send_addr = poll_send_socket.local_addr().unwrap();
poll_send_socket.connect(poll_recv_addr).await.unwrap();
poll_recv_socket.connect(poll_send_addr).await.unwrap();
let mut send_to_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let peer_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let peer_addr = peer_socket.local_addr().unwrap();
let connect_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
// From here on, every checkpoint on the current `Cx` is expected to fail.
let cx = Cx::for_testing();
cx.set_cancel_requested(true);
let _guard = Cx::set_current(Some(cx));
let waker = noop_waker();
let task_cx = Context::from_waker(&waker);
let mut buf = [0u8; 16];
let connect_err = connect_socket.connect(peer_addr).await.unwrap_err();
assert_eq!(connect_err.kind(), io::ErrorKind::Interrupted);
assert!(connect_socket.peer_addr().is_err());
let send_to =
send_to_socket.poll_send_to(&task_cx, b"ping", std::slice::from_ref(&peer_addr));
assert!(matches!(
send_to,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
));
assert!(send_to_socket.registration.is_none());
let recv_from = poll_recv_socket.poll_recv_from(&task_cx, &mut buf);
assert!(matches!(
recv_from,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
));
assert!(poll_recv_socket.registration.is_none());
let send = poll_send_socket.poll_send(&task_cx, b"hello");
assert!(matches!(
send,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
));
assert!(poll_send_socket.registration.is_none());
let recv = poll_recv_socket.poll_recv(&task_cx, &mut buf);
assert!(matches!(
recv,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
));
assert!(poll_recv_socket.registration.is_none());
let peek_from = poll_recv_socket.poll_peek_from(&task_cx, &mut buf);
assert!(matches!(
peek_from,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
));
assert!(poll_recv_socket.registration.is_none());
});
}
}