use std::collections::HashMap;
use std::io::Result;
use std::net::{TcpListener, TcpStream, SocketAddr};
use amy::{Notification, Event, Poller, Registrar};
use frame_reader::FrameReader;
use frame_writer::FrameWriter;
use Handler;
use std::sync::{Arc,Mutex};
pub fn run<T: Send + Handler + 'static>(port: u16, handler: Arc<Mutex<T>>) -> Result<()> {
use std::thread;
use std::sync::mpsc::channel;
info!("Listening on port {} ...", port);
let addr = format!("0.0.0.0:{}", port);
let mut poller = Poller::new().unwrap();
let registrar = poller.get_registrar().unwrap();
let (tx, rx) = channel();
let handle = thread::spawn(move || {
let listener = TcpListener::bind(&addr).unwrap();
listener.set_nonblocking(true).unwrap();
let listener_id = registrar.register(&listener, Event::Read).unwrap();
let mut connections = HashMap::new();
loop {
let notification : Notification = rx.recv().unwrap();
if notification.id == listener_id {
let (socket, address) = listener.accept().unwrap();
socket.set_nonblocking(true).unwrap();
let socket_id = registrar.register(&socket, Event::Both).unwrap();
info!("DEBUG accept socket#{} {:?} {:?} ...", socket_id, &socket, &address);
let conn = Conn {
sock: socket,
addr: address,
reader: FrameReader::new(1024 * 1024),
writer: FrameWriter::new(),
};
connections.insert(socket_id, conn);
} else {
if let Err(e) = handle_poll_notification(¬ification, ®istrar, &mut connections, handler.clone()) {
if let Some(conn) = connections.remove(¬ification.id) {
registrar.deregister(&conn.sock).unwrap();
error!("fail to handle poll notification Event::{:?} sock#{} {} -- {}", ¬ification.event, ¬ification.id, &conn.addr, e);
} else {
error!("fail to handle poll notification Event::{:?} sock#{} -- {}", ¬ification.event, ¬ification.id, e);
}
}
}
}
});
let handle_poller = thread::Builder::new().name(format!("poller")).spawn(move || {
loop {
let notifications = poller.wait(5000).unwrap();
for n in notifications {
tx.send(n).unwrap();
}
}
}).unwrap();
handle.join().unwrap();
handle_poller.join().unwrap();
Ok(())
}
struct Conn {
sock: TcpStream,
addr: SocketAddr,
reader: FrameReader,
writer: FrameWriter
}
fn handle_poll_notification<T: Send + Handler>(notification: &Notification,
_registrar: &Registrar,
connections: &mut HashMap<usize, Conn>,
handler: Arc<Mutex<T>>) -> Result<()> {
use value::Value;
if let Some(conn) = connections.get_mut(¬ification.id) {
match notification.event {
Event::Read => {
let _sz = conn.reader.read(&mut conn.sock)?;
for msg in conn.reader.iter_mut() {
println!("Received a complete message: {:?}", &msg);
let data = Value::Status("OK".to_string()).encode();
let _sz = conn.writer.write(&mut conn.sock, Some(data))?;
}
},
Event::Write => {
conn.writer.write(&mut conn.sock, None)?;
},
Event::Both => {
let _sz = conn.reader.read(&mut conn.sock)?;
for msg in conn.reader.iter_mut() {
let mut handler = handler.lock().unwrap();
if let Some(data) = handler.handle( &msg ) {
conn.writer.write(&mut conn.sock, Some(data.encode()))?;
}
}
let _sz = conn.writer.write(&mut conn.sock, None)?;
}
}
} else {
error!("SKIP notification for un-registered socket#{}", notification.id);
}
Ok(())
}