use std::{sync::Arc, time::Duration};
use opcua_core::{
comms::url::hostname_from_url, sync::RwLock, trace_read_lock, trace_write_lock, ResponseMessage,
};
use opcua_crypto::{
self, legacy_encrypt_secret, random, CertificateStore, PKey, SecurityPolicy, X509,
};
use opcua_types::{
ActivateSessionRequest, ActivateSessionResponse, AnonymousIdentityToken,
ApplicationDescription, ByteString, CancelRequest, CancelResponse, CloseSessionRequest,
CloseSessionResponse, CreateSessionRequest, CreateSessionResponse, EndpointDescription, Error,
ExtensionObject, IntegerId, IssuedIdentityToken, MessageSecurityMode, NodeId, SignatureData,
SignedSoftwareCertificate, StatusCode, UAString, UserNameIdentityToken, UserTokenType,
X509IdentityToken,
};
use rsa::RsaPrivateKey;
use tracing::error;
use crate::{
session::{
process_service_result, process_unexpected_response,
request_builder::{builder_base, builder_error, RequestHeaderBuilder},
},
AsyncSecureChannel, IdentityToken, Session, UARequest,
};
#[derive(Clone)]
pub struct CreateSession<'a> {
client_description: ApplicationDescription,
server_uri: UAString,
endpoint_url: UAString,
session_name: UAString,
client_certificate: Option<X509>,
session_timeout: f64,
max_response_message_size: u32,
certificate_store: &'a RwLock<CertificateStore>,
endpoint: &'a EndpointDescription,
nonce_length: usize,
header: RequestHeaderBuilder,
}
builder_base!(CreateSession<'a>);
impl<'a> CreateSession<'a> {
pub(crate) fn new(session: &'a Session) -> Self {
Self {
endpoint_url: session.endpoint_info().endpoint.endpoint_url.clone(),
server_uri: UAString::null(),
client_description: session.application_description.clone(),
session_name: session.session_name.clone(),
client_certificate: session.channel.read_own_certificate(),
endpoint: &session.endpoint_info().endpoint,
certificate_store: session.channel.certificate_store(),
session_timeout: session.session_timeout,
max_response_message_size: 0,
nonce_length: session.session_nonce_length,
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
certificate_store: &'a RwLock<CertificateStore>,
endpoint: &'a EndpointDescription,
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
endpoint_url: UAString::null(),
server_uri: UAString::null(),
client_description: ApplicationDescription::default(),
session_name: UAString::null(),
client_certificate: None,
session_timeout: 0.0,
max_response_message_size: 0,
certificate_store,
endpoint,
nonce_length: 32,
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn client_description(mut self, desc: impl Into<ApplicationDescription>) -> Self {
self.client_description = desc.into();
self
}
pub fn server_uri(mut self, server_uri: impl Into<UAString>) -> Self {
self.server_uri = server_uri.into();
self
}
pub fn endpoint_url(mut self, endpoint_url: impl Into<UAString>) -> Self {
self.endpoint_url = endpoint_url.into();
self
}
pub fn session_name(mut self, session_name: impl Into<UAString>) -> Self {
self.session_name = session_name.into();
self
}
pub fn client_certificate(mut self, client_certificate: X509) -> Self {
self.client_certificate = Some(client_certificate);
self
}
pub fn client_cert_from_store(mut self, certificate_store: &RwLock<CertificateStore>) -> Self {
let cert_store = trace_read_lock!(certificate_store);
self.client_certificate = cert_store.read_own_cert().ok();
self
}
pub fn session_timeout(mut self, session_timeout: f64) -> Self {
self.session_timeout = session_timeout;
self
}
pub fn max_response_message_size(mut self, max_response_message_size: u32) -> Self {
self.max_response_message_size = max_response_message_size;
self
}
pub fn nonce_length(mut self, nonce_length: usize) -> Self {
self.nonce_length = nonce_length;
self
}
}
impl UARequest for CreateSession<'_> {
type Out = CreateSessionResponse;
async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'a,
{
let client_nonce = random::byte_string(self.nonce_length);
let request = CreateSessionRequest {
request_header: self.header.header,
client_description: self.client_description,
server_uri: self.server_uri,
endpoint_url: self.endpoint_url,
session_name: self.session_name,
client_nonce: client_nonce.clone(),
client_certificate: self
.client_certificate
.as_ref()
.map(|v| v.as_byte_string())
.unwrap_or_default(),
requested_session_timeout: self.session_timeout,
max_response_message_size: self.max_response_message_size,
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::CreateSession(response) = response {
tracing::debug!("create_session, success");
process_service_result(&response.response_header)?;
let security_policy = channel.security_policy();
if security_policy != SecurityPolicy::None {
if let Ok(server_certificate) =
opcua_crypto::X509::from_byte_string(&response.server_certificate)
{
let hostname = hostname_from_url(self.endpoint.endpoint_url.as_ref())
.map_err(|_| StatusCode::BadUnexpectedError)?;
let application_uri = self.endpoint.server.application_uri.as_ref();
let certificate_store = trace_write_lock!(self.certificate_store);
certificate_store.validate_or_reject_application_instance_cert(
&server_certificate,
security_policy,
Some(&hostname),
Some(application_uri),
)?;
opcua_crypto::verify_signature_data(
&response.server_signature,
security_policy,
&server_certificate,
self.client_certificate
.as_ref()
.ok_or(StatusCode::BadCertificateInvalid)?,
client_nonce.as_ref(),
)?;
} else {
return Err(StatusCode::BadCertificateInvalid);
}
}
channel.update_from_created_session(
&response.server_nonce,
&response.server_certificate,
&response.authentication_token,
)?;
Ok(*response)
} else {
tracing::error!("create_session failed");
Err(process_unexpected_response(response))
}
}
}
#[derive(Debug, Clone)]
pub struct ActivateSession {
identity_token: IdentityToken,
private_key: Option<PKey<RsaPrivateKey>>,
locale_ids: Vec<UAString>,
client_software_certificates: Vec<SignedSoftwareCertificate>,
endpoint: EndpointDescription,
header: RequestHeaderBuilder,
}
builder_base!(ActivateSession);
impl ActivateSession {
pub(crate) fn new(session: &Session) -> Self {
Self {
identity_token: session.endpoint_info().user_identity_token.clone(),
private_key: session.channel.read_own_private_key(),
locale_ids: session
.endpoint_info()
.preferred_locales
.iter()
.map(UAString::from)
.collect(),
client_software_certificates: Vec::new(),
endpoint: session.endpoint_info().endpoint.clone(),
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
endpoint: EndpointDescription,
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
identity_token: IdentityToken::Anonymous,
private_key: None,
locale_ids: Vec::new(),
client_software_certificates: Vec::new(),
endpoint,
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn identity_token(mut self, identity_token: IdentityToken) -> Self {
self.identity_token = identity_token;
self
}
pub fn private_key(mut self, private_key: PKey<RsaPrivateKey>) -> Self {
self.private_key = Some(private_key);
self
}
pub fn locale_ids(mut self, locale_ids: Vec<UAString>) -> Self {
self.locale_ids = locale_ids;
self
}
pub fn locale_id(mut self, locale_id: impl Into<UAString>) -> Self {
self.locale_ids.push(locale_id.into());
self
}
pub fn client_software_certificates(
mut self,
certificates: Vec<SignedSoftwareCertificate>,
) -> Self {
self.client_software_certificates = certificates;
self
}
pub fn client_software_certificate(mut self, certificate: SignedSoftwareCertificate) -> Self {
self.client_software_certificates.push(certificate);
self
}
async fn user_identity_token(
&self,
remote_nonce: &ByteString,
remote_cert: &Option<X509>,
security_mode: MessageSecurityMode,
channel_security_policy: SecurityPolicy,
) -> Result<(ExtensionObject, SignatureData), Error> {
let user_token_type = match &self.identity_token {
IdentityToken::Anonymous => UserTokenType::Anonymous,
IdentityToken::UserName(_, _) => UserTokenType::UserName,
IdentityToken::X509(_, _) => UserTokenType::Certificate,
IdentityToken::IssuedToken(_) => UserTokenType::IssuedToken,
};
let Some(policy) = self.endpoint.find_policy(user_token_type) else {
builder_error!(
self,
"Cannot find user token type {:?} for this endpoint, cannot connect",
user_token_type
);
return Err(Error::new(
StatusCode::BadSecurityPolicyRejected,
format!(
"Cannot find user token type {user_token_type:?} for this endpoint, cannot connect"
),
));
};
let security_policy = if policy.security_policy_uri.is_empty() {
SecurityPolicy::None
} else {
SecurityPolicy::from_uri(policy.security_policy_uri.as_ref())
};
if security_policy == SecurityPolicy::Unknown {
error!("Unknown security policy {}", policy.security_policy_uri);
return Err(Error::new(
StatusCode::BadSecurityPolicyRejected,
format!("Unknown security policy {}", policy.security_policy_uri),
));
}
match &self.identity_token {
IdentityToken::Anonymous => {
let identity_token = AnonymousIdentityToken {
policy_id: policy.policy_id.clone(),
};
let identity_token = ExtensionObject::from_message(identity_token);
Ok((identity_token, SignatureData::null()))
}
IdentityToken::UserName(user, pass) => {
let nonce = remote_nonce.as_ref();
let cert = remote_cert;
let secret = legacy_encrypt_secret(
channel_security_policy,
security_mode,
policy,
nonce,
cert,
pass.0.as_bytes(),
)?;
let identity_token = UserNameIdentityToken {
policy_id: secret.policy,
user_name: UAString::from(user.as_str()),
password: secret.secret,
encryption_algorithm: secret.encryption_algorithm,
};
Ok((
ExtensionObject::from_message(identity_token),
SignatureData::null(),
))
}
IdentityToken::X509(cert, private_key) => {
let nonce = remote_nonce.as_ref();
let server_cert = remote_cert;
let Some(server_cert) = &server_cert else {
error!("Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature");
return Err(Error::new(StatusCode::BadCertificateInvalid, "Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature"));
};
let user_token_signature = opcua_crypto::create_signature_data(
private_key,
security_policy,
&server_cert.as_byte_string(),
&ByteString::from(&nonce),
)
.map_err(|s| Error::new(s, "Failed to create token signature"))?;
let identity_token = X509IdentityToken {
policy_id: policy.policy_id.clone(),
certificate_data: cert.as_byte_string(),
};
Ok((
ExtensionObject::from_message(identity_token),
user_token_signature,
))
}
IdentityToken::IssuedToken(source) => {
let token = source.0.get_issued_token().await?;
let nonce = remote_nonce.as_ref();
let cert = remote_cert;
let secret = legacy_encrypt_secret(
channel_security_policy,
security_mode,
policy,
nonce,
cert,
token.as_ref(),
)?;
let identity_token = IssuedIdentityToken {
policy_id: secret.policy,
encryption_algorithm: secret.encryption_algorithm,
token_data: secret.secret,
};
Ok((
ExtensionObject::from_message(identity_token),
SignatureData::null(),
))
}
}
}
async fn build_request(
self,
channel: &AsyncSecureChannel,
) -> Result<ActivateSessionRequest, StatusCode> {
let (remote_cert, remote_nonce, security_policy, message_security_mode) = {
let secure_channel = trace_read_lock!(channel.secure_channel);
(
secure_channel.remote_cert(),
secure_channel.remote_nonce_as_byte_string(),
secure_channel.security_policy(),
secure_channel.security_mode(),
)
};
let (user_identity_token, user_token_signature) = self
.user_identity_token(
&remote_nonce,
&remote_cert,
message_security_mode,
security_policy,
)
.await?;
let client_signature = match security_policy {
SecurityPolicy::None => SignatureData::null(),
_ => {
let Some(client_pkey) = self.private_key else {
error!("Cannot create client signature - no pkey!");
return Err(StatusCode::BadUnexpectedError);
};
let Some(server_cert) = remote_cert else {
error!("Cannot sign server certificate because server cert is null");
return Err(StatusCode::BadUnexpectedError);
};
let server_nonce = remote_nonce;
if server_nonce.is_null_or_empty() {
error!("Cannot sign server certificate because server nonce is empty");
return Err(StatusCode::BadUnexpectedError);
}
let server_cert = server_cert.as_byte_string();
opcua_crypto::create_signature_data(
&client_pkey,
security_policy,
&server_cert,
&server_nonce,
)?
}
};
Ok(ActivateSessionRequest {
request_header: self.header.header,
client_signature,
client_software_certificates: if self.client_software_certificates.is_empty() {
None
} else {
Some(self.client_software_certificates)
},
locale_ids: if self.locale_ids.is_empty() {
None
} else {
Some(self.locale_ids)
},
user_identity_token,
user_token_signature,
})
}
}
impl UARequest for ActivateSession {
type Out = ActivateSessionResponse;
async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'a,
{
let timeout = self.header.timeout;
let request = self.build_request(channel).await?;
let response = channel.send(request, timeout).await?;
if let ResponseMessage::ActivateSession(response) = response {
tracing::debug!("activate_session success");
process_service_result(&response.response_header)?;
Ok(*response)
} else {
tracing::error!("activate_session failed");
Err(process_unexpected_response(response))
}
}
}
#[derive(Debug, Clone)]
pub struct CloseSession {
delete_subscriptions: bool,
header: RequestHeaderBuilder,
}
builder_base!(CloseSession);
impl CloseSession {
pub(crate) fn new(session: &Session) -> Self {
Self {
delete_subscriptions: true,
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
delete_subscriptions: true,
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn delete_subscriptions(mut self, delete_subscriptions: bool) -> Self {
self.delete_subscriptions = delete_subscriptions;
self
}
}
impl UARequest for CloseSession {
type Out = CloseSessionResponse;
async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'a,
{
let request = CloseSessionRequest {
delete_subscriptions: self.delete_subscriptions,
request_header: self.header.header,
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::CloseSession(response) = response {
process_service_result(&response.response_header)?;
Ok(*response)
} else {
error!("close_session failed {:?}", response);
Err(process_unexpected_response(response))
}
}
}
#[derive(Debug, Clone)]
pub struct Cancel {
request_handle: IntegerId,
header: RequestHeaderBuilder,
}
builder_base!(Cancel);
impl Cancel {
pub fn new(request_to_cancel: IntegerId, session: &Session) -> Self {
Self {
request_handle: request_to_cancel,
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
request_to_cancel: IntegerId,
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
request_handle: request_to_cancel,
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
}
impl UARequest for Cancel {
type Out = CancelResponse;
async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'a,
{
let request = CancelRequest {
request_header: self.header.header,
request_handle: self.request_handle,
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::Cancel(response) = response {
process_service_result(&response.response_header)?;
Ok(*response)
} else {
Err(process_unexpected_response(response))
}
}
}
impl Session {
pub(crate) async fn create_session(&self) -> Result<NodeId, StatusCode> {
let response = CreateSession::new(self).send(&self.channel).await?;
let session_id = {
self.session_id.store(Arc::new(response.session_id.clone()));
response.session_id.clone()
};
Ok(session_id)
}
pub(crate) async fn activate_session(&self) -> Result<(), StatusCode> {
ActivateSession::new(self).send(&self.channel).await?;
Ok(())
}
pub(crate) async fn close_session(&self, delete_subscriptions: bool) -> Result<(), StatusCode> {
CloseSession::new(self)
.delete_subscriptions(delete_subscriptions)
.send(&self.channel)
.await?;
Ok(())
}
pub async fn cancel(&self, request_handle: IntegerId) -> Result<u32, StatusCode> {
Ok(Cancel::new(request_handle, self)
.send(&self.channel)
.await?
.cancel_count)
}
}