wireband-edge 0.4.0

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! MQTT topic classifier and numeric delta filter.

use std::collections::HashMap;

use serde_json::Value;

use crate::symbols::*;

// ---------------------------------------------------------------------------
// Topic Classifier
// ---------------------------------------------------------------------------

/// Maps MQTT topic paths and payload keys to theta IoT symbols.
///
/// Resolution order (first match wins):
///   1. Exact topic overrides registered via [`TopicClassifier::register_topic`]
///      (matched verbatim: case-sensitive, no normalization)
///   2. Topic path segment keywords, rightmost segment checked first. Keywords
///      match as substrings of a segment; when several match, the longest wins.
///   3. Payload key heuristics, consulted only when no segment matched a keyword
///      (typically generic topics such as "data")
///   4. Fallback: [`SENSOR_POLL`]
///
/// Keyword and payload-key matching is ASCII case-insensitive.
#[derive(Debug, Clone)]
pub struct TopicClassifier {
    exact:          HashMap<String, u16>,
    topic_keywords: HashMap<String, u16>,
    payload_keys:   HashMap<String, u16>,
}

impl Default for TopicClassifier {
    fn default() -> Self {
        Self::new()
    }
}

impl TopicClassifier {
    pub fn new() -> Self {
        let topic_keywords = HashMap::from([
            // Sensors
            ("temp".into(),        SENSOR_TEMP),
            ("temperature".into(), SENSOR_TEMP),
            ("humidity".into(),    SENSOR_HUMIDITY),
            ("pressure".into(),    SENSOR_PRESSURE),
            ("voltage".into(),     SENSOR_VOLTAGE),
            ("current".into(),     SENSOR_CURRENT),
            ("power".into(),       SENSOR_POWER_DRAW),
            ("vibration".into(),   SENSOR_VIBRATION),
            ("noise".into(),       SENSOR_NOISE),
            ("altitude".into(),    SENSOR_ALTITUDE),
            ("sensor".into(),      SENSOR_POLL),
            ("threshold".into(),   SENSOR_THRESHOLD),
            ("alert".into(),       SENSOR_ALERT),
            // Actuators
            ("motor".into(),       ACTUATOR_MOTOR_START),
            ("servo".into(),       ACTUATOR_SERVO_SET),
            ("relay".into(),       ACTUATOR_RELAY_ON),
            ("solenoid".into(),    ACTUATOR_SOLENOID_OPEN),
            ("stepper".into(),     ACTUATOR_STEPPER_STEP),
            ("led".into(),         ACTUATOR_LED_SET),
            ("buzzer".into(),      ACTUATOR_BUZZER_ON),
            ("actuator".into(),    ACTUATOR_MOTOR_START),
            // GPIO
            ("gpio".into(),        GPIO_READ),
            ("pin".into(),         GPIO_READ),
            ("pwm".into(),         GPIO_PWM_SET),
            ("analog".into(),      GPIO_ANALOG_READ),
            ("adc".into(),         GPIO_ANALOG_READ),
            ("dac".into(),         GPIO_ANALOG_WRITE),
            // Edge lifecycle
            ("ota".into(),         EDGE_OTA_START),
            ("update".into(),      EDGE_OTA_START),
            ("watchdog".into(),    EDGE_WDT_KICK),
            ("wdt".into(),         EDGE_WDT_KICK),
            ("sleep".into(),       EDGE_SLEEP),
            ("wake".into(),        EDGE_WAKE),
            ("twin".into(),        EDGE_TWIN_SYNC),
            ("shadow".into(),      EDGE_TWIN_SYNC),
            ("provision".into(),   EDGE_PROVISION),
            ("reset".into(),       EDGE_RESET),
            ("inference".into(),   EDGE_INFER),
            // Metrics
            ("metrics".into(),     METRICS_GAUGE_SET),
            ("telemetry".into(),   METRICS_GAUGE_SET),
            ("counter".into(),     METRICS_COUNTER_INC),
            ("event".into(),       METRICS_EVENT_EMIT),
            ("log".into(),         METRICS_EVENT_EMIT),
            ("status".into(),      METRICS_GAUGE_CURRENT),
            // Protocol
            ("cmd".into(),         PROTO_MQTT_PUB),
            ("command".into(),     PROTO_MQTT_PUB),
        ]);

        let payload_keys = HashMap::from([
            ("temperature".into(), SENSOR_TEMP),
            ("temp".into(),        SENSOR_TEMP),
            ("humidity".into(),    SENSOR_HUMIDITY),
            ("pressure".into(),    SENSOR_PRESSURE),
            ("voltage".into(),     SENSOR_VOLTAGE),
            ("current".into(),     SENSOR_CURRENT),
            ("rpm".into(),         ACTUATOR_MOTOR_START),
            ("speed".into(),       ACTUATOR_MOTOR_START),
            ("angle".into(),       ACTUATOR_SERVO_SET),
            ("state".into(),       METRICS_GAUGE_CURRENT),
            ("value".into(),       METRICS_GAUGE_SET),
            ("count".into(),       METRICS_COUNTER_INC),
            ("level".into(),       METRICS_GAUGE_SET),
        ]);

        Self {
            exact: HashMap::new(),
            topic_keywords,
            payload_keys,
        }
    }

    /// Classify a topic + payload to the best-matching theta symbol.
    pub fn classify(&self, topic: &str, payload: &Value) -> u16 {
        // 1. Exact topic override
        if let Some(&sym) = self.exact.get(topic) {
            return sym;
        }

        // 2. Topic path keywords — rightmost segment first, longest match wins.
        // Longest-match is deterministic and prevents short keywords ("pin", "log")
        // from incorrectly firing on longer segments ("spindle", "analog").
        let normalized = topic.to_ascii_lowercase().replace('-', "_");
        for part in normalized.split('/').rev() {
            let best = self.topic_keywords.iter()
                .filter(|(kw, _)| part.contains(kw.as_str()))
                .max_by_key(|(kw, _)| kw.len());
            if let Some((_, &sym)) = best {
                return sym;
            }
        }

        // 3. Payload key heuristics
        if let Value::Object(map) = payload {
            for key in map.keys() {
                if let Some(&sym) = self.payload_keys.get(key.to_ascii_lowercase().as_str()) {
                    return sym;
                }
            }
        }

        // 4. Fallback
        SENSOR_POLL
    }

    /// Pin an exact MQTT topic to a specific theta symbol.
    pub fn register_topic(&mut self, topic: impl Into<String>, symbol: u16) {
        self.exact.insert(topic.into(), symbol);
    }

    /// Add a custom topic keyword → symbol mapping.
    pub fn register_keyword(&mut self, keyword: impl Into<String>, symbol: u16) {
        self.topic_keywords.insert(keyword.into().to_ascii_lowercase(), symbol);
    }
}

// ---------------------------------------------------------------------------
// Delta Filter
// ---------------------------------------------------------------------------

/// Suppresses numeric sensor readings that haven't changed enough.
///
/// Keeps, per topic, the last value that was forwarded, and compares each new
/// reading against it.
///
/// Only bare numeric payloads and single-key objects whose value is numeric
/// are filtered. Multi-key objects, strings and any other payloads always
/// pass through.
#[derive(Debug, Clone)]
pub struct DeltaFilter {
    last_values: HashMap<String, f64>,
    threshold:   f64,
}

impl DeltaFilter {
    /// Create a filter with the given change `threshold`.
    ///
    /// `threshold` is a fraction of the previously forwarded value
    /// (e.g. `0.05` = 5%). When that previous value is exactly zero, it is used
    /// as an absolute delta instead. A threshold that is zero, negative or NaN
    /// disables suppression entirely.
    pub fn new(threshold: f64) -> Self {
        Self { last_values: HashMap::new(), threshold }
    }

    /// Returns `true` if this reading should be suppressed (not forwarded).
    ///
    /// Only bare numbers and single-key objects whose value is numeric are
    /// filtered; any other payload returns `false`. The first reading on a
    /// topic always passes.
    ///
    /// The stored baseline is updated only when a reading is forwarded. Gradual
    /// drift is therefore measured against the last forwarded value, and once
    /// it accumulates past the threshold it gets through.
    pub fn should_suppress(&mut self, topic: &str, payload: &Value) -> bool {
        // NaN must be treated as "disabled" explicitly. Every comparison
        // against NaN is false, so it would otherwise suppress all readings
        // after the first.
        if self.threshold.is_nan() || self.threshold <= 0.0 {
            return false;
        }

        let value: Option<f64> = match payload {
            Value::Number(n) => n.as_f64(),
            Value::Object(map) if map.len() == 1 => {
                map.values().next().and_then(Value::as_f64)
            }
            _ => None,
        };

        let value = match value {
            Some(v) => v,
            None => return false, // non-scalar always passes
        };

        match self.last_values.get(topic).copied() {
            None => {
                self.last_values.insert(topic.to_string(), value);
                false // first reading always passes
            }
            Some(last) => {
                let delta = (value - last).abs();
                // A zero baseline has no meaningful relative change, so compare
                // the absolute delta instead. Both branches use `>=`, so a
                // change of exactly `threshold` is forwarded either way.
                let changed = if last == 0.0 {
                    delta >= self.threshold
                } else {
                    delta / last.abs() >= self.threshold
                };
                if changed {
                    self.last_values.insert(topic.to_string(), value);
                }
                !changed
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    fn clf() -> TopicClassifier { TopicClassifier::new() }
    /// An empty JSON object (`{}`), i.e. a payload with no keys to inspect.
    fn empty_obj() -> Value { Value::Object(Default::default()) }

    // -- Sensor keywords --

    #[test] fn temp()        { assert_eq!(clf().classify("sensors/temperature", &empty_obj()), SENSOR_TEMP); }
    #[test] fn temp_abbrev() { assert_eq!(clf().classify("devices/node/temp",   &empty_obj()), SENSOR_TEMP); }
    #[test] fn humidity()    { assert_eq!(clf().classify("home/humidity",        &empty_obj()), SENSOR_HUMIDITY); }
    #[test] fn pressure()    { assert_eq!(clf().classify("tank/pressure",        &empty_obj()), SENSOR_PRESSURE); }
    #[test] fn voltage()     { assert_eq!(clf().classify("battery/voltage",      &empty_obj()), SENSOR_VOLTAGE); }
    #[test] fn alert()       { assert_eq!(clf().classify("zone/alert",           &empty_obj()), SENSOR_ALERT); }
    #[test] fn threshold()   { assert_eq!(clf().classify("cfg/threshold",        &empty_obj()), SENSOR_THRESHOLD); }

    // -- Actuator keywords --

    #[test] fn motor()   { assert_eq!(clf().classify("spindle/motor", &empty_obj()), ACTUATOR_MOTOR_START); }
    #[test] fn relay()   { assert_eq!(clf().classify("ctrl/relay",    &empty_obj()), ACTUATOR_RELAY_ON); }
    #[test] fn led()     { assert_eq!(clf().classify("ui/led",        &empty_obj()), ACTUATOR_LED_SET); }

    // -- GPIO keywords --

    #[test] fn gpio() { assert_eq!(clf().classify("rpi/gpio/17", &empty_obj()), GPIO_READ); }
    #[test] fn pwm()  { assert_eq!(clf().classify("rpi/pwm/ch0", &empty_obj()), GPIO_PWM_SET); }

    // -- Edge lifecycle --

    #[test] fn ota()    { assert_eq!(clf().classify("edge/ota",      &empty_obj()), EDGE_OTA_START); }
    #[test] fn twin()   { assert_eq!(clf().classify("device/twin",   &empty_obj()), EDGE_TWIN_SYNC); }
    #[test] fn shadow() { assert_eq!(clf().classify("device/shadow", &empty_obj()), EDGE_TWIN_SYNC); }

    // -- Metrics --

    #[test] fn metrics()   { assert_eq!(clf().classify("app/metrics",    &empty_obj()), METRICS_GAUGE_SET); }
    #[test] fn telemetry() { assert_eq!(clf().classify("node/telemetry", &empty_obj()), METRICS_GAUGE_SET); }
    #[test] fn counter()   { assert_eq!(clf().classify("req/counter",    &empty_obj()), METRICS_COUNTER_INC); }
    #[test] fn status()    { assert_eq!(clf().classify("device/status",  &empty_obj()), METRICS_GAUGE_CURRENT); }
    #[test] fn cmd()       { assert_eq!(clf().classify("device/cmd",     &empty_obj()), PROTO_MQTT_PUB); }

    // -- Rightmost segment wins --

    #[test]
    fn rightmost_wins() {
        // path "motor/temperature" → rightmost is "temperature" → SENSOR_TEMP
        assert_eq!(clf().classify("motor/temperature", &empty_obj()), SENSOR_TEMP);
    }

    // -- Longest keyword wins within a segment --

    #[test]
    fn longest_keyword_wins() {
        // "analog" contains both "analog" and "log"; the longer keyword wins.
        assert_eq!(clf().classify("io/analog", &empty_obj()), GPIO_ANALOG_READ);
    }

    // -- Payload key fallback --

    #[test]
    fn payload_temp() {
        let p = serde_json::json!({ "temperature": 22.5 });
        assert_eq!(clf().classify("data", &p), SENSOR_TEMP);
    }

    #[test]
    fn payload_rpm() {
        let p = serde_json::json!({ "rpm": 1200 });
        assert_eq!(clf().classify("data", &p), ACTUATOR_MOTOR_START);
    }

    #[test]
    fn payload_key_case_insensitive() {
        let p = serde_json::json!({ "Temperature": 22.5 });
        assert_eq!(clf().classify("data", &p), SENSOR_TEMP);
    }

    #[test]
    fn topic_keyword_beats_payload() {
        // Payload heuristics are only consulted when no topic segment matched.
        let p = serde_json::json!({ "temperature": 22.5 });
        assert_eq!(clf().classify("home/humidity", &p), SENSOR_HUMIDITY);
    }

    // -- Fallback --

    #[test]
    fn unknown_falls_back_to_poll() {
        let p = serde_json::json!({ "blob": "abc" });
        assert_eq!(clf().classify("xyzzy/unknown", &p), SENSOR_POLL);
    }

    // -- Overrides --

    #[test]
    fn exact_override() {
        let mut c = clf();
        c.register_topic("sensors/temp", SENSOR_ALERT);
        assert_eq!(c.classify("sensors/temp", &empty_obj()), SENSOR_ALERT);
    }

    #[test]
    fn exact_override_is_case_sensitive() {
        // Exact overrides are matched verbatim; a differently-cased topic falls
        // through to keyword matching.
        let mut c = clf();
        c.register_topic("sensors/temp", SENSOR_ALERT);
        assert_eq!(c.classify("SENSORS/TEMP", &empty_obj()), SENSOR_TEMP);
    }

    #[test]
    fn keyword_override() {
        // "gripper" contains no pre-existing keyword substrings
        let mut c = clf();
        c.register_keyword("gripper", ACTUATOR_SERVO_SET);
        assert_eq!(c.classify("machine/gripper", &empty_obj()), ACTUATOR_SERVO_SET);
    }

    // -- Case insensitivity --

    #[test]
    fn case_insensitive() {
        assert_eq!(clf().classify("SENSORS/TEMPERATURE", &empty_obj()), SENSOR_TEMP);
    }

    // -----------------------------------------------------------------------
    // Delta filter tests
    // -----------------------------------------------------------------------

    fn filter(threshold: f64) -> DeltaFilter { DeltaFilter::new(threshold) }

    #[test]
    fn first_reading_passes() {
        let mut f = filter(0.05);
        assert!(!f.should_suppress("t", &serde_json::json!(100.0)));
    }

    #[test]
    fn unchanged_suppressed() {
        let mut f = filter(0.05);
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(f.should_suppress("t", &serde_json::json!(100.0)));
    }

    #[test]
    fn small_change_suppressed() {
        let mut f = filter(0.05); // 5% threshold
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(f.should_suppress("t", &serde_json::json!(102.0))); // 2% < 5%
    }

    #[test]
    fn large_change_passes() {
        let mut f = filter(0.05);
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(!f.should_suppress("t", &serde_json::json!(110.0))); // 10% >= 5%
    }

    #[test]
    fn large_decrease_passes() {
        let mut f = filter(0.05);
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(!f.should_suppress("t", &serde_json::json!(90.0))); // -10%
    }

    #[test]
    fn drift_measured_from_last_forwarded() {
        // The baseline only moves when a reading is forwarded, so slow drift
        // accumulates until it crosses the threshold.
        let mut f = filter(0.05);
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(f.should_suppress("t", &serde_json::json!(103.0)));  // 3% vs 100
        assert!(!f.should_suppress("t", &serde_json::json!(106.0))); // 6% vs 100
    }

    #[test]
    fn zero_baseline_uses_absolute_delta() {
        let mut f = filter(0.05);
        f.should_suppress("t", &serde_json::json!(0.0));
        assert!(f.should_suppress("t", &serde_json::json!(0.01)));  // |0.01| < 0.05
        assert!(!f.should_suppress("t", &serde_json::json!(1.0)));  // |1.0| > 0.05
    }

    #[test]
    fn zero_threshold_passes_all() {
        let mut f = filter(0.0);
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(!f.should_suppress("t", &serde_json::json!(100.0)));
    }

    #[test]
    fn negative_threshold_passes_all() {
        let mut f = filter(-1.0);
        f.should_suppress("t", &serde_json::json!(100.0));
        assert!(!f.should_suppress("t", &serde_json::json!(100.0)));
    }

    #[test]
    fn single_key_numeric_dict_filtered() {
        let mut f = filter(0.05);
        assert!(!f.should_suppress("t", &serde_json::json!({ "v": 100 })));
        assert!(f.should_suppress("t", &serde_json::json!({ "v": 101 })));
    }

    #[test]
    fn single_key_non_numeric_dict_passes() {
        let mut f = filter(0.05);
        let p = serde_json::json!({ "v": "on" });
        assert!(!f.should_suppress("t", &p));
        assert!(!f.should_suppress("t", &p));
    }

    #[test]
    fn multi_key_dict_always_passes() {
        let mut f = filter(0.05);
        let p = serde_json::json!({ "a": 1, "b": 2 });
        assert!(!f.should_suppress("t", &p));
        assert!(!f.should_suppress("t", &p));
    }

    #[test]
    fn string_payload_always_passes() {
        let mut f = filter(0.05);
        let p = Value::String("hello".into());
        assert!(!f.should_suppress("t", &p));
        assert!(!f.should_suppress("t", &p));
    }

    #[test]
    fn independent_topics() {
        let mut f = filter(0.05);
        f.should_suppress("a", &serde_json::json!(100.0));
        f.should_suppress("b", &serde_json::json!(200.0));
        assert!(f.should_suppress("a",  &serde_json::json!(101.0))); // 1% — suppressed
        assert!(!f.should_suppress("b", &serde_json::json!(220.0))); // 10% — passes
    }
}