use std::collections::HashMap;
use serde_json::Value;
use crate::symbols::*;
/// Maps a message topic (and, as a fallback, its JSON payload) to a `u16`
/// symbol code.
///
/// Three lookup tables are consulted in order by `classify`: exact topic
/// overrides, keywords searched for inside topic segments, and payload
/// object keys.
#[derive(Debug, Clone)]
pub struct TopicClassifier {
    /// Exact topic string -> symbol. Compared against the topic as given
    /// (no case folding), and checked before anything else.
    exact: HashMap<String, u16>,
    /// Lower-case keyword -> symbol, matched as a substring of a topic segment.
    topic_keywords: HashMap<String, u16>,
    /// Lower-case payload object key -> symbol, used when no topic keyword matches.
    payload_keys: HashMap<String, u16>,
}
impl Default for TopicClassifier {
fn default() -> Self {
Self::new()
}
}
impl TopicClassifier {
    /// Builds a classifier pre-loaded with the built-in topic-keyword and
    /// payload-key tables. The exact-topic table starts empty and is filled
    /// through [`TopicClassifier::register_topic`].
    pub fn new() -> Self {
        let topic_keywords = HashMap::from([
            ("temp".into(), SENSOR_TEMP),
            ("temperature".into(), SENSOR_TEMP),
            ("humidity".into(), SENSOR_HUMIDITY),
            ("pressure".into(), SENSOR_PRESSURE),
            ("voltage".into(), SENSOR_VOLTAGE),
            ("current".into(), SENSOR_CURRENT),
            ("power".into(), SENSOR_POWER_DRAW),
            ("vibration".into(), SENSOR_VIBRATION),
            ("noise".into(), SENSOR_NOISE),
            ("altitude".into(), SENSOR_ALTITUDE),
            ("sensor".into(), SENSOR_POLL),
            ("threshold".into(), SENSOR_THRESHOLD),
            ("alert".into(), SENSOR_ALERT),
            ("motor".into(), ACTUATOR_MOTOR_START),
            ("servo".into(), ACTUATOR_SERVO_SET),
            ("relay".into(), ACTUATOR_RELAY_ON),
            ("solenoid".into(), ACTUATOR_SOLENOID_OPEN),
            ("stepper".into(), ACTUATOR_STEPPER_STEP),
            ("led".into(), ACTUATOR_LED_SET),
            ("buzzer".into(), ACTUATOR_BUZZER_ON),
            ("actuator".into(), ACTUATOR_MOTOR_START),
            ("gpio".into(), GPIO_READ),
            ("pin".into(), GPIO_READ),
            ("pwm".into(), GPIO_PWM_SET),
            ("analog".into(), GPIO_ANALOG_READ),
            ("adc".into(), GPIO_ANALOG_READ),
            ("dac".into(), GPIO_ANALOG_WRITE),
            ("ota".into(), EDGE_OTA_START),
            ("update".into(), EDGE_OTA_START),
            ("watchdog".into(), EDGE_WDT_KICK),
            ("wdt".into(), EDGE_WDT_KICK),
            ("sleep".into(), EDGE_SLEEP),
            ("wake".into(), EDGE_WAKE),
            ("twin".into(), EDGE_TWIN_SYNC),
            ("shadow".into(), EDGE_TWIN_SYNC),
            ("provision".into(), EDGE_PROVISION),
            ("reset".into(), EDGE_RESET),
            ("inference".into(), EDGE_INFER),
            ("metrics".into(), METRICS_GAUGE_SET),
            ("telemetry".into(), METRICS_GAUGE_SET),
            ("counter".into(), METRICS_COUNTER_INC),
            ("event".into(), METRICS_EVENT_EMIT),
            ("log".into(), METRICS_EVENT_EMIT),
            ("status".into(), METRICS_GAUGE_CURRENT),
            ("cmd".into(), PROTO_MQTT_PUB),
            ("command".into(), PROTO_MQTT_PUB),
        ]);
        let payload_keys = HashMap::from([
            ("temperature".into(), SENSOR_TEMP),
            ("temp".into(), SENSOR_TEMP),
            ("humidity".into(), SENSOR_HUMIDITY),
            ("pressure".into(), SENSOR_PRESSURE),
            ("voltage".into(), SENSOR_VOLTAGE),
            ("current".into(), SENSOR_CURRENT),
            ("rpm".into(), ACTUATOR_MOTOR_START),
            ("speed".into(), ACTUATOR_MOTOR_START),
            ("angle".into(), ACTUATOR_SERVO_SET),
            ("state".into(), METRICS_GAUGE_CURRENT),
            ("value".into(), METRICS_GAUGE_SET),
            ("count".into(), METRICS_COUNTER_INC),
            ("level".into(), METRICS_GAUGE_SET),
        ]);
        Self {
            exact: HashMap::new(),
            topic_keywords,
            payload_keys,
        }
    }

    /// Canonical form shared by topics and registered keywords: ASCII
    /// lower-case with `-` rewritten to `_`, so both sides of the substring
    /// match agree on spelling.
    fn normalize_keyword(raw: &str) -> String {
        raw.to_ascii_lowercase().replace('-', "_")
    }

    /// Returns the symbol for `topic`, falling back to `payload` and finally
    /// to `SENSOR_POLL`.
    ///
    /// Resolution order:
    /// 1. An exact match on the topic exactly as given.
    /// 2. Keyword search over the normalised topic, visiting `/`-separated
    ///    segments from right to left. Within a segment the longest keyword
    ///    contained in it wins; equally long keywords are tie-broken by
    ///    picking the lexicographically smallest one, so the result does not
    ///    depend on hash-map iteration order.
    /// 3. If `payload` is a JSON object, the first key (in the map's
    ///    iteration order) whose lower-cased form is a known payload key.
    /// 4. `SENSOR_POLL`.
    pub fn classify(&self, topic: &str, payload: &Value) -> u16 {
        if let Some(&sym) = self.exact.get(topic) {
            return sym;
        }

        let normalized = Self::normalize_keyword(topic);
        for part in normalized.split('/').rev() {
            let best = self
                .topic_keywords
                .iter()
                .filter(|(kw, _)| part.contains(kw.as_str()))
                // Longest keyword first; the reversed string comparison makes
                // the smallest keyword the maximum among equal lengths. Keys
                // are unique, so the ordering is total and the pick is stable.
                .max_by(|(a, _), (b, _)| a.len().cmp(&b.len()).then_with(|| b.cmp(a)));
            if let Some((_, &sym)) = best {
                return sym;
            }
        }

        if let Value::Object(map) = payload {
            for key in map.keys() {
                if let Some(&sym) = self.payload_keys.get(key.to_ascii_lowercase().as_str()) {
                    return sym;
                }
            }
        }

        SENSOR_POLL
    }

    /// Registers (or replaces) an exact-topic override. The topic is stored
    /// verbatim and only matches a topic passed to `classify` with the
    /// identical spelling and case.
    pub fn register_topic(&mut self, topic: impl Into<String>, symbol: u16) {
        self.exact.insert(topic.into(), symbol);
    }

    /// Registers (or replaces) a topic keyword.
    ///
    /// The keyword is normalised the same way topics are (lower-case, `-`
    /// becomes `_`); without that a keyword containing `-` could never match,
    /// because `classify` rewrites every `-` in the topic before searching.
    /// Note that an empty keyword is contained in every segment.
    pub fn register_keyword(&mut self, keyword: impl Into<String>, symbol: u16) {
        let keyword = Self::normalize_keyword(&keyword.into());
        self.topic_keywords.insert(keyword, symbol);
    }
}
/// Per-topic change filter: remembers the last numeric reading that was let
/// through for each topic and compares new readings against it.
#[derive(Debug, Clone)]
pub struct DeltaFilter {
    /// Topic -> last reading that was not suppressed (the comparison baseline).
    last_values: HashMap<String, f64>,
    /// Minimum change required for a reading to pass; `should_suppress`
    /// treats values `<= 0.0` as "filtering disabled".
    threshold: f64,
}
impl DeltaFilter {
    /// Creates a filter with no recorded readings.
    ///
    /// `threshold` is the relative change (e.g. `0.05` for 5 %) a reading
    /// must reach, compared with the last reading that passed, in order to
    /// pass itself. A threshold that is zero, negative or NaN disables
    /// filtering.
    pub fn new(threshold: f64) -> Self {
        Self {
            last_values: HashMap::new(),
            threshold,
        }
    }

    /// Returns `true` when the reading in `payload` is close enough to the
    /// last passed reading for `topic` that it should be dropped.
    ///
    /// * Only a bare JSON number, or an object with exactly one entry whose
    ///   value is a number, is considered; every other payload passes
    ///   (`false`) and leaves the stored baseline untouched.
    /// * The first numeric reading for a topic always passes and becomes the
    ///   baseline.
    /// * The baseline is updated only when a reading passes, so a slow drift
    ///   is measured against the last emitted value rather than the previous
    ///   sample.
    pub fn should_suppress(&mut self, topic: &str, payload: &Value) -> bool {
        let threshold = self.threshold;
        // NaN fails every ordered comparison: it would slip past a plain
        // `<= 0.0` guard and then mark every later reading as "unchanged",
        // suppressing the topic forever. Treat it as "filter disabled".
        if threshold.is_nan() || threshold <= 0.0 {
            return false;
        }

        let reading = match payload {
            Value::Number(n) => n.as_f64(),
            Value::Object(map) if map.len() == 1 => map.values().next().and_then(Value::as_f64),
            _ => None,
        };
        let value = match reading {
            Some(v) => v,
            None => return false,
        };

        // `get_mut` lets the baseline be updated in place, avoiding a second
        // hash lookup and a fresh `String` allocation on every change.
        match self.last_values.get_mut(topic) {
            None => {
                self.last_values.insert(topic.to_owned(), value);
                false
            }
            Some(last) => {
                let changed = if *last == 0.0 {
                    // A relative change is undefined against a zero baseline,
                    // so compare the absolute value with the threshold.
                    // NOTE(review): strict `>` here vs `>=` below — confirm
                    // the boundary asymmetry is intended.
                    value.abs() > threshold
                } else {
                    (value - *last).abs() / last.abs() >= threshold
                };
                if changed {
                    *last = value;
                }
                !changed
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- TopicClassifier -------------------------------------------------

    /// Classifier carrying only the built-in tables.
    fn classifier() -> TopicClassifier {
        TopicClassifier::new()
    }

    /// An empty JSON object: a payload that can never influence the result.
    fn empty_object() -> Value {
        Value::Object(Default::default())
    }

    /// Classifies `topic` with a fresh classifier and an empty payload.
    fn classify_topic(topic: &str) -> u16 {
        classifier().classify(topic, &empty_object())
    }

    #[test]
    fn temp() {
        assert_eq!(classify_topic("sensors/temperature"), SENSOR_TEMP);
    }

    #[test]
    fn temp_abbrev() {
        assert_eq!(classify_topic("devices/node/temp"), SENSOR_TEMP);
    }

    #[test]
    fn humidity() {
        assert_eq!(classify_topic("home/humidity"), SENSOR_HUMIDITY);
    }

    #[test]
    fn pressure() {
        assert_eq!(classify_topic("tank/pressure"), SENSOR_PRESSURE);
    }

    #[test]
    fn voltage() {
        assert_eq!(classify_topic("battery/voltage"), SENSOR_VOLTAGE);
    }

    #[test]
    fn alert() {
        assert_eq!(classify_topic("zone/alert"), SENSOR_ALERT);
    }

    #[test]
    fn threshold() {
        assert_eq!(classify_topic("cfg/threshold"), SENSOR_THRESHOLD);
    }

    #[test]
    fn motor() {
        assert_eq!(classify_topic("spindle/motor"), ACTUATOR_MOTOR_START);
    }

    #[test]
    fn relay() {
        assert_eq!(classify_topic("ctrl/relay"), ACTUATOR_RELAY_ON);
    }

    #[test]
    fn led() {
        assert_eq!(classify_topic("ui/led"), ACTUATOR_LED_SET);
    }

    #[test]
    fn gpio() {
        assert_eq!(classify_topic("rpi/gpio/17"), GPIO_READ);
    }

    #[test]
    fn pwm() {
        assert_eq!(classify_topic("rpi/pwm/ch0"), GPIO_PWM_SET);
    }

    #[test]
    fn ota() {
        assert_eq!(classify_topic("edge/ota"), EDGE_OTA_START);
    }

    #[test]
    fn twin() {
        assert_eq!(classify_topic("device/twin"), EDGE_TWIN_SYNC);
    }

    #[test]
    fn shadow() {
        assert_eq!(classify_topic("device/shadow"), EDGE_TWIN_SYNC);
    }

    #[test]
    fn metrics() {
        assert_eq!(classify_topic("app/metrics"), METRICS_GAUGE_SET);
    }

    #[test]
    fn telemetry() {
        assert_eq!(classify_topic("node/telemetry"), METRICS_GAUGE_SET);
    }

    #[test]
    fn counter() {
        assert_eq!(classify_topic("req/counter"), METRICS_COUNTER_INC);
    }

    #[test]
    fn status() {
        assert_eq!(classify_topic("device/status"), METRICS_GAUGE_CURRENT);
    }

    #[test]
    fn cmd() {
        assert_eq!(classify_topic("device/cmd"), PROTO_MQTT_PUB);
    }

    /// Segments are scanned right to left, so the last matching one decides.
    #[test]
    fn rightmost_wins() {
        assert_eq!(classify_topic("motor/temperature"), SENSOR_TEMP);
    }

    #[test]
    fn payload_temp() {
        let payload = serde_json::json!({ "temperature": 22.5 });
        assert_eq!(classifier().classify("data", &payload), SENSOR_TEMP);
    }

    #[test]
    fn payload_rpm() {
        let payload = serde_json::json!({ "rpm": 1200 });
        assert_eq!(classifier().classify("data", &payload), ACTUATOR_MOTOR_START);
    }

    #[test]
    fn unknown_falls_back_to_poll() {
        let payload = serde_json::json!({ "blob": "abc" });
        assert_eq!(classifier().classify("xyzzy/unknown", &payload), SENSOR_POLL);
    }

    #[test]
    fn exact_override() {
        let mut custom = classifier();
        custom.register_topic("sensors/temp", SENSOR_ALERT);
        assert_eq!(custom.classify("sensors/temp", &empty_object()), SENSOR_ALERT);
    }

    #[test]
    fn keyword_override() {
        let mut custom = classifier();
        custom.register_keyword("gripper", ACTUATOR_SERVO_SET);
        assert_eq!(
            custom.classify("machine/gripper", &empty_object()),
            ACTUATOR_SERVO_SET
        );
    }

    #[test]
    fn case_insensitive() {
        assert_eq!(classify_topic("SENSORS/TEMPERATURE"), SENSOR_TEMP);
    }

    // ---- DeltaFilter -----------------------------------------------------

    /// Filter with the given threshold and no history.
    fn delta_filter(threshold: f64) -> DeltaFilter {
        DeltaFilter::new(threshold)
    }

    /// Bare numeric JSON payload.
    fn reading(value: f64) -> Value {
        serde_json::json!(value)
    }

    #[test]
    fn first_reading_passes() {
        let mut f = delta_filter(0.05);
        assert!(!f.should_suppress("t", &reading(100.0)));
    }

    #[test]
    fn unchanged_suppressed() {
        let mut f = delta_filter(0.05);
        f.should_suppress("t", &reading(100.0));
        assert!(f.should_suppress("t", &reading(100.0)));
    }

    #[test]
    fn small_change_suppressed() {
        let mut f = delta_filter(0.05);
        f.should_suppress("t", &reading(100.0));
        assert!(f.should_suppress("t", &reading(102.0)));
    }

    #[test]
    fn large_change_passes() {
        let mut f = delta_filter(0.05);
        f.should_suppress("t", &reading(100.0));
        assert!(!f.should_suppress("t", &reading(110.0)));
    }

    #[test]
    fn zero_threshold_passes_all() {
        let mut f = delta_filter(0.0);
        f.should_suppress("t", &reading(100.0));
        assert!(!f.should_suppress("t", &reading(100.0)));
    }

    #[test]
    fn multi_key_dict_always_passes() {
        let mut f = delta_filter(0.05);
        let payload = serde_json::json!({ "a": 1, "b": 2 });
        assert!(!f.should_suppress("t", &payload));
        assert!(!f.should_suppress("t", &payload));
    }

    #[test]
    fn string_payload_always_passes() {
        let mut f = delta_filter(0.05);
        let payload = Value::String("hello".into());
        assert!(!f.should_suppress("t", &payload));
        assert!(!f.should_suppress("t", &payload));
    }

    #[test]
    fn independent_topics() {
        let mut f = delta_filter(0.05);
        f.should_suppress("a", &reading(100.0));
        f.should_suppress("b", &reading(200.0));
        assert!(f.should_suppress("a", &reading(101.0)));
        assert!(!f.should_suppress("b", &reading(220.0)));
    }
}