Skip to main content

opcua_client/session/
client.rs

1use std::{str::FromStr, sync::Arc};
2
3use chrono::Duration;
4use tokio::{pin, select};
5use tracing::error;
6
7use crate::{
8    transport::{
9        tcp::TransportConfiguration, Connector, ConnectorBuilder, TcpConnector, TransportPollResult,
10    },
11    AsyncSecureChannel, ClientConfig, ClientEndpoint, IdentityToken,
12};
13use opcua_core::{
14    comms::url::{
15        hostname_from_url, server_url_from_endpoint_url, url_matches_except_host,
16        url_with_replaced_hostname,
17    },
18    config::Config,
19    sync::RwLock,
20    ResponseMessage,
21};
22use opcua_crypto::{CertificateStore, SecurityPolicy};
23use opcua_types::{
24    ApplicationDescription, ContextOwned, DecodingOptions, EndpointDescription, Error,
25    FindServersOnNetworkRequest, FindServersOnNetworkResponse, FindServersRequest,
26    GetEndpointsRequest, MessageSecurityMode, NamespaceMap, RegisterServerRequest,
27    RegisteredServer, StatusCode, UAString,
28};
29
30use super::{
31    connection::SessionBuilder, process_service_result, process_unexpected_response, EndpointInfo,
32    Session, SessionEventLoop,
33};
34
35/// Wrapper around common data for generating sessions and performing requests
36/// with one-shot connections.
37pub struct Client {
38    /// Client configuration
39    pub(super) config: ClientConfig,
40    /// Certificate store is where certificates go.
41    certificate_store: Arc<RwLock<CertificateStore>>,
42}
43
44impl Client {
45    /// Create a new client from config.
46    ///
47    /// Note that this does not make any connection to the server.
48    ///
49    /// # Arguments
50    ///
51    /// * `config` - Client configuration object.
52    pub fn new(config: ClientConfig) -> Self {
53        let application_description = if config.create_sample_keypair {
54            Some(config.application_description())
55        } else {
56            None
57        };
58
59        let (mut certificate_store, client_certificate, client_pkey) =
60            CertificateStore::new_with_x509_data(
61                &config.pki_dir,
62                false,
63                config.certificate_path.as_deref(),
64                config.private_key_path.as_deref(),
65                application_description,
66            );
67        if client_certificate.is_none() || client_pkey.is_none() {
68            error!("Client is missing its application instance certificate and/or its private key. Encrypted endpoints will not function correctly.")
69        }
70
71        // Clients may choose to skip additional server certificate validations
72        certificate_store.set_skip_verify_certs(!config.verify_server_certs);
73
74        // Clients may choose to auto trust servers to save some messing around with rejected certs
75        certificate_store.set_trust_unknown_certs(config.trust_server_certs);
76
77        // The session retry policy dictates how many times to retry if connection to the server goes down
78        // and on what interval
79
80        Self {
81            config,
82            certificate_store: Arc::new(RwLock::new(certificate_store)),
83        }
84    }
85
86    /// Get a new session builder that can be used to build a session dynamically.
87    pub fn session_builder(&self) -> SessionBuilder<'_> {
88        SessionBuilder::<'_>::new(&self.config)
89    }
90
91    /// Connects to a named endpoint that you have defined in the `ClientConfig`
92    /// and creates a [`Session`] for that endpoint. Note that `GetEndpoints` is first
93    /// called on the server and it is expected to support the endpoint you intend to connect to.
94    ///
95    /// # Returns
96    ///
97    /// * `Ok((Arc<Session>, SessionEventLoop))` - Session and event loop.
98    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
99    ///
100    pub async fn connect_to_endpoint_id(
101        &mut self,
102        endpoint_id: impl Into<String>,
103    ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
104        self.session_builder()
105            .with_endpoints(self.get_server_endpoints().await?)
106            .connect_to_endpoint_id(endpoint_id)?
107            .build(self.certificate_store.clone())
108    }
109
110    /// Connects to an ad-hoc server endpoint description.
111    ///
112    /// This function returns both a reference to the session, and a `SessionEventLoop`. You must run and
113    /// poll the event loop in order to actually establish a connection.
114    ///
115    /// This method will not attempt to create a session on the server, that will only happen once you start polling
116    /// the session event loop.
117    ///
118    /// # Arguments
119    ///
120    /// * `endpoint` - Discovery endpoint, the client will first connect to this in order to get a list of the
121    ///   available endpoints on the server.
122    /// * `user_identity_token` - Identity token to use for authentication.
123    ///
124    /// # Returns
125    ///
126    /// * `Ok((Arc<Session>, SessionEventLoop))` - Session and event loop.
127    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
128    ///
129    pub async fn connect_to_matching_endpoint(
130        &mut self,
131        endpoint: impl Into<EndpointDescription>,
132        user_identity_token: IdentityToken,
133    ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
134        let endpoint = endpoint.into();
135
136        // Get the server endpoints
137        let server_url = endpoint.endpoint_url.as_ref();
138
139        self.session_builder()
140            .with_endpoints(self.get_server_endpoints_from_url(server_url).await?)
141            .connect_to_matching_endpoint(endpoint)?
142            .user_identity_token(user_identity_token)
143            .build(self.certificate_store.clone())
144    }
145
146    /// Connects to a server directly using provided [`EndpointDescription`].
147    ///
148    /// This function returns both a reference to the session, and a `SessionEventLoop`. You must run and
149    /// poll the event loop in order to actually establish a connection.
150    ///
151    /// This method will not attempt to create a session on the server, that will only happen once you start polling
152    /// the session event loop.
153    ///
154    /// # Arguments
155    ///
156    /// * `endpoint` - Endpoint to connect to.
157    /// * `identity_token` - Identity token for authentication.
158    ///
159    /// # Returns
160    ///
161    /// * `Ok((Arc<Session>, SessionEventLoop))` - Session and event loop.
162    /// * `Err(Error)` - Endpoint is invalid.
163    ///
164    pub fn connect_to_endpoint_directly(
165        &mut self,
166        endpoint: impl Into<EndpointDescription>,
167        identity_token: IdentityToken,
168    ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
169        self.session_builder()
170            .connect_to_endpoint_directly(endpoint)?
171            .user_identity_token(identity_token)
172            .build(self.certificate_store.clone())
173    }
174
175    /// Creates a new [`Session`] using the default endpoint specified in the config. If
176    /// there is no default, or the endpoint does not exist, this function will return an error
177    ///
178    /// This function returns both a reference to the session, and a `SessionEventLoop`. You must run and
179    /// poll the event loop in order to actually establish a connection.
180    ///
181    /// This method will not attempt to create a session on the server, that will only happen once you start polling
182    /// the session event loop.
183    ///
184    /// # Arguments
185    ///
186    /// * `endpoints` - A list of [`EndpointDescription`] containing the endpoints available on the server.
187    ///
188    /// # Returns
189    ///
190    /// * `Ok((Arc<Session>, SessionEventLoop))` - Session and event loop.
191    /// * `Err(String)` - Endpoint is invalid.
192    ///
193    pub async fn connect_to_default_endpoint(
194        &mut self,
195    ) -> Result<(Arc<Session>, SessionEventLoop<TcpConnector>), Error> {
196        self.session_builder()
197            .with_endpoints(self.get_server_endpoints().await?)
198            .connect_to_default_endpoint()?
199            .build(self.certificate_store.clone())
200    }
201
202    /// Create a secure channel using the provided [`SessionInfo`].
203    ///
204    /// This is used when creating temporary connections to the server, when creating a session,
205    /// [`Session`] manages its own channel.
206    fn channel_from_endpoint_info(
207        &self,
208        endpoint_info: EndpointInfo,
209        channel_lifetime: u32,
210    ) -> AsyncSecureChannel {
211        AsyncSecureChannel::new(
212            self.certificate_store.clone(),
213            endpoint_info,
214            self.config.session_retry_policy(),
215            self.config.performance.ignore_clock_skew,
216            Arc::default(),
217            TransportConfiguration {
218                send_buffer_size: self.config.decoding_options.max_chunk_size,
219                recv_buffer_size: self.config.decoding_options.max_incoming_chunk_size,
220                max_message_size: self.config.decoding_options.max_message_size,
221                max_chunk_count: self.config.decoding_options.max_chunk_count,
222            },
223            channel_lifetime,
224            // We should only ever need the default decoding context for temporary connections.
225            Arc::new(RwLock::new(ContextOwned::new_default(
226                NamespaceMap::new(),
227                self.decoding_options(),
228            ))),
229        )
230    }
231
232    /// Gets the [`ClientEndpoint`] information for the default endpoint, as defined
233    /// by the configuration. If there is no default endpoint, this function will return an error.
234    ///
235    /// # Returns
236    ///
237    /// * `Ok(ClientEndpoint)` - The default endpoint set in config.
238    /// * `Err(String)` - No default endpoint could be found.
239    pub fn default_endpoint(&self) -> Result<ClientEndpoint, String> {
240        let default_endpoint_id = self.config.default_endpoint.clone();
241        if default_endpoint_id.is_empty() {
242            Err("No default endpoint has been specified".to_string())
243        } else if let Some(endpoint) = self.config.endpoints.get(&default_endpoint_id) {
244            Ok(endpoint.clone())
245        } else {
246            Err(format!(
247                "Cannot find default endpoint with id {default_endpoint_id}"
248            ))
249        }
250    }
251
252    /// Get the list of endpoints for the server at the configured default endpoint.
253    ///
254    /// # Returns
255    ///
256    /// * `Ok(Vec<EndpointDescription>)` - A list of the available endpoints on the server.
257    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
258    pub async fn get_server_endpoints(&self) -> Result<Vec<EndpointDescription>, Error> {
259        let default_endpoint = self
260            .default_endpoint()
261            .map_err(|e| Error::new(StatusCode::BadConfigurationError, e))?;
262        if let Ok(server_url) = server_url_from_endpoint_url(&default_endpoint.url) {
263            self.get_server_endpoints_from_url(server_url).await
264        } else {
265            error!(
266                "Cannot create a server url from the specified endpoint url {}",
267                default_endpoint.url
268            );
269            Err(Error::new(
270                StatusCode::BadUnexpectedError,
271                format!(
272                    "Cannot create a server url from the specified endpoint url {}",
273                    default_endpoint.url
274                ),
275            ))
276        }
277    }
278
279    fn decoding_options(&self) -> DecodingOptions {
280        let decoding_options = &self.config.decoding_options;
281        DecodingOptions {
282            max_chunk_count: decoding_options.max_chunk_count,
283            max_message_size: decoding_options.max_message_size,
284            max_string_length: decoding_options.max_string_length,
285            max_byte_string_length: decoding_options.max_byte_string_length,
286            max_array_length: decoding_options.max_array_length,
287            client_offset: Duration::zero(),
288            ..Default::default()
289        }
290    }
291
292    async fn get_server_endpoints_inner(
293        &self,
294        endpoint: &EndpointDescription,
295        channel: &AsyncSecureChannel,
296        locale_ids: Option<Vec<UAString>>,
297        profile_uris: Option<Vec<UAString>>,
298    ) -> Result<Vec<EndpointDescription>, StatusCode> {
299        let request = GetEndpointsRequest {
300            request_header: channel.make_request_header(self.config.request_timeout),
301            endpoint_url: endpoint.endpoint_url.clone(),
302            locale_ids,
303            profile_uris,
304        };
305        // Send the message and wait for a response.
306        let response = channel.send(request, self.config.request_timeout).await?;
307        if let ResponseMessage::GetEndpoints(response) = response {
308            process_service_result(&response.response_header)?;
309            match response.endpoints {
310                None => Ok(Vec::new()),
311                Some(endpoints) => Ok(endpoints),
312            }
313        } else {
314            Err(process_unexpected_response(response))
315        }
316    }
317
318    /// Get the list of endpoints for the server at the given URL.
319    ///
320    /// # Arguments
321    ///
322    /// * `server` - Connector to an OPC-UA server. This is implemented for `String` and `&str`, which will
323    ///   do a direct connection.
324    ///
325    /// # Returns
326    ///
327    /// * `Ok(Vec<EndpointDescription>)` - A list of the available endpoints on the server.
328    /// * `Err(Error)` - Request failed, [Status code](StatusCode) is the reason for failure.
329    pub async fn get_server_endpoints_from_url(
330        &self,
331        server: impl ConnectorBuilder,
332    ) -> Result<Vec<EndpointDescription>, Error> {
333        self.get_endpoints(server, &[], &[]).await
334    }
335
336    /// Get the list of endpoints for the server at the given URL.
337    ///
338    /// # Arguments
339    ///
340    /// * `server` - Connector to an OPC-UA server. This is implemented for `String` and `&str`, which will
341    ///   do a direct connection.
342    /// * `locale_ids` - List of required locale IDs on the given server endpoint.
343    /// * `profile_uris` - Returned endpoints should match one of these profile URIs.
344    ///
345    /// # Returns
346    ///
347    /// * `Ok(Vec<EndpointDescription>)` - A list of the available endpoints on the server.
348    /// * `Err(Error)` - Request failed, [Status code](StatusCode) is the reason for failure.
349    pub async fn get_endpoints(
350        &self,
351        server: impl ConnectorBuilder,
352        locale_ids: &[&str],
353        profile_uris: &[&str],
354    ) -> Result<Vec<EndpointDescription>, Error> {
355        let server = server.build()?;
356        let preferred_locales = Vec::new();
357        // Most of these fields mean nothing when getting endpoints
358        let endpoint = server.default_endpoint();
359        let endpoint_info = EndpointInfo {
360            endpoint: endpoint.clone(),
361            user_identity_token: IdentityToken::Anonymous,
362            preferred_locales,
363        };
364        let channel = self.channel_from_endpoint_info(endpoint_info, self.config.channel_lifetime);
365
366        let mut evt_loop = channel
367            .connect(&server)
368            .await
369            .map_err(|e| Error::new(e, "Failed to connect to server"))?;
370
371        let send_fut = self.get_server_endpoints_inner(
372            &endpoint,
373            &channel,
374            if locale_ids.is_empty() {
375                None
376            } else {
377                Some(locale_ids.iter().map(|i| (*i).into()).collect())
378            },
379            if profile_uris.is_empty() {
380                None
381            } else {
382                Some(profile_uris.iter().map(|i| (*i).into()).collect())
383            },
384        );
385        pin!(send_fut);
386
387        let res = loop {
388            select! {
389                r = evt_loop.poll() => {
390                    if let TransportPollResult::Closed(e) = r {
391                        return Err(Error::new(e, "Transport closed unexpectedly"));
392                    }
393                },
394                res = &mut send_fut => break res.map_err(|e| Error::new(e, "Failed to get endpoints")),
395            }
396        };
397
398        channel.close_channel().await;
399
400        loop {
401            if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
402                break;
403            }
404        }
405
406        res
407    }
408
409    async fn find_servers_inner(
410        &self,
411        endpoint_url: String,
412        channel: &AsyncSecureChannel,
413        locale_ids: Option<Vec<UAString>>,
414        server_uris: Option<Vec<UAString>>,
415    ) -> Result<Vec<ApplicationDescription>, StatusCode> {
416        let request = FindServersRequest {
417            request_header: channel.make_request_header(self.config.request_timeout),
418            endpoint_url: endpoint_url.into(),
419            locale_ids,
420            server_uris,
421        };
422
423        let response = channel.send(request, self.config.request_timeout).await?;
424        if let ResponseMessage::FindServers(response) = response {
425            process_service_result(&response.response_header)?;
426            Ok(response.servers.unwrap_or_default())
427        } else {
428            Err(process_unexpected_response(response))
429        }
430    }
431
432    /// Connects to a discovery server and asks the server for a list of
433    /// available servers' [`ApplicationDescription`].
434    ///
435    /// # Arguments
436    ///
437    /// * `discovery_endpoint_url` - Discovery endpoint to connect to.
438    /// * `locale_ids` - List of locales to use.
439    /// * `server_uris` - List of servers to return. If empty, all known servers are returned.
440    ///
441    /// # Returns
442    ///
443    /// * `Ok(Vec<ApplicationDescription>)` - List of descriptions for servers known to the discovery server.
444    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
445    pub async fn find_servers(
446        &self,
447        discovery_endpoint: impl ConnectorBuilder,
448        locale_ids: Option<Vec<UAString>>,
449        server_uris: Option<Vec<UAString>>,
450    ) -> Result<Vec<ApplicationDescription>, StatusCode> {
451        let discovery_endpoint = discovery_endpoint.build()?;
452        let endpoint = discovery_endpoint.default_endpoint();
453        let session_info = EndpointInfo {
454            endpoint: endpoint.clone(),
455            user_identity_token: IdentityToken::Anonymous,
456            preferred_locales: Vec::new(),
457        };
458        let channel = self.channel_from_endpoint_info(session_info, self.config.channel_lifetime);
459
460        let mut evt_loop = channel.connect(&discovery_endpoint).await?;
461
462        let send_fut = self.find_servers_inner(
463            evt_loop.connected_url().to_owned(),
464            &channel,
465            locale_ids,
466            server_uris,
467        );
468        pin!(send_fut);
469
470        let res = loop {
471            select! {
472                r = evt_loop.poll() => {
473                    if let TransportPollResult::Closed(e) = r {
474                        return Err(e);
475                    }
476                },
477                res = &mut send_fut => break res
478            }
479        };
480
481        channel.close_channel().await;
482
483        loop {
484            if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
485                break;
486            }
487        }
488
489        res
490    }
491
492    async fn find_servers_on_network_inner(
493        &self,
494        starting_record_id: u32,
495        max_records_to_return: u32,
496        server_capability_filter: Option<Vec<UAString>>,
497        channel: &AsyncSecureChannel,
498    ) -> Result<FindServersOnNetworkResponse, StatusCode> {
499        let request = FindServersOnNetworkRequest {
500            request_header: channel.make_request_header(self.config.request_timeout),
501            starting_record_id,
502            max_records_to_return,
503            server_capability_filter,
504        };
505
506        let response = channel.send(request, self.config.request_timeout).await?;
507        if let ResponseMessage::FindServersOnNetwork(response) = response {
508            process_service_result(&response.response_header)?;
509            Ok(*response)
510        } else {
511            Err(process_unexpected_response(response))
512        }
513    }
514
515    /// Connects to a discovery server and asks for a list of available servers on the network.
516    ///
517    /// See OPC UA Part 4 - Services 5.5.3 for a complete description of the service.
518    ///
519    /// # Arguments
520    ///
521    /// * `discovery_endpoint_url` - Endpoint to connect to.
522    /// * `starting_record_id` - Only records with an identifier greater than this number
523    ///   will be returned.
524    /// * `max_records_to_return` - The maximum number of records to return in the response.
525    ///   0 indicates that there is no limit.
526    /// * `server_capability_filter` - List of server capability filters. Only records with
527    ///   all the specified server capabilities are returned.
528    ///
529    /// # Returns
530    ///
531    /// * `Ok(FindServersOnNetworkResponse)` - Full service response object.
532    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
533    pub async fn find_servers_on_network(
534        &self,
535        discovery_endpoint: impl ConnectorBuilder,
536        starting_record_id: u32,
537        max_records_to_return: u32,
538        server_capability_filter: Option<Vec<UAString>>,
539    ) -> Result<FindServersOnNetworkResponse, StatusCode> {
540        let discovery_endpoint = discovery_endpoint.build()?;
541        let endpoint = discovery_endpoint.default_endpoint();
542        let session_info = EndpointInfo {
543            endpoint: endpoint.clone(),
544            user_identity_token: IdentityToken::Anonymous,
545            preferred_locales: Vec::new(),
546        };
547        let channel = self.channel_from_endpoint_info(session_info, self.config.channel_lifetime);
548
549        let mut evt_loop = channel.connect(&discovery_endpoint).await?;
550
551        let send_fut = self.find_servers_on_network_inner(
552            starting_record_id,
553            max_records_to_return,
554            server_capability_filter,
555            &channel,
556        );
557        pin!(send_fut);
558
559        let res = loop {
560            select! {
561                r = evt_loop.poll() => {
562                    if let TransportPollResult::Closed(e) = r {
563                        return Err(e);
564                    }
565                },
566                res = &mut send_fut => break res
567            }
568        };
569
570        channel.close_channel().await;
571
572        loop {
573            if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
574                break;
575            }
576        }
577
578        res
579    }
580
581    /// Find an endpoint supplied from the list of endpoints that matches the input criteria.
582    ///
583    /// # Arguments
584    ///
585    /// * `endpoints` - List of available endpoints on the server.
586    /// * `endpoint_url` - Given endpoint URL.
587    /// * `security_policy` - Required security policy.
588    /// * `security_mode` - Required security mode.
589    ///
590    /// # Returns
591    ///
592    /// * `Some(EndpointDescription)` - Validated endpoint.
593    /// * `None` - No matching endpoint was found.
594    pub fn find_matching_endpoint(
595        endpoints: &[EndpointDescription],
596        endpoint_url: &str,
597        security_policy: SecurityPolicy,
598        security_mode: MessageSecurityMode,
599    ) -> Option<EndpointDescription> {
600        if security_policy == SecurityPolicy::Unknown {
601            panic!("Cannot match against unknown security policy");
602        }
603
604        let mut matching_endpoint = endpoints
605            .iter()
606            .find(|e| {
607                // Endpoint matches if the security mode, policy and url match
608                security_mode == e.security_mode
609                    && security_policy == SecurityPolicy::from_uri(e.security_policy_uri.as_ref())
610                    && url_matches_except_host(endpoint_url, e.endpoint_url.as_ref())
611            })
612            .cloned()?;
613
614        let hostname = hostname_from_url(endpoint_url).ok()?;
615        let new_endpoint_url =
616            url_with_replaced_hostname(matching_endpoint.endpoint_url.as_ref(), &hostname).ok()?;
617
618        // Issue #16, #17 - the server may advertise an endpoint whose hostname is inaccessible
619        // to the client so substitute the advertised hostname with the one the client supplied.
620        matching_endpoint.endpoint_url = new_endpoint_url.into();
621        Some(matching_endpoint)
622    }
623
624    /// Determine if we recognize the security of this endpoint.
625    ///
626    /// # Arguments
627    ///
628    /// * `endpoint` - Endpoint to check.
629    ///
630    /// # Returns
631    ///
632    /// * `bool` - `true` if the endpoint is supported.
633    pub fn is_supported_endpoint(&self, endpoint: &EndpointDescription) -> bool {
634        if let Ok(security_policy) = SecurityPolicy::from_str(endpoint.security_policy_uri.as_ref())
635        {
636            !matches!(security_policy, SecurityPolicy::Unknown)
637        } else {
638            false
639        }
640    }
641
642    async fn register_server_inner(
643        &self,
644        server: RegisteredServer,
645        channel: &AsyncSecureChannel,
646    ) -> Result<(), StatusCode> {
647        let request = RegisterServerRequest {
648            request_header: channel.make_request_header(self.config.request_timeout),
649            server,
650        };
651        let response = channel.send(request, self.config.request_timeout).await?;
652        if let ResponseMessage::RegisterServer(response) = response {
653            process_service_result(&response.response_header)?;
654            Ok(())
655        } else {
656            Err(process_unexpected_response(response))
657        }
658    }
659
660    /// Get the best endpoint for the server, "best" here is the endpoint with the highest security level.
661    ///
662    /// # Arguments
663    ///
664    /// * `discovery_endpoint` - Endpoint to connect to.
665    pub async fn get_best_endpoint(
666        &self,
667        discovery_endpoint: impl ConnectorBuilder,
668    ) -> Result<EndpointDescription, Error> {
669        let discovery_endpoint = discovery_endpoint.build()?;
670        let endpoints = self
671            .get_server_endpoints_from_url(
672                discovery_endpoint.default_endpoint().endpoint_url.as_ref(),
673            )
674            .await?;
675        if endpoints.is_empty() {
676            return Err(Error::new(
677                StatusCode::BadUnexpectedError,
678                "No endpoints returned from server",
679            ));
680        }
681
682        let Some(endpoint) = endpoints
683            .into_iter()
684            .filter(|e| self.is_supported_endpoint(e))
685            .max_by(|a, b| a.security_level.cmp(&b.security_level))
686        else {
687            error!("Cannot find an endpoint that we can use");
688            return Err(Error::new(
689                StatusCode::BadUnexpectedError,
690                "No supported endpoints returned from server",
691            ));
692        };
693
694        Ok(endpoint)
695    }
696
697    /// This function is used by servers that wish to register themselves with a discovery server.
698    /// i.e. one server is the client to another server. The server sends a [`RegisterServerRequest`]
699    /// to the discovery server to register itself. Servers are expected to re-register themselves periodically
700    /// with the discovery server, with a maximum of 10 minute intervals.
701    ///
702    /// See OPC UA Part 4 - Services 5.4.5 for complete description of the service and error responses.
703    ///
704    /// # Arguments
705    ///
706    /// * `connector` - Connector to the discovery server. This is implemented for `String` and `&str`.
707    ///   A simple usage would be to just pass `server_endpoint.endpoint_url.as_ref()` here.
708    /// * `server_endpoint` - The endpoint to use for registration. If this requires security, you first need to get an appropriate.
709    ///   server endpoint using `get_best_endpoint` or `get_server_endpoints_from_url`.
710    /// * `server` - The server to register
711    ///
712    /// # Returns
713    ///
714    /// * `Ok(())` - Success
715    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
716    ///
717    pub async fn register_server(
718        &self,
719        connector: impl ConnectorBuilder,
720        server_endpoint: &EndpointDescription,
721        server: RegisteredServer,
722    ) -> Result<(), StatusCode> {
723        let endpoint_info = EndpointInfo {
724            endpoint: server_endpoint.clone(),
725            user_identity_token: IdentityToken::Anonymous,
726            preferred_locales: Vec::new(),
727        };
728        let connector = connector.build()?;
729        let channel = self.channel_from_endpoint_info(endpoint_info, self.config.channel_lifetime);
730
731        let mut evt_loop = channel.connect(&connector).await?;
732
733        let send_fut = self.register_server_inner(server, &channel);
734        pin!(send_fut);
735
736        let res = loop {
737            select! {
738                r = evt_loop.poll() => {
739                    if let TransportPollResult::Closed(e) = r {
740                        return Err(e);
741                    }
742                },
743                res = &mut send_fut => break res
744            }
745        };
746
747        channel.close_channel().await;
748
749        loop {
750            if matches!(evt_loop.poll().await, TransportPollResult::Closed(_)) {
751                break;
752            }
753        }
754
755        res
756    }
757
758    /// Get the certificate store.
759    pub fn certificate_store(&self) -> &Arc<RwLock<CertificateStore>> {
760        &self.certificate_store
761    }
762}