sd-switch 0.5.4

A systemd unit reload/restart utility for Home Manager
Documentation
use crate::systemd::dbus_manager;
use crate::systemd::dbus_unit;
use crate::systemd::UnitManager;
use crate::systemd::UnitStatus;

use crate::error::Error;
use std::str::FromStr;
use std::sync;
use std::thread;
use std::{collections::HashSet, result::Result, time::Duration};
use zbus::zvariant::OwnedObjectPath;

use super::SystemStatus;

/// Service manager that talks to systemd over a blocking `zbus` connection.
///
/// Constructing one subscribes to the manager's signals and dropping it
/// unsubscribes again.
pub struct DbusServiceManager<'a> {
    /// Borrowed D-Bus connection; also used to build per-unit proxies.
    connection: &'a zbus::blocking::Connection,
    /// Proxy for the systemd manager object; all manager calls go through it.
    proxy: dbus_manager::ManagerProxy<'a>,
}

/// Handle on a single systemd unit, used to query per-unit properties.
pub struct DbusUnitManager<'a> {
    /// Proxy for the unit's D-Bus object.
    proxy: dbus_unit::UnitProxy<'a>,
}

/// Snapshot of a unit as reported by a unit listing call.
pub struct DbusUnitStatus {
    /// Unit name.
    name: String,
    /// Unit description.
    description: String,
    /// Active state string as reported over D-Bus.
    active_state: String,
    /// Object path of the unit; used to build a `UnitProxy` for it.
    address: OwnedObjectPath,
}

/// Exposes the stored unit fields through the [`UnitStatus`] trait.
impl UnitStatus for DbusUnitStatus {
    /// Returns the unit name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the unit description.
    fn description(&self) -> &str {
        self.description.as_str()
    }

    /// Returns the unit's active state string.
    fn active_state(&self) -> &str {
        self.active_state.as_str()
    }
}

/// A unit file entry consisting of a unit name and a status string.
// NOTE(review): not referenced anywhere in the visible part of this file;
// the exact meaning of `status` should be confirmed against its users.
#[derive(Debug)]
pub struct UnitFile {
    /// Name of the unit.
    pub unit_name: String,
    /// Status string associated with the unit file.
    pub status: String,
}

/// A tuple representation of `UnitStatus` for use in the dbus API.
///
/// Only elements 0, 1, 3 and 6 are consumed in this file (see
/// `to_unit_status`). The labels on the remaining elements are presumed from
/// the layout of systemd's unit listing reply — TODO confirm against the
/// systemd D-Bus documentation.
type UnitStatusTuple = (
    String, // 0: unit name
    String, // 1: description
    String, // 2: presumably the load state
    String, // 3: active state
    String, // 4: presumably the sub state
    String, // 5: presumably the followed unit
    OwnedObjectPath, // 6: object path of the unit
    u32, // 7: presumably the id of a queued job
    String, // 8: presumably the job type
    OwnedObjectPath, // 9: presumably the object path of the job
);

/// A queued systemd job, identified by its D-Bus object path.
///
/// Hashable and comparable so that pending jobs can be kept in a `HashSet`.
#[derive(Hash, Eq, PartialEq)]
pub struct Job {
    /// Object path returned by the manager call that queued the job.
    path: OwnedObjectPath,
}

/// A set of queued systemd jobs whose completion can be awaited together.
pub struct DbusJobSet<'a> {
    /// Manager whose proxy is used to queue the jobs.
    manager: &'a DbusServiceManager<'a>,
    /// Stream of job-removed signals. Wrapped in `Option` so that
    /// `wait_for_all` can take ownership of it; it is `None` afterwards.
    job_removed_stream: Option<dbus_manager::JobRemovedIterator>,
    /// Jobs that have been queued and not yet seen as removed.
    jobs: HashSet<Job>,
}

impl<'a> DbusJobSet<'a> {
    /// Creates an empty job set bound to `manager`.
    ///
    /// The job-removed signal stream is opened here, i.e. before any job can be
    /// queued through this set.
    ///
    /// # Errors
    ///
    /// Fails if the signal stream cannot be created.
    fn new(manager: &'a DbusServiceManager<'a>) -> Result<DbusJobSet<'a>, Error> {
        let stream = manager.proxy.receive_job_removed()?;

        Ok(DbusJobSet {
            manager,
            job_removed_stream: Some(stream),
            jobs: HashSet::new(),
        })
    }
}

impl super::JobSet for DbusJobSet<'_> {
    fn reload_unit(&mut self, unit_name: &str) -> Result<(), Error> {
        let path = self.manager.proxy.reload_unit(unit_name, "replace")?;
        self.jobs.insert(Job { path });
        Ok(())
    }

    fn restart_unit(&mut self, unit_name: &str) -> Result<(), Error> {
        let path = self.manager.proxy.restart_unit(unit_name, "replace")?;
        self.jobs.insert(Job { path });
        Ok(())
    }

    fn start_unit(&mut self, unit_name: &str) -> Result<(), Error> {
        let path = self.manager.proxy.start_unit(unit_name, "replace")?;
        self.jobs.insert(Job { path });
        Ok(())
    }

    fn stop_unit(&mut self, unit_name: &str) -> Result<(), Error> {
        let path = self.manager.proxy.stop_unit(unit_name, "replace")?;
        self.jobs.insert(Job { path });
        Ok(())
    }

    fn wait_for_all<F>(&mut self, job_handler: F, timeout: Duration) -> Result<(), Error>
    where
        F: Fn(&str, &str) + Send + 'static,
    {
        if self.jobs.is_empty() {
            return Ok(());
        }

        if let Some(job_removed_stream) = self.job_removed_stream.take() {
            let (tx, rx) = sync::mpsc::channel();

            thread::scope(|s| {
                let _: thread::ScopedJoinHandle<Result<(), Error>> = s.spawn(move || {
                    for job_removed in job_removed_stream {
                        let job = Job {
                            path: OwnedObjectPath::from(job_removed.args()?.job().clone()),
                        };
                        self.jobs.remove(&job);

                        if let Ok(args) = job_removed.args() {
                            job_handler(args.unit, args.result);
                        }

                        if self.jobs.is_empty() {
                            break;
                        }
                    }

                    let _ = tx.send(());

                    Ok(())
                });

                rx.recv_timeout(timeout)
                    .map_err(|e| Error::SdSwitch(e.to_string()))
            })
        } else {
            Ok(())
        }
    }
}

impl Drop for DbusServiceManager<'_> {
    /// Unsubscribes from the manager's signals.
    ///
    /// A failure is only reported on stderr since `drop` cannot return an error.
    fn drop(&mut self) {
        match self.proxy.unsubscribe() {
            Ok(_) => {}
            Err(err) => eprintln!("Error unsubscribing from proxy: {err}"),
        }
    }
}

impl<'a> DbusServiceManager<'a> {
    /// Creates a service manager on top of `connection` and subscribes to the
    /// manager's signals.
    ///
    /// # Errors
    ///
    /// Fails if the manager proxy cannot be created or the subscription is
    /// rejected.
    pub fn new(
        connection: &'a zbus::blocking::Connection,
    ) -> Result<DbusServiceManager<'a>, Error> {
        let proxy = dbus_manager::ManagerProxy::new(connection)?;

        // Subscribe before building the value so that a failed subscription
        // never produces a manager whose `Drop` would try to unsubscribe.
        proxy.subscribe()?;

        Ok(Self { connection, proxy })
    }
}

/// Converts a raw D-Bus unit tuple into a [`DbusUnitStatus`], keeping only the
/// name, description, active state and unit object path.
fn to_unit_status(t: UnitStatusTuple) -> DbusUnitStatus {
    let (name, description, _, active_state, _, _, address, _, _, _) = t;

    DbusUnitStatus {
        name,
        description,
        active_state,
        address,
    }
}

impl<'a> super::ServiceManager for &'a DbusServiceManager<'a> {
    type UnitManager = DbusUnitManager<'a>;
    type UnitStatus = DbusUnitStatus;
    type JobSet = DbusJobSet<'a>;

    /// Reads the manager's system state and parses it into a [`SystemStatus`].
    fn system_status(&self) -> Result<SystemStatus, Error> {
        let state = self.proxy.system_state()?;
        SystemStatus::from_str(&state)
    }

    /// Requests a systemd daemon reload and then waits on the reloading signal.
    ///
    /// The signal stream is opened before the reload is requested so that no
    /// signal is missed. A signal that cannot be decoded is logged to stderr
    /// and treated as the end of the reload.
    fn daemon_reload(&self) -> Result<(), Error> {
        let signals = self.proxy.receive_reloading()?;

        self.proxy.reload()?;

        // NOTE(review): the loop stops on the first signal with `active` set.
        // The systemd documentation describes `true` as "reload starting" and
        // `false` as "reload finished" — confirm this is the intended condition.
        for signal in signals {
            match signal.args() {
                Ok(args) => {
                    if args.active {
                        break;
                    }
                }
                Err(err) => {
                    eprintln!("Error listening for reload signal: {err}. Assuming done.");
                    break;
                }
            }
        }

        Ok(())
    }

    /// Asks the manager to reset the failed state of units.
    fn reset_failed(&self) -> Result<(), Error> {
        self.proxy.reset_failed()?;
        Ok(())
    }

    /// Creates a unit manager for the unit described by `status`, addressed
    /// through the object path stored in that status.
    fn unit_manager(&self, status: &DbusUnitStatus) -> Result<DbusUnitManager<'a>, Error> {
        let builder = dbus_unit::UnitProxy::builder(self.connection).path(status.address.clone())?;
        let proxy = builder.build()?;

        Ok(DbusUnitManager { proxy })
    }

    /// Creates a fresh, empty job set bound to this manager.
    fn new_job_set(&self) -> Result<DbusJobSet<'a>, Error> {
        DbusJobSet::new(self)
    }

    /// Lists the units that are in any of the given `states`, with no name
    /// pattern filter applied.
    fn list_units_by_states(&self, states: &[&str]) -> Result<Vec<DbusUnitStatus>, Error> {
        let units = self.proxy.list_units_by_patterns(states, &[])?;

        Ok(units.into_iter().map(to_unit_status).collect())
    }
}

impl UnitManager for DbusUnitManager<'_> {
    /// Reads the unit's `refuse_manual_start` property.
    fn refuse_manual_start(&self) -> Result<bool, Error> {
        let refused = self.proxy.refuse_manual_start()?;
        Ok(refused)
    }

    /// Reads the unit's `refuse_manual_stop` property.
    fn refuse_manual_stop(&self) -> Result<bool, Error> {
        let refused = self.proxy.refuse_manual_stop()?;
        Ok(refused)
    }
}