opcua_server/node_manager/
mod.rs

1//! The [NodeManager] trait, as well as utilities related to
2//! calling services on this, and tooling for implementing custom node managers.
3//!
4//! See docs/advanced_server.md for help on how to implement custom node managers.
5
6use std::{
7    any::{Any, TypeId},
8    ops::Index,
9    sync::{Arc, Weak},
10};
11
12use async_trait::async_trait;
13use opcua_core::sync::RwLock;
14use opcua_nodes::DefaultTypeTree;
15use opcua_types::{
16    ExpandedNodeId, MonitoringMode, NodeId, ReadAnnotationDataDetails, ReadAtTimeDetails,
17    ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
18};
19use tokio::sync::OnceCell;
20
21mod attributes;
22mod build;
23mod context;
24mod history;
25pub mod memory;
26mod method;
27mod monitored_items;
28mod node_management;
29mod query;
30mod utils;
31mod view;
32
33use crate::{diagnostics::NamespaceMetadata, ServerStatusWrapper};
34
35use super::{
36    authenticator::AuthManager, info::ServerInfo, subscriptions::CreateMonitoredItem,
37    SubscriptionCache,
38};
39
40pub use {
41    attributes::{ParsedReadValueId, ParsedWriteValue, ReadNode, WriteNode},
42    build::NodeManagerBuilder,
43    context::{RequestContext, TypeTreeForUser, TypeTreeForUserStatic, TypeTreeReadContext},
44    history::{HistoryNode, HistoryResult, HistoryUpdateDetails, HistoryUpdateNode},
45    method::MethodCall,
46    monitored_items::{MonitoredItemRef, MonitoredItemUpdateRef},
47    node_management::{AddNodeItem, AddReferenceItem, DeleteNodeItem, DeleteReferenceItem},
48    query::{ParsedNodeTypeDescription, ParsedQueryDataDescription, QueryRequest},
49    utils::*,
50    view::{
51        impl_translate_browse_paths_using_browse, AddReferenceResult, BrowseNode, BrowsePathItem,
52        ExternalReference, ExternalReferenceRequest, NodeMetadata, RegisterNodeItem,
53    },
54};
55
56pub(crate) use context::resolve_external_references;
57pub(crate) use context::DefaultTypeTreeGetter;
58pub(crate) use history::HistoryReadDetails;
59pub(crate) use query::QueryContinuationPoint;
60pub(crate) use view::{BrowseContinuationPoint, ExternalReferencesContPoint};
61
62/// Trait for a collection of node managers, to allow abstracting over
63/// weak or strong references to the node manager collection.
64pub trait NodeManagerCollection {
65    /// Iterate over the node managers on the server.
66    fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>>;
67}
68
69/// Type alias for a dyn reference to a node manager.
70pub type DynNodeManager = dyn NodeManager + Send + Sync + 'static;
71
72#[derive(Clone)]
73/// Wrapper around the server managed list of node managers.
74pub struct NodeManagers {
75    node_managers: Arc<Vec<Arc<DynNodeManager>>>,
76}
77
78impl NodeManagerCollection for NodeManagers {
79    fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
80        self.iter().cloned()
81    }
82}
83
84impl NodeManagers {
85    /// Iterate by reference over the node managers.
86    pub fn iter(&self) -> impl Iterator<Item = &'_ Arc<DynNodeManager>> {
87        self.into_iter()
88    }
89
90    /// Get the length of the node manager collection.
91    pub fn len(&self) -> usize {
92        self.node_managers.len()
93    }
94
95    /// Return `true` if the node manager collection is empty.
96    pub fn is_empty(&self) -> bool {
97        self.node_managers.is_empty()
98    }
99
100    /// Create a new node manager collection from a vector of node managers.
101    pub fn new(node_managers: Vec<Arc<DynNodeManager>>) -> Self {
102        Self {
103            node_managers: Arc::new(node_managers),
104        }
105    }
106
107    /// Get a node manager by index.
108    pub fn get(&self, index: usize) -> Option<&Arc<DynNodeManager>> {
109        self.node_managers.get(index)
110    }
111
112    /// Get the first node manager with the specified type.
113    pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
114        for m in self {
115            let r = &**m;
116            if r.type_id() == TypeId::of::<T>() {
117                if let Ok(k) = m.clone().into_any_arc().downcast() {
118                    return Some(k);
119                }
120            }
121        }
122
123        None
124    }
125
126    /// Get the first node manager with the specified name and try to cast it to the type `T`.
127    ///
128    /// If there are multiple node managers with the same name, only the first will ever
129    /// be returned by this. Avoid having duplicate node managers.
130    pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
131        for m in self {
132            let r = &**m;
133            if r.name() == name {
134                return m.clone().into_any_arc().downcast().ok();
135            }
136        }
137        None
138    }
139
140    /// Create a weak reference to the node managers.
141    /// A node manager should avoid holding a copy of the `NodeManagers` object since that
142    /// results in a circular reference which will leak memory once dropped.
143    /// (This does not really matter if you don't care about memory leaks when the server is dropped.)
144    pub fn as_weak(&self) -> NodeManagersRef {
145        let weak = Arc::downgrade(&self.node_managers);
146        NodeManagersRef {
147            node_managers: Arc::new(OnceCell::new_with(Some(weak))),
148        }
149    }
150}
151
152impl Index<usize> for NodeManagers {
153    type Output = Arc<DynNodeManager>;
154
155    fn index(&self, index: usize) -> &Self::Output {
156        &self.node_managers[index]
157    }
158}
159
160impl<'a> IntoIterator for &'a NodeManagers {
161    type Item = &'a Arc<DynNodeManager>;
162
163    type IntoIter = <&'a Vec<Arc<DynNodeManager>> as IntoIterator>::IntoIter;
164
165    fn into_iter(self) -> Self::IntoIter {
166        self.node_managers.iter()
167    }
168}
169
170#[derive(Clone)]
171/// A weak reference to the node manager collection.
172pub struct NodeManagersRef {
173    /// This complex structure is here because node managers need to be able to store a reference
174    /// to a _future_ weak reference to the node managers.
175    node_managers: Arc<OnceCell<Weak<Vec<Arc<DynNodeManager>>>>>,
176}
177
178impl NodeManagerCollection for NodeManagersRef {
179    fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
180        self.iter()
181    }
182}
183
184impl NodeManagersRef {
185    pub(crate) fn new_empty() -> Self {
186        Self {
187            node_managers: Default::default(),
188        }
189    }
190
191    pub(crate) fn init_from_node_managers(&self, node_managers: NodeManagers) {
192        self.node_managers
193            .set(Arc::downgrade(&node_managers.node_managers))
194            .expect("Node manager ref initialized more than once");
195    }
196
197    /// Upgrade this node manager ref. Note that node managers should avoid keeping
198    /// a permanent copy of the NodeManagers struct, to avoid circular references leading
199    /// to a memory leak when the server is dropped.
200    ///
201    /// If this fails, it means that the server is dropped, so feel free to abort anything going on.
202    pub fn upgrade(&self) -> Option<NodeManagers> {
203        let node_managers = self.node_managers.get()?.upgrade()?;
204        Some(NodeManagers { node_managers })
205    }
206
207    /// Iterate over node managers. If the server is dropped this iterator will be _empty_.
208    pub fn iter(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
209        let node_managers = self.upgrade();
210        let len = node_managers.as_ref().map(|l| l.len()).unwrap_or_default();
211        (0..len).filter_map(move |i| node_managers.as_ref().map(move |r| r[i].clone()))
212    }
213
214    /// Get the first node manager with the specified type.
215    pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
216        self.upgrade().and_then(|m| m.get_of_type())
217    }
218
219    /// Get the first node manager with the specified name and try to cast it to the type `T`.
220    ///
221    /// If there are multiple node managers with the same name, only the first will ever
222    /// be returned by this. Avoid having duplicate node managers.
223    pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
224        self.upgrade().and_then(|m| m.get_by_name(name))
225    }
226
227    /// Get the node manager at the specified index.
228    pub fn get(&self, index: usize) -> Option<Arc<DynNodeManager>> {
229        self.upgrade().and_then(|m| m.get(index).cloned())
230    }
231}
232
233#[derive(Clone)]
234/// General server context, passed when requests are made to the node managers on
235/// behalf of the server itself, and not a user.
236pub struct ServerContext {
237    /// Weak reference to the node manager collection.
238    pub node_managers: NodeManagersRef,
239    /// Cache containing the subscriptions managed by the server.
240    pub subscriptions: Arc<SubscriptionCache>,
241    /// General server state and configuration.
242    pub info: Arc<ServerInfo>,
243    /// Global authenticator object.
244    pub authenticator: Arc<dyn AuthManager>,
245    /// The server default type tree.
246    pub type_tree: Arc<RwLock<DefaultTypeTree>>,
247    /// Wrapper to get a type tree for a specific user.
248    pub type_tree_getter: Arc<dyn TypeTreeForUser>,
249    /// Wrapper managing the `ServerStatus` server variable.
250    pub status: Arc<ServerStatusWrapper>,
251}
252
253/// This trait is a workaround for the lack of
254/// dyn upcasting coercion.
255pub trait IntoAnyArc {
256    /// Upcast to `Arc<dyn Any>`.
257    fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
258}
259
260impl<T: Send + Sync + 'static> IntoAnyArc for T {
261    fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
262        self
263    }
264}
265
266/// Trait for a type that implements logic for responding to requests.
267/// Implementations of this trait may make external calls for node information,
268/// or do other complex tasks.
269///
270/// Note that each request is passed to every node manager concurrently.
271/// It is up to each node manager to avoid responding to requests for nodes
272/// managed by a different node manager.
273///
274/// Requests are spawned on the tokio thread pool. Avoid making blocking calls in
275/// methods on this trait. If you need to do blocking work use `tokio::spawn_blocking`,
276/// though you should use async IO as much as possible.
277///
278/// For a simpler interface see InMemoryNodeManager, use this trait directly
279/// if you need to control how all node information is stored.
280#[allow(unused_variables)]
281#[async_trait]
282pub trait NodeManager: IntoAnyArc + Any {
283    /// Return whether this node manager owns the given node, this is used for
284    /// propagating service-level errors.
285    ///
286    /// If a service returns an error, all nodes it owns will get that error,
287    /// even if this is a cross node-manager request like Browse.
288    fn owns_node(&self, id: &NodeId) -> bool;
289
290    /// Name of this node manager, for debug purposes.
291    fn name(&self) -> &str;
292
293    /// Return whether this node manager owns events on the server.
294    /// The first node manager that returns true here will be called when
295    /// reading or updating historical server events.
296    fn owns_server_events(&self) -> bool {
297        false
298    }
299
300    /// Return whether this node should handle requests to create a node
301    /// for the given parent ID. This is only called if no new node ID is
302    /// requested, otherwise owns_node is called on the requested node ID.
303    ///
304    /// Returning true here doesn't mean that creating the new node must
305    /// succeed, only that _if_ the parent node exists, this node manager
306    /// would be the one to create the requested node.
307    fn handle_new_node(&self, parent_id: &ExpandedNodeId) -> bool {
308        false
309    }
310
311    /// Namespaces for a given user, used to populate the namespace array.
312    /// This being a method allows different users to see different namespaces.
313    fn namespaces_for_user(&self, context: &RequestContext) -> Vec<NamespaceMetadata>;
314
315    /// Perform any necessary loading of nodes, should populate the type tree if
316    /// needed.
317    async fn init(&self, type_tree: &mut DefaultTypeTree, context: ServerContext);
318
319    /// Resolve a list of references given by a different node manager.
320    async fn resolve_external_references(
321        &self,
322        context: &RequestContext,
323        items: &mut [&mut ExternalReferenceRequest],
324    ) {
325    }
326
327    // ATTRIBUTES
328    /// Execute the Read service. This should set results on the given nodes_to_read as needed.
329    async fn read(
330        &self,
331        context: &RequestContext,
332        max_age: f64,
333        timestamps_to_return: TimestampsToReturn,
334        nodes_to_read: &mut [&mut ReadNode],
335    ) -> Result<(), StatusCode> {
336        Err(StatusCode::BadServiceUnsupported)
337    }
338
339    /// Perform the history read raw modified service. This should write results
340    /// to the `nodes` list of type either `HistoryData` or `HistoryModifiedData`
341    async fn history_read_raw_modified(
342        &self,
343        context: &RequestContext,
344        details: &ReadRawModifiedDetails,
345        nodes: &mut [&mut HistoryNode],
346        timestamps_to_return: TimestampsToReturn,
347    ) -> Result<(), StatusCode> {
348        Err(StatusCode::BadHistoryOperationUnsupported)
349    }
350
351    /// Perform the history read processed service. This should write results
352    /// to the `nodes` list of type `HistoryData`.
353    async fn history_read_processed(
354        &self,
355        context: &RequestContext,
356        details: &ReadProcessedDetails,
357        nodes: &mut [&mut HistoryNode],
358        timestamps_to_return: TimestampsToReturn,
359    ) -> Result<(), StatusCode> {
360        Err(StatusCode::BadHistoryOperationUnsupported)
361    }
362
363    /// Perform the history read processed service. This should write results
364    /// to the `nodes` list of type `HistoryData`.
365    async fn history_read_at_time(
366        &self,
367        context: &RequestContext,
368        details: &ReadAtTimeDetails,
369        nodes: &mut [&mut HistoryNode],
370        timestamps_to_return: TimestampsToReturn,
371    ) -> Result<(), StatusCode> {
372        Err(StatusCode::BadHistoryOperationUnsupported)
373    }
374
375    /// Perform the history read events service. This should write results
376    /// to the `nodes` list of type `HistoryEvent`.
377    async fn history_read_events(
378        &self,
379        context: &RequestContext,
380        details: &ReadEventDetails,
381        nodes: &mut [&mut HistoryNode],
382        timestamps_to_return: TimestampsToReturn,
383    ) -> Result<(), StatusCode> {
384        Err(StatusCode::BadHistoryOperationUnsupported)
385    }
386
387    /// Perform the history read annotations data service. This should write
388    /// results to the `nodes` list of type `Annotation`.
389    async fn history_read_annotations(
390        &self,
391        context: &RequestContext,
392        details: &ReadAnnotationDataDetails,
393        nodes: &mut [&mut HistoryNode],
394        timestamps_to_return: TimestampsToReturn,
395    ) -> Result<(), StatusCode> {
396        Err(StatusCode::BadHistoryOperationUnsupported)
397    }
398
399    /// Perform the write service. This should write results
400    /// to the `nodes_to_write` list. The default result is `BadNodeIdUnknown`
401    async fn write(
402        &self,
403        context: &RequestContext,
404        nodes_to_write: &mut [&mut WriteNode],
405    ) -> Result<(), StatusCode> {
406        Err(StatusCode::BadServiceUnsupported)
407    }
408
409    /// Perform the HistoryUpdate service. This should write result
410    /// status codes to the `nodes` list as appropriate.
411    async fn history_update(
412        &self,
413        context: &RequestContext,
414        nodes: &mut [&mut HistoryUpdateNode],
415    ) -> Result<(), StatusCode> {
416        Err(StatusCode::BadHistoryOperationUnsupported)
417    }
418
419    // VIEW
420    /// Perform the Browse or BrowseNext service.
421    async fn browse(
422        &self,
423        context: &RequestContext,
424        nodes_to_browse: &mut [BrowseNode],
425    ) -> Result<(), StatusCode> {
426        Err(StatusCode::BadServiceUnsupported)
427    }
428
429    /// Perform the translate browse paths to node IDs service.
430    async fn translate_browse_paths_to_node_ids(
431        &self,
432        context: &RequestContext,
433        nodes: &mut [&mut BrowsePathItem],
434    ) -> Result<(), StatusCode> {
435        Err(StatusCode::BadServiceUnsupported)
436    }
437
438    /// Perform the register nodes service. The default behavior for this service is to
439    /// do nothing and pretend the nodes were registered.
440    async fn register_nodes(
441        &self,
442        context: &RequestContext,
443        nodes: &mut [&mut RegisterNodeItem],
444    ) -> Result<(), StatusCode> {
445        // Most servers don't actually do anything with node registration, it is reasonable
446        // to just pretend the nodes are registered.
447        for node in nodes {
448            node.set_registered(true);
449        }
450
451        Ok(())
452    }
453
454    /// Perform the unregister nodes service. The default behavior for this service is to
455    /// do nothing.
456    async fn unregister_nodes(
457        &self,
458        context: &RequestContext,
459        _nodes: &[&NodeId],
460    ) -> Result<(), StatusCode> {
461        // Again, just do nothing
462        Ok(())
463    }
464
465    /// Prepare for monitored item creation, the node manager must take action to
466    /// sample data for each produced monitored item, according to the parameters.
467    /// Monitored item parameters have already been revised according to server limits,
468    /// but the node manager is allowed to further revise sampling interval.
469    ///
470    /// The node manager should also read the initial value of each monitored item,
471    /// and set the status code if monitored item creation failed.
472    ///
473    /// The node manager is responsible for tracking the subscription no matter what
474    /// the value of monitoring_mode is, but should only sample if monitoring_mode
475    /// is not Disabled.
476    async fn create_monitored_items(
477        &self,
478        context: &RequestContext,
479        items: &mut [&mut CreateMonitoredItem],
480    ) -> Result<(), StatusCode> {
481        Err(StatusCode::BadServiceUnsupported)
482    }
483
484    /// Modify monitored items. This method is purely informative for the node manager,
485    /// to let it modify sampling intervals, apply a new filter, or similar.
486    ///
487    /// Node managers are not required to take any action here, and this method is not
488    /// allowed to fail.
489    async fn modify_monitored_items(
490        &self,
491        context: &RequestContext,
492        items: &[&MonitoredItemUpdateRef],
493    ) {
494    }
495
496    /// Modify monitored items. This method is purely informative for the node manager,
497    /// to let it pause or resume sampling. Note that this should _not_ delete context
498    /// stored from `create_monitored_items`, since it may be called again to resume sampling.
499    ///
500    /// The node manager should sample so long as monitoring mode is not `Disabled`, the difference
501    /// between `Reporting` and `Sampling` is handled by the server.
502    ///
503    /// Node managers are not required to take any action here, and this method is not
504    /// allowed to fail.
505    async fn set_monitoring_mode(
506        &self,
507        context: &RequestContext,
508        mode: MonitoringMode,
509        items: &[&MonitoredItemRef],
510    ) {
511    }
512
513    /// Delete monitored items. This method is purely informative for the node manager,
514    /// to let it stop sampling, or similar.
515    ///
516    /// Node managers are not required to take any action here, and this method is not
517    /// allowed to fail. Most node managers that implement subscriptions will want to do
518    /// something with this.
519    ///
520    /// This method may be given monitored items that were never created, or were
521    /// created for a different node manager. Attempting to delete a monitored item
522    /// that does not exist is handled elsewhere and should be a no-op here.
523    async fn delete_monitored_items(&self, context: &RequestContext, items: &[&MonitoredItemRef]) {}
524
525    /// Perform a query on the address space.
526    ///
527    /// All node managers must be able to query in order for the
528    /// server to support querying.
529    ///
530    /// The node manager should set a continuation point if it reaches
531    /// limits, but is responsible for not exceeding max_data_sets_to_return
532    /// and max_references_to_return.
533    async fn query(
534        &self,
535        context: &RequestContext,
536        request: &mut QueryRequest,
537    ) -> Result<(), StatusCode> {
538        Err(StatusCode::BadServiceUnsupported)
539    }
540
541    /// Call a list of methods.
542    ///
543    /// The node manager should validate the method arguments and set
544    /// an output error if the arguments are invalid.
545    ///
546    /// The node manager _must_ ensure that argument output lists and
547    /// method output lists are of the correct length according to the
548    /// method definition.
549    async fn call(
550        &self,
551        context: &RequestContext,
552        methods_to_call: &mut [&mut MethodCall],
553    ) -> Result<(), StatusCode> {
554        Err(StatusCode::BadServiceUnsupported)
555    }
556
557    /// Add a list of nodes.
558    ///
559    /// This should create the nodes, or set a failed status as appropriate.
560    /// If a node was created, the status should be set to Good.
561    async fn add_nodes(
562        &self,
563        context: &RequestContext,
564        nodes_to_add: &mut [&mut AddNodeItem],
565    ) -> Result<(), StatusCode> {
566        Err(StatusCode::BadServiceUnsupported)
567    }
568
569    /// Add a list of references.
570    ///
571    /// This will be given all references where the source _or_
572    /// target belongs to this node manager. A reference is
573    /// considered successfully added if either source_status
574    /// or target_status are Good.
575    ///
576    /// If you want to explicitly set the reference to failed,
577    /// set both source and target status. Note that it may
578    /// already have been added in a different node manager, you are
579    /// responsible for any cleanup if you do this.
580    async fn add_references(
581        &self,
582        context: &RequestContext,
583        references_to_add: &mut [&mut AddReferenceItem],
584    ) -> Result<(), StatusCode> {
585        Err(StatusCode::BadServiceUnsupported)
586    }
587
588    /// Delete a list of nodes.
589    ///
590    /// This will be given all nodes that belong to this node manager.
591    ///
592    /// Typically, you also want to implement `delete_node_references` if
593    /// there are other node managers that support deletes.
594    async fn delete_nodes(
595        &self,
596        context: &RequestContext,
597        nodes_to_delete: &mut [&mut DeleteNodeItem],
598    ) -> Result<(), StatusCode> {
599        Err(StatusCode::BadServiceUnsupported)
600    }
601
602    /// Delete references for the given list of nodes.
603    /// The node manager should respect `delete_target_references`.
604    ///
605    /// This is not allowed to fail, you should make it impossible to delete
606    /// nodes with immutable references.
607    async fn delete_node_references(
608        &self,
609        context: &RequestContext,
610        to_delete: &[&DeleteNodeItem],
611    ) {
612    }
613
614    /// Delete a list of references.
615    ///
616    /// This will be given all references where the source _or_
617    /// target belongs to this node manager. A reference is
618    /// considered successfully added if either source_status
619    /// or target_status are Good.
620    ///
621    /// If you want to explicitly set the reference to failed,
622    /// set both source and target status. Note that it may
623    /// already have been deleted in a different node manager, you are
624    /// responsible for any cleanup if you do this.
625    async fn delete_references(
626        &self,
627        context: &RequestContext,
628        references_to_delete: &mut [&mut DeleteReferenceItem],
629    ) -> Result<(), StatusCode> {
630        Err(StatusCode::BadServiceUnsupported)
631    }
632}