Skip to main content

opcua_server/node_manager/memory/
simple.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use opcua_core::{trace_read_lock, trace_write_lock};
5use opcua_nodes::{HasNodeId, NodeSetImport};
6
7use crate::{
8    address_space::{read_node_value, write_node_value, AddressSpace},
9    node_manager::{
10        DefaultTypeTree, MethodCall, MonitoredItemRef, MonitoredItemUpdateRef, NodeManagerBuilder,
11        NodeManagersRef, ParsedReadValueId, RequestContext, ServerContext, SyncSampler, WriteNode,
12    },
13    CreateMonitoredItem,
14};
15use opcua_core::sync::RwLock;
16use opcua_types::{
17    AttributeId, DataValue, MonitoringMode, NodeClass, NodeId, NumericRange, StatusCode,
18    TimestampsToReturn, Variant,
19};
20
21use super::{
22    InMemoryNodeManager, InMemoryNodeManagerBuilder, InMemoryNodeManagerImpl,
23    InMemoryNodeManagerImplBuilder, NamespaceMetadata,
24};
25
26/// A simple in-memory node manager with utility methods for updating the address space,
27/// and a mechanism for setting callbacks on `Read` and `Write` of values.
28pub type SimpleNodeManager = InMemoryNodeManager<SimpleNodeManagerImpl>;
29
30type WriteCB = Arc<dyn Fn(DataValue, &NumericRange) -> StatusCode + Send + Sync + 'static>;
31type ReadCB = Arc<
32    dyn Fn(&NumericRange, TimestampsToReturn, f64) -> Result<DataValue, StatusCode>
33        + Send
34        + Sync
35        + 'static,
36>;
37type MethodCB = Arc<dyn Fn(&[Variant]) -> Result<Vec<Variant>, StatusCode> + Send + Sync + 'static>;
38
39/// Builder for the [SimpleNodeManager].
40pub struct SimpleNodeManagerBuilder {
41    namespaces: Vec<NamespaceMetadata>,
42    imports: Vec<Box<dyn NodeSetImport>>,
43    name: String,
44}
45
46impl SimpleNodeManagerBuilder {
47    /// Create a new simple node manager builder with the given namespace
48    /// and name.
49    pub fn new(namespace: NamespaceMetadata, name: &str) -> Self {
50        Self {
51            namespaces: vec![namespace],
52            imports: Vec::new(),
53            name: name.to_owned(),
54        }
55    }
56
57    /// Create a new simple node manager that imports from the given list
58    /// of [NodeSetImport]s.
59    pub fn new_imports(imports: Vec<Box<dyn NodeSetImport>>, name: &str) -> Self {
60        Self {
61            namespaces: Vec::new(),
62            imports,
63            name: name.to_owned(),
64        }
65    }
66}
67
68impl InMemoryNodeManagerImplBuilder for SimpleNodeManagerBuilder {
69    type Impl = SimpleNodeManagerImpl;
70
71    fn build(mut self, context: ServerContext, address_space: &mut AddressSpace) -> Self::Impl {
72        {
73            let mut type_tree = context.type_tree.write();
74            for import in self.imports {
75                address_space.import_node_set(&*import, type_tree.namespaces_mut());
76                let nss = import.get_own_namespaces();
77                for ns in nss {
78                    if !self.namespaces.iter().any(|n| n.namespace_uri == ns) {
79                        self.namespaces.push(NamespaceMetadata {
80                            namespace_uri: ns,
81                            ..Default::default()
82                        });
83                    }
84                }
85            }
86            for ns in &mut self.namespaces {
87                ns.namespace_index = type_tree.namespaces_mut().add_namespace(&ns.namespace_uri);
88            }
89        }
90        for ns in &self.namespaces {
91            address_space.add_namespace(&ns.namespace_uri, ns.namespace_index);
92        }
93        SimpleNodeManagerImpl::new(self.namespaces, &self.name, context.node_managers.clone())
94    }
95}
96
97/// Create a node manager builder for the simple node manager with the given
98/// namespace and name.
99pub fn simple_node_manager(namespace: NamespaceMetadata, name: &str) -> impl NodeManagerBuilder {
100    InMemoryNodeManagerBuilder::new(SimpleNodeManagerBuilder::new(namespace, name))
101}
102
103/// Create a new simple node manager that imports from the given list
104/// of [NodeSetImport]s.
105pub fn simple_node_manager_imports(
106    imports: Vec<Box<dyn NodeSetImport>>,
107    name: &str,
108) -> impl NodeManagerBuilder {
109    InMemoryNodeManagerBuilder::new(SimpleNodeManagerBuilder::new_imports(imports, name))
110}
111
112/// Node manager designed to deal with simple, entirely in-memory, synchronous OPC-UA servers.
113///
114/// Use this if
115///
116///  - Your node hierarchy is known and small enough to fit in memory.
117///  - No read, write, or method call operations are async or particularly time consuming.
118///  - and you don't need to be able to write attributes other than `Value`.
119pub struct SimpleNodeManagerImpl {
120    write_cbs: RwLock<HashMap<NodeId, WriteCB>>,
121    read_cbs: RwLock<HashMap<NodeId, ReadCB>>,
122    method_cbs: RwLock<HashMap<NodeId, MethodCB>>,
123    namespaces: Vec<NamespaceMetadata>,
124    #[allow(unused)]
125    node_managers: NodeManagersRef,
126    name: String,
127    samplers: SyncSampler,
128}
129
130#[async_trait]
131impl InMemoryNodeManagerImpl for SimpleNodeManagerImpl {
132    async fn init(&self, _address_space: &mut AddressSpace, context: ServerContext) {
133        self.samplers.run(
134            Duration::from_millis(
135                context
136                    .info
137                    .config
138                    .limits
139                    .subscriptions
140                    .min_sampling_interval_ms as u64,
141            ),
142            context.subscriptions.clone(),
143        );
144    }
145
146    fn namespaces(&self) -> Vec<NamespaceMetadata> {
147        self.namespaces.clone()
148    }
149
150    fn name(&self) -> &str {
151        &self.name
152    }
153
154    async fn read_values(
155        &self,
156        context: &RequestContext,
157        address_space: &RwLock<AddressSpace>,
158        nodes: &[&ParsedReadValueId],
159        max_age: f64,
160        timestamps_to_return: TimestampsToReturn,
161    ) -> Vec<DataValue> {
162        let address_space = address_space.read();
163        let cbs = trace_read_lock!(self.read_cbs);
164
165        nodes
166            .iter()
167            .map(|n| {
168                self.read_node_value(
169                    &cbs,
170                    context,
171                    &address_space,
172                    n,
173                    max_age,
174                    timestamps_to_return,
175                )
176            })
177            .collect()
178    }
179
180    async fn create_value_monitored_items(
181        &self,
182        context: &RequestContext,
183        address_space: &RwLock<AddressSpace>,
184        items: &mut [&mut &mut CreateMonitoredItem],
185    ) {
186        let to_read: Vec<_> = items.iter().map(|r| r.item_to_monitor()).collect();
187        let values = self
188            .read_values(
189                context,
190                address_space,
191                &to_read,
192                0.0,
193                TimestampsToReturn::Both,
194            )
195            .await;
196
197        let cbs = trace_read_lock!(self.read_cbs);
198
199        for (value, node) in values.into_iter().zip(items.iter_mut()) {
200            if value.status() != StatusCode::BadAttributeIdInvalid {
201                node.set_initial_value(value);
202            }
203            node.set_status(StatusCode::Good);
204            let rf = &node.item_to_monitor().node_id;
205
206            if let Some(cb) = cbs.get(rf).cloned() {
207                let tss = node.timestamps_to_return();
208                let index_range = node.item_to_monitor().index_range.clone();
209
210                self.samplers.add_sampler(
211                    node.item_to_monitor().node_id.clone(),
212                    AttributeId::Value,
213                    move || {
214                        Some(match cb(&index_range, tss, 0.0) {
215                            Err(e) => DataValue {
216                                status: Some(e),
217                                ..Default::default()
218                            },
219                            Ok(v) => v,
220                        })
221                    },
222                    node.monitoring_mode(),
223                    node.handle(),
224                    Duration::from_millis(node.sampling_interval() as u64),
225                )
226            }
227        }
228    }
229
230    async fn modify_monitored_items(
231        &self,
232        _context: &RequestContext,
233        items: &[&MonitoredItemUpdateRef],
234    ) {
235        for it in items {
236            self.samplers.update_sampler(
237                it.node_id(),
238                it.attribute(),
239                it.handle(),
240                Duration::from_millis(it.update().revised_sampling_interval as u64),
241            );
242        }
243    }
244
245    async fn set_monitoring_mode(
246        &self,
247        _context: &RequestContext,
248        mode: MonitoringMode,
249        items: &[&MonitoredItemRef],
250    ) {
251        for it in items {
252            self.samplers
253                .set_sampler_mode(it.node_id(), it.attribute(), it.handle(), mode);
254        }
255    }
256
257    async fn delete_monitored_items(&self, _context: &RequestContext, items: &[&MonitoredItemRef]) {
258        for it in items {
259            self.samplers
260                .remove_sampler(it.node_id(), it.attribute(), it.handle());
261        }
262    }
263
264    async fn write(
265        &self,
266        context: &RequestContext,
267        address_space: &RwLock<AddressSpace>,
268        nodes_to_write: &mut [&mut WriteNode],
269    ) -> Result<(), StatusCode> {
270        let mut address_space = trace_write_lock!(address_space);
271        let type_tree = trace_read_lock!(context.type_tree);
272        let cbs = trace_read_lock!(self.write_cbs);
273
274        for write in nodes_to_write {
275            self.write_node_value(&cbs, context, &mut address_space, &type_tree, write);
276        }
277
278        Ok(())
279    }
280
281    async fn call(
282        &self,
283        _context: &RequestContext,
284        _address_space: &RwLock<AddressSpace>,
285        methods_to_call: &mut [&mut &mut MethodCall],
286    ) -> Result<(), StatusCode> {
287        let cbs = trace_read_lock!(self.method_cbs);
288        for method in methods_to_call {
289            if let Some(cb) = cbs.get(method.method_id()) {
290                match cb(method.arguments()) {
291                    Ok(r) => {
292                        method.set_outputs(r);
293                        method.set_status(StatusCode::Good);
294                    }
295                    Err(e) => method.set_status(e),
296                }
297            }
298        }
299
300        Ok(())
301    }
302}
303
304impl SimpleNodeManagerImpl {
305    fn new(namespaces: Vec<NamespaceMetadata>, name: &str, node_managers: NodeManagersRef) -> Self {
306        Self {
307            write_cbs: Default::default(),
308            read_cbs: Default::default(),
309            method_cbs: Default::default(),
310            namespaces,
311            name: name.to_owned(),
312            node_managers,
313            samplers: SyncSampler::new(),
314        }
315    }
316
317    fn read_node_value(
318        &self,
319        cbs: &HashMap<NodeId, ReadCB>,
320        context: &RequestContext,
321        address_space: &AddressSpace,
322        node_to_read: &ParsedReadValueId,
323        max_age: f64,
324        timestamps_to_return: TimestampsToReturn,
325    ) -> DataValue {
326        let mut result_value = DataValue::null();
327        // Check that the read is permitted.
328        let node = match address_space.validate_node_read(context, node_to_read) {
329            Ok(n) => n,
330            Err(e) => {
331                result_value.status = Some(e);
332                return result_value;
333            }
334        };
335
336        // If there is a callback registered, call that, otherwise read it from the node hierarchy.
337        if let Some(cb) = cbs.get(&node_to_read.node_id) {
338            match cb(&node_to_read.index_range, timestamps_to_return, max_age) {
339                Err(e) => DataValue {
340                    status: Some(e),
341                    ..Default::default()
342                },
343                Ok(v) => v,
344            }
345        } else {
346            // If it can't be found, read it from the node hierarchy.
347            read_node_value(node, context, node_to_read, max_age, timestamps_to_return)
348        }
349    }
350
351    fn write_node_value(
352        &self,
353        cbs: &HashMap<NodeId, WriteCB>,
354        context: &RequestContext,
355        address_space: &mut AddressSpace,
356        type_tree: &DefaultTypeTree,
357        write: &mut WriteNode,
358    ) {
359        let node = match address_space.validate_node_write(context, write.value(), type_tree) {
360            Ok(v) => v,
361            Err(e) => {
362                write.set_status(e);
363                return;
364            }
365        };
366
367        if node.node_class() != NodeClass::Variable
368            || write.value().attribute_id != AttributeId::Value
369        {
370            write.set_status(StatusCode::BadNotWritable);
371            return;
372        }
373
374        if let Some(cb) = cbs.get(node.as_node().node_id()) {
375            // If there is a callback registered, call that.
376            write.set_status(cb(write.value().value.clone(), &write.value().index_range));
377        } else if write.value().value.value.is_some() {
378            // If not, write the value to the node hierarchy.
379            match write_node_value(node, write.value()) {
380                Ok(_) => write.set_status(StatusCode::Good),
381                Err(e) => write.set_status(e),
382            }
383        } else {
384            // If no value is passed return an error.
385            write.set_status(StatusCode::BadNothingToDo);
386        }
387        if write.status().is_good() {
388            if let Some(val) = node.as_mut_node().get_attribute(
389                TimestampsToReturn::Both,
390                write.value().attribute_id,
391                &NumericRange::None,
392                &opcua_types::DataEncoding::Binary,
393            ) {
394                context.subscriptions.notify_data_change(
395                    [(val, node.node_id(), write.value().attribute_id)].into_iter(),
396                );
397            }
398        }
399    }
400
401    /// Add a callback called on `Write` for the node given by `id`.
402    pub fn add_write_callback(
403        &self,
404        id: NodeId,
405        cb: impl Fn(DataValue, &NumericRange) -> StatusCode + Send + Sync + 'static,
406    ) {
407        let mut cbs = trace_write_lock!(self.write_cbs);
408        cbs.insert(id, Arc::new(cb));
409    }
410
411    /// Add a callback for `Read` on the node given by `id`.
412    pub fn add_read_callback(
413        &self,
414        id: NodeId,
415        cb: impl Fn(&NumericRange, TimestampsToReturn, f64) -> Result<DataValue, StatusCode>
416            + Send
417            + Sync
418            + 'static,
419    ) {
420        let mut cbs = trace_write_lock!(self.read_cbs);
421        cbs.insert(id, Arc::new(cb));
422    }
423
424    /// Add a callback for `Call` on the method given by `id`.
425    pub fn add_method_callback(
426        &self,
427        id: NodeId,
428        cb: impl Fn(&[Variant]) -> Result<Vec<Variant>, StatusCode> + Send + Sync + 'static,
429    ) {
430        let mut cbs = trace_write_lock!(self.method_cbs);
431        cbs.insert(id, Arc::new(cb));
432    }
433}