atm0s_sdn_network/features/
neighbours.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    fmt::Debug,
4    hash::Hash,
5};
6
7use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
8use derivative::Derivative;
9use sans_io_runtime::{collections::DynamicDeque, TaskSwitcherChild};
10
11use crate::base::{ConnectionEvent, Feature, FeatureContext, FeatureControlActor, FeatureInput, FeatureOutput, FeatureSharedInput, FeatureWorker, FeatureWorkerInput, FeatureWorkerOutput};
12
13pub const FEATURE_ID: u8 = 0;
14pub const FEATURE_NAME: &str = "neighbours_api";
15
16// for avoid spamming seeds, we only request seeds if needed each 30 seconds
17pub const REQUEST_SEEDS_TIMEOUT_MS: u64 = 30000;
18
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub enum Control {
21    Sub,
22    UnSub,
23    ConnectTo(NodeAddr, bool),
24    DisconnectFrom(NodeId),
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum Event {
29    Connected(NodeId, ConnId),
30    Disconnected(NodeId, ConnId),
31    SeedAddressNeeded,
32}
33
34#[derive(Debug, Clone)]
35pub struct ToWorker;
36
37#[derive(Debug, Clone)]
38pub struct ToController;
39
40pub type Output<UserData> = FeatureOutput<UserData, Event, ToWorker>;
41pub type WorkerOutput<UserData> = FeatureWorkerOutput<UserData, Control, Event, ToController>;
42
43#[derive(Debug, Derivative)]
44#[derivative(Default(bound = ""))]
45pub struct NeighboursFeature<UserData> {
46    subs: Vec<FeatureControlActor<UserData>>,
47    output: VecDeque<Output<UserData>>,
48    seeds: HashSet<NodeId>,
49    nodes: HashMap<NodeId, HashSet<ConnId>>,
50    shutdown: bool,
51    last_requested_seeds: Option<u64>,
52}
53
54impl<UserData: Debug + Copy + Hash + Eq> NeighboursFeature<UserData> {
55    fn check_need_more_seeds(&mut self, now: u64) {
56        if self.shutdown {
57            return;
58        }
59
60        if let Some(last) = self.last_requested_seeds {
61            if now < last + REQUEST_SEEDS_TIMEOUT_MS {
62                return;
63            }
64        }
65
66        if self.seeds.is_empty() {
67            return;
68        }
69        for node in self.seeds.iter() {
70            if self.nodes.contains_key(node) {
71                return;
72            }
73        }
74
75        log::info!("[Neighbours] All seeds {:?} disconnected or connect error => need update seeds", self.seeds);
76        self.last_requested_seeds = Some(now);
77        for sub in self.subs.iter() {
78            self.output.push_back(FeatureOutput::Event(*sub, Event::SeedAddressNeeded));
79        }
80    }
81}
82
83impl<UserData: Debug + Copy + Hash + Eq> Feature<UserData, Control, Event, ToController, ToWorker> for NeighboursFeature<UserData> {
84    fn on_shared_input(&mut self, _ctx: &FeatureContext, now: u64, input: FeatureSharedInput) {
85        match input {
86            FeatureSharedInput::Tick(_) => {
87                self.check_need_more_seeds(now);
88            }
89            FeatureSharedInput::Connection(ConnectionEvent::Connecting(ctx)) => {
90                log::info!("[Neighbours] Node {} connection {} connecting", ctx.node, ctx.pair);
91                self.nodes.entry(ctx.node).or_default().insert(ctx.conn);
92            }
93            FeatureSharedInput::Connection(ConnectionEvent::ConnectError(ctx, _)) => {
94                log::info!("[Neighbours] Node {} connection {} connect error", ctx.node, ctx.pair);
95                let entry = self.nodes.entry(ctx.node).or_default();
96                entry.remove(&ctx.conn);
97                if entry.is_empty() {
98                    log::info!("[Neighbours] Node {} connect error all connections => remove", ctx.node);
99                    self.nodes.remove(&ctx.node);
100                }
101
102                self.check_need_more_seeds(now);
103            }
104            FeatureSharedInput::Connection(ConnectionEvent::Connected(ctx, _)) => {
105                log::info!("[Neighbours] Node {} connection {} connected", ctx.node, ctx.pair);
106                for sub in self.subs.iter() {
107                    self.output.push_back(FeatureOutput::Event(*sub, Event::Connected(ctx.node, ctx.conn)));
108                }
109            }
110            FeatureSharedInput::Connection(ConnectionEvent::Disconnected(ctx)) => {
111                log::info!("[Neighbours] Node {} connection {} disconnected", ctx.node, ctx.pair);
112                let entry = self.nodes.entry(ctx.node).or_default();
113                entry.remove(&ctx.conn);
114                if entry.is_empty() {
115                    log::info!("[Neighbours] Node {} disconnected all connections => remove", ctx.node);
116                    self.nodes.remove(&ctx.node);
117                }
118                for sub in self.subs.iter() {
119                    self.output.push_back(FeatureOutput::Event(*sub, Event::Disconnected(ctx.node, ctx.conn)));
120                }
121
122                self.check_need_more_seeds(now);
123            }
124            _ => {}
125        }
126    }
127
128    fn on_input(&mut self, _ctx: &FeatureContext, _now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
129        if let FeatureInput::Control(actor, control) = input {
130            match control {
131                Control::Sub => {
132                    if !self.subs.contains(&actor) {
133                        log::info!("[Neighbours] Sub to neighbours from {:?}", actor);
134                        self.subs.push(actor);
135                    }
136                }
137                Control::UnSub => {
138                    if let Some(pos) = self.subs.iter().position(|x| *x == actor) {
139                        log::info!("[Neighbours] UnSub to neighbours from {:?}", actor);
140                        self.subs.swap_remove(pos);
141                    }
142                }
143                Control::ConnectTo(addr, is_seed) => {
144                    if is_seed {
145                        self.seeds.insert(addr.node_id());
146                    }
147                    self.output.push_back(FeatureOutput::NeighboursConnectTo(addr));
148                }
149                Control::DisconnectFrom(node) => {
150                    self.output.push_back(FeatureOutput::NeighboursDisconnectFrom(node));
151                }
152            }
153        }
154    }
155
156    fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64) {
157        log::info!("[NeighboursFeature] Shutdown");
158        self.shutdown = true;
159    }
160}
161
162impl<UserData> TaskSwitcherChild<Output<UserData>> for NeighboursFeature<UserData> {
163    type Time = u64;
164
165    fn is_empty(&self) -> bool {
166        self.shutdown && self.output.is_empty()
167    }
168
169    fn empty_event(&self) -> Output<UserData> {
170        Output::OnResourceEmpty
171    }
172
173    fn pop_output(&mut self, _now: u64) -> Option<Output<UserData>> {
174        self.output.pop_front()
175    }
176}
177
178#[derive(Derivative)]
179#[derivative(Default(bound = ""))]
180pub struct NeighboursFeatureWorker<UserData> {
181    queue: DynamicDeque<WorkerOutput<UserData>, 1>,
182    shutdown: bool,
183}
184
185impl<UserData> FeatureWorker<UserData, Control, Event, ToController, ToWorker> for NeighboursFeatureWorker<UserData> {
186    fn on_input(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64, input: crate::base::FeatureWorkerInput<UserData, Control, ToWorker>) {
187        match input {
188            FeatureWorkerInput::Control(actor, control) => self.queue.push_back(FeatureWorkerOutput::ForwardControlToController(actor, control)),
189            FeatureWorkerInput::Network(conn, header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardNetworkToController(conn, header, buf)),
190            #[cfg(feature = "vpn")]
191            FeatureWorkerInput::TunPkt(..) => {}
192            FeatureWorkerInput::FromController(..) => {
193                log::warn!("No handler for FromController");
194            }
195            FeatureWorkerInput::Local(header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardLocalToController(header, buf)),
196        }
197    }
198
199    fn on_shutdown(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64) {
200        self.shutdown = true;
201    }
202}
203
204impl<UserData> TaskSwitcherChild<WorkerOutput<UserData>> for NeighboursFeatureWorker<UserData> {
205    type Time = u64;
206
207    fn is_empty(&self) -> bool {
208        self.shutdown && self.queue.is_empty()
209    }
210
211    fn empty_event(&self) -> WorkerOutput<UserData> {
212        WorkerOutput::OnResourceEmpty
213    }
214
215    fn pop_output(&mut self, _now: u64) -> Option<WorkerOutput<UserData>> {
216        self.queue.pop_front()
217    }
218}