1use std::{net::SocketAddr, str::FromStr, sync::Arc};
2
3use opcua_core::{comms::url::is_opc_ua_binary_url, config::Config, sync::RwLock};
4use opcua_crypto::{CertificateStore, SecurityPolicy};
5use opcua_types::{
6 ContextOwned, EndpointDescription, Error, MessageSecurityMode, NamespaceMap, NodeId,
7 StatusCode, TypeLoader, UserTokenType,
8};
9use tokio::net::TcpListener;
10
11use crate::{
12 reverse_connect::TcpConnectorReceiver,
13 transport::{
14 tcp::TransportConfiguration, ConnectorBuilder, ReverseHelloVerifier, ReverseTcpConnector,
15 },
16 AsyncSecureChannel, ClientConfig, IdentityToken,
17};
18
19use super::{Client, EndpointInfo, Session, SessionEventLoop};
20
21struct SessionBuilderInner {
22 session_id: Option<NodeId>,
23 user_identity_token: IdentityToken,
24 type_loaders: Vec<Arc<dyn TypeLoader>>,
25}
26
27pub trait ConnectionSource {
43 type Builder: ConnectorBuilder;
45
46 fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error>;
48}
49
50pub struct DirectConnectionSource;
54
55impl ConnectionSource for DirectConnectionSource {
56 type Builder = String;
57 fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
58 Ok(endpoint.endpoint_url.as_ref().to_string())
59 }
60}
61
62pub struct ReverseConnectionSource {
65 listener: TcpConnectorReceiver,
66 verifier: Option<Arc<dyn ReverseHelloVerifier + Send + Sync>>,
67}
68
69impl ReverseConnectionSource {
70 pub fn new_listener(listener: Arc<TcpListener>) -> Self {
72 Self {
73 listener: TcpConnectorReceiver::Listener(listener),
74 verifier: None,
75 }
76 }
77
78 pub fn new_address(address: SocketAddr) -> Self {
80 Self {
81 listener: TcpConnectorReceiver::Address(address),
82 verifier: None,
83 }
84 }
85
86 pub fn with_verifier(
90 mut self,
91 verifier: impl ReverseHelloVerifier + Send + Sync + 'static,
92 ) -> Self {
93 self.verifier = Some(Arc::new(verifier));
94 self
95 }
96}
97
98impl ConnectionSource for ReverseConnectionSource {
99 type Builder = ReverseTcpConnector;
100
101 fn get_connector(&self, endpoint: &EndpointDescription) -> Result<Self::Builder, Error> {
102 if let Some(verifier) = self.verifier.clone() {
103 Ok(ReverseTcpConnector::new(
104 self.listener.clone(),
105 verifier,
106 endpoint.clone(),
107 ))
108 } else {
109 Ok(ReverseTcpConnector::new_default(
110 endpoint.clone(),
111 self.listener.clone(),
112 ))
113 }
114 }
115}
116
117pub struct SessionBuilder<'a, T = (), R = (), C = DirectConnectionSource> {
122 endpoint: T,
123 config: &'a ClientConfig,
124 endpoints: R,
125 inner: SessionBuilderInner,
126 connection_source: C,
127}
128
129impl<'a> SessionBuilder<'a, (), (), DirectConnectionSource> {
130 pub fn new(config: &'a ClientConfig) -> Self {
132 Self {
133 endpoint: (),
134 config,
135 endpoints: (),
136 inner: SessionBuilderInner {
137 session_id: None,
138 user_identity_token: IdentityToken::Anonymous,
139 type_loaders: Vec::new(),
140 },
141 connection_source: DirectConnectionSource,
142 }
143 }
144}
145
146impl<'a, T, C> SessionBuilder<'a, T, (), C> {
147 pub fn with_endpoints(
151 self,
152 endpoints: Vec<EndpointDescription>,
153 ) -> SessionBuilder<'a, T, Vec<EndpointDescription>, C> {
154 SessionBuilder {
155 inner: self.inner,
156 endpoint: self.endpoint,
157 config: self.config,
158 endpoints,
159 connection_source: self.connection_source,
160 }
161 }
162}
163
164impl<'a, T, R, C> SessionBuilder<'a, T, R, C> {
165 pub fn user_identity_token(mut self, identity_token: IdentityToken) -> Self {
167 self.inner.user_identity_token = identity_token;
168 self
169 }
170
171 pub fn session_id(mut self, session_id: NodeId) -> Self {
175 self.inner.session_id = Some(session_id);
176 self
177 }
178
179 pub fn type_loader(mut self, type_loader: Arc<dyn TypeLoader>) -> Self {
183 self.inner.type_loaders.push(type_loader);
184 self
185 }
186
187 fn endpoint_supports_token(&self, endpoint: &EndpointDescription) -> bool {
188 match &self.inner.user_identity_token {
189 IdentityToken::Anonymous => {
190 endpoint.user_identity_tokens.is_none()
191 || endpoint
192 .user_identity_tokens
193 .as_ref()
194 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Anonymous))
195 }
196 IdentityToken::UserName(_, _) => endpoint
197 .user_identity_tokens
198 .as_ref()
199 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::UserName)),
200 IdentityToken::X509(_, _) => endpoint
201 .user_identity_tokens
202 .as_ref()
203 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::Certificate)),
204 IdentityToken::IssuedToken(_) => endpoint
205 .user_identity_tokens
206 .as_ref()
207 .is_some_and(|e| e.iter().any(|p| p.token_type == UserTokenType::IssuedToken)),
208 }
209 }
210
211 pub fn with_connector<CS>(self, connection_source: CS) -> SessionBuilder<'a, T, R, CS>
214 where
215 CS: ConnectionSource,
216 {
217 SessionBuilder {
218 inner: self.inner,
219 endpoint: self.endpoint,
220 config: self.config,
221 endpoints: self.endpoints,
222 connection_source,
223 }
224 }
225}
226
227impl<'a, C> SessionBuilder<'a, (), Vec<EndpointDescription>, C> {
228 pub fn connect_to_matching_endpoint(
230 self,
231 endpoint: impl Into<EndpointDescription>,
232 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
233 let endpoint = endpoint.into();
234
235 let security_policy = SecurityPolicy::from_str(endpoint.security_policy_uri.as_ref())
236 .map_err(|_| {
237 Error::new(
238 StatusCode::BadSecurityPolicyRejected,
239 format!(
240 "Invalid security policy: {}",
241 endpoint.security_policy_uri.as_ref()
242 ),
243 )
244 })?;
245 let server_endpoint = Client::find_matching_endpoint(
246 &self.endpoints,
247 endpoint.endpoint_url.as_ref(),
248 security_policy,
249 endpoint.security_mode,
250 )
251 .ok_or(Error::new(
252 StatusCode::BadTcpEndpointUrlInvalid,
253 format!(
254 "Cannot find matching endpoint for {}",
255 endpoint.endpoint_url.as_ref()
256 ),
257 ))?;
258
259 Ok(SessionBuilder {
260 inner: self.inner,
261 endpoint: server_endpoint,
262 config: self.config,
263 endpoints: self.endpoints,
264 connection_source: self.connection_source,
265 })
266 }
267
268 pub fn connect_to_default_endpoint(
271 mut self,
272 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
273 let default_endpoint_id = self.config.default_endpoint.clone();
274 let endpoint = if default_endpoint_id.is_empty() {
275 return Err(Error::new(
276 StatusCode::BadConfigurationError,
277 "No default endpoint has been specified",
278 ));
279 } else if let Some(endpoint) = self.config.endpoints.get(&default_endpoint_id) {
280 endpoint.clone()
281 } else {
282 return Err(Error::new(
283 StatusCode::BadInvalidArgument,
284 format!("Cannot find default endpoint with id {default_endpoint_id}"),
285 ));
286 };
287 let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
288 let endpoint = self
289 .config
290 .endpoint_description_for_client_endpoint(&endpoint, &self.endpoints)?;
291 self.inner.user_identity_token = user_identity_token;
292 Ok(SessionBuilder {
293 inner: self.inner,
294 endpoint,
295 config: self.config,
296 endpoints: self.endpoints,
297 connection_source: self.connection_source,
298 })
299 }
300
301 pub fn connect_to_endpoint_id(
304 mut self,
305 endpoint_id: impl Into<String>,
306 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
307 let endpoint_id = endpoint_id.into();
308 let endpoint = self.config.endpoints.get(&endpoint_id).ok_or_else(|| {
309 Error::new(
310 StatusCode::BadInvalidArgument,
311 format!("Cannot find endpoint with id {endpoint_id}"),
312 )
313 })?;
314 let user_identity_token = self.config.client_identity_token(&endpoint.user_token_id)?;
315
316 let endpoint = self
317 .config
318 .endpoint_description_for_client_endpoint(endpoint, &self.endpoints)?;
319 self.inner.user_identity_token = user_identity_token;
320 Ok(SessionBuilder {
321 inner: self.inner,
322 endpoint,
323 config: self.config,
324 endpoints: self.endpoints,
325 connection_source: self.connection_source,
326 })
327 }
328
329 pub fn connect_to_best_endpoint(
334 self,
335 secure: bool,
336 ) -> Result<SessionBuilder<'a, EndpointDescription, Vec<EndpointDescription>, C>, Error> {
337 let endpoint = if secure {
338 self.endpoints
339 .iter()
340 .filter(|e| self.endpoint_supports_token(e))
341 .max_by(|a, b| a.security_level.cmp(&b.security_level))
342 } else {
343 self.endpoints.iter().find(|e| {
344 e.security_mode == MessageSecurityMode::None && self.endpoint_supports_token(e)
345 })
346 };
347 let Some(endpoint) = endpoint else {
348 return Err(Error::new(
349 StatusCode::BadInvalidArgument,
350 "No suitable endpoint found",
351 ));
352 };
353 Ok(SessionBuilder {
354 inner: self.inner,
355 endpoint: endpoint.clone(),
356 config: self.config,
357 endpoints: self.endpoints,
358 connection_source: self.connection_source,
359 })
360 }
361}
362
363impl<'a, R, C> SessionBuilder<'a, (), R, C> {
364 pub fn connect_to_endpoint_directly(
367 self,
368 endpoint: impl Into<EndpointDescription>,
369 ) -> Result<SessionBuilder<'a, EndpointDescription, R, C>, Error> {
370 let endpoint = endpoint.into();
371 if !is_opc_ua_binary_url(endpoint.endpoint_url.as_ref()) {
372 return Err(Error::new(
373 StatusCode::BadTcpEndpointUrlInvalid,
374 format!(
375 "Endpoint url {} is not a valid / supported url",
376 endpoint.endpoint_url
377 ),
378 ));
379 }
380 Ok(SessionBuilder {
381 endpoint,
382 config: self.config,
383 endpoints: self.endpoints,
384 inner: self.inner,
385 connection_source: self.connection_source,
386 })
387 }
388}
389
390type ResultEventLoop<C> =
391 SessionEventLoop<<<C as ConnectionSource>::Builder as ConnectorBuilder>::ConnectorType>;
392
393impl<R, C> SessionBuilder<'_, EndpointDescription, R, C>
394where
395 C: ConnectionSource,
396{
397 pub fn build(
400 self,
401 certificate_store: Arc<RwLock<CertificateStore>>,
402 ) -> Result<(Arc<Session>, ResultEventLoop<C>), Error> {
403 let connector = self
404 .connection_source
405 .get_connector(&self.endpoint)?
406 .build()?;
407 let ctx = self.make_encoding_context();
408 Ok(Session::new(
409 Self::build_channel_inner(
410 certificate_store,
411 self.inner.user_identity_token,
412 self.endpoint,
413 self.config,
414 ctx,
415 ),
416 self.config.session_name.clone().into(),
417 self.config.application_description(),
418 self.config.session_retry_policy(),
419 self.config.decoding_options.as_comms_decoding_options(),
420 self.config,
421 self.inner.session_id,
422 connector,
423 ))
424 }
425
426 fn make_encoding_context(&self) -> ContextOwned {
427 let mut encoding_context = ContextOwned::new_default(
428 NamespaceMap::new(),
429 self.config.decoding_options.as_comms_decoding_options(),
430 );
431
432 for loader in self.inner.type_loaders.iter().cloned() {
433 encoding_context.loaders_mut().add(loader);
434 }
435
436 encoding_context
437 }
438
439 fn build_channel_inner(
440 certificate_store: Arc<RwLock<CertificateStore>>,
441 identity_token: IdentityToken,
442 endpoint: EndpointDescription,
443 config: &ClientConfig,
444 ctx: ContextOwned,
445 ) -> AsyncSecureChannel {
446 AsyncSecureChannel::new(
447 certificate_store,
448 EndpointInfo {
449 endpoint,
450 user_identity_token: identity_token,
451 preferred_locales: config.preferred_locales.clone(),
452 },
453 config.session_retry_policy(),
454 config.performance.ignore_clock_skew,
455 Arc::default(),
456 TransportConfiguration {
457 send_buffer_size: config.decoding_options.max_chunk_size,
458 recv_buffer_size: config.decoding_options.max_incoming_chunk_size,
459 max_message_size: config.decoding_options.max_message_size,
460 max_chunk_count: config.decoding_options.max_chunk_count,
461 },
462 config.channel_lifetime,
463 Arc::new(RwLock::new(ctx)),
464 )
465 }
466
467 pub fn build_channel(
470 self,
471 certificate_store: Arc<RwLock<CertificateStore>>,
472 ) -> Result<AsyncSecureChannel, Error> {
473 let ctx = self.make_encoding_context();
474 Ok(Self::build_channel_inner(
475 certificate_store,
476 self.inner.user_identity_token,
477 self.endpoint,
478 self.config,
479 ctx,
480 ))
481 }
482}