use std::{ops::Deref, time::Duration};
use async_channel::Sender;
use cache::ConnectionCache;
use futures::{Stream, TryStreamExt};
use itertools::Itertools;
use serde::{Deserialize, Deserializer};
use waybar_cffi::gtk::glib::{self};
use zbus::{
Connection, MatchRule, Message, MessageStream,
fdo::MonitoringProxy,
names::{InterfaceName, MemberName},
zvariant::{DeserializeDict, Optional, Type},
};
mod cache;
pub fn stream() -> impl Stream<Item = EnrichedNotification> {
let (tx, rx) = async_channel::unbounded();
glib::spawn_future_local(async move {
match monitor_dbus(tx).await {
Ok(()) => tracing::info!("no longer monitoring D-Bus"),
Err(e) => tracing::error!(%e, "D-Bus error"),
}
});
async_stream::stream! {
while let Ok(notification) = rx.recv().await {
yield notification;
}
}
}
/// A [`Notification`] paired with the process ID resolved for its D-Bus sender.
///
/// Values are constructed in `process_message` and handed to consumers via
/// [`stream`].
#[derive(Debug, Clone)]
pub struct EnrichedNotification {
/// The deserialised body of the `Notify` method call.
notification: Notification,
/// PID returned by the connection cache for the message's sender; `None` when
/// the message carried no sender or the cache lookup produced nothing.
pid: Option<u32>,
}
impl EnrichedNotification {
    /// Borrows the underlying notification payload.
    pub fn notification(&self) -> &Notification {
        let Self { notification, .. } = self;
        notification
    }

    /// Returns the best-known PID of the notifying process.
    ///
    /// The PID resolved for the D-Bus sender takes precedence (widened to
    /// `i64`); when there is none, the `sender_pid` hint of the notification
    /// is returned instead, which may itself be `None`.
    pub fn pid(&self) -> Option<i64> {
        self.pid
            .map(i64::from)
            .or(self.notification.hints.sender_pid)
    }
}
/// The arguments of an `org.freedesktop.Notifications.Notify` method call.
///
/// Instances are deserialised from the message body in `process_message`.
/// The fields are decoded positionally, so their declaration order must match
/// the argument order on the wire — do not reorder them.
///
/// NOTE(review): the per-field meanings below follow the Desktop Notifications
/// specification's description of `Notify`; confirm against the spec version
/// being targeted. Fields wrapped in `Optional` become `None` when the wire
/// value equals zvariant's "none" sentinel for that type (presumably the empty
/// string / zero — confirm against zvariant's `NoneValue`).
// NOTE(review): `dead_code` is presumably allowed because not every field is
// read by this crate — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Type)]
pub struct Notification {
/// Name of the application sending the notification.
pub app_name: Optional<String>,
/// ID of an existing notification that this one replaces.
pub replaces_id: Optional<u32>,
/// Icon of the sending application.
pub app_icon: Optional<String>,
/// Summary text.
pub summary: String,
/// Body text.
pub body: Optional<String>,
/// Actions offered by the notification, grouped into pairs (see [`Actions`]).
pub actions: Actions,
/// The hints dictionary, reduced to the keys declared on [`Hints`].
pub hints: Hints,
/// Expiration timeout — presumably milliseconds, with -1 meaning "server
/// default" and 0 meaning "never expires"; confirm against the spec.
pub expire_timeout: i32,
}
/// The `actions` argument of `Notify`, exposed as a list of [`Action`] values.
///
/// The `zvariant` attribute pins the D-Bus signature to `as` (a flat array of
/// strings); the hand-written `Deserialize` impl for this type groups that
/// flat array into pairs. The list is reachable through `Deref`.
// NOTE(review): `dead_code` is presumably allowed because the wrapped list is
// not read everywhere this type is built — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone, Type)]
#[zvariant(signature = "as")]
pub struct Actions(Vec<Action>);
impl Deref for Actions {
type Target = Vec<Action>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A single notification action, built from two consecutive strings of the
/// flat `actions` array (see the `Deserialize` impl for [`Actions`]).
// NOTE(review): `dead_code` is presumably allowed because the fields are not
// read by this crate yet — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct Action {
/// First string of the pair — the action's identifier.
pub id: String,
/// Second string of the pair — presumably the localised, user-visible label.
pub localised: String,
}
impl<'de> Deserialize<'de> for Actions {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Ok(Self(
Vec::<String>::deserialize(deserializer)?
.into_iter()
.tuples::<(_, _)>()
.map(|(id, localised)| Action { id, localised })
.collect(),
))
}
}
/// The entries of the `hints` dictionary (D-Bus signature `a{sv}`) that this
/// module decodes.
///
/// Because of `rename_all = "kebab-case"`, the fields are looked up under the
/// keys `desktop-entry` and `sender-pid`. Both are optional.
///
/// NOTE(review): notifications usually carry further hints; presumably
/// `DeserializeDict` ignores keys that have no matching field — confirm.
// NOTE(review): `dead_code` is presumably allowed because `desktop_entry` is
// not read by this crate — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone, DeserializeDict, Type)]
#[zvariant(rename_all = "kebab-case", signature = "a{sv}")]
pub struct Hints {
/// Value of the `desktop-entry` hint, if present.
pub desktop_entry: Option<String>,
/// Value of the `sender-pid` hint, if present. Used by
/// `EnrichedNotification::pid` as the fallback when no PID was resolved for
/// the D-Bus sender.
pub sender_pid: Option<i64>,
}
/// D-Bus interface whose method calls are monitored.
const INTERFACE: &str = "org.freedesktop.Notifications";
/// Member (method) name on [`INTERFACE`] that carries a notification.
const METHOD: &str = "Notify";
#[tracing::instrument(level = "TRACE", skip_all, err)]
async fn monitor_dbus(tx: Sender<EnrichedNotification>) -> anyhow::Result<()> {
let cache = cache::ConnectionCache::new(Duration::from_secs(86400));
let conn = Connection::session().await?;
let proxy = MonitoringProxy::new(&conn).await?;
proxy
.become_monitor(
&[MatchRule::builder()
.interface(INTERFACE)?
.member(METHOD)?
.build()],
0,
)
.await?;
let mut stream = MessageStream::from(conn);
while let Some(msg) = stream.try_next().await? {
if let Err(e) = process_message(&tx, &cache, &msg).await {
tracing::error!(%e, ?msg, "error processing notification message");
}
}
Ok(())
}
/// Forwards a single `Notify` method call to `tx`, enriched with the PID the
/// connection cache returns for the message's sender.
///
/// Messages whose header interface and member are not [`INTERFACE`] and
/// [`METHOD`] are ignored and yield `Ok(())`.
///
/// # Errors
///
/// Returns an error if the interface/member names fail validation, the message
/// body cannot be deserialised into a [`Notification`], or sending on `tx`
/// fails.
async fn process_message(
    tx: &Sender<EnrichedNotification>,
    cache: &ConnectionCache,
    msg: &Message,
) -> anyhow::Result<()> {
    let header = msg.header();

    let is_notify = header.interface() == Some(&InterfaceName::from_static_str(INTERFACE)?)
        && header.member() == Some(&MemberName::from_static_str(METHOD)?);
    if !is_notify {
        return Ok(());
    }

    // A message without a sender simply gets no cached PID.
    let pid = match header.sender() {
        Some(sender) => cache.get(sender).await,
        None => None,
    };

    let enriched = EnrichedNotification {
        notification: msg.body().deserialize()?,
        pid,
    };
    tx.send(enriched).await?;

    Ok(())
}