extern crate amy;
use amy::{
Poller,
Registrar,
Event,
Notification,
LineReader
};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::str;
use std::io::{Read, Write};
/// Loopback address and port that the test listener binds to and the client connects to.
const IP: &str = "127.0.0.1:10002";
/// Newline-terminated payload the client sends and expects to receive back unchanged.
const DATA: &str = "Hello, World!\n";
/// End-to-end exercise of a `Poller` and its `Registrar` across threads.
///
/// Three threads cooperate:
/// * a worker that owns the registrar, the listener and the receiving end of
///   the channel (`run_worker`),
/// * a poller that waits for notifications and forwards them over the channel
///   (`run_poller`),
/// * a TCP client that sends `DATA` and expects it echoed back (`run_client`).
///
/// The test passes only if all three threads finish without panicking.
#[test]
fn primary_example() {
    let poller = Poller::new().unwrap();
    let registrar = poller.get_registrar();
    let (notification_tx, notification_rx) = channel();

    // The listener is bound here, before the client thread is spawned, so the
    // address already exists by the time the client tries to connect.
    let listener = TcpListener::bind(IP).unwrap();
    listener.set_nonblocking(true).unwrap();

    let worker = thread::spawn(move || run_worker(registrar, notification_rx, listener));
    let polling = thread::spawn(move || run_poller(poller, notification_tx));
    let client = thread::spawn(run_client);

    // Join in spawn order; a panic in any thread surfaces here as a test failure.
    worker.join().unwrap();
    polling.join().unwrap();
    client.join().unwrap();
}
/// Client side of the test: connects to `IP`, sends `DATA`, then blocks until
/// exactly `DATA.len()` bytes have been read back and asserts they equal `DATA`.
///
/// Panics (failing the test) on any I/O error, on invalid UTF-8 in the reply,
/// or if the reply differs from what was sent.
fn run_client() {
    let payload = DATA.as_bytes();

    let mut stream = TcpStream::connect(IP).unwrap();
    stream.write_all(payload).unwrap();

    // Read back exactly as many bytes as were sent.
    let mut echoed = vec![0u8; payload.len()];
    stream.read_exact(&mut echoed).unwrap();

    let echoed_text = str::from_utf8(&echoed).unwrap();
    assert_eq!(DATA, echoed_text);
}
/// Poller side of the test.
///
/// Waits for exactly three notifications, one per `wait` call, and forwards
/// each one over `tx` after checking it against the expected sequence:
///
/// 1. `Event::Read` for id 0
/// 2. `Event::Read` for id 1
/// 3. `Event::Write` for id 1
///
/// A final, shorter wait must then return no notifications at all.
///
/// The `wait` arguments (5000 and 1000) are timeouts, presumably in
/// milliseconds -- confirm against the `amy` documentation.
fn run_poller(mut poller: Poller, tx: Sender<Notification>) {
    let expected_sequence = vec![
        (Event::Read, 0),
        (Event::Read, 1),
        (Event::Write, 1),
    ];

    for (expected_event, expected_id) in expected_sequence {
        let mut ready = poller.wait(5000).unwrap();
        assert_eq!(1, ready.len());

        let notification = ready.pop().unwrap();
        assert_eq!(expected_event, notification.event);
        assert_eq!(expected_id, notification.id);

        tx.send(notification).unwrap();
    }

    // Nothing further should be reported once the expected sequence is done.
    let leftover = poller.wait(1000).unwrap();
    assert_eq!(0, leftover.len());
}
/// Worker side of the test: performs all socket work, driven by notifications
/// received on `rx`.
///
/// Steps, each gated on the matching notification arriving over the channel:
///
/// 1. Register `listener` for reads (asserted to get id 0), then accept one
///    connection.
/// 2. Register the accepted socket for reads (asserted to get id 1), then read
///    one line and check it equals `DATA`.
/// 3. Re-register the socket for writes, then write the line back and check
///    that the whole line was written in a single `write` call.
///
/// Any mismatch or I/O error panics and fails the test.
fn run_worker(registrar: Registrar, rx: Receiver<Notification>, listener: TcpListener) {
    // Phase 1: wait until the listener is readable, then accept the client.
    let listener_id = registrar.register(&listener, Event::Read).unwrap();
    assert_eq!(0, listener_id);

    let accept_ready = rx.recv().unwrap();
    assert_eq!(accept_ready.event, Event::Read);
    assert_eq!(accept_ready.id, listener_id);

    let (mut stream, _peer_addr) = listener.accept().unwrap();
    stream.set_nonblocking(true).unwrap();

    // Phase 2: wait until the accepted socket is readable, then read the line.
    let stream_id = registrar.register(&stream, Event::Read).unwrap();
    assert_eq!(1, stream_id);

    let read_ready = rx.recv().unwrap();
    assert_eq!(read_ready.event, Event::Read);
    assert_eq!(read_ready.id, stream_id);

    let mut lines = LineReader::new(1024);
    let received = lines.read(&mut stream).unwrap();
    assert_eq!(received, DATA.len());

    let line = lines.iter_mut().next().unwrap().unwrap();
    assert_eq!(String::from(DATA), line);

    // Phase 3: switch interest to writes and echo the line back once writable.
    registrar.reregister(stream_id, &stream, Event::Write).unwrap();

    let write_ready = rx.recv().unwrap();
    assert_eq!(write_ready.event, Event::Write);
    assert_eq!(write_ready.id, stream_id);

    let sent = stream.write(line.as_bytes()).unwrap();
    assert_eq!(line.len(), sent);
}