1#[cfg(feature = "data")]
2use crate::data::apply_menu_diffs;
3use crate::data::TrayItemMap;
4use crate::dbus::dbus_menu_proxy::{DBusMenuProxy, PropertiesUpdate};
5use crate::dbus::notifier_item_proxy::StatusNotifierItemProxy;
6use crate::dbus::notifier_watcher_proxy::StatusNotifierWatcherProxy;
7use crate::dbus::status_notifier_watcher::StatusNotifierWatcher;
8use crate::dbus::{self, OwnedValueExt};
9use crate::error::{Error, Result};
10use crate::item::{self, IconPixmap, Status, StatusNotifierItem, Tooltip};
11use crate::menu::{MenuDiff, TrayMenu};
12use crate::names;
13use dbus::DBusProps;
14use futures_lite::StreamExt;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17use tokio::spawn;
18use tokio::sync::{broadcast, mpsc};
19use tokio::time::{sleep, timeout, Instant};
20use tracing::{debug, error, trace, warn};
21use zbus::fdo::{DBusProxy, PropertiesProxy};
22use zbus::names::InterfaceName;
23use zbus::zvariant::{Array, Structure, Value};
24use zbus::{Connection, Message};
25
26use self::names::ITEM_OBJECT;
27
28#[derive(Debug, Clone)]
32pub enum Event {
33 Add(String, Box<StatusNotifierItem>),
35 Update(String, UpdateEvent),
39 Remove(String),
41}
42
43#[derive(Debug, Clone)]
45pub enum UpdateEvent {
46 AttentionIcon(Option<String>),
47 Icon {
48 icon_name: Option<String>,
49 icon_pixmap: Option<Vec<IconPixmap>>,
50 },
51 OverlayIcon(Option<String>),
52 Status(Status),
53 Title(Option<String>),
54 Tooltip(Option<Tooltip>),
55 Menu(TrayMenu),
58 MenuDiff(Vec<MenuDiff>),
61 MenuConnect(String),
64}
65
/// A user interaction to forward to a tray item via [`Client::activate`].
#[derive(Debug, Clone)]
pub enum ActivateRequest {
    /// A menu entry was clicked: a `"clicked"` event is sent to the menu object.
    MenuItem {
        /// D-Bus destination of the item owning the menu.
        address: String,
        /// Object path of the menu.
        menu_path: String,
        /// Id of the clicked menu entry.
        submenu_id: i32,
    },

    /// Primary activation of the item; `x`/`y` are passed through to the item's
    /// `activate` call.
    Default {
        /// D-Bus destination of the item.
        address: String,
        x: i32,
        y: i32,
    },

    /// Secondary activation of the item; `x`/`y` are passed through to the
    /// item's `secondary_activate` call.
    Secondary {
        /// D-Bus destination of the item.
        address: String,
        x: i32,
        y: i32,
    },
}
83
/// Name of the D-Bus interface whose properties are read for every tray item,
/// both when fetching all properties and when re-reading a single one.
const PROPERTIES_INTERFACE: &str = "org.kde.StatusNotifierItem";
85
86#[derive(Debug)]
88pub struct Client {
89 tx: broadcast::Sender<Event>,
90 _rx: broadcast::Receiver<Event>,
91 connection: Connection,
92
93 #[cfg(feature = "data")]
94 items: TrayItemMap,
95}
96
97impl Client {
98 pub async fn new() -> Result<Self> {
121 let connection = Connection::session().await?;
122 let (tx, rx) = broadcast::channel(32);
123
124 StatusNotifierWatcher::new().attach_to(&connection).await?;
126
127 let watcher_proxy = StatusNotifierWatcherProxy::new(&connection).await?;
129
130 let pid = std::process::id();
133 let mut i = 0;
134 let wellknown = loop {
135 use zbus::fdo::RequestNameReply::{AlreadyOwner, Exists, InQueue, PrimaryOwner};
136
137 i += 1;
138 let wellknown = format!("org.freedesktop.StatusNotifierHost-{pid}-{i}");
139 let wellknown: zbus::names::WellKnownName = wellknown
140 .try_into()
141 .expect("generated well-known name is invalid");
142
143 let flags = [zbus::fdo::RequestNameFlags::DoNotQueue];
144 match connection
145 .request_name_with_flags(&wellknown, flags.into_iter().collect())
146 .await?
147 {
148 PrimaryOwner => break wellknown,
149 Exists | AlreadyOwner => {}
150 InQueue => unreachable!(
151 "request_name_with_flags returned InQueue even though we specified DoNotQueue"
152 ),
153 };
154 };
155
156 debug!("wellknown: {wellknown}");
157 watcher_proxy
158 .register_status_notifier_host(&wellknown)
159 .await?;
160 let items = TrayItemMap::new();
161
162 {
164 let connection = connection.clone();
165 let tx = tx.clone();
166 let items = items.clone();
167
168 let mut stream = watcher_proxy
169 .receive_status_notifier_item_registered()
170 .await?;
171
172 spawn(async move {
173 while let Some(item) = stream.next().await {
174 let address = item.args().map(|args| args.service);
175
176 if let Ok(address) = address {
177 debug!("received new item: {address}");
178 if let Err(err) = Self::handle_item(
179 address,
180 connection.clone(),
181 tx.clone(),
182 items.clone(),
183 )
184 .await
185 {
186 error!("{err}");
187 break;
188 }
189 }
190 }
191
192 Ok::<(), Error>(())
193 });
194 }
195
196 {
200 let connection = connection.clone();
201 let tx = tx.clone();
202 let items = items.clone();
203
204 spawn(async move {
205 let initial_items = watcher_proxy.registered_status_notifier_items().await?;
206 debug!("initial items: {initial_items:?}");
207
208 for item in initial_items {
209 if let Err(err) =
210 Self::handle_item(&item, connection.clone(), tx.clone(), items.clone())
211 .await
212 {
213 error!("{err}");
214 }
215 }
216
217 Ok::<(), Error>(())
218 });
219 }
220
221 {
224 let tx = tx.clone();
225 let items = items.clone();
226
227 let dbus_proxy = DBusProxy::new(&connection).await?;
228
229 let mut stream = dbus_proxy.receive_name_acquired().await?;
230
231 spawn(async move {
232 while let Some(thing) = stream.next().await {
233 let body = thing.args()?;
234 if body.name == names::WATCHER_BUS {
235 for dest in items.clear_items() {
236 tx.send(Event::Remove(dest))?;
237 }
238 }
239 }
240
241 Ok::<(), Error>(())
242 });
243 }
244
245 debug!("tray client initialized");
246
247 Ok(Self {
248 connection,
249 tx,
250 _rx: rx,
251 #[cfg(feature = "data")]
252 items,
253 })
254 }
255
256 async fn handle_item(
259 address: &str,
260 connection: Connection,
261 tx: broadcast::Sender<Event>,
262 items: TrayItemMap,
263 ) -> Result<()> {
264 let (destination, path) = parse_address(address);
265
266 let properties_proxy = PropertiesProxy::builder(&connection)
267 .destination(destination.to_string())?
268 .path(path.clone())?
269 .build()
270 .await?;
271
272 let properties = Self::get_item_properties(destination, &path, &properties_proxy).await?;
273
274 items.new_item(destination.into(), &properties);
275
276 tx.send(Event::Add(
277 destination.to_string(),
278 properties.clone().into(),
279 ))?;
280
281 {
282 let connection = connection.clone();
283 let destination = destination.to_string();
284 let items = items.clone();
285 let tx = tx.clone();
286
287 spawn(async move {
288 Self::watch_item_properties(
289 &destination,
290 &path,
291 &connection,
292 properties_proxy,
293 tx,
294 items,
295 )
296 .await?;
297
298 debug!("Stopped watching {destination}{path}");
299 Ok::<(), Error>(())
300 });
301 }
302
303 if let Some(menu) = properties.menu {
304 let destination = destination.to_string();
305
306 tx.send(Event::Update(
307 destination.clone(),
308 UpdateEvent::MenuConnect(menu.clone()),
309 ))?;
310
311 spawn(async move {
312 Self::watch_menu(destination, &menu, &connection, tx, items).await?;
313 Ok::<(), Error>(())
314 });
315 }
316
317 Ok(())
318 }
319
320 async fn get_item_properties(
322 destination: &str,
323 path: &str,
324 properties_proxy: &PropertiesProxy<'_>,
325 ) -> Result<StatusNotifierItem> {
326 let properties = properties_proxy
327 .get_all(
328 InterfaceName::from_static_str(PROPERTIES_INTERFACE)
329 .expect("to be valid interface name"),
330 )
331 .await;
332
333 let properties = match properties {
334 Ok(properties) => properties,
335 Err(err) => {
336 error!("Error fetching properties from {destination}{path}: {err:?}");
337 return Err(err.into());
338 }
339 };
340
341 StatusNotifierItem::try_from(DBusProps(properties))
342 }
343
344 async fn watch_item_properties(
347 destination: &str,
348 path: &str,
349 connection: &Connection,
350 properties_proxy: PropertiesProxy<'_>,
351 tx: broadcast::Sender<Event>,
352 items: TrayItemMap,
353 ) -> Result<()> {
354 let notifier_item_proxy = StatusNotifierItemProxy::builder(connection)
355 .destination(destination)?
356 .path(path)?
357 .build()
358 .await?;
359
360 let dbus_proxy = DBusProxy::new(connection).await?;
361
362 let mut disconnect_stream = dbus_proxy.receive_name_owner_changed().await?;
363 let mut props_changed = notifier_item_proxy.inner().receive_all_signals().await?;
364
365 loop {
366 tokio::select! {
367 Some(change) = props_changed.next() => {
368 match Self::get_update_event(change, &properties_proxy).await {
369 Ok(Some(event)) => {
370 cfg_if::cfg_if! {
371 if #[cfg(feature = "data")] {
372 items.apply_update_event(destination, &event);
373 }
374 }
375 debug!("[{destination}{path}] received property change: {event:?}");
376 tx.send(Event::Update(destination.to_string(), event))?;
377 }
378 Err(e) => {
379 error!("Error parsing update properties from {destination}{path}: {e:?}");
380 }
381 _ => {}
382 }
383 }
384 Some(signal) = disconnect_stream.next() => {
385 let args = signal.args()?;
386 let old = args.old_owner();
387 let new = args.new_owner();
388
389 if let (Some(old), None) = (old.as_ref(), new.as_ref()) {
390 if old == destination {
391 debug!("[{destination}{path}] disconnected");
392
393 let watcher_proxy = StatusNotifierWatcherProxy::new(connection)
394 .await
395 .expect("Failed to open StatusNotifierWatcherProxy");
396
397 if let Err(error) = watcher_proxy.unregister_status_notifier_item(old).await {
398 error!("{error:?}");
399 }
400
401
402 items.remove_item(destination);
403
404 tx.send(Event::Remove(destination.to_string()))?;
405 break Ok(());
406 }
407 }
408 }
409 }
410 }
411 }
412
413 async fn get_update_event(
415 change: Message,
416 properties_proxy: &PropertiesProxy<'_>,
417 ) -> Result<Option<UpdateEvent>> {
418 use UpdateEvent::{AttentionIcon, Icon, OverlayIcon, Status, Title, Tooltip};
419
420 let header = change.header();
421 let member = header
422 .member()
423 .ok_or(Error::InvalidData("Update message header missing `member`"))?;
424
425 macro_rules! get_property {
426 ($name:expr) => {
427 match properties_proxy
428 .get(
429 InterfaceName::from_static_str(PROPERTIES_INTERFACE)
430 .expect("to be valid interface name"),
431 $name,
432 )
433 .await
434 {
435 Ok(v) => Ok(Some(v)),
436 Err(e) => match e {
437 zbus::fdo::Error::InvalidArgs(_) => {
439 warn!("{e}");
440 Ok(None)
441 }
442 _ => Err(Into::<Error>::into(e)),
443 },
444 }
445 };
446 }
447
448 let property = match member.as_str() {
449 "NewAttentionIcon" => Some(AttentionIcon(
450 get_property!("AttentionIconName")?
451 .as_ref()
452 .map(OwnedValueExt::to_string)
453 .transpose()?,
454 )),
455 "NewIcon" => {
456 let icon_name = match get_property!("IconName") {
457 Ok(name) => name,
458 Err(e) => {
459 warn!("Error getting IconName: {e:?}");
460 None
461 }
462 }
463 .as_ref()
464 .map(OwnedValueExt::to_string)
465 .transpose()
466 .ok()
467 .flatten();
468
469 let icon_pixmap = match get_property!("IconPixmap") {
470 Ok(pixmap) => pixmap,
471 Err(e) => {
472 warn!("Error getting IconPixmap: {e:?}");
473 None
474 }
475 }
476 .as_deref()
477 .map(Value::downcast_ref::<&Array>)
478 .transpose()?
479 .map(IconPixmap::from_array)
480 .transpose()?;
481
482 Some(Icon {
483 icon_name,
484 icon_pixmap,
485 })
486 }
487 "NewOverlayIcon" => Some(OverlayIcon(
488 get_property!("OverlayIconName")?
489 .as_ref()
490 .map(OwnedValueExt::to_string)
491 .transpose()?,
492 )),
493 "NewStatus" => Some(Status(
494 get_property!("Status")?
495 .as_deref()
496 .map(Value::downcast_ref::<&str>)
497 .transpose()?
498 .map(item::Status::from)
499 .unwrap_or_default(), )),
501 "NewTitle" => Some(Title(
502 get_property!("Title")?
503 .as_ref()
504 .map(OwnedValueExt::to_string)
505 .transpose()?,
506 )),
507 "NewToolTip" => Some(Tooltip(
508 get_property!("ToolTip")?
509 .as_deref()
510 .map(Value::downcast_ref::<&Structure>)
511 .transpose()?
512 .map(crate::item::Tooltip::try_from)
513 .transpose()?,
514 )),
515 _ => {
516 warn!("received unhandled update event: {member}");
517 None
518 }
519 };
520
521 debug!("received tray item update: {member} -> {property:?}");
522
523 Ok(property)
524 }
525
526 async fn watch_menu(
532 destination: String,
533 menu_path: &str,
534 connection: &Connection,
535 tx: broadcast::Sender<Event>,
536 items: TrayItemMap,
537 ) -> Result<()> {
538 const LAYOUT_UPDATE_INTERVAL_MS: Duration = Duration::from_millis(50);
539
540 let dbus_menu_proxy = DBusMenuProxy::builder(connection)
541 .destination(destination.as_str())?
542 .path(menu_path)?
543 .build()
544 .await?;
545
546 debug!("[{destination}{menu_path}] getting initial menu");
547 let menu = dbus_menu_proxy.get_layout(0, -1, &[]).await?;
548 let menu = TrayMenu::try_from(menu)?;
549
550 items.update_menu(&destination, &menu);
551
552 tx.send(Event::Update(
553 destination.to_string(),
554 UpdateEvent::Menu(menu),
555 ))?;
556
557 let mut layout_updated = dbus_menu_proxy.receive_layout_updated().await?;
558 let mut properties_updated = dbus_menu_proxy.receive_items_properties_updated().await?;
559
560 let last_layout_update = Arc::new(Mutex::new(Instant::now()));
561 let (layout_tx, mut layout_rx) = mpsc::channel(4);
562
563 loop {
564 tokio::select!(
565 Some(ev) = layout_updated.next() => {
566 trace!("received layout update");
567
568 let now = Instant::now();
569 *last_layout_update.lock().expect("should get lock") = now;
570
571 let args = ev.args()?;
572
573 let last_layout_update = last_layout_update.clone();
574 let layout_tx = layout_tx.clone();
575 spawn(async move {
576 sleep(LAYOUT_UPDATE_INTERVAL_MS).await;
577 if *last_layout_update.lock().expect("should get lock") == now {
578 trace!("dispatching layout update");
579 layout_tx.send(args.parent).await.expect("should send");
580 }
581 });
582 }
583 Some(layout_parent) = layout_rx.recv() => {
584 debug!("[{destination}{menu_path}] layout update");
585
586 let get_layout = dbus_menu_proxy.get_layout(layout_parent, -1, &[]);
587
588 let menu = match timeout(Duration::from_secs(1), get_layout).await {
589 Ok(Ok(menu)) => {
590 debug!("got new menu layout");
591 menu
592 }
593 Ok(Err(err)) => {
594 error!("error fetching layout: {err:?}");
595 break;
596 }
597 Err(_) => {
598 error!("Timeout getting layout");
599 break;
600 }
601 };
602
603 let menu = TrayMenu::try_from(menu)?;
604
605 items.update_menu(&destination, &menu);
606
607 debug!("sending new menu for '{destination}'");
608 trace!("new menu for '{destination}': {menu:?}");
609 tx.send(Event::Update(
610 destination.to_string(),
611 UpdateEvent::Menu(menu),
612 ))?;
613 }
614 Some(change) = properties_updated.next() => {
615 let body = change.message().body();
616 let update: PropertiesUpdate= body.deserialize::<PropertiesUpdate>()?;
617 let diffs = Vec::try_from(update)?;
618
619 #[cfg(feature = "data")]
620 if let Some((_, Some(menu))) = items
621 .get_map()
622 .lock()
623 .expect("mutex lock should succeed")
624 .get_mut(&destination)
625 {
626 apply_menu_diffs(menu, &diffs);
627 } else {
628 error!("could not find item in state");
629 }
630
631 tx.send(Event::Update(
632 destination.to_string(),
633 UpdateEvent::MenuDiff(diffs),
634 ))?;
635
636 }
638 );
639 }
640
641 Ok(())
642 }
643
644 async fn get_notifier_item_proxy(
645 &self,
646 address: String,
647 ) -> Result<StatusNotifierItemProxy<'_>> {
648 let proxy = StatusNotifierItemProxy::builder(&self.connection)
649 .destination(address)?
650 .path(ITEM_OBJECT)?
651 .build()
652 .await?;
653 Ok(proxy)
654 }
655
656 async fn get_menu_proxy(
657 &self,
658 address: String,
659 menu_path: String,
660 ) -> Result<DBusMenuProxy<'_>> {
661 let proxy = DBusMenuProxy::builder(&self.connection)
662 .destination(address)?
663 .path(menu_path)?
664 .build()
665 .await?;
666
667 Ok(proxy)
668 }
669
670 #[must_use]
675 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
676 self.tx.subscribe()
677 }
678
679 #[cfg(feature = "data")]
681 #[must_use]
682 pub fn items(&self) -> std::sync::Arc<std::sync::Mutex<crate::data::BaseMap>> {
683 self.items.get_map()
684 }
685
686 pub async fn about_to_show_menuitem(
695 &self,
696 address: String,
697 menu_path: String,
698 id: i32,
699 ) -> Result<bool> {
700 let proxy = self.get_menu_proxy(address, menu_path).await?;
701 Ok(proxy.about_to_show(id).await?)
702 }
703
704 pub async fn activate(&self, req: ActivateRequest) -> Result<()> {
715 macro_rules! timeout_event {
716 ($event:expr) => {
717 if timeout(Duration::from_secs(1), $event).await.is_err() {
718 error!("Timed out sending activate event");
719 }
720 };
721 }
722 match req {
723 ActivateRequest::MenuItem {
724 address,
725 menu_path,
726 submenu_id,
727 } => {
728 let proxy = self.get_menu_proxy(address, menu_path).await?;
729 let timestamp = SystemTime::now()
730 .duration_since(UNIX_EPOCH)
731 .expect("time should flow forwards");
732
733 let event = proxy.event(
734 submenu_id,
735 "clicked",
736 &Value::I32(0),
737 timestamp.as_secs() as u32,
738 );
739
740 timeout_event!(event);
741 }
742 ActivateRequest::Default { address, x, y } => {
743 let proxy = self.get_notifier_item_proxy(address).await?;
744 let event = proxy.activate(x, y);
745
746 timeout_event!(event);
747 }
748 ActivateRequest::Secondary { address, x, y } => {
749 let proxy = self.get_notifier_item_proxy(address).await?;
750 let event = proxy.secondary_activate(x, y);
751
752 timeout_event!(event);
753 }
754 }
755
756 Ok(())
757 }
758}
759
/// Splits an item address into its bus destination and object path.
///
/// Everything before the first `/` is the destination; the rest, with the `/`
/// put back in front, is the path. An address without any `/` is taken to be a
/// bare destination and gets the path `/StatusNotifierItem`.
fn parse_address(address: &str) -> (&str, String) {
    match address.split_once('/') {
        Some((destination, path)) => (destination, format!("/{path}")),
        None => (address, String::from("/StatusNotifierItem")),
    }
}
767
#[cfg(test)]
mod tests {
    use super::*;

    /// An address using the default object path is split at the first `/`.
    #[test]
    fn parse_unnamed() {
        let (destination, path) = parse_address(":1.58/StatusNotifierItem");

        assert_eq!(destination, ":1.58");
        assert_eq!(path, "/StatusNotifierItem");
    }

    /// Only the first `/` separates destination and path; later ones belong to
    /// the path.
    #[test]
    fn parse_named() {
        let (destination, path) =
            parse_address(":1.72/org/ayatana/NotificationItem/dropbox_client_1398");

        assert_eq!(destination, ":1.72");
        assert_eq!(path, "/org/ayatana/NotificationItem/dropbox_client_1398");
    }

    /// An address without a path falls back to the default object path.
    #[test]
    fn parse_without_path() {
        let (destination, path) = parse_address("org.kde.StatusNotifierItem-4077-1");

        assert_eq!(destination, "org.kde.StatusNotifierItem-4077-1");
        assert_eq!(path, "/StatusNotifierItem");
    }
}