mqtt_system_monitor/
daemon.rs

1use crate::configuration::Configuration;
2use crate::home_assistant::{RegistrationDescriptor, Sensor};
3use crate::status::StatusMessage;
4use log::{debug, error, info, trace};
5use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
6use std::sync::atomic::{AtomicBool, Ordering};
7use sysinfo::{
8    Component, Components, CpuRefreshKind, MemoryRefreshKind, Networks, RefreshKind, System,
9};
10use tokio::task;
11use tokio::time::sleep;
12
13pub struct Daemon {
14    config: Configuration,
15    mqtt_config: MqttOptions,
16    registration_descriptor: RegistrationDescriptor,
17
18    stop: AtomicBool,
19
20    system: System,
21    network: Networks,
22    temp_component: Option<Component>,
23
24    last_total_transmitted: Option<u64>,
25    last_total_received: Option<u64>,
26}
27
28impl Daemon {
29    pub fn new(config: Configuration) -> Daemon {
30        info!("Daemon for {} starting", config.mqtt.entity);
31
32        let system = System::new_with_specifics(
33            RefreshKind::nothing()
34                .with_cpu(CpuRefreshKind::everything())
35                .with_memory(MemoryRefreshKind::nothing().with_ram()),
36        );
37
38        let network = Networks::new_with_refreshed_list();
39
40        let components = Components::new_with_refreshed_list();
41
42        let mut mqtt_config =
43            MqttOptions::new(&config.mqtt.entity, &config.mqtt.host, config.mqtt.port);
44        mqtt_config.set_credentials(&config.mqtt.user, &config.mqtt.password);
45
46        info!(
47            "Connecting to MQTT broker {}:{}",
48            config.mqtt.host, config.mqtt.port
49        );
50
51        Daemon {
52            mqtt_config,
53            stop: AtomicBool::new(false),
54            registration_descriptor: RegistrationDescriptor::new(&config.mqtt.entity),
55            system,
56            network,
57            temp_component: Self::select_temp_component(
58                components,
59                config.sensors.temperature.as_deref(),
60            ),
61            last_total_transmitted: None,
62            last_total_received: None,
63            config,
64        }
65    }
66
67    fn select_temp_component(components: Components, temp_name: Option<&str>) -> Option<Component> {
68        let mut cmps = Vec::from(components);
69        let temp_label = temp_name?;
70
71        while let Some(c) = cmps.pop() {
72            if c.label() == temp_label {
73                return Some(c);
74            }
75        }
76        None
77    }
78
79    pub fn update_data(self: &mut Daemon) -> StatusMessage {
80        self.system.refresh_cpu_usage();
81
82        self.network.refresh(true);
83        let (net_tx, net_rx) = self.select_network();
84
85        let component = &mut self.temp_component;
86        if let Some(c) = component {
87            c.refresh();
88        }
89
90        StatusMessage {
91            cpu_usage: self.system.global_cpu_usage(),
92            disk_usage: None,
93            cpu_temp: component.as_ref().and_then(|c| c.temperature()),
94            net_tx: Self::update_rate(
95                &mut self.last_total_transmitted,
96                net_tx,
97                self.config.mqtt.update_period,
98            ),
99            net_rx: Self::update_rate(
100                &mut self.last_total_received,
101                net_rx,
102                self.config.mqtt.update_period,
103            ),
104        }
105    }
106
107    fn select_network(&mut self) -> (Option<u64>, Option<u64>) {
108        if let Some(network) = &self.config.sensors.network.as_deref() {
109            for (interface, net) in &self.network {
110                if network == interface {
111                    return (Some(net.total_transmitted()), Some(net.total_received()));
112                }
113            }
114        };
115
116        (None, None)
117    }
118
119    fn update_rate(
120        last_val: &mut Option<u64>,
121        cur: Option<u64>,
122        update_period: u64,
123    ) -> Option<f64> {
124        let cur = cur?;
125        let last = *last_val;
126        *last_val = Some(cur);
127
128        if let Some(last) = last
129            && last <= cur
130        {
131            Some(((cur - last) / update_period) as f64 / 1024.0)
132        } else {
133            None
134        }
135    }
136
137    pub fn register_sensors(&mut self) {
138        let entity = self.config.mqtt.entity.as_str();
139        self.registration_descriptor
140            .add_component(Sensor::CpuUsage, entity);
141        if self.temp_component.is_some() {
142            self.registration_descriptor
143                .add_component(Sensor::CpuTemperature, entity);
144        }
145        if self.config.sensors.network.is_some() {
146            self.registration_descriptor
147                .add_component(Sensor::NetTx, entity);
148            self.registration_descriptor
149                .add_component(Sensor::NetRx, entity);
150        }
151    }
152
153    pub async fn run(self: &mut Daemon) {
154        let mut cycles_counter = 0;
155        let mut register: bool;
156
157        let expire_cycles = 60 / self.config.mqtt.update_period - 1;
158        let sleep_period = std::time::Duration::from_secs(self.config.mqtt.update_period);
159
160        let topic = format!("mqtt-system-monitor/{}/state", self.config.mqtt.entity);
161        self.register_sensors();
162
163        let (client, mut eventloop) = AsyncClient::new(self.mqtt_config.clone(), 1);
164
165        task::spawn(async move {
166            while let Ok(notification) = eventloop.poll().await {
167                trace!("MQTT notification received: {notification:?}");
168            }
169        });
170
171        while !self.stop.load(Ordering::Relaxed) {
172            register = cycles_counter == 0;
173
174            if Daemon::publish(&client, &topic, self.update_data().to_string())
175                .await
176                .is_err()
177            {
178                break;
179            }
180
181            if register {
182                let prefix = &self.config.mqtt.registration_prefix;
183                let descriptor = self.registration_descriptor();
184
185                if Daemon::publish(
186                    &client,
187                    descriptor.discovery_topic(prefix),
188                    descriptor.to_string(),
189                )
190                .await
191                .is_err()
192                {
193                    break;
194                };
195            }
196
197            if cycles_counter == expire_cycles {
198                cycles_counter = 0;
199            } else {
200                cycles_counter += 1;
201            }
202
203            sleep(sleep_period).await;
204        }
205    }
206
207    pub fn registration_descriptor(&self) -> &RegistrationDescriptor {
208        &self.registration_descriptor
209    }
210
211    async fn publish<S>(client: &AsyncClient, topic: S, data: String) -> Result<(), ClientError>
212    where
213        S: Into<String> + std::fmt::Display,
214    {
215        debug!("Publishing to topic {topic} : {data}");
216        match client.publish(topic, QoS::AtLeastOnce, false, data).await {
217            Err(message) => {
218                error!("MQTT publish error: {message}");
219
220                Err(message)
221            }
222            _ => Ok(()),
223        }
224    }
225}
226
227impl Drop for Daemon {
228    fn drop(&mut self) {
229        self.stop.store(true, Ordering::Relaxed)
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks `Daemon::update_rate` through "no sample", "first sample" and "second sample".
    #[test]
    fn test_update_rate() {
        const PERIOD: u64 = 10;
        let first_total: u64 = 10;
        // 20 KiB more than the first total.
        let second_total: u64 = first_total + 2 * 1024 * PERIOD;

        let mut previous: Option<u64> = None;

        // Without a current sample nothing is reported and nothing is remembered.
        assert_eq!(Daemon::update_rate(&mut previous, None, PERIOD), None);
        assert_eq!(previous, None);

        // The first sample only establishes the baseline; the rate is still unknown.
        assert_eq!(
            Daemon::update_rate(&mut previous, Some(first_total), PERIOD),
            None
        );
        assert_eq!(previous, Some(first_total));

        // 20 KiB over a 10 s period is 2 KiB/s.
        assert_eq!(
            Daemon::update_rate(&mut previous, Some(second_total), PERIOD),
            Some(2.0)
        );
        assert_eq!(previous, Some(second_total));
    }
}
256}