mqtt_system_monitor/
daemon.rs

1use crate::configuration::Configuration;
2use crate::status::StatusMessage;
3use log::{debug, error, info, trace};
4use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
5use std::sync::atomic::{AtomicBool, Ordering};
6use sysinfo::{
7    Component, Components, CpuRefreshKind, MemoryRefreshKind, Networks, RefreshKind, System,
8};
9use tokio::task;
10use tokio::time::sleep;
11
12pub struct Daemon {
13    config: Configuration,
14    mqtt_config: MqttOptions,
15
16    stop: AtomicBool,
17
18    system: System,
19    network: Networks,
20    temp_component: Option<Component>,
21
22    last_total_transmitted: Option<u64>,
23    last_total_received: Option<u64>,
24}
25
26impl Daemon {
27    pub fn new(config: Configuration) -> Daemon {
28        info!("Daemon for {} starting", config.mqtt.entity);
29
30        let system = System::new_with_specifics(
31            RefreshKind::nothing()
32                .with_cpu(CpuRefreshKind::everything())
33                .with_memory(MemoryRefreshKind::nothing().with_ram()),
34        );
35
36        let network = Networks::new_with_refreshed_list();
37
38        let components = Components::new_with_refreshed_list();
39
40        let mut mqtt_config =
41            MqttOptions::new(&config.mqtt.entity, &config.mqtt.host, config.mqtt.port);
42        mqtt_config.set_credentials(&config.mqtt.user, &config.mqtt.password);
43
44        info!(
45            "Connecting to MQTT broker {}:{}",
46            config.mqtt.host, config.mqtt.port
47        );
48
49        Daemon {
50            mqtt_config,
51            stop: AtomicBool::new(false),
52            system,
53            network,
54            temp_component: Self::select_temp_component(
55                components,
56                config.sensors.temperature.as_deref(),
57            ),
58            last_total_transmitted: None,
59            last_total_received: None,
60            config,
61        }
62    }
63
64    fn select_temp_component(components: Components, temp_name: Option<&str>) -> Option<Component> {
65        let mut cmps = Vec::from(components);
66        let temp_label = temp_name?;
67
68        while let Some(c) = cmps.pop() {
69            if c.label() == temp_label {
70                return Some(c);
71            }
72        }
73        None
74    }
75
76    pub fn update_data(self: &mut Daemon) -> StatusMessage {
77        self.system.refresh_cpu_usage();
78
79        self.network.refresh(true);
80        let (net_tx, net_rx) = self.select_network();
81
82        let component = &mut self.temp_component;
83        if let Some(c) = component {
84            c.refresh();
85        }
86
87        StatusMessage {
88            cpu_usage: self.system.global_cpu_usage(),
89            disk_usage: None,
90            cpu_temp: component.as_ref().and_then(|c| c.temperature()),
91            net_tx: Self::update_rate(
92                &mut self.last_total_transmitted,
93                net_tx,
94                self.config.mqtt.update_period,
95            ),
96            net_rx: Self::update_rate(
97                &mut self.last_total_received,
98                net_rx,
99                self.config.mqtt.update_period,
100            ),
101        }
102    }
103
104    fn select_network(&mut self) -> (Option<u64>, Option<u64>) {
105        if let Some(network) = &self.config.sensors.network.as_deref() {
106            for (interface, net) in &self.network {
107                if network == interface {
108                    return (Some(net.total_transmitted()), Some(net.total_received()));
109                }
110            }
111        };
112
113        (None, None)
114    }
115
116    fn update_rate(last_val: &mut Option<u64>, cur: Option<u64>, update_period: u64) -> Option<f64> {
117        let cur = cur?;
118        let last = *last_val;
119        *last_val = Some(cur);
120
121        if let Some(last) = last && last <= cur {
122            Some(((cur - last) / update_period) as f64 / 1024.0)
123        } else {
124            None
125        }
126    }
127
128    pub async fn run(self: &mut Daemon) {
129        let mut cycles_counter = 0;
130        let mut register: bool;
131
132        let expire_cycles = 60 / self.config.mqtt.update_period - 1;
133        let sleep_period = std::time::Duration::from_secs(self.config.mqtt.update_period);
134
135        let topic = format!("mqtt-system-monitor/{}/state", self.config.mqtt.entity);
136
137        let (client, mut eventloop) = AsyncClient::new(self.mqtt_config.clone(), 1);
138
139        task::spawn(async move {
140            while let Ok(notification) = eventloop.poll().await {
141                trace!("MQTT notification received: {notification:?}");
142            }
143        });
144
145        while !self.stop.load(Ordering::Relaxed) {
146            register = cycles_counter == 0;
147
148            if Daemon::publish(&client, &topic, self.update_data().to_string())
149                .await
150                .is_err()
151            {
152                break;
153            }
154
155            if register {
156                let registration_message = self.registration_message();
157
158                if Daemon::publish(&client, registration_message.0, registration_message.1)
159                    .await
160                    .is_err()
161                {
162                    break;
163                };
164            }
165
166            if cycles_counter == expire_cycles {
167                cycles_counter = 0;
168            } else {
169                cycles_counter += 1;
170            }
171
172            sleep(sleep_period).await;
173        }
174    }
175
176    pub fn registration_message(&self) -> (String, String) {
177        let id = &self.config.mqtt.entity;
178        let prefix = &self.config.mqtt.registration_prefix;
179        let version = env!("CARGO_PKG_VERSION");
180        let package_name = env!("CARGO_PKG_NAME");
181        let url = env!("CARGO_PKG_HOMEPAGE");
182
183        (
184            format!("{prefix}/device/{}/config", self.config.mqtt.entity),
185            format!(
186                r#"{{
187  "device": {{
188    "name": "{id}",
189    "identifiers": "{id}"
190  }},
191  "origin": {{
192    "name": "{package_name}",
193    "sw_version": "{version}",
194    "url": "{url}"
195  }},
196  "components": {{
197    "cpu_temp": {{
198      "name": "{id} CPU temperature",
199      "platform": "sensor",
200      "device_class": "temperature",
201      "state_class": "measurement",
202      "unit_of_measurement": "°C",
203      "unique_id": "cpu_temp",
204      "value_template": "{{{{ value_json.cpu_temp }}}}",
205      "expire_after": 60
206    }},
207    "cpu_usage": {{
208      "name": "{id} CPU usage",
209      "platform": "sensor",
210      "device_class": null,
211      "icon": "mdi:cpu-64-bit",
212      "state_class": "measurement",
213      "unit_of_measurement": "%",
214      "unique_id": "cpu_usage",
215      "value_template": "{{{{ value_json.cpu_usage }}}}",
216      "expire_after": 60
217    }},
218    "net_rx": {{
219      "name": "{id} Network RX rate",
220      "platform": "sensor",
221      "device_class": "data_rate",
222      "state_class": "measurement",
223      "unit_of_measurement": "KiB/s",
224      "unique_id": "net_rx",
225      "value_template": "{{{{ value_json.net_rx }}}}",
226      "expire_after": 60
227    }},
228    "net_tx": {{
229      "name": "{id} Network TX rate",
230      "platform": "sensor",
231      "device_class": "data_rate",
232      "state_class": "measurement",
233      "unit_of_measurement": "KiB/s",
234      "unique_id": "net_tx",
235      "value_template": "{{{{ value_json.net_tx }}}}",
236      "expire_after": 60
237    }},
238    "disk_usage": {{
239      "name": "{id} Disk usage",
240      "platform": "sensor",
241      "device_class": "data_size",
242      "state_class": "measurement",
243      "unit_of_measurement": "B",
244      "unique_id": "disk_usage",
245      "value_template": "{{{{ value_json.disk_usage }}}}",
246      "expire_after": 60
247    }}
248  }},
249  "state_topic": "mqtt-system-monitor/{id}/state"
250}}"#
251            ),
252        )
253    }
254
255    async fn publish<S>(client: &AsyncClient, topic: S, data: String) -> Result<(), ClientError>
256    where
257        S: Into<String> + std::fmt::Display,
258    {
259        debug!("Publishing to topic {topic} : {data}");
260        match client.publish(topic, QoS::AtLeastOnce, false, data).await {
261            Err(message) => {
262                error!("MQTT publish error: {message}");
263
264                Err(message)
265            }
266            _ => Ok(()),
267        }
268    }
269}
270
271impl Drop for Daemon {
272    fn drop(&mut self) {
273        self.stop.store(true, Ordering::Relaxed)
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280
281    #[test]
282    fn test_update_rate() {
283        let mut start: Option<u64> = None;
284
285        // As long as we don't have any data to send, the start stays at None
286        assert_eq!(Daemon::update_rate(&mut start, None, 10), None);
287        assert_eq!(start, None);
288
289        // At first iteration we return None because the rate is not known yet
290        assert_eq!(Daemon::update_rate(&mut start, Some(10), 10), None);
291        assert_eq!(start, Some(10));
292
293        // The total received was increased by 20 KiBytes, divided by the update of 10 is 2 KiBytes/s
294        assert_eq!(
295            Daemon::update_rate(&mut start, Some(10 + 2 * 1024 * 10), 10),
296            Some(2.0)
297        );
298        assert_eq!(start, Some(10 + 2 * 1024 * 10));
299    }
300}