1mod client;
2mod connect;
3mod connection;
4mod event_loop;
5mod request_builder;
6mod retry;
7mod services;
8
/// An endpoint description bundled with the identity token and locale list
/// to use with it.
///
/// Can be built from a bare `EndpointDescription` or from an
/// `(EndpointDescription, IdentityToken)` tuple via the `From` impls in this
/// module.
#[derive(Debug, Clone)]
pub struct EndpointInfo {
    /// The endpoint description.
    pub endpoint: EndpointDescription,
    /// Identity token associated with the endpoint. The `From` impl that takes
    /// only an endpoint description sets this to `IdentityToken::Anonymous`.
    pub user_identity_token: IdentityToken,
    /// Locale identifiers; presumably in order of preference, as the name
    /// suggests (TODO confirm). Both `From` impls leave this empty.
    pub preferred_locales: Vec<String>,
}
20
21impl From<EndpointDescription> for EndpointInfo {
22 fn from(value: EndpointDescription) -> Self {
23 Self {
24 endpoint: value,
25 user_identity_token: IdentityToken::Anonymous,
26 preferred_locales: Vec::new(),
27 }
28 }
29}
30
31impl From<(EndpointDescription, IdentityToken)> for EndpointInfo {
32 fn from(value: (EndpointDescription, IdentityToken)) -> Self {
33 Self {
34 endpoint: value.0,
35 user_identity_token: value.1,
36 preferred_locales: Vec::new(),
37 }
38 }
39}
40
41use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44
45use arc_swap::ArcSwap;
46pub use client::Client;
47pub use connect::SessionConnectMode;
48pub use connection::{
49 ConnectionSource, DirectConnectionSource, ReverseConnectionSource, SessionBuilder,
50};
51pub use event_loop::{SessionActivity, SessionEventLoop, SessionPollResult};
52use opcua_core::handle::AtomicHandle;
53use opcua_core::sync::{Mutex, RwLock};
54pub use request_builder::UARequest;
55pub use retry::{DefaultRetryPolicy, RequestRetryPolicy};
56pub use services::attributes::{
57 HistoryRead, HistoryReadAction, HistoryUpdate, HistoryUpdateAction, Read, Write,
58};
59pub use services::method::Call;
60pub use services::node_management::{AddNodes, AddReferences, DeleteNodes, DeleteReferences};
61pub use services::session::{ActivateSession, Cancel, CloseSession, CreateSession};
62use services::subscriptions::state::SubscriptionState;
63pub use services::subscriptions::{
64 CreateMonitoredItems, CreateSubscription, DataChangeCallback, DeleteMonitoredItems,
65 DeleteSubscriptions, EventCallback, ModifyMonitoredItems, ModifySubscription, MonitoredItem,
66 MonitoredItemMap, OnSubscriptionNotification, OnSubscriptionNotificationCore,
67 PreInsertMonitoredItems, Publish, PublishLimits, Republish, SetMonitoringMode,
68 SetPublishingMode, SetTriggering, Subscription, SubscriptionActivity, SubscriptionCache,
69 SubscriptionCallbacks, SubscriptionEventLoopState, TransferSubscriptions,
70};
71pub use services::view::{
72 Browse, BrowseNext, RegisterNodes, TranslateBrowsePaths, UnregisterNodes,
73};
74use tracing::{error, info};
75
/// Logs at `warn` level through `tracing`, prefixing the message with
/// `session:<id>` where `<id>` is the value of `$sess.session_id()`.
///
/// Everything after the first argument is forwarded to `format!` unchanged.
#[allow(unused)]
macro_rules! session_warn {
    ($sess:expr, $($fmt:tt)*) => {
        tracing::warn!("session:{} {}", $sess.session_id(), format!($($fmt)*));
    };
}
#[allow(unused)]
pub(crate) use session_warn;
84
/// Logs at `error` level through `tracing`, prefixing the message with
/// `session:<id>` where `<id>` is the value of `$sess.session_id()`.
///
/// Everything after the first argument is forwarded to `format!` unchanged.
#[allow(unused)]
macro_rules! session_error {
    ($sess:expr, $($fmt:tt)*) => {
        tracing::error!("session:{} {}", $sess.session_id(), format!($($fmt)*));
    };
}
#[allow(unused)]
pub(crate) use session_error;
93
/// Logs at `debug` level through `tracing`, prefixing the message with
/// `session:<id>` where `<id>` is the value of `$sess.session_id()`.
///
/// Everything after the first argument is forwarded to `format!` unchanged.
#[allow(unused)]
macro_rules! session_debug {
    ($sess:expr, $($fmt:tt)*) => {
        tracing::debug!("session:{} {}", $sess.session_id(), format!($($fmt)*));
    };
}
#[allow(unused)]
pub(crate) use session_debug;
102
/// Logs at `trace` level through `tracing`, prefixing the message with
/// `session:<id>` where `<id>` is the value of `$sess.session_id()`.
///
/// Everything after the first argument is forwarded to `format!` unchanged.
#[allow(unused)]
macro_rules! session_trace {
    ($sess:expr, $($fmt:tt)*) => {
        tracing::trace!("session:{} {}", $sess.session_id(), format!($($fmt)*));
    };
}
#[allow(unused)]
pub(crate) use session_trace;
111
112use opcua_core::ResponseMessage;
113use opcua_types::{
114 ApplicationDescription, ContextOwned, DecodingOptions, EndpointDescription, Error, IntegerId,
115 NamespaceMap, NodeId, ReadValueId, RequestHeader, ResponseHeader, StatusCode,
116 TimestampsToReturn, TypeLoader, UAString, VariableId, Variant,
117};
118
119use crate::browser::Browser;
120use crate::transport::Connector;
121use crate::{AsyncSecureChannel, ClientConfig, ExponentialBackoff, SessionRetryPolicy};
122
123use super::IdentityToken;
124
125pub(crate) fn process_service_result(response_header: &ResponseHeader) -> Result<(), StatusCode> {
128 if response_header.service_result.is_bad() {
129 info!(
130 "Received a bad service result {} from the request",
131 response_header.service_result
132 );
133 Err(response_header.service_result)
134 } else {
135 Ok(())
136 }
137}
138
139pub(crate) fn process_unexpected_response(response: ResponseMessage) -> StatusCode {
140 match response {
141 ResponseMessage::ServiceFault(service_fault) => {
142 error!(
143 "Received a service fault of {} for the request",
144 service_fault.response_header.service_result
145 );
146 service_fault.response_header.service_result
147 }
148 _ => {
149 error!("Received an unexpected response to the request");
150 StatusCode::BadUnknownResponse
151 }
152 }
153}
154
/// Connection state of a session, published through the session's state
/// watch channel.
#[derive(Clone, Copy)]
pub(super) enum SessionState {
    /// Initial value of the state watch channel created in `Session::new`;
    /// also the state `Session::disconnect_inner` waits for before returning.
    Disconnected,
    /// The state `Session::wait_for_connection` waits for.
    Connected,
    /// Never satisfies `Session::wait_for_state`; presumably set while a
    /// connection attempt is in progress (TODO confirm in the event loop).
    Connecting,
}
161
/// Counter that hands out client-local session ids, starting at 1.
/// `Session::new` and `Session::reset` each take the next value with
/// `fetch_add(1, ..)`.
static NEXT_SESSION_ID: AtomicU32 = AtomicU32::new(1);
163
/// Client-side session state.
///
/// Built by `Session::new`, which returns the session inside an `Arc` and
/// hands a clone of that `Arc` to the accompanying `SessionEventLoop`.
pub struct Session {
    /// Secure channel used by this session; exposed through `channel()`.
    pub(super) channel: AsyncSecureChannel,
    /// Receiving side of the session-state watch; cloned by `wait_for_state`.
    pub(super) state_watch_rx: tokio::sync::watch::Receiver<SessionState>,
    /// Sending side of the session-state watch, initialised to `Disconnected`.
    pub(super) state_watch_tx: tokio::sync::watch::Sender<SessionState>,
    /// Session node id returned by `server_session_id()`. `new` seeds it from
    /// its optional `session_id` argument (default `NodeId` when `None`) and
    /// `reset` replaces it with a null node id.
    pub(super) session_id: Arc<ArcSwap<NodeId>>,
    /// Client-local numeric id drawn from `NEXT_SESSION_ID`; returned by
    /// `session_id()` and replaced with a fresh value by `reset`.
    pub(super) internal_session_id: AtomicU32,
    /// Session name passed to `new`.
    pub(super) session_name: UAString,
    /// Application description passed to `new`.
    pub(super) application_description: ApplicationDescription,
    /// Copied from `ClientConfig::request_timeout`; used by
    /// `make_request_header`.
    pub(super) request_timeout: Duration,
    /// Copied from `ClientConfig::publish_timeout`.
    pub(super) publish_timeout: Duration,
    /// Copied from `ClientConfig::performance.recreate_monitored_items_chunk`.
    pub(super) recreate_monitored_items_chunk: usize,
    /// Copied from `ClientConfig::recreate_subscriptions`.
    pub(super) recreate_subscriptions: bool,
    /// Reconnect flag; starts as `true` and is toggled by `enable_reconnects`,
    /// `disable_reconnects` and `disconnect_inner`.
    pub(super) should_reconnect: AtomicBool,
    /// `ClientConfig::session_timeout` converted to `f64`.
    pub(super) session_timeout: f64,
    /// Subscription state, guarded by a mutex. Created in `new` from
    /// `ClientConfig::min_publish_interval` and a clone of the publish-limits
    /// sender.
    pub subscription_state: Mutex<SubscriptionState>,
    /// Receiving side of the publish-limits watch.
    pub(super) publish_limits_watch_rx: tokio::sync::watch::Receiver<PublishLimits>,
    /// Sending side of the publish-limits watch, initialised with
    /// `PublishLimits::new()`.
    pub(super) publish_limits_watch_tx: tokio::sync::watch::Sender<PublishLimits>,
    /// Handle generator created with `AtomicHandle::new(1000)`; presumably
    /// allocates client handles for monitored items (TODO confirm).
    pub(super) monitored_item_handle: AtomicHandle,
    /// Sender whose receiving half is given to the event loop in `new`;
    /// presumably used to request an immediate publish (TODO confirm).
    pub(super) trigger_publish_tx: tokio::sync::watch::Sender<Instant>,
    /// Copied from `ClientConfig::session_nonce_length`.
    pub(super) session_nonce_length: usize,
    /// Decoding options passed to `new`; exposed through `decoding_options()`.
    decoding_options: DecodingOptions,
}
193
194impl Session {
195 #[allow(clippy::too_many_arguments)]
196 pub(crate) fn new<T: Connector + Send + Sync + 'static>(
197 channel: AsyncSecureChannel,
198 session_name: UAString,
199 application_description: ApplicationDescription,
200 session_retry_policy: SessionRetryPolicy,
201 decoding_options: DecodingOptions,
202 config: &ClientConfig,
203 session_id: Option<NodeId>,
204 connector: T,
205 ) -> (Arc<Self>, SessionEventLoop<T>) {
206 let (publish_limits_watch_tx, publish_limits_watch_rx) =
207 tokio::sync::watch::channel(PublishLimits::new());
208 let (state_watch_tx, state_watch_rx) =
209 tokio::sync::watch::channel(SessionState::Disconnected);
210 let (trigger_publish_tx, trigger_publish_rx) = tokio::sync::watch::channel(Instant::now());
211
212 let session = Arc::new(Session {
213 channel,
214 internal_session_id: AtomicU32::new(NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed)),
215 state_watch_rx,
216 state_watch_tx,
217 session_id: Arc::new(ArcSwap::new(Arc::new(session_id.unwrap_or_default()))),
218 session_name,
219 application_description,
220 request_timeout: config.request_timeout,
221 session_timeout: config.session_timeout as f64,
222 publish_timeout: config.publish_timeout,
223 recreate_monitored_items_chunk: config.performance.recreate_monitored_items_chunk,
224 recreate_subscriptions: config.recreate_subscriptions,
225 should_reconnect: AtomicBool::new(true),
226 subscription_state: Mutex::new(SubscriptionState::new(
227 config.min_publish_interval,
228 publish_limits_watch_tx.clone(),
229 )),
230 monitored_item_handle: AtomicHandle::new(1000),
231 publish_limits_watch_rx,
232 publish_limits_watch_tx,
233 trigger_publish_tx,
234 session_nonce_length: config.session_nonce_length,
235 decoding_options,
236 });
237
238 (
239 session.clone(),
240 SessionEventLoop::new(
241 session,
242 session_retry_policy,
243 trigger_publish_rx,
244 config.keep_alive_interval,
245 config.max_failed_keep_alive_count,
246 connector,
247 ),
248 )
249 }
250
/// Builds a request header by delegating to the secure channel, passing this
/// session's `request_timeout`.
pub(super) fn make_request_header(&self) -> RequestHeader {
    self.channel.make_request_header(self.request_timeout)
}
255
256 pub(crate) fn reset(&self) {
259 self.session_id.store(Arc::new(NodeId::null()));
260 self.internal_session_id.store(
261 NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed),
262 Ordering::Relaxed,
263 );
264 }
265
266 async fn wait_for_state(&self, connected: bool) -> bool {
268 let mut rx = self.state_watch_rx.clone();
269
270 let res = rx
271 .wait_for(|s| {
272 connected && matches!(*s, SessionState::Connected)
273 || !connected && matches!(*s, SessionState::Disconnected)
274 })
275 .await
276 .is_ok();
277
278 res
280 }
281
/// Returns the client-local numeric id of this session (the current value of
/// `internal_session_id`).
///
/// This is the id the `session_*` logging macros print as `session:<id>`. It
/// is not the node id returned by `server_session_id`.
pub fn session_id(&self) -> u32 {
    self.internal_session_id.load(Ordering::Relaxed)
}
286
287 pub fn server_session_id(&self) -> NodeId {
290 (**(*self.session_id).load()).clone()
291 }
292
/// Waits until the session state becomes `Connected`.
///
/// Returns `true` once connected, or `false` if waiting on the state watch
/// channel fails.
pub async fn wait_for_connection(&self) -> bool {
    self.wait_for_state(true).await
}
299
/// Sets the `should_reconnect` flag to `false`.
///
/// The flag is only stored here; presumably the event loop consults it before
/// attempting to reconnect (TODO confirm).
pub fn disable_reconnects(&self) {
    self.should_reconnect.store(false, Ordering::Relaxed);
}
306
/// Sets the `should_reconnect` flag to `true`, which is also its initial
/// value after `Session::new`.
pub fn enable_reconnects(&self) {
    self.should_reconnect.store(true, Ordering::Relaxed);
}
312
313 pub async fn disconnect_inner(
317 &self,
318 delete_subscriptions: bool,
319 disable_reconnect: bool,
320 ) -> Result<(), StatusCode> {
321 if disable_reconnect {
322 self.should_reconnect.store(false, Ordering::Relaxed);
323 }
324 let mut res = Ok(());
325 if let Err(e) = self.close_session(delete_subscriptions).await {
326 res = Err(e);
327 session_warn!(
328 self,
329 "Failed to close session, channel will be closed anyway: {e}"
330 );
331 }
332 self.channel.close_channel().await;
333
334 self.wait_for_state(false).await;
335
336 res
337 }
338
/// Disconnects from the server via `disconnect_inner(true, true)`:
/// subscriptions are requested to be deleted and reconnects are disabled.
///
/// # Errors
///
/// Returns the status code reported when closing the session fails; the
/// channel is closed regardless.
pub async fn disconnect(&self) -> Result<(), StatusCode> {
    self.disconnect_inner(true, true).await
}
346
/// Disconnects from the server via `disconnect_inner(false, true)`:
/// reconnects are disabled, but subscriptions are not requested to be
/// deleted.
///
/// # Errors
///
/// Returns the status code reported when closing the session fails; the
/// channel is closed regardless.
pub async fn disconnect_without_delete_subscriptions(&self) -> Result<(), StatusCode> {
    self.disconnect_inner(false, true).await
}
351
/// Returns the decoding options this session was created with.
pub fn decoding_options(&self) -> &DecodingOptions {
    &self.decoding_options
}
356
/// Returns a reference to the secure channel used by this session.
pub fn channel(&self) -> &AsyncSecureChannel {
    &self.channel
}
361
/// Returns the result of `request_handle()` on the secure channel.
///
/// NOTE(review): presumably this allocates the next request handle rather
/// than peeking at the current one; confirm against `AsyncSecureChannel`.
pub fn request_handle(&self) -> IntegerId {
    self.channel.request_handle()
}
366
/// Returns the lock-protected encoding context owned by the secure channel.
///
/// `set_namespaces` and `add_type_loader` modify it through the write lock.
pub fn encoding_context(&self) -> &RwLock<ContextOwned> {
    self.channel.encoding_context()
}
371
/// Returns the endpoint info held by the secure channel.
pub fn endpoint_info(&self) -> &EndpointInfo {
    self.channel.endpoint_info()
}
376
377 pub fn set_namespaces(&self, namespaces: NamespaceMap) {
381 *self.encoding_context().write().namespaces_mut() = namespaces;
382 }
383
384 pub fn add_type_loader(&self, type_loader: Arc<dyn TypeLoader>) {
389 self.encoding_context()
390 .write()
391 .loaders_mut()
392 .add(type_loader);
393 }
394
/// Returns a shared handle to the encoding context, obtained from
/// `context_arc()` on the secure channel while holding its read lock.
pub fn context(&self) -> Arc<RwLock<ContextOwned>> {
    self.channel.secure_channel.read().context_arc()
}
399
400 pub fn browser(&self) -> Browser<'_, (), DefaultRetryPolicy<'_>> {
405 Browser::new(
406 self,
407 (),
408 DefaultRetryPolicy::new(ExponentialBackoff::new(
409 Duration::from_secs(30),
410 Some(5),
411 Duration::from_millis(500),
412 )),
413 )
414 }
415
416 pub async fn read_namespace_array(&self) -> Result<NamespaceMap, Error> {
418 let nodeid: NodeId = VariableId::Server_NamespaceArray.into();
419 let result = self
420 .read(
421 &[ReadValueId::from(nodeid)],
422 TimestampsToReturn::Neither,
423 0.0,
424 )
425 .await
426 .map_err(|status_code| {
427 Error::new(status_code, "Reading Server namespace array failed")
428 })?;
429 if let Some(Variant::Array(array)) = &result[0].value {
430 let map = NamespaceMap::new_from_variant_array(&array.values)
431 .map_err(|e| Error::new(StatusCode::Bad, e))?;
432 let map_clone = map.clone();
433 self.set_namespaces(map);
434 Ok(map_clone)
435 } else {
436 Err(Error::new(
437 StatusCode::BadNoValue,
438 format!("Server namespace array is None. The server has an issue {result:?}"),
439 ))
440 }
441 }
442
/// Looks up the index of the namespace `url` in the namespace map held in the
/// encoding context, without contacting the server.
///
/// Returns whatever `get_index` reports for `url`; presumably `None` when the
/// namespace is not in the map.
pub fn get_namespace_index_from_cache(&self, url: &str) -> Option<u16> {
    self.encoding_context().read().namespaces().get_index(url)
}
447
448 pub async fn get_namespace_index(&self, url: &str) -> Result<u16, Error> {
451 if let Some(idx) = self.get_namespace_index_from_cache(url) {
452 return Ok(idx);
453 };
454 let map = self.read_namespace_array().await?;
455 let idx = map.get_index(url).ok_or_else(|| {
456 Error::new(
457 StatusCode::BadNoMatch,
458 format!(
459 "Url {} not found in namespace array. Namspace array is {:?}",
460 url, &map
461 ),
462 )
463 })?;
464 Ok(idx)
465 }
466}