extern crate amy;
use amy::{
Poller,
Registrar,
Event,
Notification,
LineReader
};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::str;
use std::io::{ErrorKind, Read, Write};
const IP: &'static str = "127.0.0.1:10002";
const DATA: &'static str = "Hello, World!\n";
#[test]
fn primary_example() {
let poller = Poller::new().unwrap();
let registrar = poller.get_registrar();
let (worker_tx, worker_rx) = channel();
let (client_tx, client_rx) = channel();
let (poller_tx, poller_rx) = channel();
let listener = TcpListener::bind(IP).unwrap();
listener.set_nonblocking(true).unwrap();
let client_tx2 = client_tx.clone();
let h1 = thread::spawn(move || {
run_worker(registrar, worker_rx, listener, client_tx2, poller_tx);
});
let h2 = thread::spawn(move || {
run_poller(poller, worker_tx, poller_rx, client_tx);
});
let h3 = thread::spawn(|| {
run_client(client_rx);
});
for h in vec![h1, h2, h3] {
h.join().unwrap();
}
}
fn run_client(rx: Receiver<()>) {
let mut sock = TcpStream::connect(IP).unwrap();
let _ = rx.recv().unwrap();
sock.write_all(DATA.as_bytes()).unwrap();
let mut buf = vec![0; DATA.len()];
sock.read_exact(&mut buf).unwrap();
assert_eq!(DATA, str::from_utf8(&buf).unwrap());
let _ = rx.recv().unwrap();
}
fn run_poller(mut poller: Poller,
worker_tx: Sender<Notification>,
rx: Receiver<()>,
client_tx: Sender<()>) {
let mut notifications = poller.wait(5000).unwrap();
assert_eq!(1, notifications.len());
let notification = notifications.pop().unwrap();
assert_eq!(Event::Read, notification.event);
assert_eq!(1, notification.id);
worker_tx.send(notification).unwrap();
let _ = rx.recv().unwrap();
let mut notifications = poller.wait(5000).unwrap();
assert_eq!(1, notifications.len());
let notification = notifications.pop().unwrap();
assert_eq!(Event::Read, notification.event);
assert_eq!(2, notification.id);
worker_tx.send(notification).unwrap();
let _ = rx.recv().unwrap();
let mut notifications = poller.wait(5000).unwrap();
assert_eq!(1, notifications.len());
let notification = notifications.pop().unwrap();
assert_eq!(Event::Write, notification.event);
assert_eq!(2, notification.id);
worker_tx.send(notification).unwrap();
let _ = rx.recv().unwrap();
let notifications = poller.wait(1000).unwrap();
assert_eq!(0, notifications.len());
client_tx.send(()).unwrap();
}
fn run_worker(registrar: Registrar,
rx: Receiver<Notification>,
listener: TcpListener,
client_tx: Sender<()>,
poller_tx: Sender<()>) {
let listener_id = registrar.register(&listener, Event::Read).unwrap();
assert_eq!(1, listener_id);
let notification = rx.recv().unwrap();
assert_eq!(notification.event, Event::Read);
assert_eq!(notification.id, listener_id);
let (mut socket, _) = listener.accept().unwrap();
socket.set_nonblocking(true).unwrap();
let socket_id = registrar.register(&socket, Event::Read).unwrap();
assert_eq!(2, socket_id);
if let Err(e) = listener.accept() {
assert_eq!(ErrorKind::WouldBlock, e.kind());
}
client_tx.send(()).unwrap();
poller_tx.send(()).unwrap();
let notification = rx.recv().unwrap();
assert_eq!(notification.event, Event::Read);
assert_eq!(notification.id, socket_id);
let mut line_reader = LineReader::new(1024);
let bytes_read = line_reader.read(&mut socket).unwrap();
assert_eq!(bytes_read, DATA.len());
let text = line_reader.iter_mut().next().unwrap().unwrap();
assert_eq!(DATA.to_string(), text);
poller_tx.send(()).unwrap();
registrar.reregister(socket_id, &socket, Event::Write).unwrap();
let notification = rx.recv().unwrap();
assert_eq!(notification.event, Event::Write);
assert_eq!(notification.id, socket_id);
let bytes_written = socket.write(&text.as_bytes()).unwrap();
assert_eq!(text.len(), bytes_written);
poller_tx.send(()).unwrap();
}