opcua_server/node_manager/memory/core.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use chrono::Offset;
5use hashbrown::HashMap;
6
7use crate::{
8    address_space::{read_node_value, AddressSpace, CoreNamespace},
9    diagnostics::NamespaceMetadata,
10    load_method_args,
11    node_manager::{
12        MethodCall, MonitoredItemRef, MonitoredItemUpdateRef, NodeManagersRef, ParsedReadValueId,
13        RequestContext, ServerContext, SyncSampler,
14    },
15    subscriptions::CreateMonitoredItem,
16    ServerCapabilities, ServerStatusWrapper,
17};
18use opcua_core::{sync::RwLock, trace_lock};
19use opcua_types::{
20    DataValue, DateTime, ExtensionObject, IdType, Identifier, MethodId, MonitoringMode, NodeId,
21    NumericRange, ObjectId, ReferenceTypeId, StatusCode, TimeZoneDataType, TimestampsToReturn,
22    VariableId, Variant, VariantScalarTypeId, VariantTypeId,
23};
24
25use super::{InMemoryNodeManager, InMemoryNodeManagerImpl, InMemoryNodeManagerImplBuilder};
26
/// Node manager impl for the core namespace.
pub struct CoreNodeManagerImpl {
    /// Sampler that periodically reads internally sampled values
    /// (mapped diagnostics variables) for monitored items.
    sampler: SyncSampler,
    /// Reference to all node managers; used to assemble the server
    /// namespace array from every manager's namespaces.
    node_managers: NodeManagersRef,
    /// Wrapper around the server status object; source for the
    /// `ServerStatus` family of variables and their samplers.
    status: Arc<ServerStatusWrapper>,
}
33
/// Node manager for the core namespace.
pub type CoreNodeManager = InMemoryNodeManager<CoreNodeManagerImpl>;

/// Builder for the [CoreNodeManager]. Imports the core node set into the
/// address space and constructs the [CoreNodeManagerImpl].
pub struct CoreNodeManagerBuilder;
39
40impl InMemoryNodeManagerImplBuilder for CoreNodeManagerBuilder {
41    type Impl = CoreNodeManagerImpl;
42
43    fn build(self, context: ServerContext, address_space: &mut AddressSpace) -> Self::Impl {
44        {
45            let mut type_tree = context.type_tree.write();
46            address_space.import_node_set(&CoreNamespace, type_tree.namespaces_mut());
47        }
48
49        CoreNodeManagerImpl::new(context.node_managers.clone(), context.status.clone())
50    }
51}
52
/*
The core node manager serves as an example of how to create a simple
node manager based on the in-memory node manager.

In this case the data is largely static, so all we really need to
implement is Read; whoever modifies these nodes is responsible for
notifying any subscriptions of the changes.
*/
61
#[async_trait]
impl InMemoryNodeManagerImpl for CoreNodeManagerImpl {
    /// One-time startup hook: inserts aggregate-function references into the
    /// address space, then starts the periodic sampler used for internally
    /// sampled (diagnostics) values.
    async fn init(&self, address_space: &mut AddressSpace, context: ServerContext) {
        self.add_aggregates(address_space, &context.info.capabilities);
        // Derive the sampler tick from the configured minimum sampling
        // interval, truncated to whole milliseconds.
        let interval = context
            .info
            .config
            .limits
            .subscriptions
            .min_sampling_interval_ms
            .floor() as u64;
        // Fall back to 100ms if the configured interval is zero (or rounds
        // down to zero).
        let sampler_interval = if interval > 0 { interval } else { 100 };
        self.sampler.run(
            Duration::from_millis(sampler_interval),
            context.subscriptions.clone(),
        );
    }

    /// Metadata for the namespaces owned by this manager: only the OPC UA
    /// base namespace at index 0.
    fn namespaces(&self) -> Vec<NamespaceMetadata> {
        vec![NamespaceMetadata {
            // If necessary we could read this from the address space here,
            // but I don't think we need to, the diagnostics node manager
            // has an exception for the base namespace.
            is_namespace_subset: Some(false),
            namespace_publication_date: None,
            namespace_version: None,
            namespace_uri: "http://opcfoundation.org/UA/".to_owned(),
            static_node_id_types: Some(vec![IdType::Numeric]),
            namespace_index: 0,
            ..Default::default()
        }]
    }

    fn name(&self) -> &str {
        "core"
    }

    /// Read the requested attributes. Each read first tries the dynamically
    /// computed server values, then falls back to the node hierarchy.
    async fn read_values(
        &self,
        context: &RequestContext,
        address_space: &RwLock<AddressSpace>,
        nodes: &[&ParsedReadValueId],
        max_age: f64,
        timestamps_to_return: TimestampsToReturn,
    ) -> Vec<DataValue> {
        let address_space = address_space.read();

        nodes
            .iter()
            .map(|n| {
                self.read_node_value(context, &address_space, n, max_age, timestamps_to_return)
            })
            .collect()
    }

    /// Invoke built-in server methods (GetMonitoredItems, ResendData).
    /// Per-call failures are recorded on the individual call; the service
    /// itself always succeeds.
    async fn call(
        &self,
        context: &RequestContext,
        _address_space: &RwLock<AddressSpace>,
        methods_to_call: &mut [&mut &mut MethodCall],
    ) -> Result<(), StatusCode> {
        for method in methods_to_call {
            if let Err(e) = self.call_builtin_method(method, context) {
                method.set_status(e);
            }
        }
        Ok(())
    }

    /// Create monitored items on value attributes: read an initial value,
    /// then attach the item to either the server-status sampler or this
    /// manager's internal diagnostics sampler where applicable.
    async fn create_value_monitored_items(
        &self,
        context: &RequestContext,
        address_space: &RwLock<AddressSpace>,
        items: &mut [&mut &mut CreateMonitoredItem],
    ) {
        let address_space = address_space.read();
        for node in items {
            let value = self.read_node_value(
                context,
                &address_space,
                node.item_to_monitor(),
                0.0,
                node.timestamps_to_return(),
            );
            // Access denied fails the whole item.
            if value.status() == StatusCode::BadUserAccessDenied {
                node.set_status(StatusCode::BadUserAccessDenied);
                continue;
            }
            // Anything except BadAttributeIdInvalid is usable as an initial
            // value; the item itself is still created as Good either way.
            if value.status() != StatusCode::BadAttributeIdInvalid {
                node.set_initial_value(value);
            }
            node.set_status(StatusCode::Good);

            // Components of ServerStatus are sampled by the status wrapper's
            // own sampler...
            if let Some(var_id) = self.status.get_managed_id(&node.item_to_monitor().node_id) {
                self.status.subscribe_to_component(
                    var_id,
                    node.monitoring_mode(),
                    node.handle(),
                    Duration::from_millis(node.sampling_interval() as u64),
                );
            } else if self.is_internal_sampled(&node.item_to_monitor().node_id, context) {
                // ...while mapped diagnostics variables get a sampler on this
                // node manager's SyncSampler.
                if let Err(e) = self.add_internal_sampler(node, context) {
                    node.set_status(e);
                }
            }
        }
    }

    /// Propagate monitoring-mode changes to items managed by the
    /// server-status sampler. Other items need no action here.
    async fn set_monitoring_mode(
        &self,
        _context: &RequestContext,
        mode: MonitoringMode,
        items: &[&MonitoredItemRef],
    ) {
        for item in items {
            if self.status.get_managed_id(item.node_id()).is_some() {
                self.status.sampler().set_sampler_mode(
                    item.node_id(),
                    item.attribute(),
                    item.handle(),
                    mode,
                );
            }
        }
    }

    /// Apply revised sampling intervals to items managed by the
    /// server-status sampler.
    async fn modify_monitored_items(
        &self,
        _context: &RequestContext,
        items: &[&MonitoredItemUpdateRef],
    ) {
        for item in items {
            if self.status.get_managed_id(item.node_id()).is_some() {
                self.status.sampler().update_sampler(
                    item.node_id(),
                    item.attribute(),
                    item.handle(),
                    Duration::from_millis(item.update().revised_sampling_interval as u64),
                );
            }
        }
    }

    /// Remove samplers for deleted items managed by the server-status
    /// sampler.
    async fn delete_monitored_items(&self, _context: &RequestContext, items: &[&MonitoredItemRef]) {
        for item in items {
            if self.status.get_managed_id(item.node_id()).is_some() {
                self.status.sampler().remove_sampler(
                    item.node_id(),
                    item.attribute(),
                    item.handle(),
                );
            }
        }
    }
}
217
impl CoreNodeManagerImpl {
    /// Construct the impl with a fresh sampler; called from the builder.
    pub(super) fn new(node_managers: NodeManagersRef, status: Arc<ServerStatusWrapper>) -> Self {
        Self {
            sampler: SyncSampler::new(),
            status,
            node_managers,
        }
    }

    /// Read a single attribute. Validates access first, then prefers a
    /// dynamically computed server value; falls back to the static value
    /// stored in the node hierarchy.
    fn read_node_value(
        &self,
        context: &RequestContext,
        address_space: &AddressSpace,
        node_to_read: &ParsedReadValueId,
        max_age: f64,
        timestamps_to_return: TimestampsToReturn,
    ) -> DataValue {
        let mut result_value = DataValue::null();
        // Check that the read is permitted.
        let node = match address_space.validate_node_read(context, node_to_read) {
            Ok(n) => n,
            Err(e) => {
                result_value.status = Some(e);
                return result_value;
            }
        };
        // Try to read a special value, that is obtained from somewhere else.
        // A custom node manager might read this from some device, or get them
        // in some other way.

        // In this case, the values are largely read from configuration.
        if let Some(v) = self.read_server_value(context, node_to_read) {
            v
        } else {
            // If it can't be found, read it from the node hierarchy.
            read_node_value(node, context, node_to_read, max_age, timestamps_to_return)
        }
    }

    /// Map a node id to a well-known [VariableId], if it is a numeric id in
    /// namespace 0; `None` otherwise.
    fn get_variable_id(&self, node: &NodeId) -> Option<VariableId> {
        if node.namespace != 0 {
            return None;
        }
        let Identifier::Numeric(identifier) = node.identifier else {
            return None;
        };
        VariableId::try_from(identifier).ok()
    }

    /// Whether this node is a diagnostics variable that must be sampled by
    /// this manager's internal sampler.
    fn is_internal_sampled(&self, node: &NodeId, context: &RequestContext) -> bool {
        let Some(variable_id) = self.get_variable_id(node) else {
            return false;
        };

        context.info.diagnostics.is_mapped(variable_id)
    }

    /// Register a sampler on this manager's [SyncSampler] for a mapped
    /// diagnostics variable.
    ///
    /// Returns `BadNodeIdUnknown` if the node is not a known variable id or
    /// is not mapped in diagnostics.
    fn add_internal_sampler(
        &self,
        monitored_item: &mut CreateMonitoredItem,
        context: &RequestContext,
    ) -> Result<(), StatusCode> {
        let Some(var_id) = self.get_variable_id(&monitored_item.item_to_monitor().node_id) else {
            return Err(StatusCode::BadNodeIdUnknown);
        };

        if context.info.diagnostics.is_mapped(var_id) {
            let info = context.info.clone();
            self.sampler.add_sampler(
                monitored_item.item_to_monitor().node_id.clone(),
                monitored_item.item_to_monitor().attribute_id,
                // Sampler callback pulls the current value from diagnostics.
                move || info.diagnostics.get(var_id),
                monitored_item.monitoring_mode(),
                monitored_item.handle(),
                Duration::from_millis(monitored_item.sampling_interval() as u64),
            );
            Ok(())
        } else {
            Err(StatusCode::BadNodeIdUnknown)
        }
    }

    /// Produce values for well-known server variables that are computed from
    /// configuration and runtime state rather than stored in the address
    /// space. Returns `None` when the node is not one of these, in which
    /// case the caller falls back to the node hierarchy.
    fn read_server_value(
        &self,
        context: &RequestContext,
        node: &ParsedReadValueId,
    ) -> Option<DataValue> {
        let var_id = self.get_variable_id(&node.node_id)?;

        let limits = &context.info.config.limits;
        let hist_cap = &context.info.capabilities.history;

        let v: Variant = match var_id {
            VariableId::Server_ServerCapabilities_MaxArrayLength => {
                (limits.max_array_length as u32).into()
            }
            VariableId::Server_ServerCapabilities_MaxBrowseContinuationPoints => {
                (limits.max_browse_continuation_points as u16).into()
            }
            VariableId::Server_ServerCapabilities_MaxByteStringLength => {
                (limits.max_byte_string_length as u32).into()
            }
            VariableId::Server_ServerCapabilities_MaxHistoryContinuationPoints => {
                (limits.max_history_continuation_points as u16).into()
            }
            VariableId::Server_ServerCapabilities_MaxQueryContinuationPoints => {
                (limits.max_query_continuation_points as u16).into()
            }
            VariableId::Server_ServerCapabilities_MaxStringLength => {
                (limits.max_string_length as u32).into()
            }
            VariableId::Server_ServerCapabilities_MinSupportedSampleRate => {
                // NOTE(review): the OPC UA type for MinSupportedSampleRate is
                // Duration (a Double); the cast to u32 drops sub-millisecond
                // precision and changes the variant type — confirm intended.
                (limits.subscriptions.min_sampling_interval_ms as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall => {
                (limits.operational.max_monitored_items_per_call as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse => {
                (limits.operational.max_nodes_per_browse as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData => {
                (limits.operational.max_nodes_per_history_read_data as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents => {
                (limits.operational.max_nodes_per_history_read_events as u32).into()
            }
            // Both history-update variables report the same configured limit;
            // the config exposes a single max_nodes_per_history_update value.
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData => {
                (limits.operational.max_nodes_per_history_update as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents => {
                (limits.operational.max_nodes_per_history_update as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall => {
                (limits.operational.max_nodes_per_method_call as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement => {
                (limits.operational.max_nodes_per_node_management as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerRead => {
                (limits.operational.max_nodes_per_read as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes => {
                (limits.operational.max_nodes_per_register_nodes as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds => {
                (limits.operational.max_nodes_per_translate_browse_paths_to_node_ids as u32).into()
            }
            VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite => {
                (limits.operational.max_nodes_per_write as u32).into()
            }
            VariableId::Server_ServerCapabilities_ServerProfileArray => {
                context.info.capabilities.profiles.clone().into()
            }

            // History capabilities
            VariableId::HistoryServerCapabilities_AccessHistoryDataCapability => {
                hist_cap.access_history_data.into()
            }
            VariableId::HistoryServerCapabilities_AccessHistoryEventsCapability => {
                hist_cap.access_history_events.into()
            }
            VariableId::HistoryServerCapabilities_DeleteAtTimeCapability => {
                hist_cap.delete_at_time.into()
            }
            VariableId::HistoryServerCapabilities_DeleteEventCapability => {
                hist_cap.delete_event.into()
            }
            VariableId::HistoryServerCapabilities_DeleteRawCapability => {
                hist_cap.delete_raw.into()
            }
            VariableId::HistoryServerCapabilities_InsertAnnotationCapability => {
                hist_cap.insert_annotation.into()
            }
            VariableId::HistoryServerCapabilities_InsertDataCapability => {
                hist_cap.insert_data.into()
            }
            VariableId::HistoryServerCapabilities_InsertEventCapability => {
                hist_cap.insert_event.into()
            }
            VariableId::HistoryServerCapabilities_MaxReturnDataValues => {
                hist_cap.max_return_data_values.into()
            }
            VariableId::HistoryServerCapabilities_MaxReturnEventValues => {
                hist_cap.max_return_event_values.into()
            }
            VariableId::HistoryServerCapabilities_ReplaceDataCapability => {
                hist_cap.replace_data.into()
            }
            VariableId::HistoryServerCapabilities_ReplaceEventCapability => {
                hist_cap.replace_event.into()
            }
            VariableId::HistoryServerCapabilities_ServerTimestampSupported => {
                hist_cap.server_timestamp_supported.into()
            }
            VariableId::HistoryServerCapabilities_UpdateDataCapability => {
                hist_cap.update_data.into()
            }
            VariableId::HistoryServerCapabilities_UpdateEventCapability => {
                hist_cap.update_event.into()
            }

            // Misc server status
            VariableId::Server_ServiceLevel => {
                context.info.service_level.load(std::sync::atomic::Ordering::Relaxed).into()
            }
            VariableId::Server_LocalTime => {
                // Local UTC offset in minutes, as required by TimeZoneDataType.
                let offset = chrono::Local::now().offset().fix().local_minus_utc() / 60;
                ExtensionObject::from_message(TimeZoneDataType {
                    offset: offset.try_into().ok()?,
                    // TODO: Figure out how to set this. Chrono does not provide a way to
                    // tell whether daylight savings is in effect for the local time zone.
                    daylight_saving_in_offset: false,
                }).into()
            }

            // ServerStatus
            VariableId::Server_ServerStatus => {
                self.status.full_status_obj().into()
            }
            VariableId::Server_ServerStatus_BuildInfo => {
                ExtensionObject::from_message(self.status.build_info()).into()
            }
            VariableId::Server_ServerStatus_BuildInfo_BuildDate => {
                self.status.build_info().build_date.into()
            }
            VariableId::Server_ServerStatus_BuildInfo_BuildNumber => {
                self.status.build_info().build_number.into()
            }
            VariableId::Server_ServerStatus_BuildInfo_ManufacturerName => {
                self.status.build_info().manufacturer_name.into()
            }
            VariableId::Server_ServerStatus_BuildInfo_ProductName => {
                self.status.build_info().product_name.into()
            }
            VariableId::Server_ServerStatus_BuildInfo_ProductUri => {
                self.status.build_info().product_uri.into()
            }
            VariableId::Server_ServerStatus_BuildInfo_SoftwareVersion => {
                self.status.build_info().software_version.into()
            }
            VariableId::Server_ServerStatus_CurrentTime => {
                DateTime::now().into()
            }
            VariableId::Server_ServerStatus_SecondsTillShutdown => {
                // Empty when no shutdown is scheduled.
                match self.status.seconds_till_shutdown() {
                    Some(x) => x.into(),
                    None => Variant::Empty
                }
            }
            VariableId::Server_ServerStatus_ShutdownReason => {
                self.status.shutdown_reason().into()
            }
            VariableId::Server_ServerStatus_StartTime => {
                self.status.start_time().into()
            }
            VariableId::Server_ServerStatus_State => {
                (self.status.state() as i32).into()
            }

            VariableId::Server_NamespaceArray => {
                // This actually calls into other node managers to obtain the value, in fact
                // it calls into _this_ node manager as well.
                // Be careful to avoid holding exclusive locks in a way that causes a deadlock
                // when doing this. Here we hold a read lock on the address space,
                // but in this case it doesn't matter.
                let nss: HashMap<_, _> = self.node_managers.iter().flat_map(|n| n.namespaces_for_user(context)).map(|ns| (ns.namespace_index, ns.namespace_uri)).collect();
                // Make sure that holes are filled with empty strings, so that the
                // namespace array actually has correct indices.
                let &max = nss.keys().max()?;
                let namespaces: Vec<_> = (0..(max + 1)).map(|idx| nss.get(&idx).cloned().unwrap_or_default()).collect();
                namespaces.into()
            }

            // Any other variable id that is mapped in diagnostics: gated on
            // the read_diagnostics permission, returned with its own
            // timestamps rather than falling through to the common path.
            r if context.info.diagnostics.is_mapped(r) => {
                let perms = context.info.authenticator.core_permissions(&context.token);
                if !perms.read_diagnostics {
                    return Some(DataValue::new_now_status(Variant::Empty, StatusCode::BadUserAccessDenied));
                } else {
                    return Some(context.info.diagnostics.get(r).unwrap_or_default())
                }
            }

            _ => return None,

        };

        // Apply the requested index range, if any.
        let v = if !matches!(node.index_range, NumericRange::None) {
            match v.range_of(&node.index_range) {
                Ok(v) => v,
                Err(e) => {
                    return Some(DataValue {
                        value: None,
                        status: Some(e),
                        ..Default::default()
                    })
                }
            }
        } else {
            v
        };

        // NOTE(review): both timestamps are the server start time, even for
        // dynamic values such as CurrentTime — confirm this is intended.
        Some(DataValue {
            value: Some(v),
            status: Some(StatusCode::Good),
            source_timestamp: Some(**context.info.start_time.load()),
            server_timestamp: Some(**context.info.start_time.load()),
            ..Default::default()
        })
    }

    /// Link each configured history aggregate function under the
    /// HistoryServerCapabilities/AggregateFunctions object.
    fn add_aggregates(&self, address_space: &mut AddressSpace, capabilities: &ServerCapabilities) {
        for aggregate in &capabilities.history.aggregates {
            address_space.insert_reference(
                &ObjectId::HistoryServerCapabilities_AggregateFunctions.into(),
                aggregate,
                ReferenceTypeId::Organizes,
            )
        }
    }

    /// Dispatch a call to the built-in server methods.
    ///
    /// Ids that do not parse as a known [MethodId] are left untouched and
    /// return `Ok` (the call's status is not modified here). Known but
    /// unsupported method ids yield `BadNotSupported`.
    fn call_builtin_method(
        &self,
        call: &mut MethodCall,
        context: &RequestContext,
    ) -> Result<(), StatusCode> {
        let Ok(id) = call.method_id().as_method_id() else {
            return Ok(());
        };

        match id {
            MethodId::Server_GetMonitoredItems => {
                // Input: subscription id. Output: parallel arrays of server
                // ids and client handles for its monitored items.
                let id = load_method_args!(call, UInt32)?;
                let subs = context
                    .subscriptions
                    .get_session_subscriptions(context.session_id)
                    .ok_or(StatusCode::BadSessionIdInvalid)?;
                let subs = trace_lock!(subs);
                let sub = subs.get(id).ok_or(StatusCode::BadSubscriptionIdInvalid)?;
                let (ids, handles): (Vec<_>, Vec<_>) =
                    sub.items().map(|i| (i.id(), i.client_handle())).unzip();
                call.set_outputs(vec![ids.into(), handles.into()]);
                call.set_status(StatusCode::Good);
            }
            MethodId::Server_ResendData => {
                // Input: subscription id. Flags the subscription to resend
                // the latest value of every monitored item.
                let id = load_method_args!(call, UInt32)?;
                let subs = context
                    .subscriptions
                    .get_session_subscriptions(context.session_id)
                    .ok_or(StatusCode::BadSessionIdInvalid)?;
                let mut subs = trace_lock!(subs);
                let sub = subs
                    .get_mut(id)
                    .ok_or(StatusCode::BadSubscriptionIdInvalid)?;
                sub.set_resend_data();
                call.set_status(StatusCode::Good);
            }
            _ => return Err(StatusCode::BadNotSupported),
        }
        Ok(())
    }
}