actix_telepathy/cluster/connector/
gossip.rs1use crate::cluster::connector::messages::{
2 GossipEvent, GossipJoining, GossipMessage, NodeResolving,
3};
4use crate::cluster::connector::{Connector, ConnectorVariant};
5use crate::network::NetworkInterface;
6use crate::{Cluster, ConnectToNode, CustomSystemService, Node, NodeEvent, RemoteAddr};
7use actix::prelude::*;
8use log::*;
9use rand::prelude::{IteratorRandom, ThreadRng};
10use std::collections::{HashMap, HashSet};
11use std::iter::FromIterator;
12use std::net::SocketAddr;
13use std::str::FromStr;
14
/// Identifier handed to `Node::get_remote_addr` when a `GossipJoining` message is sent to a
/// joining node.
const CONNECTOR: &str = "Connector";
16
/// Membership phase of the local gossip connector.
#[derive(Clone, Debug)]
enum GossipState {
    /// Starting state (see `Gossip::default`); left on the first `MemberUp` that this node did
    /// not request itself.
    Lonely,
    /// Incoming gossip messages are buffered instead of processed while in this state.
    Joining,
    /// Gossip messages are processed and forwarded immediately.
    Joined,
}
23
24pub struct Gossip {
32 own_addr: SocketAddr,
33 members: HashMap<SocketAddr, Addr<NetworkInterface>>,
34 waiting_to_add: HashSet<SocketAddr>,
35 state: GossipState,
36 about_to_join: Option<usize>,
37 gossip_msgs: Vec<GossipMessage>,
38 info_msgs_to_send: Vec<Node>,
39 seed_nodes: Vec<SocketAddr>,
40}
41
42impl Default for Gossip {
43 fn default() -> Self {
44 Self {
45 own_addr: SocketAddr::from_str("127.0.0.1:8000").unwrap(),
46 members: HashMap::new(),
47 waiting_to_add: HashSet::new(),
48 state: GossipState::Lonely,
49 about_to_join: None,
50 gossip_msgs: vec![],
51 info_msgs_to_send: vec![],
52 seed_nodes: vec![],
53 }
54 }
55}
56
57impl Gossip {
58 pub fn new(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) -> Self {
59 Self {
60 own_addr,
61 seed_nodes,
62 ..Default::default()
63 }
64 }
65
66 fn add_member(&mut self, node: Node) {
67 self.members.insert(
68 node.socket_addr,
69 node.network_interface.expect("Empty network interface"),
70 );
71 debug!(target: &self.own_addr.to_string(), "Member {} added!", node.socket_addr.to_string());
72 }
73
74 fn remove_member(&mut self, addr: SocketAddr) {
75 self.members.remove(&addr);
76 debug!(target: &self.own_addr.to_string(), "Member {} removed", addr.to_string());
77 }
78
79 fn ignite_member_up(&self, new_addr: SocketAddr) {
80 debug!(target: &self.own_addr.to_string(), "Igniting member up {}", new_addr.to_string());
81 self.gossip_member_event(
82 new_addr,
83 GossipEvent::Join,
84 HashSet::from_iter([self.own_addr, new_addr]),
85 );
86 }
87
88 fn ignite_member_down(&self, leaving_addr: SocketAddr) {
89 debug!(target: &self.own_addr.to_string(), "Igniting member down {}", leaving_addr.to_string());
90 self.gossip_member_event(
91 leaving_addr,
92 GossipEvent::Leave,
93 HashSet::from_iter([self.own_addr]),
94 );
95 }
96
97 fn gossip_member_event(&self, addr: SocketAddr, event: GossipEvent, seen: HashSet<SocketAddr>) {
98 debug!(target: &self.own_addr.to_string(), "Gossiping member event {} {:?} {:?}", addr.to_string(), event, seen);
99 let random_members = self.choose_random_members(3, &seen);
100
101 let gossip_message = GossipMessage { event, addr, seen };
102
103 for member in random_members {
104 member.do_send(gossip_message.clone())
105 }
106 }
107
108 fn choose_random_members(
109 &self,
110 amount: usize,
111 except: &HashSet<SocketAddr>,
112 ) -> Vec<RemoteAddr> {
113 let mut rng = ThreadRng::default();
114 self.members
115 .iter()
116 .filter(|(addr, _)| !except.contains(addr))
117 .choose_multiple(&mut rng, amount)
118 .into_iter()
119 .map(|(socket_addr, network_interface)| {
120 RemoteAddr::new_connector(*socket_addr, Some(network_interface.clone()))
121 })
122 .collect()
123 }
124
125 fn connect_to_node(&mut self, addr: &SocketAddr) {
126 self.waiting_to_add.insert(*addr);
127 Cluster::from_custom_registry().do_send(ConnectToNode(*addr))
128 }
129
130 fn all_seen(&self, seen: &HashSet<SocketAddr>) -> bool {
131 let members: HashSet<SocketAddr> = self.members.keys().cloned().collect();
132 members
133 .difference(seen)
134 .collect::<HashSet<&SocketAddr>>()
135 .is_empty()
136 }
137
138 fn handle_gossip_queue(&mut self) {
139 for _ in 0..self.gossip_msgs.len() {
140 if let Some(gossip_msg) = self.gossip_msgs.pop() {
141 self.handle_gossip_message(gossip_msg);
142 }
143 }
144 }
145
146 pub(crate) fn handle_gossip_message(&mut self, msg: GossipMessage) {
147 match &self.state {
148 GossipState::Lonely => {
149 error!(target: &self.own_addr.to_string(), "Received a GossipMessage while in LONELY state!")
150 }
151 GossipState::Joining => {
152 self.gossip_msgs.push(msg);
153 return;
154 }
155 GossipState::Joined => (),
156 }
157
158 let all_seen = self.all_seen(&msg.seen);
159 let mut seen = msg.seen;
160 let member_contains = self.members.contains_key(&msg.addr);
161 seen.insert(self.own_addr);
162
163 match &msg.event {
164 GossipEvent::Join => {
165 if member_contains & all_seen {
166 return;
167 }
168
169 if !member_contains {
170 self.connect_to_node(&msg.addr);
171 }
172 }
173 GossipEvent::Leave => {
174 if !member_contains & all_seen {
175 return;
176 }
177
178 if member_contains {
179 self.members.remove(&msg.addr);
180 }
181 }
182 }
183
184 self.gossip_member_event(msg.addr, msg.event, seen);
185 }
186
187 pub(crate) fn handle_gossip_joining(&mut self, msg: GossipJoining) {
188 match self.state {
189 GossipState::Joining => {
190 debug!(target: &self.own_addr.to_string(), "Received a GossipJoining message while in JOINING state! {} members", msg.about_to_join);
191 self.about_to_join = Some(msg.about_to_join);
192 if msg.about_to_join <= self.members.len() {
193 self.change_state(GossipState::Joined);
194 }
195 }
196 _ => {
197 warn!(target: &self.own_addr.to_string(), "Received a GossipJoining message while not in JOINING state!")
198 }
199 }
200 }
201
202 fn change_state(&mut self, state: GossipState) {
203 debug!(target: &self.own_addr.to_string(), "changing state to {:?}", state);
204 self.state = state.clone();
205 match state {
206 GossipState::Joining => (),
207 GossipState::Joined => {
208 self.handle_gossip_queue();
209 self.share_info_with_joining_members();
210 }
211 GossipState::Lonely => (),
212 }
213 }
214
215 fn share_info_with_joining_member(&self, node: Node) {
216 debug!(target: &self.own_addr.to_string(), "Sharing info with joining member {}", node.socket_addr.to_string());
217 node.get_remote_addr(CONNECTOR.to_string())
218 .do_send(GossipJoining {
219 about_to_join: self.members.len(),
220 });
221 self.ignite_member_up(node.socket_addr);
222 }
223
224 fn share_info_with_joining_members(&mut self) {
225 while let Some(node) = self.info_msgs_to_send.pop() {
226 self.share_info_with_joining_member(node)
227 }
228 }
229
230 fn seed_nodes_already_members(&self) -> bool {
231 self.seed_nodes
232 .iter()
233 .all(|seed_node| self.members.contains_key(seed_node))
234 }
235}
236
237impl ConnectorVariant for Gossip {
238 fn handle_node_event(&mut self, msg: NodeEvent, _ctx: &mut Context<Connector>) {
239 match msg {
240 NodeEvent::MemberUp(node, seed) => {
241 self.add_member(node.clone());
242 if !self.waiting_to_add.remove(&node.socket_addr) {
243 match &self.state {
244 GossipState::Lonely => {
245 if seed {
246 if self.seed_nodes_already_members() {
248 self.change_state(GossipState::Joined);
249 self.share_info_with_joining_member(node);
250 } else {
251 self.change_state(GossipState::Joining);
252 self.info_msgs_to_send.push(node);
253 }
254 } else {
255 self.change_state(GossipState::Joining);
257 }
258 }
259 GossipState::Joining => {
260 if seed {
261 warn!(target: &self.own_addr.to_string(), "Received a NodeEvent::MemberUp while in JOINING state but seed is true!");
262 self.info_msgs_to_send.push(node);
263 }
264
265 if let Some(about_to_join) = self.about_to_join {
266 if about_to_join == self.members.len() {
267 self.change_state(GossipState::Joined);
268 }
269 } else {
270 warn!(target: &self.own_addr.to_string(), "Received a NodeEvent::MemberUp while in JOINING state but about_to_join is None!")
271 }
272 }
273 GossipState::Joined => {
274 if seed {
275 self.share_info_with_joining_member(node);
276 }
277 }
278 }
279 }
280 }
281 NodeEvent::MemberDown(host) => {
282 self.remove_member(host);
283 self.ignite_member_down(host);
284 }
285 }
286 }
287
288 fn get_member_map(
289 &mut self,
290 _msg: NodeResolving,
291 _ctx: &mut Context<Connector>,
292 ) -> &HashMap<SocketAddr, Addr<NetworkInterface>> {
293 &self.members
294 }
295
296 fn get_own_addr(&self) -> SocketAddr {
297 self.own_addr
298 }
299}