1use crate::dbus::dbus_menu_proxy::{DBusMenuProxy, PropertiesUpdate};
2use crate::dbus::notifier_item_proxy::StatusNotifierItemProxy;
3use crate::dbus::notifier_watcher_proxy::StatusNotifierWatcherProxy;
4use crate::dbus::status_notifier_watcher::StatusNotifierWatcher;
5use crate::dbus::{self, OwnedValueExt};
6use crate::error::{Error, Result};
7use crate::item::{self, Status, StatusNotifierItem, Tooltip};
8use crate::menu::{MenuDiff, TrayMenu};
9use crate::names;
10use dbus::DBusProps;
11use futures_lite::StreamExt;
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15use tokio::spawn;
16use tokio::sync::broadcast;
17use tokio::time::timeout;
18use tracing::{debug, error, trace, warn};
19use zbus::fdo::{DBusProxy, PropertiesProxy};
20use zbus::names::InterfaceName;
21use zbus::zvariant::{Structure, Value};
22use zbus::{Connection, Message};
23
24use self::names::ITEM_OBJECT;
25
26#[derive(Debug, Clone)]
30pub enum Event {
31 Add(String, Box<StatusNotifierItem>),
33 Update(String, UpdateEvent),
37 Remove(String),
39}
40
41#[derive(Debug, Clone)]
43pub enum UpdateEvent {
44 AttentionIcon(Option<String>),
45 Icon(Option<String>),
46 OverlayIcon(Option<String>),
47 Status(Status),
48 Title(Option<String>),
49 Tooltip(Option<Tooltip>),
50 Menu(TrayMenu),
53 MenuDiff(Vec<MenuDiff>),
56 MenuConnect(String),
59}
60
61#[derive(Debug, Clone)]
64pub enum ActivateRequest {
65 MenuItem {
67 address: String,
68 menu_path: String,
69 submenu_id: i32,
70 },
71 Default { address: String, x: i32, y: i32 },
74 Secondary { address: String, x: i32, y: i32 },
77}
78
79type State = HashMap<String, (StatusNotifierItem, Option<TrayMenu>)>;
80
81const PROPERTIES_INTERFACE: &str = "org.kde.StatusNotifierItem";
82
83#[derive(Debug)]
85pub struct Client {
86 tx: broadcast::Sender<Event>,
87 _rx: broadcast::Receiver<Event>,
88 connection: Connection,
89
90 items: Arc<Mutex<State>>,
91}
92
93impl Client {
94 pub async fn new() -> Result<Self> {
117 let connection = Connection::session().await?;
118 let (tx, rx) = broadcast::channel(32);
119
120 StatusNotifierWatcher::new().attach_to(&connection).await?;
122
123 let watcher_proxy = StatusNotifierWatcherProxy::new(&connection).await?;
125
126 let pid = std::process::id();
129 let mut i = 0;
130 let wellknown = loop {
131 use zbus::fdo::RequestNameReply::*;
132
133 i += 1;
134 let wellknown = format!("org.freedesktop.StatusNotifierHost-{pid}-{i}");
135 let wellknown: zbus::names::WellKnownName = wellknown
136 .try_into()
137 .expect("generated well-known name is invalid");
138
139 let flags = [zbus::fdo::RequestNameFlags::DoNotQueue];
140 match connection
141 .request_name_with_flags(&wellknown, flags.into_iter().collect())
142 .await?
143 {
144 PrimaryOwner => break wellknown,
145 Exists | AlreadyOwner => {}
146 InQueue => unreachable!(
147 "request_name_with_flags returned InQueue even though we specified DoNotQueue"
148 ),
149 };
150 };
151
152 debug!("wellknown: {wellknown}");
153 watcher_proxy
154 .register_status_notifier_host(&wellknown)
155 .await?;
156
157 let items = Arc::new(Mutex::new(HashMap::new()));
158
159 {
161 let connection = connection.clone();
162 let tx = tx.clone();
163 let items = items.clone();
164
165 let mut stream = watcher_proxy
166 .receive_status_notifier_item_registered()
167 .await?;
168
169 spawn(async move {
170 while let Some(item) = stream.next().await {
171 let address = item.args().map(|args| args.service);
172
173 if let Ok(address) = address {
174 debug!("received new item: {address}");
175 if let Err(err) = Self::handle_item(
176 address,
177 connection.clone(),
178 tx.clone(),
179 items.clone(),
180 )
181 .await
182 {
183 error!("{err}");
184 break;
185 }
186 }
187 }
188
189 Ok::<(), Error>(())
190 });
191 }
192
193 {
197 let connection = connection.clone();
198 let tx = tx.clone();
199 let items = items.clone();
200
201 spawn(async move {
202 let initial_items = watcher_proxy.registered_status_notifier_items().await?;
203 debug!("initial items: {initial_items:?}");
204
205 for item in initial_items {
206 if let Err(err) =
207 Self::handle_item(&item, connection.clone(), tx.clone(), items.clone())
208 .await
209 {
210 error!("{err}");
211 }
212 }
213
214 Ok::<(), Error>(())
215 });
216 }
217
218 {
221 let tx = tx.clone();
222 let items = items.clone();
223
224 let dbus_proxy = DBusProxy::new(&connection).await?;
225
226 let mut stream = dbus_proxy.receive_name_acquired().await?;
227
228 spawn(async move {
229 while let Some(thing) = stream.next().await {
230 let body = thing.args()?;
231 if body.name == names::WATCHER_BUS {
232 let mut items = items.lock().expect("mutex lock should succeed");
233 let keys = items.keys().cloned().collect::<Vec<_>>();
234 for address in keys {
235 items.remove(&address);
236 tx.send(Event::Remove(address))?;
237 }
238 }
239 }
240
241 Ok::<(), Error>(())
242 });
243 }
244
245 debug!("tray client initialized");
246
247 Ok(Self {
248 connection,
249 tx,
250 _rx: rx,
251 items,
252 })
253 }
254
255 async fn handle_item(
258 address: &str,
259 connection: Connection,
260 tx: broadcast::Sender<Event>,
261 items: Arc<Mutex<State>>,
262 ) -> crate::error::Result<()> {
263 let (destination, path) = parse_address(address);
264
265 let properties_proxy = PropertiesProxy::builder(&connection)
266 .destination(destination.to_string())?
267 .path(path.clone())?
268 .build()
269 .await?;
270
271 let properties = Self::get_item_properties(destination, &path, &properties_proxy).await?;
272
273 items
274 .lock()
275 .expect("mutex lock should succeed")
276 .insert(destination.into(), (properties.clone(), None));
277
278 tx.send(Event::Add(
279 destination.to_string(),
280 properties.clone().into(),
281 ))?;
282
283 {
284 let connection = connection.clone();
285 let destination = destination.to_string();
286 let items = items.clone();
287 let tx = tx.clone();
288
289 spawn(async move {
290 Self::watch_item_properties(
291 &destination,
292 &path,
293 &connection,
294 properties_proxy,
295 items,
296 tx,
297 )
298 .await?;
299
300 debug!("Stopped watching {destination}{path}");
301 Ok::<(), Error>(())
302 });
303 }
304
305 if let Some(menu) = properties.menu {
306 let destination = destination.to_string();
307
308 tx.send(Event::Update(
309 destination.clone(),
310 UpdateEvent::MenuConnect(menu.clone()),
311 ))?;
312
313 spawn(async move {
314 Self::watch_menu(destination, &menu, &connection, tx, items).await?;
315 Ok::<(), Error>(())
316 });
317 }
318
319 Ok(())
320 }
321
322 async fn get_item_properties(
324 destination: &str,
325 path: &str,
326 properties_proxy: &PropertiesProxy<'_>,
327 ) -> crate::error::Result<StatusNotifierItem> {
328 let properties = properties_proxy
329 .get_all(
330 InterfaceName::from_static_str(PROPERTIES_INTERFACE)
331 .expect("to be valid interface name"),
332 )
333 .await;
334
335 let properties = match properties {
336 Ok(properties) => properties,
337 Err(err) => {
338 error!("Error fetching properties from {destination}{path}: {err:?}");
339 return Err(err.into());
340 }
341 };
342
343 StatusNotifierItem::try_from(DBusProps(properties))
344 }
345
346 async fn watch_item_properties(
349 destination: &str,
350 path: &str,
351 connection: &Connection,
352 properties_proxy: PropertiesProxy<'_>,
353 items: Arc<Mutex<State>>,
354 tx: broadcast::Sender<Event>,
355 ) -> crate::error::Result<()> {
356 let notifier_item_proxy = StatusNotifierItemProxy::builder(connection)
357 .destination(destination)?
358 .path(path)?
359 .build()
360 .await?;
361
362 let dbus_proxy = DBusProxy::new(connection).await?;
363
364 let mut disconnect_stream = dbus_proxy.receive_name_owner_changed().await?;
365 let mut props_changed = notifier_item_proxy.inner().receive_all_signals().await?;
366
367 loop {
368 tokio::select! {
369 Some(change) = props_changed.next() => {
370 match Self::get_update_event(change, &properties_proxy).await {
371 Ok(Some(event)) => {
372 debug!("[{destination}{path}] received property change: {event:?}");
373 tx.send(Event::Update(destination.to_string(), event))?;
374 }
375 Err(e) => {
376 error!("Error parsing update properties from {destination}{path}: {e:?}");
377 }
378 _ => {}
379 }
380 }
381 Some(signal) = disconnect_stream.next() => {
382 let args = signal.args()?;
383 let old = args.old_owner();
384 let new = args.new_owner();
385
386 if let (Some(old), None) = (old.as_ref(), new.as_ref()) {
387 if old == destination {
388 debug!("[{destination}{path}] disconnected");
389
390 let watcher_proxy = StatusNotifierWatcherProxy::new(connection)
391 .await
392 .expect("Failed to open StatusNotifierWatcherProxy");
393
394 if let Err(error) = watcher_proxy.unregister_status_notifier_item(old).await {
395 error!("{error:?}");
396 }
397
398 items.lock().expect("mutex lock should succeed").remove(&destination.to_string());
399
400 tx.send(Event::Remove(destination.to_string()))?;
401 break Ok(());
402 }
403 }
404 }
405 }
406 }
407 }
408
409 async fn get_update_event(
411 change: Message,
412 properties_proxy: &PropertiesProxy<'_>,
413 ) -> Result<Option<UpdateEvent>> {
414 let header = change.header();
415 let member = header
416 .member()
417 .ok_or(Error::InvalidData("Update message header missing `member`"))?;
418
419 let property_name = match member.as_str() {
420 "NewAttentionIcon" => "AttentionIconName",
421 "NewIcon" => "IconName",
422 "NewOverlayIcon" => "OverlayIconName",
423 "NewStatus" => "Status",
424 "NewTitle" => "Title",
425 "NewToolTip" => "ToolTip",
426 _ => &member.as_str()["New".len()..],
427 };
428
429 let property = properties_proxy
430 .get(
431 InterfaceName::from_static_str(PROPERTIES_INTERFACE)
432 .expect("to be valid interface name"),
433 property_name,
434 )
435 .await?;
436
437 debug!("received tray item update: {member} -> {property:?}");
438
439 use UpdateEvent::*;
440 Ok(match member.as_str() {
441 "NewAttentionIcon" => Some(AttentionIcon(property.to_string().ok())),
442 "NewIcon" => Some(Icon(property.to_string().ok())),
443 "NewOverlayIcon" => Some(OverlayIcon(property.to_string().ok())),
444 "NewStatus" => Some(Status(
445 property.downcast_ref::<&str>().map(item::Status::from)?,
446 )),
447 "NewTitle" => Some(Title(property.to_string().ok())),
448 "NewToolTip" => Some(Tooltip({
449 property
450 .downcast_ref::<&Structure>()
451 .ok()
452 .map(crate::item::Tooltip::try_from)
453 .transpose()?
454 })),
455 _ => {
456 warn!("received unhandled update event: {member}");
457 None
458 }
459 })
460 }
461
462 async fn watch_menu(
468 destination: String,
469 menu_path: &str,
470 connection: &Connection,
471 tx: broadcast::Sender<Event>,
472 items: Arc<Mutex<State>>,
473 ) -> crate::error::Result<()> {
474 let dbus_menu_proxy = DBusMenuProxy::builder(connection)
475 .destination(destination.as_str())?
476 .path(menu_path)?
477 .build()
478 .await?;
479
480 let menu = dbus_menu_proxy.get_layout(0, 10, &[]).await?;
481 let menu = TrayMenu::try_from(menu)?;
482
483 if let Some((_, menu_cache)) = items
484 .lock()
485 .expect("mutex lock should succeed")
486 .get_mut(&destination)
487 {
488 menu_cache.replace(menu.clone());
489 } else {
490 error!("could not find item in state");
491 }
492
493 tx.send(Event::Update(
494 destination.to_string(),
495 UpdateEvent::Menu(menu),
496 ))?;
497
498 let mut layout_updated = dbus_menu_proxy.receive_layout_updated().await?;
499 let mut properties_updated = dbus_menu_proxy.receive_items_properties_updated().await?;
500
501 loop {
502 tokio::select!(
503 Some(_) = layout_updated.next() => {
504 debug!("[{destination}{menu_path}] layout update");
505
506 let get_layout = dbus_menu_proxy.get_layout(0, 10, &[]);
507
508 let menu = match timeout(Duration::from_secs(1), get_layout).await {
509 Ok(Ok(menu)) => {
510 debug!("got new menu layout");
511 menu
512 }
513 Ok(Err(err)) => {
514 error!("error fetching layout: {err:?}");
515 break;
516 }
517 Err(_) => {
518 error!("Timeout getting layout");
519 break;
520 }
521 };
522
523 let menu = TrayMenu::try_from(menu)?;
524
525 if let Some((_, menu_cache)) = items
526 .lock()
527 .expect("mutex lock should succeed")
528 .get_mut(&destination)
529 {
530 menu_cache.replace(menu.clone());
531 } else {
532 error!("could not find item in state");
533 }
534
535 debug!("sending new menu for '{destination}'");
536 trace!("new menu for '{destination}': {menu:?}");
537 tx.send(Event::Update(
538 destination.to_string(),
539 UpdateEvent::Menu(menu),
540 ))?;
541 }
542 Some(change) = properties_updated.next() => {
543 let body = change.message().body();
544 let update: PropertiesUpdate= body.deserialize::<PropertiesUpdate>()?;
545 let diffs = Vec::try_from(update)?;
546
547 tx.send(Event::Update(
548 destination.to_string(),
549 UpdateEvent::MenuDiff(diffs),
550 ))?;
551
552 }
554 );
555 }
556
557 Ok(())
558 }
559
560 async fn get_notifier_item_proxy(
561 &self,
562 address: String,
563 ) -> crate::error::Result<StatusNotifierItemProxy<'_>> {
564 let proxy = StatusNotifierItemProxy::builder(&self.connection)
565 .destination(address)?
566 .path(ITEM_OBJECT)?
567 .build()
568 .await?;
569 Ok(proxy)
570 }
571
572 async fn get_menu_proxy(
573 &self,
574 address: String,
575 menu_path: String,
576 ) -> crate::error::Result<DBusMenuProxy<'_>> {
577 let proxy = DBusMenuProxy::builder(&self.connection)
578 .destination(address)?
579 .path(menu_path)?
580 .build()
581 .await?;
582 Ok(proxy)
583 }
584
585 #[must_use]
590 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
591 self.tx.subscribe()
592 }
593
594 #[must_use]
596 pub fn items(&self) -> Arc<Mutex<State>> {
597 self.items.clone()
598 }
599
600 pub async fn about_to_show_menuitem(
605 &self,
606 address: String,
607 menu_path: String,
608 id: i32,
609 ) -> crate::error::Result<bool> {
610 let proxy = self.get_menu_proxy(address, menu_path).await?;
611 Ok(proxy.about_to_show(id).await?)
612 }
613
614 pub async fn activate(&self, req: ActivateRequest) -> crate::error::Result<()> {
625 macro_rules! timeout_event {
626 ($event:expr) => {
627 if timeout(Duration::from_secs(1), $event).await.is_err() {
628 error!("Timed out sending activate event");
629 }
630 };
631 }
632 match req {
633 ActivateRequest::MenuItem {
634 address,
635 menu_path,
636 submenu_id,
637 } => {
638 let proxy = self.get_menu_proxy(address, menu_path).await?;
639 let timestamp = SystemTime::now()
640 .duration_since(UNIX_EPOCH)
641 .expect("time should flow forwards");
642
643 let event = proxy.event(
644 submenu_id,
645 "clicked",
646 &Value::I32(0),
647 timestamp.as_secs() as u32,
648 );
649
650 timeout_event!(event);
651 }
652 ActivateRequest::Default { address, x, y } => {
653 let proxy = self.get_notifier_item_proxy(address).await?;
654 let event = proxy.activate(x, y);
655
656 timeout_event!(event);
657 }
658 ActivateRequest::Secondary { address, x, y } => {
659 let proxy = self.get_notifier_item_proxy(address).await?;
660 let event = proxy.secondary_activate(x, y);
661
662 timeout_event!(event);
663 }
664 }
665
666 Ok(())
667 }
668}
669
670fn parse_address(address: &str) -> (&str, String) {
671 address
672 .split_once('/')
673 .map_or((address, String::from("/StatusNotifierItem")), |(d, p)| {
674 (d, format!("/{p}"))
675 })
676}
677
678#[cfg(test)]
679mod tests {
680 use super::*;
681
682 #[test]
683 fn parse_unnamed() {
684 let address = ":1.58/StatusNotifierItem";
685 let (destination, path) = parse_address(address);
686
687 assert_eq!(":1.58", destination);
688 assert_eq!("/StatusNotifierItem", path);
689 }
690
691 #[test]
692 fn parse_named() {
693 let address = ":1.72/org/ayatana/NotificationItem/dropbox_client_1398";
694 let (destination, path) = parse_address(address);
695
696 assert_eq!(":1.72", destination);
697 assert_eq!("/org/ayatana/NotificationItem/dropbox_client_1398", path);
698 }
699}