1use std::{str::FromStr, sync::Arc};
2
3use chrono::Duration;
4use tokio::{pin, select};
5use tracing::error;
6
7use crate::{
8 transport::{
9 tcp::TransportConfiguration, Connector, ConnectorBuilder, TcpConnector, TransportPollResult,
10 },
11 AsyncSecureChannel, ClientConfig, ClientEndpoint, IdentityToken,
12};
13use opcua_core::{
14 comms::url::{
15 hostname_from_url, server_url_from_endpoint_url, url_matches_except_host,
16 url_with_replaced_hostname,
17 },
18 config::Config,
19 sync::RwLock,
20 ResponseMessage,
21};
22use opcua_crypto::{CertificateStore, SecurityPolicy};
23use opcua_types::{
24 ApplicationDescription, ContextOwned, DecodingOptions, EndpointDescription, Error,
25 FindServersOnNetworkRequest, FindServersOnNetworkResponse, FindServersRequest,
26 GetEndpointsRequest, MessageSecurityMode, NamespaceMap, RegisterServerRequest,
27 RegisteredServer, StatusCode, UAString,
28};
29
30use super::{
31 connection::SessionBuilder, process_service_result, process_unexpected_response, EndpointInfo,
32 Session, SessionEventLoop,
33};
34
/// Client used to open sessions to servers and to call discovery services.
///
/// Holds the client configuration together with a shared certificate store.
pub struct Client {
    /// Configuration this client was created from.
    pub(super) config: ClientConfig,
    /// Certificate store shared with the sessions and secure channels this client creates.
    certificate_store: Arc<RwLock<CertificateStore>>,
}
43
44impl Client {
45 pub fn new(config: ClientConfig) -> Self {
53 let application_description = if config.create_sample_keypair {
54 Some(config.application_description())
55 } else {
56 None
57 };
58
59 let (mut certificate_store, client_certificate, client_pkey) =
60 CertificateStore::new_with_x509_data(
61 &config.pki_dir,
62 false,
63 config.certificate_path.as_deref(),
64 config.private_key_path.as_deref(),
65 application_description,
66 );
67 if client_certificate.is_none() || client_pkey.is_none() {
68 error!("Client is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
69 }
70
71 certificate_store.set_skip_verify_certs(!config.verify_server_certs);
73
74 certificate_store.set_trust_unknown_certs(config.trust_server_certs);
76
77 Self {
81 config,
82 certificate_store: Arc::new(RwLock::new(certificate_store)),
83 }
84 }
85
86 pub fn session_builder(&self) -> SessionBuilder<'_> {
88 SessionBuilder::<'_>::new(&self.config)
89 }
90
91 pub async fn connect_to_endpoint_id(
101 &mut self,
102 endpoint_id: impl Into<String>,
103 ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
104 self.session_builder()
105 .with_endpoints(self.get_server_endpoints().await?)
106 .connect_to_endpoint_id(endpoint_id)?
107 .build(self.certificate_store.clone())
108 }
109
110 pub async fn connect_to_matching_endpoint(
130 &mut self,
131 endpoint: impl Into<EndpointDescription>,
132 user_identity_token: IdentityToken,
133 ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
134 let endpoint = endpoint.into();
135
136 let server_url = endpoint.endpoint_url.as_ref();
138
139 self.session_builder()
140 .with_endpoints(self.get_server_endpoints_from_url(server_url).await?)
141 .connect_to_matching_endpoint(endpoint)?
142 .user_identity_token(user_identity_token)
143 .build(self.certificate_store.clone())
144 }
145
146 pub fn connect_to_endpoint_directly(
165 &mut self,
166 endpoint: impl Into<EndpointDescription>,
167 identity_token: IdentityToken,
168 ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
169 self.session_builder()
170 .connect_to_endpoint_directly(endpoint)?
171 .user_identity_token(identity_token)
172 .build(self.certificate_store.clone())
173 }
174
175 pub async fn connect_to_default_endpoint(
194 &mut self,
195 ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
196 self.session_builder()
197 .with_endpoints(self.get_server_endpoints().await?)
198 .connect_to_default_endpoint()?
199 .build(self.certificate_store.clone())
200 }
201
202 fn channel_from_endpoint_info(
207 &self,
208 endpoint_info: EndpointInfo,
209 channel_lifetime: u32,
210 ) -> AsyncSecureChannel {
211 AsyncSecureChannel::new(
212 self.certificate_store.clone(),
213 endpoint_info,
214 self.config.session_retry_policy(),
215 self.config.performance.ignore_clock_skew,
216 Arc::default(),
217 TransportConfiguration {
218 send_buffer_size: self.config.decoding_options.max_chunk_size,
219 recv_buffer_size: self.config.decoding_options.max_incoming_chunk_size,
220 max_message_size: self.config.decoding_options.max_message_size,
221 max_chunk_count: self.config.decoding_options.max_chunk_count,
222 },
223 channel_lifetime,
224 Arc::new(RwLock::new(ContextOwned::new_default(
226 NamespaceMap::new(),
227 self.decoding_options(),
228 ))),
229 )
230 }
231
232 pub fn default_endpoint(&self) -> Result<ClientEndpoint, String> {
240 let default_endpoint_id = self.config.default_endpoint.clone();
241 if default_endpoint_id.is_empty() {
242 Err("No default endpoint has been specified".to_string())
243 } else if let Some(endpoint) = self.config.endpoints.get(&default_endpoint_id) {
244 Ok(endpoint.clone())
245 } else {
246 Err(format!(
247 "Cannot find default endpoint with id {default_endpoint_id}"
248 ))
249 }
250 }
251
252 pub async fn get_server_endpoints(&self) -> Result<Vec<EndpointDescription>, Error> {
259 let default_endpoint = self
260 .default_endpoint()
261 .map_err(|e| Error::new(StatusCode::BadConfigurationError, e))?;
262 if let Ok(server_url) = server_url_from_endpoint_url(&default_endpoint.url) {
263 self.get_server_endpoints_from_url(server_url).await
264 } else {
265 error!(
266 "Cannot create a server url from the specified endpoint url {}",
267 default_endpoint.url
268 );
269 Err(Error::new(
270 StatusCode::BadUnexpectedError,
271 format!(
272 "Cannot create a server url from the specified endpoint url {}",
273 default_endpoint.url
274 ),
275 ))
276 }
277 }
278
279 fn decoding_options(&self) -> DecodingOptions {
280 let decoding_options = &self.config.decoding_options;
281 DecodingOptions {
282 max_chunk_count: decoding_options.max_chunk_count,
283 max_message_size: decoding_options.max_message_size,
284 max_string_length: decoding_options.max_string_length,
285 max_byte_string_length: decoding_options.max_byte_string_length,
286 max_array_length: decoding_options.max_array_length,
287 client_offset: Duration::zero(),
288 ..Default::default()
289 }
290 }
291
292 async fn get_server_endpoints_inner(
293 &self,
294 endpoint: &EndpointDescription,
295 channel: &AsyncSecureChannel,
296 locale_ids: Option<Vec<UAString>>,
297 profile_uris: Option<Vec<UAString>>,
298 ) -> Result<Vec<EndpointDescription>, StatusCode> {
299 let request = GetEndpointsRequest {
300 request_header: channel.make_request_header(self.config.request_timeout),
301 endpoint_url: endpoint.endpoint_url.clone(),
302 locale_ids,
303 profile_uris,
304 };
305 let response = channel.send(request, self.config.request_timeout).await?;
307 if let ResponseMessage::GetEndpoints(response) = response {
308 process_service_result(&response.response_header)?;
309 match response.endpoints {
310 None => Ok(Vec::new()),
311 Some(endpoints) => Ok(endpoints),
312 }
313 } else {
314 Err(process_unexpected_response(response))
315 }
316 }
317
318 pub async fn get_server_endpoints_from_url(
330 &self,
331 server: impl ConnectorBuilder,
332 ) -> Result<Vec<EndpointDescription>, Error> {
333 self.get_endpoints(server, &[], &[]).await
334 }
335
336 pub async fn get_endpoints(
350 &self,
351 server: impl ConnectorBuilder,
352 locale_ids: &[&str],
353 profile_uris: &[&str],
354 ) -> Result<Vec<EndpointDescription>, Error> {
355 let server = server.build()?;
356 let preferred_locales = Vec::new();
357 let endpoint = server.default_endpoint();
359 let endpoint_info = EndpointInfo {
360 endpoint: endpoint.clone(),
361 user_identity_token: IdentityToken::Anonymous,
362 preferred_locales,
363 };
364 let channel = self.channel_from_endpoint_info(endpoint_info, self.config.channel_lifetime);
365
366 let mut evt_loop = channel
367 .connect(&server)
368 .await
369 .map_err(|e| Error::new(e, "Failed to connect to server"))?;
370
371 let send_fut = self.get_server_endpoints_inner(
372 &endpoint,
373 &channel,
374 if locale_ids.is_empty() {
375 None
376 } else {
377 Some(locale_ids.iter().map(|i| (*i).into()).collect())
378 },
379 if profile_uris.is_empty() {
380 None
381 } else {
382 Some(profile_uris.iter().map(|i| (*i).into()).collect())
383 },
384 );
385 pin!(send_fut);
386
387 let res = loop {
388 select! {
389 r = evt_loop.poll() => {
390 if let TransportPollResult::Closed(e) = r {
391 return Err(Error::new(e, "Transport closed unexpectedly"));
392 }
393 },
394 res = &mut send_fut => break res.map_err(|e| Error::new(e, "Failed to get endpoints")),
395 }
396 };
397
398 channel.close_channel().await;
399
400 loop {
401 if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
402 break;
403 }
404 }
405
406 res
407 }
408
409 async fn find_servers_inner(
410 &self,
411 endpoint_url: String,
412 channel: &AsyncSecureChannel,
413 locale_ids: Option<Vec<UAString>>,
414 server_uris: Option<Vec<UAString>>,
415 ) -> Result<Vec<ApplicationDescription>, StatusCode> {
416 let request = FindServersRequest {
417 request_header: channel.make_request_header(self.config.request_timeout),
418 endpoint_url: endpoint_url.into(),
419 locale_ids,
420 server_uris,
421 };
422
423 let response = channel.send(request, self.config.request_timeout).await?;
424 if let ResponseMessage::FindServers(response) = response {
425 process_service_result(&response.response_header)?;
426 Ok(response.servers.unwrap_or_default())
427 } else {
428 Err(process_unexpected_response(response))
429 }
430 }
431
432 pub async fn find_servers(
446 &self,
447 discovery_endpoint: impl ConnectorBuilder,
448 locale_ids: Option<Vec<UAString>>,
449 server_uris: Option<Vec<UAString>>,
450 ) -> Result<Vec<ApplicationDescription>, StatusCode> {
451 let discovery_endpoint = discovery_endpoint.build()?;
452 let endpoint = discovery_endpoint.default_endpoint();
453 let session_info = EndpointInfo {
454 endpoint: endpoint.clone(),
455 user_identity_token: IdentityToken::Anonymous,
456 preferred_locales: Vec::new(),
457 };
458 let channel = self.channel_from_endpoint_info(session_info, self.config.channel_lifetime);
459
460 let mut evt_loop = channel.connect(&discovery_endpoint).await?;
461
462 let send_fut = self.find_servers_inner(
463 evt_loop.connected_url().to_owned(),
464 &channel,
465 locale_ids,
466 server_uris,
467 );
468 pin!(send_fut);
469
470 let res = loop {
471 select! {
472 r = evt_loop.poll() => {
473 if let TransportPollResult::Closed(e) = r {
474 return Err(e);
475 }
476 },
477 res = &mut send_fut => break res
478 }
479 };
480
481 channel.close_channel().await;
482
483 loop {
484 if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
485 break;
486 }
487 }
488
489 res
490 }
491
492 async fn find_servers_on_network_inner(
493 &self,
494 starting_record_id: u32,
495 max_records_to_return: u32,
496 server_capability_filter: Option<Vec<UAString>>,
497 channel: &AsyncSecureChannel,
498 ) -> Result<FindServersOnNetworkResponse, StatusCode> {
499 let request = FindServersOnNetworkRequest {
500 request_header: channel.make_request_header(self.config.request_timeout),
501 starting_record_id,
502 max_records_to_return,
503 server_capability_filter,
504 };
505
506 let response = channel.send(request, self.config.request_timeout).await?;
507 if let ResponseMessage::FindServersOnNetwork(response) = response {
508 process_service_result(&response.response_header)?;
509 Ok(*response)
510 } else {
511 Err(process_unexpected_response(response))
512 }
513 }
514
515 pub async fn find_servers_on_network(
534 &self,
535 discovery_endpoint: impl ConnectorBuilder,
536 starting_record_id: u32,
537 max_records_to_return: u32,
538 server_capability_filter: Option<Vec<UAString>>,
539 ) -> Result<FindServersOnNetworkResponse, StatusCode> {
540 let discovery_endpoint = discovery_endpoint.build()?;
541 let endpoint = discovery_endpoint.default_endpoint();
542 let session_info = EndpointInfo {
543 endpoint: endpoint.clone(),
544 user_identity_token: IdentityToken::Anonymous,
545 preferred_locales: Vec::new(),
546 };
547 let channel = self.channel_from_endpoint_info(session_info, self.config.channel_lifetime);
548
549 let mut evt_loop = channel.connect(&discovery_endpoint).await?;
550
551 let send_fut = self.find_servers_on_network_inner(
552 starting_record_id,
553 max_records_to_return,
554 server_capability_filter,
555 &channel,
556 );
557 pin!(send_fut);
558
559 let res = loop {
560 select! {
561 r = evt_loop.poll() => {
562 if let TransportPollResult::Closed(e) = r {
563 return Err(e);
564 }
565 },
566 res = &mut send_fut => break res
567 }
568 };
569
570 channel.close_channel().await;
571
572 loop {
573 if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
574 break;
575 }
576 }
577
578 res
579 }
580
581 pub fn find_matching_endpoint(
595 endpoints: &[EndpointDescription],
596 endpoint_url: &str,
597 security_policy: SecurityPolicy,
598 security_mode: MessageSecurityMode,
599 ) -> Option<EndpointDescription> {
600 if security_policy == SecurityPolicy::Unknown {
601 panic!("Cannot match against unknown security policy");
602 }
603
604 let mut matching_endpoint = endpoints
605 .iter()
606 .find(|e| {
607 security_mode == e.security_mode
609 && security_policy == SecurityPolicy::from_uri(e.security_policy_uri.as_ref())
610 && url_matches_except_host(endpoint_url, e.endpoint_url.as_ref())
611 })
612 .cloned()?;
613
614 let hostname = hostname_from_url(endpoint_url).ok()?;
615 let new_endpoint_url =
616 url_with_replaced_hostname(matching_endpoint.endpoint_url.as_ref(), &hostname).ok()?;
617
618 matching_endpoint.endpoint_url = new_endpoint_url.into();
621 Some(matching_endpoint)
622 }
623
624 pub fn is_supported_endpoint(&self, endpoint: &EndpointDescription) -> bool {
634 if let Ok(security_policy) = SecurityPolicy::from_str(endpoint.security_policy_uri.as_ref())
635 {
636 !matches!(security_policy, SecurityPolicy::Unknown)
637 } else {
638 false
639 }
640 }
641
642 async fn register_server_inner(
643 &self,
644 server: RegisteredServer,
645 channel: &AsyncSecureChannel,
646 ) -> Result<(), StatusCode> {
647 let request = RegisterServerRequest {
648 request_header: channel.make_request_header(self.config.request_timeout),
649 server,
650 };
651 let response = channel.send(request, self.config.request_timeout).await?;
652 if let ResponseMessage::RegisterServer(response) = response {
653 process_service_result(&response.response_header)?;
654 Ok(())
655 } else {
656 Err(process_unexpected_response(response))
657 }
658 }
659
660 pub async fn get_best_endpoint(
666 &self,
667 discovery_endpoint: impl ConnectorBuilder,
668 ) -> Result<EndpointDescription, Error> {
669 let discovery_endpoint = discovery_endpoint.build()?;
670 let endpoints = self
671 .get_server_endpoints_from_url(
672 discovery_endpoint.default_endpoint().endpoint_url.as_ref(),
673 )
674 .await?;
675 if endpoints.is_empty() {
676 return Err(Error::new(
677 StatusCode::BadUnexpectedError,
678 "No endpoints returned from server",
679 ));
680 }
681
682 let Some(endpoint) = endpoints
683 .into_iter()
684 .filter(|e| self.is_supported_endpoint(e))
685 .max_by(|a, b| a.security_level.cmp(&b.security_level))
686 else {
687 error!("Cannot find an endpoint that we can use");
688 return Err(Error::new(
689 StatusCode::BadUnexpectedError,
690 "No supported endpoints returned from server",
691 ));
692 };
693
694 Ok(endpoint)
695 }
696
697 pub async fn register_server(
718 &self,
719 connector: impl ConnectorBuilder,
720 server_endpoint: &EndpointDescription,
721 server: RegisteredServer,
722 ) -> Result<(), StatusCode> {
723 let endpoint_info = EndpointInfo {
724 endpoint: server_endpoint.clone(),
725 user_identity_token: IdentityToken::Anonymous,
726 preferred_locales: Vec::new(),
727 };
728 let connector = connector.build()?;
729 let channel = self.channel_from_endpoint_info(endpoint_info, self.config.channel_lifetime);
730
731 let mut evt_loop = channel.connect(&connector).await?;
732
733 let send_fut = self.register_server_inner(server, &channel);
734 pin!(send_fut);
735
736 let res = loop {
737 select! {
738 r = evt_loop.poll() => {
739 if let TransportPollResult::Closed(e) = r {
740 return Err(e);
741 }
742 },
743 res = &mut send_fut => break res
744 }
745 };
746
747 channel.close_channel().await;
748
749 loop {
750 if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
751 break;
752 }
753 }
754
755 res
756 }
757
758 pub fn certificate_store(&self) -> &Arc<RwLock<CertificateStore>> {
760 &self.certificate_store
761 }
762}