1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use opcua_core::{trace_read_lock, trace_write_lock};
5use opcua_nodes::{HasNodeId, NodeSetImport};
6
7use crate::{
8 address_space::{read_node_value, write_node_value, AddressSpace},
9 node_manager::{
10 DefaultTypeTree, MethodCall, MonitoredItemRef, MonitoredItemUpdateRef, NodeManagerBuilder,
11 NodeManagersRef, ParsedReadValueId, RequestContext, ServerContext, SyncSampler, WriteNode,
12 },
13 CreateMonitoredItem,
14};
15use opcua_core::sync::RwLock;
16use opcua_types::{
17 AttributeId, DataValue, MonitoringMode, NodeClass, NodeId, NumericRange, StatusCode,
18 TimestampsToReturn, Variant,
19};
20
21use super::{
22 InMemoryNodeManager, InMemoryNodeManagerBuilder, InMemoryNodeManagerImpl,
23 InMemoryNodeManagerImplBuilder, NamespaceMetadata,
24};
25
26pub type SimpleNodeManager = InMemoryNodeManager<SimpleNodeManagerImpl>;
29
30type WriteCB = Arc<dyn Fn(DataValue, &NumericRange) -> StatusCode + Send + Sync + 'static>;
31type ReadCB = Arc<
32 dyn Fn(&NumericRange, TimestampsToReturn, f64) -> Result<DataValue, StatusCode>
33 + Send
34 + Sync
35 + 'static,
36>;
37type MethodCB = Arc<dyn Fn(&[Variant]) -> Result<Vec<Variant>, StatusCode> + Send + Sync + 'static>;
38
39pub struct SimpleNodeManagerBuilder {
41 namespaces: Vec<NamespaceMetadata>,
42 imports: Vec<Box<dyn NodeSetImport>>,
43 name: String,
44}
45
46impl SimpleNodeManagerBuilder {
47 pub fn new(namespace: NamespaceMetadata, name: &str) -> Self {
50 Self {
51 namespaces: vec![namespace],
52 imports: Vec::new(),
53 name: name.to_owned(),
54 }
55 }
56
57 pub fn new_imports(imports: Vec<Box<dyn NodeSetImport>>, name: &str) -> Self {
60 Self {
61 namespaces: Vec::new(),
62 imports,
63 name: name.to_owned(),
64 }
65 }
66}
67
68impl InMemoryNodeManagerImplBuilder for SimpleNodeManagerBuilder {
69 type Impl = SimpleNodeManagerImpl;
70
71 fn build(mut self, context: ServerContext, address_space: &mut AddressSpace) -> Self::Impl {
72 {
73 let mut type_tree = context.type_tree.write();
74 for import in self.imports {
75 address_space.import_node_set(&*import, type_tree.namespaces_mut());
76 let nss = import.get_own_namespaces();
77 for ns in nss {
78 if !self.namespaces.iter().any(|n| n.namespace_uri == ns) {
79 self.namespaces.push(NamespaceMetadata {
80 namespace_uri: ns,
81 ..Default::default()
82 });
83 }
84 }
85 }
86 for ns in &mut self.namespaces {
87 ns.namespace_index = type_tree.namespaces_mut().add_namespace(&ns.namespace_uri);
88 }
89 }
90 for ns in &self.namespaces {
91 address_space.add_namespace(&ns.namespace_uri, ns.namespace_index);
92 }
93 SimpleNodeManagerImpl::new(self.namespaces, &self.name, context.node_managers.clone())
94 }
95}
96
97pub fn simple_node_manager(namespace: NamespaceMetadata, name: &str) -> impl NodeManagerBuilder {
100 InMemoryNodeManagerBuilder::new(SimpleNodeManagerBuilder::new(namespace, name))
101}
102
103pub fn simple_node_manager_imports(
106 imports: Vec<Box<dyn NodeSetImport>>,
107 name: &str,
108) -> impl NodeManagerBuilder {
109 InMemoryNodeManagerBuilder::new(SimpleNodeManagerBuilder::new_imports(imports, name))
110}
111
112pub struct SimpleNodeManagerImpl {
120 write_cbs: RwLock<HashMap<NodeId, WriteCB>>,
121 read_cbs: RwLock<HashMap<NodeId, ReadCB>>,
122 method_cbs: RwLock<HashMap<NodeId, MethodCB>>,
123 namespaces: Vec<NamespaceMetadata>,
124 #[allow(unused)]
125 node_managers: NodeManagersRef,
126 name: String,
127 samplers: SyncSampler,
128}
129
130#[async_trait]
131impl InMemoryNodeManagerImpl for SimpleNodeManagerImpl {
132 async fn init(&self, _address_space: &mut AddressSpace, context: ServerContext) {
133 self.samplers.run(
134 Duration::from_millis(
135 context
136 .info
137 .config
138 .limits
139 .subscriptions
140 .min_sampling_interval_ms as u64,
141 ),
142 context.subscriptions.clone(),
143 );
144 }
145
146 fn namespaces(&self) -> Vec<NamespaceMetadata> {
147 self.namespaces.clone()
148 }
149
150 fn name(&self) -> &str {
151 &self.name
152 }
153
154 async fn read_values(
155 &self,
156 context: &RequestContext,
157 address_space: &RwLock<AddressSpace>,
158 nodes: &[&ParsedReadValueId],
159 max_age: f64,
160 timestamps_to_return: TimestampsToReturn,
161 ) -> Vec<DataValue> {
162 let address_space = address_space.read();
163 let cbs = trace_read_lock!(self.read_cbs);
164
165 nodes
166 .iter()
167 .map(|n| {
168 self.read_node_value(
169 &cbs,
170 context,
171 &address_space,
172 n,
173 max_age,
174 timestamps_to_return,
175 )
176 })
177 .collect()
178 }
179
180 async fn create_value_monitored_items(
181 &self,
182 context: &RequestContext,
183 address_space: &RwLock<AddressSpace>,
184 items: &mut [&mut &mut CreateMonitoredItem],
185 ) {
186 let to_read: Vec<_> = items.iter().map(|r| r.item_to_monitor()).collect();
187 let values = self
188 .read_values(
189 context,
190 address_space,
191 &to_read,
192 0.0,
193 TimestampsToReturn::Both,
194 )
195 .await;
196
197 let cbs = trace_read_lock!(self.read_cbs);
198
199 for (value, node) in values.into_iter().zip(items.iter_mut()) {
200 if value.status() != StatusCode::BadAttributeIdInvalid {
201 node.set_initial_value(value);
202 }
203 node.set_status(StatusCode::Good);
204 let rf = &node.item_to_monitor().node_id;
205
206 if let Some(cb) = cbs.get(rf).cloned() {
207 let tss = node.timestamps_to_return();
208 let index_range = node.item_to_monitor().index_range.clone();
209
210 self.samplers.add_sampler(
211 node.item_to_monitor().node_id.clone(),
212 AttributeId::Value,
213 move || {
214 Some(match cb(&index_range, tss, 0.0) {
215 Err(e) => DataValue {
216 status: Some(e),
217 ..Default::default()
218 },
219 Ok(v) => v,
220 })
221 },
222 node.monitoring_mode(),
223 node.handle(),
224 Duration::from_millis(node.sampling_interval() as u64),
225 )
226 }
227 }
228 }
229
230 async fn modify_monitored_items(
231 &self,
232 _context: &RequestContext,
233 items: &[&MonitoredItemUpdateRef],
234 ) {
235 for it in items {
236 self.samplers.update_sampler(
237 it.node_id(),
238 it.attribute(),
239 it.handle(),
240 Duration::from_millis(it.update().revised_sampling_interval as u64),
241 );
242 }
243 }
244
245 async fn set_monitoring_mode(
246 &self,
247 _context: &RequestContext,
248 mode: MonitoringMode,
249 items: &[&MonitoredItemRef],
250 ) {
251 for it in items {
252 self.samplers
253 .set_sampler_mode(it.node_id(), it.attribute(), it.handle(), mode);
254 }
255 }
256
257 async fn delete_monitored_items(&self, _context: &RequestContext, items: &[&MonitoredItemRef]) {
258 for it in items {
259 self.samplers
260 .remove_sampler(it.node_id(), it.attribute(), it.handle());
261 }
262 }
263
264 async fn write(
265 &self,
266 context: &RequestContext,
267 address_space: &RwLock<AddressSpace>,
268 nodes_to_write: &mut [&mut WriteNode],
269 ) -> Result<(), StatusCode> {
270 let mut address_space = trace_write_lock!(address_space);
271 let type_tree = trace_read_lock!(context.type_tree);
272 let cbs = trace_read_lock!(self.write_cbs);
273
274 for write in nodes_to_write {
275 self.write_node_value(&cbs, context, &mut address_space, &type_tree, write);
276 }
277
278 Ok(())
279 }
280
281 async fn call(
282 &self,
283 _context: &RequestContext,
284 _address_space: &RwLock<AddressSpace>,
285 methods_to_call: &mut [&mut &mut MethodCall],
286 ) -> Result<(), StatusCode> {
287 let cbs = trace_read_lock!(self.method_cbs);
288 for method in methods_to_call {
289 if let Some(cb) = cbs.get(method.method_id()) {
290 match cb(method.arguments()) {
291 Ok(r) => {
292 method.set_outputs(r);
293 method.set_status(StatusCode::Good);
294 }
295 Err(e) => method.set_status(e),
296 }
297 }
298 }
299
300 Ok(())
301 }
302}
303
304impl SimpleNodeManagerImpl {
305 fn new(namespaces: Vec<NamespaceMetadata>, name: &str, node_managers: NodeManagersRef) -> Self {
306 Self {
307 write_cbs: Default::default(),
308 read_cbs: Default::default(),
309 method_cbs: Default::default(),
310 namespaces,
311 name: name.to_owned(),
312 node_managers,
313 samplers: SyncSampler::new(),
314 }
315 }
316
317 fn read_node_value(
318 &self,
319 cbs: &HashMap<NodeId, ReadCB>,
320 context: &RequestContext,
321 address_space: &AddressSpace,
322 node_to_read: &ParsedReadValueId,
323 max_age: f64,
324 timestamps_to_return: TimestampsToReturn,
325 ) -> DataValue {
326 let mut result_value = DataValue::null();
327 let node = match address_space.validate_node_read(context, node_to_read) {
329 Ok(n) => n,
330 Err(e) => {
331 result_value.status = Some(e);
332 return result_value;
333 }
334 };
335
336 if let Some(cb) = cbs.get(&node_to_read.node_id) {
338 match cb(&node_to_read.index_range, timestamps_to_return, max_age) {
339 Err(e) => DataValue {
340 status: Some(e),
341 ..Default::default()
342 },
343 Ok(v) => v,
344 }
345 } else {
346 read_node_value(node, context, node_to_read, max_age, timestamps_to_return)
348 }
349 }
350
351 fn write_node_value(
352 &self,
353 cbs: &HashMap<NodeId, WriteCB>,
354 context: &RequestContext,
355 address_space: &mut AddressSpace,
356 type_tree: &DefaultTypeTree,
357 write: &mut WriteNode,
358 ) {
359 let node = match address_space.validate_node_write(context, write.value(), type_tree) {
360 Ok(v) => v,
361 Err(e) => {
362 write.set_status(e);
363 return;
364 }
365 };
366
367 if node.node_class() != NodeClass::Variable
368 || write.value().attribute_id != AttributeId::Value
369 {
370 write.set_status(StatusCode::BadNotWritable);
371 return;
372 }
373
374 if let Some(cb) = cbs.get(node.as_node().node_id()) {
375 write.set_status(cb(write.value().value.clone(), &write.value().index_range));
377 } else if write.value().value.value.is_some() {
378 match write_node_value(node, write.value()) {
380 Ok(_) => write.set_status(StatusCode::Good),
381 Err(e) => write.set_status(e),
382 }
383 } else {
384 write.set_status(StatusCode::BadNothingToDo);
386 }
387 if write.status().is_good() {
388 if let Some(val) = node.as_mut_node().get_attribute(
389 TimestampsToReturn::Both,
390 write.value().attribute_id,
391 &NumericRange::None,
392 &opcua_types::DataEncoding::Binary,
393 ) {
394 context.subscriptions.notify_data_change(
395 [(val, node.node_id(), write.value().attribute_id)].into_iter(),
396 );
397 }
398 }
399 }
400
401 pub fn add_write_callback(
403 &self,
404 id: NodeId,
405 cb: impl Fn(DataValue, &NumericRange) -> StatusCode + Send + Sync + 'static,
406 ) {
407 let mut cbs = trace_write_lock!(self.write_cbs);
408 cbs.insert(id, Arc::new(cb));
409 }
410
411 pub fn add_read_callback(
413 &self,
414 id: NodeId,
415 cb: impl Fn(&NumericRange, TimestampsToReturn, f64) -> Result<DataValue, StatusCode>
416 + Send
417 + Sync
418 + 'static,
419 ) {
420 let mut cbs = trace_write_lock!(self.read_cbs);
421 cbs.insert(id, Arc::new(cb));
422 }
423
424 pub fn add_method_callback(
426 &self,
427 id: NodeId,
428 cb: impl Fn(&[Variant]) -> Result<Vec<Variant>, StatusCode> + Send + Sync + 'static,
429 ) {
430 let mut cbs = trace_write_lock!(self.method_cbs);
431 cbs.insert(id, Arc::new(cb));
432 }
433}