use std::{
any::{Any, TypeId},
ops::Index,
sync::{Arc, Weak},
};
use async_trait::async_trait;
use opcua_core::sync::RwLock;
use opcua_nodes::DefaultTypeTree;
use opcua_types::{
ExpandedNodeId, MonitoringMode, NodeId, ReadAnnotationDataDetails, ReadAtTimeDetails,
ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
};
use tokio::sync::OnceCell;
mod attributes;
mod build;
mod context;
mod history;
pub mod memory;
mod method;
mod monitored_items;
mod node_management;
mod query;
mod utils;
mod view;
use crate::{diagnostics::NamespaceMetadata, ServerStatusWrapper};
use super::{
authenticator::AuthManager, info::ServerInfo, subscriptions::CreateMonitoredItem,
SubscriptionCache,
};
pub use {
attributes::{ParsedReadValueId, ParsedWriteValue, ReadNode, WriteNode},
build::NodeManagerBuilder,
context::{
RequestContext, RequestContextInner, TypeTreeForUser, TypeTreeForUserStatic,
TypeTreeReadContext,
},
history::{HistoryNode, HistoryResult, HistoryUpdateDetails, HistoryUpdateNode},
method::MethodCall,
monitored_items::{MonitoredItemRef, MonitoredItemUpdateRef},
node_management::{AddNodeItem, AddReferenceItem, DeleteNodeItem, DeleteReferenceItem},
query::{ParsedNodeTypeDescription, ParsedQueryDataDescription, QueryRequest},
utils::*,
view::{
impl_translate_browse_paths_using_browse, AddReferenceResult, BrowseNode, BrowsePathItem,
ExternalReference, ExternalReferenceRequest, NodeMetadata, RegisterNodeItem,
},
};
pub(crate) use context::resolve_external_references;
pub(crate) use context::DefaultTypeTreeGetter;
pub(crate) use history::HistoryReadDetails;
pub(crate) use query::QueryContinuationPoint;
pub(crate) use view::{BrowseContinuationPoint, ExternalReferencesContPoint};
pub trait NodeManagerCollection {
fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>>;
}
pub type DynNodeManager = dyn NodeManager + Send + Sync + 'static;
#[derive(Clone)]
pub struct NodeManagers {
node_managers: Arc<Vec<Arc<DynNodeManager>>>,
}
impl NodeManagerCollection for NodeManagers {
fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
self.iter().cloned()
}
}
impl NodeManagers {
pub fn iter(&self) -> impl Iterator<Item = &'_ Arc<DynNodeManager>> {
self.into_iter()
}
pub fn len(&self) -> usize {
self.node_managers.len()
}
pub fn is_empty(&self) -> bool {
self.node_managers.is_empty()
}
pub fn new(node_managers: Vec<Arc<DynNodeManager>>) -> Self {
Self {
node_managers: Arc::new(node_managers),
}
}
pub fn get(&self, index: usize) -> Option<&Arc<DynNodeManager>> {
self.node_managers.get(index)
}
pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
for m in self {
let r = &**m;
if r.type_id() == TypeId::of::<T>() {
if let Ok(k) = m.clone().into_any_arc().downcast() {
return Some(k);
}
}
}
None
}
pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
for m in self {
let r = &**m;
if r.name() == name {
return m.clone().into_any_arc().downcast().ok();
}
}
None
}
pub fn as_weak(&self) -> NodeManagersRef {
let weak = Arc::downgrade(&self.node_managers);
NodeManagersRef {
node_managers: Arc::new(OnceCell::new_with(Some(weak))),
}
}
}
impl Index<usize> for NodeManagers {
type Output = Arc<DynNodeManager>;
fn index(&self, index: usize) -> &Self::Output {
&self.node_managers[index]
}
}
impl<'a> IntoIterator for &'a NodeManagers {
type Item = &'a Arc<DynNodeManager>;
type IntoIter = <&'a Vec<Arc<DynNodeManager>> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.node_managers.iter()
}
}
#[derive(Clone)]
pub struct NodeManagersRef {
node_managers: Arc<OnceCell<Weak<Vec<Arc<DynNodeManager>>>>>,
}
impl NodeManagerCollection for NodeManagersRef {
fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
self.iter()
}
}
impl NodeManagersRef {
pub(crate) fn new_empty() -> Self {
Self {
node_managers: Default::default(),
}
}
pub(crate) fn init_from_node_managers(&self, node_managers: NodeManagers) {
self.node_managers
.set(Arc::downgrade(&node_managers.node_managers))
.expect("Node manager ref initialized more than once");
}
pub fn upgrade(&self) -> Option<NodeManagers> {
let node_managers = self.node_managers.get()?.upgrade()?;
Some(NodeManagers { node_managers })
}
pub fn iter(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
let node_managers = self.upgrade();
let len = node_managers.as_ref().map(|l| l.len()).unwrap_or_default();
(0..len).filter_map(move |i| node_managers.as_ref().map(move |r| r[i].clone()))
}
pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
self.upgrade().and_then(|m| m.get_of_type())
}
pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
self.upgrade().and_then(|m| m.get_by_name(name))
}
pub fn get(&self, index: usize) -> Option<Arc<DynNodeManager>> {
self.upgrade().and_then(|m| m.get(index).cloned())
}
}
#[derive(Clone)]
pub struct ServerContext {
pub node_managers: NodeManagersRef,
pub subscriptions: Arc<SubscriptionCache>,
pub info: Arc<ServerInfo>,
pub authenticator: Arc<dyn AuthManager>,
pub type_tree: Arc<RwLock<DefaultTypeTree>>,
pub type_tree_getter: Arc<dyn TypeTreeForUser>,
pub status: Arc<ServerStatusWrapper>,
}
pub trait IntoAnyArc {
fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
impl<T: Send + Sync + 'static> IntoAnyArc for T {
fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}
}
#[allow(unused_variables)]
#[async_trait]
pub trait NodeManager: IntoAnyArc + Any {
fn owns_node(&self, id: &NodeId) -> bool;
fn name(&self) -> &str;
fn owns_server_events(&self) -> bool {
false
}
fn handle_new_node(&self, parent_id: &ExpandedNodeId) -> bool {
false
}
fn namespaces_for_user(&self, context: &RequestContext) -> Vec<NamespaceMetadata>;
async fn init(&self, type_tree: &mut DefaultTypeTree, context: ServerContext);
async fn resolve_external_references(
&self,
context: &RequestContext,
items: &mut [&mut ExternalReferenceRequest],
) {
}
async fn read(
&self,
context: &RequestContext,
max_age: f64,
timestamps_to_return: TimestampsToReturn,
nodes_to_read: &mut [&mut ReadNode],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn history_read_raw_modified(
&self,
context: &RequestContext,
details: &ReadRawModifiedDetails,
nodes: &mut [&mut HistoryNode],
timestamps_to_return: TimestampsToReturn,
) -> Result<(), StatusCode> {
Err(StatusCode::BadHistoryOperationUnsupported)
}
async fn history_read_processed(
&self,
context: &RequestContext,
details: &ReadProcessedDetails,
nodes: &mut [&mut HistoryNode],
timestamps_to_return: TimestampsToReturn,
) -> Result<(), StatusCode> {
Err(StatusCode::BadHistoryOperationUnsupported)
}
async fn history_read_at_time(
&self,
context: &RequestContext,
details: &ReadAtTimeDetails,
nodes: &mut [&mut HistoryNode],
timestamps_to_return: TimestampsToReturn,
) -> Result<(), StatusCode> {
Err(StatusCode::BadHistoryOperationUnsupported)
}
async fn history_read_events(
&self,
context: &RequestContext,
details: &ReadEventDetails,
nodes: &mut [&mut HistoryNode],
timestamps_to_return: TimestampsToReturn,
) -> Result<(), StatusCode> {
Err(StatusCode::BadHistoryOperationUnsupported)
}
async fn history_read_annotations(
&self,
context: &RequestContext,
details: &ReadAnnotationDataDetails,
nodes: &mut [&mut HistoryNode],
timestamps_to_return: TimestampsToReturn,
) -> Result<(), StatusCode> {
Err(StatusCode::BadHistoryOperationUnsupported)
}
async fn write(
&self,
context: &RequestContext,
nodes_to_write: &mut [&mut WriteNode],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn history_update(
&self,
context: &RequestContext,
nodes: &mut [&mut HistoryUpdateNode],
) -> Result<(), StatusCode> {
Err(StatusCode::BadHistoryOperationUnsupported)
}
async fn browse(
&self,
context: &RequestContext,
nodes_to_browse: &mut [BrowseNode],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn translate_browse_paths_to_node_ids(
&self,
context: &RequestContext,
nodes: &mut [&mut BrowsePathItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn register_nodes(
&self,
context: &RequestContext,
nodes: &mut [&mut RegisterNodeItem],
) -> Result<(), StatusCode> {
for node in nodes {
node.set_registered(true);
}
Ok(())
}
async fn unregister_nodes(
&self,
context: &RequestContext,
_nodes: &[&NodeId],
) -> Result<(), StatusCode> {
Ok(())
}
async fn create_monitored_items(
&self,
context: &RequestContext,
items: &mut [&mut CreateMonitoredItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn modify_monitored_items(
&self,
context: &RequestContext,
items: &[&MonitoredItemUpdateRef],
) {
}
async fn set_monitoring_mode(
&self,
context: &RequestContext,
mode: MonitoringMode,
items: &[&MonitoredItemRef],
) {
}
async fn delete_monitored_items(&self, context: &RequestContext, items: &[&MonitoredItemRef]) {}
async fn query(
&self,
context: &RequestContext,
request: &mut QueryRequest,
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn call(
&self,
context: &RequestContext,
methods_to_call: &mut [&mut MethodCall],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn add_nodes(
&self,
context: &RequestContext,
nodes_to_add: &mut [&mut AddNodeItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn add_references(
&self,
context: &RequestContext,
references_to_add: &mut [&mut AddReferenceItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn delete_nodes(
&self,
context: &RequestContext,
nodes_to_delete: &mut [&mut DeleteNodeItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
async fn delete_node_references(
&self,
context: &RequestContext,
to_delete: &[&DeleteNodeItem],
) {
}
async fn delete_references(
&self,
context: &RequestContext,
references_to_delete: &mut [&mut DeleteReferenceItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
}