calloop 0.1.1

A callback-based event loop
Documentation
//! An MPSC channel whose receiving end is an event source
//!
//! Create a channel using `Channel::<T>::new()`, which returns a
//! `Sender<T>` that can be cloned and sent accross threads if `T: Send`,
//! and a `Channel<T>` that can be inserted into an `EventLoop`. It will generate
//! one event per message.
//!
//! This implementation is based on
//! [`mio_more::channel`](https://docs.rs/mio-more/*/mio_more/channel/index.html).

use std::cell::RefCell;
use std::io;
use std::rc::Rc;
use std::sync::mpsc::TryRecvError;

use mio::{Evented, Poll, PollOpt, Ready, Token};

use mio_more::channel::{self as miochan, Receiver};
pub use mio_more::channel::{SendError, Sender, SyncSender, TrySendError};

use {EventDispatcher, EventSource};

/// The events generated by the channel event source
pub enum Event<T> {
    /// A message was received and is bundled here
    Msg(T),
    /// The channel was closed
    ///
    /// This means all the `Sender`s associated with this channel
    /// have been dropped, no more messages will ever be received.
    Closed,
}

/// The receiving end of the channel
///
/// This is the event source to be inserted into your `EventLoop`.
pub struct Channel<T> {
    receiver: Rc<Receiver<T>>,
}

/// Create a new asynchronous channel
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
    let (sender, receiver) = miochan::channel();
    (
        sender,
        Channel {
            receiver: Rc::new(receiver),
        },
    )
}

/// Create a new synchronous, bounded channel
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
    let (sender, receiver) = miochan::sync_channel(bound);
    (
        sender,
        Channel {
            receiver: Rc::new(receiver),
        },
    )
}

impl<T> Evented for Channel<T> {
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.receiver.register(poll, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.receiver.reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.receiver.deregister(poll)
    }
}

impl<T: 'static> EventSource for Channel<T> {
    type Event = Event<T>;

    fn interest(&self) -> Ready {
        Ready::readable()
    }

    fn pollopts(&self) -> PollOpt {
        PollOpt::edge()
    }

    fn make_dispatcher<F: FnMut(Event<T>) + 'static>(
        &self,
        callback: F,
    ) -> Rc<RefCell<EventDispatcher>> {
        Rc::new(RefCell::new(Dispatcher {
            receiver: self.receiver.clone(),
            callback,
        }))
    }
}

struct Dispatcher<T, F: FnMut(Event<T>)> {
    receiver: Rc<Receiver<T>>,
    callback: F,
}

impl<T, F: FnMut(Event<T>)> EventDispatcher for Dispatcher<T, F> {
    fn ready(&mut self, _: Ready) {
        loop {
            match self.receiver.try_recv() {
                Ok(val) => (self.callback)(Event::Msg(val)),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    (self.callback)(Event::Closed);
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::rc::Rc;

    use super::*;

    #[test]
    fn basic_channel() {
        let mut event_loop = ::EventLoop::new().unwrap();

        let handle = event_loop.handle();

        let (tx, rx) = channel::<()>();

        let got_msg = Rc::new(Cell::new(false));
        let got_closed = Rc::new(Cell::new(false));
        let got_msg_2 = got_msg.clone();
        let got_closed_2 = got_closed.clone();

        let _source = handle
            .insert_source(rx, move |evt| match evt {
                Event::Msg(()) => {
                    got_msg_2.set(true);
                }
                Event::Closed => {
                    got_closed_2.set(true);
                }
            })
            .unwrap();

        // nothing is sent, nothing is received
        event_loop
            .dispatch(Some(::std::time::Duration::from_millis(0)))
            .unwrap();

        assert!(!got_msg.get());
        assert!(!got_closed.get());

        // a message is send
        tx.send(()).unwrap();
        event_loop
            .dispatch(Some(::std::time::Duration::from_millis(0)))
            .unwrap();

        assert!(got_msg.get());
        assert!(!got_closed.get());

        // the sender is dropped
        ::std::mem::drop(tx);
        event_loop
            .dispatch(Some(::std::time::Duration::from_millis(0)))
            .unwrap();

        assert!(got_closed.get());
    }
}