1use std::{
2 collections::HashMap,
3 net::{SocketAddr, ToSocketAddrs},
4 sync::{
5 atomic::{AtomicU16, AtomicU8},
6 Arc,
7 },
8 time::Duration,
9};
10
11use arc_swap::ArcSwap;
12use futures::{future::Either, never::Never, stream::FuturesUnordered, FutureExt, StreamExt};
13use opcua_core::{sync::RwLock, trace_read_lock, trace_write_lock};
14use opcua_nodes::DefaultTypeTree;
15use tokio::{
16 net::TcpListener,
17 pin,
18 sync::Notify,
19 task::{JoinError, JoinHandle},
20};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, warn};
23
24use opcua_core::{config::Config, handle::AtomicHandle};
25use opcua_crypto::CertificateStore;
26
27use crate::{
28 diagnostics::ServerDiagnostics,
29 node_manager::{DefaultTypeTreeGetter, ServerContext},
30 reverse_connect::{self, ReverseConnectionManager},
31 session::controller::{ControllerCommand, SessionStarter},
32 transport::{
33 tcp::{TcpConnector, TransportConfig},
34 ReverseTcpConnector,
35 },
36 ServerStatusWrapper,
37};
38use opcua_types::{DateTime, LocalizedText, ServerState, UAString};
39
40use super::{
41 authenticator::DefaultAuthenticator,
42 builder::ServerBuilder,
43 config::ServerConfig,
44 info::ServerInfo,
45 node_manager::{NodeManagers, NodeManagersRef},
46 server_handle::ServerHandle,
47 session::manager::SessionManager,
48 subscriptions::SubscriptionCache,
49 ServerCapabilities,
50};
51
/// Bookkeeping kept by the server for each connection task it has spawned.
struct ConnectionInfo {
    /// Sending half of the command channel whose receiver is handed to the
    /// connection's `run` future; used to deliver [`ControllerCommand`]s such as `Close`.
    command_send: tokio::sync::mpsc::Sender<ControllerCommand>,
}
55
/// The server itself: owns the shared state handed to every connection and drives the
/// accept loop in [`Server::run_with`].
pub struct Server {
    /// Certificate store, shared with every session starter.
    certificate_store: Arc<RwLock<CertificateStore>>,
    /// Session manager, shared with every session starter and with the session expiry loop.
    session_manager: Arc<RwLock<SessionManager>>,
    /// Join handles of the spawned connection tasks. Each task resolves to the numeric
    /// connection id it was registered under.
    connections: FuturesUnordered<JoinHandle<u32>>,
    /// Command channel for each registered connection, keyed by connection id.
    connection_map: HashMap<u32, ConnectionInfo>,
    /// Server configuration.
    config: Arc<ServerConfig>,
    /// Shared server information and state.
    info: Arc<ServerInfo>,
    /// Subscription cache, ticked periodically while the server runs.
    subscriptions: Arc<SubscriptionCache>,
    /// The node managers built from the builder.
    node_managers: NodeManagers,
    /// Cancellation token taken from the builder; once cancelled, the run loop asks
    /// connections to close and exits when none remain.
    token: CancellationToken,
    /// Notifier shared with the session manager; it wakes the session expiry loop.
    session_notify: Arc<Notify>,
    /// Wrapper around the server status, marked as started when the server begins running.
    status: Arc<ServerStatusWrapper>,
    /// Source of reverse-connect requests awaited in the run loop.
    reverse_connect_manager: ReverseConnectionManager,
}
85
86impl Server {
87 pub(crate) fn new_from_builder(builder: ServerBuilder) -> Result<(Self, ServerHandle), String> {
88 if let Err(e) = builder.config.validate() {
89 return Err(format!(
90 "Builder configuration is invalid: {}",
91 e.join(", ")
92 ));
93 }
94
95 let mut config = builder.config;
96
97 let application_name = config.application_name.clone();
98 let application_uri = UAString::from(&config.application_uri);
99 let product_uri = UAString::from(&config.product_uri);
100 let servers = vec![config.application_uri.clone()];
101 let send_buffer_size = config.limits.send_buffer_size;
108 let receive_buffer_size = config.limits.receive_buffer_size;
109
110 let application_description = if config.create_sample_keypair {
111 Some(config.application_description())
112 } else {
113 None
114 };
115
116 let (mut certificate_store, server_certificate, server_pkey) =
117 CertificateStore::new_with_x509_data(
118 &config.pki_dir,
119 false,
120 config.certificate_path.as_deref(),
121 config.private_key_path.as_deref(),
122 application_description,
123 );
124
125 if server_certificate.is_none() || server_pkey.is_none() {
126 warn!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.");
127 }
128
129 config.read_x509_thumbprints();
130
131 if config.certificate_validation.trust_client_certs {
132 info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
133 certificate_store.set_trust_unknown_certs(true);
134 }
135 certificate_store.set_check_time(config.certificate_validation.check_time);
136
137 let config = Arc::new(config);
138
139 let service_level = Arc::new(AtomicU8::new(255));
140
141 let type_tree = Arc::new(RwLock::new(DefaultTypeTree::new()));
142
143 let info = ServerInfo {
144 authenticator: builder
145 .authenticator
146 .unwrap_or_else(|| Arc::new(DefaultAuthenticator::new(config.user_tokens.clone()))),
147 application_uri,
148 product_uri,
149 application_name: LocalizedText {
150 locale: UAString::null(),
151 text: UAString::from(application_name),
152 },
153 start_time: ArcSwap::new(Arc::new(opcua_types::DateTime::now())),
154 servers,
155 config: config.clone(),
156 server_certificate,
157 server_pkey,
158 operational_limits: config.limits.operational.clone(),
159 state: ArcSwap::new(Arc::new(ServerState::Shutdown)),
160 send_buffer_size,
161 receive_buffer_size,
162 type_tree: type_tree.clone(),
163 subscription_id_handle: AtomicHandle::new(1),
164 monitored_item_id_handle: AtomicHandle::new(1),
165 secure_channel_id_handle: Arc::new(AtomicHandle::new(1)),
166 capabilities: ServerCapabilities::default(),
167 service_level: service_level.clone(),
168 port: AtomicU16::new(0),
169 type_tree_getter: builder
170 .type_tree_getter
171 .unwrap_or_else(|| Arc::new(DefaultTypeTreeGetter)),
172 type_loaders: RwLock::new(builder.type_loaders),
173 diagnostics: ServerDiagnostics {
174 enabled: config.diagnostics,
175 ..Default::default()
176 },
177 };
178
179 let certificate_store = Arc::new(RwLock::new(certificate_store));
180
181 let info = Arc::new(info);
182 let subscriptions = Arc::new(SubscriptionCache::new(config.limits.subscriptions));
183
184 let node_managers_ref = NodeManagersRef::new_empty();
185 let status_wrapper = Arc::new(ServerStatusWrapper::new(
186 builder.build_info,
187 subscriptions.clone(),
188 ));
189 let context = ServerContext {
190 node_managers: node_managers_ref.clone(),
191 subscriptions: subscriptions.clone(),
192 info: info.clone(),
193 authenticator: info.authenticator.clone(),
194 type_tree: type_tree.clone(),
195 type_tree_getter: info.type_tree_getter.clone(),
196 status: status_wrapper.clone(),
197 };
198
199 let mut final_node_managers = Vec::new();
200 for nm_builder in builder.node_managers {
201 final_node_managers.push(nm_builder.build(context.clone()));
202 }
203
204 let node_managers = NodeManagers::new(final_node_managers);
205 node_managers_ref.init_from_node_managers(node_managers.clone());
206
207 let session_notify = Arc::new(Notify::new());
208 let session_manager = Arc::new(RwLock::new(SessionManager::new(
209 info.clone(),
210 session_notify.clone(),
211 )));
212
213 let (reverse_connect_manager, reverse_connect_handle) =
214 reverse_connect::ReverseConnectionManager::new(Duration::from_millis(
215 config.reverse_connect_failure_delay_ms,
216 ));
217
218 let handle = ServerHandle::new(
219 info.clone(),
220 service_level,
221 subscriptions.clone(),
222 node_managers.clone(),
223 session_manager.clone(),
224 type_tree.clone(),
225 status_wrapper.clone(),
226 builder.token.clone(),
227 reverse_connect_handle,
228 );
229 Ok((
230 Self {
231 certificate_store,
232 session_manager,
233 connections: FuturesUnordered::new(),
234 connection_map: HashMap::new(),
235 subscriptions,
236 config,
237 info,
238 node_managers,
239 token: builder.token,
240 session_notify,
241 status: status_wrapper.clone(),
242 reverse_connect_manager,
243 },
244 handle,
245 ))
246 }
247
248 pub fn subscriptions(&self) -> Arc<SubscriptionCache> {
250 self.subscriptions.clone()
251 }
252
253 #[allow(clippy::await_holding_lock)]
254 async fn initialize_node_managers(&self, context: &ServerContext) -> Result<(), String> {
255 info!("Initializing node managers");
256 {
257 if self.node_managers.is_empty() {
258 return Err("No node managers defined, server is invalid".to_string());
259 }
260
261 let mut type_tree = trace_write_lock!(self.info.type_tree);
265
266 for mgr in self.node_managers.iter() {
267 mgr.init(&mut type_tree, context.clone()).await;
268 }
269 }
270 Ok(())
271 }
272
273 #[cfg(feature = "discovery-server-registration")]
274 async fn run_discovery_server_registration(info: Arc<ServerInfo>) -> Never {
275 let registered_server = info.registered_server();
276 let Some(discovery_server_url) = info.config.discovery_server_url.as_ref() else {
277 loop {
278 futures::future::pending::<()>().await;
279 }
280 };
281 crate::discovery::periodic_discovery_server_registration(
282 discovery_server_url,
283 registered_server,
284 info.config.pki_dir.clone(),
285 Duration::from_secs(5 * 60),
286 )
287 .await
288 }
289
290 pub async fn run_with(mut self, listener: TcpListener) -> Result<(), String> {
297 let context = ServerContext {
298 node_managers: self.node_managers.as_weak(),
299 subscriptions: self.subscriptions.clone(),
300 info: self.info.clone(),
301 authenticator: self.info.authenticator.clone(),
302 type_tree: self.info.type_tree.clone(),
303 type_tree_getter: self.info.type_tree_getter.clone(),
304 status: self.status.clone(),
305 };
306
307 self.initialize_node_managers(&context).await?;
308
309 self.status.set_server_started();
310 self.info.start_time.store(Arc::new(DateTime::now()));
311
312 let addr = listener
313 .local_addr()
314 .map_err(|e| format!("Failed to bind socket: {e:?}"))?;
315 info!("Now listening for connections on {addr}");
316
317 self.info
318 .port
319 .store(addr.port(), std::sync::atomic::Ordering::Relaxed);
320
321 self.log_endpoint_info();
322
323 let mut connection_counter = 0;
324
325 #[cfg(feature = "discovery-server-registration")]
326 let discovery_fut = Self::run_discovery_server_registration(self.info.clone());
327
328 #[cfg(not(feature = "discovery-server-registration"))]
329 let discovery_fut = futures::future::pending();
330
331 pin!(discovery_fut);
332
333 let subscription_fut =
334 Self::run_subscription_ticks(self.config.subscription_poll_interval_ms, &context);
335 pin!(subscription_fut);
336
337 let session_expiry_fut =
338 Self::run_session_expiry(&self.session_manager, &self.session_notify);
339 pin!(session_expiry_fut);
340
341 loop {
342 let conn_fut = if self.connections.is_empty() {
343 if self.token.is_cancelled() {
344 break;
345 }
346 Either::Left(futures::future::pending::<Option<Result<u32, JoinError>>>())
347 } else {
348 Either::Right(self.connections.next())
349 };
350
351 tokio::select! {
352 conn_res = conn_fut => {
353 match conn_res.unwrap() {
354 Ok(id) => {
355 info!("Connection {} terminated", id);
356 self.connection_map.remove(&id);
357 },
358 Err(e) => error!("Connection panic! {e}")
359 }
360 }
361 _ = &mut subscription_fut => {}
362 _ = &mut discovery_fut => {}
363 _ = &mut session_expiry_fut => {}
364 rs = listener.accept() => {
365 match rs {
366 Ok((socket, addr)) => {
367 info!("Accept new connection from {addr} ({connection_counter})");
368 let conn = SessionStarter::new(
369 TcpConnector::new(socket, TransportConfig {
370 send_buffer_size: self.info.config.limits.send_buffer_size,
371 max_message_size: self.info.config.limits.max_message_size,
372 max_chunk_count: self.info.config.limits.max_chunk_count,
373 receive_buffer_size: self.info.config.limits.receive_buffer_size,
374 hello_timeout: Duration::from_secs(self.info.config.tcp_config.hello_timeout as u64),
375 }, self.info.decoding_options()),
376 self.info.clone(),
377 self.session_manager.clone(),
378 self.certificate_store.clone(),
379 self.node_managers.clone(),
380 self.subscriptions.clone()
381 );
382
383 let (send, recv) = tokio::sync::mpsc::channel(5);
384 let handle = tokio::spawn(conn.run(recv, |_| {}).map(move |_| connection_counter));
385 self.connections.push(handle);
386 self.connection_map.insert(connection_counter, ConnectionInfo {
387 command_send: send
388 });
389 connection_counter += 1;
390 }
391 Err(e) => {
392 error!("Failed to accept client connection: {:?}", e);
393 }
394 }
395 }
396 rev_connect = self.reverse_connect_manager.wait_for_connection() => {
397 debug!("Attempting reverse connection to {:?}", rev_connect.target.address);
398 let conn = SessionStarter::new(
399 ReverseTcpConnector::new(
400 TransportConfig {
401 send_buffer_size: self.info.config.limits.send_buffer_size,
402 max_message_size: self.info.config.limits.max_message_size,
403 max_chunk_count: self.info.config.limits.max_chunk_count,
404 receive_buffer_size: self.info.config.limits.receive_buffer_size,
405 hello_timeout: Duration::from_secs(self.info.config.tcp_config.hello_timeout as u64),
406 },
407 self.info.decoding_options(),
408 rev_connect.target.address,
409 self.info.application_uri.to_string(),
410 rev_connect.target.endpoint_url,
411 ),
412 self.info.clone(),
413 self.session_manager.clone(),
414 self.certificate_store.clone(),
415 self.node_managers.clone(),
416 self.subscriptions.clone()
417 );
418
419 let (send, recv) = tokio::sync::mpsc::channel(5);
423 let rev_handle = rev_connect.handle;
424 let handle = tokio::spawn(async move {
425 conn.run(recv, |status| {
426 rev_handle.set_result(status);
427 }).await;
428 connection_counter
429 });
430 self.connections.push(handle);
431 self.connection_map.insert(connection_counter, ConnectionInfo {
432 command_send: send
433 });
434 connection_counter += 1;
435 }
436 _ = self.token.cancelled() => {
437 for conn in self.connection_map.values() {
438 let _ = conn.command_send.send(ControllerCommand::Close).await;
439 }
440 }
441 }
442 }
443
444 Ok(())
445 }
446
447 pub async fn run(self) -> Result<(), String> {
449 let addr = self.get_socket_address();
450
451 let Some(addr) = addr else {
452 error!("Cannot resolve server address, check server configuration");
453 return Err("Cannot resolve server address, check server configuration".to_owned());
454 };
455
456 info!("Try to bind address at {addr}");
457 let listener = match TcpListener::bind(&addr).await {
458 Ok(listener) => listener,
459 Err(e) => {
460 error!("Failed to bind socket: {:?}", e);
461 return Err(format!("Failed to bind socket: {e:?}"));
462 }
463 };
464
465 self.run_with(listener).await
466 }
467
468 async fn run_subscription_ticks(interval: u64, context: &ServerContext) -> Never {
469 if interval == 0 {
470 futures::future::pending().await
471 } else {
472 let context = context.clone();
473 let mut tick = tokio::time::interval(Duration::from_millis(interval));
474 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
475 loop {
476 tick.tick().await;
477
478 context.subscriptions.periodic_tick(&context).await;
479 }
480 }
481 }
482
483 async fn run_session_expiry(sessions: &RwLock<SessionManager>, notify: &Notify) -> Never {
484 loop {
485 let ((expiry, expired), notified) = {
486 let session_lck = trace_read_lock!(sessions);
487 (session_lck.check_session_expiry(), notify.notified())
489 };
490 if !expired.is_empty() {
491 let mut session_lck = trace_write_lock!(sessions);
492 for id in expired {
493 session_lck.expire_session(&id);
494 }
495 }
496 tokio::select! {
497 _ = tokio::time::sleep_until(expiry.into()) => {}
498 _ = notified => {}
499 }
500 }
501 }
502
503 fn log_endpoint_info(&self) {
505 info!("OPC UA Server: {}", self.info.application_name);
506 info!("Base url: {}", self.info.base_endpoint());
507 info!("Supported endpoints:");
508 for (id, endpoint) in &self.config.endpoints {
509 let users: Vec<String> = endpoint.user_token_ids.iter().cloned().collect();
510 let users = users.join(", ");
511 info!("Endpoint \"{}\": {}", id, endpoint.path);
512 info!(" Security Mode: {}", endpoint.security_mode);
513 info!(" Security Policy: {}", endpoint.security_policy);
514 info!(" Supported user tokens - {}", users);
515 }
516 }
517
518 fn get_socket_address(&self) -> Option<SocketAddr> {
520 let address = format!(
522 "{}:{}",
523 self.config.tcp_config.host, self.config.tcp_config.port
524 );
525 if let Ok(mut addrs_iter) = address.to_socket_addrs() {
526 addrs_iter.next()
527 } else {
528 None
529 }
530 }
531}