Skip to main content

sysd_manager_comcontroler/sysdbus/
watcher.rs

1use crate::{
2    data::UnitInfo,
3    errors::SystemdErrors,
4    runtime,
5    sysdbus::{dbus_proxies::Systemd1ManagerProxy, get_connection},
6};
7use base::enums::UnitDBusLevel;
8use futures_util::stream::StreamExt;
9use std::time::{SystemTime, UNIX_EPOCH};
10use tokio::{
11    sync::{OnceCell, broadcast, oneshot},
12    task::JoinHandle,
13};
14use tracing::{debug, error, info, warn};
15use zbus::{MatchRule, MessageStream};
16use zvariant::OwnedObjectPath;
17
18#[derive(Debug, Clone, Eq, PartialEq, Hash)]
19pub enum SystemdSignal {
20    UnitNew(UnitDBusLevel, String),
21    UnitRemoved(UnitDBusLevel, String),
22    JobNew(UnitDBusLevel, u32, OwnedObjectPath, String),
23    JobRemoved(UnitDBusLevel, u32, OwnedObjectPath, String, String),
24    StartupFinished(UnitDBusLevel, u64, u64, u64, u64, u64, u64),
25    UnitFilesChanged(UnitDBusLevel),
26    Reloading(UnitDBusLevel, bool),
27}
28
29impl SystemdSignal {
30    pub fn type_text(&self) -> &str {
31        match self {
32            SystemdSignal::UnitNew(_, _) => "UnitNew",
33            SystemdSignal::UnitRemoved(_, _) => "UnitRemoved",
34            SystemdSignal::JobNew(_, _, _, _) => "JobNew",
35            SystemdSignal::JobRemoved(_, _, _, _, _) => "JobRemoved",
36            SystemdSignal::StartupFinished(_, _, _, _, _, _, _) => "StartupFinished",
37            SystemdSignal::UnitFilesChanged(_) => "UnitFilesChanged",
38            SystemdSignal::Reloading(_, _) => "Reloading",
39        }
40    }
41
42    pub fn bus_text(&self) -> &str {
43        let level = match self {
44            SystemdSignal::UnitNew(level, _) => level,
45            SystemdSignal::UnitRemoved(level, _) => level,
46            SystemdSignal::JobNew(level, _, _, _) => level,
47            SystemdSignal::JobRemoved(level, _, _, _, _) => level,
48            SystemdSignal::StartupFinished(level, _, _, _, _, _, _) => level,
49            SystemdSignal::UnitFilesChanged(level) => level,
50            SystemdSignal::Reloading(level, _) => level,
51        };
52        level.as_str()
53    }
54
55    pub fn details(&self) -> String {
56        match self {
57            SystemdSignal::UnitNew(_, id) => id.to_string(),
58            SystemdSignal::UnitRemoved(_, id) => id.to_string(),
59            SystemdSignal::JobNew(_, id, job, unit) => {
60                format!("unit={unit} id={id} path={job}")
61            }
62            SystemdSignal::JobRemoved(_, id, job, unit, result) => {
63                format!("unit={unit} id={id} path={job} result={result}")
64            }
65            SystemdSignal::StartupFinished(
66                _,
67                firmware,
68                loader,
69                kernel,
70                initrd,
71                userspace,
72                total,
73            ) => {
74                format!(
75                    "firmware={firmware} loader={loader} kernel={kernel} initrd={initrd} userspace={userspace} total={total}",
76                )
77            }
78            SystemdSignal::UnitFilesChanged(_) => String::new(),
79            SystemdSignal::Reloading(_, active) => format!("active={active}"),
80        }
81    }
82
83    pub fn toggle_unit(self) -> Self {
84        match self {
85            SystemdSignal::UnitNew(unit_dbus_level, unit_name) => {
86                Self::UnitRemoved(unit_dbus_level, unit_name)
87            }
88            SystemdSignal::UnitRemoved(unit_dbus_level, unit_name) => {
89                Self::UnitNew(unit_dbus_level, unit_name)
90            }
91            _ => self,
92        }
93    }
94
95    pub fn create_unit(&self) -> Option<UnitInfo> {
96        match self {
97            SystemdSignal::UnitNew(unit_dbus_level, unit_name) => {
98                Some(UnitInfo::from_unit_key(unit_name, *unit_dbus_level))
99            }
100            SystemdSignal::UnitRemoved(unit_dbus_level, unit_name) => {
101                Some(UnitInfo::from_unit_key(unit_name, *unit_dbus_level))
102            }
103            _ => None,
104        }
105    }
106}
107
108#[derive(Debug, Clone)]
109pub struct SystemdSignalRow {
110    pub time_stamp: u64,
111    pub signal: SystemdSignal,
112}
113
114impl SystemdSignalRow {
115    pub fn new(signal: SystemdSignal) -> Self {
116        let current_system_time = SystemTime::now();
117        let since_the_epoch = current_system_time
118            .duration_since(UNIX_EPOCH)
119            .expect("Time went backwards");
120        let time_stamp =
121            since_the_epoch.as_secs() * 1_000_000 + since_the_epoch.subsec_nanos() as u64 / 1_000;
122        SystemdSignalRow { time_stamp, signal }
123    }
124
125    pub fn type_text(&self) -> &str {
126        self.signal.type_text()
127    }
128
129    pub fn bus_text(&self) -> &str {
130        self.signal.bus_text()
131    }
132
133    pub fn details(&self) -> String {
134        self.signal.details()
135    }
136}
137
138static SENDER: OnceCell<broadcast::Sender<SystemdSignal>> = OnceCell::const_new();
139static WACHER_SYSTEM: OnceCell<JoinHandle<Result<(), SystemdErrors>>> = OnceCell::const_new();
140static WACHER_USER_SESSION: OnceCell<JoinHandle<Result<(), SystemdErrors>>> = OnceCell::const_new();
141
142pub async fn init_signal_watcher(level: UnitDBusLevel) -> broadcast::Receiver<SystemdSignal> {
143    let sender = SENDER
144        .get_or_init(async || {
145            let (systemd_signal_sender, _) = broadcast::channel(2500);
146
147            // let cancellation_token = tokio_util::sync::CancellationToken::new();
148
149            systemd_signal_sender
150        })
151        .await;
152
153    match level {
154        UnitDBusLevel::System => {
155            WACHER_SYSTEM
156                .get_or_init(|| spawn_signal_watcher(UnitDBusLevel::System, sender))
157                .await;
158        }
159        UnitDBusLevel::UserSession => {
160            WACHER_USER_SESSION
161                .get_or_init(|| spawn_signal_watcher(UnitDBusLevel::UserSession, sender))
162                .await;
163        }
164        UnitDBusLevel::Both => {
165            WACHER_SYSTEM.get_or_init(|| spawn_signal_watcher(UnitDBusLevel::System, sender));
166            WACHER_USER_SESSION
167                .get_or_init(|| spawn_signal_watcher(UnitDBusLevel::UserSession, sender))
168                .await;
169        }
170    };
171
172    sender.subscribe()
173}
174
175async fn spawn_signal_watcher(
176    level: UnitDBusLevel,
177    sender: &broadcast::Sender<SystemdSignal>,
178) -> JoinHandle<Result<(), SystemdErrors>> {
179    let sender = sender.clone();
180    let (tell_is_ready, is_ready_ok) = oneshot::channel();
181    let handle = runtime().spawn(signal_watcher(level, sender, tell_is_ready));
182
183    let _ = is_ready_ok
184        .await
185        .inspect_err(|err| error!("Tokio channel dropped {err:?}"));
186    handle
187}
188
189async fn signal_watcher(
190    level: UnitDBusLevel,
191    systemd_signal_sender: broadcast::Sender<SystemdSignal>,
192    tell_is_ready: oneshot::Sender<()>,
193) -> Result<(), SystemdErrors> {
194    info!("Starting Watcher {:?}", level);
195    let connection = get_connection(level).await?;
196
197    let systemd_proxy = Systemd1ManagerProxy::new(&connection).await?;
198    if let Err(err) = systemd_proxy.subscribe().await {
199        warn!("Subscribe error {:?}", err);
200    };
201    let rule = MatchRule::builder()
202        .msg_type(zbus::message::Type::Signal)
203        // .sender("org.freedesktop.DBus")?
204        .interface("org.freedesktop.systemd1.Manager")?
205        // .member("NameOwnerChanged")?
206        // .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42")?
207        .build();
208
209    let mut stream = MessageStream::for_match_rule(
210        rule,
211        &connection,
212        // For such a specific match rule, we don't need a big queue.
213        Some(100),
214    )
215    .await?;
216
217    tell_is_ready.send(());
218
219    while let Some(message) = stream.next().await {
220        let signal = match message {
221            Ok(message) => match (
222                message.message_type(),
223                message
224                    .header()
225                    .member()
226                    .map(|member_name| member_name.as_str()),
227            ) {
228                (zbus::message::Type::Signal, Some("UnitNew")) => {
229                    let (unit, _path): (String, OwnedObjectPath) = message.body().deserialize()?;
230                    Some(SystemdSignal::UnitNew(level, unit))
231                }
232                (zbus::message::Type::Signal, Some("UnitRemoved")) => {
233                    let (unit, _path): (String, OwnedObjectPath) = message.body().deserialize()?;
234                    Some(SystemdSignal::UnitRemoved(level, unit))
235                }
236                (zbus::message::Type::Signal, Some("JobRemoved")) => {
237                    let (id, job, unit, result): (u32, OwnedObjectPath, String, String) =
238                        message.body().deserialize()?;
239                    Some(SystemdSignal::JobRemoved(level, id, job, unit, result))
240                }
241                (zbus::message::Type::Signal, Some("JobNew")) => {
242                    let (id, job, unit): (u32, OwnedObjectPath, String) =
243                        message.body().deserialize()?;
244                    Some(SystemdSignal::JobNew(level, id, job, unit))
245                }
246                (zbus::message::Type::Signal, Some("Reloading")) => {
247                    let active: bool = message.body().deserialize()?;
248                    Some(SystemdSignal::Reloading(level, active))
249                }
250                (zbus::message::Type::Signal, Some("StartupFinished")) => {
251                    let (firmware, loader, kernel, initrd, userspace, total): (
252                        u64,
253                        u64,
254                        u64,
255                        u64,
256                        u64,
257                        u64,
258                    ) = message.body().deserialize()?;
259                    Some(SystemdSignal::StartupFinished(
260                        level, firmware, loader, kernel, initrd, userspace, total,
261                    ))
262                }
263                (zbus::message::Type::Signal, Some("UnitFilesChanged")) => {
264                    Some(SystemdSignal::UnitFilesChanged(level))
265                }
266
267                (zbus::message::Type::Signal, _) => {
268                    warn!("Unhandled Signal {message:?}");
269                    None
270                }
271                (zbus::message::Type::MethodCall, _) => {
272                    info!("Method Call {message:?}");
273                    None
274                }
275                (zbus::message::Type::MethodReturn, _) => {
276                    info!("Method Ret {message:?}");
277                    None
278                }
279                (zbus::message::Type::Error, _) => {
280                    warn!("Error {message:?}");
281                    None
282                }
283            },
284            Err(err) => {
285                error!("{err}");
286                None
287            }
288        };
289
290        if let Some(signal) = signal
291            && let Err(error) = systemd_signal_sender.send(signal)
292        {
293            debug!("Send signal Error {error:?}")
294        };
295    }
296
297    Ok(())
298}