monitord 0.23.0

monitord ... know how happy your systemd is! 😊
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
//! # monitord Crate
//!
//! `monitord` is a library to gather statistics about systemd.

use std::sync::Arc;

use std::collections::HashMap;
use std::time::Duration;
use std::time::Instant;

use thiserror::Error;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

/// Error type returned by [`stat_collector`].
///
/// Currently the only failure mode that is propagated to callers is a `zbus`
/// error; per-collector failures are logged inside the collection loop instead.
#[derive(Error, Debug)]
pub enum MonitordError {
    /// Wraps a `zbus::Error`. The `#[from]` attribute generates the
    /// `From<zbus::Error>` conversion, so `?` can be used directly on zbus calls.
    #[error("D-Bus connection error: {0}")]
    ZbusError(#[from] zbus::Error),
}

// Crate modules. All of them are part of the public API except `dbus`,
// which is only visible inside this crate.
pub mod boot;
pub mod config;
pub(crate) mod dbus;
pub mod dbus_stats;
pub mod json;
pub mod logging;
pub mod machines;
pub mod networkd;
pub mod pid1;
pub mod system;
pub mod timer;
pub mod unit_constants;
pub mod units;
pub mod varlink;
pub mod varlink_networkd;
pub mod varlink_units;
pub mod verify;

/// D-Bus address of the system bus unix socket at `/run/dbus/system_bus_socket`.
///
/// NOTE(review): presumably the default value for `config.monitord.dbus_address`;
/// the consumer is not visible in this file, so confirm in `config`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";

/// Per-collector timing for a single stat collection run.
///
/// Both values are derived from `std::time::Instant` measurements taken inside
/// the spawned collector task and converted to fractional milliseconds.
///
/// `start_offset_ms` is the time between the top of the collection cycle and
/// the moment this collector's future was first polled. A non-trivial offset
/// indicates the spawn/scheduling loop or the runtime is delaying first poll,
/// which means collectors are not starting in parallel as intended.
///
/// `elapsed_ms` is the time between first poll and completion.
///
/// A collector whose task fails to join (e.g. it panicked) produces no
/// `CollectorTiming` entry at all; only tasks that return are recorded.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct CollectorTiming {
    /// Name of the collector (e.g. "units", "pid1", "dbus_stats")
    pub name: String,
    /// Milliseconds from top of the run until the spawned future's first poll.
    /// Should be small (< a few ms) when collectors are truly running in parallel.
    pub start_offset_ms: f64,
    /// Milliseconds from first poll to future completion.
    pub elapsed_ms: f64,
    /// Whether the collector returned Ok.
    pub success: bool,
}

/// Stats collected for a single systemd-nspawn container or VM managed by systemd-machined
///
/// The same struct is also used by [`stat_collector`] as the staging area for
/// the host's own collectors; its fields are copied into [`MonitordStats`] at
/// the end of every collection run.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
    /// systemd-networkd interface states inside the container
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats from procfs (using the container's leader PID)
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state (e.g. running, degraded) inside the container
    pub system_state: system::SystemdSystemState,
    /// Aggregated systemd unit counts and per-service/timer stats inside the container
    pub units: units::SystemdUnitStats,
    /// systemd version running inside the container
    pub version: system::SystemdVersion,
    /// D-Bus daemon/broker statistics inside the container
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Boot blame statistics: slowest units at boot with activation times in seconds.
    /// Omitted from serialized output entirely when `None` (the other `Option`
    /// fields are still emitted when unset).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification error statistics
    pub verify_stats: Option<verify::VerifyStats>,
}

/// Root struct containing all enabled monitord metrics for the host system and containers
///
/// [`stat_collector`] overwrites the host-level fields, the run time and the
/// collector timings at the end of every collection run. Fields belonging to
/// a collector that is disabled in the configuration keep whatever value the
/// staging struct holds (its `Default` if the collector never ran).
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
    /// systemd-networkd interface states and managed interface count
    pub networkd: networkd::NetworkdState,
    /// PID 1 (systemd) process stats from procfs: CPU, memory, FDs, tasks
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd manager state (e.g. running, degraded, initializing)
    pub system_state: system::SystemdSystemState,
    /// Aggregated systemd unit counts by type/state and per-service/timer detailed metrics
    pub units: units::SystemdUnitStats,
    /// Installed systemd version (major.minor.revision.os)
    pub version: system::SystemdVersion,
    /// D-Bus daemon/broker statistics (connections, bus names, match rules, per-peer accounting)
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Per-container stats keyed by machine name, collected via systemd-machined
    pub machines: HashMap<String, MachineStats>,
    /// Boot blame statistics: slowest units at boot with activation times in seconds.
    /// Omitted from serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification error statistics
    pub verify_stats: Option<verify::VerifyStats>,
    /// End-to-end duration of the last stat collection run in milliseconds.
    pub stat_collection_run_time_ms: f64,
    /// Per-collector timings from the last run, sorted slowest first. Empty
    /// before the first run completes. Callers compute parallelism ratio
    /// (sum of `elapsed_ms` / `stat_collection_run_time_ms`) and identify the
    /// gating collector (first entry) directly from this vector.
    pub collector_timings: Vec<CollectorTiming>,
}

/// Write `stats` to STDOUT as a single line of output in the configured format.
///
/// * `key_prefix` - prefix handed to the flat JSON encoder; ignored by the
///   other two formats.
/// * `output_format` - which JSON flavour to emit.
/// * `stats` - the stats snapshot to serialize.
///
/// # Panics
///
/// Panics with "Invalid JSON serialization" if the selected encoder returns an error.
pub fn print_stats(
    key_prefix: &str,
    output_format: &config::MonitordOutputFormat,
    stats: &MonitordStats,
) {
    use config::MonitordOutputFormat::{Json, JsonFlat, JsonPretty};

    const SERIALIZE_FAILURE: &str = "Invalid JSON serialization";

    // Render first, print once: every format ends up as one println! call.
    let rendered: String = match output_format {
        Json => serde_json::to_string(stats).expect(SERIALIZE_FAILURE),
        JsonFlat => json::flatten(stats, key_prefix).expect(SERIALIZE_FAILURE),
        JsonPretty => serde_json::to_string_pretty(stats).expect(SERIALIZE_FAILURE),
    };
    println!("{}", rendered);
}

/// Store the duration of a collection run on `stats` as fractional milliseconds.
///
/// Sub-millisecond precision is preserved because the conversion goes through
/// `Duration::as_secs_f64` rather than an integer millisecond count.
fn set_stat_collection_run_time(stats: &mut MonitordStats, elapsed_runtime: Duration) {
    let run_time_ms: f64 = elapsed_runtime.as_secs_f64() * 1000.0;
    stats.stat_collection_run_time_ms = run_time_ms;
}

/// Output produced by every spawned collector future after wrapping with timing.
///
/// Tuple elements, in order:
/// 1. collector name,
/// 2. the collector's own result,
/// 3. start offset: time from the top of the collection run to the task's first poll,
/// 4. elapsed: time from the task's first poll to its completion.
type TimedCollectorOutput = (String, anyhow::Result<()>, Duration, Duration);

/// Wrap a collector future with timing instrumentation and spawn it on `join_set`.
///
/// The wrapper takes an `Instant` as soon as the task starts executing, awaits
/// the collector, and then reports:
///
/// * the collector `name` (as an owned `String`),
/// * the collector's result, untouched,
/// * how long after `collect_start` the task was first polled,
/// * how long the collector ran from that first poll until it finished.
///
/// The abort handle returned by the join set is discarded; the task is only
/// observed by draining `join_set`.
fn spawn_timed<F>(
    join_set: &mut tokio::task::JoinSet<TimedCollectorOutput>,
    name: &'static str,
    collect_start: Instant,
    fut: F,
) where
    F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let instrumented = async move {
        // Nothing here runs until the runtime polls the task, so this
        // timestamp marks the first poll, not the spawn call.
        let first_poll_at = Instant::now();
        let scheduling_delay = first_poll_at.duration_since(collect_start);

        let outcome = fut.await;
        let run_duration = first_poll_at.elapsed();

        (String::from(name), outcome, scheduling_delay, run_duration)
    };
    join_set.spawn(instrumented);
}

/// Reuse an existing D-Bus connection or create a new system bus connection.
async fn get_or_create_dbus_connection(
    config: &config::Config,
    maybe_connection: Option<zbus::Connection>,
) -> Result<zbus::Connection, MonitordError> {
    match maybe_connection {
        Some(conn) => Ok(conn),
        None => Ok(zbus::connection::Builder::system()?
            .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
            .build()
            .await?),
    }
}

/// Main statistic collection function: runs every collector enabled in `config` concurrently.
///
/// Each run spawns the enabled collectors onto a `tokio::task::JoinSet`, waits
/// for all of them to finish, records per-collector timings and copies the
/// host results into the shared [`MonitordStats`]. The systemd version
/// collector always runs. When `config.monitord.daemon` is set the cycle
/// repeats indefinitely, sleeping for whatever is left of
/// `config.monitord.daemon_stats_refresh_secs` between runs (not at all when
/// a run took longer than the interval); otherwise exactly one run happens.
///
/// # Arguments
///
/// * `config` - collector configuration; moved into an `Arc` and shared with the tasks.
/// * `maybe_locked_stats` - stats struct to update in place. A private one is
///   created when `None`.
/// * `output_stats` - when `true`, print the stats to STDOUT after every run.
/// * `maybe_connection` - D-Bus connection to reuse. A new system bus
///   connection is opened when `None`.
///
/// # Returns
///
/// `Some(connection)` if the last collection cycle completed without any
/// collector or join error (meaning the connection is reusable), `None` if
/// errors occurred.
///
/// # Errors
///
/// Returns [`MonitordError`] if the D-Bus connection cannot be established.
/// Failures of individual collectors are logged, not returned.
///
/// # Side effects
///
/// Sets the process-wide `DBUS_SYSTEM_BUS_ADDRESS` environment variable to
/// `config.monitord.dbus_address`.
///
/// # Panics
///
/// Panics if the computed sleep time in milliseconds does not fit in a `u64`,
/// or (via [`print_stats`]) if JSON serialization fails.
pub async fn stat_collector(
    config: config::Config,
    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
    output_stats: bool,
    maybe_connection: Option<zbus::Connection>,
) -> Result<Option<zbus::Connection>, MonitordError> {
    // Widen to u128 *before* multiplying so a large refresh interval cannot
    // overflow the narrower config integer type.
    let collect_interval_ms: u128 = if config.monitord.daemon {
        u128::from(config.monitord.daemon_stats_refresh_secs) * 1000
    } else {
        0
    };

    let config = Arc::new(config);
    // Only allocate a fresh stats struct when the caller did not supply one.
    let locked_monitord_stats: Arc<RwLock<MonitordStats>> = maybe_locked_stats
        .unwrap_or_else(|| Arc::new(RwLock::new(MonitordStats::default())));
    // Host collectors write into this staging struct; it is copied into
    // `locked_monitord_stats` once all collectors of a run have finished.
    let locked_machine_stats: Arc<RwLock<MachineStats>> =
        Arc::new(RwLock::new(MachineStats::default()));
    let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
        Arc::new(tokio::sync::Mutex::new(HashMap::new()));
    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
    let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
    let mut join_set: tokio::task::JoinSet<TimedCollectorOutput> = tokio::task::JoinSet::new();
    let mut had_error;

    loop {
        let collect_start_time = Instant::now();
        info!("Starting stat collection run");

        // Always collect systemd version
        spawn_timed(
            &mut join_set,
            "version",
            collect_start_time,
            crate::system::update_version(sdc.clone(), locked_machine_stats.clone()),
        );

        // Collect pid1 procfs stats
        if config.pid1.enabled {
            spawn_timed(
                &mut join_set,
                "pid1",
                collect_start_time,
                crate::pid1::update_pid1_stats(1, locked_machine_stats.clone()),
            );
        }

        // Run networkd collector if enabled
        if config.networkd.enabled {
            let config_clone = Arc::clone(&config);
            let sdc_clone = sdc.clone();
            let stats_clone = locked_machine_stats.clone();
            spawn_timed(&mut join_set, "networkd", collect_start_time, async move {
                // Prefer varlink when enabled; any varlink failure falls through
                // to the file-based collector below.
                if config_clone.varlink.enabled {
                    let socket_path = crate::varlink_networkd::NETWORK_SOCKET_PATH.to_string();
                    match crate::varlink_networkd::get_networkd_state(&socket_path).await {
                        Ok(networkd_stats) => {
                            let mut machine_stats = stats_clone.write().await;
                            machine_stats.networkd = networkd_stats;
                            return Ok(());
                        }
                        Err(err) => {
                            warn!(
                                "Varlink networkd stats failed, falling back to file-based: {:?}",
                                err
                            );
                        }
                    }
                }
                crate::networkd::update_networkd_stats(
                    config_clone.networkd.link_state_dir.clone(),
                    None,
                    sdc_clone,
                    stats_clone,
                )
                .await
            });
        }

        // Run system running (SystemState) state collector
        if config.system_state.enabled {
            spawn_timed(
                &mut join_set,
                "system_state",
                collect_start_time,
                crate::system::update_system_stats(sdc.clone(), locked_machine_stats.clone()),
            );
        }

        // Run service collectors if there are services listed in config
        if config.units.enabled {
            let config_clone = Arc::clone(&config);
            let sdc_clone = sdc.clone();
            let stats_clone = locked_machine_stats.clone();
            spawn_timed(&mut join_set, "units", collect_start_time, async move {
                // Prefer varlink when enabled; a varlink failure falls through
                // to the D-Bus collector at the bottom of this task.
                if config_clone.varlink.enabled {
                    let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
                    match crate::varlink_units::update_unit_stats(
                        Arc::clone(&config_clone),
                        stats_clone.clone(),
                        socket_path,
                    )
                    .await
                    {
                        Ok(()) => {
                            // Timer properties are not yet exposed via varlink; collect via D-Bus.
                            // A timer failure is only logged: the unit stats gathered via
                            // varlink are still considered a successful collection.
                            match crate::timer::collect_all_timers_dbus(&sdc_clone, &config_clone)
                                .await
                            {
                                Ok(timer_stats) => {
                                    let mut ms = stats_clone.write().await;
                                    ms.units.timer_stats = timer_stats.timer_stats;
                                    ms.units.timer_persistent_units =
                                        timer_stats.timer_persistent_units;
                                    ms.units.timer_remain_after_elapse =
                                        timer_stats.timer_remain_after_elapse;
                                }
                                Err(err) => {
                                    warn!("Varlink timer stats (D-Bus fallback) failed: {:?}", err);
                                }
                            }
                            if config_clone.units.unit_files {
                                // Collect before taking the write lock so the lock is
                                // not held across the collection await.
                                let unit_files = crate::units::collect_unit_files_stats("").await;
                                let mut ms = stats_clone.write().await;
                                ms.units.unit_files = unit_files;
                            }
                            return Ok(());
                        }
                        Err(err) => {
                            warn!(
                                "Varlink units stats failed, falling back to D-Bus: {:?}",
                                err
                            );
                        }
                    }
                }
                crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone, String::new())
                    .await
            });
        }

        // Machines write straight into the top-level stats (not the staging struct).
        if config.machines.enabled {
            spawn_timed(
                &mut join_set,
                "machines",
                collect_start_time,
                crate::machines::update_machines_stats(
                    Arc::clone(&config),
                    sdc.clone(),
                    locked_monitord_stats.clone(),
                    cached_machine_connections.clone(),
                ),
            );
        }

        if config.dbus_stats.enabled {
            spawn_timed(
                &mut join_set,
                "dbus_stats",
                collect_start_time,
                crate::dbus_stats::update_dbus_stats(
                    Arc::clone(&config),
                    sdc.clone(),
                    locked_machine_stats.clone(),
                ),
            );
        }

        if config.boot_blame.enabled {
            spawn_timed(
                &mut join_set,
                "boot_blame",
                collect_start_time,
                crate::boot::update_boot_blame_stats(
                    Arc::clone(&config),
                    sdc.clone(),
                    locked_machine_stats.clone(),
                ),
            );
        }

        if config.verify.enabled {
            spawn_timed(
                &mut join_set,
                "verify",
                collect_start_time,
                crate::verify::update_verify_stats(
                    sdc.clone(),
                    locked_machine_stats.clone(),
                    config.verify.allowlist.clone(),
                    config.verify.blocklist.clone(),
                ),
            );
        }

        // NOTE(review): despite the message, nothing exits here; the run
        // continues with only the version collector. Confirm the intended
        // behavior before changing either the message or the control flow.
        if join_set.len() == 1 {
            warn!("No collectors except systemd version scheduled to run. Exiting");
        }

        // Drain join_set, collect per-collector timings + log per-collector failures
        had_error = false;
        let mut timings: Vec<CollectorTiming> = Vec::with_capacity(join_set.len());
        while let Some(res) = join_set.join_next().await {
            match res {
                Ok((name, collector_result, start_offset, elapsed)) => {
                    let success = collector_result.is_ok();
                    if let Err(e) = collector_result {
                        had_error = true;
                        error!("Collector '{}' failure: {:?}", name, e);
                    }
                    timings.push(CollectorTiming {
                        name,
                        start_offset_ms: start_offset.as_secs_f64() * 1000.0,
                        elapsed_ms: elapsed.as_secs_f64() * 1000.0,
                        success,
                    });
                }
                // The task itself failed to join (panic or cancellation), so
                // there is no name or timing to record for it.
                Err(e) => {
                    had_error = true;
                    error!("Join error: {:?}", e);
                }
            }
        }

        let elapsed_runtime = collect_start_time.elapsed();
        let elapsed_runtime_ms = elapsed_runtime.as_millis();

        // Sort timings by elapsed desc so the slowest collector is first in the JSON output
        timings.sort_by(|a, b| {
            b.elapsed_ms
                .partial_cmp(&a.elapsed_ms)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        // Per-collector lines log at debug! to keep daemon-mode noise low.
        // The same data is on MonitordStats::collector_timings for callers that need it.
        for t in &timings {
            debug!(
                "collector '{}' start_offset={:.1}ms elapsed={:.1}ms{}",
                t.name,
                t.start_offset_ms,
                t.elapsed_ms,
                if t.success { "" } else { " (FAILED)" },
            );
        }

        {
            // Update monitord stats with machine stats. Both guards are scoped
            // to this block so they are released before printing and sleeping.
            let mut monitord_stats = locked_monitord_stats.write().await;
            let machine_stats = locked_machine_stats.read().await;
            monitord_stats.pid1 = machine_stats.pid1.clone();
            monitord_stats.networkd = machine_stats.networkd.clone();
            monitord_stats.system_state = machine_stats.system_state;
            monitord_stats.version = machine_stats.version.clone();
            monitord_stats.units = machine_stats.units.clone();
            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
            monitord_stats.boot_blame = machine_stats.boot_blame.clone();
            monitord_stats.verify_stats = machine_stats.verify_stats.clone();
            set_stat_collection_run_time(&mut monitord_stats, elapsed_runtime);
            monitord_stats.collector_timings = timings;
        }

        info!("stat collection run took {}ms", elapsed_runtime_ms);
        if output_stats {
            let monitord_stats = locked_monitord_stats.read().await;
            print_stats(
                &config.monitord.key_prefix,
                &config.monitord.output_format,
                &monitord_stats,
            );
        }
        if !config.monitord.daemon {
            break;
        }
        // A run that takes longer than the refresh interval must start the
        // next run immediately; a plain `-` on these unsigned values would
        // underflow (panic in debug, wrap and then fail the u64 conversion in
        // release).
        let sleep_time_ms = collect_interval_ms.saturating_sub(elapsed_runtime_ms);
        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
        tokio::time::sleep(Duration::from_millis(
            sleep_time_ms
                .try_into()
                .expect("Sleep time does not fit into a u64 :O"),
        ))
        .await;
    }
    Ok(if had_error { None } else { Some(sdc) })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The run time is stored as fractional milliseconds: whole millisecond
    /// inputs convert exactly and sub-millisecond inputs keep their fraction.
    #[test]
    fn test_stat_collection_run_time_ms_conversion() {
        let mut stats = MonitordStats::default();

        // 5 ms -> 5.0
        let whole_millis = Duration::from_millis(5);
        set_stat_collection_run_time(&mut stats, whole_millis);
        assert_eq!(stats.stat_collection_run_time_ms, 5.0);

        // 500 us -> 0.5 (the second call overwrites the first value)
        let half_milli = Duration::from_micros(500);
        set_stat_collection_run_time(&mut stats, half_milli);
        let deviation = (stats.stat_collection_run_time_ms - 0.5).abs();
        assert!(deviation < f64::EPSILON);
    }
}