Skip to main content

sysd_manager_comcontroler/sysdbus/
watcher.rs

1use crate::{
2    errors::SystemdErrors,
3    runtime,
4    sysdbus::{dbus_proxies::Systemd1ManagerProxy, get_connection},
5};
6use base::enums::UnitDBusLevel;
7use futures_util::stream::StreamExt;
8use std::{
9    sync::OnceLock,
10    time::{SystemTime, UNIX_EPOCH},
11};
12use tokio::sync::broadcast;
13use tracing::{debug, error, info, warn};
14use zbus::{MatchRule, MessageStream};
15use zvariant::OwnedObjectPath;
16
17#[derive(Debug, Clone)]
18pub enum SystemdSignal {
19    UnitNew(String, OwnedObjectPath),
20    UnitRemoved(String, OwnedObjectPath),
21    JobNew(u32, OwnedObjectPath, String),
22    JobRemoved(u32, OwnedObjectPath, String, String),
23    StartupFinished(u64, u64, u64, u64, u64, u64),
24    UnitFilesChanged,
25    Reloading(bool),
26}
27
28impl SystemdSignal {
29    pub fn type_text(&self) -> &str {
30        match self {
31            SystemdSignal::UnitNew(_, _) => "UnitNew",
32            SystemdSignal::UnitRemoved(_, _) => "UnitRemoved",
33            SystemdSignal::JobNew(_, _, _) => "JobNew",
34            SystemdSignal::JobRemoved(_, _, _, _) => "JobRemoved",
35            SystemdSignal::StartupFinished(_, _, _, _, _, _) => "StartupFinished",
36            SystemdSignal::UnitFilesChanged => "UnitFilesChanged",
37            SystemdSignal::Reloading(_) => "Reloading",
38        }
39    }
40
41    pub fn details(&self) -> String {
42        match self {
43            SystemdSignal::UnitNew(id, unit) => format!("{id} {unit}"),
44            SystemdSignal::UnitRemoved(id, unit) => format!("{id} {unit}"),
45            SystemdSignal::JobNew(id, job, unit) => {
46                format!("unit={unit} id={id} path={job}")
47            }
48            SystemdSignal::JobRemoved(id, job, unit, result) => {
49                format!("unit={unit} id={id} path={job} result={result}")
50            }
51            SystemdSignal::StartupFinished(firmware, loader, kernel, initrd, userspace, total) => {
52                format!(
53                    "firmware={firmware} loader={loader} kernel={kernel} initrd={initrd} userspace={userspace} total={total}",
54                )
55            }
56            SystemdSignal::UnitFilesChanged => String::new(),
57            SystemdSignal::Reloading(active) => format!("active={active}"),
58        }
59    }
60}
61
62#[derive(Debug, Clone)]
63pub struct SystemdSignalRow {
64    pub time_stamp: u64,
65    pub signal: SystemdSignal,
66}
67
68impl SystemdSignalRow {
69    pub fn new(signal: SystemdSignal) -> Self {
70        let current_system_time = SystemTime::now();
71        let since_the_epoch = current_system_time
72            .duration_since(UNIX_EPOCH)
73            .expect("Time went backwards");
74        let time_stamp =
75            since_the_epoch.as_secs() * 1_000_000 + since_the_epoch.subsec_nanos() as u64 / 1_000;
76        SystemdSignalRow { time_stamp, signal }
77    }
78
79    pub fn type_text(&self) -> &str {
80        self.signal.type_text()
81    }
82
83    pub fn details(&self) -> String {
84        self.signal.details()
85    }
86}
87
88static SENDER: OnceLock<broadcast::Sender<SystemdSignalRow>> = OnceLock::new();
89pub fn init_signal_watcher() -> broadcast::Receiver<SystemdSignalRow> {
90    let sender = SENDER.get_or_init(|| {
91        let (systemd_signal_sender, _) = broadcast::channel(2500);
92
93        // let cancellation_token = tokio_util::sync::CancellationToken::new();
94
95        runtime().spawn(signal_watcher(systemd_signal_sender.clone()));
96
97        systemd_signal_sender
98    });
99
100    sender.subscribe()
101}
102
103async fn signal_watcher(
104    systemd_signal_sender: broadcast::Sender<SystemdSignalRow>,
105) -> Result<(), SystemdErrors> {
106    let connection = get_connection(UnitDBusLevel::System).await?;
107
108    let systemd_proxy = Systemd1ManagerProxy::new(&connection).await?;
109    if let Err(err) = systemd_proxy.subscribe().await {
110        warn!("Subscribe error {:?}", err);
111    };
112    let rule = MatchRule::builder()
113        .msg_type(zbus::message::Type::Signal)
114        // .sender("org.freedesktop.DBus")?
115        .interface("org.freedesktop.systemd1.Manager")?
116        // .member("NameOwnerChanged")?
117        // .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42")?
118        .build();
119
120    let mut stream = MessageStream::for_match_rule(
121        rule,
122        &connection,
123        // For such a specific match rule, we don't need a big queue.
124        Some(100),
125    )
126    .await?;
127
128    while let Some(message) = stream.next().await {
129        let signal = match message {
130            Ok(message) => match (
131                message.message_type(),
132                message
133                    .header()
134                    .member()
135                    .map(|member_name| member_name.as_str()),
136            ) {
137                (zbus::message::Type::Signal, Some("UnitNew")) => {
138                    let (unit, path): (String, OwnedObjectPath) = message.body().deserialize()?;
139                    Some(SystemdSignal::UnitNew(unit, path))
140                }
141                (zbus::message::Type::Signal, Some("UnitRemoved")) => {
142                    let (unit, path): (String, OwnedObjectPath) = message.body().deserialize()?;
143                    Some(SystemdSignal::UnitRemoved(unit, path))
144                }
145                (zbus::message::Type::Signal, Some("JobRemoved")) => {
146                    let (id, job, unit, result): (u32, OwnedObjectPath, String, String) =
147                        message.body().deserialize()?;
148                    Some(SystemdSignal::JobRemoved(id, job, unit, result))
149                }
150                (zbus::message::Type::Signal, Some("JobNew")) => {
151                    let (id, job, unit): (u32, OwnedObjectPath, String) =
152                        message.body().deserialize()?;
153                    Some(SystemdSignal::JobNew(id, job, unit))
154                }
155                (zbus::message::Type::Signal, Some("Reloading")) => {
156                    let active: bool = message.body().deserialize()?;
157                    Some(SystemdSignal::Reloading(active))
158                }
159                (zbus::message::Type::Signal, Some("StartupFinished")) => {
160                    let (firmware, loader, kernel, initrd, userspace, total): (
161                        u64,
162                        u64,
163                        u64,
164                        u64,
165                        u64,
166                        u64,
167                    ) = message.body().deserialize()?;
168                    Some(SystemdSignal::StartupFinished(
169                        firmware, loader, kernel, initrd, userspace, total,
170                    ))
171                }
172                (zbus::message::Type::Signal, Some("UnitFilesChanged")) => {
173                    Some(SystemdSignal::UnitFilesChanged)
174                }
175
176                (zbus::message::Type::Signal, _) => {
177                    warn!("Unhandled Signal {message:?}");
178                    None
179                }
180                (zbus::message::Type::MethodCall, _) => {
181                    info!("Method Call {message:?}");
182                    None
183                }
184                (zbus::message::Type::MethodReturn, _) => {
185                    info!("Method Ret {message:?}");
186                    None
187                }
188                (zbus::message::Type::Error, _) => {
189                    warn!("Error {message:?}");
190                    None
191                }
192            },
193            Err(err) => {
194                error!("{err}");
195                None
196            }
197        };
198
199        if let Some(signal) = signal {
200            let signal_row = SystemdSignalRow::new(signal);
201
202            if let Err(error) = systemd_signal_sender.send(signal_row) {
203                debug!("Send signal Error {error:?}")
204            };
205        }
206    }
207
208    Ok(())
209}