Skip to main content

actix_telepathy/cluster/
mod.rs

1mod connector;
2mod listener;
3#[cfg(test)]
4mod tests;
5
6pub use self::listener::{ClusterListener, ClusterLog};
7pub use connector::NodeResolving;
8pub use connector::{gossip::Gossip, single_seed::SingleSeed};
9
10use crate::CustomSystemService;
11pub use crate::cluster::connector::ConnectionProtocol;
12pub use crate::cluster::connector::Connector;
13use crate::network::NetworkInterface;
14use crate::remote::Node;
15use actix::prelude::*;
16use actix_broker::BrokerIssue;
17use futures::StreamExt;
18use futures::executor::block_on;
19use log::*;
20use std::collections::HashMap;
21use std::fmt::Display;
22use std::io::Result as IoResult;
23use std::net;
24use std::net::SocketAddr;
25use std::str::FromStr;
26use tokio::net::{TcpListener, TcpStream};
27use tokio_stream::wrappers::TcpListenerStream;
28
29#[derive(MessageResponse)]
30pub enum ConnectionApprovalResponse {
31    Approved,
32    Declined,
33}
34
35#[derive(Message)]
36#[rtype(result = "ConnectionApprovalResponse")]
37pub struct ConnectionApproval {
38    pub addr: SocketAddr,
39    pub send_addr: SocketAddr,
40}
41
/// Message wrapping a TCP stream together with a socket address.
///
/// The cluster's listener emits one of these per accepted connection, using
/// the stream's peer address as the second field.
#[derive(Message)]
#[rtype(result = "()")]
pub struct TcpConnect(pub TcpStream, pub SocketAddr);
45
/// Membership change of the cluster.
///
/// The [`Cluster`] actor handles this message and also forwards it to the
/// `Connector` service.
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub enum NodeEvent {
    /// (Node, and whether it is a seed node)
    MemberUp(Node, bool),
    /// The node registered under this socket address is gone.
    MemberDown(SocketAddr),
}
53
54impl Display for NodeEvent {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        match self {
57            NodeEvent::MemberUp(node, seed) => {
58                write!(f, "MemberUp: {} (seed: {})", node.socket_addr, seed)
59            }
60            NodeEvent::MemberDown(addr) => write!(f, "MemberDown: {}", addr),
61        }
62    }
63}
64
65#[derive(Message)]
66#[rtype(result = "()")]
67pub struct ConnectToNode(pub(crate) SocketAddr);
68
/// Central Actor for cluster handling
pub struct Cluster {
    /// Own socket address; the TCP listener is bound to it when the actor starts.
    ip_address: SocketAddr,
    /// Seed node addresses that are connected to when the actor starts.
    addrs: Vec<SocketAddr>,
    /// Address of this actor, set once the actor has started.
    own_addr: Option<Addr<Cluster>>,
    /// Network interface actors of the known nodes, keyed by socket address.
    nodes: HashMap<SocketAddr, Addr<NetworkInterface>>,
}
76
77impl Actor for Cluster {
78    type Context = Context<Self>;
79
80    fn started(&mut self, ctx: &mut Self::Context) {
81        let listener = Cluster::bind(self.ip_address.to_string()).unwrap();
82
83        let st = Box::leak(listener).map(|st| {
84            let st = st.unwrap();
85            let addr = st.peer_addr().unwrap();
86            TcpConnect(st, addr)
87        });
88
89        ctx.add_message_stream(st);
90
91        self.own_addr = Some(ctx.address());
92
93        let addrs_len = self.addrs.len();
94        for node_addr in 0..addrs_len {
95            self.add_node(*self.addrs.get(node_addr).unwrap(), true);
96        }
97        debug!("Cluster started {}", self.ip_address);
98    }
99}
100
101impl Cluster {
102    pub fn new(ip_address: SocketAddr, seed_nodes: Vec<SocketAddr>) -> Addr<Cluster> {
103        Self::new_with_connection_protocol(ip_address, seed_nodes, ConnectionProtocol::SingleSeed)
104    }
105
106    pub fn new_with_connection_protocol(
107        ip_address: SocketAddr,
108        seed_nodes: Vec<SocketAddr>,
109        connection_protocol: ConnectionProtocol,
110    ) -> Addr<Cluster> {
111        debug!("Cluster created");
112        Connector::start_service_from(connection_protocol, ip_address, seed_nodes.clone());
113
114        Cluster::start_service_with(move || Cluster {
115            ip_address,
116            addrs: seed_nodes.clone(),
117            own_addr: None,
118            nodes: Default::default(),
119        })
120    }
121
122    fn bind(addr: String) -> IoResult<Box<TcpListenerStream>> {
123        let addr = net::SocketAddr::from_str(&addr).unwrap();
124        let listener = Box::new(TcpListenerStream::new(
125            block_on(TcpListener::bind(&addr)).unwrap(),
126        ));
127        debug!("Listening on {}", addr);
128        Ok(listener)
129    }
130
131    fn add_node_from_stream(&mut self, addr: SocketAddr, stream: TcpStream) {
132        let own_ip = self.ip_address;
133        let node = NetworkInterface::from_stream(own_ip, addr, stream).start();
134        self.nodes.insert(addr, node);
135    }
136
137    fn add_node(&mut self, node_addr: SocketAddr, seed: bool) {
138        let own_ip = self.ip_address;
139        self.nodes
140            .entry(node_addr)
141            .or_insert_with(|| NetworkInterface::new(own_ip, node_addr, seed).start());
142    }
143}
144
145// Singleton
146
147impl Default for Cluster {
148    fn default() -> Self {
149        let ip_addr = "127.0.0.1:8000";
150
151        Self {
152            ip_address: SocketAddr::from_str(ip_addr).unwrap(),
153            addrs: vec![],
154            own_addr: None,
155            nodes: HashMap::new(),
156        }
157    }
158}
159
// The service traits are implemented without overriding any method, so the
// cluster relies entirely on their default behavior.
impl Supervised for Cluster {}
impl SystemService for Cluster {}
// Crate-local service trait; `start_service_with` and `from_custom_registry`
// used in this file presumably come from it (verify against its definition).
impl CustomSystemService for Cluster {}
163
164impl Handler<TcpConnect> for Cluster {
165    type Result = ();
166
167    fn handle(&mut self, msg: TcpConnect, _ctx: &mut Self::Context) -> Self::Result {
168        debug!("Incoming TcpConnect");
169        let stream = msg.0;
170        let addr = msg.1;
171        self.add_node_from_stream(addr, stream);
172    }
173}
174
175impl Handler<ConnectToNode> for Cluster {
176    type Result = ();
177
178    fn handle(&mut self, msg: ConnectToNode, _ctx: &mut Self::Context) -> Self::Result {
179        self.add_node(msg.0, false);
180    }
181}
182
183impl Handler<NodeEvent> for Cluster {
184    type Result = ();
185
186    fn handle(&mut self, msg: NodeEvent, _ctx: &mut Self::Context) -> Self::Result {
187        match &msg {
188            NodeEvent::MemberUp(node, _seed) => {
189                self.issue_system_async(ClusterLog::NewMember(node.clone()));
190            }
191            NodeEvent::MemberDown(host) => {
192                self.issue_system_async(ClusterLog::MemberLeft(*host));
193                self.nodes.remove(host);
194            }
195        }
196
197        Connector::from_custom_registry().do_send(msg)
198    }
199}
200
201impl Handler<ConnectionApproval> for Cluster {
202    type Result = ConnectionApprovalResponse;
203
204    fn handle(
205        &mut self,
206        msg: ConnectionApproval,
207        _ctx: &mut Self::Context,
208    ) -> ConnectionApprovalResponse {
209        if self.nodes.contains_key(&msg.addr) {
210            ConnectionApprovalResponse::Declined
211        } else {
212            let node = self
213                .nodes
214                .get(&msg.send_addr)
215                .expect("Should be filled")
216                .clone();
217            self.nodes.remove(&msg.send_addr);
218            self.nodes.insert(msg.addr, node);
219            ConnectionApprovalResponse::Approved
220        }
221    }
222}