use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::protocol::Protocol;
use crate::tags::Tags;
use crate::types::{DataPoint, DataPointDef};
use crate::value::Value;
/// Lifecycle state reported for a device.
///
/// `Uninitialized` is the `Default` value. Because of
/// `rename_all = "lowercase"`, every variant is (de)serialized as its
/// lowercased name, e.g. `ShuttingDown` becomes `"shuttingdown"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum DeviceState {
/// Default state; also the state assigned by `DeviceInfo::new`.
#[default]
Uninitialized,
/// Not operational, but `can_accept_requests` returns `true` for it.
Initializing,
/// The only state for which `is_operational` returns `true`.
Online,
/// Neither operational nor accepting requests (per the predicates on this type).
Offline,
/// Neither operational nor accepting requests (per the predicates on this type).
Error,
/// Neither operational nor accepting requests (per the predicates on this type).
ShuttingDown,
}
impl DeviceState {
    /// Reports whether the device is fully up, which is the case only for
    /// [`DeviceState::Online`].
    pub fn is_operational(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Online => true,
            Self::Uninitialized
            | Self::Initializing
            | Self::Offline
            | Self::Error
            | Self::ShuttingDown => false,
        }
    }

    /// Reports whether requests may be sent to the device: `true` while it is
    /// [`DeviceState::Online`] or still [`DeviceState::Initializing`].
    pub fn can_accept_requests(&self) -> bool {
        match self {
            Self::Online | Self::Initializing => true,
            Self::Uninitialized | Self::Offline | Self::Error | Self::ShuttingDown => false,
        }
    }
}
/// Descriptive record for a device: identity, protocol, current state,
/// timestamps and free-form annotations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceInfo {
/// Device identifier; exposed by `Device::id`.
pub id: String,
/// Device name; exposed by `Device::name`.
pub name: String,
/// Free-text description; empty unless set (see `with_description`).
pub description: String,
/// Protocol associated with the device.
pub protocol: Protocol,
/// Current lifecycle state; `DeviceInfo::new` starts at `Uninitialized`.
pub state: DeviceState,
/// Starts at 0 in `DeviceInfo::new`; nothing in this file updates it.
/// NOTE(review): presumably the number of data points — confirm with implementors.
pub point_count: usize,
/// Set by `DeviceInfo::new` from `Utc::now()`.
pub created_at: DateTime<Utc>,
/// Initialised to the same instant as `created_at`; nothing in this file
/// changes it afterwards.
pub updated_at: DateTime<Utc>,
/// String key/value annotations. A missing field deserializes to an empty map.
#[serde(default)]
pub metadata: HashMap<String, String>,
/// Tags and labels. A missing field deserializes to the default `Tags`, and
/// the field is omitted from serialized output when `Tags::is_empty` is true.
#[serde(default, skip_serializing_if = "Tags::is_empty")]
pub tags: Tags,
}
impl DeviceInfo {
pub fn new(id: impl Into<String>, name: impl Into<String>, protocol: Protocol) -> Self {
let now = Utc::now();
Self {
id: id.into(),
name: name.into(),
description: String::new(),
protocol,
state: DeviceState::Uninitialized,
point_count: 0,
created_at: now,
updated_at: now,
metadata: HashMap::new(),
tags: Tags::new(),
}
}
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = description.into();
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn with_tags(mut self, tags: Tags) -> Self {
self.tags = tags;
self
}
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
pub fn with_label(mut self, label: impl Into<String>) -> Self {
self.tags.add_label(label.into());
self
}
}
/// Interface implemented by every device.
///
/// Implementors provide [`Device::info`], the four lifecycle hooks, point
/// lookup, and single-point `read`/`write`. Everything else has a default
/// built on top of those.
#[async_trait]
pub trait Device: Send + Sync {
    /// Returns the device's descriptive record; the accessors below read from it.
    fn info(&self) -> &DeviceInfo;

    /// Device identifier, taken from [`Device::info`].
    fn id(&self) -> &str {
        self.info().id.as_str()
    }

    /// Device name, taken from [`Device::info`].
    fn name(&self) -> &str {
        self.info().name.as_str()
    }

    /// Device protocol, taken from [`Device::info`].
    fn protocol(&self) -> Protocol {
        let info = self.info();
        info.protocol
    }

    /// Current lifecycle state, taken from [`Device::info`].
    fn state(&self) -> DeviceState {
        let info = self.info();
        info.state
    }

    /// Lifecycle hook; behaviour is defined by the implementor.
    async fn initialize(&mut self) -> Result<()>;

    /// Lifecycle hook; behaviour is defined by the implementor.
    async fn start(&mut self) -> Result<()>;

    /// Lifecycle hook; behaviour is defined by the implementor.
    async fn stop(&mut self) -> Result<()>;

    /// Periodic hook; behaviour is defined by the implementor.
    async fn tick(&mut self) -> Result<()>;

    /// Returns the definitions of the device's data points.
    fn point_definitions(&self) -> Vec<&DataPointDef>;

    /// Looks up a single point definition by id, if there is one.
    fn point_definition(&self, point_id: &str) -> Option<&DataPointDef>;

    /// Reads the data point identified by `point_id`.
    async fn read(&self, point_id: &str) -> Result<DataPoint>;

    /// Reads the given points one after another, in the order supplied.
    ///
    /// # Errors
    ///
    /// Returns the first error from [`Device::read`]; values read before it
    /// are discarded.
    async fn read_multiple(&self, point_ids: &[&str]) -> Result<Vec<DataPoint>> {
        let mut points = Vec::with_capacity(point_ids.len());
        for &id in point_ids {
            let point = self.read(id).await?;
            points.push(point);
        }
        Ok(points)
    }

    /// Reads every point listed by [`Device::point_definitions`] through
    /// [`Device::read_multiple`].
    async fn read_all(&self) -> Result<Vec<DataPoint>> {
        let ids: Vec<&str> = self
            .point_definitions()
            .into_iter()
            .map(|definition| definition.id.as_str())
            .collect();
        self.read_multiple(&ids).await
    }

    /// Writes `value` to the data point identified by `point_id`.
    async fn write(&mut self, point_id: &str, value: Value) -> Result<()>;

    /// Writes each `(point_id, value)` pair in order.
    ///
    /// # Errors
    ///
    /// Stops at the first error from [`Device::write`] and returns it; this
    /// default does not undo writes that already succeeded.
    async fn write_multiple(&mut self, values: &[(&str, Value)]) -> Result<()> {
        for entry in values {
            // `write` takes the value by move, so each borrowed entry is cloned.
            self.write(entry.0, entry.1.clone()).await?;
        }
        Ok(())
    }

    /// Returns a receiver for data-point updates; this default returns `None`.
    fn subscribe(&self) -> Option<tokio::sync::broadcast::Receiver<DataPoint>> {
        None
    }

    /// Returns runtime counters; this default returns
    /// `DeviceStatistics::default()`.
    fn statistics(&self) -> DeviceStatistics {
        DeviceStatistics::default()
    }
}
/// Runtime counters for a device. All fields start at zero / `None` via `Default`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeviceStatistics {
/// Incremented by `record_read`.
pub reads_total: u64,
/// Incremented by `record_write`.
pub writes_total: u64,
/// Incremented by `record_read_error`.
pub read_errors: u64,
/// Incremented by `record_write_error`.
pub write_errors: u64,
/// Incremented by `record_tick`.
pub ticks_total: u64,
/// Running integer mean of the durations passed to `record_tick`
/// (microseconds, going by the `_us` suffix).
pub avg_tick_duration_us: u64,
/// Message from the most recent `record_read_error` / `record_write_error` call.
pub last_error: Option<String>,
/// Not updated by any method in this file.
/// NOTE(review): presumably maintained by the device implementation — confirm.
pub uptime_secs: u64,
}
impl DeviceStatistics {
    /// Counts one read.
    pub fn record_read(&mut self) {
        self.reads_total += 1;
    }

    /// Counts one failed read and stores `error` as the most recent error.
    pub fn record_read_error(&mut self, error: &str) {
        self.read_errors += 1;
        self.last_error = Some(error.to_string());
    }

    /// Counts one write.
    pub fn record_write(&mut self) {
        self.writes_total += 1;
    }

    /// Counts one failed write and stores `error` as the most recent error.
    pub fn record_write_error(&mut self, error: &str) {
        self.write_errors += 1;
        self.last_error = Some(error.to_string());
    }

    /// Counts one tick and folds `duration_us` into the running mean.
    ///
    /// The mean is the integer (floor) average, updated as
    /// `(avg * (n - 1) + duration_us) / n` where `n` is the new tick count.
    pub fn record_tick(&mut self, duration_us: u64) {
        // Saturate rather than wrap: a counter wrapped to 0 would make the
        // division below panic.
        self.ticks_total = self.ticks_total.saturating_add(1);

        // `avg * (n - 1) + duration_us` can exceed u64 (e.g. one huge
        // duration sample), so the weighted sum is computed in u128, where it
        // cannot overflow.
        let samples = u128::from(self.ticks_total);
        let weighted_sum =
            u128::from(self.avg_tick_duration_us) * (samples - 1) + u128::from(duration_us);
        let mean = weighted_sum / samples;

        // The mean never exceeds the larger of the old average and the new
        // sample, both of which are u64, so this narrowing is lossless.
        self.avg_tick_duration_us = mean as u64;
    }
}
/// A heap-allocated [`Device`] trait object with a single owner.
pub type BoxedDevice = Box<dyn Device>;
/// A reference-counted [`Device`] trait object.
pub type ArcDevice = Arc<dyn Device>;
/// Cloneable handle to a boxed [`Device`].
///
/// Both fields are behind `Arc`, so clones of a handle share the same device
/// and the same cached info.
#[derive(Clone)]
pub struct DeviceHandle {
/// The device itself, guarded by an async (`tokio`) read/write lock.
inner: Arc<tokio::sync::RwLock<BoxedDevice>>,
/// Copy of the device's `DeviceInfo`, guarded by a `parking_lot` lock so the
/// non-async accessors (`info`, `id`, `state`) can read it without awaiting.
cached_info: Arc<parking_lot::RwLock<DeviceInfo>>,
}
impl DeviceHandle {
pub fn new(device: BoxedDevice) -> Self {
let info = device.info().clone();
Self {
inner: Arc::new(tokio::sync::RwLock::new(device)),
cached_info: Arc::new(parking_lot::RwLock::new(info)),
}
}
pub fn info(&self) -> DeviceInfo {
self.cached_info.read().clone()
}
pub fn id(&self) -> String {
self.cached_info.read().id.clone()
}
pub fn state(&self) -> DeviceState {
self.cached_info.read().state
}
pub async fn refresh_info(&self) {
let info = self.inner.read().await.info().clone();
*self.cached_info.write() = info;
}
pub async fn initialize(&self) -> Result<()> {
let result = self.inner.write().await.initialize().await;
self.refresh_info().await;
result
}
pub async fn start(&self) -> Result<()> {
let result = self.inner.write().await.start().await;
self.refresh_info().await;
result
}
pub async fn stop(&self) -> Result<()> {
let result = self.inner.write().await.stop().await;
self.refresh_info().await;
result
}
pub async fn tick(&self) -> Result<()> {
self.inner.write().await.tick().await
}
pub async fn read(&self, point_id: &str) -> Result<DataPoint> {
self.inner.read().await.read(point_id).await
}
pub async fn write(&self, point_id: &str, value: Value) -> Result<()> {
self.inner.write().await.write(point_id, value).await
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Spot-checks the two `DeviceState` predicates.
    #[test]
    fn test_device_state() {
        let uninitialized = DeviceState::Uninitialized;
        let online = DeviceState::Online;

        assert!(!uninitialized.is_operational());
        assert!(online.is_operational());
        assert!(online.can_accept_requests());
    }

    /// The constructor and the description/metadata builder steps store what
    /// they are given.
    #[test]
    fn test_device_info() {
        let device = DeviceInfo::new("dev-001", "Test Device", Protocol::ModbusTcp)
            .with_description("A test device")
            .with_metadata("location", "Building A");

        assert_eq!(device.id, "dev-001");
        assert_eq!(device.protocol, Protocol::ModbusTcp);

        let expected_location = "Building A".to_string();
        assert_eq!(device.metadata.get("location"), Some(&expected_location));
    }

    /// Tags and labels added through the builder can be read back.
    #[test]
    fn test_device_info_with_tags() {
        let device = DeviceInfo::new("dev-002", "Tagged Device", Protocol::BacnetIp)
            .with_tag("zone", "hvac")
            .with_tag("floor", "3")
            .with_label("critical")
            .with_label("monitored");
        let tags = &device.tags;

        assert_eq!(tags.get("zone"), Some("hvac"));
        assert_eq!(tags.get("floor"), Some("3"));
        assert!(tags.has_label("critical"));
        assert!(tags.has_label("monitored"));
    }

    /// Counters increment and the tick average is the mean of the samples.
    #[test]
    fn test_device_statistics() {
        let mut counters = DeviceStatistics::default();
        counters.record_read();
        for sample_us in [100, 200] {
            counters.record_tick(sample_us);
        }

        assert_eq!(counters.reads_total, 1);
        assert_eq!(counters.ticks_total, 2);
        assert_eq!(counters.avg_tick_duration_us, 150);
    }
}