Skip to main content

actix_telepathy/cluster/connector/
single_seed.rs

1use super::{ConnectorVariant, messages::SingleSeedMembers};
2use crate::{
3    Cluster, ConnectToNode, CustomSystemService, NetworkInterface, Node, NodeEvent, RemoteAddr,
4};
5use actix::Addr;
6use log::*;
7use std::{collections::HashMap, net::SocketAddr, str::FromStr};
8
9/// The SingleSeed connector variant expects all nodes to have the same seed node (except the seed node itself, it has no seed node).
10/// If another node is added, it will be added to the cluster by the seed node.
11/// If a node has a different seed node, errors can occur.
12/// This variant is recommended for a fast connection setup, but it is not recommended if the seed node is not always available.
13pub struct SingleSeed {
14    own_addr: SocketAddr,
15    members: HashMap<SocketAddr, Addr<NetworkInterface>>,
16}
17
18impl Default for SingleSeed {
19    fn default() -> Self {
20        Self {
21            own_addr: SocketAddr::from_str("127.0.0.1:8000").unwrap(),
22            members: HashMap::new(),
23        }
24    }
25}
26
27impl SingleSeed {
28    pub fn new(own_addr: SocketAddr) -> Self {
29        Self {
30            own_addr,
31            ..Default::default()
32        }
33    }
34
35    fn add_member(&mut self, node: &Node) {
36        self.members
37            .insert(node.socket_addr, node.clone().network_interface.unwrap());
38        debug!(target: &self.own_addr.to_string(), "Member {} added!", node.socket_addr);
39    }
40
41    fn remove_member(&mut self, addr: SocketAddr) {
42        self.members.remove(&addr);
43        debug!("Member {} removed", addr);
44    }
45
46    fn give_information(&mut self, member_addr: SocketAddr) {
47        let members: Vec<SocketAddr> = self
48            .members
49            .keys()
50            .filter_map(|x| if x.eq(&member_addr) { None } else { Some(*x) })
51            .collect();
52
53        if !members.is_empty() {
54            match self.members.get(&member_addr) {
55                Some(node) => RemoteAddr::new_connector(member_addr, Some(node.clone()))
56                    .do_send(SingleSeedMembers(members)),
57                None => error!("Should be known by now"),
58            }
59        }
60    }
61
62    pub(crate) fn handle_single_seed_members(&mut self, msg: SingleSeedMembers) {
63        for addr in msg.0 {
64            Cluster::from_custom_registry().do_send(ConnectToNode(addr))
65        }
66    }
67}
68
69impl ConnectorVariant for SingleSeed {
70    fn handle_node_event(
71        &mut self,
72        msg: crate::NodeEvent,
73        _ctx: &mut actix::prelude::Context<crate::Connector>,
74    ) {
75        match msg {
76            NodeEvent::MemberUp(node, seed) => {
77                self.add_member(&node);
78                if seed {
79                    self.give_information(node.socket_addr);
80                }
81            }
82            NodeEvent::MemberDown(addr) => {
83                self.remove_member(addr);
84            }
85        }
86    }
87
88    fn get_member_map(
89        &mut self,
90        _msg: crate::NodeResolving,
91        _ctx: &mut actix::prelude::Context<crate::Connector>,
92    ) -> &HashMap<SocketAddr, Addr<NetworkInterface>> {
93        &self.members
94    }
95
96    fn get_own_addr(&self) -> SocketAddr {
97        self.own_addr
98    }
99}