use {
super::{dispatch_name, CONN_TIMEOUT_MSG},
crate::{
error::ReuniteError,
local_socket::{
prelude::*,
traits::{self, ReuniteResult},
ConnectOptions, PeerCreds,
},
os::unix::{
c_wrappers, local_socket::peer_creds::PeerCreds as PeerCredsInner, unixprelude::*,
},
ConnectWaitMode, Sealed, TryClone,
},
std::{
io::{self, prelude::*, IoSlice, IoSliceMut},
os::unix::net::UnixStream,
sync::Arc,
time::Duration,
},
};
#[derive(Debug)]
pub struct Stream(pub(super) UnixStream);
impl Sealed for Stream {}
impl traits::Stream for Stream {
type RecvHalf = RecvHalf;
type SendHalf = SendHalf;
fn from_options(mut opts: &ConnectOptions<'_>) -> io::Result<Self> {
let nonblocking_connect = matches!(
opts.get_wait_mode(),
ConnectWaitMode::Timeout(..) | ConnectWaitMode::Deferred
);
let (stream, inprog) = dispatch_name(
&mut opts,
false,
|&mut opts| opts.name.borrow(),
|_| None,
|addr, _| c_wrappers::create_client(addr, nonblocking_connect),
)?;
if let ConnectWaitMode::Timeout(timeout) = opts.get_wait_mode() {
if inprog {
c_wrappers::wait_for_connect(stream.as_fd(), Some(timeout), CONN_TIMEOUT_MSG)?;
}
}
if opts.get_nonblocking_stream() != nonblocking_connect {
c_wrappers::fast_set_nonblocking(stream.as_fd(), opts.get_nonblocking_stream())?;
}
Ok(stream.into())
}
#[inline]
fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
c_wrappers::set_nonblocking(self.as_fd(), nonblocking)
}
#[inline]
fn set_recv_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.0.set_read_timeout(timeout)
}
#[inline]
fn set_send_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.0.set_write_timeout(timeout)
}
#[inline]
fn split(self) -> (RecvHalf, SendHalf) {
let arc = Arc::new(self);
(RecvHalf(Arc::clone(&arc)), SendHalf(arc))
}
#[inline]
#[allow(clippy::unwrap_in_result)]
fn reunite(rh: RecvHalf, sh: SendHalf) -> ReuniteResult<Self> {
if !Arc::ptr_eq(&rh.0, &sh.0) {
return Err(ReuniteError { rh, sh });
}
drop(rh);
let inner = Arc::into_inner(sh.0).expect("stream half inexplicably copied");
Ok(inner)
}
}
impl traits::StreamCommon for Stream {
#[inline]
fn take_error(&self) -> io::Result<Option<io::Error>> { c_wrappers::take_error(self.as_fd()) }
#[inline]
fn peer_creds(&self) -> io::Result<PeerCreds> {
PeerCredsInner::for_socket(self.as_fd()).map(From::from)
}
}
impl Read for &Stream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { (&mut &self.0).read(buf) }
#[inline]
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
(&mut &self.0).read_vectored(bufs)
}
}
impl Write for &Stream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { (&mut &self.0).write(buf) }
#[inline]
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
(&mut &self.0).write_vectored(bufs)
}
#[inline]
fn flush(&mut self) -> io::Result<()> { Ok(()) }
}
impl Stream {
#[inline(always)]
pub fn inner(&self) -> &UnixStream { &self.0 }
#[inline(always)]
pub fn inner_mut(&mut self) -> &mut UnixStream { &mut self.0 }
}
impl From<UnixStream> for Stream {
#[inline]
fn from(s: UnixStream) -> Self { Self(s) }
}
impl From<OwnedFd> for Stream {
#[inline]
fn from(fd: OwnedFd) -> Self { UnixStream::from(fd).into() }
}
impl TryClone for Stream {
#[inline]
fn try_clone(&self) -> std::io::Result<Self> { self.0.try_clone().map(Self::from) }
}
multimacro! {
Stream,
forward_asinto_handle(unix),
derive_sync_mut_rw,
}
macro_rules! arc_accessors {
($ty:ty) => {
impl $ty {
#[inline]
pub fn as_stream(&self) -> &Stream { &self.0 }
#[inline]
pub fn into_arc(self) -> Arc<Stream> { self.0 }
#[inline]
pub fn as_arc(&self) -> &Arc<Stream> { &self.0 }
}
};
}
#[derive(Clone, Debug)]
pub struct RecvHalf(pub(super) Arc<Stream>);
impl Sealed for RecvHalf {}
multimacro! {
RecvHalf,
forward_rbv(Stream, *),
arc_accessors,
forward_sync_ref_read,
forward_as_handle,
derive_sync_mut_read,
}
impl traits::RecvHalf for RecvHalf {
type Stream = Stream;
#[inline]
fn set_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.0.set_recv_timeout(timeout)
}
}
#[derive(Clone, Debug)]
pub struct SendHalf(pub(super) Arc<Stream>);
impl Sealed for SendHalf {}
multimacro! {
SendHalf,
forward_rbv(Stream, *),
arc_accessors,
forward_sync_ref_write,
forward_as_handle,
derive_sync_mut_write,
}
impl traits::SendHalf for SendHalf {
type Stream = Stream;
#[inline]
fn set_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.0.set_send_timeout(timeout)
}
}