1use std::{
2 collections::{HashMap, VecDeque},
3 fmt::Debug,
4};
5
6use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
7use atm0s_sdn_utils::hash::hash_str;
8use sans_io_runtime::collections::DynamicDeque;
9
10use crate::{
11 base::{ConnectionEvent, Service, ServiceBuilder, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput, ServiceWorker, ServiceWorkerCtx, ServiceWorkerInput, ServiceWorkerOutput},
12 features::{
13 dht_kv::{Control as KvControl, Event as KvEvent, Key, Map, MapControl, MapEvent},
14 neighbours::Control as NeighbourControl,
15 FeaturesControl, FeaturesEvent,
16 },
17};
18
19const RETRY_CONNECT_MS: u64 = 60_000; const WAIT_DISCONNECT_MS: u64 = 60_000; pub const SERVICE_ID: u8 = 0;
23pub const SERVICE_NAME: &str = "manual_discovery";
24
25fn kv_control<UserData, SE, TW>(c: KvControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
26 ServiceOutput::FeatureControl(FeaturesControl::DhtKv(c))
27}
28
29fn neighbour_control<UserData, SE, TW>(c: NeighbourControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
30 ServiceOutput::FeatureControl(FeaturesControl::Neighbours(c))
31}
32
33pub struct ManualDiscoveryService<UserData, SC, SE, TC, TW> {
34 node_addr: NodeAddr,
35 queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
36 nodes: HashMap<NodeId, NodeAddr>,
37 conns: HashMap<NodeId, Vec<ConnId>>,
38 removing_list: HashMap<NodeId, u64>,
39 last_retry_ms: u64,
40 shutdown: bool,
41 _tmp: std::marker::PhantomData<(SC, TC, TW)>,
42}
43
44impl<UserData, SC, SE, TC, TW> ManualDiscoveryService<UserData, SC, SE, TC, TW> {
45 pub fn new(node_addr: NodeAddr, local_tags: Vec<String>, connect_tags: Vec<String>) -> Self {
46 log::info!("Creating ManualDiscoveryService for node {node_addr} with local tags {local_tags:?} and connect tags {connect_tags:?}");
47
48 let mut queue = VecDeque::new();
49
50 for local_tag in local_tags.iter() {
51 let map = Map(hash_str(local_tag));
52 log::info!("Setting local tag: {local_tag} by set key {map}");
53 queue.push_back(kv_control(KvControl::MapCmd(map, MapControl::Set(Key(0), node_addr.to_vec()))));
54 }
55
56 for connect_tag in connect_tags.iter() {
57 let map = Map(hash_str(connect_tag));
58 log::info!("Setting connect tag: {connect_tag} by sub key {map}");
59 queue.push_back(kv_control(KvControl::MapCmd(map, MapControl::Sub)));
60 }
61
62 Self {
63 node_addr,
64 nodes: HashMap::new(),
65 conns: HashMap::new(),
66 queue,
67 removing_list: HashMap::new(),
68 last_retry_ms: 0,
69 shutdown: false,
70 _tmp: std::marker::PhantomData,
71 }
72 }
73
74 fn check_nodes(&mut self, now: u64) {
75 if self.last_retry_ms + RETRY_CONNECT_MS <= now {
76 self.last_retry_ms = now;
77 for (node, addr) in self.nodes.iter() {
78 if !self.conns.contains_key(node) {
79 log::warn!("ManualDiscoveryService node {node} not connected, retry connect");
80 self.queue.push_back(neighbour_control(NeighbourControl::ConnectTo(addr.clone(), false)));
81 }
82 }
83 }
84
85 let mut will_disconnect = vec![];
86 for (node, ts) in self.removing_list.iter() {
87 if now >= *ts + WAIT_DISCONNECT_MS && !self.nodes.contains_key(node) {
88 log::info!("ManualDiscoveryService node {node} still in removing_list => send Disconnect");
89 self.queue.push_back(neighbour_control(NeighbourControl::DisconnectFrom(*node)));
90 will_disconnect.push(*node);
91 }
92 }
93
94 for node in will_disconnect {
95 self.removing_list.remove(&node);
96 }
97 }
98}
99
100impl<UserData, SC, SE, TC: Debug, TW: Debug> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ManualDiscoveryService<UserData, SC, SE, TC, TW> {
101 fn is_service_empty(&self) -> bool {
102 self.shutdown && self.queue.is_empty()
103 }
104
105 fn service_id(&self) -> u8 {
106 SERVICE_ID
107 }
108
109 fn service_name(&self) -> &str {
110 SERVICE_NAME
111 }
112
113 fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceSharedInput) {
114 match input {
115 ServiceSharedInput::Tick(_) => self.check_nodes(now),
116 ServiceSharedInput::Connection(ConnectionEvent::Connected(ctx, _)) => {
117 let entry = self.conns.entry(ctx.node).or_default();
118 entry.push(ctx.conn);
119 }
120 ServiceSharedInput::Connection(ConnectionEvent::Disconnected(ctx)) => {
121 let entry = self.conns.entry(ctx.node).or_default();
122 entry.retain(|&conn| conn != ctx.conn);
123
124 if entry.is_empty() {
125 log::info!("ManualDiscoveryService node {} disconnected all connections => remove", ctx.node);
126 self.conns.remove(&ctx.node);
127 }
128 }
129 _ => {}
130 }
131 }
132
133 fn on_input(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceInput<UserData, FeaturesEvent, SC, TC>) {
134 if let ServiceInput::FeatureEvent(FeaturesEvent::DhtKv(KvEvent::MapEvent(map, event))) = input {
135 match event {
136 MapEvent::OnSet(_, source, value) => {
137 if source == self.node_addr.node_id() {
138 return;
139 }
140 if let Some(addr) = NodeAddr::from_vec(&value) {
141 log::info!("ManualDiscoveryService node {source} added tag {map} => connect {addr}");
142 self.nodes.insert(source, addr.clone());
143 self.queue.push_back(neighbour_control(NeighbourControl::ConnectTo(addr, false)));
144 self.removing_list.remove(&source);
145 }
146 }
147 MapEvent::OnDel(_, source) => {
148 self.nodes.remove(&source);
149 self.removing_list.entry(source).or_insert_with(|| {
150 log::info!("ManualDiscoveryService node {source} removed tag {map} => push to removing_list");
151 now
152 });
153 }
154 MapEvent::OnRelaySelected(node) => {
155 log::info!("ManualDiscoveryService relay {node} selected for tag {map}");
156 }
157 }
158 }
159 }
160
161 fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64) {
162 log::info!("[ManualDiscoveryService] Shutdown");
163 self.shutdown = true;
164 }
165
166 fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, SE, TW>> {
167 self.queue.pop_front()
168 }
169}
170
171pub struct ManualDiscoveryServiceWorker<UserData, SC, SE, TC> {
172 queue: DynamicDeque<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>, 8>,
173 shutdown: bool,
174}
175
176impl<UserData, SC, SE, TC, TW> ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ManualDiscoveryServiceWorker<UserData, SC, SE, TC> {
177 fn is_service_empty(&self) -> bool {
178 self.shutdown && self.queue.is_empty()
179 }
180
181 fn service_id(&self) -> u8 {
182 SERVICE_ID
183 }
184
185 fn service_name(&self) -> &str {
186 SERVICE_NAME
187 }
188
189 fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64) {}
190
191 fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, SC, TW>) {
192 match input {
193 ServiceWorkerInput::Control(actor, control) => self.queue.push_back(ServiceWorkerOutput::ForwardControlToController(actor, control)),
194 ServiceWorkerInput::FeatureEvent(event) => self.queue.push_back(ServiceWorkerOutput::ForwardFeatureEventToController(event)),
195 ServiceWorkerInput::FromController(_) => {}
196 }
197 }
198
199 fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64) {
200 log::info!("[ManualDiscoveryServiceWorker] Shutdown");
201 self.shutdown = true;
202 }
203
204 fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>> {
205 self.queue.pop_front()
206 }
207}
208
209pub struct ManualDiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
210 _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
211 node_addr: NodeAddr,
212 local_tags: Vec<String>,
213 connect_tags: Vec<String>,
214}
215
216impl<UserData, SC, SE, TC, TW> ManualDiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
217 pub fn new(node_addr: NodeAddr, local_tags: Vec<String>, connect_tags: Vec<String>) -> Self {
218 Self {
219 _tmp: std::marker::PhantomData,
220 node_addr,
221 local_tags,
222 connect_tags,
223 }
224 }
225}
226
227impl<UserData, SC, SE, TC, TW> ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ManualDiscoveryServiceBuilder<UserData, SC, SE, TC, TW>
228where
229 UserData: 'static + Debug + Send + Sync,
230 SC: 'static + Debug + Send + Sync,
231 SE: 'static + Debug + Send + Sync,
232 TC: 'static + Debug + Send + Sync,
233 TW: 'static + Debug + Send + Sync,
234{
235 fn service_id(&self) -> u8 {
236 SERVICE_ID
237 }
238
239 fn service_name(&self) -> &str {
240 SERVICE_NAME
241 }
242
243 fn create(&self) -> Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
244 Box::new(ManualDiscoveryService::new(self.node_addr.clone(), self.local_tags.clone(), self.connect_tags.clone()))
245 }
246
247 fn create_worker(&self) -> Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
248 Box::new(ManualDiscoveryServiceWorker {
249 queue: Default::default(),
250 shutdown: false,
251 })
252 }
253}
254
255#[cfg(test)]
256mod test {
257 use atm0s_sdn_identity::{ConnId, NodeAddr, NodeAddrBuilder, Protocol};
258 use atm0s_sdn_utils::hash::hash_str;
259
260 use crate::{
261 base::{Service, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput},
262 features::{
263 dht_kv::{self, Key, Map, MapControl, MapEvent},
264 neighbours, FeaturesControl, FeaturesEvent,
265 },
266 services::manual_discovery::{RETRY_CONNECT_MS, WAIT_DISCONNECT_MS},
267 };
268
269 use super::ManualDiscoveryService;
270
271 fn node_addr(node: u32) -> NodeAddr {
272 let mut builder = NodeAddrBuilder::new(node);
273 builder.add_protocol(Protocol::Ip4([127, 0, 0, 1].into()));
274 builder.add_protocol(Protocol::Udp(node as u16));
275 builder.addr()
276 }
277
278 fn map_cmd<SE, TC>(map: Map, control: MapControl) -> ServiceOutput<(), FeaturesControl, SE, TC> {
279 ServiceOutput::FeatureControl(FeaturesControl::DhtKv(dht_kv::Control::MapCmd(map, control)))
280 }
281
282 fn map_event<SC, TC>(map: Map, event: dht_kv::MapEvent) -> ServiceInput<(), FeaturesEvent, SC, TC> {
283 ServiceInput::FeatureEvent(FeaturesEvent::DhtKv(dht_kv::Event::MapEvent(map, event)))
284 }
285
286 fn neighbour_cmd<SE, TC>(control: neighbours::Control) -> ServiceOutput<(), FeaturesControl, SE, TC> {
287 ServiceOutput::FeatureControl(FeaturesControl::Neighbours(control))
288 }
289
290 fn neighbour_event<SC, TC>(event: neighbours::Event) -> ServiceInput<(), FeaturesEvent, SC, TC> {
291 ServiceInput::FeatureEvent(FeaturesEvent::Neighbours(event))
292 }
293
294 #[test]
295 fn should_send_connect() {
296 let addr1 = node_addr(100);
297 let addr2 = node_addr(101);
298
299 let ctx = ServiceCtx { node_id: 100, session: 0 };
300 let mut service = ManualDiscoveryService::<(), (), (), (), ()>::new(addr1.clone(), vec!["local".into()], vec!["connect".into()]);
301 let local_map = Map(hash_str("local"));
302 let connect_map = Map(hash_str("connect"));
303
304 assert_eq!(service.pop_output2(0), Some(map_cmd(local_map, MapControl::Set(Key(0), addr1.to_vec()))));
305 assert_eq!(service.pop_output2(0), Some(map_cmd(connect_map, MapControl::Sub)));
306
307 service.on_input(&ctx, 100, map_event(connect_map, MapEvent::OnSet(Key(1), 2, addr2.to_vec())));
308 assert_eq!(service.pop_output2(100), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2, false))));
309 }
310
311 #[test]
312 fn should_wait_disconnect_after_remove() {
313 let addr1 = node_addr(100);
314 let addr2 = node_addr(101);
315
316 let ctx = ServiceCtx { node_id: 100, session: 0 };
317 let mut service = ManualDiscoveryService::<(), (), (), (), ()>::new(addr1.clone(), vec!["local".into()], vec!["connect".into()]);
318 let local_map = Map(hash_str("local"));
319 let connect_map = Map(hash_str("connect"));
320
321 assert_eq!(service.pop_output2(0), Some(map_cmd(local_map, MapControl::Set(Key(0), addr1.to_vec()))));
322 assert_eq!(service.pop_output2(0), Some(map_cmd(connect_map, MapControl::Sub)));
323
324 service.on_input(&ctx, 100, map_event(connect_map, MapEvent::OnSet(Key(1), addr2.node_id(), addr2.to_vec())));
326 assert_eq!(service.pop_output2(100), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2.clone(), false))));
327
328 service.on_input(&ctx, 110, neighbour_event(neighbours::Event::Connected(addr2.node_id(), ConnId::from_out(0, 0))));
330
331 service.on_shared_input(&ctx, 200, ServiceSharedInput::Tick(0));
333 assert_eq!(service.pop_output2(200), None);
334
335 service.on_input(&ctx, 300, map_event(connect_map, MapEvent::OnDel(Key(1), addr2.node_id())));
337 assert_eq!(service.pop_output2(300), None);
338
339 service.on_shared_input(&ctx, 300 + WAIT_DISCONNECT_MS, ServiceSharedInput::Tick(0));
341 assert_eq!(service.pop_output2(300 + WAIT_DISCONNECT_MS), Some(neighbour_cmd(neighbours::Control::DisconnectFrom(addr2.node_id()))));
342 assert_eq!(service.pop_output2(300 + WAIT_DISCONNECT_MS), None);
343 }
344
345 #[test]
346 fn should_reconnect_after_disconnected() {
347 let addr1 = node_addr(100);
348 let addr2 = node_addr(101);
349
350 let ctx = ServiceCtx { node_id: 100, session: 0 };
351 let mut service = ManualDiscoveryService::<(), (), (), (), ()>::new(addr1.clone(), vec!["local".into()], vec!["connect".into()]);
352 let local_map = Map(hash_str("local"));
353 let connect_map = Map(hash_str("connect"));
354
355 assert_eq!(service.pop_output2(0), Some(map_cmd(local_map, MapControl::Set(Key(0), addr1.to_vec()))));
356 assert_eq!(service.pop_output2(0), Some(map_cmd(connect_map, MapControl::Sub)));
357
358 service.on_input(&ctx, 100, map_event(connect_map, MapEvent::OnSet(Key(1), 2, addr2.to_vec())));
359 assert_eq!(service.pop_output2(100), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2.clone(), false))));
360
361 service.on_shared_input(&ctx, 200, ServiceSharedInput::Tick(0));
362 assert_eq!(service.pop_output2(200), None);
363
364 service.on_input(&ctx, 300, neighbour_event(neighbours::Event::Disconnected(addr2.node_id(), ConnId::from_out(0, 0))));
365
366 service.on_shared_input(&ctx, 300, ServiceSharedInput::Tick(0));
367 assert_eq!(service.pop_output2(300), None);
368
369 service.on_shared_input(&ctx, RETRY_CONNECT_MS, ServiceSharedInput::Tick(0));
370 assert_eq!(service.pop_output2(RETRY_CONNECT_MS), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2.clone(), false))));
371 assert_eq!(service.pop_output2(RETRY_CONNECT_MS), None);
372 }
373}