1use std::{
2 fmt::Debug,
3 hash::Hash,
4 marker::PhantomData,
5 net::{IpAddr, SocketAddr},
6 sync::Arc,
7 time::Duration,
8};
9
10use atm0s_sdn_identity::{NodeAddr, NodeAddrBuilder, NodeId, Protocol};
11use atm0s_sdn_network::{
12 base::{Authorization, HandshakeBuilder, ServiceBuilder},
13 features::{FeaturesControl, FeaturesEvent},
14 secure::{HandshakeBuilderXDA, StaticKeyAuthorization},
15 services::{
16 manual2_discovery::{self, AdvertiseTarget},
17 manual_discovery, visualization,
18 },
19};
20use rand::{thread_rng, RngCore};
21use sans_io_runtime::backend::Backend;
22use serde::{de::DeserializeOwned, Serialize};
23
24use crate::{
25 history::DataWorkerHistory,
26 worker_inner::{ControllerCfg, SdnController, SdnInnerCfg, SdnOwner, SdnWorkerInner},
27};
28
29pub struct SdnBuilder<UserData, SC, SE, TC, TW, NodeInfo> {
30 auth: Option<Arc<dyn Authorization>>,
31 handshake: Option<Arc<dyn HandshakeBuilder>>,
32 node_addr: NodeAddr,
33 node_id: NodeId,
34 session: u64,
35 bind_addrs: Vec<SocketAddr>,
36 tick_ms: u64,
37 visualization_collector: bool,
38 #[allow(clippy::type_complexity)]
39 services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
40 #[cfg(feature = "vpn")]
41 vpn_enable: bool,
42 #[cfg(feature = "vpn")]
43 vpn_ip: Option<(u8, u8, u8, u8)>,
44 #[cfg(feature = "vpn")]
45 vpn_netmask: Option<(u8, u8, u8, u8)>,
46 _tmp: PhantomData<NodeInfo>,
47}
48
49impl<UserData, SC, SE, TC: Debug, TW: Debug, NodeInfo> SdnBuilder<UserData, SC, SE, TC, TW, NodeInfo>
50where
51 UserData: 'static + Clone + Debug + Send + Sync + Copy + Eq + Hash,
52 NodeInfo: 'static + Clone + Debug + Send + Sync + Serialize + DeserializeOwned,
53 SC: 'static + Clone + Debug + Send + Sync + From<visualization::Control<NodeInfo>> + TryInto<visualization::Control<NodeInfo>>,
54 SE: 'static + Clone + Debug + Send + Sync + From<visualization::Event<NodeInfo>> + TryInto<visualization::Event<NodeInfo>>,
55 TC: 'static + Clone + Send + Sync,
56 TW: 'static + Clone + Send + Sync,
57{
58 pub fn new(node_id: NodeId, bind_addrs: &[SocketAddr], custom_ip: Vec<SocketAddr>) -> Self {
59 let node_addr = generate_node_addr(node_id, bind_addrs, custom_ip);
60 log::info!("Created node on addr {}", node_addr);
61
62 Self {
63 auth: None,
64 handshake: None,
65 node_addr,
66 node_id,
67 tick_ms: 1000,
68 session: thread_rng().next_u64(),
69 bind_addrs: bind_addrs.to_vec(),
70 visualization_collector: false,
71 services: vec![],
72 #[cfg(feature = "vpn")]
73 vpn_enable: false,
74 #[cfg(feature = "vpn")]
75 vpn_ip: None,
76 #[cfg(feature = "vpn")]
77 vpn_netmask: None,
78 _tmp: PhantomData,
79 }
80 }
81
82 pub fn node_addr(&self) -> NodeAddr {
83 self.node_addr.clone()
84 }
85
86 pub fn set_authorization<A: Authorization + 'static>(&mut self, auth: A) {
88 self.auth = Some(Arc::new(auth));
89 }
90
91 pub fn set_handshake<H: HandshakeBuilder + 'static>(&mut self, handshake: H) {
93 self.handshake = Some(Arc::new(handshake));
94 }
95
96 pub fn set_visualization_collector(&mut self, value: bool) {
98 self.visualization_collector = value;
99 }
100
101 pub fn set_manual_discovery(&mut self, local_tags: Vec<String>, connect_tags: Vec<String>) {
103 self.add_service(Arc::new(manual_discovery::ManualDiscoveryServiceBuilder::new(self.node_addr.clone(), local_tags, connect_tags)));
104 }
105
106 pub fn set_manual2_discovery(&mut self, targets: Vec<AdvertiseTarget>, interval: u64) {
108 self.add_service(Arc::new(manual2_discovery::Manual2DiscoveryServiceBuilder::new(self.node_addr.clone(), targets, interval)));
109 }
110
111 pub fn add_service(&mut self, service: Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>) {
113 for s in self.services.iter() {
114 assert_ne!(s.service_id(), service.service_id(), "Service ({}, {}) already exists", service.service_id(), service.service_name());
115 }
116 self.services.push(service);
117 }
118
119 #[cfg(feature = "vpn")]
120 pub fn enable_vpn(&mut self) {
121 self.vpn_enable = true;
122 }
123
124 #[cfg(feature = "vpn")]
125 pub fn set_vpn_ip(&mut self, ip: (u8, u8, u8, u8)) {
126 self.vpn_ip = Some(ip);
127 }
128
129 #[cfg(feature = "vpn")]
130 pub fn set_vpn_netmask(&mut self, netmask: (u8, u8, u8, u8)) {
131 self.vpn_netmask = Some(netmask);
132 }
133
134 pub fn build<B: Backend<SdnOwner>>(mut self, workers: usize, info: NodeInfo) -> SdnController<UserData, SC, SE, TC, TW> {
135 assert!(workers > 0);
136 #[cfg(feature = "vpn")]
137 let (tun_device, mut queue_fds) = {
138 if self.vpn_enable {
139 let vpn_ip = self.vpn_ip.unwrap_or((10, 33, 33, self.node_id as u8));
140 let vpn_netmask = self.vpn_netmask.unwrap_or((255, 255, 255, 0));
141 let mut tun_device = sans_io_runtime::backend::tun::create_tun(&format!("utun{}", self.node_id as u8), vpn_ip, vpn_netmask, 1400, workers);
142 let mut queue_fds = std::collections::VecDeque::with_capacity(workers);
143 for i in 0..workers {
144 queue_fds.push_back(tun_device.get_queue_fd(i).expect("Should have tun queue fd"));
145 }
146 (Some(tun_device), queue_fds)
147 } else {
148 (None, std::collections::VecDeque::new())
149 }
150 };
151
152 self.add_service(Arc::new(visualization::VisualizationServiceBuilder::<UserData, SC, SE, TC, TW, NodeInfo>::new(
153 info,
154 self.visualization_collector,
155 )));
156
157 let history = Arc::new(DataWorkerHistory::default());
158
159 let mut controller = SdnController::default();
160 controller.add_worker::<SdnOwner, _, SdnWorkerInner<UserData, SC, SE, TC, TW>, B>(
161 Duration::from_millis(1000),
162 SdnInnerCfg {
163 node_id: self.node_id,
164 tick_ms: self.tick_ms,
165 bind_addrs: self.bind_addrs.to_vec(),
166 services: self.services.clone(),
167 history: history.clone(),
168 controller: Some(ControllerCfg {
169 session: self.session,
170 auth: self.auth.unwrap_or_else(|| Arc::new(StaticKeyAuthorization::new("unsecure"))),
171 handshake: self.handshake.unwrap_or_else(|| Arc::new(HandshakeBuilderXDA)),
172 #[cfg(feature = "vpn")]
173 vpn_tun_device: tun_device,
174 }),
175 #[cfg(feature = "vpn")]
176 vpn_tun_fd: queue_fds.pop_front(),
177 },
178 None,
179 );
180
181 for _ in 1..workers {
182 controller.add_worker::<SdnOwner, _, SdnWorkerInner<UserData, SC, SE, TC, TW>, B>(
183 Duration::from_millis(1000),
184 SdnInnerCfg {
185 node_id: self.node_id,
186 tick_ms: self.tick_ms,
187 bind_addrs: self.bind_addrs.to_vec(),
188 services: self.services.clone(),
189 history: history.clone(),
190 controller: None,
191 #[cfg(feature = "vpn")]
192 vpn_tun_fd: queue_fds.pop_front(),
193 },
194 None,
195 );
196 }
197 controller
198 }
199}
200
201pub fn generate_node_addr(node_id: u32, bind_addrs: &[SocketAddr], custom_ips: Vec<SocketAddr>) -> NodeAddr {
202 let mut addr_builder = NodeAddrBuilder::new(node_id);
203 for bind_addr in bind_addrs {
204 match bind_addr.ip() {
205 IpAddr::V4(ip) => {
206 log::info!("Added ipv4 {}", ip);
207 addr_builder.add_protocol(Protocol::Ip4(ip));
208 addr_builder.add_protocol(Protocol::Udp(bind_addr.port()));
209 }
210 IpAddr::V6(ip) => {
211 log::info!("Added ipv6 {}", ip);
212 addr_builder.add_protocol(Protocol::Ip6(ip));
213 addr_builder.add_protocol(Protocol::Udp(bind_addr.port()));
214 }
215 }
216 }
217 for ip in custom_ips {
218 match ip {
219 SocketAddr::V4(ip) => {
220 log::info!("Added custom ipv4:\t{:?}", ip);
221 addr_builder.add_protocol(Protocol::Ip4(*ip.ip()));
222 addr_builder.add_protocol(Protocol::Udp(ip.port()));
223 }
224 SocketAddr::V6(ip) => {
225 log::info!("Added custom ipv6:\t{:?}", ip);
226 addr_builder.add_protocol(Protocol::Ip6(*ip.ip()));
227 addr_builder.add_protocol(Protocol::Udp(ip.port()));
228 }
229 }
230 }
231
232 addr_builder.addr()
233}