use std::io;
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::thread;
use storage::Storage;
use mo::Message;
#[derive(Debug)]
pub struct Server<A: ToSocketAddrs + Sync, S: Storage + Sync + Send> {
addr: A,
listener: Option<TcpListener>,
storage: Arc<Mutex<S>>,
}
impl<A, S> Server<A, S>
where A: ToSocketAddrs + Sync,
S: 'static + Storage + Sync + Send
{
pub fn new(addr: A, storage: S) -> Server<A, S> {
Server {
addr: addr,
listener: None,
storage: Arc::new(Mutex::new(storage)),
}
}
pub fn bind(&mut self) -> io::Result<()> {
self.listener = Some(try!(self.create_listener()));
Ok(())
}
pub fn serve_forever(mut self) {
let listener = match self.listener {
Some(ref listener) => listener,
None => {
self.listener = Some(self.create_listener().unwrap());
self.listener.as_ref().unwrap()
}
};
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let storage = self.storage.clone();
thread::spawn(move || handle_stream(stream, storage));
}
Err(err) => {
thread::spawn(move || handle_error(err));
}
}
}
}
fn create_listener(&self) -> io::Result<TcpListener> {
TcpListener::bind(&self.addr)
}
}
fn handle_stream(stream: TcpStream, storage: Arc<Mutex<Storage>>) {
match stream.peer_addr() {
Ok(addr) => {
debug!("Handling TcpStream from {}", addr);
}
Err(err) => {
warn!("Problem when extracting peer address from TcpStream, but we'll press on: {:?}",
err);
}
}
let ref message = match Message::read_from(stream) {
Ok(message) => {
info!("Recieved message from IMEI {} with MOMN {} and {} byte payload",
message.imei(),
message.momsn(),
message.payload_ref().len());
message
}
Err(err) => {
error!("Error when reading message: {:?}", err);
return;
}
};
match storage.lock().expect("unable to lock storage mutex").store(message) {
Ok(_) => info!("Stored message"),
Err(err) => error!("Problem storing message: {:?}", err),
}
}
fn handle_error(err: io::Error) {
error!("Error when receiving tcp communication: {:?}", err);
}