1use std::collections::HashMap;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9
10use crate::error::Result;
11use crate::protocol::Protocol;
12use crate::tags::Tags;
13use crate::types::{DataPoint, DataPointDef};
14use crate::value::Value;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
18#[serde(rename_all = "lowercase")]
19pub enum DeviceState {
20 #[default]
22 Uninitialized,
23 Initializing,
25 Online,
27 Offline,
29 Error,
31 ShuttingDown,
33}
34
35impl DeviceState {
36 pub fn is_operational(&self) -> bool {
38 matches!(self, Self::Online)
39 }
40
41 pub fn can_accept_requests(&self) -> bool {
43 matches!(self, Self::Online | Self::Initializing)
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct DeviceInfo {
50 pub id: String,
52 pub name: String,
54 pub description: String,
56 pub protocol: Protocol,
58 pub state: DeviceState,
60 pub point_count: usize,
62 pub created_at: DateTime<Utc>,
64 pub updated_at: DateTime<Utc>,
66 #[serde(default)]
68 pub metadata: HashMap<String, String>,
69 #[serde(default, skip_serializing_if = "Tags::is_empty")]
71 pub tags: Tags,
72}
73
74impl DeviceInfo {
75 pub fn new(id: impl Into<String>, name: impl Into<String>, protocol: Protocol) -> Self {
77 let now = Utc::now();
78 Self {
79 id: id.into(),
80 name: name.into(),
81 description: String::new(),
82 protocol,
83 state: DeviceState::Uninitialized,
84 point_count: 0,
85 created_at: now,
86 updated_at: now,
87 metadata: HashMap::new(),
88 tags: Tags::new(),
89 }
90 }
91
92 pub fn with_description(mut self, description: impl Into<String>) -> Self {
94 self.description = description.into();
95 self
96 }
97
98 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
100 self.metadata.insert(key.into(), value.into());
101 self
102 }
103
104 pub fn with_tags(mut self, tags: Tags) -> Self {
106 self.tags = tags;
107 self
108 }
109
110 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
112 self.tags.insert(key.into(), value.into());
113 self
114 }
115
116 pub fn with_label(mut self, label: impl Into<String>) -> Self {
118 self.tags.add_label(label.into());
119 self
120 }
121}
122
123#[async_trait]
125pub trait Device: Send + Sync {
126 fn info(&self) -> &DeviceInfo;
128
129 fn id(&self) -> &str {
131 &self.info().id
132 }
133
134 fn name(&self) -> &str {
136 &self.info().name
137 }
138
139 fn protocol(&self) -> Protocol {
141 self.info().protocol
142 }
143
144 fn state(&self) -> DeviceState {
146 self.info().state
147 }
148
149 async fn initialize(&mut self) -> Result<()>;
151
152 async fn start(&mut self) -> Result<()>;
154
155 async fn stop(&mut self) -> Result<()>;
157
158 async fn tick(&mut self) -> Result<()>;
160
161 fn point_definitions(&self) -> Vec<&DataPointDef>;
163
164 fn point_definition(&self, point_id: &str) -> Option<&DataPointDef>;
166
167 async fn read(&self, point_id: &str) -> Result<DataPoint>;
169
170 async fn read_multiple(&self, point_ids: &[&str]) -> Result<Vec<DataPoint>> {
172 let mut results = Vec::with_capacity(point_ids.len());
173 for point_id in point_ids {
174 results.push(self.read(point_id).await?);
175 }
176 Ok(results)
177 }
178
179 async fn read_all(&self) -> Result<Vec<DataPoint>> {
181 let point_ids: Vec<&str> = self
182 .point_definitions()
183 .iter()
184 .map(|d| d.id.as_str())
185 .collect();
186 self.read_multiple(&point_ids).await
187 }
188
189 async fn write(&mut self, point_id: &str, value: Value) -> Result<()>;
191
192 async fn write_multiple(&mut self, values: &[(&str, Value)]) -> Result<()> {
194 for (point_id, value) in values {
195 self.write(point_id, value.clone()).await?;
196 }
197 Ok(())
198 }
199
200 fn subscribe(&self) -> Option<tokio::sync::broadcast::Receiver<DataPoint>> {
202 None
203 }
204
205 fn statistics(&self) -> DeviceStatistics {
207 DeviceStatistics::default()
208 }
209}
210
211#[derive(Debug, Clone, Default, Serialize, Deserialize)]
213pub struct DeviceStatistics {
214 pub reads_total: u64,
216 pub writes_total: u64,
218 pub read_errors: u64,
220 pub write_errors: u64,
222 pub ticks_total: u64,
224 pub avg_tick_duration_us: u64,
226 pub last_error: Option<String>,
228 pub uptime_secs: u64,
230}
231
232impl DeviceStatistics {
233 pub fn record_read(&mut self) {
235 self.reads_total += 1;
236 }
237
238 pub fn record_read_error(&mut self, error: &str) {
240 self.read_errors += 1;
241 self.last_error = Some(error.to_string());
242 }
243
244 pub fn record_write(&mut self) {
246 self.writes_total += 1;
247 }
248
249 pub fn record_write_error(&mut self, error: &str) {
251 self.write_errors += 1;
252 self.last_error = Some(error.to_string());
253 }
254
255 pub fn record_tick(&mut self, duration_us: u64) {
257 self.ticks_total += 1;
258 self.avg_tick_duration_us =
260 (self.avg_tick_duration_us * (self.ticks_total - 1) + duration_us) / self.ticks_total;
261 }
262}
263
264pub type BoxedDevice = Box<dyn Device>;
266
267pub type ArcDevice = Arc<dyn Device>;
269
270#[derive(Clone)]
273pub struct DeviceHandle {
274 inner: Arc<tokio::sync::RwLock<BoxedDevice>>,
275 cached_info: Arc<parking_lot::RwLock<DeviceInfo>>,
277}
278
279impl DeviceHandle {
280 pub fn new(device: BoxedDevice) -> Self {
282 let info = device.info().clone();
283 Self {
284 inner: Arc::new(tokio::sync::RwLock::new(device)),
285 cached_info: Arc::new(parking_lot::RwLock::new(info)),
286 }
287 }
288
289 pub fn info(&self) -> DeviceInfo {
291 self.cached_info.read().clone()
292 }
293
294 pub fn id(&self) -> String {
296 self.cached_info.read().id.clone()
297 }
298
299 pub fn state(&self) -> DeviceState {
301 self.cached_info.read().state
302 }
303
304 pub async fn refresh_info(&self) {
306 let info = self.inner.read().await.info().clone();
307 *self.cached_info.write() = info;
308 }
309
310 pub async fn initialize(&self) -> Result<()> {
312 let result = self.inner.write().await.initialize().await;
313 self.refresh_info().await;
314 result
315 }
316
317 pub async fn start(&self) -> Result<()> {
319 let result = self.inner.write().await.start().await;
320 self.refresh_info().await;
321 result
322 }
323
324 pub async fn stop(&self) -> Result<()> {
326 let result = self.inner.write().await.stop().await;
327 self.refresh_info().await;
328 result
329 }
330
331 pub async fn tick(&self) -> Result<()> {
333 self.inner.write().await.tick().await
334 }
335
336 pub async fn read(&self, point_id: &str) -> Result<DataPoint> {
338 self.inner.read().await.read(point_id).await
339 }
340
341 pub async fn write(&self, point_id: &str, value: Value) -> Result<()> {
343 self.inner.write().await.write(point_id, value).await
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use super::*;
350
351 #[test]
352 fn test_device_state() {
353 assert!(!DeviceState::Uninitialized.is_operational());
354 assert!(DeviceState::Online.is_operational());
355 assert!(DeviceState::Online.can_accept_requests());
356 }
357
358 #[test]
359 fn test_device_info() {
360 let info = DeviceInfo::new("dev-001", "Test Device", Protocol::ModbusTcp)
361 .with_description("A test device")
362 .with_metadata("location", "Building A");
363
364 assert_eq!(info.id, "dev-001");
365 assert_eq!(info.protocol, Protocol::ModbusTcp);
366 assert_eq!(
367 info.metadata.get("location"),
368 Some(&"Building A".to_string())
369 );
370 }
371
372 #[test]
373 fn test_device_info_with_tags() {
374 let info = DeviceInfo::new("dev-002", "Tagged Device", Protocol::BacnetIp)
375 .with_tag("zone", "hvac")
376 .with_tag("floor", "3")
377 .with_label("critical")
378 .with_label("monitored");
379
380 assert_eq!(info.tags.get("zone"), Some("hvac"));
381 assert_eq!(info.tags.get("floor"), Some("3"));
382 assert!(info.tags.has_label("critical"));
383 assert!(info.tags.has_label("monitored"));
384 }
385
386 #[test]
387 fn test_device_statistics() {
388 let mut stats = DeviceStatistics::default();
389 stats.record_read();
390 stats.record_tick(100);
391 stats.record_tick(200);
392
393 assert_eq!(stats.reads_total, 1);
394 assert_eq!(stats.ticks_total, 2);
395 assert_eq!(stats.avg_tick_duration_us, 150);
396 }
397}