use std::fmt;
use std::io::{self, Read as _, Write as _};
use std::net::Shutdown;
#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[doc(no_inline)]
pub use std::os::unix::net::SocketAddr;
use async_io::Async;
use futures_lite::{prelude::*, ready};
/// An asynchronous Unix domain socket listener.
///
/// The socket is stored as an `Arc<Async<std::os::unix::net::UnixListener>>`, so cloning this
/// handle only bumps a reference count; every clone refers to the same underlying listener.
#[derive(Clone, Debug)]
pub struct UnixListener {
    /// Shared `async-io` wrapper around the standard library listener.
    inner: Arc<Async<std::os::unix::net::UnixListener>>,
}

impl UnixListener {
    /// Builds a handle around an already shared listener.
    fn new(inner: Arc<Async<std::os::unix::net::UnixListener>>) -> UnixListener {
        Self { inner }
    }

    /// Creates a listener bound to the socket file at `path`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `Async::<std::os::unix::net::UnixListener>::bind`.
    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
        Async::<std::os::unix::net::UnixListener>::bind(path)
            .map(|listener| Self::new(Arc::new(listener)))
    }

    /// Waits for the next connection and returns it together with the peer's address.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while accepting on the underlying listener.
    pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
        self.inner
            .accept()
            .await
            .map(|(stream, addr)| (UnixStream::new(Arc::new(stream)), addr))
    }

    /// Returns a stream of incoming connections.
    ///
    /// The returned [`Incoming`] borrows this listener and boxes the stream produced by the
    /// underlying `Async` listener.
    pub fn incoming(&self) -> Incoming<'_> {
        let accepted = self.inner.incoming();
        Incoming {
            incoming: Box::pin(accepted),
        }
    }

    /// Returns the address this listener is bound to.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        let listener = self.inner.get_ref();
        listener.local_addr()
    }
}
impl From<Async<std::os::unix::net::UnixListener>> for UnixListener {
fn from(listener: Async<std::os::unix::net::UnixListener>) -> UnixListener {
UnixListener::new(Arc::new(listener))
}
}
/// Converts a standard library listener by registering it with `Async::new`.
///
/// The conversion fails with the `io::Error` returned by `Async::new`.
impl TryFrom<std::os::unix::net::UnixListener> for UnixListener {
    type Error = io::Error;

    fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<UnixListener> {
        Async::new(listener).map(|registered| UnixListener::new(Arc::new(registered)))
    }
}
impl From<UnixListener> for Arc<Async<std::os::unix::net::UnixListener>> {
fn from(val: UnixListener) -> Self {
val.inner
}
}
/// Exposes the raw file descriptor of the wrapped listener.
#[cfg(unix)]
impl AsRawFd for UnixListener {
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.inner)
    }
}
/// Borrows the file descriptor of the wrapped standard library listener.
#[cfg(unix)]
impl AsFd for UnixListener {
    fn as_fd(&self) -> BorrowedFd<'_> {
        let listener = self.inner.get_ref();
        listener.as_fd()
    }
}
/// Builds a listener from an owned file descriptor.
///
/// The descriptor is first turned into a standard library listener, which is then converted
/// through the `TryFrom<std::os::unix::net::UnixListener>` impl.
#[cfg(unix)]
impl TryFrom<OwnedFd> for UnixListener {
    type Error = io::Error;

    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
        let listener = std::os::unix::net::UnixListener::from(value);
        Self::try_from(listener)
    }
}
/// Exposes the raw socket handle of the wrapped listener.
#[cfg(windows)]
impl AsRawSocket for UnixListener {
    fn as_raw_socket(&self) -> RawSocket {
        let inner = &self.inner;
        inner.as_raw_socket()
    }
}
/// A stream of connections accepted by a [`UnixListener`].
///
/// Created by `UnixListener::incoming`; the lifetime `'a` ties the stream to the listener it
/// was created from.
pub struct Incoming<'a> {
    /// Boxed stream of raw `Async` connections produced by the underlying listener.
    incoming: Pin<
        Box<
            dyn Stream<Item = io::Result<Async<std::os::unix::net::UnixStream>>> + Send + Sync + 'a,
        >,
    >,
}

impl Stream for Incoming<'_> {
    type Item = io::Result<UnixStream>;

    /// Polls the boxed stream and wraps each accepted connection in a [`UnixStream`].
    ///
    /// Errors and end-of-stream are passed through unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.incoming.as_mut().poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(next) => Poll::Ready(
                next.map(|accepted| accepted.map(|stream| UnixStream::new(Arc::new(stream)))),
            ),
        }
    }
}
/// Opaque debug output: the boxed stream has no useful representation of its own.
impl fmt::Debug for Incoming<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Incoming {{ ... }}"))
    }
}
/// An asynchronous Unix domain socket stream.
///
/// The socket is shared through an `Arc`, and each handle additionally caches the readiness
/// futures it is currently waiting on.
pub struct UnixStream {
    /// Shared `async-io` wrapper around the standard library stream.
    inner: Arc<Async<std::os::unix::net::UnixStream>>,
    /// Pending "readable" future, set while `poll_read` waits for the socket.
    readable: Option<async_io::ReadableOwned<std::os::unix::net::UnixStream>>,
    /// Pending "writable" future, set while `poll_write` / `poll_flush` wait for the socket.
    writable: Option<async_io::WritableOwned<std::os::unix::net::UnixStream>>,
}

// NOTE(review): presumably declared by hand because the cached readiness futures prevent the
// auto traits from being derived — confirm against `async-io`.
impl UnwindSafe for UnixStream {}
impl RefUnwindSafe for UnixStream {}

impl UnixStream {
    /// Builds a handle around a shared stream with no readiness future pending.
    fn new(inner: Arc<Async<std::os::unix::net::UnixStream>>) -> UnixStream {
        Self {
            readable: None,
            writable: None,
            inner,
        }
    }

    /// Connects to the socket at `path`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `Async::<std::os::unix::net::UnixStream>::connect`.
    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
        Async::<std::os::unix::net::UnixStream>::connect(path)
            .await
            .map(|stream| Self::new(Arc::new(stream)))
    }

    /// Creates a pair of connected, unnamed streams.
    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
        Async::<std::os::unix::net::UnixStream>::pair()
            .map(|(a, b)| (Self::new(Arc::new(a)), Self::new(Arc::new(b))))
    }

    /// Returns the address of the local end of this connection.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        let stream = self.inner.get_ref();
        stream.local_addr()
    }

    /// Returns the address of the remote end of this connection.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        let stream = self.inner.get_ref();
        stream.peer_addr()
    }

    /// Shuts down the read half, the write half, or both, as selected by `how`.
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        let stream = self.inner.get_ref();
        stream.shutdown(how)
    }
}
/// Debug output is delegated to the shared inner stream; the cached futures are not shown.
impl fmt::Debug for UnixStream {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, f)
    }
}
impl Clone for UnixStream {
fn clone(&self) -> UnixStream {
UnixStream::new(self.inner.clone())
}
}
/// Wraps an existing `Async` stream in a freshly allocated `Arc`.
impl From<Async<std::os::unix::net::UnixStream>> for UnixStream {
    fn from(stream: Async<std::os::unix::net::UnixStream>) -> UnixStream {
        let shared = Arc::new(stream);
        UnixStream::new(shared)
    }
}
/// Converts a standard library stream by registering it with `Async::new`.
///
/// The conversion fails with the `io::Error` returned by `Async::new`.
impl TryFrom<std::os::unix::net::UnixStream> for UnixStream {
    type Error = io::Error;

    fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<UnixStream> {
        Async::new(stream).map(|registered| UnixStream::new(Arc::new(registered)))
    }
}
impl From<UnixStream> for Arc<Async<std::os::unix::net::UnixStream>> {
fn from(val: UnixStream) -> Self {
val.inner
}
}
/// Exposes the raw file descriptor of the wrapped stream.
#[cfg(unix)]
impl AsRawFd for UnixStream {
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.inner)
    }
}
/// Borrows the file descriptor of the wrapped standard library stream.
#[cfg(unix)]
impl AsFd for UnixStream {
    fn as_fd(&self) -> BorrowedFd<'_> {
        let stream = self.inner.get_ref();
        stream.as_fd()
    }
}
/// Builds a stream from an owned file descriptor.
///
/// The descriptor is first turned into a standard library stream, which is then converted
/// through the `TryFrom<std::os::unix::net::UnixStream>` impl.
#[cfg(unix)]
impl TryFrom<OwnedFd> for UnixStream {
    type Error = io::Error;

    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
        let stream = std::os::unix::net::UnixStream::from(value);
        Self::try_from(stream)
    }
}
/// Exposes the raw socket handle of the wrapped stream.
#[cfg(windows)]
impl AsRawSocket for UnixStream {
    fn as_raw_socket(&self) -> RawSocket {
        let inner = &self.inner;
        inner.as_raw_socket()
    }
}
impl AsyncRead for UnixStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
loop {
match self.inner.get_ref().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => {
self.readable = None;
return Poll::Ready(res);
}
}
if self.readable.is_none() {
self.readable = Some(self.inner.clone().readable_owned());
}
if let Some(f) = &mut self.readable {
let res = ready!(Pin::new(f).poll(cx));
self.readable = None;
res?;
}
}
}
}
impl AsyncWrite for UnixStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
loop {
match self.inner.get_ref().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => {
self.writable = None;
return Poll::Ready(res);
}
}
if self.writable.is_none() {
self.writable = Some(self.inner.clone().writable_owned());
}
if let Some(f) = &mut self.writable {
let res = ready!(Pin::new(f).poll(cx));
self.writable = None;
res?;
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.inner.get_ref().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => {
self.writable = None;
return Poll::Ready(res);
}
}
if self.writable.is_none() {
self.writable = Some(self.inner.clone().writable_owned());
}
if let Some(f) = &mut self.writable {
let res = ready!(Pin::new(f).poll(cx));
self.writable = None;
res?;
}
}
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write))
}
}
/// An asynchronous Unix domain datagram socket.
///
/// The socket is stored as an `Arc<Async<std::os::unix::net::UnixDatagram>>`, so cloning this
/// handle only bumps a reference count; every clone refers to the same underlying socket.
#[derive(Clone, Debug)]
pub struct UnixDatagram {
    /// Shared `async-io` wrapper around the standard library socket.
    inner: Arc<Async<std::os::unix::net::UnixDatagram>>,
}

impl UnixDatagram {
    /// Builds a handle around an already shared socket.
    fn new(inner: Arc<Async<std::os::unix::net::UnixDatagram>>) -> UnixDatagram {
        Self { inner }
    }

    /// Creates a datagram socket bound to the socket file at `path`.
    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
        Async::<std::os::unix::net::UnixDatagram>::bind(path)
            .map(|socket| Self::new(Arc::new(socket)))
    }

    /// Creates a datagram socket that is not bound to any address.
    pub fn unbound() -> io::Result<UnixDatagram> {
        Async::<std::os::unix::net::UnixDatagram>::unbound()
            .map(|socket| Self::new(Arc::new(socket)))
    }

    /// Creates a pair of connected, unnamed datagram sockets.
    pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
        Async::<std::os::unix::net::UnixDatagram>::pair()
            .map(|(a, b)| (Self::new(Arc::new(a)), Self::new(Arc::new(b))))
    }

    /// Connects the socket to the peer at `path`.
    ///
    /// This calls the standard library's `connect` on the wrapped socket directly; it is not
    /// an async operation.
    pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
        let target: &Path = path.as_ref();
        self.inner.get_ref().connect(target)
    }

    /// Returns the address this socket is bound to.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        let socket = self.inner.get_ref();
        socket.local_addr()
    }

    /// Returns the address of the peer this socket is connected to.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        let socket = self.inner.get_ref();
        socket.peer_addr()
    }

    /// Receives a datagram into `buf`, returning the byte count and the sender's address.
    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        let (len, sender) = self.inner.recv_from(buf).await?;
        Ok((len, sender))
    }

    /// Sends `buf` to the socket at `path`, returning the number of bytes sent.
    pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
        let target: &Path = path.as_ref();
        self.inner.send_to(buf, target).await
    }

    /// Receives a datagram into `buf`, returning the number of bytes received.
    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        let len = self.inner.recv(buf).await?;
        Ok(len)
    }

    /// Sends `buf` through the socket, returning the number of bytes sent.
    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
        let len = self.inner.send(buf).await?;
        Ok(len)
    }

    /// Shuts down the read half, the write half, or both, as selected by `how`.
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        let socket = self.inner.get_ref();
        socket.shutdown(how)
    }
}
impl From<Async<std::os::unix::net::UnixDatagram>> for UnixDatagram {
fn from(socket: Async<std::os::unix::net::UnixDatagram>) -> UnixDatagram {
UnixDatagram::new(Arc::new(socket))
}
}
/// Converts a standard library datagram socket by registering it with `Async::new`.
///
/// The conversion fails with the `io::Error` returned by `Async::new`.
impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
    type Error = io::Error;

    fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<UnixDatagram> {
        Async::new(socket).map(|registered| UnixDatagram::new(Arc::new(registered)))
    }
}
impl From<UnixDatagram> for Arc<Async<std::os::unix::net::UnixDatagram>> {
fn from(val: UnixDatagram) -> Self {
val.inner
}
}
#[cfg(unix)]
impl AsRawFd for UnixDatagram {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
/// Exposes the raw socket handle of the wrapped datagram socket.
#[cfg(windows)]
impl AsRawSocket for UnixDatagram {
    fn as_raw_socket(&self) -> RawSocket {
        let inner = &self.inner;
        inner.as_raw_socket()
    }
}