mod sensor_parser;
mod state_parser;
pub use sensor_parser::{EnergyReading, SensorData, StatusSnsResponse};
pub use state_parser::TelemetryState;
use crate::error::ParseError;
use crate::state::StateChange;
/// A parsed MQTT message, tagged by the kind of topic it arrived on.
///
/// Every variant carries `device_topic`, which `parse_telemetry` takes from
/// the second `/`-separated segment of the MQTT topic.
#[derive(Debug, Clone)]
pub enum TelemetryMessage {
/// Built from a `tele/<device>/STATE` message.
State {
/// Second segment of the MQTT topic.
device_topic: String,
/// Payload as decoded by `state_parser::parse_state`.
state: TelemetryState,
},
/// Built from a `tele/<device>/SENSOR` message.
Sensor {
/// Second segment of the MQTT topic.
device_topic: String,
/// Payload as decoded by `sensor_parser::parse_sensor`.
data: SensorData,
},
/// Built from a `tele/<device>/LWT` message.
LastWill {
/// Second segment of the MQTT topic.
device_topic: String,
/// `true` when the payload equals `online` (ASCII case-insensitive);
/// any other payload is recorded as `false`.
online: bool,
},
/// Built from a `stat/<device>/RESULT` message.
Result {
/// Second segment of the MQTT topic.
device_topic: String,
/// The payload text, stored unparsed.
payload: String,
},
}
impl TelemetryMessage {
    /// Returns the device topic stored in this message, whatever its variant.
    #[must_use]
    pub fn device_topic(&self) -> &str {
        match self {
            Self::State { device_topic, .. }
            | Self::Sensor { device_topic, .. }
            | Self::LastWill { device_topic, .. }
            | Self::Result { device_topic, .. } => device_topic,
        }
    }

    /// Converts this message into a list of state changes.
    ///
    /// `State` and `Sensor` delegate to the `to_state_changes` method of their
    /// payload; `LastWill` and `Result` always produce an empty vector.
    #[must_use]
    pub fn to_state_changes(&self) -> Vec<StateChange> {
        match self {
            Self::State { state, .. } => state.to_state_changes(),
            Self::Sensor { data, .. } => data.to_state_changes(),
            Self::LastWill { .. } | Self::Result { .. } => Vec::new(),
        }
    }

    /// Extracts system information from this message, if it has any.
    ///
    /// Only `State` messages are consulted. Returns `None` for every other
    /// variant, and also when the extracted info reports `is_empty()`.
    #[must_use]
    pub fn to_system_info(&self) -> Option<crate::state::SystemInfo> {
        match self {
            Self::State { state, .. } => {
                let info = state.to_system_info();
                if info.is_empty() {
                    None
                } else {
                    Some(info)
                }
            }
            // Variants are listed explicitly instead of using `_` so that
            // adding a new variant produces a compile error here rather than
            // silently returning `None`.
            Self::Sensor { .. } | Self::LastWill { .. } | Self::Result { .. } => None,
        }
    }

    /// Returns `true` only for a `LastWill` message whose `online` flag is set.
    #[must_use]
    pub fn is_online(&self) -> bool {
        matches!(self, Self::LastWill { online: true, .. })
    }

    /// Returns `true` only for a `LastWill` message whose `online` flag is
    /// cleared. Non-`LastWill` messages are neither online nor offline.
    #[must_use]
    pub fn is_offline(&self) -> bool {
        matches!(self, Self::LastWill { online: false, .. })
    }
}
/// Parses an MQTT `topic`/`payload` pair into a [`TelemetryMessage`].
///
/// The topic is split on `/`. The first segment is the prefix, the second is
/// the device topic and the third is the suffix; any further segments are
/// ignored. Recognised prefix/suffix pairs are `tele/STATE`, `tele/SENSOR`,
/// `tele/LWT` and `stat/RESULT`.
///
/// For `tele/LWT` the device is considered online only when the payload
/// equals `online` (ASCII case-insensitive); every other payload yields
/// `online: false`.
///
/// # Errors
///
/// Returns `ParseError::UnexpectedFormat` when the topic has fewer than three
/// segments or the prefix/suffix pair is not recognised. Failures from the
/// state and sensor payload parsers are propagated with `?`.
pub fn parse_telemetry(topic: &str, payload: &str) -> Result<TelemetryMessage, ParseError> {
    let mut segments = topic.split('/');
    let (prefix, device, suffix) = match (segments.next(), segments.next(), segments.next()) {
        (Some(first), Some(second), Some(third)) => (first, second, third),
        _ => {
            return Err(ParseError::UnexpectedFormat(format!(
                "Invalid topic format: {topic}"
            )));
        }
    };
    let device_topic = device.to_string();

    let message = match (prefix, suffix) {
        ("tele", "STATE") => {
            let state = state_parser::parse_state(payload)?;
            TelemetryMessage::State {
                device_topic,
                state,
            }
        }
        ("tele", "SENSOR") => {
            let data = sensor_parser::parse_sensor(payload)?;
            TelemetryMessage::Sensor { device_topic, data }
        }
        ("tele", "LWT") => TelemetryMessage::LastWill {
            device_topic,
            online: payload.eq_ignore_ascii_case("online"),
        },
        ("stat", "RESULT") => TelemetryMessage::Result {
            device_topic,
            payload: payload.to_string(),
        },
        _ => {
            return Err(ParseError::UnexpectedFormat(format!(
                "Unknown topic type: {prefix}/{suffix}"
            )));
        }
    };
    Ok(message)
}
/// Returns the second `/`-separated segment of `mqtt_topic`.
///
/// Yields `None` when the topic has fewer than three segments. Segments
/// beyond the third are ignored, and an empty second segment is returned
/// as `Some("")`.
#[must_use]
pub fn extract_device_topic(mqtt_topic: &str) -> Option<&str> {
    let mut segments = mqtt_topic.split('/');
    // `nth(1)` skips the prefix and hands back the device segment.
    let device = segments.nth(1)?;
    // A third segment must exist for the topic to count as well-formed.
    segments.next().map(|_suffix| device)
}
#[cfg(test)]
mod tests {
    use super::*;

    // A `tele/<device>/STATE` topic produces the `State` variant.
    #[test]
    fn parse_state_message() {
        let payload = r#"{"POWER":"ON"}"#;
        let parsed = parse_telemetry("tele/tasmota_bulb/STATE", payload).unwrap();
        assert_eq!(parsed.device_topic(), "tasmota_bulb");
        assert!(matches!(parsed, TelemetryMessage::State { .. }));
    }

    // A `tele/<device>/SENSOR` topic produces the `Sensor` variant.
    #[test]
    fn parse_sensor_message() {
        let payload = r#"{"Time":"2024-01-01T12:00:00","ENERGY":{"Power":100}}"#;
        let parsed = parse_telemetry("tele/smart_plug/SENSOR", payload).unwrap();
        assert_eq!(parsed.device_topic(), "smart_plug");
        assert!(matches!(parsed, TelemetryMessage::Sensor { .. }));
    }

    // An `Online` last-will payload marks the device as online only.
    #[test]
    fn parse_lwt_online() {
        let parsed = parse_telemetry("tele/device/LWT", "Online").unwrap();
        assert_eq!(parsed.device_topic(), "device");
        assert!(parsed.is_online());
        assert!(!parsed.is_offline());
    }

    // An `Offline` last-will payload marks the device as offline only.
    #[test]
    fn parse_lwt_offline() {
        let parsed = parse_telemetry("tele/device/LWT", "Offline").unwrap();
        assert!(parsed.is_offline());
        assert!(!parsed.is_online());
    }

    // A `stat/<device>/RESULT` topic produces the `Result` variant.
    #[test]
    fn parse_result_message() {
        let parsed = parse_telemetry("stat/device/RESULT", r#"{"POWER":"ON"}"#).unwrap();
        assert!(matches!(parsed, TelemetryMessage::Result { .. }));
    }

    // The device segment is returned for any three-segment topic.
    #[test]
    fn extract_device_topic_valid() {
        let cases = [
            ("tele/tasmota_bulb/STATE", "tasmota_bulb"),
            ("stat/my_device/RESULT", "my_device"),
            ("cmnd/switch/POWER", "switch"),
        ];
        for (topic, expected) in cases {
            assert_eq!(extract_device_topic(topic), Some(expected));
        }
    }

    // Topics with fewer than three segments have no device segment.
    #[test]
    fn extract_device_topic_invalid() {
        for topic in ["invalid", "only/two"] {
            assert_eq!(extract_device_topic(topic), None);
        }
    }

    // A topic without separators is rejected.
    #[test]
    fn invalid_topic_format() {
        assert!(parse_telemetry("invalid", "{}").is_err());
    }

    // A well-shaped topic with an unrecognised prefix/suffix is rejected.
    #[test]
    fn unknown_topic_type() {
        assert!(parse_telemetry("foo/device/BAR", "{}").is_err());
    }

    // Uptime and Wi-Fi signal from a STATE payload surface as system info.
    #[test]
    fn telemetry_message_to_system_info_state() {
        let payload = r#"{"UptimeSec":172800,"Wifi":{"Signal":-55}}"#;
        let parsed = parse_telemetry("tele/device/STATE", payload).unwrap();
        let maybe_info = parsed.to_system_info();
        assert!(maybe_info.is_some());
        let info = maybe_info.unwrap();
        assert_eq!(
            info.uptime(),
            Some(std::time::Duration::from_secs(172800))
        );
        assert_eq!(info.wifi_rssi(), Some(-55));
    }

    // Sensor messages never carry system info.
    #[test]
    fn telemetry_message_to_system_info_none_for_sensor() {
        let parsed =
            parse_telemetry("tele/device/SENSOR", r#"{"ENERGY":{"Power":100}}"#).unwrap();
        assert!(parsed.to_system_info().is_none());
    }

    // Last-will messages never carry system info.
    #[test]
    fn telemetry_message_to_system_info_none_for_lwt() {
        let parsed = parse_telemetry("tele/device/LWT", "Online").unwrap();
        assert!(parsed.to_system_info().is_none());
    }

    // A STATE payload without system fields yields no system info.
    #[test]
    fn telemetry_message_to_system_info_none_when_empty() {
        let parsed = parse_telemetry("tele/device/STATE", r#"{"POWER":"ON"}"#).unwrap();
        assert!(parsed.to_system_info().is_none());
    }
}