use std::sync::Arc;
use std::collections::HashMap;
use std::time::Duration;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
#[derive(Error, Debug)]
pub enum MonitordError {
#[error("D-Bus connection error: {0}")]
ZbusError(#[from] zbus::Error),
}
pub mod boot;
pub mod config;
pub(crate) mod dbus;
pub mod dbus_stats;
pub mod json;
pub mod logging;
pub mod machines;
pub mod networkd;
pub mod pid1;
pub mod system;
pub mod timer;
pub mod unit_constants;
pub mod units;
pub mod varlink;
pub mod varlink_networkd;
pub mod varlink_units;
pub mod verify;
/// Default D-Bus address: the system bus Unix socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
/// Timing record for one collector task within a single collection run.
///
/// Field order is the serialized key order; keep it stable.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct CollectorTiming {
/// Collector name as passed to `spawn_timed` (e.g. `"units"`, `"pid1"`).
pub name: String,
/// Milliseconds from the start of the collection run to the task's first poll.
pub start_offset_ms: f64,
/// Milliseconds from the task's first poll until the collector future completed.
pub elapsed_ms: f64,
/// `true` when the collector returned `Ok(())`.
pub success: bool,
}
/// Collector results for a single machine.
///
/// `stat_collector` fills one of these for the local host and copies each field
/// into [`MonitordStats`] after every run; `MonitordStats::machines` holds one per
/// additional machine. Field order is the serialized key order; keep it stable.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
pub networkd: networkd::NetworkdState,
/// PID 1 stats; `None` unless the pid1 collector ran (it is spawned with pid `1`).
pub pid1: Option<pid1::Pid1Stats>,
pub system_state: system::SystemdSystemState,
pub units: units::SystemdUnitStats,
pub version: system::SystemdVersion,
/// D-Bus daemon stats; serialized as `null` when `None`.
pub dbus_stats: Option<dbus_stats::DBusStats>,
/// Boot blame stats; omitted entirely from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub boot_blame: Option<boot::BootBlameStats>,
/// Unit verification stats; serialized as `null` when `None`.
pub verify_stats: Option<verify::VerifyStats>,
}
/// Top-level stats snapshot published by [`stat_collector`] and printed by [`print_stats`].
///
/// The host-level fields mirror [`MachineStats`] and are copied from the host's
/// `MachineStats` at the end of every collection run. Field order is the
/// serialized key order; keep it stable.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
pub networkd: networkd::NetworkdState,
pub pid1: Option<pid1::Pid1Stats>,
pub system_state: system::SystemdSystemState,
pub units: units::SystemdUnitStats,
pub version: system::SystemdVersion,
pub dbus_stats: Option<dbus_stats::DBusStats>,
// NOTE(review): presumably keyed by machine name; written by
// `machines::update_machines_stats`, which receives this struct's lock — confirm there.
pub machines: HashMap<String, MachineStats>,
/// Boot blame stats; omitted entirely from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub boot_blame: Option<boot::BootBlameStats>,
pub verify_stats: Option<verify::VerifyStats>,
/// Wall-clock milliseconds of the most recent run, from spawning the first
/// collector until every collector task was joined.
pub stat_collection_run_time_ms: f64,
/// Per-collector timings for the most recent run, sorted slowest first.
pub collector_timings: Vec<CollectorTiming>,
}
pub fn print_stats(
key_prefix: &str,
output_format: &config::MonitordOutputFormat,
stats: &MonitordStats,
) {
match output_format {
config::MonitordOutputFormat::Json => println!(
"{}",
serde_json::to_string(&stats).expect("Invalid JSON serialization")
),
config::MonitordOutputFormat::JsonFlat => println!(
"{}",
json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
),
config::MonitordOutputFormat::JsonPretty => println!(
"{}",
serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
),
}
}
/// Store `elapsed_runtime` on `stats` as fractional milliseconds.
fn set_stat_collection_run_time(stats: &mut MonitordStats, elapsed_runtime: Duration) {
    const MILLIS_PER_SEC: f64 = 1_000.0;
    let runtime_ms = elapsed_runtime.as_secs_f64() * MILLIS_PER_SEC;
    stats.stat_collection_run_time_ms = runtime_ms;
}
/// Value yielded by every task spawned via [`spawn_timed`]:
/// `(collector name, collector result, offset from run start to the task's first
/// poll, time from first poll to completion)`.
type TimedCollectorOutput = (String, anyhow::Result<()>, Duration, Duration);
/// Spawn `fut` on `join_set`, tagging its output with `name` and timing data.
///
/// The clock starts when the spawned task is first polled, not when it is
/// spawned. The reported start offset therefore includes any scheduling delay
/// relative to `collect_start`.
fn spawn_timed<F>(
    join_set: &mut tokio::task::JoinSet<TimedCollectorOutput>,
    name: &'static str,
    collect_start: Instant,
    fut: F,
) where
    F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let timed = async move {
        let began = Instant::now();
        let outcome = fut.await;
        (
            String::from(name),
            outcome,
            began.duration_since(collect_start),
            began.elapsed(),
        )
    };
    join_set.spawn(timed);
}
async fn get_or_create_dbus_connection(
config: &config::Config,
maybe_connection: Option<zbus::Connection>,
) -> Result<zbus::Connection, MonitordError> {
match maybe_connection {
Some(conn) => Ok(conn),
None => Ok(zbus::connection::Builder::system()?
.method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
.build()
.await?),
}
}
/// Run one or more stat collection passes and publish the results.
///
/// Each pass does the following:
/// - spawns every enabled collector concurrently on a [`tokio::task::JoinSet`];
/// - waits for all of them;
/// - copies the host's [`MachineStats`] into the shared [`MonitordStats`], along
///   with the per-collector timings;
/// - optionally prints the stats in the configured output format.
///
/// In daemon mode (`config.monitord.daemon`) this loops forever. After each pass
/// it sleeps for whatever remains of `daemon_stats_refresh_secs`; a pass that
/// overruns the interval is followed immediately by the next one. Otherwise it
/// returns after a single pass.
///
/// # Arguments
/// * `config` - monitord configuration selecting which collectors run.
/// * `maybe_locked_stats` - shared stats to publish into; a fresh default
///   instance is created when `None`.
/// * `output_stats` - print the stats after every pass when `true`.
/// * `maybe_connection` - D-Bus connection to reuse; a new system bus connection
///   is built when `None`.
///
/// # Returns
/// `Some(connection)` when the final pass had no collector or join errors, so
/// the caller may reuse it. `None` when any collector failed or panicked.
///
/// # Errors
/// Returns [`MonitordError`] if a new D-Bus connection cannot be created.
pub async fn stat_collector(
    config: config::Config,
    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
    output_stats: bool,
    maybe_connection: Option<zbus::Connection>,
) -> Result<Option<zbus::Connection>, MonitordError> {
    // Widen to u128 before multiplying so a large refresh interval cannot overflow.
    let collect_interval_ms: u128 = if config.monitord.daemon {
        u128::from(config.monitord.daemon_stats_refresh_secs) * 1000
    } else {
        0
    };
    let config = Arc::new(config);
    // Only allocate the default container when the caller did not supply one.
    let locked_monitord_stats: Arc<RwLock<MonitordStats>> = maybe_locked_stats
        .unwrap_or_else(|| Arc::new(RwLock::new(MonitordStats::default())));
    // Host-level results; collectors write here and the totals are copied into
    // `locked_monitord_stats` once every collector in a pass has finished.
    let locked_machine_stats: Arc<RwLock<MachineStats>> =
        Arc::new(RwLock::new(MachineStats::default()));
    let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
        Arc::new(tokio::sync::Mutex::new(HashMap::new()));
    // NOTE(review): mutating the process environment while a multi-threaded runtime
    // may be running other threads is racy (and `unsafe` in Rust 2024). Presumably
    // this exists so the zbus system-bus builder picks up the configured address —
    // confirm, and consider passing the address to the builder directly.
    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
    let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
    let mut join_set: tokio::task::JoinSet<TimedCollectorOutput> = tokio::task::JoinSet::new();
    let mut had_error;
    loop {
        let collect_start_time = Instant::now();
        info!("Starting stat collection run");
        spawn_timed(
            &mut join_set,
            "version",
            collect_start_time,
            crate::system::update_version(sdc.clone(), locked_machine_stats.clone()),
        );
        if config.pid1.enabled {
            spawn_timed(
                &mut join_set,
                "pid1",
                collect_start_time,
                crate::pid1::update_pid1_stats(1, locked_machine_stats.clone()),
            );
        }
        if config.networkd.enabled {
            let config_clone = Arc::clone(&config);
            let sdc_clone = sdc.clone();
            let stats_clone = locked_machine_stats.clone();
            spawn_timed(&mut join_set, "networkd", collect_start_time, async move {
                // Prefer varlink when enabled; fall back to the file-based collector on failure.
                if config_clone.varlink.enabled {
                    let socket_path = crate::varlink_networkd::NETWORK_SOCKET_PATH.to_string();
                    match crate::varlink_networkd::get_networkd_state(&socket_path).await {
                        Ok(networkd_stats) => {
                            let mut machine_stats = stats_clone.write().await;
                            machine_stats.networkd = networkd_stats;
                            return Ok(());
                        }
                        Err(err) => {
                            warn!(
                                "Varlink networkd stats failed, falling back to file-based: {:?}",
                                err
                            );
                        }
                    }
                }
                crate::networkd::update_networkd_stats(
                    config_clone.networkd.link_state_dir.clone(),
                    None,
                    sdc_clone,
                    stats_clone,
                )
                .await
            });
        }
        if config.system_state.enabled {
            spawn_timed(
                &mut join_set,
                "system_state",
                collect_start_time,
                crate::system::update_system_stats(sdc.clone(), locked_machine_stats.clone()),
            );
        }
        if config.units.enabled {
            let config_clone = Arc::clone(&config);
            let sdc_clone = sdc.clone();
            let stats_clone = locked_machine_stats.clone();
            spawn_timed(&mut join_set, "units", collect_start_time, async move {
                // Varlink path: unit stats come from varlink. Timer stats are still
                // gathered over D-Bus, and unit-file stats are added when
                // `units.unit_files` is set. Any varlink failure falls back to the
                // full D-Bus collector.
                if config_clone.varlink.enabled {
                    let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
                    match crate::varlink_units::update_unit_stats(
                        Arc::clone(&config_clone),
                        stats_clone.clone(),
                        socket_path,
                    )
                    .await
                    {
                        Ok(()) => {
                            match crate::timer::collect_all_timers_dbus(&sdc_clone, &config_clone)
                                .await
                            {
                                Ok(timer_stats) => {
                                    let mut ms = stats_clone.write().await;
                                    ms.units.timer_stats = timer_stats.timer_stats;
                                    ms.units.timer_persistent_units =
                                        timer_stats.timer_persistent_units;
                                    ms.units.timer_remain_after_elapse =
                                        timer_stats.timer_remain_after_elapse;
                                }
                                Err(err) => {
                                    warn!("Varlink timer stats (D-Bus fallback) failed: {:?}", err);
                                }
                            }
                            if config_clone.units.unit_files {
                                let unit_files = crate::units::collect_unit_files_stats("").await;
                                let mut ms = stats_clone.write().await;
                                ms.units.unit_files = unit_files;
                            }
                            return Ok(());
                        }
                        Err(err) => {
                            warn!(
                                "Varlink units stats failed, falling back to D-Bus: {:?}",
                                err
                            );
                        }
                    }
                }
                crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone, String::new())
                    .await
            });
        }
        if config.machines.enabled {
            spawn_timed(
                &mut join_set,
                "machines",
                collect_start_time,
                crate::machines::update_machines_stats(
                    Arc::clone(&config),
                    sdc.clone(),
                    locked_monitord_stats.clone(),
                    cached_machine_connections.clone(),
                ),
            );
        }
        if config.dbus_stats.enabled {
            spawn_timed(
                &mut join_set,
                "dbus_stats",
                collect_start_time,
                crate::dbus_stats::update_dbus_stats(
                    Arc::clone(&config),
                    sdc.clone(),
                    locked_machine_stats.clone(),
                ),
            );
        }
        if config.boot_blame.enabled {
            spawn_timed(
                &mut join_set,
                "boot_blame",
                collect_start_time,
                crate::boot::update_boot_blame_stats(
                    Arc::clone(&config),
                    sdc.clone(),
                    locked_machine_stats.clone(),
                ),
            );
        }
        if config.verify.enabled {
            spawn_timed(
                &mut join_set,
                "verify",
                collect_start_time,
                crate::verify::update_verify_stats(
                    sdc.clone(),
                    locked_machine_stats.clone(),
                    config.verify.allowlist.clone(),
                    config.verify.blocklist.clone(),
                ),
            );
        }
        // NOTE(review): despite the wording, collection does not exit here; the
        // version collector still runs and the loop continues.
        if join_set.len() == 1 {
            warn!("No collectors except systemd version scheduled to run. Exiting");
        }
        had_error = false;
        let mut timings: Vec<CollectorTiming> = Vec::new();
        while let Some(res) = join_set.join_next().await {
            match res {
                Ok((name, collector_result, start_offset, elapsed)) => {
                    let success = collector_result.is_ok();
                    if let Err(e) = collector_result {
                        had_error = true;
                        error!("Collector '{}' failure: {:?}", name, e);
                    }
                    timings.push(CollectorTiming {
                        name,
                        start_offset_ms: start_offset.as_secs_f64() * 1000.0,
                        elapsed_ms: elapsed.as_secs_f64() * 1000.0,
                        success,
                    });
                }
                // A panicked or cancelled task: no timing is recorded for it.
                Err(e) => {
                    had_error = true;
                    error!("Join error: {:?}", e);
                }
            }
        }
        let elapsed_runtime = collect_start_time.elapsed();
        let elapsed_runtime_ms = elapsed_runtime.as_millis();
        // Slowest collectors first.
        timings.sort_by(|a, b| {
            b.elapsed_ms
                .partial_cmp(&a.elapsed_ms)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        for t in &timings {
            debug!(
                "collector '{}' start_offset={:.1}ms elapsed={:.1}ms{}",
                t.name,
                t.start_offset_ms,
                t.elapsed_ms,
                if t.success { "" } else { " (FAILED)" },
            );
        }
        // Publish host results; scoped so both locks are released before printing.
        {
            let mut monitord_stats = locked_monitord_stats.write().await;
            let machine_stats = locked_machine_stats.read().await;
            monitord_stats.pid1 = machine_stats.pid1.clone();
            monitord_stats.networkd = machine_stats.networkd.clone();
            monitord_stats.system_state = machine_stats.system_state;
            monitord_stats.version = machine_stats.version.clone();
            monitord_stats.units = machine_stats.units.clone();
            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
            monitord_stats.boot_blame = machine_stats.boot_blame.clone();
            monitord_stats.verify_stats = machine_stats.verify_stats.clone();
            set_stat_collection_run_time(&mut monitord_stats, elapsed_runtime);
            monitord_stats.collector_timings = timings;
        }
        info!("stat collection run took {}ms", elapsed_runtime_ms);
        if output_stats {
            let monitord_stats = locked_monitord_stats.read().await;
            print_stats(
                &config.monitord.key_prefix,
                &config.monitord.output_format,
                &monitord_stats,
            );
        }
        if !config.monitord.daemon {
            break;
        }
        // A run can outlast the refresh interval. Saturate at zero instead of
        // underflowing the u128, which panicked in debug builds and tripped the
        // u64 `expect` below in release builds. The next run then starts immediately.
        let sleep_time_ms = collect_interval_ms.saturating_sub(elapsed_runtime_ms);
        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
        tokio::time::sleep(Duration::from_millis(
            sleep_time_ms
                .try_into()
                .expect("Sleep time does not fit into a u64 :O"),
        ))
        .await;
    }
    Ok(if had_error { None } else { Some(sdc) })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `set_stat_collection_run_time` converts a `Duration` into fractional
    /// milliseconds and keeps sub-millisecond precision.
    #[test]
    fn test_stat_collection_run_time_ms_conversion() {
        let mut stats = MonitordStats::default();

        let five_ms = Duration::from_millis(5);
        set_stat_collection_run_time(&mut stats, five_ms);
        assert_eq!(stats.stat_collection_run_time_ms, 5.0);

        let half_ms = Duration::from_micros(500);
        set_stat_collection_run_time(&mut stats, half_ms);
        let drift = (stats.stat_collection_run_time_ms - 0.5).abs();
        assert!(drift < f64::EPSILON);
    }
}