modbus_mqtt/
server.rs

1use crate::{modbus, mqtt};
2
3use rumqttc::MqttOptions;
4use std::future::Future;
5use tokio::sync::{broadcast, mpsc};
6use tracing::{error, info};
7
8pub async fn run<P: Into<String> + Send>(
9    prefix: P,
10    mut mqtt_options: MqttOptions,
11    shutdown: impl Future,
12) -> crate::Result<()> {
13    let prefix = prefix.into();
14
15    let (notify_shutdown, _) = broadcast::channel(1);
16    let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);
17
18    mqtt_options.set_last_will(rumqttc::LastWill {
19        topic: prefix.clone(),
20        message: "offline".into(),
21        qos: rumqttc::QoS::AtMostOnce,
22        retain: false,
23    });
24    let client_id = mqtt_options.client_id();
25    let mut mqtt_connection = mqtt::new(mqtt_options).await;
26    let mqtt = mqtt_connection.handle(prefix.clone());
27    mqtt.publish("online").await?;
28    info!(client_id, "MQTT connection established");
29
30    let mut connector = modbus::connector::new(
31        mqtt.clone(),
32        (notify_shutdown.subscribe(), shutdown_complete_tx.clone()).into(),
33    );
34
35    tokio::spawn(async move {
36        if let Err(err) = mqtt_connection.run().await {
37            error!(cause = %err, "MQTT connection error");
38        }
39    });
40
41    tokio::spawn(async move {
42        if let Err(err) = connector.run().await {
43            error!(cause = %err, "Modbus connector error");
44        }
45    });
46
47    shutdown.await;
48    drop(notify_shutdown);
49    drop(shutdown_complete_tx);
50
51    // We want MQTT to be the last thing to shutdown, so it gets shutdown after everything else
52    shutdown_complete_rx.recv().await;
53    mqtt.shutdown().await?;
54
55    Ok(())
56}