1use async_trait::async_trait;
4use parking_lot::RwLock;
5use std::collections::HashMap;
6use tokio::sync::broadcast;
7
8use mabi_core::{
9 device::{Device, DeviceInfo, DeviceState, DeviceStatistics},
10 error::{Error, Result},
11 protocol::Protocol,
12 types::{DataPoint, DataPointDef, DataPointId},
13 value::Value,
14};
15
16pub struct OpcUaDevice {
18 info: DeviceInfo,
19 point_defs: HashMap<String, DataPointDef>,
20 values: RwLock<HashMap<String, Value>>,
21 stats: RwLock<DeviceStatistics>,
22 event_tx: broadcast::Sender<DataPoint>,
23}
24
25impl OpcUaDevice {
26 pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
28 let (event_tx, _) = broadcast::channel(1000);
29
30 Self {
31 info: DeviceInfo::new(id, name, Protocol::OpcUa),
32 point_defs: HashMap::new(),
33 values: RwLock::new(HashMap::new()),
34 stats: RwLock::new(DeviceStatistics::default()),
35 event_tx,
36 }
37 }
38
39 pub fn add_node(&mut self, def: DataPointDef, initial_value: Value) {
41 self.values.write().insert(def.id.clone(), initial_value);
42 self.point_defs.insert(def.id.clone(), def);
43 self.info.point_count = self.point_defs.len();
44 }
45}
46
47#[async_trait]
48impl Device for OpcUaDevice {
49 fn info(&self) -> &DeviceInfo {
50 &self.info
51 }
52
53 async fn initialize(&mut self) -> Result<()> {
54 self.info.state = DeviceState::Online;
55 Ok(())
56 }
57
58 async fn start(&mut self) -> Result<()> {
59 self.info.state = DeviceState::Online;
60 Ok(())
61 }
62
63 async fn stop(&mut self) -> Result<()> {
64 self.info.state = DeviceState::Offline;
65 Ok(())
66 }
67
68 async fn tick(&mut self) -> Result<()> {
69 Ok(())
70 }
71
72 fn point_definitions(&self) -> Vec<&DataPointDef> {
73 self.point_defs.values().collect()
74 }
75
76 fn point_definition(&self, point_id: &str) -> Option<&DataPointDef> {
77 self.point_defs.get(point_id)
78 }
79
80 async fn read(&self, point_id: &str) -> Result<DataPoint> {
81 let value = self
82 .values
83 .read()
84 .get(point_id)
85 .cloned()
86 .ok_or_else(|| Error::point_not_found(&self.info.id, point_id))?;
87
88 self.stats.write().record_read();
89 let id = DataPointId::new(&self.info.id, point_id);
90 Ok(DataPoint::new(id, value))
91 }
92
93 async fn write(&mut self, point_id: &str, value: Value) -> Result<()> {
94 if !self.point_defs.contains_key(point_id) {
95 return Err(Error::point_not_found(&self.info.id, point_id));
96 }
97
98 self.values
99 .write()
100 .insert(point_id.to_string(), value.clone());
101 self.stats.write().record_write();
102
103 let id = DataPointId::new(&self.info.id, point_id);
104 let _ = self.event_tx.send(DataPoint::new(id, value));
105
106 Ok(())
107 }
108
109 fn subscribe(&self) -> Option<broadcast::Receiver<DataPoint>> {
110 Some(self.event_tx.subscribe())
111 }
112
113 fn statistics(&self) -> DeviceStatistics {
114 self.stats.read().clone()
115 }
116}