nexus 0.0.1

A high performance networking library
Documentation
use std::sync::{Arc, Mutex};
use std::os::unix::io::{RawFd, AsRawFd};
use mio::{Selector, PollOpt, Token, EventSet, Evented};
use mio::unix::{PipeReader, PipeWriter};
use std::io::{self, Read};

use reactor::{ReactorError, Configurer};
use protocol::{Protocol};

#[derive(Clone)]
pub struct FakeProtocol<'a> {
    inner: Arc<Inner<'a>>,
}

struct Inner<'a> {
    on_readable_fd: Mutex<Option<RawFd>>,
    on_writable_fd: Mutex<Option<RawFd>>,
    on_timeout_fd: Mutex<Option<RawFd>>,
    on_disconnect_fd: Mutex<Option<RawFd>>,
    on_error_fd: Mutex<Option<RawFd>>,
    additions: Mutex<Vec<(&'a mut FakeSocket, EventSet)>>,
    updates: Mutex<Vec<(Token, EventSet)>>,
    removes: Mutex<Vec<Token>>,
    error: Mutex<Option<io::Error>>,
}

#[derive(Debug)]
pub enum FakeSocket {
    PReader(PipeReader),
    PWriter(PipeWriter),
}
use test_helpers::FakeSocket::*;

impl<'a> Evented for &'a mut FakeSocket {
    fn register(&self, selector:
                &mut Selector,
                token: Token,
                interest: EventSet,
                opts: PollOpt) -> io::Result<()> {
        match **self {
            PReader(ref r) => r.register(selector, token, interest, opts),
            PWriter(ref w) => w.register(selector, token, interest, opts),
        }
    }

    fn reregister(&self,
                  selector: &mut Selector,
                  token: Token,
                  interest: EventSet,
                  opts: PollOpt) -> io::Result<()> {
        match **self {
            PReader(ref r) => r.reregister(selector, token, interest, opts),
            PWriter(ref w) => w.reregister(selector, token, interest, opts),
        }
    }

    fn deregister(&self, selector: &mut Selector) -> io::Result<()> {
        match **self {
            PReader(ref r) => r.deregister(selector),
            PWriter(ref w) => w.deregister(selector),
        }
    }
}

impl AsRawFd for FakeSocket {
    fn as_raw_fd(&self) -> RawFd {
        match *self {
            PReader(ref r) => r.as_raw_fd(),
            PWriter(ref w) => w.as_raw_fd(),
        }
    }
}


impl<'a> Protocol for FakeProtocol<'a> {
    type Socket = &'a mut FakeSocket;

    fn on_readable(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, _token: Token) {
        let mut buf = [0u8, 32];
        let res = match *socket {
            &mut PReader(ref mut r) => r.read(&mut buf),
            &mut PWriter(_) => panic!("A readable writer was found."),
        };

        match res {
            Ok(_) => { },
            Err(e) => panic!("{:?}", e),
        }
        {
            let mut guard = self.inner.on_readable_fd.lock().unwrap();
            *guard = Some(socket.as_raw_fd());
        }
        self.configure(configurer);
    }

    fn on_writable(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, _token: Token) {
        {
            let mut guard = self.inner.on_writable_fd.lock().unwrap();
            *guard = Some(socket.as_raw_fd());
        }
        self.configure(configurer);
    }

    fn on_timeout(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, _token: Token) {
        {
            let mut guard = self.inner.on_timeout_fd.lock().unwrap();
            *guard = Some(socket.as_raw_fd());
        }
        self.configure(configurer);
    }

    fn on_disconnect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, _token: Token) {
        {
            let mut guard = self.inner.on_disconnect_fd.lock().unwrap();
            *guard = Some(socket.as_raw_fd());
        }
        self.configure(configurer);
    }

    fn on_socket_error(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, _token: Token) {
        {
            let mut guard = self.inner.on_error_fd.lock().unwrap();
            *guard = Some(socket.as_raw_fd());
        }
        self.configure(configurer);
    }

    fn on_event_loop_error(&mut self, error: ReactorError<Self::Socket>) {
        panic!("Received error: {:?}", error);
    }

    fn tick(&mut self, configurer: &mut Configurer<Self::Socket>) {
        self.configure(configurer);
    }
}

impl<'a> FakeProtocol<'a> {
    pub fn new() -> FakeProtocol<'a> {
        let inner = Inner{
            on_readable_fd: Mutex::new(None),
            on_writable_fd: Mutex::new(None),
            on_timeout_fd: Mutex::new(None),
            on_disconnect_fd: Mutex::new(None),
            on_error_fd: Mutex::new(None),
            additions: Mutex::new(Vec::new()),
            updates: Mutex::new(Vec::new()),
            removes: Mutex::new(Vec::new()),
            error: Mutex::new(None),
        };

        FakeProtocol{
            inner: Arc::new(inner),
        }
    }

    pub fn clear_all(&mut self) {
        let mut guard = self.inner.on_error_fd.lock().unwrap();
        *guard = None;

        let mut guard = self.inner.on_readable_fd.lock().unwrap();
        *guard = None;

        let mut guard = self.inner.on_writable_fd.lock().unwrap();
        *guard = None;

        let mut guard = self.inner.on_timeout_fd.lock().unwrap();
        *guard = None;

        let mut guard = self.inner.on_disconnect_fd.lock().unwrap();
        *guard = None;
    }

    pub fn add_socket(&mut self, socket: &'a mut FakeSocket, events: EventSet) {
        let mut guard = self.inner.additions.lock().unwrap();
        guard.push((socket, events));
    }

    pub fn update_socket(&mut self, socket: Token, events: EventSet) {
        let mut guard = self.inner.updates.lock().unwrap();
        guard.push((socket, events));
    }

    pub fn remove_socket(&mut self, socket: Token) {
        let mut guard = self.inner.removes.lock().unwrap();
        guard.push(socket);
    }

    pub fn shutdown_error(&mut self, err: io::Error) {
        let mut guard = self.inner.error.lock().unwrap();
        *guard = Some(err);
    }

    fn configure(&mut self, event_configurer: &mut Configurer<<Self as Protocol>::Socket>) {
        let mut err_guard = self.inner.error.lock().unwrap();
        match err_guard.take() {
            Some(err) => {
                event_configurer.shutdown(err);
                return;
            },
            None => {},
        }

        let mut guard = self.inner.additions.lock().unwrap();
        while let Some((socket, events)) = guard.pop() {
            event_configurer.add_socket(socket, events);
        }

        let mut guard = self.inner.updates.lock().unwrap();
        while let Some((token, events)) = guard.pop() {
            event_configurer.update_socket(token, events);
        }

        let mut guard = self.inner.removes.lock().unwrap();
        while let Some(token) = guard.pop() {
            event_configurer.remove_socket(token);
        }
    }

    pub fn readable_fd(&mut self) -> Option<RawFd> {
        let guard = self.inner.on_readable_fd.lock().unwrap();
        (*guard).clone()
    }

    pub fn writable_fd(&mut self) -> Option<RawFd> {
        let guard = self.inner.on_writable_fd.lock().unwrap();
        (*guard).clone()
    }

    pub fn timeout_fd(&mut self) -> Option<RawFd> {
        let guard = self.inner.on_timeout_fd.lock().unwrap();
        (*guard).clone()
    }

    pub fn disconnect_fd(&mut self) -> Option<RawFd> {
        let guard = self.inner.on_readable_fd.lock().unwrap();
        (*guard).clone()
    }

    pub fn error_fd(&mut self) -> Option<RawFd> {
        let guard = self.inner.on_error_fd.lock().unwrap();
        (*guard).clone()
    }
}