use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use serde_json::{Map, Value};
use tokio::sync::RwLock;
use tracing::{debug, info};
use crate::client::WireBandClient;
use crate::frame;
use crate::symbols::{EDGE_TWIN_DELTA, EDGE_TWIN_SYNC};
/// Returns the current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the result is `0.0`
/// rather than an error.
fn unix_ts() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0.0,
    }
}
/// In-memory twin of a single device: the state it has reported and the
/// state that is desired for it.
///
/// Both maps sit behind `Arc<RwLock<..>>`, so a clone of this struct is a
/// handle onto the same underlying maps rather than an independent copy.
#[derive(Clone)]
pub struct DeviceTwin {
/// Identifier of the device; used in log fields and in emitted topic names.
device_id: String,
/// Key/value state most recently reported for the device.
reported: Arc<RwLock<Map<String, Value>>>,
/// Key/value state the device is expected to converge to.
desired: Arc<RwLock<Map<String, Value>>>,
}
impl DeviceTwin {
    /// Creates a twin for `device_id` with empty reported and desired state.
    pub fn new(device_id: impl Into<String>) -> Self {
        Self {
            device_id: device_id.into(),
            reported: Arc::new(RwLock::new(Map::new())),
            desired: Arc::new(RwLock::new(Map::new())),
        }
    }

    /// Merges `patch` into the reported state.
    ///
    /// A `null` value removes the key from the reported state; any other
    /// value inserts the key or overwrites its previous value.
    pub async fn update_reported(&self, patch: Map<String, Value>) {
        // Keep the write guard in its own scope so it is released before logging.
        let key_count = {
            let mut reported = self.reported.write().await;
            for (k, v) in patch {
                if v.is_null() {
                    reported.remove(&k);
                } else {
                    reported.insert(k, v);
                }
            }
            reported.len()
        };
        debug!(device_id = %self.device_id, keys = key_count, "Reported state updated");
    }

    /// Replaces the whole desired state with `desired`.
    pub async fn set_desired(&self, desired: Map<String, Value>) {
        *self.desired.write().await = desired;
        debug!(device_id = %self.device_id, "Desired state updated");
    }

    /// Returns a snapshot (clone) of the reported state.
    pub async fn reported(&self) -> Map<String, Value> {
        self.reported.read().await.clone()
    }

    /// Returns a snapshot (clone) of the desired state.
    pub async fn desired(&self) -> Map<String, Value> {
        self.desired.read().await.clone()
    }

    /// Computes the desired entries that the reported state does not yet satisfy.
    ///
    /// A desired entry is part of the delta when the reported value for the
    /// same key is different, or when the key is missing from the reported
    /// state and the desired value is not `null`. Keys that exist only in the
    /// reported state are ignored.
    ///
    /// Returns `None` when nothing differs, otherwise the differing desired
    /// key/value pairs.
    pub async fn delta(&self) -> Option<Map<String, Value>> {
        let reported = self.reported.read().await;
        let desired = self.desired.read().await;
        let delta: Map<String, Value> = desired
            .iter()
            .filter(|&(k, dv)| match reported.get(k) {
                Some(rv) => rv != dv,
                // `update_reported` treats `null` as "remove this key", so a
                // `null` desired value is already satisfied by an absent key.
                // Without this case such an entry would be reported as a
                // delta forever.
                None => !dv.is_null(),
            })
            .map(|(k, dv)| (k.clone(), dv.clone()))
            .collect();
        if delta.is_empty() {
            None
        } else {
            Some(delta)
        }
    }

    /// Emits the full reported state as a twin-sync event.
    ///
    /// The state is encoded with `frame::encode` under `EDGE_TWIN_SYNC` on the
    /// topic `twin/<device_id>/sync` and passed to `client.buffer_event`
    /// together with the current Unix timestamp.
    pub async fn sync(&self, client: &WireBandClient) {
        // The read guard is a temporary and is dropped at the end of this statement.
        let state = Value::Object(self.reported.read().await.clone());
        let topic = format!("twin/{}/sync", self.device_id);
        let encoded = frame::encode(EDGE_TWIN_SYNC, &topic, &state);
        client.buffer_event(topic, EDGE_TWIN_SYNC, encoded, unix_ts()).await;
        info!(device_id = %self.device_id, "Twin sync emitted");
    }

    /// Emits the current delta, if there is one, as a twin-delta event.
    ///
    /// The delta is encoded with `frame::encode` under `EDGE_TWIN_DELTA` on
    /// the topic `twin/<device_id>/delta` and passed to `client.buffer_event`
    /// together with the current Unix timestamp.
    ///
    /// Returns `true` when a delta existed and was handed to the client,
    /// `false` when reported and desired state already agree.
    pub async fn emit_delta(&self, client: &WireBandClient) -> bool {
        match self.delta().await {
            None => false,
            Some(delta) => {
                let data = Value::Object(delta);
                let topic = format!("twin/{}/delta", self.device_id);
                let encoded = frame::encode(EDGE_TWIN_DELTA, &topic, &data);
                client.buffer_event(topic, EDGE_TWIN_DELTA, encoded, unix_ts()).await;
                info!(device_id = %self.device_id, "Twin delta emitted");
                true
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Turns a JSON object literal into its key/value map; panics on non-objects.
    fn obj(v: Value) -> Map<String, Value> {
        v.as_object().cloned().unwrap()
    }

    /// Patching overwrites and adds keys, and a `null` value removes a key.
    #[tokio::test]
    async fn reported_merge_patch() {
        let twin = DeviceTwin::new("test");
        let initial = obj(json!({ "a": 1, "b": 2 }));
        let patch = obj(json!({ "b": null, "c": 3 }));
        twin.update_reported(initial).await;
        twin.update_reported(patch).await;

        let snapshot = twin.reported().await;
        assert_eq!(snapshot.get("a"), Some(&json!(1)));
        assert!(snapshot.get("b").is_none());
        assert_eq!(snapshot.get("c"), Some(&json!(3)));
    }

    /// A desired value that differs from the reported one shows up in the delta.
    #[tokio::test]
    async fn delta_when_desired_differs() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(obj(json!({ "fan_rpm": 1200 }))).await;
        twin.set_desired(obj(json!({ "fan_rpm": 1500 }))).await;

        let pending = twin.delta().await.unwrap();
        assert_eq!(pending.get("fan_rpm"), Some(&json!(1500)));
    }

    /// Matching reported and desired state yields no delta at all.
    #[tokio::test]
    async fn no_delta_when_in_sync() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(obj(json!({ "fan_rpm": 1200 }))).await;
        twin.set_desired(obj(json!({ "fan_rpm": 1200 }))).await;

        assert_eq!(twin.delta().await, None);
    }

    /// Only the keys whose desired value differs are part of the delta.
    #[tokio::test]
    async fn delta_only_includes_differing_keys() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(obj(json!({ "a": 1, "b": 2, "c": 3 }))).await;
        twin.set_desired(obj(json!({ "a": 1, "b": 99 }))).await;

        let pending = twin.delta().await.unwrap();
        assert!(pending.get("a").is_none());
        assert_eq!(pending.get("b"), Some(&json!(99)));
    }
}