#![feature(collections, io, net)]
#![allow(dead_code)]
use std::collections::{VecDeque, VecMap};
use std::io::{self, BufRead, Write, BufReader};
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::thread::{self, JoinHandle};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, TryRecvError};
type ClientConn = (Receiver<io::Result<Vec<u8>>>, TcpStream);
pub struct Lobby {
listener: TcpListener,
connections: Arc<Mutex<VecMap<ClientConn>>>,
names: Arc<Mutex<VecMap<String>>>,
new_r: Receiver<usize>,
thread: JoinHandle,
}
impl Lobby {
pub fn new<A>(addr: A) -> io::Result<Lobby> where A: ToSocketAddrs {
let listener = try!(TcpListener::bind(&addr));
let connections = Arc::new(Mutex::new(VecMap::new()));
let names = Arc::new(Mutex::new(VecMap::new()));
let (new_s, new_r) = channel();
let thread = {
let listener = listener.try_clone().unwrap();
let connections = connections.clone();
let names = names.clone();
thread::spawn(move || {
let mut id = 0;
let free_ids = Arc::new(Mutex::new(VecDeque::new()));
for conn in listener.incoming() {
if let Ok(conn) = conn {
let free_ids = free_ids.clone();
let new_id = match free_ids.lock().unwrap().pop_front() {
Some(id) => id,
None => { id += 1; id },
};
let conn_reader = conn.try_clone().unwrap();
let (ds, dr) = channel();
let new_s = new_s.clone();
let names = names.clone();
thread::spawn(move || {
let mut reader = BufReader::new(conn_reader);
let mut name_buf = Vec::new();
let my_id = new_id;
match reader.read_until(0, &mut name_buf) {
Ok(_) => {
name_buf.pop(); names.lock().unwrap().insert(my_id, String::from_utf8(name_buf).unwrap());
new_s.send(new_id).unwrap();
},
Err(_) => {
drop(ds);
free_ids.lock().unwrap().push_back(my_id);
return;
},
}
loop {
let result = match reader.fill_buf() {
Ok(data) if data.len() == 0 => Some(0),
Ok(data) => { ds.send(Ok(data.to_vec())).unwrap(); Some(data.len()) },
Err(e) => { ds.send(Err(e)).unwrap(); None },
};
if let Some(read) = result {
if read > 0 {
reader.consume(read);
} else {
drop(ds);
free_ids.lock().unwrap().push_back(my_id);
break;
}
}
}
});
connections.lock().unwrap().insert(new_id, (dr, conn));
}
}
})
};
Ok(Lobby{
listener: listener,
connections: connections,
names: names,
new_r: new_r,
thread: thread,
})
}
pub fn message_all(&self, data: &[u8]) -> Vec<(usize, io::Error)> {
self.message(|_| true, data)
}
pub fn message_client(&self, client: usize, data: &[u8]) -> Vec<(usize, io::Error)> {
self.message(|id| id == client, data)
}
pub fn message_rest(&self, client: usize, data: &[u8]) -> Vec<(usize, io::Error)> {
self.message(|id| id != client, data)
}
pub fn message<P>(&self, predicate: P, data: &[u8]) -> Vec<(usize, io::Error)> where P: Fn(usize) -> bool {
let mut failed = Vec::new();
for (id, &mut (_, ref mut conn)) in self.connections.lock().unwrap().iter_mut().filter(|&(id, _)| predicate(id)) {
if let Err(e) = conn.write_all(data) {
failed.push((id, e));
}
}
failed
}
pub fn scan<F: Fn(usize, ScanResult) -> ()>(&self, callback: F) {
loop {
match self.new_r.try_recv() {
Ok(id) => callback(id, ScanResult::Connected),
Err(e) if e == TryRecvError::Empty => break,
Err(e) if e == TryRecvError::Disconnected => {
panic!("tried to check for new clients on disconnected channel!");
},
Err(_) => unimplemented!(),
}
}
let mut results = Vec::with_capacity(self.connections.lock().unwrap().len());
for (id, &mut (ref mut dr, _)) in self.connections.lock().unwrap().iter_mut() {
match dr.try_recv() {
Ok(Ok(data)) => results.push((id, ScanResult::Data(data))),
Ok(Err(err)) => results.push((id, ScanResult::IoError(err))),
Err(TryRecvError::Empty) => {}, Err(TryRecvError::Disconnected) => results.push((id, ScanResult::Disconnected)),
}
}
for (id, result) in results.into_iter() {
if let ScanResult::Disconnected = result {
self.connections.lock().unwrap().remove(&id);
}
callback(id, result);
}
}
pub fn name(&self, client: usize) -> Option<String> {
self.names.lock().unwrap().get(&client).map(|s| s.clone())
}
}
pub enum ScanResult {
Data(Vec<u8>),
IoError(io::Error),
Connected,
Disconnected,
}