extern crate futures;
extern crate comms;
use std::collections::{HashMap, VecDeque};
use futures::{future, sink, stream, Future, Sink, Stream, Poll, Async, AsyncSink};
use futures::stream::{Fuse, SplitSink, SplitStream};
use futures::sync::mpsc;
use comms::*;
/// A message exchanged between peers and actors.
///
/// `Clone` is derived because messages get cloned on the outgoing path, and
/// `Debug`/`PartialEq`/`Eq` make the type usable in logs and assertions.
/// The enum is `pub` because it is reachable through `MsgPair` in the public
/// constructor of `LoadBalancePeersForActors`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Msg {
    Alpha,
    Beta(bool),
    Gamma(usize),
}
// Identifier for a connected peer.
type PeerId = u64;
// Identifier for an actor that receives forwarded peer messages.
type ActorId = u64;
// A message paired with the ID of the peer it is addressed to or came from.
type MsgPair = (PeerId, Msg);
// Sink half of a split, framed TCP connection, with its error type converted to
// `IoErrorString`.
type FramedTcpSink<C> = sink::SinkFromErr<SplitSink<Framed<TcpStream, C>>, IoErrorString>;
// Stream half of a split, framed TCP connection, with its error type converted to
// `IoErrorString`.
type FramedTcpStream<C> = stream::FromErr<SplitStream<Framed<TcpStream, C>>, IoErrorString>;
pub struct LoadBalancePeersForActors<C> {
// Currently present peers.
peers: Room<PeerId, FramedTcpSink<C>, FramedTcpStream<C>>,
// For adding new peers and assigning them IDs 1..N.
peer_id_counter: PeerId,
new_peer_rx: Fuse<mpsc::Receiver<Framed<TcpStream, C>>>,
// For adding new actors.
new_actor_rx: Fuse<mpsc::Receiver<(ActorId, mpsc::Sender<MsgPair>)>>,
// For adding new messages to send.
outgoing_msg_rx: Fuse<mpsc::Receiver<MsgPair>>,
outgoing_msg_queue: VecDeque<MsgPair>,
// For forwarding messages from peers.
actor_peer_counts: HashMap<ActorId, u64>,
actor_txs: HashMap<ActorId, mpsc::Sender<MsgPair>>,
peer_actor_map: HashMap<PeerId, ClientId>,
}
impl<C> LoadBalancePeersForActors<C> {
pub fn new(new_peer_rx: mpsc::Receiver<MsgPair>,
new_actor_rx: mpsc::Receiver<(ActorId, mpsc::Sender<MsgPair>)>,
outgoing_msg_rx: mpsc::Receiver<MsgPair>)
-> LoadBalancePeersForActors<C> {
LoadBalancePeersForActors {
peers: Room::default(),
peer_id_counter: 0,
new_peer_rx: new_peer_rx.fuse(),
new_actor_rx: new_actor_rx.fuse(),
outgoing_msg_rx: outgoing_msg_rx.fuse(),
outgoing_msg_queue: VecDeque::new(),
actor_peer_counts: HashMap::new(),
actor_txs: HashMap::new(),
peer_actor_map: HashMap::new(),
}
}
fn new_peer_id(&mut self) -> PeerId {
let peer_id = self.peer_id_counter + 1;
self.peer_id_counter += 1;
peer_id
}
}
impl Future for Spectators {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
println!("LoadBalancePeersForActors wakeup {:?}",
self.peers.ready_ids());
while Ok(Async::Ready(Some(peer))) = self.new_peer_rx.poll() {
let (tx, rx) = peer.split();
let peer = Client::new(self.new_peer_id(), ClientTimeout::None, tx, rx);
self.peers.insert(peer);
// @TODO: Assign to an actor.
}
while Ok(Async::Ready(Some((actor_id, actor_tx)))) = self.new_actor_rx.poll() {
self.actor_peer_counts.insert(actor_id, 0);
self.actor_txs.insert(actor_id, actor_tx);
}
while Ok(Async::Ready(Some(msg))) = self.outgoing_msg_rx.poll() {
self.outgoing_msg_queue.push_back(msg);
}
// This is bottlenecked by the batched nature of `Room::start_send`. Perhaps that
// should be a separate type, e.g., `RoomTransmit`.
// It's pretty clear we'd want a form of Sink that takes `(I, T::SinkItem)` pairs,
// but *also* one that takes `HashMap<I, T::SinkItem>`.
// It might be easiest to have `Room` and `UnmanagedRoom`? `EasyRoom` and `Room`?
loop {
if let Some((id, msg)) = self.outgoing_msg_queue.pop_front() {
let h = HashMap::new();
h.insert(id, msg.clone());
match self.peers.start_send(h) {
Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(h)) => {
let msg = h.remove(id);
self.outgoing_msg_queue.push_front(msg);
}
Err(_) => break,
}
}
match self.peers.poll_complete() {
Ok(Async::Ready(())) => continue,
Ok(Async::NotReady) |
Err(_) => break,
}
}
loop {
if let Ok(Async::Ready(Some(msgs))) = self.peers.poll() {
for (id, msg) in msgs {
let actor_id = match self.peer_actor_map.get(&id) {
Some(actor_id) => actor_id,
None => {
println!("Peer {} had not Actor set.", peer_id);
continue;
}
};
match self.actor_txs.get_mut(actor_id) {
Some(actor_tx) => {
match actor_tx.start_send((id, msg)) {
Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(h)) => {
let msg = h.remove(id);
self.outgoing_msg_queue.push_front(msg);
}
// Drop messages on error?
Err(_) => break,
}
}
}
}
}
}
Ok(Async::NotReady)
}
}