1use std::{
8 marker::Sync,
9 net::SocketAddr,
10 sync::{Arc, RwLock},
11};
12
13use tokio::{
14 self,
15 net::{TcpListener, TcpStream, ToSocketAddrs},
16 sync::oneshot::{self, Sender},
17 time::{interval_at, Duration, Instant},
18};
19
20use opcua_core::{config::Config, prelude::*};
21use opcua_crypto::*;
22use opcua_types::service_types::ServerState as ServerStateType;
23
24use crate::{
25 address_space::types::AddressSpace,
26 comms::tcp_transport::*,
27 comms::transport::Transport,
28 config::ServerConfig,
29 constants,
30 diagnostics::ServerDiagnostics,
31 events::audit::AuditLog,
32 metrics::ServerMetrics,
33 state::{OperationalLimits, ServerState},
34 util::PollingAction,
35};
36
/// The collection of transports the server is tracking, one shared, lockable
/// `TcpTransport` per accepted connection.
pub type Connections = Vec<Arc<RwLock<TcpTransport>>>;
38
39pub struct Server {
66 pending_polling_actions: Vec<(u64, Box<dyn Fn() + Send + Sync + 'static>)>,
68 certificate_store: Arc<RwLock<CertificateStore>>,
70 server_metrics: Arc<RwLock<ServerMetrics>>,
73 server_state: Arc<RwLock<ServerState>>,
76 address_space: Arc<RwLock<AddressSpace>>,
78 connections: Arc<RwLock<Connections>>,
80}
81
82impl From<ServerConfig> for Server {
83 fn from(config: ServerConfig) -> Server {
84 Server::new(config)
85 }
86}
87
88impl Server {
89 pub fn new(mut config: ServerConfig) -> Server {
94 if !config.is_valid() {
95 panic!("Cannot create a server using an invalid configuration.");
96 }
97
98 let application_name = config.application_name.clone();
100 let application_uri = UAString::from(&config.application_uri);
101 let product_uri = UAString::from(&config.product_uri);
102 let start_time = DateTime::now();
103 let servers = vec![config.application_uri.clone()];
104 let base_endpoint = format!(
105 "opc.tcp://{}:{}",
106 config.tcp_config.host, config.tcp_config.port
107 );
108 let max_subscriptions = config.limits.max_subscriptions as usize;
109 let max_monitored_items_per_sub = config.limits.max_monitored_items_per_sub as usize;
110 let max_monitored_item_queue_size = config.limits.max_monitored_item_queue_size as usize;
111
112 let diagnostics = Arc::new(RwLock::new(ServerDiagnostics::default()));
113 let min_publishing_interval_ms = config.limits.min_publishing_interval * 1000.0;
114 let min_sampling_interval_ms = config.limits.min_sampling_interval * 1000.0;
115
116 let application_description = if config.create_sample_keypair {
120 Some(config.application_description())
121 } else {
122 None
123 };
124 let (mut certificate_store, server_certificate, server_pkey) =
125 CertificateStore::new_with_x509_data(
126 &config.pki_dir,
127 false,
128 config.certificate_path.as_deref(),
129 config.private_key_path.as_deref(),
130 application_description,
131 );
132 if server_certificate.is_none() || server_pkey.is_none() {
133 error!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
134 }
135
136 config.read_x509_thumbprints();
138
139 if config.certificate_validation.trust_client_certs {
142 info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
143 certificate_store.set_trust_unknown_certs(true);
144 }
145 certificate_store.set_check_time(config.certificate_validation.check_time);
146
147 let config = Arc::new(RwLock::new(config));
148
149 let address_space = Arc::new(RwLock::new(AddressSpace::new()));
151
152 let audit_log = Arc::new(RwLock::new(AuditLog::new(address_space.clone())));
153
154 let server_state = ServerState {
155 application_uri,
156 product_uri,
157 application_name: LocalizedText {
158 locale: UAString::null(),
159 text: UAString::from(application_name),
160 },
161 servers,
162 base_endpoint,
163 state: ServerStateType::Shutdown,
164 start_time,
165 config,
166 server_certificate,
167 server_pkey,
168 last_subscription_id: 0,
169 max_subscriptions,
170 max_monitored_items_per_sub,
171 max_monitored_item_queue_size,
172 min_publishing_interval_ms,
173 min_sampling_interval_ms,
174 default_keep_alive_count: constants::DEFAULT_KEEP_ALIVE_COUNT,
175 max_keep_alive_count: constants::MAX_KEEP_ALIVE_COUNT,
176 max_lifetime_count: constants::MAX_KEEP_ALIVE_COUNT * 3,
177 diagnostics,
178 abort: false,
179 audit_log,
180 register_nodes_callback: None,
181 unregister_nodes_callback: None,
182 historical_data_provider: None,
183 historical_event_provider: None,
184 operational_limits: OperationalLimits::default(),
185 };
186 let server_state = Arc::new(RwLock::new(server_state));
187
188 {
189 let mut address_space = trace_write_lock!(address_space);
190 address_space.set_server_state(server_state.clone());
191 }
192
193 let server_metrics = Arc::new(RwLock::new(ServerMetrics::new()));
195
196 let certificate_store = Arc::new(RwLock::new(certificate_store));
198
199 let server = Server {
200 pending_polling_actions: Vec::new(),
201 server_state,
202 server_metrics: server_metrics.clone(),
203 address_space,
204 certificate_store,
205 connections: Arc::new(RwLock::new(Vec::new())),
206 };
207
208 let mut server_metrics = trace_write_lock!(server_metrics);
209 server_metrics.set_server_info(&server);
210
211 server
212 }
213
214 pub fn run(self) {
219 let server = Arc::new(RwLock::new(self));
220 Self::run_server(server);
221 }
222
223 pub fn run_server(server: Arc<RwLock<Server>>) {
226 let single_threaded_executor = {
227 let server = trace_read_lock!(server);
228 let server_state = trace_read_lock!(server.server_state);
229 let config = trace_read_lock!(server_state.config);
230 config.performance.single_threaded_executor
231 };
232 let server_task = Self::new_server_task(server);
233 let mut builder = if !single_threaded_executor {
235 tokio::runtime::Builder::new_multi_thread()
236 } else {
237 tokio::runtime::Builder::new_current_thread()
238 };
239 let runtime = builder.enable_all().build().unwrap();
240 Self::run_server_on_runtime(runtime, server_task, true);
241 }
242
243 pub fn run_server_on_runtime<F>(
248 runtime: tokio::runtime::Runtime,
249 server_task: F,
250 block: bool,
251 ) -> Option<tokio::task::JoinHandle<<F as futures::Future>::Output>>
252 where
253 F: std::future::Future + Send + 'static,
254 F::Output: Send + 'static,
255 {
256 if block {
257 runtime.block_on(server_task);
258 info!("Server has finished");
259 None
260 } else {
261 Some(runtime.spawn(server_task))
262 }
263 }
264
265 pub async fn new_server_task(server: Arc<RwLock<Server>>) {
267 let (sock_addr, discovery_server_url) = {
269 let server = trace_read_lock!(server);
270
271 server.log_endpoint_info();
273
274 let sock_addr = server.get_socket_address();
275 let server_state = trace_read_lock!(server.server_state);
276 let config = trace_read_lock!(server_state.config);
277
278 let discovery_server_url =
280 if let Some(ref discovery_server_url) = config.discovery_server_url {
281 if is_valid_opc_ua_url(discovery_server_url) {
282 Some(discovery_server_url.clone())
283 } else {
284 None
285 }
286 } else {
287 None
288 };
289
290 (sock_addr, discovery_server_url)
291 };
292 match sock_addr {
293 None => {
294 error!("Cannot resolve server address, check configuration of server");
295 }
296 Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await,
297 }
298 }
299
300 async fn server_task<A: ToSocketAddrs>(
301 server: Arc<RwLock<Server>>,
302 sock_addr: A,
303 discovery_server_url: Option<String>,
304 ) {
305 info!("Waiting for Connection");
307 let listener = match TcpListener::bind(&sock_addr).await {
309 Ok(listener) => listener,
310 Err(err) => {
311 panic!("Could not bind to socket {:?}", err)
312 }
313 };
314
315 let (tx_abort, rx_abort) = oneshot::channel();
316
317 {
319 let mut server = trace_write_lock!(server);
320 {
322 let mut server_state = trace_write_lock!(server.server_state);
323 server_state.start_time = DateTime::now();
324 server_state.set_state(ServerStateType::Running);
325 }
326
327 if let Some(ref discovery_server_url) = discovery_server_url {
329 server.start_discovery_server_registration_timer(discovery_server_url);
330 } else {
331 info!("Server has not set a discovery server url, so no registration will happen");
332 }
333
334 server.start_pending_polling_actions();
336 }
337
338 Self::start_abort_poll(server.clone(), tx_abort);
340
341 tokio::select! {
345 _ = async {
346 loop {
347 match listener.accept().await {
348 Ok((socket, _addr)) => {
349 info!("Handling new connection {:?}", socket);
351 let mut server = trace_write_lock!(server);
353 let is_abort = {
354 let server_state = trace_read_lock!(server.server_state);
355 server_state.is_abort()
356 };
357 if is_abort {
358 info!("Server is aborting so it will not accept new connections");
359 break;
360 } else {
361 server.handle_connection(socket);
362 }
363 }
364 Err(e) => {
365 error!("couldn't accept connection to client: {:?}", e);
366 }
367 }
368 }
369 Ok::<_, tokio::io::Error>(())
371 } => {}
372 _ = rx_abort => {
373 info!("abort received");
374 }
375 }
376 info!("main server task is finished");
377 }
378
379 pub fn server_state(&self) -> Arc<RwLock<ServerState>> {
383 self.server_state.clone()
384 }
385
386 pub fn certificate_store(&self) -> Arc<RwLock<CertificateStore>> {
388 self.certificate_store.clone()
389 }
390
391 pub fn address_space(&self) -> Arc<RwLock<AddressSpace>> {
395 self.address_space.clone()
396 }
397
398 pub fn connections(&self) -> Arc<RwLock<Connections>> {
402 self.connections.clone()
403 }
404
405 pub fn server_metrics(&self) -> Arc<RwLock<ServerMetrics>> {
409 self.server_metrics.clone()
410 }
411
412 pub fn single_threaded_executor(&self) -> bool {
414 let server_state = trace_read_lock!(self.server_state);
415 let config = trace_read_lock!(server_state.config);
416 config.performance.single_threaded_executor
417 }
418
419 pub fn abort(&mut self) {
422 info!("Server has been instructed to abort");
423 let mut server_state = trace_write_lock!(self.server_state);
424 server_state.abort();
425 }
426
427 fn remove_dead_connections(&self) -> bool {
430 let mut connections = trace_write_lock!(self.connections);
432 connections.retain(|transport| {
433 let lock = transport.try_read();
436 if let Ok(ref transport) = lock {
437 let session_manager = transport.session_manager();
438 let session_manager = trace_read_lock!(session_manager);
439 !session_manager.sessions_terminated()
440 } else {
441 true
442 }
443 });
444 !connections.is_empty()
445 }
446
447 fn log_endpoint_info(&self) {
449 let server_state = trace_read_lock!(self.server_state);
450 let config = trace_read_lock!(server_state.config);
451 info!("OPC UA Server: {}", server_state.application_name);
452 info!("Base url: {}", server_state.base_endpoint);
453 info!("Supported endpoints:");
454 for (id, endpoint) in &config.endpoints {
455 let users: Vec<String> = endpoint.user_token_ids.iter().cloned().collect();
456 let users = users.join(", ");
457 info!("Endpoint \"{}\": {}", id, endpoint.path);
458 info!(" Security Mode: {}", endpoint.security_mode);
459 info!(" Security Policy: {}", endpoint.security_policy);
460 info!(" Supported user tokens - {}", users);
461 }
462 }
463
464 fn get_socket_address(&self) -> Option<SocketAddr> {
466 use std::net::ToSocketAddrs;
467 let server_state = trace_read_lock!(self.server_state);
468 let config = trace_read_lock!(server_state.config);
469 let address = format!("{}:{}", config.tcp_config.host, config.tcp_config.port);
471 if let Ok(mut addrs_iter) = address.to_socket_addrs() {
472 addrs_iter.next()
473 } else {
474 None
475 }
476 }
477
478 fn start_abort_poll(server: Arc<RwLock<Server>>, tx_abort: Sender<()>) {
482 tokio::spawn(async move {
483 let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
484 loop {
485 trace!("abort_poll_task.take_while");
486 {
488 let server = trace_read_lock!(server);
489 let has_open_connections = server.remove_dead_connections();
490 let server_state = trace_read_lock!(server.server_state);
491 if server_state.is_abort() {
493 if has_open_connections {
494 warn!("Abort called while there were still open connections");
495 }
496 info!("Server has aborted so, sending a command to break the listen loop");
497 tx_abort.send(()).unwrap();
498 break;
499 }
500 }
501 timer.tick().await;
502 }
503 info!("Abort poll task is finished");
504 });
505 }
506
507 #[cfg(not(feature = "discovery-server-registration"))]
509 fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
510 info!("Discovery server registration is disabled in code so registration with {} will not happen", discovery_server_url);
511 }
512
/// Variant used when the `discovery-server-registration` feature is enabled:
/// spawns a task that registers the server with the discovery server at
/// `discovery_server_url`, first on the initial timer tick and then every five
/// minutes, until the server stops running or is aborted.
///
/// The task polls once a second so it can end promptly when the server stops,
/// but only attempts a registration once the registration period has elapsed
/// since the previous attempt.
#[cfg(feature = "discovery-server-registration")]
fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
    use crate::discovery;

    let discovery_server_url = discovery_server_url.to_string();
    info!(
        "Server has set a discovery server url {} which will be used to register the server",
        discovery_server_url
    );
    let server_state = self.server_state.clone();

    let register_duration = Duration::from_secs(5 * 60);

    tokio::spawn(async move {
        let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
        // `None` means no attempt has been made yet, so the first tick registers
        // immediately. This replaces back-dating an `Instant` by the registration
        // period, which can panic when the resulting instant cannot be
        // represented. Only this task touches the value, so no lock is needed.
        let mut last_registered: Option<Instant> = None;
        loop {
            trace!("discovery_server_register.take_while");
            {
                let server_state = trace_read_lock!(server_state);
                if !server_state.is_running() || server_state.is_abort() {
                    break;
                }
            }

            timer.tick().await;

            trace!("discovery_server_register.for_each");
            let now = Instant::now();
            let registration_due = last_registered
                .map_or(true, |last| now.duration_since(last) >= register_duration);
            if registration_due {
                last_registered = Some(now);
                let server_state = server_state.clone();
                let discovery_server_url = discovery_server_url.clone();
                // Registration runs on its own thread, with panics caught, so a
                // failure there cannot take down this timer task.
                let _ = std::thread::spawn(move || {
                    let _ = std::panic::catch_unwind(move || {
                        let server_state = trace_read_lock!(server_state);
                        if server_state.is_running() {
                            discovery::register_with_discovery_server(
                                &discovery_server_url,
                                &server_state,
                            );
                        }
                    });
                });
            }
        }
        info!("Discovery timer task is finished");
    });
}
578
579 pub fn add_polling_action<F>(&mut self, interval_ms: u64, action: F)
587 where
588 F: Fn() + Send + Sync + 'static,
589 {
590 let server_state = trace_read_lock!(self.server_state);
592 if server_state.is_abort() {
593 error!("Polling action added when server is aborting");
594 } else if !server_state.is_running() {
596 self.pending_polling_actions
597 .push((interval_ms, Box::new(action)));
598 } else {
599 let _ = PollingAction::spawn(self.server_state.clone(), interval_ms, move || {
601 action();
603 });
604 }
605 }
606
607 fn start_pending_polling_actions(&mut self) {
609 let server_state = self.server_state.clone();
610 self.pending_polling_actions
611 .drain(..)
612 .for_each(|(interval_ms, action)| {
613 debug!(
614 "Starting a pending polling action at rate of {} ms",
615 interval_ms
616 );
617 let _ = PollingAction::spawn(server_state.clone(), interval_ms, move || {
618 action();
620 });
621 });
622 }
623
624 pub fn new_transport(&self) -> TcpTransport {
626 TcpTransport::new(
627 self.certificate_store.clone(),
628 self.server_state.clone(),
629 self.address_space.clone(),
630 )
631 }
632
633 fn handle_connection(&mut self, socket: TcpStream) {
635 trace!("Connection thread spawning");
636
637 let connection = Arc::new(RwLock::new(self.new_transport()));
639 {
640 let mut connections = trace_write_lock!(self.connections);
641 connections.push(connection.clone());
642 }
643
644 let looping_interval_ms = {
646 let server_state = trace_read_lock!(self.server_state);
647 f64::min(
649 server_state.min_publishing_interval_ms,
650 server_state.min_sampling_interval_ms,
651 )
652 };
653
654 TcpTransport::run(connection, socket, looping_interval_ms);
656 }
657}