atm0s_sdn_network/services/
manual_discovery.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    fmt::Debug,
4};
5
6use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
7use atm0s_sdn_utils::hash::hash_str;
8use sans_io_runtime::collections::DynamicDeque;
9
10use crate::{
11    base::{ConnectionEvent, Service, ServiceBuilder, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput, ServiceWorker, ServiceWorkerCtx, ServiceWorkerInput, ServiceWorkerOutput},
12    features::{
13        dht_kv::{Control as KvControl, Event as KvEvent, Key, Map, MapControl, MapEvent},
14        neighbours::Control as NeighbourControl,
15        FeaturesControl, FeaturesEvent,
16    },
17};
18
19const RETRY_CONNECT_MS: u64 = 60_000; //60 seconds
20const WAIT_DISCONNECT_MS: u64 = 60_000; //60 seconds
21
22pub const SERVICE_ID: u8 = 0;
23pub const SERVICE_NAME: &str = "manual_discovery";
24
25fn kv_control<UserData, SE, TW>(c: KvControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
26    ServiceOutput::FeatureControl(FeaturesControl::DhtKv(c))
27}
28
29fn neighbour_control<UserData, SE, TW>(c: NeighbourControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
30    ServiceOutput::FeatureControl(FeaturesControl::Neighbours(c))
31}
32
33pub struct ManualDiscoveryService<UserData, SC, SE, TC, TW> {
34    node_addr: NodeAddr,
35    queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
36    nodes: HashMap<NodeId, NodeAddr>,
37    conns: HashMap<NodeId, Vec<ConnId>>,
38    removing_list: HashMap<NodeId, u64>,
39    last_retry_ms: u64,
40    shutdown: bool,
41    _tmp: std::marker::PhantomData<(SC, TC, TW)>,
42}
43
44impl<UserData, SC, SE, TC, TW> ManualDiscoveryService<UserData, SC, SE, TC, TW> {
45    pub fn new(node_addr: NodeAddr, local_tags: Vec<String>, connect_tags: Vec<String>) -> Self {
46        log::info!("Creating ManualDiscoveryService for node {node_addr} with local tags {local_tags:?} and connect tags {connect_tags:?}");
47
48        let mut queue = VecDeque::new();
49
50        for local_tag in local_tags.iter() {
51            let map = Map(hash_str(local_tag));
52            log::info!("Setting local tag: {local_tag} by set key {map}");
53            queue.push_back(kv_control(KvControl::MapCmd(map, MapControl::Set(Key(0), node_addr.to_vec()))));
54        }
55
56        for connect_tag in connect_tags.iter() {
57            let map = Map(hash_str(connect_tag));
58            log::info!("Setting connect tag: {connect_tag} by sub key {map}");
59            queue.push_back(kv_control(KvControl::MapCmd(map, MapControl::Sub)));
60        }
61
62        Self {
63            node_addr,
64            nodes: HashMap::new(),
65            conns: HashMap::new(),
66            queue,
67            removing_list: HashMap::new(),
68            last_retry_ms: 0,
69            shutdown: false,
70            _tmp: std::marker::PhantomData,
71        }
72    }
73
74    fn check_nodes(&mut self, now: u64) {
75        if self.last_retry_ms + RETRY_CONNECT_MS <= now {
76            self.last_retry_ms = now;
77            for (node, addr) in self.nodes.iter() {
78                if !self.conns.contains_key(node) {
79                    log::warn!("ManualDiscoveryService node {node} not connected, retry connect");
80                    self.queue.push_back(neighbour_control(NeighbourControl::ConnectTo(addr.clone(), false)));
81                }
82            }
83        }
84
85        let mut will_disconnect = vec![];
86        for (node, ts) in self.removing_list.iter() {
87            if now >= *ts + WAIT_DISCONNECT_MS && !self.nodes.contains_key(node) {
88                log::info!("ManualDiscoveryService node {node} still in removing_list => send Disconnect");
89                self.queue.push_back(neighbour_control(NeighbourControl::DisconnectFrom(*node)));
90                will_disconnect.push(*node);
91            }
92        }
93
94        for node in will_disconnect {
95            self.removing_list.remove(&node);
96        }
97    }
98}
99
100impl<UserData, SC, SE, TC: Debug, TW: Debug> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ManualDiscoveryService<UserData, SC, SE, TC, TW> {
101    fn is_service_empty(&self) -> bool {
102        self.shutdown && self.queue.is_empty()
103    }
104
105    fn service_id(&self) -> u8 {
106        SERVICE_ID
107    }
108
109    fn service_name(&self) -> &str {
110        SERVICE_NAME
111    }
112
113    fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceSharedInput) {
114        match input {
115            ServiceSharedInput::Tick(_) => self.check_nodes(now),
116            ServiceSharedInput::Connection(ConnectionEvent::Connected(ctx, _)) => {
117                let entry = self.conns.entry(ctx.node).or_default();
118                entry.push(ctx.conn);
119            }
120            ServiceSharedInput::Connection(ConnectionEvent::Disconnected(ctx)) => {
121                let entry = self.conns.entry(ctx.node).or_default();
122                entry.retain(|&conn| conn != ctx.conn);
123
124                if entry.is_empty() {
125                    log::info!("ManualDiscoveryService node {} disconnected all connections => remove", ctx.node);
126                    self.conns.remove(&ctx.node);
127                }
128            }
129            _ => {}
130        }
131    }
132
133    fn on_input(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceInput<UserData, FeaturesEvent, SC, TC>) {
134        if let ServiceInput::FeatureEvent(FeaturesEvent::DhtKv(KvEvent::MapEvent(map, event))) = input {
135            match event {
136                MapEvent::OnSet(_, source, value) => {
137                    if source == self.node_addr.node_id() {
138                        return;
139                    }
140                    if let Some(addr) = NodeAddr::from_vec(&value) {
141                        log::info!("ManualDiscoveryService node {source} added tag {map} => connect {addr}");
142                        self.nodes.insert(source, addr.clone());
143                        self.queue.push_back(neighbour_control(NeighbourControl::ConnectTo(addr, false)));
144                        self.removing_list.remove(&source);
145                    }
146                }
147                MapEvent::OnDel(_, source) => {
148                    self.nodes.remove(&source);
149                    self.removing_list.entry(source).or_insert_with(|| {
150                        log::info!("ManualDiscoveryService node {source} removed tag {map} => push to removing_list");
151                        now
152                    });
153                }
154                MapEvent::OnRelaySelected(node) => {
155                    log::info!("ManualDiscoveryService relay {node} selected for tag {map}");
156                }
157            }
158        }
159    }
160
161    fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64) {
162        log::info!("[ManualDiscoveryService] Shutdown");
163        self.shutdown = true;
164    }
165
166    fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, SE, TW>> {
167        self.queue.pop_front()
168    }
169}
170
171pub struct ManualDiscoveryServiceWorker<UserData, SC, SE, TC> {
172    queue: DynamicDeque<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>, 8>,
173    shutdown: bool,
174}
175
176impl<UserData, SC, SE, TC, TW> ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ManualDiscoveryServiceWorker<UserData, SC, SE, TC> {
177    fn is_service_empty(&self) -> bool {
178        self.shutdown && self.queue.is_empty()
179    }
180
181    fn service_id(&self) -> u8 {
182        SERVICE_ID
183    }
184
185    fn service_name(&self) -> &str {
186        SERVICE_NAME
187    }
188
189    fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64) {}
190
191    fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, SC, TW>) {
192        match input {
193            ServiceWorkerInput::Control(actor, control) => self.queue.push_back(ServiceWorkerOutput::ForwardControlToController(actor, control)),
194            ServiceWorkerInput::FeatureEvent(event) => self.queue.push_back(ServiceWorkerOutput::ForwardFeatureEventToController(event)),
195            ServiceWorkerInput::FromController(_) => {}
196        }
197    }
198
199    fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64) {
200        log::info!("[ManualDiscoveryServiceWorker] Shutdown");
201        self.shutdown = true;
202    }
203
204    fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>> {
205        self.queue.pop_front()
206    }
207}
208
209pub struct ManualDiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
210    _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
211    node_addr: NodeAddr,
212    local_tags: Vec<String>,
213    connect_tags: Vec<String>,
214}
215
216impl<UserData, SC, SE, TC, TW> ManualDiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
217    pub fn new(node_addr: NodeAddr, local_tags: Vec<String>, connect_tags: Vec<String>) -> Self {
218        Self {
219            _tmp: std::marker::PhantomData,
220            node_addr,
221            local_tags,
222            connect_tags,
223        }
224    }
225}
226
227impl<UserData, SC, SE, TC, TW> ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for ManualDiscoveryServiceBuilder<UserData, SC, SE, TC, TW>
228where
229    UserData: 'static + Debug + Send + Sync,
230    SC: 'static + Debug + Send + Sync,
231    SE: 'static + Debug + Send + Sync,
232    TC: 'static + Debug + Send + Sync,
233    TW: 'static + Debug + Send + Sync,
234{
235    fn service_id(&self) -> u8 {
236        SERVICE_ID
237    }
238
239    fn service_name(&self) -> &str {
240        SERVICE_NAME
241    }
242
243    fn create(&self) -> Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
244        Box::new(ManualDiscoveryService::new(self.node_addr.clone(), self.local_tags.clone(), self.connect_tags.clone()))
245    }
246
247    fn create_worker(&self) -> Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
248        Box::new(ManualDiscoveryServiceWorker {
249            queue: Default::default(),
250            shutdown: false,
251        })
252    }
253}
254
255#[cfg(test)]
256mod test {
257    use atm0s_sdn_identity::{ConnId, NodeAddr, NodeAddrBuilder, Protocol};
258    use atm0s_sdn_utils::hash::hash_str;
259
260    use crate::{
261        base::{Service, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput},
262        features::{
263            dht_kv::{self, Key, Map, MapControl, MapEvent},
264            neighbours, FeaturesControl, FeaturesEvent,
265        },
266        services::manual_discovery::{RETRY_CONNECT_MS, WAIT_DISCONNECT_MS},
267    };
268
269    use super::ManualDiscoveryService;
270
271    fn node_addr(node: u32) -> NodeAddr {
272        let mut builder = NodeAddrBuilder::new(node);
273        builder.add_protocol(Protocol::Ip4([127, 0, 0, 1].into()));
274        builder.add_protocol(Protocol::Udp(node as u16));
275        builder.addr()
276    }
277
278    fn map_cmd<SE, TC>(map: Map, control: MapControl) -> ServiceOutput<(), FeaturesControl, SE, TC> {
279        ServiceOutput::FeatureControl(FeaturesControl::DhtKv(dht_kv::Control::MapCmd(map, control)))
280    }
281
282    fn map_event<SC, TC>(map: Map, event: dht_kv::MapEvent) -> ServiceInput<(), FeaturesEvent, SC, TC> {
283        ServiceInput::FeatureEvent(FeaturesEvent::DhtKv(dht_kv::Event::MapEvent(map, event)))
284    }
285
286    fn neighbour_cmd<SE, TC>(control: neighbours::Control) -> ServiceOutput<(), FeaturesControl, SE, TC> {
287        ServiceOutput::FeatureControl(FeaturesControl::Neighbours(control))
288    }
289
290    fn neighbour_event<SC, TC>(event: neighbours::Event) -> ServiceInput<(), FeaturesEvent, SC, TC> {
291        ServiceInput::FeatureEvent(FeaturesEvent::Neighbours(event))
292    }
293
294    #[test]
295    fn should_send_connect() {
296        let addr1 = node_addr(100);
297        let addr2 = node_addr(101);
298
299        let ctx = ServiceCtx { node_id: 100, session: 0 };
300        let mut service = ManualDiscoveryService::<(), (), (), (), ()>::new(addr1.clone(), vec!["local".into()], vec!["connect".into()]);
301        let local_map = Map(hash_str("local"));
302        let connect_map = Map(hash_str("connect"));
303
304        assert_eq!(service.pop_output2(0), Some(map_cmd(local_map, MapControl::Set(Key(0), addr1.to_vec()))));
305        assert_eq!(service.pop_output2(0), Some(map_cmd(connect_map, MapControl::Sub)));
306
307        service.on_input(&ctx, 100, map_event(connect_map, MapEvent::OnSet(Key(1), 2, addr2.to_vec())));
308        assert_eq!(service.pop_output2(100), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2, false))));
309    }
310
311    #[test]
312    fn should_wait_disconnect_after_remove() {
313        let addr1 = node_addr(100);
314        let addr2 = node_addr(101);
315
316        let ctx = ServiceCtx { node_id: 100, session: 0 };
317        let mut service = ManualDiscoveryService::<(), (), (), (), ()>::new(addr1.clone(), vec!["local".into()], vec!["connect".into()]);
318        let local_map = Map(hash_str("local"));
319        let connect_map = Map(hash_str("connect"));
320
321        assert_eq!(service.pop_output2(0), Some(map_cmd(local_map, MapControl::Set(Key(0), addr1.to_vec()))));
322        assert_eq!(service.pop_output2(0), Some(map_cmd(connect_map, MapControl::Sub)));
323
324        // add node
325        service.on_input(&ctx, 100, map_event(connect_map, MapEvent::OnSet(Key(1), addr2.node_id(), addr2.to_vec())));
326        assert_eq!(service.pop_output2(100), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2.clone(), false))));
327
328        // fake connected
329        service.on_input(&ctx, 110, neighbour_event(neighbours::Event::Connected(addr2.node_id(), ConnId::from_out(0, 0))));
330
331        // remove node
332        service.on_shared_input(&ctx, 200, ServiceSharedInput::Tick(0));
333        assert_eq!(service.pop_output2(200), None);
334
335        // fake removed key
336        service.on_input(&ctx, 300, map_event(connect_map, MapEvent::OnDel(Key(1), addr2.node_id())));
337        assert_eq!(service.pop_output2(300), None);
338
339        // wait disconnect
340        service.on_shared_input(&ctx, 300 + WAIT_DISCONNECT_MS, ServiceSharedInput::Tick(0));
341        assert_eq!(service.pop_output2(300 + WAIT_DISCONNECT_MS), Some(neighbour_cmd(neighbours::Control::DisconnectFrom(addr2.node_id()))));
342        assert_eq!(service.pop_output2(300 + WAIT_DISCONNECT_MS), None);
343    }
344
345    #[test]
346    fn should_reconnect_after_disconnected() {
347        let addr1 = node_addr(100);
348        let addr2 = node_addr(101);
349
350        let ctx = ServiceCtx { node_id: 100, session: 0 };
351        let mut service = ManualDiscoveryService::<(), (), (), (), ()>::new(addr1.clone(), vec!["local".into()], vec!["connect".into()]);
352        let local_map = Map(hash_str("local"));
353        let connect_map = Map(hash_str("connect"));
354
355        assert_eq!(service.pop_output2(0), Some(map_cmd(local_map, MapControl::Set(Key(0), addr1.to_vec()))));
356        assert_eq!(service.pop_output2(0), Some(map_cmd(connect_map, MapControl::Sub)));
357
358        service.on_input(&ctx, 100, map_event(connect_map, MapEvent::OnSet(Key(1), 2, addr2.to_vec())));
359        assert_eq!(service.pop_output2(100), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2.clone(), false))));
360
361        service.on_shared_input(&ctx, 200, ServiceSharedInput::Tick(0));
362        assert_eq!(service.pop_output2(200), None);
363
364        service.on_input(&ctx, 300, neighbour_event(neighbours::Event::Disconnected(addr2.node_id(), ConnId::from_out(0, 0))));
365
366        service.on_shared_input(&ctx, 300, ServiceSharedInput::Tick(0));
367        assert_eq!(service.pop_output2(300), None);
368
369        service.on_shared_input(&ctx, RETRY_CONNECT_MS, ServiceSharedInput::Tick(0));
370        assert_eq!(service.pop_output2(RETRY_CONNECT_MS), Some(neighbour_cmd(neighbours::Control::ConnectTo(addr2.clone(), false))));
371        assert_eq!(service.pop_output2(RETRY_CONNECT_MS), None);
372    }
373}