mod client;
mod connect;
mod connection;
mod event_loop;
mod request_builder;
mod retry;
mod services;
#[derive(Debug, Clone)]
pub struct EndpointInfo {
pub endpoint: EndpointDescription,
pub user_identity_token: IdentityToken,
pub preferred_locales: Vec<String>,
}
impl From<EndpointDescription> for EndpointInfo {
fn from(value: EndpointDescription) -> Self {
Self {
endpoint: value,
user_identity_token: IdentityToken::Anonymous,
preferred_locales: Vec::new(),
}
}
}
impl From<(EndpointDescription, IdentityToken)> for EndpointInfo {
fn from(value: (EndpointDescription, IdentityToken)) -> Self {
Self {
endpoint: value.0,
user_identity_token: value.1,
preferred_locales: Vec::new(),
}
}
}
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
pub use client::Client;
pub use connect::SessionConnectMode;
pub use connection::{
ConnectionSource, DirectConnectionSource, ReverseConnectionSource, SessionBuilder,
};
pub use event_loop::{SessionActivity, SessionEventLoop, SessionPollResult};
use opcua_core::handle::AtomicHandle;
use opcua_core::sync::{Mutex, RwLock};
pub use request_builder::UARequest;
pub use retry::{DefaultRetryPolicy, RequestRetryPolicy};
pub use services::attributes::{
HistoryRead, HistoryReadAction, HistoryUpdate, HistoryUpdateAction, Read, Write,
};
pub use services::method::Call;
pub use services::node_management::{AddNodes, AddReferences, DeleteNodes, DeleteReferences};
pub use services::session::{ActivateSession, Cancel, CloseSession, CreateSession};
use services::subscriptions::state::SubscriptionState;
pub use services::subscriptions::{
CreateMonitoredItems, CreateSubscription, DataChangeCallback, DeleteMonitoredItems,
DeleteSubscriptions, EventCallback, ModifyMonitoredItems, ModifySubscription, MonitoredItem,
MonitoredItemMap, OnSubscriptionNotification, OnSubscriptionNotificationCore,
PreInsertMonitoredItems, Publish, PublishLimits, Republish, SetMonitoringMode,
SetPublishingMode, SetTriggering, Subscription, SubscriptionActivity, SubscriptionCache,
SubscriptionCallbacks, SubscriptionEventLoopState, TransferSubscriptions,
};
pub use services::view::{
Browse, BrowseNext, RegisterNodes, TranslateBrowsePaths, UnregisterNodes,
};
use tracing::{error, info};
#[allow(unused)]
macro_rules! session_warn {
($session: expr, $($arg:tt)*) => {
tracing::warn!("session:{} {}", $session.session_id(), format!($($arg)*));
}
}
#[allow(unused)]
pub(crate) use session_warn;
#[allow(unused)]
macro_rules! session_error {
($session: expr, $($arg:tt)*) => {
tracing::error!("session:{} {}", $session.session_id(), format!($($arg)*));
}
}
#[allow(unused)]
pub(crate) use session_error;
#[allow(unused)]
macro_rules! session_debug {
($session: expr, $($arg:tt)*) => {
tracing::debug!("session:{} {}", $session.session_id(), format!($($arg)*));
}
}
#[allow(unused)]
pub(crate) use session_debug;
#[allow(unused)]
macro_rules! session_trace {
($session: expr, $($arg:tt)*) => {
tracing::trace!("session:{} {}", $session.session_id(), format!($($arg)*));
}
}
#[allow(unused)]
pub(crate) use session_trace;
use opcua_core::ResponseMessage;
use opcua_types::{
ApplicationDescription, ContextOwned, DecodingOptions, EndpointDescription, Error, IntegerId,
NamespaceMap, NodeId, ReadValueId, RequestHeader, ResponseHeader, StatusCode,
TimestampsToReturn, TypeLoader, UAString, VariableId, Variant,
};
use crate::browser::Browser;
use crate::transport::Connector;
use crate::{AsyncSecureChannel, ClientConfig, ExponentialBackoff, SessionRetryPolicy};
use super::IdentityToken;
pub(crate) fn process_service_result(response_header: &ResponseHeader) -> Result<(), StatusCode> {
if response_header.service_result.is_bad() {
info!(
"Received a bad service result {} from the request",
response_header.service_result
);
Err(response_header.service_result)
} else {
Ok(())
}
}
pub(crate) fn process_unexpected_response(response: ResponseMessage) -> StatusCode {
match response {
ResponseMessage::ServiceFault(service_fault) => {
error!(
"Received a service fault of {} for the request",
service_fault.response_header.service_result
);
service_fault.response_header.service_result
}
_ => {
error!("Received an unexpected response to the request");
StatusCode::BadUnknownResponse
}
}
}
#[derive(Clone, Copy)]
pub(super) enum SessionState {
Disconnected,
Connected,
Connecting,
}
static NEXT_SESSION_ID: AtomicU32 = AtomicU32::new(1);
pub struct Session {
pub(super) channel: AsyncSecureChannel,
pub(super) state_watch_rx: tokio::sync::watch::Receiver<SessionState>,
pub(super) state_watch_tx: tokio::sync::watch::Sender<SessionState>,
pub(super) session_id: Arc<ArcSwap<NodeId>>,
pub(super) internal_session_id: AtomicU32,
pub(super) session_name: UAString,
pub(super) application_description: ApplicationDescription,
pub(super) request_timeout: Duration,
pub(super) publish_timeout: Duration,
pub(super) recreate_monitored_items_chunk: usize,
pub(super) recreate_subscriptions: bool,
pub(super) should_reconnect: AtomicBool,
pub(super) session_timeout: f64,
pub subscription_state: Mutex<SubscriptionState>,
pub(super) publish_limits_watch_rx: tokio::sync::watch::Receiver<PublishLimits>,
pub(super) publish_limits_watch_tx: tokio::sync::watch::Sender<PublishLimits>,
pub(super) monitored_item_handle: AtomicHandle,
pub(super) trigger_publish_tx: tokio::sync::watch::Sender<Instant>,
pub(super) session_nonce_length: usize,
decoding_options: DecodingOptions,
}
impl Session {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new<T: Connector + Send + Sync + 'static>(
channel: AsyncSecureChannel,
session_name: UAString,
application_description: ApplicationDescription,
session_retry_policy: SessionRetryPolicy,
decoding_options: DecodingOptions,
config: &ClientConfig,
session_id: Option<NodeId>,
connector: T,
) -> (Arc<Self>, SessionEventLoop<T>) {
let (publish_limits_watch_tx, publish_limits_watch_rx) =
tokio::sync::watch::channel(PublishLimits::new());
let (state_watch_tx, state_watch_rx) =
tokio::sync::watch::channel(SessionState::Disconnected);
let (trigger_publish_tx, trigger_publish_rx) = tokio::sync::watch::channel(Instant::now());
let session = Arc::new(Session {
channel,
internal_session_id: AtomicU32::new(NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed)),
state_watch_rx,
state_watch_tx,
session_id: Arc::new(ArcSwap::new(Arc::new(session_id.unwrap_or_default()))),
session_name,
application_description,
request_timeout: config.request_timeout,
session_timeout: config.session_timeout as f64,
publish_timeout: config.publish_timeout,
recreate_monitored_items_chunk: config.performance.recreate_monitored_items_chunk,
recreate_subscriptions: config.recreate_subscriptions,
should_reconnect: AtomicBool::new(true),
subscription_state: Mutex::new(SubscriptionState::new(
config.min_publish_interval,
publish_limits_watch_tx.clone(),
)),
monitored_item_handle: AtomicHandle::new(1000),
publish_limits_watch_rx,
publish_limits_watch_tx,
trigger_publish_tx,
session_nonce_length: config.session_nonce_length,
decoding_options,
});
(
session.clone(),
SessionEventLoop::new(
session,
session_retry_policy,
trigger_publish_rx,
config.keep_alive_interval,
config.max_failed_keep_alive_count,
connector,
),
)
}
pub(super) fn make_request_header(&self) -> RequestHeader {
self.channel.make_request_header(self.request_timeout)
}
pub(crate) fn reset(&self) {
self.session_id.store(Arc::new(NodeId::null()));
self.internal_session_id.store(
NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed),
Ordering::Relaxed,
);
}
async fn wait_for_state(&self, connected: bool) -> bool {
let mut rx = self.state_watch_rx.clone();
let res = rx
.wait_for(|s| {
connected && matches!(*s, SessionState::Connected)
|| !connected && matches!(*s, SessionState::Disconnected)
})
.await
.is_ok();
res
}
pub fn session_id(&self) -> u32 {
self.internal_session_id.load(Ordering::Relaxed)
}
pub fn server_session_id(&self) -> NodeId {
(**(*self.session_id).load()).clone()
}
pub async fn wait_for_connection(&self) -> bool {
self.wait_for_state(true).await
}
pub fn disable_reconnects(&self) {
self.should_reconnect.store(false, Ordering::Relaxed);
}
pub fn enable_reconnects(&self) {
self.should_reconnect.store(true, Ordering::Relaxed);
}
pub async fn disconnect_inner(
&self,
delete_subscriptions: bool,
disable_reconnect: bool,
) -> Result<(), StatusCode> {
if disable_reconnect {
self.should_reconnect.store(false, Ordering::Relaxed);
}
let mut res = Ok(());
if let Err(e) = self.close_session(delete_subscriptions).await {
res = Err(e);
session_warn!(
self,
"Failed to close session, channel will be closed anyway: {e}"
);
}
self.channel.close_channel().await;
self.wait_for_state(false).await;
res
}
pub async fn disconnect(&self) -> Result<(), StatusCode> {
self.disconnect_inner(true, true).await
}
pub async fn disconnect_without_delete_subscriptions(&self) -> Result<(), StatusCode> {
self.disconnect_inner(false, true).await
}
pub fn decoding_options(&self) -> &DecodingOptions {
&self.decoding_options
}
pub fn channel(&self) -> &AsyncSecureChannel {
&self.channel
}
pub fn request_handle(&self) -> IntegerId {
self.channel.request_handle()
}
pub fn encoding_context(&self) -> &RwLock<ContextOwned> {
self.channel.encoding_context()
}
pub fn endpoint_info(&self) -> &EndpointInfo {
self.channel.endpoint_info()
}
pub fn set_namespaces(&self, namespaces: NamespaceMap) {
*self.encoding_context().write().namespaces_mut() = namespaces;
}
pub fn add_type_loader(&self, type_loader: Arc<dyn TypeLoader>) {
self.encoding_context()
.write()
.loaders_mut()
.add(type_loader);
}
pub fn context(&self) -> Arc<RwLock<ContextOwned>> {
self.channel.secure_channel.read().context_arc()
}
pub fn browser(&self) -> Browser<'_, (), DefaultRetryPolicy<'_>> {
Browser::new(
self,
(),
DefaultRetryPolicy::new(ExponentialBackoff::new(
Duration::from_secs(30),
Some(5),
Duration::from_millis(500),
)),
)
}
pub async fn read_namespace_array(&self) -> Result<NamespaceMap, Error> {
let nodeid: NodeId = VariableId::Server_NamespaceArray.into();
let result = self
.read(
&[ReadValueId::from(nodeid)],
TimestampsToReturn::Neither,
0.0,
)
.await
.map_err(|status_code| {
Error::new(status_code, "Reading Server namespace array failed")
})?;
if let Some(Variant::Array(array)) = &result[0].value {
let map = NamespaceMap::new_from_variant_array(&array.values)
.map_err(|e| Error::new(StatusCode::Bad, e))?;
let map_clone = map.clone();
self.set_namespaces(map);
Ok(map_clone)
} else {
Err(Error::new(
StatusCode::BadNoValue,
format!("Server namespace array is None. The server has an issue {result:?}"),
))
}
}
pub fn get_namespace_index_from_cache(&self, url: &str) -> Option<u16> {
self.encoding_context().read().namespaces().get_index(url)
}
pub async fn get_namespace_index(&self, url: &str) -> Result<u16, Error> {
if let Some(idx) = self.get_namespace_index_from_cache(url) {
return Ok(idx);
};
let map = self.read_namespace_array().await?;
let idx = map.get_index(url).ok_or_else(|| {
Error::new(
StatusCode::BadNoMatch,
format!(
"Url {} not found in namespace array. Namspace array is {:?}",
url, &map
),
)
})?;
Ok(idx)
}
}