// rust_asio 0.6.0
//
// Asynchronous I/O library
// Documentation
use prelude::{Protocol, SockAddr};
use ffi::{self, socklen_t};
use unsafe_cell::{UnsafeRefCell, UnsafeSliceCell};
use error::{ErrCode, READY, EINTR, EAGAIN, EWOULDBLOCK, ECANCELED, last_error, eof};
use core::{IoContext, ThreadIoContext, workplace};
use async::{Receiver, Handler, WrappedHandler, Operation};
use reactive_io::{AsyncInput, getnonblock, setnonblock};

use std::io;
use libc::ssize_t;

/// Strategy for issuing one receive-style call and shaping its result.
///
/// Implemented in this file by `Read`, `Recv` and `RecvFrom`, so the blocking
/// and asynchronous drivers can share their retry logic.
trait Reader: Send + 'static {
    /// Value handed back to the caller once the call has produced data.
    type Output;

    /// Issues the underlying call once on `soc`, filling `buf`, and returns
    /// the raw result.
    ///
    /// The drivers in this file treat a positive value as a byte count, zero
    /// as end of file, and anything else as a failure to be looked up with
    /// `last_error()`.
    fn read<T: AsyncInput>(&mut self, soc: &T, buf: &mut [u8]) -> ssize_t;

    /// Converts a raw return value of `read` into `Self::Output`.
    ///
    /// Within this file it is only called with a positive `len`.
    fn ok(&self, len: ssize_t) -> Self::Output;
}

/// `Reader` that forwards to `ffi::read`.
struct Read;

impl Reader for Read {
    type Output = usize;

    /// Performs a single `ffi::read` on `soc` into `buf`.
    fn read<T: AsyncInput>(&mut self, soc: &T, buf: &mut [u8]) -> ssize_t {
        // NOTE(review): `ffi::read` is unsafe; its preconditions are not
        // visible from this file.
        unsafe { ffi::read(soc, buf) }
    }

    /// Reports the raw length as a `usize` byte count.
    fn ok(&self, len: ssize_t) -> usize {
        len as usize
    }
}

/// `Reader` that forwards to `ffi::recv` with a fixed set of flags.
struct Recv {
    /// Flags passed unchanged to every `ffi::recv` call.
    flags: i32,
}

impl Reader for Recv {
    type Output = usize;

    /// Performs a single `ffi::recv` on `soc` into `buf` using the stored flags.
    fn read<T: AsyncInput>(&mut self, soc: &T, buf: &mut [u8]) -> ssize_t {
        let flags = self.flags;
        // NOTE(review): `ffi::recv` is unsafe; its preconditions are not
        // visible from this file.
        unsafe { ffi::recv(soc, buf, flags) }
    }

    /// Reports the raw length as a `usize` byte count.
    fn ok(&self, len: ssize_t) -> usize {
        len as usize
    }
}

struct RecvFrom<E> { flags: i32, ep: E, socklen: socklen_t }

impl<E: SockAddr> Reader for RecvFrom<E> {
    type Output = (usize, E);

    fn read<T>(&mut self, soc: &T, buf: &mut [u8]) -> ssize_t
        where T: AsyncInput,
    {
        unsafe { ffi::recvfrom(soc, buf, self.flags, &mut self.ep, &mut self.socklen) }
    }

    fn ok(&self, len: ssize_t) -> Self::Output {
        let mut ep = self.ep.clone();
        unsafe { ep.resize(self.socklen as usize) };
        (len as usize, ep)
    }
}

/// Blocking driver shared by `read`, `recv` and `recvfrom`.
///
/// Calls `reader.read` repeatedly until it yields a final result:
///
/// * a positive return value becomes `Ok(reader.ok(len))`;
/// * a zero return value becomes `Err(eof())`;
/// * any other return value is looked up with `last_error()`; `EINTR` causes
///   another attempt, every other code is returned as the error.
///
/// If the socket's context reports `stopped()` before an attempt, the
/// function returns `Err(ECANCELED.into())`.
fn read_detail<T, R>(soc: &T, buf: &mut [u8], mut reader: R) -> io::Result<R::Output>
    where T: AsyncInput,
          R: Reader,
{
    loop {
        if soc.as_ctx().stopped() {
            return Err(ECANCELED.into());
        }

        match reader.read(soc, buf) {
            0 => return Err(eof()),
            len if len > 0 => return Ok(reader.ok(len)),
            _ => {
                // Fetch the error code straight after the failed call.
                let code = last_error();
                if code != EINTR {
                    return Err(code.into());
                }
                // Interrupted: go round again.
            }
        }
    }
}

/// Blocking read from `soc` into `buf` via `ffi::read`.
///
/// Returns the number of bytes read. See `read_detail` for how end of file,
/// interruption and a stopped context are reported.
pub fn read<T: AsyncInput>(soc: &T, buf: &mut [u8]) -> io::Result<usize> {
    let reader = Read;
    read_detail(soc, buf, reader)
}

/// Blocking receive from `soc` into `buf` via `ffi::recv` with `flags`.
///
/// Returns the number of bytes received. See `read_detail` for how end of
/// file, interruption and a stopped context are reported.
pub fn recv<T: AsyncInput>(soc: &T, buf: &mut [u8], flags: i32) -> io::Result<usize> {
    let reader = Recv { flags: flags };
    read_detail(soc, buf, reader)
}

/// Blocking receive from `soc` into `buf` via `ffi::recvfrom` with `flags`.
///
/// The endpoint storage comes from `pro.uninitialized()` and its capacity is
/// used as the initial address length. On success, returns the byte count and
/// the endpoint filled in by the call (presumably the sender's address).
/// See `read_detail` for how end of file, interruption and a stopped context
/// are reported.
pub fn recvfrom<T, P>(soc: &T, buf: &mut [u8], flags: i32, pro: &P) -> io::Result<(usize, P::Endpoint)>
    where T: AsyncInput,
          P: Protocol,
{
    let endpoint = unsafe { pro.uninitialized() };
    let capacity = endpoint.capacity() as socklen_t;
    let reader = RecvFrom { flags: flags, ep: endpoint, socklen: capacity };
    read_detail(soc, buf, reader)
}

/// State for one pending asynchronous read; created by `async_read_detail`
/// and driven through its `WrappedHandler::perform` implementation.
struct ReadHandler<T, R> {
    // Cell built from the caller's `&T` socket reference.
    // NOTE(review): presumably the socket must outlive the operation — confirm.
    soc: UnsafeRefCell<T>,
    // Cell built from the caller's `&mut [u8]` destination buffer.
    // NOTE(review): presumably the buffer must stay valid until completion — confirm.
    buf: UnsafeSliceCell<u8>,
    // Strategy that performs the actual call and shapes the result.
    reader: R,
}

impl<T, R> WrappedHandler<R::Output, io::Error> for ReadHandler<T, R>
    where T: AsyncInput,
          R: Reader,
{
    fn perform(&mut self, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode, op: Operation<R::Output, io::Error, Self>) {
        let soc = unsafe { self.soc.as_ref() };
        match ec {
            READY => {
                let mode = getnonblock(soc).unwrap();
                setnonblock(soc, true).unwrap();
                while !ctx.stopped() {
                    let buf = unsafe { self.buf.as_mut_slice() };
                    let len = self.reader.read(soc, buf);
                    if len > 0 {
                        setnonblock(soc, mode).unwrap();
                        soc.next_op(this);
                        op.send(ctx, Ok(self.reader.ok(len)));
                        return;
                    }
                    if len == 0 {
                        setnonblock(soc, mode).unwrap();
                        soc.next_op(this);
                        op.send(ctx, Err(eof()));
                        return;
                    }
                    let ec = last_error();
                    if ec == EAGAIN || ec == EWOULDBLOCK {
                        setnonblock(soc, mode).unwrap();
                        soc.add_op(this, op, ec);
                        return;
                    }
                    if ec != EINTR {
                        setnonblock(soc, mode).unwrap();
                        soc.next_op(this);
                        op.send(ctx, Err(ec.into()));
                        return;
                    }
                }
                setnonblock(soc, mode).unwrap();
                soc.next_op(this);
                op.send(ctx, Err(ECANCELED.into()));
            },
            ec => {
                soc.next_op(this);
                op.send(ctx, Err(ec.into()));
            },
        }
    }
}

/// Asynchronous driver shared by `async_read`, `async_recv` and
/// `async_recvfrom`.
///
/// Wraps the socket, buffer and `reader` in a `ReadHandler`, obtains an
/// operation/receiver pair from `handler.channel`, registers the operation on
/// the socket with `add_op(.., READY)` inside `workplace`, and returns
/// whatever the receiver's `recv` yields.
fn async_read_detail<T, F, R>(soc: &T, buf: &mut [u8], handler: F, reader: R) -> F::Output
    where T: AsyncInput,
          F: Handler<R::Output, io::Error>,
          R: Reader,
{
    let state = ReadHandler {
        soc: UnsafeRefCell::new(soc),
        buf: UnsafeSliceCell::new(buf),
        reader: reader,
    };
    let (op, receiver) = handler.channel(state);

    workplace(soc.as_ctx(), |this| soc.add_op(this, op, READY));

    receiver.recv(soc.as_ctx())
}

pub fn async_read<T, F>(soc: &T, buf: &mut [u8], handler: F) -> F::Output
    where T: AsyncInput,
          F: Handler<usize, io::Error>
{
    async_read_detail(soc, buf, handler, Read)
}

/// Starts an asynchronous receive from `soc` into `buf` via `ffi::recv` with
/// `flags`.
///
/// `handler` receives the number of bytes received or an `io::Error`; the
/// return value is whatever `handler`'s channel produces.
pub fn async_recv<T, F>(soc: &T, buf: &mut [u8], flags: i32, handler: F) -> F::Output
    where T: AsyncInput,
          F: Handler<usize, io::Error>,
{
    let reader = Recv { flags: flags };
    async_read_detail(soc, buf, handler, reader)
}

/// Starts an asynchronous receive from `soc` into `buf` via `ffi::recvfrom`
/// with `flags`.
///
/// `ep` supplies the endpoint storage and its capacity is used as the initial
/// address length. `handler` receives the byte count together with the
/// endpoint filled in by the call, or an `io::Error`; the return value is
/// whatever `handler`'s channel produces.
pub fn async_recvfrom<T, E, F>(soc: &T, buf: &mut [u8], flags: i32, ep: E, handler: F) -> F::Output
    where T: AsyncInput,
          E: SockAddr,
          F: Handler<(usize, E), io::Error>,
{
    let reader = RecvFrom {
        flags: flags,
        // Read the capacity first: the next field moves `ep` into the struct.
        socklen: ep.capacity() as socklen_t,
        ep: ep,
    };
    async_read_detail(soc, buf, handler, reader)
}