use crate::cx::Cx;
use crate::net::unix::stream::UCred;
use crate::runtime::io_driver::IoRegistration;
use crate::runtime::reactor::Interest;
use nix::errno::Errno;
use nix::sys::socket::{self, MsgFlags, SockaddrLike};
use std::io;
use std::os::unix::net::{self, SocketAddr};
use std::path::{Path, PathBuf};
use std::task::{Context, Poll};
#[inline]
fn empty_datagram_recv_from_buffer_error() -> io::Error {
io::Error::new(
io::ErrorKind::InvalidInput,
"UnixDatagram::recv_from requires a non-empty buffer",
)
}
#[derive(Debug)]
pub struct UnixDatagram {
registration: Option<IoRegistration>,
inner: net::UnixDatagram,
path: Option<PathBuf>,
cleanup_identity: Option<super::listener::SocketFileIdentity>,
}
impl UnixDatagram {
fn from_bound_with<F>(path: &Path, inner: net::UnixDatagram, configure: F) -> io::Result<Self>
where
F: FnOnce(&net::UnixDatagram) -> io::Result<()>,
{
let (inner, cleanup_identity) =
super::listener::finalize_bound_socket(path, inner, configure)?;
Ok(Self {
inner,
path: Some(path.to_path_buf()),
cleanup_identity,
registration: None,
})
}
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = path.as_ref();
let inner = net::UnixDatagram::bind(path)?;
Self::from_bound_with(path, inner, |socket| socket.set_nonblocking(true))
}
#[cfg(target_os = "linux")]
pub fn bind_abstract(name: &[u8]) -> io::Result<Self> {
use std::os::linux::net::SocketAddrExt;
let addr = SocketAddr::from_abstract_name(name)?;
let inner = net::UnixDatagram::bind_addr(&addr)?;
inner.set_nonblocking(true)?;
Ok(Self {
inner,
path: None, cleanup_identity: None,
registration: None,
})
}
pub fn unbound() -> io::Result<Self> {
let inner = net::UnixDatagram::unbound()?;
inner.set_nonblocking(true)?;
Ok(Self {
inner,
path: None,
cleanup_identity: None,
registration: None,
})
}
pub fn pair() -> io::Result<(Self, Self)> {
let (s1, s2) = net::UnixDatagram::pair()?;
s1.set_nonblocking(true)?;
s2.set_nonblocking(true)?;
Ok((
Self {
inner: s1,
path: None,
cleanup_identity: None,
registration: None,
},
Self {
inner: s2,
path: None,
cleanup_identity: None,
registration: None,
},
))
}
pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
self.inner.connect(path)
}
#[cfg(target_os = "linux")]
pub fn connect_abstract(&self, name: &[u8]) -> io::Result<()> {
use std::os::linux::net::SocketAddrExt;
let addr = SocketAddr::from_abstract_name(name)?;
self.inner.connect_addr(&addr)
}
fn register_interest(&mut self, cx: &Context<'_>, interest: Interest) -> io::Result<()> {
let target_interest = interest;
if let Some(registration) = &mut self.registration {
match registration.rearm(target_interest, cx.waker()) {
Ok(true) => return Ok(()),
Ok(false) => {
self.registration = None;
}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {
self.registration = None;
crate::net::tcp::stream::fallback_rewake(cx);
return Ok(());
}
Err(err) => return Err(err),
}
}
let Some(current) = Cx::current() else {
crate::net::tcp::stream::fallback_rewake(cx);
return Ok(());
};
let Some(driver) = current.io_driver_handle() else {
crate::net::tcp::stream::fallback_rewake(cx);
return Ok(());
};
match driver.register(&self.inner, interest, cx.waker().clone()) {
Ok(registration) => {
self.registration = Some(registration);
Ok(())
}
Err(err) if err.kind() == io::ErrorKind::Unsupported => {
crate::net::tcp::stream::fallback_rewake(cx);
Ok(())
}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {
crate::net::tcp::stream::fallback_rewake(cx);
Ok(())
}
Err(err) => Err(err),
}
}
pub async fn send_to<P: AsRef<Path>>(&mut self, buf: &[u8], path: P) -> io::Result<usize> {
let path = path.as_ref().to_path_buf();
std::future::poll_fn(|cx| {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.send_to(buf, &path) {
Ok(n) => Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::WRITABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
})
.await
}
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
if buf.is_empty() {
return Err(empty_datagram_recv_from_buffer_error());
}
std::future::poll_fn(|cx| {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.recv_from(buf) {
Ok((n, addr)) => Poll::Ready(Ok((n, addr))),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
})
.await
}
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
std::future::poll_fn(|cx| {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.send(buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::WRITABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
})
.await
}
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
std::future::poll_fn(|cx| {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match self.inner.recv(buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Err(err) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(err));
}
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
})
.await
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.peer_addr()
}
#[cfg(any(
target_os = "linux",
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd"
))]
pub fn peer_cred(&self) -> io::Result<UCred> {
datagram_peer_cred_impl(&self.inner)
}
pub fn from_std(socket: net::UnixDatagram) -> io::Result<Self> {
socket.set_nonblocking(true)?;
Ok(Self {
inner: socket,
path: None, cleanup_identity: None,
registration: None,
})
}
#[must_use]
pub fn as_std(&self) -> &net::UnixDatagram {
&self.inner
}
pub fn take_path(&mut self) -> Option<PathBuf> {
self.cleanup_identity = None;
self.path.take()
}
pub fn poll_recv_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
use std::os::unix::io::AsRawFd;
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
let mut buf = [0u8; 1];
match socket::recv(
self.inner.as_raw_fd(),
&mut buf,
MsgFlags::MSG_PEEK | MsgFlags::MSG_DONTWAIT,
) {
Ok(_) => Poll::Ready(Ok(())),
Err(errno) if errno == Errno::EAGAIN || errno == Errno::EWOULDBLOCK => {
if let Err(e) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(e));
}
Poll::Pending
}
Err(errno) => Poll::Ready(Err(io::Error::from_raw_os_error(errno as i32))),
}
}
pub fn poll_send_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
use nix::poll::{PollFd, PollFlags, PollTimeout, poll};
use std::os::unix::io::AsFd;
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
let mut fds = [PollFd::new(self.inner.as_fd(), PollFlags::POLLOUT)];
match poll(&mut fds, PollTimeout::ZERO) {
Ok(0) => {
if let Err(e) = self.register_interest(cx, Interest::WRITABLE) {
return Poll::Ready(Err(e));
}
Poll::Pending
}
Ok(_) => {
let Some(revents) = fds[0].revents() else {
return Poll::Ready(Err(io::Error::other("poll returned unknown event bits")));
};
if revents.contains(PollFlags::POLLOUT) {
Poll::Ready(Ok(()))
} else if revents
.intersects(PollFlags::POLLERR | PollFlags::POLLHUP | PollFlags::POLLNVAL)
{
if let Ok(Some(err)) = self.inner.take_error() {
return Poll::Ready(Err(err));
}
Poll::Ready(Err(io::Error::other(format!(
"poll indicates socket error: {revents:?}"
))))
} else {
if let Err(e) = self.register_interest(cx, Interest::WRITABLE) {
return Poll::Ready(Err(e));
}
Poll::Pending
}
}
Err(errno) => Poll::Ready(Err(io::Error::from_raw_os_error(errno as i32))),
}
}
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
use std::os::unix::io::AsRawFd;
std::future::poll_fn(|cx| {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
match socket::recv(
self.inner.as_raw_fd(),
buf,
MsgFlags::MSG_PEEK | MsgFlags::MSG_DONTWAIT,
) {
Ok(n) => Poll::Ready(Ok(n)),
Err(errno) if errno == Errno::EAGAIN || errno == Errno::EWOULDBLOCK => {
if let Err(e) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(e));
}
Poll::Pending
}
Err(errno) => Poll::Ready(Err(io::Error::from_raw_os_error(errno as i32))),
}
})
.await
}
fn socket_addr_from_unix_addr(addr: &socket::UnixAddr) -> io::Result<SocketAddr> {
fn get_unnamed() -> io::Result<SocketAddr> {
static UNNAMED: std::sync::OnceLock<SocketAddr> = std::sync::OnceLock::new();
if let Some(addr) = UNNAMED.get() {
return Ok(addr.clone());
}
let addr = net::UnixDatagram::unbound()?.local_addr()?;
let _ = UNNAMED.set(addr.clone());
Ok(addr)
}
if addr.len() as usize <= std::mem::offset_of!(libc::sockaddr_un, sun_path) {
return get_unnamed();
}
if let Some(path) = addr.path() {
return SocketAddr::from_pathname(path)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
}
#[cfg(target_os = "linux")]
if let Some(name) = addr.as_abstract() {
use std::os::linux::net::SocketAddrExt;
return <SocketAddr as SocketAddrExt>::from_abstract_name(name)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
}
get_unnamed()
}
pub async fn peek_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
use std::io::IoSliceMut;
use std::os::unix::io::AsRawFd;
std::future::poll_fn(|cx| {
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "cancelled")));
}
let mut iov = [IoSliceMut::new(buf)];
match socket::recvmsg::<socket::UnixAddr>(
self.inner.as_raw_fd(),
&mut iov,
None,
MsgFlags::MSG_PEEK | MsgFlags::MSG_DONTWAIT,
) {
Ok(msg) => {
let Some(addr) = msg.address else {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidData,
"unix datagram recvmsg missing source address",
)));
};
let addr = Self::socket_addr_from_unix_addr(&addr)?;
Poll::Ready(Ok((msg.bytes, addr)))
}
Err(errno) if errno == Errno::EAGAIN || errno == Errno::EWOULDBLOCK => {
if let Err(e) = self.register_interest(cx, Interest::READABLE) {
return Poll::Ready(Err(e));
}
Poll::Pending
}
Err(errno) => Poll::Ready(Err(io::Error::from_raw_os_error(errno as i32))),
}
})
.await
}
pub fn set_read_timeout(&self, dur: Option<std::time::Duration>) -> io::Result<()> {
self.inner.set_read_timeout(dur)
}
pub fn set_write_timeout(&self, dur: Option<std::time::Duration>) -> io::Result<()> {
self.inner.set_write_timeout(dur)
}
pub fn read_timeout(&self) -> io::Result<Option<std::time::Duration>> {
self.inner.read_timeout()
}
pub fn write_timeout(&self) -> io::Result<Option<std::time::Duration>> {
self.inner.write_timeout()
}
}
impl Drop for UnixDatagram {
fn drop(&mut self) {
if let (Some(path), Some(identity)) = (&self.path, self.cleanup_identity) {
let _ = super::listener::remove_socket_file_if_same_inode(path, identity);
}
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for UnixDatagram {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.inner.as_raw_fd()
}
}
#[cfg(target_os = "linux")]
fn datagram_peer_cred_impl(socket: &net::UnixDatagram) -> io::Result<UCred> {
use nix::sys::socket::sockopt;
let cred = socket::getsockopt(socket, sockopt::PeerCredentials)
.map_err(|e| io::Error::from_raw_os_error(e as i32))?;
Ok(UCred {
uid: cred.uid() as u32,
gid: cred.gid() as u32,
pid: Some(cred.pid()),
})
}
#[cfg(any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd"
))]
fn datagram_peer_cred_impl(socket: &net::UnixDatagram) -> io::Result<UCred> {
let (uid, gid) =
nix::unistd::getpeereid(socket).map_err(|e| io::Error::from_raw_os_error(e as i32))?;
Ok(UCred {
uid: uid.as_raw(),
gid: gid.as_raw(),
pid: None, })
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::task::{Context, Waker};
use tempfile::tempdir;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
#[test]
fn test_pair() {
init_test("test_datagram_pair");
futures_lite::future::block_on(async {
let (mut a, mut b) = UnixDatagram::pair().expect("pair failed");
a.send(b"hello").await.expect("send failed");
let mut buf = [0u8; 5];
let n = b.recv(&mut buf).await.expect("recv failed");
crate::assert_with_log!(n == 5, "received bytes", 5, n);
crate::assert_with_log!(&buf == b"hello", "received data", b"hello", buf);
});
crate::test_complete!("test_datagram_pair");
}
#[test]
fn test_bind_and_send_to() {
init_test("test_datagram_bind_send_to");
futures_lite::future::block_on(async {
let dir = tempdir().expect("create temp dir");
let server_path = dir.path().join("server.sock");
let mut server = UnixDatagram::bind(&server_path).expect("bind failed");
let mut client = UnixDatagram::unbound().expect("unbound failed");
let sent = client
.send_to(b"hello", &server_path)
.await
.expect("send_to failed");
crate::assert_with_log!(sent == 5, "sent bytes", 5, sent);
let mut buf = [0u8; 5];
let (n, _addr) = server.recv_from(&mut buf).await.expect("recv_from failed");
crate::assert_with_log!(n == 5, "received bytes", 5, n);
crate::assert_with_log!(&buf == b"hello", "received data", b"hello", buf);
});
crate::test_complete!("test_datagram_bind_send_to");
}
#[test]
fn test_peek_from_reports_peer_and_preserves_data() {
init_test("test_datagram_peek_from");
futures_lite::future::block_on(async {
let dir = tempdir().expect("create temp dir");
let server_path = dir.path().join("server.sock");
let client_path = dir.path().join("client.sock");
let mut server = UnixDatagram::bind(&server_path).expect("bind server failed");
let mut client = UnixDatagram::bind(&client_path).expect("bind client failed");
client
.send_to(b"peek", &server_path)
.await
.expect("send_to failed");
let mut peek_buf = [0u8; 4];
let (n, addr) = server
.peek_from(&mut peek_buf)
.await
.expect("peek_from failed");
crate::assert_with_log!(n == 4, "peek bytes", 4, n);
crate::assert_with_log!(&peek_buf == b"peek", "peek data", b"peek", peek_buf);
let peek_path = addr.as_pathname().map(std::path::Path::to_path_buf);
crate::assert_with_log!(
peek_path.as_ref() == Some(&client_path),
"peek addr",
Some(&client_path),
peek_path.as_ref()
);
let mut recv_buf = [0u8; 4];
let (n2, addr2) = server
.recv_from(&mut recv_buf)
.await
.expect("recv_from failed");
crate::assert_with_log!(n2 == 4, "recv bytes", 4, n2);
crate::assert_with_log!(&recv_buf == b"peek", "recv data", b"peek", recv_buf);
let recv_path = addr2.as_pathname().map(std::path::Path::to_path_buf);
crate::assert_with_log!(
recv_path.as_ref() == Some(&client_path),
"recv addr",
Some(&client_path),
recv_path.as_ref()
);
});
crate::test_complete!("test_datagram_peek_from");
}
#[test]
fn test_peek_from_unbound_sender_reports_unnamed_addr() {
init_test("test_datagram_peek_from_unbound_sender_reports_unnamed_addr");
futures_lite::future::block_on(async {
let dir = tempdir().expect("create temp dir");
let server_path = dir.path().join("server.sock");
let mut server = UnixDatagram::bind(&server_path).expect("bind server failed");
let mut client = UnixDatagram::unbound().expect("unbound failed");
client
.send_to(b"peek", &server_path)
.await
.expect("send_to failed");
let mut peek_buf = [0u8; 4];
let (peeked, peek_addr) = server
.peek_from(&mut peek_buf)
.await
.expect("peek_from failed");
crate::assert_with_log!(peeked == 4, "peek bytes", 4, peeked);
crate::assert_with_log!(&peek_buf == b"peek", "peek data", b"peek", peek_buf);
crate::assert_with_log!(
peek_addr.is_unnamed(),
"peek addr unnamed",
true,
peek_addr.is_unnamed()
);
crate::assert_with_log!(
peek_addr.as_pathname().is_none(),
"peek addr pathname",
"None",
format!("{:?}", peek_addr.as_pathname())
);
let mut recv_buf = [0u8; 4];
let (received, recv_addr) = server
.recv_from(&mut recv_buf)
.await
.expect("recv_from failed");
crate::assert_with_log!(received == 4, "recv bytes", 4, received);
crate::assert_with_log!(&recv_buf == b"peek", "recv data", b"peek", recv_buf);
crate::assert_with_log!(
recv_addr.is_unnamed(),
"recv addr unnamed",
true,
recv_addr.is_unnamed()
);
crate::assert_with_log!(
recv_addr.as_pathname().is_none(),
"recv addr pathname",
"None",
format!("{:?}", recv_addr.as_pathname())
);
});
crate::test_complete!("test_datagram_peek_from_unbound_sender_reports_unnamed_addr");
}
#[test]
fn test_recv_from_rejects_empty_buffer_without_consuming_datagram() {
init_test("test_datagram_recv_from_empty_buffer");
futures_lite::future::block_on(async {
let dir = tempdir().expect("create temp dir");
let server_path = dir.path().join("server.sock");
let client_path = dir.path().join("client.sock");
let mut server = UnixDatagram::bind(&server_path).expect("bind server failed");
let mut client = UnixDatagram::bind(&client_path).expect("bind client failed");
client
.send_to(b"ping", &server_path)
.await
.expect("send_to failed");
let mut empty = [];
let err = server
.recv_from(&mut empty)
.await
.expect_err("empty buffer must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::InvalidInput,
"error kind",
io::ErrorKind::InvalidInput,
err.kind()
);
let mut buf = [0u8; 4];
let (received, addr) = server
.recv_from(&mut buf)
.await
.expect("recv_from after error failed");
crate::assert_with_log!(received == 4, "recv bytes", 4, received);
crate::assert_with_log!(&buf == b"ping", "recv data", b"ping", buf);
let recv_path = addr.as_pathname().map(std::path::Path::to_path_buf);
crate::assert_with_log!(
recv_path.as_ref() == Some(&client_path),
"recv addr",
Some(&client_path),
recv_path.as_ref()
);
});
crate::test_complete!("test_datagram_recv_from_empty_buffer");
}
#[test]
fn test_connect() {
init_test("test_datagram_connect");
futures_lite::future::block_on(async {
let dir = tempdir().expect("create temp dir");
let server_path = dir.path().join("server.sock");
let client_path = dir.path().join("client.sock");
let mut server = UnixDatagram::bind(&server_path).expect("bind server failed");
let mut client = UnixDatagram::bind(&client_path).expect("bind client failed");
client.connect(&server_path).expect("connect failed");
client.send(b"ping").await.expect("send failed");
let mut buf = [0u8; 4];
let (n, addr) = server.recv_from(&mut buf).await.expect("recv_from failed");
crate::assert_with_log!(n == 4, "received bytes", 4, n);
crate::assert_with_log!(&buf == b"ping", "received data", b"ping", buf);
let pathname = addr.as_pathname();
crate::assert_with_log!(pathname.is_some(), "has pathname", true, pathname.is_some());
});
crate::test_complete!("test_datagram_connect");
}
#[test]
fn test_socket_cleanup_on_drop() {
init_test("test_datagram_cleanup_on_drop");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("cleanup_test.sock");
{
let _socket = UnixDatagram::bind(&path).expect("bind failed");
let exists = path.exists();
crate::assert_with_log!(exists, "socket exists", true, exists);
}
let exists = path.exists();
crate::assert_with_log!(!exists, "socket cleaned up", false, exists);
crate::test_complete!("test_datagram_cleanup_on_drop");
}
#[test]
fn test_from_std_no_cleanup() {
init_test("test_datagram_from_std_no_cleanup");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("from_std_test.sock");
let std_socket = net::UnixDatagram::bind(&path).expect("bind failed");
{
let _socket = UnixDatagram::from_std(std_socket).expect("from_std failed");
}
let exists = path.exists();
crate::assert_with_log!(exists, "socket remains", true, exists);
std::fs::remove_file(&path).ok();
crate::test_complete!("test_datagram_from_std_no_cleanup");
}
#[test]
fn test_take_path_prevents_cleanup() {
init_test("test_datagram_take_path");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("take_path_test.sock");
{
let mut socket = UnixDatagram::bind(&path).expect("bind failed");
let taken = socket.take_path();
crate::assert_with_log!(taken.is_some(), "taken some", true, taken.is_some());
}
let exists = path.exists();
crate::assert_with_log!(exists, "socket remains", true, exists);
std::fs::remove_file(&path).ok();
crate::test_complete!("test_datagram_take_path");
}
#[test]
fn test_bind_refuses_stale_socket_file() {
init_test("test_datagram_bind_refuses_stale_socket_file");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("stale_datagram.sock");
let stale = net::UnixDatagram::bind(&path).expect("create stale socket");
drop(stale);
crate::assert_with_log!(path.exists(), "stale socket exists", true, path.exists());
let err = UnixDatagram::bind(&path).expect_err("bind should refuse stale socket path");
crate::assert_with_log!(
err.kind() == io::ErrorKind::AddrInUse,
"bind error kind",
io::ErrorKind::AddrInUse,
err.kind()
);
crate::assert_with_log!(path.exists(), "stale socket preserved", true, path.exists());
crate::test_complete!("test_datagram_bind_refuses_stale_socket_file");
}
#[test]
fn test_bind_refuses_live_socket_file() {
init_test("test_datagram_bind_refuses_live_socket_file");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("live_datagram.sock");
let original = net::UnixDatagram::bind(&path).expect("create live socket");
let err = UnixDatagram::bind(&path).expect_err("bind should refuse live socket path");
crate::assert_with_log!(
err.kind() == io::ErrorKind::AddrInUse,
"bind error kind",
io::ErrorKind::AddrInUse,
err.kind()
);
crate::assert_with_log!(path.exists(), "live socket preserved", true, path.exists());
drop(original);
std::fs::remove_file(&path).ok();
crate::test_complete!("test_datagram_bind_refuses_live_socket_file");
}
#[test]
fn replacement_socket_path_survives_old_datagram_drop() {
init_test("replacement_socket_path_survives_old_datagram_drop");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("datagram_rebind.sock");
let original = UnixDatagram::bind(&path).expect("bind failed");
crate::assert_with_log!(path.exists(), "socket exists", true, path.exists());
std::fs::remove_file(&path).expect("unlink original path");
let replacement = net::UnixDatagram::bind(&path).expect("bind replacement failed");
crate::assert_with_log!(path.exists(), "replacement exists", true, path.exists());
drop(original);
crate::assert_with_log!(
path.exists(),
"old datagram drop preserved replacement path",
true,
path.exists()
);
drop(replacement);
std::fs::remove_file(&path).ok();
crate::test_complete!("replacement_socket_path_survives_old_datagram_drop");
}
#[test]
fn test_bind_cleanup_on_post_bind_init_failure() {
init_test("test_datagram_bind_cleanup_on_post_bind_init_failure");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("datagram_init_failure.sock");
crate::net::unix::listener::remove_stale_socket_file(&path).expect("clear stale socket");
let inner = net::UnixDatagram::bind(&path).expect("bind failed");
let err = UnixDatagram::from_bound_with(&path, inner, |_socket| {
Err(io::Error::other("injected datagram init failure"))
})
.expect_err("post-bind init should fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Other,
"init error kind",
io::ErrorKind::Other,
err.kind()
);
crate::assert_with_log!(
!path.exists(),
"socket path cleaned after init failure",
false,
path.exists()
);
crate::test_complete!("test_datagram_bind_cleanup_on_post_bind_init_failure");
}
#[test]
fn test_local_addr() {
init_test("test_datagram_local_addr");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("local_addr_test.sock");
let socket = UnixDatagram::bind(&path).expect("bind failed");
let addr = socket.local_addr().expect("local_addr failed");
let pathname = addr.as_pathname();
crate::assert_with_log!(pathname.is_some(), "has pathname", true, pathname.is_some());
let pathname = pathname.unwrap();
crate::assert_with_log!(pathname == path, "pathname matches", path, pathname);
crate::test_complete!("test_datagram_local_addr");
}
#[test]
fn test_unbound_local_addr() {
init_test("test_datagram_unbound_local_addr");
let socket = UnixDatagram::unbound().expect("unbound failed");
let addr = socket.local_addr().expect("local_addr failed");
let pathname = addr.as_pathname();
crate::assert_with_log!(
pathname.is_none(),
"no pathname",
"None",
format!("{:?}", pathname)
);
crate::test_complete!("test_datagram_unbound_local_addr");
}
#[test]
fn test_peek() {
init_test("test_datagram_peek");
futures_lite::future::block_on(async {
let (mut a, mut b) = UnixDatagram::pair().expect("pair failed");
a.send(b"hello").await.expect("send failed");
let mut buf = [0u8; 5];
let n = b.peek(&mut buf).await.expect("peek failed");
crate::assert_with_log!(n == 5, "peeked bytes", 5, n);
crate::assert_with_log!(&buf == b"hello", "peeked data", b"hello", buf);
let mut buf2 = [0u8; 5];
let n = b.recv(&mut buf2).await.expect("recv failed");
crate::assert_with_log!(n == 5, "received bytes", 5, n);
crate::assert_with_log!(&buf2 == b"hello", "received data", b"hello", buf2);
});
crate::test_complete!("test_datagram_peek");
}
#[cfg(target_os = "linux")]
#[test]
fn test_abstract_socket() {
init_test("test_datagram_abstract_socket");
futures_lite::future::block_on(async {
let server_name = b"asupersync_test_datagram_abstract";
let mut server = UnixDatagram::bind_abstract(server_name).expect("bind failed");
let mut client = UnixDatagram::unbound().expect("unbound failed");
client
.connect_abstract(server_name)
.expect("connect failed");
client.send(b"hello").await.expect("send failed");
let mut buf = [0u8; 5];
let n = server.recv(&mut buf).await.expect("recv failed");
crate::assert_with_log!(n == 5, "received bytes", 5, n);
});
crate::test_complete!("test_datagram_abstract_socket");
}
#[test]
fn test_datagram_registers_on_wouldblock() {
use crate::cx::Cx;
use crate::runtime::LabReactor;
use crate::runtime::io_driver::IoDriverHandle;
use crate::types::{Budget, RegionId, TaskId};
init_test("test_datagram_registers_on_wouldblock");
let (mut a, mut b) = UnixDatagram::pair().expect("pair failed");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriverHandle::new(reactor);
let cx = Cx::new_with_observability(
RegionId::new_for_test(0, 0),
TaskId::new_for_test(0, 0),
Budget::INFINITE,
None,
Some(driver),
None,
);
let _guard = Cx::set_current(Some(cx));
let waker = noop_waker();
let mut poll_cx = Context::from_waker(&waker);
let poll = b.poll_recv_ready(&mut poll_cx);
crate::assert_with_log!(
matches!(poll, Poll::Pending),
"poll is Pending",
"Poll::Pending",
format!("{:?}", poll)
);
let has_registration = b.registration.is_some();
crate::assert_with_log!(
has_registration,
"registration present",
true,
has_registration
);
futures_lite::future::block_on(async {
a.send(b"test").await.expect("send failed");
});
let poll = b.poll_recv_ready(&mut poll_cx);
crate::assert_with_log!(
matches!(poll, Poll::Ready(Ok(()))),
"poll is Ready",
"Poll::Ready(Ok(()))",
format!("{:?}", poll)
);
crate::test_complete!("test_datagram_registers_on_wouldblock");
}
#[test]
fn test_datagram_send_registers_on_wouldblock() {
use crate::cx::Cx;
use crate::runtime::LabReactor;
use crate::runtime::io_driver::IoDriverHandle;
use crate::types::{Budget, RegionId, TaskId};
init_test("test_datagram_send_registers_on_wouldblock");
let (mut a, _b) = UnixDatagram::pair().expect("pair failed");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriverHandle::new(reactor);
let cx = Cx::new_with_observability(
RegionId::new_for_test(0, 0),
TaskId::new_for_test(0, 0),
Budget::INFINITE,
None,
Some(driver),
None,
);
let _guard = Cx::set_current(Some(cx));
let waker = noop_waker();
let mut poll_cx = Context::from_waker(&waker);
let poll = a.poll_send_ready(&mut poll_cx);
if matches!(poll, Poll::Pending) {
let has_registration = a.registration.is_some();
crate::assert_with_log!(
has_registration,
"registration present on Pending",
true,
has_registration
);
}
crate::test_complete!("test_datagram_send_registers_on_wouldblock");
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_cancelled_datagram_ops_return_interrupted_without_registration() {
use crate::cx::Cx;
init_test("test_cancelled_datagram_ops_return_interrupted_without_registration");
let dir = tempdir().expect("create temp dir");
let server_path = dir.path().join("cancel-server.sock");
let client_path = dir.path().join("cancel-client.sock");
let mut path_server = UnixDatagram::bind(&server_path).expect("bind server failed");
let mut path_client = UnixDatagram::bind(&client_path).expect("bind client failed");
let (mut connected_a, mut connected_b) = UnixDatagram::pair().expect("pair failed");
let cx = Cx::for_testing();
cx.set_cancel_requested(true);
let _guard = Cx::set_current(Some(cx));
let err = futures_lite::future::block_on(path_client.send_to(b"hello", &server_path))
.expect_err("cancelled send_to must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Interrupted,
"send_to cancelled",
io::ErrorKind::Interrupted,
err.kind()
);
crate::assert_with_log!(
path_client.registration.is_none(),
"send_to registration skipped",
true,
path_client.registration.is_none()
);
let mut buf = [0u8; 8];
let err = futures_lite::future::block_on(path_server.recv_from(&mut buf))
.expect_err("cancelled recv_from must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Interrupted,
"recv_from cancelled",
io::ErrorKind::Interrupted,
err.kind()
);
crate::assert_with_log!(
path_server.registration.is_none(),
"recv_from registration skipped",
true,
path_server.registration.is_none()
);
let err = futures_lite::future::block_on(connected_a.send(b"hello"))
.expect_err("cancelled send must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Interrupted,
"send cancelled",
io::ErrorKind::Interrupted,
err.kind()
);
crate::assert_with_log!(
connected_a.registration.is_none(),
"send registration skipped",
true,
connected_a.registration.is_none()
);
let err = futures_lite::future::block_on(connected_b.recv(&mut buf))
.expect_err("cancelled recv must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Interrupted,
"recv cancelled",
io::ErrorKind::Interrupted,
err.kind()
);
crate::assert_with_log!(
connected_b.registration.is_none(),
"recv registration skipped",
true,
connected_b.registration.is_none()
);
let err = futures_lite::future::block_on(connected_b.peek(&mut buf))
.expect_err("cancelled peek must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Interrupted,
"peek cancelled",
io::ErrorKind::Interrupted,
err.kind()
);
crate::assert_with_log!(
connected_b.registration.is_none(),
"peek registration skipped",
true,
connected_b.registration.is_none()
);
let err = futures_lite::future::block_on(path_server.peek_from(&mut buf))
.expect_err("cancelled peek_from must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::Interrupted,
"peek_from cancelled",
io::ErrorKind::Interrupted,
err.kind()
);
crate::assert_with_log!(
path_server.registration.is_none(),
"peek_from registration skipped",
true,
path_server.registration.is_none()
);
let waker = noop_waker();
let mut poll_cx = Context::from_waker(&waker);
let recv_ready = path_server.poll_recv_ready(&mut poll_cx);
crate::assert_with_log!(
matches!(
recv_ready,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
),
"poll_recv_ready cancelled",
"Poll::Ready(Interrupted)",
format!("{recv_ready:?}")
);
crate::assert_with_log!(
path_server.registration.is_none(),
"poll_recv_ready registration skipped",
true,
path_server.registration.is_none()
);
let send_ready = connected_a.poll_send_ready(&mut poll_cx);
crate::assert_with_log!(
matches!(
send_ready,
Poll::Ready(Err(ref err)) if err.kind() == io::ErrorKind::Interrupted
),
"poll_send_ready cancelled",
"Poll::Ready(Interrupted)",
format!("{send_ready:?}")
);
crate::assert_with_log!(
connected_a.registration.is_none(),
"poll_send_ready registration skipped",
true,
connected_a.registration.is_none()
);
crate::test_complete!(
"test_cancelled_datagram_ops_return_interrupted_without_registration"
);
}
#[cfg(any(
target_os = "linux",
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd"
))]
#[test]
fn test_peer_cred() {
init_test("test_datagram_peer_cred");
let (a, b) = UnixDatagram::pair().expect("pair failed");
let cred_a = a.peer_cred().expect("peer_cred a failed");
let cred_b = b.peer_cred().expect("peer_cred b failed");
let user_id = nix::unistd::getuid().as_raw();
let group_id = nix::unistd::getgid().as_raw();
crate::assert_with_log!(cred_a.uid == user_id, "a uid", user_id, cred_a.uid);
crate::assert_with_log!(cred_a.gid == group_id, "a gid", group_id, cred_a.gid);
crate::assert_with_log!(cred_b.uid == user_id, "b uid", user_id, cred_b.uid);
crate::assert_with_log!(cred_b.gid == group_id, "b gid", group_id, cred_b.gid);
#[cfg(target_os = "linux")]
{
let proc_id = i32::try_from(std::process::id()).expect("process id fits in i32");
let pid_a = cred_a.pid.expect("pid should be available on Linux");
let pid_b = cred_b.pid.expect("pid should be available on Linux");
crate::assert_with_log!(pid_a == proc_id, "a pid", proc_id, pid_a);
crate::assert_with_log!(pid_b == proc_id, "b pid", proc_id, pid_b);
}
crate::test_complete!("test_datagram_peer_cred");
}
#[test]
fn test_bind_refuses_non_socket_path() {
init_test("test_datagram_bind_refuses_non_socket_path");
let dir = tempdir().expect("create temp dir");
let path = dir.path().join("not_a_socket");
std::fs::write(&path, b"important data").expect("write file");
let err = UnixDatagram::bind(&path).expect_err("bind should reject non-socket path");
crate::assert_with_log!(
err.kind() == std::io::ErrorKind::AlreadyExists,
"error kind",
std::io::ErrorKind::AlreadyExists,
err.kind()
);
let contents = std::fs::read(&path).expect("read file");
let unchanged = contents == b"important data";
crate::assert_with_log!(unchanged, "file unchanged", true, unchanged);
crate::test_complete!("test_datagram_bind_refuses_non_socket_path");
}
}