opcua_server/
server.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! Provides the [`Server`] type and functionality related to it.
6
7use std::{
8    marker::Sync,
9    net::SocketAddr,
10    sync::{Arc, RwLock},
11};
12
13use tokio::{
14    self,
15    net::{TcpListener, TcpStream, ToSocketAddrs},
16    sync::oneshot::{self, Sender},
17    time::{interval_at, Duration, Instant},
18};
19
20use opcua_core::{config::Config, prelude::*};
21use opcua_crypto::*;
22use opcua_types::service_types::ServerState as ServerStateType;
23
24use crate::{
25    address_space::types::AddressSpace,
26    comms::tcp_transport::*,
27    comms::transport::Transport,
28    config::ServerConfig,
29    constants,
30    diagnostics::ServerDiagnostics,
31    events::audit::AuditLog,
32    metrics::ServerMetrics,
33    state::{OperationalLimits, ServerState},
34    util::PollingAction,
35};
36
/// The list of transports the server is tracking, one entry per accepted client connection.
/// Each transport is shared behind an `Arc<RwLock<..>>`.
pub type Connections = Vec<Arc<RwLock<TcpTransport>>>;
38
/// A `Server` represents a running instance of an OPC UA server. There can be more than one `Server`
/// running at any given time providing they do not share the same ports.
///
/// A `Server` is initialised from a [`ServerConfig`]. The `ServerConfig` sets what port the server
/// runs on, the endpoints it supports, the identity tokens it accepts and so forth.
/// A single server can offer multiple endpoints with different security policies. A server can
/// also be configured to register itself with a discovery server.
///
/// Once the `Server` is configured, it is run by calling [`run`] which consumes the `Server`.
/// Alternatively if you have reason to maintain access to the server object,
/// you may call the static function [`run_server`] providing the server wrapped as
/// `Arc<RwLock<Server>>`.
///
/// The server's [`AddressSpace`] is initialised with the default OPC UA node set, but may also
/// be extended with additional nodes representing folders, variables, methods etc.
///
/// The server's [`CertificateStore`] manages the server's private key and public certificate. It
/// also manages public certificates of incoming clients and arranges them into trusted and rejected
/// collections.
///
/// [`run`]: #method.run
/// [`run_server`]: #method.run_server
/// [`ServerConfig`]: ../config/struct.ServerConfig.html
/// [`AddressSpace`]: ../address_space/address_space/struct.AddressSpace.html
/// [`CertificateStore`]: ../../opcua_core/crypto/certificate_store/struct.CertificateStore.html
pub struct Server {
    /// Polling actions queued before the server is running. Each entry pairs the polling
    /// interval in milliseconds with the action to invoke; they are started once the server runs.
    pending_polling_actions: Vec<(u64, Box<dyn Fn() + Send + Sync + 'static>)>,
    /// Certificate store for certs
    certificate_store: Arc<RwLock<CertificateStore>>,
    /// Server metrics - diagnostics and anything else that someone might be interested in that
    /// describes the current state of the server
    server_metrics: Arc<RwLock<ServerMetrics>>,
    /// The server state is everything that sessions share that can possibly change. State
    /// is initialised from a [`ServerConfig`].
    server_state: Arc<RwLock<ServerState>>,
    /// Address space
    address_space: Arc<RwLock<AddressSpace>>,
    /// List of open connections. Terminated connections are periodically swept out of this list.
    connections: Arc<RwLock<Connections>>,
}
81
82impl From<ServerConfig> for Server {
83    fn from(config: ServerConfig) -> Server {
84        Server::new(config)
85    }
86}
87
88impl Server {
89    /// Creates a new [`Server`] instance, initialising it from a [`ServerConfig`].
90    ///
91    /// [`Server`]: ./struct.Server.html
92    /// [`ServerConfig`]: ../config/struct.ServerConfig.html
93    pub fn new(mut config: ServerConfig) -> Server {
94        if !config.is_valid() {
95            panic!("Cannot create a server using an invalid configuration.");
96        }
97
98        // Set from config
99        let application_name = config.application_name.clone();
100        let application_uri = UAString::from(&config.application_uri);
101        let product_uri = UAString::from(&config.product_uri);
102        let start_time = DateTime::now();
103        let servers = vec![config.application_uri.clone()];
104        let base_endpoint = format!(
105            "opc.tcp://{}:{}",
106            config.tcp_config.host, config.tcp_config.port
107        );
108        let max_subscriptions = config.limits.max_subscriptions as usize;
109        let max_monitored_items_per_sub = config.limits.max_monitored_items_per_sub as usize;
110        let max_monitored_item_queue_size = config.limits.max_monitored_item_queue_size as usize;
111
112        let diagnostics = Arc::new(RwLock::new(ServerDiagnostics::default()));
113        let min_publishing_interval_ms = config.limits.min_publishing_interval * 1000.0;
114        let min_sampling_interval_ms = config.limits.min_sampling_interval * 1000.0;
115
116        // TODO max string, byte string and array lengths
117
118        // Security, pki auto create cert
119        let application_description = if config.create_sample_keypair {
120            Some(config.application_description())
121        } else {
122            None
123        };
124        let (mut certificate_store, server_certificate, server_pkey) =
125            CertificateStore::new_with_x509_data(
126                &config.pki_dir,
127                false,
128                config.certificate_path.as_deref(),
129                config.private_key_path.as_deref(),
130                application_description,
131            );
132        if server_certificate.is_none() || server_pkey.is_none() {
133            error!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
134        }
135
136        // Load thumbprints of every user token
137        config.read_x509_thumbprints();
138
139        // Servers may choose to auto trust clients to save some messing around with rejected certs.
140        // This is strongly not advised in production.
141        if config.certificate_validation.trust_client_certs {
142            info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
143            certificate_store.set_trust_unknown_certs(true);
144        }
145        certificate_store.set_check_time(config.certificate_validation.check_time);
146
147        let config = Arc::new(RwLock::new(config));
148
149        // Set some values in the address space from the server state
150        let address_space = Arc::new(RwLock::new(AddressSpace::new()));
151
152        let audit_log = Arc::new(RwLock::new(AuditLog::new(address_space.clone())));
153
154        let server_state = ServerState {
155            application_uri,
156            product_uri,
157            application_name: LocalizedText {
158                locale: UAString::null(),
159                text: UAString::from(application_name),
160            },
161            servers,
162            base_endpoint,
163            state: ServerStateType::Shutdown,
164            start_time,
165            config,
166            server_certificate,
167            server_pkey,
168            last_subscription_id: 0,
169            max_subscriptions,
170            max_monitored_items_per_sub,
171            max_monitored_item_queue_size,
172            min_publishing_interval_ms,
173            min_sampling_interval_ms,
174            default_keep_alive_count: constants::DEFAULT_KEEP_ALIVE_COUNT,
175            max_keep_alive_count: constants::MAX_KEEP_ALIVE_COUNT,
176            max_lifetime_count: constants::MAX_KEEP_ALIVE_COUNT * 3,
177            diagnostics,
178            abort: false,
179            audit_log,
180            register_nodes_callback: None,
181            unregister_nodes_callback: None,
182            historical_data_provider: None,
183            historical_event_provider: None,
184            operational_limits: OperationalLimits::default(),
185        };
186        let server_state = Arc::new(RwLock::new(server_state));
187
188        {
189            let mut address_space = trace_write_lock!(address_space);
190            address_space.set_server_state(server_state.clone());
191        }
192
193        // Server metrics
194        let server_metrics = Arc::new(RwLock::new(ServerMetrics::new()));
195
196        // Cert store
197        let certificate_store = Arc::new(RwLock::new(certificate_store));
198
199        let server = Server {
200            pending_polling_actions: Vec::new(),
201            server_state,
202            server_metrics: server_metrics.clone(),
203            address_space,
204            certificate_store,
205            connections: Arc::new(RwLock::new(Vec::new())),
206        };
207
208        let mut server_metrics = trace_write_lock!(server_metrics);
209        server_metrics.set_server_info(&server);
210
211        server
212    }
213
214    /// Runs the server and blocks until it completes either by aborting or by error. Typically
215    /// a server should be run on its own thread.
216    ///
217    /// Calling this function consumes the server.
218    pub fn run(self) {
219        let server = Arc::new(RwLock::new(self));
220        Self::run_server(server);
221    }
222
223    /// Runs the supplied server and blocks until it completes either by aborting or
224    /// by error.
225    pub fn run_server(server: Arc<RwLock<Server>>) {
226        let single_threaded_executor = {
227            let server = trace_read_lock!(server);
228            let server_state = trace_read_lock!(server.server_state);
229            let config = trace_read_lock!(server_state.config);
230            config.performance.single_threaded_executor
231        };
232        let server_task = Self::new_server_task(server);
233        // Launch
234        let mut builder = if !single_threaded_executor {
235            tokio::runtime::Builder::new_multi_thread()
236        } else {
237            tokio::runtime::Builder::new_current_thread()
238        };
239        let runtime = builder.enable_all().build().unwrap();
240        Self::run_server_on_runtime(runtime, server_task, true);
241    }
242
243    /// Allow the server to be run on a caller supplied runtime. If block is set, the task
244    /// runs to completion (abort or by error), otherwise, the task is spawned and a join handle is
245    /// returned by the function. Spawning might be suitable if the runtime is being used for other
246    /// async tasks.
247    pub fn run_server_on_runtime<F>(
248        runtime: tokio::runtime::Runtime,
249        server_task: F,
250        block: bool,
251    ) -> Option<tokio::task::JoinHandle<<F as futures::Future>::Output>>
252    where
253        F: std::future::Future + Send + 'static,
254        F::Output: Send + 'static,
255    {
256        if block {
257            runtime.block_on(server_task);
258            info!("Server has finished");
259            None
260        } else {
261            Some(runtime.spawn(server_task))
262        }
263    }
264
265    /// Returns the main server task - the loop that waits for connections and processes them.
266    pub async fn new_server_task(server: Arc<RwLock<Server>>) {
267        // Get the address and discovery url
268        let (sock_addr, discovery_server_url) = {
269            let server = trace_read_lock!(server);
270
271            // Debug endpoints
272            server.log_endpoint_info();
273
274            let sock_addr = server.get_socket_address();
275            let server_state = trace_read_lock!(server.server_state);
276            let config = trace_read_lock!(server_state.config);
277
278            // Discovery url must be present and valid
279            let discovery_server_url =
280                if let Some(ref discovery_server_url) = config.discovery_server_url {
281                    if is_valid_opc_ua_url(discovery_server_url) {
282                        Some(discovery_server_url.clone())
283                    } else {
284                        None
285                    }
286                } else {
287                    None
288                };
289
290            (sock_addr, discovery_server_url)
291        };
292        match sock_addr {
293            None => {
294                error!("Cannot resolve server address, check configuration of server");
295            }
296            Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await,
297        }
298    }
299
300    async fn server_task<A: ToSocketAddrs>(
301        server: Arc<RwLock<Server>>,
302        sock_addr: A,
303        discovery_server_url: Option<String>,
304    ) {
305        // This is returned as the main server task
306        info!("Waiting for Connection");
307        // Listen for connections (or abort)
308        let listener = match TcpListener::bind(&sock_addr).await {
309            Ok(listener) => listener,
310            Err(err) => {
311                panic!("Could not bind to socket {:?}", err)
312            }
313        };
314
315        let (tx_abort, rx_abort) = oneshot::channel();
316
317        // Put the server into a running state
318        {
319            let mut server = trace_write_lock!(server);
320            // Running
321            {
322                let mut server_state = trace_write_lock!(server.server_state);
323                server_state.start_time = DateTime::now();
324                server_state.set_state(ServerStateType::Running);
325            }
326
327            // Start a timer that registers the server with a discovery server
328            if let Some(ref discovery_server_url) = discovery_server_url {
329                server.start_discovery_server_registration_timer(discovery_server_url);
330            } else {
331                info!("Server has not set a discovery server url, so no registration will happen");
332            }
333
334            // Start any pending polling action timers
335            server.start_pending_polling_actions();
336        }
337
338        // Start a server abort task loop
339        Self::start_abort_poll(server.clone(), tx_abort);
340
341        // This isn't nice syntax, but basically there are two async actions
342        // going on, one of which has to complete - either the listener breaks out of its
343        // loop, or the rx_abort receives an abort message.
344        tokio::select! {
345            _ = async {
346                loop {
347                    match listener.accept().await {
348                        Ok((socket, _addr)) => {
349                            // Clear out dead sessions
350                            info!("Handling new connection {:?}", socket);
351                            // Check for abort
352                            let mut server = trace_write_lock!(server);
353                            let is_abort = {
354                                let server_state = trace_read_lock!(server.server_state);
355                                server_state.is_abort()
356                            };
357                            if is_abort {
358                                info!("Server is aborting so it will not accept new connections");
359                                break;
360                            } else {
361                                server.handle_connection(socket);
362                            }
363                        }
364                        Err(e) => {
365                            error!("couldn't accept connection to client: {:?}", e);
366                        }
367                    }
368                }
369                // Help the rust type inferencer out
370                Ok::<_, tokio::io::Error>(())
371            } => {}
372            _ = rx_abort => {
373                info!("abort received");
374            }
375        }
376        info!("main server task is finished");
377    }
378
379    /// Returns the current [`ServerState`] for the server.
380    ///
381    /// [`ServerState`]: ../state/struct.ServerState.html
382    pub fn server_state(&self) -> Arc<RwLock<ServerState>> {
383        self.server_state.clone()
384    }
385
386    /// Returns the `CertificateStore` for the server.
387    pub fn certificate_store(&self) -> Arc<RwLock<CertificateStore>> {
388        self.certificate_store.clone()
389    }
390
391    /// Returns the [`AddressSpace`] for the server.
392    ///
393    /// [`AddressSpace`]: ../address_space/address_space/struct.AddressSpace.html
394    pub fn address_space(&self) -> Arc<RwLock<AddressSpace>> {
395        self.address_space.clone()
396    }
397
398    /// Returns the [`Connections`] for the server.
399    ///
400    /// [`Connections`]: ./type.Connections.html
401    pub fn connections(&self) -> Arc<RwLock<Connections>> {
402        self.connections.clone()
403    }
404
405    /// Returns the [`ServerMetrics`] for the server.
406    ///
407    /// [`ServerMetrics`]: ../metrics/struct.ServerMetrics.html
408    pub fn server_metrics(&self) -> Arc<RwLock<ServerMetrics>> {
409        self.server_metrics.clone()
410    }
411
412    /// Returns the `single_threaded_executor` for the server.
413    pub fn single_threaded_executor(&self) -> bool {
414        let server_state = trace_read_lock!(self.server_state);
415        let config = trace_read_lock!(server_state.config);
416        config.performance.single_threaded_executor
417    }
418
419    /// Sets a flag telling the running server to abort. The abort will happen asynchronously after
420    /// all sessions have disconnected.
421    pub fn abort(&mut self) {
422        info!("Server has been instructed to abort");
423        let mut server_state = trace_write_lock!(self.server_state);
424        server_state.abort();
425    }
426
427    /// Strip out dead connections, i.e those which have disconnected. Returns `true` if there are
428    /// still open connections after this function completes.
429    fn remove_dead_connections(&self) -> bool {
430        // Go through all connections, removing those that have terminated
431        let mut connections = trace_write_lock!(self.connections);
432        connections.retain(|transport| {
433            // Try to obtain the lock on the transport and the session and check if session is terminated
434            // if it is, then we'll use its termination status to sweep it out.
435            let lock = transport.try_read();
436            if let Ok(ref transport) = lock {
437                let session_manager = transport.session_manager();
438                let session_manager = trace_read_lock!(session_manager);
439                !session_manager.sessions_terminated()
440            } else {
441                true
442            }
443        });
444        !connections.is_empty()
445    }
446
447    /// Log information about the endpoints on this server
448    fn log_endpoint_info(&self) {
449        let server_state = trace_read_lock!(self.server_state);
450        let config = trace_read_lock!(server_state.config);
451        info!("OPC UA Server: {}", server_state.application_name);
452        info!("Base url: {}", server_state.base_endpoint);
453        info!("Supported endpoints:");
454        for (id, endpoint) in &config.endpoints {
455            let users: Vec<String> = endpoint.user_token_ids.iter().cloned().collect();
456            let users = users.join(", ");
457            info!("Endpoint \"{}\": {}", id, endpoint.path);
458            info!("  Security Mode:    {}", endpoint.security_mode);
459            info!("  Security Policy:  {}", endpoint.security_policy);
460            info!("  Supported user tokens - {}", users);
461        }
462    }
463
464    /// Returns the server socket address.
465    fn get_socket_address(&self) -> Option<SocketAddr> {
466        use std::net::ToSocketAddrs;
467        let server_state = trace_read_lock!(self.server_state);
468        let config = trace_read_lock!(server_state.config);
469        // Resolve this host / port to an address (or not)
470        let address = format!("{}:{}", config.tcp_config.host, config.tcp_config.port);
471        if let Ok(mut addrs_iter) = address.to_socket_addrs() {
472            addrs_iter.next()
473        } else {
474            None
475        }
476    }
477
478    /// This timer will poll the server to see if it has aborted. It also cleans up dead connections.
479    /// If it determines to abort it will signal the tx_abort so that the main listener loop can
480    /// be broken at its convenience.
481    fn start_abort_poll(server: Arc<RwLock<Server>>, tx_abort: Sender<()>) {
482        tokio::spawn(async move {
483            let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
484            loop {
485                trace!("abort_poll_task.take_while");
486                // Check if there are any open sessions
487                {
488                    let server = trace_read_lock!(server);
489                    let has_open_connections = server.remove_dead_connections();
490                    let server_state = trace_read_lock!(server.server_state);
491                    // Predicate breaks on abort & no open connections
492                    if server_state.is_abort() {
493                        if has_open_connections {
494                            warn!("Abort called while there were still open connections");
495                        }
496                        info!("Server has aborted so, sending a command to break the listen loop");
497                        tx_abort.send(()).unwrap();
498                        break;
499                    }
500                }
501                timer.tick().await;
502            }
503            info!("Abort poll task is finished");
504        });
505    }
506
    /// Stand-in used when the `discovery-server-registration` feature is not enabled. No timer
    /// is started; the function only logs that registration with `discovery_server_url` will
    /// not happen.
    #[cfg(not(feature = "discovery-server-registration"))]
    fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
        info!("Discovery server registration is disabled in code so registration with {} will not happen", discovery_server_url);
    }
512
513    /// Discovery registration runs a timer that triggers every 5 minutes and causes the server
514    /// to register itself with a discovery server.
515    #[cfg(feature = "discovery-server-registration")]
516    fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
517        use crate::discovery;
518        use std::sync::Mutex;
519
520        let discovery_server_url = discovery_server_url.to_string();
521        info!(
522            "Server has set a discovery server url {} which will be used to register the server",
523            discovery_server_url
524        );
525        let server_state = self.server_state.clone();
526
527        // The registration timer fires on a duration, so make that duration and pretend the
528        // last time it fired was now - duration, so it should instantly fire when polled next.
529        let register_duration = Duration::from_secs(5 * 60);
530        let last_registered = Instant::now() - register_duration;
531        let last_registered = Arc::new(Mutex::new(last_registered));
532
533        tokio::spawn(async move {
534            // Polling happens fairly quickly so task can terminate on server abort, however
535            // it is looking for the registration duration to have elapsed until it actually does
536            // anything.
537            let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
538            loop {
539                trace!("discovery_server_register.take_while");
540                {
541                    let server_state = trace_read_lock!(server_state);
542                    if !server_state.is_running() || server_state.is_abort() {
543                        break;
544                    }
545                }
546
547                timer.tick().await;
548
549                // Test if registration needs to happen, i.e. if this is first time around,
550                // or if duration has elapsed since last attempt.
551                trace!("discovery_server_register.for_each");
552                let now = Instant::now();
553                let mut last_registered = trace_lock!(last_registered);
554                if now.duration_since(*last_registered) >= register_duration {
555                    *last_registered = now;
556                    // Even though the client uses tokio internally, the client's API is synchronous
557                    // so the registration will happen on its own thread. The expectation is that
558                    // it will run and either succeed, or it will fail but either way the operation
559                    // will have completed before the next timer fires.
560                    let server_state = server_state.clone();
561                    let discovery_server_url = discovery_server_url.clone();
562                    let _ = std::thread::spawn(move || {
563                        let _ = std::panic::catch_unwind(move || {
564                            let server_state = trace_read_lock!(server_state);
565                            if server_state.is_running() {
566                                discovery::register_with_discovery_server(
567                                    &discovery_server_url,
568                                    &server_state,
569                                );
570                            }
571                        });
572                    });
573                }
574            }
575            info!("Discovery timer task is finished");
576        });
577    }
578
579    /// Creates a polling action that happens continuously on an interval while the server
580    /// is running. For example, a server might run a polling action every 100ms to synchronous
581    /// address space state between variables and their physical backends.
582    ///
583    /// The function that is supplied does not take any arguments. It is expected that the
584    /// implementation will move any variables into the function that are required to perform its
585    /// action.
586    pub fn add_polling_action<F>(&mut self, interval_ms: u64, action: F)
587    where
588        F: Fn() + Send + Sync + 'static,
589    {
590        // If the server is not yet running, the action is queued and is started later
591        let server_state = trace_read_lock!(self.server_state);
592        if server_state.is_abort() {
593            error!("Polling action added when server is aborting");
594        // DO NOTHING
595        } else if !server_state.is_running() {
596            self.pending_polling_actions
597                .push((interval_ms, Box::new(action)));
598        } else {
599            // Start the action immediately
600            let _ = PollingAction::spawn(self.server_state.clone(), interval_ms, move || {
601                // Call the provided closure with the address space
602                action();
603            });
604        }
605    }
606
607    /// Starts any polling actions which were queued ready to start but not yet
608    fn start_pending_polling_actions(&mut self) {
609        let server_state = self.server_state.clone();
610        self.pending_polling_actions
611            .drain(..)
612            .for_each(|(interval_ms, action)| {
613                debug!(
614                    "Starting a pending polling action at rate of {} ms",
615                    interval_ms
616                );
617                let _ = PollingAction::spawn(server_state.clone(), interval_ms, move || {
618                    // Call the provided action
619                    action();
620                });
621            });
622    }
623
624    /// Create a new transport.
625    pub fn new_transport(&self) -> TcpTransport {
626        TcpTransport::new(
627            self.certificate_store.clone(),
628            self.server_state.clone(),
629            self.address_space.clone(),
630        )
631    }
632
633    /// Handles the incoming request
634    fn handle_connection(&mut self, socket: TcpStream) {
635        trace!("Connection thread spawning");
636
637        // Spawn a thread for the connection
638        let connection = Arc::new(RwLock::new(self.new_transport()));
639        {
640            let mut connections = trace_write_lock!(self.connections);
641            connections.push(connection.clone());
642        }
643
644        // Looping interval has to cope with whatever sampling rate server needs
645        let looping_interval_ms = {
646            let server_state = trace_read_lock!(self.server_state);
647            // Get the minimum interval in ms
648            f64::min(
649                server_state.min_publishing_interval_ms,
650                server_state.min_sampling_interval_ms,
651            )
652        };
653
654        // Run adds a session task to the tokio session
655        TcpTransport::run(connection, socket, looping_interval_ms);
656    }
657}