1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use tokio::sync::RwLock;
12use tracing::error;
13use tracing::info;
14
15pub mod config;
16pub(crate) mod dbus;
17pub mod json;
18pub mod logging;
19pub mod machines;
20pub mod networkd;
21pub mod pid1;
22pub mod system;
23pub mod timer;
24pub mod units;
25
/// D-Bus address of the conventional system bus socket, in D-Bus address syntax
/// (`transport:key=value`).
pub const DEFAULT_DBUS_ADDRESS: &'static str = "unix:path=/run/dbus/system_bus_socket";
27
/// Statistics gathered for a single machine.
///
/// `stat_collector` fills one of these for the host and then copies each field into the
/// matching field of [`MonitordStats`]. Field declaration order is also the serialized
/// (JSON) key order.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct MachineStats {
    /// systemd-networkd state, written by `networkd::update_networkd_stats`.
    pub networkd: networkd::NetworkdState,
    /// PID 1 statistics; `None` by default (presumably stays `None` when pid1 collection
    /// is disabled or fails — confirm in `pid1::update_pid1_stats`).
    pub pid1: Option<pid1::Pid1Stats>,
    /// systemd system state, written by `system::update_system_stats`.
    pub system_state: system::SystemdSystemState,
    /// Unit statistics, written by `units::update_unit_stats`.
    pub units: units::SystemdUnitStats,
    /// systemd version, written by `system::update_version`.
    pub version: system::SystemdVersion,
}
36
/// Top-level statistics document produced by monitord and printed by [`print_stats`].
///
/// The first five fields mirror [`MachineStats`] for the host; `stat_collector` copies
/// them over after every collection run. Field declaration order is also the serialized
/// (JSON) key order.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Eq, PartialEq)]
pub struct MonitordStats {
    /// Host systemd-networkd state.
    pub networkd: networkd::NetworkdState,
    /// Host PID 1 statistics, if collected.
    pub pid1: Option<pid1::Pid1Stats>,
    /// Host systemd system state.
    pub system_state: system::SystemdSystemState,
    /// Host unit statistics.
    pub units: units::SystemdUnitStats,
    /// Host systemd version.
    pub version: system::SystemdVersion,
    /// Per-machine statistics. NOTE(review): presumably keyed by machine name and filled
    /// by `machines::update_machines_stats`, which receives this whole struct — confirm.
    pub machines: HashMap<String, MachineStats>,
}
47
48pub fn print_stats(
50 key_prefix: &str,
51 output_format: &config::MonitordOutputFormat,
52 stats: &MonitordStats,
53) {
54 match output_format {
55 config::MonitordOutputFormat::Json => println!(
56 "{}",
57 serde_json::to_string(&stats).expect("Invalid JSON serialization")
58 ),
59 config::MonitordOutputFormat::JsonFlat => println!(
60 "{}",
61 json::flatten(stats, &key_prefix.to_string()).expect("Invalid JSON serialization")
62 ),
63 config::MonitordOutputFormat::JsonPretty => println!(
64 "{}",
65 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
66 ),
67 }
68}
69
70pub async fn stat_collector(
73 config: config::Config,
74 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
75 output_stats: bool,
76) -> anyhow::Result<()> {
77 let mut collect_interval_ms: u128 = 0;
78 if config.monitord.daemon {
79 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
80 }
81
82 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
83 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
84 let locked_machine_stats: Arc<RwLock<MachineStats>> =
85 Arc::new(RwLock::new(MachineStats::default()));
86 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
87 let sdc = zbus::Connection::system().await?;
88 let mut join_set = tokio::task::JoinSet::new();
89
90 loop {
91 let collect_start_time = Instant::now();
92 let mut ran_collector_count: u8 = 0;
93
94 info!("Starting stat collection run");
95
96 if config.pid1.enabled {
98 ran_collector_count += 1;
99 join_set.spawn(crate::pid1::update_pid1_stats(
100 1,
101 locked_machine_stats.clone(),
102 ));
103 }
104
105 if config.networkd.enabled {
107 ran_collector_count += 1;
108 join_set.spawn(crate::networkd::update_networkd_stats(
109 config.networkd.link_state_dir.clone(),
110 None,
111 sdc.clone(),
112 locked_machine_stats.clone(),
113 ));
114 }
115
116 if config.system_state.enabled {
118 ran_collector_count += 1;
119 join_set.spawn(crate::system::update_system_stats(
120 sdc.clone(),
121 locked_machine_stats.clone(),
122 ));
123 }
124 join_set.spawn(crate::system::update_version(
126 sdc.clone(),
127 locked_machine_stats.clone(),
128 ));
129
130 if config.units.enabled {
132 ran_collector_count += 1;
133 join_set.spawn(crate::units::update_unit_stats(
134 config.clone(),
135 sdc.clone(),
136 locked_machine_stats.clone(),
137 ));
138 }
139
140 if config.machines.enabled {
141 ran_collector_count += 1;
142 join_set.spawn(crate::machines::update_machines_stats(
143 config.clone(),
144 sdc.clone(),
145 locked_monitord_stats.clone(),
146 ));
147 }
148
149 if ran_collector_count < 1 {
150 error!("No collectors scheduled to run. Exiting");
151 std::process::exit(1);
152 }
153
154 while let Some(res) = join_set.join_next().await {
156 match res {
157 Ok(r) => match r {
158 Ok(_) => (),
159 Err(e) => {
160 error!("Collection specific failure: {:?}", e);
161 }
162 },
163 Err(e) => {
164 error!("Join error: {:?}", e);
165 }
166 }
167 }
168
169 {
170 let mut monitord_stats = locked_monitord_stats.write().await;
172 let machine_stats = locked_machine_stats.read().await;
173 monitord_stats.pid1 = machine_stats.pid1.clone();
174 monitord_stats.networkd = machine_stats.networkd.clone();
175 monitord_stats.system_state = machine_stats.system_state;
176 monitord_stats.version = machine_stats.version.clone();
177 monitord_stats.units = machine_stats.units.clone();
178 }
179
180 let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
181
182 info!("stat collection run took {}ms", elapsed_runtime_ms);
183 if output_stats {
184 let monitord_stats = locked_monitord_stats.read().await;
185 print_stats(
186 &config.monitord.key_prefix,
187 &config.monitord.output_format,
188 &monitord_stats,
189 );
190 }
191 if !config.monitord.daemon {
192 break;
193 }
194 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
195 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
196 tokio::time::sleep(Duration::from_millis(
197 sleep_time_ms
198 .try_into()
199 .expect("Sleep time does not fit into a u64 :O"),
200 ))
201 .await;
202 }
203 Ok(())
204}