actix_telepathy/cluster/connector/
gossip.rs

1use crate::cluster::connector::messages::{
2    GossipEvent, GossipJoining, GossipMessage, NodeResolving,
3};
4use crate::cluster::connector::{Connector, ConnectorVariant};
5use crate::network::NetworkInterface;
6use crate::{Cluster, ConnectToNode, CustomSystemService, Node, NodeEvent, RemoteAddr};
7use actix::prelude::*;
8use log::*;
9use rand::prelude::{IteratorRandom, ThreadRng};
10use std::collections::{HashMap, HashSet};
11use std::iter::FromIterator;
12use std::net::SocketAddr;
13use std::str::FromStr;
14
15const CONNECTOR: &str = "Connector";
16
/// Membership state of the local node, as tracked by the [`Gossip`] connector.
#[derive(Clone, Debug)]
enum GossipState {
    /// State a freshly constructed connector starts in.
    Lonely,
    /// The node is in the process of joining; incoming gossip messages are
    /// queued instead of being handled.
    Joining,
    /// The node is part of the cluster; on entering this state the queued
    /// gossip messages are handled and pending join information is sent out.
    Joined,
}
23
24/// The Gossip connector variant can connect the nodes to each other. Each node can have a different seed node.
25/// When joining the cluster, the node will connect to its seed node and receives the number of nodes that are about to join.
26/// The seed node of that node will then send the joining node's information to the other nodes via the Gossip protocol.
27/// Thereby, the seed node randomly chooses 3 nodes and sends the information to them. These 3 nodes will connect to the joining node.
28/// Then the 3 nodes will send the information to 3 other nodes and so on.
29/// This variant is recommended if the seed node is not always available.
30/// This variant is not recommended if the cluster is very large, because the gossip protocol takes more time the larger the cluster is.
31pub struct Gossip {
32    own_addr: SocketAddr,
33    members: HashMap<SocketAddr, Addr<NetworkInterface>>,
34    waiting_to_add: HashSet<SocketAddr>,
35    state: GossipState,
36    about_to_join: Option<usize>,
37    gossip_msgs: Vec<GossipMessage>,
38    info_msgs_to_send: Vec<Node>,
39    seed_nodes: Vec<SocketAddr>,
40}
41
42impl Default for Gossip {
43    fn default() -> Self {
44        Self {
45            own_addr: SocketAddr::from_str("127.0.0.1:8000").unwrap(),
46            members: HashMap::new(),
47            waiting_to_add: HashSet::new(),
48            state: GossipState::Lonely,
49            about_to_join: None,
50            gossip_msgs: vec![],
51            info_msgs_to_send: vec![],
52            seed_nodes: vec![],
53        }
54    }
55}
56
57impl Gossip {
58    pub fn new(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) -> Self {
59        Self {
60            own_addr,
61            seed_nodes,
62            ..Default::default()
63        }
64    }
65
66    fn add_member(&mut self, node: Node) {
67        self.members.insert(
68            node.socket_addr,
69            node.network_interface.expect("Empty network interface"),
70        );
71        debug!(target: &self.own_addr.to_string(), "Member {} added!", node.socket_addr.to_string());
72    }
73
74    fn remove_member(&mut self, addr: SocketAddr) {
75        self.members.remove(&addr);
76        debug!(target: &self.own_addr.to_string(), "Member {} removed", addr.to_string());
77    }
78
79    fn ignite_member_up(&self, new_addr: SocketAddr) {
80        debug!(target: &self.own_addr.to_string(), "Igniting member up {}", new_addr.to_string());
81        self.gossip_member_event(
82            new_addr,
83            GossipEvent::Join,
84            HashSet::from_iter([self.own_addr, new_addr]),
85        );
86    }
87
88    fn ignite_member_down(&self, leaving_addr: SocketAddr) {
89        debug!(target: &self.own_addr.to_string(), "Igniting member down {}", leaving_addr.to_string());
90        self.gossip_member_event(
91            leaving_addr,
92            GossipEvent::Leave,
93            HashSet::from_iter([self.own_addr]),
94        );
95    }
96
97    fn gossip_member_event(&self, addr: SocketAddr, event: GossipEvent, seen: HashSet<SocketAddr>) {
98        debug!(target: &self.own_addr.to_string(), "Gossiping member event {} {:?} {:?}", addr.to_string(), event, seen);
99        let random_members = self.choose_random_members(3, &seen);
100
101        let gossip_message = GossipMessage { event, addr, seen };
102
103        for member in random_members {
104            member.do_send(gossip_message.clone())
105        }
106    }
107
108    fn choose_random_members(
109        &self,
110        amount: usize,
111        except: &HashSet<SocketAddr>,
112    ) -> Vec<RemoteAddr> {
113        let mut rng = ThreadRng::default();
114        self.members
115            .iter()
116            .filter(|(addr, _)| !except.contains(addr))
117            .choose_multiple(&mut rng, amount)
118            .into_iter()
119            .map(|(socket_addr, network_interface)| {
120                RemoteAddr::new_connector(*socket_addr, Some(network_interface.clone()))
121            })
122            .collect()
123    }
124
125    fn connect_to_node(&mut self, addr: &SocketAddr) {
126        self.waiting_to_add.insert(*addr);
127        Cluster::from_custom_registry().do_send(ConnectToNode(*addr))
128    }
129
130    fn all_seen(&self, seen: &HashSet<SocketAddr>) -> bool {
131        let members: HashSet<SocketAddr> = self.members.keys().cloned().collect();
132        members
133            .difference(seen)
134            .collect::<HashSet<&SocketAddr>>()
135            .is_empty()
136    }
137
138    fn handle_gossip_queue(&mut self) {
139        for _ in 0..self.gossip_msgs.len() {
140            if let Some(gossip_msg) = self.gossip_msgs.pop() {
141                self.handle_gossip_message(gossip_msg);
142            }
143        }
144    }
145
146    pub(crate) fn handle_gossip_message(&mut self, msg: GossipMessage) {
147        match &self.state {
148            GossipState::Lonely => {
149                error!(target: &self.own_addr.to_string(), "Received a GossipMessage while in LONELY state!")
150            }
151            GossipState::Joining => {
152                self.gossip_msgs.push(msg);
153                return;
154            }
155            GossipState::Joined => (),
156        }
157
158        let all_seen = self.all_seen(&msg.seen);
159        let mut seen = msg.seen;
160        let member_contains = self.members.contains_key(&msg.addr);
161        seen.insert(self.own_addr);
162
163        match &msg.event {
164            GossipEvent::Join => {
165                if member_contains & all_seen {
166                    return;
167                }
168
169                if !member_contains {
170                    self.connect_to_node(&msg.addr);
171                }
172            }
173            GossipEvent::Leave => {
174                if !member_contains & all_seen {
175                    return;
176                }
177
178                if member_contains {
179                    self.members.remove(&msg.addr);
180                }
181            }
182        }
183
184        self.gossip_member_event(msg.addr, msg.event, seen);
185    }
186
187    pub(crate) fn handle_gossip_joining(&mut self, msg: GossipJoining) {
188        match self.state {
189            GossipState::Joining => {
190                debug!(target: &self.own_addr.to_string(), "Received a GossipJoining message while in JOINING state! {} members", msg.about_to_join);
191                self.about_to_join = Some(msg.about_to_join);
192                if msg.about_to_join <= self.members.len() {
193                    self.change_state(GossipState::Joined);
194                }
195            }
196            _ => {
197                warn!(target: &self.own_addr.to_string(), "Received a GossipJoining message while not in JOINING state!")
198            }
199        }
200    }
201
202    fn change_state(&mut self, state: GossipState) {
203        debug!(target: &self.own_addr.to_string(), "changing state to {:?}", state);
204        self.state = state.clone();
205        match state {
206            GossipState::Joining => (),
207            GossipState::Joined => {
208                self.handle_gossip_queue();
209                self.share_info_with_joining_members();
210            }
211            GossipState::Lonely => (),
212        }
213    }
214
215    fn share_info_with_joining_member(&self, node: Node) {
216        debug!(target: &self.own_addr.to_string(), "Sharing info with joining member {}", node.socket_addr.to_string());
217        node.get_remote_addr(CONNECTOR.to_string())
218            .do_send(GossipJoining {
219                about_to_join: self.members.len(),
220            });
221        self.ignite_member_up(node.socket_addr);
222    }
223
224    fn share_info_with_joining_members(&mut self) {
225        while let Some(node) = self.info_msgs_to_send.pop() {
226            self.share_info_with_joining_member(node)
227        }
228    }
229
230    fn seed_nodes_already_members(&self) -> bool {
231        self.seed_nodes
232            .iter()
233            .all(|seed_node| self.members.contains_key(seed_node))
234    }
235}
236
237impl ConnectorVariant for Gossip {
238    fn handle_node_event(&mut self, msg: NodeEvent, _ctx: &mut Context<Connector>) {
239        match msg {
240            NodeEvent::MemberUp(node, seed) => {
241                self.add_member(node.clone());
242                if !self.waiting_to_add.remove(&node.socket_addr) {
243                    match &self.state {
244                        GossipState::Lonely => {
245                            if seed {
246                                // if current node is considered seed, we are the seed and therefore are already joined
247                                if self.seed_nodes_already_members() {
248                                    self.change_state(GossipState::Joined);
249                                    self.share_info_with_joining_member(node);
250                                } else {
251                                    self.change_state(GossipState::Joining);
252                                    self.info_msgs_to_send.push(node);
253                                }
254                            } else {
255                                // else we are not the seed and therefore need to join
256                                self.change_state(GossipState::Joining);
257                            }
258                        }
259                        GossipState::Joining => {
260                            if seed {
261                                warn!(target: &self.own_addr.to_string(), "Received a NodeEvent::MemberUp while in JOINING state but seed is true!");
262                                self.info_msgs_to_send.push(node);
263                            }
264
265                            if let Some(about_to_join) = self.about_to_join {
266                                if about_to_join == self.members.len() {
267                                    self.change_state(GossipState::Joined);
268                                }
269                            } else {
270                                warn!(target: &self.own_addr.to_string(), "Received a NodeEvent::MemberUp while in JOINING state but about_to_join is None!")
271                            }
272                        }
273                        GossipState::Joined => {
274                            if seed {
275                                self.share_info_with_joining_member(node);
276                            }
277                        }
278                    }
279                }
280            }
281            NodeEvent::MemberDown(host) => {
282                self.remove_member(host);
283                self.ignite_member_down(host);
284            }
285        }
286    }
287
288    fn get_member_map(
289        &mut self,
290        _msg: NodeResolving,
291        _ctx: &mut Context<Connector>,
292    ) -> &HashMap<SocketAddr, Addr<NetworkInterface>> {
293        &self.members
294    }
295
296    fn get_own_addr(&self) -> SocketAddr {
297        self.own_addr
298    }
299}