monitord/
lib.rs

1//! # monitord Crate
2//!
3//! `monitord` is a library to gather statistics about systemd.
4
5use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use tokio::sync::RwLock;
12use tracing::error;
13use tracing::info;
14
15pub mod config;
16pub(crate) mod dbus;
17pub mod json;
18pub mod logging;
19pub mod machines;
20pub mod networkd;
21pub mod pid1;
22pub mod system;
23pub mod timer;
24pub mod units;
25
/// Default D-Bus address: the UNIX socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
27
/// Statistics gathered for a single machine.
///
/// `stat_collector` shares one instance with the per-machine collectors and copies its
/// fields into [`MonitordStats`] after every run. [`MonitordStats::machines`] holds further
/// instances keyed by a string.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct MachineStats {
    /// networkd state, as defined in the [`networkd`] module.
    pub networkd: networkd::NetworkdState,
    /// PID 1 statistics; `None` is the default value.
    /// NOTE(review): presumably stays `None` when the pid1 collector is disabled - confirm.
    pub pid1: Option<pid1::Pid1Stats>,
    /// systemd system state, as defined in the [`system`] module.
    pub system_state: system::SystemdSystemState,
    /// systemd unit statistics, as defined in the [`units`] module.
    pub units: units::SystemdUnitStats,
    /// systemd version, as defined in the [`system`] module.
    pub version: system::SystemdVersion,
}
36
/// Main monitord stats struct collecting all enabled stats.
///
/// This is the value serialized by [`print_stats`]. Every field except `machines` mirrors
/// the field of the same name in [`MachineStats`]; `stat_collector` copies them over after
/// each collection run.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Eq, PartialEq)]
pub struct MonitordStats {
    /// networkd state, as defined in the [`networkd`] module.
    pub networkd: networkd::NetworkdState,
    /// PID 1 statistics; `None` is the default value.
    /// NOTE(review): presumably stays `None` when the pid1 collector is disabled - confirm.
    pub pid1: Option<pid1::Pid1Stats>,
    /// systemd system state, as defined in the [`system`] module.
    pub system_state: system::SystemdSystemState,
    /// systemd unit statistics, as defined in the [`units`] module.
    pub units: units::SystemdUnitStats,
    /// systemd version, as defined in the [`system`] module.
    pub version: system::SystemdVersion,
    /// Per-machine statistics keyed by a string.
    /// NOTE(review): presumably the key is the machine/container name and the map is filled
    /// by the `machines` collector - confirm against `machines::update_machines_stats`.
    pub machines: HashMap<String, MachineStats>,
}
47
48/// Print statistics in the format set in configuration
49pub fn print_stats(
50    key_prefix: &str,
51    output_format: &config::MonitordOutputFormat,
52    stats: &MonitordStats,
53) {
54    match output_format {
55        config::MonitordOutputFormat::Json => println!(
56            "{}",
57            serde_json::to_string(&stats).expect("Invalid JSON serialization")
58        ),
59        config::MonitordOutputFormat::JsonFlat => println!(
60            "{}",
61            json::flatten(stats, &key_prefix.to_string()).expect("Invalid JSON serialization")
62        ),
63        config::MonitordOutputFormat::JsonPretty => println!(
64            "{}",
65            serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
66        ),
67    }
68}
69
70/// Main statictic collection function running what's required by configuration in parallel
71/// Takes an optional locked stats struct to update and to output stats to STDOUT or not
72pub async fn stat_collector(
73    config: config::Config,
74    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
75    output_stats: bool,
76) -> anyhow::Result<()> {
77    let mut collect_interval_ms: u128 = 0;
78    if config.monitord.daemon {
79        collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
80    }
81
82    let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
83        maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
84    let locked_machine_stats: Arc<RwLock<MachineStats>> =
85        Arc::new(RwLock::new(MachineStats::default()));
86    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
87    let sdc = zbus::Connection::system().await?;
88    let mut join_set = tokio::task::JoinSet::new();
89
90    loop {
91        let collect_start_time = Instant::now();
92        let mut ran_collector_count: u8 = 0;
93
94        info!("Starting stat collection run");
95
96        // Collect pid1 procfs stats
97        if config.pid1.enabled {
98            ran_collector_count += 1;
99            join_set.spawn(crate::pid1::update_pid1_stats(
100                1,
101                locked_machine_stats.clone(),
102            ));
103        }
104
105        // Run networkd collector if enabled
106        if config.networkd.enabled {
107            ran_collector_count += 1;
108            join_set.spawn(crate::networkd::update_networkd_stats(
109                config.networkd.link_state_dir.clone(),
110                None,
111                sdc.clone(),
112                locked_machine_stats.clone(),
113            ));
114        }
115
116        // Run system running (SystemState) state collector
117        if config.system_state.enabled {
118            ran_collector_count += 1;
119            join_set.spawn(crate::system::update_system_stats(
120                sdc.clone(),
121                locked_machine_stats.clone(),
122            ));
123        }
124        // Not incrementing the ran_collector_count on purpose as this is always on by default
125        join_set.spawn(crate::system::update_version(
126            sdc.clone(),
127            locked_machine_stats.clone(),
128        ));
129
130        // Run service collectors if there are services listed in config
131        if config.units.enabled {
132            ran_collector_count += 1;
133            join_set.spawn(crate::units::update_unit_stats(
134                config.clone(),
135                sdc.clone(),
136                locked_machine_stats.clone(),
137            ));
138        }
139
140        if config.machines.enabled {
141            ran_collector_count += 1;
142            join_set.spawn(crate::machines::update_machines_stats(
143                config.clone(),
144                sdc.clone(),
145                locked_monitord_stats.clone(),
146            ));
147        }
148
149        if ran_collector_count < 1 {
150            error!("No collectors scheduled to run. Exiting");
151            std::process::exit(1);
152        }
153
154        // Check all collection for errors and log if one fails
155        while let Some(res) = join_set.join_next().await {
156            match res {
157                Ok(r) => match r {
158                    Ok(_) => (),
159                    Err(e) => {
160                        error!("Collection specific failure: {:?}", e);
161                    }
162                },
163                Err(e) => {
164                    error!("Join error: {:?}", e);
165                }
166            }
167        }
168
169        {
170            // Update monitord stats with machine stats
171            let mut monitord_stats = locked_monitord_stats.write().await;
172            let machine_stats = locked_machine_stats.read().await;
173            monitord_stats.pid1 = machine_stats.pid1.clone();
174            monitord_stats.networkd = machine_stats.networkd.clone();
175            monitord_stats.system_state = machine_stats.system_state;
176            monitord_stats.version = machine_stats.version.clone();
177            monitord_stats.units = machine_stats.units.clone();
178        }
179
180        let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
181
182        info!("stat collection run took {}ms", elapsed_runtime_ms);
183        if output_stats {
184            let monitord_stats = locked_monitord_stats.read().await;
185            print_stats(
186                &config.monitord.key_prefix,
187                &config.monitord.output_format,
188                &monitord_stats,
189            );
190        }
191        if !config.monitord.daemon {
192            break;
193        }
194        let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
195        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
196        tokio::time::sleep(Duration::from_millis(
197            sleep_time_ms
198                .try_into()
199                .expect("Sleep time does not fit into a u64 :O"),
200        ))
201        .await;
202    }
203    Ok(())
204}