mqtt_system_monitor/
daemon.rs

1use crate::configuration::Configuration;
2use crate::status::StatusMessage;
3use log::{debug, error, info, trace};
4use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
5use std::sync::atomic::{AtomicBool, Ordering};
6use sysinfo::{
7    Component, Components, CpuRefreshKind, MemoryRefreshKind, Networks, RefreshKind, System,
8};
9use tokio::task;
10use tokio::time::sleep;
11
/// Periodically samples system metrics and publishes them over MQTT.
///
/// Built by [`Daemon::new`] and driven by [`Daemon::run`].
pub struct Daemon {
    /// Configuration handed to [`Daemon::new`]; read for the entity name,
    /// update period, registration prefix and sensor selection.
    config: Configuration,
    /// MQTT connection options prepared in [`Daemon::new`]; cloned by
    /// [`Daemon::run`] when the client is created.
    mqtt_config: MqttOptions,

    /// Checked at the top of every publish cycle in [`Daemon::run`]; the loop
    /// ends once it is `true`. Also set when the daemon is dropped.
    stop: AtomicBool,

    /// `sysinfo` handle used to read the global CPU usage.
    system: System,
    /// `sysinfo` network interface list used to read traffic totals.
    network: Networks,
    /// Component whose label equals the configured temperature sensor name,
    /// or `None` when no name is configured or nothing matched.
    temp_component: Option<Component>,

    /// Transmitted-bytes total seen on the previous cycle (`None` before the
    /// first sample); used to derive the TX rate.
    last_total_transmitted: Option<u64>,
    /// Received-bytes total seen on the previous cycle (`None` before the
    /// first sample); used to derive the RX rate.
    last_total_received: Option<u64>,
}
25
26impl Daemon {
27    pub fn new(config: Configuration) -> Daemon {
28        info!("Daemon for {} starting", config.mqtt.entity);
29
30        let system = System::new_with_specifics(
31            RefreshKind::nothing()
32                .with_cpu(CpuRefreshKind::everything())
33                .with_memory(MemoryRefreshKind::nothing().with_ram()),
34        );
35
36        let network = Networks::new_with_refreshed_list();
37
38        let components = Components::new_with_refreshed_list();
39
40        let mut mqtt_config =
41            MqttOptions::new(&config.mqtt.entity, &config.mqtt.host, config.mqtt.port);
42        mqtt_config.set_credentials(&config.mqtt.user, &config.mqtt.password);
43
44        info!(
45            "Connecting to MQTT broker {}:{}",
46            config.mqtt.host, config.mqtt.port
47        );
48
49        Daemon {
50            mqtt_config,
51            stop: AtomicBool::new(false),
52            system,
53            network,
54            temp_component: Self::select_temp_component(
55                components,
56                config.sensors.temperature.as_deref(),
57            ),
58            last_total_transmitted: None,
59            last_total_received: None,
60            config,
61        }
62    }
63
64    fn select_temp_component(components: Components, temp_name: Option<&str>) -> Option<Component> {
65        let mut cmps = Vec::from(components);
66        let temp_label = temp_name?;
67
68        while let Some(c) = cmps.pop() {
69            if c.label() == temp_label {
70                return Some(c);
71            }
72        }
73        None
74    }
75
76    pub fn update_data(self: &mut Daemon) -> StatusMessage {
77        self.system.refresh_cpu_usage();
78
79        self.network.refresh(true);
80        let (net_tx, net_rx) = self.select_network();
81
82        let component = &mut self.temp_component;
83        if let Some(c) = component {
84            c.refresh();
85        }
86
87        StatusMessage {
88            cpu_usage: self.system.global_cpu_usage(),
89            disk_usage: None,
90            cpu_temp: component.as_ref().and_then(|c| c.temperature()),
91            net_tx: Self::update_rate(
92                &mut self.last_total_transmitted,
93                net_tx,
94                self.config.mqtt.update_period,
95            ),
96            net_rx: Self::update_rate(
97                &mut self.last_total_received,
98                net_rx,
99                self.config.mqtt.update_period,
100            ),
101        }
102    }
103
104    fn select_network(&mut self) -> (Option<u64>, Option<u64>) {
105        if let Some(network) = &self.config.sensors.network.as_deref() {
106            for (interface, net) in &self.network {
107                if network == interface {
108                    return (Some(net.total_transmitted()), Some(net.total_received()));
109                }
110            }
111        };
112
113        (None, None)
114    }
115
116    fn update_rate(
117        last_val: &mut Option<u64>,
118        cur: Option<u64>,
119        update_period: u64,
120    ) -> Option<f64> {
121        let cur = cur?;
122        let last = *last_val;
123        *last_val = Some(cur);
124
125        if let Some(last) = last
126            && last <= cur
127        {
128            Some(((cur - last) / update_period) as f64 / 1024.0)
129        } else {
130            None
131        }
132    }
133
134    pub async fn run(self: &mut Daemon) {
135        let mut cycles_counter = 0;
136        let mut register: bool;
137
138        let expire_cycles = 60 / self.config.mqtt.update_period - 1;
139        let sleep_period = std::time::Duration::from_secs(self.config.mqtt.update_period);
140
141        let topic = format!("mqtt-system-monitor/{}/state", self.config.mqtt.entity);
142
143        let (client, mut eventloop) = AsyncClient::new(self.mqtt_config.clone(), 1);
144
145        task::spawn(async move {
146            while let Ok(notification) = eventloop.poll().await {
147                trace!("MQTT notification received: {notification:?}");
148            }
149        });
150
151        while !self.stop.load(Ordering::Relaxed) {
152            register = cycles_counter == 0;
153
154            if Daemon::publish(&client, &topic, self.update_data().to_string())
155                .await
156                .is_err()
157            {
158                break;
159            }
160
161            if register {
162                let registration_message = self.registration_message();
163
164                if Daemon::publish(&client, registration_message.0, registration_message.1)
165                    .await
166                    .is_err()
167                {
168                    break;
169                };
170            }
171
172            if cycles_counter == expire_cycles {
173                cycles_counter = 0;
174            } else {
175                cycles_counter += 1;
176            }
177
178            sleep(sleep_period).await;
179        }
180    }
181
182    pub fn registration_message(&self) -> (String, String) {
183        let id = &self.config.mqtt.entity;
184        let prefix = &self.config.mqtt.registration_prefix;
185        let version = env!("CARGO_PKG_VERSION");
186        let package_name = env!("CARGO_PKG_NAME");
187        let url = env!("CARGO_PKG_HOMEPAGE");
188
189        (
190            format!("{prefix}/device/{}/config", self.config.mqtt.entity),
191            format!(
192                r#"{{
193  "device": {{
194    "name": "{id}",
195    "identifiers": "{id}"
196  }},
197  "origin": {{
198    "name": "{package_name}",
199    "sw_version": "{version}",
200    "url": "{url}"
201  }},
202  "components": {{
203    "cpu_temp": {{
204      "name": "{id} CPU temperature",
205      "platform": "sensor",
206      "device_class": "temperature",
207      "state_class": "measurement",
208      "unit_of_measurement": "°C",
209      "unique_id": "cpu_temp",
210      "value_template": "{{{{ value_json.cpu_temp }}}}",
211      "expire_after": 60
212    }},
213    "cpu_usage": {{
214      "name": "{id} CPU usage",
215      "platform": "sensor",
216      "device_class": null,
217      "icon": "mdi:cpu-64-bit",
218      "state_class": "measurement",
219      "unit_of_measurement": "%",
220      "unique_id": "cpu_usage",
221      "value_template": "{{{{ value_json.cpu_usage }}}}",
222      "expire_after": 60
223    }},
224    "net_rx": {{
225      "name": "{id} Network RX rate",
226      "platform": "sensor",
227      "device_class": "data_rate",
228      "state_class": "measurement",
229      "unit_of_measurement": "KiB/s",
230      "unique_id": "net_rx",
231      "value_template": "{{{{ value_json.net_rx }}}}",
232      "expire_after": 60
233    }},
234    "net_tx": {{
235      "name": "{id} Network TX rate",
236      "platform": "sensor",
237      "device_class": "data_rate",
238      "state_class": "measurement",
239      "unit_of_measurement": "KiB/s",
240      "unique_id": "net_tx",
241      "value_template": "{{{{ value_json.net_tx }}}}",
242      "expire_after": 60
243    }},
244    "disk_usage": {{
245      "name": "{id} Disk usage",
246      "platform": "sensor",
247      "device_class": "data_size",
248      "state_class": "measurement",
249      "unit_of_measurement": "B",
250      "unique_id": "disk_usage",
251      "value_template": "{{{{ value_json.disk_usage }}}}",
252      "expire_after": 60
253    }}
254  }},
255  "state_topic": "mqtt-system-monitor/{id}/state"
256}}"#
257            ),
258        )
259    }
260
261    async fn publish<S>(client: &AsyncClient, topic: S, data: String) -> Result<(), ClientError>
262    where
263        S: Into<String> + std::fmt::Display,
264    {
265        debug!("Publishing to topic {topic} : {data}");
266        match client.publish(topic, QoS::AtLeastOnce, false, data).await {
267            Err(message) => {
268                error!("MQTT publish error: {message}");
269
270                Err(message)
271            }
272            _ => Ok(()),
273        }
274    }
275}
276
277impl Drop for Daemon {
278    fn drop(&mut self) {
279        self.stop.store(true, Ordering::Relaxed)
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks `update_rate` through "no data", "first sample" and "steady state".
    #[test]
    fn test_update_rate() {
        let mut start: Option<u64> = None;

        // As long as we don't have any data to send, the start stays at None
        assert_eq!(Daemon::update_rate(&mut start, None, 10), None);
        assert_eq!(start, None);

        // At first iteration we return None because the rate is not known yet
        assert_eq!(Daemon::update_rate(&mut start, Some(10), 10), None);
        assert_eq!(start, Some(10));

        // The total received was increased by 20 KiBytes, divided by the update of 10 is 2 KiBytes/s
        assert_eq!(
            Daemon::update_rate(&mut start, Some(10 + 2 * 1024 * 10), 10),
            Some(2.0)
        );
        assert_eq!(start, Some(10 + 2 * 1024 * 10));
    }

    /// A total that goes backwards yields no rate but still becomes the new baseline.
    #[test]
    fn test_update_rate_counter_reset() {
        let mut start: Option<u64> = Some(2048);

        // The total dropped below the previous one: no rate can be computed
        assert_eq!(Daemon::update_rate(&mut start, Some(1024), 10), None);
        assert_eq!(start, Some(1024));

        // The next sample is measured against the new baseline: 10 KiBytes in 10 s is 1 KiBytes/s
        assert_eq!(
            Daemon::update_rate(&mut start, Some(1024 + 10 * 1024), 10),
            Some(1.0)
        );
        assert_eq!(start, Some(1024 + 10 * 1024));
    }
}