use std::sync::{Arc, RwLock};
use opcua_core::prelude::*;
use opcua_crypto::{PrivateKey, SecurityPolicy, user_identity, X509};
use opcua_types::{
profiles,
service_types::{
ActivateSessionRequest, AnonymousIdentityToken, ApplicationDescription, ApplicationType, EndpointDescription,
RegisteredServer, ServerState as ServerStateType, SignatureData, UserNameIdentityToken, UserTokenPolicy, UserTokenType,
X509IdentityToken,
},
status_code::StatusCode,
};
use crate::{
callbacks::{RegisterNodes, UnregisterNodes},
config::{ServerConfig, ServerEndpoint},
constants,
diagnostics::ServerDiagnostics,
events::{
audit::{AuditEvent, AuditLog},
event::Event,
},
historical::{HistoricalDataProvider, HistoricalEventProvider},
identity_token::{IdentityToken, POLICY_ID_ANONYMOUS, POLICY_ID_USER_PASS_NONE, POLICY_ID_USER_PASS_RSA_15, POLICY_ID_USER_PASS_RSA_OAEP, POLICY_ID_X509},
};
/// Per-service operational limits carried by the server state.
///
/// The `Default` implementation fills every field from the identically named constant in
/// `crate::constants`. Judging by the names, each value is presumably the maximum number of
/// nodes / items accepted in a single request of the corresponding service; where and how the
/// limits are enforced is not visible in this file.
pub(crate) struct OperationalLimits {
/// Limit for translate-browse-paths-to-node-ids requests.
pub max_nodes_per_translate_browse_paths_to_node_ids: usize,
/// Limit for read requests.
pub max_nodes_per_read: usize,
/// Limit for write requests.
pub max_nodes_per_write: usize,
/// Limit for method call requests.
pub max_nodes_per_method_call: usize,
/// Limit for browse requests.
pub max_nodes_per_browse: usize,
/// Limit for register-nodes requests.
pub max_nodes_per_register_nodes: usize,
/// Limit for node management requests.
pub max_nodes_per_node_management: usize,
/// Limit on monitored items per call.
pub max_monitored_items_per_call: usize,
/// Limit for history read (data) requests.
pub max_nodes_per_history_read_data: usize,
/// Limit for history read (events) requests.
pub max_nodes_per_history_read_events: usize,
/// Limit for history update (data) requests.
pub max_nodes_per_history_update_data: usize,
/// Limit for history update (events) requests.
pub max_nodes_per_history_update_events: usize,
}
impl Default for OperationalLimits {
fn default() -> Self {
Self {
max_nodes_per_translate_browse_paths_to_node_ids: constants::MAX_NODES_PER_TRANSLATE_BROWSE_PATHS_TO_NODE_IDS,
max_nodes_per_read: constants::MAX_NODES_PER_READ,
max_nodes_per_write: constants::MAX_NODES_PER_WRITE,
max_nodes_per_method_call: constants::MAX_NODES_PER_METHOD_CALL,
max_nodes_per_browse: constants::MAX_NODES_PER_BROWSE,
max_nodes_per_register_nodes: constants::MAX_NODES_PER_REGISTER_NODES,
max_nodes_per_node_management: constants::MAX_NODES_PER_NODE_MANAGEMENT,
max_monitored_items_per_call: constants::MAX_MONITORED_ITEMS_PER_CALL,
max_nodes_per_history_read_data: constants::MAX_NODES_PER_HISTORY_READ_DATA,
max_nodes_per_history_read_events: constants::MAX_NODES_PER_HISTORY_READ_EVENTS,
max_nodes_per_history_update_data: constants::MAX_NODES_PER_HISTORY_UPDATE_DATA,
max_nodes_per_history_update_events: constants::MAX_NODES_PER_HISTORY_UPDATE_EVENTS,
}
}
}
/// State belonging to the server as a whole: identity, shared configuration, certificate / key,
/// subscription settings, run state and optional callbacks / providers.
pub struct ServerState {
/// Application URI; reported in full endpoint descriptions and as the `server_uri` of
/// `registered_server()`.
pub application_uri: UAString,
/// Product URI; reported in full endpoint descriptions and by `registered_server()`.
pub product_uri: UAString,
/// Application name; reported in full endpoint descriptions and as the single entry of
/// `server_names` in `registered_server()`.
pub application_name: LocalizedText,
/// Base endpoint string. Not read in this file — presumably the scheme/host/port part of the
/// server url; confirm against where it is set.
pub base_endpoint: String,
/// Start time of the server (not read in this file).
pub start_time: DateTime,
/// List of servers (not read in this file; format of the entries is not visible here).
pub servers: Vec<String>,
/// Shared server configuration. Methods in this file take a read lock on it.
pub config: Arc<RwLock<ServerConfig>>,
/// Server certificate, if any. Returned as a byte string in full endpoint descriptions and
/// passed to X509 identity token verification.
pub server_certificate: Option<X509>,
/// Server private key, if any. Used to decrypt encrypted user name token passwords.
pub server_pkey: Option<PrivateKey>,
/// The most recently issued subscription id; advanced by `create_subscription_id()`.
pub last_subscription_id: u32,
/// Maximum number of subscriptions (not read in this file — scope of the limit to be confirmed).
pub max_subscriptions: usize,
/// Maximum number of monitored items per subscription (not read in this file).
pub max_monitored_items_per_sub: usize,
/// Minimum publishing interval; the field name indicates milliseconds.
pub min_publishing_interval_ms: Duration,
/// Minimum sampling interval; the field name indicates milliseconds.
pub min_sampling_interval_ms: Duration,
/// Default keep alive count (not read in this file).
pub default_keep_alive_count: u32,
/// Maximum keep alive count (not read in this file).
pub max_keep_alive_count: u32,
/// Maximum lifetime count (not read in this file).
pub max_lifetime_count: u32,
/// Per-service operational limits.
pub(crate) operational_limits: OperationalLimits,
/// Current server state; changed through `set_state()` and `abort()`.
pub state: ServerStateType,
/// Abort flag; set to `true` by `abort()` and read by `is_abort()`.
pub abort: bool,
/// Audit log that `raise_and_log()` forwards events to.
pub(crate) audit_log: Arc<RwLock<AuditLog>>,
/// Server diagnostics (not read in this file).
pub(crate) diagnostics: Arc<RwLock<ServerDiagnostics>>,
/// Optional register-nodes callback, installed by `set_register_nodes_callbacks()`.
pub(crate) register_nodes_callback: Option<Box<dyn RegisterNodes + Send + Sync>>,
/// Optional unregister-nodes callback, installed by `set_register_nodes_callbacks()`.
pub(crate) unregister_nodes_callback: Option<Box<dyn UnregisterNodes + Send + Sync>>,
/// Optional historical data provider, installed by `set_historical_data_provider()`.
pub(crate) historical_data_provider: Option<Box<dyn HistoricalDataProvider + Send + Sync>>,
/// Optional historical event provider, installed by `set_historical_event_provider()`.
pub(crate) historical_event_provider: Option<Box<dyn HistoricalEventProvider + Send + Sync>>,
}
impl ServerState {
pub fn endpoints(&self, endpoint_url: &UAString, transport_profile_uris: &Option<Vec<UAString>>) -> Option<Vec<EndpointDescription>> {
debug!("Endpoints requested, transport profile uris {:?}", transport_profile_uris);
if let Some(ref transport_profile_uris) = *transport_profile_uris {
if !transport_profile_uris.is_empty() {
let found_binary_transport = transport_profile_uris.iter().any(|profile_uri| {
profile_uri.as_ref() == profiles::TRANSPORT_PROFILE_URI_BINARY
});
if !found_binary_transport {
error!("Client wants to connect with a non binary transport {:#?}", transport_profile_uris);
return None;
}
}
}
let config = trace_read_lock_unwrap!(self.config);
if let Ok(hostname) = hostname_from_url(endpoint_url.as_ref()) {
if !hostname.eq_ignore_ascii_case(&config.tcp_config.host) {
debug!("Endpoint url \"{}\" hostname supplied by caller does not match server's hostname \"{}\"", endpoint_url, &config.tcp_config.host);
}
let endpoints = config.endpoints.iter()
.map(|(_, e)| {
self.new_endpoint_description(&config, e, true)
})
.collect();
Some(endpoints)
} else {
warn!("Endpoint url \"{}\" is unrecognized, using default", endpoint_url);
if let Some(e) = config.default_endpoint() {
Some(vec![self.new_endpoint_description(&config, e, true)])
}
else {
Some(vec![])
}
}
}
/// Tests whether the configuration contains an endpoint matching the supplied url,
/// security policy and message security mode.
pub fn endpoint_exists(&self, endpoint_url: &str, security_policy: SecurityPolicy, security_mode: MessageSecurityMode) -> bool {
    let server_config = trace_read_lock_unwrap!(self.config);
    match server_config.find_endpoint(endpoint_url, security_policy, security_mode) {
        Some(_) => true,
        None => false,
    }
}
/// Builds reduced endpoint descriptions (`all_fields == false`) for every configured endpoint
/// whose url matches `endpoint_url` according to `url_matches_except_host`.
///
/// Returns `None` when no configured endpoint matches.
pub fn new_endpoint_descriptions(&self, endpoint_url: &str) -> Option<Vec<EndpointDescription>> {
    debug!("find_endpoint, url = {}", endpoint_url);
    let config = trace_read_lock_unwrap!(self.config);
    let base_endpoint_url = config.base_endpoint_url();

    let mut matching: Vec<EndpointDescription> = Vec::new();
    for (_, e) in config.endpoints.iter() {
        if url_matches_except_host(&e.endpoint_url(&base_endpoint_url), endpoint_url) {
            matching.push(self.new_endpoint_description(&config, e, false));
        }
    }

    if matching.is_empty() {
        None
    } else {
        Some(matching)
    }
}
/// Returns the policy id used for user name / password identity tokens on `endpoint`,
/// chosen from the endpoint's password security policy.
///
/// # Panics
///
/// Panics if the password security policy is not one of the policies matched below. The
/// panic message names the offending policy so a misconfigured endpoint can be identified.
fn user_pass_security_policy_id(endpoint: &ServerEndpoint) -> UAString {
    match endpoint.password_security_policy() {
        SecurityPolicy::None => POLICY_ID_USER_PASS_NONE,
        SecurityPolicy::Basic128Rsa15 => POLICY_ID_USER_PASS_RSA_15,
        // All remaining supported policies share the RSA-OAEP user/pass policy id.
        SecurityPolicy::Basic256
        | SecurityPolicy::Basic256Sha256
        | SecurityPolicy::Aes128Sha256RsaOaep
        | SecurityPolicy::Aes256Sha256RsaPss => POLICY_ID_USER_PASS_RSA_OAEP,
        unsupported => panic!("No user name / password policy id is defined for password security policy {:?}", unsupported),
    }.into()
}
/// Returns the security policy uri advertised for user name / password tokens.
///
/// The endpoint argument is currently ignored and a null string is always returned.
fn user_pass_security_policy_uri(_endpoint: &ServerEndpoint) -> UAString {
    let no_policy_uri = UAString::null();
    no_policy_uri
}
fn user_identity_tokens(&self, config: &ServerConfig, endpoint: &ServerEndpoint) -> Vec<UserTokenPolicy> {
let mut user_identity_tokens = Vec::with_capacity(3);
if endpoint.supports_anonymous() {
user_identity_tokens.push(UserTokenPolicy {
policy_id: UAString::from(POLICY_ID_ANONYMOUS),
token_type: UserTokenType::Anonymous,
issued_token_type: UAString::null(),
issuer_endpoint_url: UAString::null(),
security_policy_uri: UAString::null(),
});
}
if endpoint.supports_user_pass(&config.user_tokens) {
user_identity_tokens.push(UserTokenPolicy {
policy_id: Self::user_pass_security_policy_id(endpoint),
token_type: UserTokenType::UserName,
issued_token_type: UAString::null(),
issuer_endpoint_url: UAString::null(),
security_policy_uri: Self::user_pass_security_policy_uri(endpoint),
});
}
if endpoint.supports_x509(&config.user_tokens) {
user_identity_tokens.push(UserTokenPolicy {
policy_id: UAString::from(POLICY_ID_X509),
token_type: UserTokenType::Certificate,
issued_token_type: UAString::null(),
issuer_endpoint_url: UAString::null(),
security_policy_uri: UAString::from(SecurityPolicy::Basic128Rsa15.to_uri()),
});
}
if user_identity_tokens.is_empty() {
debug!("user_identity_tokens() returned zero endpoints for endpoint {} / {} {}", endpoint.path, endpoint.security_policy, endpoint.security_mode);
}
user_identity_tokens
}
/// Builds the `EndpointDescription` for a single configured endpoint.
///
/// * `config` - the server configuration; callers in this file pass the read guard they
///   already hold on `self.config`.
/// * `endpoint` - the configured endpoint to describe.
/// * `all_fields` - when `true` the application uri, product uri, application name and server
///   certificate are filled in; when `false` those fields are null.
///
/// The discovery urls are taken from the supplied `config` rather than through
/// `self.discovery_urls()`, because that method read-locks `self.config` again. Acquiring a
/// second read lock on a `std::sync::RwLock` from a thread that already holds one may
/// deadlock (e.g. when a writer is queued in between) or panic.
fn new_endpoint_description(&self, config: &ServerConfig, endpoint: &ServerEndpoint, all_fields: bool) -> EndpointDescription {
    let base_endpoint_url = config.base_endpoint_url();
    let user_identity_tokens = self.user_identity_tokens(config, endpoint);

    // Same result as `discovery_urls()`: `None` for an empty list, otherwise the converted urls.
    let discovery_urls = if config.discovery_urls.is_empty() {
        None
    } else {
        Some(config.discovery_urls.iter().map(|url| UAString::from(url)).collect())
    };

    // Only the identifying fields differ between the full and the reduced description.
    let (application_uri, product_uri, application_name, server_certificate) = if all_fields {
        (
            self.application_uri.clone(),
            self.product_uri.clone(),
            self.application_name.clone(),
            self.server_certificate_as_byte_string(),
        )
    } else {
        (UAString::null(), UAString::null(), LocalizedText::null(), ByteString::null())
    };

    let server = ApplicationDescription {
        application_uri,
        product_uri,
        application_name,
        application_type: self.application_type(),
        gateway_server_uri: self.gateway_server_uri(),
        discovery_profile_uri: UAString::null(),
        discovery_urls,
    };

    EndpointDescription {
        endpoint_url: endpoint.endpoint_url(&base_endpoint_url).into(),
        server,
        server_certificate,
        security_mode: endpoint.message_security_mode(),
        security_policy_uri: UAString::from(endpoint.security_policy().to_uri()),
        user_identity_tokens: Some(user_identity_tokens),
        transport_profile_uri: UAString::from(profiles::TRANSPORT_PROFILE_URI_BINARY),
        security_level: endpoint.security_level,
    }
}
/// Returns the configured discovery urls, or `None` when the configuration lists none.
///
/// Takes a read lock on the server configuration for the duration of the call.
pub fn discovery_urls(&self) -> Option<Vec<UAString>> {
    let config = trace_read_lock_unwrap!(self.config);
    let urls: Vec<UAString> = config.discovery_urls.iter()
        .map(|url| UAString::from(url))
        .collect();
    if urls.is_empty() {
        None
    } else {
        Some(urls)
    }
}
/// Returns the application type reported by this server, which is always
/// `ApplicationType::Server`.
pub fn application_type(&self) -> ApplicationType {
    ApplicationType::Server
}
/// Returns the gateway server uri reported by this server, which is always a null string.
pub fn gateway_server_uri(&self) -> UAString {
    UAString::null()
}
/// Marks the server as aborting: logs the request, moves the state to `Shutdown` and raises
/// the abort flag that `is_abort()` reports.
pub fn abort(&mut self) {
    info!("Server has been told to abort");
    self.state = ServerStateType::Shutdown;
    self.abort = true;
}
/// Returns the current server state.
pub fn state(&self) -> ServerStateType {
    self.state
}
/// Replaces the current server state with `state`.
pub fn set_state(&mut self, state: ServerStateType) {
    self.state = state;
}
/// Returns `true` once `abort()` has been called (or the `abort` field was otherwise set).
pub fn is_abort(&self) -> bool {
    self.abort
}
/// Returns `true` when the server state is `Running`.
pub fn is_running(&self) -> bool {
    ServerStateType::Running == self.state
}
pub fn server_certificate_as_byte_string(&self) -> ByteString {
if let Some(ref server_certificate) = self.server_certificate {
server_certificate.as_byte_string()
} else {
ByteString::null()
}
}
/// Builds the `RegisteredServer` record describing this server.
///
/// The record carries the application / product uris, the application name as the only
/// server name, the discovery urls, and `is_online` set from `is_running()`. The semaphore
/// file path is always null.
pub fn registered_server(&self) -> RegisteredServer {
    RegisteredServer {
        server_uri: self.application_uri.clone(),
        product_uri: self.product_uri.clone(),
        server_names: Some(vec![self.application_name.clone()]),
        server_type: self.application_type(),
        gateway_server_uri: self.gateway_server_uri(),
        discovery_urls: self.discovery_urls(),
        semaphore_file_path: UAString::null(),
        is_online: self.is_running(),
    }
}
/// Issues the next subscription id and remembers it as the last one issued.
///
/// The counter advances by one on each call. On reaching `u32::MAX` it wraps around
/// explicitly instead of panicking (debug builds) or silently wrapping to 0 (release builds),
/// and the value 0 is skipped because 0 is not a valid OPC UA `IntegerId`.
pub fn create_subscription_id(&mut self) -> u32 {
    self.last_subscription_id = match self.last_subscription_id.wrapping_add(1) {
        0 => 1,
        next_id => next_id,
    };
    self.last_subscription_id
}
pub fn authenticate_endpoint(&self, request: &ActivateSessionRequest, endpoint_url: &str, security_policy: SecurityPolicy, security_mode: MessageSecurityMode, user_identity_token: &ExtensionObject, server_nonce: &ByteString) -> Result<String, StatusCode> {
let config = trace_read_lock_unwrap!(self.config);
if let Some(endpoint) = config.find_endpoint(endpoint_url, security_policy, security_mode) {
match IdentityToken::new(user_identity_token, &self.decoding_limits()) {
IdentityToken::None => {
error!("User identity token type unsupported");
Err(StatusCode::BadIdentityTokenInvalid)
}
IdentityToken::AnonymousIdentityToken(token) => {
Self::authenticate_anonymous_token(endpoint, &token)
}
IdentityToken::UserNameIdentityToken(token) => {
self.authenticate_username_identity_token(&config, endpoint, &token, &self.server_pkey, server_nonce)
}
IdentityToken::X509IdentityToken(token) => {
self.authenticate_x509_identity_token(&config, endpoint, &token, &request.user_token_signature, &self.server_certificate, server_nonce)
}
IdentityToken::Invalid(o) => {
error!("User identity token type {:?} is unsupported", o.node_id);
Err(StatusCode::BadIdentityTokenInvalid)
}
}
} else {
error!("Cannot find endpoint that matches path \"{}\", security policy {:?}, and security mode {:?}", endpoint_url, security_policy, security_mode);
Err(StatusCode::BadTcpEndpointUrlInvalid)
}
}
/// Installs the register-nodes and unregister-nodes callbacks, replacing any that were
/// previously set.
pub fn set_register_nodes_callbacks(&mut self, register_nodes_callback: Box<dyn RegisterNodes + Send + Sync>, unregister_nodes_callback: Box<dyn UnregisterNodes + Send + Sync>) {
    self.register_nodes_callback = Some(register_nodes_callback);
    self.unregister_nodes_callback = Some(unregister_nodes_callback);
}
/// Returns the decoding limits taken from the server configuration.
///
/// Takes a read lock on the server configuration for the duration of the call.
pub fn decoding_limits(&self) -> DecodingLimits {
    let server_config = trace_read_lock_unwrap!(self.config);
    let limits = server_config.decoding_limits();
    limits
}
/// Authenticates an anonymous identity token against `endpoint`.
///
/// Returns the anonymous user token id on success.
///
/// # Errors
///
/// * `BadIdentityTokenInvalid` when the token's policy id is not the anonymous policy id.
/// * `BadIdentityTokenRejected` when the endpoint does not support anonymous access.
fn authenticate_anonymous_token(endpoint: &ServerEndpoint, token: &AnonymousIdentityToken) -> Result<String, StatusCode> {
    // The policy id is checked first, before the endpoint's capabilities.
    if token.policy_id.as_ref() != POLICY_ID_ANONYMOUS {
        error!("Token doesn't possess the correct policy id");
        return Err(StatusCode::BadIdentityTokenInvalid);
    }
    if !endpoint.supports_anonymous() {
        error!("Endpoint \"{}\" does not support anonymous authentication", endpoint.path);
        return Err(StatusCode::BadIdentityTokenRejected);
    }
    debug!("Anonymous identity is authenticated");
    Ok(String::from(crate::config::ANONYMOUS_USER_TOKEN_ID))
}
/// Authenticates a user name / password identity token against the user tokens configured
/// for `endpoint`.
///
/// * `config` - server configuration holding the user tokens.
/// * `endpoint` - the endpoint the session is bound to.
/// * `token` - the user name identity token supplied by the client.
/// * `server_key` - server private key, required when the password is encrypted.
/// * `server_nonce` - nonce passed to password decryption.
///
/// Returns the id of the first user token of the endpoint that is a user/pass token with the
/// same user name, provided the password matches. A configured token without a password only
/// matches an empty supplied password.
///
/// # Errors
///
/// * `BadIdentityTokenRejected` when the endpoint does not support user/pass tokens.
/// * `BadIdentityTokenInvalid` when the policy id is wrong, the user name is null, or the
///   password is encrypted but no server key is available.
/// * `BadUserAccessDenied` when the password is wrong or the user is not found.
/// * Any error produced while decrypting / reading the password.
fn authenticate_username_identity_token(&self, config: &ServerConfig, endpoint: &ServerEndpoint, token: &UserNameIdentityToken, server_key: &Option<PrivateKey>, server_nonce: &ByteString) -> Result<String, StatusCode> {
    if !endpoint.supports_user_pass(&config.user_tokens) {
        error!("Endpoint doesn't support username password tokens");
        return Err(StatusCode::BadIdentityTokenRejected);
    }
    if token.policy_id != Self::user_pass_security_policy_id(endpoint) {
        error!("Token doesn't possess the correct policy id");
        return Err(StatusCode::BadIdentityTokenInvalid);
    }
    if token.user_name.is_null() {
        error!("User identify token supplies no user name");
        return Err(StatusCode::BadIdentityTokenInvalid);
    }

    debug!("policy id = {}, encryption algorithm = {}", token.policy_id.as_ref(), token.encryption_algorithm.as_ref());

    // A null encryption algorithm means the password was sent in plaintext.
    let token_password = if token.encryption_algorithm.is_null() {
        token.plaintext_password()?
    } else if let Some(server_key) = server_key {
        user_identity::decrypt_user_identity_token_password(token, server_nonce.as_ref(), server_key)?
    } else {
        error!("Identity token password is encrypted but no server private key was supplied");
        return Err(StatusCode::BadIdentityTokenInvalid);
    };

    for user_token_id in &endpoint.user_token_ids {
        // Skip ids with no configured token, non user/pass tokens and other user names.
        let server_user_token = match config.user_tokens.get(user_token_id) {
            Some(candidate) if candidate.is_user_pass() && &candidate.user == token.user_name.as_ref() => candidate,
            _ => continue,
        };

        let valid = match server_user_token.pass.as_ref() {
            Some(server_password) => server_password.as_bytes() == token_password.as_bytes(),
            None => token_password.is_empty(),
        };

        // The first token with a matching user name decides the outcome.
        if valid {
            return Ok(user_token_id.clone());
        } else {
            error!("Cannot authenticate \"{}\", password is invalid", server_user_token.user);
            return Err(StatusCode::BadUserAccessDenied);
        }
    }

    error!("Cannot authenticate \"{}\", user not found for endpoint", token.user_name);
    Err(StatusCode::BadUserAccessDenied)
}
/// Authenticates an X509 identity token against the user tokens configured for `endpoint`.
///
/// The token signature is verified first, using the security policy advertised by the
/// endpoint's certificate user token policy (falling back to the endpoint's own security
/// policy). The thumbprint of the token's certificate is then compared with the thumbprints
/// of the endpoint's configured user tokens; the id of the first match is returned.
///
/// # Errors
///
/// * `BadIdentityTokenRejected` when the endpoint does not support X509 tokens or the policy
///   id is wrong.
/// * `BadIdentityTokenInvalid` when there is no server certificate, the security policy is
///   `Unknown` / `None`, or no configured user token has the certificate's thumbprint.
/// * Any error from signature verification or from parsing the token's certificate.
fn authenticate_x509_identity_token(&self, config: &ServerConfig, endpoint: &ServerEndpoint, token: &X509IdentityToken, user_token_signature: &SignatureData, server_certificate: &Option<X509>, server_nonce: &ByteString) -> Result<String, StatusCode> {
    if !endpoint.supports_x509(&config.user_tokens) {
        error!("Endpoint doesn't support x509 tokens");
        return Err(StatusCode::BadIdentityTokenRejected);
    }
    if token.policy_id.as_ref() != POLICY_ID_X509 {
        error!("Token doesn't possess the correct policy id");
        return Err(StatusCode::BadIdentityTokenRejected);
    }

    // The signature cannot be verified without the server's own certificate.
    let server_certificate = match server_certificate.as_ref() {
        Some(certificate) => certificate,
        None => return Err(StatusCode::BadIdentityTokenInvalid),
    };

    let advertised_policies = self.user_identity_tokens(config, endpoint);
    let signature_policy = advertised_policies.iter()
        .find(|policy| policy.token_type == UserTokenType::Certificate)
        .map(|policy| SecurityPolicy::from_uri(policy.security_policy_uri.as_ref()))
        .unwrap_or(endpoint.security_policy());

    let verification = match signature_policy {
        SecurityPolicy::Unknown | SecurityPolicy::None => Err(StatusCode::BadIdentityTokenInvalid),
        signature_policy => {
            user_identity::verify_x509_identity_token(token, user_token_signature, signature_policy, server_certificate, server_nonce.as_ref())
        }
    };
    verification?;

    // Signature is good; now find a configured user whose thumbprint matches the certificate.
    let signing_cert = X509::from_byte_string(&token.certificate_data)?;
    let signing_thumbprint = signing_cert.thumbprint();
    endpoint.user_token_ids.iter()
        .find(|user_token_id| {
            config.user_tokens.get(*user_token_id)
                .and_then(|server_user_token| server_user_token.thumbprint.as_ref())
                .map_or(false, |user_thumbprint| *user_thumbprint == signing_thumbprint)
        })
        .cloned()
        .ok_or(StatusCode::BadIdentityTokenInvalid)
}
/// Installs the historical data provider, replacing any provider previously set.
pub fn set_historical_data_provider(&mut self, historical_data_provider: Box<dyn HistoricalDataProvider + Send + Sync>) {
    let provider = Some(historical_data_provider);
    self.historical_data_provider = provider;
}
/// Installs the historical event provider, replacing any provider previously set.
pub fn set_historical_event_provider(&mut self, historical_event_provider: Box<dyn HistoricalEventProvider + Send + Sync>) {
    let provider = Some(historical_event_provider);
    self.historical_event_provider = provider;
}
/// Forwards `event` to the audit log's `raise_and_log` and returns its result.
///
/// Takes the write lock on the audit log for the duration of the call.
pub(crate) fn raise_and_log<T>(&self, event: T) -> Result<NodeId, ()> where T: AuditEvent + Event {
    let log = trace_write_lock_unwrap!(self.audit_log);
    log.raise_and_log(event)
}
}