1use std::{net::SocketAddr, str::FromStr, sync::Arc};
2
3use opcua_core::{comms::url::is_opc_ua_binary_url, config::Config, sync::RwLock};
4use opcua_crypto::{CertificateStore, SecurityPolicy};
5use opcua_types::{
6 ContextOwned, EndpointDescription, Error, MessageSecurityMode, NamespaceMap, NodeId,
7 StatusCode, TypeLoader, UserTokenType,
8};
9use tokio::net::TcpListener;
10
11use crate::{
12 reverse_connect::TcpConnectorReceiver,
13 transport::{
14 tcp::TransportConfiguration, Connector, ConnectorBuilder, ReverseHelloVerifier,
15 ReverseTcpConnector,
16 },
17 AsyncSecureChannel, ClientConfig, IdentityToken,
18};
19
20use super::{Client, EndpointInfo, Session, SessionEventLoop};
21
22struct SessionBuilderInner {
23 session_id: Option<NodeId>,
24 user_identity_token: IdentityToken,
25 type_loaders: Vec<Arc<dyn TypeLoader>>,
26}
27
28pub trait ConnectionSource {
44 type Builder: ConnectorBuilder;
46
47 fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error>;
49}
50
51pub struct DirectConnectionSource;
55
56impl ConnectionSource for DirectConnectionSource {
57 type Builder = String;
58 fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
59 Ok(endpoint.endpoint_url.as_ref().to_string())
60 }
61}
62
63pub struct ReverseConnectionSource {
66 listener: TcpConnectorReceiver,
67 verifier: Option<Arc<dyn ReverseHelloVerifier + Send + Sync>>,
68}
69
70impl ReverseConnectionSource {
71 pub fn new_listener(listener: Arc<TcpListener>) -> Self {
73 Self {
74 listener: TcpConnectorReceiver::Listener(listener),
75 verifier: None,
76 }
77 }
78
79 pub fn new_address(address: SocketAddr) -> Self {
81 Self {
82 listener: TcpConnectorReceiver::Address(address),
83 verifier: None,
84 }
85 }
86
87 pub fn with_verifier(
91 mut self,
92 verifier: impl ReverseHelloVerifier + Send + Sync + 'static,
93 ) -> Self {
94 self.verifier = Some(Arc::new(verifier));
95 self
96 }
97}
98
99impl ConnectionSource for ReverseConnectionSource {
100 type Builder = ReverseTcpConnector;
101
102 fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
103 if let Some(verifier) = self.verifier.clone() {
104 Ok(ReverseTcpConnector::new(
105 self.listener.clone(),
106 verifier,
107 endpoint.clone(),
108 ))
109 } else {
110 Ok(ReverseTcpConnector::new_default(
111 endpoint.clone(),
112 self.listener.clone(),
113 ))
114 }
115 }
116}
117
118pub struct SessionBuilder<'a, T = (), R = (), C = DirectConnectionSource> {
123 endpoint: T,
124 config: &'a ClientConfig,
125 endpoints: R,
126 inner: SessionBuilderInner,
127 connection_source: C,
128}
129
130impl<'a> SessionBuilder<'a, (), (), DirectConnectionSource> {
131 pub fn new(config: &'a ClientConfig) -> Self {
133 Self {
134 endpoint: (),
135 config,
136 endpoints: (),
137 inner: SessionBuilderInner {
138 session_id: None,
139 user_identity_token: IdentityToken::Anonymous,
140 type_loaders: Vec::new(),
141 },
142 connection_source: DirectConnectionSource,
143 }
144 }
145}
146
147impl<'a, T, C> SessionBuilder<'a, T, (), C> {
148 pub fn with_endpoints(
152 self,
153 endpoints: Vec<EndpointDescription>,
154 ) -> SessionBuilder<'a, T, Vec<EndpointDescription>, C> {
155 SessionBuilder {
156 inner: self.inner,
157 endpoint: self.endpoint,
158 config: self.config,
159 endpoints,
160 connection_source: self.connection_source,
161 }
162 }
163}
164
165impl<'a, T, R, C> SessionBuilder<'a, T, R, C> {
166 pub fn user_identity_token(mut self, identity_token: IdentityToken) -> Self {
168 self.inner.user_identity_token = identity_token;
169 self
170 }
171
172 pub fn session_id(mut self, session_id: NodeId) -> Self {
176 self.inner.session_id = Some(session_id);
177 self
178 }
179
180 pub fn type_loader(mut self, type_loader: Arc<dyn TypeLoader>) -> Self {
184 self.inner.type_loaders.push(type_loader);
185 self
186 }
187
188 fn endpoint_supports_token(&self, endpoint: &EndpointDescription) -> bool {
189 match &self.inner.user_identity_token {
190 IdentityToken::Anonymous => {
191 endpoint.user_identity_tokens.is_none()
192 || endpoint
193 .user_identity_tokens
194 .as_ref()
195 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Anonymous))
196 }
197 IdentityToken::UserName(_, _) => endpoint
198 .user_identity_tokens
199 .as_ref()
200 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::UserName)),
201 IdentityToken::X509(_, _) => endpoint
202 .user_identity_tokens
203 .as_ref()
204 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Certificate)),
205 IdentityToken::IssuedToken(_) => endpoint
206 .user_identity_tokens
207 .as_ref()
208 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::IssuedToken)),
209 }
210 }
211
212 pub fn with_connector<CS>(self, connection_source: CS) -> SessionBuilder<'a, T, R, CS>
215 where
216 CS: ConnectionSource,
217 {
218 SessionBuilder {
219 inner: self.inner,
220 endpoint: self.endpoint,
221 config: self.config,
222 endpoints: self.endpoints,
223 connection_source,
224 }
225 }
226}
227
228impl<'a, C> SessionBuilder<'a, (), Vec<EndpointDescription>, C> {
229 pub fn connect_to_matching_endpoint(
231 self,
232 endpoint: impl Into<EndpointDescription>,
233 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
234 let endpoint = endpoint.into();
235
236 let security_policy = SecurityPolicy::from_str(endpoint.security_policy_uri.as_ref())
237 .map_err(|_| {
238 Error::new(
239 StatusCode::BadSecurityPolicyRejected,
240 format!(
241 "Invalid security policy: {}",
242 endpoint.security_policy_uri.as_ref()
243 ),
244 )
245 })?;
246 let server_endpoint = Client::find_matching_endpoint(
247 &self.endpoints,
248 endpoint.endpoint_url.as_ref(),
249 security_policy,
250 endpoint.security_mode,
251 )
252 .ok_or(Error::new(
253 StatusCode::BadTcpEndpointUrlInvalid,
254 format!(
255 "Cannot find matching endpoint for {}",
256 endpoint.endpoint_url.as_ref()
257 ),
258 ))?;
259
260 Ok(SessionBuilder {
261 inner: self.inner,
262 endpoint: server_endpoint,
263 config: self.config,
264 endpoints: self.endpoints,
265 connection_source: self.connection_source,
266 })
267 }
268
269 pub fn connect_to_default_endpoint(
272 mut self,
273 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
274 let default_endpoint_id = self.config.default_endpoint.clone();
275 let endpoint = if default_endpoint_id.is_empty() {
276 return Err(Error::new(
277 StatusCode::BadConfigurationError,
278 "No default endpoint has been specified",
279 ));
280 } else if let Some(endpoint) = self.config.endpoints.get(&default_endpoint_id) {
281 endpoint.clone()
282 } else {
283 return Err(Error::new(
284 StatusCode::BadInvalidArgument,
285 format!("Cannot find default endpoint with id {default_endpoint_id}"),
286 ));
287 };
288 let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
289 let endpoint = self
290 .config
291 .endpoint_description_for_client_endpoint(&endpoint, &self.endpoints)?;
292 self.inner.user_identity_token = user_identity_token;
293 Ok(SessionBuilder {
294 inner: self.inner,
295 endpoint,
296 config: self.config,
297 endpoints: self.endpoints,
298 connection_source: self.connection_source,
299 })
300 }
301
302 pub fn connect_to_endpoint_id(
305 mut self,
306 endpoint_id: impl Into<String>,
307 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
308 let endpoint_id = endpoint_id.into();
309 let endpoint = self.config.endpoints.get(&endpoint_id).ok_or_else(|| {
310 Error::new(
311 StatusCode::BadInvalidArgument,
312 format!("Cannot find endpoint with id {endpoint_id}"),
313 )
314 })?;
315 let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
316
317 let endpoint = self
318 .config
319 .endpoint_description_for_client_endpoint(endpoint, &self.endpoints)?;
320 self.inner.user_identity_token = user_identity_token;
321 Ok(SessionBuilder {
322 inner: self.inner,
323 endpoint,
324 config: self.config,
325 endpoints: self.endpoints,
326 connection_source: self.connection_source,
327 })
328 }
329
330 pub fn connect_to_best_endpoint(
335 self,
336 secure: bool,
337 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
338 let endpoint = if secure {
339 self.endpoints
340 .iter()
341 .filter(|e| self.endpoint_supports_token(e))
342 .max_by(|a, b| a.security_level.cmp(&b.security_level))
343 } else {
344 self.endpoints.iter().find(|e| {
345 e.security_mode == MessageSecurityMode::None && self.endpoint_supports_token(e)
346 })
347 };
348 let Some(endpoint) = endpoint else {
349 return Err(Error::new(
350 StatusCode::BadInvalidArgument,
351 "No suitable endpoint found",
352 ));
353 };
354 Ok(SessionBuilder {
355 inner: self.inner,
356 endpoint: endpoint.clone(),
357 config: self.config,
358 endpoints: self.endpoints,
359 connection_source: self.connection_source,
360 })
361 }
362}
363
364impl<'a, R, C> SessionBuilder<'a, (), R, C> {
365 pub fn connect_to_endpoint_directly(
368 self,
369 endpoint: impl Into<EndpointDescription>,
370 ) -> Result<SessionBuilder<'a, EndpointDescription, R, C>, Error> {
371 let endpoint = endpoint.into();
372 if !is_opc_ua_binary_url(endpoint.endpoint_url.as_ref()) {
373 return Err(Error::new(
374 StatusCode::BadTcpEndpointUrlInvalid,
375 format!(
376 "Endpoint url {} is not a valid / supported url",
377 endpoint.endpoint_url
378 ),
379 ));
380 }
381 Ok(SessionBuilder {
382 endpoint,
383 config: self.config,
384 endpoints: self.endpoints,
385 inner: self.inner,
386 connection_source: self.connection_source,
387 })
388 }
389}
390
391impl<R, C> SessionBuilder<'_, EndpointDescription, R, C>
392where
393 C: ConnectionSource,
394{
395 pub fn build(
398 self,
399 certificate_store: Arc<RwLock<CertificateStore>>,
400 ) -> Result<(Arc<Session>, SessionEventLoop), Error> {
401 let connector = self
402 .connection_source
403 .get_connector(&self.endpoint)?
404 .build()?;
405 let ctx = self.make_encoding_context();
406 Ok(Session::new(
407 Self::build_channel_inner(
408 certificate_store,
409 self.inner.user_identity_token,
410 self.endpoint,
411 self.config,
412 connector,
413 ctx,
414 ),
415 self.config.session_name.clone().into(),
416 self.config.application_description(),
417 self.config.session_retry_policy(),
418 self.config.decoding_options.as_comms_decoding_options(),
419 self.config,
420 self.inner.session_id,
421 ))
422 }
423
424 fn make_encoding_context(&self) -> ContextOwned {
425 let mut encoding_context = ContextOwned::new_default(
426 NamespaceMap::new(),
427 self.config.decoding_options.as_comms_decoding_options(),
428 );
429
430 for loader in self.inner.type_loaders.iter().cloned() {
431 encoding_context.loaders_mut().add(loader);
432 }
433
434 encoding_context
435 }
436
437 fn build_channel_inner(
438 certificate_store: Arc<RwLock<CertificateStore>>,
439 identity_token: IdentityToken,
440 endpoint: EndpointDescription,
441 config: &ClientConfig,
442 connector: Box<dyn Connector + Send + Sync + 'static>,
443 ctx: ContextOwned,
444 ) -> AsyncSecureChannel {
445 AsyncSecureChannel::new(
446 certificate_store,
447 EndpointInfo {
448 endpoint,
449 user_identity_token: identity_token,
450 preferred_locales: config.preferred_locales.clone(),
451 },
452 config.session_retry_policy(),
453 config.performance.ignore_clock_skew,
454 Arc::default(),
455 TransportConfiguration {
456 send_buffer_size: config.decoding_options.max_chunk_size,
457 recv_buffer_size: config.decoding_options.max_incoming_chunk_size,
458 max_message_size: config.decoding_options.max_message_size,
459 max_chunk_count: config.decoding_options.max_chunk_count,
460 },
461 connector,
462 config.channel_lifetime,
463 Arc::new(RwLock::new(ctx)),
464 )
465 }
466
467 pub fn build_channel(
470 self,
471 certificate_store: Arc<RwLock<CertificateStore>>,
472 ) -> Result<AsyncSecureChannel, Error> {
473 let ctx = self.make_encoding_context();
474 let connector = self
475 .connection_source
476 .get_connector(&self.endpoint)?
477 .build()?;
478 Ok(Self::build_channel_inner(
479 certificate_store,
480 self.inner.user_identity_token,
481 self.endpoint,
482 self.config,
483 connector,
484 ctx,
485 ))
486 }
487}