use crate::names;
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use tracing::{debug, error, info, warn};
use zbus::message::Header;
use zbus::object_server::Interface;
use zbus::object_server::SignalEmitter;
use zbus::{export::ordered_stream::OrderedStreamExt, interface, Connection};
#[derive(Debug, Default)]
pub struct StatusNotifierWatcher {
tasks: tokio::task::JoinSet<()>,
hosts: Arc<Mutex<HashSet<String>>>,
items: Arc<Mutex<HashSet<String>>>,
}
#[interface(name = "org.kde.StatusNotifierWatcher")]
impl StatusNotifierWatcher {
async fn register_status_notifier_host(
&mut self,
service: &str,
#[zbus(header)] hdr: Header<'_>,
#[zbus(connection)] con: &Connection,
#[zbus(signal_emitter)] signal_emitter: SignalEmitter<'_>,
) -> zbus::fdo::Result<()> {
let (service, _) = parse_service(service, hdr, con).await?;
info!("new host: {}", service);
let added_first = {
let mut hosts = self.hosts.lock().expect("mutex lock should succeed");
if !hosts.insert(service.to_string()) {
return Ok(());
}
hosts.len() == 1
};
if added_first {
self.is_status_notifier_host_registered_changed(&signal_emitter)
.await?;
}
StatusNotifierWatcher::status_notifier_host_registered(&signal_emitter).await?;
self.tasks.spawn({
let hosts = self.hosts.clone();
let signal_emitter = signal_emitter.to_owned();
let con = con.to_owned();
async move {
if let Err(e) = wait_for_service_exit(&con, service.as_ref().into()).await {
error!("failed to wait for service exit: {}", e);
}
info!("lost host: {}", service);
let removed_last = {
let mut hosts = hosts.lock().expect("mutex lock should succeed");
let did_remove = hosts.remove(service.as_str());
did_remove && hosts.is_empty()
};
if removed_last {
if let Err(e) =
StatusNotifierWatcher::is_status_notifier_host_registered_refresh(
&signal_emitter,
)
.await
{
error!("failed to signal Watcher: {}", e);
}
}
if let Err(e) =
StatusNotifierWatcher::status_notifier_host_unregistered(&signal_emitter).await
{
error!("failed to signal Watcher: {}", e);
}
}
});
Ok(())
}
#[zbus(signal)]
async fn status_notifier_host_registered(
signal_emitter: &SignalEmitter<'_>,
) -> zbus::Result<()>;
#[zbus(signal)]
async fn status_notifier_host_unregistered(
signal_emitter: &SignalEmitter<'_>,
) -> zbus::Result<()>;
#[zbus(property)]
fn is_status_notifier_host_registered(&self) -> bool {
let hosts = self.hosts.lock().expect("mutex lock should succeed");
!hosts.is_empty()
}
async fn register_status_notifier_item(
&mut self,
service: &str,
#[zbus(header)] hdr: Header<'_>,
#[zbus(connection)] con: &Connection,
#[zbus(signal_emitter)] signal_emitter: SignalEmitter<'_>,
) -> zbus::fdo::Result<()> {
let (service, objpath) = parse_service(service, hdr, con).await?;
let service = zbus::names::BusName::Unique(service);
let item = format!("{service}{objpath}");
{
let mut items = self.items.lock().expect("mutex lock should succeed");
if !items.insert(item.clone()) {
info!("new item: {} (duplicate)", item);
return Ok(());
}
}
info!("new item: {}", item);
self.registered_status_notifier_items_changed(&signal_emitter)
.await?;
StatusNotifierWatcher::status_notifier_item_registered(&signal_emitter, item.as_ref())
.await?;
self.tasks.spawn({
let items = self.items.clone();
let signal_emitter = signal_emitter.to_owned();
let con = con.to_owned();
async move {
if let Err(e) = wait_for_service_exit(&con, service.as_ref()).await {
error!("failed to wait for service exit: {}", e);
}
debug!("gone item: {}", &item);
{
let mut items = items.lock().expect("mutex lock should succeed");
items.remove(&item);
}
if let Err(e) =
StatusNotifierWatcher::registered_status_notifier_items_refresh(&signal_emitter)
.await
{
error!("failed to signal Watcher: {}", e);
}
if let Err(e) = StatusNotifierWatcher::status_notifier_item_unregistered(
&signal_emitter,
item.as_ref(),
)
.await
{
error!("failed to signal Watcher: {}", e);
}
}
});
Ok(())
}
async fn unregister_status_notifier_item(
&mut self,
service: &str,
#[zbus(header)] hdr: Header<'_>,
#[zbus(connection)] con: &Connection,
#[zbus(signal_emitter)] context: SignalEmitter<'_>,
) -> zbus::fdo::Result<()> {
debug!("received item unregister: {service}");
let (service, objpath) = parse_service(service, hdr, con).await?;
let service = zbus::names::BusName::Unique(service);
let item = format!("{service}{objpath}");
self.items
.lock()
.expect("mutex lock should succeed")
.remove(&item);
if let Err(err) = Self::status_notifier_item_unregistered(&context, &item).await {
error!("{err:?}");
}
Ok(())
}
#[zbus(signal)]
async fn status_notifier_item_registered(
signal_emitter: &SignalEmitter<'_>,
service: &str,
) -> zbus::Result<()>;
#[zbus(signal)]
async fn status_notifier_item_unregistered(
signal_emitter: &SignalEmitter<'_>,
service: &str,
) -> zbus::Result<()>;
#[zbus(property)]
fn registered_status_notifier_items(&self) -> Vec<String> {
let items = self.items.lock().expect("mutex lock should succeed");
items.iter().cloned().collect()
}
#[zbus(property)]
#[allow(clippy::unused_self)]
fn protocol_version(&self) -> i32 {
0
}
}
impl StatusNotifierWatcher {
pub fn new() -> Self {
Self::default()
}
pub async fn attach_to(self, con: &zbus::Connection) -> zbus::Result<()> {
if !con.object_server().at(names::WATCHER_OBJECT, self).await? {
return Err(zbus::Error::Failure(format!(
"Object already exists at {} on this connection -- is StatusNotifierWatcher already running?",
names::WATCHER_OBJECT
)));
}
let flags: [zbus::fdo::RequestNameFlags; 0] = [];
match con
.request_name_with_flags(names::WATCHER_BUS, flags.into_iter().collect())
.await
{
Ok(_) | Err(zbus::Error::NameTaken) => Ok(()), Err(e) => Err(e),
}
}
async fn is_status_notifier_host_registered_refresh(
signal_emitter: &SignalEmitter<'_>,
) -> zbus::Result<()> {
zbus::fdo::Properties::properties_changed(
signal_emitter,
Self::name(),
std::collections::HashMap::new(),
Cow::Borrowed(&["IsStatusNotifierHostRegistered"]),
)
.await
}
async fn registered_status_notifier_items_refresh(
signal_emitter: &SignalEmitter<'_>,
) -> zbus::Result<()> {
zbus::fdo::Properties::properties_changed(
signal_emitter,
Self::name(),
std::collections::HashMap::new(),
Cow::Borrowed(&["RegisteredStatusNotifierItems"]),
)
.await
}
}
async fn parse_service<'a>(
service: &'a str,
hdr: Header<'_>,
con: &Connection,
) -> zbus::fdo::Result<(zbus::names::UniqueName<'static>, &'a str)> {
if service.starts_with('/') {
if let Some(sender) = hdr.sender() {
Ok((sender.to_owned(), service))
} else {
warn!("unknown sender");
Err(zbus::fdo::Error::InvalidArgs("Unknown bus address".into()))
}
} else {
let busname: zbus::names::BusName = match service.try_into() {
Ok(x) => x,
Err(e) => {
warn!("received invalid bus name {:?}: {}", service, e);
return Err(zbus::fdo::Error::InvalidArgs(e.to_string()));
}
};
if let zbus::names::BusName::Unique(unique) = busname {
Ok((unique.to_owned(), names::ITEM_OBJECT))
} else {
let dbus = zbus::fdo::DBusProxy::new(con).await?;
match dbus.get_name_owner(busname).await {
Ok(owner) => Ok((owner.into_inner(), names::ITEM_OBJECT)),
Err(e) => {
warn!("failed to get owner of {:?}: {}", service, e);
Err(e)
}
}
}
}
}
async fn wait_for_service_exit(
con: &Connection,
service: zbus::names::BusName<'_>,
) -> zbus::fdo::Result<()> {
let dbus = zbus::fdo::DBusProxy::new(con).await?;
let mut owner_changes = dbus
.receive_name_owner_changed_with_args(&[(0, &service)])
.await?;
if !dbus.name_has_owner(service.as_ref()).await? {
return Ok(());
}
while let Some(sig) = owner_changes.next().await {
let args = sig.args()?;
if args.new_owner().is_none() {
break;
}
}
Ok(())
}