1use std::{
10 collections::{HashMap, HashSet, VecDeque},
11 fmt::Debug,
12};
13
14use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
15use atm0s_sdn_router::{RouteRule, ServiceBroadcastLevel};
16use sans_io_runtime::collections::DynamicDeque;
17
18use crate::{
19 base::{
20 ConnectionEvent, NetOutgoingMeta, Service, ServiceBuilder, ServiceCtx, ServiceId, ServiceInput, ServiceOutput, ServiceSharedInput, ServiceWorker, ServiceWorkerCtx, ServiceWorkerInput,
21 ServiceWorkerOutput, Ttl,
22 },
23 features::{data::Control as DataControl, neighbours::Control as NeighbourControl, FeaturesControl, FeaturesEvent},
24};
25
26const DATA_PORT: u16 = 2;
27pub const SERVICE_ID: u8 = 2;
28pub const SERVICE_NAME: &str = "manual2_discovery";
29
30fn neighbour_control<UserData, SE, TW>(c: NeighbourControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
31 ServiceOutput::FeatureControl(FeaturesControl::Neighbours(c))
32}
33
34fn data_control<UserData, SE, TW>(c: DataControl) -> ServiceOutput<UserData, FeaturesControl, SE, TW> {
35 ServiceOutput::FeatureControl(FeaturesControl::Data(c))
36}
37
38#[derive(Debug, Clone)]
39pub struct AdvertiseTarget {
40 pub service: ServiceId,
41 pub level: ServiceBroadcastLevel,
42}
43
44impl AdvertiseTarget {
45 pub fn new(service: ServiceId, level: ServiceBroadcastLevel) -> Self {
46 Self { service, level }
47 }
48}
49
50pub struct Manual2DiscoveryService<UserData, SC, SE, TC, TW> {
51 node_addr: NodeAddr,
52 targets: Vec<AdvertiseTarget>,
53 queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
54 remote_nodes: HashMap<NodeId, HashSet<ConnId>>,
55 shutdown: bool,
56 broadcast_seq: u16,
57 broadcast_interval: u64,
58 last_broadcast: u64,
59 _tmp: std::marker::PhantomData<(SC, TC, TW)>,
60}
61
62impl<UserData, SC, SE, TC, TW> Manual2DiscoveryService<UserData, SC, SE, TC, TW> {
63 pub fn new(node_addr: NodeAddr, targets: Vec<AdvertiseTarget>, interval: u64) -> Self {
64 Self {
65 node_addr,
66 targets,
67 queue: VecDeque::from_iter([data_control(DataControl::DataListen(DATA_PORT))]),
68 remote_nodes: Default::default(),
69 shutdown: false,
70 broadcast_seq: 0,
71 broadcast_interval: interval,
72 last_broadcast: 0,
73 _tmp: std::marker::PhantomData,
74 }
75 }
76}
77
78impl<UserData, SC, SE, TC: Debug, TW: Debug> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for Manual2DiscoveryService<UserData, SC, SE, TC, TW> {
79 fn is_service_empty(&self) -> bool {
80 self.shutdown && self.queue.is_empty()
81 }
82
83 fn service_id(&self) -> u8 {
84 SERVICE_ID
85 }
86
87 fn service_name(&self) -> &str {
88 SERVICE_NAME
89 }
90
91 fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceSharedInput) {
92 match input {
93 ServiceSharedInput::Tick(_) => {
94 if now >= self.last_broadcast + self.broadcast_interval {
95 self.last_broadcast = now;
96 self.broadcast_seq = self.broadcast_seq.wrapping_add(1);
97 for target in &self.targets {
98 log::debug!("[Manual2DiscoveryService] advertise to service {} with level {:?}", target.service, target.level);
99 self.queue.push_back(data_control(DataControl::DataSendRule(
100 DATA_PORT,
101 RouteRule::ToServices(*target.service, target.level, self.broadcast_seq),
102 NetOutgoingMeta::new(true, Ttl::default(), 0, true),
103 self.node_addr.to_vec(),
104 )));
105 }
106 }
107 }
108 ServiceSharedInput::Connection(connection_event) => match connection_event {
109 ConnectionEvent::Connecting(connection_ctx) => {
110 self.remote_nodes.entry(connection_ctx.node).or_default().insert(connection_ctx.conn);
111 }
112 ConnectionEvent::ConnectError(connection_ctx, _neighbours_connect_error) => {
113 let entry = self.remote_nodes.entry(connection_ctx.node).or_default();
114 entry.remove(&connection_ctx.conn);
115 if entry.is_empty() {
116 log::warn!("[Manual2DiscoveryService] Node {} connect failed all connections => remove", connection_ctx.node);
117 self.remote_nodes.remove(&connection_ctx.node);
118 }
119 }
120 ConnectionEvent::Connected(_connection_ctx, _secure_context) => {}
121 ConnectionEvent::Stats(_connection_ctx, _connection_stats) => {}
122 ConnectionEvent::Disconnected(connection_ctx) => {
123 let entry = self.remote_nodes.entry(connection_ctx.node).or_default();
124 entry.remove(&connection_ctx.conn);
125 if entry.is_empty() {
126 self.remote_nodes.remove(&connection_ctx.node);
127 log::info!("[Manual2DiscoveryService] Node {} disconnected all connections => remove", connection_ctx.node);
128 }
129 }
130 },
131 }
132 }
133
134 fn on_input(&mut self, _ctx: &ServiceCtx, _now: u64, input: ServiceInput<UserData, FeaturesEvent, SC, TC>) {
135 match input {
136 ServiceInput::Control(_, _) => {}
137 ServiceInput::FromWorker(_) => {}
138 ServiceInput::FeatureEvent(event) => {
139 if let FeaturesEvent::Data(event) = event {
140 match event {
141 crate::features::data::Event::Pong(_, _) => todo!(),
142 crate::features::data::Event::Recv(port, meta, data) => {
143 if port != DATA_PORT {
145 log::warn!("[Manual2DiscoveryService] recv from other port => ignore");
146 return;
147 }
148 if !meta.secure {
150 log::warn!("[Manual2DiscoveryService] recv from unsecure node => ignore");
151 return;
152 }
153 if let Some(source) = meta.source {
154 if source == self.node_addr.node_id() {
156 return;
157 }
158 if self.remote_nodes.contains_key(&source) {
160 log::debug!("[Manual2DiscoveryService] recv from already connected node {source} => ignore");
161 return;
162 }
163 } else {
164 log::warn!("[Manual2DiscoveryService] recv from anonymous node => ignore");
166 return;
167 }
168 if let Some(addr) = NodeAddr::from_vec(&data) {
169 log::info!("[Manual2DiscoveryService] node {} advertised => try connect {addr:?}", addr.node_id());
170 self.queue.push_back(neighbour_control(NeighbourControl::ConnectTo(addr, false)));
171 } else {
172 log::warn!("[Manual2DiscoveryService] node {:?} advertised invalid address => ignore", meta.source);
173 }
174 }
175 }
176 }
177 }
178 }
179 }
180
181 fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64) {
182 log::info!("[Manual2DiscoveryService] Shutdown");
183 self.shutdown = true;
184 }
185
186 fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, SE, TW>> {
187 self.queue.pop_front()
188 }
189}
190
191pub struct Manual2DiscoveryServiceWorker<UserData, SC, SE, TC> {
192 queue: DynamicDeque<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>, 8>,
193 shutdown: bool,
194}
195
196impl<UserData, SC, SE, TC, TW> ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for Manual2DiscoveryServiceWorker<UserData, SC, SE, TC> {
197 fn is_service_empty(&self) -> bool {
198 self.shutdown && self.queue.is_empty()
199 }
200
201 fn service_id(&self) -> u8 {
202 SERVICE_ID
203 }
204
205 fn service_name(&self) -> &str {
206 SERVICE_NAME
207 }
208
209 fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64) {}
210
211 fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, SC, TW>) {
212 match input {
213 ServiceWorkerInput::Control(actor, control) => self.queue.push_back(ServiceWorkerOutput::ForwardControlToController(actor, control)),
214 ServiceWorkerInput::FeatureEvent(event) => self.queue.push_back(ServiceWorkerOutput::ForwardFeatureEventToController(event)),
215 ServiceWorkerInput::FromController(_) => {}
216 }
217 }
218
219 fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64) {
220 log::info!("[Manual2DiscoveryServiceWorker] Shutdown");
221 self.shutdown = true;
222 }
223
224 fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>> {
225 self.queue.pop_front()
226 }
227}
228
229pub struct Manual2DiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
230 _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
231 node_addr: NodeAddr,
232 targets: Vec<AdvertiseTarget>,
233 interval: u64,
234}
235
236impl<UserData, SC, SE, TC, TW> Manual2DiscoveryServiceBuilder<UserData, SC, SE, TC, TW> {
237 pub fn new(node_addr: NodeAddr, targets: Vec<AdvertiseTarget>, interval: u64) -> Self {
238 Self {
239 _tmp: std::marker::PhantomData,
240 node_addr,
241 targets,
242 interval,
243 }
244 }
245}
246
247impl<UserData, SC, SE, TC, TW> ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for Manual2DiscoveryServiceBuilder<UserData, SC, SE, TC, TW>
248where
249 UserData: 'static + Debug + Send + Sync,
250 SC: 'static + Debug + Send + Sync,
251 SE: 'static + Debug + Send + Sync,
252 TC: 'static + Debug + Send + Sync,
253 TW: 'static + Debug + Send + Sync,
254{
255 fn service_id(&self) -> u8 {
256 SERVICE_ID
257 }
258
259 fn service_name(&self) -> &str {
260 SERVICE_NAME
261 }
262
263 fn create(&self) -> Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
264 Box::new(Manual2DiscoveryService::new(self.node_addr.clone(), self.targets.clone(), self.interval))
265 }
266
267 fn create_worker(&self) -> Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
268 Box::new(Manual2DiscoveryServiceWorker {
269 queue: Default::default(),
270 shutdown: false,
271 })
272 }
273}