rust_asio 0.3.1

Asynchronous I/O library
Documentation
use std::io;
use std::cmp;
use {IoObject, IoService, Handler};
use backbone::ops::{UnsafeRefCell};

fn length_error() -> io::Error {
    io::Error::new(io::ErrorKind::Other, "E2BIG")
}

pub struct StreamBuf {
    buf: Vec<u8>,
    cur: usize,
    max: usize,
}

impl StreamBuf {
    pub fn new(max: usize) -> StreamBuf {
        StreamBuf {
            buf: Vec::new(),
            cur: 0,
            max: max,
        }
    }

    pub fn max_len(&self) -> usize {
        self.max
    }

    pub fn len(&self) -> usize {
        self.cur
    }

    pub fn prepare(&mut self, len: usize) -> io::Result<&mut [u8]> {
        if self.cur < self.max {
            let len = cmp::min(self.cur + len, self.max);
            // TODO: メモリ確保に失敗したときも Err にしたい
            self.buf.reserve(len);
            unsafe { self.buf.set_len(len); }
            Ok(&mut self.buf[self.cur..])
        } else {
            Err(length_error())
        }
    }

    pub fn prepare_exact(&mut self, mut len: usize) -> io::Result<&mut [u8]> {
        len += self.cur;
        if len <= self.max {
            // TODO: メモリ確保に失敗したときも Err にしたい
            self.buf.reserve(len);
            unsafe { self.buf.set_len(len); }
            Ok(&mut self.buf[self.cur..])
        } else {
            Err(length_error())
        }
    }

    pub fn commit(&mut self, len: usize) {
        self.cur = cmp::min(self.cur + len, self.buf.len());
    }

    pub fn consume(&mut self, mut len: usize) {
        if len > self.cur { len = self.cur; }
        self.buf.drain(..len);
        self.cur -= len;
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.buf[..self.cur]
    }

    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.buf[..self.cur]
    }
}

impl io::Read for StreamBuf {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let len = cmp::min(self.cur, buf.len());
        buf[..len].clone_from_slice(&self.as_slice());
        self.consume(len);
        Ok(len)
    }
}

impl io::Write for StreamBuf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let len = buf.len();
        try!(self.prepare_exact(len)).clone_from_slice(buf);
        self.commit(len);
        Ok(len)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

pub trait MatchCondition : Send + 'static {
    fn is_match(&mut self, buf: &[u8]) -> Result<usize, usize>;
}

impl MatchCondition for usize {
    fn is_match(&mut self, buf: &[u8]) -> Result<usize, usize> {
        if buf.len() >= *self {
            Ok(*self)
        } else {
            *self -= buf.len();
            Err(buf.len())
        }
    }
}

impl MatchCondition for u8 {
    fn is_match(&mut self, buf: &[u8]) -> Result<usize, usize> {
        if let Some(len) = buf.iter().position(|&x| x == *self) {
            Ok(len+1)
        } else {
            Err(buf.len())
        }
    }
}

impl MatchCondition for &'static [u8] {
    fn is_match(&mut self, buf: &[u8]) -> Result<usize, usize> {
        let mut cur = 0;
        if !self.is_empty() {
            let head = self[0];
            let tail = &self[1..];
            let mut it = buf.iter();
            while let Some(mut len) = it.position(|&x| x == head) {
                len += cur + 1;
                let buf = &buf[len..];
                if buf.len() < tail.len() {
                    return Err(len - 1);
                } else if buf.starts_with(tail) {
                    return Ok(len + tail.len());
                }
                cur = len;
                it = buf.iter();
            }
            cur = buf.len();
        }
        Err(cur)
    }
}

impl MatchCondition for char {
    fn is_match(&mut self, buf: &[u8]) -> Result<usize, usize> {
        (*self as u8).is_match(buf)
    }
}

impl MatchCondition for &'static str {
    fn is_match(&mut self, buf: &[u8]) -> Result<usize, usize> {
        self.as_bytes().is_match(buf)
    }
}

pub trait Stream : IoObject + Send + 'static {
    fn async_read_some<F: Handler<usize>>(&self, buf: &mut [u8], handler: F);

    fn async_write_some<F: Handler<usize>>(&self, buf: &[u8], handler: F);

    fn read_some(&self, buf: &mut [u8]) -> io::Result<usize>;

    fn write_some(&self, buf: &[u8]) -> io::Result<usize>;
}

pub fn read_until<S: Stream, C: MatchCondition>(s: &S, sbuf: &mut StreamBuf, mut cond: C) -> io::Result<usize> {
    let mut cur = 0;
    loop {
        match cond.is_match(&sbuf.as_slice()[cur..]) {
            Ok(len) => return Ok(cur + len),
            Err(len) => {
                cur += len;
                let len = try!(s.read_some(try!(sbuf.prepare(4096))));
                sbuf.commit(len);
            },
        }
    }
}

struct ReadUntilHandler<S, C, F> {
    s: UnsafeRefCell<S>,
    sbuf: UnsafeRefCell<StreamBuf>,
    cond: C,
    handler: F,
    cur: usize,
}

impl<S, C, F> Handler<usize> for ReadUntilHandler<S, C, F>
    where S: Stream,
          C: MatchCondition,
          F: Handler<usize>,
{
    fn callback(self, io: &IoService, res: io::Result<usize>) {
        let ReadUntilHandler { s, sbuf, cond, handler, cur } = self;
        match res {
            Ok(len) => {
                let s = unsafe { s.as_ref() };
                let sbuf = unsafe { sbuf.as_mut_ref() };
                sbuf.commit(len);
                async_read_until_impl(s, sbuf, cond, handler, cur);
            },
            Err(err) => handler.callback(io, Err(err)),
        }
    }
}

fn async_read_until_impl<S: Stream, C: MatchCondition, F: Handler<usize>>(s: &S, sbuf: &mut StreamBuf, mut cond: C, handler: F, mut cur: usize) {
    let io = s.io_service();
    match cond.is_match(&sbuf.as_slice()[cur..]) {
        Ok(len) => handler.callback(io, Ok(cur + len)),
        Err(len) => {
            cur += len;
            let sbuf_ptr = UnsafeRefCell::new(sbuf);
            match sbuf.prepare(4096) {
                Ok(buf) => {
                    let handler = ReadUntilHandler {
                        s: UnsafeRefCell::new(s),
                        sbuf: sbuf_ptr,
                        cond: cond,
                        handler: handler,
                        cur: cur,
                    };
                    s.async_read_some(buf, handler);
                },
                Err(err) => handler.callback(io, Err(err)),
            }
        }
    }
}

pub fn async_read_until<S: Stream, C: MatchCondition, F: Handler<usize>>(s: &S, sbuf: &mut StreamBuf, cond: C, handler: F) {
    async_read_until_impl(s, sbuf, cond, handler, 0)
}

pub fn write_until<S: Stream, C: MatchCondition>(s: &S, sbuf: &mut StreamBuf, mut cond: C) -> io::Result<usize> {
    let len = {
        let len = match cond.is_match(sbuf.as_slice()) {
            Ok(len) => len,
            Err(len) => len,
        };
        try!(s.write_some(&sbuf.as_slice()[..len]))
    };
    sbuf.consume(len);
    Ok(len)
}

struct WriteUntilHandler<S, F> {
    s: UnsafeRefCell<S>,
    sbuf: UnsafeRefCell<StreamBuf>,
    handler: F,
    total: usize,
    cur: usize,
}

impl<S, F> Handler<usize> for WriteUntilHandler<S, F>
    where S: Stream,
          F: Handler<usize>,
{
    fn callback(self, io: &IoService, res: io::Result<usize>) {
        let WriteUntilHandler { s, sbuf, handler, total, mut cur } = self;
        match res {
            Ok(len) => {
                let s = unsafe { s.as_ref() };
                let sbuf = unsafe { sbuf.as_mut_ref() };
                sbuf.consume(len);
                cur -= len;
                if cur == 0 {
                    handler.callback(io, Ok(total))
                } else {
                    async_write_until_impl(s, sbuf, len, handler, cur);
                }
            },
            Err(err) => handler.callback(io, Err(err)),
        }
    }
}

fn async_write_until_impl<S: Stream, F: Handler<usize>>(s: &S, sbuf: &mut StreamBuf, total: usize, handler: F, cur: usize) {
    let handler = WriteUntilHandler {
        s: UnsafeRefCell::new(s),
        sbuf: UnsafeRefCell::new(sbuf),
        handler: handler,
        total: total,
        cur: cur,
    };
    s.async_write_some(&sbuf.as_slice()[..cur], handler);
}

pub fn async_write_until<S: Stream, C: MatchCondition, F: Handler<usize>>(s: &S, sbuf: &mut StreamBuf, mut cond: C, handler: F) {
    let total = match cond.is_match(sbuf.as_slice()) {
        Ok(len) => len,
        Err(len) => len,
    };
    async_write_until_impl(s, sbuf, total, handler, total)
}

#[test]
fn test_streambuf() {
    let sbuf = StreamBuf::new(100);
    assert_eq!(sbuf.len(), 0);
    assert_eq!(sbuf.max_len(), 100);
}

#[test]
fn test_streambuf_prepare() {
    let mut sbuf = StreamBuf::new(100);
    assert_eq!(sbuf.prepare(70).unwrap().len(), 70);
    sbuf.commit(70);
    assert_eq!(sbuf.len(), 70);
    assert_eq!(sbuf.prepare(70).unwrap().len(), 30);
    sbuf.commit(70);
    assert_eq!(sbuf.len(), 100);
}

#[test]
fn test_streambuf_prepare_exact() {
    let mut sbuf = StreamBuf::new(100);
    assert_eq!(sbuf.prepare_exact(70).unwrap().len(), 70);
    sbuf.commit(70);
    assert_eq!(sbuf.len(), 70);
    assert!(sbuf.prepare_exact(70).is_err());
    sbuf.commit(70);
    assert_eq!(sbuf.len(), 70);
}

#[test]
fn test_streambuf_consume() {
    let mut sbuf = StreamBuf::new(100);
    assert_eq!(sbuf.prepare_exact(1).unwrap().len(), 1);
    assert_eq!(sbuf.prepare_exact(100).unwrap().len(), 100);
    sbuf.commit(1);
    assert_eq!(sbuf.len(), 1);
    assert!(sbuf.prepare_exact(100).is_err());
    sbuf.consume(1);
    assert_eq!(sbuf.len(), 0);
    assert!(sbuf.prepare_exact(100).is_ok());
}

#[test]
fn test_match_cond() {
    assert!((5 as usize).is_match("hello".as_bytes()) == Ok(5));
    assert!((5 as usize).is_match("hello world".as_bytes()) == Ok(5));
    assert!((10 as usize).is_match("hello".as_bytes()) == Err(5));
    assert!('l'.is_match("hello".as_bytes()) == Ok(3));
    assert!('w'.is_match("hello".as_bytes()) == Err(5));
    assert!("lo".is_match("hello world".as_bytes()) == Ok(5));
    assert!("world!".is_match("hello world".as_bytes()) == Err(6));
    assert!("".is_match("hello".as_bytes()) == Err(0));
    assert!("l".is_match("hello".as_bytes()) == Ok(3));
}