use std::sync::mpsc::Sender;
use std::fmt::Debug;
use serde::{Serialize, Deserialize};
use node_id::NodeId;
use executor::ExecutorMsg;
use cluster::ClusterMsg;
use pid::Pid;
use correlation_id::CorrelationId;
use process::Process;
use envelope::Envelope;
use amy;
use errors::*;
use slog;
/// Sends `$msg` over the channel stored in field `$t` of `$s` and returns
/// from the *enclosing function* with the outcome.
///
/// * On success the enclosing function returns `Ok(())`.
/// * On failure it returns `ErrorKind::SendError($errmsg, $pid.cloned())`
///   converted with `.into()`.
///
/// `$pid` must be an `Option` of a reference (it is `.cloned()` to obtain the
/// owned value stored in the error). `$errmsg` appears only in the failure
/// arm, so it is evaluated only when the send fails.
macro_rules! send {
    ($s:ident.$t:ident, $msg:expr, $pid:expr, $errmsg:expr) => {
        match $s.$t.send($msg) {
            Ok(_) => return Ok(()),
            // The underlying send error is discarded; callers only get the
            // description and the (optional) pid supplied at the call site.
            Err(_) => return Err(ErrorKind::SendError($errmsg, $pid.cloned()).into()),
        }
    }
}
/// Handle bundling a node id, a logger, and the two channel senders through
/// which requests are forwarded to the executor and to the cluster.
///
/// `Clone` is derived, so cloning a `Node` clones the id, the logger and both
/// senders.
#[derive(Clone)]
pub struct Node<T> {
/// Identifier supplied to `Node::new`.
pub id: NodeId,
/// Logger supplied to `Node::new`.
pub logger: slog::Logger,
/// Sending half of the channel carrying `ExecutorMsg<T>` values.
executor_tx: Sender<ExecutorMsg<T>>,
/// Sending half of the channel carrying `ClusterMsg<T>` values.
cluster_tx: Sender<ClusterMsg<T>>
}
impl<'de, T: Serialize + Deserialize<'de> + Debug + Clone> Node<T> {
    /// Creates a `Node` from its id, the executor and cluster channel
    /// senders, and a logger.
    pub fn new(id: NodeId,
               executor_tx: Sender<ExecutorMsg<T>>,
               cluster_tx: Sender<ClusterMsg<T>>,
               logger: slog::Logger) -> Node<T> {
        Node {
            id: id,
            executor_tx: executor_tx,
            cluster_tx: cluster_tx,
            logger: logger
        }
    }

    /// Sends `ClusterMsg::Join` carrying a clone of `node_id` on the cluster
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` (with no pid attached) if the message
    /// cannot be sent on the cluster channel.
    pub fn join(&self, node_id: &NodeId) -> Result<()> {
        send!(self.cluster_tx,
              ClusterMsg::Join(node_id.clone()),
              None,
              format!("ClusterMsg::Join({:?})", node_id))
    }

    /// Sends `ClusterMsg::Leave` carrying a clone of `node_id` on the cluster
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` (with no pid attached) if the message
    /// cannot be sent on the cluster channel.
    pub fn leave(&self, node_id: &NodeId) -> Result<()> {
        send!(self.cluster_tx,
              ClusterMsg::Leave(node_id.clone()),
              None,
              format!("ClusterMsg::Leave({:?})", node_id))
    }

    /// Sends `ExecutorMsg::Start` with a clone of `pid` and the boxed
    /// `process` on the executor channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` tagged with `pid` if the message cannot
    /// be sent on the executor channel.
    pub fn spawn(&self, pid: &Pid, process: Box<Process<T>>) -> Result<()> {
        send!(self.executor_tx,
              ExecutorMsg::Start(pid.clone(), process),
              Some(pid),
              format!("ExecutorMsg::Start({}, ..)", pid))
    }

    /// Sends `ExecutorMsg::Stop` with a clone of `pid` on the executor
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` tagged with `pid` if the message cannot
    /// be sent on the executor channel.
    pub fn stop(&self, pid: &Pid) -> Result<()> {
        // The description names the message actually sent (`Stop`), so a
        // failed stop is not reported as a failed start.
        send!(self.executor_tx,
              ExecutorMsg::Stop(pid.clone()),
              Some(pid),
              format!("ExecutorMsg::Stop({})", pid))
    }

    /// Sends `ExecutorMsg::RegisterService` with a clone of `pid` and a clone
    /// of the `amy` sender `tx` on the executor channel.
    ///
    /// # Errors
    ///
    /// Propagates the error from `tx.try_clone()` if the sender cannot be
    /// cloned; otherwise returns `ErrorKind::SendError` tagged with `pid` if
    /// the message cannot be sent on the executor channel.
    pub fn register_service(&self, pid: &Pid, tx: &amy::Sender<Envelope<T>>) -> Result<()>
    {
        // Clone first so a clone failure is reported before any send attempt.
        let service_tx = tx.try_clone()?;
        send!(self.executor_tx,
              ExecutorMsg::RegisterService(pid.clone(), service_tx),
              Some(pid),
              format!("ExecutorMsg::RegisterService({}, ..)", pid))
    }

    /// Sends `envelope` to the executor wrapped in `ExecutorMsg::Envelope`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` tagged with the envelope's `to` pid if
    /// the message cannot be sent on the executor channel.
    pub fn send(&self, envelope: Envelope<T>) -> Result<()> {
        // `envelope` is moved into the message, so keep a copy of the
        // destination pid for error reporting.
        let to = envelope.to.clone();
        send!(self.executor_tx,
              ExecutorMsg::Envelope(envelope),
              Some(&to),
              "ExecutorMsg::Envelope(envelope)".to_string())
    }

    /// Sends `ExecutorMsg::GetStatus` carrying `correlation_id` on the
    /// executor channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` tagged with `correlation_id.pid` if the
    /// message cannot be sent on the executor channel.
    pub fn executor_status(&self, correlation_id: CorrelationId) -> Result<()> {
        // `correlation_id` is moved into the message; copy its pid first.
        let to = correlation_id.pid.clone();
        send!(self.executor_tx,
              ExecutorMsg::GetStatus(correlation_id),
              Some(&to),
              "ExecutorMsg::GetStatus".to_string())
    }

    /// Sends `ClusterMsg::GetStatus` carrying `correlation_id` on the cluster
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::SendError` tagged with `correlation_id.pid` if the
    /// message cannot be sent on the cluster channel.
    pub fn cluster_status(&self, correlation_id: CorrelationId) -> Result<()> {
        // `correlation_id` is moved into the message; copy its pid first.
        let to = correlation_id.pid.clone();
        send!(self.cluster_tx,
              ClusterMsg::GetStatus(correlation_id),
              Some(&to),
              "ClusterMsg::GetStatus".to_string())
    }

    /// Sends `Shutdown` to the executor and then to the cluster.
    ///
    /// # Panics
    ///
    /// Panics if either send fails. If the executor send panics, the cluster
    /// `Shutdown` message is never sent.
    pub fn shutdown(&self) {
        self.executor_tx.send(ExecutorMsg::Shutdown).unwrap();
        self.cluster_tx.send(ClusterMsg::Shutdown).unwrap();
    }
}