1use crate::{
2 data::UnitInfo,
3 errors::SystemdErrors,
4 runtime,
5 sysdbus::{dbus_proxies::Systemd1ManagerProxy, get_connection},
6};
7use base::enums::UnitDBusLevel;
8use futures_util::stream::StreamExt;
9use std::time::{SystemTime, UNIX_EPOCH};
10use tokio::{
11 sync::{OnceCell, broadcast, oneshot},
12 task::JoinHandle,
13};
14use tracing::{debug, error, info, warn};
15use zbus::{MatchRule, MessageStream};
16use zvariant::OwnedObjectPath;
17
18#[derive(Debug, Clone, Eq, PartialEq, Hash)]
19pub enum SystemdSignal {
20 UnitNew(UnitDBusLevel, String),
21 UnitRemoved(UnitDBusLevel, String),
22 JobNew(UnitDBusLevel, u32, OwnedObjectPath, String),
23 JobRemoved(UnitDBusLevel, u32, OwnedObjectPath, String, String),
24 StartupFinished(UnitDBusLevel, u64, u64, u64, u64, u64, u64),
25 UnitFilesChanged(UnitDBusLevel),
26 Reloading(UnitDBusLevel, bool),
27}
28
29impl SystemdSignal {
30 pub fn type_text(&self) -> &str {
31 match self {
32 SystemdSignal::UnitNew(_, _) => "UnitNew",
33 SystemdSignal::UnitRemoved(_, _) => "UnitRemoved",
34 SystemdSignal::JobNew(_, _, _, _) => "JobNew",
35 SystemdSignal::JobRemoved(_, _, _, _, _) => "JobRemoved",
36 SystemdSignal::StartupFinished(_, _, _, _, _, _, _) => "StartupFinished",
37 SystemdSignal::UnitFilesChanged(_) => "UnitFilesChanged",
38 SystemdSignal::Reloading(_, _) => "Reloading",
39 }
40 }
41
42 pub fn bus_text(&self) -> &str {
43 let level = match self {
44 SystemdSignal::UnitNew(level, _) => level,
45 SystemdSignal::UnitRemoved(level, _) => level,
46 SystemdSignal::JobNew(level, _, _, _) => level,
47 SystemdSignal::JobRemoved(level, _, _, _, _) => level,
48 SystemdSignal::StartupFinished(level, _, _, _, _, _, _) => level,
49 SystemdSignal::UnitFilesChanged(level) => level,
50 SystemdSignal::Reloading(level, _) => level,
51 };
52 level.as_str()
53 }
54
55 pub fn details(&self) -> String {
56 match self {
57 SystemdSignal::UnitNew(_, id) => id.to_string(),
58 SystemdSignal::UnitRemoved(_, id) => id.to_string(),
59 SystemdSignal::JobNew(_, id, job, unit) => {
60 format!("unit={unit} id={id} path={job}")
61 }
62 SystemdSignal::JobRemoved(_, id, job, unit, result) => {
63 format!("unit={unit} id={id} path={job} result={result}")
64 }
65 SystemdSignal::StartupFinished(
66 _,
67 firmware,
68 loader,
69 kernel,
70 initrd,
71 userspace,
72 total,
73 ) => {
74 format!(
75 "firmware={firmware} loader={loader} kernel={kernel} initrd={initrd} userspace={userspace} total={total}",
76 )
77 }
78 SystemdSignal::UnitFilesChanged(_) => String::new(),
79 SystemdSignal::Reloading(_, active) => format!("active={active}"),
80 }
81 }
82
83 pub fn toggle_unit(self) -> Self {
84 match self {
85 SystemdSignal::UnitNew(unit_dbus_level, unit_name) => {
86 Self::UnitRemoved(unit_dbus_level, unit_name)
87 }
88 SystemdSignal::UnitRemoved(unit_dbus_level, unit_name) => {
89 Self::UnitNew(unit_dbus_level, unit_name)
90 }
91 _ => self,
92 }
93 }
94
95 pub fn create_unit(&self) -> Option<UnitInfo> {
96 match self {
97 SystemdSignal::UnitNew(unit_dbus_level, unit_name) => {
98 Some(UnitInfo::from_unit_key(unit_name, *unit_dbus_level))
99 }
100 SystemdSignal::UnitRemoved(unit_dbus_level, unit_name) => {
101 Some(UnitInfo::from_unit_key(unit_name, *unit_dbus_level))
102 }
103 _ => None,
104 }
105 }
106}
107
108#[derive(Debug, Clone)]
109pub struct SystemdSignalRow {
110 pub time_stamp: u64,
111 pub signal: SystemdSignal,
112}
113
114impl SystemdSignalRow {
115 pub fn new(signal: SystemdSignal) -> Self {
116 let current_system_time = SystemTime::now();
117 let since_the_epoch = current_system_time
118 .duration_since(UNIX_EPOCH)
119 .expect("Time went backwards");
120 let time_stamp =
121 since_the_epoch.as_secs() * 1_000_000 + since_the_epoch.subsec_nanos() as u64 / 1_000;
122 SystemdSignalRow { time_stamp, signal }
123 }
124
125 pub fn type_text(&self) -> &str {
126 self.signal.type_text()
127 }
128
129 pub fn bus_text(&self) -> &str {
130 self.signal.bus_text()
131 }
132
133 pub fn details(&self) -> String {
134 self.signal.details()
135 }
136}
137
138static SENDER: OnceCell<broadcast::Sender<SystemdSignal>> = OnceCell::const_new();
139static WACHER_SYSTEM: OnceCell<JoinHandle<Result<(), SystemdErrors>>> = OnceCell::const_new();
140static WACHER_USER_SESSION: OnceCell<JoinHandle<Result<(), SystemdErrors>>> = OnceCell::const_new();
141
142pub async fn init_signal_watcher(level: UnitDBusLevel) -> broadcast::Receiver<SystemdSignal> {
143 let sender = SENDER
144 .get_or_init(async || {
145 let (systemd_signal_sender, _) = broadcast::channel(2500);
146
147 systemd_signal_sender
150 })
151 .await;
152
153 match level {
154 UnitDBusLevel::System => {
155 WACHER_SYSTEM
156 .get_or_init(|| spawn_signal_watcher(UnitDBusLevel::System, sender))
157 .await;
158 }
159 UnitDBusLevel::UserSession => {
160 WACHER_USER_SESSION
161 .get_or_init(|| spawn_signal_watcher(UnitDBusLevel::UserSession, sender))
162 .await;
163 }
164 UnitDBusLevel::Both => {
165 WACHER_SYSTEM.get_or_init(|| spawn_signal_watcher(UnitDBusLevel::System, sender));
166 WACHER_USER_SESSION
167 .get_or_init(|| spawn_signal_watcher(UnitDBusLevel::UserSession, sender))
168 .await;
169 }
170 };
171
172 sender.subscribe()
173}
174
175async fn spawn_signal_watcher(
176 level: UnitDBusLevel,
177 sender: &broadcast::Sender<SystemdSignal>,
178) -> JoinHandle<Result<(), SystemdErrors>> {
179 let sender = sender.clone();
180 let (tell_is_ready, is_ready_ok) = oneshot::channel();
181 let handle = runtime().spawn(signal_watcher(level, sender, tell_is_ready));
182
183 let _ = is_ready_ok
184 .await
185 .inspect_err(|err| error!("Tokio channel dropped {err:?}"));
186 handle
187}
188
189async fn signal_watcher(
190 level: UnitDBusLevel,
191 systemd_signal_sender: broadcast::Sender<SystemdSignal>,
192 tell_is_ready: oneshot::Sender<()>,
193) -> Result<(), SystemdErrors> {
194 info!("Starting Watcher {:?}", level);
195 let connection = get_connection(level).await?;
196
197 let systemd_proxy = Systemd1ManagerProxy::new(&connection).await?;
198 if let Err(err) = systemd_proxy.subscribe().await {
199 warn!("Subscribe error {:?}", err);
200 };
201 let rule = MatchRule::builder()
202 .msg_type(zbus::message::Type::Signal)
203 .interface("org.freedesktop.systemd1.Manager")?
205 .build();
208
209 let mut stream = MessageStream::for_match_rule(
210 rule,
211 &connection,
212 Some(100),
214 )
215 .await?;
216
217 tell_is_ready.send(());
218
219 while let Some(message) = stream.next().await {
220 let signal = match message {
221 Ok(message) => match (
222 message.message_type(),
223 message
224 .header()
225 .member()
226 .map(|member_name| member_name.as_str()),
227 ) {
228 (zbus::message::Type::Signal, Some("UnitNew")) => {
229 let (unit, _path): (String, OwnedObjectPath) = message.body().deserialize()?;
230 Some(SystemdSignal::UnitNew(level, unit))
231 }
232 (zbus::message::Type::Signal, Some("UnitRemoved")) => {
233 let (unit, _path): (String, OwnedObjectPath) = message.body().deserialize()?;
234 Some(SystemdSignal::UnitRemoved(level, unit))
235 }
236 (zbus::message::Type::Signal, Some("JobRemoved")) => {
237 let (id, job, unit, result): (u32, OwnedObjectPath, String, String) =
238 message.body().deserialize()?;
239 Some(SystemdSignal::JobRemoved(level, id, job, unit, result))
240 }
241 (zbus::message::Type::Signal, Some("JobNew")) => {
242 let (id, job, unit): (u32, OwnedObjectPath, String) =
243 message.body().deserialize()?;
244 Some(SystemdSignal::JobNew(level, id, job, unit))
245 }
246 (zbus::message::Type::Signal, Some("Reloading")) => {
247 let active: bool = message.body().deserialize()?;
248 Some(SystemdSignal::Reloading(level, active))
249 }
250 (zbus::message::Type::Signal, Some("StartupFinished")) => {
251 let (firmware, loader, kernel, initrd, userspace, total): (
252 u64,
253 u64,
254 u64,
255 u64,
256 u64,
257 u64,
258 ) = message.body().deserialize()?;
259 Some(SystemdSignal::StartupFinished(
260 level, firmware, loader, kernel, initrd, userspace, total,
261 ))
262 }
263 (zbus::message::Type::Signal, Some("UnitFilesChanged")) => {
264 Some(SystemdSignal::UnitFilesChanged(level))
265 }
266
267 (zbus::message::Type::Signal, _) => {
268 warn!("Unhandled Signal {message:?}");
269 None
270 }
271 (zbus::message::Type::MethodCall, _) => {
272 info!("Method Call {message:?}");
273 None
274 }
275 (zbus::message::Type::MethodReturn, _) => {
276 info!("Method Ret {message:?}");
277 None
278 }
279 (zbus::message::Type::Error, _) => {
280 warn!("Error {message:?}");
281 None
282 }
283 },
284 Err(err) => {
285 error!("{err}");
286 None
287 }
288 };
289
290 if let Some(signal) = signal
291 && let Err(error) = systemd_signal_sender.send(signal)
292 {
293 debug!("Send signal Error {error:?}")
294 };
295 }
296
297 Ok(())
298}