comms 0.1.4

Experimental library for communicating with groups of Tokio server clients.
Documentation
extern crate futures;
extern crate comms;

use std::collections::VecDeque;
use futures::{future, sink, stream, Future, Sink, Stream, Poll, Async, AsyncSink};
use futures::stream::{SplitSink, SplitStream};
use futures::sync::mpsc;
use comms::*;

enum Msg {
    Alpha,
    Beta(bool),
    Gamma(usize),
}

type PeerId = u64;
type ActorId = u64;
type MsgPair = (PeerId, Msg);

type FramedTcpSink<C> = sink::SinkFromErr<SplitSink<Framed<TcpStream, C>>, IoErrorString>;
type FramedTcpStream<C> = stream::FromErr<SplitStream<Framed<TcpStream, C>>, IoErrorString>;

pub struct LoadBalancePeersForActors<C> {
    // Currently present peers.
    peers: Room<PeerId, FramedTcpSink<C>, FramedTcpStream<C>>,
    // For adding new peers and assigning them IDs 1..N.
    peer_id_counter: PeerId,
    new_peer_rx: Fuse<mpsc::Receiver<Framed<TcpStream, C>>>,
    // For adding new actors.
    new_actor_rx: Fuse<mpsc::Receiver<(ActorId, mpsc::Sender<MsgPair>)>>,
    // For adding new messages to send.
    outgoing_msg_rx: Fuse<mpsc::Receiver<MsgPair>>,
    outgoing_msg_queue: VecDeque<MsgPair>,
    // For forwarding messages from peers.
    actor_peer_counts: HashMap<ActorId, u64>,
    actor_txs: HashMap<ActorId, mpsc::Sender<MsgPair>>,
    peer_actor_map: HashMap<PeerId, ClientId>,
}

impl<C> LoadBalancePeersForActors<C> {
    pub fn new(new_peer_rx: mpsc::Receiver<MsgPair>,
               new_actor_rx: mpsc::Receiver<(ActorId, mpsc::Sender<MsgPair>)>,
               outgoing_msg_rx: mpsc::Receiver<MsgPair>)
               -> LoadBalancePeersForActors<C> {
        LoadBalancePeersForActors {
            peers: Room::default(),
            peer_id_counter: 0,
            new_peer_rx: new_peer_rx.fuse(),
            new_actor_rx: new_actor_rx.fuse(),
            outgoing_msg_rx: outgoing_msg_rx.fuse(),
            outgoing_msg_queue: VecDeque::new(),
            actor_peer_counts: HashMap::new(),
            actor_txs: HashMap::new(),
            peer_actor_map: HashMap::new(),
        }
    }

    fn new_peer_id(&mut self) -> PeerId {
        let peer_id = self.peer_id_counter + 1;
        self.peer_id_counter += 1;
        peer_id
    }
}

impl Future for Spectators {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        println!("LoadBalancePeersForActors wakeup {:?}",
                 self.peers.ready_ids());

        while Ok(Async::Ready(Some(peer))) = self.new_peer_rx.poll() {
            let (tx, rx) = peer.split();
            let peer = Client::new(self.new_peer_id(), ClientTimeout::None, tx, rx);
            self.peers.insert(peer);
            // @TODO: Assign to an actor.
        }

        while Ok(Async::Ready(Some((actor_id, actor_tx)))) = self.new_actor_rx.poll() {
            self.actor_peer_counts.insert(actor_id, 0);
            self.actor_txs.insert(actor_id, actor_tx);
        }

        while Ok(Async::Ready(Some(msg))) = self.outgoing_msg_rx.poll() {
            self.outgoing_msg_queue.push_back(msg);
        }

        // This is bottlenecked by the batched nature of `Room::start_send`. Perhaps that
        // should be a separate type, e.g., `RoomTransmit`.
        // It's pretty clear we'd want a form of Sink that takes `(I, T::SinkItem)` pairs,
        // but *also* one that takes `HashMap<I, T::SinkItem>`.
        // It might be easiest to have `Room` and `UnmanagedRoom`? `EasyRoom` and `Room`?
        loop {
            if let Some((id, msg)) = self.outgoing_msg_queue.pop_front() {
                let h = HashMap::new();
                h.insert(id, msg.clone());
                match self.peers.start_send(h) {
                    Ok(AsyncSink::Ready) => {}
                    Ok(AsyncSink::NotReady(h)) => {
                        let msg = h.remove(id);
                        self.outgoing_msg_queue.push_front(msg);
                    }
                    Err(_) => break,
                }
            }
            match self.peers.poll_complete() {
                Ok(Async::Ready(())) => continue,
                Ok(Async::NotReady) |
                Err(_) => break,
            }
        }

        loop {
            if let Ok(Async::Ready(Some(msgs))) = self.peers.poll() {
                for (id, msg) in msgs {
                    let actor_id = match self.peer_actor_map.get(&id) {
                        Some(actor_id) => actor_id,
                        None => {
                            println!("Peer {} had not Actor set.", peer_id);
                            continue;
                        }
                    };
                    match self.actor_txs.get_mut(actor_id) {
                        Some(actor_tx) => {
                            match actor_tx.start_send((id, msg)) {
                                Ok(AsyncSink::Ready) => {}
                                Ok(AsyncSink::NotReady(h)) => {
                                    let msg = h.remove(id);
                                    self.outgoing_msg_queue.push_front(msg);
                                }
                                // Drop messages on error?
                                Err(_) => break,
                            }
                        }
                    }
                }
            }
        }

        Ok(Async::NotReady)
    }
}