Skip to main content

opcua_server/
server_handle.rs

1use std::{
2    sync::{atomic::AtomicU8, Arc},
3    time::{Duration, Instant},
4};
5
6use opcua_nodes::DefaultTypeTree;
7use tokio_util::sync::CancellationToken;
8use tracing::info;
9
10use opcua_core::sync::RwLock;
11use opcua_types::{AttributeId, DataValue, LocalizedText, ServerState, VariableId};
12
13use crate::{reverse_connect::ReverseConnectHandle, ServerStatusWrapper};
14
15use super::{
16    info::ServerInfo, node_manager::NodeManagers, session::manager::SessionManager,
17    SubscriptionCache,
18};
19
/// Reference to a server instance containing tools to modify the server
/// while it is running.
///
/// The handle derives `Clone`; most of its state is held behind `Arc`s that a
/// clone shares with the original.
#[derive(Clone)]
pub struct ServerHandle {
    /// Shared server info, containing configuration and other shared server data.
    info: Arc<ServerInfo>,
    /// Current service level, written by `set_service_level`.
    service_level: Arc<AtomicU8>,
    /// Subscription cache, used to notify subscribers about data changes.
    subscriptions: Arc<SubscriptionCache>,
    /// The node managers on the server.
    node_managers: NodeManagers,
    /// Session manager, containing all currently active sessions.
    session_manager: Arc<RwLock<SessionManager>>,
    /// Type tree, containing shared information about types and namespaces in the server.
    type_tree: Arc<RwLock<DefaultTypeTree>>,
    /// Cancellation token; cancelling it signals the server to stop.
    token: CancellationToken,
    /// Server status wrapper, used to set the server state and schedule shutdowns.
    status: Arc<ServerStatusWrapper>,
    /// Handle used to add and remove reverse connect targets.
    reverse_connect: ReverseConnectHandle,
}
34
35impl ServerHandle {
36    #[allow(clippy::too_many_arguments)]
37    pub(crate) fn new(
38        info: Arc<ServerInfo>,
39        service_level: Arc<AtomicU8>,
40        subscriptions: Arc<SubscriptionCache>,
41        node_managers: NodeManagers,
42        session_manager: Arc<RwLock<SessionManager>>,
43        type_tree: Arc<RwLock<DefaultTypeTree>>,
44        status: Arc<ServerStatusWrapper>,
45        token: CancellationToken,
46        reverse_connect: ReverseConnectHandle,
47    ) -> Self {
48        Self {
49            info,
50            service_level,
51            subscriptions,
52            node_managers,
53            session_manager,
54            type_tree,
55            status,
56            token,
57            reverse_connect,
58        }
59    }
60
61    /// Get a reference to the ServerInfo, containing configuration and other shared server data.
62    pub fn info(&self) -> &Arc<ServerInfo> {
63        &self.info
64    }
65
66    /// Get a reference to the subscription cache.
67    pub fn subscriptions(&self) -> &Arc<SubscriptionCache> {
68        &self.subscriptions
69    }
70
71    /// Set the service level, properly notifying subscribed clients of the change.
72    pub fn set_service_level(&self, sl: u8) {
73        self.service_level
74            .store(sl, std::sync::atomic::Ordering::Relaxed);
75        self.subscriptions.notify_data_change(
76            [(
77                DataValue::new_now(sl),
78                &VariableId::Server_ServiceLevel.into(),
79                AttributeId::Value,
80            )]
81            .into_iter(),
82        );
83    }
84
85    /// Get a reference to the node managers on the server.
86    pub fn node_managers(&self) -> &NodeManagers {
87        &self.node_managers
88    }
89
90    /// Get a reference to the session manager, containing all currently active sessions.
91    pub fn session_manager(&self) -> &RwLock<SessionManager> {
92        &self.session_manager
93    }
94
95    /// Get a reference to the type tree, containing shared information about types in the server.
96    pub fn type_tree(&self) -> &RwLock<DefaultTypeTree> {
97        &self.type_tree
98    }
99
100    /// Set the server state. Note that this does not do anything beyond just setting
101    /// the state and notifying clients.
102    pub fn set_server_state(&self, state: ServerState) {
103        self.status.set_state(state);
104    }
105
106    /// Get the cancellation token.
107    pub fn token(&self) -> &CancellationToken {
108        &self.token
109    }
110
111    /// Signal the server to stop.
112    pub fn cancel(&self) {
113        self.token.cancel();
114    }
115
116    /// Shorthand for getting the index of a namespace defined in the global server type tree.
117    pub fn get_namespace_index(&self, namespace: &str) -> Option<u16> {
118        self.type_tree.read().namespaces().get_index(namespace)
119    }
120
121    /// Tell the server to stop after `time` has elapsed. This will
122    /// update the `SecondsTillShutdown` variable on the server as needed.
123    pub fn shutdown_after(&self, time: Duration, reason: impl Into<LocalizedText>) {
124        let deadline = Instant::now() + time;
125        self.status
126            .schedule_shutdown(reason.into(), Instant::now() + time);
127        let token = self.token.clone();
128        info!("Shutting down server in {time:?}");
129        tokio::task::spawn(async move {
130            tokio::time::sleep_until(deadline.into()).await;
131            token.cancel();
132        });
133    }
134
135    /// Add a reverse connect target to the server.
136    /// If a target with the same ID has already been added, this does nothing.
137    pub fn add_reverse_connect_target(
138        &self,
139        target: crate::reverse_connect::ReverseConnectTargetConfig,
140    ) {
141        self.reverse_connect.add_target(target);
142    }
143
144    /// Remove a reverse connect target from the server.
145    /// If the target does not exist, this does nothing.
146    ///
147    /// This will not disconnect any existing connections to the target,
148    /// only prevent new ones from being established.
149    pub fn remove_reverse_connect_target(&self, id: &str) {
150        self.reverse_connect.remove_target(id);
151    }
152}