use {expect_events, sleep_ms};
use mio::{channel, Events, Poll, PollOpt, Ready, Token};
use mio::event::Event;
use std::sync::mpsc::TryRecvError;
use std::thread;
use std::time::Duration;
#[test]
pub fn test_poll_channel_edge() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
let (tx, rx) = channel::channel();
poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
tx.send("hello").unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(1, num);
let event = events.get(0).unwrap();
assert_eq!(event.token(), Token(123));
assert_eq!(event.readiness(), Ready::readable());
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
assert_eq!("hello", rx.try_recv().unwrap());
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
tx.send("goodbye").unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(1, num);
let event = events.get(0).unwrap();
assert_eq!(event.token(), Token(123));
assert_eq!(event.readiness(), Ready::readable());
rx.try_recv().unwrap();
drop(tx);
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(1, num);
let event = events.get(0).unwrap();
assert_eq!(event.token(), Token(123));
assert_eq!(event.readiness(), Ready::readable());
match rx.try_recv() {
Err(TryRecvError::Disconnected) => {}
no => panic!("unexpected value {:?}", no),
}
}
#[test]
pub fn test_poll_channel_oneshot() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
let (tx, rx) = channel::channel();
poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
tx.send("hello").unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(1, num);
let event = events.get(0).unwrap();
assert_eq!(event.token(), Token(123));
assert_eq!(event.readiness(), Ready::readable());
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
assert_eq!("hello", rx.try_recv().unwrap());
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
tx.send("goodbye").unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
for _ in 0..3 {
poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(1, num);
let event = events.get(0).unwrap();
assert_eq!(event.token(), Token(123));
assert_eq!(event.readiness(), Ready::readable());
}
assert_eq!("goodbye", rx.try_recv().unwrap());
poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
}
#[test]
pub fn test_poll_channel_level() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
let (tx, rx) = channel::channel();
poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
tx.send("hello").unwrap();
for i in 0..5 {
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert!(1 == num, "actually got {} on iteration {}", num, i);
let event = events.get(0).unwrap();
assert_eq!(event.token(), Token(123));
assert_eq!(event.readiness(), Ready::readable());
}
assert_eq!("hello", rx.try_recv().unwrap());
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
}
#[test]
pub fn test_poll_channel_writable() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
let (tx, rx) = channel::channel();
poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()).unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
tx.send("hello").unwrap();
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
}
#[test]
pub fn test_dropping_receive_before_poll() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
let (tx, rx) = channel::channel();
poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
tx.send("hello").unwrap();
drop(rx);
let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
assert_eq!(0, num);
}
#[test]
pub fn test_mixing_channel_with_socket() {
use mio::net::{TcpListener, TcpStream};
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
let (tx, rx) = channel::channel();
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
tx.send("hello").unwrap();
let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
sleep_ms(250);
expect_events(&poll, &mut events, 2, vec![
Event::new(Ready::empty(), Token(0)),
Event::new(Ready::empty(), Token(1)),
]);
}
#[test]
pub fn test_sending_from_other_thread_while_polling() {
const ITERATIONS: usize = 20;
const THREADS: usize = 5;
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
for _ in 0..ITERATIONS {
let (tx, rx) = channel::channel();
poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
for _ in 0..THREADS {
let tx = tx.clone();
thread::spawn(move || {
sleep_ms(50);
tx.send("ping").unwrap();
});
}
let mut recv = 0;
while recv < THREADS {
let num = poll.poll(&mut events, None).unwrap();
if num != 0 {
assert_eq!(1, num);
assert_eq!(events.get(0).unwrap().token(), Token(0));
while let Ok(_) = rx.try_recv() {
recv += 1;
}
}
}
}
}