Skip to main content

opcua_client/session/
connection.rs

1use std::{net::SocketAddr, str::FromStr, sync::Arc};
2
3use opcua_core::{comms::url::is_opc_ua_binary_url, config::Config, sync::RwLock};
4use opcua_crypto::{CertificateStore, SecurityPolicy};
5use opcua_types::{
6    ContextOwned, EndpointDescription, Error, MessageSecurityMode, NamespaceMap, NodeId,
7    StatusCode, TypeLoader, UserTokenType,
8};
9use tokio::net::TcpListener;
10
11use crate::{
12    reverse_connect::TcpConnectorReceiver,
13    transport::{
14        tcp::TransportConfiguration, ConnectorBuilder, ReverseHelloVerifier, ReverseTcpConnector,
15    },
16    AsyncSecureChannel, ClientConfig, IdentityToken,
17};
18
19use super::{Client, EndpointInfo, Session, SessionEventLoop};
20
21struct SessionBuilderInner {
22    session_id: Option<NodeId>,
23    user_identity_token: IdentityToken,
24    type_loaders: Vec<Arc<dyn TypeLoader>>,
25}
26
27/// Trait for getting a connection builder for a given endpoint.
28/// This is not the neatest interface, but it makes it possible to use a different
29/// connection source in the session builder.
30///
31/// Essentially, ConnectionSource takes an endpoint, and returns a connector builder,
32/// which is directly converted into a connector, which is then used to create a
33/// transport.
34///
35/// In practice:
36///
37///  - A ConnectionSource uses an EndpointDescription to get a ConnectorBuilder.
38///  - A ConnectorBuilder is directly converted into a Connector. This trait exists
39///    so that methods that connect to an endpoint can take a type that implements ConnectorBuilder,
40///    for example an endpoint description.
41///  - A Connector is used to create a transport, which is then used to connect to the server.
42pub trait ConnectionSource {
43    /// The type of connector builder returned by this connection source.
44    type Builder: ConnectorBuilder;
45
46    /// Get a connector builder for the given endpoint description.
47    fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error>;
48}
49
/// Connection source for a direct OPC/TCP binary connection.
///
/// This is the default connection source used by the session builder, and by far the most
/// common when connecting to an OPC-UA server.
///
/// The type carries no state, so it is cheap to copy and can be created with
/// `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DirectConnectionSource;
54
55impl ConnectionSource for DirectConnectionSource {
56    type Builder = String;
57    fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
58        Ok(endpoint.endpoint_url.as_ref().to_string())
59    }
60}
61
62/// Connection source for a reverse connection.
63/// When using this, the server will initiate the connection to the client.
64pub struct ReverseConnectionSource {
65    listener: TcpConnectorReceiver,
66    verifier: Option<Arc<dyn ReverseHelloVerifier + Send + Sync>>,
67}
68
69impl ReverseConnectionSource {
70    /// Create a new reverse connection source with a TCP listener.
71    pub fn new_listener(listener: Arc<TcpListener>) -> Self {
72        Self {
73            listener: TcpConnectorReceiver::Listener(listener),
74            verifier: None,
75        }
76    }
77
78    /// Create a new reverse connection source listening on the given address.
79    pub fn new_address(address: SocketAddr) -> Self {
80        Self {
81            listener: TcpConnectorReceiver::Address(address),
82            verifier: None,
83        }
84    }
85
86    /// Set a custom verifier for the reverse connection source.
87    /// If not set, the default verifier will be used, which
88    /// simply compares the endpoint URL with the configured endpoint URL.
89    pub fn with_verifier(
90        mut self,
91        verifier: impl ReverseHelloVerifier + Send + Sync + 'static,
92    ) -> Self {
93        self.verifier = Some(Arc::new(verifier));
94        self
95    }
96}
97
98impl ConnectionSource for ReverseConnectionSource {
99    type Builder = ReverseTcpConnector;
100
101    fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
102        if let Some(verifier) = self.verifier.clone() {
103            Ok(ReverseTcpConnector::new(
104                self.listener.clone(),
105                verifier,
106                endpoint.clone(),
107            ))
108        } else {
109            Ok(ReverseTcpConnector::new_default(
110                endpoint.clone(),
111                self.listener.clone(),
112            ))
113        }
114    }
115}
116
117/// Type-state builder for a session and session event loop.
118/// To use, you will typically first call [SessionBuilder::with_endpoints] to set
119/// a list of available endpoints, then one of the `connect_to` methods, then finally
120/// [SessionBuilder::build].
121pub struct SessionBuilder<'a, T = (), R = (), C = DirectConnectionSource> {
122    endpoint: T,
123    config: &'a ClientConfig,
124    endpoints: R,
125    inner: SessionBuilderInner,
126    connection_source: C,
127}
128
129impl<'a> SessionBuilder<'a, (), (), DirectConnectionSource> {
130    /// Create a new, empty session builder.
131    pub fn new(config: &'a ClientConfig) -> Self {
132        Self {
133            endpoint: (),
134            config,
135            endpoints: (),
136            inner: SessionBuilderInner {
137                session_id: None,
138                user_identity_token: IdentityToken::Anonymous,
139                type_loaders: Vec::new(),
140            },
141            connection_source: DirectConnectionSource,
142        }
143    }
144}
145
146impl<'a, T, C> SessionBuilder<'a, T, (), C> {
147    /// Set a list of available endpoints on the server.
148    ///
149    /// You'll typically get this from [Client::get_server_endpoints].
150    pub fn with_endpoints(
151        self,
152        endpoints: Vec<EndpointDescription>,
153    ) -> SessionBuilder<'a, T, Vec<EndpointDescription>, C> {
154        SessionBuilder {
155            inner: self.inner,
156            endpoint: self.endpoint,
157            config: self.config,
158            endpoints,
159            connection_source: self.connection_source,
160        }
161    }
162}
163
164impl<'a, T, R, C> SessionBuilder<'a, T, R, C> {
165    /// Set the user identity token to use.
166    pub fn user_identity_token(mut self, identity_token: IdentityToken) -> Self {
167        self.inner.user_identity_token = identity_token;
168        self
169    }
170
171    /// Set an initial session ID. The session will try to reactivate this session
172    /// before creating a new session. This can be useful to persist session IDs
173    /// between program executions, to avoid having to recreate subscriptions.
174    pub fn session_id(mut self, session_id: NodeId) -> Self {
175        self.inner.session_id = Some(session_id);
176        self
177    }
178
179    /// Add an initial type loader to the session. You can add more of these later.
180    /// Note that custom type loaders will likely not work until namespaces
181    /// are fetched from the server.
182    pub fn type_loader(mut self, type_loader: Arc<dyn TypeLoader>) -> Self {
183        self.inner.type_loaders.push(type_loader);
184        self
185    }
186
187    fn endpoint_supports_token(&self, endpoint: &EndpointDescription) -> bool {
188        match &self.inner.user_identity_token {
189            IdentityToken::Anonymous => {
190                endpoint.user_identity_tokens.is_none()
191                    || endpoint
192                        .user_identity_tokens
193                        .as_ref()
194                        .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Anonymous))
195            }
196            IdentityToken::UserName(_, _) => endpoint
197                .user_identity_tokens
198                .as_ref()
199                .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::UserName)),
200            IdentityToken::X509(_, _) => endpoint
201                .user_identity_tokens
202                .as_ref()
203                .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Certificate)),
204            IdentityToken::IssuedToken(_) => endpoint
205                .user_identity_tokens
206                .as_ref()
207                .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::IssuedToken)),
208        }
209    }
210
211    /// Set the connection source to use. This is used to create the transport
212    /// connector. Defaults to a direct TCP connection, implemented by `()`.
213    pub fn with_connector<CS>(self, connection_source: CS) -> SessionBuilder<'a, T, R, CS>
214    where
215        CS: ConnectionSource,
216    {
217        SessionBuilder {
218            inner: self.inner,
219            endpoint: self.endpoint,
220            config: self.config,
221            endpoints: self.endpoints,
222            connection_source,
223        }
224    }
225}
226
227impl<'a, C> SessionBuilder<'a, (), Vec<EndpointDescription>, C> {
228    /// Connect to an endpoint matching the given endpoint description.
229    pub fn connect_to_matching_endpoint(
230        self,
231        endpoint: impl Into<EndpointDescription>,
232    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
233        let endpoint = endpoint.into();
234
235        let security_policy = SecurityPolicy::from_str(endpoint.security_policy_uri.as_ref())
236            .map_err(|_| {
237                Error::new(
238                    StatusCode::BadSecurityPolicyRejected,
239                    format!(
240                        "Invalid security policy: {}",
241                        endpoint.security_policy_uri.as_ref()
242                    ),
243                )
244            })?;
245        let server_endpoint = Client::find_matching_endpoint(
246            &self.endpoints,
247            endpoint.endpoint_url.as_ref(),
248            security_policy,
249            endpoint.security_mode,
250        )
251        .ok_or(Error::new(
252            StatusCode::BadTcpEndpointUrlInvalid,
253            format!(
254                "Cannot find matching endpoint for {}",
255                endpoint.endpoint_url.as_ref()
256            ),
257        ))?;
258
259        Ok(SessionBuilder {
260            inner: self.inner,
261            endpoint: server_endpoint,
262            config: self.config,
263            endpoints: self.endpoints,
264            connection_source: self.connection_source,
265        })
266    }
267
268    /// Connect to the configured default endpoint, this will use the user identity token configured in the
269    /// default endpoint.
270    pub fn connect_to_default_endpoint(
271        mut self,
272    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
273        let default_endpoint_id = self.config.default_endpoint.clone();
274        let endpoint = if default_endpoint_id.is_empty() {
275            return Err(Error::new(
276                StatusCode::BadConfigurationError,
277                "No default endpoint has been specified",
278            ));
279        } else if let Some(endpoint) = self.config.endpoints.get(&default_endpoint_id) {
280            endpoint.clone()
281        } else {
282            return Err(Error::new(
283                StatusCode::BadInvalidArgument,
284                format!("Cannot find default endpoint with id {default_endpoint_id}"),
285            ));
286        };
287        let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
288        let endpoint = self
289            .config
290            .endpoint_description_for_client_endpoint(&endpoint, &self.endpoints)?;
291        self.inner.user_identity_token = user_identity_token;
292        Ok(SessionBuilder {
293            inner: self.inner,
294            endpoint,
295            config: self.config,
296            endpoints: self.endpoints,
297            connection_source: self.connection_source,
298        })
299    }
300
301    /// Connect to the configured endpoint with the given id, this will use the user identity token configured in the
302    /// configured endpoint.
303    pub fn connect_to_endpoint_id(
304        mut self,
305        endpoint_id: impl Into<String>,
306    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
307        let endpoint_id = endpoint_id.into();
308        let endpoint = self.config.endpoints.get(&endpoint_id).ok_or_else(|| {
309            Error::new(
310                StatusCode::BadInvalidArgument,
311                format!("Cannot find endpoint with id {endpoint_id}"),
312            )
313        })?;
314        let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
315
316        let endpoint = self
317            .config
318            .endpoint_description_for_client_endpoint(endpoint, &self.endpoints)?;
319        self.inner.user_identity_token = user_identity_token;
320        Ok(SessionBuilder {
321            inner: self.inner,
322            endpoint,
323            config: self.config,
324            endpoints: self.endpoints,
325            connection_source: self.connection_source,
326        })
327    }
328
329    /// Attempt to pick the "best" endpoint. If `secure` is `false` this means
330    /// any unencrypted endpoint that supports the configured identity token.
331    /// If `secure` is `true`, the endpoint that supports the configured identity token with the highest
332    /// `securityLevel`.
333    pub fn connect_to_best_endpoint(
334        self,
335        secure: bool,
336    ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
337        let endpoint = if secure {
338            self.endpoints
339                .iter()
340                .filter(|e| self.endpoint_supports_token(e))
341                .max_by(|a, b| a.security_level.cmp(&b.security_level))
342        } else {
343            self.endpoints.iter().find(|e| {
344                e.security_mode == MessageSecurityMode::None && self.endpoint_supports_token(e)
345            })
346        };
347        let Some(endpoint) = endpoint else {
348            return Err(Error::new(
349                StatusCode::BadInvalidArgument,
350                "No suitable endpoint found",
351            ));
352        };
353        Ok(SessionBuilder {
354            inner: self.inner,
355            endpoint: endpoint.clone(),
356            config: self.config,
357            endpoints: self.endpoints,
358            connection_source: self.connection_source,
359        })
360    }
361}
362
363impl<'a, R, C> SessionBuilder<'a, (), R, C> {
364    /// Connect directly to an endpoint description, this does not require you to list
365    /// endpoints on the server first.
366    pub fn connect_to_endpoint_directly(
367        self,
368        endpoint: impl Into<EndpointDescription>,
369    ) -> Result<SessionBuilder<'a, EndpointDescription, R, C>, Error> {
370        let endpoint = endpoint.into();
371        if !is_opc_ua_binary_url(endpoint.endpoint_url.as_ref()) {
372            return Err(Error::new(
373                StatusCode::BadTcpEndpointUrlInvalid,
374                format!(
375                    "Endpoint url {} is not a valid / supported url",
376                    endpoint.endpoint_url
377                ),
378            ));
379        }
380        Ok(SessionBuilder {
381            endpoint,
382            config: self.config,
383            endpoints: self.endpoints,
384            inner: self.inner,
385            connection_source: self.connection_source,
386        })
387    }
388}
389
390type ResultEventLoop<C> =
391    SessionEventLoop<<<C as ConnectionSource>::Builder as ConnectorBuilder>::ConnectorType>;
392
393impl<R, C> SessionBuilder<'_, EndpointDescription, R, C>
394where
395    C: ConnectionSource,
396{
397    /// Build the session and session event loop. Note that you will need to
398    /// start polling the event loop before a connection is actually established.
399    pub fn build(
400        self,
401        certificate_store: Arc<RwLock<CertificateStore>>,
402    ) -> Result<(Arc<Session>, ResultEventLoop<C>), Error> {
403        let connector = self
404            .connection_source
405            .get_connector(&self.endpoint)?
406            .build()?;
407        let ctx = self.make_encoding_context();
408        Ok(Session::new(
409            Self::build_channel_inner(
410                certificate_store,
411                self.inner.user_identity_token,
412                self.endpoint,
413                self.config,
414                ctx,
415            ),
416            self.config.session_name.clone().into(),
417            self.config.application_description(),
418            self.config.session_retry_policy(),
419            self.config.decoding_options.as_comms_decoding_options(),
420            self.config,
421            self.inner.session_id,
422            connector,
423        ))
424    }
425
426    fn make_encoding_context(&self) -> ContextOwned {
427        let mut encoding_context = ContextOwned::new_default(
428            NamespaceMap::new(),
429            self.config.decoding_options.as_comms_decoding_options(),
430        );
431
432        for loader in self.inner.type_loaders.iter().cloned() {
433            encoding_context.loaders_mut().add(loader);
434        }
435
436        encoding_context
437    }
438
439    fn build_channel_inner(
440        certificate_store: Arc<RwLock<CertificateStore>>,
441        identity_token: IdentityToken,
442        endpoint: EndpointDescription,
443        config: &ClientConfig,
444        ctx: ContextOwned,
445    ) -> AsyncSecureChannel {
446        AsyncSecureChannel::new(
447            certificate_store,
448            EndpointInfo {
449                endpoint,
450                user_identity_token: identity_token,
451                preferred_locales: config.preferred_locales.clone(),
452            },
453            config.session_retry_policy(),
454            config.performance.ignore_clock_skew,
455            Arc::default(),
456            TransportConfiguration {
457                send_buffer_size: config.decoding_options.max_chunk_size,
458                recv_buffer_size: config.decoding_options.max_incoming_chunk_size,
459                max_message_size: config.decoding_options.max_message_size,
460                max_chunk_count: config.decoding_options.max_chunk_count,
461            },
462            config.channel_lifetime,
463            Arc::new(RwLock::new(ctx)),
464        )
465    }
466
467    /// Build a channel only, not creating a session.
468    /// This is useful if you want to manage the session lifetime yourself.
469    pub fn build_channel(
470        self,
471        certificate_store: Arc<RwLock<CertificateStore>>,
472    ) -> Result<AsyncSecureChannel, Error> {
473        let ctx = self.make_encoding_context();
474        Ok(Self::build_channel_inner(
475            certificate_store,
476            self.inner.user_identity_token,
477            self.endpoint,
478            self.config,
479            ctx,
480        ))
481    }
482}