use std::collections::HashMap;
use std::time::Duration;
use asyn_rs::port_handle::PortHandle;
use asyn_rs::request::ParamSetValue;
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions};
use tokio::sync::mpsc;
use crate::address::{TopicAddress, ValueType};
use crate::config::MqttConfig;
use crate::driver::PublishRequest;
use crate::payload::{DecodedValue, decode_payload};
pub async fn mqtt_event_loop(
config: MqttConfig,
topics: Vec<String>,
topic_map: HashMap<String, Vec<(usize, TopicAddress)>>,
port_handle: PortHandle,
publish_rx: mpsc::UnboundedReceiver<PublishRequest>,
connected_param: usize,
) {
let mut mqttoptions =
MqttOptions::new(&config.client_id, &config.broker_host, config.broker_port);
mqttoptions.set_keep_alive(Duration::from_secs(config.keep_alive_secs));
mqttoptions.set_clean_session(config.clean_session);
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 256);
tokio::spawn(publish_task(client.clone(), publish_rx));
let mut is_connected = false;
loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(publish))) => {
if !is_connected {
tracing::debug!(
"MQTT Publish received while Connected=0 — restoring Connected=1"
);
mark_connected(&port_handle, connected_param).await;
is_connected = true;
}
handle_incoming_message(&publish.topic, &publish.payload, &topic_map, &port_handle)
.await;
}
Ok(Event::Incoming(Incoming::ConnAck(_))) => {
tracing::info!("MQTT connected, subscribing to {} topics", topics.len());
mark_connected(&port_handle, connected_param).await;
is_connected = true;
let sub_client = client.clone();
let sub_topics = topics.clone();
let sub_qos = config.qos;
tokio::spawn(async move {
subscribe_all(&sub_client, &sub_topics, sub_qos).await;
});
}
Ok(Event::Incoming(Incoming::PingResp)) => {
if !is_connected {
tracing::debug!(
"MQTT PingResp received while Connected=0 — restoring Connected=1"
);
mark_connected(&port_handle, connected_param).await;
is_connected = true;
}
}
Err(e) => {
tracing::error!("MQTT connection error: {e}");
let _ = port_handle
.set_params_and_notify(
0,
vec![ParamSetValue::Int32 {
reason: connected_param,
addr: 0,
value: 0,
}],
)
.await;
is_connected = false;
tokio::time::sleep(Duration::from_secs(1)).await;
}
_ => {}
}
}
}
async fn mark_connected(port_handle: &PortHandle, connected_param: usize) {
let _ = port_handle
.set_params_and_notify(
0,
vec![ParamSetValue::Int32 {
reason: connected_param,
addr: 0,
value: 1,
}],
)
.await;
}
async fn publish_task(
client: AsyncClient,
mut publish_rx: mpsc::UnboundedReceiver<PublishRequest>,
) {
while let Some(req) = publish_rx.recv().await {
let qos: rumqttc::QoS = req.qos.into();
if let Err(e) = client
.publish(&req.topic, qos, req.retained, req.payload.as_bytes())
.await
{
tracing::warn!("MQTT publish to '{}' failed: {e}", req.topic);
}
}
}
async fn subscribe_all(client: &AsyncClient, topics: &[String], qos: crate::config::QoS) {
let rqos: rumqttc::QoS = qos.into();
for topic in topics {
if let Err(e) = client.subscribe(topic, rqos).await {
tracing::warn!("MQTT subscribe to '{topic}' failed: {e}");
}
}
}
async fn handle_incoming_message(
topic: &str,
payload: &[u8],
topic_map: &HashMap<String, Vec<(usize, TopicAddress)>>,
port_handle: &PortHandle,
) {
let payload_str = match std::str::from_utf8(payload) {
Ok(s) => s,
Err(e) => {
tracing::warn!("Non-UTF8 payload on topic '{topic}': {e}");
return;
}
};
let subscribers = match topic_map.get(topic) {
Some(subs) => subs,
None => return,
};
let mut batch_updates = Vec::new();
for (reason, addr) in subscribers {
match decode_payload(payload_str, addr) {
Ok(decoded) => {
match decoded {
DecodedValue::Int32(v) => {
batch_updates.push(ParamSetValue::Int32 {
reason: *reason,
addr: 0,
value: v,
});
}
DecodedValue::Float64(v) => {
batch_updates.push(ParamSetValue::Float64 {
reason: *reason,
addr: 0,
value: v,
});
}
DecodedValue::String(v) => {
batch_updates.push(ParamSetValue::Octet {
reason: *reason,
addr: 0,
value: v,
});
}
DecodedValue::Float64Array(v) => {
batch_updates.push(ParamSetValue::Float64Array {
reason: *reason,
addr: 0,
value: v,
});
}
DecodedValue::UInt32(v) => {
batch_updates.push(ParamSetValue::UInt32Digital {
reason: *reason,
addr: 0,
value: v,
mask: 0xFFFF_FFFF,
});
}
DecodedValue::Int32Array(v) => {
batch_updates.push(ParamSetValue::Int32Array {
reason: *reason,
addr: 0,
value: v,
});
}
}
}
Err(e) => {
tracing::debug!(
"Failed to decode '{}' on topic '{topic}': {e}",
addr.value_type.label(),
);
}
}
}
if !batch_updates.is_empty()
&& let Err(e) = port_handle.set_params_and_notify(0, batch_updates).await
{
eprintln!("set_params_and_notify error (mqtt payload): {e}");
}
}
impl ValueType {
fn label(&self) -> &'static str {
match self {
ValueType::Int => "INT",
ValueType::Float => "FLOAT",
ValueType::Digital => "DIGITAL",
ValueType::String => "STRING",
ValueType::IntArray => "INTARRAY",
ValueType::FloatArray => "FLOATARRAY",
}
}
}