use std::cell::RefCell;
use std::io;
use std::rc::Rc;
use std::sync::mpsc::TryRecvError;
use mio::{Evented, Poll, PollOpt, Ready, Token};
use mio_more::channel::{self as miochan, Receiver};
pub use mio_more::channel::{SendError, Sender, SyncSender, TrySendError};
use {EventDispatcher, EventSource};
/// The events generated by a [`Channel`] event source.
///
/// `Debug` is derived (for `T: Debug`) so events can be logged and used in
/// assertion messages.
#[derive(Debug)]
pub enum Event<T> {
    /// A message was received from the sending end of the channel.
    Msg(T),
    /// The receiver reported that the channel is disconnected; no further
    /// messages are delivered during the current dispatch.
    Closed,
}
/// The receiving end of a channel, usable as an event source.
///
/// The underlying receiver is reference-counted so that the dispatcher
/// created by `make_dispatcher` can share it with this handle.
pub struct Channel<T> {
    // Shared with the `Dispatcher` built from this source.
    receiver: Rc<Receiver<T>>,
}
/// Creates a new channel and returns its two halves.
///
/// The pair is produced by `mio_more::channel::channel`; the sending half is
/// returned untouched while the receiving half is wrapped in a [`Channel`].
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
    let (tx, rx) = miochan::channel();
    let source = Channel {
        receiver: Rc::new(rx),
    };
    (tx, source)
}
/// Creates a new channel with a `SyncSender` and returns its two halves.
///
/// `bound` is forwarded as-is to `mio_more::channel::sync_channel`; the
/// receiving half is wrapped in a [`Channel`].
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
    let (tx, rx) = miochan::sync_channel(bound);
    let source = Channel {
        receiver: Rc::new(rx),
    };
    (tx, source)
}
/// `Channel` carries no registration state of its own: every `Evented`
/// operation is forwarded to the wrapped receiver.
impl<T> Evented for Channel<T> {
    /// Registers the wrapped receiver with `poll`.
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let inner = &*self.receiver;
        inner.register(poll, token, interest, opts)
    }

    /// Re-registers the wrapped receiver with `poll`.
    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let inner = &*self.receiver;
        inner.reregister(poll, token, interest, opts)
    }

    /// Deregisters the wrapped receiver from `poll`.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        let inner = &*self.receiver;
        inner.deregister(poll)
    }
}
impl<T: 'static> EventSource for Channel<T> {
    type Event = Event<T>;

    /// The channel only asks for readable readiness.
    fn interest(&self) -> Ready {
        Ready::readable()
    }

    /// The channel asks for edge-triggered polling.
    fn pollopts(&self) -> PollOpt {
        PollOpt::edge()
    }

    /// Builds the dispatcher that forwards received messages to `callback`.
    ///
    /// The dispatcher holds its own reference to the shared receiver.
    fn make_dispatcher<F: FnMut(Event<T>) + 'static>(
        &self,
        callback: F,
    ) -> Rc<RefCell<EventDispatcher>> {
        let dispatcher = Dispatcher {
            receiver: Rc::clone(&self.receiver),
            callback,
        };
        Rc::new(RefCell::new(dispatcher))
    }
}
/// Pairs the shared receiver with the user callback that is invoked for
/// every event read from it.
struct Dispatcher<T, F: FnMut(Event<T>)> {
    // Same receiver as the one held by the originating `Channel`.
    receiver: Rc<Receiver<T>>,
    // Called once per `Event` produced while draining the receiver.
    callback: F,
}
impl<T, F: FnMut(Event<T>)> EventDispatcher for Dispatcher<T, F> {
    /// Drains the receiver, handing each message to the callback.
    ///
    /// The source requests edge-triggered polling, so everything that is
    /// currently queued is consumed here: the loop stops only once the
    /// receiver reports that it is empty, or after emitting `Event::Closed`
    /// when it reports that it is disconnected.
    fn ready(&mut self, _: Ready) {
        loop {
            let msg = match self.receiver.try_recv() {
                Ok(val) => val,
                // Nothing left to read for now.
                Err(TryRecvError::Empty) => return,
                Err(TryRecvError::Disconnected) => {
                    (self.callback)(Event::Closed);
                    return;
                }
            };
            (self.callback)(Event::Msg(msg));
        }
    }
}
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::rc::Rc;

    use super::*;

    /// Checks that a message and then the closing of the channel are each
    /// reported to the callback, and only after they actually happened.
    #[test]
    fn basic_channel() {
        let mut event_loop = ::EventLoop::new().unwrap();
        let handle = event_loop.handle();
        let (tx, rx) = channel::<()>();

        // Flags flipped by the callback; the clones are moved into it.
        let got_msg = Rc::new(Cell::new(false));
        let got_closed = Rc::new(Cell::new(false));
        let msg_flag = Rc::clone(&got_msg);
        let closed_flag = Rc::clone(&got_closed);

        let _source = handle
            .insert_source(rx, move |evt| match evt {
                Event::Msg(()) => msg_flag.set(true),
                Event::Closed => closed_flag.set(true),
            })
            .unwrap();

        // Zero timeout: each dispatch only handles what is already pending.
        let no_wait = Some(::std::time::Duration::from_millis(0));

        // Nothing was sent yet, so nothing must have been observed.
        event_loop.dispatch(no_wait).unwrap();
        assert!(!got_msg.get());
        assert!(!got_closed.get());

        // A sent message is delivered, but the channel is still open.
        tx.send(()).unwrap();
        event_loop.dispatch(no_wait).unwrap();
        assert!(got_msg.get());
        assert!(!got_closed.get());

        // Dropping the only sender must be reported as a close.
        ::std::mem::drop(tx);
        event_loop.dispatch(no_wait).unwrap();
        assert!(got_closed.get());
    }
}