ruyi 0.1.6

An event-driven framework for non-blocking, asynchronous I/O in Rust
Documentation
use std::cell::UnsafeCell;
use std::io;

use buf::{ByteBuf, Appender};
use io::{AsyncRead, AsyncWrite};

const RECV_BUF_SIZE: usize = 128 * 1024;

struct RecvBuf {
    inner: UnsafeCell<ByteBuf>,
}

impl RecvBuf {
    #[inline]
    fn new() -> Self {
        RecvBuf {
            inner: UnsafeCell::new(ByteBuf::with_capacity(RECV_BUF_SIZE)),
        }
    }

    #[inline]
    fn append(v: usize, chain: &mut Appender) -> io::Result<usize> {
        if let Some(block) = chain.last_mut() {
            if block.appendable() >= v {
                return Ok(0);
            }
        }
        chain.append(v);
        Ok(0)
    }

    #[inline]
    fn get_mut(&self) -> &mut ByteBuf {
        let buf = unsafe { &mut *self.inner.get() };
        buf.append(RECV_BUF_SIZE, Self::append).unwrap();
        buf
    }
}

thread_local!(static RECV_BUF: RecvBuf = RecvBuf::new());

#[inline]
fn read<R>(r: &mut R) -> io::Result<Option<(ByteBuf, usize)>>
where
    R: AsyncRead,
{
    RECV_BUF.with(|recv_buf| {
        let buf = recv_buf.get_mut();
        let n = buf.read_in(r)?;
        let data = if n > 0 {
            Some((buf.drain_to(n)?, n))
        } else {
            None
        };
        Ok(data)
    })
}

pub mod read;
pub mod write;
pub mod copy;
pub mod split;

use futures::{Stream, Sink, Poll, Async, StartSend, AsyncSink};
use sink::IntoSink;
use stream::IntoStream;

#[derive(Debug)]
pub struct IStream<R> {
    r: Option<R>,
    would_block: bool,
}

impl<R> IStream<R>
where
    R: AsyncRead,
{
    #[inline]
    pub fn get_ref(&self) -> &R {
        self.r
            .as_ref()
            .expect("Attempted IStream::get_ref after completion")
    }

    #[inline]
    pub fn get_mut(&mut self) -> &mut R {
        self.r
            .as_mut()
            .expect("Attempted IStream::get_mut after completion")
    }

    #[inline]
    pub fn into_inner(self) -> R {
        self.r
            .expect("Attempted IStream::into_inner after completion")
    }

    #[inline]
    fn poll_read(&mut self) -> Poll<Option<ByteBuf>, io::Error> {
        match self.r {
            Some(ref mut r) => {
                if self.would_block {
                    self.would_block = false;
                    r.need_read()?;
                    return Ok(Async::NotReady);
                }
                if !r.is_readable() {
                    return Ok(Async::NotReady);
                }
                match read(r) {
                    Ok(Some((data, _))) => {
                        self.would_block = true;
                        Ok(Async::Ready(Some(data)))
                    }
                    Ok(None) => Ok(Async::Ready(None)),
                    Err(e) => {
                        match e.kind() {
                            io::ErrorKind::WouldBlock => {
                                r.need_read()?;
                                Ok(Async::NotReady)
                            }
                            _ => Err(e),
                        }
                    }
                }
            }
            None => panic!("Attempted to poll IStream after completion"),
        }
    }
}

impl<R> Stream for IStream<R>
where
    R: AsyncRead,
{
    type Item = ByteBuf;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.poll_read() {
            Ok(o) => Ok(o),
            Err(e) => {
                self.r = None;
                Err(e)
            }
        }
    }
}

#[derive(Debug)]
pub struct OStream<W> {
    w: Option<W>,
    buf: Option<ByteBuf>,
}

impl<W> OStream<W>
where
    W: AsyncWrite,
{
    #[inline]
    pub fn get_ref(&self) -> &W {
        self.w
            .as_ref()
            .expect("Attempted OStream::get_ref after completion")
    }

    #[inline]
    pub fn get_mut(&mut self) -> &mut W {
        self.w
            .as_mut()
            .expect("Attempted OStream::get_mut after completion")
    }

    #[inline]
    pub fn into_inner(self) -> W {
        self.w
            .expect("Attempted OStream::into_inner after completion")
    }

    #[inline]
    fn poll_write(&mut self) -> Poll<(), io::Error> {
        match self.w {
            Some(ref mut w) => {
                if w.is_writable() {
                    if let Some(ref mut data) = self.buf {
                        if let Err(e) = data.write_out(w) {
                            return match e.kind() {
                                io::ErrorKind::WouldBlock => {
                                    w.need_write()?;
                                    Ok(Async::NotReady)
                                }
                                _ => Err(e),
                            };
                        }
                        if !data.is_empty() {
                            w.need_write()?;
                            data.compact();
                            return Ok(Async::NotReady);
                        }
                    }
                    w.no_need_write()?;
                    Ok(Async::Ready(()))
                } else {
                    Ok(Async::NotReady)
                }
            }
            None => panic!("Attempted to poll OStream after completion"),
        }
    }
}

impl<W> Sink for OStream<W>
where
    W: AsyncWrite,
{
    type SinkItem = ByteBuf;
    type SinkError = io::Error;

    fn start_send(&mut self, bytes: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        let buf = match self.buf.take() {
            Some(mut buf) => {
                buf.extend(bytes);
                buf
            }
            None => bytes,
        };
        self.buf = Some(buf);
        self.poll_complete()?;
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        match self.poll_write() {
            Ok(Async::Ready(())) => {
                self.buf = None;
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                self.w = None;
                Err(e)
            }
        }
    }

    #[inline]
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        self.poll_complete()
    }
}

impl<R: AsyncRead> IntoStream for R {
    type Stream = IStream<Self>;

    #[inline]
    fn into_stream(self) -> Self::Stream {
        IStream {
            r: Some(self),
            would_block: false,
        }
    }
}

impl<W: AsyncWrite> IntoSink for W {
    type Sink = OStream<Self>;

    #[inline]
    fn into_sink(self) -> Self::Sink {
        OStream {
            w: Some(self),
            buf: None,
        }
    }
}