use std::sync::Arc;
use std::time::Duration;
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, TlsConfiguration, Transport};
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};
use crate::config::MqttConfig;
use crate::state::{AppState, ReadingEvent};
pub struct MqttPublisher {
state: Arc<AppState>,
}
impl MqttPublisher {
pub fn new(state: Arc<AppState>) -> Self {
Self { state }
}
pub async fn start(&self) {
let config = self.state.config.read().await;
let mqtt_config = config.mqtt.clone();
drop(config);
if !mqtt_config.enabled {
info!("MQTT publisher is disabled");
return;
}
info!("Starting MQTT publisher to {}", mqtt_config.broker);
let state = Arc::clone(&self.state);
let shutdown_rx = self.state.subscribe_shutdown();
tokio::spawn(async move {
run_mqtt_publisher(state, mqtt_config, shutdown_rx).await;
});
}
}
async fn run_mqtt_publisher(
state: Arc<AppState>,
config: MqttConfig,
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) {
let (host, port, use_tls) = match parse_broker_url(&config.broker) {
Ok(parsed) => parsed,
Err(e) => {
error!("Invalid MQTT broker URL: {}", e);
return;
}
};
let mut mqtt_options = MqttOptions::new(&config.client_id, host, port);
mqtt_options.set_keep_alive(Duration::from_secs(config.keep_alive));
if let (Some(username), Some(password)) = (&config.username, &config.password) {
mqtt_options.set_credentials(username, password);
}
if use_tls {
mqtt_options.set_transport(Transport::tls_with_config(TlsConfiguration::Native));
}
let qos = match config.qos {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
_ => QoS::ExactlyOnce,
};
let (client, mut eventloop) = AsyncClient::new(mqtt_options, 100);
let mut readings_rx = state.readings_tx.subscribe();
let mut reload_rx = state.collector.subscribe_reload();
info!(
"MQTT publisher connected to {} with prefix '{}'",
config.broker, config.topic_prefix
);
let eventloop_handle = tokio::spawn(async move {
let mut consecutive_errors: u32 = 0;
let max_backoff = Duration::from_secs(300); loop {
match eventloop.poll().await {
Ok(Event::Incoming(Packet::ConnAck(ack))) => {
if consecutive_errors > 0 {
info!(
"MQTT reconnected after {} errors: {:?}",
consecutive_errors, ack
);
} else {
info!("MQTT connected: {:?}", ack);
}
consecutive_errors = 0;
}
Ok(Event::Incoming(Packet::PingResp)) => {
debug!("MQTT ping response received");
}
Ok(Event::Outgoing(_)) => {
}
Ok(_) => {}
Err(e) => {
consecutive_errors = consecutive_errors.saturating_add(1);
let backoff = Duration::from_secs(5)
.saturating_mul(2u32.saturating_pow(consecutive_errors.min(6)))
.min(max_backoff);
if consecutive_errors <= 3 {
warn!(
"MQTT connection error: {}. Reconnecting in {:?}...",
e, backoff
);
} else if consecutive_errors.is_multiple_of(50) {
error!(
"MQTT connection error ({} consecutive): {}. Reconnecting in {:?}...",
consecutive_errors, e, backoff
);
}
tokio::time::sleep(backoff).await;
}
}
}
});
if config.homeassistant {
tokio::time::sleep(Duration::from_secs(2)).await;
let devices = configured_devices(&state).await;
if let Err(e) = publish_ha_discovery(&client, &config, &devices, qos).await {
warn!("Failed to publish HA discovery: {}", e);
}
}
loop {
tokio::select! {
result = readings_rx.recv() => {
match result {
Ok(event) => {
if let Err(e) = publish_reading(&client, &config, &state, &event, qos).await {
warn!("Failed to publish reading: {}", e);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("MQTT publisher lagged, missed {} readings", n);
}
Err(broadcast::error::RecvError::Closed) => {
info!("Readings channel closed, stopping MQTT publisher");
break;
}
}
}
result = reload_rx.changed() => {
if result.is_ok() && config.homeassistant {
let devices = configured_devices(&state).await;
if let Err(e) = publish_ha_discovery(&client, &config, &devices, qos).await {
warn!("Failed to refresh HA discovery after config reload: {}", e);
}
}
}
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("MQTT publisher received stop signal");
break;
}
}
}
}
if let Err(e) = client.disconnect().await {
debug!("Error disconnecting MQTT client: {}", e);
}
eventloop_handle.abort();
info!("MQTT publisher stopped");
}
async fn publish_reading(
client: &AsyncClient,
config: &MqttConfig,
state: &AppState,
event: &ReadingEvent,
qos: QoS,
) -> Result<(), rumqttc::ClientError> {
let device_name = sanitize_topic_segment(
configured_device_name(state, &event.device_id)
.await
.as_deref()
.unwrap_or(&event.device_id),
);
let prefix = &config.topic_prefix;
let retain = config.retain;
let json_topic = format!("{}/{}/json", prefix, device_name);
let json_payload = serde_json::to_string(&event.reading).unwrap_or_default();
client
.publish(&json_topic, qos, retain, json_payload.as_bytes())
.await?;
let reading = &event.reading;
let device_type = aranet_types::DeviceType::from_name(&event.device_id);
let has_co2 = device_type.map_or(reading.co2 > 0, |dt| dt.has_co2());
let has_temp = device_type.map_or(reading.temperature != 0.0 || reading.humidity > 0, |dt| {
dt.has_temperature()
});
let has_pressure = device_type.map_or(reading.pressure > 0.0, |dt| dt.has_pressure());
if has_co2 {
let co2_topic = format!("{}/{}/co2", prefix, device_name);
client
.publish(&co2_topic, qos, retain, reading.co2.to_string().as_bytes())
.await?;
}
if has_temp {
let temp_topic = format!("{}/{}/temperature", prefix, device_name);
client
.publish(
&temp_topic,
qos,
retain,
format!("{:.2}", reading.temperature).as_bytes(),
)
.await?;
let humidity_topic = format!("{}/{}/humidity", prefix, device_name);
client
.publish(
&humidity_topic,
qos,
retain,
reading.humidity.to_string().as_bytes(),
)
.await?;
}
if has_pressure {
let pressure_topic = format!("{}/{}/pressure", prefix, device_name);
client
.publish(
&pressure_topic,
qos,
retain,
format!("{:.2}", reading.pressure).as_bytes(),
)
.await?;
}
let battery_topic = format!("{}/{}/battery", prefix, device_name);
client
.publish(
&battery_topic,
qos,
retain,
reading.battery.to_string().as_bytes(),
)
.await?;
let status_topic = format!("{}/{}/status", prefix, device_name);
let status_str = match reading.status {
aranet_types::Status::Green => "green",
aranet_types::Status::Yellow => "yellow",
aranet_types::Status::Red => "red",
aranet_types::Status::Error => "error",
_ => "unknown",
};
client
.publish(&status_topic, qos, retain, status_str.as_bytes())
.await?;
if let Some(radon) = reading.radon {
let radon_topic = format!("{}/{}/radon", prefix, device_name);
client
.publish(&radon_topic, qos, retain, radon.to_string().as_bytes())
.await?;
}
if let Some(avg) = reading.radon_avg_24h {
let topic = format!("{}/{}/radon_avg_24h", prefix, device_name);
client
.publish(&topic, qos, retain, avg.to_string().as_bytes())
.await?;
}
if let Some(avg) = reading.radon_avg_7d {
let topic = format!("{}/{}/radon_avg_7d", prefix, device_name);
client
.publish(&topic, qos, retain, avg.to_string().as_bytes())
.await?;
}
if let Some(avg) = reading.radon_avg_30d {
let topic = format!("{}/{}/radon_avg_30d", prefix, device_name);
client
.publish(&topic, qos, retain, avg.to_string().as_bytes())
.await?;
}
if let Some(rate) = reading.radiation_rate {
let rate_topic = format!("{}/{}/radiation_rate", prefix, device_name);
client
.publish(&rate_topic, qos, retain, format!("{:.4}", rate).as_bytes())
.await?;
}
if let Some(total) = reading.radiation_total {
let total_topic = format!("{}/{}/radiation_total", prefix, device_name);
client
.publish(
&total_topic,
qos,
retain,
format!("{:.6}", total).as_bytes(),
)
.await?;
}
debug!(
"Published reading for {} to MQTT (CO2={})",
event.device_id, reading.co2
);
Ok(())
}
async fn configured_devices(state: &AppState) -> Vec<crate::config::DeviceConfig> {
let config = state.config.read().await;
config.devices.clone()
}
async fn configured_device_name(state: &AppState, device_id: &str) -> Option<String> {
let config = state.config.read().await;
config
.devices
.iter()
.find(|device| device.address == device_id)
.map(|device| {
device
.alias
.clone()
.unwrap_or_else(|| device.address.clone())
})
}
async fn publish_ha_discovery(
client: &AsyncClient,
config: &MqttConfig,
devices: &[crate::config::DeviceConfig],
qos: QoS,
) -> Result<(), rumqttc::ClientError> {
let prefix = &config.ha_discovery_prefix;
let topic_prefix = &config.topic_prefix;
for device in devices {
let device_name =
sanitize_topic_segment(device.alias.as_deref().unwrap_or(&device.address));
let display_name = device.alias.as_deref().unwrap_or(&device.address);
let device_type = aranet_types::DeviceType::from_name(&device.address).or_else(|| {
device
.alias
.as_deref()
.and_then(aranet_types::DeviceType::from_name)
});
let has_co2 = device_type.is_none_or(|dt| dt.has_co2());
let has_temp = device_type.is_none_or(|dt| dt.has_temperature());
let has_pressure = device_type.is_none_or(|dt| dt.has_pressure());
let device_json = serde_json::json!({
"identifiers": [format!("aranet_{}", device_name)],
"name": display_name,
"manufacturer": "SAF Tehnika",
"model": device_type.map(|dt| dt.to_string()).unwrap_or_else(|| "Aranet".to_string()),
});
let publish_sensor = |metric: &str,
name_suffix: &str,
unit: &str,
device_class: Option<&str>,
state_class: Option<&str>| {
let unique_id = format!("aranet_{}_{}", device_name, metric);
let sensor_name = format!("{} {}", display_name, name_suffix);
let state_topic = format!("{}/{}/{}", topic_prefix, device_name, metric);
let config_topic = format!("{}/sensor/{}_{}/config", prefix, device_name, metric);
let mut payload = serde_json::json!({
"name": sensor_name,
"unique_id": unique_id,
"state_topic": state_topic,
"unit_of_measurement": unit,
"device": device_json,
});
if let Some(dc) = device_class {
payload["device_class"] = serde_json::json!(dc);
}
if let Some(sc) = state_class {
payload["state_class"] = serde_json::json!(sc);
}
(config_topic, payload.to_string())
};
let mut sensors = vec![];
if has_co2 {
sensors.push(publish_sensor(
"co2",
"CO\u{2082}",
"ppm",
Some("carbon_dioxide"),
Some("measurement"),
));
}
if has_temp {
sensors.push(publish_sensor(
"temperature",
"Temperature",
"\u{00b0}C",
Some("temperature"),
Some("measurement"),
));
sensors.push(publish_sensor(
"humidity",
"Humidity",
"%",
Some("humidity"),
Some("measurement"),
));
}
if has_pressure {
sensors.push(publish_sensor(
"pressure",
"Pressure",
"hPa",
Some("atmospheric_pressure"),
Some("measurement"),
));
}
sensors.push(publish_sensor(
"battery",
"Battery",
"%",
Some("battery"),
Some("measurement"),
));
for (config_topic, payload) in sensors {
client
.publish(&config_topic, qos, true, payload.as_bytes())
.await?;
}
info!("Published HA discovery for device: {}", display_name);
}
Ok(())
}
fn parse_broker_url(url: &str) -> Result<(String, u16, bool), String> {
let (scheme, rest) = if let Some(stripped) = url.strip_prefix("mqtt://") {
("mqtt", stripped)
} else if let Some(stripped) = url.strip_prefix("mqtts://") {
("mqtts", stripped)
} else {
return Err("Invalid scheme: URL must start with mqtt:// or mqtts://".to_string());
};
let use_tls = scheme == "mqtts";
let default_port = if use_tls { 8883 } else { 1883 };
let (host, port) = if let Some((h, p)) = rest.rsplit_once(':') {
let port = p
.parse::<u16>()
.map_err(|_| format!("Invalid port: {}", p))?;
(h.to_string(), port)
} else {
(rest.to_string(), default_port)
};
if host.is_empty() {
return Err("Host cannot be empty".to_string());
}
Ok((host, port, use_tls))
}
fn sanitize_topic_segment(s: &str) -> String {
s.replace(['#', '+', ' ', '/'], "_")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_broker_url_mqtt() {
let (host, port, tls) = parse_broker_url("mqtt://localhost:1883").unwrap();
assert_eq!(host, "localhost");
assert_eq!(port, 1883);
assert!(!tls);
}
#[test]
fn test_parse_broker_url_mqtts() {
let (host, port, tls) = parse_broker_url("mqtts://broker.example.com:8883").unwrap();
assert_eq!(host, "broker.example.com");
assert_eq!(port, 8883);
assert!(tls);
}
#[test]
fn test_parse_broker_url_default_port() {
let (host, port, tls) = parse_broker_url("mqtt://localhost").unwrap();
assert_eq!(host, "localhost");
assert_eq!(port, 1883);
assert!(!tls);
let (host, port, tls) = parse_broker_url("mqtts://secure.example.com").unwrap();
assert_eq!(host, "secure.example.com");
assert_eq!(port, 8883);
assert!(tls);
}
#[test]
fn test_parse_broker_url_invalid_scheme() {
assert!(parse_broker_url("http://localhost:1883").is_err());
assert!(parse_broker_url("localhost:1883").is_err());
}
#[test]
fn test_parse_broker_url_empty_host() {
assert!(parse_broker_url("mqtt://:1883").is_err());
}
#[test]
fn test_sanitize_topic_segment() {
assert_eq!(sanitize_topic_segment("Aranet4 17C3C"), "Aranet4_17C3C");
assert_eq!(sanitize_topic_segment("device#1"), "device_1");
assert_eq!(sanitize_topic_segment("sensor+temp"), "sensor_temp");
assert_eq!(sanitize_topic_segment("path/to/device"), "path_to_device");
}
#[test]
fn test_sanitize_topic_segment_normal() {
assert_eq!(sanitize_topic_segment("office"), "office");
assert_eq!(sanitize_topic_segment("kitchen-sensor"), "kitchen-sensor");
}
}