1use std::{
2 collections::HashMap,
3 fmt::{Debug, Display},
4 hash::Hash,
5 net::{AddrParseError, SocketAddr},
6 sync::Arc,
7};
8
9use atm0s_sdn_identity::{ConnId, NodeId};
10use atm0s_sdn_router::{
11 shadow::{ShadowRouter, ShadowRouterHistory},
12 RouteAction, RouteRule, RouterTable,
13};
14use sans_io_runtime::{collections::DynamicDeque, return_if_err, return_if_none, return_if_some, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild};
15
16use crate::{
17 base::{
18 Buffer, FeatureControlActor, FeatureWorkerContext, FeatureWorkerInput, FeatureWorkerOutput, NeighboursControl, NetOutgoingMeta, ServiceBuilder, ServiceControlActor, ServiceId,
19 ServiceWorkerCtx, ServiceWorkerInput, ServiceWorkerOutput, TransportMsg, TransportMsgHeader,
20 },
21 features::{Features, FeaturesControl, FeaturesEvent},
22 ExtIn, ExtOut, LogicControl, LogicEvent,
23};
24
25use self::{connection::DataPlaneConnection, features::FeatureWorkerManager, services::ServiceWorkerManager};
26
27mod connection;
28mod features;
29mod services;
30
/// A pair of socket addresses: the local one and the remote one.
///
/// The pair is used throughout this file as the key identifying a network
/// path (for example in the connection table of the data plane).
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)]
pub struct NetPair {
    /// Local socket address of the pair.
    pub local: SocketAddr,
    /// Remote socket address of the pair.
    pub remote: SocketAddr,
}

impl NetPair {
    /// Builds a pair from two already-parsed socket addresses.
    pub fn new(local: SocketAddr, remote: SocketAddr) -> Self {
        NetPair { local, remote }
    }

    /// Builds a pair by parsing both addresses from text.
    ///
    /// # Errors
    /// Returns the [`AddrParseError`] of the first address that fails to
    /// parse; `local` is parsed before `remote`.
    pub fn new_str(local: &str, remote: &str) -> Result<Self, AddrParseError> {
        let local_addr: SocketAddr = local.parse()?;
        let remote_addr: SocketAddr = remote.parse()?;
        Ok(Self::new(local_addr, remote_addr))
    }
}

impl Display for NetPair {
    /// Renders the pair as `[local-remote]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { local, remote } = self;
        write!(f, "[{}-{}]", local, remote)
    }
}
57
/// Raw input coming from the network side of the data plane.
#[derive(Debug)]
pub enum NetInput {
    /// A UDP datagram together with the address pair it arrived on.
    /// `DataPlane::on_event` ignores it when the buffer is empty.
    UdpPacket(NetPair, Buffer),
    /// A TUN packet; `DataPlane::on_event` hands it to the VPN feature worker.
    /// Only compiled with the `vpn` cargo feature.
    #[cfg(feature = "vpn")]
    TunPacket(Buffer),
}
64
/// Event relayed from one worker to another.
///
/// The data plane emits it (inside `Output::Worker`) when an event is
/// addressed to a worker id different from its own, and turns a received one
/// (`Input::Worker`) into the matching `ExtOut` event.
#[derive(Debug, Clone)]
pub enum CrossWorker<UserData, SE> {
    /// A feature event for the actor identified by `UserData`.
    Feature(UserData, FeaturesEvent),
    /// A service event for the given service and the actor identified by `UserData`.
    Service(ServiceId, UserData, SE),
}
70
/// Everything that can be fed into `DataPlane::on_event`.
#[derive(Debug)]
pub enum Input<UserData, SC, SE, TW> {
    /// Control request coming from the external user of this worker.
    Ext(ExtIn<UserData, SC>),
    /// Raw packet coming from the network.
    Net(NetInput),
    /// Logic event addressed to this data plane (presumably sent by the controller — confirm against `LogicEvent`).
    Event(LogicEvent<UserData, SE, TW>),
    /// Event relayed from another worker.
    Worker(CrossWorker<UserData, SE>),
}
78
/// Packets the data plane asks its owner to send.
#[derive(Debug)]
pub enum NetOutput {
    /// One buffer for a single address pair.
    UdpPacket(NetPair, Buffer),
    /// The same buffer for every listed address pair.
    UdpPackets(Vec<NetPair>, Buffer),
    /// A TUN packet produced by a feature worker; only compiled with the `vpn` cargo feature.
    #[cfg(feature = "vpn")]
    TunPacket(Buffer),
}
86
/// Everything the data plane can return from `pop_output`.
///
/// Variants without `#[convert_enum(optout)]` are built in this file through
/// `.into()` from their payload type (presumably the `From` impls are
/// generated by `convert_enum::From` — confirm against that crate).
#[derive(convert_enum::From)]
pub enum Output<UserData, SC, SE, TC> {
    /// Event for the external user of this worker.
    Ext(ExtOut<UserData, SE>),
    /// Packet(s) to send to the network.
    Net(NetOutput),
    /// Control message (named after the controller-bound worker outputs it wraps).
    Control(LogicControl<UserData, SC, SE, TC>),
    /// Event to relay to another worker; the `u16` is the target worker id.
    #[convert_enum(optout)]
    Worker(u16, CrossWorker<UserData, SE>),
    /// Value returned by `empty_event` of the `TaskSwitcherChild` impl.
    #[convert_enum(optout)]
    OnResourceEmpty,
    /// Placeholder queued when a raw broadcast produced no packet.
    #[convert_enum(optout)]
    Continue,
}
99
/// Identifies the two child tasks registered with the `TaskSwitcher`.
///
/// The values are converted to and from `usize`: they are passed to
/// `TaskSwitcherBranch::new` and recovered from `TaskSwitcher::current()`.
#[derive(num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
#[repr(usize)]
enum TaskType {
    /// The feature worker manager.
    Feature = 0,
    /// The service worker manager.
    Service = 1,
}
106
/// Configuration consumed by `DataPlane::new`.
pub struct DataPlaneCfg<UserData, SC, SE, TC, TW> {
    /// Id of the worker this data plane runs on.
    pub worker_id: u16,
    /// Builders handed to the service worker manager.
    #[allow(clippy::type_complexity)]
    pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
    /// History object handed to the `ShadowRouter` created for this data plane.
    pub history: Arc<dyn ShadowRouterHistory>,
}
113
/// Per-worker data plane: routes packets and dispatches inputs to the
/// feature and service workers, queueing the resulting outputs.
pub struct DataPlane<UserData, SC, SE, TC, TW> {
    /// Number of `on_tick` calls so far; passed to the workers on every tick.
    tick_count: u64,
    /// Id of this worker; decides whether an event is delivered locally or relayed to another worker.
    worker_id: u16,
    /// Context passed to feature workers; holds the node id and the shadow router.
    feature_ctx: FeatureWorkerContext,
    /// Context passed to service workers; holds the node id.
    service_ctx: ServiceWorkerCtx,
    /// Feature worker manager, registered with the switcher as `TaskType::Feature`.
    features: TaskSwitcherBranch<FeatureWorkerManager<UserData>, features::Output<UserData>>,
    /// Service worker manager, registered with the switcher as `TaskType::Service`.
    #[allow(clippy::type_complexity)]
    services: TaskSwitcherBranch<ServiceWorkerManager<UserData, SC, SE, TC, TW>, services::Output<UserData, SC, SE, TC>>,
    /// Connection state keyed by address pair; filled by `Pin`, cleared by `UnPin`.
    conns: HashMap<NetPair, DataPlaneConnection>,
    /// Reverse lookup from connection id to address pair.
    conns_reverse: HashMap<ConnId, NetPair>,
    /// Outputs waiting to be returned by `pop_output`.
    queue: DynamicDeque<Output<UserData, SC, SE, TC>, 16>,
    /// Set once `on_shutdown` has run.
    shutdown: bool,
    /// Switcher over the two child tasks (created with a task count of 2).
    switcher: TaskSwitcher,
}
128
129impl<UserData, SC, SE, TC, TW> DataPlane<UserData, SC, SE, TC, TW>
130where
131 UserData: 'static + Copy + Eq + Hash + Debug,
132{
133 pub fn new(node_id: NodeId, cfg: DataPlaneCfg<UserData, SC, SE, TC, TW>) -> Self {
134 log::info!("Create DataPlane for node: {}", node_id);
135
136 Self {
137 worker_id: cfg.worker_id,
138 tick_count: 0,
139 feature_ctx: FeatureWorkerContext {
140 node_id,
141 router: ShadowRouter::new(node_id, cfg.history),
142 },
143 service_ctx: ServiceWorkerCtx { node_id },
144 features: TaskSwitcherBranch::new(FeatureWorkerManager::new(), TaskType::Feature),
145 services: TaskSwitcherBranch::new(ServiceWorkerManager::new(cfg.services), TaskType::Service),
146 conns: HashMap::new(),
147 conns_reverse: HashMap::new(),
148 queue: DynamicDeque::default(),
149 shutdown: false,
150 switcher: TaskSwitcher::new(2),
151 }
152 }
153
154 pub fn route(&self, rule: RouteRule, source: Option<NodeId>, relay_from: Option<NodeId>) -> RouteAction<NetPair> {
155 self.feature_ctx.router.derive_action(&rule, source, relay_from)
156 }
157
158 pub fn on_tick(&mut self, now_ms: u64) {
159 log::trace!("[DataPlane] on_tick: {}", now_ms);
160 self.features.input(&mut self.switcher).on_tick(&mut self.feature_ctx, now_ms, self.tick_count);
161 self.services.input(&mut self.switcher).on_tick(&self.service_ctx, now_ms, self.tick_count);
162 self.tick_count += 1;
163 }
164
165 pub fn on_event(&mut self, now_ms: u64, event: Input<UserData, SC, SE, TW>) {
166 match event {
167 Input::Ext(ext) => match ext {
168 ExtIn::FeaturesControl(userdata, control) => {
169 let feature: Features = control.to_feature();
170 let actor = FeatureControlActor::Worker(self.worker_id, userdata);
171 self.features
172 .input(&mut self.switcher)
173 .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Control(actor, control));
174 }
175 ExtIn::ServicesControl(service, userdata, control) => {
176 let actor = ServiceControlActor::Worker(self.worker_id, userdata);
177 self.services
178 .input(&mut self.switcher)
179 .on_input(&self.service_ctx, now_ms, service, ServiceWorkerInput::Control(actor, control));
180 }
181 },
182 Input::Worker(CrossWorker::Feature(userdata, event)) => self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event))),
183 Input::Worker(CrossWorker::Service(service, userdata, event)) => self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event))),
184 Input::Net(NetInput::UdpPacket(pair, buf)) => {
185 if buf.is_empty() {
186 return;
187 }
188 if let Ok(control) = NeighboursControl::try_from(&*buf) {
189 self.queue.push_back(LogicControl::NetNeighbour(pair, control).into());
190 } else {
191 self.incoming_route(now_ms, pair, buf);
192 }
193 }
194 #[cfg(feature = "vpn")]
195 Input::Net(NetInput::TunPacket(pkt)) => {
196 self.features
197 .input(&mut self.switcher)
198 .on_input(&mut self.feature_ctx, Features::Vpn, now_ms, FeatureWorkerInput::TunPkt(pkt));
199 }
200 Input::Event(LogicEvent::Feature(is_broadcast, to)) => {
201 let feature = to.to_feature();
202 self.features
203 .input(&mut self.switcher)
204 .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::FromController(is_broadcast, to));
205 }
206 Input::Event(LogicEvent::Service(service, to)) => {
207 self.services
208 .input(&mut self.switcher)
209 .on_input(&self.service_ctx, now_ms, service, ServiceWorkerInput::FromController(to));
210 }
211 Input::Event(LogicEvent::ExtFeaturesEvent(worker, userdata, event)) => {
212 assert_eq!(self.worker_id, worker);
213 self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event)));
214 }
215 Input::Event(LogicEvent::ExtServicesEvent(worker, service, userdata, event)) => {
216 assert_eq!(self.worker_id, worker);
217 self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
218 }
219 Input::Event(LogicEvent::NetNeighbour(pair, control)) => {
220 let buf: Result<Vec<u8>, ()> = (&control).try_into();
221 if let Ok(buf) = buf {
222 self.queue.push_back(NetOutput::UdpPacket(pair, buf.into()).into());
223 }
224 }
225 Input::Event(LogicEvent::NetDirect(feature, pair, _conn, meta, buf)) => {
226 let header = meta.to_header(feature as u8, RouteRule::Direct, self.feature_ctx.node_id);
227 let conn = return_if_none!(self.conns.get_mut(&pair));
228 let msg = TransportMsg::build_raw(header, buf);
229 if let Some(pkt) = Self::build_send_to_from_mut(now_ms, conn, pair, msg.take()) {
230 self.queue.push_back(pkt.into());
231 }
232 }
233 Input::Event(LogicEvent::NetRoute(feature, rule, meta, buf)) => self.outgoing_route(now_ms, feature, rule, meta, buf),
234 Input::Event(LogicEvent::Pin(conn, node, pair, secure)) => {
235 self.conns.insert(pair, DataPlaneConnection::new(node, conn, pair, secure));
236 self.conns_reverse.insert(conn, pair);
237 }
238 Input::Event(LogicEvent::UnPin(conn)) => {
239 if let Some(addr) = self.conns_reverse.remove(&conn) {
240 log::info!("UnPin: conn: {} <--> addr: {}", conn, addr);
241 self.conns.remove(&addr);
242 }
243 }
244 }
245 }
246
247 pub fn on_shutdown(&mut self, now_ms: u64) {
248 if self.shutdown {
249 return;
250 }
251 log::info!("[DataPlane] Shutdown");
252 self.features.input(&mut self.switcher).on_shutdown(&mut self.feature_ctx, now_ms);
253 self.services.input(&mut self.switcher).on_shutdown(&self.service_ctx, now_ms);
254 self.shutdown = true;
255 }
256
257 fn incoming_route(&mut self, now_ms: u64, pair: NetPair, mut buf: Buffer) {
258 let conn = return_if_none!(self.conns.get_mut(&pair));
259 if TransportMsgHeader::is_secure(buf[0]) {
260 return_if_none!(conn.decrypt_if_need(now_ms, &mut buf));
261 }
262 let header = return_if_err!(TransportMsgHeader::try_from(&buf as &[u8]));
263 let action = self.feature_ctx.router.derive_action(&header.route, header.from_node, Some(conn.node()));
264 log::debug!("[DataPlane] Incoming rule: {:?} from: {pair}, node {:?} => action {:?}", header.route, header.from_node, action);
265 match action {
266 RouteAction::Reject => {}
267 RouteAction::Local => {
268 let feature = return_if_none!(header.feature.try_into().ok());
269 log::debug!("Incoming message for feature: {feature:?} from: {pair}");
270 self.features
271 .input(&mut self.switcher)
272 .on_network_raw(&mut self.feature_ctx, feature, now_ms, conn.conn(), pair, header, buf);
273 }
274 RouteAction::Next(pair) => {
275 if !TransportMsgHeader::decrease_ttl(&mut buf) {
276 log::debug!("TTL is 0, drop packet");
277 }
278 let target_conn = return_if_none!(self.conns.get_mut(&pair));
279 if let Some(out) = Self::build_send_to_from_mut(now_ms, target_conn, pair, buf) {
280 self.queue.push_back(out.into());
281 }
282 }
283 RouteAction::Broadcast(local, pairs) => {
284 if !TransportMsgHeader::decrease_ttl(&mut buf) {
285 log::debug!("TTL is 0, drop packet");
286 return;
287 }
288 if local {
289 if let Ok(feature) = header.feature.try_into() {
290 log::debug!("Incoming broadcast feature: {feature:?} from: {pair}");
291 self.features
292 .input(&mut self.switcher)
293 .on_network_raw(&mut self.feature_ctx, feature, now_ms, conn.conn(), pair, header, buf.clone());
294 }
295 }
296 if !pairs.is_empty() {
297 log::debug!("Incoming broadcast from: {pair} forward to: {pairs:?}");
298 if let Some(out) = self.build_send_to_multi_from_mut(now_ms, pairs, buf) {
299 self.queue.push_back(out.into());
300 }
301 }
302 }
303 }
304 }
305
306 fn outgoing_route(&mut self, now_ms: u64, feature: Features, rule: RouteRule, mut meta: NetOutgoingMeta, buf: Buffer) {
307 match self.feature_ctx.router.derive_action(&rule, Some(self.feature_ctx.node_id), None) {
308 RouteAction::Reject => {
309 log::debug!("[DataPlane] outgoing route rule {:?} is rejected", rule);
310 }
311 RouteAction::Local => {
312 log::debug!("[DataPlane] outgoing route rule {:?} is processed locally", rule);
313 let meta = meta.to_incoming(self.feature_ctx.node_id);
314 self.features
315 .input(&mut self.switcher)
316 .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Local(meta, buf));
317 }
318 RouteAction::Next(remote) => {
319 log::debug!("[DataPlane] outgoing route rule {:?} is go with remote {remote}", rule);
320 let header = meta.to_header(feature as u8, rule, self.feature_ctx.node_id);
321 let msg = TransportMsg::build_raw(header, buf);
322 let conn = return_if_none!(self.conns.get_mut(&remote));
323 if let Some(out) = Self::build_send_to_from_mut(now_ms, conn, remote, msg.take()) {
324 self.queue.push_back(out.into());
325 }
326 }
327 RouteAction::Broadcast(local, remotes) => {
328 log::debug!("[DataPlane] outgoing route rule {:?} is go with local {local} and remotes {:?}", rule, remotes);
329 meta.source = true; let header = meta.to_header(feature as u8, rule, self.feature_ctx.node_id);
332 if local {
333 let meta = meta.to_incoming(self.feature_ctx.node_id);
334 self.features
335 .input(&mut self.switcher)
336 .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Local(meta, buf.clone()));
337 }
338 let msg = TransportMsg::build_raw(header, buf);
339 if let Some(out) = self.build_send_to_multi_from_mut(now_ms, remotes, msg.take()) {
340 self.queue.push_back(out.into());
341 }
342 }
343 }
344 }
345
346 fn pop_features(&mut self, now_ms: u64) {
347 let out = return_if_none!(self.features.pop_output(now_ms, &mut self.switcher));
348 let (feature, out) = match out {
349 features::Output::Output(feature, out) => (feature, out),
350 features::Output::OnResourceEmpty => {
351 log::info!("[DataPlane] Features OnResourceEmpty");
352 return;
353 }
354 };
355 match out {
356 FeatureWorkerOutput::ForwardControlToController(service, control) => self.queue.push_back(LogicControl::FeaturesControl(service, control).into()),
357 FeatureWorkerOutput::ForwardNetworkToController(conn, header, msg) => self.queue.push_back(LogicControl::NetRemote(feature, conn, header, msg).into()),
358 FeatureWorkerOutput::ForwardLocalToController(header, buf) => self.queue.push_back(LogicControl::NetLocal(feature, header, buf).into()),
359 FeatureWorkerOutput::ToController(control) => self.queue.push_back(LogicControl::Feature(control).into()),
360 FeatureWorkerOutput::Event(actor, event) => match actor {
361 FeatureControlActor::Controller(userdata) => self.queue.push_back(Output::Control(LogicControl::ExtFeaturesEvent(userdata, event))),
362 FeatureControlActor::Worker(worker, userdata) => {
363 if self.worker_id == worker {
364 self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event)));
365 } else {
366 self.queue.push_back(Output::Worker(worker, CrossWorker::Feature(userdata, event)));
367 }
368 }
369 FeatureControlActor::Service(service) => {
370 self.services
371 .input(&mut self.switcher)
372 .on_input(&self.service_ctx, now_ms, service, ServiceWorkerInput::FeatureEvent(event));
373 }
374 },
375 FeatureWorkerOutput::SendDirect(conn, meta, buf) => {
376 if let Some(addr) = self.conns_reverse.get(&conn) {
377 let conn = self.conns.get_mut(addr).expect("Should have");
378 let header = meta.to_header(feature as u8, RouteRule::Direct, self.feature_ctx.node_id);
379 let msg = TransportMsg::build_raw(header, buf);
380 self.queue.push_back(Self::build_send_to_from_mut(now_ms, conn, *addr, msg.take()).expect("Should have output").into())
381 }
382 }
383 FeatureWorkerOutput::SendRoute(rule, ttl, buf) => {
384 log::info!("SendRoute: {:?}", rule);
385 self.outgoing_route(now_ms, feature, rule, ttl, buf);
386 }
387 FeatureWorkerOutput::RawDirect(conn, buf) => {
388 if let Some(pair) = self.conns_reverse.get(&conn) {
389 let conn = self.conns.get_mut(pair).expect("Should have conn");
390 self.queue.push_back(Self::build_send_to(now_ms, conn, *pair, buf).expect("Should ok for convert RawDirect").into());
391 }
392 }
393 FeatureWorkerOutput::RawBroadcast(conns, buf) => {
394 let addrs = conns.iter().filter_map(|conn| self.conns_reverse.get(conn)).cloned().collect();
395 let out = self.build_send_to_multi(now_ms, addrs, buf).map(|e| e.into()).unwrap_or(Output::Continue);
396 self.queue.push_back(out);
397 }
398 FeatureWorkerOutput::RawDirect2(pair, buf) => {
399 if let Some(conn) = self.conns.get_mut(&pair) {
400 self.queue.push_back(Self::build_send_to(now_ms, conn, pair, buf).expect("Should ok for convert RawDirect2").into());
401 }
402 }
403 FeatureWorkerOutput::RawBroadcast2(pairs, buf) => {
404 let out = self.build_send_to_multi(now_ms, pairs, buf).map(|e| e.into()).unwrap_or(Output::Continue);
405 self.queue.push_back(out);
406 }
407 #[cfg(feature = "vpn")]
408 FeatureWorkerOutput::TunPkt(pkt) => self.queue.push_back(NetOutput::TunPacket(pkt).into()),
409 FeatureWorkerOutput::OnResourceEmpty => {
410 log::info!("[DataPlane] Feature {feature:?} OnResourceEmpty");
411 }
412 }
413 }
414
415 fn pop_services(&mut self, now_ms: u64) {
416 let out = return_if_none!(self.services.pop_output(now_ms, &mut self.switcher));
417 let (service, out) = match out {
418 services::Output::Output(service, out) => (service, out),
419 services::Output::OnResourceEmpty => {
420 log::info!("[DataPlane] Services OnResourceEmpty");
421 return;
422 }
423 };
424 match out {
425 ServiceWorkerOutput::ForwardControlToController(actor, control) => self.queue.push_back(LogicControl::ServicesControl(actor, service, control).into()),
426 ServiceWorkerOutput::ForwardFeatureEventToController(event) => self.queue.push_back(LogicControl::ServiceEvent(service, event).into()),
427 ServiceWorkerOutput::ToController(tc) => self.queue.push_back(LogicControl::Service(service, tc).into()),
428 ServiceWorkerOutput::FeatureControl(control) => {
429 let feature = control.to_feature();
430 self.features
431 .input(&mut self.switcher)
432 .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Control(FeatureControlActor::Service(service), control));
433 }
434 ServiceWorkerOutput::Event(actor, event) => match actor {
435 ServiceControlActor::Controller(userdata) => self.queue.push_back(Output::Control(LogicControl::ExtServicesEvent(service, userdata, event))),
436 ServiceControlActor::Worker(worker, userdata) => {
437 if self.worker_id == worker {
438 self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
439 } else {
440 self.queue.push_back(Output::Worker(worker, CrossWorker::Service(service, userdata, event)));
441 }
442 }
443 },
444 ServiceWorkerOutput::OnResourceEmpty => {
445 log::info!("[DataPlane] Service {service} OnResourceEmpty");
446 }
447 }
448 }
449
450 fn build_send_to_from_mut(now: u64, conn: &mut DataPlaneConnection, pair: NetPair, mut buf: Buffer) -> Option<NetOutput> {
451 conn.encrypt_if_need(now, &mut buf)?;
452 Some(NetOutput::UdpPacket(pair, buf))
453 }
454
455 fn build_send_to_multi_from_mut(&mut self, now: u64, mut pairs: Vec<NetPair>, mut buf: Buffer) -> Option<NetOutput> {
456 if TransportMsgHeader::is_secure(buf[0]) {
457 let first = pairs.pop()?;
458 for pair in pairs {
459 if let Some(conn) = self.conns.get_mut(&pair) {
460 let mut buf = Buffer::build(&buf, 0, 12 + 16);
461 if conn.encrypt_if_need(now, &mut buf).is_some() {
462 let out = NetOutput::UdpPacket(pair, buf);
463 self.queue.push_back(Output::Net(out));
464 }
465 }
466 }
467 let conn = self.conns.get_mut(&first)?;
468 conn.encrypt_if_need(now, &mut buf)?;
469 Some(NetOutput::UdpPacket(first, buf))
470 } else {
471 Some(NetOutput::UdpPackets(pairs, buf))
472 }
473 }
474
475 fn build_send_to_multi(&mut self, now: u64, pairs: Vec<NetPair>, buf: Buffer) -> Option<NetOutput> {
476 if TransportMsgHeader::is_secure(buf[0]) {
477 let buf = Buffer::build(&buf, 0, 12 + 16);
478 self.build_send_to_multi_from_mut(now, pairs, buf)
479 } else {
480 Some(NetOutput::UdpPackets(pairs, buf))
481 }
482 }
483
484 fn build_send_to(now: u64, conn: &mut DataPlaneConnection, pair: NetPair, buf: Buffer) -> Option<NetOutput> {
485 if TransportMsgHeader::is_secure(buf[0]) {
486 let buf = Buffer::build(&buf, 0, 12 + 16);
487 Self::build_send_to_from_mut(now, conn, pair, buf)
488 } else {
489 Some(NetOutput::UdpPacket(pair, buf))
490 }
491 }
492}
493
494impl<UserData, SC, SE, TC, TW> TaskSwitcherChild<Output<UserData, SC, SE, TC>> for DataPlane<UserData, SC, SE, TC, TW>
495where
496 UserData: 'static + Copy + Eq + Hash + Debug,
497{
498 type Time = u64;
499
500 fn empty_event(&self) -> Output<UserData, SC, SE, TC> {
501 Output::OnResourceEmpty
502 }
503
504 fn is_empty(&self) -> bool {
505 self.shutdown && self.queue.is_empty() && self.features.is_empty() && self.services.is_empty()
506 }
507
508 fn pop_output(&mut self, now: u64) -> Option<Output<UserData, SC, SE, TC>> {
509 return_if_some!(self.queue.pop_front());
510
511 while let Some(current) = self.switcher.current() {
512 match current.try_into().ok()? {
513 TaskType::Feature => self.pop_features(now),
514 TaskType::Service => self.pop_services(now),
515 }
516
517 return_if_some!(self.queue.pop_front());
518 }
519
520 None
521 }
522}