extern crate time;
extern crate slog;
extern crate slog_term;
extern crate slog_envlogger;
extern crate slog_stdlog;
pub mod replica;
pub mod api_server;
pub mod messages;
use std::thread::{self, JoinHandle};
use std::net::TcpStream;
use amy::{Poller, Receiver, Sender};
use self::slog::DrainExt;
use self::time::{SteadyTime, Duration};
use utils::messages::*;
use rabble::{
self,
NodeId,
Node,
Envelope,
Pid,
CorrelationId,
Msg
};
use rabble::serialize::{Serialize, MsgpackSerializer};
/// A rabble `Node` whose user-level message type is `RabbleUserMsg`.
type CrNode = Node<RabbleUserMsg>;
/// Receiving half of an amy channel carrying `RabbleUserMsg` envelopes.
type CrReceiver = Receiver<Envelope<RabbleUserMsg>>;
/// Sending half of an amy channel carrying `RabbleUserMsg` envelopes.
type CrSender = Sender<Envelope<RabbleUserMsg>>;
/// Polls `f` until it returns `true` or more than `timeout` has elapsed.
///
/// The predicate is evaluated immediately. After every failed attempt the
/// calling thread sleeps for 10 milliseconds, and only then is the elapsed
/// time compared against `timeout`.
///
/// Returns `true` as soon as `f` succeeds, or `false` once the elapsed time
/// since the start of the call exceeds `timeout`.
#[allow(dead_code)]
pub fn wait_for<F>(timeout: Duration, mut f: F) -> bool
    where F: FnMut() -> bool
{
    // The pause between attempts is fixed, so convert it to a std duration once.
    let pause = Duration::milliseconds(10).to_std().unwrap();
    let started_at = SteadyTime::now();
    loop {
        if f() {
            return true;
        }
        thread::sleep(pause);
        if SteadyTime::now() - started_at > timeout {
            return false;
        }
    }
}
/// Writes `msg` to `sock` through `serializer`, retrying until the write completes.
///
/// The message is handed to `write_msgs` once. If that call returns
/// `Ok(true)` the function returns right away. Any other outcome (including
/// an error from this first attempt) falls through to a retry loop that marks
/// the serializer writable and calls `write_msgs` with no new message, for up
/// to five seconds.
///
/// # Panics
///
/// Panics if a retried write returns an error, or if no retried write has
/// returned `Ok(true)` before the five second timeout expires.
#[allow(dead_code)]
pub fn send(sock: &mut TcpStream,
            serializer: &mut MsgpackSerializer<ApiClientMsg>,
            msg: ApiClientMsg)
{
    if let Ok(true) = serializer.write_msgs(sock, Some(&msg)) {
        return;
    }
    let flushed = wait_for(Duration::seconds(5), || {
        serializer.set_writable();
        match serializer.write_msgs(sock, None) {
            Ok(done) => done,
            // Carry the I/O error in the panic itself so the failure reason
            // is part of the test's panic message.
            Err(e) => panic!("Failed to write to socket: {}", e),
        }
    });
    assert!(flushed, "Timed out writing message to socket");
}
/// Builds `n` node ids named `node1` through `node<n>`, each with its own
/// localhost address.
///
/// Node `i` is given the address `127.0.0.1:<11000 + i>`. The port is
/// computed arithmetically rather than by appending `i` to the text "1100",
/// because appending produced six-digit, out-of-range ports (for example
/// `110010`) as soon as ten or more nodes were requested. For `i` in `1..=9`
/// the resulting addresses are unchanged.
///
/// Returns an empty vector when `n` is zero.
#[allow(dead_code)]
pub fn create_node_ids(n: usize) -> Vec<NodeId> {
    const BASE_PORT: usize = 11000;
    (1..n + 1).map(|i| {
        NodeId {
            name: format!("node{}", i),
            addr: format!("127.0.0.1:{}", BASE_PORT + i)
        }
    }).collect()
}
/// Starts `n` rabble nodes that all share one debug-level logger.
///
/// A terminal-backed slog logger filtered at `Debug` is built and registered
/// through `slog_stdlog::set_logger`. Each id produced by
/// `create_node_ids(n)` is then passed to `rabble::rouse` together with a
/// clone of that logger.
///
/// Returns the started nodes, in id order, along with every join handle
/// returned by `rabble::rouse` collected into a single vector.
///
/// # Panics
///
/// Panics if `slog_stdlog::set_logger` returns an error.
#[allow(dead_code)]
pub fn start_nodes(n: usize) -> (Vec<Node<RabbleUserMsg>>, Vec<JoinHandle<()>>) {
    let terminal = slog_term::streamer().build();
    let filtered = slog_envlogger::LogBuilder::new(terminal)
        .filter(None, slog::FilterLevel::Debug)
        .build();
    let logger = slog::Logger::root(filtered.fuse(), None);
    slog_stdlog::set_logger(logger.clone()).unwrap();

    let mut nodes: Vec<Node<RabbleUserMsg>> = Vec::new();
    let mut handles: Vec<JoinHandle<()>> = Vec::new();
    for node_id in create_node_ids(n) {
        let (node, node_handles) = rabble::rouse(node_id, Some(logger.clone()));
        nodes.push(node);
        handles.extend(node_handles);
    }
    (nodes, handles)
}
/// Creates the `Pid` that identifies the test itself on the given node.
///
/// The pid is named "test-runner", has no group, and takes ownership of
/// `node_id` as its node.
#[allow(dead_code)]
pub fn test_pid(node_id: NodeId) -> Pid {
    let runner_name = String::from("test-runner");
    Pid {
        node: node_id,
        group: None,
        name: runner_name
    }
}
/// Registers the test as a service on every node and waits until each node
/// has produced at least one poller notification in response to a cluster
/// status request.
///
/// For each node, the test's pid (see `test_pid`) is registered with
/// `test_tx`. A cluster status request correlated to that pid is then sent
/// repeatedly, with `poller.wait(10)` called after each request, until the
/// poller reports a notification. At that point every envelope currently
/// available on `test_rx` is drained and must be a `Msg::ClusterStatus`.
///
/// # Panics
///
/// Panics if registration, the status request, or the poller wait returns an
/// error, or if a drained envelope is not a `Msg::ClusterStatus`.
#[allow(dead_code)]
pub fn register_test_as_service(poller: &mut Poller,
                                nodes: &Vec<CrNode>,
                                test_tx: &CrSender,
                                test_rx: &CrReceiver)
{
    for node in nodes.iter() {
        let service_pid = test_pid(node.id.clone());
        node.register_service(&service_pid, test_tx).unwrap();
        let status_request = CorrelationId::pid(service_pid);
        loop {
            node.cluster_status(status_request.clone()).unwrap();
            if poller.wait(10).unwrap().len() == 0 {
                // No notification yet: issue the status request again.
                continue;
            }
            // Drain everything that arrived; repeated requests may have
            // produced more than one reply.
            while let Ok(envelope) = test_rx.try_recv() {
                assert_matches!(envelope.msg, Msg::ClusterStatus(_));
            }
            break;
        }
    }
}