extern crate amy;
extern crate rabble;
#[macro_use]
extern crate assert_matches;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate slog;
extern crate slog_stdlog;
extern crate slog_envlogger;
extern crate slog_term;
extern crate log;
extern crate time;
mod utils;
use std::str;
use amy::{Poller, Receiver};
use time::Duration;
use utils::messages::*;
use utils::{
wait_for,
start_nodes,
test_pid,
register_test_as_service
};
use rabble::{
Envelope,
Msg,
ClusterStatus,
Node,
CorrelationId
};
/// Number of nodes started for the test. `join_leave` indexes `nodes[0]`
/// through `nodes[2]`, so this must be at least 3.
const NUM_NODES: usize = 3;
/// Drives a three-node cluster through a sequence of joins and leaves, and
/// after every membership change waits for the affected nodes to report the
/// expected number of established connections.
#[test]
fn join_leave() {
    let (nodes, handles) = start_nodes(NUM_NODES);
    let mut poller = Poller::new().unwrap();
    let (test_tx, test_rx) = poller.get_registrar().unwrap().channel().unwrap();
    // Cluster status replies are read back from `test_rx` below; presumably
    // this call is what routes them there (helper lives in `utils`).
    register_test_as_service(&mut poller, &nodes, &test_tx, &test_rx);

    // Join node 0 to node 1: both must end up with exactly one connection.
    nodes[0].join(&nodes[1].id).unwrap();
    for node in &nodes[..2] {
        assert!(wait_for_cluster_status(node, &test_rx, 1));
    }

    // Join node 0 to node 2: every node must end up connected to the other two.
    nodes[0].join(&nodes[2].id).unwrap();
    for node in nodes.iter() {
        assert!(wait_for_cluster_status(node, &test_rx, 2));
    }

    // Remove node 1. The request is issued to node 0, not to the node that is
    // leaving; the two remaining nodes must drop to one connection each.
    nodes[0].leave(&nodes[1].id).unwrap();
    for &idx in &[0usize, 2] {
        assert!(wait_for_cluster_status(&nodes[idx], &test_rx, 1));
    }

    // Remove node 0 via a request to node 0 itself; both remaining nodes must
    // end up with no connections at all.
    nodes[0].leave(&nodes[0].id).unwrap();
    for &idx in &[0usize, 2] {
        assert!(wait_for_cluster_status(&nodes[idx], &test_rx, 0));
    }

    // Shut every node down first, then wait for their threads to finish.
    for node in nodes {
        node.shutdown();
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
/// Repeatedly asks `node` for its cluster status until a reply reports exactly
/// `num_connected` established peers and `num_connected` connections.
///
/// Each poll attempt sends a fresh status request to `node` and then makes a
/// single non-blocking read from `test_rx`. Returns whatever `wait_for`
/// returns for the 5 second timeout (presumably `true` once the closure
/// yields `true`, `false` on timeout; `wait_for` is defined in `utils`).
///
/// # Panics
///
/// Panics if sending the status request to `node` fails.
///
/// NOTE(review): replies read from `test_rx` are not matched against the node
/// that was queried. Since the receiver is shared by every call, a stale
/// reply left over from polling a different node could satisfy this check.
/// Confirm whether the envelope carries enough information to filter on.
fn wait_for_cluster_status(node: &Node<RabbleUserMsg>,
                           test_rx: &Receiver<Envelope<RabbleUserMsg>>,
                           num_connected: usize) -> bool
{
    let timeout = Duration::seconds(5);
    let pid = test_pid(node.id.clone());
    wait_for(timeout, || {
        // The correlation id is consumed by the request, so build a new one
        // per attempt and move it in directly; no extra clone is needed.
        node.cluster_status(CorrelationId::pid(pid.clone())).unwrap();

        // Only a `ClusterStatus` reply can satisfy the wait; an empty channel
        // or any other message counts as "not yet".
        match test_rx.try_recv().map(|envelope| envelope.msg) {
            Ok(Msg::ClusterStatus(ClusterStatus { established, num_connections, .. })) => {
                established.len() == num_connected && num_connections == num_connected
            }
            _ => false,
        }
    })
}