sd_switch/systemd/
dbus.rs1use crate::systemd::dbus_manager;
2use crate::systemd::dbus_unit;
3use crate::systemd::UnitManager;
4use crate::systemd::UnitStatus;
5
6use crate::error::Error;
7use std::str::FromStr;
8use std::sync;
9use std::thread;
10use std::{collections::HashSet, result::Result, time::Duration};
11use zbus::zvariant::OwnedObjectPath;
12
13use super::SystemStatus;
14
15pub struct DbusServiceManager<'a> {
16 connection: &'a zbus::blocking::Connection,
17 proxy: dbus_manager::ManagerProxy<'a>,
18}
19
20pub struct DbusUnitManager<'a> {
21 proxy: dbus_unit::UnitProxy<'a>,
22}
23
24pub struct DbusUnitStatus {
25 name: String,
26 description: String,
27 active_state: String,
28 address: OwnedObjectPath,
29}
30
31impl UnitStatus for DbusUnitStatus {
32 fn name(&self) -> &str {
33 &self.name
34 }
35
36 fn description(&self) -> &str {
37 &self.description
38 }
39
40 fn active_state(&self) -> &str {
41 &self.active_state
42 }
43}
44
45#[derive(Debug)]
46pub struct UnitFile {
47 pub unit_name: String,
48 pub status: String,
49}
50
51type UnitStatusTuple = (
53 String,
54 String,
55 String,
56 String,
57 String,
58 String,
59 OwnedObjectPath,
60 u32,
61 String,
62 OwnedObjectPath,
63);
64
65#[derive(Hash, Eq, PartialEq)]
66pub struct Job {
67 path: OwnedObjectPath,
68}
69
70pub struct DbusJobSet<'a> {
71 manager: &'a DbusServiceManager<'a>,
72 job_removed_stream: Option<dbus_manager::JobRemovedIterator>,
73 jobs: HashSet<Job>,
74}
75
76impl<'a> DbusJobSet<'a> {
77 fn new(manager: &'a DbusServiceManager<'a>) -> Result<DbusJobSet<'a>, Error> {
78 let job_removed_stream = Some(manager.proxy.receive_job_removed()?);
79 Ok(DbusJobSet {
80 manager,
81 job_removed_stream,
82 jobs: HashSet::new(),
83 })
84 }
85}
86
87impl super::JobSet for DbusJobSet<'_> {
88 fn reload_unit(&mut self, unit_name: &str) -> Result<(), Error> {
89 let path = self.manager.proxy.reload_unit(unit_name, "replace")?;
90 self.jobs.insert(Job { path });
91 Ok(())
92 }
93
94 fn restart_unit(&mut self, unit_name: &str) -> Result<(), Error> {
95 let path = self.manager.proxy.restart_unit(unit_name, "replace")?;
96 self.jobs.insert(Job { path });
97 Ok(())
98 }
99
100 fn start_unit(&mut self, unit_name: &str) -> Result<(), Error> {
101 let path = self.manager.proxy.start_unit(unit_name, "replace")?;
102 self.jobs.insert(Job { path });
103 Ok(())
104 }
105
106 fn stop_unit(&mut self, unit_name: &str) -> Result<(), Error> {
107 let path = self.manager.proxy.stop_unit(unit_name, "replace")?;
108 self.jobs.insert(Job { path });
109 Ok(())
110 }
111
112 fn wait_for_all<F>(&mut self, job_handler: F, timeout: Duration) -> Result<(), Error>
113 where
114 F: Fn(&str, &str) + Send + 'static,
115 {
116 if self.jobs.is_empty() {
117 return Ok(());
118 }
119
120 if let Some(job_removed_stream) = self.job_removed_stream.take() {
121 let (tx, rx) = sync::mpsc::channel();
122
123 thread::scope(|s| {
124 let _: thread::ScopedJoinHandle<Result<(), Error>> = s.spawn(move || {
125 for job_removed in job_removed_stream {
126 let job = Job {
127 path: OwnedObjectPath::from(job_removed.args()?.job().clone()),
128 };
129 self.jobs.remove(&job);
130
131 if let Ok(args) = job_removed.args() {
132 job_handler(args.unit, args.result);
133 }
134
135 if self.jobs.is_empty() {
136 break;
137 }
138 }
139
140 let _ = tx.send(());
141
142 Ok(())
143 });
144
145 rx.recv_timeout(timeout)
146 .map_err(|e| Error::SdSwitch(e.to_string()))
147 })
148 } else {
149 Ok(())
150 }
151 }
152}
153
154impl Drop for DbusServiceManager<'_> {
155 fn drop(&mut self) {
156 if let Err(err) = self.proxy.unsubscribe() {
157 eprintln!("Error unsubscribing from proxy: {err}");
158 }
159 }
160}
161
162impl<'a> DbusServiceManager<'a> {
163 pub fn new(
164 connection: &'a zbus::blocking::Connection,
165 ) -> Result<DbusServiceManager<'a>, Error> {
166 let proxy = dbus_manager::ManagerProxy::new(connection)?;
167
168 proxy.subscribe()?;
169
170 Ok(DbusServiceManager { connection, proxy })
171 }
172}
173
174fn to_unit_status(t: UnitStatusTuple) -> DbusUnitStatus {
175 DbusUnitStatus {
176 name: t.0,
177 description: t.1,
178 active_state: t.3,
179 address: t.6,
180 }
181}
182
183impl<'a> super::ServiceManager for &'a DbusServiceManager<'a> {
184 type UnitManager = DbusUnitManager<'a>;
185 type UnitStatus = DbusUnitStatus;
186 type JobSet = DbusJobSet<'a>;
187
188 fn system_status(&self) -> Result<SystemStatus, Error> {
189 let state = &self.proxy.system_state()?;
190 SystemStatus::from_str(state)
191 }
192
193 fn daemon_reload(&self) -> Result<(), Error> {
195 let reloading = self.proxy.receive_reloading()?;
196
197 self.proxy.reload()?;
198
199 for result in reloading {
200 match result.args() {
201 Ok(args) if args.active => break,
202 Ok(_) => {}
203 Err(err) => {
204 eprintln!("Error listening for reload signal: {err}. Assuming done.");
205 break;
206 }
207 }
208 }
209
210 Ok(())
211 }
212
213 fn reset_failed(&self) -> Result<(), Error> {
214 self.proxy.reset_failed()?;
215 Ok(())
216 }
217
218 fn unit_manager(&self, status: &DbusUnitStatus) -> Result<DbusUnitManager<'a>, Error> {
220 let proxy = dbus_unit::UnitProxy::builder(self.connection)
221 .path(status.address.clone())?
222 .build()?;
223
224 Ok(DbusUnitManager { proxy })
225 }
226
227 fn new_job_set(&self) -> Result<DbusJobSet<'a>, Error> {
228 DbusJobSet::new(self)
229 }
230
231 fn list_units_by_states(&self, states: &[&str]) -> Result<Vec<DbusUnitStatus>, Error> {
232 let result = self
233 .proxy
234 .list_units_by_patterns(states, &[])?
235 .drain(..)
236 .map(to_unit_status)
237 .collect();
238
239 Ok(result)
240 }
241}
242
243impl UnitManager for DbusUnitManager<'_> {
244 fn refuse_manual_start(&self) -> Result<bool, Error> {
245 Ok(self.proxy.refuse_manual_start()?)
246 }
247
248 fn refuse_manual_stop(&self) -> Result<bool, Error> {
249 Ok(self.proxy.refuse_manual_stop()?)
250 }
251}