atm0s_sdn/
worker_inner.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    fmt::Debug,
4    hash::Hash,
5    net::SocketAddr,
6    sync::Arc,
7    time::Instant,
8};
9
10use atm0s_sdn_identity::NodeId;
11use atm0s_sdn_network::{
12    base::{Authorization, HandshakeBuilder, ServiceBuilder},
13    controller_plane::ControllerPlaneCfg,
14    data_plane::{DataPlaneCfg, NetInput, NetOutput, NetPair},
15    features::{FeaturesControl, FeaturesEvent},
16    worker::{SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput},
17    ExtIn, ExtOut,
18};
19use atm0s_sdn_router::shadow::ShadowRouterHistory;
20use rand::rngs::OsRng;
21use sans_io_runtime::{
22    backend::{BackendIncoming, BackendOutgoing},
23    BusChannelControl, BusControl, BusEvent, Controller, WorkerInner, WorkerInnerInput, WorkerInnerOutput,
24};
25
26use crate::time::TimePivot;
27
28pub type SdnController<UserData, SC, SE, TC, TW> = Controller<SdnExtIn<UserData, SC>, SdnExtOut<UserData, SE>, SdnSpawnCfg, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, 1024>;
29
30pub type SdnExtIn<UserData, SC> = ExtIn<UserData, SC>;
31pub type SdnExtOut<UserData, SE> = ExtOut<UserData, SE>;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
34pub struct SdnOwner;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
37pub enum SdnChannel {
38    Controller,
39    Worker(u16),
40}
41
42pub type SdnEvent<UserData, SC, SE, TC, TW> = SdnWorkerBusEvent<UserData, SC, SE, TC, TW>;
43
44pub struct ControllerCfg {
45    pub session: u64,
46    pub auth: Arc<dyn Authorization>,
47    pub handshake: Arc<dyn HandshakeBuilder>,
48    #[cfg(feature = "vpn")]
49    pub vpn_tun_device: Option<sans_io_runtime::backend::tun::TunDevice>,
50}
51
52pub struct SdnInnerCfg<UserData, SC, SE, TC, TW> {
53    pub node_id: NodeId,
54    pub tick_ms: u64,
55    pub bind_addrs: Vec<SocketAddr>,
56    pub controller: Option<ControllerCfg>,
57    #[allow(clippy::type_complexity)]
58    pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
59    pub history: Arc<dyn ShadowRouterHistory>,
60    #[cfg(feature = "vpn")]
61    pub vpn_tun_fd: Option<sans_io_runtime::backend::tun::TunFd>,
62}
63
64pub type SdnSpawnCfg = ();
65
66pub struct SdnWorkerInner<UserData, SC, SE, TC, TW> {
67    worker: u16,
68    worker_inner: SdnWorker<UserData, SC, SE, TC, TW>,
69    timer: TimePivot,
70    #[cfg(feature = "vpn")]
71    _vpn_tun_device: Option<sans_io_runtime::backend::tun::TunDevice>,
72    bind_addrs: HashMap<SocketAddr, usize>,
73    bind_slots: HashMap<usize, SocketAddr>,
74    #[cfg(feature = "vpn")]
75    tun_backend_slot: Option<usize>,
76    #[allow(clippy::type_complexity)]
77    queue: VecDeque<WorkerInnerOutput<SdnOwner, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnSpawnCfg>>,
78    shutdown: bool,
79}
80
81#[allow(clippy::type_complexity)]
82impl<UserData: 'static + Eq + Copy + Hash + Debug, SC: Debug, SE: Debug, TC: Debug, TW: Debug> SdnWorkerInner<UserData, SC, SE, TC, TW> {
83    fn convert_output(
84        &mut self,
85        now_ms: u64,
86        event: SdnWorkerOutput<UserData, SC, SE, TC, TW>,
87    ) -> Option<WorkerInnerOutput<SdnOwner, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnSpawnCfg>> {
88        match event {
89            SdnWorkerOutput::Ext(ext) => Some(WorkerInnerOutput::Ext(true, ext)),
90            SdnWorkerOutput::ExtWorker(_) => {
91                panic!("should not have ExtWorker with standalone node")
92            }
93            SdnWorkerOutput::Net(net) => {
94                let out = match net {
95                    NetOutput::UdpPacket(pair, data) => BackendOutgoing::UdpPacket {
96                        slot: *self.bind_addrs.get(&pair.local)?,
97                        to: pair.remote,
98                        data,
99                    },
100                    NetOutput::UdpPackets(pairs, data) => {
101                        let to = pairs.into_iter().filter_map(|p| self.bind_addrs.get(&p.local).map(|s| (*s, p.remote))).collect::<Vec<_>>();
102                        BackendOutgoing::UdpPackets2 { to, data }
103                    }
104                    #[cfg(feature = "vpn")]
105                    NetOutput::TunPacket(data) => BackendOutgoing::TunPacket {
106                        slot: self.tun_backend_slot.expect("should have tun"),
107                        data,
108                    },
109                };
110                Some(WorkerInnerOutput::Net(SdnOwner, out))
111            }
112            SdnWorkerOutput::Bus(event) => match &event {
113                SdnWorkerBusEvent::Control(..) => Some(WorkerInnerOutput::Bus(BusControl::Channel(SdnOwner, BusChannelControl::Publish(SdnChannel::Controller, true, event)))),
114                SdnWorkerBusEvent::Workers(..) => Some(WorkerInnerOutput::Bus(BusControl::Broadcast(true, event))),
115                SdnWorkerBusEvent::Worker(worker, _msg) => Some(WorkerInnerOutput::Bus(BusControl::Channel(
116                    SdnOwner,
117                    BusChannelControl::Publish(SdnChannel::Worker(*worker), true, event),
118                ))),
119            },
120            SdnWorkerOutput::Continue => {
121                //we need to continue pop for continue gather output
122                let out = self.worker_inner.pop_output2(now_ms)?;
123                self.convert_output(now_ms, out)
124            }
125            SdnWorkerOutput::OnResourceEmpty => Some(WorkerInnerOutput::Continue),
126        }
127    }
128}
129
130impl<UserData: 'static + Eq + Copy + Hash + Debug, SC: Debug, SE: Debug, TC: Debug, TW: Debug>
131    WorkerInner<SdnOwner, SdnExtIn<UserData, SC>, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnInnerCfg<UserData, SC, SE, TC, TW>, SdnSpawnCfg>
132    for SdnWorkerInner<UserData, SC, SE, TC, TW>
133{
134    fn build(worker: u16, cfg: SdnInnerCfg<UserData, SC, SE, TC, TW>) -> Self {
135        let mut queue = VecDeque::from([WorkerInnerOutput::Bus(BusControl::Channel(SdnOwner, BusChannelControl::Subscribe(SdnChannel::Worker(worker))))]);
136
137        for addr in &cfg.bind_addrs {
138            queue.push_back(WorkerInnerOutput::Net(SdnOwner, BackendOutgoing::UdpListen { addr: *addr, reuse: true }));
139        }
140
141        #[cfg(feature = "vpn")]
142        if let Some(fd) = cfg.vpn_tun_fd {
143            queue.push_back(WorkerInnerOutput::Net(SdnOwner, BackendOutgoing::TunBind { fd }));
144        }
145        if let Some(controller) = cfg.controller {
146            queue.push_back(WorkerInnerOutput::Bus(BusControl::Channel(SdnOwner, BusChannelControl::Subscribe(SdnChannel::Controller))));
147            log::info!("Create controller worker");
148            Self {
149                worker,
150                worker_inner: SdnWorker::new(SdnWorkerCfg {
151                    node_id: cfg.node_id,
152                    tick_ms: cfg.tick_ms,
153                    controller: Some(ControllerPlaneCfg {
154                        bind_addrs: cfg.bind_addrs,
155                        authorization: controller.auth,
156                        handshake_builder: controller.handshake,
157                        session: controller.session,
158                        random: Box::new(OsRng),
159                        services: cfg.services.clone(),
160                        history: cfg.history.clone(),
161                    }),
162                    data: DataPlaneCfg {
163                        worker_id: worker,
164                        services: cfg.services,
165                        history: cfg.history,
166                    },
167                }),
168                timer: TimePivot::build(),
169                #[cfg(feature = "vpn")]
170                _vpn_tun_device: controller.vpn_tun_device,
171                queue,
172                shutdown: false,
173                bind_addrs: Default::default(),
174                bind_slots: Default::default(),
175                #[cfg(feature = "vpn")]
176                tun_backend_slot: None,
177            }
178        } else {
179            log::info!("Create data only worker");
180            Self {
181                worker,
182                worker_inner: SdnWorker::new(SdnWorkerCfg {
183                    node_id: cfg.node_id,
184                    tick_ms: cfg.tick_ms,
185                    controller: None,
186                    data: DataPlaneCfg {
187                        worker_id: worker,
188                        services: cfg.services,
189                        history: cfg.history,
190                    },
191                }),
192                timer: TimePivot::build(),
193                #[cfg(feature = "vpn")]
194                _vpn_tun_device: None,
195                queue,
196                shutdown: false,
197                bind_addrs: Default::default(),
198                bind_slots: Default::default(),
199                #[cfg(feature = "vpn")]
200                tun_backend_slot: None,
201            }
202        }
203    }
204
205    fn worker_index(&self) -> u16 {
206        self.worker
207    }
208
209    fn tasks(&self) -> usize {
210        self.worker_inner.tasks()
211    }
212
213    fn is_empty(&self) -> bool {
214        self.shutdown && self.queue.is_empty() && self.worker_inner.is_empty()
215    }
216
217    fn spawn(&mut self, _now: Instant, _cfg: SdnSpawnCfg) {
218        panic!("Spawn not supported")
219    }
220
221    fn on_tick(&mut self, now: Instant) {
222        let now_ms = self.timer.timestamp_ms(now);
223        self.worker_inner.on_tick(now_ms);
224    }
225
226    fn on_event(&mut self, now: Instant, event: WorkerInnerInput<SdnOwner, SdnExtIn<UserData, SC>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>>) {
227        let now_ms = self.timer.timestamp_ms(now);
228        match event {
229            WorkerInnerInput::Net(_, event) => match event {
230                BackendIncoming::UdpListenResult { bind: _, result } => {
231                    if let Ok((addr, slot)) = result {
232                        log::info!("Worker {} bind addr {addr} to slot {slot}", self.worker);
233                        self.bind_addrs.insert(addr, slot);
234                        self.bind_slots.insert(slot, addr);
235                    }
236                }
237                BackendIncoming::UdpPacket { slot, from, data } => {
238                    let local = *self.bind_slots.get(&slot).expect("Should have local addr");
239                    let pair = NetPair::new(local, from);
240                    self.worker_inner.on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(pair, data)))
241                }
242                #[cfg(feature = "vpn")]
243                BackendIncoming::TunBindResult { result } => {
244                    self.tun_backend_slot = Some(result.expect("Should have slot"));
245                }
246                #[cfg(feature = "vpn")]
247                BackendIncoming::TunPacket { slot: _, data } => self.worker_inner.on_event(now_ms, SdnWorkerInput::Net(NetInput::TunPacket(data))),
248            },
249            WorkerInnerInput::Bus(event) => match event {
250                BusEvent::Broadcast(_from_worker, msg) => self.worker_inner.on_event(now_ms, SdnWorkerInput::Bus(msg)),
251                BusEvent::Channel(_, _, msg) => self.worker_inner.on_event(now_ms, SdnWorkerInput::Bus(msg)),
252            },
253            WorkerInnerInput::Ext(ext) => self.worker_inner.on_event(now_ms, SdnWorkerInput::Ext(ext)),
254        };
255    }
256
257    fn pop_output(&mut self, now: Instant) -> Option<WorkerInnerOutput<SdnOwner, SdnExtOut<UserData, SE>, SdnChannel, SdnEvent<UserData, SC, SE, TC, TW>, SdnSpawnCfg>> {
258        if let Some(e) = self.queue.pop_front() {
259            return Some(e);
260        }
261        let now_ms = self.timer.timestamp_ms(now);
262        let out = self.worker_inner.pop_output2(now_ms)?;
263        self.convert_output(now_ms, out)
264    }
265
266    fn on_shutdown(&mut self, now: Instant) {
267        if self.shutdown {
268            return;
269        }
270        let now_ms = self.timer.timestamp_ms(now);
271        self.worker_inner.on_shutdown(now_ms);
272        for slot in self.bind_addrs.values() {
273            self.queue.push_back(WorkerInnerOutput::Net(SdnOwner, BackendOutgoing::UdpUnlisten { slot: *slot }));
274        }
275        self.shutdown = true;
276    }
277}