ruyi 0.1.6

An event-driven framework for non-blocking, asynchronous I/O in Rust
Documentation
use std::fmt;
use std::io;
use std::mem;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, Shutdown};

use net2::TcpBuilder;

use futures::{Future, Poll, Stream, Async};

use nio;
use io::{AsyncRead, AsyncWrite};
use reactor::PollableIo;

#[derive(Debug)]
pub struct TcpStream {
    inner: PollableIo<nio::TcpStream>,
}

enum ConnectState {
    Connecting(TcpStream),
    Finishing(TcpStream),
    Connected(TcpStream),
    Error(io::Error),
    Dead,
}

pub struct TcpConnector {
    state: ConnectState,
}

impl TcpStream {
    #[inline]
    fn from(sock: nio::TcpStream) -> Self {
        TcpStream {
            inner: PollableIo::new(sock),
        }
    }

    pub fn connect(addr: &SocketAddr) -> TcpConnector {
        let state = match nio::TcpStream::connect(addr) {
            Ok((sock, connected)) => {
                let io = Self::from(sock);
                match connected {
                    false => ConnectState::Connecting(io),
                    true => ConnectState::Connected(io),
                }
            }
            Err(e) => ConnectState::Error(e),
        };
        TcpConnector { state }
    }

    #[inline]
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        self.inner.get_ref().peer_addr()
    }

    #[inline]
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.inner.get_ref().local_addr()
    }

    #[inline]
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        self.inner.get_ref().shutdown(how)
    }

    #[inline]
    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        self.inner.get_ref().set_nodelay(nodelay)
    }

    #[inline]
    pub fn nodelay(&self) -> io::Result<bool> {
        self.inner.get_ref().nodelay()
    }

    #[inline]
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.inner.get_ref().set_ttl(ttl)
    }

    #[inline]
    pub fn ttl(&self) -> io::Result<u32> {
        self.inner.get_ref().ttl()
    }

    #[inline]
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.inner.get_ref().take_error()
    }
}

impl fmt::Display for TcpStream {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use std::fmt::Debug;
        self.inner.get_ref().fmt(f)
    }
}

impl io::Read for TcpStream {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.get_mut().read(buf)
    }
}

impl nio::ReadV for TcpStream {
    #[inline]
    fn readv(&mut self, iovs: &[nio::IoVec]) -> io::Result<usize> {
        self.inner.get_mut().readv(iovs)
    }
}

impl AsyncRead for TcpStream {
    #[inline]
    fn need_read(&mut self) -> io::Result<()> {
        self.inner.need_read()
    }

    #[inline]
    fn no_need_read(&mut self) -> io::Result<()> {
        self.inner.no_need_read()
    }

    #[inline]
    fn is_readable(&self) -> bool {
        self.inner.is_readable()
    }
}

impl io::Write for TcpStream {
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.get_mut().write(buf)
    }

    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        self.inner.get_mut().flush()
    }
}

impl nio::WriteV for TcpStream {
    #[inline]
    fn writev(&mut self, iovs: &[nio::IoVec]) -> io::Result<usize> {
        self.inner.get_mut().writev(iovs)
    }
}

impl AsyncWrite for TcpStream {
    #[inline]
    fn need_write(&mut self) -> io::Result<()> {
        self.inner.need_write()
    }

    #[inline]
    fn no_need_write(&mut self) -> io::Result<()> {
        self.inner.no_need_write()
    }

    #[inline]
    fn is_writable(&self) -> bool {
        self.inner.is_writable()
    }
}

pub struct Incoming {
    io: PollableIo<nio::TcpListener>,
}

#[derive(Debug, Clone, Copy)]
pub struct TcpListenerBuilder {
    addr: SocketAddr,
    backlog: i32,
    ttl: Option<u32>,
    only_v6: Option<bool>,
}

pub struct TcpListener {
    inner: nio::TcpListener,
}

impl TcpListener {
    #[inline]
    pub fn builder() -> TcpListenerBuilder {
        Default::default()
    }

    #[inline]
    pub fn incoming(self) -> Incoming {
        Incoming {
            io: PollableIo::new(self.inner),
        }
    }

    #[inline]
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.inner.local_addr()
    }

    #[inline]
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.inner.set_ttl(ttl)
    }

    #[inline]
    pub fn ttl(&self) -> io::Result<u32> {
        self.inner.ttl()
    }

    #[inline]
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.inner.take_error()
    }

    #[inline]
    fn from(inner: nio::TcpListener) -> Self {
        TcpListener { inner }
    }
}

impl Default for TcpListenerBuilder {
    #[inline]
    fn default() -> Self {
        TcpListenerBuilder {
            addr: SocketAddr::new(IpAddr::from(Ipv4Addr::from(0)), 0),
            backlog: 128,
            ttl: None,
            only_v6: None,
        }
    }
}

impl TcpListenerBuilder {
    #[inline]
    pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
        self.addr = addr;
        self
    }

    #[inline]
    pub fn port(&mut self, port: u16) -> &mut Self {
        self.addr.set_port(port);
        self
    }

    #[inline]
    pub fn backlog(&mut self, backlog: i32) -> &mut Self {
        self.backlog = backlog;
        self
    }

    #[inline]
    pub fn ttl(&mut self, ttl: Option<u32>) -> &mut Self {
        self.ttl = ttl;
        self
    }

    #[inline]
    pub fn only_v6(&mut self, only_v6: Option<bool>) -> &mut Self {
        self.only_v6 = only_v6;
        self
    }

    pub fn build(&self) -> io::Result<TcpListener> {
        let builder = match self.addr {
            SocketAddr::V4(..) => TcpBuilder::new_v4()?,
            SocketAddr::V6(..) => TcpBuilder::new_v6()?,
        };
        if let Some(ttl) = self.ttl {
            builder.ttl(ttl)?;
        }
        if let Some(only_v6) = self.only_v6 {
            builder.only_v6(only_v6)?;
        }
        let listener = builder
            .reuse_address(true)?
            .bind(self.addr)?
            .listen(self.backlog)?;
        listener.set_nonblocking(true)?;
        Ok(TcpListener::from(nio::TcpListener::from(listener)))
    }
}

impl Stream for Incoming {
    type Item = (TcpStream, SocketAddr);
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.io.get_ref().accept() {
            Ok((s, a)) => Ok(Async::Ready(Some((TcpStream::from(s), a)))),
            Err(e) => {
                match e.kind() {
                    io::ErrorKind::WouldBlock => {
                        self.io.need_read()?;
                        Ok(Async::NotReady)
                    }
                    _ => Err(e),
                }
            }
        }
    }
}

impl Future for TcpConnector {
    type Item = TcpStream;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match mem::replace(&mut self.state, ConnectState::Dead) {
            ConnectState::Connecting(mut sock) => {
                sock.need_write()?;
                self.state = ConnectState::Finishing(sock);
                Ok(Async::NotReady)
            }
            ConnectState::Finishing(mut sock) => {
                if sock.is_writable() {
                    match sock.take_error()? {
                        None => {
                            sock.no_need_write()?;
                            Ok(Async::Ready(sock))
                        }
                        Some(e) => Err(e),
                    }
                } else {
                    self.state = ConnectState::Finishing(sock);
                    Ok(Async::NotReady)
                }
            }
            ConnectState::Connected(sock) => Ok(Async::Ready(sock)),
            ConnectState::Error(e) => Err(e),
            _ => panic!("Attempted to poll TcpConnector after completion"),
        }
    }
}