opcua_client/session/
connection.rs

1use std::{net::SocketAddr, str::FromStr, sync::Arc};
2
3use opcua_core::{comms::url::is_opc_ua_binary_url, config::Config, sync::RwLock};
4use opcua_crypto::{CertificateStore, SecurityPolicy};
5use opcua_types::{
6    ContextOwned, EndpointDescription, Error, MessageSecurityMode, NamespaceMap, NodeId,
7    StatusCode, TypeLoader, UserTokenType,
8};
9use tokio::net::TcpListener;
10
11use crate::{
12    reverse_connect::TcpConnectorReceiver,
13    transport::{
14        tcp::TransportConfiguration, Connector, ConnectorBuilder, ReverseHelloVerifier,
15        ReverseTcpConnector,
16    },
17    AsyncSecureChannel, ClientConfig, IdentityToken,
18};
19
20use super::{Client, EndpointInfo, Session, SessionEventLoop};
21
/// Settings carried unchanged through every type-state of [`SessionBuilder`].
struct SessionBuilderInner {
    /// Session ID to try to reactivate before creating a new session, if any.
    session_id: Option<NodeId>,
    /// Identity token used for the session. Starts out as anonymous.
    user_identity_token: IdentityToken,
    /// Type loaders added to the encoding context when the channel is built.
    type_loaders: Vec<Arc<dyn TypeLoader>>,
}
27
/// Trait for getting a connector builder for a given endpoint.
/// This is not the neatest interface, but it makes it possible to plug a different
/// connection source into the session builder.
///
/// Essentially, a `ConnectionSource` takes an endpoint and returns a connector builder,
/// which is directly converted into a connector, which is then used to create a
/// transport.
///
/// In practice:
///
///  - A `ConnectionSource` uses an `EndpointDescription` to get a `ConnectorBuilder`.
///  - A `ConnectorBuilder` is directly converted into a `Connector`. This trait exists
///    so that methods that connect to an endpoint can take a type that implements
///    `ConnectorBuilder`, for example an endpoint description.
///  - A `Connector` is used to create a transport, which is then used to connect to the server.
pub trait ConnectionSource {
    /// The type of connector builder returned by this connection source.
    type Builder: ConnectorBuilder;

    /// Get a connector builder for the given endpoint description.
    ///
    /// # Errors
    ///
    /// Implementations may return an error if no connector builder can be
    /// produced for `endpoint`.
    fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error>;
}
50
/// Connection source for a direct OPC/TCP binary connection.
/// This is the default connection source used by the session builder, and by far the most
/// common when connecting to an OPC-UA server.
///
/// Its connector builder is simply the endpoint URL as a `String`.
pub struct DirectConnectionSource;
55
56impl ConnectionSource for DirectConnectionSource {
57    type Builder = String;
58    fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
59        Ok(endpoint.endpoint_url.as_ref().to_string())
60    }
61}
62
/// Connection source for a reverse connection.
/// When using this, the server will initiate the connection to the client.
pub struct ReverseConnectionSource {
    /// Where incoming connections are received: either an existing TCP listener
    /// or an address to listen on.
    listener: TcpConnectorReceiver,
    /// Custom verifier for the reverse connection. When `None`, the connector is
    /// created with the default verifier.
    verifier: Option<Arc<dyn ReverseHelloVerifier + Send + Sync>>,
}
69
70impl ReverseConnectionSource {
71    /// Create a new reverse connection source with a TCP listener.
72    pub fn new_listener(listener: Arc<TcpListener>) -> Self {
73        Self {
74            listener: TcpConnectorReceiver::Listener(listener),
75            verifier: None,
76        }
77    }
78
79    /// Create a new reverse connection source listening on the given address.
80    pub fn new_address(address: SocketAddr) -> Self {
81        Self {
82            listener: TcpConnectorReceiver::Address(address),
83            verifier: None,
84        }
85    }
86
87    /// Set a custom verifier for the reverse connection source.
88    /// If not set, the default verifier will be used, which
89    /// simply compares the endpoint URL with the configured endpoint URL.
90    pub fn with_verifier(
91        mut self,
92        verifier: impl ReverseHelloVerifier + Send + Sync + 'static,
93    ) -> Self {
94        self.verifier = Some(Arc::new(verifier));
95        self
96    }
97}
98
99impl ConnectionSource for ReverseConnectionSource {
100    type Builder = ReverseTcpConnector;
101
102    fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
103        if let Some(verifier) = self.verifier.clone() {
104            Ok(ReverseTcpConnector::new(
105                self.listener.clone(),
106                verifier,
107                endpoint.clone(),
108            ))
109        } else {
110            Ok(ReverseTcpConnector::new_default(
111                endpoint.clone(),
112                self.listener.clone(),
113            ))
114        }
115    }
116}
117
/// Type-state builder for a session and session event loop.
/// To use, you will typically first call [SessionBuilder::with_endpoints] to set
/// a list of available endpoints, then one of the `connect_to` methods, then finally
/// [SessionBuilder::build].
///
/// The type parameters track how far the builder has been configured:
///
///  - `T` is `()` until one of the `connect_to` methods has selected an endpoint,
///    after which it is `EndpointDescription`.
///  - `R` is `()` until [SessionBuilder::with_endpoints] has been called,
///    after which it is `Vec<EndpointDescription>`.
///  - `C` is the connection source, `DirectConnectionSource` unless replaced
///    with [SessionBuilder::with_connector].
pub struct SessionBuilder<'a, T = (), R = (), C = DirectConnectionSource> {
    /// The selected endpoint, or `()` if none has been selected yet.
    endpoint: T,
    /// Client configuration the session is built from.
    config: &'a ClientConfig,
    /// Endpoints available on the server, or `()` if not provided.
    endpoints: R,
    /// Settings that do not depend on the type-state.
    inner: SessionBuilderInner,
    /// Source of the connector used when building the channel.
    connection_source: C,
}
129
130impl<'a> SessionBuilder<'a, (), (), DirectConnectionSource> {
131    /// Create a new, empty session builder.
132    pub fn new(config: &'a ClientConfig) -> Self {
133        Self {
134            endpoint: (),
135            config,
136            endpoints: (),
137            inner: SessionBuilderInner {
138                session_id: None,
139                user_identity_token: IdentityToken::Anonymous,
140                type_loaders: Vec::new(),
141            },
142            connection_source: DirectConnectionSource,
143        }
144    }
145}
146
147impl<'a, T, C> SessionBuilder<'a, T, (), C> {
148    /// Set a list of available endpoints on the server.
149    ///
150    /// You'll typically get this from [Client::get_server_endpoints].
151    pub fn with_endpoints(
152        self,
153        endpoints: Vec<EndpointDescription>,
154    ) -> SessionBuilder<'a, T, Vec<EndpointDescription>, C> {
155        SessionBuilder {
156            inner: self.inner,
157            endpoint: self.endpoint,
158            config: self.config,
159            endpoints,
160            connection_source: self.connection_source,
161        }
162    }
163}
164
165impl<'a, T, R, C> SessionBuilder<'a, T, R, C> {
166    /// Set the user identity token to use.
167    pub fn user_identity_token(mut self, identity_token: IdentityToken) -> Self {
168        self.inner.user_identity_token = identity_token;
169        self
170    }
171
172    /// Set an initial session ID. The session will try to reactivate this session
173    /// before creating a new session. This can be useful to persist session IDs
174    /// between program executions, to avoid having to recreate subscriptions.
175    pub fn session_id(mut self, session_id: NodeId) -> Self {
176        self.inner.session_id = Some(session_id);
177        self
178    }
179
180    /// Add an initial type loader to the session. You can add more of these later.
181    /// Note that custom type loaders will likely not work until namespaces
182    /// are fetched from the server.
183    pub fn type_loader(mut self, type_loader: Arc<dyn TypeLoader>) -> Self {
184        self.inner.type_loaders.push(type_loader);
185        self
186    }
187
188    fn endpoint_supports_token(&self, endpoint: &EndpointDescription) -> bool {
189        match &self.inner.user_identity_token {
190            IdentityToken::Anonymous => {
191                endpoint.user_identity_tokens.is_none()
192                    || endpoint
193                        .user_identity_tokens
194                        .as_ref()
195                        .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Anonymous))
196            }
197            IdentityToken::UserName(_, _) => endpoint
198                .user_identity_tokens
199                .as_ref()
200                .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::UserName)),
201            IdentityToken::X509(_, _) => endpoint
202                .user_identity_tokens
203                .as_ref()
204                .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Certificate)),
205            IdentityToken::IssuedToken(_) => endpoint
206                .user_identity_tokens
207                .as_ref()
208                .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::IssuedToken)),
209        }
210    }
211
212    /// Set the connection source to use. This is used to create the transport
213    /// connector. Defaults to a direct TCP connection, implemented by `()`.
214    pub fn with_connector<CS>(self, connection_source: CS) -> SessionBuilder<'a, T, R, CS>
215    where
216        CS: ConnectionSource,
217    {
218        SessionBuilder {
219            inner: self.inner,
220            endpoint: self.endpoint,
221            config: self.config,
222            endpoints: self.endpoints,
223            connection_source,
224        }
225    }
226}
227
228impl<'a, C> SessionBuilder<'a, (), Vec<EndpointDescription>, C> {
229    /// Connect to an endpoint matching the given endpoint description.
230    pub fn connect_to_matching_endpoint(
231        self,
232        endpoint: impl Into<EndpointDescription>,
233    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
234        let endpoint = endpoint.into();
235
236        let security_policy = SecurityPolicy::from_str(endpoint.security_policy_uri.as_ref())
237            .map_err(|_| {
238                Error::new(
239                    StatusCode::BadSecurityPolicyRejected,
240                    format!(
241                        "Invalid security policy: {}",
242                        endpoint.security_policy_uri.as_ref()
243                    ),
244                )
245            })?;
246        let server_endpoint = Client::find_matching_endpoint(
247            &self.endpoints,
248            endpoint.endpoint_url.as_ref(),
249            security_policy,
250            endpoint.security_mode,
251        )
252        .ok_or(Error::new(
253            StatusCode::BadTcpEndpointUrlInvalid,
254            format!(
255                "Cannot find matching endpoint for {}",
256                endpoint.endpoint_url.as_ref()
257            ),
258        ))?;
259
260        Ok(SessionBuilder {
261            inner: self.inner,
262            endpoint: server_endpoint,
263            config: self.config,
264            endpoints: self.endpoints,
265            connection_source: self.connection_source,
266        })
267    }
268
269    /// Connect to the configured default endpoint, this will use the user identity token configured in the
270    /// default endpoint.
271    pub fn connect_to_default_endpoint(
272        mut self,
273    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
274        let default_endpoint_id = self.config.default_endpoint.clone();
275        let endpoint = if default_endpoint_id.is_empty() {
276            return Err(Error::new(
277                StatusCode::BadConfigurationError,
278                "No default endpoint has been specified",
279            ));
280        } else if let Some(endpoint) = self.config.endpoints.get(&default_endpoint_id) {
281            endpoint.clone()
282        } else {
283            return Err(Error::new(
284                StatusCode::BadInvalidArgument,
285                format!("Cannot find default endpoint with id {default_endpoint_id}"),
286            ));
287        };
288        let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
289        let endpoint = self
290            .config
291            .endpoint_description_for_client_endpoint(&endpoint, &self.endpoints)?;
292        self.inner.user_identity_token = user_identity_token;
293        Ok(SessionBuilder {
294            inner: self.inner,
295            endpoint,
296            config: self.config,
297            endpoints: self.endpoints,
298            connection_source: self.connection_source,
299        })
300    }
301
302    /// Connect to the configured endpoint with the given id, this will use the user identity token configured in the
303    /// configured endpoint.
304    pub fn connect_to_endpoint_id(
305        mut self,
306        endpoint_id: impl Into<String>,
307    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
308        let endpoint_id = endpoint_id.into();
309        let endpoint = self.config.endpoints.get(&endpoint_id).ok_or_else(|| {
310            Error::new(
311                StatusCode::BadInvalidArgument,
312                format!("Cannot find endpoint with id {endpoint_id}"),
313            )
314        })?;
315        let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
316
317        let endpoint = self
318            .config
319            .endpoint_description_for_client_endpoint(endpoint, &self.endpoints)?;
320        self.inner.user_identity_token = user_identity_token;
321        Ok(SessionBuilder {
322            inner: self.inner,
323            endpoint,
324            config: self.config,
325            endpoints: self.endpoints,
326            connection_source: self.connection_source,
327        })
328    }
329
330    /// Attempt to pick the "best" endpoint. If `secure` is `false` this means
331    /// any unencrypted endpoint that supports the configured identity token.
332    /// If `secure` is `true`, the endpoint that supports the configured identity token with the highest
333    /// `securityLevel`.
334    pub fn connect_to_best_endpoint(
335        self,
336        secure: bool,
337    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
338        let endpoint = if secure {
339            self.endpoints
340                .iter()
341                .filter(|e| self.endpoint_supports_token(e))
342                .max_by(|a, b| a.security_level.cmp(&b.security_level))
343        } else {
344            self.endpoints.iter().find(|e| {
345                e.security_mode == MessageSecurityMode::None && self.endpoint_supports_token(e)
346            })
347        };
348        let Some(endpoint) = endpoint else {
349            return Err(Error::new(
350                StatusCode::BadInvalidArgument,
351                "No suitable endpoint found",
352            ));
353        };
354        Ok(SessionBuilder {
355            inner: self.inner,
356            endpoint: endpoint.clone(),
357            config: self.config,
358            endpoints: self.endpoints,
359            connection_source: self.connection_source,
360        })
361    }
362}
363
364impl<'a, R, C> SessionBuilder<'a, (), R, C> {
365    /// Connect directly to an endpoint description, this does not require you to list
366    /// endpoints on the server first.
367    pub fn connect_to_endpoint_directly(
368        self,
369        endpoint: impl Into<EndpointDescription>,
370    ) -> Result<SessionBuilder<'a, EndpointDescription, R, C>, Error> {
371        let endpoint = endpoint.into();
372        if !is_opc_ua_binary_url(endpoint.endpoint_url.as_ref()) {
373            return Err(Error::new(
374                StatusCode::BadTcpEndpointUrlInvalid,
375                format!(
376                    "Endpoint url {} is not a valid / supported url",
377                    endpoint.endpoint_url
378                ),
379            ));
380        }
381        Ok(SessionBuilder {
382            endpoint,
383            config: self.config,
384            endpoints: self.endpoints,
385            inner: self.inner,
386            connection_source: self.connection_source,
387        })
388    }
389}
390
391impl<R, C> SessionBuilder<'_, EndpointDescription, R, C>
392where
393    C: ConnectionSource,
394{
395    /// Build the session and session event loop. Note that you will need to
396    /// start polling the event loop before a connection is actually established.
397    pub fn build(
398        self,
399        certificate_store: Arc<RwLock<CertificateStore>>,
400    ) -> Result<(Arc<Session>, SessionEventLoop), Error> {
401        let connector = self
402            .connection_source
403            .get_connector(&self.endpoint)?
404            .build()?;
405        let ctx = self.make_encoding_context();
406        Ok(Session::new(
407            Self::build_channel_inner(
408                certificate_store,
409                self.inner.user_identity_token,
410                self.endpoint,
411                self.config,
412                connector,
413                ctx,
414            ),
415            self.config.session_name.clone().into(),
416            self.config.application_description(),
417            self.config.session_retry_policy(),
418            self.config.decoding_options.as_comms_decoding_options(),
419            self.config,
420            self.inner.session_id,
421        ))
422    }
423
424    fn make_encoding_context(&self) -> ContextOwned {
425        let mut encoding_context = ContextOwned::new_default(
426            NamespaceMap::new(),
427            self.config.decoding_options.as_comms_decoding_options(),
428        );
429
430        for loader in self.inner.type_loaders.iter().cloned() {
431            encoding_context.loaders_mut().add(loader);
432        }
433
434        encoding_context
435    }
436
437    fn build_channel_inner(
438        certificate_store: Arc<RwLock<CertificateStore>>,
439        identity_token: IdentityToken,
440        endpoint: EndpointDescription,
441        config: &ClientConfig,
442        connector: Box<dyn Connector + Send + Sync + 'static>,
443        ctx: ContextOwned,
444    ) -> AsyncSecureChannel {
445        AsyncSecureChannel::new(
446            certificate_store,
447            EndpointInfo {
448                endpoint,
449                user_identity_token: identity_token,
450                preferred_locales: config.preferred_locales.clone(),
451            },
452            config.session_retry_policy(),
453            config.performance.ignore_clock_skew,
454            Arc::default(),
455            TransportConfiguration {
456                send_buffer_size: config.decoding_options.max_chunk_size,
457                recv_buffer_size: config.decoding_options.max_incoming_chunk_size,
458                max_message_size: config.decoding_options.max_message_size,
459                max_chunk_count: config.decoding_options.max_chunk_count,
460            },
461            connector,
462            config.channel_lifetime,
463            Arc::new(RwLock::new(ctx)),
464        )
465    }
466
467    /// Build a channel only, not creating a session.
468    /// This is useful if you want to manage the session lifetime yourself.
469    pub fn build_channel(
470        self,
471        certificate_store: Arc<RwLock<CertificateStore>>,
472    ) -> Result<AsyncSecureChannel, Error> {
473        let ctx = self.make_encoding_context();
474        let connector = self
475            .connection_source
476            .get_connector(&self.endpoint)?
477            .build()?;
478        Ok(Self::build_channel_inner(
479            certificate_store,
480            self.inner.user_identity_token,
481            self.endpoint,
482            self.config,
483            connector,
484            ctx,
485        ))
486    }
487}