use std::{
future::Future,
io,
os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
path::Path,
};
use super::{socket_addr::SocketAddr, UnixStream};
use crate::{
driver::{op::Op, shared_fd::SharedFd},
io::stream::Stream,
net::ListenerConfig,
};
pub struct UnixListener {
fd: SharedFd,
sys_listener: Option<std::os::unix::net::UnixListener>,
}
impl UnixListener {
pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
let sys_listener = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd.raw_fd()) };
Self {
fd,
sys_listener: Some(sys_listener),
}
}
pub fn bind_with_config<P: AsRef<Path>>(
path: P,
config: &ListenerConfig,
) -> io::Result<UnixListener> {
let sys_listener =
socket2::Socket::new(socket2::Domain::UNIX, socket2::Type::STREAM, None)?;
let addr = socket2::SockAddr::unix(path)?;
if config.reuse_port {
sys_listener.set_reuse_port(true)?;
}
if config.reuse_addr {
sys_listener.set_reuse_address(true)?;
}
if let Some(send_buf_size) = config.send_buf_size {
sys_listener.set_send_buffer_size(send_buf_size)?;
}
if let Some(recv_buf_size) = config.recv_buf_size {
sys_listener.set_recv_buffer_size(recv_buf_size)?;
}
sys_listener.bind(&addr)?;
sys_listener.listen(config.backlog)?;
let fd = SharedFd::new(sys_listener.into_raw_fd())?;
Ok(Self::from_shared_fd(fd))
}
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
Self::bind_with_config(path, &ListenerConfig::default())
}
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
let op = Op::accept(&self.fd)?;
let completion = op.await;
let fd = completion.meta.result?;
let stream = UnixStream::from_shared_fd(SharedFd::new(fd as _)?);
let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
let raw_addr_len = completion.data.addr.1;
let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);
Ok((stream, addr))
}
}
impl Stream for UnixListener {
type Item = io::Result<(UnixStream, SocketAddr)>;
type NextFuture<'a> = impl Future<Output = Option<Self::Item>> + 'a;
fn next(&mut self) -> Self::NextFuture<'_> {
async move { Some(self.accept().await) }
}
}
impl std::fmt::Debug for UnixListener {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UnixListener")
.field("fd", &self.fd)
.finish()
}
}
impl AsRawFd for UnixListener {
fn as_raw_fd(&self) -> RawFd {
self.fd.raw_fd()
}
}
impl Drop for UnixListener {
fn drop(&mut self) {
self.sys_listener.take().unwrap().into_raw_fd();
}
}