use futures::io::{AsyncRead, AsyncWrite};
use futures::stream::{self, Stream};
use futures::{future, pin_mut, ready};
use std::future::Future;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(unix)]
use std::{
os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd},
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
path::Path,
};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
use rustix::io as rio;
use rustix::net as rn;
use rustix::net::addr::SocketAddrArg;
use crate::runtime::RUNTIME_CAT;
use super::scheduler;
use super::{Reactor, Readable, ReadableOwned, Registration, Source, Writable, WritableOwned};
/// Wrapper pairing an I/O handle of type `T` with its registration in the reactor.
#[derive(Debug)]
pub struct Async<T: Send + 'static> {
// Registration state shared with the reactor, as returned by `Reactor::insert_io`.
pub(super) source: Arc<Source>,
// The wrapped I/O handle. `Some` from construction until `into_inner` takes it.
pub(super) io: Option<T>,
// Weak handle to the throttling scheduler that was current at construction time,
// if there was one. Consulted on drop to deregister the source.
pub(super) throttling_sched_hdl: Option<scheduler::ThrottlingHandleWeak>,
}
// `Async<T>` is declared `Unpin` for every `T`; the `AsyncRead`/`AsyncWrite` impls
// in this file rely on this to reach `&mut Async<T>` through `Pin<&mut Self>`.
impl<T: Send + 'static> Unpin for Async<T> {}
#[cfg(unix)]
impl<T: AsFd + Send + 'static> Async<T> {
    /// Wraps `io` after switching its file descriptor to non-blocking mode.
    ///
    /// # Errors
    ///
    /// Fails if the descriptor cannot be set non-blocking or if the
    /// registration performed by [`Async::new_nonblocking`] fails.
    pub fn new(io: T) -> io::Result<Async<T>> {
        let fd = io.as_fd();
        set_nonblocking(fd)?;
        Self::new_nonblocking(io)
    }

    /// Wraps `io` without touching its blocking mode.
    ///
    /// The descriptor is registered with the reactor, and a weak handle to the
    /// current throttling scheduler (if any) is recorded.
    ///
    /// # Errors
    ///
    /// Fails if the reactor refuses the registration.
    pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
        // NOTE(review): `Registration::new` is unsafe; presumably the descriptor
        // must stay open while registered. `io` is stored in the returned value
        // next to its `source` — confirm against `Registration`'s contract.
        let registration = unsafe { Registration::new(io.as_fd()) };
        let source = Reactor::with_mut(|reactor| reactor.insert_io(registration))?;

        let throttling_sched_hdl = scheduler::Throttling::current()
            .as_ref()
            .map(scheduler::ThrottlingHandle::downgrade);

        Ok(Async {
            source,
            io: Some(io),
            throttling_sched_hdl,
        })
    }
}
#[cfg(unix)]
impl<T: AsRawFd + Send + 'static> AsRawFd for Async<T> {
    /// Returns the raw file descriptor of the wrapped I/O handle.
    fn as_raw_fd(&self) -> RawFd {
        let inner: &T = self.get_ref();
        inner.as_raw_fd()
    }
}
#[cfg(unix)]
impl<T: AsFd + Send + 'static> AsFd for Async<T> {
    /// Borrows the file descriptor of the wrapped I/O handle.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let inner: &T = self.get_ref();
        inner.as_fd()
    }
}
#[cfg(unix)]
impl<T: AsFd + From<OwnedFd> + Send + 'static> TryFrom<OwnedFd> for Async<T> {
    type Error = io::Error;

    /// Converts the owned descriptor into `T` and wraps it with [`Async::new`].
    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
        let io = T::from(value);
        Async::new(io)
    }
}
#[cfg(unix)]
impl<T: Into<OwnedFd> + Send + 'static> TryFrom<Async<T>> for OwnedFd {
    type Error = io::Error;

    /// Unwraps the I/O handle via `into_inner` and converts it to an `OwnedFd`.
    fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
        let io = value.into_inner()?;
        Ok(io.into())
    }
}
#[cfg(windows)]
impl<T: AsSocket + Send + 'static> Async<T> {
    /// Wraps `io` after switching its socket to non-blocking mode.
    ///
    /// # Errors
    ///
    /// Fails if the socket cannot be set non-blocking or if the registration
    /// performed by [`Async::new_nonblocking`] fails.
    pub fn new(io: T) -> io::Result<Async<T>> {
        let socket = io.as_socket();
        set_nonblocking(socket)?;
        Self::new_nonblocking(io)
    }

    /// Wraps `io` without touching its blocking mode.
    ///
    /// The socket is registered with the reactor, and a weak handle to the
    /// current throttling scheduler (if any) is recorded.
    ///
    /// # Errors
    ///
    /// Fails if the reactor refuses the registration.
    pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
        // NOTE(review): `Registration::new` is unsafe; presumably the socket
        // must stay open while registered. `io` is stored in the returned value
        // next to its `source` — confirm against `Registration`'s contract.
        let registration = unsafe { Registration::new(io.as_socket()) };
        let source = Reactor::with_mut(|reactor| reactor.insert_io(registration))?;

        let throttling_sched_hdl = scheduler::Throttling::current()
            .as_ref()
            .map(scheduler::ThrottlingHandle::downgrade);

        Ok(Async {
            source,
            io: Some(io),
            throttling_sched_hdl,
        })
    }
}
#[cfg(windows)]
impl<T: AsRawSocket + Send + 'static> AsRawSocket for Async<T> {
    /// Returns the raw socket of the wrapped I/O handle.
    fn as_raw_socket(&self) -> RawSocket {
        let inner: &T = self.get_ref();
        inner.as_raw_socket()
    }
}
#[cfg(windows)]
impl<T: AsSocket + Send + 'static> AsSocket for Async<T> {
    /// Borrows the socket of the wrapped I/O handle.
    fn as_socket(&self) -> BorrowedSocket<'_> {
        let inner: &T = self.get_ref();
        inner.as_socket()
    }
}
#[cfg(windows)]
impl<T: AsSocket + From<OwnedSocket> + Send + 'static> TryFrom<OwnedSocket> for Async<T> {
    type Error = io::Error;

    /// Converts the owned socket into `T` and wraps it with [`Async::new`].
    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
        let io = T::from(value);
        Async::new(io)
    }
}
#[cfg(windows)]
impl<T: Into<OwnedSocket> + Send + 'static> TryFrom<Async<T>> for OwnedSocket {
    type Error = io::Error;

    /// Unwraps the I/O handle via `into_inner` and converts it to an `OwnedSocket`.
    fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
        let io = value.into_inner()?;
        Ok(io.into())
    }
}
impl<T: Send + 'static> Async<T> {
/// Returns a shared reference to the wrapped I/O handle.
///
/// # Panics
///
/// Panics if the handle has already been taken (the `io` field is `None`).
pub fn get_ref(&self) -> &T {
self.io.as_ref().unwrap()
}
/// Returns a mutable reference to the wrapped I/O handle.
///
/// # Safety
///
/// NOTE(review): the contract is not spelled out in this file. Presumably the
/// caller must not replace or drop the underlying I/O source through the
/// returned reference while it is still registered — confirm.
///
/// # Panics
///
/// Panics if the handle has already been taken (the `io` field is `None`).
pub unsafe fn get_mut(&mut self) -> &mut T {
self.io.as_mut().unwrap()
}
/// Takes the I/O handle out of the wrapper and removes its source from the reactor.
///
/// # Errors
///
/// Returns the error reported by `Reactor::remove_io`; in that case the I/O
/// handle taken out of the wrapper is dropped.
pub fn into_inner(mut self) -> io::Result<T> {
let io = self.io.take().unwrap();
Reactor::with_mut(|reactor| reactor.remove_io(&self.source))?;
Ok(io)
}
/// Returns the `Readable` future built by `Source::readable` for this handle.
pub fn readable(&self) -> Readable<'_, T> {
Source::readable(self)
}
/// Same as [`Async::readable`], but takes an `Arc<Self>` and returns an owned future.
pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
Source::readable_owned(self)
}
/// Returns the `Writable` future built by `Source::writable` for this handle.
pub fn writable(&self) -> Writable<'_, T> {
Source::writable(self)
}
/// Same as [`Async::writable`], but takes an `Arc<Self>` and returns an owned future.
pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
Source::writable_owned(self)
}
/// Polls the source for read readiness; forwards to `Source::poll_readable`.
pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.source.poll_readable(cx)
}
/// Polls the source for write readiness; forwards to `Source::poll_writable`.
pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.source.poll_writable(cx)
}
/// Runs `op` on the I/O handle, retrying for as long as it reports `WouldBlock`.
///
/// `op` is tried first; only on `WouldBlock` does the method wait on
/// [`Async::readable`] (through `optimistic`) before trying again. Any other
/// result, success or error, is returned as is.
pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
loop {
// The match scrutinee is a temporary, so the result is not kept alive
// across the `.await` below.
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
optimistic(self.readable()).await?;
}
}
/// Like [`Async::read_with`], but hands `op` a mutable reference to the I/O handle.
///
/// # Safety
///
/// The handle is obtained with the unsafe [`Async::get_mut`]; the caller must
/// uphold that method's contract.
pub async unsafe fn read_with_mut<R>(
&mut self,
op: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
unsafe {
let mut op = op;
loop {
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
optimistic(self.readable()).await?;
}
}
}
/// Runs `op` on the I/O handle, retrying for as long as it reports `WouldBlock`.
///
/// Same loop as [`Async::read_with`], except that it waits on
/// [`Async::writable`] between attempts.
pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
loop {
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
optimistic(self.writable()).await?;
}
}
/// Like [`Async::write_with`], but hands `op` a mutable reference to the I/O handle.
///
/// # Safety
///
/// The handle is obtained with the unsafe [`Async::get_mut`]; the caller must
/// uphold that method's contract.
pub async unsafe fn write_with_mut<R>(
&mut self,
op: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
unsafe {
let mut op = op;
loop {
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
optimistic(self.writable()).await?;
}
}
}
}
impl<T: Send + 'static> AsRef<T> for Async<T> {
fn as_ref(&self) -> &T {
self.get_ref()
}
}
impl<T: Send + 'static> Drop for Async<T> {
    /// Deregisters the source when the wrapper is dropped with its I/O handle
    /// still inside.
    ///
    /// If a throttling scheduler handle was recorded at construction and can
    /// still be upgraded, removal goes through that scheduler. If the handle can
    /// no longer be upgraded, nothing is removed. Without a recorded handle the
    /// source is removed through `Reactor::with_mut`. Failures are logged, not
    /// propagated.
    fn drop(&mut self) {
        // `into_inner` takes the I/O handle and removes the source itself before
        // `self` is dropped. Removing it again here would deregister the same
        // source twice, so there is nothing left to do in that case.
        if self.io.is_none() {
            return;
        }

        // The I/O handle is a field, so it is only dropped after this body has
        // finished deregistering the source.
        match self.throttling_sched_hdl.take() {
            Some(throttling_sched_hdl) => {
                if let Some(sched) = throttling_sched_hdl.upgrade()
                    && let Err(err) = sched.remove_io(&self.source)
                {
                    gst::error!(
                        RUNTIME_CAT,
                        "Failed to remove fd {:?}: {err}",
                        self.source.registration,
                    );
                }
            }
            None => {
                Reactor::with_mut(|reactor| {
                    if let Err(err) = reactor.remove_io(&self.source) {
                        gst::error!(
                            RUNTIME_CAT,
                            "Failed to remove fd {:?}: {err}",
                            self.source.registration,
                        );
                    }
                });
            }
        }
    }
}
/// Marker trait required by the `AsyncRead`/`AsyncWrite` impls on `Async<T>`, which
/// reach the inner handle through the unsafe `Async::get_mut`.
///
/// # Safety
///
/// NOTE(review): the exact contract is not stated in this file. Presumably an
/// implementor guarantees that its `Read`/`Write` implementations never drop or
/// replace the underlying I/O source — confirm before adding implementations.
pub unsafe trait IoSafe {}
// Shared references are marked `IoSafe` regardless of the referent type.
unsafe impl<T: ?Sized> IoSafe for &T {}
// Standard library I/O handles.
unsafe impl IoSafe for std::fs::File {}
unsafe impl IoSafe for std::io::Stderr {}
unsafe impl IoSafe for std::io::Stdin {}
unsafe impl IoSafe for std::io::Stdout {}
unsafe impl IoSafe for std::io::StderrLock<'_> {}
unsafe impl IoSafe for std::io::StdinLock<'_> {}
unsafe impl IoSafe for std::io::StdoutLock<'_> {}
unsafe impl IoSafe for std::net::TcpStream {}
#[cfg(unix)]
unsafe impl IoSafe for std::os::unix::net::UnixStream {}
// Buffering adapters are `IoSafe` when the wrapped reader/writer is.
unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {}
// Pointer-like wrappers are `IoSafe` when the pointee is.
unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
unsafe impl<T: Clone + IoSafe> IoSafe for std::borrow::Cow<'_, T> {}
impl<T: IoSafe + Read + Send + 'static> AsyncRead for Async<T> {
    /// Attempts a read; on `WouldBlock`, waits for read readiness and retries.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            // Mutable access goes through the unsafe `get_mut`; `T: IoSafe` is
            // the bound this impl requires for doing so.
            let attempt = unsafe { (*self).get_mut() }.read(buf);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_readable(cx))?;
        }
    }

    /// Vectored variant of `poll_read`, with the same retry logic.
    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        loop {
            let attempt = unsafe { (*self).get_mut() }.read_vectored(bufs);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_readable(cx))?;
        }
    }
}
impl<T: Send + 'static> AsyncRead for &Async<T>
where
    for<'a> &'a T: Read,
{
    /// Attempts a read through `&T`; on `WouldBlock`, waits for read readiness
    /// and retries.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            let attempt = (*self).get_ref().read(buf);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_readable(cx))?;
        }
    }

    /// Vectored variant of `poll_read`, with the same retry logic.
    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        loop {
            let attempt = (*self).get_ref().read_vectored(bufs);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_readable(cx))?;
        }
    }
}
impl<T: IoSafe + Write + Send + 'static> AsyncWrite for Async<T> {
    /// Attempts a write; on `WouldBlock`, waits for write readiness and retries.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            // Mutable access goes through the unsafe `get_mut`; `T: IoSafe` is
            // the bound this impl requires for doing so.
            let attempt = unsafe { (*self).get_mut() }.write(buf);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_writable(cx))?;
        }
    }

    /// Vectored variant of `poll_write`, with the same retry logic.
    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        loop {
            let attempt = unsafe { (*self).get_mut() }.write_vectored(bufs);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_writable(cx))?;
        }
    }

    /// Flushes the inner writer; on `WouldBlock`, waits for write readiness and
    /// retries.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            let attempt = unsafe { (*self).get_mut() }.flush();
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_writable(cx))?;
        }
    }

    /// Closing only flushes; it forwards to `poll_flush`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        <Self as AsyncWrite>::poll_flush(self, cx)
    }
}
impl<T: Send + 'static> AsyncWrite for &Async<T>
where
    for<'a> &'a T: Write,
{
    /// Attempts a write through `&T`; on `WouldBlock`, waits for write readiness
    /// and retries.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            let attempt = (*self).get_ref().write(buf);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_writable(cx))?;
        }
    }

    /// Vectored variant of `poll_write`, with the same retry logic.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        loop {
            let attempt = (*self).get_ref().write_vectored(bufs);
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_writable(cx))?;
        }
    }

    /// Flushes through `&T`; on `WouldBlock`, waits for write readiness and
    /// retries.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            let attempt = (*self).get_ref().flush();
            let would_block =
                matches!(&attempt, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return Poll::Ready(attempt);
            }
            ready!(self.poll_writable(cx))?;
        }
    }

    /// Closing only flushes; it forwards to `poll_flush`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        <Self as AsyncWrite>::poll_flush(self, cx)
    }
}
impl Async<TcpListener> {
    /// Binds a TCP listener to `addr` and wraps it.
    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
        let addr: SocketAddr = addr.into();
        let listener = TcpListener::bind(addr)?;
        Async::new(listener)
    }

    /// Accepts one incoming connection, returning the wrapped stream and the
    /// peer address.
    pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
        let (stream, peer) = self.read_with(|listener| listener.accept()).await?;
        let stream = Async::new(stream)?;
        Ok((stream, peer))
    }

    /// Returns a stream that never ends and yields the result of each
    /// successive `accept`, without the peer address.
    pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
        stream::unfold(self, |listener| async move {
            let next = match listener.accept().await {
                Ok((stream, _)) => Ok(stream),
                Err(err) => Err(err),
            };
            Some((next, listener))
        })
    }
}
impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
type Error = io::Error;
fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
Async::new(listener)
}
}
impl Async<TcpStream> {
    /// Opens a TCP connection to `addr`.
    ///
    /// The socket is created and connected by the free `connect` helper, then
    /// the method waits until it is writable and checks the socket's pending
    /// error to learn whether the connection succeeded.
    pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
        let addr: SocketAddr = addr.into();
        let (domain, sock_addr) = match addr {
            SocketAddr::V4(v4) => (rn::AddressFamily::INET, v4.as_any()),
            SocketAddr::V6(v6) => (rn::AddressFamily::INET6, v6.as_any()),
        };

        let socket = connect(sock_addr, domain, Some(rn::ipproto::TCP))?;
        let stream = Async::new_nonblocking(TcpStream::from(socket))?;
        stream.writable().await?;

        let pending_error = stream.get_ref().take_error()?;
        match pending_error {
            Some(err) => Err(err),
            None => Ok(stream),
        }
    }

    /// Reads into `buf` without removing the data from the socket's queue.
    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_with(|stream| stream.peek(buf)).await
    }
}
impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
type Error = io::Error;
fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
Async::new(stream)
}
}
impl Async<UdpSocket> {
    /// Binds a UDP socket to `addr` and wraps it.
    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
        let addr: SocketAddr = addr.into();
        let socket = UdpSocket::bind(addr)?;
        Async::new(socket)
    }

    /// Receives one datagram, returning its size and sender address.
    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        self.read_with(|socket| socket.recv_from(buf)).await
    }

    /// Like `recv_from`, but leaves the datagram in the socket's queue.
    pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        self.read_with(|socket| socket.peek_from(buf)).await
    }

    /// Sends `buf` to `addr`, returning the number of bytes written.
    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
        let addr: SocketAddr = addr.into();
        self.write_with(|socket| socket.send_to(buf, addr)).await
    }

    /// Receives one datagram from the connected peer.
    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_with(|socket| socket.recv(buf)).await
    }

    /// Like `recv`, but leaves the datagram in the socket's queue.
    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_with(|socket| socket.peek(buf)).await
    }

    /// Sends `buf` to the connected peer.
    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
        self.write_with(|socket| socket.send(buf)).await
    }
}
impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
type Error = io::Error;
fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
Async::new(socket)
}
}
impl TryFrom<socket2::Socket> for Async<std::net::UdpSocket> {
    type Error = io::Error;

    /// Converts a `socket2::Socket` into a std UDP socket and wraps it with
    /// [`Async::new`].
    fn try_from(socket: socket2::Socket) -> io::Result<Self> {
        let udp_socket = std::net::UdpSocket::from(socket);
        Self::new(udp_socket)
    }
}
#[cfg(unix)]
impl Async<UnixListener> {
    /// Binds a Unix-domain listener to `path` and wraps it.
    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
        let owned_path = path.as_ref().to_owned();
        let listener = UnixListener::bind(owned_path)?;
        Async::new(listener)
    }

    /// Accepts one incoming connection, returning the wrapped stream and the
    /// peer address.
    pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
        let (stream, peer) = self.read_with(|listener| listener.accept()).await?;
        let stream = Async::new(stream)?;
        Ok((stream, peer))
    }

    /// Returns a stream that never ends and yields the result of each
    /// successive `accept`, without the peer address.
    pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
        stream::unfold(self, |listener| async move {
            let next = match listener.accept().await {
                Ok((stream, _)) => Ok(stream),
                Err(err) => Err(err),
            };
            Some((next, listener))
        })
    }
}
#[cfg(unix)]
impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
type Error = io::Error;
fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
Async::new(listener)
}
}
#[cfg(unix)]
impl Async<UnixStream> {
    /// Connects to the Unix-domain socket at `path`.
    ///
    /// After the socket is created and connected by the free `connect` helper,
    /// the method waits until it is writable and then queries the peer address;
    /// a failure of that query is returned as the connection error.
    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
        let address = convert_path_to_socket_address(path.as_ref())?;
        let socket = connect(address.into(), rn::AddressFamily::UNIX, None)?;

        let stream = Async::new_nonblocking(UnixStream::from(socket))?;
        stream.writable().await?;
        stream.get_ref().peer_addr()?;

        Ok(stream)
    }

    /// Creates a pair of connected, wrapped Unix-domain streams.
    pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
        let (first, second) = UnixStream::pair()?;
        let first = Async::new(first)?;
        let second = Async::new(second)?;
        Ok((first, second))
    }
}
#[cfg(unix)]
impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
type Error = io::Error;
fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
Async::new(stream)
}
}
#[cfg(unix)]
impl Async<UnixDatagram> {
    /// Binds a Unix datagram socket to `path` and wraps it.
    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
        let owned_path = path.as_ref().to_owned();
        let socket = UnixDatagram::bind(owned_path)?;
        Async::new(socket)
    }

    /// Creates a wrapped Unix datagram socket that is not bound to any address.
    pub fn unbound() -> io::Result<Async<UnixDatagram>> {
        let socket = UnixDatagram::unbound()?;
        Async::new(socket)
    }

    /// Creates a pair of connected, wrapped Unix datagram sockets.
    pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
        let (first, second) = UnixDatagram::pair()?;
        let first = Async::new(first)?;
        let second = Async::new(second)?;
        Ok((first, second))
    }

    /// Receives one datagram, returning its size and sender address.
    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
        self.read_with(|socket| socket.recv_from(buf)).await
    }

    /// Sends `buf` to the socket at `path`, returning the number of bytes written.
    pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
        self.write_with(|socket| socket.send_to(buf, &path)).await
    }

    /// Receives one datagram from the connected peer.
    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_with(|socket| socket.recv(buf)).await
    }

    /// Sends `buf` to the connected peer.
    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
        self.write_with(|socket| socket.send(buf)).await
    }
}
#[cfg(unix)]
impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
type Error = io::Error;
fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
Async::new(socket)
}
}
/// Polls `fut` exactly once and reports `Ok(())` on any later poll.
///
/// The first poll returns whatever `fut` returns, `Pending` included. If the
/// returned future is polled again after that, it completes with `Ok(())`
/// without consulting `fut`.
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
    pin_mut!(fut);
    let mut already_polled = false;

    let poll_once = future::poll_fn(|cx| {
        if already_polled {
            return Poll::Ready(Ok(()));
        }
        already_polled = true;
        fut.as_mut().poll(cx)
    });

    poll_once.await
}
/// Creates a non-blocking stream socket for `domain`/`protocol` and starts
/// connecting it to `addr`.
///
/// The socket is returned when `rn::connect` succeeds, and also when it reports
/// `INPROGRESS` (unix only), `AGAIN` or `WOULDBLOCK`. Every other error is
/// converted to `io::Error` and returned.
fn connect(
addr: rn::SocketAddrAny,
domain: rn::AddressFamily,
protocol: Option<rn::Protocol>,
) -> io::Result<rustix::fd::OwnedFd> {
#[cfg(windows)]
use rustix::fd::AsFd;
setup_networking();
// On these targets the socket is created with CLOEXEC and NONBLOCK in one call.
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
))]
let socket = rn::socket_with(
domain,
rn::SocketType::STREAM,
rn::SocketFlags::CLOEXEC | rn::SocketFlags::NONBLOCK,
protocol,
)?;
// Everywhere else the socket is created first and the flags are applied afterwards.
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
)))]
let socket = {
// CLOEXEC is passed at creation except on aix, apple, espidf and windows.
#[cfg(not(any(
target_os = "aix",
target_vendor = "apple",
target_os = "espidf",
windows,
)))]
let flags = rn::SocketFlags::CLOEXEC;
#[cfg(any(
target_os = "aix",
target_vendor = "apple",
target_os = "espidf",
windows,
))]
let flags = rn::SocketFlags::empty();
let socket = rn::socket_with(domain, rn::SocketType::STREAM, flags, protocol)?;
// aix and apple set CLOEXEC through fcntl after the socket exists.
#[cfg(any(target_os = "aix", target_vendor = "apple"))]
rio::fcntl_setfd(&socket, rio::fcntl_getfd(&socket)? | rio::FdFlags::CLOEXEC)?;
set_nonblocking(socket.as_fd())?;
socket
};
// Enable the "no SIGPIPE" socket option on the targets listed here.
#[cfg(any(
target_vendor = "apple",
target_os = "freebsd",
target_os = "netbsd",
target_os = "dragonfly",
))]
rn::sockopt::set_socket_nosigpipe(&socket, true)?;
// NOTE(review): mask and value are both HANDLE_FLAG_INHERIT, which by the Win32
// `SetHandleInformation` contract sets the inherit flag rather than clearing it.
// Confirm this is intended.
#[cfg(windows)]
unsafe {
if windows_sys::Win32::Foundation::SetHandleInformation(
socket.as_raw_socket() as _,
windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
) == 0
{
return Err(io::Error::last_os_error());
}
}
// The `allow` silences the lint for targets where some of the patterns below
// make a later one unreachable.
#[allow(unreachable_patterns)]
match rn::connect(&socket, &addr) {
Ok(_) => {}
// These errors are deliberately not treated as failures: the socket is
// non-blocking and is returned so that the caller can wait on it.
#[cfg(unix)]
Err(rio::Errno::INPROGRESS) => {}
Err(rio::Errno::AGAIN) | Err(rio::Errno::WOULDBLOCK) => {}
Err(err) => return Err(err.into()),
}
Ok(socket)
}
/// Performs one-time networking initialisation.
///
/// On Windows this calls `rustix::net::wsa_startup` at most once per process and
/// ignores its result. On other targets it does nothing.
#[inline]
fn setup_networking() {
    #[cfg(windows)]
    {
        use std::sync::Once;

        static WSA_INIT: Once = Once::new();
        WSA_INIT.call_once(|| {
            let _ = rustix::net::wsa_startup();
        });
    }
}
/// Puts the descriptor (unix) or socket (windows) into non-blocking mode.
///
/// Windows and Linux use a single `FIONBIO` ioctl. Other targets read the file
/// status flags and write them back only when `NONBLOCK` is not already set.
#[inline]
fn set_nonblocking(
    #[cfg(unix)] fd: BorrowedFd<'_>,
    #[cfg(windows)] fd: BorrowedSocket<'_>,
) -> io::Result<()> {
    cfg_if::cfg_if! {
        if #[cfg(any(windows, target_os = "linux"))] {
            rustix::io::ioctl_fionbio(fd, true)?;
        } else {
            let current = rustix::fs::fcntl_getfl(fd)?;
            if !current.contains(rustix::fs::OFlags::NONBLOCK) {
                rustix::fs::fcntl_setfl(fd, current | rustix::fs::OFlags::NONBLOCK)?;
            }
        }
    }
    Ok(())
}
/// Builds a `SocketAddrUnix` from a filesystem path.
///
/// On Linux and Android, a path whose first byte is NUL is turned into an
/// abstract-namespace address named by the remaining bytes. Every other path
/// becomes a regular pathname address.
#[cfg(unix)]
#[inline]
fn convert_path_to_socket_address(path: &Path) -> io::Result<rn::SocketAddrUnix> {
    #[cfg(any(target_os = "linux", target_os = "android"))]
    let address = {
        use std::os::unix::ffi::OsStrExt;

        let os_path = path.as_os_str();
        match os_path.as_bytes().split_first() {
            // Leading NUL byte: the rest of the bytes is the abstract name.
            Some((0, name)) => rn::SocketAddrUnix::new_abstract_name(name)?,
            _ => rn::SocketAddrUnix::new(os_path)?,
        }
    };

    #[cfg(not(any(target_os = "linux", target_os = "android")))]
    let address = rn::SocketAddrUnix::new(path)?;

    Ok(address)
}