Skip to main content

opcua_client/session/
mod.rs

1mod client;
2mod connect;
3mod connection;
4mod event_loop;
5mod request_builder;
6mod retry;
7mod services;
8
9/// Information about the server endpoint, security policy, security mode and user identity that the session will
10/// will use to establish a connection.
11#[derive(Debug, Clone)]
12pub struct EndpointInfo {
13    /// The endpoint
14    pub endpoint: EndpointDescription,
15    /// User identity token
16    pub user_identity_token: IdentityToken,
17    /// Preferred language locales
18    pub preferred_locales: Vec<String>,
19}
20
21impl From<EndpointDescription> for EndpointInfo {
22    fn from(value: EndpointDescription) -> Self {
23        Self {
24            endpoint: value,
25            user_identity_token: IdentityToken::Anonymous,
26            preferred_locales: Vec::new(),
27        }
28    }
29}
30
31impl From<(EndpointDescription, IdentityToken)> for EndpointInfo {
32    fn from(value: (EndpointDescription, IdentityToken)) -> Self {
33        Self {
34            endpoint: value.0,
35            user_identity_token: value.1,
36            preferred_locales: Vec::new(),
37        }
38    }
39}
40
41use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44
45use arc_swap::ArcSwap;
46pub use client::Client;
47pub use connect::SessionConnectMode;
48pub use connection::{
49    ConnectionSource, DirectConnectionSource, ReverseConnectionSource, SessionBuilder,
50};
51pub use event_loop::{SessionActivity, SessionEventLoop, SessionPollResult};
52use opcua_core::handle::AtomicHandle;
53use opcua_core::sync::{Mutex, RwLock};
54pub use request_builder::UARequest;
55pub use retry::{DefaultRetryPolicy, RequestRetryPolicy};
56pub use services::attributes::{
57    HistoryRead, HistoryReadAction, HistoryUpdate, HistoryUpdateAction, Read, Write,
58};
59pub use services::method::Call;
60pub use services::node_management::{AddNodes, AddReferences, DeleteNodes, DeleteReferences};
61pub use services::session::{ActivateSession, Cancel, CloseSession, CreateSession};
62use services::subscriptions::state::SubscriptionState;
63pub use services::subscriptions::{
64    CreateMonitoredItems, CreateSubscription, DataChangeCallback, DeleteMonitoredItems,
65    DeleteSubscriptions, EventCallback, ModifyMonitoredItems, ModifySubscription, MonitoredItem,
66    MonitoredItemMap, OnSubscriptionNotification, OnSubscriptionNotificationCore,
67    PreInsertMonitoredItems, Publish, PublishLimits, Republish, SetMonitoringMode,
68    SetPublishingMode, SetTriggering, Subscription, SubscriptionActivity, SubscriptionCache,
69    SubscriptionCallbacks, SubscriptionEventLoopState, TransferSubscriptions,
70};
71pub use services::view::{
72    Browse, BrowseNext, RegisterNodes, TranslateBrowsePaths, UnregisterNodes,
73};
74use tracing::{error, info};
75
/// Emit a `tracing` warning prefixed with `session:<id>`.
///
/// The first argument is any expression exposing a `session_id()` method; the
/// remaining tokens are forwarded verbatim to `format!`.
#[allow(unused)]
macro_rules! session_warn {
    ($sess:expr, $($fmt_args:tt)*) => {
        tracing::warn!("session:{} {}", $sess.session_id(), format!($($fmt_args)*));
    };
}
// Re-exported so the macro can be imported by path from elsewhere in the crate.
#[allow(unused)]
pub(crate) use session_warn;
84
/// Emit a `tracing` error prefixed with `session:<id>`.
///
/// The first argument is any expression exposing a `session_id()` method; the
/// remaining tokens are forwarded verbatim to `format!`.
#[allow(unused)]
macro_rules! session_error {
    ($sess:expr, $($fmt_args:tt)*) => {
        tracing::error!("session:{} {}", $sess.session_id(), format!($($fmt_args)*));
    };
}
// Re-exported so the macro can be imported by path from elsewhere in the crate.
#[allow(unused)]
pub(crate) use session_error;
93
/// Emit a `tracing` debug message prefixed with `session:<id>`.
///
/// The first argument is any expression exposing a `session_id()` method; the
/// remaining tokens are forwarded verbatim to `format!`.
#[allow(unused)]
macro_rules! session_debug {
    ($sess:expr, $($fmt_args:tt)*) => {
        tracing::debug!("session:{} {}", $sess.session_id(), format!($($fmt_args)*));
    };
}
// Re-exported so the macro can be imported by path from elsewhere in the crate.
#[allow(unused)]
pub(crate) use session_debug;
102
/// Emit a `tracing` trace message prefixed with `session:<id>`.
///
/// The first argument is any expression exposing a `session_id()` method; the
/// remaining tokens are forwarded verbatim to `format!`.
#[allow(unused)]
macro_rules! session_trace {
    ($sess:expr, $($fmt_args:tt)*) => {
        tracing::trace!("session:{} {}", $sess.session_id(), format!($($fmt_args)*));
    };
}
// Re-exported so the macro can be imported by path from elsewhere in the crate.
#[allow(unused)]
pub(crate) use session_trace;
111
112use opcua_core::ResponseMessage;
113use opcua_types::{
114    ApplicationDescription, ContextOwned, DecodingOptions, EndpointDescription, Error, IntegerId,
115    NamespaceMap, NodeId, ReadValueId, RequestHeader, ResponseHeader, StatusCode,
116    TimestampsToReturn, TypeLoader, UAString, VariableId, Variant,
117};
118
119use crate::browser::Browser;
120use crate::transport::Connector;
121use crate::{AsyncSecureChannel, ClientConfig, ExponentialBackoff, SessionRetryPolicy};
122
123use super::IdentityToken;
124
125/// Process the service result, i.e. where the request "succeeded" but the response
126/// contains a failure status code.
127pub(crate) fn process_service_result(response_header: &ResponseHeader) -> Result<(), StatusCode> {
128    if response_header.service_result.is_bad() {
129        info!(
130            "Received a bad service result {} from the request",
131            response_header.service_result
132        );
133        Err(response_header.service_result)
134    } else {
135        Ok(())
136    }
137}
138
139pub(crate) fn process_unexpected_response(response: ResponseMessage) -> StatusCode {
140    match response {
141        ResponseMessage::ServiceFault(service_fault) => {
142            error!(
143                "Received a service fault of {} for the request",
144                service_fault.response_header.service_result
145            );
146            service_fault.response_header.service_result
147        }
148        _ => {
149            error!("Received an unexpected response to the request");
150            StatusCode::BadUnknownResponse
151        }
152    }
153}
154
155#[derive(Clone, Copy)]
156pub(super) enum SessionState {
157    Disconnected,
158    Connected,
159    Connecting,
160}
161
/// Process-wide counter handing out client-side session IDs, starting at 1.
/// Each read-and-increment yields a value distinct from the previous ones
/// (until the `u32` wraps).
static NEXT_SESSION_ID: AtomicU32 = AtomicU32::new(1_u32);
163
164/// An OPC-UA session. This session provides methods for all supported services that require an open session.
165///
166/// Note that not all servers may support all service requests and calling an unsupported API
167/// may cause the connection to be dropped. Your client is expected to know the capabilities of
168/// the server it is calling to avoid this.
169///
170pub struct Session {
171    pub(super) channel: AsyncSecureChannel,
172    pub(super) state_watch_rx: tokio::sync::watch::Receiver<SessionState>,
173    pub(super) state_watch_tx: tokio::sync::watch::Sender<SessionState>,
174    pub(super) session_id: Arc<ArcSwap<NodeId>>,
175    pub(super) internal_session_id: AtomicU32,
176    pub(super) session_name: UAString,
177    pub(super) application_description: ApplicationDescription,
178    pub(super) request_timeout: Duration,
179    pub(super) publish_timeout: Duration,
180    pub(super) recreate_monitored_items_chunk: usize,
181    pub(super) recreate_subscriptions: bool,
182    pub(super) should_reconnect: AtomicBool,
183    pub(super) session_timeout: f64,
184    /// Reference to the subscription cache for the client.
185    pub subscription_state: Mutex<SubscriptionState>,
186    pub(super) publish_limits_watch_rx: tokio::sync::watch::Receiver<PublishLimits>,
187    pub(super) publish_limits_watch_tx: tokio::sync::watch::Sender<PublishLimits>,
188    pub(super) monitored_item_handle: AtomicHandle,
189    pub(super) trigger_publish_tx: tokio::sync::watch::Sender<Instant>,
190    pub(super) session_nonce_length: usize,
191    decoding_options: DecodingOptions,
192}
193
194impl Session {
195    #[allow(clippy::too_many_arguments)]
196    pub(crate) fn new<T: Connector + Send + Sync + 'static>(
197        channel: AsyncSecureChannel,
198        session_name: UAString,
199        application_description: ApplicationDescription,
200        session_retry_policy: SessionRetryPolicy,
201        decoding_options: DecodingOptions,
202        config: &ClientConfig,
203        session_id: Option<NodeId>,
204        connector: T,
205    ) -> (Arc<Self>, SessionEventLoop<T>) {
206        let (publish_limits_watch_tx, publish_limits_watch_rx) =
207            tokio::sync::watch::channel(PublishLimits::new());
208        let (state_watch_tx, state_watch_rx) =
209            tokio::sync::watch::channel(SessionState::Disconnected);
210        let (trigger_publish_tx, trigger_publish_rx) = tokio::sync::watch::channel(Instant::now());
211
212        let session = Arc::new(Session {
213            channel,
214            internal_session_id: AtomicU32::new(NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed)),
215            state_watch_rx,
216            state_watch_tx,
217            session_id: Arc::new(ArcSwap::new(Arc::new(session_id.unwrap_or_default()))),
218            session_name,
219            application_description,
220            request_timeout: config.request_timeout,
221            session_timeout: config.session_timeout as f64,
222            publish_timeout: config.publish_timeout,
223            recreate_monitored_items_chunk: config.performance.recreate_monitored_items_chunk,
224            recreate_subscriptions: config.recreate_subscriptions,
225            should_reconnect: AtomicBool::new(true),
226            subscription_state: Mutex::new(SubscriptionState::new(
227                config.min_publish_interval,
228                publish_limits_watch_tx.clone(),
229            )),
230            monitored_item_handle: AtomicHandle::new(1000),
231            publish_limits_watch_rx,
232            publish_limits_watch_tx,
233            trigger_publish_tx,
234            session_nonce_length: config.session_nonce_length,
235            decoding_options,
236        });
237
238        (
239            session.clone(),
240            SessionEventLoop::new(
241                session,
242                session_retry_policy,
243                trigger_publish_rx,
244                config.keep_alive_interval,
245                config.max_failed_keep_alive_count,
246                connector,
247            ),
248        )
249    }
250
251    /// Create a request header with the default timeout.
252    pub(super) fn make_request_header(&self) -> RequestHeader {
253        self.channel.make_request_header(self.request_timeout)
254    }
255
256    /// Reset the session after a hard disconnect, clearing the session ID and incrementing the internal
257    /// session counter.
258    pub(crate) fn reset(&self) {
259        self.session_id.store(Arc::new(NodeId::null()));
260        self.internal_session_id.store(
261            NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed),
262            Ordering::Relaxed,
263        );
264    }
265
266    /// Wait for the session to be in either a connected or disconnected state.
267    async fn wait_for_state(&self, connected: bool) -> bool {
268        let mut rx = self.state_watch_rx.clone();
269
270        let res = rx
271            .wait_for(|s| {
272                connected && matches!(*s, SessionState::Connected)
273                    || !connected && matches!(*s, SessionState::Disconnected)
274            })
275            .await
276            .is_ok();
277
278        // Compiler limitation
279        res
280    }
281
282    /// The internal ID of the session, used to keep track of multiple sessions in the same program.
283    pub fn session_id(&self) -> u32 {
284        self.internal_session_id.load(Ordering::Relaxed)
285    }
286
287    /// Get the current session ID. This is different from `session_id`, which is the client-side ID
288    /// to keep track of multiple sessions. This is the session ID the server uses to identify this session.
289    pub fn server_session_id(&self) -> NodeId {
290        (**(*self.session_id).load()).clone()
291    }
292
293    /// Convenience method to wait for a connection to the server.
294    ///
295    /// You should also monitor the session event loop. If it ends, this method will never return.
296    pub async fn wait_for_connection(&self) -> bool {
297        self.wait_for_state(true).await
298    }
299
300    /// Disable automatic reconnects.
301    /// This will make the event loop quit the next time
302    /// it disconnects for whatever reason.
303    pub fn disable_reconnects(&self) {
304        self.should_reconnect.store(false, Ordering::Relaxed);
305    }
306
307    /// Enable automatic reconnects.
308    /// Automatically reconnecting is enabled by default.
309    pub fn enable_reconnects(&self) {
310        self.should_reconnect.store(true, Ordering::Relaxed);
311    }
312
313    /// Inner method for disconnect. [`Session::disconnect`] and [`Session::disconnect_without_delete_subscriptions`]
314    /// are shortands for this with `delete_subscriptions` set to `false` and `true` respectively, and
315    /// `disable_reconnect` set to `true`.
316    pub async fn disconnect_inner(
317        &self,
318        delete_subscriptions: bool,
319        disable_reconnect: bool,
320    ) -> Result<(), StatusCode> {
321        if disable_reconnect {
322            self.should_reconnect.store(false, Ordering::Relaxed);
323        }
324        let mut res = Ok(());
325        if let Err(e) = self.close_session(delete_subscriptions).await {
326            res = Err(e);
327            session_warn!(
328                self,
329                "Failed to close session, channel will be closed anyway: {e}"
330            );
331        }
332        self.channel.close_channel().await;
333
334        self.wait_for_state(false).await;
335
336        res
337    }
338
339    /// Disconnect from the server and wait until disconnected.
340    /// This will set the `should_reconnect` flag to false on the session, indicating
341    /// that it should not attempt to reconnect to the server. You may clear this flag
342    /// yourself to
343    pub async fn disconnect(&self) -> Result<(), StatusCode> {
344        self.disconnect_inner(true, true).await
345    }
346
347    /// Disconnect the server without deleting subscriptions, then wait until disconnected.
348    pub async fn disconnect_without_delete_subscriptions(&self) -> Result<(), StatusCode> {
349        self.disconnect_inner(false, true).await
350    }
351
352    /// Get the decoding options used by the session.
353    pub fn decoding_options(&self) -> &DecodingOptions {
354        &self.decoding_options
355    }
356
357    /// Get a reference to the inner secure channel.
358    pub fn channel(&self) -> &AsyncSecureChannel {
359        &self.channel
360    }
361
362    /// Get the next request handle.
363    pub fn request_handle(&self) -> IntegerId {
364        self.channel.request_handle()
365    }
366
367    /// Get a reference to the global encoding context.
368    pub fn encoding_context(&self) -> &RwLock<ContextOwned> {
369        self.channel.encoding_context()
370    }
371
372    /// Get the target endpoint for the session.
373    pub fn endpoint_info(&self) -> &EndpointInfo {
374        self.channel.endpoint_info()
375    }
376
377    /// Set the namespace array on the session.
378    /// Make sure that this namespace array contains the base namespace,
379    /// or the session may behave unexpectedly.
380    pub fn set_namespaces(&self, namespaces: NamespaceMap) {
381        *self.encoding_context().write().namespaces_mut() = namespaces;
382    }
383
384    /// Add a type loader to the encoding context.
385    /// Note that there is no mechanism to ensure uniqueness,
386    /// you should avoid adding the same type loader more than once, it will
387    /// work, but there will be a small performance overhead.
388    pub fn add_type_loader(&self, type_loader: Arc<dyn TypeLoader>) {
389        self.encoding_context()
390            .write()
391            .loaders_mut()
392            .add(type_loader);
393    }
394
395    /// Get a reference to the encoding
396    pub fn context(&self) -> Arc<RwLock<ContextOwned>> {
397        self.channel.secure_channel.read().context_arc()
398    }
399
400    /// Create a browser, used to recursively browse the node hierarchy.
401    ///
402    /// You must call `handler` on the returned browser and set a browse policy
403    /// before it can be used. You can, for example, use [BrowseFilter](crate::browser::BrowseFilter)
404    pub fn browser(&self) -> Browser<'_, (), DefaultRetryPolicy<'_>> {
405        Browser::new(
406            self,
407            (),
408            DefaultRetryPolicy::new(ExponentialBackoff::new(
409                Duration::from_secs(30),
410                Some(5),
411                Duration::from_millis(500),
412            )),
413        )
414    }
415
416    /// Return namespace array from server and store in namespace cache
417    pub async fn read_namespace_array(&self) -> Result<NamespaceMap, Error> {
418        let nodeid: NodeId = VariableId::Server_NamespaceArray.into();
419        let result = self
420            .read(
421                &[ReadValueId::from(nodeid)],
422                TimestampsToReturn::Neither,
423                0.0,
424            )
425            .await
426            .map_err(|status_code| {
427                Error::new(status_code, "Reading Server namespace array failed")
428            })?;
429        if let Some(Variant::Array(array)) = &result[0].value {
430            let map = NamespaceMap::new_from_variant_array(&array.values)
431                .map_err(|e| Error::new(StatusCode::Bad, e))?;
432            let map_clone = map.clone();
433            self.set_namespaces(map);
434            Ok(map_clone)
435        } else {
436            Err(Error::new(
437                StatusCode::BadNoValue,
438                format!("Server namespace array is None. The server has an issue {result:?}"),
439            ))
440        }
441    }
442
443    /// Return index of supplied namespace url from cache
444    pub fn get_namespace_index_from_cache(&self, url: &str) -> Option<u16> {
445        self.encoding_context().read().namespaces().get_index(url)
446    }
447
448    /// Return index of supplied namespace url
449    /// by first looking at namespace cache and querying server if necessary
450    pub async fn get_namespace_index(&self, url: &str) -> Result<u16, Error> {
451        if let Some(idx) = self.get_namespace_index_from_cache(url) {
452            return Ok(idx);
453        };
454        let map = self.read_namespace_array().await?;
455        let idx = map.get_index(url).ok_or_else(|| {
456            Error::new(
457                StatusCode::BadNoMatch,
458                format!(
459                    "Url {} not found in namespace array. Namspace array is {:?}",
460                    url, &map
461                ),
462            )
463        })?;
464        Ok(idx)
465    }
466}