Skip to main content

actix_telepathy/cluster/connector/
mod.rs

1pub use crate::cluster::connector::messages::NodeResolving;
2use crate::cluster::connector::messages::{GossipJoining, GossipMessage};
3use crate::{CustomSerialization, RemoteActor, RemoteMessage, RemoteWrapper};
4use crate::{CustomSystemService, Gossip, NetworkInterface, NodeEvent, SingleSeed};
5use actix::prelude::*;
6use log::*;
7use std::collections::HashMap;
8use std::net::SocketAddr;
9
10use self::messages::SingleSeedMembers;
11
12pub mod gossip;
13mod messages;
14pub mod single_seed;
15
16#[cfg(test)]
17mod tests;
18
/// Selects which [`Connector`] variant is built by
/// [`Connector::from_connection_protocol`].
#[derive(Debug, Clone, Copy, Default)]
pub enum ConnectionProtocol {
    /// Build a [`Connector::SingleSeed`].
    SingleSeed,
    /// Build a [`Connector::Gossip`]. This is the `Default` value.
    #[default]
    Gossip,
}
25
/// Actor that wraps one of the two connection strategies and forwards incoming
/// messages to whichever variant is active.
///
/// The `remote_messages` attribute lists the message types handed to the
/// `RemoteActor` derive (presumably the messages this actor can receive from
/// remote nodes — confirm against the derive macro's documentation).
#[derive(RemoteActor)]
#[remote_messages(GossipMessage, GossipJoining, SingleSeedMembers)]
pub enum Connector {
    /// Connector backed by a [`Gossip`] instance.
    Gossip(Gossip),
    /// Connector backed by a [`SingleSeed`] instance.
    SingleSeed(SingleSeed),
}
32
33impl Connector {
34    pub fn from_connection_protocol(
35        connection_protocol: ConnectionProtocol,
36        own_address: SocketAddr,
37        seed_nodes: Vec<SocketAddr>,
38    ) -> Self {
39        match connection_protocol {
40            ConnectionProtocol::Gossip => Self::Gossip(Gossip::new(own_address, seed_nodes)),
41            ConnectionProtocol::SingleSeed => Self::SingleSeed(SingleSeed::new(own_address)),
42        }
43    }
44
45    pub fn start_service_from(
46        connection_protocol: ConnectionProtocol,
47        own_address: SocketAddr,
48        seed_nodes: Vec<SocketAddr>,
49    ) {
50        Self::start_service_with(move || {
51            Connector::from_connection_protocol(
52                connection_protocol,
53                own_address,
54                seed_nodes.clone(),
55            )
56        });
57    }
58}
59
60impl Actor for Connector {
61    type Context = Context<Self>;
62
63    fn started(&mut self, ctx: &mut Self::Context) {
64        self.register(ctx.address().recipient());
65        debug!("{} actor started", Self::ACTOR_ID);
66    }
67}
68
69impl Default for Connector {
70    fn default() -> Self {
71        Self::Gossip(Gossip::default())
72    }
73}
74
// `Supervised` and `SystemService` are implemented without overriding any
// methods, so `Connector` uses whatever defaults those traits provide.
impl Supervised for Connector {}
impl SystemService for Connector {}
impl CustomSystemService for Connector {
    /// Only emits a debug log line; the context is unused.
    fn custom_service_started(&mut self, _ctx: &mut Context<Self>) {
        debug!("Connector Service started");
    }
}
82
/// Operations that the `Connector` message handlers in this module invoke on
/// the active variant (`Gossip` or `SingleSeed`).
pub trait ConnectorVariant {
    /// Processes a `NodeEvent`; called from `Handler<NodeEvent> for Connector`.
    fn handle_node_event(&mut self, msg: NodeEvent, ctx: &mut Context<Connector>);
    /// Returns a map from socket address to `NetworkInterface` actor address.
    ///
    /// `Handler<NodeResolving> for Connector` uses the returned map to look up
    /// the addresses requested in `msg`. Takes `&mut self` and the context, so
    /// implementations may mutate state while answering.
    fn get_member_map(
        &mut self,
        msg: NodeResolving,
        ctx: &mut Context<Connector>,
    ) -> &HashMap<SocketAddr, Addr<NetworkInterface>>;
    /// Returns the socket address the `NodeResolving` handler treats as this
    /// node's own (entries equal to it are skipped during resolution).
    fn get_own_addr(&self) -> SocketAddr;
}
92
93impl Handler<NodeEvent> for Connector {
94    type Result = ();
95
96    fn handle(&mut self, msg: NodeEvent, ctx: &mut Self::Context) -> Self::Result {
97        match self {
98            Connector::Gossip(gossip) => gossip.handle_node_event(msg, ctx),
99            Connector::SingleSeed(single_seed) => single_seed.handle_node_event(msg, ctx),
100        }
101    }
102}
103
104impl Handler<NodeResolving> for Connector {
105    type Result = Result<Vec<Addr<NetworkInterface>>, ()>;
106
107    fn handle(&mut self, msg: NodeResolving, ctx: &mut Context<Self>) -> Self::Result {
108        let addrs = msg.addrs.clone();
109        let own_addr = match self {
110            Connector::Gossip(gossip) => gossip.get_own_addr(),
111            Connector::SingleSeed(single_seed) => single_seed.get_own_addr(),
112        };
113
114        let member_map = match self {
115            Connector::Gossip(gossip) => gossip.get_member_map(msg, ctx),
116            Connector::SingleSeed(single_seed) => single_seed.get_member_map(msg, ctx),
117        };
118
119        Ok(addrs
120            .into_iter()
121            .filter_map(|x| {
122                if x == own_addr {
123                    None
124                } else {
125                    Some(
126                        member_map
127                            .get(&x)
128                            .unwrap_or_else(|| panic!("Socket {} should be known!", &x))
129                            .clone(),
130                    )
131                }
132            })
133            .collect())
134    }
135}
136
137// --- Gossip impl ---
138
139impl Handler<GossipMessage> for Connector {
140    type Result = ();
141
142    fn handle(&mut self, msg: GossipMessage, _ctx: &mut Self::Context) -> Self::Result {
143        match self {
144            Connector::Gossip(gossip) => gossip.handle_gossip_message(msg),
145            _ => warn!("Connector can only handle GossipMessage if it is Connector::Gossip"),
146        }
147    }
148}
149
150impl Handler<GossipJoining> for Connector {
151    type Result = ();
152
153    fn handle(&mut self, msg: GossipJoining, _ctx: &mut Self::Context) -> Self::Result {
154        match self {
155            Connector::Gossip(gossip) => gossip.handle_gossip_joining(msg),
156            _ => warn!("Connector can only handle GossipJoining if it is Connector::Gossip"),
157        }
158    }
159}
160
161// --- SingleSeed impl ---
162
163impl Handler<SingleSeedMembers> for Connector {
164    type Result = ();
165
166    fn handle(&mut self, msg: SingleSeedMembers, _ctx: &mut Self::Context) -> Self::Result {
167        match self {
168            Connector::SingleSeed(single_seed) => single_seed.handle_single_seed_members(msg),
169            _ => {
170                warn!("Connector can only handle SingleSeedMembers if it is Connector::SingleSeed")
171            }
172        }
173    }
174}