wireband-edge 0.4.1

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! Device twin — maintains reported/desired state and emits sync/delta events.
//!
//! Mirrors the Azure IoT Hub / AWS IoT Core device shadow concept.
//! All state is stored locally; sync to/from the Wire.Band backend
//! is done by emitting [`EDGE_TWIN_SYNC`] and [`EDGE_TWIN_DELTA`] frames.

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use serde_json::{Map, Value};
use tokio::sync::RwLock;
use tracing::{debug, info};

use crate::client::WireBandClient;
use crate::frame;
use crate::symbols::{EDGE_TWIN_DELTA, EDGE_TWIN_SYNC};

fn unix_ts() -> f64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64()
}

/// Local device twin: tracks reported state and desired state, computes deltas.
///
/// # Example
///
/// ```ignore
/// use wireband_edge::agent::DeviceTwin;
///
/// let twin = DeviceTwin::new("factory-rpi4");
///
/// // Device reports its current state
/// twin.update_reported(serde_json::json!({ "temp": 72.3, "fan_rpm": 1200 })
///     .as_object().unwrap().clone()).await;
///
/// // Server sets desired state
/// twin.set_desired(serde_json::json!({ "fan_rpm": 1500 })
///     .as_object().unwrap().clone()).await;
///
/// // Emit delta (fan_rpm differs)
/// twin.emit_delta(&client).await;
/// ```
#[derive(Clone)]
pub struct DeviceTwin {
    device_id: String,
    reported:  Arc<RwLock<Map<String, Value>>>,
    desired:   Arc<RwLock<Map<String, Value>>>,
}

impl DeviceTwin {
    pub fn new(device_id: impl Into<String>) -> Self {
        Self {
            device_id: device_id.into(),
            reported:  Arc::new(RwLock::new(Map::new())),
            desired:   Arc::new(RwLock::new(Map::new())),
        }
    }

    // -----------------------------------------------------------------------
    // State mutations
    // -----------------------------------------------------------------------

    /// Merge a patch into reported state. Null values remove keys (JSON Merge Patch).
    pub async fn update_reported(&self, patch: Map<String, Value>) {
        let mut reported = self.reported.write().await;
        for (k, v) in patch {
            if v.is_null() {
                reported.remove(&k);
            } else {
                reported.insert(k, v);
            }
        }
        debug!(device_id = %self.device_id, keys = reported.len(), "Reported state updated");
    }

    /// Replace the desired state (as received from the backend/server).
    pub async fn set_desired(&self, desired: Map<String, Value>) {
        *self.desired.write().await = desired;
        debug!(device_id = %self.device_id, "Desired state updated");
    }

    // -----------------------------------------------------------------------
    // State reads
    // -----------------------------------------------------------------------

    pub async fn reported(&self) -> Map<String, Value> {
        self.reported.read().await.clone()
    }

    pub async fn desired(&self) -> Map<String, Value> {
        self.desired.read().await.clone()
    }

    /// Compute keys where desired ≠ reported. Returns `None` when in sync.
    pub async fn delta(&self) -> Option<Map<String, Value>> {
        let reported = self.reported.read().await;
        let desired  = self.desired.read().await;

        let delta: Map<String, Value> = desired
            .iter()
            .filter(|(k, dv)| reported.get(*k) != Some(dv))
            .map(|(k, dv)| (k.clone(), dv.clone()))
            .collect();

        if delta.is_empty() { None } else { Some(delta) }
    }

    // -----------------------------------------------------------------------
    // Wire.Band events
    // -----------------------------------------------------------------------

    /// Emit `EDGE_TWIN_SYNC` — full reported state snapshot to backend.
    pub async fn sync(&self, client: &WireBandClient) {
        let state = Value::Object(self.reported.read().await.clone());
        let topic = format!("twin/{}/sync", self.device_id);
        let encoded = frame::encode(EDGE_TWIN_SYNC, &topic, &state);
        client.buffer_event(topic, EDGE_TWIN_SYNC, encoded, unix_ts()).await;
        info!(device_id = %self.device_id, "Twin sync emitted");
    }

    /// Emit `EDGE_TWIN_DELTA` if reported and desired differ. Returns `true` if a delta existed.
    pub async fn emit_delta(&self, client: &WireBandClient) -> bool {
        match self.delta().await {
            None => false,
            Some(delta) => {
                let data  = Value::Object(delta);
                let topic = format!("twin/{}/delta", self.device_id);
                let encoded = frame::encode(EDGE_TWIN_DELTA, &topic, &data);
                client.buffer_event(topic, EDGE_TWIN_DELTA, encoded, unix_ts()).await;
                info!(device_id = %self.device_id, "Twin delta emitted");
                true
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    fn map(v: Value) -> Map<String, Value> {
        v.as_object().unwrap().clone()
    }

    #[tokio::test]
    async fn reported_merge_patch() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(map(json!({ "a": 1, "b": 2 }))).await;
        twin.update_reported(map(json!({ "b": null, "c": 3 }))).await;
        let state = twin.reported().await;
        assert_eq!(state.get("a"), Some(&json!(1)));
        assert_eq!(state.get("b"), None);  // removed by null patch
        assert_eq!(state.get("c"), Some(&json!(3)));
    }

    #[tokio::test]
    async fn delta_when_desired_differs() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(map(json!({ "fan_rpm": 1200 }))).await;
        twin.set_desired(map(json!({ "fan_rpm": 1500 }))).await;
        let delta = twin.delta().await.unwrap();
        assert_eq!(delta.get("fan_rpm"), Some(&json!(1500)));
    }

    #[tokio::test]
    async fn no_delta_when_in_sync() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(map(json!({ "fan_rpm": 1200 }))).await;
        twin.set_desired(map(json!({ "fan_rpm": 1200 }))).await;
        assert!(twin.delta().await.is_none());
    }

    #[tokio::test]
    async fn delta_only_includes_differing_keys() {
        let twin = DeviceTwin::new("test");
        twin.update_reported(map(json!({ "a": 1, "b": 2, "c": 3 }))).await;
        twin.set_desired(map(json!({ "a": 1, "b": 99 }))).await;
        let delta = twin.delta().await.unwrap();
        assert!(!delta.contains_key("a")); // same
        assert_eq!(delta.get("b"), Some(&json!(99))); // different
    }
}