use crate::address::{PayloadFormat, TopicAddress, ValueType};
use crate::error::{MqttError, MqttResult};
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedValue {
Int32(i32),
Float64(f64),
UInt32(u32),
String(String),
Int32Array(Vec<i32>),
Float64Array(Vec<f64>),
}
pub fn decode_payload(raw: &str, addr: &TopicAddress) -> MqttResult<DecodedValue> {
match addr.format {
PayloadFormat::Flat => decode_flat(raw, addr.value_type),
PayloadFormat::Json => {
let field = addr.json_field.as_deref().ok_or_else(|| {
MqttError::InvalidAddress("JSON format requires a field path".into())
})?;
decode_json(raw, addr.value_type, field)
}
}
}
pub fn encode_payload(value: &DecodedValue, addr: &TopicAddress) -> String {
let value = if addr.normalize_on_off {
normalize_value(value)
} else {
value.clone()
};
match addr.format {
PayloadFormat::Flat => encode_flat(&value),
PayloadFormat::Json => {
let field = addr.json_field.as_deref().unwrap_or("value");
encode_json(&value, field)
}
}
}
pub fn encode_flat(value: &DecodedValue) -> String {
match value {
DecodedValue::Int32(v) => v.to_string(),
DecodedValue::Float64(v) => format!("{v:.6}"),
DecodedValue::UInt32(v) => v.to_string(),
DecodedValue::String(v) => v.clone(),
DecodedValue::Int32Array(v) => {
let parts: Vec<String> = v.iter().map(|x| x.to_string()).collect();
parts.join(",")
}
DecodedValue::Float64Array(v) => {
let parts: Vec<String> = v.iter().map(|x| format_ostream_double(*x)).collect();
parts.join(",")
}
}
}
fn format_ostream_double(v: f64) -> String {
const SIG: usize = 6;
if v == 0.0 {
return if v.is_sign_negative() {
"-0".to_string()
} else {
"0".to_string()
};
}
if !v.is_finite() {
if v.is_nan() {
return "nan".to_string();
}
return if v < 0.0 {
"-inf".to_string()
} else {
"inf".to_string()
};
}
let sci = format!("{:.*e}", SIG - 1, v);
let (mantissa, exp_str) = sci.split_once('e').expect("LowerExp always has 'e'");
let exp: i32 = exp_str.parse().expect("LowerExp exponent is an integer");
if (-4..SIG as i32).contains(&exp) {
let frac = (SIG as i32 - 1 - exp).max(0) as usize;
strip_trailing_zeros(&format!("{v:.frac$}"))
} else {
let mantissa = strip_trailing_zeros(mantissa);
let sign = if exp < 0 { '-' } else { '+' };
format!("{mantissa}e{sign}{:02}", exp.abs())
}
}
fn strip_trailing_zeros(s: &str) -> String {
if s.contains('.') {
s.trim_end_matches('0').trim_end_matches('.').to_string()
} else {
s.to_string()
}
}
pub fn normalize_on_off(s: &str) -> String {
match s.trim().to_ascii_lowercase().as_str() {
"1" | "on" | "true" => "ON".to_string(),
"0" | "off" | "false" => "OFF".to_string(),
_ => s.to_string(),
}
}
fn normalize_value(value: &DecodedValue) -> DecodedValue {
match value {
DecodedValue::String(s) => DecodedValue::String(normalize_on_off(s)),
other => other.clone(),
}
}
fn encode_json(value: &DecodedValue, field_path: &str) -> String {
let json_value = match value {
DecodedValue::Int32(v) => serde_json::Value::from(*v),
DecodedValue::Float64(v) => serde_json::Value::from(*v),
DecodedValue::UInt32(v) => serde_json::Value::from(*v),
DecodedValue::String(v) => serde_json::Value::from(v.as_str()),
DecodedValue::Int32Array(v) => serde_json::Value::from(v.as_slice()),
DecodedValue::Float64Array(v) => serde_json::Value::from(v.as_slice()),
};
let keys: Vec<&str> = field_path.split('.').collect();
let mut result = json_value;
for key in keys.iter().rev() {
let mut obj = serde_json::Map::new();
obj.insert((*key).to_string(), result);
result = serde_json::Value::Object(obj);
}
result.to_string()
}
fn boolean_payload_to_int(s: &str) -> Option<i64> {
match s {
"true" => Some(1),
"false" => Some(0),
_ => None,
}
}
fn decode_flat(raw: &str, value_type: ValueType) -> MqttResult<DecodedValue> {
let trimmed = raw.trim();
match value_type {
ValueType::Int => {
if let Some(b) = boolean_payload_to_int(trimmed) {
return Ok(DecodedValue::Int32(b as i32));
}
let v: i32 = trimmed
.parse()
.map_err(|e| MqttError::ValueConversion(format!("INT parse: {e}")))?;
Ok(DecodedValue::Int32(v))
}
ValueType::Float => {
let v: f64 = trimmed
.parse()
.map_err(|e| MqttError::ValueConversion(format!("FLOAT parse: {e}")))?;
Ok(DecodedValue::Float64(v))
}
ValueType::Digital => {
if let Some(b) = boolean_payload_to_int(trimmed) {
return Ok(DecodedValue::UInt32(b as u32));
}
let v: u32 = trimmed
.parse()
.map_err(|e| MqttError::ValueConversion(format!("DIGITAL parse: {e}")))?;
Ok(DecodedValue::UInt32(v))
}
ValueType::String => Ok(DecodedValue::String(trimmed.to_string())),
ValueType::IntArray => {
let v = parse_int_array(trimmed)?;
Ok(DecodedValue::Int32Array(v))
}
ValueType::FloatArray => {
let v = parse_float_array(trimmed)?;
Ok(DecodedValue::Float64Array(v))
}
}
}
fn decode_json(raw: &str, value_type: ValueType, field_path: &str) -> MqttResult<DecodedValue> {
let json: serde_json::Value = serde_json::from_str(raw)?;
let value = extract_json_field(&json, field_path)
.ok_or_else(|| MqttError::JsonFieldNotFound(field_path.to_string()))?;
if value.is_null() {
return Err(MqttError::JsonFieldNotFound(field_path.to_string()));
}
let carrier = match value {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
if value_type == ValueType::String {
return Ok(DecodedValue::String(carrier));
}
decode_flat(&carrier, value_type)
}
fn extract_json_field<'a>(
json: &'a serde_json::Value,
target_key: &str,
) -> Option<&'a serde_json::Value> {
match json {
serde_json::Value::Object(map) => {
for (key, value) in map {
if key == target_key {
return Some(value);
}
if let Some(found) = extract_json_field(value, target_key) {
return Some(found);
}
}
None
}
serde_json::Value::Array(elems) => {
for elem in elems {
if let Some(found) = extract_json_field(elem, target_key) {
return Some(found);
}
}
None
}
_ => None,
}
}
fn parse_int_array(s: &str) -> MqttResult<Vec<i32>> {
let s = s.trim_start_matches('[').trim_end_matches(']');
let separator = if s.contains(',') { ',' } else { ' ' };
s.split(separator)
.map(|part| {
part.trim()
.parse::<i32>()
.map_err(|e| MqttError::ValueConversion(format!("INTARRAY element: {e}")))
})
.collect()
}
fn parse_float_array(s: &str) -> MqttResult<Vec<f64>> {
let s = s.trim_start_matches('[').trim_end_matches(']');
let separator = if s.contains(',') { ',' } else { ' ' };
s.split(separator)
.map(|part| {
part.trim()
.parse::<f64>()
.map_err(|e| MqttError::ValueConversion(format!("FLOATARRAY element: {e}")))
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::address::TopicAddress;
#[test]
fn decode_flat_int() {
let addr = TopicAddress::parse("FLAT:INT test/t").unwrap();
let val = decode_payload("42", &addr).unwrap();
assert_eq!(val, DecodedValue::Int32(42));
}
#[test]
fn decode_flat_int_negative() {
let addr = TopicAddress::parse("FLAT:INT test/t").unwrap();
let val = decode_payload("-7", &addr).unwrap();
assert_eq!(val, DecodedValue::Int32(-7));
}
#[test]
fn decode_flat_int_whitespace() {
let addr = TopicAddress::parse("FLAT:INT test/t").unwrap();
let val = decode_payload(" 100 \n", &addr).unwrap();
assert_eq!(val, DecodedValue::Int32(100));
}
#[test]
fn decode_flat_float() {
let addr = TopicAddress::parse("FLAT:FLOAT test/t").unwrap();
let val = decode_payload("3.15", &addr).unwrap();
assert_eq!(val, DecodedValue::Float64(3.15));
}
#[test]
fn decode_flat_digital() {
let addr = TopicAddress::parse("FLAT:DIGITAL test/t").unwrap();
let val = decode_payload("255", &addr).unwrap();
assert_eq!(val, DecodedValue::UInt32(255));
}
#[test]
fn decode_flat_string() {
let addr = TopicAddress::parse("FLAT:STRING test/t").unwrap();
let val = decode_payload("hello world", &addr).unwrap();
assert_eq!(val, DecodedValue::String("hello world".into()));
}
#[test]
fn decode_flat_int_array_comma() {
let addr = TopicAddress::parse("FLAT:INTARRAY test/t").unwrap();
let val = decode_payload("1,2,3,4", &addr).unwrap();
assert_eq!(val, DecodedValue::Int32Array(vec![1, 2, 3, 4]));
}
#[test]
fn decode_flat_int_array_brackets() {
let addr = TopicAddress::parse("FLAT:INTARRAY test/t").unwrap();
let val = decode_payload("[10, 20, 30]", &addr).unwrap();
assert_eq!(val, DecodedValue::Int32Array(vec![10, 20, 30]));
}
#[test]
fn decode_flat_float_array() {
let addr = TopicAddress::parse("FLAT:FLOATARRAY test/t").unwrap();
let val = decode_payload("1.1,2.2,3.3", &addr).unwrap();
assert_eq!(val, DecodedValue::Float64Array(vec![1.1, 2.2, 3.3]));
}
#[test]
fn decode_flat_invalid_int() {
let addr = TopicAddress::parse("FLAT:INT test/t").unwrap();
assert!(decode_payload("not_a_number", &addr).is_err());
}
#[test]
fn decode_json_float() {
let addr = TopicAddress::parse("JSON:FLOAT sensors/data humidity").unwrap();
let val = decode_payload(r#"{"humidity": 65.5}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Float64(65.5));
}
#[test]
fn decode_json_nested() {
let addr = TopicAddress::parse("JSON:INT sensors/data value").unwrap();
let val = decode_payload(r#"{"reading": {"value": 42}}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Int32(42));
}
#[test]
fn decode_json_deeply_nested() {
let addr = TopicAddress::parse("JSON:FLOAT device/data c").unwrap();
let val = decode_payload(r#"{"a": {"b": {"c": 9.99}}}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Float64(9.99));
}
#[test]
fn decode_json_whole_key_with_dot() {
let addr = TopicAddress::parse("JSON:INT dev/data a.b").unwrap();
let val = decode_payload(r#"{"a.b": 7}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Int32(7));
}
#[test]
fn decode_json_dotted_field_is_not_path_split() {
let addr = TopicAddress::parse("JSON:INT sensors/data reading.value").unwrap();
assert!(decode_payload(r#"{"reading": {"value": 42}}"#, &addr).is_err());
}
#[test]
fn decode_json_recurses_into_array_elements() {
let addr = TopicAddress::parse("JSON:FLOAT dev/data temp").unwrap();
let val = decode_payload(r#"{"sensors": [{"id": 1}, {"temp": 2.5}]}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Float64(2.5));
}
#[test]
fn decode_json_string() {
let addr = TopicAddress::parse("JSON:STRING device/data status").unwrap();
let val = decode_payload(r#"{"status": "OK"}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::String("OK".into()));
}
#[test]
fn decode_json_string_non_string_value() {
let addr = TopicAddress::parse("JSON:STRING device/data count").unwrap();
let val = decode_payload(r#"{"count": 42}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::String("42".into()));
}
#[test]
fn decode_json_field_not_found() {
let addr = TopicAddress::parse("JSON:FLOAT sensors/data missing").unwrap();
assert!(decode_payload(r#"{"other": 1.0}"#, &addr).is_err());
}
#[test]
fn decode_json_type_mismatch() {
let addr = TopicAddress::parse("JSON:INT sensors/data value").unwrap();
assert!(decode_payload(r#"{"value": "not_a_number"}"#, &addr).is_err());
}
#[test]
fn decode_json_invalid_json() {
let addr = TopicAddress::parse("JSON:INT sensors/data value").unwrap();
assert!(decode_payload("not json at all", &addr).is_err());
}
#[test]
fn decode_flat_int_boolean() {
let addr = TopicAddress::parse("FLAT:INT test/t").unwrap();
assert_eq!(
decode_payload("true", &addr).unwrap(),
DecodedValue::Int32(1)
);
assert_eq!(
decode_payload("false", &addr).unwrap(),
DecodedValue::Int32(0)
);
}
#[test]
fn decode_flat_digital_boolean() {
let addr = TopicAddress::parse("FLAT:DIGITAL test/t").unwrap();
assert_eq!(
decode_payload("true", &addr).unwrap(),
DecodedValue::UInt32(1)
);
assert_eq!(
decode_payload("false", &addr).unwrap(),
DecodedValue::UInt32(0)
);
}
#[test]
fn decode_flat_float_rejects_boolean() {
let addr = TopicAddress::parse("FLAT:FLOAT test/t").unwrap();
assert!(decode_payload("true", &addr).is_err());
}
#[test]
fn decode_json_int_boolean() {
let addr = TopicAddress::parse("JSON:INT dev/data flag").unwrap();
assert_eq!(
decode_payload(r#"{"flag": true}"#, &addr).unwrap(),
DecodedValue::Int32(1)
);
assert_eq!(
decode_payload(r#"{"flag": false}"#, &addr).unwrap(),
DecodedValue::Int32(0)
);
}
#[test]
fn decode_json_digital_boolean() {
let addr = TopicAddress::parse("JSON:DIGITAL dev/data flag").unwrap();
assert_eq!(
decode_payload(r#"{"flag": true}"#, &addr).unwrap(),
DecodedValue::UInt32(1)
);
}
#[test]
fn decode_json_int_string_boolean() {
let addr = TopicAddress::parse("JSON:INT dev/data flag").unwrap();
assert_eq!(
decode_payload(r#"{"flag": "true"}"#, &addr).unwrap(),
DecodedValue::Int32(1)
);
}
#[test]
fn decode_json_float_rejects_boolean() {
let addr = TopicAddress::parse("JSON:FLOAT dev/data flag").unwrap();
assert!(decode_payload(r#"{"flag": true}"#, &addr).is_err());
}
#[test]
fn decode_json_int_array() {
let addr = TopicAddress::parse("JSON:INTARRAY sensors/data readings").unwrap();
let val = decode_payload(r#"{"readings": [10, 20, 30]}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Int32Array(vec![10, 20, 30]));
}
#[test]
fn decode_json_int_array_nested() {
let addr = TopicAddress::parse("JSON:INTARRAY dev/data b").unwrap();
let val = decode_payload(r#"{"a": {"b": [1, 2, 3, 4]}}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Int32Array(vec![1, 2, 3, 4]));
}
#[test]
fn decode_json_float_array() {
let addr = TopicAddress::parse("JSON:FLOATARRAY sensors/data temps").unwrap();
let val = decode_payload(r#"{"temps": [1.5, 2.5]}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Float64Array(vec![1.5, 2.5]));
}
#[test]
fn decode_json_scalar_number_is_single_element_int_array() {
let addr = TopicAddress::parse("JSON:INTARRAY sensors/data readings").unwrap();
let val = decode_payload(r#"{"readings": 42}"#, &addr).unwrap();
assert_eq!(val, DecodedValue::Int32Array(vec![42]));
}
#[test]
fn decode_json_string_carried_numeric_and_array() {
let int = TopicAddress::parse("JSON:INT dev/d v").unwrap();
assert_eq!(
decode_payload(r#"{"v": "42"}"#, &int).unwrap(),
DecodedValue::Int32(42)
);
let float = TopicAddress::parse("JSON:FLOAT dev/d v").unwrap();
assert_eq!(
decode_payload(r#"{"v": "12.5"}"#, &float).unwrap(),
DecodedValue::Float64(12.5)
);
let digital = TopicAddress::parse("JSON:DIGITAL dev/d v").unwrap();
assert_eq!(
decode_payload(r#"{"v": "7"}"#, &digital).unwrap(),
DecodedValue::UInt32(7)
);
let int_arr = TopicAddress::parse("JSON:INTARRAY dev/d v").unwrap();
assert_eq!(
decode_payload(r#"{"v": "[1,2,3]"}"#, &int_arr).unwrap(),
DecodedValue::Int32Array(vec![1, 2, 3])
);
let float_arr = TopicAddress::parse("JSON:FLOATARRAY dev/d v").unwrap();
assert_eq!(
decode_payload(r#"{"v": "[1.5, 2.5]"}"#, &float_arr).unwrap(),
DecodedValue::Float64Array(vec![1.5, 2.5])
);
}
#[test]
fn decode_json_explicit_null_is_not_found() {
let addr = TopicAddress::parse("JSON:INT dev/d v").unwrap();
assert!(decode_payload(r#"{"v": null}"#, &addr).is_err());
}
#[test]
fn decode_flat_int_array_for_inbound_delivery() {
let addr = TopicAddress::parse("FLAT:INTARRAY test/arr").unwrap();
let val = decode_payload("[5, 6, 7]", &addr).unwrap();
assert_eq!(val, DecodedValue::Int32Array(vec![5, 6, 7]));
}
#[test]
fn encode_flat_values() {
assert_eq!(encode_flat(&DecodedValue::Int32(42)), "42");
assert_eq!(encode_flat(&DecodedValue::Float64(3.15)), "3.150000");
assert_eq!(encode_flat(&DecodedValue::Float64(22.5)), "22.500000");
assert_eq!(encode_flat(&DecodedValue::UInt32(255)), "255");
assert_eq!(encode_flat(&DecodedValue::String("hello".into())), "hello");
}
#[test]
fn encode_flat_arrays() {
assert_eq!(
encode_flat(&DecodedValue::Int32Array(vec![1, 2, 3])),
"1,2,3"
);
assert_eq!(
encode_flat(&DecodedValue::Float64Array(vec![1.1, 2.2])),
"1.1,2.2"
);
assert_eq!(
encode_flat(&DecodedValue::Float64Array(vec![
std::f64::consts::PI,
1234567.0
])),
"3.14159,1.23457e+06"
);
}
#[test]
fn format_ostream_double_matches_g6() {
assert_eq!(format_ostream_double(22.5), "22.5");
assert_eq!(format_ostream_double(1.0), "1");
assert_eq!(format_ostream_double(100.0), "100");
assert_eq!(format_ostream_double(0.1), "0.1");
assert_eq!(format_ostream_double(std::f64::consts::PI), "3.14159");
assert_eq!(format_ostream_double(0.0001234567), "0.000123457");
assert_eq!(format_ostream_double(999999.0), "999999");
assert_eq!(format_ostream_double(-22.5), "-22.5");
assert_eq!(format_ostream_double(1234567.0), "1.23457e+06");
assert_eq!(format_ostream_double(1000000.0), "1e+06");
assert_eq!(format_ostream_double(0.00001234567), "1.23457e-05");
assert_eq!(format_ostream_double(9999999.0), "1e+07");
assert_eq!(format_ostream_double(0.0), "0");
assert_eq!(format_ostream_double(-0.0), "-0");
}
#[test]
fn encode_json_string() {
let addr = TopicAddress::parse("JSON:STRING zigbee2mqtt/plug/set state").unwrap();
let result = encode_payload(&DecodedValue::String("ON".into()), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["state"], "ON");
}
#[test]
fn encode_json_int() {
let addr = TopicAddress::parse("JSON:INT zigbee2mqtt/light/set brightness").unwrap();
let result = encode_payload(&DecodedValue::Int32(128), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["brightness"], 128);
}
#[test]
fn encode_json_float() {
let addr = TopicAddress::parse("JSON:FLOAT device/set temperature").unwrap();
let result = encode_payload(&DecodedValue::Float64(22.5), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["temperature"], 22.5);
}
#[test]
fn encode_json_nested() {
let addr = TopicAddress::parse("JSON:INT device/set settings.brightness").unwrap();
let result = encode_payload(&DecodedValue::Int32(200), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["settings"]["brightness"], 200);
}
#[test]
fn encode_payload_flat_passthrough() {
let addr = TopicAddress::parse("FLAT:INT test/t").unwrap();
assert_eq!(encode_payload(&DecodedValue::Int32(42), &addr), "42");
}
fn addr_with_normalize(drv_info: &str) -> TopicAddress {
let mut addr = TopicAddress::parse(drv_info).unwrap();
addr.normalize_on_off = true;
addr
}
#[test]
fn normalize_on_variants() {
let addr = addr_with_normalize("JSON:STRING device/set state");
for input in &["1", "on", "On", "ON", "true", "TRUE", "True"] {
let result = encode_payload(&DecodedValue::String(input.to_string()), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["state"], "ON", "input: {input}");
}
}
#[test]
fn normalize_off_variants() {
let addr = addr_with_normalize("JSON:STRING device/set state");
for input in &["0", "off", "Off", "OFF", "false", "FALSE", "False"] {
let result = encode_payload(&DecodedValue::String(input.to_string()), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["state"], "OFF", "input: {input}");
}
}
#[test]
fn no_normalize_without_flag() {
let addr = TopicAddress::parse("JSON:STRING device/set state").unwrap();
let result = encode_payload(&DecodedValue::String("1".into()), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["state"], "1"); }
#[test]
fn no_normalize_other_strings() {
let addr = addr_with_normalize("JSON:STRING device/set mode");
let result = encode_payload(&DecodedValue::String("auto".into()), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["mode"], "auto");
}
#[test]
fn no_normalize_integers() {
let addr = addr_with_normalize("JSON:INT device/set brightness");
let result = encode_payload(&DecodedValue::Int32(0), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["brightness"], 0);
let result = encode_payload(&DecodedValue::Int32(1), &addr);
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["brightness"], 1);
}
}