sd_switch/systemd/
dbus.rs

1use crate::systemd::dbus_manager;
2use crate::systemd::dbus_unit;
3use crate::systemd::UnitManager;
4use crate::systemd::UnitStatus;
5
6use crate::error::Error;
7use std::str::FromStr;
8use std::sync;
9use std::thread;
10use std::{collections::HashSet, result::Result, time::Duration};
11use zbus::zvariant::OwnedObjectPath;
12
13use super::SystemStatus;
14
15pub struct DbusServiceManager<'a> {
16    connection: &'a zbus::blocking::Connection,
17    proxy: dbus_manager::ManagerProxy<'a>,
18}
19
20pub struct DbusUnitManager<'a> {
21    proxy: dbus_unit::UnitProxy<'a>,
22}
23
24pub struct DbusUnitStatus {
25    name: String,
26    description: String,
27    active_state: String,
28    address: OwnedObjectPath,
29}
30
31impl UnitStatus for DbusUnitStatus {
32    fn name(&self) -> &str {
33        &self.name
34    }
35
36    fn description(&self) -> &str {
37        &self.description
38    }
39
40    fn active_state(&self) -> &str {
41        &self.active_state
42    }
43}
44
45#[derive(Debug)]
46pub struct UnitFile {
47    pub unit_name: String,
48    pub status: String,
49}
50
51/// A tuple representation of `UnitStatus` for use in the dbus API.
52type UnitStatusTuple = (
53    String,
54    String,
55    String,
56    String,
57    String,
58    String,
59    OwnedObjectPath,
60    u32,
61    String,
62    OwnedObjectPath,
63);
64
65#[derive(Hash, Eq, PartialEq)]
66pub struct Job {
67    path: OwnedObjectPath,
68}
69
70pub struct DbusJobSet<'a> {
71    manager: &'a DbusServiceManager<'a>,
72    job_removed_stream: Option<dbus_manager::JobRemovedIterator>,
73    jobs: HashSet<Job>,
74}
75
76impl<'a> DbusJobSet<'a> {
77    fn new(manager: &'a DbusServiceManager<'a>) -> Result<DbusJobSet<'a>, Error> {
78        let job_removed_stream = Some(manager.proxy.receive_job_removed()?);
79        Ok(DbusJobSet {
80            manager,
81            job_removed_stream,
82            jobs: HashSet::new(),
83        })
84    }
85}
86
87impl super::JobSet for DbusJobSet<'_> {
88    fn reload_unit(&mut self, unit_name: &str) -> Result<(), Error> {
89        let path = self.manager.proxy.reload_unit(unit_name, "replace")?;
90        self.jobs.insert(Job { path });
91        Ok(())
92    }
93
94    fn restart_unit(&mut self, unit_name: &str) -> Result<(), Error> {
95        let path = self.manager.proxy.restart_unit(unit_name, "replace")?;
96        self.jobs.insert(Job { path });
97        Ok(())
98    }
99
100    fn start_unit(&mut self, unit_name: &str) -> Result<(), Error> {
101        let path = self.manager.proxy.start_unit(unit_name, "replace")?;
102        self.jobs.insert(Job { path });
103        Ok(())
104    }
105
106    fn stop_unit(&mut self, unit_name: &str) -> Result<(), Error> {
107        let path = self.manager.proxy.stop_unit(unit_name, "replace")?;
108        self.jobs.insert(Job { path });
109        Ok(())
110    }
111
112    fn wait_for_all<F>(&mut self, job_handler: F, timeout: Duration) -> Result<(), Error>
113    where
114        F: Fn(&str, &str) + Send + 'static,
115    {
116        if self.jobs.is_empty() {
117            return Ok(());
118        }
119
120        if let Some(job_removed_stream) = self.job_removed_stream.take() {
121            let (tx, rx) = sync::mpsc::channel();
122
123            thread::scope(|s| {
124                let _: thread::ScopedJoinHandle<Result<(), Error>> = s.spawn(move || {
125                    for job_removed in job_removed_stream {
126                        let job = Job {
127                            path: OwnedObjectPath::from(job_removed.args()?.job().clone()),
128                        };
129                        self.jobs.remove(&job);
130
131                        if let Ok(args) = job_removed.args() {
132                            job_handler(args.unit, args.result);
133                        }
134
135                        if self.jobs.is_empty() {
136                            break;
137                        }
138                    }
139
140                    let _ = tx.send(());
141
142                    Ok(())
143                });
144
145                rx.recv_timeout(timeout)
146                    .map_err(|e| Error::SdSwitch(e.to_string()))
147            })
148        } else {
149            Ok(())
150        }
151    }
152}
153
154impl Drop for DbusServiceManager<'_> {
155    fn drop(&mut self) {
156        if let Err(err) = self.proxy.unsubscribe() {
157            eprintln!("Error unsubscribing from proxy: {err}");
158        }
159    }
160}
161
162impl<'a> DbusServiceManager<'a> {
163    pub fn new(
164        connection: &'a zbus::blocking::Connection,
165    ) -> Result<DbusServiceManager<'a>, Error> {
166        let proxy = dbus_manager::ManagerProxy::new(connection)?;
167
168        proxy.subscribe()?;
169
170        Ok(DbusServiceManager { connection, proxy })
171    }
172}
173
174fn to_unit_status(t: UnitStatusTuple) -> DbusUnitStatus {
175    DbusUnitStatus {
176        name: t.0,
177        description: t.1,
178        active_state: t.3,
179        address: t.6,
180    }
181}
182
183impl<'a> super::ServiceManager for &'a DbusServiceManager<'a> {
184    type UnitManager = DbusUnitManager<'a>;
185    type UnitStatus = DbusUnitStatus;
186    type JobSet = DbusJobSet<'a>;
187
188    fn system_status(&self) -> Result<SystemStatus, Error> {
189        let state = &self.proxy.system_state()?;
190        SystemStatus::from_str(state)
191    }
192
193    /// Performs a systemd daemon reload, blocking until complete.
194    fn daemon_reload(&self) -> Result<(), Error> {
195        let reloading = self.proxy.receive_reloading()?;
196
197        self.proxy.reload()?;
198
199        for result in reloading {
200            match result.args() {
201                Ok(args) if args.active => break,
202                Ok(_) => {}
203                Err(err) => {
204                    eprintln!("Error listening for reload signal: {err}. Assuming done.");
205                    break;
206                }
207            }
208        }
209
210        Ok(())
211    }
212
213    fn reset_failed(&self) -> Result<(), Error> {
214        self.proxy.reset_failed()?;
215        Ok(())
216    }
217
218    /// Builds a unit manager for the unit with the given status.
219    fn unit_manager(&self, status: &DbusUnitStatus) -> Result<DbusUnitManager<'a>, Error> {
220        let proxy = dbus_unit::UnitProxy::builder(self.connection)
221            .path(status.address.clone())?
222            .build()?;
223
224        Ok(DbusUnitManager { proxy })
225    }
226
227    fn new_job_set(&self) -> Result<DbusJobSet<'a>, Error> {
228        DbusJobSet::new(self)
229    }
230
231    fn list_units_by_states(&self, states: &[&str]) -> Result<Vec<DbusUnitStatus>, Error> {
232        let result = self
233            .proxy
234            .list_units_by_patterns(states, &[])?
235            .drain(..)
236            .map(to_unit_status)
237            .collect();
238
239        Ok(result)
240    }
241}
242
243impl UnitManager for DbusUnitManager<'_> {
244    fn refuse_manual_start(&self) -> Result<bool, Error> {
245        Ok(self.proxy.refuse_manual_start()?)
246    }
247
248    fn refuse_manual_stop(&self) -> Result<bool, Error> {
249        Ok(self.proxy.refuse_manual_stop()?)
250    }
251}