1use std::{
2 collections::{HashMap, VecDeque},
3 fmt::Debug,
4 hash::Hash,
5 net::SocketAddr,
6 sync::Arc,
7 time::Instant,
8};
9
10use atm0s_sdn_identity::NodeId;
11use atm0s_sdn_network::{
12 base::{Authorization, HandshakeBuilder, ServiceBuilder},
13 controller_plane::ControllerPlaneCfg,
14 data_plane::{DataPlaneCfg, NetInput, NetOutput, NetPair},
15 features::{FeaturesControl, FeaturesEvent},
16 worker::{SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput},
17 ExtIn, ExtOut,
18};
19use atm0s_sdn_router::shadow::ShadowRouterHistory;
20use rand::rngs::OsRng;
21use sans_io_runtime::{
22 backend::{BackendIncoming, BackendOutgoing},
23 BusChannelControl, BusControl, BusEvent, Controller, WorkerInner, WorkerInnerInput, WorkerInnerOutput,
24};
25
26use crate::time::TimePivot;
27
28pub type SdnController<UserData, SC, SE, TC, TW> = Controller<SdnExtIn<UserData, SC>, SdnExtOut<UserData, SE>, SdnSpawnCfg, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, 1024>;
29
30pub type SdnExtIn<UserData, SC> = ExtIn<UserData, SC>;
31pub type SdnExtOut<UserData, SE> = ExtOut<UserData, SE>;
32
/// Unit marker used as the owner tag on every bus and network request issued
/// by an SDN worker. It carries no data, so all values compare equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SdnOwner;
35
/// Bus channels used by SDN workers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SdnChannel {
    /// Channel subscribed to by the worker that was built with a controller
    /// configuration.
    Controller,
    /// Per-worker channel, keyed by the worker index.
    Worker(u16),
}
41
42pub type SdnEvent<UserData, SC, SE, TC, TW> = SdnWorkerBusEvent<UserData, SC, SE, TC, TW>;
43
44pub struct ControllerCfg {
45 pub session: u64,
46 pub auth: Arc<dyn Authorization>,
47 pub handshake: Arc<dyn HandshakeBuilder>,
48 #[cfg(feature = "vpn")]
49 pub vpn_tun_device: Option<sans_io_runtime::backend::tun::TunDevice>,
50}
51
52pub struct SdnInnerCfg<UserData, SC, SE, TC, TW> {
53 pub node_id: NodeId,
54 pub tick_ms: u64,
55 pub bind_addrs: Vec<SocketAddr>,
56 pub controller: Option<ControllerCfg>,
57 #[allow(clippy::type_complexity)]
58 pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
59 pub history: Arc<dyn ShadowRouterHistory>,
60 #[cfg(feature = "vpn")]
61 pub vpn_tun_fd: Option<sans_io_runtime::backend::tun::TunFd>,
62}
63
/// Spawn configuration for SDN workers. It is the unit type because spawning
/// is not supported: `SdnWorkerInner::spawn` panics unconditionally.
pub type SdnSpawnCfg = ();
65
66pub struct SdnWorkerInner<UserData, SC, SE, TC, TW> {
67 worker: u16,
68 worker_inner: SdnWorker<UserData, SC, SE, TC, TW>,
69 timer: TimePivot,
70 #[cfg(feature = "vpn")]
71 _vpn_tun_device: Option<sans_io_runtime::backend::tun::TunDevice>,
72 bind_addrs: HashMap<SocketAddr, usize>,
73 bind_slots: HashMap<usize, SocketAddr>,
74 #[cfg(feature = "vpn")]
75 tun_backend_slot: Option<usize>,
76 #[allow(clippy::type_complexity)]
77 queue: VecDeque<WorkerInnerOutput<SdnOwner, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnSpawnCfg>>,
78 shutdown: bool,
79}
80
81#[allow(clippy::type_complexity)]
82impl<UserData: 'static + Eq + Copy + Hash + Debug, SC: Debug, SE: Debug, TC: Debug, TW: Debug> SdnWorkerInner<UserData, SC, SE, TC, TW> {
83 fn convert_output(
84 &mut self,
85 now_ms: u64,
86 event: SdnWorkerOutput<UserData, SC, SE, TC, TW>,
87 ) -> Option<WorkerInnerOutput<SdnOwner, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnSpawnCfg>> {
88 match event {
89 SdnWorkerOutput::Ext(ext) => Some(WorkerInnerOutput::Ext(true, ext)),
90 SdnWorkerOutput::ExtWorker(_) => {
91 panic!("should not have ExtWorker with standalone node")
92 }
93 SdnWorkerOutput::Net(net) => {
94 let out = match net {
95 NetOutput::UdpPacket(pair, data) => BackendOutgoing::UdpPacket {
96 slot: *self.bind_addrs.get(&pair.local)?,
97 to: pair.remote,
98 data,
99 },
100 NetOutput::UdpPackets(pairs, data) => {
101 let to = pairs.into_iter().filter_map(|p| self.bind_addrs.get(&p.local).map(|s| (*s, p.remote))).collect::<Vec<_>>();
102 BackendOutgoing::UdpPackets2 { to, data }
103 }
104 #[cfg(feature = "vpn")]
105 NetOutput::TunPacket(data) => BackendOutgoing::TunPacket {
106 slot: self.tun_backend_slot.expect("should have tun"),
107 data,
108 },
109 };
110 Some(WorkerInnerOutput::Net(SdnOwner, out))
111 }
112 SdnWorkerOutput::Bus(event) => match &event {
113 SdnWorkerBusEvent::Control(..) => Some(WorkerInnerOutput::Bus(BusControl::Channel(SdnOwner, BusChannelControl::Publish(SdnChannel::Controller, true, event)))),
114 SdnWorkerBusEvent::Workers(..) => Some(WorkerInnerOutput::Bus(BusControl::Broadcast(true, event))),
115 SdnWorkerBusEvent::Worker(worker, _msg) => Some(WorkerInnerOutput::Bus(BusControl::Channel(
116 SdnOwner,
117 BusChannelControl::Publish(SdnChannel::Worker(*worker), true, event),
118 ))),
119 },
120 SdnWorkerOutput::Continue => {
121 let out = self.worker_inner.pop_output2(now_ms)?;
123 self.convert_output(now_ms, out)
124 }
125 SdnWorkerOutput::OnResourceEmpty => Some(WorkerInnerOutput::Continue),
126 }
127 }
128}
129
130impl<UserData: 'static + Eq + Copy + Hash + Debug, SC: Debug, SE: Debug, TC: Debug, TW: Debug>
131 WorkerInner<SdnOwner, SdnExtIn<UserData, SC>, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnInnerCfg<UserData, SC, SE, TC, TW>, SdnSpawnCfg>
132 for SdnWorkerInner<UserData, SC, SE, TC, TW>
133{
134 fn build(worker: u16, cfg: SdnInnerCfg<UserData, SC, SE, TC, TW>) -> Self {
135 let mut queue = VecDeque::from([WorkerInnerOutput::Bus(BusControl::Channel(SdnOwner, BusChannelControl::Subscribe(SdnChannel::Worker(worker))))]);
136
137 for addr in &cfg.bind_addrs {
138 queue.push_back(WorkerInnerOutput::Net(SdnOwner, BackendOutgoing::UdpListen { addr: *addr, reuse: true }));
139 }
140
141 #[cfg(feature = "vpn")]
142 if let Some(fd) = cfg.vpn_tun_fd {
143 queue.push_back(WorkerInnerOutput::Net(SdnOwner, BackendOutgoing::TunBind { fd }));
144 }
145 if let Some(controller) = cfg.controller {
146 queue.push_back(WorkerInnerOutput::Bus(BusControl::Channel(SdnOwner, BusChannelControl::Subscribe(SdnChannel::Controller))));
147 log::info!("Create controller worker");
148 Self {
149 worker,
150 worker_inner: SdnWorker::new(SdnWorkerCfg {
151 node_id: cfg.node_id,
152 tick_ms: cfg.tick_ms,
153 controller: Some(ControllerPlaneCfg {
154 bind_addrs: cfg.bind_addrs,
155 authorization: controller.auth,
156 handshake_builder: controller.handshake,
157 session: controller.session,
158 random: Box::new(OsRng),
159 services: cfg.services.clone(),
160 history: cfg.history.clone(),
161 }),
162 data: DataPlaneCfg {
163 worker_id: worker,
164 services: cfg.services,
165 history: cfg.history,
166 },
167 }),
168 timer: TimePivot::build(),
169 #[cfg(feature = "vpn")]
170 _vpn_tun_device: controller.vpn_tun_device,
171 queue,
172 shutdown: false,
173 bind_addrs: Default::default(),
174 bind_slots: Default::default(),
175 #[cfg(feature = "vpn")]
176 tun_backend_slot: None,
177 }
178 } else {
179 log::info!("Create data only worker");
180 Self {
181 worker,
182 worker_inner: SdnWorker::new(SdnWorkerCfg {
183 node_id: cfg.node_id,
184 tick_ms: cfg.tick_ms,
185 controller: None,
186 data: DataPlaneCfg {
187 worker_id: worker,
188 services: cfg.services,
189 history: cfg.history,
190 },
191 }),
192 timer: TimePivot::build(),
193 #[cfg(feature = "vpn")]
194 _vpn_tun_device: None,
195 queue,
196 shutdown: false,
197 bind_addrs: Default::default(),
198 bind_slots: Default::default(),
199 #[cfg(feature = "vpn")]
200 tun_backend_slot: None,
201 }
202 }
203 }
204
205 fn worker_index(&self) -> u16 {
206 self.worker
207 }
208
209 fn tasks(&self) -> usize {
210 self.worker_inner.tasks()
211 }
212
213 fn is_empty(&self) -> bool {
214 self.shutdown && self.queue.is_empty() && self.worker_inner.is_empty()
215 }
216
217 fn spawn(&mut self, _now: Instant, _cfg: SdnSpawnCfg) {
218 panic!("Spawn not supported")
219 }
220
221 fn on_tick(&mut self, now: Instant) {
222 let now_ms = self.timer.timestamp_ms(now);
223 self.worker_inner.on_tick(now_ms);
224 }
225
226 fn on_event(&mut self, now: Instant, event: WorkerInnerInput<SdnOwner, SdnExtIn<UserData, SC>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>>) {
227 let now_ms = self.timer.timestamp_ms(now);
228 match event {
229 WorkerInnerInput::Net(_, event) => match event {
230 BackendIncoming::UdpListenResult { bind: _, result } => {
231 if let Ok((addr, slot)) = result {
232 log::info!("Worker {} bind addr {addr} to slot {slot}", self.worker);
233 self.bind_addrs.insert(addr, slot);
234 self.bind_slots.insert(slot, addr);
235 }
236 }
237 BackendIncoming::UdpPacket { slot, from, data } => {
238 let local = *self.bind_slots.get(&slot).expect("Should have local addr");
239 let pair = NetPair::new(local, from);
240 self.worker_inner.on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(pair, data)))
241 }
242 #[cfg(feature = "vpn")]
243 BackendIncoming::TunBindResult { result } => {
244 self.tun_backend_slot = Some(result.expect("Should have slot"));
245 }
246 #[cfg(feature = "vpn")]
247 BackendIncoming::TunPacket { slot: _, data } => self.worker_inner.on_event(now_ms, SdnWorkerInput::Net(NetInput::TunPacket(data))),
248 },
249 WorkerInnerInput::Bus(event) => match event {
250 BusEvent::Broadcast(_from_worker, msg) => self.worker_inner.on_event(now_ms, SdnWorkerInput::Bus(msg)),
251 BusEvent::Channel(_, _, msg) => self.worker_inner.on_event(now_ms, SdnWorkerInput::Bus(msg)),
252 },
253 WorkerInnerInput::Ext(ext) => self.worker_inner.on_event(now_ms, SdnWorkerInput::Ext(ext)),
254 };
255 }
256
257 fn pop_output(&mut self, now: Instant) -> Option<WorkerInnerOutput<SdnOwner, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnSpawnCfg>> {
258 if let Some(e) = self.queue.pop_front() {
259 return Some(e);
260 }
261 let now_ms = self.timer.timestamp_ms(now);
262 let out = self.worker_inner.pop_output2(now_ms)?;
263 self.convert_output(now_ms, out)
264 }
265
266 fn on_shutdown(&mut self, now: Instant) {
267 if self.shutdown {
268 return;
269 }
270 let now_ms = self.timer.timestamp_ms(now);
271 self.worker_inner.on_shutdown(now_ms);
272 for slot in self.bind_addrs.values() {
273 self.queue.push_back(WorkerInnerOutput::Net(SdnOwner, BackendOutgoing::UdpUnlisten { slot: *slot }));
274 }
275 self.shutdown = true;
276 }
277}