actix_telepathy/cluster/connector/
mod.rs1pub use crate::cluster::connector::messages::NodeResolving;
2use crate::cluster::connector::messages::{GossipJoining, GossipMessage};
3use crate::{CustomSerialization, RemoteActor, RemoteMessage, RemoteWrapper};
4use crate::{CustomSystemService, Gossip, NetworkInterface, NodeEvent, SingleSeed};
5use actix::prelude::*;
6use log::*;
7use std::collections::HashMap;
8use std::net::SocketAddr;
9
10use self::messages::SingleSeedMembers;
11
12pub mod gossip;
13mod messages;
14pub mod single_seed;
15
16#[cfg(test)]
17mod tests;
18
19#[derive(Debug, Clone, Copy, Default)]
20pub enum ConnectionProtocol {
21 SingleSeed,
22 #[default]
23 Gossip,
24}
25
26#[derive(RemoteActor)]
27#[remote_messages(GossipMessage, GossipJoining, SingleSeedMembers)]
28pub enum Connector {
29 Gossip(Gossip),
30 SingleSeed(SingleSeed),
31}
32
33impl Connector {
34 pub fn from_connection_protocol(
35 connection_protocol: ConnectionProtocol,
36 own_address: SocketAddr,
37 seed_nodes: Vec<SocketAddr>,
38 ) -> Self {
39 match connection_protocol {
40 ConnectionProtocol::Gossip => Self::Gossip(Gossip::new(own_address, seed_nodes)),
41 ConnectionProtocol::SingleSeed => Self::SingleSeed(SingleSeed::new(own_address)),
42 }
43 }
44
45 pub fn start_service_from(
46 connection_protocol: ConnectionProtocol,
47 own_address: SocketAddr,
48 seed_nodes: Vec<SocketAddr>,
49 ) {
50 Self::start_service_with(move || {
51 Connector::from_connection_protocol(
52 connection_protocol,
53 own_address,
54 seed_nodes.clone(),
55 )
56 });
57 }
58}
59
60impl Actor for Connector {
61 type Context = Context<Self>;
62
63 fn started(&mut self, ctx: &mut Self::Context) {
64 self.register(ctx.address().recipient());
65 debug!("{} actor started", Self::ACTOR_ID);
66 }
67}
68
69impl Default for Connector {
70 fn default() -> Self {
71 Self::Gossip(Gossip::default())
72 }
73}
74
75impl Supervised for Connector {}
76impl SystemService for Connector {}
77impl CustomSystemService for Connector {
78 fn custom_service_started(&mut self, _ctx: &mut Context<Self>) {
79 debug!("Connector Service started");
80 }
81}
82
83pub trait ConnectorVariant {
84 fn handle_node_event(&mut self, msg: NodeEvent, ctx: &mut Context<Connector>);
85 fn get_member_map(
86 &mut self,
87 msg: NodeResolving,
88 ctx: &mut Context<Connector>,
89 ) -> &HashMap<SocketAddr, Addr<NetworkInterface>>;
90 fn get_own_addr(&self) -> SocketAddr;
91}
92
93impl Handler<NodeEvent> for Connector {
94 type Result = ();
95
96 fn handle(&mut self, msg: NodeEvent, ctx: &mut Self::Context) -> Self::Result {
97 match self {
98 Connector::Gossip(gossip) => gossip.handle_node_event(msg, ctx),
99 Connector::SingleSeed(single_seed) => single_seed.handle_node_event(msg, ctx),
100 }
101 }
102}
103
104impl Handler<NodeResolving> for Connector {
105 type Result = Result<Vec<Addr<NetworkInterface>>, ()>;
106
107 fn handle(&mut self, msg: NodeResolving, ctx: &mut Context<Self>) -> Self::Result {
108 let addrs = msg.addrs.clone();
109 let own_addr = match self {
110 Connector::Gossip(gossip) => gossip.get_own_addr(),
111 Connector::SingleSeed(single_seed) => single_seed.get_own_addr(),
112 };
113
114 let member_map = match self {
115 Connector::Gossip(gossip) => gossip.get_member_map(msg, ctx),
116 Connector::SingleSeed(single_seed) => single_seed.get_member_map(msg, ctx),
117 };
118
119 Ok(addrs
120 .into_iter()
121 .filter_map(|x| {
122 if x == own_addr {
123 None
124 } else {
125 Some(
126 member_map
127 .get(&x)
128 .unwrap_or_else(|| panic!("Socket {} should be known!", &x))
129 .clone(),
130 )
131 }
132 })
133 .collect())
134 }
135}
136
137impl Handler<GossipMessage> for Connector {
140 type Result = ();
141
142 fn handle(&mut self, msg: GossipMessage, _ctx: &mut Self::Context) -> Self::Result {
143 match self {
144 Connector::Gossip(gossip) => gossip.handle_gossip_message(msg),
145 _ => warn!("Connector can only handle GossipMessage if it is Connector::Gossip"),
146 }
147 }
148}
149
150impl Handler<GossipJoining> for Connector {
151 type Result = ();
152
153 fn handle(&mut self, msg: GossipJoining, _ctx: &mut Self::Context) -> Self::Result {
154 match self {
155 Connector::Gossip(gossip) => gossip.handle_gossip_joining(msg),
156 _ => warn!("Connector can only handle GossipJoining if it is Connector::Gossip"),
157 }
158 }
159}
160
161impl Handler<SingleSeedMembers> for Connector {
164 type Result = ();
165
166 fn handle(&mut self, msg: SingleSeedMembers, _ctx: &mut Self::Context) -> Self::Result {
167 match self {
168 Connector::SingleSeed(single_seed) => single_seed.handle_single_seed_members(msg),
169 _ => {
170 warn!("Connector can only handle SingleSeedMembers if it is Connector::SingleSeed")
171 }
172 }
173 }
174}