use crate::{
reactions::{EventPriority, ReactionAction},
types::SessionId,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
/// Data handed to a [`Notifier`] for a single notification.
///
/// NOTE(review): the field descriptions below are inferred from field names
/// and from the test fixture in this file; confirm them against the code
/// that actually builds payloads.
#[derive(Debug, Clone)]
pub struct NotificationPayload {
/// Session the notification refers to.
pub session_id: SessionId,
/// Key of the reaction involved (the test fixture uses `"ci-failed"`).
pub reaction_key: String,
/// Reaction action associated with this notification.
pub action: ReactionAction,
/// Priority of the event; [`NotifierRegistry::resolve`] takes an
/// `EventPriority` to choose notifiers.
pub priority: EventPriority,
/// Headline text.
pub title: String,
/// Message text.
pub body: String,
/// Presumably `true` when the notification is the result of an escalation
/// — TODO confirm with the producer of this flag.
pub escalated: bool,
}
/// Failure modes that [`Notifier::send`] can report.
///
/// The `Display` text of each variant is the string given in its
/// `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum NotifierError {
/// I/O failure; the string is the detail appended to the message.
#[error("notifier I/O failure: {0}")]
Io(String),
/// Configuration problem; the string is the detail appended to the message.
#[error("notifier configuration error: {0}")]
Config(String),
/// An external service reported an error. `status` is rendered before
/// `message` (presumably an HTTP-style status code — TODO confirm).
#[error("notifier external service error: {status}: {message}")]
Service { status: u16, message: String },
/// The notifier timed out; `elapsed_ms` is rendered as milliseconds.
#[error("notifier timed out after {elapsed_ms}ms")]
Timeout { elapsed_ms: u64 },
/// The notifier is unavailable; the string is the detail appended to the message.
#[error("notifier unavailable: {0}")]
Unavailable(String),
}
/// A notification delivery backend.
///
/// The `Send + Sync` bound lets implementations be stored and handed out as
/// `Arc<dyn Notifier>`, which is how [`NotifierRegistry`] holds them.
#[async_trait]
pub trait Notifier: Send + Sync {
/// Name this notifier reports for itself.
///
/// Note that [`NotifierRegistry`] looks plugins up by the name passed to
/// `register`, not by this value.
fn name(&self) -> &str;
/// Delivers `payload`, returning a [`NotifierError`] on failure.
async fn send(&self, payload: &NotificationPayload) -> Result<(), NotifierError>;
}
/// Routing table mapping an [`EventPriority`] to an ordered list of notifier
/// names.
///
/// `#[serde(transparent)]` makes this (de)serialize exactly like the inner
/// map, so a config document is a plain mapping such as `urgent: [stdout]`.
/// A priority that is absent from the map is distinct from a priority mapped
/// to an empty list.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct NotificationRouting(HashMap<EventPriority, Vec<String>>);
impl NotificationRouting {
pub fn names_for(&self, priority: EventPriority) -> Option<&[String]> {
self.0.get(&priority).map(Vec::as_slice)
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn from_map(map: HashMap<EventPriority, Vec<String>>) -> Self {
Self(map)
}
}
/// Holds registered [`Notifier`] plugins and resolves which of them should
/// receive a notification of a given priority.
pub struct NotifierRegistry {
// Plugins keyed by the name they were registered under.
plugins: HashMap<String, Arc<dyn Notifier>>,
// Priority -> notifier-name lists consulted by `resolve`.
routing: NotificationRouting,
// Keys of warnings already emitted, so each routing problem is logged only
// once. Behind a `Mutex` because `resolve` takes `&self`.
warned: Mutex<HashSet<String>>,
}
impl NotifierRegistry {
    /// Creates a registry with no plugins that resolves priorities through
    /// `routing`.
    pub fn new(routing: NotificationRouting) -> Self {
        Self {
            plugins: HashMap::new(),
            routing,
            warned: Mutex::new(HashSet::new()),
        }
    }

    /// Registers `plugin` under `name`.
    ///
    /// The lookup key is `name`, not `plugin.name()`. Registering a second
    /// plugin under the same name replaces the first.
    pub fn register(&mut self, name: impl Into<String>, plugin: Arc<dyn Notifier>) {
        self.plugins.insert(name.into(), plugin);
    }

    /// Returns a shared handle to the plugin registered under `name`, if any.
    pub fn get(&self, name: &str) -> Option<Arc<dyn Notifier>> {
        self.plugins.get(name).cloned()
    }

    /// Number of registered plugins.
    pub fn len(&self) -> usize {
        self.plugins.len()
    }

    /// `true` when no plugin is registered.
    pub fn is_empty(&self) -> bool {
        self.plugins.is_empty()
    }

    /// Resolves the notifiers that should receive a notification of
    /// `priority`.
    ///
    /// Returns `(name, plugin)` pairs in the order the names appear in the
    /// routing entry. The result is empty when the priority has no routing
    /// entry or an empty one. Names that are routed but not registered are
    /// skipped. Each of these conditions is logged via `tracing::warn!` at
    /// most once per registry (deduplicated per priority, or per
    /// priority/name pair for unregistered notifiers).
    pub fn resolve(&self, priority: EventPriority) -> Vec<(String, Arc<dyn Notifier>)> {
        let names = match self.routing.names_for(priority) {
            Some(names) if !names.is_empty() => names,
            Some(_) => {
                self.warn_once(format!("priority.{}", priority.as_str()), || {
                    tracing::warn!(
                        priority = priority.as_str(),
                        "notification-routing has an empty list for priority; notification dropped"
                    );
                });
                return Vec::new();
            }
            None => {
                self.warn_once(format!("priority.{}", priority.as_str()), || {
                    tracing::warn!(
                        priority = priority.as_str(),
                        "notification-routing has no entry for priority; notification dropped"
                    );
                });
                return Vec::new();
            }
        };

        let mut resolved = Vec::with_capacity(names.len());
        for name in names {
            match self.plugins.get(name) {
                Some(plugin) => resolved.push((name.clone(), Arc::clone(plugin))),
                None => {
                    // The closure runs synchronously inside `warn_once`, so it
                    // can borrow `name`; no owned copy is needed.
                    self.warn_once(format!("{}.{}", priority.as_str(), name), || {
                        tracing::warn!(
                            priority = priority.as_str(),
                            notifier = name.as_str(),
                            "notification-routing references unregistered notifier; skipping"
                        );
                    });
                }
            }
        }
        resolved
    }

    /// Runs `emit` only the first time `key` is seen by this registry.
    fn warn_once<F: FnOnce()>(&self, key: String, emit: F) {
        // The guard is a temporary that is dropped at the end of this
        // statement, so the lock is not held while `emit` runs.
        let first_time = self.warned_keys().insert(key);
        if first_time {
            emit();
        }
    }

    /// Locks the set of already-warned keys.
    ///
    /// A poisoned mutex is logged and recovered from rather than propagated
    /// as a panic.
    fn warned_keys(&self) -> std::sync::MutexGuard<'_, HashSet<String>> {
        self.warned.lock().unwrap_or_else(|e| {
            tracing::error!(
                "notifier registry warned mutex poisoned; recovering inner state: {e}"
            );
            e.into_inner()
        })
    }

    /// Number of distinct warning keys recorded so far (test-only probe).
    #[cfg(test)]
    pub(crate) fn warned_count(&self) -> usize {
        self.warned_keys().len()
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::sync::Mutex as StdMutex;

    /// In-memory [`Notifier`] that records every payload it is sent.
    pub(crate) struct TestNotifier {
        name: String,
        received: Arc<StdMutex<Vec<NotificationPayload>>>,
    }

    impl TestNotifier {
        /// Builds a notifier plus a shared handle to the payloads it records.
        pub(crate) fn new(
            name: impl Into<String>,
        ) -> (Self, Arc<StdMutex<Vec<NotificationPayload>>>) {
            let log = Arc::new(StdMutex::new(Vec::new()));
            let notifier = Self {
                name: name.into(),
                received: Arc::clone(&log),
            };
            (notifier, log)
        }
    }

    #[async_trait]
    impl Notifier for TestNotifier {
        fn name(&self) -> &str {
            self.name.as_str()
        }

        async fn send(&self, payload: &NotificationPayload) -> Result<(), NotifierError> {
            let mut log = self.received.lock().unwrap_or_else(|e| {
                tracing::error!("test notifier mutex poisoned; recovering inner state: {e}");
                e.into_inner()
            });
            log.push(payload.clone());
            Ok(())
        }
    }

    /// Payload fixture with the given priority and fixed other fields.
    fn fake_payload(priority: EventPriority) -> NotificationPayload {
        NotificationPayload {
            session_id: SessionId("sess-test".into()),
            reaction_key: "ci-failed".into(),
            action: ReactionAction::Notify,
            priority,
            title: "CI broke on sess-test".into(),
            body: "tests failed on main".into(),
            escalated: false,
        }
    }

    #[test]
    fn routing_default_is_empty() {
        let routing = NotificationRouting::default();
        assert_eq!(routing.len(), 0);
        assert!(routing.is_empty());
        assert!(routing.names_for(EventPriority::Urgent).is_none());
    }

    #[test]
    fn routing_yaml_round_trip() {
        let yaml = r#"
urgent: [stdout, ntfy]
action: [stdout, ntfy]
warning: [stdout]
info: [stdout]
"#;
        let parsed: NotificationRouting = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(parsed.len(), 4);

        let urgent = parsed.names_for(EventPriority::Urgent).unwrap();
        assert_eq!(urgent, &["stdout".to_string(), "ntfy".to_string()]);
        let info = parsed.names_for(EventPriority::Info).unwrap();
        assert_eq!(info, &["stdout".to_string()]);

        // Serialize and parse again: the table must survive unchanged.
        let serialized = serde_yaml::to_string(&parsed).unwrap();
        let reparsed: NotificationRouting = serde_yaml::from_str(&serialized).unwrap();
        assert_eq!(parsed, reparsed);
    }

    #[test]
    fn routing_rejects_unknown_priority_key() {
        let result = serde_yaml::from_str::<NotificationRouting>("critical: [stdout]\n");
        assert!(
            result.is_err(),
            "expected parse error for unknown priority, got {result:?}"
        );
    }

    #[test]
    fn routing_preserves_empty_list_distinct_from_missing() {
        let parsed: NotificationRouting = serde_yaml::from_str("warning: []\n").unwrap();
        assert!(parsed.names_for(EventPriority::Urgent).is_none());
        assert_eq!(parsed.names_for(EventPriority::Warning), Some(&[][..]));
    }

    #[test]
    fn registry_new_is_empty() {
        let registry = NotifierRegistry::new(NotificationRouting::default());
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
        assert!(registry.get("stdout").is_none());
    }

    #[test]
    fn registry_register_and_get_round_trip() {
        let mut registry = NotifierRegistry::new(NotificationRouting::default());
        let (notifier, _received) = TestNotifier::new("stdout");
        registry.register("stdout", Arc::new(notifier));
        assert_eq!(registry.len(), 1);

        let fetched = registry.get("stdout").expect("plugin should be registered");
        assert_eq!(fetched.name(), "stdout");
    }

    #[test]
    fn registry_register_overwrites_existing() {
        let mut registry = NotifierRegistry::new(NotificationRouting::default());
        for plugin_name in ["first", "second"] {
            let (notifier, _) = TestNotifier::new(plugin_name);
            registry.register("slot", Arc::new(notifier));
        }
        assert_eq!(registry.len(), 1);
        assert_eq!(registry.get("slot").unwrap().name(), "second");
    }

    #[test]
    fn resolve_empty_routing_returns_empty_and_warns_once() {
        let registry = NotifierRegistry::new(NotificationRouting::default());

        assert!(registry.resolve(EventPriority::Urgent).is_empty());
        assert_eq!(registry.warned_count(), 1);

        assert!(registry.resolve(EventPriority::Urgent).is_empty());
        assert_eq!(registry.warned_count(), 1, "same-priority miss must dedup");

        assert!(registry.resolve(EventPriority::Warning).is_empty());
        assert_eq!(registry.warned_count(), 2);
    }

    #[test]
    fn resolve_returns_only_registered_names() {
        let routing = NotificationRouting::from_map(HashMap::from([(
            EventPriority::Urgent,
            vec!["stdout".to_string(), "ntfy".to_string()],
        )]));
        let mut registry = NotifierRegistry::new(routing);
        let (notifier, _received) = TestNotifier::new("stdout");
        registry.register("stdout", Arc::new(notifier));

        let first = registry.resolve(EventPriority::Urgent);
        assert_eq!(first.len(), 1, "should return only the registered one");
        assert_eq!(first[0].0, "stdout");
        assert_eq!(registry.warned_count(), 1, "one warn for missing 'ntfy'");

        let second = registry.resolve(EventPriority::Urgent);
        assert_eq!(second.len(), 1);
        assert_eq!(registry.warned_count(), 1);
    }

    #[test]
    fn resolve_distinct_missing_names_are_warned_separately() {
        let routing = NotificationRouting::from_map(HashMap::from([
            (EventPriority::Urgent, vec!["missing-a".to_string()]),
            (EventPriority::Warning, vec!["missing-b".to_string()]),
        ]));
        let registry = NotifierRegistry::new(routing);

        assert!(registry.resolve(EventPriority::Urgent).is_empty());
        assert!(registry.resolve(EventPriority::Warning).is_empty());
        assert_eq!(registry.warned_count(), 2);
    }

    #[test]
    fn resolve_empty_list_warns_once() {
        let routing = NotificationRouting::from_map(HashMap::from([(
            EventPriority::Warning,
            Vec::<String>::new(),
        )]));
        let registry = NotifierRegistry::new(routing);

        for _ in 0..2 {
            assert!(registry.resolve(EventPriority::Warning).is_empty());
            assert_eq!(registry.warned_count(), 1);
        }
    }

    #[test]
    fn resolve_returns_plugins_in_routing_order() {
        let routing = NotificationRouting::from_map(HashMap::from([(
            EventPriority::Info,
            vec!["a".to_string(), "b".to_string(), "c".to_string()],
        )]));
        let mut registry = NotifierRegistry::new(routing);
        for plugin_name in ["a", "b", "c"] {
            let (notifier, _) = TestNotifier::new(plugin_name);
            registry.register(plugin_name, Arc::new(notifier));
        }

        let resolved = registry.resolve(EventPriority::Info);
        let names: Vec<&str> = resolved.iter().map(|(name, _)| name.as_str()).collect();
        assert_eq!(names, vec!["a", "b", "c"]);
    }

    #[tokio::test]
    async fn test_notifier_records_payload() {
        let (notifier, received) = TestNotifier::new("test");
        let payload = fake_payload(EventPriority::Urgent);
        notifier.send(&payload).await.unwrap();

        let recorded = received.lock().unwrap();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].reaction_key, "ci-failed");
        assert_eq!(recorded[0].priority, EventPriority::Urgent);
    }
}