use std::{
sync::{atomic::AtomicU8, Arc},
time::{Duration, Instant},
};
use opcua_nodes::DefaultTypeTree;
use tokio_util::sync::CancellationToken;
use tracing::info;
use opcua_core::sync::RwLock;
use opcua_types::{AttributeId, DataValue, LocalizedText, ServerState, VariableId};
use crate::{reverse_connect::ReverseConnectHandle, ServerStatusWrapper};
use super::{
info::ServerInfo, node_manager::NodeManagers, session::manager::SessionManager,
SubscriptionCache,
};
/// Handle bundling the shared components of a running server.
///
/// The handle derives `Clone`; the `Arc`-wrapped fields are shared between
/// all clones rather than duplicated.
#[derive(Clone)]
pub struct ServerHandle {
/// Shared server info, exposed through `info()`.
info: Arc<ServerInfo>,
/// Current service level, written by `set_service_level`.
service_level: Arc<AtomicU8>,
/// Subscription cache, notified when the service level changes.
subscriptions: Arc<SubscriptionCache>,
/// Node managers, exposed through `node_managers()`.
node_managers: NodeManagers,
/// Session manager behind a read/write lock, exposed through `session_manager()`.
session_manager: Arc<RwLock<SessionManager>>,
/// Type tree behind a read/write lock; also used for namespace index lookups.
type_tree: Arc<RwLock<DefaultTypeTree>>,
/// Cancellation token, cancelled by `cancel()` and when a `shutdown_after` delay elapses.
token: CancellationToken,
/// Server status wrapper that receives state changes and scheduled shutdowns.
status: Arc<ServerStatusWrapper>,
/// Handle used to add and remove reverse connect targets.
reverse_connect: ReverseConnectHandle,
}
impl ServerHandle {
    /// Assemble a handle from the already-constructed server components.
    ///
    /// Every argument is stored as-is in the field of the same name.
    // One argument per stored component, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        info: Arc<ServerInfo>,
        service_level: Arc<AtomicU8>,
        subscriptions: Arc<SubscriptionCache>,
        node_managers: NodeManagers,
        session_manager: Arc<RwLock<SessionManager>>,
        type_tree: Arc<RwLock<DefaultTypeTree>>,
        status: Arc<ServerStatusWrapper>,
        token: CancellationToken,
        reverse_connect: ReverseConnectHandle,
    ) -> Self {
        Self {
            info,
            service_level,
            subscriptions,
            node_managers,
            session_manager,
            type_tree,
            status,
            token,
            reverse_connect,
        }
    }

    /// Get a reference to the shared server info.
    pub fn info(&self) -> &Arc<ServerInfo> {
        &self.info
    }

    /// Get a reference to the shared subscription cache.
    pub fn subscriptions(&self) -> &Arc<SubscriptionCache> {
        &self.subscriptions
    }

    /// Set the service level of the server.
    ///
    /// Stores `sl` in the shared atomic (relaxed ordering) and then notifies
    /// the subscription cache of a data change on the `Value` attribute of
    /// the `Server_ServiceLevel` variable, using a value built with
    /// `DataValue::new_now`.
    pub fn set_service_level(&self, sl: u8) {
        self.service_level
            .store(sl, std::sync::atomic::Ordering::Relaxed);
        self.subscriptions.notify_data_change(
            [(
                DataValue::new_now(sl),
                &VariableId::Server_ServiceLevel.into(),
                AttributeId::Value,
            )]
            .into_iter(),
        );
    }

    /// Get a reference to the node managers.
    pub fn node_managers(&self) -> &NodeManagers {
        &self.node_managers
    }

    /// Get a reference to the lock guarding the session manager.
    pub fn session_manager(&self) -> &RwLock<SessionManager> {
        &self.session_manager
    }

    /// Get a reference to the lock guarding the type tree.
    pub fn type_tree(&self) -> &RwLock<DefaultTypeTree> {
        &self.type_tree
    }

    /// Forward a new server state to the server status wrapper.
    pub fn set_server_state(&self, state: ServerState) {
        self.status.set_state(state);
    }

    /// Get a reference to the cancellation token held by this handle.
    pub fn token(&self) -> &CancellationToken {
        &self.token
    }

    /// Cancel the cancellation token held by this handle.
    pub fn cancel(&self) {
        self.token.cancel();
    }

    /// Look up the index of `namespace` in the namespace table of the type tree.
    ///
    /// Takes a read lock on the type tree for the duration of the lookup and
    /// returns `None` when the lookup yields no index.
    pub fn get_namespace_index(&self, namespace: &str) -> Option<u16> {
        self.type_tree.read().namespaces().get_index(namespace)
    }

    /// Schedule the server to shut down once `time` has elapsed.
    ///
    /// Registers the shutdown (with `reason`) on the server status wrapper,
    /// logs the delay, and spawns a Tokio task that sleeps until the deadline
    /// and then cancels the handle's cancellation token.
    ///
    /// # Panics
    ///
    /// Panics if `Instant::now() + time` cannot be represented, or if called
    /// outside of a Tokio runtime (the task is created with
    /// `tokio::task::spawn`).
    pub fn shutdown_after(&self, time: Duration, reason: impl Into<LocalizedText>) {
        // Compute the deadline exactly once so that the shutdown time handed
        // to the status wrapper and the instant at which the token is
        // cancelled are the same.
        let deadline = Instant::now() + time;
        self.status.schedule_shutdown(reason.into(), deadline);
        let token = self.token.clone();
        info!("Shutting down server in {time:?}");
        tokio::task::spawn(async move {
            tokio::time::sleep_until(deadline.into()).await;
            token.cancel();
        });
    }

    /// Add a reverse connect target by forwarding it to the reverse connect handle.
    pub fn add_reverse_connect_target(
        &self,
        target: crate::reverse_connect::ReverseConnectTargetConfig,
    ) {
        self.reverse_connect.add_target(target);
    }

    /// Remove the reverse connect target identified by `id` via the reverse connect handle.
    pub fn remove_reverse_connect_target(&self, id: &str) {
        self.reverse_connect.remove_target(id);
    }
}