atm0s_sdn_network/features/
neighbours.rs1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 fmt::Debug,
4 hash::Hash,
5};
6
7use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
8use derivative::Derivative;
9use sans_io_runtime::{collections::DynamicDeque, TaskSwitcherChild};
10
11use crate::base::{ConnectionEvent, Feature, FeatureContext, FeatureControlActor, FeatureInput, FeatureOutput, FeatureSharedInput, FeatureWorker, FeatureWorkerInput, FeatureWorkerOutput};
12
13pub const FEATURE_ID: u8 = 0;
14pub const FEATURE_NAME: &str = "neighbours_api";
15
16pub const REQUEST_SEEDS_TIMEOUT_MS: u64 = 30000;
18
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub enum Control {
21 Sub,
22 UnSub,
23 ConnectTo(NodeAddr, bool),
24 DisconnectFrom(NodeId),
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum Event {
29 Connected(NodeId, ConnId),
30 Disconnected(NodeId, ConnId),
31 SeedAddressNeeded,
32}
33
34#[derive(Debug, Clone)]
35pub struct ToWorker;
36
37#[derive(Debug, Clone)]
38pub struct ToController;
39
40pub type Output<UserData> = FeatureOutput<UserData, Event, ToWorker>;
41pub type WorkerOutput<UserData> = FeatureWorkerOutput<UserData, Control, Event, ToController>;
42
43#[derive(Debug, Derivative)]
44#[derivative(Default(bound = ""))]
45pub struct NeighboursFeature<UserData> {
46 subs: Vec<FeatureControlActor<UserData>>,
47 output: VecDeque<Output<UserData>>,
48 seeds: HashSet<NodeId>,
49 nodes: HashMap<NodeId, HashSet<ConnId>>,
50 shutdown: bool,
51 last_requested_seeds: Option<u64>,
52}
53
54impl<UserData: Debug + Copy + Hash + Eq> NeighboursFeature<UserData> {
55 fn check_need_more_seeds(&mut self, now: u64) {
56 if self.shutdown {
57 return;
58 }
59
60 if let Some(last) = self.last_requested_seeds {
61 if now < last + REQUEST_SEEDS_TIMEOUT_MS {
62 return;
63 }
64 }
65
66 if self.seeds.is_empty() {
67 return;
68 }
69 for node in self.seeds.iter() {
70 if self.nodes.contains_key(node) {
71 return;
72 }
73 }
74
75 log::info!("[Neighbours] All seeds {:?} disconnected or connect error => need update seeds", self.seeds);
76 self.last_requested_seeds = Some(now);
77 for sub in self.subs.iter() {
78 self.output.push_back(FeatureOutput::Event(*sub, Event::SeedAddressNeeded));
79 }
80 }
81}
82
83impl<UserData: Debug + Copy + Hash + Eq> Feature<UserData, Control, Event, ToController, ToWorker> for NeighboursFeature<UserData> {
84 fn on_shared_input(&mut self, _ctx: &FeatureContext, now: u64, input: FeatureSharedInput) {
85 match input {
86 FeatureSharedInput::Tick(_) => {
87 self.check_need_more_seeds(now);
88 }
89 FeatureSharedInput::Connection(ConnectionEvent::Connecting(ctx)) => {
90 log::info!("[Neighbours] Node {} connection {} connecting", ctx.node, ctx.pair);
91 self.nodes.entry(ctx.node).or_default().insert(ctx.conn);
92 }
93 FeatureSharedInput::Connection(ConnectionEvent::ConnectError(ctx, _)) => {
94 log::info!("[Neighbours] Node {} connection {} connect error", ctx.node, ctx.pair);
95 let entry = self.nodes.entry(ctx.node).or_default();
96 entry.remove(&ctx.conn);
97 if entry.is_empty() {
98 log::info!("[Neighbours] Node {} connect error all connections => remove", ctx.node);
99 self.nodes.remove(&ctx.node);
100 }
101
102 self.check_need_more_seeds(now);
103 }
104 FeatureSharedInput::Connection(ConnectionEvent::Connected(ctx, _)) => {
105 log::info!("[Neighbours] Node {} connection {} connected", ctx.node, ctx.pair);
106 for sub in self.subs.iter() {
107 self.output.push_back(FeatureOutput::Event(*sub, Event::Connected(ctx.node, ctx.conn)));
108 }
109 }
110 FeatureSharedInput::Connection(ConnectionEvent::Disconnected(ctx)) => {
111 log::info!("[Neighbours] Node {} connection {} disconnected", ctx.node, ctx.pair);
112 let entry = self.nodes.entry(ctx.node).or_default();
113 entry.remove(&ctx.conn);
114 if entry.is_empty() {
115 log::info!("[Neighbours] Node {} disconnected all connections => remove", ctx.node);
116 self.nodes.remove(&ctx.node);
117 }
118 for sub in self.subs.iter() {
119 self.output.push_back(FeatureOutput::Event(*sub, Event::Disconnected(ctx.node, ctx.conn)));
120 }
121
122 self.check_need_more_seeds(now);
123 }
124 _ => {}
125 }
126 }
127
128 fn on_input(&mut self, _ctx: &FeatureContext, _now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
129 if let FeatureInput::Control(actor, control) = input {
130 match control {
131 Control::Sub => {
132 if !self.subs.contains(&actor) {
133 log::info!("[Neighbours] Sub to neighbours from {:?}", actor);
134 self.subs.push(actor);
135 }
136 }
137 Control::UnSub => {
138 if let Some(pos) = self.subs.iter().position(|x| *x == actor) {
139 log::info!("[Neighbours] UnSub to neighbours from {:?}", actor);
140 self.subs.swap_remove(pos);
141 }
142 }
143 Control::ConnectTo(addr, is_seed) => {
144 if is_seed {
145 self.seeds.insert(addr.node_id());
146 }
147 self.output.push_back(FeatureOutput::NeighboursConnectTo(addr));
148 }
149 Control::DisconnectFrom(node) => {
150 self.output.push_back(FeatureOutput::NeighboursDisconnectFrom(node));
151 }
152 }
153 }
154 }
155
156 fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64) {
157 log::info!("[NeighboursFeature] Shutdown");
158 self.shutdown = true;
159 }
160}
161
162impl<UserData> TaskSwitcherChild<Output<UserData>> for NeighboursFeature<UserData> {
163 type Time = u64;
164
165 fn is_empty(&self) -> bool {
166 self.shutdown && self.output.is_empty()
167 }
168
169 fn empty_event(&self) -> Output<UserData> {
170 Output::OnResourceEmpty
171 }
172
173 fn pop_output(&mut self, _now: u64) -> Option<Output<UserData>> {
174 self.output.pop_front()
175 }
176}
177
178#[derive(Derivative)]
179#[derivative(Default(bound = ""))]
180pub struct NeighboursFeatureWorker<UserData> {
181 queue: DynamicDeque<WorkerOutput<UserData>, 1>,
182 shutdown: bool,
183}
184
185impl<UserData> FeatureWorker<UserData, Control, Event, ToController, ToWorker> for NeighboursFeatureWorker<UserData> {
186 fn on_input(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64, input: crate::base::FeatureWorkerInput<UserData, Control, ToWorker>) {
187 match input {
188 FeatureWorkerInput::Control(actor, control) => self.queue.push_back(FeatureWorkerOutput::ForwardControlToController(actor, control)),
189 FeatureWorkerInput::Network(conn, header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardNetworkToController(conn, header, buf)),
190 #[cfg(feature = "vpn")]
191 FeatureWorkerInput::TunPkt(..) => {}
192 FeatureWorkerInput::FromController(..) => {
193 log::warn!("No handler for FromController");
194 }
195 FeatureWorkerInput::Local(header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardLocalToController(header, buf)),
196 }
197 }
198
199 fn on_shutdown(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64) {
200 self.shutdown = true;
201 }
202}
203
204impl<UserData> TaskSwitcherChild<WorkerOutput<UserData>> for NeighboursFeatureWorker<UserData> {
205 type Time = u64;
206
207 fn is_empty(&self) -> bool {
208 self.shutdown && self.queue.is_empty()
209 }
210
211 fn empty_event(&self) -> WorkerOutput<UserData> {
212 WorkerOutput::OnResourceEmpty
213 }
214
215 fn pop_output(&mut self, _now: u64) -> Option<WorkerOutput<UserData>> {
216 self.queue.pop_front()
217 }
218}