atm0s_sdn_network/services/
manual2_discovery.rs

//! This service implements node discovery using the service-broadcast feature.
//! Each node has a list of broadcast targets, each pairing a service with a broadcast level.
//! Example: (Service 1, Global), (Service 2, Inner Zone)
//!
//! Based on the list configured above, each node tries to broadcast its address so that other nodes can connect to it.
//!
//! The original idea behind this method is to avoid network partitions. By relying on service broadcast, each node can advertise its address to other nodes even if the network is in an incomplete state.
8
9use std::{
10    collections::{HashMap, HashSet, VecDeque},
11    fmt::Debug,
12};
13
14use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
15use atm0s_sdn_router::{RouteRule, ServiceBroadcastLevel};
16use sans_io_runtime::collections::DynamicDeque;
17
18use crate::{
19    base::{
20        ConnectionEvent, NetOutgoingMeta, Service, ServiceBuilder, ServiceCtx, ServiceId, ServiceInput, ServiceOutput, ServiceSharedInput, ServiceWorker, ServiceWorkerCtx, ServiceWorkerInput,
21        ServiceWorkerOutput, Ttl,
22    },
23    features::{data::Control as DataControl, neighbours::Control as NeighbourControl, FeaturesControl, FeaturesEvent},
24};
25
/// Data-feature port this service listens on, sends its advertisements through,
/// and accepts incoming advertisements from.
const DATA_PORT: u16 = 2;
/// Numeric identifier reported by the service, its worker and its builder.
pub const SERVICE_ID: u8 = 2;
/// Name reported by the service, its worker and its builder.
pub const SERVICE_NAME: &str = "manual2_discovery";
29
30fn neighbour_control<UserData, SE, TW>(c: NeighbourControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
31    ServiceOutput::FeatureControl(FeaturesControl::Neighbours(c))
32}
33
34fn data_control<UserData, SE, TW>(c: DataControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
35    ServiceOutput::FeatureControl(FeaturesControl::Data(c))
36}
37
38#[derive(Debug, Clone)]
39pub struct AdvertiseTarget {
40    pub service: ServiceId,
41    pub level: ServiceBroadcastLevel,
42}
43
44impl AdvertiseTarget {
45    pub fn new(service: ServiceId, level: ServiceBroadcastLevel) -> Self {
46        Self { service, level }
47    }
48}
49
50pub struct Manual2DiscoveryService<UserData, SC, SE, TC, TW> {
51    node_addr: NodeAddr,
52    targets: Vec<AdvertiseTarget>,
53    queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
54    remote_nodes: HashMap<NodeId, HashSet<ConnId>>,
55    shutdown: bool,
56    broadcast_seq: u16,
57    broadcast_interval: u64,
58    last_broadcast: u64,
59    _tmp: std::marker::PhantomData<(SC, TC, TW)>,
60}
61
62impl<UserData, SC, SE, TC, TW> Manual2DiscoveryService<UserData, SC, SE, TC, TW> {
63    pub fn new(node_addr: NodeAddr, targets: Vec<AdvertiseTarget>, interval: u64) -> Self {
64        Self {
65            node_addr,
66            targets,
67            queue: VecDeque::from_iter([data_control(DataControl::DataListen(DATA_PORT))]),
68            remote_nodes: Default::default(),
69            shutdown: false,
70            broadcast_seq: 0,
71            broadcast_interval: interval,
72            last_broadcast: 0,
73            _tmp: std::marker::PhantomData,
74        }
75    }
76}
77
78impl<UserData, SC, SE, TC: Debug, TW: Debug> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for Manual2DiscoveryService<UserData, SC, SE, TC, TW> {
79    fn is_service_empty(&self) -> bool {
80        self.shutdown && self.queue.is_empty()
81    }
82
83    fn service_id(&self) -> u8 {
84        SERVICE_ID
85    }
86
87    fn service_name(&self) -> &str {
88        SERVICE_NAME
89    }
90
91    fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceSharedInput) {
92        match input {
93            ServiceSharedInput::Tick(_) => {
94                if now >= self.last_broadcast + self.broadcast_interval {
95                    self.last_broadcast = now;
96                    self.broadcast_seq = self.broadcast_seq.wrapping_add(1);
97                    for target in &self.targets {
98                        log::debug!("[Manual2DiscoveryService] advertise to service {} with level {:?}", target.service, target.level);
99                        self.queue.push_back(data_control(DataControl::DataSendRule(
100                            DATA_PORT,
101                            RouteRule::ToServices(*target.service, target.level, self.broadcast_seq),
102                            NetOutgoingMeta::new(true, Ttl::default(), 0, true),
103                            self.node_addr.to_vec(),
104                        )));
105                    }
106                }
107            }
108            ServiceSharedInput::Connection(connection_event) => match connection_event {
109                ConnectionEvent::Connecting(connection_ctx) => {
110                    self.remote_nodes.entry(connection_ctx.node).or_default().insert(connection_ctx.conn);
111                }
112                ConnectionEvent::ConnectError(connection_ctx, _neighbours_connect_error) => {
113                    let entry = self.remote_nodes.entry(connection_ctx.node).or_default();
114                    entry.remove(&connection_ctx.conn);
115                    if entry.is_empty() {
116                        log::warn!("[Manual2DiscoveryService] Node {} connect failed all connections => remove", connection_ctx.node);
117                        self.remote_nodes.remove(&connection_ctx.node);
118                    }
119                }
120                ConnectionEvent::Connected(_connection_ctx, _secure_context) => {}
121                ConnectionEvent::Stats(_connection_ctx, _connection_stats) => {}
122                ConnectionEvent::Disconnected(connection_ctx) => {
123                    let entry = self.remote_nodes.entry(connection_ctx.node).or_default();
124                    entry.remove(&connection_ctx.conn);
125                    if entry.is_empty() {
126                        self.remote_nodes.remove(&connection_ctx.node);
127                        log::info!("[Manual2DiscoveryService] Node {} disconnected all connections => remove", connection_ctx.node);
128                    }
129                }
130            },
131        }
132    }
133
134    fn on_input(&mut self, _ctx: &ServiceCtx, _now: u64, input: ServiceInput<UserData, FeaturesEvent, SC, TC>) {
135        match input {
136            ServiceInput::Control(_, _) => {}
137            ServiceInput::FromWorker(_) => {}
138            ServiceInput::FeatureEvent(event) => {
139                if let FeaturesEvent::Data(event) = event {
140                    match event {
141                        crate::features::data::Event::Pong(_, _) => todo!(),
142                        crate::features::data::Event::Recv(port, meta, data) => {
143                            // ignore other port
144                            if port != DATA_PORT {
145                                log::warn!("[Manual2DiscoveryService] recv from other port => ignore");
146                                return;
147                            }
148                            // ignore unsecure
149                            if !meta.secure {
150                                log::warn!("[Manual2DiscoveryService] recv from unsecure node => ignore");
151                                return;
152                            }
153                            if let Some(source) = meta.source {
154                                // ignore self
155                                if source == self.node_addr.node_id() {
156                                    return;
157                                }
158                                // ignore already connected
159                                if self.remote_nodes.contains_key(&source) {
160                                    log::debug!("[Manual2DiscoveryService] recv from already connected node {source} => ignore");
161                                    return;
162                                }
163                            } else {
164                                // ignore anonymous
165                                log::warn!("[Manual2DiscoveryService] recv from anonymous node => ignore");
166                                return;
167                            }
168                            if let Some(addr) = NodeAddr::from_vec(&data) {
169                                log::info!("[Manual2DiscoveryService] node {} advertised => try connect {addr:?}", addr.node_id());
170                                self.queue.push_back(neighbour_control(NeighbourControl::ConnectTo(addr, false)));
171                            } else {
172                                log::warn!("[Manual2DiscoveryService] node {:?} advertised invalid address => ignore", meta.source);
173                            }
174                        }
175                    }
176                }
177            }
178        }
179    }
180
181    fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64) {
182        log::info!("[Manual2DiscoveryService] Shutdown");
183        self.shutdown = true;
184    }
185
186    fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, SE, TW>> {
187        self.queue.pop_front()
188    }
189}
190
191pub struct Manual2DiscoveryServiceWorker<UserData, SC, SE, TC> {
192    queue: DynamicDeque<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>, 8>,
193    shutdown: bool,
194}
195
196impl<UserData, SC, SE, TC, TW> ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for Manual2DiscoveryServiceWorker<UserData, SC, SE, TC> {
197    fn is_service_empty(&self) -> bool {
198        self.shutdown && self.queue.is_empty()
199    }
200
201    fn service_id(&self) -> u8 {
202        SERVICE_ID
203    }
204
205    fn service_name(&self) -> &str {
206        SERVICE_NAME
207    }
208
209    fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64) {}
210
211    fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, SC, TW>) {
212        match input {
213            ServiceWorkerInput::Control(actor, control) => self.queue.push_back(ServiceWorkerOutput::ForwardControlToController(actor, control)),
214            ServiceWorkerInput::FeatureEvent(event) => self.queue.push_back(ServiceWorkerOutput::ForwardFeatureEventToController(event)),
215            ServiceWorkerInput::FromController(_) => {}
216        }
217    }
218
219    fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64) {
220        log::info!("[Manual2DiscoveryServiceWorker] Shutdown");
221        self.shutdown = true;
222    }
223
224    fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>> {
225        self.queue.pop_front()
226    }
227}
228
229pub struct Manual2DiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
230    _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
231    node_addr: NodeAddr,
232    targets: Vec<AdvertiseTarget>,
233    interval: u64,
234}
235
236impl<UserData, SC, SE, TC, TW> Manual2DiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
237    pub fn new(node_addr: NodeAddr, targets: Vec<AdvertiseTarget>, interval: u64) -> Self {
238        Self {
239            _tmp: std::marker::PhantomData,
240            node_addr,
241            targets,
242            interval,
243        }
244    }
245}
246
247impl<UserData, SC, SE, TC, TW> ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for Manual2DiscoveryServiceBuilder<UserData, SC, SE, TC, TW>
248where
249    UserData: 'static + Debug + Send + Sync,
250    SC: 'static + Debug + Send + Sync,
251    SE: 'static + Debug + Send + Sync,
252    TC: 'static + Debug + Send + Sync,
253    TW: 'static + Debug + Send + Sync,
254{
255    fn service_id(&self) -> u8 {
256        SERVICE_ID
257    }
258
259    fn service_name(&self) -> &str {
260        SERVICE_NAME
261    }
262
263    fn create(&self) -> Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
264        Box::new(Manual2DiscoveryService::new(self.node_addr.clone(), self.targets.clone(), self.interval))
265    }
266
267    fn create_worker(&self) -> Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
268        Box::new(Manual2DiscoveryServiceWorker {
269            queue: Default::default(),
270            shutdown: false,
271        })
272    }
273}