interprocess 2.2.2

Interprocess communication toolkit
Documentation
use super::super::name_to_addr;
use crate::{
	error::ReuniteError,
	local_socket::{traits::tokio as traits, Name},
	Sealed,
};
use std::{
	io::{self, ErrorKind::WouldBlock},
	os::{
		fd::{AsFd, OwnedFd},
		unix::{
			net::{SocketAddr, UnixStream as SyncUnixStream},
			prelude::BorrowedFd,
		},
	},
	pin::Pin,
	task::{ready, Context, Poll},
};
use tokio::{
	io::{AsyncRead, AsyncWrite, ReadBuf},
	net::{
		unix::{OwnedReadHalf as RecvHalfImpl, OwnedWriteHalf as SendHalfImpl},
		UnixStream,
	},
};

/// Tokio-based local socket stream; a newtype around Tokio's [`UnixStream`].
///
/// The wrapped stream is visible to the parent module (`pub(super)`).
#[derive(Debug)]
pub struct Stream(pub(super) UnixStream);
// Marker impl. NOTE(review): `Sealed` is declared at the crate root; presumably it prevents the
// crate's traits from being implemented downstream — confirm at its definition.
impl Sealed for Stream {}

impl Stream {
	/// Connects to the Unix domain socket at `addr` and returns the raw Tokio stream.
	///
	/// On Linux and Android, an address with an abstract name is connected with the standard
	/// library's blocking `connect_addr` on Tokio's blocking thread pool; the resulting socket is
	/// switched to non-blocking mode and then converted into a Tokio [`UnixStream`]. Every other
	/// address is connected by its filesystem path via [`UnixStream::connect`].
	///
	/// # Errors
	/// Returns any I/O error produced while connecting, while switching the socket to
	/// non-blocking mode or while registering it with Tokio, as well as the error a failed
	/// blocking task converts into. An address that has neither an abstract name nor a
	/// filesystem path yields an [`io::ErrorKind::InvalidInput`] error instead of panicking.
	async fn _connect(addr: SocketAddr) -> io::Result<UnixStream> {
		#[cfg(any(target_os = "linux", target_os = "android"))]
		{
			#[cfg(target_os = "android")]
			use std::os::android::net::SocketAddrExt;
			#[cfg(target_os = "linux")]
			use std::os::linux::net::SocketAddrExt;
			if addr.as_abstract_name().is_some() {
				// The connect call below is synchronous, so it is run off the async worker
				// threads.
				let connected = tokio::task::spawn_blocking(move || {
					let sock = SyncUnixStream::connect_addr(&addr)?;
					// Must be non-blocking before Tokio takes the socket over.
					sock.set_nonblocking(true)?;
					Ok::<_, io::Error>(sock)
				})
				.await??; // first `?`: task join failure, second `?`: the connect itself
				return connected.try_into();
			}
		}
		// Not an abstract-namespace address: a filesystem path is required from here on.
		let path = addr.as_pathname().ok_or_else(|| {
			io::Error::new(
				io::ErrorKind::InvalidInput,
				"socket address has no filesystem path to connect to",
			)
		})?;
		UnixStream::connect(path).await
	}
}

impl traits::Stream for Stream {
	type RecvHalf = RecvHalf;
	type SendHalf = SendHalf;

	/// Resolves `name` to a socket address and connects to it.
	///
	/// The `false` argument is passed straight to `name_to_addr`; its meaning is defined there
	/// and is not visible in this file.
	async fn connect(name: Name<'_>) -> io::Result<Self> {
		let addr = name_to_addr(name, false)?;
		let inner = Self::_connect(addr).await?;
		Ok(Self::from(inner))
	}
	/// Consumes the stream and returns its owned receive and send halves.
	fn split(self) -> (RecvHalf, SendHalf) {
		let (recv, send) = self.0.into_split();
		(RecvHalf(recv), SendHalf(send))
	}
	/// Joins two halves back into a stream.
	///
	/// If Tokio refuses to reunite them, both halves are handed back inside the crate's own
	/// [`ReuniteError`].
	#[inline]
	fn reunite(rh: RecvHalf, sh: SendHalf) -> Result<Self, ReuniteError<RecvHalf, SendHalf>> {
		match rh.0.reunite(sh.0) {
			Ok(inner) => Ok(Self::from(inner)),
			Err(tokio::net::unix::ReuniteError(recv, send)) => Err(ReuniteError {
				rh: RecvHalf(recv),
				sh: SendHalf(send),
			}),
		}
	}
}

/// Drives a non-blocking I/O attempt to completion from within a poll function.
///
/// `try_io` is called repeatedly. Any result other than a `WouldBlock` error is returned as
/// `Poll::Ready` right away. After a `WouldBlock`, the readiness poller is consulted:
/// - `Poll::Pending` is propagated to the caller;
/// - a readiness error is returned as `Poll::Ready(Err(..))`;
/// - successful readiness triggers another `try_io` attempt.
///
/// Despite its name, `poll_read_ready` is simply whichever readiness poll matches the operation;
/// the write paths in this file pass a write-readiness poll.
fn ioloop(
	mut try_io: impl FnMut() -> io::Result<usize>,
	mut poll_read_ready: impl FnMut() -> Poll<io::Result<()>>,
) -> Poll<io::Result<usize>> {
	loop {
		let outcome = try_io();
		let would_block = matches!(&outcome, Err(e) if e.kind() == WouldBlock);
		if !would_block {
			return Poll::Ready(outcome);
		}
		// Bail out with `Pending` if not ready yet, or with the error if the readiness poll
		// failed; otherwise go around and retry the I/O.
		ready!(poll_read_ready())?;
	}
}

// Boilerplate impls for `Stream`, generated by crate-local macros whose definitions are not part
// of this file.
// NOTE(review): judging by the names, `forward_tokio_rw` presumably implements
// `AsyncRead`/`AsyncWrite` for the owned `Stream` by forwarding to the `&Stream` impls below, and
// `derive_trivial_conv(UnixStream)` presumably supplies the `From` conversions that the
// `Self::from` / `.into()` calls in this file rely on — confirm against the macro definitions.
multimacro! {
	Stream,
	pinproj_for_unpin(UnixStream),
	forward_rbv(UnixStream, &),
	forward_tokio_rw,
	forward_as_handle(unix),
	derive_trivial_conv(UnixStream),
}
/// Reading through a shared reference, so several borrowers can poll the same stream.
impl AsyncRead for &Stream {
	/// Attempts a non-blocking read into `buf`, polling for read readiness via `ioloop` whenever
	/// the socket reports `WouldBlock`. The byte count from the read attempt is discarded.
	#[inline]
	fn poll_read(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &mut ReadBuf<'_>,
	) -> Poll<io::Result<()>> {
		let sock = &self.0;
		let polled = ioloop(|| sock.try_read_buf(buf), || sock.poll_read_ready(cx));
		polled.map(|outcome| outcome.map(drop))
	}
}
/// Writing through a shared reference, so several borrowers can poll the same stream.
impl AsyncWrite for &Stream {
	/// Attempts a non-blocking write of `buf`, polling for write readiness via `ioloop` whenever
	/// the socket reports `WouldBlock`.
	#[inline]
	fn poll_write(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &[u8],
	) -> Poll<io::Result<usize>> {
		let sock = &self.0;
		ioloop(|| sock.try_write(buf), || sock.poll_write_ready(cx))
	}
	/// Vectored counterpart of `poll_write`, using the same retry-on-`WouldBlock` loop.
	#[inline]
	fn poll_write_vectored(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		bufs: &[io::IoSlice<'_>],
	) -> Poll<io::Result<usize>> {
		let sock = &self.0;
		ioloop(|| sock.try_write_vectored(bufs), || sock.poll_write_ready(cx))
	}
	/// Reports whatever the wrapped Tokio stream reports about vectored write support.
	#[inline]
	fn is_write_vectored(&self) -> bool {
		let sock = &self.0;
		sock.is_write_vectored()
	}
	/// Completes immediately; this impl performs no flushing work of its own.
	#[inline]
	fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
		let flushed: io::Result<()> = Ok(());
		Poll::Ready(flushed)
	}
	/// Completes immediately with success without performing any operation on the socket.
	#[inline]
	fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
		let done: io::Result<()> = Ok(());
		Poll::Ready(done)
	}
}
impl TryFrom<Stream> for OwnedFd {
	type Error = io::Error;
	#[inline]
	fn try_from(slf: Stream) -> io::Result<Self> {
		Ok(slf.0.into_std()?.into())
	}
}
/// Wraps an already-open socket file descriptor in a [`Stream`].
impl TryFrom<OwnedFd> for Stream {
	type Error = io::Error;
	/// Takes ownership of `fd`, switches it to non-blocking mode and registers it with Tokio.
	///
	/// Tokio's `UnixStream::from_std` leaves it to the caller to ensure the socket is
	/// non-blocking, so the mode is set here, just as `_connect` does before its own conversion.
	/// Without this, a descriptor handed over in blocking mode would stall the executor thread.
	///
	/// # Errors
	/// Fails if the socket cannot be made non-blocking or if Tokio cannot register it.
	#[inline]
	fn try_from(fd: OwnedFd) -> io::Result<Self> {
		let sync_stream = SyncUnixStream::from(fd);
		sync_stream.set_nonblocking(true)?;
		Ok(UnixStream::from_std(sync_stream)?.into())
	}
}

/// Receive half of a [`Stream`], as returned by its `split` method; a newtype around Tokio's
/// owned read half.
pub struct RecvHalf(RecvHalfImpl);
// Marker impl; see the `Sealed` trait's definition at the crate root for its purpose.
impl Sealed for RecvHalf {}
// Ties this half back to the stream type it was split from.
impl traits::RecvHalf for RecvHalf {
	type Stream = Stream;
}
// Boilerplate impls generated by crate-local macros defined outside this file.
// NOTE(review): `forward_debug` presumably produces a `Debug` impl labelled with the given string
// and `forward_tokio_read` presumably implements `AsyncRead` for the owned `RecvHalf` via the
// `&RecvHalf` impl below — confirm against the macro definitions.
multimacro! {
	RecvHalf,
	pinproj_for_unpin(RecvHalfImpl),
	forward_debug("local_socket::RecvHalf"),
	forward_tokio_read,
}
impl AsyncRead for &RecvHalf {
	#[inline]
	fn poll_read(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &mut ReadBuf<'_>,
	) -> Poll<io::Result<()>> {
		ioloop(
			|| self.0.try_read_buf(buf),
			|| self.0.as_ref().poll_read_ready(cx),
		)
		.map(|e| e.map(|_| ()))
	}
}
impl AsFd for RecvHalf {
	#[inline]
	fn as_fd(&self) -> BorrowedFd<'_> {
		self.0.as_ref().as_fd()
	}
}

/// Send half of a [`Stream`], as returned by its `split` method; a newtype around Tokio's owned
/// write half.
pub struct SendHalf(SendHalfImpl);
// Marker impl; see the `Sealed` trait's definition at the crate root for its purpose.
impl Sealed for SendHalf {}
// Ties this half back to the stream type it was split from.
impl traits::SendHalf for SendHalf {
	type Stream = Stream;
}
// Boilerplate impls generated by crate-local macros defined outside this file.
// NOTE(review): `forward_debug` presumably produces a `Debug` impl labelled with the given string
// and `forward_tokio_write` presumably implements `AsyncWrite` for the owned `SendHalf` via the
// `&SendHalf` impl below — confirm against the macro definitions.
multimacro! {
	SendHalf,
	pinproj_for_unpin(SendHalfImpl),
	forward_rbv(SendHalfImpl, &),
	forward_debug("local_socket::SendHalf"),
	forward_tokio_write,
}
impl AsyncWrite for &SendHalf {
	#[inline]
	fn poll_write(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &[u8],
	) -> Poll<io::Result<usize>> {
		ioloop(
			|| self.0.try_write(buf),
			|| self.0.as_ref().poll_write_ready(cx),
		)
	}
	#[inline]
	fn poll_write_vectored(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		bufs: &[io::IoSlice<'_>],
	) -> Poll<io::Result<usize>> {
		ioloop(
			|| self.0.try_write_vectored(bufs),
			|| self.0.as_ref().poll_write_ready(cx),
		)
	}
	#[inline]
	fn is_write_vectored(&self) -> bool {
		self.0.is_write_vectored()
	}
	#[inline]
	fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
		Poll::Ready(Ok(()))
	}
	#[inline]
	fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
		Poll::Ready(Ok(()))
	}
}
impl AsFd for SendHalf {
	#[inline]
	fn as_fd(&self) -> BorrowedFd<'_> {
		self.0.as_ref().as_fd()
	}
}