monitord 0.18.0

monitord ... know how happy your systemd is! 😊
Documentation
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, error};

use crate::MachineStats;
use crate::MonitordStats;

/// Errors that can be returned while querying machines over D-Bus.
#[derive(Error, Debug)]
pub enum MonitordMachinesError {
    /// A zbus operation failed. Created automatically from [`zbus::Error`]
    /// (via `#[from]`), so `?` can be used directly on zbus results.
    #[error("Machines D-Bus error: {0}")]
    ZbusError(#[from] zbus::Error),
}

/// Select the machines that should be monitored.
///
/// A machine is kept only when all of the following hold:
/// * its `class` is exactly `"container"`;
/// * its name is not present in `blocklist`;
/// * `allowlist` is empty, or its name is present in `allowlist`.
///
/// The surviving machines keep the relative order they had in `machines`.
pub fn filter_machines(
    machines: Vec<crate::dbus::zbus_machines::ListedMachine>,
    allowlist: &HashSet<String>,
    blocklist: &HashSet<String>,
) -> Vec<crate::dbus::zbus_machines::ListedMachine> {
    let mut kept = machines;
    kept.retain(|machine| {
        // Only containers are of interest; other machine classes are dropped.
        if machine.class != "container" {
            return false;
        }
        // The blocklist always wins, even if the name is also allowlisted.
        if blocklist.contains(&machine.name) {
            return false;
        }
        // An empty allowlist means "allow everything not blocked".
        allowlist.is_empty() || allowlist.contains(&machine.name)
    });
    kept
}

/// Look up the leader PID of every monitored machine.
///
/// Lists the machines through the machines `ManagerProxy`, narrows the list with
/// [`filter_machines`] using the allow/block lists from `config.machines`, and then
/// queries each remaining machine for its leader PID.
///
/// Returns a map from machine name to leader PID.
///
/// # Errors
///
/// Returns [`MonitordMachinesError`] as soon as any D-Bus call fails; no partial
/// result is returned in that case.
pub async fn get_machines(
    connection: &zbus::Connection,
    config: &crate::config::Config,
) -> Result<HashMap<String, u32>, MonitordMachinesError> {
    let manager = crate::dbus::zbus_machines::ManagerProxy::new(connection).await?;
    let listed = manager.list_machines().await?;
    let wanted = filter_machines(
        listed,
        &config.machines.allowlist,
        &config.machines.blocklist,
    );

    let mut leader_pids: HashMap<String, u32> = HashMap::new();
    for entry in wanted {
        let machine_proxy = manager.get_machine(&entry.name).await?;
        let pid = machine_proxy.leader().await?;
        leader_pids.insert(entry.name, pid);
    }

    Ok(leader_pids)
}

/// Collect stats for every monitored container and publish them into the shared
/// [`MonitordStats`].
///
/// For each machine returned by [`get_machines`], this connects to the container's
/// system D-Bus socket through `/proc/<leader_pid>/root`, runs the collectors enabled
/// in `config` concurrently, and stores the resulting [`MachineStats`] under the
/// machine's name in `locked_monitord_stats.machines`.
///
/// Every machine gets its own fresh [`MachineStats`]. When a collector is disabled or
/// fails for one container, that container therefore reports default values for the
/// affected fields instead of values left over from a previously processed container.
///
/// # Errors
///
/// Returns an error if listing the machines fails or if the D-Bus connection to a
/// container cannot be built; the remaining containers are then not processed.
/// Failures of individual collectors (and task join errors) are only logged.
pub async fn update_machines_stats(
    config: Arc<crate::config::Config>,
    connection: zbus::Connection,
    locked_monitord_stats: Arc<RwLock<MonitordStats>>,
) -> anyhow::Result<()> {
    for (machine, leader_pid) in get_machines(&connection, &config).await? {
        debug!(
            "Collecting container: machine: {} leader_pid: {}",
            machine, leader_pid
        );

        // Fresh stats per container: sharing one instance across iterations would let
        // data from an earlier container leak into this one whenever a collector is
        // disabled or fails here.
        let locked_machine_stats: Arc<RwLock<MachineStats>> =
            Arc::new(RwLock::new(MachineStats::default()));

        // Reach the container's own system bus via the leader's root filesystem.
        let container_address = format!(
            "unix:path=/proc/{}/root/run/dbus/system_bus_socket",
            leader_pid
        );
        let sdc = zbus::connection::Builder::address(container_address.as_str())?
            .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
            .build()
            .await?;
        let mut join_set = tokio::task::JoinSet::new();

        if config.pid1.enabled {
            // NOTE(review): `as i32` would wrap for values above `i32::MAX`; presumably
            // unreachable for real PIDs — confirm if the PID source ever changes.
            join_set.spawn(crate::pid1::update_pid1_stats(
                leader_pid as i32,
                locked_machine_stats.clone(),
            ));
        }

        if config.networkd.enabled {
            join_set.spawn(crate::networkd::update_networkd_stats(
                config.networkd.link_state_dir.clone(),
                None,
                sdc.clone(),
                locked_machine_stats.clone(),
            ));
        }

        if config.system_state.enabled {
            join_set.spawn(crate::system::update_system_stats(
                sdc.clone(),
                locked_machine_stats.clone(),
            ));
        }

        // The version collector is not gated by a config flag.
        join_set.spawn(crate::system::update_version(
            sdc.clone(),
            locked_machine_stats.clone(),
        ));

        if config.units.enabled {
            join_set.spawn(crate::units::update_unit_stats(
                Arc::clone(&config),
                sdc.clone(),
                locked_machine_stats.clone(),
            ));
        }

        if config.dbus_stats.enabled {
            join_set.spawn(crate::dbus_stats::update_dbus_stats(
                Arc::clone(&config),
                sdc.clone(),
                locked_machine_stats.clone(),
            ));
        }

        // Wait for every collector; a failure in one must not stop the others.
        while let Some(res) = join_set.join_next().await {
            match res {
                Ok(Ok(_)) => (),
                Ok(Err(e)) => {
                    error!(
                        "Collection specific failure (container {}): {:?}",
                        machine, e
                    );
                }
                Err(e) => {
                    error!("Join error (container {}): {:?}", machine, e);
                }
            }
        }

        // Snapshot first so the global write lock is held only for the insert itself.
        let machine_stats = locked_machine_stats.read().await.clone();
        locked_monitord_stats
            .write()
            .await
            .machines
            .insert(machine, machine_stats);
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use zbus::zvariant::OwnedObjectPath;

    /// Build a `ListedMachine` of class "container" with the given name.
    fn container(name: &str) -> crate::dbus::zbus_machines::ListedMachine {
        crate::dbus::zbus_machines::ListedMachine {
            name: name.to_string(),
            class: "container".to_string(),
            service: "".to_string(),
            path: OwnedObjectPath::try_from("/sample/object").unwrap(),
        }
    }

    /// Blocklisted machines are dropped and allowlisted ones are kept, in input order.
    #[test]
    fn test_filter_machines() {
        let machines = vec![container("foo"), container("bar"), container("baz")];

        let allowlist: HashSet<String> = ["foo", "baz"].iter().map(|n| n.to_string()).collect();
        let blocklist: HashSet<String> = ["bar"].iter().map(|n| n.to_string()).collect();

        let filtered = super::filter_machines(machines, &allowlist, &blocklist);

        let names: Vec<&str> = filtered.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(names, ["foo", "baz"]);
    }
}