1use crate::{modbus, mqtt};
2
3use rumqttc::MqttOptions;
4use std::future::Future;
5use tokio::sync::{broadcast, mpsc};
6use tracing::{error, info};
7
8pub async fn run<P: Into<String> + Send>(
9 prefix: P,
10 mut mqtt_options: MqttOptions,
11 shutdown: impl Future,
12) -> crate::Result<()> {
13 let prefix = prefix.into();
14
15 let (notify_shutdown, _) = broadcast::channel(1);
16 let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);
17
18 mqtt_options.set_last_will(rumqttc::LastWill {
19 topic: prefix.clone(),
20 message: "offline".into(),
21 qos: rumqttc::QoS::AtMostOnce,
22 retain: false,
23 });
24 let client_id = mqtt_options.client_id();
25 let mut mqtt_connection = mqtt::new(mqtt_options).await;
26 let mqtt = mqtt_connection.handle(prefix.clone());
27 mqtt.publish("online").await?;
28 info!(client_id, "MQTT connection established");
29
30 let mut connector = modbus::connector::new(
31 mqtt.clone(),
32 (notify_shutdown.subscribe(), shutdown_complete_tx.clone()).into(),
33 );
34
35 tokio::spawn(async move {
36 if let Err(err) = mqtt_connection.run().await {
37 error!(cause = %err, "MQTT connection error");
38 }
39 });
40
41 tokio::spawn(async move {
42 if let Err(err) = connector.run().await {
43 error!(cause = %err, "Modbus connector error");
44 }
45 });
46
47 shutdown.await;
48 drop(notify_shutdown);
49 drop(shutdown_complete_tx);
50
51 shutdown_complete_rx.recv().await;
53 mqtt.shutdown().await?;
54
55 Ok(())
56}