atm0s_sdn/
builder.rs

1use std::{
2    fmt::Debug,
3    hash::Hash,
4    marker::PhantomData,
5    net::{IpAddr, SocketAddr},
6    sync::Arc,
7    time::Duration,
8};
9
10use atm0s_sdn_identity::{NodeAddr, NodeAddrBuilder, NodeId, Protocol};
11use atm0s_sdn_network::{
12    base::{Authorization, HandshakeBuilder, ServiceBuilder},
13    features::{FeaturesControl, FeaturesEvent},
14    secure::{HandshakeBuilderXDA, StaticKeyAuthorization},
15    services::{
16        manual2_discovery::{self, AdvertiseTarget},
17        manual_discovery, visualization,
18    },
19};
20use rand::{thread_rng, RngCore};
21use sans_io_runtime::backend::Backend;
22use serde::{de::DeserializeOwned, Serialize};
23
24use crate::{
25    history::DataWorkerHistory,
26    worker_inner::{ControllerCfg, SdnController, SdnInnerCfg, SdnOwner, SdnWorkerInner},
27};
28
29pub struct SdnBuilder<UserData, SC, SE, TC, TW, NodeInfo> {
30    auth: Option<Arc<dyn Authorization>>,
31    handshake: Option<Arc<dyn HandshakeBuilder>>,
32    node_addr: NodeAddr,
33    node_id: NodeId,
34    session: u64,
35    bind_addrs: Vec<SocketAddr>,
36    tick_ms: u64,
37    visualization_collector: bool,
38    #[allow(clippy::type_complexity)]
39    services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
40    #[cfg(feature = "vpn")]
41    vpn_enable: bool,
42    #[cfg(feature = "vpn")]
43    vpn_ip: Option<(u8, u8, u8, u8)>,
44    #[cfg(feature = "vpn")]
45    vpn_netmask: Option<(u8, u8, u8, u8)>,
46    _tmp: PhantomData<NodeInfo>,
47}
48
49impl<UserData, SC, SE, TC: Debug, TW: Debug, NodeInfo> SdnBuilder<UserData, SC, SE, TC, TW, NodeInfo>
50where
51    UserData: 'static + Clone + Debug + Send + Sync + Copy + Eq + Hash,
52    NodeInfo: 'static + Clone + Debug + Send + Sync + Serialize + DeserializeOwned,
53    SC: 'static + Clone + Debug + Send + Sync + From<visualization::Control<NodeInfo>> + TryInto<visualization::Control<NodeInfo>>,
54    SE: 'static + Clone + Debug + Send + Sync + From<visualization::Event<NodeInfo>> + TryInto<visualization::Event<NodeInfo>>,
55    TC: 'static + Clone + Send + Sync,
56    TW: 'static + Clone + Send + Sync,
57{
58    pub fn new(node_id: NodeId, bind_addrs: &[SocketAddr], custom_ip: Vec<SocketAddr>) -> Self {
59        let node_addr = generate_node_addr(node_id, bind_addrs, custom_ip);
60        log::info!("Created node on addr {}", node_addr);
61
62        Self {
63            auth: None,
64            handshake: None,
65            node_addr,
66            node_id,
67            tick_ms: 1000,
68            session: thread_rng().next_u64(),
69            bind_addrs: bind_addrs.to_vec(),
70            visualization_collector: false,
71            services: vec![],
72            #[cfg(feature = "vpn")]
73            vpn_enable: false,
74            #[cfg(feature = "vpn")]
75            vpn_ip: None,
76            #[cfg(feature = "vpn")]
77            vpn_netmask: None,
78            _tmp: PhantomData,
79        }
80    }
81
82    pub fn node_addr(&self) -> NodeAddr {
83        self.node_addr.clone()
84    }
85
86    /// Setting authorization
87    pub fn set_authorization<A: Authorization + 'static>(&mut self, auth: A) {
88        self.auth = Some(Arc::new(auth));
89    }
90
91    /// Setting handshake
92    pub fn set_handshake<H: HandshakeBuilder + 'static>(&mut self, handshake: H) {
93        self.handshake = Some(Arc::new(handshake));
94    }
95
96    /// Setting visualization collector mode
97    pub fn set_visualization_collector(&mut self, value: bool) {
98        self.visualization_collector = value;
99    }
100
101    /// Setting manual discovery
102    pub fn set_manual_discovery(&mut self, local_tags: Vec<String>, connect_tags: Vec<String>) {
103        self.add_service(Arc::new(manual_discovery::ManualDiscoveryServiceBuilder::new(self.node_addr.clone(), local_tags, connect_tags)));
104    }
105
106    /// Setting manual2 discovery
107    pub fn set_manual2_discovery(&mut self, targets: Vec<AdvertiseTarget>, interval: u64) {
108        self.add_service(Arc::new(manual2_discovery::Manual2DiscoveryServiceBuilder::new(self.node_addr.clone(), targets, interval)));
109    }
110
111    /// panic if the service already exists
112    pub fn add_service(&mut self, service: Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>) {
113        for s in self.services.iter() {
114            assert_ne!(s.service_id(), service.service_id(), "Service ({}, {}) already exists", service.service_id(), service.service_name());
115        }
116        self.services.push(service);
117    }
118
119    #[cfg(feature = "vpn")]
120    pub fn enable_vpn(&mut self) {
121        self.vpn_enable = true;
122    }
123
124    #[cfg(feature = "vpn")]
125    pub fn set_vpn_ip(&mut self, ip: (u8, u8, u8, u8)) {
126        self.vpn_ip = Some(ip);
127    }
128
129    #[cfg(feature = "vpn")]
130    pub fn set_vpn_netmask(&mut self, netmask: (u8, u8, u8, u8)) {
131        self.vpn_netmask = Some(netmask);
132    }
133
134    pub fn build<B: Backend<SdnOwner>>(mut self, workers: usize, info: NodeInfo) -> SdnController<UserData, SC, SE, TC, TW> {
135        assert!(workers > 0);
136        #[cfg(feature = "vpn")]
137        let (tun_device, mut queue_fds) = {
138            if self.vpn_enable {
139                let vpn_ip = self.vpn_ip.unwrap_or((10, 33, 33, self.node_id as u8));
140                let vpn_netmask = self.vpn_netmask.unwrap_or((255, 255, 255, 0));
141                let mut tun_device = sans_io_runtime::backend::tun::create_tun(&format!("utun{}", self.node_id as u8), vpn_ip, vpn_netmask, 1400, workers);
142                let mut queue_fds = std::collections::VecDeque::with_capacity(workers);
143                for i in 0..workers {
144                    queue_fds.push_back(tun_device.get_queue_fd(i).expect("Should have tun queue fd"));
145                }
146                (Some(tun_device), queue_fds)
147            } else {
148                (None, std::collections::VecDeque::new())
149            }
150        };
151
152        self.add_service(Arc::new(visualization::VisualizationServiceBuilder::<UserData, SC, SE, TC, TW, NodeInfo>::new(
153            info,
154            self.visualization_collector,
155        )));
156
157        let history = Arc::new(DataWorkerHistory::default());
158
159        let mut controller = SdnController::default();
160        controller.add_worker::<SdnOwner, _, SdnWorkerInner<UserData, SC, SE, TC, TW>, B>(
161            Duration::from_millis(1000),
162            SdnInnerCfg {
163                node_id: self.node_id,
164                tick_ms: self.tick_ms,
165                bind_addrs: self.bind_addrs.to_vec(),
166                services: self.services.clone(),
167                history: history.clone(),
168                controller: Some(ControllerCfg {
169                    session: self.session,
170                    auth: self.auth.unwrap_or_else(|| Arc::new(StaticKeyAuthorization::new("unsecure"))),
171                    handshake: self.handshake.unwrap_or_else(|| Arc::new(HandshakeBuilderXDA)),
172                    #[cfg(feature = "vpn")]
173                    vpn_tun_device: tun_device,
174                }),
175                #[cfg(feature = "vpn")]
176                vpn_tun_fd: queue_fds.pop_front(),
177            },
178            None,
179        );
180
181        for _ in 1..workers {
182            controller.add_worker::<SdnOwner, _, SdnWorkerInner<UserData, SC, SE, TC, TW>, B>(
183                Duration::from_millis(1000),
184                SdnInnerCfg {
185                    node_id: self.node_id,
186                    tick_ms: self.tick_ms,
187                    bind_addrs: self.bind_addrs.to_vec(),
188                    services: self.services.clone(),
189                    history: history.clone(),
190                    controller: None,
191                    #[cfg(feature = "vpn")]
192                    vpn_tun_fd: queue_fds.pop_front(),
193                },
194                None,
195            );
196        }
197        controller
198    }
199}
200
201pub fn generate_node_addr(node_id: u32, bind_addrs: &[SocketAddr], custom_ips: Vec<SocketAddr>) -> NodeAddr {
202    let mut addr_builder = NodeAddrBuilder::new(node_id);
203    for bind_addr in bind_addrs {
204        match bind_addr.ip() {
205            IpAddr::V4(ip) => {
206                log::info!("Added ipv4 {}", ip);
207                addr_builder.add_protocol(Protocol::Ip4(ip));
208                addr_builder.add_protocol(Protocol::Udp(bind_addr.port()));
209            }
210            IpAddr::V6(ip) => {
211                log::info!("Added ipv6 {}", ip);
212                addr_builder.add_protocol(Protocol::Ip6(ip));
213                addr_builder.add_protocol(Protocol::Udp(bind_addr.port()));
214            }
215        }
216    }
217    for ip in custom_ips {
218        match ip {
219            SocketAddr::V4(ip) => {
220                log::info!("Added custom ipv4:\t{:?}", ip);
221                addr_builder.add_protocol(Protocol::Ip4(*ip.ip()));
222                addr_builder.add_protocol(Protocol::Udp(ip.port()));
223            }
224            SocketAddr::V6(ip) => {
225                log::info!("Added custom ipv6:\t{:?}", ip);
226                addr_builder.add_protocol(Protocol::Ip6(*ip.ip()));
227                addr_builder.add_protocol(Protocol::Udp(ip.port()));
228            }
229        }
230    }
231
232    addr_builder.addr()
233}