1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use chrono::Offset;
5use hashbrown::HashMap;
6
7use crate::{
8 address_space::{read_node_value, AddressSpace, CoreNamespace},
9 diagnostics::NamespaceMetadata,
10 load_method_args,
11 node_manager::{
12 MethodCall, MonitoredItemRef, MonitoredItemUpdateRef, NodeManagersRef, ParsedReadValueId,
13 RequestContext, ServerContext, SyncSampler,
14 },
15 subscriptions::CreateMonitoredItem,
16 ServerCapabilities, ServerStatusWrapper,
17};
18use opcua_core::{sync::RwLock, trace_lock};
19use opcua_types::{
20 DataValue, DateTime, ExtensionObject, IdType, Identifier, MethodId, MonitoringMode, NodeId,
21 NumericRange, ObjectId, ReferenceTypeId, StatusCode, TimeZoneDataType, TimestampsToReturn,
22 VariableId, Variant, VariantScalarTypeId, VariantTypeId,
23};
24
25use super::{InMemoryNodeManager, InMemoryNodeManagerImpl, InMemoryNodeManagerImplBuilder};
26
/// Node manager implementation for the OPC UA core namespace (namespace index 0).
///
/// Besides serving nodes from the imported core node set, it answers reads of
/// a number of server variables (capabilities, status, namespace array,
/// diagnostics) from live server state instead of the address space.
pub struct CoreNodeManagerImpl {
    /// Sampler that periodically samples diagnostics variables for monitored items.
    sampler: SyncSampler,
    /// Handle to all node managers, used to assemble the server namespace array.
    node_managers: NodeManagersRef,
    /// Shared server status, the source of the `ServerStatus` variable values.
    status: Arc<ServerStatusWrapper>,
}
33
/// In-memory node manager specialised with [`CoreNodeManagerImpl`].
pub type CoreNodeManager = InMemoryNodeManager<CoreNodeManagerImpl>;
36
/// Builder that constructs a [`CoreNodeManagerImpl`].
pub struct CoreNodeManagerBuilder;
39
40impl InMemoryNodeManagerImplBuilder for CoreNodeManagerBuilder {
41 type Impl = CoreNodeManagerImpl;
42
43 fn build(self, context: ServerContext, address_space: &mut AddressSpace) -> Self::Impl {
44 {
45 let mut type_tree = context.type_tree.write();
46 address_space.import_node_set(&CoreNamespace, type_tree.namespaces_mut());
47 }
48
49 CoreNodeManagerImpl::new(context.node_managers.clone(), context.status.clone())
50 }
51}
52
53#[async_trait]
63impl InMemoryNodeManagerImpl for CoreNodeManagerImpl {
64 async fn init(&self, address_space: &mut AddressSpace, context: ServerContext) {
65 self.add_aggregates(address_space, &context.info.capabilities);
66 let interval = context
67 .info
68 .config
69 .limits
70 .subscriptions
71 .min_sampling_interval_ms
72 .floor() as u64;
73 let sampler_interval = if interval > 0 { interval } else { 100 };
74 self.sampler.run(
75 Duration::from_millis(sampler_interval),
76 context.subscriptions.clone(),
77 );
78 }
79
80 fn namespaces(&self) -> Vec<NamespaceMetadata> {
81 vec![NamespaceMetadata {
82 is_namespace_subset: Some(false),
86 namespace_publication_date: None,
87 namespace_version: None,
88 namespace_uri: "http://opcfoundation.org/UA/".to_owned(),
89 static_node_id_types: Some(vec![IdType::Numeric]),
90 namespace_index: 0,
91 ..Default::default()
92 }]
93 }
94
95 fn name(&self) -> &str {
96 "core"
97 }
98
99 async fn read_values(
100 &self,
101 context: &RequestContext,
102 address_space: &RwLock<AddressSpace>,
103 nodes: &[&ParsedReadValueId],
104 max_age: f64,
105 timestamps_to_return: TimestampsToReturn,
106 ) -> Vec<DataValue> {
107 let address_space = address_space.read();
108
109 nodes
110 .iter()
111 .map(|n| {
112 self.read_node_value(context, &address_space, n, max_age, timestamps_to_return)
113 })
114 .collect()
115 }
116
117 async fn call(
118 &self,
119 context: &RequestContext,
120 _address_space: &RwLock<AddressSpace>,
121 methods_to_call: &mut [&mut &mut MethodCall],
122 ) -> Result<(), StatusCode> {
123 for method in methods_to_call {
124 if let Err(e) = self.call_builtin_method(method, context) {
125 method.set_status(e);
126 }
127 }
128 Ok(())
129 }
130
131 async fn create_value_monitored_items(
132 &self,
133 context: &RequestContext,
134 address_space: &RwLock<AddressSpace>,
135 items: &mut [&mut &mut CreateMonitoredItem],
136 ) {
137 let address_space = address_space.read();
138 for node in items {
139 let value = self.read_node_value(
140 context,
141 &address_space,
142 node.item_to_monitor(),
143 0.0,
144 node.timestamps_to_return(),
145 );
146 if value.status() == StatusCode::BadUserAccessDenied {
147 node.set_status(StatusCode::BadUserAccessDenied);
148 continue;
149 }
150 if value.status() != StatusCode::BadAttributeIdInvalid {
151 node.set_initial_value(value);
152 }
153 node.set_status(StatusCode::Good);
154
155 if let Some(var_id) = self.status.get_managed_id(&node.item_to_monitor().node_id) {
156 self.status.subscribe_to_component(
157 var_id,
158 node.monitoring_mode(),
159 node.handle(),
160 Duration::from_millis(node.sampling_interval() as u64),
161 );
162 } else if self.is_internal_sampled(&node.item_to_monitor().node_id, context) {
163 if let Err(e) = self.add_internal_sampler(node, context) {
164 node.set_status(e);
165 }
166 }
167 }
168 }
169
170 async fn set_monitoring_mode(
171 &self,
172 _context: &RequestContext,
173 mode: MonitoringMode,
174 items: &[&MonitoredItemRef],
175 ) {
176 for item in items {
177 if self.status.get_managed_id(item.node_id()).is_some() {
178 self.status.sampler().set_sampler_mode(
179 item.node_id(),
180 item.attribute(),
181 item.handle(),
182 mode,
183 );
184 }
185 }
186 }
187
188 async fn modify_monitored_items(
189 &self,
190 _context: &RequestContext,
191 items: &[&MonitoredItemUpdateRef],
192 ) {
193 for item in items {
194 if self.status.get_managed_id(item.node_id()).is_some() {
195 self.status.sampler().update_sampler(
196 item.node_id(),
197 item.attribute(),
198 item.handle(),
199 Duration::from_millis(item.update().revised_sampling_interval as u64),
200 );
201 }
202 }
203 }
204
205 async fn delete_monitored_items(&self, _context: &RequestContext, items: &[&MonitoredItemRef]) {
206 for item in items {
207 if self.status.get_managed_id(item.node_id()).is_some() {
208 self.status.sampler().remove_sampler(
209 item.node_id(),
210 item.attribute(),
211 item.handle(),
212 );
213 }
214 }
215 }
216}
217
218impl CoreNodeManagerImpl {
219 pub(super) fn new(node_managers: NodeManagersRef, status: Arc<ServerStatusWrapper>) -> Self {
220 Self {
221 sampler: SyncSampler::new(),
222 status,
223 node_managers,
224 }
225 }
226
227 fn read_node_value(
228 &self,
229 context: &RequestContext,
230 address_space: &AddressSpace,
231 node_to_read: &ParsedReadValueId,
232 max_age: f64,
233 timestamps_to_return: TimestampsToReturn,
234 ) -> DataValue {
235 let mut result_value = DataValue::null();
236 let node = match address_space.validate_node_read(context, node_to_read) {
238 Ok(n) => n,
239 Err(e) => {
240 result_value.status = Some(e);
241 return result_value;
242 }
243 };
244 if let Some(v) = self.read_server_value(context, node_to_read) {
250 v
251 } else {
252 read_node_value(node, context, node_to_read, max_age, timestamps_to_return)
254 }
255 }
256
257 fn get_variable_id(&self, node: &NodeId) -> Option<VariableId> {
258 if node.namespace != 0 {
259 return None;
260 }
261 let Identifier::Numeric(identifier) = node.identifier else {
262 return None;
263 };
264 VariableId::try_from(identifier).ok()
265 }
266
267 fn is_internal_sampled(&self, node: &NodeId, context: &RequestContext) -> bool {
268 let Some(variable_id) = self.get_variable_id(node) else {
269 return false;
270 };
271
272 context.info.diagnostics.is_mapped(variable_id)
273 }
274
275 fn add_internal_sampler(
276 &self,
277 monitored_item: &mut CreateMonitoredItem,
278 context: &RequestContext,
279 ) -> Result<(), StatusCode> {
280 let Some(var_id) = self.get_variable_id(&monitored_item.item_to_monitor().node_id) else {
281 return Err(StatusCode::BadNodeIdUnknown);
282 };
283
284 if context.info.diagnostics.is_mapped(var_id) {
285 let info = context.info.clone();
286 self.sampler.add_sampler(
287 monitored_item.item_to_monitor().node_id.clone(),
288 monitored_item.item_to_monitor().attribute_id,
289 move || info.diagnostics.get(var_id),
290 monitored_item.monitoring_mode(),
291 monitored_item.handle(),
292 Duration::from_millis(monitored_item.sampling_interval() as u64),
293 );
294 Ok(())
295 } else {
296 Err(StatusCode::BadNodeIdUnknown)
297 }
298 }
299
300 fn read_server_value(
301 &self,
302 context: &RequestContext,
303 node: &ParsedReadValueId,
304 ) -> Option<DataValue> {
305 let var_id = self.get_variable_id(&node.node_id)?;
306
307 let limits = &context.info.config.limits;
308 let hist_cap = &context.info.capabilities.history;
309
310 let v: Variant = match var_id {
311 VariableId::Server_ServerCapabilities_MaxArrayLength => {
312 (limits.max_array_length as u32).into()
313 }
314 VariableId::Server_ServerCapabilities_MaxBrowseContinuationPoints => {
315 (limits.max_browse_continuation_points as u16).into()
316 }
317 VariableId::Server_ServerCapabilities_MaxByteStringLength => {
318 (limits.max_byte_string_length as u32).into()
319 }
320 VariableId::Server_ServerCapabilities_MaxHistoryContinuationPoints => {
321 (limits.max_history_continuation_points as u16).into()
322 }
323 VariableId::Server_ServerCapabilities_MaxQueryContinuationPoints => {
324 (limits.max_query_continuation_points as u16).into()
325 }
326 VariableId::Server_ServerCapabilities_MaxStringLength => {
327 (limits.max_string_length as u32).into()
328 }
329 VariableId::Server_ServerCapabilities_MinSupportedSampleRate => {
330 (limits.subscriptions.min_sampling_interval_ms as u32).into()
331 }
332 VariableId::Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall => {
333 (limits.operational.max_monitored_items_per_call as u32).into()
334 }
335 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse => {
336 (limits.operational.max_nodes_per_browse as u32).into()
337 }
338 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData => {
339 (limits.operational.max_nodes_per_history_read_data as u32).into()
340 }
341 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents => {
342 (limits.operational.max_nodes_per_history_read_events as u32).into()
343 }
344 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData => {
345 (limits.operational.max_nodes_per_history_update as u32).into()
346 }
347 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents => {
348 (limits.operational.max_nodes_per_history_update as u32).into()
349 }
350 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall => {
351 (limits.operational.max_nodes_per_method_call as u32).into()
352 }
353 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement => {
354 (limits.operational.max_nodes_per_node_management as u32).into()
355 }
356 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerRead => {
357 (limits.operational.max_nodes_per_read as u32).into()
358 }
359 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes => {
360 (limits.operational.max_nodes_per_register_nodes as u32).into()
361 }
362 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds => {
363 (limits.operational.max_nodes_per_translate_browse_paths_to_node_ids as u32).into()
364 }
365 VariableId::Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite => {
366 (limits.operational.max_nodes_per_write as u32).into()
367 }
368 VariableId::Server_ServerCapabilities_ServerProfileArray => {
369 context.info.capabilities.profiles.clone().into()
370 }
371
372 VariableId::HistoryServerCapabilities_AccessHistoryDataCapability => {
374 hist_cap.access_history_data.into()
375 }
376 VariableId::HistoryServerCapabilities_AccessHistoryEventsCapability => {
377 hist_cap.access_history_events.into()
378 }
379 VariableId::HistoryServerCapabilities_DeleteAtTimeCapability => {
380 hist_cap.delete_at_time.into()
381 }
382 VariableId::HistoryServerCapabilities_DeleteEventCapability => {
383 hist_cap.delete_event.into()
384 }
385 VariableId::HistoryServerCapabilities_DeleteRawCapability => {
386 hist_cap.delete_raw.into()
387 }
388 VariableId::HistoryServerCapabilities_InsertAnnotationCapability => {
389 hist_cap.insert_annotation.into()
390 }
391 VariableId::HistoryServerCapabilities_InsertDataCapability => {
392 hist_cap.insert_data.into()
393 }
394 VariableId::HistoryServerCapabilities_InsertEventCapability => {
395 hist_cap.insert_event.into()
396 }
397 VariableId::HistoryServerCapabilities_MaxReturnDataValues => {
398 hist_cap.max_return_data_values.into()
399 }
400 VariableId::HistoryServerCapabilities_MaxReturnEventValues => {
401 hist_cap.max_return_event_values.into()
402 }
403 VariableId::HistoryServerCapabilities_ReplaceDataCapability => {
404 hist_cap.replace_data.into()
405 }
406 VariableId::HistoryServerCapabilities_ReplaceEventCapability => {
407 hist_cap.replace_event.into()
408 }
409 VariableId::HistoryServerCapabilities_ServerTimestampSupported => {
410 hist_cap.server_timestamp_supported.into()
411 }
412 VariableId::HistoryServerCapabilities_UpdateDataCapability => {
413 hist_cap.update_data.into()
414 }
415 VariableId::HistoryServerCapabilities_UpdateEventCapability => {
416 hist_cap.update_event.into()
417 }
418
419 VariableId::Server_ServiceLevel => {
421 context.info.service_level.load(std::sync::atomic::Ordering::Relaxed).into()
422 }
423 VariableId::Server_LocalTime => {
424 let offset = chrono::Local::now().offset().fix().local_minus_utc() / 60;
425 ExtensionObject::from_message(TimeZoneDataType {
426 offset: offset.try_into().ok()?,
427 daylight_saving_in_offset: false,
430 }).into()
431 }
432
433 VariableId::Server_ServerStatus => {
435 self.status.full_status_obj().into()
436 }
437 VariableId::Server_ServerStatus_BuildInfo => {
438 ExtensionObject::from_message(self.status.build_info()).into()
439 }
440 VariableId::Server_ServerStatus_BuildInfo_BuildDate => {
441 self.status.build_info().build_date.into()
442 }
443 VariableId::Server_ServerStatus_BuildInfo_BuildNumber => {
444 self.status.build_info().build_number.into()
445 }
446 VariableId::Server_ServerStatus_BuildInfo_ManufacturerName => {
447 self.status.build_info().manufacturer_name.into()
448 }
449 VariableId::Server_ServerStatus_BuildInfo_ProductName => {
450 self.status.build_info().product_name.into()
451 }
452 VariableId::Server_ServerStatus_BuildInfo_ProductUri => {
453 self.status.build_info().product_uri.into()
454 }
455 VariableId::Server_ServerStatus_BuildInfo_SoftwareVersion => {
456 self.status.build_info().software_version.into()
457 }
458 VariableId::Server_ServerStatus_CurrentTime => {
459 DateTime::now().into()
460 }
461 VariableId::Server_ServerStatus_SecondsTillShutdown => {
462 match self.status.seconds_till_shutdown() {
463 Some(x) => x.into(),
464 None => Variant::Empty
465 }
466 }
467 VariableId::Server_ServerStatus_ShutdownReason => {
468 self.status.shutdown_reason().into()
469 }
470 VariableId::Server_ServerStatus_StartTime => {
471 self.status.start_time().into()
472 }
473 VariableId::Server_ServerStatus_State => {
474 (self.status.state() as i32).into()
475 }
476
477 VariableId::Server_NamespaceArray => {
478 let nss: HashMap<_, _> = self.node_managers.iter().flat_map(|n| n.namespaces_for_user(context)).map(|ns| (ns.namespace_index, ns.namespace_uri)).collect();
484 let &max = nss.keys().max()?;
487 let namespaces: Vec<_> = (0..(max + 1)).map(|idx| nss.get(&idx).cloned().unwrap_or_default()).collect();
488 namespaces.into()
489 }
490
491 r if context.info.diagnostics.is_mapped(r) => {
492 let perms = context.info.authenticator.core_permissions(&context.token);
493 if !perms.read_diagnostics {
494 return Some(DataValue::new_now_status(Variant::Empty, StatusCode::BadUserAccessDenied));
495 } else {
496 return Some(context.info.diagnostics.get(r).unwrap_or_default())
497 }
498 }
499
500 _ => return None,
501
502 };
503
504 let v = if !matches!(node.index_range, NumericRange::None) {
505 match v.range_of(&node.index_range) {
506 Ok(v) => v,
507 Err(e) => {
508 return Some(DataValue {
509 value: None,
510 status: Some(e),
511 ..Default::default()
512 })
513 }
514 }
515 } else {
516 v
517 };
518
519 Some(DataValue {
520 value: Some(v),
521 status: Some(StatusCode::Good),
522 source_timestamp: Some(**context.info.start_time.load()),
523 server_timestamp: Some(**context.info.start_time.load()),
524 ..Default::default()
525 })
526 }
527
528 fn add_aggregates(&self, address_space: &mut AddressSpace, capabilities: &ServerCapabilities) {
529 for aggregate in &capabilities.history.aggregates {
530 address_space.insert_reference(
531 &ObjectId::HistoryServerCapabilities_AggregateFunctions.into(),
532 aggregate,
533 ReferenceTypeId::Organizes,
534 )
535 }
536 }
537
538 fn call_builtin_method(
539 &self,
540 call: &mut MethodCall,
541 context: &RequestContext,
542 ) -> Result<(), StatusCode> {
543 let Ok(id) = call.method_id().as_method_id() else {
544 return Ok(());
545 };
546
547 match id {
548 MethodId::Server_GetMonitoredItems => {
549 let id = load_method_args!(call, UInt32)?;
550 let subs = context
551 .subscriptions
552 .get_session_subscriptions(context.session_id)
553 .ok_or(StatusCode::BadSessionIdInvalid)?;
554 let subs = trace_lock!(subs);
555 let sub = subs.get(id).ok_or(StatusCode::BadSubscriptionIdInvalid)?;
556 let (ids, handles): (Vec<_>, Vec<_>) =
557 sub.items().map(|i| (i.id(), i.client_handle())).unzip();
558 call.set_outputs(vec![ids.into(), handles.into()]);
559 call.set_status(StatusCode::Good);
560 }
561 MethodId::Server_ResendData => {
562 let id = load_method_args!(call, UInt32)?;
563 let subs = context
564 .subscriptions
565 .get_session_subscriptions(context.session_id)
566 .ok_or(StatusCode::BadSessionIdInvalid)?;
567 let mut subs = trace_lock!(subs);
568 let sub = subs
569 .get_mut(id)
570 .ok_or(StatusCode::BadSubscriptionIdInvalid)?;
571 sub.set_resend_data();
572 call.set_status(StatusCode::Good);
573 }
574 _ => return Err(StatusCode::BadNotSupported),
575 }
576 Ok(())
577 }
578}