mdrv 0.0.5

Modular driver based on Mio for managing multiple connections over different protocols
Documentation
use std::io;
use std::time::{Duration};
use std::sync::mpsc::{self as std_chan};
use std::error::{Error as StdError};
use std::fmt;

use mio;
use mio_extras::channel::{self as mio_chan};


pub use self::mio_chan::SendError;
pub use self::std_chan::TryRecvError;

pub use self::mio_chan::{Sender, Receiver};
pub use self::mio_chan::channel;

#[derive(Debug)]
pub enum RecvError {
    Io(io::Error),
    Disconnected,
    Empty,
}

impl StdError for RecvError {
    fn description(&self) -> &str {
        match self {
            RecvError::Io(e) => e.description(),
            RecvError::Disconnected => "Channel disconnected",
            RecvError::Empty => "Channel empty",
        }
    }

    fn cause(&self) -> Option<&StdError> {
        match self {
            RecvError::Io(e) => Some(e),
            RecvError::Disconnected => None,
            RecvError::Empty => None,
        }
    }
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", (self as &StdError).description())
    }
}

#[derive(Debug)]
pub enum Error {
    Io(io::Error),
    Disconnected,
    Empty,
}

impl StdError for Error {
    fn description(&self) -> &str {
        match self {
            Error::Io(e) => e.description(),
            Error::Disconnected => "Channel disconnected",
            Error::Empty => "Channel empty",
        }
    }

    fn cause(&self) -> Option<&StdError> {
        match self {
            Error::Io(e) => Some(e),
            Error::Disconnected => None,
            Error::Empty => None,
        }
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", (self as &StdError).description())
    }
}

impl<T> From<SendError<T>> for Error {
    fn from(err: SendError<T>) -> Self {
        match err {
            SendError::Io(io_err) => Error::Io(io_err),
            SendError::Disconnected(_) => Error::Disconnected,
        }
    }
}

impl From<RecvError> for Error {
    fn from(err: RecvError) -> Self {
        match err {
            RecvError::Io(io_err) => Error::Io(io_err),
            RecvError::Disconnected => Error::Disconnected,
            RecvError::Empty => Error::Empty,
        }
    }
}

impl From<TryRecvError> for Error {
    fn from(err: TryRecvError) -> Self {
        match err {
            TryRecvError::Disconnected => Error::Disconnected,
            TryRecvError::Empty => Error::Empty,
        }
    }
}


pub struct SinglePoll {
    pub poll: mio::Poll,
    pub events: mio::Events,
}

impl SinglePoll {
    pub fn new<T>(rx: &Receiver<T>) -> Result<Self, Error> {
        let poll = mio::Poll::new().map_err(|e| Error::Io(e))?;
        poll.register(
            rx,
            mio::Token(0),
            mio::Ready::readable(),
            mio::PollOpt::edge()
        ).map_err(|e| Error::Io(e))?;
        let events = mio::Events::with_capacity(1);

        Ok(Self { poll, events })
    }

    pub fn wait(&mut self, timeout: Option<Duration>) -> Result<(), RecvError> {
        self.poll.poll(&mut self.events, timeout).map_err(|e| RecvError::Io(e)).and_then(|_| {
            match self.events.iter().next() {
                Some(res) => {
                    assert!(res.token() == mio::Token(0) && res.readiness().is_readable());
                    Ok(())
                },
                None => Err(RecvError::Empty),
            }
        })
    }
}

pub struct PollReceiver<'a, T: 'a> {
    pub rx: &'a Receiver<T>,
    pub poll: SinglePoll,
}

impl<'a, T> PollReceiver<'a, T> {
    pub fn new(rx: &'a Receiver<T>) -> Result<Self, Error> {
        Ok(Self { rx, poll: SinglePoll::new(&rx)? })
    }

    pub fn wait(&mut self, timeout: Option<Duration>) -> Result<(), RecvError> {
        self.poll.wait(timeout)
    }

    pub fn recv(&mut self, timeout: Option<Duration>) -> Result<T, RecvError> {
        match self.rx.try_recv() {
            Ok(msg) => Ok(msg),
            Err(err) => match err {
                TryRecvError::Empty => self.wait(timeout).and_then(|_| {
                    match self.rx.try_recv() {
                        Ok(msg) => Ok(msg),
                        Err(err) => match err {
                            TryRecvError::Empty => unreachable!(),
                            TryRecvError::Disconnected => Err(RecvError::Disconnected),
                        }
                    }
                }),
                TryRecvError::Disconnected => Err(RecvError::Disconnected),
            }
        }
    }
}


#[cfg(test)]
mod test {
    use super::*;
    use std::thread;
    use std::time::{Duration};


    #[test]
    fn send_recv() {
        let (tx, rx) = channel();

        tx.send(42 as i32).unwrap();

        let mut n = None;
        for _ in 0..100 {
            let r = rx.try_recv();
            if let Err(TryRecvError::Empty) = r {
                thread::sleep(Duration::from_millis(1));
            } else {
                n = Some(r.unwrap());
                break;
            }
        }

        assert_eq!(n.unwrap(), 42);
    }

    #[test]
    fn send_pollrecv() {
        let (tx, rx) = channel();
        let mut prx = PollReceiver::new(&rx).unwrap();

        tx.send(42 as i32).unwrap();
        let n = prx.recv(None).unwrap();
        
        assert_eq!(n, 42);
    }

    #[test]
    fn send_close_poll_recv() {
        let (tx, rx) = channel();
        let mut prx = PollReceiver::new(&rx).unwrap();

        thread::spawn(move || {
            tx.send(42 as i32).unwrap();
        });
        thread::sleep(Duration::from_millis(10));

        let n = prx.recv(None).unwrap();
        assert_eq!(n, 42);

        assert_matches!(prx.recv(None), Err(RecvError::Disconnected));
    }

    #[test]
    fn send_poll_recv_close() {
        let (tx, rx) = channel();
        let mut prx = PollReceiver::new(&rx).unwrap();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            tx.send(42 as i32).unwrap();
        });

        let n = prx.recv(None).unwrap();
        assert_eq!(n, 42);

        assert_matches!(prx.recv(None), Err(RecvError::Disconnected));
    }

    #[test]
    fn send_close_poll_wait() {
        let (tx, rx) = channel();
        let mut prx = PollReceiver::new(&rx).unwrap();

        thread::spawn(move || {
            tx.send(42 as i32).unwrap();
        });
        thread::sleep(Duration::from_millis(10));

        let n = prx.recv(None).unwrap();
        assert_eq!(n, 42);

        assert_matches!(prx.wait(None), Ok(()));
    }

    #[test]
    fn close_send() {
        let tx = channel().0;

        if let Err(SendError::Disconnected(n)) = tx.send(42 as i32) {
            assert_eq!(n, 42);
        } else {
            panic!();
        }
    }

    #[test]
    fn multiple_polls() {
        let (_tx, rx) = channel::<i32>();

        PollReceiver::new(&rx).unwrap();
        assert_matches!(PollReceiver::new(&rx).err(), Some(Error::Io(_)));
    }
}