mod transmit;
mod broadcast;
mod receive;
pub use self::transmit::*;
pub use self::broadcast::*;
pub use self::transmit::*;
pub use self::receive::*;
use std::fmt::Debug;
use std::hash::Hash;
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
use std::vec;
use futures::{Sink, Stream, Poll, Async, AsyncSink, StartSend};
use super::*;
use client::Disconnect;
/// Errors reported by a `Room` when an operation on one of its clients fails.
///
/// `I` is the client ID type and `C` is the underlying connection type whose
/// sink and stream error types are carried inside `Disconnect`.
#[derive(Clone, Debug, PartialEq)]
pub enum RoomError<I, C>
where I: Clone + Send + Debug + 'static,
C: Sink + Stream + 'static
{
/// No client with this ID is present in the room.
UnknownClient(I),
/// The client with this ID failed and was taken out of the room. The second
/// field is the `Disconnect` value obtained from the removed client.
DisconnectedClient(I, Disconnect<C::SinkError, C::Error>),
}
/// A group of clients addressed by ID.
///
/// Besides the inherent helpers, a `Room` is itself a `Sink` of
/// `(I, C::SinkItem)` pairs and a `Stream` of `(I, C::Item)` pairs, where the
/// ID selects or identifies the client involved.
pub struct Room<I, C>
where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
C: Sink + Stream + 'static
{
// Clients indexed by the ID each one reports through `id()`.
clients: HashMap<I, Client<I, C>>,
}
impl<I, C> Room<I, C>
where
    I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
    C: Sink + Stream + 'static,
{
    /// Builds a room from `clients`.
    ///
    /// Clients are added in order through [`insert`](#method.insert), so when
    /// several clients report the same ID only the first one is kept and the
    /// later ones are dropped.
    pub fn new(clients: Vec<Client<I, C>>) -> Room<I, C> {
        let mut room = Room::default();
        for client in clients {
            room.insert(client);
        }
        room
    }

    /// Returns the IDs of every client currently held by the room.
    pub fn ids(&self) -> HashSet<I> {
        self.clients.keys().cloned().collect()
    }

    /// Returns a shared reference to the client with the given `id`, if present.
    pub fn client(&self, id: &I) -> Option<&Client<I, C>> {
        self.clients.get(id)
    }

    /// Returns a mutable reference to the client with the given `id`, if present.
    #[doc(hidden)]
    pub fn client_mut(&mut self, id: &I) -> Option<&mut Client<I, C>> {
        self.clients.get_mut(id)
    }

    /// Adds `client` to the room.
    ///
    /// Returns `true` when the client was added. Returns `false`, leaving the
    /// room untouched and dropping `client`, when a client with the same ID is
    /// already present.
    pub fn insert(&mut self, client: Client<I, C>) -> bool {
        // The entry API checks for an existing ID and inserts with a single
        // lookup and a single copy of the ID.
        match self.clients.entry(client.id()) {
            ::std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(client);
                true
            }
            ::std::collections::hash_map::Entry::Occupied(_) => false,
        }
    }

    /// Removes and returns the client with the given `id`, or `None` when the
    /// room holds no such client.
    pub fn remove(&mut self, id: &I) -> Option<Client<I, C>> {
        self.clients.remove(id)
    }

    /// Reports whether a client with the given `id` is in the room.
    pub fn contains(&self, id: &I) -> bool {
        self.clients.contains_key(id)
    }

    /// Consumes the room and returns a `Broadcast` of `msg` addressed to every
    /// client that is in the room at the time of the call.
    pub fn broadcast_all(self, msg: C::SinkItem) -> Broadcast<I, C>
    where
        C::SinkItem: Clone,
    {
        let ids = self.ids();
        Broadcast::new(self, msg, ids)
    }

    /// Consumes the room and returns a `Broadcast` of `msg` addressed to the
    /// clients named in `ids`.
    pub fn broadcast(self, msg: C::SinkItem, ids: HashSet<I>) -> Broadcast<I, C>
    where
        C::SinkItem: Clone,
    {
        Broadcast::new(self, msg, ids)
    }

    /// Consumes the room and returns a `Transmit` that carries one message per
    /// client ID, as given by `msgs`.
    pub fn transmit(self, msgs: HashMap<I, C::SinkItem>) -> Transmit<I, C> {
        Transmit::new(self, msgs.into_iter().collect())
    }

    /// Consumes the room and returns a `Receive` covering every client that is
    /// in the room at the time of the call.
    pub fn receive_all(self) -> Receive<I, C> {
        let ids = self.ids();
        Receive::new(self, ids)
    }

    /// Consumes the room and returns a `Receive` covering the clients named in
    /// `ids`.
    pub fn receive(self, ids: HashSet<I>) -> Receive<I, C> {
        // `ids` is already the set type `Receive::new` is given by
        // `receive_all`, so it is passed through without being rebuilt.
        Receive::new(self, ids)
    }

    /// Calls `close()` on each client named in `ids`.
    ///
    /// IDs that are not in the room are skipped. Closed clients stay in the
    /// room; use `remove` or `into_clients` to take them out.
    pub fn close(&mut self, ids: HashSet<I>) {
        for id in ids {
            if let Some(client) = self.client_mut(&id) {
                client.close();
            }
        }
    }

    /// Calls `close()` on every client in the room.
    ///
    /// This mirrors `close`: the clients are closed in place and remain in the
    /// room, so their state can still be inspected afterwards.
    pub fn close_all(&mut self) {
        for client in self.clients.values_mut() {
            client.close();
        }
    }

    /// Consumes the room and returns its clients keyed by ID.
    pub fn into_clients(self) -> HashMap<I, Client<I, C>> {
        self.clients
    }

    /// Removes the client `id` from the room and converts it into a
    /// `RoomError::DisconnectedClient`.
    ///
    /// # Panics
    ///
    /// Panics when `id` is not in the room; callers only pass IDs they have
    /// just looked up.
    fn err_for_disconnect(&mut self, id: I) -> RoomError<I, C> {
        let (id, disconnect) = self.clients
            .remove(&id)
            .expect("err_for_disconnect requires an id that is present in the room")
            .into_disconnect();
        RoomError::DisconnectedClient(id, disconnect)
    }
}
impl<I, C> Default for Room<I, C>
where
    I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
    C: Sink + Stream + 'static,
{
    /// Creates a room that holds no clients.
    fn default() -> Room<I, C> {
        let clients = HashMap::new();
        Room { clients: clients }
    }
}
impl<I, C> Sink for Room<I, C>
where
    I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
    C: Sink + Stream + 'static,
{
    /// A message paired with the ID of the client it is addressed to.
    type SinkItem = (I, C::SinkItem);
    type SinkError = RoomError<I, C>;

    /// Starts sending `msg` to the client `id`.
    ///
    /// # Errors
    ///
    /// * `RoomError::UnknownClient` when no client with `id` is in the room.
    /// * `RoomError::DisconnectedClient` when the client rejects the send; the
    ///   client is removed from the room as part of building the error.
    fn start_send(&mut self,
                  (id, msg): Self::SinkItem)
                  -> StartSend<Self::SinkItem, Self::SinkError> {
        let sent = match self.client_mut(&id) {
            Some(client) => client.start_send(msg),
            None => return Err(RoomError::UnknownClient(id)),
        };
        match sent {
            Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
            // Hand the unsent message back together with its destination.
            Ok(AsyncSink::NotReady(msg)) => Ok(AsyncSink::NotReady((id, msg))),
            Err(()) => Err(self.err_for_disconnect(id)),
        }
    }

    /// Drives the pending sends of every client in the room.
    ///
    /// All clients are polled on every call, so a client that is not yet
    /// flushed does not hold back the clients that come after it. The result
    /// is `Ready` only once every client reports `Ready`.
    ///
    /// # Errors
    ///
    /// Returns `RoomError::DisconnectedClient` for the first client whose
    /// `poll_complete` fails; that client is removed from the room.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        // IDs are copied out first because reporting a failure needs
        // `&mut self` to remove the failed client.
        let ids: Vec<I> = self.clients.keys().cloned().collect();
        let mut all_flushed = true;
        for id in ids {
            let flushed = self.clients
                .get_mut(&id)
                .expect("id was just read from the client map")
                .poll_complete();
            match flushed {
                Ok(Async::Ready(())) => {}
                Ok(Async::NotReady) => all_flushed = false,
                Err(()) => return Err(self.err_for_disconnect(id)),
            }
        }
        if all_flushed {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}
impl<I, C> Stream for Room<I, C>
where
    I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
    C: Sink + Stream + 'static,
{
    /// A received message paired with the ID of the client that produced it.
    type Item = (I, C::Item);
    type Error = RoomError<I, C>;

    /// Polls the clients one after another and yields the first message found.
    ///
    /// A client whose stream has ended or failed is removed from the room and
    /// reported as `RoomError::DisconnectedClient`. When no client has anything
    /// to report, including when the room is empty, the result is `NotReady`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // IDs are copied out first because reporting a disconnect needs
        // `&mut self` to remove the client.
        let ids: Vec<I> = self.clients.keys().cloned().collect();
        for id in ids {
            let polled = self.clients.get_mut(&id).unwrap().poll();
            match polled {
                Ok(Async::NotReady) => {}
                Ok(Async::Ready(Some(msg))) => return Ok(Async::Ready(Some((id, msg)))),
                // End of stream and stream failure are both treated as a disconnect.
                Ok(Async::Ready(None)) | Err(()) => return Err(self.err_for_disconnect(id)),
            }
        }
        Ok(Async::NotReady)
    }
}
impl<I, C> IntoIterator for Room<I, C>
where
    I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
    C: Sink + Stream + 'static,
{
    type Item = Client<I, C>;
    type IntoIter = vec::IntoIter<Client<I, C>>;

    /// Consumes the room and iterates over its clients, discarding the map keys.
    fn into_iter(self) -> Self::IntoIter {
        let mut clients = Vec::with_capacity(self.clients.len());
        for (_, client) in self.clients {
            clients.push(client);
        }
        clients.into_iter()
    }
}
impl<I, C> FromIterator<Client<I, C>> for Room<I, C>
where
    I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
    C: Sink + Stream + 'static,
{
    /// Gathers the clients produced by `iter` and builds a room with `Room::new`.
    fn from_iter<J>(iter: J) -> Room<I, C>
    where
        J: IntoIterator<Item = Client<I, C>>,
    {
        let clients: Vec<Client<I, C>> = iter.into_iter().collect();
        Room::new(clients)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::test::*;
    use futures::{executor, Future, Stream};

    // Broadcasting to the whole room delivers the message to each client.
    #[test]
    fn can_broadcast_all() {
        let (rx_first, _, first) = mock_client("client0", 1);
        let (rx_second, _, second) = mock_client("client1", 1);

        let room = Room::new(vec![first, second]);
        room.broadcast_all(TinyMsg::A).wait().unwrap();

        let (got_first, _) = rx_first.into_future().wait().unwrap();
        assert_eq!(got_first, Some(TinyMsg::A));
        let (got_second, _) = rx_second.into_future().wait().unwrap();
        assert_eq!(got_second, Some(TinyMsg::A));
    }

    // Transmitting delivers to each client the message keyed by its ID.
    #[test]
    fn can_transmit() {
        let (rx_first, _, first) = mock_client("client0", 1);
        let (rx_second, _, second) = mock_client("client1", 1);

        let mut outgoing = HashMap::new();
        outgoing.insert(first.id(), TinyMsg::A);
        outgoing.insert(second.id(), TinyMsg::B("entropy".to_string()));

        let room = Room::new(vec![first, second]);
        room.transmit(outgoing).wait().unwrap();

        let (got_first, _) = rx_first.into_future().wait().unwrap();
        assert_eq!(got_first, Some(TinyMsg::A));
        let (got_second, _) = rx_second.into_future().wait().unwrap();
        assert_eq!(got_second, Some(TinyMsg::B("entropy".to_string())));
    }

    // Receiving from the whole room waits until every client has sent a message.
    #[test]
    fn can_receive_all() {
        let (_, tx_first, first) = mock_client("client0", 1);
        let (_, tx_second, second) = mock_client("client1", 1);
        let id_first = first.id();
        let id_second = second.id();

        let room = Room::new(vec![first, second]);
        let mut pending = executor::spawn(room.receive_all().fuse());

        // Nothing has been sent yet, so the first poll must not complete.
        assert!(pending.poll_future(unpark_noop()).unwrap().is_not_ready());

        tx_first.send(TinyMsg::A).wait().unwrap();
        tx_second.send(TinyMsg::B("abc".to_string())).wait().unwrap();

        let expected_msgs: HashMap<_, _> = vec![(id_first.clone(), TinyMsg::A),
                                                (id_second.clone(), TinyMsg::B("abc".to_string()))]
            .into_iter()
            .collect();

        if let Ok((msgs, room)) = pending.wait_future() {
            assert_eq!(msgs, expected_msgs);

            let statuses: HashMap<_, _> = room.into_iter()
                .map(|client| (client.id(), client.is_connected()))
                .collect();
            let expected_statuses: HashMap<_, _> = vec![(id_first, true), (id_second, true)]
                .into_iter()
                .collect();
            assert_eq!(statuses, expected_statuses);
        } else {
            assert!(false);
        }
    }

    // Clients whose channel ends were dropped are flagged as disconnected once
    // a send to them is attempted, and stay in the room afterwards.
    #[test]
    fn can_statuses() {
        // The `_` patterns drop both channel ends straight away.
        let (_, _, first) = mock_client("client0", 1);
        let (_, _, second) = mock_client("client1", 1);

        let room = Room::new(vec![first, second]);
        assert!(room.ids().iter().all(|id| room.client(id).unwrap().is_connected()));

        let mut outgoing = HashMap::new();
        outgoing.insert("client0".to_string(), TinyMsg::A);
        let room = room.transmit(outgoing).wait().unwrap();
        for id in room.ids() {
            let client = room.client(&id).unwrap();
            if id == "client0" {
                assert!(client.is_disconnected().is_some());
            } else {
                assert!(client.is_connected());
            }
        }

        let room = room.broadcast_all(TinyMsg::B("abc".to_string())).wait().unwrap();
        assert!(room.ids().iter().all(|id| room.client(id).unwrap().is_disconnected().is_some()));
    }
}