use crate::systemd::dbus_manager;
use crate::systemd::dbus_unit;
use crate::systemd::UnitManager;
use crate::systemd::UnitStatus;
use crate::error::Error;
use std::str::FromStr;
use std::sync;
use std::thread;
use std::{collections::HashSet, result::Result, time::Duration};
use zbus::zvariant::OwnedObjectPath;
use super::SystemStatus;
/// Service manager backed by the systemd manager D-Bus proxy.
///
/// Creating one calls `subscribe()` on the proxy; dropping it calls
/// `unsubscribe()`.
pub struct DbusServiceManager<'a> {
    /// Borrowed blocking connection, reused when building per-unit proxies.
    connection: &'a zbus::blocking::Connection,
    /// Proxy through which every manager-level call is issued.
    proxy: dbus_manager::ManagerProxy<'a>,
}
/// Per-unit manager that answers queries through a unit D-Bus proxy.
pub struct DbusUnitManager<'a> {
    /// Proxy bound to the object path of a single unit.
    proxy: dbus_unit::UnitProxy<'a>,
}
/// Subset of a unit listing entry kept by this backend.
pub struct DbusUnitStatus {
    /// Unit name as reported in the listing.
    name: String,
    /// Description text reported for the unit.
    description: String,
    /// Active state string reported for the unit.
    active_state: String,
    /// Object path of the unit; used to build a proxy for it.
    address: OwnedObjectPath,
}
/// Read-only accessors exposing the stored listing fields.
impl UnitStatus for DbusUnitStatus {
    /// Returns the stored unit name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the stored unit description.
    fn description(&self) -> &str {
        self.description.as_str()
    }

    /// Returns the stored active state string.
    fn active_state(&self) -> &str {
        self.active_state.as_str()
    }
}
/// Pair of a unit name and a status string.
// NOTE(review): presumably describes an entry of a unit *file* listing;
// nothing in this part of the file constructs it, so confirm against callers.
#[derive(Debug)]
pub struct UnitFile {
    /// Name of the unit.
    pub unit_name: String,
    /// Status string associated with the unit.
    pub status: String,
}
/// Raw ten-element tuple describing one unit, as consumed by `to_unit_status`.
///
/// Only slots 0, 1, 3 and 6 are read in this file (name, description, active
/// state and unit object path). The labels on the remaining slots are
/// assumptions based on systemd's documented `ListUnits` reply layout.
// NOTE(review): confirm the unlabelled-by-code slots against the proxy
// definition in `dbus_manager`.
type UnitStatusTuple = (
    String,          // 0: unit name
    String,          // 1: description
    String,          // 2: presumably load state
    String,          // 3: active state
    String,          // 4: presumably sub state
    String,          // 5: presumably followed unit
    OwnedObjectPath, // 6: unit object path
    u32,             // 7: presumably queued job id
    String,          // 8: presumably job type
    OwnedObjectPath, // 9: presumably job object path
);
/// Identity of a queued job, keyed by its D-Bus object path.
///
/// Hashable and comparable so that it can be stored in a `HashSet`.
#[derive(Hash, Eq, PartialEq)]
pub struct Job {
    /// Object path returned when the job was queued.
    path: OwnedObjectPath,
}
/// Collection of jobs queued through a [`DbusServiceManager`].
pub struct DbusJobSet<'a> {
    /// Manager whose proxy is used to queue the jobs.
    manager: &'a DbusServiceManager<'a>,
    /// Stream of job-removed signals; `None` once it has been taken for
    /// waiting.
    job_removed_stream: Option<dbus_manager::JobRemovedIterator>,
    /// Jobs queued through this set that have not been seen as removed.
    jobs: HashSet<Job>,
}
impl<'a> DbusJobSet<'a> {
fn new(manager: &'a DbusServiceManager<'a>) -> Result<DbusJobSet<'a>, Error> {
let job_removed_stream = Some(manager.proxy.receive_job_removed()?);
Ok(DbusJobSet {
manager,
job_removed_stream,
jobs: HashSet::new(),
})
}
}
impl super::JobSet for DbusJobSet<'_> {
fn reload_unit(&mut self, unit_name: &str) -> Result<(), Error> {
let path = self.manager.proxy.reload_unit(unit_name, "replace")?;
self.jobs.insert(Job { path });
Ok(())
}
fn restart_unit(&mut self, unit_name: &str) -> Result<(), Error> {
let path = self.manager.proxy.restart_unit(unit_name, "replace")?;
self.jobs.insert(Job { path });
Ok(())
}
fn start_unit(&mut self, unit_name: &str) -> Result<(), Error> {
let path = self.manager.proxy.start_unit(unit_name, "replace")?;
self.jobs.insert(Job { path });
Ok(())
}
fn stop_unit(&mut self, unit_name: &str) -> Result<(), Error> {
let path = self.manager.proxy.stop_unit(unit_name, "replace")?;
self.jobs.insert(Job { path });
Ok(())
}
fn wait_for_all<F>(&mut self, job_handler: F, timeout: Duration) -> Result<(), Error>
where
F: Fn(&str, &str) + Send + 'static,
{
if self.jobs.is_empty() {
return Ok(());
}
if let Some(job_removed_stream) = self.job_removed_stream.take() {
let (tx, rx) = sync::mpsc::channel();
thread::scope(|s| {
let _: thread::ScopedJoinHandle<Result<(), Error>> = s.spawn(move || {
for job_removed in job_removed_stream {
let job = Job {
path: OwnedObjectPath::from(job_removed.args()?.job().clone()),
};
self.jobs.remove(&job);
if let Ok(args) = job_removed.args() {
job_handler(args.unit, args.result);
}
if self.jobs.is_empty() {
break;
}
}
let _ = tx.send(());
Ok(())
});
rx.recv_timeout(timeout)
.map_err(|e| Error::SdSwitch(e.to_string()))
})
} else {
Ok(())
}
}
}
impl Drop for DbusServiceManager<'_> {
    /// Calls `unsubscribe()` on the manager proxy.
    ///
    /// A failure is only reported on stderr because `drop` cannot return an
    /// error.
    fn drop(&mut self) {
        match self.proxy.unsubscribe() {
            Ok(_) => {}
            Err(err) => eprintln!("Error unsubscribing from proxy: {err}"),
        }
    }
}
impl<'a> DbusServiceManager<'a> {
    /// Builds a manager proxy on `connection` and calls `subscribe()` on it.
    ///
    /// # Errors
    ///
    /// Fails if the proxy cannot be created or if subscribing fails.
    pub fn new(
        connection: &'a zbus::blocking::Connection,
    ) -> Result<DbusServiceManager<'a>, Error> {
        let proxy = dbus_manager::ManagerProxy::new(connection)?;
        proxy.subscribe()?;

        let manager = Self { connection, proxy };
        Ok(manager)
    }
}
/// Converts a raw unit tuple into a [`DbusUnitStatus`].
///
/// Keeps slots 0 (name), 1 (description), 3 (active state) and 6 (object
/// path); the other six slots are dropped.
fn to_unit_status(t: UnitStatusTuple) -> DbusUnitStatus {
    let (name, description, _, active_state, _, _, address, _, _, _) = t;

    DbusUnitStatus {
        name,
        description,
        active_state,
        address,
    }
}
/// Service-manager operations implemented on top of the manager proxy.
impl<'a> super::ServiceManager for &'a DbusServiceManager<'a> {
    type UnitManager = DbusUnitManager<'a>;
    type UnitStatus = DbusUnitStatus;
    type JobSet = DbusJobSet<'a>;

    /// Fetches the system state string from the proxy and parses it into a
    /// [`SystemStatus`].
    ///
    /// # Errors
    ///
    /// Fails if the state cannot be fetched or parsed.
    fn system_status(&self) -> Result<SystemStatus, Error> {
        let raw_state = self.proxy.system_state()?;
        SystemStatus::from_str(&raw_state)
    }

    /// Requests a daemon reload and then drains reloading signals until one
    /// has its `active` flag set or one cannot be decoded.
    ///
    /// The signal stream is opened before the reload is requested,
    /// presumably so that no signal emitted by the reload is missed.
    ///
    /// # Errors
    ///
    /// Fails if the signal stream cannot be opened or the reload call fails.
    /// A signal decoding failure is only reported on stderr.
    // NOTE(review): systemd documents `active == true` as the signal sent
    // *before* a reload starts; confirm that stopping on it is intended.
    fn daemon_reload(&self) -> Result<(), Error> {
        let reloading = self.proxy.receive_reloading()?;
        self.proxy.reload()?;

        for signal in reloading {
            let args = match signal.args() {
                Ok(args) => args,
                Err(err) => {
                    eprintln!("Error listening for reload signal: {err}. Assuming done.");
                    break;
                }
            };

            if args.active {
                break;
            }
        }

        Ok(())
    }

    /// Asks the manager to reset the failed state.
    ///
    /// # Errors
    ///
    /// Fails if the D-Bus call fails.
    fn reset_failed(&self) -> Result<(), Error> {
        self.proxy.reset_failed()?;
        Ok(())
    }

    /// Builds a unit manager whose proxy targets the object path stored in
    /// `status`.
    ///
    /// # Errors
    ///
    /// Fails if the path is rejected by the builder or the proxy cannot be
    /// built.
    fn unit_manager(&self, status: &DbusUnitStatus) -> Result<DbusUnitManager<'a>, Error> {
        let unit_path = status.address.clone();
        let builder = dbus_unit::UnitProxy::builder(self.connection).path(unit_path)?;
        let proxy = builder.build()?;

        Ok(DbusUnitManager { proxy })
    }

    /// Creates a new, empty job set bound to this manager.
    ///
    /// # Errors
    ///
    /// Fails if the job set cannot be created.
    fn new_job_set(&self) -> Result<DbusJobSet<'a>, Error> {
        DbusJobSet::new(self)
    }

    /// Lists the units matching `states` and converts each entry into a
    /// [`DbusUnitStatus`].
    ///
    /// The second argument of the underlying call (presumably the unit name
    /// patterns) is left empty.
    ///
    /// # Errors
    ///
    /// Fails if the D-Bus call fails.
    fn list_units_by_states(&self, states: &[&str]) -> Result<Vec<DbusUnitStatus>, Error> {
        let raw_units = self.proxy.list_units_by_patterns(states, &[])?;
        let statuses = raw_units.into_iter().map(to_unit_status).collect();

        Ok(statuses)
    }
}
impl UnitManager for DbusUnitManager<'_> {
fn refuse_manual_start(&self) -> Result<bool, Error> {
Ok(self.proxy.refuse_manual_start()?)
}
fn refuse_manual_stop(&self) -> Result<bool, Error> {
Ok(self.proxy.refuse_manual_stop()?)
}
}