Skip to main content

opcua/server/
server.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! Provides the [`Server`] type and functionality related to it.
6
7use std::{marker::Sync, net::SocketAddr, panic::AssertUnwindSafe, sync::Arc};
8
9use tokio::{
10    self,
11    net::{TcpListener, TcpStream, ToSocketAddrs},
12    sync::oneshot::{self, Sender},
13    time::{interval_at, Duration, Instant},
14};
15
16use crate::core::{config::Config, prelude::*};
17use crate::crypto::*;
18use crate::sync::*;
19use crate::types::service_types::ServerState as ServerStateType;
20
21use crate::server::{
22    address_space::types::AddressSpace,
23    comms::tcp_transport::*,
24    comms::transport::Transport,
25    config::ServerConfig,
26    constants,
27    diagnostics::ServerDiagnostics,
28    events::audit::AuditLog,
29    metrics::ServerMetrics,
30    session::SessionManager,
31    state::{OperationalLimits, ServerState},
32    util::PollingAction,
33};
34
/// The collection of transports the server keeps for the connections it has accepted. Each
/// transport is shared behind an `Arc<RwLock<..>>`.
pub type Connections = Vec<Arc<RwLock<TcpTransport>>>;
36
/// A `Server` represents a running instance of an OPC UA server. There can be more than one `Server`
/// running at any given time providing they do not share the same ports.
///
/// A `Server` is initialised from a [`ServerConfig`]. The `ServerConfig` sets what port the server
/// runs on, the endpoints it supports, the identity tokens it supports and so forth.
/// A single server can offer multiple endpoints with different security policies. A server can
/// also be configured to register itself with a discovery server.
///
/// Once the `Server` is configured, it is run by calling [`run`] which consumes the `Server`.
/// Alternatively, if you have reason to maintain access to the server object,
/// you may call the static function [`run_server`], providing the server wrapped as
/// `Arc<RwLock<Server>>`.
///
/// The server's [`AddressSpace`] is initialised with the default OPC UA node set, but may also
/// be extended with additional nodes representing folders, variables, methods etc.
///
/// The server's [`CertificateStore`] manages the server's private key and public certificate. It
/// also manages public certificates of incoming clients and arranges them into trusted and rejected
/// collections.
///
/// [`run`]: #method.run
/// [`run_server`]: #method.run_server
/// [`ServerConfig`]: ../config/struct.ServerConfig.html
/// [`AddressSpace`]: ../address_space/address_space/struct.AddressSpace.html
/// [`CertificateStore`]: ../../opcua_core/crypto/certificate_store/struct.CertificateStore.html
///
pub struct Server {
    /// Polling actions, stored as `(interval in ms, action)` pairs, that were added before the
    /// server was running and are started once it runs
    pending_polling_actions: Vec<(u64, Box<dyn Fn() + Send + Sync + 'static>)>,
    /// Certificate store for certs
    certificate_store: Arc<RwLock<CertificateStore>>,
    /// Server metrics - diagnostics and anything else that someone might be interested in that
    /// describes the current state of the server
    server_metrics: Arc<RwLock<ServerMetrics>>,
    /// The server state is everything that sessions share that can possibly change. State
    /// is initialised from a [`ServerConfig`].
    server_state: Arc<RwLock<ServerState>>,
    /// Address space
    address_space: Arc<RwLock<AddressSpace>>,
    /// List of open connections
    connections: Arc<RwLock<Connections>>,
    /// Session manager, handed to every transport the server creates
    session_manager: Arc<RwLock<SessionManager>>,
}
81
82impl From<ServerConfig> for Server {
83    fn from(config: ServerConfig) -> Server {
84        Server::new(config)
85    }
86}
87
88impl Server {
89    /// Creates a new [`Server`] instance, initialising it from a [`ServerConfig`].
90    ///
91    /// [`Server`]: ./struct.Server.html
92    /// [`ServerConfig`]: ../config/struct.ServerConfig.html
93    pub fn new(mut config: ServerConfig) -> Server {
94        if !config.is_valid() {
95            panic!("Cannot create a server using an invalid configuration.");
96        }
97
98        // Set from config
99        let application_name = config.application_name.clone();
100        let application_uri = UAString::from(&config.application_uri);
101        let product_uri = UAString::from(&config.product_uri);
102        let start_time = DateTime::now();
103        let servers = vec![config.application_uri.clone()];
104        let base_endpoint = format!(
105            "opc.tcp://{}:{}",
106            config.tcp_config.host, config.tcp_config.port
107        );
108        let max_subscriptions = config.limits.max_subscriptions as usize;
109        let max_monitored_items_per_sub = config.limits.max_monitored_items_per_sub as usize;
110        let max_monitored_item_queue_size = config.limits.max_monitored_item_queue_size as usize;
111
112        let diagnostics = Arc::new(RwLock::new(ServerDiagnostics::default()));
113        let min_publishing_interval_ms = config.limits.min_publishing_interval * 1000.0;
114        let min_sampling_interval_ms = config.limits.min_sampling_interval * 1000.0;
115        let send_buffer_size = config.limits.send_buffer_size;
116        let receive_buffer_size = config.limits.receive_buffer_size;
117
118        // Security, pki auto create cert
119        let application_description = if config.create_sample_keypair {
120            Some(config.application_description())
121        } else {
122            None
123        };
124        let (mut certificate_store, server_certificate, server_pkey) =
125            CertificateStore::new_with_x509_data(
126                &config.pki_dir,
127                false,
128                config.certificate_path.as_deref(),
129                config.private_key_path.as_deref(),
130                application_description,
131            );
132        if server_certificate.is_none() || server_pkey.is_none() {
133            error!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
134        }
135
136        // Load thumbprints of every user token
137        config.read_x509_thumbprints();
138
139        // Servers may choose to auto trust clients to save some messing around with rejected certs.
140        // This is strongly not advised in production.
141        if config.certificate_validation.trust_client_certs {
142            info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
143            certificate_store.set_trust_unknown_certs(true);
144        }
145        certificate_store.set_check_time(config.certificate_validation.check_time);
146
147        let config = Arc::new(RwLock::new(config));
148
149        // Set some values in the address space from the server state
150        let address_space = Arc::new(RwLock::new(AddressSpace::new()));
151
152        let audit_log = Arc::new(RwLock::new(AuditLog::new(address_space.clone())));
153
154        let server_state = ServerState {
155            application_uri,
156            product_uri,
157            application_name: LocalizedText {
158                locale: UAString::null(),
159                text: UAString::from(application_name),
160            },
161            servers,
162            base_endpoint,
163            state: ServerStateType::Shutdown,
164            start_time,
165            config,
166            server_certificate,
167            server_pkey,
168            last_subscription_id: 0,
169            max_subscriptions,
170            max_monitored_items_per_sub,
171            max_monitored_item_queue_size,
172            min_publishing_interval_ms,
173            min_sampling_interval_ms,
174            default_keep_alive_count: constants::DEFAULT_KEEP_ALIVE_COUNT,
175            max_keep_alive_count: constants::MAX_KEEP_ALIVE_COUNT,
176            max_lifetime_count: constants::MAX_KEEP_ALIVE_COUNT * 3,
177            diagnostics,
178            abort: false,
179            audit_log,
180            register_nodes_callback: None,
181            unregister_nodes_callback: None,
182            historical_data_provider: None,
183            historical_event_provider: None,
184            operational_limits: OperationalLimits::default(),
185            send_buffer_size,
186            receive_buffer_size,
187        };
188        let server_state = Arc::new(RwLock::new(server_state));
189
190        {
191            let mut address_space = trace_write_lock!(address_space);
192            address_space.set_server_state(server_state.clone());
193        }
194
195        // Server metrics
196        let server_metrics = Arc::new(RwLock::new(ServerMetrics::new()));
197
198        // Cert store
199        let certificate_store = Arc::new(RwLock::new(certificate_store));
200
201        let server = Server {
202            pending_polling_actions: Vec::new(),
203            server_state,
204            server_metrics: server_metrics.clone(),
205            address_space,
206            certificate_store,
207            connections: Arc::new(RwLock::new(Vec::new())),
208            session_manager: Arc::new(RwLock::new(SessionManager::default())),
209        };
210
211        let mut server_metrics = trace_write_lock!(server_metrics);
212        server_metrics.set_server_info(&server);
213
214        server
215    }
216
217    /// Runs the server and blocks until it completes either by aborting or by error. Typically
218    /// a server should be run on its own thread.
219    ///
220    /// Calling this function consumes the server.
221    pub fn run(self) {
222        let server = Arc::new(RwLock::new(self));
223        Self::run_server(server);
224    }
225
226    /// Runs the supplied server and blocks until it completes either by aborting or
227    /// by error.
228    pub fn run_server(server: Arc<RwLock<Server>>) {
229        let single_threaded_executor = {
230            let server = trace_read_lock!(server);
231            let server_state = trace_read_lock!(server.server_state);
232            let config = trace_read_lock!(server_state.config);
233            config.performance.single_threaded_executor
234        };
235        let server_task = Self::new_server_task(server);
236        // Launch
237        let mut builder = if !single_threaded_executor {
238            tokio::runtime::Builder::new_multi_thread()
239        } else {
240            tokio::runtime::Builder::new_current_thread()
241        };
242        let runtime = builder.enable_all().build().unwrap();
243        Self::run_server_on_runtime(runtime, server_task, true);
244    }
245
246    /// Allow the server to be run on a caller supplied runtime. If block is set, the task
247    /// runs to completion (abort or by error), otherwise, the task is spawned and a join handle is
248    /// returned by the function. Spawning might be suitable if the runtime is being used for other
249    /// async tasks.
250    pub fn run_server_on_runtime<F>(
251        runtime: tokio::runtime::Runtime,
252        server_task: F,
253        block: bool,
254    ) -> Option<tokio::task::JoinHandle<<F as futures::Future>::Output>>
255    where
256        F: std::future::Future + Send + 'static,
257        F::Output: Send + 'static,
258    {
259        if block {
260            runtime.block_on(server_task);
261            info!("Server has finished");
262            None
263        } else {
264            Some(runtime.spawn(server_task))
265        }
266    }
267
268    /// Returns the main server task - the loop that waits for connections and processes them.
269    pub async fn new_server_task(server: Arc<RwLock<Server>>) {
270        // Get the address and discovery url
271        let (sock_addr, discovery_server_url) = {
272            let server = trace_read_lock!(server);
273
274            // Debug endpoints
275            server.log_endpoint_info();
276
277            let sock_addr = server.get_socket_address();
278            let server_state = trace_read_lock!(server.server_state);
279            let config = trace_read_lock!(server_state.config);
280
281            // Discovery url must be present and valid
282            let discovery_server_url =
283                if let Some(ref discovery_server_url) = config.discovery_server_url {
284                    if is_valid_opc_ua_url(discovery_server_url) {
285                        Some(discovery_server_url.clone())
286                    } else {
287                        None
288                    }
289                } else {
290                    None
291                };
292
293            (sock_addr, discovery_server_url)
294        };
295        match sock_addr {
296            None => {
297                error!("Cannot resolve server address, check configuration of server");
298            }
299            Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await,
300        }
301    }
302
303    async fn server_task<A: ToSocketAddrs>(
304        server: Arc<RwLock<Server>>,
305        sock_addr: A,
306        discovery_server_url: Option<String>,
307    ) {
308        // This is returned as the main server task
309        info!("Waiting for Connection");
310        // Listen for connections (or abort)
311        let listener = match TcpListener::bind(&sock_addr).await {
312            Ok(listener) => listener,
313            Err(err) => {
314                panic!("Could not bind to socket {:?}", err)
315            }
316        };
317
318        let (tx_abort, rx_abort) = oneshot::channel();
319
320        // Put the server into a running state
321        {
322            let mut server = trace_write_lock!(server);
323            // Running
324            {
325                let mut server_state = trace_write_lock!(server.server_state);
326                server_state.start_time = DateTime::now();
327                server_state.set_state(ServerStateType::Running);
328            }
329
330            // Start a timer that registers the server with a discovery server
331            if let Some(ref discovery_server_url) = discovery_server_url {
332                server.start_discovery_server_registration_timer(discovery_server_url);
333            } else {
334                info!("Server has not set a discovery server url, so no registration will happen");
335            }
336
337            // Start any pending polling action timers
338            server.start_pending_polling_actions();
339        }
340
341        // Start a server abort task loop
342        Self::start_abort_poll(server.clone(), tx_abort);
343
344        // This isn't nice syntax, but basically there are two async actions
345        // going on, one of which has to complete - either the listener breaks out of its
346        // loop, or the rx_abort receives an abort message.
347        tokio::select! {
348            _ = async {
349                loop {
350                    match listener.accept().await {
351                        Ok((socket, _addr)) => {
352                            // Clear out dead sessions
353                            info!("Handling new connection {:?}", socket);
354                            // Check for abort
355                            let mut server = trace_write_lock!(server);
356                            let is_abort = {
357                                let server_state = trace_read_lock!(server.server_state);
358                                server_state.is_abort()
359                            };
360                            if is_abort {
361                                info!("Server is aborting so it will not accept new connections");
362                                break;
363                            } else {
364                                server.handle_connection(socket);
365                            }
366                        }
367                        Err(e) => {
368                            error!("couldn't accept connection to client: {:?}", e);
369                        }
370                    }
371                }
372                // Help the rust type inferencer out
373                Ok::<_, tokio::io::Error>(())
374            } => {}
375            _ = rx_abort => {
376                info!("abort received");
377            }
378        }
379        info!("main server task is finished");
380    }
381
382    /// Returns the current [`ServerState`] for the server.
383    ///
384    /// [`ServerState`]: ../state/struct.ServerState.html
385    pub fn server_state(&self) -> Arc<RwLock<ServerState>> {
386        self.server_state.clone()
387    }
388
389    /// Returns the `CertificateStore` for the server.
390    pub fn certificate_store(&self) -> Arc<RwLock<CertificateStore>> {
391        self.certificate_store.clone()
392    }
393
394    /// Returns the [`AddressSpace`] for the server.
395    ///
396    /// [`AddressSpace`]: ../address_space/address_space/struct.AddressSpace.html
397    pub fn address_space(&self) -> Arc<RwLock<AddressSpace>> {
398        self.address_space.clone()
399    }
400
401    /// Returns the [`Connections`] for the server.
402    ///
403    /// [`Connections`]: ./type.Connections.html
404    pub fn connections(&self) -> Arc<RwLock<Connections>> {
405        self.connections.clone()
406    }
407
408    /// Returns the [`ServerMetrics`] for the server.
409    ///
410    /// [`ServerMetrics`]: ../metrics/struct.ServerMetrics.html
411    pub fn server_metrics(&self) -> Arc<RwLock<ServerMetrics>> {
412        self.server_metrics.clone()
413    }
414
415    /// Returns the `single_threaded_executor` for the server.
416    pub fn single_threaded_executor(&self) -> bool {
417        let server_state = trace_read_lock!(self.server_state);
418        let config = trace_read_lock!(server_state.config);
419        config.performance.single_threaded_executor
420    }
421
422    /// Sets a flag telling the running server to abort. The abort will happen asynchronously after
423    /// all sessions have disconnected.
424    pub fn abort(&mut self) {
425        info!("Server has been instructed to abort");
426        let mut server_state = trace_write_lock!(self.server_state);
427        server_state.abort();
428    }
429
430    /// Strip out dead connections, i.e those which have disconnected. Returns `true` if there are
431    /// still open connections after this function completes.
432    fn remove_dead_connections(&self) -> bool {
433        // Go through all connections, removing those that have terminated
434        let mut connections = trace_write_lock!(self.connections);
435        connections.retain(|transport| {
436            // Try to obtain the lock on the transport and the session and check if session is terminated
437            // if it is, then we'll use its termination status to sweep it out.
438            let lock = transport.try_read();
439            if let Some(ref transport) = lock {
440                let session_manager = transport.session_manager();
441                let session_manager = trace_read_lock!(session_manager);
442                !session_manager.sessions_terminated()
443            } else {
444                true
445            }
446        });
447        !connections.is_empty()
448    }
449
450    /// Log information about the endpoints on this server
451    fn log_endpoint_info(&self) {
452        let server_state = trace_read_lock!(self.server_state);
453        let config = trace_read_lock!(server_state.config);
454        info!("OPC UA Server: {}", server_state.application_name);
455        info!("Base url: {}", server_state.base_endpoint);
456        info!("Supported endpoints:");
457        for (id, endpoint) in &config.endpoints {
458            let users: Vec<String> = endpoint.user_token_ids.iter().cloned().collect();
459            let users = users.join(", ");
460            info!("Endpoint \"{}\": {}", id, endpoint.path);
461            info!("  Security Mode:    {}", endpoint.security_mode);
462            info!("  Security Policy:  {}", endpoint.security_policy);
463            info!("  Supported user tokens - {}", users);
464        }
465    }
466
467    /// Returns the server socket address.
468    fn get_socket_address(&self) -> Option<SocketAddr> {
469        use std::net::ToSocketAddrs;
470        let server_state = trace_read_lock!(self.server_state);
471        let config = trace_read_lock!(server_state.config);
472        // Resolve this host / port to an address (or not)
473        let address = format!("{}:{}", config.tcp_config.host, config.tcp_config.port);
474        if let Ok(mut addrs_iter) = address.to_socket_addrs() {
475            addrs_iter.next()
476        } else {
477            None
478        }
479    }
480
481    /// This timer will poll the server to see if it has aborted. It also cleans up dead connections.
482    /// If it determines to abort it will signal the tx_abort so that the main listener loop can
483    /// be broken at its convenience.
484    fn start_abort_poll(server: Arc<RwLock<Server>>, tx_abort: Sender<()>) {
485        tokio::spawn(async move {
486            let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
487            loop {
488                trace!("abort_poll_task.take_while");
489                // Check if there are any open sessions
490                {
491                    let server = trace_read_lock!(server);
492                    let has_open_connections = server.remove_dead_connections();
493                    let server_state = trace_read_lock!(server.server_state);
494                    // Predicate breaks on abort & no open connections
495                    if server_state.is_abort() {
496                        if has_open_connections {
497                            warn!("Abort called while there were still open connections");
498                        }
499                        info!("Server has aborted so, sending a command to break the listen loop");
500                        tx_abort.send(()).unwrap();
501                        break;
502                    }
503                }
504                timer.tick().await;
505            }
506            info!("Abort poll task is finished");
507        });
508    }
509
    /// Variant compiled when the `discovery-server-registration` feature is not enabled. No
    /// timer is started; the function only logs that registration with `discovery_server_url`
    /// will not happen.
    #[cfg(not(feature = "discovery-server-registration"))]
    fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
        info!("Discovery server registration is disabled in code so registration with {} will not happen", discovery_server_url);
    }
515
516    /// Discovery registration runs a timer that triggers every 5 minutes and causes the server
517    /// to register itself with a discovery server.
518    #[cfg(feature = "discovery-server-registration")]
519    fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
520        use crate::server::discovery;
521
522        let discovery_server_url = discovery_server_url.to_string();
523        info!(
524            "Server has set a discovery server url {} which will be used to register the server",
525            discovery_server_url
526        );
527        let server_state = self.server_state.clone();
528
529        // The registration timer fires on a duration, so make that duration and pretend the
530        // last time it fired was now - duration, so it should instantly fire when polled next.
531        let register_duration = Duration::from_secs(5 * 60);
532        let last_registered = Instant::now() - register_duration;
533        let last_registered = Arc::new(Mutex::new(last_registered));
534
535        tokio::spawn(async move {
536            // Polling happens fairly quickly so task can terminate on server abort, however
537            // it is looking for the registration duration to have elapsed until it actually does
538            // anything.
539            let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
540            loop {
541                trace!("discovery_server_register.take_while");
542                {
543                    let server_state = trace_read_lock!(server_state);
544                    if !server_state.is_running() || server_state.is_abort() {
545                        break;
546                    }
547                }
548
549                timer.tick().await;
550
551                // Test if registration needs to happen, i.e. if this is first time around,
552                // or if duration has elapsed since last attempt.
553                trace!("discovery_server_register.for_each");
554                let now = Instant::now();
555                let mut last_registered = trace_lock!(last_registered);
556                if now.duration_since(*last_registered) >= register_duration {
557                    *last_registered = now;
558                    // Even though the client uses tokio internally, the client's API is synchronous
559                    // so the registration will happen on its own thread. The expectation is that
560                    // it will run and either succeed, or it will fail but either way the operation
561                    // will have completed before the next timer fires.
562                    let server_state = server_state.clone();
563                    let discovery_server_url = discovery_server_url.clone();
564                    let _ = std::thread::spawn(move || {
565                        let _ = std::panic::catch_unwind(AssertUnwindSafe(move || {
566                            let server_state = trace_read_lock!(server_state);
567                            if server_state.is_running() {
568                                discovery::register_with_discovery_server(
569                                    &discovery_server_url,
570                                    &server_state,
571                                );
572                            }
573                        }));
574                    });
575                }
576            }
577            info!("Discovery timer task is finished");
578        });
579    }
580
581    /// Creates a polling action that happens continuously on an interval while the server
582    /// is running. For example, a server might run a polling action every 100ms to synchronous
583    /// address space state between variables and their physical backends.
584    ///
585    /// The function that is supplied does not take any arguments. It is expected that the
586    /// implementation will move any variables into the function that are required to perform its
587    /// action.
588    pub fn add_polling_action<F>(&mut self, interval_ms: u64, action: F)
589    where
590        F: Fn() + Send + Sync + 'static,
591    {
592        // If the server is not yet running, the action is queued and is started later
593        let server_state = trace_read_lock!(self.server_state);
594        if server_state.is_abort() {
595            error!("Polling action added when server is aborting");
596        // DO NOTHING
597        } else if !server_state.is_running() {
598            self.pending_polling_actions
599                .push((interval_ms, Box::new(action)));
600        } else {
601            // Start the action immediately
602            let _ = PollingAction::spawn(self.server_state.clone(), interval_ms, move || {
603                // Call the provided closure with the address space
604                action();
605            });
606        }
607    }
608
609    /// Starts any polling actions which were queued ready to start but not yet
610    fn start_pending_polling_actions(&mut self) {
611        let server_state = self.server_state.clone();
612        self.pending_polling_actions
613            .drain(..)
614            .for_each(|(interval_ms, action)| {
615                debug!(
616                    "Starting a pending polling action at rate of {} ms",
617                    interval_ms
618                );
619                let _ = PollingAction::spawn(server_state.clone(), interval_ms, move || {
620                    // Call the provided action
621                    action();
622                });
623            });
624    }
625
626    /// Create a new transport.
627    pub fn new_transport(&self) -> TcpTransport {
628        TcpTransport::new(
629            self.certificate_store.clone(),
630            self.server_state.clone(),
631            self.address_space.clone(),
632            self.session_manager.clone(),
633        )
634    }
635
636    /// Handles the incoming request
637    fn handle_connection(&mut self, socket: TcpStream) {
638        trace!("Connection thread spawning");
639
640        // Spawn a task for the connection
641        let connection = Arc::new(RwLock::new(self.new_transport()));
642        {
643            let mut connections = trace_write_lock!(self.connections);
644            connections.push(connection.clone());
645        }
646
647        // Looping interval has to cope with whatever sampling rate server needs
648        let looping_interval_ms = {
649            let server_state = trace_read_lock!(self.server_state);
650            // Get the minimum interval in ms
651            f64::min(
652                server_state.min_publishing_interval_ms,
653                server_state.min_sampling_interval_ms,
654            )
655        };
656
657        // Run adds a session task to the tokio session
658        TcpTransport::run(connection, socket, looping_interval_ms);
659    }
660}