stray/notifier_watcher/
mod.rs

1use crate::dbus::dbusmenu_proxy::DBusMenuProxy;
2use crate::dbus::notifier_item_proxy::StatusNotifierItemProxy;
3use crate::dbus::notifier_watcher_proxy::StatusNotifierWatcherProxy;
4use crate::error::Result;
5use crate::message::menu::TrayMenu;
6use crate::message::NotifierItemCommand;
7use crate::notifier_watcher::notifier_address::NotifierAddress;
8use crate::{
9    DbusNotifierWatcher, InterfaceName, MenuLayout, NotifierItemMessage, StatusNotifierItem,
10};
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::StreamExt;
13use zbus::fdo::PropertiesProxy;
14use zbus::{Connection, ConnectionBuilder};
15
16pub(crate) mod notifier_address;
17
18/// Wrap the implementation of [org.freedesktop.StatusNotifierWatcher](https://www.freedesktop.org/wiki/Specifications/StatusNotifierItem/StatusNotifierWatcher/)
19/// and [org.freedesktop.StatusNotifierHost](https://www.freedesktop.org/wiki/Specifications/StatusNotifierItem/StatusNotifierHost/).
20pub struct StatusNotifierWatcher {
21    pub(crate) tx: broadcast::Sender<NotifierItemMessage>,
22}
23
24impl StatusNotifierWatcher {
25    /// Creates a new system stray and register a [StatusNotifierWatcher](https://www.freedesktop.org/wiki/Specifications/StatusNotifierItem/StatusNotifierWatcher/) and [StatusNotifierHost](https://www.freedesktop.org/wiki/Specifications/StatusNotifierItem/StatusNotifierHost/) on dbus.
26    /// Once created you can receive [`StatusNotifierItem`]. Once created you can start to poll message
27    /// using the [`Stream`] implementation.
28    pub async fn new(cmd_rx: mpsc::Receiver<NotifierItemCommand>) -> Result<StatusNotifierWatcher> {
29        let (tx, _) = broadcast::channel(5);
30
31        {
32            let tx = tx.clone();
33            tokio::spawn(async move {
34                tracing::info!("Starting notifier watcher");
35                start_notifier_watcher(tx)
36                    .await
37                    .expect("Unexpected StatusNotifierError ");
38            });
39        }
40
41        {
42            tokio::spawn(async move {
43                dispatch_ui_command(cmd_rx)
44                    .await
45                    .expect("Unexpected error while dispatching UI command");
46            });
47        }
48
49        Ok(StatusNotifierWatcher { tx })
50    }
51}
52
53// Forward UI command to the Dbus menu proxy
54async fn dispatch_ui_command(mut cmd_rx: mpsc::Receiver<NotifierItemCommand>) -> Result<()> {
55    let connection = Connection::session().await?;
56
57    while let Some(command) = cmd_rx.recv().await {
58        match command {
59            NotifierItemCommand::MenuItemClicked {
60                submenu_id: id,
61                menu_path,
62                notifier_address,
63            } => {
64                let dbus_menu_proxy = DBusMenuProxy::builder(&connection)
65                    .destination(notifier_address)
66                    .unwrap()
67                    .path(menu_path)
68                    .unwrap()
69                    .build()
70                    .await?;
71
72                dbus_menu_proxy
73                    .event(
74                        id,
75                        "clicked",
76                        &zbus::zvariant::Value::I32(32),
77                        chrono::offset::Local::now().timestamp_subsec_micros(),
78                    )
79                    .await?;
80            }
81        }
82    }
83
84    Ok(())
85}
86
87async fn start_notifier_watcher(sender: broadcast::Sender<NotifierItemMessage>) -> Result<()> {
88    let watcher = DbusNotifierWatcher::new(sender.clone());
89
90    ConnectionBuilder::session()?
91        .name("org.kde.StatusNotifierWatcher")?
92        .serve_at("/StatusNotifierWatcher", watcher)?
93        .build()
94        .await?;
95
96    let connection = Connection::session().await?;
97
98    let status_notifier_removed = {
99        let connection = connection.clone();
100        tokio::spawn(async move {
101            status_notifier_removed_handle(connection).await?;
102            Result::<()>::Ok(())
103        })
104    };
105
106    let status_notifier = {
107        let connection = connection.clone();
108        tokio::spawn(async move { status_notifier_handle(connection, sender).await.unwrap() })
109    };
110
111    let _ = tokio::join!(status_notifier, status_notifier_removed,);
112
113    Ok(())
114}
115
116// Listen for 'NameOwnerChanged' on DBus whenever a service is removed
117// send 'UnregisterStatusNotifierItem' request to 'StatusNotifierWatcher' via dbus
118async fn status_notifier_removed_handle(connection: Connection) -> Result<()> {
119    let dbus_proxy = zbus::fdo::DBusProxy::new(&connection).await.unwrap();
120
121    let mut changed = dbus_proxy
122        .receive_name_owner_changed()
123        .await
124        .expect("fail to receive Dbus NameOwnerChanged");
125
126    while let Some(signal) = changed.next().await {
127        let args = signal.args().expect("Failed to get signal args");
128        let old = args.old_owner();
129        let new = args.new_owner();
130
131        if old.is_some() && new.is_none() {
132            let old_owner: String = old.as_ref().unwrap().to_string();
133            let watcher_proxy = StatusNotifierWatcherProxy::new(&connection)
134                .await
135                .expect("Failed to open StatusNotifierWatcherProxy");
136
137            watcher_proxy
138                .unregister_status_notifier_item(&old_owner)
139                .await
140                .expect("failed to unregister status notifier");
141        }
142    }
143
144    Ok(())
145}
146
147// 1. Start StatusNotifierHost on DBus
148// 2. Query already registered StatusNotifier, call GetAll to update the UI  and  listen for property changes via Dbus.PropertiesChanged
149// 3. subscribe to StatusNotifierWatcher.RegisteredStatusNotifierItems
150// 4. Whenever a new notifier is registered repeat steps 2
151// FIXME : Move this to HOST
152async fn status_notifier_handle(
153    connection: Connection,
154    sender: broadcast::Sender<NotifierItemMessage>,
155) -> Result<()> {
156    let status_notifier_proxy = StatusNotifierWatcherProxy::new(&connection).await?;
157
158    let notifier_items: Vec<String> = status_notifier_proxy
159        .registered_status_notifier_items()
160        .await?;
161
162    tracing::info!("Got {} notifier items", notifier_items.len());
163
164    // Start watching for all registered notifier items
165    for service in notifier_items.iter() {
166        let service = NotifierAddress::from_notifier_service(service);
167        if let Ok(notifier_address) = service {
168            let connection = connection.clone();
169            let sender = sender.clone();
170            watch_notifier_props(notifier_address, connection, sender).await?;
171        }
172    }
173
174    // Listen for new notifier items
175    let mut new_notifier = status_notifier_proxy
176        .receive_status_notifier_item_registered()
177        .await?;
178
179    while let Some(notifier) = new_notifier.next().await {
180        let args = notifier.args()?;
181        let service: &str = args.service();
182        tracing::info!(
183            "StatusNotifierItemRegistered signal received service={}",
184            service
185        );
186
187        let service = NotifierAddress::from_notifier_service(service);
188        if let Ok(notifier_address) = service {
189            let connection = connection.clone();
190            let sender = sender.clone();
191            tokio::spawn(async move {
192                watch_notifier_props(notifier_address, connection, sender).await?;
193                Result::<()>::Ok(())
194            });
195        }
196    }
197
198    Ok(())
199}
200
201// Listen for PropertiesChanged on DBus and send an update request on change
202async fn watch_notifier_props(
203    address_parts: NotifierAddress,
204    connection: Connection,
205    sender: broadcast::Sender<NotifierItemMessage>,
206) -> Result<()> {
207    tokio::spawn(async move {
208        // Connect to DBus.Properties
209        let dbus_properties_proxy = zbus::fdo::PropertiesProxy::builder(&connection)
210            .destination(address_parts.destination.as_str())?
211            .path(address_parts.path.as_str())?
212            .build()
213            .await?;
214
215        // call Properties.GetAll once and send an update to the UI
216        fetch_properties_and_update(
217            sender.clone(),
218            &dbus_properties_proxy,
219            address_parts.destination.clone(),
220            connection.clone(),
221        )
222        .await?;
223
224        // Connect to the notifier proxy to watch for properties change
225        let notifier_item_proxy = StatusNotifierItemProxy::builder(&connection)
226            .destination(address_parts.destination.as_str())?
227            .path(address_parts.path.as_str())?
228            .build()
229            .await?;
230
231        let mut props_changed = notifier_item_proxy.receive_all_signals().await?;
232
233        // Whenever a property change query all props and update the UI
234        while props_changed.next().await.is_some() {
235            fetch_properties_and_update(
236                sender.clone(),
237                &dbus_properties_proxy,
238                address_parts.destination.clone(),
239                connection.clone(),
240            )
241            .await?;
242        }
243
244        Result::<()>::Ok(())
245    });
246
247    Ok(())
248}
249
250// Fetch Properties from DBus proxy and send an update to the UI channel
251async fn fetch_properties_and_update(
252    sender: broadcast::Sender<NotifierItemMessage>,
253    dbus_properties_proxy: &PropertiesProxy<'_>,
254    item_address: String,
255    connection: Connection,
256) -> Result<()> {
257    let interface = InterfaceName::from_static_str("org.kde.StatusNotifierItem")?;
258    let props = dbus_properties_proxy.get_all(interface).await?;
259    let item = StatusNotifierItem::try_from(props);
260
261    // Only send item that maps correctly to our internal StatusNotifierItem representation
262    if let Ok(item) = item {
263        let menu = match &item.menu {
264            None => None,
265            Some(menu_address) => watch_menu(
266                item_address.clone(),
267                item.clone(),
268                connection.clone(),
269                menu_address.clone(),
270                sender.clone(),
271            )
272            .await
273            .ok(),
274        };
275
276        tracing::info!("StatusNotifierItem updated, dbus-address={item_address}");
277
278        sender
279            .send(NotifierItemMessage::Update {
280                address: item_address.to_string(),
281                item: Box::new(item),
282                menu,
283            })
284            .expect("Failed to dispatch NotifierItemMessage");
285    }
286
287    Ok(())
288}
289
290async fn watch_menu(
291    item_address: String,
292    item: StatusNotifierItem,
293    connection: Connection,
294    menu_address: String,
295    sender: broadcast::Sender<NotifierItemMessage>,
296) -> Result<TrayMenu> {
297    let dbus_menu_proxy = DBusMenuProxy::builder(&connection)
298        .destination(item_address.as_str())?
299        .path(menu_address.as_str())?
300        .build()
301        .await?;
302
303    let menu: MenuLayout = dbus_menu_proxy.get_layout(0, 10, &[]).await.unwrap();
304
305    tokio::spawn(async move {
306        let dbus_menu_proxy = DBusMenuProxy::builder(&connection)
307            .destination(item_address.as_str())?
308            .path(menu_address.as_str())?
309            .build()
310            .await?;
311
312        let mut props_changed = dbus_menu_proxy.receive_all_signals().await?;
313
314        while props_changed.next().await.is_some() {
315            let menu: MenuLayout = dbus_menu_proxy.get_layout(0, 10, &[]).await.unwrap();
316            let menu = TrayMenu::try_from(menu).ok();
317            sender.send(NotifierItemMessage::Update {
318                address: item_address.to_string(),
319                item: Box::new(item.clone()),
320                menu,
321            })?;
322        }
323        anyhow::Result::<(), anyhow::Error>::Ok(())
324    });
325
326    TrayMenu::try_from(menu).map_err(Into::into)
327}