opcua_server/node_manager/mod.rs
1//! The [NodeManager] trait, as well as utilities related to
2//! calling services on this, and tooling for implementing custom node managers.
3//!
4//! See docs/advanced_server.md for help on how to implement custom node managers.
5
6use std::{
7 any::{Any, TypeId},
8 ops::Index,
9 sync::{Arc, Weak},
10};
11
12use async_trait::async_trait;
13use opcua_core::sync::RwLock;
14use opcua_nodes::DefaultTypeTree;
15use opcua_types::{
16 ExpandedNodeId, MonitoringMode, NodeId, ReadAnnotationDataDetails, ReadAtTimeDetails,
17 ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
18};
19use tokio::sync::OnceCell;
20
21mod attributes;
22mod build;
23mod context;
24mod history;
25pub mod memory;
26mod method;
27mod monitored_items;
28mod node_management;
29mod query;
30mod utils;
31mod view;
32
33use crate::{diagnostics::NamespaceMetadata, ServerStatusWrapper};
34
35use super::{
36 authenticator::AuthManager, info::ServerInfo, subscriptions::CreateMonitoredItem,
37 SubscriptionCache,
38};
39
40pub use {
41 attributes::{ParsedReadValueId, ParsedWriteValue, ReadNode, WriteNode},
42 build::NodeManagerBuilder,
43 context::{RequestContext, TypeTreeForUser, TypeTreeForUserStatic, TypeTreeReadContext},
44 history::{HistoryNode, HistoryResult, HistoryUpdateDetails, HistoryUpdateNode},
45 method::MethodCall,
46 monitored_items::{MonitoredItemRef, MonitoredItemUpdateRef},
47 node_management::{AddNodeItem, AddReferenceItem, DeleteNodeItem, DeleteReferenceItem},
48 query::{ParsedNodeTypeDescription, ParsedQueryDataDescription, QueryRequest},
49 utils::*,
50 view::{
51 impl_translate_browse_paths_using_browse, AddReferenceResult, BrowseNode, BrowsePathItem,
52 ExternalReference, ExternalReferenceRequest, NodeMetadata, RegisterNodeItem,
53 },
54};
55
56pub(crate) use context::resolve_external_references;
57pub(crate) use context::DefaultTypeTreeGetter;
58pub(crate) use history::HistoryReadDetails;
59pub(crate) use query::QueryContinuationPoint;
60pub(crate) use view::{BrowseContinuationPoint, ExternalReferencesContPoint};
61
62/// Trait for a collection of node managers, to allow abstracting over
63/// weak or strong references to the node manager collection.
64pub trait NodeManagerCollection {
65 /// Iterate over the node managers on the server.
66 fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>>;
67}
68
69/// Type alias for a dyn reference to a node manager.
70pub type DynNodeManager = dyn NodeManager + Send + Sync + 'static;
71
72#[derive(Clone)]
73/// Wrapper around the server managed list of node managers.
74pub struct NodeManagers {
75 node_managers: Arc<Vec<Arc<DynNodeManager>>>,
76}
77
78impl NodeManagerCollection for NodeManagers {
79 fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
80 self.iter().cloned()
81 }
82}
83
84impl NodeManagers {
85 /// Iterate by reference over the node managers.
86 pub fn iter(&self) -> impl Iterator<Item = &'_ Arc<DynNodeManager>> {
87 self.into_iter()
88 }
89
90 /// Get the length of the node manager collection.
91 pub fn len(&self) -> usize {
92 self.node_managers.len()
93 }
94
95 /// Return `true` if the node manager collection is empty.
96 pub fn is_empty(&self) -> bool {
97 self.node_managers.is_empty()
98 }
99
100 /// Create a new node manager collection from a vector of node managers.
101 pub fn new(node_managers: Vec<Arc<DynNodeManager>>) -> Self {
102 Self {
103 node_managers: Arc::new(node_managers),
104 }
105 }
106
107 /// Get a node manager by index.
108 pub fn get(&self, index: usize) -> Option<&Arc<DynNodeManager>> {
109 self.node_managers.get(index)
110 }
111
112 /// Get the first node manager with the specified type.
113 pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
114 for m in self {
115 let r = &**m;
116 if r.type_id() == TypeId::of::<T>() {
117 if let Ok(k) = m.clone().into_any_arc().downcast() {
118 return Some(k);
119 }
120 }
121 }
122
123 None
124 }
125
126 /// Get the first node manager with the specified name and try to cast it to the type `T`.
127 ///
128 /// If there are multiple node managers with the same name, only the first will ever
129 /// be returned by this. Avoid having duplicate node managers.
130 pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
131 for m in self {
132 let r = &**m;
133 if r.name() == name {
134 return m.clone().into_any_arc().downcast().ok();
135 }
136 }
137 None
138 }
139
140 /// Create a weak reference to the node managers.
141 /// A node manager should avoid holding a copy of the `NodeManagers` object since that
142 /// results in a circular reference which will leak memory once dropped.
143 /// (This does not really matter if you don't care about memory leaks when the server is dropped.)
144 pub fn as_weak(&self) -> NodeManagersRef {
145 let weak = Arc::downgrade(&self.node_managers);
146 NodeManagersRef {
147 node_managers: Arc::new(OnceCell::new_with(Some(weak))),
148 }
149 }
150}
151
152impl Index<usize> for NodeManagers {
153 type Output = Arc<DynNodeManager>;
154
155 fn index(&self, index: usize) -> &Self::Output {
156 &self.node_managers[index]
157 }
158}
159
160impl<'a> IntoIterator for &'a NodeManagers {
161 type Item = &'a Arc<DynNodeManager>;
162
163 type IntoIter = <&'a Vec<Arc<DynNodeManager>> as IntoIterator>::IntoIter;
164
165 fn into_iter(self) -> Self::IntoIter {
166 self.node_managers.iter()
167 }
168}
169
170#[derive(Clone)]
171/// A weak reference to the node manager collection.
172pub struct NodeManagersRef {
173 /// This complex structure is here because node managers need to be able to store a reference
174 /// to a _future_ weak reference to the node managers.
175 node_managers: Arc<OnceCell<Weak<Vec<Arc<DynNodeManager>>>>>,
176}
177
178impl NodeManagerCollection for NodeManagersRef {
179 fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
180 self.iter()
181 }
182}
183
184impl NodeManagersRef {
185 pub(crate) fn new_empty() -> Self {
186 Self {
187 node_managers: Default::default(),
188 }
189 }
190
191 pub(crate) fn init_from_node_managers(&self, node_managers: NodeManagers) {
192 self.node_managers
193 .set(Arc::downgrade(&node_managers.node_managers))
194 .expect("Node manager ref initialized more than once");
195 }
196
197 /// Upgrade this node manager ref. Note that node managers should avoid keeping
198 /// a permanent copy of the NodeManagers struct, to avoid circular references leading
199 /// to a memory leak when the server is dropped.
200 ///
201 /// If this fails, it means that the server is dropped, so feel free to abort anything going on.
202 pub fn upgrade(&self) -> Option<NodeManagers> {
203 let node_managers = self.node_managers.get()?.upgrade()?;
204 Some(NodeManagers { node_managers })
205 }
206
207 /// Iterate over node managers. If the server is dropped this iterator will be _empty_.
208 pub fn iter(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
209 let node_managers = self.upgrade();
210 let len = node_managers.as_ref().map(|l| l.len()).unwrap_or_default();
211 (0..len).filter_map(move |i| node_managers.as_ref().map(move |r| r[i].clone()))
212 }
213
214 /// Get the first node manager with the specified type.
215 pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
216 self.upgrade().and_then(|m| m.get_of_type())
217 }
218
219 /// Get the first node manager with the specified name and try to cast it to the type `T`.
220 ///
221 /// If there are multiple node managers with the same name, only the first will ever
222 /// be returned by this. Avoid having duplicate node managers.
223 pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
224 self.upgrade().and_then(|m| m.get_by_name(name))
225 }
226
227 /// Get the node manager at the specified index.
228 pub fn get(&self, index: usize) -> Option<Arc<DynNodeManager>> {
229 self.upgrade().and_then(|m| m.get(index).cloned())
230 }
231}
232
233#[derive(Clone)]
234/// General server context, passed when requests are made to the node managers on
235/// behalf of the server itself, and not a user.
236pub struct ServerContext {
237 /// Weak reference to the node manager collection.
238 pub node_managers: NodeManagersRef,
239 /// Cache containing the subscriptions managed by the server.
240 pub subscriptions: Arc<SubscriptionCache>,
241 /// General server state and configuration.
242 pub info: Arc<ServerInfo>,
243 /// Global authenticator object.
244 pub authenticator: Arc<dyn AuthManager>,
245 /// The server default type tree.
246 pub type_tree: Arc<RwLock<DefaultTypeTree>>,
247 /// Wrapper to get a type tree for a specific user.
248 pub type_tree_getter: Arc<dyn TypeTreeForUser>,
249 /// Wrapper managing the `ServerStatus` server variable.
250 pub status: Arc<ServerStatusWrapper>,
251}
252
253/// This trait is a workaround for the lack of
254/// dyn upcasting coercion.
255pub trait IntoAnyArc {
256 /// Upcast to `Arc<dyn Any>`.
257 fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
258}
259
260impl<T: Send + Sync + 'static> IntoAnyArc for T {
261 fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
262 self
263 }
264}
265
266/// Trait for a type that implements logic for responding to requests.
267/// Implementations of this trait may make external calls for node information,
268/// or do other complex tasks.
269///
270/// Note that each request is passed to every node manager concurrently.
271/// It is up to each node manager to avoid responding to requests for nodes
272/// managed by a different node manager.
273///
274/// Requests are spawned on the tokio thread pool. Avoid making blocking calls in
275/// methods on this trait. If you need to do blocking work use `tokio::spawn_blocking`,
276/// though you should use async IO as much as possible.
277///
278/// For a simpler interface see InMemoryNodeManager, use this trait directly
279/// if you need to control how all node information is stored.
280#[allow(unused_variables)]
281#[async_trait]
282pub trait NodeManager: IntoAnyArc + Any {
283 /// Return whether this node manager owns the given node, this is used for
284 /// propagating service-level errors.
285 ///
286 /// If a service returns an error, all nodes it owns will get that error,
287 /// even if this is a cross node-manager request like Browse.
288 fn owns_node(&self, id: &NodeId) -> bool;
289
290 /// Name of this node manager, for debug purposes.
291 fn name(&self) -> &str;
292
293 /// Return whether this node manager owns events on the server.
294 /// The first node manager that returns true here will be called when
295 /// reading or updating historical server events.
296 fn owns_server_events(&self) -> bool {
297 false
298 }
299
300 /// Return whether this node should handle requests to create a node
301 /// for the given parent ID. This is only called if no new node ID is
302 /// requested, otherwise owns_node is called on the requested node ID.
303 ///
304 /// Returning true here doesn't mean that creating the new node must
305 /// succeed, only that _if_ the parent node exists, this node manager
306 /// would be the one to create the requested node.
307 fn handle_new_node(&self, parent_id: &ExpandedNodeId) -> bool {
308 false
309 }
310
311 /// Namespaces for a given user, used to populate the namespace array.
312 /// This being a method allows different users to see different namespaces.
313 fn namespaces_for_user(&self, context: &RequestContext) -> Vec<NamespaceMetadata>;
314
315 /// Perform any necessary loading of nodes, should populate the type tree if
316 /// needed.
317 async fn init(&self, type_tree: &mut DefaultTypeTree, context: ServerContext);
318
319 /// Resolve a list of references given by a different node manager.
320 async fn resolve_external_references(
321 &self,
322 context: &RequestContext,
323 items: &mut [&mut ExternalReferenceRequest],
324 ) {
325 }
326
327 // ATTRIBUTES
328 /// Execute the Read service. This should set results on the given nodes_to_read as needed.
329 async fn read(
330 &self,
331 context: &RequestContext,
332 max_age: f64,
333 timestamps_to_return: TimestampsToReturn,
334 nodes_to_read: &mut [&mut ReadNode],
335 ) -> Result<(), StatusCode> {
336 Err(StatusCode::BadServiceUnsupported)
337 }
338
339 /// Perform the history read raw modified service. This should write results
340 /// to the `nodes` list of type either `HistoryData` or `HistoryModifiedData`
341 async fn history_read_raw_modified(
342 &self,
343 context: &RequestContext,
344 details: &ReadRawModifiedDetails,
345 nodes: &mut [&mut HistoryNode],
346 timestamps_to_return: TimestampsToReturn,
347 ) -> Result<(), StatusCode> {
348 Err(StatusCode::BadHistoryOperationUnsupported)
349 }
350
351 /// Perform the history read processed service. This should write results
352 /// to the `nodes` list of type `HistoryData`.
353 async fn history_read_processed(
354 &self,
355 context: &RequestContext,
356 details: &ReadProcessedDetails,
357 nodes: &mut [&mut HistoryNode],
358 timestamps_to_return: TimestampsToReturn,
359 ) -> Result<(), StatusCode> {
360 Err(StatusCode::BadHistoryOperationUnsupported)
361 }
362
363 /// Perform the history read processed service. This should write results
364 /// to the `nodes` list of type `HistoryData`.
365 async fn history_read_at_time(
366 &self,
367 context: &RequestContext,
368 details: &ReadAtTimeDetails,
369 nodes: &mut [&mut HistoryNode],
370 timestamps_to_return: TimestampsToReturn,
371 ) -> Result<(), StatusCode> {
372 Err(StatusCode::BadHistoryOperationUnsupported)
373 }
374
375 /// Perform the history read events service. This should write results
376 /// to the `nodes` list of type `HistoryEvent`.
377 async fn history_read_events(
378 &self,
379 context: &RequestContext,
380 details: &ReadEventDetails,
381 nodes: &mut [&mut HistoryNode],
382 timestamps_to_return: TimestampsToReturn,
383 ) -> Result<(), StatusCode> {
384 Err(StatusCode::BadHistoryOperationUnsupported)
385 }
386
387 /// Perform the history read annotations data service. This should write
388 /// results to the `nodes` list of type `Annotation`.
389 async fn history_read_annotations(
390 &self,
391 context: &RequestContext,
392 details: &ReadAnnotationDataDetails,
393 nodes: &mut [&mut HistoryNode],
394 timestamps_to_return: TimestampsToReturn,
395 ) -> Result<(), StatusCode> {
396 Err(StatusCode::BadHistoryOperationUnsupported)
397 }
398
399 /// Perform the write service. This should write results
400 /// to the `nodes_to_write` list. The default result is `BadNodeIdUnknown`
401 async fn write(
402 &self,
403 context: &RequestContext,
404 nodes_to_write: &mut [&mut WriteNode],
405 ) -> Result<(), StatusCode> {
406 Err(StatusCode::BadServiceUnsupported)
407 }
408
409 /// Perform the HistoryUpdate service. This should write result
410 /// status codes to the `nodes` list as appropriate.
411 async fn history_update(
412 &self,
413 context: &RequestContext,
414 nodes: &mut [&mut HistoryUpdateNode],
415 ) -> Result<(), StatusCode> {
416 Err(StatusCode::BadHistoryOperationUnsupported)
417 }
418
419 // VIEW
420 /// Perform the Browse or BrowseNext service.
421 async fn browse(
422 &self,
423 context: &RequestContext,
424 nodes_to_browse: &mut [BrowseNode],
425 ) -> Result<(), StatusCode> {
426 Err(StatusCode::BadServiceUnsupported)
427 }
428
429 /// Perform the translate browse paths to node IDs service.
430 async fn translate_browse_paths_to_node_ids(
431 &self,
432 context: &RequestContext,
433 nodes: &mut [&mut BrowsePathItem],
434 ) -> Result<(), StatusCode> {
435 Err(StatusCode::BadServiceUnsupported)
436 }
437
438 /// Perform the register nodes service. The default behavior for this service is to
439 /// do nothing and pretend the nodes were registered.
440 async fn register_nodes(
441 &self,
442 context: &RequestContext,
443 nodes: &mut [&mut RegisterNodeItem],
444 ) -> Result<(), StatusCode> {
445 // Most servers don't actually do anything with node registration, it is reasonable
446 // to just pretend the nodes are registered.
447 for node in nodes {
448 node.set_registered(true);
449 }
450
451 Ok(())
452 }
453
454 /// Perform the unregister nodes service. The default behavior for this service is to
455 /// do nothing.
456 async fn unregister_nodes(
457 &self,
458 context: &RequestContext,
459 _nodes: &[&NodeId],
460 ) -> Result<(), StatusCode> {
461 // Again, just do nothing
462 Ok(())
463 }
464
465 /// Prepare for monitored item creation, the node manager must take action to
466 /// sample data for each produced monitored item, according to the parameters.
467 /// Monitored item parameters have already been revised according to server limits,
468 /// but the node manager is allowed to further revise sampling interval.
469 ///
470 /// The node manager should also read the initial value of each monitored item,
471 /// and set the status code if monitored item creation failed.
472 ///
473 /// The node manager is responsible for tracking the subscription no matter what
474 /// the value of monitoring_mode is, but should only sample if monitoring_mode
475 /// is not Disabled.
476 async fn create_monitored_items(
477 &self,
478 context: &RequestContext,
479 items: &mut [&mut CreateMonitoredItem],
480 ) -> Result<(), StatusCode> {
481 Err(StatusCode::BadServiceUnsupported)
482 }
483
484 /// Modify monitored items. This method is purely informative for the node manager,
485 /// to let it modify sampling intervals, apply a new filter, or similar.
486 ///
487 /// Node managers are not required to take any action here, and this method is not
488 /// allowed to fail.
489 async fn modify_monitored_items(
490 &self,
491 context: &RequestContext,
492 items: &[&MonitoredItemUpdateRef],
493 ) {
494 }
495
496 /// Modify monitored items. This method is purely informative for the node manager,
497 /// to let it pause or resume sampling. Note that this should _not_ delete context
498 /// stored from `create_monitored_items`, since it may be called again to resume sampling.
499 ///
500 /// The node manager should sample so long as monitoring mode is not `Disabled`, the difference
501 /// between `Reporting` and `Sampling` is handled by the server.
502 ///
503 /// Node managers are not required to take any action here, and this method is not
504 /// allowed to fail.
505 async fn set_monitoring_mode(
506 &self,
507 context: &RequestContext,
508 mode: MonitoringMode,
509 items: &[&MonitoredItemRef],
510 ) {
511 }
512
513 /// Delete monitored items. This method is purely informative for the node manager,
514 /// to let it stop sampling, or similar.
515 ///
516 /// Node managers are not required to take any action here, and this method is not
517 /// allowed to fail. Most node managers that implement subscriptions will want to do
518 /// something with this.
519 ///
520 /// This method may be given monitored items that were never created, or were
521 /// created for a different node manager. Attempting to delete a monitored item
522 /// that does not exist is handled elsewhere and should be a no-op here.
523 async fn delete_monitored_items(&self, context: &RequestContext, items: &[&MonitoredItemRef]) {}
524
525 /// Perform a query on the address space.
526 ///
527 /// All node managers must be able to query in order for the
528 /// server to support querying.
529 ///
530 /// The node manager should set a continuation point if it reaches
531 /// limits, but is responsible for not exceeding max_data_sets_to_return
532 /// and max_references_to_return.
533 async fn query(
534 &self,
535 context: &RequestContext,
536 request: &mut QueryRequest,
537 ) -> Result<(), StatusCode> {
538 Err(StatusCode::BadServiceUnsupported)
539 }
540
541 /// Call a list of methods.
542 ///
543 /// The node manager should validate the method arguments and set
544 /// an output error if the arguments are invalid.
545 ///
546 /// The node manager _must_ ensure that argument output lists and
547 /// method output lists are of the correct length according to the
548 /// method definition.
549 async fn call(
550 &self,
551 context: &RequestContext,
552 methods_to_call: &mut [&mut MethodCall],
553 ) -> Result<(), StatusCode> {
554 Err(StatusCode::BadServiceUnsupported)
555 }
556
557 /// Add a list of nodes.
558 ///
559 /// This should create the nodes, or set a failed status as appropriate.
560 /// If a node was created, the status should be set to Good.
561 async fn add_nodes(
562 &self,
563 context: &RequestContext,
564 nodes_to_add: &mut [&mut AddNodeItem],
565 ) -> Result<(), StatusCode> {
566 Err(StatusCode::BadServiceUnsupported)
567 }
568
569 /// Add a list of references.
570 ///
571 /// This will be given all references where the source _or_
572 /// target belongs to this node manager. A reference is
573 /// considered successfully added if either source_status
574 /// or target_status are Good.
575 ///
576 /// If you want to explicitly set the reference to failed,
577 /// set both source and target status. Note that it may
578 /// already have been added in a different node manager, you are
579 /// responsible for any cleanup if you do this.
580 async fn add_references(
581 &self,
582 context: &RequestContext,
583 references_to_add: &mut [&mut AddReferenceItem],
584 ) -> Result<(), StatusCode> {
585 Err(StatusCode::BadServiceUnsupported)
586 }
587
588 /// Delete a list of nodes.
589 ///
590 /// This will be given all nodes that belong to this node manager.
591 ///
592 /// Typically, you also want to implement `delete_node_references` if
593 /// there are other node managers that support deletes.
594 async fn delete_nodes(
595 &self,
596 context: &RequestContext,
597 nodes_to_delete: &mut [&mut DeleteNodeItem],
598 ) -> Result<(), StatusCode> {
599 Err(StatusCode::BadServiceUnsupported)
600 }
601
602 /// Delete references for the given list of nodes.
603 /// The node manager should respect `delete_target_references`.
604 ///
605 /// This is not allowed to fail, you should make it impossible to delete
606 /// nodes with immutable references.
607 async fn delete_node_references(
608 &self,
609 context: &RequestContext,
610 to_delete: &[&DeleteNodeItem],
611 ) {
612 }
613
614 /// Delete a list of references.
615 ///
616 /// This will be given all references where the source _or_
617 /// target belongs to this node manager. A reference is
618 /// considered successfully added if either source_status
619 /// or target_status are Good.
620 ///
621 /// If you want to explicitly set the reference to failed,
622 /// set both source and target status. Note that it may
623 /// already have been deleted in a different node manager, you are
624 /// responsible for any cleanup if you do this.
625 async fn delete_references(
626 &self,
627 context: &RequestContext,
628 references_to_delete: &mut [&mut DeleteReferenceItem],
629 ) -> Result<(), StatusCode> {
630 Err(StatusCode::BadServiceUnsupported)
631 }
632}