// actix_telepathy/cluster/mod.rs
mod connector;
2mod listener;
3#[cfg(test)]
4mod tests;
5
6pub use self::listener::{ClusterListener, ClusterLog};
7pub use connector::NodeResolving;
8pub use connector::{gossip::Gossip, single_seed::SingleSeed};
9
10use crate::CustomSystemService;
11pub use crate::cluster::connector::ConnectionProtocol;
12pub use crate::cluster::connector::Connector;
13use crate::network::NetworkInterface;
14use crate::remote::Node;
15use actix::prelude::*;
16use actix_broker::BrokerIssue;
17use futures::StreamExt;
18use futures::executor::block_on;
19use log::*;
20use std::collections::HashMap;
21use std::fmt::Display;
22use std::io::Result as IoResult;
23use std::net;
24use std::net::SocketAddr;
25use std::str::FromStr;
26use tokio::net::{TcpListener, TcpStream};
27use tokio_stream::wrappers::TcpListenerStream;
28
29#[derive(MessageResponse)]
30pub enum ConnectionApprovalResponse {
31 Approved,
32 Declined,
33}
34
35#[derive(Message)]
36#[rtype(result = "ConnectionApprovalResponse")]
37pub struct ConnectionApproval {
38 pub addr: SocketAddr,
39 pub send_addr: SocketAddr,
40}
41
42#[derive(Message)]
43#[rtype(result = "()")]
44pub struct TcpConnect(pub TcpStream, pub SocketAddr);
45
46#[derive(Message, Debug)]
47#[rtype(result = "()")]
48pub enum NodeEvent {
49 MemberUp(Node, bool),
51 MemberDown(SocketAddr),
52}
53
54impl Display for NodeEvent {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 match self {
57 NodeEvent::MemberUp(node, seed) => {
58 write!(f, "MemberUp: {} (seed: {})", node.socket_addr, seed)
59 }
60 NodeEvent::MemberDown(addr) => write!(f, "MemberDown: {}", addr),
61 }
62 }
63}
64
65#[derive(Message)]
66#[rtype(result = "()")]
67pub struct ConnectToNode(pub(crate) SocketAddr);
68
69pub struct Cluster {
71 ip_address: SocketAddr,
72 addrs: Vec<SocketAddr>,
73 own_addr: Option<Addr<Cluster>>,
74 nodes: HashMap<SocketAddr, Addr<NetworkInterface>>,
75}
76
77impl Actor for Cluster {
78 type Context = Context<Self>;
79
80 fn started(&mut self, ctx: &mut Self::Context) {
81 let listener = Cluster::bind(self.ip_address.to_string()).unwrap();
82
83 let st = Box::leak(listener).map(|st| {
84 let st = st.unwrap();
85 let addr = st.peer_addr().unwrap();
86 TcpConnect(st, addr)
87 });
88
89 ctx.add_message_stream(st);
90
91 self.own_addr = Some(ctx.address());
92
93 let addrs_len = self.addrs.len();
94 for node_addr in 0..addrs_len {
95 self.add_node(*self.addrs.get(node_addr).unwrap(), true);
96 }
97 debug!("Cluster started {}", self.ip_address);
98 }
99}
100
101impl Cluster {
102 pub fn new(ip_address: SocketAddr, seed_nodes: Vec<SocketAddr>) -> Addr<Cluster> {
103 Self::new_with_connection_protocol(ip_address, seed_nodes, ConnectionProtocol::SingleSeed)
104 }
105
106 pub fn new_with_connection_protocol(
107 ip_address: SocketAddr,
108 seed_nodes: Vec<SocketAddr>,
109 connection_protocol: ConnectionProtocol,
110 ) -> Addr<Cluster> {
111 debug!("Cluster created");
112 Connector::start_service_from(connection_protocol, ip_address, seed_nodes.clone());
113
114 Cluster::start_service_with(move || Cluster {
115 ip_address,
116 addrs: seed_nodes.clone(),
117 own_addr: None,
118 nodes: Default::default(),
119 })
120 }
121
122 fn bind(addr: String) -> IoResult<Box<TcpListenerStream>> {
123 let addr = net::SocketAddr::from_str(&addr).unwrap();
124 let listener = Box::new(TcpListenerStream::new(
125 block_on(TcpListener::bind(&addr)).unwrap(),
126 ));
127 debug!("Listening on {}", addr);
128 Ok(listener)
129 }
130
131 fn add_node_from_stream(&mut self, addr: SocketAddr, stream: TcpStream) {
132 let own_ip = self.ip_address;
133 let node = NetworkInterface::from_stream(own_ip, addr, stream).start();
134 self.nodes.insert(addr, node);
135 }
136
137 fn add_node(&mut self, node_addr: SocketAddr, seed: bool) {
138 let own_ip = self.ip_address;
139 self.nodes
140 .entry(node_addr)
141 .or_insert_with(|| NetworkInterface::new(own_ip, node_addr, seed).start());
142 }
143}
144
145impl Default for Cluster {
148 fn default() -> Self {
149 let ip_addr = "127.0.0.1:8000";
150
151 Self {
152 ip_address: SocketAddr::from_str(ip_addr).unwrap(),
153 addrs: vec![],
154 own_addr: None,
155 nodes: HashMap::new(),
156 }
157 }
158}
159
160impl Supervised for Cluster {}
161impl SystemService for Cluster {}
162impl CustomSystemService for Cluster {}
163
164impl Handler<TcpConnect> for Cluster {
165 type Result = ();
166
167 fn handle(&mut self, msg: TcpConnect, _ctx: &mut Self::Context) -> Self::Result {
168 debug!("Incoming TcpConnect");
169 let stream = msg.0;
170 let addr = msg.1;
171 self.add_node_from_stream(addr, stream);
172 }
173}
174
175impl Handler<ConnectToNode> for Cluster {
176 type Result = ();
177
178 fn handle(&mut self, msg: ConnectToNode, _ctx: &mut Self::Context) -> Self::Result {
179 self.add_node(msg.0, false);
180 }
181}
182
183impl Handler<NodeEvent> for Cluster {
184 type Result = ();
185
186 fn handle(&mut self, msg: NodeEvent, _ctx: &mut Self::Context) -> Self::Result {
187 match &msg {
188 NodeEvent::MemberUp(node, _seed) => {
189 self.issue_system_async(ClusterLog::NewMember(node.clone()));
190 }
191 NodeEvent::MemberDown(host) => {
192 self.issue_system_async(ClusterLog::MemberLeft(*host));
193 self.nodes.remove(host);
194 }
195 }
196
197 Connector::from_custom_registry().do_send(msg)
198 }
199}
200
201impl Handler<ConnectionApproval> for Cluster {
202 type Result = ConnectionApprovalResponse;
203
204 fn handle(
205 &mut self,
206 msg: ConnectionApproval,
207 _ctx: &mut Self::Context,
208 ) -> ConnectionApprovalResponse {
209 if self.nodes.contains_key(&msg.addr) {
210 ConnectionApprovalResponse::Declined
211 } else {
212 let node = self
213 .nodes
214 .get(&msg.send_addr)
215 .expect("Should be filled")
216 .clone();
217 self.nodes.remove(&msg.send_addr);
218 self.nodes.insert(msg.addr, node);
219 ConnectionApprovalResponse::Approved
220 }
221 }
222}