1use crate::configuration::Configuration;
2use crate::status::StatusMessage;
3use log::{debug, error, info, trace};
4use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
5use std::sync::atomic::{AtomicBool, Ordering};
6use sysinfo::{
7 Component, Components, CpuRefreshKind, MemoryRefreshKind, Networks, RefreshKind, System,
8};
9use tokio::task;
10use tokio::time::sleep;
11
/// Collects system metrics and publishes them over MQTT.
///
/// Built by [`Daemon::new`]; [`Daemon::run`] drives the periodic publish loop.
pub struct Daemon {
    /// Application configuration (MQTT settings and sensor selection).
    config: Configuration,
    /// MQTT connection options assembled in `new` from `config.mqtt`.
    mqtt_config: MqttOptions,

    /// Stop flag: checked at the top of every `run` iteration and set to `true` on drop.
    stop: AtomicBool,

    /// `sysinfo` handle used to read the global CPU usage.
    system: System,
    /// Network interface list, refreshed on every `update_data` call.
    network: Networks,
    /// Component whose label equals `config.sensors.temperature`, if one was found in `new`.
    temp_component: Option<Component>,

    /// `total_transmitted()` value seen at the previous update; `None` until a sample exists.
    last_total_transmitted: Option<u64>,
    /// `total_received()` value seen at the previous update; `None` until a sample exists.
    last_total_received: Option<u64>,
}
25
26impl Daemon {
27 pub fn new(config: Configuration) -> Daemon {
28 info!("Daemon for {} starting", config.mqtt.entity);
29
30 let system = System::new_with_specifics(
31 RefreshKind::nothing()
32 .with_cpu(CpuRefreshKind::everything())
33 .with_memory(MemoryRefreshKind::nothing().with_ram()),
34 );
35
36 let network = Networks::new_with_refreshed_list();
37
38 let components = Components::new_with_refreshed_list();
39
40 let mut mqtt_config =
41 MqttOptions::new(&config.mqtt.entity, &config.mqtt.host, config.mqtt.port);
42 mqtt_config.set_credentials(&config.mqtt.user, &config.mqtt.password);
43
44 info!(
45 "Connecting to MQTT broker {}:{}",
46 config.mqtt.host, config.mqtt.port
47 );
48
49 Daemon {
50 mqtt_config,
51 stop: AtomicBool::new(false),
52 system,
53 network,
54 temp_component: Self::select_temp_component(
55 components,
56 config.sensors.temperature.as_deref(),
57 ),
58 last_total_transmitted: None,
59 last_total_received: None,
60 config,
61 }
62 }
63
64 fn select_temp_component(components: Components, temp_name: Option<&str>) -> Option<Component> {
65 let mut cmps = Vec::from(components);
66 let temp_label = temp_name?;
67
68 while let Some(c) = cmps.pop() {
69 if c.label() == temp_label {
70 return Some(c);
71 }
72 }
73 None
74 }
75
76 pub fn update_data(self: &mut Daemon) -> StatusMessage {
77 self.system.refresh_cpu_usage();
78
79 self.network.refresh(true);
80 let (net_tx, net_rx) = self.select_network();
81
82 let component = &mut self.temp_component;
83 if let Some(c) = component {
84 c.refresh();
85 }
86
87 StatusMessage {
88 cpu_usage: self.system.global_cpu_usage(),
89 disk_usage: None,
90 cpu_temp: component.as_ref().and_then(|c| c.temperature()),
91 net_tx: Self::update_rate(
92 &mut self.last_total_transmitted,
93 net_tx,
94 self.config.mqtt.update_period,
95 ),
96 net_rx: Self::update_rate(
97 &mut self.last_total_received,
98 net_rx,
99 self.config.mqtt.update_period,
100 ),
101 }
102 }
103
104 fn select_network(&mut self) -> (Option<u64>, Option<u64>) {
105 if let Some(network) = &self.config.sensors.network.as_deref() {
106 for (interface, net) in &self.network {
107 if network == interface {
108 return (Some(net.total_transmitted()), Some(net.total_received()));
109 }
110 }
111 };
112
113 (None, None)
114 }
115
/// Converts two consecutive counter samples into a rate in KiB per second.
///
/// * `last_val` - previous counter sample; overwritten with `cur` whenever `cur` is `Some`.
/// * `cur` - current counter sample, or `None` when no sample is available.
/// * `update_period` - seconds assumed to have elapsed between the two samples.
///
/// Returns `None` when there is no current sample, no previous sample, the counter went
/// backwards (for example after a reset), or `update_period` is zero.
fn update_rate(last_val: &mut Option<u64>, cur: Option<u64>, update_period: u64) -> Option<f64> {
    let cur = cur?;
    // Remember the new sample even when no rate can be derived from it.
    let previous = last_val.replace(cur);

    if update_period == 0 {
        // No meaningful rate over a zero-length interval (and no division by zero).
        return None;
    }

    // `checked_sub` yields `None` when the counter decreased.
    let delta = cur.checked_sub(previous?)?;

    // Divide in floating point so deltas smaller than the period are not truncated to zero.
    // The u64 -> f64 casts only lose precision above 2^53, far beyond realistic deltas.
    Some(delta as f64 / update_period as f64 / 1024.0)
}
127
128 pub async fn run(self: &mut Daemon) {
129 let mut cycles_counter = 0;
130 let mut register: bool;
131
132 let expire_cycles = 60 / self.config.mqtt.update_period - 1;
133 let sleep_period = std::time::Duration::from_secs(self.config.mqtt.update_period);
134
135 let topic = format!("mqtt-system-monitor/{}/state", self.config.mqtt.entity);
136
137 let (client, mut eventloop) = AsyncClient::new(self.mqtt_config.clone(), 1);
138
139 task::spawn(async move {
140 while let Ok(notification) = eventloop.poll().await {
141 trace!("MQTT notification received: {notification:?}");
142 }
143 });
144
145 while !self.stop.load(Ordering::Relaxed) {
146 register = cycles_counter == 0;
147
148 if Daemon::publish(&client, &topic, self.update_data().to_string())
149 .await
150 .is_err()
151 {
152 break;
153 }
154
155 if register {
156 let registration_message = self.registration_message();
157
158 if Daemon::publish(&client, registration_message.0, registration_message.1)
159 .await
160 .is_err()
161 {
162 break;
163 };
164 }
165
166 if cycles_counter == expire_cycles {
167 cycles_counter = 0;
168 } else {
169 cycles_counter += 1;
170 }
171
172 sleep(sleep_period).await;
173 }
174 }
175
/// Builds the registration message that announces this device and its sensors.
///
/// Returns `(topic, payload)`:
/// * `topic` is `<registration_prefix>/device/<entity>/config`;
/// * `payload` is a JSON document describing the device, the originating package (name,
///   version and homepage taken from Cargo metadata) and five sensor components
///   (`cpu_temp`, `cpu_usage`, `net_rx`, `net_tx`, `disk_usage`) that all read from the
///   state topic `mqtt-system-monitor/<entity>/state`.
///
/// The field names look like the Home Assistant MQTT device-discovery schema; confirm
/// against the consumer before changing any of them.
///
/// NOTE(review): the `unique_id` values are fixed strings that do not include the entity
/// name, so two hosts would announce the same ids; verify whether the consumer scopes them.
/// NOTE(review): `id`, `package_name`, `version` and `url` are interpolated without JSON
/// escaping, so a quote or backslash in any of them would produce invalid JSON.
/// NOTE(review): `disk_usage` is announced here although `update_data` always reports it
/// as `None`.
pub fn registration_message(&self) -> (String, String) {
    let id = &self.config.mqtt.entity;
    let prefix = &self.config.mqtt.registration_prefix;
    // Package metadata is substituted by Cargo at compile time.
    let version = env!("CARGO_PKG_VERSION");
    let package_name = env!("CARGO_PKG_NAME");
    let url = env!("CARGO_PKG_HOMEPAGE");

    (
        format!("{prefix}/device/{}/config", self.config.mqtt.entity),
        // In the template, `{{` and `}}` are escaped literal braces, so `{{{{ ... }}}}`
        // renders as `{{ ... }}` in the payload.
        format!(
            r#"{{
 "device": {{
 "name": "{id}",
 "identifiers": "{id}"
 }},
 "origin": {{
 "name": "{package_name}",
 "sw_version": "{version}",
 "url": "{url}"
 }},
 "components": {{
 "cpu_temp": {{
 "name": "{id} CPU temperature",
 "platform": "sensor",
 "device_class": "temperature",
 "state_class": "measurement",
 "unit_of_measurement": "°C",
 "unique_id": "cpu_temp",
 "value_template": "{{{{ value_json.cpu_temp }}}}",
 "expire_after": 60
 }},
 "cpu_usage": {{
 "name": "{id} CPU usage",
 "platform": "sensor",
 "device_class": null,
 "icon": "mdi:cpu-64-bit",
 "state_class": "measurement",
 "unit_of_measurement": "%",
 "unique_id": "cpu_usage",
 "value_template": "{{{{ value_json.cpu_usage }}}}",
 "expire_after": 60
 }},
 "net_rx": {{
 "name": "{id} Network RX rate",
 "platform": "sensor",
 "device_class": "data_rate",
 "state_class": "measurement",
 "unit_of_measurement": "KiB/s",
 "unique_id": "net_rx",
 "value_template": "{{{{ value_json.net_rx }}}}",
 "expire_after": 60
 }},
 "net_tx": {{
 "name": "{id} Network TX rate",
 "platform": "sensor",
 "device_class": "data_rate",
 "state_class": "measurement",
 "unit_of_measurement": "KiB/s",
 "unique_id": "net_tx",
 "value_template": "{{{{ value_json.net_tx }}}}",
 "expire_after": 60
 }},
 "disk_usage": {{
 "name": "{id} Disk usage",
 "platform": "sensor",
 "device_class": "data_size",
 "state_class": "measurement",
 "unit_of_measurement": "B",
 "unique_id": "disk_usage",
 "value_template": "{{{{ value_json.disk_usage }}}}",
 "expire_after": 60
 }}
 }},
 "state_topic": "mqtt-system-monitor/{id}/state"
}}"#
        ),
    )
}
254
255 async fn publish<S>(client: &AsyncClient, topic: S, data: String) -> Result<(), ClientError>
256 where
257 S: Into<String> + std::fmt::Display,
258 {
259 debug!("Publishing to topic {topic} : {data}");
260 match client.publish(topic, QoS::AtLeastOnce, false, data).await {
261 Err(message) => {
262 error!("MQTT publish error: {message}");
263
264 Err(message)
265 }
266 _ => Ok(()),
267 }
268 }
269}
270
271impl Drop for Daemon {
272 fn drop(&mut self) {
273 self.stop.store(true, Ordering::Relaxed)
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `update_rate` a sequence of samples with a 10 s period and checks, after each
    /// step, both the returned rate and the stored previous sample.
    #[test]
    fn test_update_rate() {
        let second_sample: u64 = 10 + 2 * 1024 * 10;

        // (current sample, expected rate, expected stored sample)
        let steps: [(Option<u64>, Option<f64>, Option<u64>); 3] = [
            // No sample: nothing is computed or stored.
            (None, None, None),
            // First sample: stored, but there is no previous value to compare with.
            (Some(10), None, Some(10)),
            // 2 * 1024 * 10 bytes over 10 s is 2 KiB/s.
            (Some(second_sample), Some(2.0), Some(second_sample)),
        ];

        let mut last: Option<u64> = None;
        for (sample, expected_rate, expected_state) in steps {
            assert_eq!(Daemon::update_rate(&mut last, sample, 10), expected_rate);
            assert_eq!(last, expected_state);
        }
    }
}