use std::{
self,
collections::BTreeMap,
path::{Path, PathBuf},
str::FromStr,
time::Duration,
};
use chrono::TimeDelta;
use serde::{Deserialize, Serialize};
use tracing::warn;
use opcua_core::config::Config;
use opcua_crypto::SecurityPolicy;
use opcua_types::{
ApplicationType, EndpointDescription, Error, MessageSecurityMode, StatusCode, UAString,
};
use crate::{Client, IdentityToken, SessionRetryPolicy};
/// Reserved user token id that denotes the anonymous identity.
///
/// It is the default `user_token_id` of a `ClientEndpoint`, resolves to
/// `IdentityToken::Anonymous` in `ClientConfig::client_identity_token`, and is
/// rejected by config validation when used as a key in the user token map.
pub const ANONYMOUS_USER_TOKEN_ID: &str = "ANONYMOUS";
/// Credentials for a non-anonymous user identity.
///
/// A valid token has a non-empty user name and exactly one kind of secret:
/// either a password, or a certificate path together with a private key path
/// (see `ClientUserToken::validate`).
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct ClientUserToken {
    /// User name. Must not be empty for the token to validate.
    pub user: String,
    /// Password for a user name / password identity. Left out of the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub password: Option<String>,
    /// Path to the certificate for a certificate based identity. Left out of
    /// the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cert_path: Option<String>,
    /// Path to the private key that goes with `cert_path`. Left out of the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub private_key_path: Option<String>,
}
impl ClientUserToken {
pub fn user_pass<S, T>(user: S, password: T) -> Self
where
S: Into<String>,
T: Into<String>,
{
ClientUserToken {
user: user.into(),
password: Some(password.into()),
cert_path: None,
private_key_path: None,
}
}
pub fn x509<S>(user: S, cert_path: &Path, private_key_path: &Path) -> Self
where
S: Into<String>,
{
ClientUserToken {
user: user.into(),
password: None,
cert_path: Some(cert_path.to_string_lossy().to_string()),
private_key_path: Some(private_key_path.to_string_lossy().to_string()),
}
}
pub fn validate(&self) -> Result<(), Vec<String>> {
let mut errors = Vec::new();
if self.user.is_empty() {
errors.push("User token has an empty name.".to_owned());
}
if self.password.is_some() {
if self.cert_path.is_some() || self.private_key_path.is_some() {
errors.push(format!(
"User token {} holds a password and certificate info - it cannot be both.",
self.user
));
}
} else if self.cert_path.is_none() && self.private_key_path.is_none() {
errors.push(format!(
"User token {} fails to provide a password or certificate info.",
self.user
));
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
errors.push(format!(
"User token {} fails to provide both a certificate path and a private key path.",
self.user
));
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
}
/// Describes an endpoint the client may connect to, including the security
/// settings and the user identity to use.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct ClientEndpoint {
    /// Endpoint url.
    pub url: String,
    /// Security policy in string form; parsed with `SecurityPolicy::from_str`.
    pub security_policy: String,
    /// Message security mode in string form; converted with
    /// `MessageSecurityMode::from`.
    pub security_mode: String,
    /// Id of the user token to use with this endpoint. Defaults to the
    /// anonymous id when missing from the deserialized input.
    #[serde(default = "ClientEndpoint::anonymous_id")]
    pub user_token_id: String,
}
impl ClientEndpoint {
    /// Creates an endpoint for `url` with security policy `None`, security
    /// mode `None` and the anonymous user token id.
    pub fn new<T>(url: T) -> Self
    where
        T: Into<String>,
    {
        ClientEndpoint {
            url: url.into(),
            security_policy: SecurityPolicy::None.to_str().into(),
            security_mode: MessageSecurityMode::None.into(),
            user_token_id: Self::anonymous_id(),
        }
    }

    /// Serde default for `user_token_id`: the reserved anonymous id.
    fn anonymous_id() -> String {
        ANONYMOUS_USER_TOKEN_ID.to_string()
    }

    /// Parses the configured security policy string.
    ///
    /// The string comes from user supplied configuration, so a failed parse
    /// yields `SecurityPolicy::Unknown` instead of panicking.
    pub fn security_policy(&self) -> SecurityPolicy {
        SecurityPolicy::from_str(&self.security_policy).unwrap_or(SecurityPolicy::Unknown)
    }
}
/// Size and count limits used by the client when handling messages.
///
/// Every field falls back to the matching function in the `defaults` module
/// when it is missing from the deserialized input.
/// NOTE(review): the exact unit of each limit (bytes / elements) is not
/// visible in this file - confirm against `opcua_types::DecodingOptions`.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub(crate) struct DecodingOptions {
    /// Maximum message size.
    #[serde(default = "defaults::max_message_size")]
    pub(crate) max_message_size: usize,
    /// Maximum number of chunks in a message.
    #[serde(default = "defaults::max_chunk_count")]
    pub(crate) max_chunk_count: usize,
    /// Maximum chunk size. Not forwarded by `as_comms_decoding_options`.
    #[serde(default = "defaults::max_chunk_size")]
    pub(crate) max_chunk_size: usize,
    /// Maximum incoming chunk size. Not forwarded by
    /// `as_comms_decoding_options`.
    #[serde(default = "defaults::max_incoming_chunk_size")]
    pub(crate) max_incoming_chunk_size: usize,
    /// Maximum length of a string.
    #[serde(default = "defaults::max_string_length")]
    pub(crate) max_string_length: usize,
    /// Maximum length of a byte string.
    #[serde(default = "defaults::max_byte_string_length")]
    pub(crate) max_byte_string_length: usize,
    /// Maximum length of an array.
    #[serde(default = "defaults::max_array_length")]
    pub(crate) max_array_length: usize,
}
impl DecodingOptions {
pub(crate) fn as_comms_decoding_options(&self) -> opcua_types::DecodingOptions {
opcua_types::DecodingOptions {
max_chunk_count: self.max_chunk_count,
max_message_size: self.max_message_size,
max_string_length: self.max_string_length,
max_byte_string_length: self.max_byte_string_length,
max_array_length: self.max_array_length,
client_offset: TimeDelta::zero(),
..Default::default()
}
}
}
impl Default for DecodingOptions {
fn default() -> Self {
Self {
max_message_size: defaults::max_message_size(),
max_chunk_count: defaults::max_chunk_count(),
max_chunk_size: defaults::max_chunk_size(),
max_incoming_chunk_size: defaults::max_incoming_chunk_size(),
max_string_length: defaults::max_string_length(),
max_byte_string_length: defaults::max_byte_string_length(),
max_array_length: defaults::max_array_length(),
}
}
}
/// Performance related client settings.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub(crate) struct Performance {
    /// Defaults to `false` when missing from the deserialized input.
    /// NOTE(review): presumably disables clock skew handling between client
    /// and server - confirm against the code that reads this flag.
    #[serde(default)]
    pub(crate) ignore_clock_skew: bool,
    /// Defaults to `defaults::recreate_monitored_items_chunk()` when missing.
    /// NOTE(review): presumably the batch size used when monitored items are
    /// recreated - confirm against the subscription code.
    #[serde(default = "defaults::recreate_monitored_items_chunk")]
    pub(crate) recreate_monitored_items_chunk: usize,
}
impl Default for Performance {
fn default() -> Self {
Self {
ignore_clock_skew: false,
recreate_monitored_items_chunk: defaults::recreate_monitored_items_chunk(),
}
}
}
/// Client configuration, loadable from and savable to a file through the
/// `Config` trait.
///
/// Fields carrying a `#[serde(default ...)]` attribute may be left out of the
/// serialized form; all other fields are required when deserializing.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct ClientConfig {
    /// Application name reported by `Config::application_name`. Must not be
    /// empty for the config to validate.
    pub(crate) application_name: String,
    /// Application uri reported by `Config::application_uri`. Must not be
    /// empty for the config to validate.
    pub(crate) application_uri: String,
    /// Product uri reported by `Config::product_uri`.
    pub(crate) product_uri: String,
    /// NOTE(review): presumably asks the client to create a sample
    /// certificate / key pair when none exists - confirm against the client.
    pub(crate) create_sample_keypair: bool,
    /// Optional path to the client's own certificate.
    pub(crate) certificate_path: Option<PathBuf>,
    /// Optional path to the client's own private key.
    pub(crate) private_key_path: Option<PathBuf>,
    /// NOTE(review): presumably makes the client trust unknown server
    /// certificates automatically - confirm against the client.
    pub(crate) trust_server_certs: bool,
    /// NOTE(review): presumably toggles verification of server certificates;
    /// `ClientConfig::new` sets it from `defaults::verify_server_certs()`.
    pub(crate) verify_server_certs: bool,
    /// PKI directory. `ClientConfig::new` sets it to the `pki` folder under
    /// the current working directory.
    pub(crate) pki_dir: PathBuf,
    /// Preferred locales of the client.
    pub(crate) preferred_locales: Vec<String>,
    /// Id of the default endpoint. When non-empty (and `endpoints` is not
    /// empty) it must be a key of `endpoints` for the config to validate.
    pub(crate) default_endpoint: String,
    /// Known endpoints, keyed by endpoint id. The empty id is not allowed.
    pub(crate) endpoints: BTreeMap<String, ClientEndpoint>,
    /// User tokens, keyed by token id. The empty id and the reserved
    /// anonymous id are not allowed.
    pub(crate) user_tokens: BTreeMap<String, ClientUserToken>,
    /// Length of the session nonce.
    #[serde(default = "defaults::session_nonce_length")]
    pub(crate) session_nonce_length: usize,
    /// Requested secure channel lifetime.
    /// NOTE(review): unit is presumably milliseconds - confirm.
    #[serde(default = "defaults::channel_lifetime")]
    pub(crate) channel_lifetime: u32,
    /// Decoding limits.
    #[serde(default)]
    pub(crate) decoding_options: DecodingOptions,
    /// Session retry limit: `-1` retries forever, `0` never retries, a
    /// positive value is the maximum number of retries. Other negative values
    /// fail validation.
    #[serde(default = "defaults::session_retry_limit")]
    pub(crate) session_retry_limit: i32,
    /// Initial delay handed to `SessionRetryPolicy::new`.
    #[serde(default = "defaults::session_retry_initial")]
    pub(crate) session_retry_initial: Duration,
    /// Maximum delay handed to `SessionRetryPolicy::new`.
    #[serde(default = "defaults::session_retry_max")]
    pub(crate) session_retry_max: Duration,
    /// Interval between keep-alive requests.
    #[serde(default = "defaults::keep_alive_interval")]
    pub(crate) keep_alive_interval: Duration,
    /// Maximum number of failed keep-alives.
    /// NOTE(review): the meaning of the default `0` is not visible here -
    /// confirm whether it disables the check.
    #[serde(default = "defaults::max_failed_keep_alive_count")]
    pub(crate) max_failed_keep_alive_count: u64,
    /// Timeout for regular requests.
    #[serde(default = "defaults::request_timeout")]
    pub(crate) request_timeout: Duration,
    /// Timeout for publish requests.
    #[serde(default = "defaults::publish_timeout")]
    pub(crate) publish_timeout: Duration,
    /// Minimum publish interval.
    #[serde(default = "defaults::min_publish_interval")]
    pub(crate) min_publish_interval: Duration,
    /// Performance related settings.
    #[serde(default)]
    pub(crate) performance: Performance,
    /// NOTE(review): presumably whether subscriptions are recreated after a
    /// reconnect - confirm against the session code.
    #[serde(default = "defaults::recreate_subscriptions")]
    pub(crate) recreate_subscriptions: bool,
    /// Session name. `ClientConfig::new` uses "Rust OPC UA Client".
    pub(crate) session_name: String,
    /// Requested session timeout.
    /// NOTE(review): unit is presumably milliseconds - confirm.
    #[serde(default = "defaults::session_timeout")]
    pub(crate) session_timeout: u32,
}
impl Config for ClientConfig {
    /// Checks the configuration for internal consistency.
    ///
    /// All problems are collected, so the returned `Err` lists every failure
    /// rather than only the first one. The checks are:
    /// - application name and uri are not empty;
    /// - no user token uses the reserved anonymous id or an empty id, and
    ///   every user token validates;
    /// - no endpoint has an empty id, a non-empty default endpoint id exists,
    ///   and every endpoint has a recognised security policy and mode;
    /// - the session retry limit is `-1`, `0` or positive.
    ///
    /// An empty endpoint map is not an error; it is only logged as a warning.
    fn validate(&self) -> Result<(), Vec<String>> {
        let mut errors = Vec::new();

        if self.application_name.is_empty() {
            errors.push("Application name is empty".to_owned());
        }
        if self.application_uri.is_empty() {
            errors.push("Application uri is empty".to_owned());
        }

        if self.user_tokens.contains_key(ANONYMOUS_USER_TOKEN_ID) {
            errors.push(format!(
                "User tokens contains the reserved \"{ANONYMOUS_USER_TOKEN_ID}\" id"
            ));
        }
        if self.user_tokens.contains_key("") {
            errors.push("User tokens contains a token with an empty id".to_owned());
        }
        for (id, token) in &self.user_tokens {
            if let Err(e) = token.validate() {
                errors.push(format!("Token {id} failed to validate: {}", e.join(", ")));
            }
        }

        if self.endpoints.is_empty() {
            warn!("Endpoint config contains no endpoints");
        } else {
            if self.endpoints.contains_key("") {
                errors.push("Endpoints contains an endpoint with an empty id".to_owned());
            }
            if !self.default_endpoint.is_empty()
                && !self.endpoints.contains_key(&self.default_endpoint)
            {
                errors.push(format!(
                    "Default endpoint id {} does not exist in list of endpoints",
                    self.default_endpoint
                ));
            }
            for (id, endpoint) in &self.endpoints {
                // The policy string is user supplied: a parse failure is
                // reported as a validation error, never as a panic.
                let policy_is_known = matches!(
                    SecurityPolicy::from_str(&endpoint.security_policy),
                    Ok(policy) if policy != SecurityPolicy::Unknown
                );
                if !policy_is_known {
                    errors.push(format!(
                        "Endpoint {} security policy {} is invalid",
                        id, endpoint.security_policy
                    ));
                } else if MessageSecurityMode::Invalid
                    == MessageSecurityMode::from(endpoint.security_mode.as_ref())
                {
                    errors.push(format!(
                        "Endpoint {} security mode {} is invalid",
                        id, endpoint.security_mode
                    ));
                }
            }
        }

        if self.session_retry_limit < 0 && self.session_retry_limit != -1 {
            errors.push(format!("Session retry limit of {} is invalid - must be -1 (infinite), 0 (never) or a positive value", self.session_retry_limit));
        }

        if errors.is_empty() {
            Ok(())
        } else {
            Err(errors)
        }
    }

    /// Returns the configured application name.
    fn application_name(&self) -> UAString {
        UAString::from(&self.application_name)
    }

    /// Returns the configured application uri.
    fn application_uri(&self) -> UAString {
        UAString::from(&self.application_uri)
    }

    /// Returns the configured product uri.
    fn product_uri(&self) -> UAString {
        UAString::from(&self.product_uri)
    }

    /// This configuration always describes a client application.
    fn application_type(&self) -> ApplicationType {
        ApplicationType::Client
    }
}
impl ClientConfig {
pub fn session_retry_policy(&self) -> SessionRetryPolicy {
SessionRetryPolicy::new(
self.session_retry_max,
if self.session_retry_limit < 0 {
None
} else {
Some(self.session_retry_limit as u32)
},
self.session_retry_initial,
)
}
pub fn client_identity_token(
&self,
user_token_id: impl Into<String>,
) -> Result<IdentityToken, Error> {
let user_token_id = user_token_id.into();
if user_token_id == ANONYMOUS_USER_TOKEN_ID {
Ok(IdentityToken::Anonymous)
} else {
let Some(token) = self.user_tokens.get(&user_token_id) else {
return Err(Error::new(
StatusCode::BadInvalidArgument,
format!("Requested user token: {user_token_id} not found in config",),
));
};
if let Some(ref password) = token.password {
Ok(IdentityToken::UserName(token.user.clone(), password.into()))
} else if let Some(ref cert_path) = token.cert_path {
let Some(private_key_path) = &token.private_key_path else {
return Err(Error::new(
StatusCode::BadInvalidArgument,
"Client identity token with certificate does not have a private key",
));
};
IdentityToken::new_x509_path(cert_path, private_key_path)
} else {
Err(Error::new(
StatusCode::BadInvalidArgument,
"Non-anonymous client identity token with neither password nor certificate",
))
}
}
}
pub(super) fn endpoint_description_for_client_endpoint(
&self,
client_endpoint: &ClientEndpoint,
endpoints: &[EndpointDescription],
) -> Result<EndpointDescription, Error> {
let security_policy =
SecurityPolicy::from_str(&client_endpoint.security_policy).map_err(|_| {
Error::new(
StatusCode::BadSecurityPolicyRejected,
format!(
"Endpoint {} security policy {} is invalid",
client_endpoint.url, client_endpoint.security_policy
),
)
})?;
let security_mode = MessageSecurityMode::from(client_endpoint.security_mode.as_ref());
if security_mode == MessageSecurityMode::Invalid {
return Err(Error::new(
StatusCode::BadSecurityModeRejected,
format!(
"Endpoint {} security mode {} is invalid",
client_endpoint.url, client_endpoint.security_mode
),
));
}
let endpoint_url = client_endpoint.url.clone();
let endpoint = Client::find_matching_endpoint(
endpoints,
&endpoint_url,
security_policy,
security_mode,
)
.ok_or_else(|| {
Error::new(
StatusCode::BadTcpEndpointUrlInvalid,
format!(
"Endpoint {endpoint_url}, {security_policy:?} / {security_mode:?} does not match any supplied by the server"
),
)
})?;
Ok(endpoint)
}
}
impl Default for ClientConfig {
fn default() -> Self {
Self::new("", "")
}
}
/// Fallback values for the client configuration.
///
/// The functions are referenced by name from the `#[serde(default = "...")]`
/// attributes of the config structs and called by their constructors, so
/// deserialized and freshly built configs share the same defaults.
mod defaults {
    use std::time::Duration;

    use crate::retry::SessionRetryPolicy;

    // --- Security and secure channel ---

    /// Default for `verify_server_certs`.
    pub(super) fn verify_server_certs() -> bool {
        true
    }

    /// Default for `channel_lifetime`.
    pub(super) fn channel_lifetime() -> u32 {
        60_000
    }

    // --- Session ---

    /// Default for `session_nonce_length`.
    pub(super) fn session_nonce_length() -> usize {
        32
    }

    /// Default for `session_timeout`.
    pub(super) fn session_timeout() -> u32 {
        60_000
    }

    /// Default for `session_retry_limit`, taken from the retry policy.
    pub(super) fn session_retry_limit() -> i32 {
        SessionRetryPolicy::DEFAULT_RETRY_LIMIT as i32
    }

    /// Default for `session_retry_initial`: one second.
    pub(super) fn session_retry_initial() -> Duration {
        Duration::from_secs(1)
    }

    /// Default for `session_retry_max`: thirty seconds.
    pub(super) fn session_retry_max() -> Duration {
        Duration::from_secs(30)
    }

    /// Default for `keep_alive_interval`: ten seconds.
    pub(super) fn keep_alive_interval() -> Duration {
        Duration::from_secs(10)
    }

    /// Default for `max_failed_keep_alive_count`.
    pub(super) fn max_failed_keep_alive_count() -> u64 {
        0
    }

    // --- Requests and subscriptions ---

    /// Default for `request_timeout`: sixty seconds.
    pub(super) fn request_timeout() -> Duration {
        Duration::from_secs(60)
    }

    /// Default for `publish_timeout`: sixty seconds.
    pub(super) fn publish_timeout() -> Duration {
        Duration::from_secs(60)
    }

    /// Default for `min_publish_interval`: one hundred milliseconds.
    pub(super) fn min_publish_interval() -> Duration {
        Duration::from_millis(100)
    }

    /// Default for `recreate_subscriptions`.
    pub(super) fn recreate_subscriptions() -> bool {
        true
    }

    /// Default for `recreate_monitored_items_chunk`.
    pub(super) fn recreate_monitored_items_chunk() -> usize {
        1000
    }

    // --- Decoding limits ---

    /// Default for `max_message_size`, from `opcua_types::constants`.
    pub(super) fn max_message_size() -> usize {
        opcua_types::constants::MAX_MESSAGE_SIZE
    }

    /// Default for `max_chunk_count`, from `opcua_types::constants`.
    pub(super) fn max_chunk_count() -> usize {
        opcua_types::constants::MAX_CHUNK_COUNT
    }

    /// Default for `max_chunk_size`.
    pub(super) fn max_chunk_size() -> usize {
        65535
    }

    /// Default for `max_incoming_chunk_size`.
    pub(super) fn max_incoming_chunk_size() -> usize {
        65535
    }

    /// Default for `max_string_length`, from `opcua_types::constants`.
    pub(super) fn max_string_length() -> usize {
        opcua_types::constants::MAX_STRING_LENGTH
    }

    /// Default for `max_byte_string_length`, from `opcua_types::constants`.
    pub(super) fn max_byte_string_length() -> usize {
        opcua_types::constants::MAX_BYTE_STRING_LENGTH
    }

    /// Default for `max_array_length`, from `opcua_types::constants`.
    pub(super) fn max_array_length() -> usize {
        opcua_types::constants::MAX_ARRAY_LENGTH
    }
}
impl ClientConfig {
    /// Name of the PKI directory that `new` places under the current working
    /// directory.
    pub const PKI_DIR: &'static str = "pki";

    /// Creates a configuration with the given application name and uri and
    /// default values for everything else.
    ///
    /// There are no endpoints or user tokens, no certificate or key paths,
    /// server certificates are not auto-trusted, and the session name is
    /// "Rust OPC UA Client". The PKI directory is `<current dir>/pki`.
    pub fn new(application_name: impl Into<String>, application_uri: impl Into<String>) -> Self {
        // Building a config must not panic because of the process
        // environment: if the current directory cannot be determined, fall
        // back to the relative path "pki".
        let pki_dir = std::env::current_dir()
            .unwrap_or_default()
            .join(Self::PKI_DIR);

        ClientConfig {
            application_name: application_name.into(),
            application_uri: application_uri.into(),
            product_uri: String::new(),
            create_sample_keypair: false,
            certificate_path: None,
            private_key_path: None,
            trust_server_certs: false,
            verify_server_certs: defaults::verify_server_certs(),
            pki_dir,
            preferred_locales: Vec::new(),
            default_endpoint: String::new(),
            endpoints: BTreeMap::new(),
            user_tokens: BTreeMap::new(),
            channel_lifetime: defaults::channel_lifetime(),
            decoding_options: DecodingOptions::default(),
            session_retry_limit: defaults::session_retry_limit(),
            session_retry_initial: defaults::session_retry_initial(),
            session_retry_max: defaults::session_retry_max(),
            keep_alive_interval: defaults::keep_alive_interval(),
            max_failed_keep_alive_count: defaults::max_failed_keep_alive_count(),
            request_timeout: defaults::request_timeout(),
            publish_timeout: defaults::publish_timeout(),
            min_publish_interval: defaults::min_publish_interval(),
            performance: Performance::default(),
            recreate_subscriptions: defaults::recreate_subscriptions(),
            session_name: "Rust OPC UA Client".into(),
            session_timeout: defaults::session_timeout(),
            session_nonce_length: defaults::session_nonce_length(),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{self, collections::BTreeMap, path::PathBuf};

    use crate::ClientBuilder;
    use opcua_core::config::Config;
    use opcua_crypto::SecurityPolicy;
    use opcua_types::MessageSecurityMode;

    use super::{ClientConfig, ClientEndpoint, ClientUserToken, ANONYMOUS_USER_TOKEN_ID};

    /// Returns the path of `filename` inside the system temp directory.
    fn make_test_file(filename: &str) -> PathBuf {
        let mut path = std::env::temp_dir();
        path.push(filename);
        path
    }

    /// Builds an anonymous endpoint on the sample server url with the given
    /// security policy and mode.
    fn sample_endpoint(
        security_policy: impl Into<String>,
        security_mode: MessageSecurityMode,
    ) -> ClientEndpoint {
        ClientEndpoint {
            url: String::from("opc.tcp://127.0.0.1:4855/"),
            security_policy: security_policy.into(),
            security_mode: String::from(security_mode),
            user_token_id: ANONYMOUS_USER_TOKEN_ID.to_string(),
        }
    }

    /// Builder for the sample client: four endpoints covering the supported
    /// sample policies and two user/password tokens.
    fn sample_builder() -> ClientBuilder {
        ClientBuilder::new()
            .application_name("OPC UA Sample Client")
            .application_uri("urn:SampleClient")
            .create_sample_keypair(true)
            .certificate_path("own/cert.der")
            .private_key_path("private/private.pem")
            .trust_server_certs(true)
            .pki_dir("./pki")
            .endpoints(vec![
                (
                    "sample_none",
                    sample_endpoint(SecurityPolicy::None.to_str(), MessageSecurityMode::None),
                ),
                (
                    "sample_basic128rsa15",
                    sample_endpoint(
                        SecurityPolicy::Basic128Rsa15.to_str(),
                        MessageSecurityMode::SignAndEncrypt,
                    ),
                ),
                (
                    "sample_basic256",
                    sample_endpoint(
                        SecurityPolicy::Basic256.to_str(),
                        MessageSecurityMode::SignAndEncrypt,
                    ),
                ),
                (
                    "sample_basic256sha256",
                    sample_endpoint(
                        SecurityPolicy::Basic256Sha256.to_str(),
                        MessageSecurityMode::SignAndEncrypt,
                    ),
                ),
            ])
            .default_endpoint("sample_none")
            .user_token(
                "sample_user",
                ClientUserToken::user_pass("sample1", "sample1pwd"),
            )
            .user_token(
                "sample_user2",
                ClientUserToken::user_pass("sample2", "sample2pwd"),
            )
    }

    /// The sample configuration produced by `sample_builder`.
    fn default_sample_config() -> ClientConfig {
        sample_builder().into_config()
    }

    /// Writes the sample configuration to `../samples/client.conf` (relative
    /// to the current directory) and checks that it validates.
    #[test]
    fn client_sample_config() {
        let config = default_sample_config();
        let mut path = std::env::current_dir().unwrap();
        path.push("..");
        path.push("samples");
        path.push("client.conf");
        println!("Path is {path:?}");
        let saved = config.save(&path);
        println!("Saved = {saved:?}");
        assert!(saved.is_ok());
        config.validate().unwrap();
    }

    /// A saved configuration loads back equal to the original.
    #[test]
    fn client_config() {
        let path = make_test_file("client_config.yaml");
        println!("Client path = {path:?}");
        let config = default_sample_config();
        // Save once and assert on that very result, so the printed value and
        // the checked value cannot differ.
        let saved = config.save(&path);
        println!("Saved = {saved:?}");
        assert!(saved.is_ok());
        let Ok(config2) = ClientConfig::load(&path) else {
            panic!("Cannot load config from file");
        };
        assert_eq!(config, config2);
    }

    /// An endpoint with an unrecognised security policy fails validation.
    #[test]
    fn client_invalid_security_policy_config() {
        let mut config = default_sample_config();
        config.endpoints = BTreeMap::new();
        config.endpoints.insert(
            String::from("sample_none"),
            ClientEndpoint {
                url: String::from("opc.tcp://127.0.0.1:4855"),
                security_policy: String::from("http://blah"),
                security_mode: String::from(MessageSecurityMode::None),
                user_token_id: ANONYMOUS_USER_TOKEN_ID.to_string(),
            },
        );
        assert_eq!(
            config.validate().unwrap_err().join(", "),
            "Endpoint sample_none security policy http://blah is invalid"
        );
    }

    /// An endpoint with a misspelled security mode fails validation.
    #[test]
    fn client_invalid_security_mode_config() {
        let mut config = default_sample_config();
        config.endpoints = BTreeMap::new();
        config.endpoints.insert(
            String::from("sample_none"),
            ClientEndpoint {
                url: String::from("opc.tcp://127.0.0.1:4855"),
                security_policy: String::from(SecurityPolicy::Basic128Rsa15.to_uri()),
                security_mode: String::from("SingAndEncrypt"),
                user_token_id: ANONYMOUS_USER_TOKEN_ID.to_string(),
            },
        );
        assert_eq!(
            config.validate().unwrap_err().join(", "),
            "Endpoint sample_none security mode SingAndEncrypt is invalid"
        );
    }

    /// A user token may not use the reserved anonymous id, and its own
    /// validation errors are reported alongside.
    #[test]
    fn client_anonymous_user_tokens_id() {
        let mut config = default_sample_config();
        config.user_tokens = BTreeMap::new();
        config.user_tokens.insert(
            String::from("ANONYMOUS"),
            ClientUserToken {
                user: String::new(),
                password: Some(String::new()),
                cert_path: None,
                private_key_path: None,
            },
        );
        assert_eq!(
            config.validate().unwrap_err().join(", "),
            "User tokens contains the reserved \"ANONYMOUS\" id, Token ANONYMOUS failed to validate: User token has an empty name."
        );
    }
}