1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#![recursion_limit = "1024"]
#[macro_use]
extern crate error_chain;
extern crate orset;
extern crate rmp_serde as msgpack;
extern crate protobuf;
extern crate amy;
extern crate time;
extern crate net2;
extern crate libc;
extern crate ferris;
#[macro_use]
extern crate slog;
extern crate slog_stdlog;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[macro_use]
mod metrics;
mod node_id;
mod node;
mod members;
mod pid;
mod process;
mod envelope;
mod executor;
mod cluster;
mod msg;
mod timer_wheel;
mod service;
mod correlation_id;
pub mod serialize;
pub mod errors;
pub use errors::Result;
pub use node_id::NodeId;
pub use node::Node;
pub use pid::Pid;
pub use process::Process;
pub use envelope::Envelope;
pub use correlation_id::CorrelationId;
pub use msg::Msg;
pub use metrics::Metric;
pub use cluster::{
ClusterServer,
ClusterStatus,
};
pub use executor::{
Executor,
ExecutorStatus,
ExecutorMetrics
};
pub use service::{
Service,
ConnectionHandler,
ConnectionMsg,
ServiceHandler,
TcpServerHandler,
};
use std::thread::{self, JoinHandle};
use std::sync::mpsc::channel;
use std::fmt::Debug;
use serde::{Deserialize, Serialize};
use amy::Poller;
use slog::DrainExt;
use cluster::ClusterMsg;
/// Timeout handed to `Poller::wait` on every iteration of the poller thread
/// started by `rouse`.
///
/// NOTE(review): presumably expressed in milliseconds (i.e. 5 seconds) —
/// confirm against the `amy::Poller::wait` signature.
const TIMEOUT: usize = 5000;
/// Start the background machinery for a node and return a handle to it.
///
/// Three named threads are spawned:
///
/// * `cluster_server::<node_id>` — runs the `ClusterServer`.
/// * `executor::<node_id>` — runs the `Executor`.
/// * `poller::<node_id>` — repeatedly waits on the `Poller` and forwards each
///   batch of notifications to the cluster server as
///   `ClusterMsg::PollNotifications`. It exits once that send fails.
///
/// If `logger` is `Some`, a child logger tagged with `node_id` is derived
/// from it; otherwise a root logger backed by `slog_stdlog::StdLog` is built
/// with the same `node_id` tag.
///
/// Returns the `Node` together with the join handles of the three threads, in
/// the order cluster server, executor, poller.
///
/// # Panics
///
/// Panics if the poller or its registrar cannot be created, or if any of the
/// threads cannot be spawned. The poller thread itself panics if
/// `Poller::wait` returns an error.
pub fn rouse<'de, T>(node_id: NodeId, logger: Option<slog::Logger>) -> (Node<T>, Vec<JoinHandle<()>>)
    where T: Serialize + Deserialize<'de> + Send + 'static + Clone + Debug,
{
    /// Spawn `body` on a new thread carrying the given name.
    fn spawn_named<F>(name: String, body: F) -> JoinHandle<()>
        where F: FnOnce() + Send + 'static
    {
        thread::Builder::new().name(name).spawn(body).unwrap()
    }

    // Every log record emitted through this logger carries the node id.
    let node_label = node_id.to_string();
    let logger = if let Some(parent) = logger {
        parent.new(o!("node_id" => node_label))
    } else {
        slog::Logger::root(slog_stdlog::StdLog.fuse(), o!("node_id" => node_label))
    };

    let mut poller = Poller::new().unwrap();
    let (exec_tx, exec_rx) = channel();
    let (cluster_tx, cluster_rx) = channel();

    let registrar = poller.get_registrar().unwrap();
    let cluster_server = ClusterServer::new(
        node_id.clone(),
        cluster_rx,
        exec_tx.clone(),
        registrar,
        logger.clone(),
    );
    let executor = Executor::new(
        node_id.clone(),
        exec_tx.clone(),
        exec_rx,
        cluster_tx.clone(),
        logger.clone(),
    );

    let cluster_handle = spawn_named(
        format!("cluster_server::{}", node_id),
        move || cluster_server.run(),
    );
    let executor_handle = spawn_named(
        format!("executor::{}", node_id),
        move || executor.run(),
    );

    // The poller thread owns its own sender so the original can go to `Node`.
    let poll_tx = cluster_tx.clone();
    let poller_handle = spawn_named(format!("poller::{}", node_id), move || {
        loop {
            let notifications = poller.wait(TIMEOUT).unwrap();
            // A failed send means the receiving side is gone; stop polling.
            if poll_tx.send(ClusterMsg::PollNotifications(notifications)).is_err() {
                break;
            }
        }
    });

    let node = Node::new(node_id, exec_tx, cluster_tx, logger);
    (node, vec![cluster_handle, executor_handle, poller_handle])
}