Skip to main content

mabi_opcua/
device.rs

1//! OPC UA device implementation (placeholder).
2
3use async_trait::async_trait;
4use parking_lot::RwLock;
5use std::collections::HashMap;
6use tokio::sync::broadcast;
7
8use mabi_core::{
9    device::{Device, DeviceInfo, DeviceState, DeviceStatistics},
10    error::{Error, Result},
11    protocol::Protocol,
12    types::{DataPoint, DataPointDef, DataPointId},
13    value::Value,
14};
15
16/// OPC UA device implementation.
17pub struct OpcUaDevice {
18    info: DeviceInfo,
19    point_defs: HashMap<String, DataPointDef>,
20    values: RwLock<HashMap<String, Value>>,
21    stats: RwLock<DeviceStatistics>,
22    event_tx: broadcast::Sender<DataPoint>,
23}
24
25impl OpcUaDevice {
26    /// Create a new OPC UA device.
27    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
28        let (event_tx, _) = broadcast::channel(1000);
29
30        Self {
31            info: DeviceInfo::new(id, name, Protocol::OpcUa),
32            point_defs: HashMap::new(),
33            values: RwLock::new(HashMap::new()),
34            stats: RwLock::new(DeviceStatistics::default()),
35            event_tx,
36        }
37    }
38
39    /// Add a node/variable to the device.
40    pub fn add_node(&mut self, def: DataPointDef, initial_value: Value) {
41        self.values.write().insert(def.id.clone(), initial_value);
42        self.point_defs.insert(def.id.clone(), def);
43        self.info.point_count = self.point_defs.len();
44    }
45}
46
47#[async_trait]
48impl Device for OpcUaDevice {
49    fn info(&self) -> &DeviceInfo {
50        &self.info
51    }
52
53    async fn initialize(&mut self) -> Result<()> {
54        self.info.state = DeviceState::Online;
55        Ok(())
56    }
57
58    async fn start(&mut self) -> Result<()> {
59        self.info.state = DeviceState::Online;
60        Ok(())
61    }
62
63    async fn stop(&mut self) -> Result<()> {
64        self.info.state = DeviceState::Offline;
65        Ok(())
66    }
67
68    async fn tick(&mut self) -> Result<()> {
69        Ok(())
70    }
71
72    fn point_definitions(&self) -> Vec<&DataPointDef> {
73        self.point_defs.values().collect()
74    }
75
76    fn point_definition(&self, point_id: &str) -> Option<&DataPointDef> {
77        self.point_defs.get(point_id)
78    }
79
80    async fn read(&self, point_id: &str) -> Result<DataPoint> {
81        let value = self
82            .values
83            .read()
84            .get(point_id)
85            .cloned()
86            .ok_or_else(|| Error::point_not_found(&self.info.id, point_id))?;
87
88        self.stats.write().record_read();
89        let id = DataPointId::new(&self.info.id, point_id);
90        Ok(DataPoint::new(id, value))
91    }
92
93    async fn write(&mut self, point_id: &str, value: Value) -> Result<()> {
94        if !self.point_defs.contains_key(point_id) {
95            return Err(Error::point_not_found(&self.info.id, point_id));
96        }
97
98        self.values
99            .write()
100            .insert(point_id.to_string(), value.clone());
101        self.stats.write().record_write();
102
103        let id = DataPointId::new(&self.info.id, point_id);
104        let _ = self.event_tx.send(DataPoint::new(id, value));
105
106        Ok(())
107    }
108
109    fn subscribe(&self) -> Option<broadcast::Receiver<DataPoint>> {
110        Some(self.event_tx.subscribe())
111    }
112
113    fn statistics(&self) -> DeviceStatistics {
114        self.stats.read().clone()
115    }
116}