1use std::{marker::Sync, net::SocketAddr, panic::AssertUnwindSafe, sync::Arc};
8
9use tokio::{
10 self,
11 net::{TcpListener, TcpStream, ToSocketAddrs},
12 sync::oneshot::{self, Sender},
13 time::{interval_at, Duration, Instant},
14};
15
16use crate::core::{config::Config, prelude::*};
17use crate::crypto::*;
18use crate::sync::*;
19use crate::types::service_types::ServerState as ServerStateType;
20
21use crate::server::{
22 address_space::types::AddressSpace,
23 comms::tcp_transport::*,
24 comms::transport::Transport,
25 config::ServerConfig,
26 constants,
27 diagnostics::ServerDiagnostics,
28 events::audit::AuditLog,
29 metrics::ServerMetrics,
30 session::SessionManager,
31 state::{OperationalLimits, ServerState},
32 util::PollingAction,
33};
34
35pub type Connections = Vec<Arc<RwLock<TcpTransport>>>;
36
37pub struct Server {
64 pending_polling_actions: Vec<(u64, Box<dyn Fn() + Send + Sync + 'static>)>,
66 certificate_store: Arc<RwLock<CertificateStore>>,
68 server_metrics: Arc<RwLock<ServerMetrics>>,
71 server_state: Arc<RwLock<ServerState>>,
74 address_space: Arc<RwLock<AddressSpace>>,
76 connections: Arc<RwLock<Connections>>,
78 session_manager: Arc<RwLock<SessionManager>>,
80}
81
82impl From<ServerConfig> for Server {
83 fn from(config: ServerConfig) -> Server {
84 Server::new(config)
85 }
86}
87
88impl Server {
89 pub fn new(mut config: ServerConfig) -> Server {
94 if !config.is_valid() {
95 panic!("Cannot create a server using an invalid configuration.");
96 }
97
98 let application_name = config.application_name.clone();
100 let application_uri = UAString::from(&config.application_uri);
101 let product_uri = UAString::from(&config.product_uri);
102 let start_time = DateTime::now();
103 let servers = vec![config.application_uri.clone()];
104 let base_endpoint = format!(
105 "opc.tcp://{}:{}",
106 config.tcp_config.host, config.tcp_config.port
107 );
108 let max_subscriptions = config.limits.max_subscriptions as usize;
109 let max_monitored_items_per_sub = config.limits.max_monitored_items_per_sub as usize;
110 let max_monitored_item_queue_size = config.limits.max_monitored_item_queue_size as usize;
111
112 let diagnostics = Arc::new(RwLock::new(ServerDiagnostics::default()));
113 let min_publishing_interval_ms = config.limits.min_publishing_interval * 1000.0;
114 let min_sampling_interval_ms = config.limits.min_sampling_interval * 1000.0;
115 let send_buffer_size = config.limits.send_buffer_size;
116 let receive_buffer_size = config.limits.receive_buffer_size;
117
118 let application_description = if config.create_sample_keypair {
120 Some(config.application_description())
121 } else {
122 None
123 };
124 let (mut certificate_store, server_certificate, server_pkey) =
125 CertificateStore::new_with_x509_data(
126 &config.pki_dir,
127 false,
128 config.certificate_path.as_deref(),
129 config.private_key_path.as_deref(),
130 application_description,
131 );
132 if server_certificate.is_none() || server_pkey.is_none() {
133 error!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
134 }
135
136 config.read_x509_thumbprints();
138
139 if config.certificate_validation.trust_client_certs {
142 info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
143 certificate_store.set_trust_unknown_certs(true);
144 }
145 certificate_store.set_check_time(config.certificate_validation.check_time);
146
147 let config = Arc::new(RwLock::new(config));
148
149 let address_space = Arc::new(RwLock::new(AddressSpace::new()));
151
152 let audit_log = Arc::new(RwLock::new(AuditLog::new(address_space.clone())));
153
154 let server_state = ServerState {
155 application_uri,
156 product_uri,
157 application_name: LocalizedText {
158 locale: UAString::null(),
159 text: UAString::from(application_name),
160 },
161 servers,
162 base_endpoint,
163 state: ServerStateType::Shutdown,
164 start_time,
165 config,
166 server_certificate,
167 server_pkey,
168 last_subscription_id: 0,
169 max_subscriptions,
170 max_monitored_items_per_sub,
171 max_monitored_item_queue_size,
172 min_publishing_interval_ms,
173 min_sampling_interval_ms,
174 default_keep_alive_count: constants::DEFAULT_KEEP_ALIVE_COUNT,
175 max_keep_alive_count: constants::MAX_KEEP_ALIVE_COUNT,
176 max_lifetime_count: constants::MAX_KEEP_ALIVE_COUNT * 3,
177 diagnostics,
178 abort: false,
179 audit_log,
180 register_nodes_callback: None,
181 unregister_nodes_callback: None,
182 historical_data_provider: None,
183 historical_event_provider: None,
184 operational_limits: OperationalLimits::default(),
185 send_buffer_size,
186 receive_buffer_size,
187 };
188 let server_state = Arc::new(RwLock::new(server_state));
189
190 {
191 let mut address_space = trace_write_lock!(address_space);
192 address_space.set_server_state(server_state.clone());
193 }
194
195 let server_metrics = Arc::new(RwLock::new(ServerMetrics::new()));
197
198 let certificate_store = Arc::new(RwLock::new(certificate_store));
200
201 let server = Server {
202 pending_polling_actions: Vec::new(),
203 server_state,
204 server_metrics: server_metrics.clone(),
205 address_space,
206 certificate_store,
207 connections: Arc::new(RwLock::new(Vec::new())),
208 session_manager: Arc::new(RwLock::new(SessionManager::default())),
209 };
210
211 let mut server_metrics = trace_write_lock!(server_metrics);
212 server_metrics.set_server_info(&server);
213
214 server
215 }
216
217 pub fn run(self) {
222 let server = Arc::new(RwLock::new(self));
223 Self::run_server(server);
224 }
225
226 pub fn run_server(server: Arc<RwLock<Server>>) {
229 let single_threaded_executor = {
230 let server = trace_read_lock!(server);
231 let server_state = trace_read_lock!(server.server_state);
232 let config = trace_read_lock!(server_state.config);
233 config.performance.single_threaded_executor
234 };
235 let server_task = Self::new_server_task(server);
236 let mut builder = if !single_threaded_executor {
238 tokio::runtime::Builder::new_multi_thread()
239 } else {
240 tokio::runtime::Builder::new_current_thread()
241 };
242 let runtime = builder.enable_all().build().unwrap();
243 Self::run_server_on_runtime(runtime, server_task, true);
244 }
245
246 pub fn run_server_on_runtime<F>(
251 runtime: tokio::runtime::Runtime,
252 server_task: F,
253 block: bool,
254 ) -> Option<tokio::task::JoinHandle<<F as futures::Future>::Output>>
255 where
256 F: std::future::Future + Send + 'static,
257 F::Output: Send + 'static,
258 {
259 if block {
260 runtime.block_on(server_task);
261 info!("Server has finished");
262 None
263 } else {
264 Some(runtime.spawn(server_task))
265 }
266 }
267
268 pub async fn new_server_task(server: Arc<RwLock<Server>>) {
270 let (sock_addr, discovery_server_url) = {
272 let server = trace_read_lock!(server);
273
274 server.log_endpoint_info();
276
277 let sock_addr = server.get_socket_address();
278 let server_state = trace_read_lock!(server.server_state);
279 let config = trace_read_lock!(server_state.config);
280
281 let discovery_server_url =
283 if let Some(ref discovery_server_url) = config.discovery_server_url {
284 if is_valid_opc_ua_url(discovery_server_url) {
285 Some(discovery_server_url.clone())
286 } else {
287 None
288 }
289 } else {
290 None
291 };
292
293 (sock_addr, discovery_server_url)
294 };
295 match sock_addr {
296 None => {
297 error!("Cannot resolve server address, check configuration of server");
298 }
299 Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await,
300 }
301 }
302
303 async fn server_task<A: ToSocketAddrs>(
304 server: Arc<RwLock<Server>>,
305 sock_addr: A,
306 discovery_server_url: Option<String>,
307 ) {
308 info!("Waiting for Connection");
310 let listener = match TcpListener::bind(&sock_addr).await {
312 Ok(listener) => listener,
313 Err(err) => {
314 panic!("Could not bind to socket {:?}", err)
315 }
316 };
317
318 let (tx_abort, rx_abort) = oneshot::channel();
319
320 {
322 let mut server = trace_write_lock!(server);
323 {
325 let mut server_state = trace_write_lock!(server.server_state);
326 server_state.start_time = DateTime::now();
327 server_state.set_state(ServerStateType::Running);
328 }
329
330 if let Some(ref discovery_server_url) = discovery_server_url {
332 server.start_discovery_server_registration_timer(discovery_server_url);
333 } else {
334 info!("Server has not set a discovery server url, so no registration will happen");
335 }
336
337 server.start_pending_polling_actions();
339 }
340
341 Self::start_abort_poll(server.clone(), tx_abort);
343
344 tokio::select! {
348 _ = async {
349 loop {
350 match listener.accept().await {
351 Ok((socket, _addr)) => {
352 info!("Handling new connection {:?}", socket);
354 let mut server = trace_write_lock!(server);
356 let is_abort = {
357 let server_state = trace_read_lock!(server.server_state);
358 server_state.is_abort()
359 };
360 if is_abort {
361 info!("Server is aborting so it will not accept new connections");
362 break;
363 } else {
364 server.handle_connection(socket);
365 }
366 }
367 Err(e) => {
368 error!("couldn't accept connection to client: {:?}", e);
369 }
370 }
371 }
372 Ok::<_, tokio::io::Error>(())
374 } => {}
375 _ = rx_abort => {
376 info!("abort received");
377 }
378 }
379 info!("main server task is finished");
380 }
381
382 pub fn server_state(&self) -> Arc<RwLock<ServerState>> {
386 self.server_state.clone()
387 }
388
389 pub fn certificate_store(&self) -> Arc<RwLock<CertificateStore>> {
391 self.certificate_store.clone()
392 }
393
394 pub fn address_space(&self) -> Arc<RwLock<AddressSpace>> {
398 self.address_space.clone()
399 }
400
401 pub fn connections(&self) -> Arc<RwLock<Connections>> {
405 self.connections.clone()
406 }
407
408 pub fn server_metrics(&self) -> Arc<RwLock<ServerMetrics>> {
412 self.server_metrics.clone()
413 }
414
415 pub fn single_threaded_executor(&self) -> bool {
417 let server_state = trace_read_lock!(self.server_state);
418 let config = trace_read_lock!(server_state.config);
419 config.performance.single_threaded_executor
420 }
421
422 pub fn abort(&mut self) {
425 info!("Server has been instructed to abort");
426 let mut server_state = trace_write_lock!(self.server_state);
427 server_state.abort();
428 }
429
430 fn remove_dead_connections(&self) -> bool {
433 let mut connections = trace_write_lock!(self.connections);
435 connections.retain(|transport| {
436 let lock = transport.try_read();
439 if let Some(ref transport) = lock {
440 let session_manager = transport.session_manager();
441 let session_manager = trace_read_lock!(session_manager);
442 !session_manager.sessions_terminated()
443 } else {
444 true
445 }
446 });
447 !connections.is_empty()
448 }
449
450 fn log_endpoint_info(&self) {
452 let server_state = trace_read_lock!(self.server_state);
453 let config = trace_read_lock!(server_state.config);
454 info!("OPC UA Server: {}", server_state.application_name);
455 info!("Base url: {}", server_state.base_endpoint);
456 info!("Supported endpoints:");
457 for (id, endpoint) in &config.endpoints {
458 let users: Vec<String> = endpoint.user_token_ids.iter().cloned().collect();
459 let users = users.join(", ");
460 info!("Endpoint \"{}\": {}", id, endpoint.path);
461 info!(" Security Mode: {}", endpoint.security_mode);
462 info!(" Security Policy: {}", endpoint.security_policy);
463 info!(" Supported user tokens - {}", users);
464 }
465 }
466
467 fn get_socket_address(&self) -> Option<SocketAddr> {
469 use std::net::ToSocketAddrs;
470 let server_state = trace_read_lock!(self.server_state);
471 let config = trace_read_lock!(server_state.config);
472 let address = format!("{}:{}", config.tcp_config.host, config.tcp_config.port);
474 if let Ok(mut addrs_iter) = address.to_socket_addrs() {
475 addrs_iter.next()
476 } else {
477 None
478 }
479 }
480
481 fn start_abort_poll(server: Arc<RwLock<Server>>, tx_abort: Sender<()>) {
485 tokio::spawn(async move {
486 let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
487 loop {
488 trace!("abort_poll_task.take_while");
489 {
491 let server = trace_read_lock!(server);
492 let has_open_connections = server.remove_dead_connections();
493 let server_state = trace_read_lock!(server.server_state);
494 if server_state.is_abort() {
496 if has_open_connections {
497 warn!("Abort called while there were still open connections");
498 }
499 info!("Server has aborted so, sending a command to break the listen loop");
500 tx_abort.send(()).unwrap();
501 break;
502 }
503 }
504 timer.tick().await;
505 }
506 info!("Abort poll task is finished");
507 });
508 }
509
510 #[cfg(not(feature = "discovery-server-registration"))]
512 fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
513 info!("Discovery server registration is disabled in code so registration with {} will not happen", discovery_server_url);
514 }
515
/// Variant compiled when the `discovery-server-registration` feature is enabled: spawns a
/// task that registers the server with the discovery server on its first tick and then
/// again every five minutes, for as long as the server is running and not aborting.
///
/// Each registration is performed on its own thread, wrapped in `catch_unwind` so that a
/// panic during registration is contained in that thread.
#[cfg(feature = "discovery-server-registration")]
fn start_discovery_server_registration_timer(&self, discovery_server_url: &str) {
    use crate::server::discovery;

    let discovery_server_url = discovery_server_url.to_string();
    info!(
        "Server has set a discovery server url {} which will be used to register the server",
        discovery_server_url
    );
    let server_state = self.server_state.clone();

    let register_duration = Duration::from_secs(5 * 60);

    tokio::spawn(async move {
        // `None` means "never registered", which makes the first tick register at once.
        // This replaces back-dating an `Instant` by `register_duration`, a subtraction
        // that can panic when the result is not representable.
        let mut last_registered: Option<Instant> = None;

        let mut timer = interval_at(Instant::now(), Duration::from_millis(1000));
        loop {
            trace!("discovery_server_register.take_while");
            {
                let server_state = trace_read_lock!(server_state);
                if !server_state.is_running() || server_state.is_abort() {
                    break;
                }
            }

            timer.tick().await;

            trace!("discovery_server_register.for_each");
            let now = Instant::now();
            let registration_due = last_registered
                .map_or(true, |last| now.duration_since(last) >= register_duration);
            if registration_due {
                last_registered = Some(now);
                let server_state = server_state.clone();
                let discovery_server_url = discovery_server_url.clone();
                let _ = std::thread::spawn(move || {
                    let _ = std::panic::catch_unwind(AssertUnwindSafe(move || {
                        let server_state = trace_read_lock!(server_state);
                        if server_state.is_running() {
                            discovery::register_with_discovery_server(
                                &discovery_server_url,
                                &server_state,
                            );
                        }
                    }));
                });
            }
        }
        info!("Discovery timer task is finished");
    });
}
580
581 pub fn add_polling_action<F>(&mut self, interval_ms: u64, action: F)
589 where
590 F: Fn() + Send + Sync + 'static,
591 {
592 let server_state = trace_read_lock!(self.server_state);
594 if server_state.is_abort() {
595 error!("Polling action added when server is aborting");
596 } else if !server_state.is_running() {
598 self.pending_polling_actions
599 .push((interval_ms, Box::new(action)));
600 } else {
601 let _ = PollingAction::spawn(self.server_state.clone(), interval_ms, move || {
603 action();
605 });
606 }
607 }
608
609 fn start_pending_polling_actions(&mut self) {
611 let server_state = self.server_state.clone();
612 self.pending_polling_actions
613 .drain(..)
614 .for_each(|(interval_ms, action)| {
615 debug!(
616 "Starting a pending polling action at rate of {} ms",
617 interval_ms
618 );
619 let _ = PollingAction::spawn(server_state.clone(), interval_ms, move || {
620 action();
622 });
623 });
624 }
625
626 pub fn new_transport(&self) -> TcpTransport {
628 TcpTransport::new(
629 self.certificate_store.clone(),
630 self.server_state.clone(),
631 self.address_space.clone(),
632 self.session_manager.clone(),
633 )
634 }
635
636 fn handle_connection(&mut self, socket: TcpStream) {
638 trace!("Connection thread spawning");
639
640 let connection = Arc::new(RwLock::new(self.new_transport()));
642 {
643 let mut connections = trace_write_lock!(self.connections);
644 connections.push(connection.clone());
645 }
646
647 let looping_interval_ms = {
649 let server_state = trace_read_lock!(self.server_state);
650 f64::min(
652 server_state.min_publishing_interval_ms,
653 server_state.min_sampling_interval_ms,
654 )
655 };
656
657 TcpTransport::run(connection, socket, looping_interval_ms);
659 }
660}