mod receive;
mod transmit;
pub use self::receive::*;
pub use self::transmit::*;
use std::fmt::{self, Debug};
use futures::{Future, Sink, Stream, Poll, Async, AsyncSink, StartSend};
use futures::sync::mpsc;
use tokio_timer;
#[allow(unused_imports)]
use super::*;
/// Convenience alias: a `Client` whose channel is an `Unsplit` pairing of a
/// `futures::sync::mpsc` sender and receiver, both carrying messages of type `M`.
pub type MpscClient<I, M> = Client<I, Unsplit<mpsc::Sender<M>, mpsc::Receiver<M>>>;
/// A peer identified by `id` that communicates over a combined sink/stream `C`.
///
/// The client remembers why it stopped working: once the channel fails or is
/// closed, `inner` flips from `Ok(channel)` to `Err(reason)` and stays there.
#[derive(Debug, Clone)]
pub struct Client<I, C>
where I: Clone + Send + Debug + 'static,
C: Sink + Stream + 'static
{
// Caller-chosen identifier; it is the only field compared by `PartialEq`.
id: I,
// `Ok` holds the live channel; `Err` holds the reason the client disconnected.
inner: Result<C, Disconnect<C::SinkError, C::Error>>,
}
impl<I, T, R> Client<I, Unsplit<T, R>>
    where I: Clone + Send + Debug + 'static,
          T: Sink + 'static,
          R: Stream + 'static
{
    /// Builds a connected client from a separate sending half `tx` and
    /// receiving half `rx`, gluing the two into a single `Unsplit` channel.
    pub fn new_from_split(id: I, tx: T, rx: R) -> Client<I, Unsplit<T, R>> {
        let channel = Unsplit { tx: tx, rx: rx };
        Client::new(id, channel)
    }
}
impl<I, C> Client<I, C>
    where I: Clone + Send + Debug + 'static,
          C: Sink + Stream + 'static
{
    /// Creates a connected client named `id` that talks over `tx_rx`.
    pub fn new(id: I, tx_rx: C) -> Client<I, C> {
        let connected = Ok(tx_rx);
        Client {
            id: id,
            inner: connected,
        }
    }

    /// Returns a clone of this client's identifier.
    pub fn id(&self) -> I {
        self.id.clone()
    }

    /// Consumes the client and returns it under `new_id`.
    ///
    /// The connection state (live channel or disconnect reason) is carried
    /// over untouched; only the identifier, and possibly its type, changes.
    pub fn rename<J>(self, new_id: J) -> Client<J, C>
        where J: Clone + Send + Debug + 'static
    {
        let state = self.inner;
        Client {
            id: new_id,
            inner: state,
        }
    }

    /// Consumes the client and returns a `Transmit` built from it and `msg`.
    pub fn transmit(self, msg: C::SinkItem) -> Transmit<I, C> {
        Transmit::new(self, msg)
    }

    /// Consumes the client and returns a `Receive` built from it.
    pub fn receive(self) -> Receive<I, C> {
        Receive::new(self)
    }

    /// Returns `true` while the client still holds a live channel.
    pub fn is_connected(&self) -> bool {
        match self.inner {
            Ok(_) => true,
            Err(_) => false,
        }
    }

    /// Returns the reason for the disconnect, or `None` while still connected.
    pub fn is_disconnected(&self) -> Option<&Disconnect<C::SinkError, C::Error>> {
        self.inner.as_ref().err()
    }

    /// Placeholder that has no implementation yet.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    #[doc(hidden)]
    pub fn update_status(self) -> Box<Future<Item = (Option<C::Item>, Self), Error = Self>> {
        unimplemented!()
    }

    /// Marks a connected client as `Disconnect::Closed`, dropping its channel.
    ///
    /// Returns `true` if this call performed the close, and `false` if the
    /// client was already disconnected (the existing reason is kept).
    pub fn close(&mut self) -> bool {
        let was_connected = self.inner.is_ok();
        if was_connected {
            self.inner = Err(Disconnect::Closed);
        }
        was_connected
    }

    /// Splits the client into its identifier and its connection state.
    pub fn into_inner(self) -> (I, Result<C, Disconnect<C::SinkError, C::Error>>) {
        let Client { id, inner } = self;
        (id, inner)
    }

    /// Splits a disconnected client into its identifier and disconnect reason.
    ///
    /// # Panics
    ///
    /// Panics via `unimplemented!()` if the client is still connected.
    #[doc(hidden)]
    pub fn into_disconnect(self) -> (I, Disconnect<C::SinkError, C::Error>) {
        match self.inner {
            Err(reason) => (self.id, reason),
            Ok(_) => unimplemented!(),
        }
    }
}
/// Receiving side of a client: items come from the wrapped channel, and any
/// end-of-stream or stream error is recorded as a disconnect reason instead
/// of being returned as an error (this impl never yields `Err`).
impl<I, C> Stream for Client<I, C>
    where I: Clone + Send + Debug + 'static,
          C: Sink + Stream + 'static
{
    type Item = C::Item;
    type Error = ();

    /// Polls the wrapped channel for the next item.
    ///
    /// A disconnected client reports end-of-stream without touching the
    /// channel. If the channel ends, the client becomes `Disconnect::Dropped`;
    /// if it errors, the client becomes `Disconnect::Stream(e)`. In both cases
    /// the caller sees `Ok(Async::Ready(None))`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let polled = match self.inner {
            Ok(ref mut channel) => channel.poll(),
            // Already disconnected: the stream is over.
            Err(_) => return Ok(Async::Ready(None)),
        };

        // Anything that is not "pending" or "here is an item" ends the connection.
        let reason = match polled {
            Ok(Async::Ready(Some(item))) => return Ok(Async::Ready(Some(item))),
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Ok(Async::Ready(None)) => Disconnect::Dropped,
            Err(e) => Disconnect::Stream(e),
        };
        self.inner = Err(reason);
        Ok(Async::Ready(None))
    }
}
/// Sending side of a client: operations are forwarded to the wrapped channel.
/// A channel error is stored as `Disconnect::Sink(e)` and surfaced to the
/// caller as the unit error `()`.
impl<I, C> Sink for Client<I, C>
    where I: Clone + Send + Debug + 'static,
          C: Sink + Stream + 'static
{
    type SinkItem = C::SinkItem;
    type SinkError = ();

    /// Offers `item` to the wrapped channel.
    ///
    /// Returns `Err(())` if the client is already disconnected, or if the
    /// channel fails (which disconnects the client). Otherwise the channel's
    /// own `AsyncSink` answer is passed through unchanged.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        let outcome = match self.inner {
            Ok(ref mut channel) => channel.start_send(item),
            Err(_) => return Err(()),
        };
        match outcome {
            Ok(status) => Ok(status),
            Err(e) => {
                // Remember the cause; the caller only learns that sending failed.
                self.inner = Err(Disconnect::Sink(e));
                Err(())
            }
        }
    }

    /// Drives the wrapped channel's pending sends.
    ///
    /// Returns `Err(())` if the client is already disconnected, or if the
    /// channel fails (which disconnects the client). Otherwise the channel's
    /// own readiness is passed through unchanged.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        let outcome = match self.inner {
            Ok(ref mut channel) => channel.poll_complete(),
            Err(_) => return Err(()),
        };
        match outcome {
            Ok(readiness) => Ok(readiness),
            Err(e) => {
                self.inner = Err(Disconnect::Sink(e));
                Err(())
            }
        }
    }
}
/// Clients are compared by identifier only; the connection state is ignored.
impl<I, C> PartialEq for Client<I, C>
    where I: Clone + Send + Debug + PartialEq + 'static,
          C: Sink + Stream + 'static
{
    fn eq(&self, other: &Client<I, C>) -> bool {
        PartialEq::eq(&self.id, &other.id)
    }
}
/// Marker impl: `Client`'s `PartialEq` compares only `id`, so the comparison is a
/// full equivalence relation whenever the identifier type `I` is itself `Eq`.
impl<I, C> Eq for Client<I, C>
where I: Clone + Send + Debug + PartialEq + Eq + 'static,
C: Sink + Stream + 'static
{
}
/// Why a `Client` is no longer connected.
///
/// `T` is the channel's sink error type and `R` its stream error type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Disconnect<T, R> {
/// The client was closed locally through `Client::close`.
Closed,
/// The channel's stream ended (it yielded `None`).
Dropped,
/// Not produced in this file; presumably set by the `receive`/`transmit`
/// submodules when an operation times out — TODO confirm.
Timeout,
/// Carries a `tokio_timer::TimerError`. Not produced in this file;
/// presumably set when the timer backing a timeout fails — TODO confirm.
Timer(tokio_timer::TimerError),
/// The channel's sink returned this error from `start_send` or `poll_complete`.
Sink(T),
/// The channel's stream returned this error from `poll`.
Stream(R),
}
/// Joins an independent sink `tx` and stream `rx` into one value that implements
/// both `Sink` (delegating to `tx`) and `Stream` (delegating to `rx`).
#[derive(PartialEq, Eq)]
pub struct Unsplit<T, R>
where T: Sink + 'static,
R: Stream + 'static
{
// Sending half; every `Sink` call is forwarded here.
tx: T,
// Receiving half; every `Stream` poll is forwarded here.
rx: R,
}
/// Opaque `Debug` output for `Unsplit`.
///
/// Neither half is required to implement `Debug`, so the fields are shown as
/// fixed placeholders naming their roles rather than their contents. The
/// output lists exactly the two fields the struct has, and the impl places no
/// extra requirements (such as `Clone`) on the halves' error types, because
/// nothing here uses them.
impl<T, R> Debug for Unsplit<T, R>
    where T: Sink + 'static,
          R: Stream + 'static
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Unsplit { tx: Sink, rx: Stream }")
    }
}
impl<T, R> Sink for Unsplit<T, R>
where T: Sink + 'static,
R: Stream + 'static
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.tx.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.tx.poll_complete()
}
}
impl<T, R> Stream for Unsplit<T, R>
where T: Sink + 'static,
R: Stream + 'static
{
type Item = R::Item;
type Error = R::Error;
fn poll(&mut self) -> Poll<Option<R::Item>, R::Error> {
self.rx.poll()
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::test::*;
use futures::{lazy, executor, Future, Stream};
#[test]
fn can_join_room() {
let client0_id = "client0";
let client1_id = "client1";
let (_, _, client0) = mock_client(client0_id.clone(), 1);
let (_, _, client0_duplicate_name) = mock_client(client0_id, 1);
let (_, _, client1) = mock_client(client1_id.clone(), 1);
let mut room = Room::default();
assert_eq!(room.ids().len(), 0);
assert!(room.insert(client0));
assert_eq!(room.ids(),
vec![client0_id.to_string()].into_iter().collect());
assert_eq!(room.insert(client0_duplicate_name), false);
assert_eq!(room.ids(),
vec![client0_id.to_string()].into_iter().collect());
assert!(room.insert(client1));
let client_ids = room.ids();
assert!(client_ids.len() == 2);
assert!(client_ids.contains(&client0_id.to_string()));
assert!(client_ids.contains(&client1_id.to_string()));
}
#[test]
fn can_transmit() {
let (rx_from_client, _, mut client) = mock_client("client1", 1);
let mut rx_stream = rx_from_client.wait().peekable();
for _ in 0..10 {
let msg = TinyMsg::A;
client = client.transmit(msg.clone()).wait().unwrap();
match rx_stream.next() {
Some(Ok(msg2)) => {
assert_eq!(msg, msg2);
}
_ => assert!(false),
}
}
}
#[test]
fn can_receive() {
let (_, mut tx_to_client, mut client) = mock_client("client1", 1);
for _ in 0..10 {
let msg = TinyMsg::B("ABC".to_string());
let receive = client.receive();
let mut future = executor::spawn(receive.fuse());
if let Ok(Async::NotReady) = future.poll_future(unpark_noop()) {
} else {
assert!(false);
}
tx_to_client = tx_to_client.send(msg.clone()).wait().unwrap();
match future.wait_future() {
Ok((msg, client_new)) => {
client = client_new;
assert_eq!("client1", client.id());
assert_eq!(msg, msg);
}
_ => {
unreachable!();
}
};
}
}
// Interleaves stream polls and sink operations on one client and checks that
// neither side's state leaks into the other. The assertions are strictly
// order-dependent: each one relies on the channel state left by the previous.
#[test]
fn stream_and_sink_separate() {
let (rx, tx, mut c0) = mock_client("client1", 1);
let mut rx = rx.wait();
// Everything runs inside `lazy(..).wait()`; presumably so the polls happen in a
// task context — TODO confirm.
lazy(move || {
// Idle client: nothing to read, nothing left to flush.
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
// One inbound message is delivered exactly once.
let msg = TinyMsg::A;
tx.clone().send(msg.clone()).wait().unwrap();
assert_eq!(c0.poll(), Ok(Async::Ready(Some(msg))));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
// The first two outbound messages are accepted; the stream side stays idle.
assert_eq!(c0.start_send(TinyMsg::A), Ok(AsyncSink::Ready));
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
assert_eq!(c0.start_send(TinyMsg::A), Ok(AsyncSink::Ready));
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
// Further sends are handed back — presumably the outbound channel is now at
// capacity — and being rejected does not disconnect the client.
assert_eq!(c0.start_send(TinyMsg::A),
Ok(AsyncSink::NotReady(TinyMsg::A)));
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
assert_eq!(c0.start_send(TinyMsg::A),
Ok(AsyncSink::NotReady(TinyMsg::A)));
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
// Taking one message off the far end lets the next send through.
rx.next();
assert_eq!(c0.start_send(TinyMsg::A), Ok(AsyncSink::Ready));
assert_eq!(c0.poll(), Ok(Async::NotReady));
assert_eq!(c0.poll_complete(), Ok(Async::Ready(())));
Ok::<(), ()>(())
})
.wait()
.unwrap();
}
#[test]
fn can_status() {
let msg = TinyMsg::B("ABC".to_string());
let (mut rx_from_client, _, client) = mock_client("client1", 1);
assert!(client.is_connected());
let _ = rx_from_client.close();
assert!(client.is_connected());
let (_, mut tx_to_client, client) = mock_client("client2", 1);
assert!(client.is_connected());
let _ = tx_to_client.close();
assert!(client.is_connected());
let (_, _, client) = mock_client("client2", 1);
assert!(client.is_connected());
match client.transmit(msg.clone()).wait() {
Ok(_) => unreachable!(),
Err(client) => assert!(client.is_disconnected().is_some()),
};
}
#[test]
fn can_close() {
let (_, _, mut client) = mock_client("client1", 1);
assert!(client.is_connected());
client.close();
assert!(client.is_disconnected().is_some());
}
}