1use std::{collections::VecDeque, fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};
2
3use atm0s_sdn_identity::NodeId;
4use atm0s_sdn_router::shadow::ShadowRouterHistory;
5use rand::RngCore;
6use sans_io_runtime::{return_if_none, return_if_some, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild};
7
8use crate::{
9 base::{
10 Authorization, ConnectionEvent, FeatureContext, FeatureControlActor, FeatureInput, FeatureOutput, FeatureSharedInput, HandshakeBuilder, ServiceBuilder, ServiceControlActor, ServiceCtx,
11 ServiceInput, ServiceOutput, ServiceSharedInput,
12 },
13 features::{FeaturesControl, FeaturesEvent},
14 ExtIn, ExtOut, LogicControl, LogicEvent,
15};
16
17use self::{features::FeatureManager, neighbours::NeighboursManager, services::ServiceManager};
18
19mod features;
20mod neighbours;
21mod services;
22
23#[derive(Debug, Clone, convert_enum::From)]
24pub enum Input<UserData, SC, SE, TC> {
25 Ext(ExtIn<UserData, SC>),
26 Control(LogicControl<UserData, SC, SE, TC>),
27}
28
29#[derive(Debug, Clone, convert_enum::From)]
30pub enum Output<UserData, SE, TW> {
31 Ext(ExtOut<UserData, SE>),
32 Event(LogicEvent<UserData, SE, TW>),
33 #[convert_enum(optout)]
34 OnResourceEmpty,
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
38#[repr(usize)]
39enum TaskType {
40 Neighbours = 0,
41 Feature = 1,
42 Service = 2,
43}
44
45pub struct ControllerPlaneCfg<UserData, SC, SE, TC, TW> {
46 pub session: u64,
47 pub bind_addrs: Vec<SocketAddr>,
48 #[allow(clippy::type_complexity)]
49 pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
50 pub authorization: Arc<dyn Authorization>,
51 pub handshake_builder: Arc<dyn HandshakeBuilder>,
52 pub random: Box<dyn RngCore + Send + Sync>,
53 pub history: Arc<dyn ShadowRouterHistory>,
54}
55
56pub struct ControllerPlane<UserData, SC, SE, TC, TW> {
57 tick_count: u64,
58 feature_ctx: FeatureContext,
59 service_ctx: ServiceCtx,
60 neighbours: TaskSwitcherBranch<NeighboursManager, neighbours::Output>,
61 features: TaskSwitcherBranch<FeatureManager<UserData>, features::Output<UserData>>,
62 #[allow(clippy::type_complexity)]
63 services: TaskSwitcherBranch<ServiceManager<UserData, SC, SE, TC, TW>, services::Output<UserData, SE, TW>>,
64 switcher: TaskSwitcher,
65 queue: VecDeque<Output<UserData, SE, TW>>,
66 shutdown: bool,
67 history: Arc<dyn ShadowRouterHistory>,
68}
69
70impl<UserData, SC, SE, TC, TW> ControllerPlane<UserData, SC, SE, TC, TW>
71where
72 UserData: 'static + Hash + Copy + Eq + Debug,
73{
74 pub fn new(node_id: NodeId, cfg: ControllerPlaneCfg<UserData, SC, SE, TC, TW>) -> Self {
85 log::info!("Create ControllerPlane for node: {}, running session {}", node_id, cfg.session);
86 let service_ids = cfg.services.iter().filter(|s| s.discoverable()).map(|s| s.service_id()).collect();
87
88 Self {
89 tick_count: 0,
90 feature_ctx: FeatureContext { node_id, session: cfg.session },
91 service_ctx: ServiceCtx { node_id, session: cfg.session },
92 neighbours: TaskSwitcherBranch::new(
93 NeighboursManager::new(node_id, cfg.bind_addrs, cfg.authorization, cfg.handshake_builder, cfg.random),
94 TaskType::Neighbours,
95 ),
96 features: TaskSwitcherBranch::new(FeatureManager::new(node_id, cfg.session, service_ids), TaskType::Feature),
97 services: TaskSwitcherBranch::new(ServiceManager::new(cfg.services), TaskType::Service),
98 switcher: TaskSwitcher::new(3), queue: VecDeque::new(),
100 shutdown: false,
101 history: cfg.history,
102 }
103 }
104
105 pub fn on_tick(&mut self, now_ms: u64) {
106 log::trace!("[ControllerPlane] on_tick: {}", now_ms);
107 self.neighbours.input(&mut self.switcher).on_tick(now_ms, self.tick_count);
108 self.features
109 .input(&mut self.switcher)
110 .on_shared_input(&self.feature_ctx, now_ms, FeatureSharedInput::Tick(self.tick_count));
111 self.services
112 .input(&mut self.switcher)
113 .on_shared_input(&self.service_ctx, now_ms, ServiceSharedInput::Tick(self.tick_count));
114 self.tick_count += 1;
115 self.history.set_ts(now_ms);
116 }
117
118 pub fn on_event(&mut self, now_ms: u64, event: Input<UserData, SC, SE, TC>) {
119 match event {
120 Input::Ext(ExtIn::FeaturesControl(userdata, control)) => {
121 self.features.input(&mut self.switcher).on_input(
122 &self.feature_ctx,
123 now_ms,
124 control.to_feature(),
125 FeatureInput::Control(FeatureControlActor::Controller(userdata), control),
126 );
127 }
128 Input::Ext(ExtIn::ServicesControl(service, userdata, control)) => {
129 self.services
130 .input(&mut self.switcher)
131 .on_input(&self.service_ctx, now_ms, service, ServiceInput::Control(ServiceControlActor::Controller(userdata), control));
132 }
133 Input::Control(LogicControl::NetNeighbour(pair, control)) => {
134 self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::Control(pair, control));
135 }
136 Input::Control(LogicControl::Feature(to)) => {
137 self.features
138 .input(&mut self.switcher)
139 .on_input(&self.feature_ctx, now_ms, to.to_feature(), FeatureInput::FromWorker(to));
140 }
141 Input::Control(LogicControl::Service(service, to)) => {
142 self.services.input(&mut self.switcher).on_input(&self.service_ctx, now_ms, service, ServiceInput::FromWorker(to));
143 }
144 Input::Control(LogicControl::NetRemote(feature, conn, meta, msg)) => {
145 if let Some(ctx) = self.neighbours.conn(conn) {
146 self.features.input(&mut self.switcher).on_input(&self.feature_ctx, now_ms, feature, FeatureInput::Net(ctx, meta, msg));
147 }
148 }
149 Input::Control(LogicControl::NetLocal(feature, meta, msg)) => {
150 self.features.input(&mut self.switcher).on_input(&self.feature_ctx, now_ms, feature, FeatureInput::Local(meta, msg));
151 }
152 Input::Control(LogicControl::ServiceEvent(service, event)) => {
153 self.services.input(&mut self.switcher).on_input(&self.service_ctx, now_ms, service, ServiceInput::FeatureEvent(event));
154 }
155 Input::Control(LogicControl::ServicesControl(actor, service, control)) => {
156 self.services
157 .input(&mut self.switcher)
158 .on_input(&self.service_ctx, now_ms, service, ServiceInput::Control(actor, control));
159 }
160 Input::Control(LogicControl::FeaturesControl(actor, control)) => {
161 self.features
162 .input(&mut self.switcher)
163 .on_input(&self.feature_ctx, now_ms, control.to_feature(), FeatureInput::Control(actor, control));
164 }
165 Input::Control(LogicControl::ExtFeaturesEvent(userdata, event)) => {
166 self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event)));
167 }
168 Input::Control(LogicControl::ExtServicesEvent(service, userdata, event)) => {
169 self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
170 }
171 }
172 }
173
174 pub fn on_shutdown(&mut self, now_ms: u64) {
175 if self.shutdown {
176 return;
177 }
178 log::info!("[ControllerPlane] Shutdown");
179 self.features.input(&mut self.switcher).on_shutdown(&self.feature_ctx, now_ms);
180 self.services.input(&mut self.switcher).on_shutdown(&self.service_ctx, now_ms);
181 self.neighbours.input(&mut self.switcher).on_shutdown(now_ms);
182 self.shutdown = true;
183 }
184
185 fn pop_neighbours(&mut self, now_ms: u64) {
186 let out = return_if_none!(self.neighbours.pop_output(now_ms, &mut self.switcher));
187 match out {
188 neighbours::Output::Control(remote, control) => self.queue.push_back(Output::Event(LogicEvent::NetNeighbour(remote, control))),
189 neighbours::Output::Event(event) => {
190 self.features
191 .input(&mut self.switcher)
192 .on_shared_input(&self.feature_ctx, now_ms, FeatureSharedInput::Connection(event.clone()));
193 self.services
194 .input(&mut self.switcher)
195 .on_shared_input(&self.service_ctx, now_ms, ServiceSharedInput::Connection(event.clone()));
196 match event {
197 ConnectionEvent::Connecting(_ctx) => {}
198 ConnectionEvent::ConnectError(_ctx, _err) => {}
199 ConnectionEvent::Connected(ctx, secure) => self.queue.push_back(Output::Event(LogicEvent::Pin(ctx.conn, ctx.node, ctx.pair, secure))),
200 ConnectionEvent::Stats(_ctx, _stats) => {}
201 ConnectionEvent::Disconnected(ctx) => self.queue.push_back(Output::Event(LogicEvent::UnPin(ctx.conn))),
202 }
203 }
204 neighbours::Output::OnResourceEmpty => {
205 log::info!("[ControllerPlane] Neighbours OnResourceEmpty");
206 }
207 }
208 }
209
210 fn pop_features(&mut self, now_ms: u64) {
211 let out = return_if_none!(self.features.pop_output(now_ms, &mut self.switcher));
212
213 let (feature, out) = match out {
214 features::Output::Output(feature, out) => (feature, out),
215 features::Output::Shutdown => {
216 log::info!("[ControllerPlane] Features Shutdown");
217 return;
218 }
219 };
220
221 match out {
222 FeatureOutput::ToWorker(is_broadcast, to) => self.queue.push_back(Output::Event(LogicEvent::Feature(is_broadcast, to))),
223 FeatureOutput::Event(actor, event) => {
224 log::debug!("[Controller] send FeatureEvent to actor {:?}, event {:?}", actor, event);
225 match actor {
226 FeatureControlActor::Controller(userdata) => self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event))),
227 FeatureControlActor::Worker(worker, userdata) => self.queue.push_back(Output::Event(LogicEvent::ExtFeaturesEvent(worker, userdata, event))),
228 FeatureControlActor::Service(service) => {
229 self.services.input(&mut self.switcher).on_input(&self.service_ctx, now_ms, service, ServiceInput::FeatureEvent(event));
230 }
231 }
232 }
233 FeatureOutput::SendDirect(conn, meta, buf) => {
234 log::debug!("[ControllerPlane] SendDirect to conn: {:?}, len: {}", conn, buf.len());
235 let conn_ctx = return_if_none!(self.neighbours.conn(conn));
236 self.queue.push_back(Output::Event(LogicEvent::NetDirect(feature, conn_ctx.pair, conn, meta, buf)))
237 }
238 FeatureOutput::SendRoute(rule, ttl, buf) => {
239 log::debug!("[ControllerPlane] SendRoute to rule: {:?}, len: {}", rule, buf.len());
240 self.queue.push_back(Output::Event(LogicEvent::NetRoute(feature, rule, ttl, buf)))
241 }
242 FeatureOutput::NeighboursConnectTo(addr) => {
243 self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::ConnectTo(addr));
244 }
245 FeatureOutput::NeighboursDisconnectFrom(node) => {
246 self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::DisconnectFrom(node));
247 }
248 FeatureOutput::OnResourceEmpty => {
249 log::info!("[ControllerPlane] Feature {feature:?} OnResourceEmpty");
250 }
251 }
252 }
253
254 fn pop_services(&mut self, now_ms: u64) {
255 let out = return_if_none!(self.services.pop_output(now_ms, &mut self.switcher));
256
257 let (service, out) = match out {
258 services::Output::Output(service, out) => (service, out),
259 services::Output::OnResourceEmpty => {
260 log::info!("[ControllerPlane] Services OnResourceEmpty");
261 return;
262 }
263 };
264
265 match out {
266 ServiceOutput::FeatureControl(control) => {
267 self.features
268 .input(&mut self.switcher)
269 .on_input(&self.feature_ctx, now_ms, control.to_feature(), FeatureInput::Control(FeatureControlActor::Service(service), control));
270 }
271 ServiceOutput::Event(actor, event) => match actor {
272 ServiceControlActor::Controller(userdata) => self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event))),
273 ServiceControlActor::Worker(worker, userdata) => self.queue.push_back(Output::Event(LogicEvent::ExtServicesEvent(worker, service, userdata, event))),
274 },
275 ServiceOutput::BroadcastWorkers(to) => self.queue.push_back(Output::Event(LogicEvent::Service(service, to))),
276 ServiceOutput::OnResourceEmpty => {
277 log::info!("[ControllerPlane] Service {service} OnResourceEmpty");
278 }
279 }
280 }
281}
282
283impl<UserData, SC, SE, TC, TW> TaskSwitcherChild<Output<UserData, SE, TW>> for ControllerPlane<UserData, SC, SE, TC, TW>
284where
285 UserData: 'static + Hash + Copy + Eq + Debug,
286{
287 type Time = u64;
288
289 fn empty_event(&self) -> Output<UserData, SE, TW> {
290 Output::OnResourceEmpty
291 }
292
293 fn is_empty(&self) -> bool {
294 self.shutdown && self.queue.is_empty() && self.neighbours.is_empty() && self.features.is_empty() && self.services.is_empty()
295 }
296
297 fn pop_output(&mut self, now_ms: u64) -> Option<Output<UserData, SE, TW>> {
298 return_if_some!(self.queue.pop_front());
299
300 while let Some(current) = self.switcher.current() {
301 match current.try_into().expect("Should convert to TaskType") {
302 TaskType::Neighbours => self.pop_neighbours(now_ms),
303 TaskType::Feature => self.pop_features(now_ms),
304 TaskType::Service => self.pop_services(now_ms),
305 }
306
307 return_if_some!(self.queue.pop_front());
308 }
309
310 None
311 }
312}