sysd_manager_comcontroler/sysdbus/
watcher.rs1use crate::{
2 errors::SystemdErrors,
3 runtime,
4 sysdbus::{dbus_proxies::Systemd1ManagerProxy, get_connection},
5};
6use base::enums::UnitDBusLevel;
7use futures_util::stream::StreamExt;
8use std::{
9 sync::OnceLock,
10 time::{SystemTime, UNIX_EPOCH},
11};
12use tokio::sync::broadcast;
13use tracing::{debug, error, info, warn};
14use zbus::{MatchRule, MessageStream};
15use zvariant::OwnedObjectPath;
16
17#[derive(Debug, Clone)]
18pub enum SystemdSignal {
19 UnitNew(String, OwnedObjectPath),
20 UnitRemoved(String, OwnedObjectPath),
21 JobNew(u32, OwnedObjectPath, String),
22 JobRemoved(u32, OwnedObjectPath, String, String),
23 StartupFinished(u64, u64, u64, u64, u64, u64),
24 UnitFilesChanged,
25 Reloading(bool),
26}
27
28impl SystemdSignal {
29 pub fn type_text(&self) -> &str {
30 match self {
31 SystemdSignal::UnitNew(_, _) => "UnitNew",
32 SystemdSignal::UnitRemoved(_, _) => "UnitRemoved",
33 SystemdSignal::JobNew(_, _, _) => "JobNew",
34 SystemdSignal::JobRemoved(_, _, _, _) => "JobRemoved",
35 SystemdSignal::StartupFinished(_, _, _, _, _, _) => "StartupFinished",
36 SystemdSignal::UnitFilesChanged => "UnitFilesChanged",
37 SystemdSignal::Reloading(_) => "Reloading",
38 }
39 }
40
41 pub fn details(&self) -> String {
42 match self {
43 SystemdSignal::UnitNew(id, unit) => format!("{id} {unit}"),
44 SystemdSignal::UnitRemoved(id, unit) => format!("{id} {unit}"),
45 SystemdSignal::JobNew(id, job, unit) => {
46 format!("unit={unit} id={id} path={job}")
47 }
48 SystemdSignal::JobRemoved(id, job, unit, result) => {
49 format!("unit={unit} id={id} path={job} result={result}")
50 }
51 SystemdSignal::StartupFinished(firmware, loader, kernel, initrd, userspace, total) => {
52 format!(
53 "firmware={firmware} loader={loader} kernel={kernel} initrd={initrd} userspace={userspace} total={total}",
54 )
55 }
56 SystemdSignal::UnitFilesChanged => String::new(),
57 SystemdSignal::Reloading(active) => format!("active={active}"),
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
63pub struct SystemdSignalRow {
64 pub time_stamp: u64,
65 pub signal: SystemdSignal,
66}
67
68impl SystemdSignalRow {
69 pub fn new(signal: SystemdSignal) -> Self {
70 let current_system_time = SystemTime::now();
71 let since_the_epoch = current_system_time
72 .duration_since(UNIX_EPOCH)
73 .expect("Time went backwards");
74 let time_stamp =
75 since_the_epoch.as_secs() * 1_000_000 + since_the_epoch.subsec_nanos() as u64 / 1_000;
76 SystemdSignalRow { time_stamp, signal }
77 }
78
79 pub fn type_text(&self) -> &str {
80 self.signal.type_text()
81 }
82
83 pub fn details(&self) -> String {
84 self.signal.details()
85 }
86}
87
88static SENDER: OnceLock<broadcast::Sender<SystemdSignalRow>> = OnceLock::new();
89pub fn init_signal_watcher() -> broadcast::Receiver<SystemdSignalRow> {
90 let sender = SENDER.get_or_init(|| {
91 let (systemd_signal_sender, _) = broadcast::channel(2500);
92
93 runtime().spawn(signal_watcher(systemd_signal_sender.clone()));
96
97 systemd_signal_sender
98 });
99
100 sender.subscribe()
101}
102
103async fn signal_watcher(
104 systemd_signal_sender: broadcast::Sender<SystemdSignalRow>,
105) -> Result<(), SystemdErrors> {
106 let connection = get_connection(UnitDBusLevel::System).await?;
107
108 let systemd_proxy = Systemd1ManagerProxy::new(&connection).await?;
109 if let Err(err) = systemd_proxy.subscribe().await {
110 warn!("Subscribe error {:?}", err);
111 };
112 let rule = MatchRule::builder()
113 .msg_type(zbus::message::Type::Signal)
114 .interface("org.freedesktop.systemd1.Manager")?
116 .build();
119
120 let mut stream = MessageStream::for_match_rule(
121 rule,
122 &connection,
123 Some(100),
125 )
126 .await?;
127
128 while let Some(message) = stream.next().await {
129 let signal = match message {
130 Ok(message) => match (
131 message.message_type(),
132 message
133 .header()
134 .member()
135 .map(|member_name| member_name.as_str()),
136 ) {
137 (zbus::message::Type::Signal, Some("UnitNew")) => {
138 let (unit, path): (String, OwnedObjectPath) = message.body().deserialize()?;
139 Some(SystemdSignal::UnitNew(unit, path))
140 }
141 (zbus::message::Type::Signal, Some("UnitRemoved")) => {
142 let (unit, path): (String, OwnedObjectPath) = message.body().deserialize()?;
143 Some(SystemdSignal::UnitRemoved(unit, path))
144 }
145 (zbus::message::Type::Signal, Some("JobRemoved")) => {
146 let (id, job, unit, result): (u32, OwnedObjectPath, String, String) =
147 message.body().deserialize()?;
148 Some(SystemdSignal::JobRemoved(id, job, unit, result))
149 }
150 (zbus::message::Type::Signal, Some("JobNew")) => {
151 let (id, job, unit): (u32, OwnedObjectPath, String) =
152 message.body().deserialize()?;
153 Some(SystemdSignal::JobNew(id, job, unit))
154 }
155 (zbus::message::Type::Signal, Some("Reloading")) => {
156 let active: bool = message.body().deserialize()?;
157 Some(SystemdSignal::Reloading(active))
158 }
159 (zbus::message::Type::Signal, Some("StartupFinished")) => {
160 let (firmware, loader, kernel, initrd, userspace, total): (
161 u64,
162 u64,
163 u64,
164 u64,
165 u64,
166 u64,
167 ) = message.body().deserialize()?;
168 Some(SystemdSignal::StartupFinished(
169 firmware, loader, kernel, initrd, userspace, total,
170 ))
171 }
172 (zbus::message::Type::Signal, Some("UnitFilesChanged")) => {
173 Some(SystemdSignal::UnitFilesChanged)
174 }
175
176 (zbus::message::Type::Signal, _) => {
177 warn!("Unhandled Signal {message:?}");
178 None
179 }
180 (zbus::message::Type::MethodCall, _) => {
181 info!("Method Call {message:?}");
182 None
183 }
184 (zbus::message::Type::MethodReturn, _) => {
185 info!("Method Ret {message:?}");
186 None
187 }
188 (zbus::message::Type::Error, _) => {
189 warn!("Error {message:?}");
190 None
191 }
192 },
193 Err(err) => {
194 error!("{err}");
195 None
196 }
197 };
198
199 if let Some(signal) = signal {
200 let signal_row = SystemdSignalRow::new(signal);
201
202 if let Err(error) = systemd_signal_sender.send(signal_row) {
203 debug!("Send signal Error {error:?}")
204 };
205 }
206 }
207
208 Ok(())
209}