atm0s_sdn_network/
controller_plane.rs

1use std::{collections::VecDeque, fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};
2
3use atm0s_sdn_identity::NodeId;
4use atm0s_sdn_router::shadow::ShadowRouterHistory;
5use rand::RngCore;
6use sans_io_runtime::{return_if_none, return_if_some, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild};
7
8use crate::{
9    base::{
10        Authorization, ConnectionEvent, FeatureContext, FeatureControlActor, FeatureInput, FeatureOutput, FeatureSharedInput, HandshakeBuilder, ServiceBuilder, ServiceControlActor, ServiceCtx,
11        ServiceInput, ServiceOutput, ServiceSharedInput,
12    },
13    features::{FeaturesControl, FeaturesEvent},
14    ExtIn, ExtOut, LogicControl, LogicEvent,
15};
16
17use self::{features::FeatureManager, neighbours::NeighboursManager, services::ServiceManager};
18
19mod features;
20mod neighbours;
21mod services;
22
23#[derive(Debug, Clone, convert_enum::From)]
24pub enum Input<UserData, SC, SE, TC> {
25    Ext(ExtIn<UserData, SC>),
26    Control(LogicControl<UserData, SC, SE, TC>),
27}
28
29#[derive(Debug, Clone, convert_enum::From)]
30pub enum Output<UserData, SE, TW> {
31    Ext(ExtOut<UserData, SE>),
32    Event(LogicEvent<UserData, SE, TW>),
33    #[convert_enum(optout)]
34    OnResourceEmpty,
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
38#[repr(usize)]
39enum TaskType {
40    Neighbours = 0,
41    Feature = 1,
42    Service = 2,
43}
44
45pub struct ControllerPlaneCfg<UserData, SC, SE, TC, TW> {
46    pub session: u64,
47    pub bind_addrs: Vec<SocketAddr>,
48    #[allow(clippy::type_complexity)]
49    pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
50    pub authorization: Arc<dyn Authorization>,
51    pub handshake_builder: Arc<dyn HandshakeBuilder>,
52    pub random: Box<dyn RngCore + Send + Sync>,
53    pub history: Arc<dyn ShadowRouterHistory>,
54}
55
56pub struct ControllerPlane<UserData, SC, SE, TC, TW> {
57    tick_count: u64,
58    feature_ctx: FeatureContext,
59    service_ctx: ServiceCtx,
60    neighbours: TaskSwitcherBranch<NeighboursManager, neighbours::Output>,
61    features: TaskSwitcherBranch<FeatureManager<UserData>, features::Output<UserData>>,
62    #[allow(clippy::type_complexity)]
63    services: TaskSwitcherBranch<ServiceManager<UserData, SC, SE, TC, TW>, services::Output<UserData, SE, TW>>,
64    switcher: TaskSwitcher,
65    queue: VecDeque<Output<UserData, SE, TW>>,
66    shutdown: bool,
67    history: Arc<dyn ShadowRouterHistory>,
68}
69
70impl<UserData, SC, SE, TC, TW> ControllerPlane<UserData, SC, SE, TC, TW>
71where
72    UserData: 'static + Hash + Copy + Eq + Debug,
73{
74    /// Create a new ControllerPlane
75    ///
76    /// # Arguments
77    ///
78    /// * `node_id` - The node id of the current node
79    /// * `session` - The session id of the current node, it can be a random number
80    ///
81    /// # Returns
82    ///
83    /// A new ControllerPlane
84    pub fn new(node_id: NodeId, cfg: ControllerPlaneCfg<UserData, SC, SE, TC, TW>) -> Self {
85        log::info!("Create ControllerPlane for node: {}, running session {}", node_id, cfg.session);
86        let service_ids = cfg.services.iter().filter(|s| s.discoverable()).map(|s| s.service_id()).collect();
87
88        Self {
89            tick_count: 0,
90            feature_ctx: FeatureContext { node_id, session: cfg.session },
91            service_ctx: ServiceCtx { node_id, session: cfg.session },
92            neighbours: TaskSwitcherBranch::new(
93                NeighboursManager::new(node_id, cfg.bind_addrs, cfg.authorization, cfg.handshake_builder, cfg.random),
94                TaskType::Neighbours,
95            ),
96            features: TaskSwitcherBranch::new(FeatureManager::new(node_id, cfg.session, service_ids), TaskType::Feature),
97            services: TaskSwitcherBranch::new(ServiceManager::new(cfg.services), TaskType::Service),
98            switcher: TaskSwitcher::new(3), //3 types: Neighbours, Feature, Service
99            queue: VecDeque::new(),
100            shutdown: false,
101            history: cfg.history,
102        }
103    }
104
105    pub fn on_tick(&mut self, now_ms: u64) {
106        log::trace!("[ControllerPlane] on_tick: {}", now_ms);
107        self.neighbours.input(&mut self.switcher).on_tick(now_ms, self.tick_count);
108        self.features
109            .input(&mut self.switcher)
110            .on_shared_input(&self.feature_ctx, now_ms, FeatureSharedInput::Tick(self.tick_count));
111        self.services
112            .input(&mut self.switcher)
113            .on_shared_input(&self.service_ctx, now_ms, ServiceSharedInput::Tick(self.tick_count));
114        self.tick_count += 1;
115        self.history.set_ts(now_ms);
116    }
117
118    pub fn on_event(&mut self, now_ms: u64, event: Input<UserData, SC, SE, TC>) {
119        match event {
120            Input::Ext(ExtIn::FeaturesControl(userdata, control)) => {
121                self.features.input(&mut self.switcher).on_input(
122                    &self.feature_ctx,
123                    now_ms,
124                    control.to_feature(),
125                    FeatureInput::Control(FeatureControlActor::Controller(userdata), control),
126                );
127            }
128            Input::Ext(ExtIn::ServicesControl(service, userdata, control)) => {
129                self.services
130                    .input(&mut self.switcher)
131                    .on_input(&self.service_ctx, now_ms, service, ServiceInput::Control(ServiceControlActor::Controller(userdata), control));
132            }
133            Input::Control(LogicControl::NetNeighbour(pair, control)) => {
134                self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::Control(pair, control));
135            }
136            Input::Control(LogicControl::Feature(to)) => {
137                self.features
138                    .input(&mut self.switcher)
139                    .on_input(&self.feature_ctx, now_ms, to.to_feature(), FeatureInput::FromWorker(to));
140            }
141            Input::Control(LogicControl::Service(service, to)) => {
142                self.services.input(&mut self.switcher).on_input(&self.service_ctx, now_ms, service, ServiceInput::FromWorker(to));
143            }
144            Input::Control(LogicControl::NetRemote(feature, conn, meta, msg)) => {
145                if let Some(ctx) = self.neighbours.conn(conn) {
146                    self.features.input(&mut self.switcher).on_input(&self.feature_ctx, now_ms, feature, FeatureInput::Net(ctx, meta, msg));
147                }
148            }
149            Input::Control(LogicControl::NetLocal(feature, meta, msg)) => {
150                self.features.input(&mut self.switcher).on_input(&self.feature_ctx, now_ms, feature, FeatureInput::Local(meta, msg));
151            }
152            Input::Control(LogicControl::ServiceEvent(service, event)) => {
153                self.services.input(&mut self.switcher).on_input(&self.service_ctx, now_ms, service, ServiceInput::FeatureEvent(event));
154            }
155            Input::Control(LogicControl::ServicesControl(actor, service, control)) => {
156                self.services
157                    .input(&mut self.switcher)
158                    .on_input(&self.service_ctx, now_ms, service, ServiceInput::Control(actor, control));
159            }
160            Input::Control(LogicControl::FeaturesControl(actor, control)) => {
161                self.features
162                    .input(&mut self.switcher)
163                    .on_input(&self.feature_ctx, now_ms, control.to_feature(), FeatureInput::Control(actor, control));
164            }
165            Input::Control(LogicControl::ExtFeaturesEvent(userdata, event)) => {
166                self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event)));
167            }
168            Input::Control(LogicControl::ExtServicesEvent(service, userdata, event)) => {
169                self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
170            }
171        }
172    }
173
174    pub fn on_shutdown(&mut self, now_ms: u64) {
175        if self.shutdown {
176            return;
177        }
178        log::info!("[ControllerPlane] Shutdown");
179        self.features.input(&mut self.switcher).on_shutdown(&self.feature_ctx, now_ms);
180        self.services.input(&mut self.switcher).on_shutdown(&self.service_ctx, now_ms);
181        self.neighbours.input(&mut self.switcher).on_shutdown(now_ms);
182        self.shutdown = true;
183    }
184
185    fn pop_neighbours(&mut self, now_ms: u64) {
186        let out = return_if_none!(self.neighbours.pop_output(now_ms, &mut self.switcher));
187        match out {
188            neighbours::Output::Control(remote, control) => self.queue.push_back(Output::Event(LogicEvent::NetNeighbour(remote, control))),
189            neighbours::Output::Event(event) => {
190                self.features
191                    .input(&mut self.switcher)
192                    .on_shared_input(&self.feature_ctx, now_ms, FeatureSharedInput::Connection(event.clone()));
193                self.services
194                    .input(&mut self.switcher)
195                    .on_shared_input(&self.service_ctx, now_ms, ServiceSharedInput::Connection(event.clone()));
196                match event {
197                    ConnectionEvent::Connecting(_ctx) => {}
198                    ConnectionEvent::ConnectError(_ctx, _err) => {}
199                    ConnectionEvent::Connected(ctx, secure) => self.queue.push_back(Output::Event(LogicEvent::Pin(ctx.conn, ctx.node, ctx.pair, secure))),
200                    ConnectionEvent::Stats(_ctx, _stats) => {}
201                    ConnectionEvent::Disconnected(ctx) => self.queue.push_back(Output::Event(LogicEvent::UnPin(ctx.conn))),
202                }
203            }
204            neighbours::Output::OnResourceEmpty => {
205                log::info!("[ControllerPlane] Neighbours OnResourceEmpty");
206            }
207        }
208    }
209
210    fn pop_features(&mut self, now_ms: u64) {
211        let out = return_if_none!(self.features.pop_output(now_ms, &mut self.switcher));
212
213        let (feature, out) = match out {
214            features::Output::Output(feature, out) => (feature, out),
215            features::Output::Shutdown => {
216                log::info!("[ControllerPlane] Features Shutdown");
217                return;
218            }
219        };
220
221        match out {
222            FeatureOutput::ToWorker(is_broadcast, to) => self.queue.push_back(Output::Event(LogicEvent::Feature(is_broadcast, to))),
223            FeatureOutput::Event(actor, event) => {
224                log::debug!("[Controller] send FeatureEvent to actor {:?}, event {:?}", actor, event);
225                match actor {
226                    FeatureControlActor::Controller(userdata) => self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event))),
227                    FeatureControlActor::Worker(worker, userdata) => self.queue.push_back(Output::Event(LogicEvent::ExtFeaturesEvent(worker, userdata, event))),
228                    FeatureControlActor::Service(service) => {
229                        self.services.input(&mut self.switcher).on_input(&self.service_ctx, now_ms, service, ServiceInput::FeatureEvent(event));
230                    }
231                }
232            }
233            FeatureOutput::SendDirect(conn, meta, buf) => {
234                log::debug!("[ControllerPlane] SendDirect to conn: {:?}, len: {}", conn, buf.len());
235                let conn_ctx = return_if_none!(self.neighbours.conn(conn));
236                self.queue.push_back(Output::Event(LogicEvent::NetDirect(feature, conn_ctx.pair, conn, meta, buf)))
237            }
238            FeatureOutput::SendRoute(rule, ttl, buf) => {
239                log::debug!("[ControllerPlane] SendRoute to rule: {:?}, len: {}", rule, buf.len());
240                self.queue.push_back(Output::Event(LogicEvent::NetRoute(feature, rule, ttl, buf)))
241            }
242            FeatureOutput::NeighboursConnectTo(addr) => {
243                self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::ConnectTo(addr));
244            }
245            FeatureOutput::NeighboursDisconnectFrom(node) => {
246                self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::DisconnectFrom(node));
247            }
248            FeatureOutput::OnResourceEmpty => {
249                log::info!("[ControllerPlane] Feature {feature:?} OnResourceEmpty");
250            }
251        }
252    }
253
254    fn pop_services(&mut self, now_ms: u64) {
255        let out = return_if_none!(self.services.pop_output(now_ms, &mut self.switcher));
256
257        let (service, out) = match out {
258            services::Output::Output(service, out) => (service, out),
259            services::Output::OnResourceEmpty => {
260                log::info!("[ControllerPlane] Services OnResourceEmpty");
261                return;
262            }
263        };
264
265        match out {
266            ServiceOutput::FeatureControl(control) => {
267                self.features
268                    .input(&mut self.switcher)
269                    .on_input(&self.feature_ctx, now_ms, control.to_feature(), FeatureInput::Control(FeatureControlActor::Service(service), control));
270            }
271            ServiceOutput::Event(actor, event) => match actor {
272                ServiceControlActor::Controller(userdata) => self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event))),
273                ServiceControlActor::Worker(worker, userdata) => self.queue.push_back(Output::Event(LogicEvent::ExtServicesEvent(worker, service, userdata, event))),
274            },
275            ServiceOutput::BroadcastWorkers(to) => self.queue.push_back(Output::Event(LogicEvent::Service(service, to))),
276            ServiceOutput::OnResourceEmpty => {
277                log::info!("[ControllerPlane] Service {service} OnResourceEmpty");
278            }
279        }
280    }
281}
282
283impl<UserData, SC, SE, TC, TW> TaskSwitcherChild<Output<UserData, SE, TW>> for ControllerPlane<UserData, SC, SE, TC, TW>
284where
285    UserData: 'static + Hash + Copy + Eq + Debug,
286{
287    type Time = u64;
288
289    fn empty_event(&self) -> Output<UserData, SE, TW> {
290        Output::OnResourceEmpty
291    }
292
293    fn is_empty(&self) -> bool {
294        self.shutdown && self.queue.is_empty() && self.neighbours.is_empty() && self.features.is_empty() && self.services.is_empty()
295    }
296
297    fn pop_output(&mut self, now_ms: u64) -> Option<Output<UserData, SE, TW>> {
298        return_if_some!(self.queue.pop_front());
299
300        while let Some(current) = self.switcher.current() {
301            match current.try_into().expect("Should convert to TaskType") {
302                TaskType::Neighbours => self.pop_neighbours(now_ms),
303                TaskType::Feature => self.pop_features(now_ms),
304                TaskType::Service => self.pop_services(now_ms),
305            }
306
307            return_if_some!(self.queue.pop_front());
308        }
309
310        None
311    }
312}