use std::fmt;
use std::io;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Poll, Context};
use async_std::io::{Read, Write, IoSlice, IoSliceMut};
use async_std::net::{TcpStream, Shutdown};
#[cfg(unix)] use async_std::os::unix::net::UnixStream;
use crate::backpressure::Token;
/// The concrete socket wrapped by a `ByteStream`.
///
/// Private: callers only ever see `ByteStream`, which dispatches on this enum.
#[derive(Debug, Clone)]
enum Stream {
/// A TCP connection.
Tcp(TcpStream),
/// A Unix domain socket connection; this variant only exists on Unix targets.
#[cfg(unix)]
Unix(UnixStream),
}
/// Address of the remote end of a connection, as returned by
/// `ByteStream::peer_addr`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PeerAddr {
/// Socket address of a TCP peer.
Tcp(SocketAddr),
/// Filesystem path of a Unix socket peer, or `None` when the peer's
/// address has no pathname.
Unix(Option<PathBuf>),
}
/// A byte stream backed by either a TCP socket or (on Unix) a Unix domain
/// socket, optionally paired with a backpressure `Token`.
///
/// Cloning clones both the underlying stream handle and the token.
#[derive(Debug, Clone)]
pub struct ByteStream {
/// The wrapped socket.
stream: Stream,
/// Token stored alongside the stream; `None` for "detached" streams.
/// NOTE(review): presumably the token's lifetime is what the backpressure
/// module tracks — confirm against `crate::backpressure`.
token: Option<Token>,
}
// Compile-time assertion: the empty impl below only type-checks if
// `ByteStream` satisfies every supertrait bound listed on `Assert`
// (`Read + Write + Send + Unpin + 'static`). The trait has no other use.
trait Assert: Read + Write + Send + Unpin + 'static { }
impl Assert for ByteStream {}
/// Human-readable rendering of a peer address.
///
/// * `Tcp` prints the socket address (e.g. `127.0.0.1:8080`).
/// * `Unix(Some(path))` prints the path via `Path::display`.
/// * `Unix(None)` prints the literal `<unnamed>`.
impl fmt::Display for PeerAddr {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Use fully-qualified `fmt::Display::fmt` rather than `.fmt(f)`:
        // method-call syntax needs the `Display` trait itself in scope (this
        // file only imports the `fmt` module) and becomes ambiguous as soon
        // as `Debug` is imported too. The qualified form always resolves.
        match self {
            PeerAddr::Tcp(addr) => fmt::Display::fmt(addr, f),
            PeerAddr::Unix(None) => fmt::Display::fmt("<unnamed>", f),
            PeerAddr::Unix(Some(path)) => fmt::Display::fmt(&path.display(), f),
        }
    }
}
impl ByteStream {
    /// Wraps a TCP stream and stores `token` alongside it.
    pub fn new_tcp(token: Token, stream: TcpStream) -> ByteStream {
        let stream = Stream::Tcp(stream);
        let token = Some(token);
        Self { stream, token }
    }

    /// Wraps a TCP stream without any token ("detached").
    pub fn new_tcp_detached(stream: TcpStream) -> ByteStream {
        let stream = Stream::Tcp(stream);
        Self { stream, token: None }
    }

    /// Wraps a Unix domain socket stream and stores `token` alongside it.
    #[cfg(unix)]
    pub fn new_unix(token: Token, stream: UnixStream) -> ByteStream {
        let stream = Stream::Unix(stream);
        let token = Some(token);
        Self { stream, token }
    }

    /// Wraps a Unix domain socket stream without any token ("detached").
    #[cfg(unix)]
    pub fn new_unix_detached(stream: UnixStream) -> ByteStream {
        let stream = Stream::Unix(stream);
        Self { stream, token: None }
    }

    /// Returns the address of the remote peer.
    ///
    /// For Unix sockets the result is `PeerAddr::Unix(None)` when the peer
    /// address has no pathname.
    ///
    /// # Errors
    ///
    /// Propagates the error from the underlying socket's `peer_addr` call.
    pub fn peer_addr(&self) -> io::Result<PeerAddr> {
        match self.stream {
            Stream::Tcp(ref tcp) => {
                let addr = tcp.peer_addr()?;
                Ok(PeerAddr::Tcp(addr))
            }
            #[cfg(unix)]
            Stream::Unix(ref unix) => {
                let addr = unix.peer_addr()?;
                let path = addr.as_pathname().map(|p| p.to_path_buf());
                Ok(PeerAddr::Unix(path))
            }
        }
    }

    /// Reads the `TCP_NODELAY` setting of a TCP stream.
    ///
    /// Unix sockets have no such option; they always report `true`.
    pub fn nodelay(&self) -> io::Result<bool> {
        match self.stream {
            Stream::Tcp(ref tcp) => tcp.nodelay(),
            #[cfg(unix)]
            Stream::Unix(..) => Ok(true),
        }
    }

    /// Sets the `TCP_NODELAY` option on a TCP stream.
    ///
    /// For Unix sockets this is a no-op that returns `Ok(())`.
    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        match self.stream {
            Stream::Tcp(ref tcp) => tcp.set_nodelay(nodelay),
            #[cfg(unix)]
            Stream::Unix(..) => Ok(()),
        }
    }

    /// Shuts down the read half, write half, or both halves of the
    /// underlying socket, as selected by `how`.
    pub fn shutdown(&self, how: Shutdown) -> Result<(), io::Error> {
        match self.stream {
            Stream::Tcp(ref tcp) => tcp.shutdown(how),
            #[cfg(unix)]
            Stream::Unix(ref unix) => unix.shutdown(how),
        }
    }
}
impl From<(Token, TcpStream)> for ByteStream {
fn from((token, stream): (Token, TcpStream)) -> ByteStream {
ByteStream::new_tcp(token, stream)
}
}
#[cfg(unix)]
impl From<(Token, UnixStream)> for ByteStream {
fn from((token, stream): (Token, UnixStream)) -> ByteStream {
ByteStream::new_unix(token, stream)
}
}
/// Async reads on an owned/mutably-borrowed `ByteStream`.
///
/// Reading only needs shared access to the underlying socket, so both
/// methods simply forward to the `Read` implementation for `&ByteStream`,
/// which performs the per-variant dispatch.
impl Read for ByteStream {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
        -> Poll<Result<usize, io::Error>>
    {
        let mut shared: &ByteStream = &*self;
        <&ByteStream as Read>::poll_read(Pin::new(&mut shared), cx, buf)
    }

    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context,
        bufs: &mut [IoSliceMut])
        -> Poll<Result<usize, io::Error>>
    {
        let mut shared: &ByteStream = &*self;
        <&ByteStream as Read>::poll_read_vectored(Pin::new(&mut shared), cx, bufs)
    }
}
/// Async reads through a shared reference to a `ByteStream`.
///
/// Each method picks the wrapped socket and polls it via that socket's own
/// `Read` implementation for shared references.
impl Read for &ByteStream {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
        -> Poll<Result<usize, io::Error>>
    {
        match &self.stream {
            Stream::Tcp(tcp) => {
                // Rebind so we can hand out `&mut &TcpStream` for pinning.
                let mut sock = tcp;
                Pin::new(&mut sock).poll_read(cx, buf)
            }
            #[cfg(unix)]
            Stream::Unix(unix) => {
                let mut sock = unix;
                Pin::new(&mut sock).poll_read(cx, buf)
            }
        }
    }

    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context,
        bufs: &mut [IoSliceMut])
        -> Poll<Result<usize, io::Error>>
    {
        match &self.stream {
            Stream::Tcp(tcp) => {
                let mut sock = tcp;
                Pin::new(&mut sock).poll_read_vectored(cx, bufs)
            }
            #[cfg(unix)]
            Stream::Unix(unix) => {
                let mut sock = unix;
                Pin::new(&mut sock).poll_read_vectored(cx, bufs)
            }
        }
    }
}
/// Async writes on an owned/mutably-borrowed `ByteStream`.
///
/// Writing only needs shared access to the underlying socket, so every
/// method forwards to the `Write` implementation for `&ByteStream`, which
/// performs the per-variant dispatch.
impl Write for ByteStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
        -> Poll<Result<usize, io::Error>>
    {
        let mut shared: &ByteStream = &*self;
        <&ByteStream as Write>::poll_write(Pin::new(&mut shared), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Result<(), io::Error>>
    {
        let mut shared: &ByteStream = &*self;
        <&ByteStream as Write>::poll_flush(Pin::new(&mut shared), cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Result<(), io::Error>>
    {
        let mut shared: &ByteStream = &*self;
        <&ByteStream as Write>::poll_close(Pin::new(&mut shared), cx)
    }

    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context,
        bufs: &[IoSlice])
        -> Poll<Result<usize, io::Error>>
    {
        let mut shared: &ByteStream = &*self;
        <&ByteStream as Write>::poll_write_vectored(Pin::new(&mut shared), cx, bufs)
    }
}
/// Async writes through a shared reference to a `ByteStream`.
///
/// Each method picks the wrapped socket and polls it via that socket's own
/// `Write` implementation for shared references.
impl Write for &ByteStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
        -> Poll<Result<usize, io::Error>>
    {
        match &self.stream {
            Stream::Tcp(tcp) => {
                // Rebind so we can hand out `&mut &TcpStream` for pinning.
                let mut sock = tcp;
                Pin::new(&mut sock).poll_write(cx, buf)
            }
            #[cfg(unix)]
            Stream::Unix(unix) => {
                let mut sock = unix;
                Pin::new(&mut sock).poll_write(cx, buf)
            }
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Result<(), io::Error>>
    {
        match &self.stream {
            Stream::Tcp(tcp) => {
                let mut sock = tcp;
                Pin::new(&mut sock).poll_flush(cx)
            }
            #[cfg(unix)]
            Stream::Unix(unix) => {
                let mut sock = unix;
                Pin::new(&mut sock).poll_flush(cx)
            }
        }
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Result<(), io::Error>>
    {
        match &self.stream {
            Stream::Tcp(tcp) => {
                let mut sock = tcp;
                Pin::new(&mut sock).poll_close(cx)
            }
            #[cfg(unix)]
            Stream::Unix(unix) => {
                let mut sock = unix;
                Pin::new(&mut sock).poll_close(cx)
            }
        }
    }

    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context,
        bufs: &[IoSlice])
        -> Poll<Result<usize, io::Error>>
    {
        match &self.stream {
            Stream::Tcp(tcp) => {
                let mut sock = tcp;
                Pin::new(&mut sock).poll_write_vectored(cx, bufs)
            }
            #[cfg(unix)]
            Stream::Unix(unix) => {
                let mut sock = unix;
                Pin::new(&mut sock).poll_write_vectored(cx, bufs)
            }
        }
    }
}