use std::fmt;
use std::io;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
#[cfg(feature = "io_timeout")]
use std::time::Duration;
use crate::coroutine_impl::is_coroutine;
use crate::io::split_io::{SplitIo, SplitReader, SplitWriter};
use crate::io::sys::mod_socket;
use crate::io::sys::net as net_impl;
use crate::io::CoIo;
use crate::io::{self as io_impl, AsIoData};
use crate::yield_now::yield_with_io;
/// A Unix domain stream socket, stored as a `CoIo` wrapper around the
/// standard library's `std::os::unix::net::UnixStream`.
pub struct UnixStream(CoIo<net::UnixStream>);
impl fmt::Debug for UnixStream {
    /// Renders the stream as a `UnixStream` struct containing the raw fd
    /// and, when they can be queried, the local and peer addresses.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let raw_fd = self.as_raw_fd();
        let mut out = fmt.debug_struct("UnixStream");
        out.field("fd", &raw_fd);
        // An address that cannot be looked up is simply left out.
        match self.local_addr() {
            Ok(local) => {
                out.field("local", &local);
            }
            Err(_) => {}
        }
        match self.peer_addr() {
            Ok(peer) => {
                out.field("peer", &peer);
            }
            Err(_) => {}
        }
        out.finish()
    }
}
impl UnixStream {
    /// Wraps an existing coroutine I/O object in a `UnixStream`.
    pub(crate) fn from_coio(io: CoIo<net::UnixStream>) -> Self {
        UnixStream(io)
    }

    /// Connects to the socket at `path`.
    ///
    /// When not running inside a coroutine (per `is_coroutine()`), the
    /// connection is made with the std `UnixStream::connect` and the result
    /// is wrapped in a `CoIo`. Inside a coroutine a `UnixStreamConnect`
    /// operation is created; if `check_connected` already reports success
    /// its result is returned directly, otherwise the coroutine yields on
    /// the operation and the result is collected with `done`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the std connect, from `CoIo::new`, or from
    /// the connect operation.
    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
        if !is_coroutine() {
            let stream = net::UnixStream::connect(path)?;
            return Ok(UnixStream(CoIo::new(stream)?));
        }

        let mut c = net_impl::UnixStreamConnect::new(path)?;
        if c.check_connected()? {
            return c.done();
        }

        yield_with_io(&c, c.is_coroutine);
        c.done()
    }

    /// Creates a pair of connected streams from the std
    /// `UnixStream::pair`, wrapping each end in a `CoIo`.
    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
        let (i1, i2) = net::UnixStream::pair()?;
        let i1 = UnixStream(CoIo::new(i1)?);
        let i2 = UnixStream(CoIo::new(i2)?);
        Ok((i1, i2))
    }

    /// Clones the underlying std socket and wraps the clone in a new
    /// `CoIo`.
    pub fn try_clone(&self) -> io::Result<UnixStream> {
        let stream = self.0.inner().try_clone()?;
        Ok(UnixStream(CoIo::new(stream)?))
    }

    /// Returns the local address reported by the underlying std socket.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.0.inner().local_addr()
    }

    /// Returns the peer address reported by the underlying std socket.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        self.0.inner().peer_addr()
    }

    /// Sets the read timeout.
    ///
    /// The timeout is first applied to the underlying std socket and then
    /// recorded on the coroutine I/O wrapper, the same two-step sequence
    /// used by `set_write_timeout`.
    ///
    /// # Errors
    ///
    /// Returns the error from the std socket (std documents that a zero
    /// `Duration` is rejected) or from the wrapper.
    #[cfg(feature = "io_timeout")]
    pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
        // NOTE(review): presumably the socket-level timeout is what bounds
        // reads issued outside a coroutine — confirm against `CoIo`.
        self.0.inner().set_read_timeout(timeout)?;
        self.0.set_read_timeout(timeout)
    }

    /// Sets the write timeout on the underlying std socket and then on the
    /// coroutine I/O wrapper.
    #[cfg(feature = "io_timeout")]
    pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
        self.0.inner().set_write_timeout(timeout)?;
        self.0.set_write_timeout(timeout)
    }

    /// Returns the read timeout recorded on the coroutine I/O wrapper.
    #[cfg(feature = "io_timeout")]
    pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
        self.0.read_timeout()
    }

    /// Returns the write timeout recorded on the coroutine I/O wrapper.
    #[cfg(feature = "io_timeout")]
    pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
        self.0.write_timeout()
    }

    /// Forwards to the wrapper's `peek` with `buf` and returns its result.
    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.peek(buf)
    }

    /// Returns the result of `take_error` on the underlying std socket.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.0.inner().take_error()
    }

    /// Shuts down the read half, write half, or both halves of the
    /// underlying std socket, as selected by `how`.
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        self.0.inner().shutdown(how)
    }

    /// Borrows the underlying std `UnixStream`.
    #[inline]
    pub fn inner(&self) -> &net::UnixStream {
        self.0.inner()
    }

    /// Mutably borrows the underlying std `UnixStream`.
    #[inline]
    pub fn inner_mut(&mut self) -> &mut net::UnixStream {
        self.0.inner_mut()
    }
}
/// `io::Read` for `UnixStream`, implemented by forwarding to the wrapped `CoIo`.
impl io::Read for UnixStream {
/// Reads into `buf` through the wrapped `CoIo` and returns its result.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
/// `io::Write` for `UnixStream`, implemented by forwarding to the wrapped `CoIo`.
impl io::Write for UnixStream {
/// Writes `buf` through the wrapped `CoIo` and returns its result.
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
/// Flushes through the wrapped `CoIo`.
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
/// Exposes the raw file descriptor of the wrapped `CoIo`.
impl AsRawFd for UnixStream {
/// Returns the raw fd reported by the wrapped `CoIo`.
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl FromRawFd for UnixStream {
unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
let stream = FromRawFd::from_raw_fd(fd);
UnixStream(CoIo::new(stream).expect("can't convert to UnixStream"))
}
}
/// Consumes the stream and hands out the raw fd of the wrapped `CoIo`.
impl IntoRawFd for UnixStream {
/// Returns the result of `into_raw_fd` on the wrapped `CoIo`.
fn into_raw_fd(self) -> RawFd {
self.0.into_raw_fd()
}
}
/// Gives access to the `IoData` held by the wrapped `CoIo`.
impl io_impl::AsIoData for UnixStream {
/// Returns the `IoData` reference reported by the wrapped `CoIo`.
fn as_io_data(&self) -> &io_impl::IoData {
self.0.as_io_data()
}
}
impl SplitIo for UnixStream {
    /// Splits the stream into a reader half and a writer half.
    ///
    /// The writer half is a `try_clone` of this stream. `mod_socket` is
    /// called on the clone with `false` and then on `self` with `true`
    /// before the halves are wrapped.
    // NOTE(review): the boolean presumably selects read vs. write
    // registration — confirm against `mod_socket`.
    fn split(self) -> io::Result<(SplitReader<Self>, SplitWriter<Self>)> {
        let write_half = self.try_clone()?;
        mod_socket(write_half.as_io_data(), false)?;
        mod_socket(self.as_io_data(), true)?;

        let reader = SplitReader::new(self);
        let writer = SplitWriter::new(write_half);
        Ok((reader, writer))
    }
}
/// A Unix domain socket listener, stored as a `CoIo` wrapper around the
/// standard library's `std::os::unix::net::UnixListener`.
pub struct UnixListener(pub(crate) CoIo<net::UnixListener>);
impl fmt::Debug for UnixListener {
    /// Renders the listener as a `UnixListener` struct containing the raw
    /// fd and, when it can be queried, the local address.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let raw_fd = self.as_raw_fd();
        let mut out = fmt.debug_struct("UnixListener");
        out.field("fd", &raw_fd);
        // A local address that cannot be looked up is simply left out.
        match self.local_addr() {
            Ok(local) => {
                out.field("local", &local);
            }
            Err(_) => {}
        }
        out.finish()
    }
}
impl UnixListener {
    /// Binds a std `UnixListener` to `path` and wraps it in a `CoIo`.
    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
        let std_listener = net::UnixListener::bind(path)?;
        let io = CoIo::new(std_listener)?;
        Ok(UnixListener(io))
    }

    /// Accepts one incoming connection.
    ///
    /// After `io_reset`, the underlying std `accept` is tried once. On
    /// success the new socket is wrapped and returned. If it fails with OS
    /// code `EAGAIN` or `EWOULDBLOCK`, a `UnixListenerAccept` operation is
    /// created, the caller yields on it, and its `done` result is returned.
    /// Any other error is returned as is.
    pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
        self.0.io_reset();

        match self.0.inner().accept() {
            Ok((stream, addr)) => {
                let stream = UnixStream(CoIo::new(stream)?);
                return Ok((stream, addr));
            }
            Err(err) => {
                let code = err.raw_os_error();
                let would_block =
                    code == Some(libc::EAGAIN) || code == Some(libc::EWOULDBLOCK);
                if !would_block {
                    return Err(err);
                }
            }
        }

        let mut pending = net_impl::UnixListenerAccept::new(self)?;
        yield_with_io(&pending, pending.is_coroutine);
        pending.done()
    }

    /// Clones the underlying std listener and wraps the clone in a new
    /// `CoIo`.
    pub fn try_clone(&self) -> io::Result<UnixListener> {
        let cloned = self.0.inner().try_clone()?;
        let io = CoIo::new(cloned)?;
        Ok(UnixListener(io))
    }

    /// Returns the local address reported by the underlying std listener.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        let std_listener = self.0.inner();
        std_listener.local_addr()
    }

    /// Returns the result of `take_error` on the underlying std listener.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        let std_listener = self.0.inner();
        std_listener.take_error()
    }

    /// Returns an iterator that calls `accept` on this listener for each
    /// item.
    pub fn incoming(&self) -> Incoming {
        let listener = self;
        Incoming { listener }
    }
}
/// Exposes the raw file descriptor of the wrapped `CoIo`.
impl AsRawFd for UnixListener {
/// Returns the raw fd reported by the wrapped `CoIo`.
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl FromRawFd for UnixListener {
unsafe fn from_raw_fd(fd: RawFd) -> UnixListener {
let listener = FromRawFd::from_raw_fd(fd);
UnixListener(CoIo::new(listener).expect("can't convert to UnixListener"))
}
}
/// Consumes the listener and hands out the raw fd of the wrapped `CoIo`.
impl IntoRawFd for UnixListener {
/// Returns the result of `into_raw_fd` on the wrapped `CoIo`.
fn into_raw_fd(self) -> RawFd {
self.0.into_raw_fd()
}
}
impl<'a> IntoIterator for &'a UnixListener {
type Item = io::Result<UnixStream>;
type IntoIter = Incoming<'a>;
fn into_iter(self) -> Incoming<'a> {
self.incoming()
}
}
/// Iterator over connections accepted from a borrowed `UnixListener`.
#[derive(Debug)]
pub struct Incoming<'a> {
// The listener whose `accept` is called for each item.
listener: &'a UnixListener,
}
impl Iterator for Incoming<'_> {
    type Item = io::Result<UnixStream>;

    /// Accepts one connection and yields the stream (the peer address is
    /// discarded). Never returns `None`; accept errors are yielded as
    /// `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        let item = match self.listener.accept() {
            Ok((stream, _peer)) => Ok(stream),
            Err(err) => Err(err),
        };
        Some(item)
    }

    /// Reports a lower bound of `usize::MAX` and no upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let lower = usize::MAX;
        (lower, None)
    }
}
/// A Unix domain datagram socket, stored as a `CoIo` wrapper around the
/// standard library's `std::os::unix::net::UnixDatagram`.
pub struct UnixDatagram(pub(crate) CoIo<net::UnixDatagram>);
impl fmt::Debug for UnixDatagram {
    /// Renders the socket as a `UnixDatagram` struct containing the raw fd
    /// and, when they can be queried, the local and peer addresses.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let raw_fd = self.as_raw_fd();
        let mut out = fmt.debug_struct("UnixDatagram");
        out.field("fd", &raw_fd);
        // An address that cannot be looked up is simply left out.
        match self.local_addr() {
            Ok(local) => {
                out.field("local", &local);
            }
            Err(_) => {}
        }
        match self.peer_addr() {
            Ok(peer) => {
                out.field("peer", &peer);
            }
            Err(_) => {}
        }
        out.finish()
    }
}
impl UnixDatagram {
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let datagram = net::UnixDatagram::bind(path)?;
Ok(UnixDatagram(CoIo::new(datagram)?))
}
pub fn unbound() -> io::Result<UnixDatagram> {
let datagram = net::UnixDatagram::unbound()?;
Ok(UnixDatagram(CoIo::new(datagram)?))
}
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
let (i1, i2) = net::UnixDatagram::pair()?;
let i1 = UnixDatagram(CoIo::new(i1)?);
let i2 = UnixDatagram(CoIo::new(i2)?);
Ok((i1, i2))
}
pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
self.0.inner().connect(path)
}
pub fn try_clone(&self) -> io::Result<UnixDatagram> {
let datagram = self.0.inner().try_clone()?;
Ok(UnixDatagram(CoIo::new(datagram)?))
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.inner().local_addr()
}
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.0.inner().peer_addr()
}
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.0.io_reset();
match self.0.inner().recv_from(buf) {
Ok(n) => return Ok(n),
Err(e) => {
let raw_err = e.raw_os_error();
if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
} else {
return Err(e);
}
}
}
let mut reader = net_impl::UnixRecvFrom::new(self, buf);
yield_with_io(&reader, reader.is_coroutine);
reader.done()
}
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.io_reset();
match self.0.inner().recv(buf) {
Ok(n) => return Ok(n),
Err(e) => {
let raw_err = e.raw_os_error();
if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
} else {
return Err(e);
}
}
}
let mut reader = net_impl::SocketRead::new(
&self.0,
buf,
#[cfg(feature = "io_timeout")]
self.read_timeout().unwrap(),
);
yield_with_io(&reader, reader.is_coroutine);
reader.done()
}
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.peek(buf)
}
pub fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
self.0.io_reset();
match self.0.inner().send_to(buf, path.as_ref()) {
Ok(n) => return Ok(n),
Err(e) => {
let raw_err = e.raw_os_error();
if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
} else {
return Err(e);
}
}
}
let mut writer = net_impl::UnixSendTo::new(self, buf, path.as_ref())?;
yield_with_io(&writer, writer.is_coroutine);
writer.done()
}
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.0.io_reset();
match self.0.inner().send(buf) {
Ok(n) => return Ok(n),
Err(e) => {
let raw_err = e.raw_os_error();
if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
} else {
return Err(e);
}
}
}
let mut writer = net_impl::SocketWrite::new(
&self.0,
buf,
#[cfg(feature = "io_timeout")]
self.0.write_timeout().unwrap(),
);
yield_with_io(&writer, writer.is_coroutine);
writer.done()
}
#[cfg(feature = "io_timeout")]
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.0.inner().set_read_timeout(timeout)?;
self.0.set_read_timeout(timeout)
}
#[cfg(feature = "io_timeout")]
pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.0.inner().set_write_timeout(timeout)?;
self.0.set_write_timeout(timeout)
}
#[cfg(feature = "io_timeout")]
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.0.read_timeout()
}
#[cfg(feature = "io_timeout")]
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.0.write_timeout()
}
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.0.inner().take_error()
}
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.0.inner().shutdown(how)
}
}
/// Exposes the raw file descriptor of the wrapped `CoIo`.
impl AsRawFd for UnixDatagram {
/// Returns the raw fd reported by the wrapped `CoIo`.
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl FromRawFd for UnixDatagram {
unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram {
let datagram = FromRawFd::from_raw_fd(fd);
UnixDatagram(CoIo::new(datagram).expect("can't convert to UnixDatagram"))
}
}
/// Consumes the socket and hands out the raw fd of the wrapped `CoIo`.
impl IntoRawFd for UnixDatagram {
/// Returns the result of `into_raw_fd` on the wrapped `CoIo`.
fn into_raw_fd(self) -> RawFd {
self.0.into_raw_fd()
}
}
/// Gives access to the `IoData` held by the wrapped `CoIo`.
impl io_impl::AsIoData for UnixDatagram {
/// Returns the `IoData` reference reported by the wrapped `CoIo`.
fn as_io_data(&self) -> &io_impl::IoData {
self.0.as_io_data()
}
}
// Tests for the Unix socket wrappers in this file. Compiled only for test
// builds on targets other than emscripten.
#[cfg(all(test, not(target_os = "emscripten")))]
mod test {
use std::io::prelude::*;
use super::*;
// Creates a fresh temporary directory used to hold socket paths.
fn tmpdir() -> tempfile::TempDir {
tempfile::tempdir().expect("failed to create TempDir")
}
// Unwraps a `Result`, panicking with the error's `Display` output on `Err`.
macro_rules! or_panic {
($e:expr) => {
match $e {
Ok(e) => e,
Err(e) => panic!("{}", e),
}
};
}
// A task spawned with `go!` accepts one connection, reads `msg1` and
// replies with `msg2`; the test side connects, checks the peer path,
// sends `msg1` and reads until EOF expecting `msg2`.
#[test]
fn basic() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");
let msg1 = b"hello";
let msg2 = b"world!";
let listener = or_panic!(UnixListener::bind(&socket_path));
let thread = go!(move || {
let mut stream = or_panic!(listener.accept()).0;
let mut buf = [0; 5];
or_panic!(stream.read_exact(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
or_panic!(stream.write_all(msg2));
});
let mut stream = or_panic!(UnixStream::connect(&socket_path));
assert_eq!(
Some(&*socket_path),
stream.peer_addr().unwrap().as_pathname()
);
or_panic!(stream.write_all(msg1));
let mut buf = vec![];
or_panic!(stream.read_to_end(&mut buf));
assert_eq!(&msg2[..], &buf[..]);
drop(stream);
thread.join().unwrap();
}
// Same exchange as `basic`, but over a `UnixStream::pair`.
#[test]
fn pair() {
let msg1 = b"hello";
let msg2 = b"world!";
let (mut s1, mut s2) = or_panic!(UnixStream::pair());
let thread = go!(move || {
let mut buf = [0; 5];
or_panic!(s1.read_exact(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
or_panic!(s1.write_all(msg2));
});
or_panic!(s2.write_all(msg1));
let mut buf = vec![];
or_panic!(s2.read_to_end(&mut buf));
assert_eq!(&msg2[..], &buf[..]);
drop(s2);
thread.join().unwrap();
}
// The peer writes two 5-byte messages; the first is read through the
// original stream and the second through its `try_clone`.
#[test]
fn try_clone() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");
let msg1 = b"hello";
let msg2 = b"world";
let listener = or_panic!(UnixListener::bind(&socket_path));
let thread = go!(move || {
let mut stream = or_panic!(listener.accept()).0;
or_panic!(stream.write_all(msg1));
or_panic!(stream.write_all(msg2));
});
let mut stream = or_panic!(UnixStream::connect(&socket_path));
let mut stream2 = or_panic!(stream.try_clone());
let mut buf = [0; 5];
or_panic!(stream.read_exact(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
or_panic!(stream2.read_exact(&mut buf));
assert_eq!(&msg2[..], &buf[..]);
thread.join().unwrap();
}
// Accepts two connections through `incoming()` and reads one byte from each.
#[test]
fn iter() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");
let listener = or_panic!(UnixListener::bind(&socket_path));
let thread = go!(move || for stream in listener.incoming().take(2) {
let mut stream = or_panic!(stream);
let mut buf = [0];
or_panic!(stream.read_exact(&mut buf));
});
for _ in 0..2 {
let mut stream = or_panic!(UnixStream::connect(&socket_path));
or_panic!(stream.write_all(&[0]));
}
thread.join().unwrap();
}
// Connecting or binding with this long file name must fail with
// `InvalidInput` for all three socket types.
#[test]
fn long_path() {
let dir = tmpdir();
let socket_path = dir.path().join(
"asdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfa\
sasdfasdfasdasdfasdfasdfadfasdfasdfasdfasdfasdf",
);
match UnixStream::connect(&socket_path) {
Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
Err(e) => panic!("unexpected error {e}"),
Ok(_) => panic!("unexpected success"),
}
match UnixListener::bind(&socket_path) {
Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
Err(e) => panic!("unexpected error {e}"),
Ok(_) => panic!("unexpected success"),
}
match UnixDatagram::bind(&socket_path) {
Err(ref e) if e.kind() == io::ErrorKind::InvalidInput => {}
Err(e) => panic!("unexpected error {e}"),
Ok(_) => panic!("unexpected success"),
}
}
// Read and write timeouts start as `None`, report the value that was set,
// and return to `None` after being cleared.
#[test]
#[cfg(feature = "io_timeout")]
fn timeouts() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");
let _listener = or_panic!(UnixListener::bind(&socket_path));
let stream = or_panic!(UnixStream::connect(&socket_path));
let dur = Duration::new(15410, 0);
assert_eq!(None, or_panic!(stream.read_timeout()));
or_panic!(stream.set_read_timeout(Some(dur)));
assert_eq!(Some(dur), or_panic!(stream.read_timeout()));
assert_eq!(None, or_panic!(stream.write_timeout()));
or_panic!(stream.set_write_timeout(Some(dur)));
assert_eq!(Some(dur), or_panic!(stream.write_timeout()));
or_panic!(stream.set_read_timeout(None));
assert_eq!(None, or_panic!(stream.read_timeout()));
or_panic!(stream.set_write_timeout(None));
assert_eq!(None, or_panic!(stream.write_timeout()));
}
// With a 1000 ms read timeout and no data ever sent, `read` must fail with
// `WouldBlock` or `TimedOut`.
#[test]
#[cfg(feature = "io_timeout")]
fn test_read_timeout() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");
let _listener = or_panic!(UnixListener::bind(&socket_path));
let mut stream = or_panic!(UnixStream::connect(&socket_path));
or_panic!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
let mut buf = [0; 10];
let kind = stream.read(&mut buf).expect_err("expected error").kind();
assert!(kind == io::ErrorKind::WouldBlock || kind == io::ErrorKind::TimedOut);
}
// With a read timeout set, data that is available is still read; a second
// read with nothing pending must fail with `WouldBlock` or `TimedOut`.
#[test]
#[cfg(feature = "io_timeout")]
fn test_read_with_timeout() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");
let listener = or_panic!(UnixListener::bind(&socket_path));
let mut stream = or_panic!(UnixStream::connect(&socket_path));
or_panic!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
let mut other_end = or_panic!(listener.accept()).0;
or_panic!(other_end.write_all(b"hello world"));
let mut buf = [0; 11];
or_panic!(stream.read_exact(&mut buf));
assert_eq!(b"hello world", &buf[..]);
let kind = stream
.read_exact(&mut buf)
.expect_err("expected error")
.kind();
assert!(kind == io::ErrorKind::WouldBlock || kind == io::ErrorKind::TimedOut);
}
// `send_to` from one bound datagram socket is received by `recv_from` on
// another.
#[test]
fn test_unix_datagram() {
let dir = tmpdir();
let path1 = dir.path().join("sock1");
let path2 = dir.path().join("sock2");
let sock1 = or_panic!(UnixDatagram::bind(path1));
let sock2 = or_panic!(UnixDatagram::bind(&path2));
let msg = b"hello world";
or_panic!(sock1.send_to(msg, &path2));
let mut buf = [0; 11];
or_panic!(sock2.recv_from(&mut buf));
assert_eq!(msg, &buf[..]);
}
// A datagram sent from an unbound socket arrives with an unnamed source
// address.
#[test]
fn test_unnamed_unix_datagram() {
let dir = tmpdir();
let path1 = dir.path().join("sock1");
let sock1 = or_panic!(UnixDatagram::bind(&path1));
let sock2 = or_panic!(UnixDatagram::unbound());
let msg = b"hello world";
or_panic!(sock2.send_to(msg, &path1));
let mut buf = [0; 11];
let (usize, addr) = or_panic!(sock1.recv_from(&mut buf));
assert_eq!(usize, 11);
assert!(addr.is_unnamed());
assert_eq!(msg, &buf[..]);
}
// An unbound socket is connected to one bound socket and then to another;
// `send` reaches whichever one it is currently connected to.
#[test]
fn test_connect_unix_datagram() {
let dir = tmpdir();
let path1 = dir.path().join("sock1");
let path2 = dir.path().join("sock2");
let bsock1 = or_panic!(UnixDatagram::bind(&path1));
let bsock2 = or_panic!(UnixDatagram::bind(&path2));
let sock = or_panic!(UnixDatagram::unbound());
or_panic!(sock.connect(&path1));
let msg = b"hello there";
or_panic!(sock.send(msg));
let mut buf = [0; 11];
let (usize, addr) = or_panic!(bsock1.recv_from(&mut buf));
assert_eq!(usize, 11);
assert!(addr.is_unnamed());
assert_eq!(msg, &buf[..]);
or_panic!(sock.connect(&path2));
or_panic!(sock.send(msg));
or_panic!(bsock2.recv_from(&mut buf));
}
// `recv` on a bound socket returns the datagram sent by a connected peer.
#[test]
fn test_unix_datagram_recv() {
let dir = tmpdir();
let path1 = dir.path().join("sock1");
let sock1 = or_panic!(UnixDatagram::bind(&path1));
let sock2 = or_panic!(UnixDatagram::unbound());
or_panic!(sock2.connect(&path1));
let msg = b"hello world";
or_panic!(sock2.send(msg));
let mut buf = [0; 11];
let size = or_panic!(sock1.recv(&mut buf));
assert_eq!(size, 11);
assert_eq!(msg, &buf[..]);
}
// Request/reply exchange over a `UnixDatagram::pair`, with one end driven
// by a task spawned with `go!`.
#[test]
fn datagram_pair() {
let msg1 = b"hello";
let msg2 = b"world!";
let (s1, s2) = or_panic!(UnixDatagram::pair());
let thread = go!(move || {
let mut buf = [0; 5];
or_panic!(s1.recv(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
or_panic!(s1.send(msg2));
});
or_panic!(s2.send(msg1));
let mut buf = [0; 6];
or_panic!(s2.recv(&mut buf));
assert_eq!(&msg2[..], &buf[..]);
drop(s2);
thread.join().unwrap();
}
// Connecting to a path that starts with a NUL byte must fail.
#[test]
fn abstract_namespace_not_allowed() {
assert!(UnixStream::connect("\0asdf").is_err());
}
// `peek` returns the pending datagram without consuming it: repeated peeks
// see `msg1`, and only `recv` advances to `msg2`.
#[test]
fn socket_peek() {
let msg1 = b"hello";
let msg2 = b"world!";
let (s1, s2) = or_panic!(UnixDatagram::pair());
or_panic!(s2.send(msg1));
let mut buf = [0; 5];
or_panic!(s1.peek(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
// Zero the buffer so the next peek must refill it.
buf.copy_from_slice(&[0; 5]);
or_panic!(s1.peek(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
buf.copy_from_slice(&[0; 5]);
or_panic!(s1.peek(&mut buf));
assert_eq!(&msg1[..], &buf[..]);
or_panic!(s2.send(msg2));
let mut buf = [0; 11];
let n = s1.peek(&mut buf).unwrap();
assert_eq!(n, 5);
assert_eq!(&buf[0..n], &msg1[..]);
let n = s1.recv(&mut buf).unwrap();
assert_eq!(n, 5);
let n = s1.peek(&mut buf).unwrap();
assert_eq!(n, 6);
assert_eq!(&buf[0..n], &msg2[..]);
let n = s1.recv(&mut buf).unwrap();
assert_eq!(n, 6);
}
}