use core::time::Duration;
use rusl::error::Errno;
use rusl::network::get_inet_sock_name;
use rusl::platform::{
AddressFamily, NonNegativeI32, PollEvents, SocketAddressInet, SocketAddressUnix, SocketFlags,
SocketOptions, SocketType,
};
use rusl::string::unix_str::UnixStr;
use crate::error::Result;
use crate::io::{Read, Write};
use crate::sock::{
blocking_read_nonblock_sock, blocking_write_nonblock_sock, sock_nonblock_op_poll_if_not_ready,
};
use crate::unix::fd::{AsRawFd, OwnedFd, RawFd};
#[cfg(test)]
mod test;
/// A connected unix domain stream socket.
///
/// Wraps the owned file descriptor of the underlying socket.
#[derive(Debug)]
pub struct UnixStream(OwnedFd);
impl UnixStream {
    /// Creates a client socket and connects it to the unix socket at `path`.
    ///
    /// No timeout is applied to the connection attempt.
    ///
    /// # Errors
    /// Fails if `path` cannot be converted into a socket address, if the socket
    /// cannot be created, or if the connection attempt fails.
    #[inline]
    pub fn connect(path: &UnixStr) -> Result<Self> {
        Self::do_connect(path, None)
    }

    /// Shared connect routine: opens a non-blocking, close-on-exec `AF_UNIX`
    /// stream socket and connects it to `path`, forwarding `timeout` to the
    /// poll-and-retry helper.
    fn do_connect(path: &UnixStr, timeout: Option<Duration>) -> Result<Self> {
        // Convert the address before opening the socket, so a conversion failure
        // returns early without leaving an open descriptor behind.
        let addr = SocketAddressUnix::try_from_unix(path)?;
        let fd = rusl::network::socket(
            AddressFamily::AF_UNIX,
            SocketOptions::new(
                SocketType::SOCK_STREAM,
                SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
            ),
            0,
        )?;
        if let Err(e) = sock_nonblock_op_poll_if_not_ready(
            fd,
            Errno::EAGAIN,
            PollEvents::POLLOUT,
            timeout,
            |sock| rusl::network::connect_unix(sock, &addr),
        ) {
            // The raw descriptor has not been handed to an `OwnedFd` yet, so it
            // has to be closed by hand. A close failure is deliberately ignored
            // in favour of reporting the original error.
            let _ = rusl::unistd::close(fd);
            return Err(e);
        }
        Ok(Self(OwnedFd(fd)))
    }

    /// Makes a single, non-waiting attempt to connect to the unix socket at `path`.
    ///
    /// Returns `Ok(Some(stream))` when the connection is established and
    /// `Ok(None)` when the connect call reports `EAGAIN`; in the latter case the
    /// freshly created socket is closed again.
    ///
    /// # Errors
    /// Fails if `path` cannot be converted into a socket address, if the socket
    /// cannot be created, or if connecting fails with anything but `EAGAIN`.
    pub fn try_connect(path: &UnixStr) -> Result<Option<Self>> {
        // Convert the address before opening the socket, so a conversion failure
        // returns early without leaving an open descriptor behind.
        let addr = SocketAddressUnix::try_from_unix(path)?;
        let fd = rusl::network::socket(
            AddressFamily::AF_UNIX,
            SocketOptions::new(
                SocketType::SOCK_STREAM,
                SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
            ),
            0,
        )?;
        match rusl::network::connect_unix(fd, &addr) {
            Ok(()) => {}
            Err(e) if e.code == Some(Errno::EAGAIN) => {
                let _ = rusl::unistd::close(fd);
                return Ok(None);
            }
            Err(e) => {
                let _ = rusl::unistd::close(fd);
                return Err(e.into());
            }
        }
        Ok(Some(Self(OwnedFd(fd))))
    }
}
impl AsRawFd for UnixStream {
    /// Returns the raw descriptor of the wrapped socket without giving up ownership.
    fn as_raw_fd(&self) -> RawFd {
        let Self(owned) = self;
        owned.0
    }
}
impl Read for UnixStream {
    /// Reads into `buf` through the blocking-read helper, with no timeout.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let raw = self.0 .0;
        blocking_read_nonblock_sock(raw, buf, None)
    }
}
impl Write for UnixStream {
    /// Writes from `buf` through the blocking-write helper, with no timeout.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        let raw = self.0 .0;
        blocking_write_nonblock_sock(raw, buf, None)
    }

    /// Does nothing: this type keeps no buffer of its own that could be flushed.
    #[inline]
    fn flush(&mut self) -> Result<()> {
        Ok(())
    }
}
/// A listening unix domain stream socket.
///
/// Wraps the owned file descriptor of the underlying socket.
#[derive(Debug)]
pub struct UnixListener(OwnedFd);
impl UnixListener {
    /// Creates a unix socket bound to `path` and puts it into listening mode.
    ///
    /// The socket is created non-blocking and close-on-exec.
    ///
    /// # Errors
    /// Fails if `path` cannot be converted into a socket address, or if creating,
    /// binding or listening on the socket fails. The socket is closed again when
    /// binding or listening fails.
    pub fn bind(path: &UnixStr) -> Result<Self> {
        // Convert the address before opening the socket, so a conversion failure
        // returns early without leaving an open descriptor behind.
        let addr = SocketAddressUnix::try_from_unix(path)?;
        let fd = rusl::network::socket(
            AddressFamily::AF_UNIX,
            SocketOptions::new(
                SocketType::SOCK_STREAM,
                SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
            ),
            0,
        )?;
        if let Err(e) = rusl::network::bind_unix(fd, &addr) {
            let _ = rusl::unistd::close(fd);
            return Err(e.into());
        }
        // Listen exactly once; every failure path closes the raw descriptor,
        // which is not owned by an `OwnedFd` until the very end.
        if let Err(e) = rusl::network::listen(fd, NonNegativeI32::MAX) {
            let _ = rusl::unistd::close(fd);
            return Err(e.into());
        }
        Ok(Self(OwnedFd(fd)))
    }

    /// Accepts a new client connection, with no timeout on the wait.
    ///
    /// # Errors
    /// Propagates any error from the underlying accept operation.
    #[inline]
    pub fn accept(&mut self) -> Result<UnixStream> {
        self.do_accept(None)
    }

    /// Accepts a new client connection, passing `timeout` on to the wait.
    ///
    /// # Errors
    /// Propagates any error from the underlying accept operation.
    #[inline]
    pub fn accept_with_timeout(&mut self, timeout: Duration) -> Result<UnixStream> {
        self.do_accept(Some(timeout))
    }

    /// Makes a single accept attempt without waiting.
    ///
    /// Returns `Ok(None)` when the accept call reports `EAGAIN`, otherwise the
    /// accepted stream. Accepted sockets are non-blocking and close-on-exec.
    ///
    /// # Errors
    /// Fails if accepting fails with anything but `EAGAIN`.
    #[inline]
    pub fn try_accept(&mut self) -> Result<Option<UnixStream>> {
        let fd = match rusl::network::accept_unix(
            self.0 .0,
            SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
        ) {
            Ok((fd, _addr)) => fd,
            Err(e) if e.code == Some(Errno::EAGAIN) => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        Ok(Some(UnixStream(OwnedFd(fd))))
    }

    /// Shared accept routine: runs the accept call through the poll-and-retry
    /// helper, using `EAGAIN` as the not-ready errno and waiting on `POLLIN`.
    fn do_accept(&mut self, timeout: Option<Duration>) -> Result<UnixStream> {
        let (fd, _addr) = sock_nonblock_op_poll_if_not_ready(
            self.0 .0,
            Errno::EAGAIN,
            PollEvents::POLLIN,
            timeout,
            |sock| {
                rusl::network::accept_unix(
                    sock,
                    SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
                )
            },
        )?;
        Ok(UnixStream(OwnedFd(fd)))
    }
}
/// An IP address paired with a port.
///
/// Used as the target when connecting or binding TCP sockets.
#[derive(Debug, Clone, Copy)]
pub struct SocketAddress {
    ip: Ip,
    port: u16,
}

impl SocketAddress {
    /// Creates a socket address from an IP and a port.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn new(ip: Ip, port: u16) -> Self {
        Self { ip, port }
    }

    /// Returns the IP part of this address.
    #[must_use]
    pub const fn ip(self) -> Ip {
        self.ip
    }

    /// Returns the port part of this address.
    #[must_use]
    pub const fn port(self) -> u16 {
        self.port
    }
}

/// The kinds of IP address a [`SocketAddress`] can carry.
///
/// Marked `#[non_exhaustive]`, so further variants may be added later.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum Ip {
    /// An IPv4 address given as its four octets.
    V4([u8; 4]),
}
/// A connected TCP stream socket.
///
/// Wraps the owned file descriptor of the underlying socket.
#[derive(Debug)]
pub struct TcpStream(OwnedFd);
/// Outcome of a non-waiting TCP connect attempt.
#[derive(Debug)]
pub enum TcpTryConnect {
/// The connection is established and ready to use.
Connected(TcpStream),
/// The connect call reported `EINPROGRESS`; the attempt can be continued later.
InProgress(TcpStreamInProgress),
}
/// A TCP socket whose connection attempt has not completed yet.
///
/// Holds the owned socket descriptor together with the address it is connecting to.
#[derive(Debug)]
pub struct TcpStreamInProgress(OwnedFd, SocketAddressInet);
impl TcpStreamInProgress {
    /// Makes one more non-waiting attempt to complete the pending connection.
    ///
    /// Returns [`TcpTryConnect::Connected`] when the connect call succeeds and
    /// hands `self` back as [`TcpTryConnect::InProgress`] when it reports
    /// `EINPROGRESS`.
    ///
    /// # Errors
    /// Fails if the connect call fails with anything but `EINPROGRESS`.
    pub fn try_connect(self) -> Result<TcpTryConnect> {
        // NOTE(review): Linux documents `EALREADY` for a repeated `connect` on a
        // socket that is still connecting — confirm `EINPROGRESS` is the errno
        // actually seen on this path.
        let attempt = rusl::network::connect_inet(self.0 .0, &self.1);
        match attempt {
            Ok(()) => {
                let Self(owned, _target) = self;
                Ok(TcpTryConnect::Connected(TcpStream(owned)))
            }
            Err(e) if matches!(e.code, Some(Errno::EINPROGRESS)) => {
                Ok(TcpTryConnect::InProgress(self))
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Drives the pending connection to completion through the poll-and-retry
    /// helper, with no timeout.
    ///
    /// # Errors
    /// Propagates any error reported by the helper.
    pub fn connect_blocking(self) -> Result<TcpStream> {
        let Self(owned, target) = self;
        sock_nonblock_op_poll_if_not_ready(
            owned.0,
            Errno::EINPROGRESS,
            PollEvents::POLLOUT,
            None,
            |sock| rusl::network::connect_inet(sock, &target),
        )?;
        Ok(TcpStream(owned))
    }
}
impl TcpStream {
    /// Opens a TCP connection to `addr`, with no timeout on the attempt.
    ///
    /// # Errors
    /// Fails if the socket cannot be created or the connection attempt fails.
    pub fn connect(addr: &SocketAddress) -> Result<Self> {
        Self::do_connect(addr, None)
    }

    /// Opens a TCP connection to `addr`, passing `timeout` on to the wait.
    ///
    /// # Errors
    /// Fails if the socket cannot be created or the connection attempt fails.
    pub fn connect_with_timeout(addr: &SocketAddress, timeout: Duration) -> Result<Self> {
        let limit = Some(timeout);
        Self::do_connect(addr, limit)
    }

    /// Starts a TCP connection to `addr` without waiting for it to complete.
    ///
    /// Returns [`TcpTryConnect::Connected`] when the connect call succeeds right
    /// away, or [`TcpTryConnect::InProgress`] when it reports `EINPROGRESS`.
    ///
    /// # Errors
    /// Fails if the socket cannot be created, or if connecting fails with
    /// anything but `EINPROGRESS`; the socket is closed in the latter case.
    pub fn try_connect(addr: &SocketAddress) -> Result<TcpTryConnect> {
        let sock_opts = SocketOptions::new(
            SocketType::SOCK_STREAM,
            SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
        );
        // Protocol number 6 is TCP.
        let fd = rusl::network::socket(AddressFamily::AF_INET, sock_opts, 6)?;
        let target = match addr.ip {
            Ip::V4(octets) => SocketAddressInet::new(octets, addr.port),
        };
        match rusl::network::connect_inet(fd, &target) {
            Ok(()) => Ok(TcpTryConnect::Connected(Self(OwnedFd(fd)))),
            Err(e) if matches!(e.code, Some(Errno::EINPROGRESS)) => Ok(
                TcpTryConnect::InProgress(TcpStreamInProgress(OwnedFd(fd), target)),
            ),
            Err(e) => {
                // Not yet wrapped in an `OwnedFd`, so close the descriptor by hand.
                let _ = rusl::unistd::close(fd);
                Err(e.into())
            }
        }
    }

    /// Shared connect routine: opens a non-blocking, close-on-exec `AF_INET`
    /// stream socket and connects it through the poll-and-retry helper.
    #[expect(clippy::trivially_copy_pass_by_ref)]
    fn do_connect(addr: &SocketAddress, timeout: Option<Duration>) -> Result<Self> {
        let sock_opts = SocketOptions::new(
            SocketType::SOCK_STREAM,
            SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
        );
        // Protocol number 6 is TCP.
        let fd = rusl::network::socket(AddressFamily::AF_INET, sock_opts, 6)?;
        let target = match addr.ip {
            Ip::V4(octets) => SocketAddressInet::new(octets, addr.port),
        };
        let outcome = sock_nonblock_op_poll_if_not_ready(
            fd,
            Errno::EINPROGRESS,
            PollEvents::POLLOUT,
            timeout,
            |sock| rusl::network::connect_inet(sock, &target),
        );
        match outcome {
            Ok(_) => Ok(Self(OwnedFd(fd))),
            Err(e) => {
                // Not yet wrapped in an `OwnedFd`, so close the descriptor by hand.
                let _ = rusl::unistd::close(fd);
                Err(e)
            }
        }
    }

    /// Reads into `buf` through the blocking-read helper, passing `timeout` on to it.
    ///
    /// # Errors
    /// Propagates any error reported by the helper.
    #[inline]
    pub fn read_with_timeout(&mut self, buf: &mut [u8], timeout: Duration) -> Result<usize> {
        let raw = self.0 .0;
        blocking_read_nonblock_sock(raw, buf, Some(timeout))
    }
}
impl AsRawFd for TcpStream {
    /// Returns the raw descriptor of the wrapped socket without giving up ownership.
    #[inline]
    fn as_raw_fd(&self) -> RawFd {
        let Self(owned) = self;
        owned.0
    }
}
impl Read for TcpStream {
    /// Reads into `buf` through the blocking-read helper, with no timeout.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let raw = self.0 .0;
        blocking_read_nonblock_sock(raw, buf, None)
    }
}
impl Write for TcpStream {
    /// Writes from `buf` through the blocking-write helper, with no timeout.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        let raw = self.0 .0;
        blocking_write_nonblock_sock(raw, buf, None)
    }

    /// Does nothing: this type keeps no buffer of its own that could be flushed.
    #[inline]
    fn flush(&mut self) -> Result<()> {
        Ok(())
    }
}
/// A listening TCP socket.
///
/// Wraps the owned file descriptor of the underlying socket.
#[derive(Debug)]
pub struct TcpListener(OwnedFd);
impl TcpListener {
    /// Creates a TCP socket bound to `addr` and puts it into listening mode.
    ///
    /// The socket is created non-blocking and close-on-exec.
    ///
    /// # Errors
    /// Fails if creating, binding or listening on the socket fails. The socket is
    /// closed again when binding or listening fails.
    pub fn bind(addr: &SocketAddress) -> Result<Self> {
        let fd = rusl::network::socket(
            AddressFamily::AF_INET,
            SocketOptions::new(
                SocketType::SOCK_STREAM,
                SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
            ),
            // Protocol number 6 is TCP.
            6,
        )?;
        let addr = match addr.ip {
            Ip::V4(bytes) => SocketAddressInet::new(bytes, addr.port),
        };
        if let Err(e) = rusl::network::bind_inet(fd, &addr) {
            let _ = rusl::unistd::close(fd);
            return Err(e.into());
        }
        // The raw descriptor is not owned by an `OwnedFd` yet, so a listen
        // failure must close it by hand, just like a bind failure does.
        if let Err(e) = rusl::network::listen(fd, NonNegativeI32::MAX) {
            let _ = rusl::unistd::close(fd);
            return Err(e.into());
        }
        Ok(Self(OwnedFd(fd)))
    }

    /// Returns the local address this listener's socket is bound to.
    ///
    /// # Errors
    /// Fails if the socket name cannot be queried.
    pub fn local_addr(&self) -> Result<SocketAddress> {
        let name = get_inet_sock_name(self.0 .0)?;
        let (ip, port) = name.ipv4_addr();
        Ok(SocketAddress::new(Ip::V4(ip), port))
    }

    /// Accepts a new client connection, with no timeout on the wait.
    ///
    /// # Errors
    /// Propagates any error from the underlying accept operation.
    pub fn accept(&mut self) -> Result<TcpStream> {
        self.do_accept(None)
    }

    /// Accepts a new client connection, passing `timeout` on to the wait.
    ///
    /// # Errors
    /// Propagates any error from the underlying accept operation.
    pub fn accept_with_timeout(&mut self, timeout: Duration) -> Result<TcpStream> {
        self.do_accept(Some(timeout))
    }

    /// Makes a single accept attempt without waiting.
    ///
    /// Returns `Ok(None)` when the accept call reports `EAGAIN`, otherwise the
    /// accepted stream. Accepted sockets are non-blocking and close-on-exec.
    ///
    /// # Errors
    /// Fails if accepting fails with anything but `EAGAIN`.
    pub fn try_accept(&mut self) -> Result<Option<TcpStream>> {
        let fd = match rusl::network::accept_inet(
            self.0 .0,
            SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
        ) {
            Ok((fd, _addr)) => fd,
            Err(e) if e.code == Some(Errno::EAGAIN) => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        Ok(Some(TcpStream(OwnedFd(fd))))
    }

    /// Shared accept routine: runs the accept call through the poll-and-retry
    /// helper, using `EAGAIN` as the not-ready errno and waiting on `POLLIN`.
    fn do_accept(&self, timeout: Option<Duration>) -> Result<TcpStream> {
        let (fd, _addr) = sock_nonblock_op_poll_if_not_ready(
            self.0 .0,
            Errno::EAGAIN,
            PollEvents::POLLIN,
            timeout,
            |sock| {
                rusl::network::accept_inet(
                    sock,
                    SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
                )
            },
        )?;
        Ok(TcpStream(OwnedFd(fd)))
    }
}