opcua_server/node_manager/mod.rs
1//! The [NodeManager] trait, as well as utilities related to
2//! calling services on this, and tooling for implementing custom node managers.
3//!
4//! See docs/advanced_server.md for help on how to implement custom node managers.
5
6use std::{
7 any::{Any, TypeId},
8 ops::Index,
9 sync::{Arc, Weak},
10};
11
12use async_trait::async_trait;
13use opcua_core::sync::RwLock;
14use opcua_nodes::DefaultTypeTree;
15use opcua_types::{
16 ExpandedNodeId, MonitoringMode, NodeId, ReadAnnotationDataDetails, ReadAtTimeDetails,
17 ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
18};
19use tokio::sync::OnceCell;
20
21mod attributes;
22mod build;
23mod context;
24mod history;
25pub mod memory;
26mod method;
27mod monitored_items;
28mod node_management;
29mod query;
30mod utils;
31mod view;
32
33use crate::{diagnostics::NamespaceMetadata, ServerStatusWrapper};
34
35use super::{
36 authenticator::AuthManager, info::ServerInfo, subscriptions::CreateMonitoredItem,
37 SubscriptionCache,
38};
39
40pub use {
41 attributes::{ParsedReadValueId, ParsedWriteValue, ReadNode, WriteNode},
42 build::NodeManagerBuilder,
43 context::{
44 RequestContext, RequestContextInner, TypeTreeForUser, TypeTreeForUserStatic,
45 TypeTreeReadContext,
46 },
47 history::{HistoryNode, HistoryResult, HistoryUpdateDetails, HistoryUpdateNode},
48 method::MethodCall,
49 monitored_items::{MonitoredItemRef, MonitoredItemUpdateRef},
50 node_management::{AddNodeItem, AddReferenceItem, DeleteNodeItem, DeleteReferenceItem},
51 query::{ParsedNodeTypeDescription, ParsedQueryDataDescription, QueryRequest},
52 utils::*,
53 view::{
54 impl_translate_browse_paths_using_browse, AddReferenceResult, BrowseNode, BrowsePathItem,
55 ExternalReference, ExternalReferenceRequest, NodeMetadata, RegisterNodeItem,
56 },
57};
58
59pub(crate) use context::resolve_external_references;
60pub(crate) use context::DefaultTypeTreeGetter;
61pub(crate) use history::HistoryReadDetails;
62pub(crate) use query::QueryContinuationPoint;
63pub(crate) use view::{BrowseContinuationPoint, ExternalReferencesContPoint};
64
65/// Trait for a collection of node managers, to allow abstracting over
66/// weak or strong references to the node manager collection.
67pub trait NodeManagerCollection {
68 /// Iterate over the node managers on the server.
69 fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>>;
70}
71
72/// Type alias for a dyn reference to a node manager.
73pub type DynNodeManager = dyn NodeManager + Send + Sync + 'static;
74
75#[derive(Clone)]
76/// Wrapper around the server managed list of node managers.
77pub struct NodeManagers {
78 node_managers: Arc<Vec<Arc<DynNodeManager>>>,
79}
80
81impl NodeManagerCollection for NodeManagers {
82 fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
83 self.iter().cloned()
84 }
85}
86
87impl NodeManagers {
88 /// Iterate by reference over the node managers.
89 pub fn iter(&self) -> impl Iterator<Item = &'_ Arc<DynNodeManager>> {
90 self.into_iter()
91 }
92
93 /// Get the length of the node manager collection.
94 pub fn len(&self) -> usize {
95 self.node_managers.len()
96 }
97
98 /// Return `true` if the node manager collection is empty.
99 pub fn is_empty(&self) -> bool {
100 self.node_managers.is_empty()
101 }
102
103 /// Create a new node manager collection from a vector of node managers.
104 pub fn new(node_managers: Vec<Arc<DynNodeManager>>) -> Self {
105 Self {
106 node_managers: Arc::new(node_managers),
107 }
108 }
109
110 /// Get a node manager by index.
111 pub fn get(&self, index: usize) -> Option<&Arc<DynNodeManager>> {
112 self.node_managers.get(index)
113 }
114
115 /// Get the first node manager with the specified type.
116 pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
117 for m in self {
118 let r = &**m;
119 if r.type_id() == TypeId::of::<T>() {
120 if let Ok(k) = m.clone().into_any_arc().downcast() {
121 return Some(k);
122 }
123 }
124 }
125
126 None
127 }
128
129 /// Get the first node manager with the specified name and try to cast it to the type `T`.
130 ///
131 /// If there are multiple node managers with the same name, only the first will ever
132 /// be returned by this. Avoid having duplicate node managers.
133 pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
134 for m in self {
135 let r = &**m;
136 if r.name() == name {
137 return m.clone().into_any_arc().downcast().ok();
138 }
139 }
140 None
141 }
142
143 /// Create a weak reference to the node managers.
144 /// A node manager should avoid holding a copy of the `NodeManagers` object since that
145 /// results in a circular reference which will leak memory once dropped.
146 /// (This does not really matter if you don't care about memory leaks when the server is dropped.)
147 pub fn as_weak(&self) -> NodeManagersRef {
148 let weak = Arc::downgrade(&self.node_managers);
149 NodeManagersRef {
150 node_managers: Arc::new(OnceCell::new_with(Some(weak))),
151 }
152 }
153}
154
155impl Index<usize> for NodeManagers {
156 type Output = Arc<DynNodeManager>;
157
158 fn index(&self, index: usize) -> &Self::Output {
159 &self.node_managers[index]
160 }
161}
162
163impl<'a> IntoIterator for &'a NodeManagers {
164 type Item = &'a Arc<DynNodeManager>;
165
166 type IntoIter = <&'a Vec<Arc<DynNodeManager>> as IntoIterator>::IntoIter;
167
168 fn into_iter(self) -> Self::IntoIter {
169 self.node_managers.iter()
170 }
171}
172
173#[derive(Clone)]
174/// A weak reference to the node manager collection.
175pub struct NodeManagersRef {
176 /// This complex structure is here because node managers need to be able to store a reference
177 /// to a _future_ weak reference to the node managers.
178 node_managers: Arc<OnceCell<Weak<Vec<Arc<DynNodeManager>>>>>,
179}
180
181impl NodeManagerCollection for NodeManagersRef {
182 fn iter_node_managers(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
183 self.iter()
184 }
185}
186
187impl NodeManagersRef {
188 pub(crate) fn new_empty() -> Self {
189 Self {
190 node_managers: Default::default(),
191 }
192 }
193
194 pub(crate) fn init_from_node_managers(&self, node_managers: NodeManagers) {
195 self.node_managers
196 .set(Arc::downgrade(&node_managers.node_managers))
197 .expect("Node manager ref initialized more than once");
198 }
199
200 /// Upgrade this node manager ref. Note that node managers should avoid keeping
201 /// a permanent copy of the NodeManagers struct, to avoid circular references leading
202 /// to a memory leak when the server is dropped.
203 ///
204 /// If this fails, it means that the server is dropped, so feel free to abort anything going on.
205 pub fn upgrade(&self) -> Option<NodeManagers> {
206 let node_managers = self.node_managers.get()?.upgrade()?;
207 Some(NodeManagers { node_managers })
208 }
209
210 /// Iterate over node managers. If the server is dropped this iterator will be _empty_.
211 pub fn iter(&self) -> impl Iterator<Item = Arc<DynNodeManager>> {
212 let node_managers = self.upgrade();
213 let len = node_managers.as_ref().map(|l| l.len()).unwrap_or_default();
214 (0..len).filter_map(move |i| node_managers.as_ref().map(move |r| r[i].clone()))
215 }
216
217 /// Get the first node manager with the specified type.
218 pub fn get_of_type<T: NodeManager + Send + Sync + Any>(&self) -> Option<Arc<T>> {
219 self.upgrade().and_then(|m| m.get_of_type())
220 }
221
222 /// Get the first node manager with the specified name and try to cast it to the type `T`.
223 ///
224 /// If there are multiple node managers with the same name, only the first will ever
225 /// be returned by this. Avoid having duplicate node managers.
226 pub fn get_by_name<T: NodeManager + Send + Sync + Any>(&self, name: &str) -> Option<Arc<T>> {
227 self.upgrade().and_then(|m| m.get_by_name(name))
228 }
229
230 /// Get the node manager at the specified index.
231 pub fn get(&self, index: usize) -> Option<Arc<DynNodeManager>> {
232 self.upgrade().and_then(|m| m.get(index).cloned())
233 }
234}
235
236#[derive(Clone)]
237/// General server context, passed when requests are made to the node managers on
238/// behalf of the server itself, and not a user.
239pub struct ServerContext {
240 /// Weak reference to the node manager collection.
241 pub node_managers: NodeManagersRef,
242 /// Cache containing the subscriptions managed by the server.
243 pub subscriptions: Arc<SubscriptionCache>,
244 /// General server state and configuration.
245 pub info: Arc<ServerInfo>,
246 /// Global authenticator object.
247 pub authenticator: Arc<dyn AuthManager>,
248 /// The server default type tree.
249 pub type_tree: Arc<RwLock<DefaultTypeTree>>,
250 /// Wrapper to get a type tree for a specific user.
251 pub type_tree_getter: Arc<dyn TypeTreeForUser>,
252 /// Wrapper managing the `ServerStatus` server variable.
253 pub status: Arc<ServerStatusWrapper>,
254}
255
256/// This trait is a workaround for the lack of
257/// dyn upcasting coercion.
258pub trait IntoAnyArc {
259 /// Upcast to `Arc<dyn Any>`.
260 fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
261}
262
263impl<T: Send + Sync + 'static> IntoAnyArc for T {
264 fn into_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
265 self
266 }
267}
268
269/// Trait for a type that implements logic for responding to requests.
270/// Implementations of this trait may make external calls for node information,
271/// or do other complex tasks.
272///
273/// Note that each request is passed to every node manager concurrently.
274/// It is up to each node manager to avoid responding to requests for nodes
275/// managed by a different node manager.
276///
277/// Requests are spawned on the tokio thread pool. Avoid making blocking calls in
278/// methods on this trait. If you need to do blocking work use `tokio::spawn_blocking`,
279/// though you should use async IO as much as possible.
280///
281/// For a simpler interface see InMemoryNodeManager, use this trait directly
282/// if you need to control how all node information is stored.
283#[allow(unused_variables)]
284#[async_trait]
285pub trait NodeManager: IntoAnyArc + Any {
286 /// Return whether this node manager owns the given node, this is used for
287 /// propagating service-level errors.
288 ///
289 /// If a service returns an error, all nodes it owns will get that error,
290 /// even if this is a cross node-manager request like Browse.
291 fn owns_node(&self, id: &NodeId) -> bool;
292
293 /// Name of this node manager, for debug purposes.
294 fn name(&self) -> &str;
295
296 /// Return whether this node manager owns events on the server.
297 /// The first node manager that returns true here will be called when
298 /// reading or updating historical server events.
299 fn owns_server_events(&self) -> bool {
300 false
301 }
302
303 /// Return whether this node should handle requests to create a node
304 /// for the given parent ID. This is only called if no new node ID is
305 /// requested, otherwise owns_node is called on the requested node ID.
306 ///
307 /// Returning true here doesn't mean that creating the new node must
308 /// succeed, only that _if_ the parent node exists, this node manager
309 /// would be the one to create the requested node.
310 fn handle_new_node(&self, parent_id: &ExpandedNodeId) -> bool {
311 false
312 }
313
314 /// Namespaces for a given user, used to populate the namespace array.
315 /// This being a method allows different users to see different namespaces.
316 fn namespaces_for_user(&self, context: &RequestContext) -> Vec<NamespaceMetadata>;
317
318 /// Perform any necessary loading of nodes, should populate the type tree if
319 /// needed.
320 async fn init(&self, type_tree: &mut DefaultTypeTree, context: ServerContext);
321
322 /// Resolve a list of references given by a different node manager.
323 async fn resolve_external_references(
324 &self,
325 context: &RequestContext,
326 items: &mut [&mut ExternalReferenceRequest],
327 ) {
328 }
329
330 // ATTRIBUTES
331 /// Execute the Read service. This should set results on the given nodes_to_read as needed.
332 async fn read(
333 &self,
334 context: &RequestContext,
335 max_age: f64,
336 timestamps_to_return: TimestampsToReturn,
337 nodes_to_read: &mut [&mut ReadNode],
338 ) -> Result<(), StatusCode> {
339 Err(StatusCode::BadServiceUnsupported)
340 }
341
342 /// Perform the history read raw modified service. This should write results
343 /// to the `nodes` list of type either `HistoryData` or `HistoryModifiedData`
344 async fn history_read_raw_modified(
345 &self,
346 context: &RequestContext,
347 details: &ReadRawModifiedDetails,
348 nodes: &mut [&mut HistoryNode],
349 timestamps_to_return: TimestampsToReturn,
350 ) -> Result<(), StatusCode> {
351 Err(StatusCode::BadHistoryOperationUnsupported)
352 }
353
354 /// Perform the history read processed service. This should write results
355 /// to the `nodes` list of type `HistoryData`.
356 async fn history_read_processed(
357 &self,
358 context: &RequestContext,
359 details: &ReadProcessedDetails,
360 nodes: &mut [&mut HistoryNode],
361 timestamps_to_return: TimestampsToReturn,
362 ) -> Result<(), StatusCode> {
363 Err(StatusCode::BadHistoryOperationUnsupported)
364 }
365
366 /// Perform the history read processed service. This should write results
367 /// to the `nodes` list of type `HistoryData`.
368 async fn history_read_at_time(
369 &self,
370 context: &RequestContext,
371 details: &ReadAtTimeDetails,
372 nodes: &mut [&mut HistoryNode],
373 timestamps_to_return: TimestampsToReturn,
374 ) -> Result<(), StatusCode> {
375 Err(StatusCode::BadHistoryOperationUnsupported)
376 }
377
378 /// Perform the history read events service. This should write results
379 /// to the `nodes` list of type `HistoryEvent`.
380 async fn history_read_events(
381 &self,
382 context: &RequestContext,
383 details: &ReadEventDetails,
384 nodes: &mut [&mut HistoryNode],
385 timestamps_to_return: TimestampsToReturn,
386 ) -> Result<(), StatusCode> {
387 Err(StatusCode::BadHistoryOperationUnsupported)
388 }
389
390 /// Perform the history read annotations data service. This should write
391 /// results to the `nodes` list of type `Annotation`.
392 async fn history_read_annotations(
393 &self,
394 context: &RequestContext,
395 details: &ReadAnnotationDataDetails,
396 nodes: &mut [&mut HistoryNode],
397 timestamps_to_return: TimestampsToReturn,
398 ) -> Result<(), StatusCode> {
399 Err(StatusCode::BadHistoryOperationUnsupported)
400 }
401
402 /// Perform the write service. This should write results
403 /// to the `nodes_to_write` list. The default result is `BadNodeIdUnknown`
404 async fn write(
405 &self,
406 context: &RequestContext,
407 nodes_to_write: &mut [&mut WriteNode],
408 ) -> Result<(), StatusCode> {
409 Err(StatusCode::BadServiceUnsupported)
410 }
411
412 /// Perform the HistoryUpdate service. This should write result
413 /// status codes to the `nodes` list as appropriate.
414 async fn history_update(
415 &self,
416 context: &RequestContext,
417 nodes: &mut [&mut HistoryUpdateNode],
418 ) -> Result<(), StatusCode> {
419 Err(StatusCode::BadHistoryOperationUnsupported)
420 }
421
422 // VIEW
423 /// Perform the Browse or BrowseNext service.
424 async fn browse(
425 &self,
426 context: &RequestContext,
427 nodes_to_browse: &mut [BrowseNode],
428 ) -> Result<(), StatusCode> {
429 Err(StatusCode::BadServiceUnsupported)
430 }
431
432 /// Perform the translate browse paths to node IDs service.
433 async fn translate_browse_paths_to_node_ids(
434 &self,
435 context: &RequestContext,
436 nodes: &mut [&mut BrowsePathItem],
437 ) -> Result<(), StatusCode> {
438 Err(StatusCode::BadServiceUnsupported)
439 }
440
441 /// Perform the register nodes service. The default behavior for this service is to
442 /// do nothing and pretend the nodes were registered.
443 async fn register_nodes(
444 &self,
445 context: &RequestContext,
446 nodes: &mut [&mut RegisterNodeItem],
447 ) -> Result<(), StatusCode> {
448 // Most servers don't actually do anything with node registration, it is reasonable
449 // to just pretend the nodes are registered.
450 for node in nodes {
451 node.set_registered(true);
452 }
453
454 Ok(())
455 }
456
457 /// Perform the unregister nodes service. The default behavior for this service is to
458 /// do nothing.
459 async fn unregister_nodes(
460 &self,
461 context: &RequestContext,
462 _nodes: &[&NodeId],
463 ) -> Result<(), StatusCode> {
464 // Again, just do nothing
465 Ok(())
466 }
467
468 /// Prepare for monitored item creation, the node manager must take action to
469 /// sample data for each produced monitored item, according to the parameters.
470 /// Monitored item parameters have already been revised according to server limits,
471 /// but the node manager is allowed to further revise sampling interval.
472 ///
473 /// The node manager should also read the initial value of each monitored item,
474 /// and set the status code if monitored item creation failed.
475 ///
476 /// The node manager is responsible for tracking the subscription no matter what
477 /// the value of monitoring_mode is, but should only sample if monitoring_mode
478 /// is not Disabled.
479 async fn create_monitored_items(
480 &self,
481 context: &RequestContext,
482 items: &mut [&mut CreateMonitoredItem],
483 ) -> Result<(), StatusCode> {
484 Err(StatusCode::BadServiceUnsupported)
485 }
486
487 /// Modify monitored items. This method is purely informative for the node manager,
488 /// to let it modify sampling intervals, apply a new filter, or similar.
489 ///
490 /// Node managers are not required to take any action here, and this method is not
491 /// allowed to fail.
492 async fn modify_monitored_items(
493 &self,
494 context: &RequestContext,
495 items: &[&MonitoredItemUpdateRef],
496 ) {
497 }
498
499 /// Modify monitored items. This method is purely informative for the node manager,
500 /// to let it pause or resume sampling. Note that this should _not_ delete context
501 /// stored from `create_monitored_items`, since it may be called again to resume sampling.
502 ///
503 /// The node manager should sample so long as monitoring mode is not `Disabled`, the difference
504 /// between `Reporting` and `Sampling` is handled by the server.
505 ///
506 /// Node managers are not required to take any action here, and this method is not
507 /// allowed to fail.
508 async fn set_monitoring_mode(
509 &self,
510 context: &RequestContext,
511 mode: MonitoringMode,
512 items: &[&MonitoredItemRef],
513 ) {
514 }
515
516 /// Delete monitored items. This method is purely informative for the node manager,
517 /// to let it stop sampling, or similar.
518 ///
519 /// Node managers are not required to take any action here, and this method is not
520 /// allowed to fail. Most node managers that implement subscriptions will want to do
521 /// something with this.
522 ///
523 /// This method may be given monitored items that were never created, or were
524 /// created for a different node manager. Attempting to delete a monitored item
525 /// that does not exist is handled elsewhere and should be a no-op here.
526 async fn delete_monitored_items(&self, context: &RequestContext, items: &[&MonitoredItemRef]) {}
527
528 /// Perform a query on the address space.
529 ///
530 /// All node managers must be able to query in order for the
531 /// server to support querying.
532 ///
533 /// The node manager should set a continuation point if it reaches
534 /// limits, but is responsible for not exceeding max_data_sets_to_return
535 /// and max_references_to_return.
536 async fn query(
537 &self,
538 context: &RequestContext,
539 request: &mut QueryRequest,
540 ) -> Result<(), StatusCode> {
541 Err(StatusCode::BadServiceUnsupported)
542 }
543
544 /// Call a list of methods.
545 ///
546 /// The node manager should validate the method arguments and set
547 /// an output error if the arguments are invalid.
548 ///
549 /// The node manager _must_ ensure that argument output lists and
550 /// method output lists are of the correct length according to the
551 /// method definition.
552 async fn call(
553 &self,
554 context: &RequestContext,
555 methods_to_call: &mut [&mut MethodCall],
556 ) -> Result<(), StatusCode> {
557 Err(StatusCode::BadServiceUnsupported)
558 }
559
560 /// Add a list of nodes.
561 ///
562 /// This should create the nodes, or set a failed status as appropriate.
563 /// If a node was created, the status should be set to Good.
564 async fn add_nodes(
565 &self,
566 context: &RequestContext,
567 nodes_to_add: &mut [&mut AddNodeItem],
568 ) -> Result<(), StatusCode> {
569 Err(StatusCode::BadServiceUnsupported)
570 }
571
572 /// Add a list of references.
573 ///
574 /// This will be given all references where the source _or_
575 /// target belongs to this node manager. A reference is
576 /// considered successfully added if either source_status
577 /// or target_status are Good.
578 ///
579 /// If you want to explicitly set the reference to failed,
580 /// set both source and target status. Note that it may
581 /// already have been added in a different node manager, you are
582 /// responsible for any cleanup if you do this.
583 async fn add_references(
584 &self,
585 context: &RequestContext,
586 references_to_add: &mut [&mut AddReferenceItem],
587 ) -> Result<(), StatusCode> {
588 Err(StatusCode::BadServiceUnsupported)
589 }
590
591 /// Delete a list of nodes.
592 ///
593 /// This will be given all nodes that belong to this node manager.
594 ///
595 /// Typically, you also want to implement `delete_node_references` if
596 /// there are other node managers that support deletes.
597 async fn delete_nodes(
598 &self,
599 context: &RequestContext,
600 nodes_to_delete: &mut [&mut DeleteNodeItem],
601 ) -> Result<(), StatusCode> {
602 Err(StatusCode::BadServiceUnsupported)
603 }
604
605 /// Delete references for the given list of nodes.
606 /// The node manager should respect `delete_target_references`.
607 ///
608 /// This is not allowed to fail, you should make it impossible to delete
609 /// nodes with immutable references.
610 async fn delete_node_references(
611 &self,
612 context: &RequestContext,
613 to_delete: &[&DeleteNodeItem],
614 ) {
615 }
616
617 /// Delete a list of references.
618 ///
619 /// This will be given all references where the source _or_
620 /// target belongs to this node manager. A reference is
621 /// considered successfully added if either source_status
622 /// or target_status are Good.
623 ///
624 /// If you want to explicitly set the reference to failed,
625 /// set both source and target status. Note that it may
626 /// already have been deleted in a different node manager, you are
627 /// responsible for any cleanup if you do this.
628 async fn delete_references(
629 &self,
630 context: &RequestContext,
631 references_to_delete: &mut [&mut DeleteReferenceItem],
632 ) -> Result<(), StatusCode> {
633 Err(StatusCode::BadServiceUnsupported)
634 }
635}