//! `opcua_server/server.rs` — the core [`Server`] type: construction from a
//! [`ServerBuilder`] and the main async run loop accepting OPC UA connections.

1use std::{
2    collections::HashMap,
3    net::{SocketAddr, ToSocketAddrs},
4    sync::{
5        atomic::{AtomicU16, AtomicU8},
6        Arc,
7    },
8    time::Duration,
9};
10
11use arc_swap::ArcSwap;
12use futures::{future::Either, never::Never, stream::FuturesUnordered, FutureExt, StreamExt};
13use opcua_core::{sync::RwLock, trace_read_lock, trace_write_lock};
14use opcua_nodes::DefaultTypeTree;
15use tokio::{
16    net::TcpListener,
17    pin,
18    sync::Notify,
19    task::{JoinError, JoinHandle},
20};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, warn};
23
24use opcua_core::{config::Config, handle::AtomicHandle};
25use opcua_crypto::CertificateStore;
26
27use crate::{
28    diagnostics::ServerDiagnostics,
29    node_manager::{DefaultTypeTreeGetter, ServerContext},
30    reverse_connect::{self, ReverseConnectionManager},
31    session::controller::{ControllerCommand, SessionStarter},
32    transport::{
33        tcp::{TcpConnector, TransportConfig},
34        ReverseTcpConnector,
35    },
36    ServerStatusWrapper,
37};
38use opcua_types::{DateTime, LocalizedText, ServerState, UAString};
39
40use super::{
41    authenticator::DefaultAuthenticator,
42    builder::ServerBuilder,
43    config::ServerConfig,
44    info::ServerInfo,
45    node_manager::{NodeManagers, NodeManagersRef},
46    server_handle::ServerHandle,
47    session::manager::SessionManager,
48    subscriptions::SubscriptionCache,
49    ServerCapabilities,
50};
51
/// Metadata kept for each open connection, used to send commands
/// (currently just a graceful close) to the connection's controller task.
struct ConnectionInfo {
    // Channel into the session controller driving this connection.
    command_send: tokio::sync::mpsc::Sender<ControllerCommand>,
}
55
/// The server struct. This is consumed when run, so you will typically not hold onto this for longer
/// periods of time.
pub struct Server {
    /// Certificate store used to validate client certificates.
    certificate_store: Arc<RwLock<CertificateStore>>,
    /// Session manager
    session_manager: Arc<RwLock<SessionManager>>,
    /// Join handles for the tasks driving open connections.
    /// Each resolves to the numeric connection ID when the connection terminates.
    connections: FuturesUnordered<JoinHandle<u32>>,
    /// Map from connection ID to metadata about each open connection.
    connection_map: HashMap<u32, ConnectionInfo>,
    /// Server configuration, fixed after the server is started
    config: Arc<ServerConfig>,
    /// Context for use by connections to access general server state.
    info: Arc<ServerInfo>,
    /// Subscription cache, global because subscriptions outlive sessions.
    subscriptions: Arc<SubscriptionCache>,
    /// List of node managers
    node_managers: NodeManagers,
    /// Cancellation token used to shut the server down gracefully.
    token: CancellationToken,
    /// Notify that is woken up if a new session is added to the session manager.
    session_notify: Arc<Notify>,
    /// Wrapper managing the `ServerStatus` server variable.
    status: Arc<ServerStatusWrapper>,
    /// Manager for reverse connections. This does nothing unless users register
    /// reverse connect targets.
    reverse_connect_manager: ReverseConnectionManager,
}
85
86impl Server {
    /// Build a [`Server`] and its companion [`ServerHandle`] from a [`ServerBuilder`].
    ///
    /// Validates the builder configuration, loads (or optionally generates) the
    /// server certificate and private key, wires up the node managers, session
    /// manager, subscription cache and reverse-connect manager, and returns both
    /// the runnable server and a handle for interacting with it while it runs.
    ///
    /// Returns a human-readable error string if the configuration is invalid.
    pub(crate) fn new_from_builder(builder: ServerBuilder) -> Result<(Self, ServerHandle), String> {
        if let Err(e) = builder.config.validate() {
            return Err(format!(
                "Builder configuration is invalid: {}",
                e.join(", ")
            ));
        }

        let mut config = builder.config;

        let application_name = config.application_name.clone();
        let application_uri = UAString::from(&config.application_uri);
        let product_uri = UAString::from(&config.product_uri);
        let servers = vec![config.application_uri.clone()];

        let send_buffer_size = config.limits.send_buffer_size;
        let receive_buffer_size = config.limits.receive_buffer_size;

        // Only hand an application description to the certificate store when we
        // are allowed to create a sample keypair from it.
        let application_description = if config.create_sample_keypair {
            Some(config.application_description())
        } else {
            None
        };

        let (mut certificate_store, server_certificate, server_pkey) =
            CertificateStore::new_with_x509_data(
                &config.pki_dir,
                false,
                config.certificate_path.as_deref(),
                config.private_key_path.as_deref(),
                application_description,
            );

        // Not fatal: the server can still run, but encrypted endpoints won't work.
        if server_certificate.is_none() || server_pkey.is_none() {
            warn!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.");
        }

        config.read_x509_thumbprints();

        if config.certificate_validation.trust_client_certs {
            info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
            certificate_store.set_trust_unknown_certs(true);
        }
        certificate_store.set_check_time(config.certificate_validation.check_time);

        // Configuration is frozen from this point on.
        let config = Arc::new(config);

        // Initial service level; 255 is the maximum value of the u8.
        let service_level = Arc::new(AtomicU8::new(255));

        let type_tree = Arc::new(RwLock::new(DefaultTypeTree::new()));

        let info = ServerInfo {
            // Fall back to the default authenticator when none was supplied by the builder.
            authenticator: builder
                .authenticator
                .unwrap_or_else(|| Arc::new(DefaultAuthenticator::new(config.user_tokens.clone()))),
            application_uri,
            product_uri,
            application_name: LocalizedText {
                locale: UAString::null(),
                text: UAString::from(application_name),
            },
            start_time: ArcSwap::new(Arc::new(opcua_types::DateTime::now())),
            servers,
            config: config.clone(),
            server_certificate,
            server_pkey,
            operational_limits: config.limits.operational.clone(),
            state: ArcSwap::new(Arc::new(ServerState::Shutdown)),
            send_buffer_size,
            receive_buffer_size,
            type_tree: type_tree.clone(),
            subscription_id_handle: AtomicHandle::new(1),
            monitored_item_id_handle: AtomicHandle::new(1),
            secure_channel_id_handle: Arc::new(AtomicHandle::new(1)),
            capabilities: ServerCapabilities::default(),
            service_level: service_level.clone(),
            // The actual port is stored once the listener is bound in `run_with`.
            port: AtomicU16::new(0),
            type_tree_getter: builder
                .type_tree_getter
                .unwrap_or_else(|| Arc::new(DefaultTypeTreeGetter)),
            type_loaders: RwLock::new(builder.type_loaders),
            diagnostics: ServerDiagnostics {
                enabled: config.diagnostics,
                ..Default::default()
            },
        };

        let certificate_store = Arc::new(RwLock::new(certificate_store));

        let info = Arc::new(info);
        let subscriptions = Arc::new(SubscriptionCache::new(config.limits.subscriptions));

        // The node managers receive a reference that is initialized further down,
        // once the managers themselves have been built.
        let node_managers_ref = NodeManagersRef::new_empty();
        let status_wrapper = Arc::new(ServerStatusWrapper::new(
            builder.build_info,
            subscriptions.clone(),
        ));
        let context = ServerContext {
            node_managers: node_managers_ref.clone(),
            subscriptions: subscriptions.clone(),
            info: info.clone(),
            authenticator: info.authenticator.clone(),
            type_tree: type_tree.clone(),
            type_tree_getter: info.type_tree_getter.clone(),
            status: status_wrapper.clone(),
        };

        let mut final_node_managers = Vec::new();
        for nm_builder in builder.node_managers {
            final_node_managers.push(nm_builder.build(context.clone()));
        }

        let node_managers = NodeManagers::new(final_node_managers);
        node_managers_ref.init_from_node_managers(node_managers.clone());

        let session_notify = Arc::new(Notify::new());
        let session_manager = Arc::new(RwLock::new(SessionManager::new(
            info.clone(),
            session_notify.clone(),
        )));

        let (reverse_connect_manager, reverse_connect_handle) =
            reverse_connect::ReverseConnectionManager::new(Duration::from_millis(
                config.reverse_connect_failure_delay_ms,
            ));

        let handle = ServerHandle::new(
            info.clone(),
            service_level,
            subscriptions.clone(),
            node_managers.clone(),
            session_manager.clone(),
            type_tree.clone(),
            status_wrapper.clone(),
            builder.token.clone(),
            reverse_connect_handle,
        );
        Ok((
            Self {
                certificate_store,
                session_manager,
                connections: FuturesUnordered::new(),
                connection_map: HashMap::new(),
                subscriptions,
                config,
                info,
                node_managers,
                token: builder.token,
                session_notify,
                status: status_wrapper.clone(),
                reverse_connect_manager,
            },
            handle,
        ))
    }
247
248    /// Get a reference to the SubscriptionCache containing all subscriptions on the server.
249    pub fn subscriptions(&self) -> Arc<SubscriptionCache> {
250        self.subscriptions.clone()
251    }
252
    /// Initialize every registered node manager, letting each populate the
    /// shared type tree before the server starts accepting connections.
    ///
    /// Returns an error if no node managers are configured at all, since such
    /// a server would have nothing to serve.
    #[allow(clippy::await_holding_lock)]
    async fn initialize_node_managers(&self, context: &ServerContext) -> Result<(), String> {
        info!("Initializing node managers");
        {
            if self.node_managers.is_empty() {
                return Err("No node managers defined, server is invalid".to_string());
            }

            // Normally we would strongly attempt to avoid holding a lock over an await point,
            // but during initialization we essentially own the type tree, so this shouldn't deadlock
            // unless a manager for whatever reason attempts to lock the type tree again.
            let mut type_tree = trace_write_lock!(self.info.type_tree);

            // Managers are initialized in registration order; later managers may
            // rely on types registered by earlier ones.
            for mgr in self.node_managers.iter() {
                mgr.init(&mut type_tree, context.clone()).await;
            }
        }
        Ok(())
    }
272
273    #[cfg(feature = "discovery-server-registration")]
274    async fn run_discovery_server_registration(info: Arc<ServerInfo>) -> Never {
275        let registered_server = info.registered_server();
276        let Some(discovery_server_url) = info.config.discovery_server_url.as_ref() else {
277            loop {
278                futures::future::pending::<()>().await;
279            }
280        };
281        crate::discovery::periodic_discovery_server_registration(
282            discovery_server_url,
283            registered_server,
284            info.config.pki_dir.clone(),
285            Duration::from_secs(5 * 60),
286        )
287        .await
288    }
289
    /// Run the server using a given TCP listener.
    /// Note that the configured TCP endpoint is still used to create the endpoint
    /// descriptions, you must properly set `host` and `port` even when using this.
    ///
    /// This is useful for testing, as you can bind a `TcpListener` to port `0` to auto-assign
    /// a port.
    pub async fn run_with(mut self, listener: TcpListener) -> Result<(), String> {
        let context = ServerContext {
            node_managers: self.node_managers.as_weak(),
            subscriptions: self.subscriptions.clone(),
            info: self.info.clone(),
            authenticator: self.info.authenticator.clone(),
            type_tree: self.info.type_tree.clone(),
            type_tree_getter: self.info.type_tree_getter.clone(),
            status: self.status.clone(),
        };

        self.initialize_node_managers(&context).await?;

        self.status.set_server_started();
        self.info.start_time.store(Arc::new(DateTime::now()));

        let addr = listener
            .local_addr()
            .map_err(|e| format!("Failed to bind socket: {e:?}"))?;
        info!("Now listening for connections on {addr}");

        // Publish the actual bound port, which may differ from the configured
        // one when the listener was bound to port 0.
        self.info
            .port
            .store(addr.port(), std::sync::atomic::Ordering::Relaxed);

        self.log_endpoint_info();

        // Monotonic ID assigned to each accepted (or reverse) connection.
        let mut connection_counter = 0;

        #[cfg(feature = "discovery-server-registration")]
        let discovery_fut = Self::run_discovery_server_registration(self.info.clone());

        #[cfg(not(feature = "discovery-server-registration"))]
        let discovery_fut = futures::future::pending();

        pin!(discovery_fut);

        let subscription_fut =
            Self::run_subscription_ticks(self.config.subscription_poll_interval_ms, &context);
        pin!(subscription_fut);

        let session_expiry_fut =
            Self::run_session_expiry(&self.session_manager, &self.session_notify);
        pin!(session_expiry_fut);

        loop {
            // `FuturesUnordered::next` resolves to `None` immediately when empty,
            // so substitute a pending future in that case. This is also the only
            // point where the loop can exit: shutdown requested and no
            // connections left to wind down.
            let conn_fut = if self.connections.is_empty() {
                if self.token.is_cancelled() {
                    break;
                }
                Either::Left(futures::future::pending::<Option<Result<u32, JoinError>>>())
            } else {
                Either::Right(self.connections.next())
            };

            tokio::select! {
                conn_res = conn_fut => {
                    // Each connection task resolves to its connection ID.
                    match conn_res.unwrap() {
                        Ok(id) => {
                            info!("Connection {} terminated", id);
                            self.connection_map.remove(&id);
                        },
                        Err(e) => error!("Connection panic! {e}")
                    }
                }
                _ = &mut subscription_fut => {}
                _ = &mut discovery_fut => {}
                _ = &mut session_expiry_fut => {}
                rs = listener.accept() => {
                    match rs {
                        Ok((socket, addr)) => {
                            info!("Accept new connection from {addr} ({connection_counter})");
                            let conn = SessionStarter::new(
                                TcpConnector::new(socket, TransportConfig {
                                    send_buffer_size: self.info.config.limits.send_buffer_size,
                                    max_message_size: self.info.config.limits.max_message_size,
                                    max_chunk_count: self.info.config.limits.max_chunk_count,
                                    receive_buffer_size: self.info.config.limits.receive_buffer_size,
                                    hello_timeout: Duration::from_secs(self.info.config.tcp_config.hello_timeout as u64),
                                }, self.info.decoding_options()),
                                self.info.clone(),
                                self.session_manager.clone(),
                                self.certificate_store.clone(),
                                self.node_managers.clone(),
                                self.subscriptions.clone()
                            );

                            let (send, recv) = tokio::sync::mpsc::channel(5);
                            let handle = tokio::spawn(conn.run(recv, |_| {}).map(move |_| connection_counter));
                            self.connections.push(handle);
                            self.connection_map.insert(connection_counter, ConnectionInfo {
                                command_send: send
                            });
                            connection_counter += 1;
                        }
                        Err(e) => {
                            error!("Failed to accept client connection: {:?}", e);
                        }
                    }
                }
                rev_connect = self.reverse_connect_manager.wait_for_connection() => {
                    debug!("Attempting reverse connection to {:?}", rev_connect.target.address);
                    let conn = SessionStarter::new(
                        ReverseTcpConnector::new(
                            TransportConfig {
                                send_buffer_size: self.info.config.limits.send_buffer_size,
                                max_message_size: self.info.config.limits.max_message_size,
                                max_chunk_count: self.info.config.limits.max_chunk_count,
                                receive_buffer_size: self.info.config.limits.receive_buffer_size,
                                hello_timeout: Duration::from_secs(self.info.config.tcp_config.hello_timeout as u64),
                            },
                            self.info.decoding_options(),
                            rev_connect.target.address,
                            self.info.application_uri.to_string(),
                            rev_connect.target.endpoint_url,
                        ),
                        self.info.clone(),
                        self.session_manager.clone(),
                        self.certificate_store.clone(),
                        self.node_managers.clone(),
                        self.subscriptions.clone()
                    );

                    // We need to make sure that the reverse connect handle is passed
                    // to the connection task, so that we can signal the result of the connection attempt
                    // back to the reverse connect manager.
                    let (send, recv) = tokio::sync::mpsc::channel(5);
                    let rev_handle = rev_connect.handle;
                    let handle = tokio::spawn(async move {
                        conn.run(recv, |status| {
                            rev_handle.set_result(status);
                        }).await;
                        connection_counter
                    });
                    self.connections.push(handle);
                    self.connection_map.insert(connection_counter, ConnectionInfo {
                        command_send: send
                    });
                    connection_counter += 1;
                }
                _ = self.token.cancelled() => {
                    // Ask every open connection to close gracefully; the loop
                    // terminates once all connection tasks have finished.
                    for conn in self.connection_map.values() {
                        let _ = conn.command_send.send(ControllerCommand::Close).await;
                    }
                }
            }
        }

        Ok(())
    }
446
447    /// Run the server. The provided `token` can be used to stop the server gracefully.
448    pub async fn run(self) -> Result<(), String> {
449        let addr = self.get_socket_address();
450
451        let Some(addr) = addr else {
452            error!("Cannot resolve server address, check server configuration");
453            return Err("Cannot resolve server address, check server configuration".to_owned());
454        };
455
456        info!("Try to bind address at {addr}");
457        let listener = match TcpListener::bind(&addr).await {
458            Ok(listener) => listener,
459            Err(e) => {
460                error!("Failed to bind socket: {:?}", e);
461                return Err(format!("Failed to bind socket: {e:?}"));
462            }
463        };
464
465        self.run_with(listener).await
466    }
467
468    async fn run_subscription_ticks(interval: u64, context: &ServerContext) -> Never {
469        if interval == 0 {
470            futures::future::pending().await
471        } else {
472            let context = context.clone();
473            let mut tick = tokio::time::interval(Duration::from_millis(interval));
474            tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
475            loop {
476                tick.tick().await;
477
478                context.subscriptions.periodic_tick(&context).await;
479            }
480        }
481    }
482
    /// Expire timed-out sessions, sleeping until the next scheduled expiry or
    /// until `notify` wakes us because a new session was added (which may move
    /// the earliest expiry forward).
    async fn run_session_expiry(sessions: &RwLock<SessionManager>, notify: &Notify) -> Never {
        loop {
            let ((expiry, expired), notified) = {
                let session_lck = trace_read_lock!(sessions);
                // Make sure to create the notified future while we still hold the lock.
                // This way a notification fired between the expiry check and the
                // select below is not lost.
                (session_lck.check_session_expiry(), notify.notified())
            };
            // Expire any sessions that were already past their deadline.
            if !expired.is_empty() {
                let mut session_lck = trace_write_lock!(sessions);
                for id in expired {
                    session_lck.expire_session(&id);
                }
            }
            tokio::select! {
                _ = tokio::time::sleep_until(expiry.into()) => {}
                _ = notified => {}
            }
        }
    }
502
503    /// Log information about the endpoints on this server
504    fn log_endpoint_info(&self) {
505        info!("OPC UA Server: {}", self.info.application_name);
506        info!("Base url: {}", self.info.base_endpoint());
507        info!("Supported endpoints:");
508        for (id, endpoint) in &self.config.endpoints {
509            let users: Vec<String> = endpoint.user_token_ids.iter().cloned().collect();
510            let users = users.join(", ");
511            info!("Endpoint \"{}\": {}", id, endpoint.path);
512            info!("  Security Mode:    {}", endpoint.security_mode);
513            info!("  Security Policy:  {}", endpoint.security_policy);
514            info!("  Supported user tokens - {}", users);
515        }
516    }
517
518    /// Returns the server socket address.
519    fn get_socket_address(&self) -> Option<SocketAddr> {
520        // Resolve this host / port to an address (or not)
521        let address = format!(
522            "{}:{}",
523            self.config.tcp_config.host, self.config.tcp_config.port
524        );
525        if let Ok(mut addrs_iter) = address.to_socket_addrs() {
526            addrs_iter.next()
527        } else {
528            None
529        }
530    }
531}