1use crate::configuration::Configuration;
2use crate::home_assistant::{RegistrationDescriptor, Sensor};
3use crate::status::StatusMessage;
4use log::{debug, error, info, trace};
5use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
6use std::sync::atomic::{AtomicBool, Ordering};
7use sysinfo::{
8 Component, Components, CpuRefreshKind, MemoryRefreshKind, Networks, RefreshKind, System,
9};
10use tokio::task;
11use tokio::time::sleep;
12
/// State of the monitoring daemon: its configuration, the MQTT options derived
/// from it, the sensor registration descriptor and the `sysinfo` handles that
/// are sampled on every update.
pub struct Daemon {
    /// Configuration passed to `Daemon::new`.
    config: Configuration,
    /// MQTT options built in `Daemon::new` from `config.mqtt`; cloned by `run` to create the client.
    mqtt_config: MqttOptions,
    /// Descriptor created in `Daemon::new`, filled by `register_sensors` and published by `run`.
    registration_descriptor: RegistrationDescriptor,

    /// Loop condition of `run`: the loop continues while this is `false`. Set to `true` on drop.
    stop: AtomicBool,

    /// `sysinfo` system handle created with CPU and RAM refresh kinds; `update_data`
    /// refreshes its CPU usage.
    system: System,
    /// `sysinfo` network interface list, refreshed by `update_data`.
    network: Networks,
    /// Component whose label equals `config.sensors.temperature`, if one was found at construction.
    temp_component: Option<Component>,

    /// `total_transmitted()` value recorded by the previous rate computation (`None` until the
    /// first reading).
    last_total_transmitted: Option<u64>,
    /// `total_received()` value recorded by the previous rate computation (`None` until the
    /// first reading).
    last_total_received: Option<u64>,
}
27
28impl Daemon {
29 pub fn new(config: Configuration) -> Daemon {
30 info!("Daemon for {} starting", config.mqtt.entity);
31
32 let system = System::new_with_specifics(
33 RefreshKind::nothing()
34 .with_cpu(CpuRefreshKind::everything())
35 .with_memory(MemoryRefreshKind::nothing().with_ram()),
36 );
37
38 let network = Networks::new_with_refreshed_list();
39
40 let components = Components::new_with_refreshed_list();
41
42 let mut mqtt_config =
43 MqttOptions::new(&config.mqtt.entity, &config.mqtt.host, config.mqtt.port);
44 mqtt_config.set_credentials(&config.mqtt.user, &config.mqtt.password);
45
46 info!(
47 "Connecting to MQTT broker {}:{}",
48 config.mqtt.host, config.mqtt.port
49 );
50
51 Daemon {
52 mqtt_config,
53 stop: AtomicBool::new(false),
54 registration_descriptor: RegistrationDescriptor::new(&config.mqtt.entity),
55 system,
56 network,
57 temp_component: Self::select_temp_component(
58 components,
59 config.sensors.temperature.as_deref(),
60 ),
61 last_total_transmitted: None,
62 last_total_received: None,
63 config,
64 }
65 }
66
67 fn select_temp_component(components: Components, temp_name: Option<&str>) -> Option<Component> {
68 let mut cmps = Vec::from(components);
69 let temp_label = temp_name?;
70
71 while let Some(c) = cmps.pop() {
72 if c.label() == temp_label {
73 return Some(c);
74 }
75 }
76 None
77 }
78
79 pub fn update_data(self: &mut Daemon) -> StatusMessage {
80 self.system.refresh_cpu_usage();
81
82 self.network.refresh(true);
83 let (net_tx, net_rx) = self.select_network();
84
85 let component = &mut self.temp_component;
86 if let Some(c) = component {
87 c.refresh();
88 }
89
90 StatusMessage {
91 cpu_usage: self.system.global_cpu_usage(),
92 disk_usage: None,
93 cpu_temp: component.as_ref().and_then(|c| c.temperature()),
94 net_tx: Self::update_rate(
95 &mut self.last_total_transmitted,
96 net_tx,
97 self.config.mqtt.update_period,
98 ),
99 net_rx: Self::update_rate(
100 &mut self.last_total_received,
101 net_rx,
102 self.config.mqtt.update_period,
103 ),
104 }
105 }
106
107 fn select_network(&mut self) -> (Option<u64>, Option<u64>) {
108 if let Some(network) = &self.config.sensors.network.as_deref() {
109 for (interface, net) in &self.network {
110 if network == interface {
111 return (Some(net.total_transmitted()), Some(net.total_received()));
112 }
113 }
114 };
115
116 (None, None)
117 }
118
119 fn update_rate(
120 last_val: &mut Option<u64>,
121 cur: Option<u64>,
122 update_period: u64,
123 ) -> Option<f64> {
124 let cur = cur?;
125 let last = *last_val;
126 *last_val = Some(cur);
127
128 if let Some(last) = last
129 && last <= cur
130 {
131 Some(((cur - last) / update_period) as f64 / 1024.0)
132 } else {
133 None
134 }
135 }
136
137 pub fn register_sensors(&mut self) {
138 let entity = self.config.mqtt.entity.as_str();
139 self.registration_descriptor
140 .add_component(Sensor::CpuUsage, entity);
141 if self.temp_component.is_some() {
142 self.registration_descriptor
143 .add_component(Sensor::CpuTemperature, entity);
144 }
145 if self.config.sensors.network.is_some() {
146 self.registration_descriptor
147 .add_component(Sensor::NetTx, entity);
148 self.registration_descriptor
149 .add_component(Sensor::NetRx, entity);
150 }
151 }
152
153 pub async fn run(self: &mut Daemon) {
154 let mut cycles_counter = 0;
155 let mut register: bool;
156
157 let expire_cycles = 60 / self.config.mqtt.update_period - 1;
158 let sleep_period = std::time::Duration::from_secs(self.config.mqtt.update_period);
159
160 let topic = format!("mqtt-system-monitor/{}/state", self.config.mqtt.entity);
161 self.register_sensors();
162
163 let (client, mut eventloop) = AsyncClient::new(self.mqtt_config.clone(), 1);
164
165 task::spawn(async move {
166 while let Ok(notification) = eventloop.poll().await {
167 trace!("MQTT notification received: {notification:?}");
168 }
169 });
170
171 while !self.stop.load(Ordering::Relaxed) {
172 register = cycles_counter == 0;
173
174 if Daemon::publish(&client, &topic, self.update_data().to_string())
175 .await
176 .is_err()
177 {
178 break;
179 }
180
181 if register {
182 let prefix = &self.config.mqtt.registration_prefix;
183 let descriptor = self.registration_descriptor();
184
185 if Daemon::publish(
186 &client,
187 descriptor.discovery_topic(prefix),
188 descriptor.to_string(),
189 )
190 .await
191 .is_err()
192 {
193 break;
194 };
195 }
196
197 if cycles_counter == expire_cycles {
198 cycles_counter = 0;
199 } else {
200 cycles_counter += 1;
201 }
202
203 sleep(sleep_period).await;
204 }
205 }
206
    /// Returns a shared reference to the registration descriptor held by this daemon.
    pub fn registration_descriptor(&self) -> &RegistrationDescriptor {
        &self.registration_descriptor
    }
210
211 async fn publish<S>(client: &AsyncClient, topic: S, data: String) -> Result<(), ClientError>
212 where
213 S: Into<String> + std::fmt::Display,
214 {
215 debug!("Publishing to topic {topic} : {data}");
216 match client.publish(topic, QoS::AtLeastOnce, false, data).await {
217 Err(message) => {
218 error!("MQTT publish error: {message}");
219
220 Err(message)
221 }
222 _ => Ok(()),
223 }
224 }
225}
226
/// Raises the `stop` flag when the daemon is dropped.
impl Drop for Daemon {
    /// Stores `true` into `stop` with relaxed ordering; `run` reads this flag as its
    /// loop condition.
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed)
    }
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks `Daemon::update_rate` through three successive calls: no reading,
    /// first reading, second reading.
    #[test]
    fn test_update_rate() {
        const PERIOD: u64 = 10;
        const FIRST_TOTAL: u64 = 10;
        // The delta to the first total is 2 * 1024 per unit of PERIOD.
        const SECOND_TOTAL: u64 = 10 + 2 * 1024 * 10;

        let mut previous: Option<u64> = None;

        // No current reading: no rate, and the stored value stays empty.
        assert_eq!(Daemon::update_rate(&mut previous, None, PERIOD), None);
        assert_eq!(previous, None);

        // First reading: stored as the baseline, but there is nothing to compare against.
        assert_eq!(
            Daemon::update_rate(&mut previous, Some(FIRST_TOTAL), PERIOD),
            None
        );
        assert_eq!(previous, Some(FIRST_TOTAL));

        // Second reading: the rate is the delta / PERIOD / 1024.
        assert_eq!(
            Daemon::update_rate(&mut previous, Some(SECOND_TOTAL), PERIOD),
            Some(2.0)
        );
        assert_eq!(previous, Some(SECOND_TOTAL));
    }
}