use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use struct_field_names_as_array::FieldNamesAsArray;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::error;
use tracing::warn;
use zbus::zvariant::ObjectPath;
use zbus::zvariant::OwnedObjectPath;
/// Errors that can occur while collecting systemd unit statistics.
///
/// Every variant is built automatically through `#[from]`, so the `?`
/// operator converts the wrapped error types into this enum.
#[derive(Error, Debug)]
pub enum MonitordUnitsError {
/// A D-Bus call or proxy construction failed.
#[error("Units D-Bus error: {0}")]
ZbusError(#[from] zbus::Error),
/// A checked integer conversion (`try_into` / `try_from`) failed.
#[error("Integer conversion error: {0}")]
IntConversion(#[from] std::num::TryFromIntError),
/// The system clock could not be compared against the Unix epoch.
#[error("System time error: {0}")]
SystemTimeError(#[from] std::time::SystemTimeError),
}
use crate::timer::TimerStats;
use crate::MachineStats;
pub use crate::unit_constants::is_unit_unhealthy;
pub use crate::unit_constants::SystemdUnitActiveState;
pub use crate::unit_constants::SystemdUnitLoadState;
/// Timings and D-Bus fetch counters recorded during one `parse_unit_state` run.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct UnitsCollectionTimings {
/// Wall-clock milliseconds spent awaiting the manager's `list_units` call.
pub list_units_ms: f64,
/// Wall-clock milliseconds spent collecting unit file counts
/// (measured even when unit file collection is disabled).
pub unit_files_ms: f64,
/// Wall-clock milliseconds spent in the per-unit processing loop.
pub per_unit_loop_ms: f64,
/// Number of timer units whose stats were successfully collected.
pub timer_dbus_fetches: u64,
/// Number of units for which `parse_state` reported a D-Bus fetch.
pub state_dbus_fetches: u64,
/// Number of services whose stats were successfully collected.
pub service_dbus_fetches: u64,
}
/// Unit file counts for one scope (root or user).
///
/// Each map is keyed by the text after the last `.` of a file name
/// (e.g. `service`, `mount`) and holds the number of regular files found.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct UnitFilesScope {
/// Counts of files found in generator directories.
pub generated: HashMap<String, u64>,
/// Counts of files found in transient directories.
pub transient: HashMap<String, u64>,
}
/// Unit file counts, as produced by `collect_unit_files_stats`.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct UnitFilesStats {
/// Counts from `/run/systemd/generator{,.early,.late}` and
/// `/run/systemd/transient` (relative to the configured filesystem root).
pub root: UnitFilesScope,
/// Counts from `/run/user/*/systemd/transient`; `generated` is always left
/// empty for this scope by `collect_unit_files_stats`.
pub user: UnitFilesScope,
}
/// Aggregated statistics about all units reported by systemd.
///
/// The `*_units` counters are incremented by `parse_unit`; the maps are
/// filled by `parse_unit_state` depending on configuration. The field names
/// are exported through `FieldNamesAsArray` (see `UNIT_FIELD_NAMES`).
#[derive(
serde::Serialize, serde::Deserialize, Clone, Debug, Default, FieldNamesAsArray, PartialEq,
)]
pub struct SystemdUnitStats {
/// Units whose active state is `activating`.
pub activating_units: u64,
/// Units whose active state is `active`.
pub active_units: u64,
/// Units whose name ends in `.automount`.
pub automount_units: u64,
/// Units whose name ends in `.device`.
pub device_units: u64,
/// Units whose active state is `failed`.
pub failed_units: u64,
/// Units whose active state is `inactive`.
pub inactive_units: u64,
/// Units listed with a non-zero job id.
pub jobs_queued: u64,
/// Units whose load state is `loaded`.
pub loaded_units: u64,
/// Units whose load state is `masked`.
pub masked_units: u64,
/// Units whose name ends in `.mount`.
pub mount_units: u64,
/// Units whose load state is `not-found`.
pub not_found_units: u64,
/// Units whose name ends in `.path`.
pub path_units: u64,
/// Units whose name ends in `.scope`.
pub scope_units: u64,
/// Units whose name ends in `.service`.
pub service_units: u64,
/// Units whose name ends in `.slice`.
pub slice_units: u64,
/// Units whose name ends in `.socket`.
pub socket_units: u64,
/// Units whose name ends in `.target`.
pub target_units: u64,
/// Units whose name ends in `.timer`.
pub timer_units: u64,
// NOTE(review): the two timer counters below are not written in this module;
// presumably `crate::timer::collect_timer_stats` updates them through the
// `&mut SystemdUnitStats` it receives - confirm.
pub timer_persistent_units: u64,
pub timer_remain_after_elapse: u64,
/// Total number of units returned by the manager's `list_units` call.
pub total_units: u64,
/// Counts of generated/transient unit files found on disk.
pub unit_files: UnitFilesStats,
/// Per-service stats, keyed by unit name, for units listed in `config.services`.
pub service_stats: HashMap<String, ServiceStats>,
/// Per-timer stats, keyed by unit name.
pub timer_stats: HashMap<String, TimerStats>,
/// Per-unit state information, keyed by unit name.
pub unit_states: HashMap<String, UnitStates>,
/// Timings and fetch counters for the collection run that produced this value.
pub collection_timings: UnitsCollectionTimings,
}
/// Statistics for a single service unit, populated by `parse_service`.
///
/// Unless noted otherwise each field holds the value returned by the
/// identically named accessor on the service or unit D-Bus proxy.
/// NOTE(review): units are presumably those implied by the name suffixes
/// (`_usec`, `_nsec`, `_bytes`) - confirm against the proxy definitions.
#[derive(
serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct ServiceStats {
pub active_enter_timestamp: u64,
pub active_exit_timestamp: u64,
pub cpuusage_nsec: u64,
pub inactive_exit_timestamp: u64,
pub ioread_bytes: u64,
pub ioread_operations: u64,
pub memory_available: u64,
pub memory_current: u64,
pub nrestarts: u32,
/// Number of entries returned by the service proxy's `get_processes()` call.
pub processes: u32,
pub restart_usec: u64,
pub state_change_timestamp: u64,
pub status_errno: i32,
pub tasks_current: u64,
pub timeout_clean_usec: u64,
pub watchdog_usec: u64,
}
/// State summary for a single unit, produced by `parse_state`.
#[derive(
serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct UnitStates {
/// Parsed active state; `unknown` when the reported string is not recognised.
pub active_state: SystemdUnitActiveState,
/// Parsed load state; `unknown` when the reported string is not recognised.
pub load_state: SystemdUnitLoadState,
/// Result of `is_unit_unhealthy(active_state, load_state)`.
pub unhealthy: bool,
/// Time since the unit's last state change; `None` when time-in-state
/// collection is disabled or no D-Bus connection was supplied.
pub time_in_state_usecs: Option<u64>,
}
/// One unit entry as returned by the systemd manager's `list_units` call,
/// with the positional tuple fields given names.
#[derive(Debug)]
pub struct ListedUnit {
    /// Unit name including its type suffix, e.g. `foo.service`.
    pub name: String,
    /// Description string reported for the unit.
    pub description: String,
    /// Load state as reported, e.g. `loaded`, `masked`, `not-found`.
    pub load_state: String,
    /// Active state as reported, e.g. `active`, `inactive`, `failed`.
    pub active_state: String,
    /// Sub state as reported, e.g. `dead`.
    pub sub_state: String,
    /// Name of the unit this one follows; may be empty.
    pub follow_unit: String,
    /// D-Bus object path of the unit, used to build per-unit proxies.
    pub unit_object_path: OwnedObjectPath,
    /// Id of the job queued for the unit; `0` is treated as "no job".
    pub job_id: u32,
    /// Type of the queued job; may be empty.
    pub job_type: String,
    /// D-Bus object path of the queued job.
    pub job_object_path: OwnedObjectPath,
}
impl
From<(
String,
String,
String,
String,
String,
String,
OwnedObjectPath,
u32,
String,
OwnedObjectPath,
)> for ListedUnit
{
fn from(
tuple: (
String,
String,
String,
String,
String,
String,
OwnedObjectPath,
u32,
String,
OwnedObjectPath,
),
) -> Self {
ListedUnit {
name: tuple.0,
description: tuple.1,
load_state: tuple.2,
active_state: tuple.3,
sub_state: tuple.4,
follow_unit: tuple.5,
unit_object_path: tuple.6,
job_id: tuple.7,
job_type: tuple.8,
job_object_path: tuple.9,
}
}
}
/// Names of all fields of [`ServiceStats`], as generated by `FieldNamesAsArray`.
pub const SERVICE_FIELD_NAMES: &[&str] = &ServiceStats::FIELD_NAMES_AS_ARRAY;
/// Names of all fields of [`SystemdUnitStats`], as generated by `FieldNamesAsArray`.
pub const UNIT_FIELD_NAMES: &[&str] = &SystemdUnitStats::FIELD_NAMES_AS_ARRAY;
/// Names of all fields of [`UnitStates`], as generated by `FieldNamesAsArray`.
pub const UNIT_STATES_FIELD_NAMES: &[&str] = &UnitStates::FIELD_NAMES_AS_ARRAY;
/// Fetches the statistics of one service unit over D-Bus.
///
/// Two uncached proxies (service and unit interface) are built for
/// `object_path`, and all properties are requested concurrently.
///
/// # Errors
///
/// Returns an error if a proxy cannot be built, if any single property
/// fetch fails, or if the process count does not fit into a `u32`.
async fn parse_service(
    connection: &zbus::Connection,
    name: &str,
    object_path: &OwnedObjectPath,
) -> Result<ServiceStats, MonitordUnitsError> {
    debug!("Parsing service {} stats", name);

    let service_proxy = crate::dbus::zbus_service::ServiceProxy::builder(connection)
        .cache_properties(zbus::proxy::CacheProperties::No)
        .path(object_path.clone())?
        .build()
        .await?;
    let unit_proxy = crate::dbus::zbus_unit::UnitProxy::builder(connection)
        .cache_properties(zbus::proxy::CacheProperties::No)
        .path(object_path.clone())?
        .build()
        .await?;

    // The bindings below are positional: their order must match the order of
    // the futures handed to `join!`.
    let (
        active_enter,
        active_exit,
        cpu_usage,
        inactive_exit,
        io_read_bytes,
        io_read_ops,
        mem_current,
        mem_available,
        restarts,
        process_list,
        restart_delay,
        state_change,
        errno,
        tasks,
        clean_timeout,
        watchdog,
    ) = tokio::join!(
        unit_proxy.active_enter_timestamp(),
        unit_proxy.active_exit_timestamp(),
        service_proxy.cpuusage_nsec(),
        unit_proxy.inactive_exit_timestamp(),
        service_proxy.ioread_bytes(),
        service_proxy.ioread_operations(),
        service_proxy.memory_current(),
        service_proxy.memory_available(),
        service_proxy.nrestarts(),
        service_proxy.get_processes(),
        service_proxy.restart_usec(),
        unit_proxy.state_change_timestamp(),
        service_proxy.status_errno(),
        service_proxy.tasks_current(),
        service_proxy.timeout_clean_usec(),
        service_proxy.watchdog_usec(),
    );

    // Any failed fetch aborts the whole service; the first failure in field
    // order is the one reported.
    let service_stats = ServiceStats {
        active_enter_timestamp: active_enter?,
        active_exit_timestamp: active_exit?,
        cpuusage_nsec: cpu_usage?,
        inactive_exit_timestamp: inactive_exit?,
        ioread_bytes: io_read_bytes?,
        ioread_operations: io_read_ops?,
        memory_current: mem_current?,
        memory_available: mem_available?,
        nrestarts: restarts?,
        processes: process_list?.len().try_into()?,
        restart_usec: restart_delay?,
        state_change_timestamp: state_change?,
        status_errno: errno?,
        tasks_current: tasks?,
        timeout_clean_usec: clean_timeout?,
        watchdog_usec: watchdog?,
    };
    Ok(service_stats)
}
/// Computes how long `unit` has been in its current state, in microseconds.
///
/// Returns `Ok(None)` (after logging an error) when no D-Bus connection is
/// supplied. Otherwise the unit's `state_change_timestamp` is fetched and
/// subtracted from the current wall-clock time. If the timestamp cannot be
/// fetched it is treated as `0`, so the result equals the current time in
/// microseconds since the Unix epoch.
///
/// # Errors
///
/// Returns an error if the unit proxy cannot be built, if the system clock
/// is before the Unix epoch, or if the current time in microseconds does not
/// fit into a `u64`.
async fn get_time_in_state(
    connection: Option<&zbus::Connection>,
    unit: &ListedUnit,
) -> Result<Option<u64>, MonitordUnitsError> {
    match connection {
        Some(c) => {
            let up = crate::dbus::zbus_unit::UnitProxy::builder(c)
                .cache_properties(zbus::proxy::CacheProperties::No)
                .path(ObjectPath::from(unit.unit_object_path.clone()))?
                .build()
                .await?;
            // Use full microsecond resolution. Truncating to whole seconds
            // makes `now` smaller than the timestamp of a unit that changed
            // state within the current second.
            let now = u64::try_from(SystemTime::now().duration_since(UNIX_EPOCH)?.as_micros())?;
            let state_change_timestamp = match up.state_change_timestamp().await {
                Ok(sct) => sct,
                Err(err) => {
                    error!(
                        "Unable to get state_change_timestamp for {} - Setting to 0: {:?}",
                        &unit.name, err,
                    );
                    0
                }
            };
            // Saturate instead of underflowing if the timestamp is ahead of
            // our clock reading (clock adjustment or a racing state change).
            Ok(Some(now.saturating_sub(state_change_timestamp)))
        }
        None => {
            error!("No zbus connection passed, but time_in_state_usecs enabled");
            Ok(None)
        }
    }
}
/// Records the state of `unit` in `stats.unit_states`, honouring the
/// configured blocklist and allowlist.
///
/// A unit is skipped when it is on the blocklist, or when the allowlist is
/// non-empty and does not contain it. Unrecognised active/load state strings
/// map to the respective `unknown` variant. When
/// `config.state_stats_time_in_state` is set, the time in the current state
/// is also collected through `connection`.
///
/// Returns `Ok(true)` only when time-in-state collection was enabled and a
/// connection was supplied, i.e. a D-Bus fetch was attempted.
///
/// # Errors
///
/// Propagates errors from `get_time_in_state`.
pub async fn parse_state(
    stats: &mut SystemdUnitStats,
    unit: &ListedUnit,
    config: &crate::config::UnitsConfig,
    connection: Option<&zbus::Connection>,
) -> Result<bool, MonitordUnitsError> {
    // The blocklist wins over the allowlist.
    if config.state_stats_blocklist.contains(&unit.name) {
        debug!("Skipping state stats for {} due to blocklist", &unit.name);
        return Ok(false);
    }
    let allowlist = &config.state_stats_allowlist;
    if !(allowlist.is_empty() || allowlist.contains(&unit.name)) {
        return Ok(false);
    }

    let active_state = SystemdUnitActiveState::from_str(&unit.active_state)
        .unwrap_or(SystemdUnitActiveState::unknown);
    // Load states use dashes on the wire (e.g. "not-found"); the strings are
    // normalised to underscores before parsing.
    let normalized_load_state = unit.load_state.replace('-', "_");
    let load_state = SystemdUnitLoadState::from_str(&normalized_load_state)
        .unwrap_or(SystemdUnitLoadState::unknown);

    let (time_in_state_usecs, did_dbus_fetch) = if config.state_stats_time_in_state {
        (
            get_time_in_state(connection, unit).await?,
            connection.is_some(),
        )
    } else {
        (None, false)
    };

    let unit_state = UnitStates {
        active_state,
        load_state,
        unhealthy: is_unit_unhealthy(active_state, load_state),
        time_in_state_usecs,
    };
    stats.unit_states.insert(unit.name.clone(), unit_state);
    Ok(did_dbus_fetch)
}
/// Updates the aggregate counters in `stats` for one listed unit.
///
/// Three independent classifications are counted: the unit type (text after
/// the last `.` of the name), the load state and the active state. Values
/// without a dedicated counter are only logged at debug level. A non-zero
/// `job_id` additionally increments `jobs_queued`.
fn parse_unit(stats: &mut SystemdUnitStats, unit: &ListedUnit) {
    let suffix = unit.name.rsplit('.').next();
    let type_counter = match suffix {
        Some("automount") => Some(&mut stats.automount_units),
        Some("device") => Some(&mut stats.device_units),
        Some("mount") => Some(&mut stats.mount_units),
        Some("path") => Some(&mut stats.path_units),
        Some("scope") => Some(&mut stats.scope_units),
        Some("service") => Some(&mut stats.service_units),
        Some("slice") => Some(&mut stats.slice_units),
        Some("socket") => Some(&mut stats.socket_units),
        Some("target") => Some(&mut stats.target_units),
        Some("timer") => Some(&mut stats.timer_units),
        _ => None,
    };
    match type_counter {
        Some(counter) => *counter += 1,
        None => debug!("Found unhandled '{:?}' unit type", suffix),
    }

    let load_counter = match unit.load_state.as_str() {
        "loaded" => Some(&mut stats.loaded_units),
        "masked" => Some(&mut stats.masked_units),
        "not-found" => Some(&mut stats.not_found_units),
        _ => None,
    };
    match load_counter {
        Some(counter) => *counter += 1,
        None => debug!("{} is not loaded. It's {}", unit.name, unit.load_state),
    }

    let active_state = unit.active_state.as_str();
    let active_counter = match active_state {
        "activating" => Some(&mut stats.activating_units),
        "active" => Some(&mut stats.active_units),
        "failed" => Some(&mut stats.failed_units),
        "inactive" => Some(&mut stats.inactive_units),
        _ => None,
    };
    match active_counter {
        Some(counter) => *counter += 1,
        None => debug!("Found unhandled '{}' unit state", active_state),
    }

    // A job id of 0 means no job is queued for the unit.
    if unit.job_id != 0 {
        stats.jobs_queued += 1;
    }
}
/// Absolute path of the root-scope transient unit directory; it is appended
/// to the filesystem root prefix when collecting unit file stats.
const TRANSIENT_DIR: &str = "/run/systemd/transient";
/// Counts the regular files directly inside `path`, grouped by the text after
/// the last `.` of each file name.
///
/// Entries that are not regular files (including those whose type cannot be
/// determined) are skipped. Names that are not valid UTF-8 are counted under
/// `"unknown"`. An unreadable directory yields an empty map; errors on single
/// entries are logged and skipped.
async fn count_unit_files_by_type(path: &str) -> HashMap<String, u64> {
    let mut counts: HashMap<String, u64> = HashMap::new();

    let mut dir = match tokio::fs::read_dir(path).await {
        Ok(dir) => dir,
        Err(err) => {
            debug!("Unable to read {}: {:?}", path, err);
            return counts;
        }
    };

    loop {
        let entry = match dir.next_entry().await {
            Ok(Some(entry)) => entry,
            Ok(None) => break,
            Err(err) => {
                warn!("Error reading entry in {}: {:?}", path, err);
                continue;
            }
        };

        let file_type = entry.file_type().await;
        if !matches!(file_type, Ok(ft) if ft.is_file()) {
            continue;
        }

        let file_name = entry.file_name();
        let unit_type = match file_name.to_str() {
            Some(name) => name.rsplit('.').next().unwrap_or("unknown"),
            None => "unknown",
        };
        *counts.entry(unit_type.to_string()).or_default() += 1;
    }

    counts
}
/// Adds every count in `source` onto the matching key in `target`, inserting
/// keys that are not present yet.
fn merge_counts(target: &mut HashMap<String, u64>, source: HashMap<String, u64>) {
    source.into_iter().for_each(|(kind, extra)| {
        *target.entry(kind).or_default() += extra;
    });
}
async fn enumerate_user_transient_dirs(fs_root: &str) -> Vec<String> {
let user_dir = format!("{fs_root}/run/user");
match tokio::fs::read_dir(&user_dir).await {
Ok(mut entries) => {
let mut dirs = Vec::new();
loop {
match entries.next_entry().await {
Ok(Some(entry)) => {
dirs.push(format!("{}/systemd/transient", entry.path().display()));
}
Ok(None) => break,
Err(err) => {
warn!("Error reading entry in {}: {:?}", user_dir, err);
continue;
}
}
}
dirs
}
Err(err) => {
debug!("Unable to read {}: {:?}", user_dir, err);
Vec::new()
}
}
}
/// Counts generated and transient unit files below `fs_root`.
///
/// Root scope: the three generator directories (`generator`,
/// `generator.early`, `generator.late`) are merged into `root.generated`, and
/// `/run/systemd/transient` becomes `root.transient`. User scope: the
/// transient directories of all entries under `/run/user` are merged into
/// `user.transient`; `user.generated` is always empty.
///
/// Missing or unreadable directories contribute nothing; this function does
/// not fail.
pub async fn collect_unit_files_stats(fs_root: &str) -> UnitFilesStats {
    let generator_dir = format!("{fs_root}/run/systemd/generator");
    let generator_early_dir = format!("{fs_root}/run/systemd/generator.early");
    let generator_late_dir = format!("{fs_root}/run/systemd/generator.late");
    let root_transient_dir = format!("{fs_root}{TRANSIENT_DIR}");

    // Scan the four root directories and enumerate the user directories
    // concurrently.
    let (generator_counts, early_counts, late_counts, root_transient, user_dirs) = tokio::join!(
        count_unit_files_by_type(&generator_dir),
        count_unit_files_by_type(&generator_early_dir),
        count_unit_files_by_type(&generator_late_dir),
        count_unit_files_by_type(&root_transient_dir),
        enumerate_user_transient_dirs(fs_root),
    );

    let mut root_generated = HashMap::new();
    for counts in [generator_counts, early_counts, late_counts] {
        merge_counts(&mut root_generated, counts);
    }

    let per_user_counts =
        futures_util::future::join_all(user_dirs.iter().map(|d| count_unit_files_by_type(d)))
            .await;
    let user_transient = per_user_counts
        .into_iter()
        .fold(HashMap::new(), |mut merged, counts| {
            merge_counts(&mut merged, counts);
            merged
        });

    let root = UnitFilesScope {
        generated: root_generated,
        transient: root_transient,
    };
    let user = UnitFilesScope {
        generated: HashMap::new(),
        transient: user_transient,
    };
    UnitFilesStats { root, user }
}
pub async fn parse_unit_state(
config: &crate::config::Config,
connection: &zbus::Connection,
fs_root: &str,
) -> Result<SystemdUnitStats, MonitordUnitsError> {
if !config.units.state_stats_allowlist.is_empty() {
debug!(
"Using unit state allowlist: {:?}",
config.units.state_stats_allowlist
);
}
if !config.units.state_stats_blocklist.is_empty() {
debug!(
"Using unit state blocklist: {:?}",
config.units.state_stats_blocklist,
);
}
let mut stats = SystemdUnitStats::default();
let p = crate::dbus::zbus_systemd::ManagerProxy::builder(connection)
.cache_properties(zbus::proxy::CacheProperties::No)
.build()
.await?;
let (unit_files_result, units_result) = tokio::join!(
async {
let start = Instant::now();
let files = if config.units.unit_files {
collect_unit_files_stats(fs_root).await
} else {
UnitFilesStats::default()
};
(files, start.elapsed().as_secs_f64() * 1000.0)
},
async {
let start = Instant::now();
let units = p.list_units().await;
(units, start.elapsed().as_secs_f64() * 1000.0)
},
);
let (unit_files, unit_files_ms) = unit_files_result;
let (units_result, list_units_ms) = units_result;
stats.collection_timings.unit_files_ms = unit_files_ms;
stats.collection_timings.list_units_ms = list_units_ms;
stats.unit_files = unit_files;
let units = units_result?;
stats.total_units = units.len() as u64;
let per_unit_loop_start = Instant::now();
let mut state_dbus_fetches: u64 = 0;
let mut service_dbus_fetches: u64 = 0;
let mut timer_dbus_fetches: u64 = 0;
for unit_raw in units {
let unit: ListedUnit = unit_raw.into();
parse_unit(&mut stats, &unit);
if config.units.state_stats {
let did_dbus_fetch =
parse_state(&mut stats, &unit, &config.units, Some(connection)).await?;
if did_dbus_fetch {
state_dbus_fetches += 1;
}
}
if config.services.contains(&unit.name) {
debug!("Collecting service stats for {:?}", &unit);
match parse_service(connection, &unit.name, &unit.unit_object_path).await {
Ok(service_stats) => {
stats.service_stats.insert(unit.name.clone(), service_stats);
service_dbus_fetches += 1;
}
Err(err) => error!(
"Unable to get service stats for {} {}: {:#?}",
&unit.name, &unit.unit_object_path, err
),
}
}
if config.timers.enabled && unit.name.contains(".timer") {
if config.timers.blocklist.contains(&unit.name) {
debug!("Skipping timer stats for {} due to blocklist", &unit.name);
continue;
}
if !config.timers.allowlist.is_empty() && !config.timers.allowlist.contains(&unit.name)
{
continue;
}
let timer_stats: Option<TimerStats> =
match crate::timer::collect_timer_stats(connection, &mut stats, &unit).await {
Ok(ts) => {
timer_dbus_fetches += 1;
Some(ts)
}
Err(err) => {
error!("Failed to get {} stats: {:#?}", &unit.name, err);
None
}
};
if let Some(ts) = timer_stats {
stats.timer_stats.insert(unit.name.clone(), ts);
}
}
}
let per_unit_loop_elapsed = per_unit_loop_start.elapsed();
stats.collection_timings.per_unit_loop_ms = per_unit_loop_elapsed.as_secs_f64() * 1000.0;
stats.collection_timings.state_dbus_fetches = state_dbus_fetches;
stats.collection_timings.service_dbus_fetches = service_dbus_fetches;
stats.collection_timings.timer_dbus_fetches = timer_dbus_fetches;
debug!("unit stats: {:?}", stats);
Ok(stats)
}
/// Collects unit statistics and stores them in the shared machine stats.
///
/// Collection runs before the write lock is taken, so readers of
/// `locked_machine_stats` are not blocked while D-Bus and filesystem work is
/// in progress; the lock is held only for the final assignment.
///
/// A collection failure is logged and leaves the previously stored unit stats
/// untouched. This function always returns `Ok(())`.
pub async fn update_unit_stats(
    config: Arc<crate::config::Config>,
    connection: zbus::Connection,
    locked_machine_stats: Arc<RwLock<MachineStats>>,
    fs_root: String,
) -> anyhow::Result<()> {
    match parse_unit_state(&config, &connection, &fs_root).await {
        Ok(units_stats) => {
            let mut machine_stats = locked_machine_stats.write().await;
            machine_stats.units = units_stats;
        }
        Err(err) => error!("units stats failed: {:?}", err),
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use strum::IntoEnumIterator;

    /// Builds the listed-unit fixture shared by the tests: a loaded, inactive
    /// timer unit without a queued job.
    fn get_unit_file() -> ListedUnit {
        let unit_object_path: OwnedObjectPath =
            ObjectPath::try_from("/org/freedesktop/systemd1/unit/apport_2dautoreport_2etimer")
                .expect("Unable to make an object path")
                .into();
        let job_object_path: OwnedObjectPath = ObjectPath::try_from("/").unwrap().into();
        ListedUnit {
            name: "apport-autoreport.timer".to_string(),
            description: "Process error reports when automatic reporting is enabled (timer based)"
                .to_string(),
            load_state: "loaded".to_string(),
            active_state: "inactive".to_string(),
            sub_state: "dead".to_string(),
            follow_unit: String::new(),
            unit_object_path,
            job_id: 0,
            job_type: String::new(),
            job_object_path,
        }
    }

    /// `parse_state` records the unit by default and when allowlisted, and
    /// records nothing once the unit is blocklisted.
    #[tokio::test]
    async fn test_state_parse() -> Result<(), MonitordUnitsError> {
        let test_unit_name = String::from("apport-autoreport.timer");
        let expected_unit_state = UnitStates {
            active_state: SystemdUnitActiveState::inactive,
            load_state: SystemdUnitLoadState::loaded,
            unhealthy: true,
            time_in_state_usecs: None,
        };
        let expected_stats = SystemdUnitStats {
            unit_states: HashMap::from([(test_unit_name.clone(), expected_unit_state)]),
            ..Default::default()
        };
        let systemd_unit = get_unit_file();
        let mut config = crate::config::UnitsConfig::default();

        // Default config: the unit is recorded, no D-Bus fetch without a connection.
        let mut stats = SystemdUnitStats::default();
        let did_fetch = parse_state(&mut stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, stats);
        assert!(!did_fetch);

        // Allowlisted unit: same outcome.
        config.state_stats_allowlist = HashSet::from([test_unit_name.clone()]);
        let mut allowlist_stats = SystemdUnitStats::default();
        let did_fetch = parse_state(&mut allowlist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, allowlist_stats);
        assert!(!did_fetch);

        // Blocklisted unit: nothing recorded, even though it is allowlisted too.
        config.state_stats_blocklist = HashSet::from([test_unit_name]);
        let mut blocklist_stats = SystemdUnitStats::default();
        let did_fetch = parse_state(&mut blocklist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(SystemdUnitStats::default(), blocklist_stats);
        assert!(!did_fetch);
        Ok(())
    }

    /// The fixture increments exactly the inactive, loaded and timer counters.
    #[test]
    fn test_unit_parse() {
        let expected_stats = SystemdUnitStats {
            inactive_units: 1,
            loaded_units: 1,
            timer_units: 1,
            ..Default::default()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &get_unit_file());
        assert_eq!(expected_stats, stats);
    }

    /// An `activating` unit is counted as activating only.
    #[test]
    fn test_unit_parse_activating() {
        let activating_unit = ListedUnit {
            active_state: String::from("activating"),
            ..get_unit_file()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &activating_unit);
        assert_eq!(stats.activating_units, 1);
        assert_eq!(stats.active_units, 0);
        assert_eq!(stats.inactive_units, 0);
    }

    /// Both state enums yield at least one variant when iterated.
    #[test]
    fn test_iterators() {
        assert!(SystemdUnitActiveState::iter().count() > 0);
        assert!(SystemdUnitLoadState::iter().count() > 0);
    }

    /// Regular files are grouped by extension; directories are ignored.
    #[tokio::test]
    async fn test_count_unit_files_by_type() {
        let dir = tempfile::tempdir().expect("Unable to create temp dir");
        let path = dir.path();
        for file_name in ["sshd.service", "nginx.service", "boot.mount", "swap.swap"] {
            std::fs::write(path.join(file_name), "").unwrap();
        }
        std::fs::create_dir(path.join("multi-user.target.wants")).unwrap();

        let counts = count_unit_files_by_type(path.to_str().unwrap()).await;

        assert_eq!(counts.get("service"), Some(&2));
        assert_eq!(counts.get("mount"), Some(&1));
        assert_eq!(counts.get("swap"), Some(&1));
        assert_eq!(counts.get("wants"), None);
        assert_eq!(counts.len(), 3);
    }

    /// A missing directory produces an empty map.
    #[tokio::test]
    async fn test_count_unit_files_by_type_nonexistent_dir() {
        let counts = count_unit_files_by_type("/nonexistent/path").await;
        assert!(counts.is_empty());
    }

    /// Generator, root transient and user transient directories below a custom
    /// filesystem root are all picked up.
    #[tokio::test]
    async fn test_collect_unit_files_stats_with_fs_root() {
        let root = tempfile::tempdir().expect("Unable to create temp dir");
        let root_path = root.path();
        let fixtures = [
            ("run/systemd/generator", "boot.mount"),
            ("run/systemd/generator", "swap.swap"),
            ("run/systemd/transient", "run-thing.service"),
            ("run/user/1000/systemd/transient", "app-code.scope"),
            ("run/user/1000/systemd/transient", "app-term.scope"),
        ];
        for (relative_dir, file_name) in fixtures {
            let dir = root_path.join(relative_dir);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(file_name), "").unwrap();
        }

        let stats = collect_unit_files_stats(root_path.to_str().unwrap()).await;

        assert_eq!(stats.root.generated.get("mount"), Some(&1));
        assert_eq!(stats.root.generated.get("swap"), Some(&1));
        assert_eq!(stats.root.transient.get("service"), Some(&1));
        assert_eq!(stats.user.transient.get("scope"), Some(&2));
        assert!(stats.user.generated.is_empty());
    }
}