stray/notifier_watcher/
mod.rs1use crate::dbus::dbusmenu_proxy::DBusMenuProxy;
2use crate::dbus::notifier_item_proxy::StatusNotifierItemProxy;
3use crate::dbus::notifier_watcher_proxy::StatusNotifierWatcherProxy;
4use crate::error::Result;
5use crate::message::menu::TrayMenu;
6use crate::message::NotifierItemCommand;
7use crate::notifier_watcher::notifier_address::NotifierAddress;
8use crate::{
9 DbusNotifierWatcher, InterfaceName, MenuLayout, NotifierItemMessage, StatusNotifierItem,
10};
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::StreamExt;
13use zbus::fdo::PropertiesProxy;
14use zbus::{Connection, ConnectionBuilder};
15
16pub(crate) mod notifier_address;
17
18pub struct StatusNotifierWatcher {
21 pub(crate) tx: broadcast::Sender<NotifierItemMessage>,
22}
23
24impl StatusNotifierWatcher {
25 pub async fn new(cmd_rx: mpsc::Receiver<NotifierItemCommand>) -> Result<StatusNotifierWatcher> {
29 let (tx, _) = broadcast::channel(5);
30
31 {
32 let tx = tx.clone();
33 tokio::spawn(async move {
34 tracing::info!("Starting notifier watcher");
35 start_notifier_watcher(tx)
36 .await
37 .expect("Unexpected StatusNotifierError ");
38 });
39 }
40
41 {
42 tokio::spawn(async move {
43 dispatch_ui_command(cmd_rx)
44 .await
45 .expect("Unexpected error while dispatching UI command");
46 });
47 }
48
49 Ok(StatusNotifierWatcher { tx })
50 }
51}
52
53async fn dispatch_ui_command(mut cmd_rx: mpsc::Receiver<NotifierItemCommand>) -> Result<()> {
55 let connection = Connection::session().await?;
56
57 while let Some(command) = cmd_rx.recv().await {
58 match command {
59 NotifierItemCommand::MenuItemClicked {
60 submenu_id: id,
61 menu_path,
62 notifier_address,
63 } => {
64 let dbus_menu_proxy = DBusMenuProxy::builder(&connection)
65 .destination(notifier_address)
66 .unwrap()
67 .path(menu_path)
68 .unwrap()
69 .build()
70 .await?;
71
72 dbus_menu_proxy
73 .event(
74 id,
75 "clicked",
76 &zbus::zvariant::Value::I32(32),
77 chrono::offset::Local::now().timestamp_subsec_micros(),
78 )
79 .await?;
80 }
81 }
82 }
83
84 Ok(())
85}
86
87async fn start_notifier_watcher(sender: broadcast::Sender<NotifierItemMessage>) -> Result<()> {
88 let watcher = DbusNotifierWatcher::new(sender.clone());
89
90 ConnectionBuilder::session()?
91 .name("org.kde.StatusNotifierWatcher")?
92 .serve_at("/StatusNotifierWatcher", watcher)?
93 .build()
94 .await?;
95
96 let connection = Connection::session().await?;
97
98 let status_notifier_removed = {
99 let connection = connection.clone();
100 tokio::spawn(async move {
101 status_notifier_removed_handle(connection).await?;
102 Result::<()>::Ok(())
103 })
104 };
105
106 let status_notifier = {
107 let connection = connection.clone();
108 tokio::spawn(async move { status_notifier_handle(connection, sender).await.unwrap() })
109 };
110
111 let _ = tokio::join!(status_notifier, status_notifier_removed,);
112
113 Ok(())
114}
115
116async fn status_notifier_removed_handle(connection: Connection) -> Result<()> {
119 let dbus_proxy = zbus::fdo::DBusProxy::new(&connection).await.unwrap();
120
121 let mut changed = dbus_proxy
122 .receive_name_owner_changed()
123 .await
124 .expect("fail to receive Dbus NameOwnerChanged");
125
126 while let Some(signal) = changed.next().await {
127 let args = signal.args().expect("Failed to get signal args");
128 let old = args.old_owner();
129 let new = args.new_owner();
130
131 if old.is_some() && new.is_none() {
132 let old_owner: String = old.as_ref().unwrap().to_string();
133 let watcher_proxy = StatusNotifierWatcherProxy::new(&connection)
134 .await
135 .expect("Failed to open StatusNotifierWatcherProxy");
136
137 watcher_proxy
138 .unregister_status_notifier_item(&old_owner)
139 .await
140 .expect("failed to unregister status notifier");
141 }
142 }
143
144 Ok(())
145}
146
147async fn status_notifier_handle(
153 connection: Connection,
154 sender: broadcast::Sender<NotifierItemMessage>,
155) -> Result<()> {
156 let status_notifier_proxy = StatusNotifierWatcherProxy::new(&connection).await?;
157
158 let notifier_items: Vec<String> = status_notifier_proxy
159 .registered_status_notifier_items()
160 .await?;
161
162 tracing::info!("Got {} notifier items", notifier_items.len());
163
164 for service in notifier_items.iter() {
166 let service = NotifierAddress::from_notifier_service(service);
167 if let Ok(notifier_address) = service {
168 let connection = connection.clone();
169 let sender = sender.clone();
170 watch_notifier_props(notifier_address, connection, sender).await?;
171 }
172 }
173
174 let mut new_notifier = status_notifier_proxy
176 .receive_status_notifier_item_registered()
177 .await?;
178
179 while let Some(notifier) = new_notifier.next().await {
180 let args = notifier.args()?;
181 let service: &str = args.service();
182 tracing::info!(
183 "StatusNotifierItemRegistered signal received service={}",
184 service
185 );
186
187 let service = NotifierAddress::from_notifier_service(service);
188 if let Ok(notifier_address) = service {
189 let connection = connection.clone();
190 let sender = sender.clone();
191 tokio::spawn(async move {
192 watch_notifier_props(notifier_address, connection, sender).await?;
193 Result::<()>::Ok(())
194 });
195 }
196 }
197
198 Ok(())
199}
200
201async fn watch_notifier_props(
203 address_parts: NotifierAddress,
204 connection: Connection,
205 sender: broadcast::Sender<NotifierItemMessage>,
206) -> Result<()> {
207 tokio::spawn(async move {
208 let dbus_properties_proxy = zbus::fdo::PropertiesProxy::builder(&connection)
210 .destination(address_parts.destination.as_str())?
211 .path(address_parts.path.as_str())?
212 .build()
213 .await?;
214
215 fetch_properties_and_update(
217 sender.clone(),
218 &dbus_properties_proxy,
219 address_parts.destination.clone(),
220 connection.clone(),
221 )
222 .await?;
223
224 let notifier_item_proxy = StatusNotifierItemProxy::builder(&connection)
226 .destination(address_parts.destination.as_str())?
227 .path(address_parts.path.as_str())?
228 .build()
229 .await?;
230
231 let mut props_changed = notifier_item_proxy.receive_all_signals().await?;
232
233 while props_changed.next().await.is_some() {
235 fetch_properties_and_update(
236 sender.clone(),
237 &dbus_properties_proxy,
238 address_parts.destination.clone(),
239 connection.clone(),
240 )
241 .await?;
242 }
243
244 Result::<()>::Ok(())
245 });
246
247 Ok(())
248}
249
250async fn fetch_properties_and_update(
252 sender: broadcast::Sender<NotifierItemMessage>,
253 dbus_properties_proxy: &PropertiesProxy<'_>,
254 item_address: String,
255 connection: Connection,
256) -> Result<()> {
257 let interface = InterfaceName::from_static_str("org.kde.StatusNotifierItem")?;
258 let props = dbus_properties_proxy.get_all(interface).await?;
259 let item = StatusNotifierItem::try_from(props);
260
261 if let Ok(item) = item {
263 let menu = match &item.menu {
264 None => None,
265 Some(menu_address) => watch_menu(
266 item_address.clone(),
267 item.clone(),
268 connection.clone(),
269 menu_address.clone(),
270 sender.clone(),
271 )
272 .await
273 .ok(),
274 };
275
276 tracing::info!("StatusNotifierItem updated, dbus-address={item_address}");
277
278 sender
279 .send(NotifierItemMessage::Update {
280 address: item_address.to_string(),
281 item: Box::new(item),
282 menu,
283 })
284 .expect("Failed to dispatch NotifierItemMessage");
285 }
286
287 Ok(())
288}
289
290async fn watch_menu(
291 item_address: String,
292 item: StatusNotifierItem,
293 connection: Connection,
294 menu_address: String,
295 sender: broadcast::Sender<NotifierItemMessage>,
296) -> Result<TrayMenu> {
297 let dbus_menu_proxy = DBusMenuProxy::builder(&connection)
298 .destination(item_address.as_str())?
299 .path(menu_address.as_str())?
300 .build()
301 .await?;
302
303 let menu: MenuLayout = dbus_menu_proxy.get_layout(0, 10, &[]).await.unwrap();
304
305 tokio::spawn(async move {
306 let dbus_menu_proxy = DBusMenuProxy::builder(&connection)
307 .destination(item_address.as_str())?
308 .path(menu_address.as_str())?
309 .build()
310 .await?;
311
312 let mut props_changed = dbus_menu_proxy.receive_all_signals().await?;
313
314 while props_changed.next().await.is_some() {
315 let menu: MenuLayout = dbus_menu_proxy.get_layout(0, 10, &[]).await.unwrap();
316 let menu = TrayMenu::try_from(menu).ok();
317 sender.send(NotifierItemMessage::Update {
318 address: item_address.to_string(),
319 item: Box::new(item.clone()),
320 menu,
321 })?;
322 }
323 anyhow::Result::<(), anyhow::Error>::Ok(())
324 });
325
326 TrayMenu::try_from(menu).map_err(Into::into)
327}