Skip to main content

opcua_server/node_manager/
mod.rs

//! The [NodeManager] trait, utilities for calling services on node managers,
//! and tooling for implementing custom node managers.
//!
//! See docs/advanced_server.md for help on how to implement custom node managers.
5
6use std::{
7    any::{Any, TypeId},
8    ops::Index,
9    sync::{Arc, Weak},
10};
11
12use async_trait::async_trait;
13use opcua_core::sync::RwLock;
14use opcua_nodes::DefaultTypeTree;
15use opcua_types::{
16    ExpandedNodeId, MonitoringMode, NodeId, ReadAnnotationDataDetails, ReadAtTimeDetails,
17    ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
18};
19use tokio::sync::OnceCell;
20
21mod attributes;
22mod build;
23mod context;
24mod history;
25pub mod memory;
26mod method;
27mod monitored_items;
28mod node_management;
29mod query;
30mod utils;
31mod view;
32
33use crate::{diagnostics::NamespaceMetadata, ServerStatusWrapper};
34
35use super::{
36    authenticator::AuthManager, info::ServerInfo, subscriptions::CreateMonitoredItem,
37    SubscriptionCache,
38};
39
40pub use {
41    attributes::{ParsedReadValueId, ParsedWriteValue, ReadNode, WriteNode},
42    build::NodeManagerBuilder,
43    context::{
44        RequestContext, RequestContextInner, TypeTreeForUser, TypeTreeForUserStatic,
45        TypeTreeReadContext,
46    },
47    history::{HistoryNode, HistoryResult, HistoryUpdateDetails, HistoryUpdateNode},
48    method::MethodCall,
49    monitored_items::{MonitoredItemRef, MonitoredItemUpdateRef},
50    node_management::{AddNodeItem, AddReferenceItem, DeleteNodeItem, DeleteReferenceItem},
51    query::{ParsedNodeTypeDescription, ParsedQueryDataDescription, QueryRequest},
52    utils::*,
53    view::{
54        impl_translate_browse_paths_using_browse, AddReferenceResult, BrowseNode, BrowsePathItem,
55        ExternalReference, ExternalReferenceRequest, NodeMetadata, RegisterNodeItem,
56    },
57};
58
59pub(crate) use context::resolve_external_references;
60pub(crate) use context::DefaultTypeTreeGetter;
61pub(crate) use history::HistoryReadDetails;
62pub(crate) use query::QueryContinuationPoint;
63pub(crate) use view::{BrowseContinuationPoint, ExternalReferencesContPoint};
64
/// Trait for a collection of node managers, to allow abstracting over
/// weak or strong references to the node manager collection.
///
/// In this module it is implemented by [`NodeManagers`] (strong reference)
/// and [`NodeManagersRef`] (weak reference).
pub trait NodeManagerCollection {
    /// Iterate over the node managers on the server.
    ///
    /// Items are owned `Arc` handles rather than references into the collection.
    fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>>;
}
71
/// Type alias for a dyn reference to a node manager.
///
/// The trait object is required to be `Send + Sync + 'static`. The collections
/// in this module store node managers as `Arc<DynNodeManager>`.
pub type DynNodeManager = dyn NodeManager + Send + Sync + 'static;
74
/// Wrapper around the server managed list of node managers.
///
/// The list is held behind an `Arc`, so cloning this wrapper only clones the
/// `Arc` handle and every clone refers to the same underlying vector.
#[derive(Clone)]
pub struct NodeManagers {
    // Shared list of node managers; weak references to it are created by downgrading this `Arc`.
    node_managers: Arc<Vec<Arc<DynNodeManager>>>,
}
80
81impl NodeManagerCollection for NodeManagers {
82    fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
83        self.iter().cloned()
84    }
85}
86
87impl NodeManagers {
88    /// Iterate by reference over the node managers.
89    pub fn iter(&self) -> impl Iterator<Item = &'_ Arc<DynNodeManager>> {
90        self.into_iter()
91    }
92
93    /// Get the length of the node manager collection.
94    pub fn len(&self) -> usize {
95        self.node_managers.len()
96    }
97
98    /// Return `true` if the node manager collection is empty.
99    pub fn is_empty(&self) -> bool {
100        self.node_managers.is_empty()
101    }
102
103    /// Create a new node manager collection from a vector of node managers.
104    pub fn new(node_managers: Vec<Arc<DynNodeManager>>) -> Self {
105        Self {
106            node_managers: Arc::new(node_managers),
107        }
108    }
109
110    /// Get a node manager by index.
111    pub fn get(&self, index: usize) -> Option<&Arc<DynNodeManager>> {
112        self.node_managers.get(index)
113    }
114
115    /// Get the first node manager with the specified type.
116    pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
117        for m in self {
118            let r = &**m;
119            if r.type_id() == TypeId::of::<T>() {
120                if let Ok(k) = m.clone().into_any_arc().downcast() {
121                    return Some(k);
122                }
123            }
124        }
125
126        None
127    }
128
129    /// Get the first node manager with the specified name and try to cast it to the type `T`.
130    ///
131    /// If there are multiple node managers with the same name, only the first will ever
132    /// be returned by this. Avoid having duplicate node managers.
133    pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
134        for m in self {
135            let r = &**m;
136            if r.name() == name {
137                return m.clone().into_any_arc().downcast().ok();
138            }
139        }
140        None
141    }
142
143    /// Create a weak reference to the node managers.
144    /// A node manager should avoid holding a copy of the `NodeManagers` object since that
145    /// results in a circular reference which will leak memory once dropped.
146    /// (This does not really matter if you don't care about memory leaks when the server is dropped.)
147    pub fn as_weak(&self) -> NodeManagersRef {
148        let weak = Arc::downgrade(&self.node_managers);
149        NodeManagersRef {
150            node_managers: Arc::new(OnceCell::new_with(Some(weak))),
151        }
152    }
153}
154
155impl Index<usize> for NodeManagers {
156    type Output = Arc<DynNodeManager>;
157
158    fn index(&self, index: usize) -> &Self::Output {
159        &self.node_managers[index]
160    }
161}
162
163impl<'a> IntoIterator for &'a NodeManagers {
164    type Item = &'a Arc<DynNodeManager>;
165
166    type IntoIter = <&'a Vec<Arc<DynNodeManager>> as IntoIterator>::IntoIter;
167
168    fn into_iter(self) -> Self::IntoIter {
169        self.node_managers.iter()
170    }
171}
172
/// A weak reference to the node manager collection.
///
/// Cloning is cheap: clones share the same inner cell, so initializing one
/// clone initializes all of them.
#[derive(Clone)]
pub struct NodeManagersRef {
    /// This complex structure is here because node managers need to be able to store a reference
    /// to a _future_ weak reference to the node managers.
    ///
    /// The cell is empty until the weak reference has been set.
    node_managers: Arc<OnceCell<Weak<Vec<Arc<DynNodeManager>>>>>,
}
180
181impl NodeManagerCollection for NodeManagersRef {
182    fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
183        self.iter()
184    }
185}
186
187impl NodeManagersRef {
188    pub(crate) fn new_empty() -> Self {
189        Self {
190            node_managers: Default::default(),
191        }
192    }
193
194    pub(crate) fn init_from_node_managers(&self, node_managers: NodeManagers) {
195        self.node_managers
196            .set(Arc::downgrade(&node_managers.node_managers))
197            .expect("Node manager ref initialized more than once");
198    }
199
200    /// Upgrade this node manager ref. Note that node managers should avoid keeping
201    /// a permanent copy of the NodeManagers struct, to avoid circular references leading
202    /// to a memory leak when the server is dropped.
203    ///
204    /// If this fails, it means that the server is dropped, so feel free to abort anything going on.
205    pub fn upgrade(&self) -> Option<NodeManagers> {
206        let node_managers = self.node_managers.get()?.upgrade()?;
207        Some(NodeManagers { node_managers })
208    }
209
210    /// Iterate over node managers. If the server is dropped this iterator will be _empty_.
211    pub fn iter(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
212        let node_managers = self.upgrade();
213        let len = node_managers.as_ref().map(|l| l.len()).unwrap_or_default();
214        (0..len).filter_map(move |i| node_managers.as_ref().map(move |r| r[i].clone()))
215    }
216
217    /// Get the first node manager with the specified type.
218    pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
219        self.upgrade().and_then(|m| m.get_of_type())
220    }
221
222    /// Get the first node manager with the specified name and try to cast it to the type `T`.
223    ///
224    /// If there are multiple node managers with the same name, only the first will ever
225    /// be returned by this. Avoid having duplicate node managers.
226    pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
227        self.upgrade().and_then(|m| m.get_by_name(name))
228    }
229
230    /// Get the node manager at the specified index.
231    pub fn get(&self, index: usize) -> Option<Arc<DynNodeManager>> {
232        self.upgrade().and_then(|m| m.get(index).cloned())
233    }
234}
235
/// General server context, passed when requests are made to the node managers on
/// behalf of the server itself, and not a user.
///
/// Every field is a shared handle, so cloning the context does not copy the
/// underlying server state.
#[derive(Clone)]
pub struct ServerContext {
    /// Weak reference to the node manager collection.
    pub node_managers: NodeManagersRef,
    /// Cache containing the subscriptions managed by the server.
    pub subscriptions: Arc<SubscriptionCache>,
    /// General server state and configuration.
    pub info: Arc<ServerInfo>,
    /// Global authenticator object.
    pub authenticator: Arc<dyn AuthManager>,
    /// The server default type tree.
    pub type_tree: Arc<RwLock<DefaultTypeTree>>,
    /// Wrapper to get a type tree for a specific user.
    pub type_tree_getter: Arc<dyn TypeTreeForUser>,
    /// Wrapper managing the `ServerStatus` server variable.
    pub status: Arc<ServerStatusWrapper>,
}
255
/// Helper trait for turning an `Arc` of a concrete value into an `Arc<dyn Any>`.
///
/// This is a workaround for the lack of dyn upcasting coercion.
pub trait IntoAnyArc {
    /// Upcast to `Arc<dyn Any>`.
    fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}

/// Blanket implementation for every sized `Send + Sync + 'static` type.
impl<T: Send + Sync + 'static> IntoAnyArc for T {
    fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        // A plain unsizing coercion: the concrete type is known here.
        let erased: Arc<dyn Any + Send + Sync> = self;
        erased
    }
}
268
/// Trait for a type that implements logic for responding to requests.
/// Implementations of this trait may make external calls for node information,
/// or do other complex tasks.
///
/// Note that each request is passed to every node manager concurrently.
/// It is up to each node manager to avoid responding to requests for nodes
/// managed by a different node manager.
///
/// Requests are spawned on the tokio thread pool. Avoid making blocking calls in
/// methods on this trait. If you need to do blocking work use `tokio::task::spawn_blocking`,
/// though you should use async IO as much as possible.
///
/// For a simpler interface see InMemoryNodeManager, use this trait directly
/// if you need to control how all node information is stored.
///
/// Only `owns_node`, `name`, `namespaces_for_user` and `init` are required;
/// every other method has a default implementation.
// The default method bodies intentionally ignore their parameters.
#[allow(unused_variables)]
#[async_trait]
pub trait NodeManager: IntoAnyArc + Any {
    /// Return whether this node manager owns the given node, this is used for
    /// propagating service-level errors.
    ///
    /// If a service returns an error, all nodes it owns will get that error,
    /// even if this is a cross node-manager request like Browse.
    fn owns_node(&self, id: &NodeId) -> bool;

    /// Name of this node manager, for debug purposes.
    fn name(&self) -> &str;

    /// Return whether this node manager owns events on the server.
    /// The first node manager that returns true here will be called when
    /// reading or updating historical server events.
    ///
    /// The default implementation returns `false`.
    fn owns_server_events(&self) -> bool {
        false
    }

    /// Return whether this node manager should handle requests to create a node
    /// for the given parent ID. This is only called if no new node ID is
    /// requested, otherwise owns_node is called on the requested node ID.
    ///
    /// Returning true here doesn't mean that creating the new node must
    /// succeed, only that _if_ the parent node exists, this node manager
    /// would be the one to create the requested node.
    ///
    /// The default implementation returns `false`.
    fn handle_new_node(&self, parent_id: &ExpandedNodeId) -> bool {
        false
    }

    /// Namespaces for a given user, used to populate the namespace array.
    /// This being a method allows different users to see different namespaces.
    fn namespaces_for_user(&self, context: &RequestContext) -> Vec<NamespaceMetadata>;

    /// Perform any necessary loading of nodes, should populate the type tree if
    /// needed.
    async fn init(&self, type_tree: &mut DefaultTypeTree, context: ServerContext);

    /// Resolve a list of references given by a different node manager.
    ///
    /// The default implementation does nothing.
    async fn resolve_external_references(
        &self,
        context: &RequestContext,
        items: &mut [&mut ExternalReferenceRequest],
    ) {
    }

    // ATTRIBUTES
    /// Execute the Read service. This should set results on the given nodes_to_read as needed.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn read(
        &self,
        context: &RequestContext,
        max_age: f64,
        timestamps_to_return: TimestampsToReturn,
        nodes_to_read: &mut [&mut ReadNode],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Perform the history read raw modified service. This should write results
    /// to the `nodes` list of type either `HistoryData` or `HistoryModifiedData`
    ///
    /// The default implementation returns `BadHistoryOperationUnsupported`.
    async fn history_read_raw_modified(
        &self,
        context: &RequestContext,
        details: &ReadRawModifiedDetails,
        nodes: &mut [&mut HistoryNode],
        timestamps_to_return: TimestampsToReturn,
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadHistoryOperationUnsupported)
    }

    /// Perform the history read processed service. This should write results
    /// to the `nodes` list of type `HistoryData`.
    ///
    /// The default implementation returns `BadHistoryOperationUnsupported`.
    async fn history_read_processed(
        &self,
        context: &RequestContext,
        details: &ReadProcessedDetails,
        nodes: &mut [&mut HistoryNode],
        timestamps_to_return: TimestampsToReturn,
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadHistoryOperationUnsupported)
    }

    /// Perform the history read at time service. This should write results
    /// to the `nodes` list of type `HistoryData`.
    ///
    /// The default implementation returns `BadHistoryOperationUnsupported`.
    async fn history_read_at_time(
        &self,
        context: &RequestContext,
        details: &ReadAtTimeDetails,
        nodes: &mut [&mut HistoryNode],
        timestamps_to_return: TimestampsToReturn,
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadHistoryOperationUnsupported)
    }

    /// Perform the history read events service. This should write results
    /// to the `nodes` list of type `HistoryEvent`.
    ///
    /// The default implementation returns `BadHistoryOperationUnsupported`.
    async fn history_read_events(
        &self,
        context: &RequestContext,
        details: &ReadEventDetails,
        nodes: &mut [&mut HistoryNode],
        timestamps_to_return: TimestampsToReturn,
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadHistoryOperationUnsupported)
    }

    /// Perform the history read annotations data service. This should write
    /// results to the `nodes` list of type `Annotation`.
    ///
    /// The default implementation returns `BadHistoryOperationUnsupported`.
    async fn history_read_annotations(
        &self,
        context: &RequestContext,
        details: &ReadAnnotationDataDetails,
        nodes: &mut [&mut HistoryNode],
        timestamps_to_return: TimestampsToReturn,
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadHistoryOperationUnsupported)
    }

    /// Perform the write service. This should write results
    /// to the `nodes_to_write` list. The default result is `BadNodeIdUnknown`
    /// (NOTE(review): presumably the initial per-node status set by the caller — confirm).
    ///
    /// The default implementation of this method returns `BadServiceUnsupported`.
    async fn write(
        &self,
        context: &RequestContext,
        nodes_to_write: &mut [&mut WriteNode],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Perform the HistoryUpdate service. This should write result
    /// status codes to the `nodes` list as appropriate.
    ///
    /// The default implementation returns `BadHistoryOperationUnsupported`.
    async fn history_update(
        &self,
        context: &RequestContext,
        nodes: &mut [&mut HistoryUpdateNode],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadHistoryOperationUnsupported)
    }

    // VIEW
    /// Perform the Browse or BrowseNext service.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn browse(
        &self,
        context: &RequestContext,
        nodes_to_browse: &mut [BrowseNode],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Perform the translate browse paths to node IDs service.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn translate_browse_paths_to_node_ids(
        &self,
        context: &RequestContext,
        nodes: &mut [&mut BrowsePathItem],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Perform the register nodes service. The default behavior for this service is to
    /// mark every given node as registered without doing any further work.
    async fn register_nodes(
        &self,
        context: &RequestContext,
        nodes: &mut [&mut RegisterNodeItem],
    ) -> Result<(), StatusCode> {
        // Most servers don't actually do anything with node registration, it is reasonable
        // to just pretend the nodes are registered.
        for node in nodes {
            node.set_registered(true);
        }

        Ok(())
    }

    /// Perform the unregister nodes service. The default behavior for this service is to
    /// do nothing and return `Ok`.
    async fn unregister_nodes(
        &self,
        context: &RequestContext,
        _nodes: &[&NodeId],
    ) -> Result<(), StatusCode> {
        // As with registration, there is nothing to undo by default.
        Ok(())
    }

    /// Prepare for monitored item creation, the node manager must take action to
    /// sample data for each produced monitored item, according to the parameters.
    /// Monitored item parameters have already been revised according to server limits,
    /// but the node manager is allowed to further revise sampling interval.
    ///
    /// The node manager should also read the initial value of each monitored item,
    /// and set the status code if monitored item creation failed.
    ///
    /// The node manager is responsible for tracking the subscription no matter what
    /// the value of monitoring_mode is, but should only sample if monitoring_mode
    /// is not Disabled.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn create_monitored_items(
        &self,
        context: &RequestContext,
        items: &mut [&mut CreateMonitoredItem],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Modify monitored items. This method is purely informative for the node manager,
    /// to let it modify sampling intervals, apply a new filter, or similar.
    ///
    /// Node managers are not required to take any action here, and this method is not
    /// allowed to fail. The default implementation does nothing.
    async fn modify_monitored_items(
        &self,
        context: &RequestContext,
        items: &[&MonitoredItemUpdateRef],
    ) {
    }

    /// Set the monitoring mode of monitored items. This method is purely informative for
    /// the node manager, to let it pause or resume sampling. Note that this should _not_
    /// delete context stored from `create_monitored_items`, since it may be called again
    /// to resume sampling.
    ///
    /// The node manager should sample so long as monitoring mode is not `Disabled`, the difference
    /// between `Reporting` and `Sampling` is handled by the server.
    ///
    /// Node managers are not required to take any action here, and this method is not
    /// allowed to fail. The default implementation does nothing.
    async fn set_monitoring_mode(
        &self,
        context: &RequestContext,
        mode: MonitoringMode,
        items: &[&MonitoredItemRef],
    ) {
    }

    /// Delete monitored items. This method is purely informative for the node manager,
    /// to let it stop sampling, or similar.
    ///
    /// Node managers are not required to take any action here, and this method is not
    /// allowed to fail. Most node managers that implement subscriptions will want to do
    /// something with this. The default implementation does nothing.
    ///
    /// This method may be given monitored items that were never created, or were
    /// created for a different node manager. Attempting to delete a monitored item
    /// that does not exist is handled elsewhere and should be a no-op here.
    async fn delete_monitored_items(&self, context: &RequestContext, items: &[&MonitoredItemRef]) {}

    /// Perform a query on the address space.
    ///
    /// All node managers must be able to query in order for the
    /// server to support querying.
    ///
    /// The node manager should set a continuation point if it reaches
    /// limits, but is responsible for not exceeding max_data_sets_to_return
    /// and max_references_to_return.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn query(
        &self,
        context: &RequestContext,
        request: &mut QueryRequest,
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Call a list of methods.
    ///
    /// The node manager should validate the method arguments and set
    /// an output error if the arguments are invalid.
    ///
    /// The node manager _must_ ensure that argument output lists and
    /// method output lists are of the correct length according to the
    /// method definition.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn call(
        &self,
        context: &RequestContext,
        methods_to_call: &mut [&mut MethodCall],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Add a list of nodes.
    ///
    /// This should create the nodes, or set a failed status as appropriate.
    /// If a node was created, the status should be set to Good.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn add_nodes(
        &self,
        context: &RequestContext,
        nodes_to_add: &mut [&mut AddNodeItem],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Add a list of references.
    ///
    /// This will be given all references where the source _or_
    /// target belongs to this node manager. A reference is
    /// considered successfully added if either source_status
    /// or target_status are Good.
    ///
    /// If you want to explicitly set the reference to failed,
    /// set both source and target status. Note that it may
    /// already have been added in a different node manager, you are
    /// responsible for any cleanup if you do this.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn add_references(
        &self,
        context: &RequestContext,
        references_to_add: &mut [&mut AddReferenceItem],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Delete a list of nodes.
    ///
    /// This will be given all nodes that belong to this node manager.
    ///
    /// Typically, you also want to implement `delete_node_references` if
    /// there are other node managers that support deletes.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn delete_nodes(
        &self,
        context: &RequestContext,
        nodes_to_delete: &mut [&mut DeleteNodeItem],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }

    /// Delete references for the given list of nodes.
    /// The node manager should respect `delete_target_references`.
    ///
    /// This is not allowed to fail, you should make it impossible to delete
    /// nodes with immutable references. The default implementation does nothing.
    async fn delete_node_references(
        &self,
        context: &RequestContext,
        to_delete: &[&DeleteNodeItem],
    ) {
    }

    /// Delete a list of references.
    ///
    /// This will be given all references where the source _or_
    /// target belongs to this node manager. A reference is
    /// considered successfully deleted if either source_status
    /// or target_status are Good.
    ///
    /// If you want to explicitly set the reference to failed,
    /// set both source and target status. Note that it may
    /// already have been deleted in a different node manager, you are
    /// responsible for any cleanup if you do this.
    ///
    /// The default implementation returns `BadServiceUnsupported`.
    async fn delete_references(
        &self,
        context: &RequestContext,
        references_to_delete: &mut [&mut DeleteReferenceItem],
    ) -> Result<(), StatusCode> {
        Err(StatusCode::BadServiceUnsupported)
    }
}