1use std::{sync::Arc, time::Duration};
2
3use opcua_core::{
4 comms::url::hostname_from_url, sync::RwLock, trace_read_lock, trace_write_lock, ResponseMessage,
5};
6use opcua_crypto::{
7 self, legacy_encrypt_secret, random, CertificateStore, PKey, SecurityPolicy, X509,
8};
9use opcua_types::{
10 ActivateSessionRequest, ActivateSessionResponse, AnonymousIdentityToken,
11 ApplicationDescription, ByteString, CancelRequest, CancelResponse, CloseSessionRequest,
12 CloseSessionResponse, CreateSessionRequest, CreateSessionResponse, EndpointDescription, Error,
13 ExtensionObject, IntegerId, IssuedIdentityToken, MessageSecurityMode, NodeId, SignatureData,
14 SignedSoftwareCertificate, StatusCode, UAString, UserNameIdentityToken, UserTokenType,
15 X509IdentityToken,
16};
17use rsa::RsaPrivateKey;
18use tracing::error;
19
20use crate::{
21 session::{
22 process_service_result, process_unexpected_response,
23 request_builder::{builder_base, builder_error, RequestHeaderBuilder},
24 },
25 AsyncSecureChannel, IdentityToken, Session, UARequest,
26};
27
28#[derive(Clone)]
29pub struct CreateSession<'a> {
41 client_description: ApplicationDescription,
42 server_uri: UAString,
43 endpoint_url: UAString,
44 session_name: UAString,
45 client_certificate: Option<X509>,
46 session_timeout: f64,
47 max_response_message_size: u32,
48 certificate_store: &'a RwLock<CertificateStore>,
49 endpoint: &'a EndpointDescription,
50 nonce_length: usize,
51
52 header: RequestHeaderBuilder,
53}
54
55builder_base!(CreateSession<'a>);
56
57impl<'a> CreateSession<'a> {
58 pub(crate) fn new(session: &'a Session) -> Self {
62 Self {
63 endpoint_url: session.endpoint_info().endpoint.endpoint_url.clone(),
64 server_uri: UAString::null(),
65 client_description: session.application_description.clone(),
66 session_name: session.session_name.clone(),
67 client_certificate: session.channel.read_own_certificate(),
68 endpoint: &session.endpoint_info().endpoint,
69 certificate_store: session.channel.certificate_store(),
70 session_timeout: session.session_timeout,
71 max_response_message_size: 0,
72 nonce_length: session.session_nonce_length,
73 header: RequestHeaderBuilder::new_from_session(session),
74 }
75 }
76
77 pub fn new_manual(
79 certificate_store: &'a RwLock<CertificateStore>,
80 endpoint: &'a EndpointDescription,
81 session_id: u32,
82 timeout: Duration,
83 auth_token: NodeId,
84 request_handle: IntegerId,
85 ) -> Self {
86 Self {
87 endpoint_url: UAString::null(),
88 server_uri: UAString::null(),
89 client_description: ApplicationDescription::default(),
90 session_name: UAString::null(),
91 client_certificate: None,
92 session_timeout: 0.0,
93 max_response_message_size: 0,
94 certificate_store,
95 endpoint,
96 nonce_length: 32,
97 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
98 }
99 }
100
101 pub fn client_description(mut self, desc: impl Into<ApplicationDescription>) -> Self {
103 self.client_description = desc.into();
104 self
105 }
106
107 pub fn server_uri(mut self, server_uri: impl Into<UAString>) -> Self {
109 self.server_uri = server_uri.into();
110 self
111 }
112
113 pub fn endpoint_url(mut self, endpoint_url: impl Into<UAString>) -> Self {
115 self.endpoint_url = endpoint_url.into();
116 self
117 }
118
119 pub fn session_name(mut self, session_name: impl Into<UAString>) -> Self {
121 self.session_name = session_name.into();
122 self
123 }
124
125 pub fn client_certificate(mut self, client_certificate: X509) -> Self {
127 self.client_certificate = Some(client_certificate);
128 self
129 }
130
131 pub fn client_cert_from_store(mut self, certificate_store: &RwLock<CertificateStore>) -> Self {
133 let cert_store = trace_read_lock!(certificate_store);
134 self.client_certificate = cert_store.read_own_cert().ok();
135 self
136 }
137
138 pub fn session_timeout(mut self, session_timeout: f64) -> Self {
140 self.session_timeout = session_timeout;
141 self
142 }
143
144 pub fn max_response_message_size(mut self, max_response_message_size: u32) -> Self {
146 self.max_response_message_size = max_response_message_size;
147 self
148 }
149
150 pub fn nonce_length(mut self, nonce_length: usize) -> Self {
152 self.nonce_length = nonce_length;
153 self
154 }
155}
156
157impl UARequest for CreateSession<'_> {
158 type Out = CreateSessionResponse;
159
160 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
161 where
162 Self: 'a,
163 {
164 let client_nonce = random::byte_string(self.nonce_length);
165
166 let request = CreateSessionRequest {
167 request_header: self.header.header,
168 client_description: self.client_description,
169 server_uri: self.server_uri,
170 endpoint_url: self.endpoint_url,
171 session_name: self.session_name,
172 client_nonce: client_nonce.clone(),
173 client_certificate: self
174 .client_certificate
175 .as_ref()
176 .map(|v| v.as_byte_string())
177 .unwrap_or_default(),
178 requested_session_timeout: self.session_timeout,
179 max_response_message_size: self.max_response_message_size,
180 };
181 let response = channel.send(request, self.header.timeout).await?;
182
183 if let ResponseMessage::CreateSession(response) = response {
184 tracing::debug!("create_session, success");
185 process_service_result(&response.response_header)?;
186
187 let security_policy = channel.security_policy();
188
189 if security_policy != SecurityPolicy::None {
190 if let Ok(server_certificate) =
191 opcua_crypto::X509::from_byte_string(&response.server_certificate)
192 {
193 let hostname = hostname_from_url(self.endpoint.endpoint_url.as_ref())
195 .map_err(|_| StatusCode::BadUnexpectedError)?;
196 let application_uri = self.endpoint.server.application_uri.as_ref();
197
198 let certificate_store = trace_write_lock!(self.certificate_store);
199 certificate_store.validate_or_reject_application_instance_cert(
200 &server_certificate,
201 security_policy,
202 Some(&hostname),
203 Some(application_uri),
204 )?;
205
206 opcua_crypto::verify_signature_data(
207 &response.server_signature,
208 security_policy,
209 &server_certificate,
210 self.client_certificate
211 .as_ref()
212 .ok_or(StatusCode::BadCertificateInvalid)?,
213 client_nonce.as_ref(),
214 )?;
215 } else {
216 return Err(StatusCode::BadCertificateInvalid);
217 }
218 }
219
220 channel.update_from_created_session(
221 &response.server_nonce,
222 &response.server_certificate,
223 &response.authentication_token,
224 )?;
225
226 Ok(*response)
227 } else {
228 tracing::error!("create_session failed");
229 Err(process_unexpected_response(response))
230 }
231 }
232}
233
234#[derive(Debug, Clone)]
235pub struct ActivateSession {
243 identity_token: IdentityToken,
244 private_key: Option<PKey<RsaPrivateKey>>,
245 locale_ids: Vec<UAString>,
246 client_software_certificates: Vec<SignedSoftwareCertificate>,
247 endpoint: EndpointDescription,
248
249 header: RequestHeaderBuilder,
250}
251
252builder_base!(ActivateSession);
253
254impl ActivateSession {
255 pub(crate) fn new(session: &Session) -> Self {
259 Self {
260 identity_token: session.endpoint_info().user_identity_token.clone(),
261 private_key: session.channel.read_own_private_key(),
262 locale_ids: session
263 .endpoint_info()
264 .preferred_locales
265 .iter()
266 .map(UAString::from)
267 .collect(),
268 client_software_certificates: Vec::new(),
269 endpoint: session.endpoint_info().endpoint.clone(),
270 header: RequestHeaderBuilder::new_from_session(session),
271 }
272 }
273
274 pub fn new_manual(
276 endpoint: EndpointDescription,
277 session_id: u32,
278 timeout: Duration,
279 auth_token: NodeId,
280 request_handle: IntegerId,
281 ) -> Self {
282 Self {
283 identity_token: IdentityToken::Anonymous,
284 private_key: None,
285 locale_ids: Vec::new(),
286 client_software_certificates: Vec::new(),
287 endpoint,
288 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
289 }
290 }
291
292 pub fn identity_token(mut self, identity_token: IdentityToken) -> Self {
294 self.identity_token = identity_token;
295 self
296 }
297
298 pub fn private_key(mut self, private_key: PKey<RsaPrivateKey>) -> Self {
300 self.private_key = Some(private_key);
301 self
302 }
303
304 pub fn locale_ids(mut self, locale_ids: Vec<UAString>) -> Self {
306 self.locale_ids = locale_ids;
307 self
308 }
309
310 pub fn locale_id(mut self, locale_id: impl Into<UAString>) -> Self {
312 self.locale_ids.push(locale_id.into());
313 self
314 }
315
316 pub fn client_software_certificates(
318 mut self,
319 certificates: Vec<SignedSoftwareCertificate>,
320 ) -> Self {
321 self.client_software_certificates = certificates;
322 self
323 }
324
325 pub fn client_software_certificate(mut self, certificate: SignedSoftwareCertificate) -> Self {
327 self.client_software_certificates.push(certificate);
328 self
329 }
330
331 async fn user_identity_token(
332 &self,
333 remote_nonce: &ByteString,
334 remote_cert: &Option<X509>,
335 security_mode: MessageSecurityMode,
336 channel_security_policy: SecurityPolicy,
337 ) -> Result<(ExtensionObject, SignatureData), Error> {
338 let user_token_type = match &self.identity_token {
339 IdentityToken::Anonymous => UserTokenType::Anonymous,
340 IdentityToken::UserName(_, _) => UserTokenType::UserName,
341 IdentityToken::X509(_, _) => UserTokenType::Certificate,
342 IdentityToken::IssuedToken(_) => UserTokenType::IssuedToken,
343 };
344 let Some(policy) = self.endpoint.find_policy(user_token_type) else {
345 builder_error!(
346 self,
347 "Cannot find user token type {:?} for this endpoint, cannot connect",
348 user_token_type
349 );
350 return Err(Error::new(
351 StatusCode::BadSecurityPolicyRejected,
352 format!(
353 "Cannot find user token type {user_token_type:?} for this endpoint, cannot connect"
354 ),
355 ));
356 };
357 let security_policy = if policy.security_policy_uri.is_empty() {
358 SecurityPolicy::None
360 } else {
361 SecurityPolicy::from_uri(policy.security_policy_uri.as_ref())
362 };
363
364 if security_policy == SecurityPolicy::Unknown {
365 error!("Unknown security policy {}", policy.security_policy_uri);
366 return Err(Error::new(
367 StatusCode::BadSecurityPolicyRejected,
368 format!("Unknown security policy {}", policy.security_policy_uri),
369 ));
370 }
371
372 match &self.identity_token {
373 IdentityToken::Anonymous => {
374 let identity_token = AnonymousIdentityToken {
375 policy_id: policy.policy_id.clone(),
376 };
377 let identity_token = ExtensionObject::from_message(identity_token);
378 Ok((identity_token, SignatureData::null()))
379 }
380 IdentityToken::UserName(user, pass) => {
381 let nonce = remote_nonce.as_ref();
382 let cert = remote_cert;
383 let secret = legacy_encrypt_secret(
384 channel_security_policy,
385 security_mode,
386 policy,
387 nonce,
388 cert,
389 pass.0.as_bytes(),
390 )?;
391 let identity_token = UserNameIdentityToken {
392 policy_id: secret.policy,
393 user_name: UAString::from(user.as_str()),
394 password: secret.secret,
395 encryption_algorithm: secret.encryption_algorithm,
396 };
397 Ok((
398 ExtensionObject::from_message(identity_token),
399 SignatureData::null(),
400 ))
401 }
402 IdentityToken::X509(cert, private_key) => {
403 let nonce = remote_nonce.as_ref();
404 let server_cert = remote_cert;
405 let Some(server_cert) = &server_cert else {
406 error!("Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature");
407 return Err(Error::new(StatusCode::BadCertificateInvalid, "Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature"));
408 };
409
410 let user_token_signature = opcua_crypto::create_signature_data(
411 private_key,
412 security_policy,
413 &server_cert.as_byte_string(),
414 &ByteString::from(&nonce),
415 )
416 .map_err(|s| Error::new(s, "Failed to create token signature"))?;
417
418 let identity_token = X509IdentityToken {
420 policy_id: policy.policy_id.clone(),
421 certificate_data: cert.as_byte_string(),
422 };
423
424 Ok((
425 ExtensionObject::from_message(identity_token),
426 user_token_signature,
427 ))
428 }
429 IdentityToken::IssuedToken(source) => {
430 let token = source.0.get_issued_token().await?;
431 let nonce = remote_nonce.as_ref();
432 let cert = remote_cert;
433 let secret = legacy_encrypt_secret(
434 channel_security_policy,
435 security_mode,
436 policy,
437 nonce,
438 cert,
439 token.as_ref(),
440 )?;
441 let identity_token = IssuedIdentityToken {
442 policy_id: secret.policy,
443 encryption_algorithm: secret.encryption_algorithm,
444 token_data: secret.secret,
445 };
446 Ok((
447 ExtensionObject::from_message(identity_token),
448 SignatureData::null(),
449 ))
450 }
451 }
452 }
453
454 async fn build_request(
455 self,
456 channel: &AsyncSecureChannel,
457 ) -> Result<ActivateSessionRequest, StatusCode> {
458 let (remote_cert, remote_nonce, security_policy, message_security_mode) = {
459 let secure_channel = trace_read_lock!(channel.secure_channel);
460 (
461 secure_channel.remote_cert(),
462 secure_channel.remote_nonce_as_byte_string(),
463 secure_channel.security_policy(),
464 secure_channel.security_mode(),
465 )
466 };
467 let (user_identity_token, user_token_signature) = self
468 .user_identity_token(
469 &remote_nonce,
470 &remote_cert,
471 message_security_mode,
472 security_policy,
473 )
474 .await?;
475 let client_signature = match security_policy {
476 SecurityPolicy::None => SignatureData::null(),
477 _ => {
478 let Some(client_pkey) = self.private_key else {
479 error!("Cannot create client signature - no pkey!");
480 return Err(StatusCode::BadUnexpectedError);
481 };
482
483 let Some(server_cert) = remote_cert else {
484 error!("Cannot sign server certificate because server cert is null");
485 return Err(StatusCode::BadUnexpectedError);
486 };
487
488 let server_nonce = remote_nonce;
489 if server_nonce.is_null_or_empty() {
490 error!("Cannot sign server certificate because server nonce is empty");
491 return Err(StatusCode::BadUnexpectedError);
492 }
493
494 let server_cert = server_cert.as_byte_string();
495 opcua_crypto::create_signature_data(
496 &client_pkey,
497 security_policy,
498 &server_cert,
499 &server_nonce,
500 )?
501 }
502 };
503
504 Ok(ActivateSessionRequest {
505 request_header: self.header.header,
506 client_signature,
507 client_software_certificates: if self.client_software_certificates.is_empty() {
508 None
509 } else {
510 Some(self.client_software_certificates)
511 },
512 locale_ids: if self.locale_ids.is_empty() {
513 None
514 } else {
515 Some(self.locale_ids)
516 },
517 user_identity_token,
518 user_token_signature,
519 })
520 }
521}
522
523impl UARequest for ActivateSession {
524 type Out = ActivateSessionResponse;
525
526 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
527 where
528 Self: 'a,
529 {
530 let timeout = self.header.timeout;
531 let request = self.build_request(channel).await?;
532
533 let response = channel.send(request, timeout).await?;
534
535 if let ResponseMessage::ActivateSession(response) = response {
536 tracing::debug!("activate_session success");
537 process_service_result(&response.response_header)?;
539 Ok(*response)
540 } else {
541 tracing::error!("activate_session failed");
542 Err(process_unexpected_response(response))
543 }
544 }
545}
546
547#[derive(Debug, Clone)]
548pub struct CloseSession {
553 delete_subscriptions: bool,
554 header: RequestHeaderBuilder,
555}
556
557builder_base!(CloseSession);
558
559impl CloseSession {
560 pub(crate) fn new(session: &Session) -> Self {
564 Self {
565 delete_subscriptions: true,
566 header: RequestHeaderBuilder::new_from_session(session),
567 }
568 }
569
570 pub fn new_manual(
572 session_id: u32,
573 timeout: Duration,
574 auth_token: NodeId,
575 request_handle: IntegerId,
576 ) -> Self {
577 Self {
578 delete_subscriptions: true,
579 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
580 }
581 }
582
583 pub fn delete_subscriptions(mut self, delete_subscriptions: bool) -> Self {
586 self.delete_subscriptions = delete_subscriptions;
587 self
588 }
589}
590
591impl UARequest for CloseSession {
592 type Out = CloseSessionResponse;
593
594 async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
595 where
596 Self: 'a,
597 {
598 let request = CloseSessionRequest {
599 delete_subscriptions: self.delete_subscriptions,
600 request_header: self.header.header,
601 };
602 let response = channel.send(request, self.header.timeout).await?;
603 if let ResponseMessage::CloseSession(response) = response {
604 process_service_result(&response.response_header)?;
605 Ok(*response)
606 } else {
607 error!("close_session failed {:?}", response);
608 Err(process_unexpected_response(response))
609 }
610 }
611}
612
613#[derive(Debug, Clone)]
614pub struct Cancel {
618 request_handle: IntegerId,
619 header: RequestHeaderBuilder,
620}
621
622builder_base!(Cancel);
623
624impl Cancel {
625 pub fn new(request_to_cancel: IntegerId, session: &Session) -> Self {
627 Self {
628 request_handle: request_to_cancel,
629 header: RequestHeaderBuilder::new_from_session(session),
630 }
631 }
632
633 pub fn new_manual(
635 request_to_cancel: IntegerId,
636 session_id: u32,
637 timeout: Duration,
638 auth_token: NodeId,
639 request_handle: IntegerId,
640 ) -> Self {
641 Self {
642 request_handle: request_to_cancel,
643 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
644 }
645 }
646}
647
648impl UARequest for Cancel {
649 type Out = CancelResponse;
650
651 async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
652 where
653 Self: 'a,
654 {
655 let request = CancelRequest {
656 request_header: self.header.header,
657 request_handle: self.request_handle,
658 };
659
660 let response = channel.send(request, self.header.timeout).await?;
661 if let ResponseMessage::Cancel(response) = response {
662 process_service_result(&response.response_header)?;
663 Ok(*response)
664 } else {
665 Err(process_unexpected_response(response))
666 }
667 }
668}
669
670impl Session {
671 pub(crate) async fn create_session(&self) -> Result<NodeId, StatusCode> {
683 let response = CreateSession::new(self).send(&self.channel).await?;
684
685 let session_id = {
686 self.session_id.store(Arc::new(response.session_id.clone()));
687 response.session_id.clone()
688 };
689
690 Ok(session_id)
691 }
692
693 pub(crate) async fn activate_session(&self) -> Result<(), StatusCode> {
703 ActivateSession::new(self).send(&self.channel).await?;
704 Ok(())
705 }
706
707 pub(crate) async fn close_session(&self, delete_subscriptions: bool) -> Result<(), StatusCode> {
711 CloseSession::new(self)
712 .delete_subscriptions(delete_subscriptions)
713 .send(&self.channel)
714 .await?;
715 Ok(())
716 }
717
718 pub async fn cancel(&self, request_handle: IntegerId) -> Result<u32, StatusCode> {
732 Ok(Cancel::new(request_handle, self)
733 .send(&self.channel)
734 .await?
735 .cancel_count)
736 }
737}