mdrv 0.0.5

Modular driver based on Mio for managing multiple connections over different protocols
Documentation
use mio;

use ::channel::{self, channel, Sender, Receiver, SendError, TryRecvError};
use ::proxy::{self, Proxy, Control, Eid};


/// Base messages travelling from a `Handle` to its `ProxyWrapper`.
///
/// User message types embed this enum through the `TxExt` conversions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Tx {
    /// Asks the proxy to close; `ProxyWrapper::process` answers it by calling
    /// `ctrl.close()` before handing the message on to the user proxy.
    Close,
}

/// Base messages travelling from a `ProxyWrapper` to its `Handle`.
///
/// User message types embed this enum through the `RxExt` conversions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Rx {
    /// Sent at the end of a successful `ProxyWrapper::attach`.
    Attached,
    /// Sent at the end of a successful `ProxyWrapper::detach`.
    Detached,
    /// Sent when the `ProxyWrapper` is dropped; `Handle::process` treats it
    /// as the exit message and marks the handle closed.
    Closed,
}

/// Message type usable in the handle-to-proxy direction.
///
/// It must be constructible from a base `Tx` and convertible into
/// `Result<Tx, Self>`: `Ok` carries a base message, `Err` gives the value back
/// as a user-defined message.
pub trait TxExt: From<Tx> + Into<Result<Tx, Self>> {}
/// Message type usable in the proxy-to-handle direction.
///
/// It must be constructible from a base `Rx` and convertible into
/// `Result<Rx, Self>`: `Ok` carries a base message, `Err` gives the value back
/// as a user-defined message.
pub trait RxExt: From<Rx> + Into<Result<Rx, Self>> {}

/// A plain `Tx` is always a base message, so the conversion always yields `Ok`.
///
/// Implemented as `From` so that the matching `Into<Result<Tx, Tx>>` required
/// by `TxExt` comes from the standard blanket implementation.
impl From<Tx> for Result<Tx, Tx> {
    fn from(msg: Tx) -> Self {
        Ok(msg)
    }
}

/// A plain `Rx` is always a base message, so the conversion always yields `Ok`.
///
/// Implemented as `From` so that the matching `Into<Result<Rx, Rx>>` required
/// by `RxExt` comes from the standard blanket implementation.
impl From<Rx> for Result<Rx, Rx> {
    fn from(msg: Rx) -> Self {
        Ok(msg)
    }
}

// The base enum is itself a valid handle-to-proxy message type.
impl TxExt for Tx {}
// The base enum is itself a valid proxy-to-handle message type.
impl RxExt for Rx {}


/// User-supplied proxy logic that can additionally react to channel messages.
///
/// `T` is the message type received through the channel and `R` the message
/// type sent in the opposite direction.
pub trait UserProxy<T: TxExt, R: RxExt>: Proxy {
    /// Handles one message taken from the channel.
    ///
    /// `ProxyWrapper::process` calls this for every received message,
    /// including base `Tx` messages (converted back into `T`) that the
    /// wrapper has already acted upon.
    fn process_channel(&mut self, ctrl: &mut Control, msg: T) -> ::Result<()>;
}

/// Wraps a user proxy together with the two channel ends that connect it to
/// its `Handle`.
pub struct ProxyWrapper<P: UserProxy<T, R>, T: TxExt, R: RxExt> {
    /// The wrapped user proxy.
    pub user: P,
    /// Sending end for messages of type `R`.
    pub tx: Sender<R>,
    /// Receiving end for messages of type `T`; registered under event id `0`
    /// while the proxy is attached.
    pub rx: Receiver<T>,
}

impl<P: UserProxy<T, R>, T: TxExt, R: RxExt> ProxyWrapper<P, T, R> {
    /// Bundles a user proxy with its outgoing (`tx`) and incoming (`rx`)
    /// channel ends.
    fn new(user: P, tx: Sender<R>, rx: Receiver<T>) -> Self {
        ProxyWrapper {
            user: user,
            tx: tx,
            rx: rx,
        }
    }
}

impl<P: UserProxy<T, R>, T: TxExt, R: RxExt> Proxy for ProxyWrapper<P, T, R> {
    fn attach(&mut self, ctrl: &Control) -> ::Result<()> {
        ctrl.register(&self.rx, 0, mio::Ready::readable(), mio::PollOpt::edge())
        .and_then(|_| {
            self.user.attach(ctrl)
            .and_then(|_| {
                self.tx.send(Rx::Attached.into()).map_err(|e| ::Error::Channel(e.into()))
                .and_then(|_| {
                    Ok(())
                })
                .or_else(|e| {
                    self.user.detach(ctrl).unwrap();
                    Err(e)
                })
            })
            .or_else(|e| {
                ctrl.deregister(&self.rx).unwrap();
                Err(e)
            })
        })
    }
    fn detach(&mut self, ctrl: &Control) -> ::Result<()> {
        self.user.detach(ctrl)
        .and_then(|_| { ctrl.deregister(&self.rx) })
        .and_then(|_| {
            match self.tx.send(Rx::Detached.into()) {
                Ok(()) => Ok(()),
                Err(err) => match err {
                    SendError::Disconnected(_) => Ok(()),
                    other => Err(::Error::Channel(other.into())),
                }
            }
        })
    }

    fn process(&mut self, ctrl: &mut Control, readiness: mio::Ready, eid: Eid) -> ::Result<()> {
        match eid {
            0 => {
                assert!(readiness.is_readable());
                loop {
                    match self.rx.try_recv() {
                        Ok(msg) => {
                            let umsg = match msg.into() {
                                Ok(bmsg) => {
                                    match bmsg {
                                        Tx::Close => ctrl.close(),
                                    }
                                    bmsg.into()
                                },
                                Err(umsg) => umsg,
                            };
                            match self.user.process_channel(ctrl, umsg) {
                                Ok(()) => (),
                                Err(e) => break Err(e),
                            }
                        },
                        Err(err) => match err {
                            TryRecvError::Empty => break Ok(()),
                            TryRecvError::Disconnected => break Err(channel::Error::Disconnected.into()),
                        }
                    }
                }
            },
            other_eid => self.user.process(ctrl, readiness, other_eid),
        }
    }
}

impl<P: UserProxy<T, R>, T: TxExt, R: RxExt> Drop for ProxyWrapper<P, T, R> {
    /// Reports `Rx::Closed` over the outgoing channel when the wrapper goes
    /// away. A disconnected receiver is silently accepted.
    ///
    /// # Panics
    ///
    /// Panics on any other send failure, unless the thread is already
    /// unwinding: a second panic raised from `drop` during unwinding would
    /// abort the whole process, so in that case the failure is ignored.
    fn drop(&mut self) {
        match self.tx.send(Rx::Closed.into()) {
            Ok(()) => (),
            Err(SendError::Disconnected(_)) => (),
            Err(other) => {
                if !::std::thread::panicking() {
                    panic!("{:?}", other);
                }
            }
        }
    }
}


/// User-supplied logic living on the handle side of the channel pair.
///
/// `T` is the message type sent through the handle and `R` the message type
/// it receives.
pub trait UserHandle<T: TxExt, R: RxExt> {
    /// Handles one message taken from the channel.
    ///
    /// `Handle::process` calls this for every received message, including
    /// base `Rx` messages (converted back into `R`).
    fn process_channel(&mut self, msg: R) -> ::Result<()>;
}

/// Wraps a user handle together with the two channel ends that connect it to
/// its `ProxyWrapper`.
pub struct Handle<H: UserHandle<T, R>, T: TxExt, R: RxExt> {
    /// The wrapped user handle.
    pub user: H,
    /// Sending end for messages of type `T`.
    pub tx: Sender<T>,
    /// Receiving end for messages of type `R`.
    pub rx: Receiver<R>,
    // Set once `process` has delivered `Rx::Closed` to the user handle.
    closed: bool,
}

impl<H: UserHandle<T, R>, T: TxExt, R: RxExt> Handle<H, T, R> {
    fn new(user: H, tx: Sender<T>, rx: Receiver<R>) -> Self {
        Handle { user, tx, rx, closed: false }
    }

    fn is_exit(msg: R) -> (R, bool) {
        match msg.into() {
            Ok(bmsg) => {
                let v = match bmsg {
                    Rx::Closed => true,
                    _ => false,
                };
                (bmsg.into(), v)
            },
            Err(umsg) => (umsg, false),
        }
    }

    pub fn process(&mut self) -> ::Result<()> {
        if self.closed {
            return Err(proxy::Error::Closed.into());
        }

        loop {
            match self.rx.try_recv() {
                Ok(msg) => {
                    let (msg, exit) = Self::is_exit(msg);
                    match self.user.process_channel(msg) {
                        Ok(()) => {
                            if exit {
                                self.closed = true;
                                break Err(proxy::Error::Closed.into());
                            } else {
                                continue;
                            }
                        },
                        Err(e) => break Err(e),
                    }
                },
                Err(err) => match err {
                    TryRecvError::Empty => break Ok(()),
                    TryRecvError::Disconnected => break Err(::Error::from(channel::Error::Disconnected)),
                },
            };
        }
    }

    pub fn close(&mut self) -> ::Result<()> {
        if self.closed {
            return Err(proxy::Error::Closed.into());
        }
        self.tx.send(Tx::Close.into()).map_err(|e| ::Error::Channel(e.into()))
    }

    pub fn is_closed(&self) -> bool {
        self.closed
    }
}

impl<H: UserHandle<T, R>, T: TxExt, R: RxExt> Drop for Handle<H, T, R> {
    fn drop(&mut self) {
        match self.close() {
            Ok(_) => (),
            Err(err) => match err {
                ::Error::Proxy(proxy::Error::Closed) => (),
                ::Error::Channel(channel::Error::Disconnected) => (),
                other => panic!("{:?}", other),
            },
        }
    }
}

/// Builds a connected `ProxyWrapper` / `Handle` pair around the given user
/// objects.
///
/// Two channels are created, one per direction: what the proxy sends the
/// handle receives, and vice versa. The current implementation always
/// returns `Ok`.
pub fn create<P, H, T, R>(user_proxy: P, user_handle: H) -> ::Result<(ProxyWrapper<P, T, R>, Handle<H, T, R>)>
where P: UserProxy<T, R>, H: UserHandle<T, R>, T: TxExt, R: RxExt {
    // proxy -> handle direction
    let (proxy_out, handle_in) = channel();
    // handle -> proxy direction
    let (handle_out, proxy_in) = channel();

    Ok((
        ProxyWrapper::new(user_proxy, proxy_out, proxy_in),
        Handle::new(user_handle, handle_out, handle_in),
    ))
}


#[cfg(test)]
mod test {
    use super::*;

    use ::channel::{SinglePoll, PollReceiver, RecvError};

    use std::thread;

    use ::dummy;

    /// The proxy is dropped first: `close` then reports a disconnected
    /// channel, while the queued `Rx::Closed` is still delivered by `process`.
    #[test]
    fn handle_close_after() {
        // The `_` pattern does not bind the proxy, so it is dropped right away.
        let (_, mut h) = dummy::create().unwrap();

        assert_matches!(h.close(), Err(::Error::Channel(channel::Error::Disconnected)));

        assert_eq!(h.is_closed(), false);

        assert_matches!(h.process(), Err(::Error::Proxy(proxy::Error::Closed)));
        assert_matches!(h.user.msgs.pop_front(), Some(Rx::Closed));
        assert_matches!(h.user.msgs.pop_front(), None);
        assert_eq!(h.is_closed(), true);

        assert_matches!(h.rx.try_recv(), Err(TryRecvError::Disconnected));
    }

    /// The handle closes while the proxy is still alive on another thread;
    /// the proxy sees `Tx::Close` and its drop reports `Rx::Closed` back.
    #[test]
    fn handle_close_before() {
        let (p, mut h) = dummy::create().unwrap();

        let proxy_side = thread::spawn(move || {
            let mp = p;
            let mut prx = PollReceiver::new(&mp.rx).unwrap();
            assert_matches!(prx.recv(None), Ok(Tx::Close));
        });

        h.process().unwrap();

        h.close().unwrap();
        assert_eq!(h.is_closed(), false);

        let mut sp = SinglePoll::new(&h.rx).unwrap();
        sp.wait(None).unwrap();
        assert_matches!(h.process(), Err(::Error::Proxy(proxy::Error::Closed)));

        assert_matches!(h.user.msgs.pop_front(), Some(Rx::Closed));
        assert_matches!(h.user.msgs.pop_front(), None);
        assert_eq!(h.is_closed(), true);

        // Join so that a failed assertion on the proxy thread fails this test
        // instead of being lost with the detached thread.
        proxy_side.join().unwrap();
    }

    /// Dropping the handle sends `Tx::Close` and then disconnects the channel.
    #[test]
    fn handle_drop() {
        // The `_` pattern does not bind the handle, so it is dropped right away.
        let (p, _) = dummy::create().unwrap();

        let mut prx = PollReceiver::new(&p.rx).unwrap();
        assert_matches!(prx.recv(None), Ok(Tx::Close));
        assert_matches!(prx.recv(None), Err(RecvError::Disconnected));
    }
}