use std::sync::{Arc, RwLock, Mutex};
use std::net::SocketAddr;
use std::marker::Sync;
use std::time::{Instant, Duration};
use std::thread;
use futures::{Future, Stream, future, sync::mpsc::{unbounded, UnboundedSender}};
use tokio::{self, net::{TcpListener, TcpStream}};
use tokio_timer::Interval;
use opcua_types::service_types::ServerState as ServerStateType;
use opcua_core::config::Config;
use opcua_core::prelude::*;
use crate::{
address_space::types::AddressSpace,
comms::tcp_transport::*,
comms::transport::Transport,
config::ServerConfig,
constants,
diagnostics::ServerDiagnostics,
discovery,
metrics::ServerMetrics,
services::message_handler::MessageHandler,
session::Session,
state::ServerState,
util::PollingAction,
};
/// The collection of TCP transports the server is tracking, one entry being pushed for every
/// connection the server handles. Each transport is individually shareable and lockable.
pub type Connections = Vec<Arc<RwLock<TcpTransport>>>;
/// The server. It owns the shared pieces of state (server state, address space, certificate
/// store, metrics) and the list of transports for the connections it has accepted.
pub struct Server {
/// Polling actions, as `(interval in ms, action)` pairs, that were added while the server was
/// not yet running. They are drained and started when the server starts running.
pending_polling_actions: Vec<(u64, Box<dyn Fn() + Send + Sync + 'static>)>,
/// Certificate store, shared with the message handler of every transport that is created.
certificate_store: Arc<RwLock<CertificateStore>>,
/// Server metrics, populated with the server's info when the server is constructed.
server_metrics: Arc<RwLock<ServerMetrics>>,
/// The server state, shared with transports, timers and polling actions.
server_state: Arc<RwLock<ServerState>>,
/// The address space, shared with every transport that is created.
address_space: Arc<RwLock<AddressSpace>>,
/// Transports for the connections handled so far. Entries whose session has terminated are
/// removed periodically by the abort poll task.
connections: Arc<RwLock<Connections>>,
}
impl From<ServerConfig> for Server {
fn from(config: ServerConfig) -> Server {
Server::new(config)
}
}
impl Server {
pub fn new(config: ServerConfig) -> Server {
if !config.is_valid() {
panic!("Cannot create a server using an invalid configuration.");
}
let application_name = config.application_name.clone();
let application_uri = UAString::from(config.application_uri.as_ref());
let product_uri = UAString::from(config.product_uri.as_ref());
let namespaces = vec!["http://opcfoundation.org/UA/".to_string(), "urn:OPCUA-Rust-Internal".to_string(), config.application_uri.clone()];
let start_time = DateTime::now();
let servers = vec![config.application_uri.clone()];
let base_endpoint = format!("opc.tcp://{}:{}", config.tcp_config.host, config.tcp_config.port);
let max_subscriptions = config.max_subscriptions as usize;
let diagnostics = Arc::new(RwLock::new(ServerDiagnostics::default()));
let application_description = if config.create_sample_keypair { Some(config.application_description()) } else { None };
let (mut certificate_store, server_certificate, server_pkey) = CertificateStore::new_with_keypair(&config.pki_dir, application_description);
if server_certificate.is_none() || server_pkey.is_none() {
error!("Server is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
}
if config.trust_client_certs {
info!("Server has chosen to auto trust client certificates. You do not want to do this in production code.");
certificate_store.trust_unknown_certs = true;
}
let config = Arc::new(RwLock::new(config.clone()));
let server_state = ServerState {
application_uri,
product_uri,
application_name: LocalizedText {
locale: UAString::null(),
text: UAString::from(application_name),
},
namespaces,
servers,
base_endpoint,
state: ServerStateType::Shutdown,
start_time,
config,
server_certificate,
server_pkey,
last_subscription_id: 0,
max_subscriptions,
min_publishing_interval: constants::MIN_PUBLISHING_INTERVAL,
default_keep_alive_count: constants::DEFAULT_KEEP_ALIVE_COUNT,
max_keep_alive_count: constants::MAX_KEEP_ALIVE_COUNT,
max_lifetime_count: constants::MAX_KEEP_ALIVE_COUNT * 3,
max_method_calls: constants::MAX_METHOD_CALLS,
max_nodes_per_node_management: constants::MAX_NODES_PER_NODE_MANAGEMENT,
max_browse_paths_per_translate: constants::MAX_BROWSE_PATHS_PER_TRANSLATE,
diagnostics,
abort: false,
register_nodes_callback: None,
unregister_nodes_callback: None,
};
let server_state = Arc::new(RwLock::new(server_state));
let address_space = Arc::new(RwLock::new(AddressSpace::new()));
{
let mut address_space = trace_write_lock_unwrap!(address_space);
address_space.set_server_state(server_state.clone());
}
let server_metrics = Arc::new(RwLock::new(ServerMetrics::new()));
let certificate_store = Arc::new(RwLock::new(certificate_store));
let server = Server {
pending_polling_actions: Vec::new(),
server_state,
server_metrics: server_metrics.clone(),
address_space,
certificate_store,
connections: Arc::new(RwLock::new(Vec::new())),
};
let mut server_metrics = trace_write_lock_unwrap!(server_metrics);
server_metrics.set_server_info(&server);
server
}
/// Consumes the server, wraps it so it can be shared, and runs it via `run_server()`.
/// The call returns when `run_server()` returns.
pub fn run(self) {
    Self::run_server(Arc::new(RwLock::new(self)));
}
pub fn run_server(server: Arc<RwLock<Server>>) {
let (sock_addr, discovery_server_url) = {
let server = trace_read_lock_unwrap!(server);
server.log_endpoint_info();
let sock_addr = server.get_socket_address();
let server_state = trace_read_lock_unwrap!(server.server_state);
let config = trace_read_lock_unwrap!(server_state.config);
let discovery_server_url = if let Some(ref discovery_server_url) = config.discovery_server_url {
if is_valid_opc_ua_url(discovery_server_url) {
Some(discovery_server_url.clone())
} else {
None
}
} else {
None
};
(sock_addr, discovery_server_url)
};
if sock_addr.is_none() {
error!("Cannot resolve server address, check configuration of server");
return;
}
let sock_addr = sock_addr.unwrap();
info!("Waiting for Connection");
tokio::run({
let server = server.clone();
let server_for_listener = server.clone();
let (tx_abort, rx_abort) = unbounded::<()>();
future::lazy(move || {
{
let mut server = trace_write_lock_unwrap!(server);
{
let mut server_state = trace_write_lock_unwrap!(server.server_state);
server_state.start_time = DateTime::now();
server_state.set_state(ServerStateType::Running);
}
server.start_discovery_server_registration_timer(discovery_server_url);
server.start_pending_polling_actions();
}
Self::start_abort_poll(server, tx_abort);
future::ok(())
}).and_then(move |_| {
use crate::completion_pact::stream_completion_pact;
let listener = TcpListener::bind(&sock_addr).unwrap();
stream_completion_pact(listener.incoming(), rx_abort)
.for_each(move |socket| {
info!("Handling new connection {:?}", socket);
let mut server = trace_write_lock_unwrap!(server_for_listener);
if {
let server_state = trace_read_lock_unwrap!(server.server_state);
server_state.is_abort()
} {
info!("Server is aborting so it will not accept new connections");
} else {
server.handle_connection(socket);
}
Ok(())
})
.map(|_| {
info!("Completion pact has completed");
})
.map_err(|err| {
error!("Completion pact, incoming error = {:?}", err);
})
}).map(|_| {
info!("Server task is finished");
}).map_err(|err| {
error!("Server task is finished with an error {:?}", err);
})
});
info!("Server has stopped");
}
/// Returns another shared handle to the server state.
pub fn server_state(&self) -> Arc<RwLock<ServerState>> {
    Arc::clone(&self.server_state)
}
/// Returns another shared handle to the certificate store.
pub fn certificate_store(&self) -> Arc<RwLock<CertificateStore>> {
    Arc::clone(&self.certificate_store)
}
/// Returns another shared handle to the address space.
pub fn address_space(&self) -> Arc<RwLock<AddressSpace>> {
    Arc::clone(&self.address_space)
}
/// Returns another shared handle to the list of connections.
pub fn connections(&self) -> Arc<RwLock<Connections>> {
    Arc::clone(&self.connections)
}
/// Returns another shared handle to the server metrics.
pub fn server_metrics(&self) -> Arc<RwLock<ServerMetrics>> {
    Arc::clone(&self.server_metrics)
}
/// Flags the server state as aborting. The abort itself is noticed by the tasks that poll
/// the server state, such as the abort poll started when the server runs.
pub fn abort(&mut self) {
    info!("Server has been instructed to abort");
    // The write lock is only held for the duration of this statement
    trace_write_lock_unwrap!(self.server_state).abort();
}
/// Drops every connection whose session has terminated and returns `true` if any
/// connections remain afterwards.
///
/// A connection that cannot be read locked right now is kept; it will be looked at again
/// the next time this function is called.
fn remove_dead_connections(&self) -> bool {
    let mut connections = trace_write_lock_unwrap!(self.connections);
    connections.retain(|transport| match transport.try_read() {
        Ok(transport) => !transport.is_session_terminated(),
        // Lock is not available, so the state is unknown - keep the connection
        Err(_) => true,
    });
    !connections.is_empty()
}
/// Logs the application name, the base url and, for every configured endpoint, its path,
/// security mode, security policy and the ids of the user tokens it supports.
fn log_endpoint_info(&self) {
    let server_state = trace_read_lock_unwrap!(self.server_state);
    let config = trace_read_lock_unwrap!(server_state.config);

    info!("OPC UA Server: {}", server_state.application_name);
    info!("Base url: {}", server_state.base_endpoint);
    info!("Supported endpoints:");
    for (id, endpoint) in &config.endpoints {
        // The user token ids are shown as a comma separated list
        let users = endpoint.user_token_ids.iter()
            .cloned()
            .collect::<Vec<String>>()
            .join(", ");
        info!("Endpoint \"{}\": {}", id, endpoint.path);
        info!(" Security Mode: {}", endpoint.security_mode);
        info!(" Security Policy: {}", endpoint.security_policy);
        info!(" Supported user tokens - {}", users);
    }
}
/// Resolves the configured tcp host and port into a socket address.
///
/// Returns the first address the host / port resolves to, or `None` if resolution fails or
/// yields no addresses.
fn get_socket_address(&self) -> Option<SocketAddr> {
    use std::net::ToSocketAddrs;
    let server_state = trace_read_lock_unwrap!(self.server_state);
    let config = trace_read_lock_unwrap!(server_state.config);
    let address = format!("{}:{}", config.tcp_config.host, config.tcp_config.port);
    // Only the first resolved address is of interest
    let first_resolved = address.to_socket_addrs()
        .ok()
        .and_then(|mut resolved| resolved.next());
    first_resolved
}
/// Spawns a task that checks once a second whether the server has been told to abort.
///
/// On every tick the task also removes dead connections. When the server state reports an
/// abort, a message is sent on `tx_abort` to break the listen loop and the task finishes.
///
/// Must be called from within a running tokio runtime since it calls `tokio::spawn`.
fn start_abort_poll(server: Arc<RwLock<Server>>, tx_abort: UnboundedSender<()>) {
    let task = Interval::new(Instant::now(), Duration::from_millis(1000))
        .take_while(move |_| {
            trace!("abort_poll_task.take_while");
            let abort = {
                let server = trace_read_lock_unwrap!(server);
                // Housekeeping is done on every tick, whether aborting or not
                let has_open_connections = server.remove_dead_connections();
                let server_state = trace_read_lock_unwrap!(server.server_state);
                if server_state.is_abort() {
                    if has_open_connections {
                        warn!("Abort called while there were still open connections");
                    }
                    true
                } else {
                    false
                }
            };
            if abort {
                info!("Server has aborted so, sending a command to break the listen loop");
                // The send only fails if the receiving side has already been dropped, i.e.
                // the listen loop has already ended. That is not a reason to panic here.
                if tx_abort.unbounded_send(()).is_err() {
                    warn!("Abort could not be sent to the listen loop because it has already finished");
                }
            }
            // The poll continues for as long as there is no abort
            future::ok(!abort)
        })
        .for_each(|_| {
            Ok(())
        })
        .map(|_| {
            info!("Abort poll task is finished");
        })
        .map_err(|err| {
            error!("Abort poll error = {:?}", err);
        });
    tokio::spawn(task);
}
/// Starts a timer task that registers the server with a discovery server.
///
/// If `discovery_server_url` is `None`, nothing is started. Otherwise a task ticks once a
/// second for as long as the server is running and not aborting. Registration happens on
/// the first tick and then whenever 5 minutes have elapsed since the last registration.
/// Each registration runs on its own thread, and a panic during registration is caught
/// on that thread.
///
/// Must be called from within a running tokio runtime since it calls `tokio::spawn`.
fn start_discovery_server_registration_timer(&self, discovery_server_url: Option<String>) {
    if let Some(discovery_server_url) = discovery_server_url {
        info!("Server has set a discovery server url {} which will be used to register the server", discovery_server_url);
        let server_state = self.server_state.clone();
        let server_state_for_take = self.server_state.clone();

        let register_duration = Duration::from_secs(5 * 60);
        // `None` means the server has never registered, so the first tick registers
        // immediately. This avoids computing `Instant::now() - register_duration`, which can
        // panic where the clock cannot represent a time that far in the past.
        let last_registered: Arc<Mutex<Option<Instant>>> = Arc::new(Mutex::new(None));

        let task = Interval::new(Instant::now(), Duration::from_millis(1000))
            .take_while(move |_| {
                trace!("discovery_server_register.take_while");
                let server_state = trace_read_lock_unwrap!(server_state_for_take);
                future::ok(server_state.is_running() && !server_state.is_abort())
            })
            .for_each(move |_| {
                trace!("discovery_server_register.for_each");
                let now = Instant::now();
                let mut last_registered = trace_lock_unwrap!(last_registered);
                let registration_due = match *last_registered {
                    Some(previous) => now.duration_since(previous) >= register_duration,
                    None => true,
                };
                if registration_due {
                    *last_registered = Some(now);
                    // Registration is done on a thread of its own so that the timer task
                    // does not wait for it
                    let server_state = server_state.clone();
                    let discovery_server_url = discovery_server_url.clone();
                    let _ = thread::spawn(move || {
                        use std;
                        let _ = std::panic::catch_unwind(move || {
                            let server_state = trace_read_lock_unwrap!(server_state);
                            if server_state.is_running() {
                                discovery::register_with_discovery_server(&discovery_server_url, &server_state);
                            }
                        });
                    });
                }
                Ok(())
            })
            .map(|_| {
                info!("Discovery timer task is finished");
            })
            .map_err(|err| {
                error!("Discovery timer task registration error = {:?}", err);
            });
        tokio::spawn(task);
    } else {
        info!("Server has not set a discovery server url, so no registration will happen");
    }
}
/// Registers `action` to be called every `interval_ms` milliseconds.
///
/// * If the server is aborting, the action is rejected and an error is logged.
/// * If the server is running, the polling action is spawned straight away.
/// * Otherwise the action is queued and started once the server runs.
pub fn add_polling_action<F>(&mut self, interval_ms: u64, action: F)
    where F: Fn() + Send + Sync + 'static
{
    let state = trace_read_lock_unwrap!(self.server_state);
    if state.is_abort() {
        error!("Polling action added when server is aborting");
        return;
    }
    if state.is_running() {
        let _ = PollingAction::spawn(Arc::clone(&self.server_state), interval_ms, move || {
            action();
        });
        return;
    }
    // Not running yet, so hold on to the action until the server starts
    self.pending_polling_actions.push((interval_ms, Box::new(action)));
}
fn start_pending_polling_actions(&mut self) {
let server_state = self.server_state.clone();
self.pending_polling_actions
.drain(..)
.for_each(|(interval_ms, action)| {
debug!("Starting a pending polling action at rate of {} ms", interval_ms);
let _ = PollingAction::spawn(server_state.clone(), interval_ms, move || {
action();
});
});
}
/// Creates a new TCP transport with a fresh session and a message handler of its own. The
/// transport shares the server's state, address space and certificate store.
pub fn new_transport(&self) -> TcpTransport {
    let session = Arc::new(RwLock::new(Session::new(self)));
    let address_space = Arc::clone(&self.address_space);
    let message_handler = MessageHandler::new(
        Arc::clone(&self.certificate_store),
        Arc::clone(&self.server_state),
        Arc::clone(&session),
        Arc::clone(&address_space),
    );
    TcpTransport::new(Arc::clone(&self.server_state), session, address_space, message_handler)
}
/// Handles a newly accepted socket: a transport is created for it, recorded in the list of
/// connections, and then run against the socket.
fn handle_connection(&mut self, socket: TcpStream) {
    trace!("Connection thread spawning");
    let transport = Arc::new(RwLock::new(self.new_transport()));
    // The connections lock is released at the end of this statement, before the transport runs
    trace_write_lock_unwrap!(self.connections).push(Arc::clone(&transport));
    TcpTransport::run(transport, socket);
}
}