system_tray/
client.rs

1use crate::dbus::dbus_menu_proxy::{DBusMenuProxy, PropertiesUpdate};
2use crate::dbus::notifier_item_proxy::StatusNotifierItemProxy;
3use crate::dbus::notifier_watcher_proxy::StatusNotifierWatcherProxy;
4use crate::dbus::status_notifier_watcher::StatusNotifierWatcher;
5use crate::dbus::{self, OwnedValueExt};
6use crate::error::{Error, Result};
7use crate::item::{self, Status, StatusNotifierItem, Tooltip};
8use crate::menu::{MenuDiff, TrayMenu};
9use crate::names;
10use dbus::DBusProps;
11use futures_lite::StreamExt;
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15use tokio::spawn;
16use tokio::sync::broadcast;
17use tokio::time::timeout;
18use tracing::{debug, error, trace, warn};
19use zbus::fdo::{DBusProxy, PropertiesProxy};
20use zbus::names::InterfaceName;
21use zbus::zvariant::{Structure, Value};
22use zbus::{Connection, Message};
23
24use self::names::ITEM_OBJECT;
25
26/// An event emitted by the client
27/// representing a change from either the `StatusNotifierItem`
28/// or `DBusMenu` protocols.
29#[derive(Debug, Clone)]
30pub enum Event {
31    /// A new `StatusNotifierItem` was added.
32    Add(String, Box<StatusNotifierItem>),
33    /// An update was received for an existing `StatusNotifierItem`.
34    /// This could be either an update to the item itself,
35    /// or an update to the associated menu.
36    Update(String, UpdateEvent),
37    /// A `StatusNotifierItem` was unregistered.
38    Remove(String),
39}
40
41/// The specific change associated with an update event.
42#[derive(Debug, Clone)]
43pub enum UpdateEvent {
44    AttentionIcon(Option<String>),
45    Icon(Option<String>),
46    OverlayIcon(Option<String>),
47    Status(Status),
48    Title(Option<String>),
49    Tooltip(Option<Tooltip>),
50    /// A menu layout has changed.
51    /// The entire layout is sent.
52    Menu(TrayMenu),
53    /// One or more menu properties have changed.
54    /// Only the updated properties are sent.
55    MenuDiff(Vec<MenuDiff>),
56    /// A new menu has connected to the item.
57    /// Its name on bus is sent.
58    MenuConnect(String),
59}
60
61/// A request to 'activate' one of the menu items,
62/// typically sent when it is clicked.
63#[derive(Debug, Clone)]
64pub enum ActivateRequest {
65    /// Submenu ID
66    MenuItem {
67        address: String,
68        menu_path: String,
69        submenu_id: i32,
70    },
71    /// Default activation for the tray.
72    /// The parameter(x and y) represents screen coordinates and is to be considered an hint to the item where to show eventual windows (if any).
73    Default { address: String, x: i32, y: i32 },
74    /// Secondary activation(less important) for the tray.
75    /// The parameter(x and y) represents screen coordinates and is to be considered an hint to the item where to show eventual windows (if any).
76    Secondary { address: String, x: i32, y: i32 },
77}
78
79type State = HashMap<String, (StatusNotifierItem, Option<TrayMenu>)>;
80
81const PROPERTIES_INTERFACE: &str = "org.kde.StatusNotifierItem";
82
83/// Client for watching the tray.
84#[derive(Debug)]
85pub struct Client {
86    tx: broadcast::Sender<Event>,
87    _rx: broadcast::Receiver<Event>,
88    connection: Connection,
89
90    items: Arc<Mutex<State>>,
91}
92
93impl Client {
94    /// Creates and initializes the client.
95    ///
96    /// The client will begin listening to items and menus and sending events immediately.
97    /// It is recommended that consumers immediately follow the call to `new` with a `subscribe` call,
98    /// then immediately follow that with a call to `items` to get the state to not miss any events.
99    ///
100    /// The value of `service_name` must be unique on the session bus.
101    /// It is recommended to use something similar to the format of `appid-numid`,
102    /// where `numid` is a short-ish random integer.
103    ///
104    /// # Errors
105    ///
106    /// If the initialization fails for any reason,
107    /// for example if unable to connect to the bus,
108    /// this method will return an error.
109    ///
110    /// # Panics
111    ///
112    /// If the generated well-known name is invalid, the library will panic
113    /// as this indicates a major bug.
114    ///
115    /// Likewise, the spawned tasks may panic if they cannot get a `Mutex` lock.
116    pub async fn new() -> Result<Self> {
117        let connection = Connection::session().await?;
118        let (tx, rx) = broadcast::channel(32);
119
120        // first start server...
121        StatusNotifierWatcher::new().attach_to(&connection).await?;
122
123        // ...then connect to it
124        let watcher_proxy = StatusNotifierWatcherProxy::new(&connection).await?;
125
126        // register a host on the watcher to declare we want to watch items
127        // get a well-known name
128        let pid = std::process::id();
129        let mut i = 0;
130        let wellknown = loop {
131            use zbus::fdo::RequestNameReply::*;
132
133            i += 1;
134            let wellknown = format!("org.freedesktop.StatusNotifierHost-{pid}-{i}");
135            let wellknown: zbus::names::WellKnownName = wellknown
136                .try_into()
137                .expect("generated well-known name is invalid");
138
139            let flags = [zbus::fdo::RequestNameFlags::DoNotQueue];
140            match connection
141                .request_name_with_flags(&wellknown, flags.into_iter().collect())
142                .await?
143            {
144                PrimaryOwner => break wellknown,
145                Exists | AlreadyOwner => {}
146                InQueue => unreachable!(
147                    "request_name_with_flags returned InQueue even though we specified DoNotQueue"
148                ),
149            };
150        };
151
152        debug!("wellknown: {wellknown}");
153        watcher_proxy
154            .register_status_notifier_host(&wellknown)
155            .await?;
156
157        let items = Arc::new(Mutex::new(HashMap::new()));
158
159        // handle new items
160        {
161            let connection = connection.clone();
162            let tx = tx.clone();
163            let items = items.clone();
164
165            let mut stream = watcher_proxy
166                .receive_status_notifier_item_registered()
167                .await?;
168
169            spawn(async move {
170                while let Some(item) = stream.next().await {
171                    let address = item.args().map(|args| args.service);
172
173                    if let Ok(address) = address {
174                        debug!("received new item: {address}");
175                        if let Err(err) = Self::handle_item(
176                            address,
177                            connection.clone(),
178                            tx.clone(),
179                            items.clone(),
180                        )
181                        .await
182                        {
183                            error!("{err}");
184                            break;
185                        }
186                    }
187                }
188
189                Ok::<(), Error>(())
190            });
191        }
192
193        // then lastly get all items
194        // it can take so long to fetch all items that we have to do this last,
195        // otherwise some incoming items get missed
196        {
197            let connection = connection.clone();
198            let tx = tx.clone();
199            let items = items.clone();
200
201            spawn(async move {
202                let initial_items = watcher_proxy.registered_status_notifier_items().await?;
203                debug!("initial items: {initial_items:?}");
204
205                for item in initial_items {
206                    if let Err(err) =
207                        Self::handle_item(&item, connection.clone(), tx.clone(), items.clone())
208                            .await
209                    {
210                        error!("{err}");
211                    }
212                }
213
214                Ok::<(), Error>(())
215            });
216        }
217
218        // Handle other watchers unregistering and this one taking over
219        // It is necessary to clear all items as our watcher will then re-send them all
220        {
221            let tx = tx.clone();
222            let items = items.clone();
223
224            let dbus_proxy = DBusProxy::new(&connection).await?;
225
226            let mut stream = dbus_proxy.receive_name_acquired().await?;
227
228            spawn(async move {
229                while let Some(thing) = stream.next().await {
230                    let body = thing.args()?;
231                    if body.name == names::WATCHER_BUS {
232                        let mut items = items.lock().expect("mutex lock should succeed");
233                        let keys = items.keys().cloned().collect::<Vec<_>>();
234                        for address in keys {
235                            items.remove(&address);
236                            tx.send(Event::Remove(address))?;
237                        }
238                    }
239                }
240
241                Ok::<(), Error>(())
242            });
243        }
244
245        debug!("tray client initialized");
246
247        Ok(Self {
248            connection,
249            tx,
250            _rx: rx,
251            items,
252        })
253    }
254
255    /// Processes an incoming item to send the initial add event,
256    /// then set up listeners for it and its menu.
257    async fn handle_item(
258        address: &str,
259        connection: Connection,
260        tx: broadcast::Sender<Event>,
261        items: Arc<Mutex<State>>,
262    ) -> crate::error::Result<()> {
263        let (destination, path) = parse_address(address);
264
265        let properties_proxy = PropertiesProxy::builder(&connection)
266            .destination(destination.to_string())?
267            .path(path.clone())?
268            .build()
269            .await?;
270
271        let properties = Self::get_item_properties(destination, &path, &properties_proxy).await?;
272
273        items
274            .lock()
275            .expect("mutex lock should succeed")
276            .insert(destination.into(), (properties.clone(), None));
277
278        tx.send(Event::Add(
279            destination.to_string(),
280            properties.clone().into(),
281        ))?;
282
283        {
284            let connection = connection.clone();
285            let destination = destination.to_string();
286            let items = items.clone();
287            let tx = tx.clone();
288
289            spawn(async move {
290                Self::watch_item_properties(
291                    &destination,
292                    &path,
293                    &connection,
294                    properties_proxy,
295                    items,
296                    tx,
297                )
298                .await?;
299
300                debug!("Stopped watching {destination}{path}");
301                Ok::<(), Error>(())
302            });
303        }
304
305        if let Some(menu) = properties.menu {
306            let destination = destination.to_string();
307
308            tx.send(Event::Update(
309                destination.clone(),
310                UpdateEvent::MenuConnect(menu.clone()),
311            ))?;
312
313            spawn(async move {
314                Self::watch_menu(destination, &menu, &connection, tx, items).await?;
315                Ok::<(), Error>(())
316            });
317        }
318
319        Ok(())
320    }
321
322    /// Gets the properties for an SNI item.
323    async fn get_item_properties(
324        destination: &str,
325        path: &str,
326        properties_proxy: &PropertiesProxy<'_>,
327    ) -> crate::error::Result<StatusNotifierItem> {
328        let properties = properties_proxy
329            .get_all(
330                InterfaceName::from_static_str(PROPERTIES_INTERFACE)
331                    .expect("to be valid interface name"),
332            )
333            .await;
334
335        let properties = match properties {
336            Ok(properties) => properties,
337            Err(err) => {
338                error!("Error fetching properties from {destination}{path}: {err:?}");
339                return Err(err.into());
340            }
341        };
342
343        StatusNotifierItem::try_from(DBusProps(properties))
344    }
345
346    /// Watches an SNI item's properties,
347    /// sending an update event whenever they change.
348    async fn watch_item_properties(
349        destination: &str,
350        path: &str,
351        connection: &Connection,
352        properties_proxy: PropertiesProxy<'_>,
353        items: Arc<Mutex<State>>,
354        tx: broadcast::Sender<Event>,
355    ) -> crate::error::Result<()> {
356        let notifier_item_proxy = StatusNotifierItemProxy::builder(connection)
357            .destination(destination)?
358            .path(path)?
359            .build()
360            .await?;
361
362        let dbus_proxy = DBusProxy::new(connection).await?;
363
364        let mut disconnect_stream = dbus_proxy.receive_name_owner_changed().await?;
365        let mut props_changed = notifier_item_proxy.inner().receive_all_signals().await?;
366
367        loop {
368            tokio::select! {
369                Some(change) = props_changed.next() => {
370                    match Self::get_update_event(change, &properties_proxy).await {
371                        Ok(Some(event)) => {
372                                debug!("[{destination}{path}] received property change: {event:?}");
373                                tx.send(Event::Update(destination.to_string(), event))?;
374                            }
375                        Err(e) => {
376                            error!("Error parsing update properties from {destination}{path}: {e:?}");
377                        }
378                        _ => {}
379                    }
380                }
381                Some(signal) = disconnect_stream.next() => {
382                    let args = signal.args()?;
383                    let old = args.old_owner();
384                    let new = args.new_owner();
385
386                    if let (Some(old), None) = (old.as_ref(), new.as_ref()) {
387                        if old == destination {
388                            debug!("[{destination}{path}] disconnected");
389
390                            let watcher_proxy = StatusNotifierWatcherProxy::new(connection)
391                                .await
392                                .expect("Failed to open StatusNotifierWatcherProxy");
393
394                            if let Err(error) = watcher_proxy.unregister_status_notifier_item(old).await {
395                                error!("{error:?}");
396                            }
397
398                            items.lock().expect("mutex lock should succeed").remove(&destination.to_string());
399
400                            tx.send(Event::Remove(destination.to_string()))?;
401                            break Ok(());
402                        }
403                    }
404                }
405            }
406        }
407    }
408
409    /// Gets the update event for a `DBus` properties change message.
410    async fn get_update_event(
411        change: Message,
412        properties_proxy: &PropertiesProxy<'_>,
413    ) -> Result<Option<UpdateEvent>> {
414        let header = change.header();
415        let member = header
416            .member()
417            .ok_or(Error::InvalidData("Update message header missing `member`"))?;
418
419        let property_name = match member.as_str() {
420            "NewAttentionIcon" => "AttentionIconName",
421            "NewIcon" => "IconName",
422            "NewOverlayIcon" => "OverlayIconName",
423            "NewStatus" => "Status",
424            "NewTitle" => "Title",
425            "NewToolTip" => "ToolTip",
426            _ => &member.as_str()["New".len()..],
427        };
428
429        let property = properties_proxy
430            .get(
431                InterfaceName::from_static_str(PROPERTIES_INTERFACE)
432                    .expect("to be valid interface name"),
433                property_name,
434            )
435            .await?;
436
437        debug!("received tray item update: {member} -> {property:?}");
438
439        use UpdateEvent::*;
440        Ok(match member.as_str() {
441            "NewAttentionIcon" => Some(AttentionIcon(property.to_string().ok())),
442            "NewIcon" => Some(Icon(property.to_string().ok())),
443            "NewOverlayIcon" => Some(OverlayIcon(property.to_string().ok())),
444            "NewStatus" => Some(Status(
445                property.downcast_ref::<&str>().map(item::Status::from)?,
446            )),
447            "NewTitle" => Some(Title(property.to_string().ok())),
448            "NewToolTip" => Some(Tooltip({
449                property
450                    .downcast_ref::<&Structure>()
451                    .ok()
452                    .map(crate::item::Tooltip::try_from)
453                    .transpose()?
454            })),
455            _ => {
456                warn!("received unhandled update event: {member}");
457                None
458            }
459        })
460    }
461
462    /// Watches the `DBusMenu` associated with an SNI item.
463    ///
464    /// This gets the initial menu, sending an update event immediately.
465    /// Update events are then sent for any further updates
466    /// until the item is removed.
467    async fn watch_menu(
468        destination: String,
469        menu_path: &str,
470        connection: &Connection,
471        tx: broadcast::Sender<Event>,
472        items: Arc<Mutex<State>>,
473    ) -> crate::error::Result<()> {
474        let dbus_menu_proxy = DBusMenuProxy::builder(connection)
475            .destination(destination.as_str())?
476            .path(menu_path)?
477            .build()
478            .await?;
479
480        let menu = dbus_menu_proxy.get_layout(0, 10, &[]).await?;
481        let menu = TrayMenu::try_from(menu)?;
482
483        if let Some((_, menu_cache)) = items
484            .lock()
485            .expect("mutex lock should succeed")
486            .get_mut(&destination)
487        {
488            menu_cache.replace(menu.clone());
489        } else {
490            error!("could not find item in state");
491        }
492
493        tx.send(Event::Update(
494            destination.to_string(),
495            UpdateEvent::Menu(menu),
496        ))?;
497
498        let mut layout_updated = dbus_menu_proxy.receive_layout_updated().await?;
499        let mut properties_updated = dbus_menu_proxy.receive_items_properties_updated().await?;
500
501        loop {
502            tokio::select!(
503                Some(_) = layout_updated.next() => {
504                    debug!("[{destination}{menu_path}] layout update");
505
506                    let get_layout = dbus_menu_proxy.get_layout(0, 10, &[]);
507
508                    let menu = match timeout(Duration::from_secs(1), get_layout).await {
509                        Ok(Ok(menu)) => {
510                            debug!("got new menu layout");
511                            menu
512                        }
513                        Ok(Err(err)) => {
514                            error!("error fetching layout: {err:?}");
515                            break;
516                        }
517                        Err(_) => {
518                            error!("Timeout getting layout");
519                            break;
520                        }
521                    };
522
523                    let menu = TrayMenu::try_from(menu)?;
524
525                    if let Some((_, menu_cache)) = items
526                        .lock()
527                        .expect("mutex lock should succeed")
528                        .get_mut(&destination)
529                    {
530                        menu_cache.replace(menu.clone());
531                    } else {
532                        error!("could not find item in state");
533                    }
534
535                    debug!("sending new menu for '{destination}'");
536                    trace!("new menu for '{destination}': {menu:?}");
537                    tx.send(Event::Update(
538                        destination.to_string(),
539                        UpdateEvent::Menu(menu),
540                    ))?;
541                }
542                Some(change) = properties_updated.next() => {
543                    let body = change.message().body();
544                    let update: PropertiesUpdate= body.deserialize::<PropertiesUpdate>()?;
545                    let diffs = Vec::try_from(update)?;
546
547                    tx.send(Event::Update(
548                        destination.to_string(),
549                        UpdateEvent::MenuDiff(diffs),
550                    ))?;
551
552                    // FIXME: Menu cache gonna be out of sync
553                }
554            );
555        }
556
557        Ok(())
558    }
559
560    async fn get_notifier_item_proxy(
561        &self,
562        address: String,
563    ) -> crate::error::Result<StatusNotifierItemProxy<'_>> {
564        let proxy = StatusNotifierItemProxy::builder(&self.connection)
565            .destination(address)?
566            .path(ITEM_OBJECT)?
567            .build()
568            .await?;
569        Ok(proxy)
570    }
571
572    async fn get_menu_proxy(
573        &self,
574        address: String,
575        menu_path: String,
576    ) -> crate::error::Result<DBusMenuProxy<'_>> {
577        let proxy = DBusMenuProxy::builder(&self.connection)
578            .destination(address)?
579            .path(menu_path)?
580            .build()
581            .await?;
582        Ok(proxy)
583    }
584
585    /// Subscribes to the events broadcast channel,
586    /// returning a new receiver.
587    ///
588    /// Once the client is dropped, the receiver will close.
589    #[must_use]
590    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
591        self.tx.subscribe()
592    }
593
594    /// Gets all current items, including their menus if present.
595    #[must_use]
596    pub fn items(&self) -> Arc<Mutex<State>> {
597        self.items.clone()
598    }
599
600    /// One should call this method with id=0 when opening the root menu.
601    ///
602    /// ID refers to the menuitem id.
603    /// Returns `needsUpdate`
604    pub async fn about_to_show_menuitem(
605        &self,
606        address: String,
607        menu_path: String,
608        id: i32,
609    ) -> crate::error::Result<bool> {
610        let proxy = self.get_menu_proxy(address, menu_path).await?;
611        Ok(proxy.about_to_show(id).await?)
612    }
613
614    /// Sends an activate request for a menu item.
615    ///
616    /// # Errors
617    ///
618    /// The method will return an error if the connection to the `DBus` object fails,
619    /// or if sending the event fails for any reason.
620    ///
621    /// # Panics
622    ///
623    /// If the system time is somehow before the Unix epoch.
624    pub async fn activate(&self, req: ActivateRequest) -> crate::error::Result<()> {
625        macro_rules! timeout_event {
626            ($event:expr) => {
627                if timeout(Duration::from_secs(1), $event).await.is_err() {
628                    error!("Timed out sending activate event");
629                }
630            };
631        }
632        match req {
633            ActivateRequest::MenuItem {
634                address,
635                menu_path,
636                submenu_id,
637            } => {
638                let proxy = self.get_menu_proxy(address, menu_path).await?;
639                let timestamp = SystemTime::now()
640                    .duration_since(UNIX_EPOCH)
641                    .expect("time should flow forwards");
642
643                let event = proxy.event(
644                    submenu_id,
645                    "clicked",
646                    &Value::I32(0),
647                    timestamp.as_secs() as u32,
648                );
649
650                timeout_event!(event);
651            }
652            ActivateRequest::Default { address, x, y } => {
653                let proxy = self.get_notifier_item_proxy(address).await?;
654                let event = proxy.activate(x, y);
655
656                timeout_event!(event);
657            }
658            ActivateRequest::Secondary { address, x, y } => {
659                let proxy = self.get_notifier_item_proxy(address).await?;
660                let event = proxy.secondary_activate(x, y);
661
662                timeout_event!(event);
663            }
664        }
665
666        Ok(())
667    }
668}
669
670fn parse_address(address: &str) -> (&str, String) {
671    address
672        .split_once('/')
673        .map_or((address, String::from("/StatusNotifierItem")), |(d, p)| {
674            (d, format!("/{p}"))
675        })
676}
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681
682    #[test]
683    fn parse_unnamed() {
684        let address = ":1.58/StatusNotifierItem";
685        let (destination, path) = parse_address(address);
686
687        assert_eq!(":1.58", destination);
688        assert_eq!("/StatusNotifierItem", path);
689    }
690
691    #[test]
692    fn parse_named() {
693        let address = ":1.72/org/ayatana/NotificationItem/dropbox_client_1398";
694        let (destination, path) = parse_address(address);
695
696        assert_eq!(":1.72", destination);
697        assert_eq!("/org/ayatana/NotificationItem/dropbox_client_1398", path);
698    }
699}