1use crate::configuration::Configuration;
2use crate::status::StatusMessage;
3use log::{debug, error, info, trace};
4use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
5use std::sync::atomic::{AtomicBool, Ordering};
6use sysinfo::{
7 Component, Components, CpuRefreshKind, MemoryRefreshKind, Networks, RefreshKind, System,
8};
9use tokio::task;
10use tokio::time::sleep;
11
12pub struct Daemon {
13 config: Configuration,
14 mqtt_config: MqttOptions,
15
16 stop: AtomicBool,
17
18 system: System,
19 network: Networks,
20 temp_component: Option<Component>,
21
22 last_total_transmitted: Option<u64>,
23 last_total_received: Option<u64>,
24}
25
26impl Daemon {
27 pub fn new(config: Configuration) -> Daemon {
28 info!("Daemon for {} starting", config.mqtt.entity);
29
30 let system = System::new_with_specifics(
31 RefreshKind::nothing()
32 .with_cpu(CpuRefreshKind::everything())
33 .with_memory(MemoryRefreshKind::nothing().with_ram()),
34 );
35
36 let network = Networks::new_with_refreshed_list();
37
38 let components = Components::new_with_refreshed_list();
39
40 let mut mqtt_config =
41 MqttOptions::new(&config.mqtt.entity, &config.mqtt.host, config.mqtt.port);
42 mqtt_config.set_credentials(&config.mqtt.user, &config.mqtt.password);
43
44 info!(
45 "Connecting to MQTT broker {}:{}",
46 config.mqtt.host, config.mqtt.port
47 );
48
49 Daemon {
50 mqtt_config,
51 stop: AtomicBool::new(false),
52 system,
53 network,
54 temp_component: Self::select_temp_component(
55 components,
56 config.sensors.temperature.as_deref(),
57 ),
58 last_total_transmitted: None,
59 last_total_received: None,
60 config,
61 }
62 }
63
64 fn select_temp_component(components: Components, temp_name: Option<&str>) -> Option<Component> {
65 let mut cmps = Vec::from(components);
66 let temp_label = temp_name?;
67
68 while let Some(c) = cmps.pop() {
69 if c.label() == temp_label {
70 return Some(c);
71 }
72 }
73 None
74 }
75
76 pub fn update_data(self: &mut Daemon) -> StatusMessage {
77 self.system.refresh_cpu_usage();
78
79 self.network.refresh(true);
80 let (net_tx, net_rx) = self.select_network();
81
82 let component = &mut self.temp_component;
83 if let Some(c) = component {
84 c.refresh();
85 }
86
87 StatusMessage {
88 cpu_usage: self.system.global_cpu_usage(),
89 disk_usage: None,
90 cpu_temp: component.as_ref().and_then(|c| c.temperature()),
91 net_tx: Self::update_rate(
92 &mut self.last_total_transmitted,
93 net_tx,
94 self.config.mqtt.update_period,
95 ),
96 net_rx: Self::update_rate(
97 &mut self.last_total_received,
98 net_rx,
99 self.config.mqtt.update_period,
100 ),
101 }
102 }
103
104 fn select_network(&mut self) -> (Option<u64>, Option<u64>) {
105 if let Some(network) = &self.config.sensors.network.as_deref() {
106 for (interface, net) in &self.network {
107 if network == interface {
108 return (Some(net.total_transmitted()), Some(net.total_received()));
109 }
110 }
111 };
112
113 (None, None)
114 }
115
116 fn update_rate(
117 last_val: &mut Option<u64>,
118 cur: Option<u64>,
119 update_period: u64,
120 ) -> Option<f64> {
121 let cur = cur?;
122 let last = *last_val;
123 *last_val = Some(cur);
124
125 if let Some(last) = last
126 && last <= cur
127 {
128 Some(((cur - last) / update_period) as f64 / 1024.0)
129 } else {
130 None
131 }
132 }
133
134 pub async fn run(self: &mut Daemon) {
135 let mut cycles_counter = 0;
136 let mut register: bool;
137
138 let expire_cycles = 60 / self.config.mqtt.update_period - 1;
139 let sleep_period = std::time::Duration::from_secs(self.config.mqtt.update_period);
140
141 let topic = format!("mqtt-system-monitor/{}/state", self.config.mqtt.entity);
142
143 let (client, mut eventloop) = AsyncClient::new(self.mqtt_config.clone(), 1);
144
145 task::spawn(async move {
146 while let Ok(notification) = eventloop.poll().await {
147 trace!("MQTT notification received: {notification:?}");
148 }
149 });
150
151 while !self.stop.load(Ordering::Relaxed) {
152 register = cycles_counter == 0;
153
154 if Daemon::publish(&client, &topic, self.update_data().to_string())
155 .await
156 .is_err()
157 {
158 break;
159 }
160
161 if register {
162 let registration_message = self.registration_message();
163
164 if Daemon::publish(&client, registration_message.0, registration_message.1)
165 .await
166 .is_err()
167 {
168 break;
169 };
170 }
171
172 if cycles_counter == expire_cycles {
173 cycles_counter = 0;
174 } else {
175 cycles_counter += 1;
176 }
177
178 sleep(sleep_period).await;
179 }
180 }
181
182 pub fn registration_message(&self) -> (String, String) {
183 let id = &self.config.mqtt.entity;
184 let prefix = &self.config.mqtt.registration_prefix;
185 let version = env!("CARGO_PKG_VERSION");
186 let package_name = env!("CARGO_PKG_NAME");
187 let url = env!("CARGO_PKG_HOMEPAGE");
188
189 (
190 format!("{prefix}/device/{}/config", self.config.mqtt.entity),
191 format!(
192 r#"{{
193 "device": {{
194 "name": "{id}",
195 "identifiers": "{id}"
196 }},
197 "origin": {{
198 "name": "{package_name}",
199 "sw_version": "{version}",
200 "url": "{url}"
201 }},
202 "components": {{
203 "cpu_temp": {{
204 "name": "{id} CPU temperature",
205 "platform": "sensor",
206 "device_class": "temperature",
207 "state_class": "measurement",
208 "unit_of_measurement": "°C",
209 "unique_id": "cpu_temp",
210 "value_template": "{{{{ value_json.cpu_temp }}}}",
211 "expire_after": 60
212 }},
213 "cpu_usage": {{
214 "name": "{id} CPU usage",
215 "platform": "sensor",
216 "device_class": null,
217 "icon": "mdi:cpu-64-bit",
218 "state_class": "measurement",
219 "unit_of_measurement": "%",
220 "unique_id": "cpu_usage",
221 "value_template": "{{{{ value_json.cpu_usage }}}}",
222 "expire_after": 60
223 }},
224 "net_rx": {{
225 "name": "{id} Network RX rate",
226 "platform": "sensor",
227 "device_class": "data_rate",
228 "state_class": "measurement",
229 "unit_of_measurement": "KiB/s",
230 "unique_id": "net_rx",
231 "value_template": "{{{{ value_json.net_rx }}}}",
232 "expire_after": 60
233 }},
234 "net_tx": {{
235 "name": "{id} Network TX rate",
236 "platform": "sensor",
237 "device_class": "data_rate",
238 "state_class": "measurement",
239 "unit_of_measurement": "KiB/s",
240 "unique_id": "net_tx",
241 "value_template": "{{{{ value_json.net_tx }}}}",
242 "expire_after": 60
243 }},
244 "disk_usage": {{
245 "name": "{id} Disk usage",
246 "platform": "sensor",
247 "device_class": "data_size",
248 "state_class": "measurement",
249 "unit_of_measurement": "B",
250 "unique_id": "disk_usage",
251 "value_template": "{{{{ value_json.disk_usage }}}}",
252 "expire_after": 60
253 }}
254 }},
255 "state_topic": "mqtt-system-monitor/{id}/state"
256}}"#
257 ),
258 )
259 }
260
261 async fn publish<S>(client: &AsyncClient, topic: S, data: String) -> Result<(), ClientError>
262 where
263 S: Into<String> + std::fmt::Display,
264 {
265 debug!("Publishing to topic {topic} : {data}");
266 match client.publish(topic, QoS::AtLeastOnce, false, data).await {
267 Err(message) => {
268 error!("MQTT publish error: {message}");
269
270 Err(message)
271 }
272 _ => Ok(()),
273 }
274 }
275}
276
277impl Drop for Daemon {
278 fn drop(&mut self) {
279 self.stop.store(true, Ordering::Relaxed)
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286
287 #[test]
288 fn test_update_rate() {
289 let mut start: Option<u64> = None;
290
291 assert_eq!(Daemon::update_rate(&mut start, None, 10), None);
293 assert_eq!(start, None);
294
295 assert_eq!(Daemon::update_rate(&mut start, Some(10), 10), None);
297 assert_eq!(start, Some(10));
298
299 assert_eq!(
301 Daemon::update_rate(&mut start, Some(10 + 2 * 1024 * 10), 10),
302 Some(2.0)
303 );
304 assert_eq!(start, Some(10 + 2 * 1024 * 10));
305 }
306}