1use std::collections::HashMap;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32
33use async_trait::async_trait;
34use irontide_core::Id20;
35use parking_lot::Mutex;
36use thiserror::Error;
37use tokio::sync::{broadcast, oneshot};
38use tracing::{debug, warn};
39
40use crate::alert::{Alert, AlertKind};
41use crate::settings::Settings;
42
43#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct NotificationRecord {
48 pub summary: String,
50 pub body: String,
53}
54
55#[derive(Debug, Error)]
59pub enum NotificationError {
60 #[error("notify-rust failed: {0}")]
64 Backend(String),
65 #[error("spawn_blocking join error: {0}")]
68 JoinError(String),
69 #[error("injected test failure: {0}")]
72 Test(String),
73}
74
75#[async_trait]
83pub trait NotificationSink: Send + Sync + 'static {
84 async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError>;
88}
89
90#[derive(Debug, Default, Clone, Copy)]
93pub struct LibNotifySink;
94
95impl LibNotifySink {
96 #[must_use]
99 pub fn new() -> Self {
100 Self
101 }
102}
103
104#[async_trait]
105impl NotificationSink for LibNotifySink {
106 async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError> {
107 let summary = summary.to_string();
108 let body = body.to_string();
109 tokio::task::spawn_blocking(move || {
110 notify_rust::Notification::new()
111 .summary(&summary)
112 .body(&body)
113 .appname("irontide")
114 .show()
115 .map(|_handle| ())
116 .map_err(|e| NotificationError::Backend(e.to_string()))
117 })
118 .await
119 .map_err(|e| NotificationError::JoinError(e.to_string()))?
120 }
121}
122
123#[derive(Debug, Clone, Default)]
127pub struct InMemorySink {
128 pub records: Arc<Mutex<Vec<NotificationRecord>>>,
131 fail_with: Option<String>,
134}
135
136impl InMemorySink {
137 #[must_use]
139 pub fn new() -> Self {
140 Self::default()
141 }
142
143 #[must_use]
146 pub fn with_failure(message: impl Into<String>) -> Self {
147 Self {
148 records: Arc::new(Mutex::new(Vec::new())),
149 fail_with: Some(message.into()),
150 }
151 }
152
153 #[must_use]
156 pub fn snapshot(&self) -> Vec<NotificationRecord> {
157 self.records.lock().clone()
158 }
159}
160
161#[async_trait]
162impl NotificationSink for InMemorySink {
163 async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError> {
164 if let Some(ref msg) = self.fail_with {
165 return Err(NotificationError::Test(msg.clone()));
166 }
167 self.records.lock().push(NotificationRecord {
168 summary: summary.to_string(),
169 body: body.to_string(),
170 });
171 Ok(())
172 }
173}
174
175#[must_use]
188pub fn sanitize_notification_text(s: &str) -> String {
189 let mut out = String::with_capacity(s.len());
190 for c in s.chars() {
191 match c {
192 '<' => out.push_str("<"),
193 '>' => out.push_str(">"),
194 '&' => out.push_str("&"),
195 '"' => out.push_str("""),
196 '\'' => out.push_str("'"),
197 _ => out.push(c),
198 }
199 }
200 out
201}
202
203fn dispatch_one(
209 settings: &Settings,
210 kind: &AlertKind,
211 name_cache: &HashMap<Id20, String>,
212) -> Option<(&'static str, String)> {
213 match kind {
214 AlertKind::TorrentFinished { info_hash } if settings.notify_on_complete => {
215 let raw = name_cache
216 .get(info_hash)
217 .cloned()
218 .unwrap_or_else(|| info_hash_short_hex(*info_hash));
219 let name = sanitize_notification_text(&raw);
220 Some(("IronTide", format!("{name} download complete")))
221 }
222 AlertKind::TorrentError { info_hash, message } if settings.notify_on_error => {
223 let raw = name_cache
224 .get(info_hash)
225 .cloned()
226 .unwrap_or_else(|| info_hash_short_hex(*info_hash));
227 let name = sanitize_notification_text(&raw);
228 let message = sanitize_notification_text(message);
229 Some(("IronTide", format!("{name}: {message}")))
230 }
231 _ => None,
232 }
233}
234
235fn info_hash_short_hex(hash: Id20) -> String {
238 let hex = hash.to_hex();
239 hex.chars().take(8).collect()
242}
243
244pub struct DispatcherOptions {
248 pub sink: Box<dyn NotificationSink>,
251 pub settings_rx: tokio::sync::watch::Receiver<Settings>,
257 pub alerts_rx: broadcast::Receiver<Alert>,
262 pub shutdown_rx: oneshot::Receiver<()>,
265}
266
267#[must_use]
271pub fn spawn_notification_dispatcher(opts: DispatcherOptions) -> tokio::task::JoinHandle<()> {
272 let DispatcherOptions {
273 sink,
274 settings_rx,
275 mut alerts_rx,
276 mut shutdown_rx,
277 } = opts;
278 tokio::spawn(async move {
279 let mut name_cache: HashMap<Id20, String> = HashMap::new();
280 let dbus_failure_logged = AtomicBool::new(false);
281 loop {
282 tokio::select! {
283 _ = &mut shutdown_rx => {
284 debug!("notification dispatcher: shutdown signal received");
285 break;
286 }
287 event = alerts_rx.recv() => {
288 let alert = match event {
289 Ok(alert) => alert,
290 Err(broadcast::error::RecvError::Lagged(n)) => {
291 warn!(lagged = n, "notification dispatcher: alert stream lagged");
298 continue;
299 }
300 Err(broadcast::error::RecvError::Closed) => {
301 debug!("notification dispatcher: alert stream closed");
302 break;
303 }
304 };
305 match &alert.kind {
310 AlertKind::TorrentAdded { info_hash, name } => {
311 name_cache.insert(*info_hash, name.clone());
312 continue;
313 }
314 AlertKind::TorrentRemoved { info_hash } => {
315 name_cache.remove(info_hash);
316 continue;
317 }
318 _ => {}
319 }
320 let settings = settings_rx.borrow().clone();
321 let Some((summary, body)) = dispatch_one(&settings, &alert.kind, &name_cache)
322 else {
323 continue;
324 };
325 if let Err(e) = sink.show(summary, &body).await
326 && !dbus_failure_logged.swap(true, Ordering::Relaxed)
327 {
328 warn!(
329 error = %e,
330 "notification dispatcher: sink failed; degrading silently for the rest of the session"
331 );
332 }
333 }
334 }
335 }
336 })
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342 use crate::alert::Alert;
343 use std::time::Duration;
344
345 fn fake_hash(byte: u8) -> Id20 {
346 Id20([byte; 20])
347 }
348
349 #[test]
350 fn sanitizer_escapes_all_five_html_metacharacters() {
351 let input = r#"<b>"hi" & 'bye'</b>"#;
352 let out = sanitize_notification_text(input);
353 assert_eq!(out, "<b>"hi" & 'bye'</b>");
354 }
355
356 #[test]
360 fn sanitizer_does_not_double_escape_existing_entities() {
361 assert_eq!(sanitize_notification_text("AT&T"), "AT&T");
363 assert_eq!(
365 sanitize_notification_text("<AT&T>"),
366 "<AT&T>"
367 );
368 }
369
370 #[test]
371 fn sanitizer_passes_through_non_ascii_unchanged() {
372 let input = "Façade — 管理 — 🦀";
374 assert_eq!(sanitize_notification_text(input), input);
375 }
376
377 #[test]
378 fn dispatch_one_skips_when_notify_on_complete_false() {
379 let s = Settings {
380 notify_on_complete: false,
381 ..Default::default()
382 };
383 let hash = fake_hash(0xAA);
384 let cache = HashMap::from([(hash, "test-torrent".to_string())]);
385 let result = dispatch_one(
386 &s,
387 &AlertKind::TorrentFinished { info_hash: hash },
388 &cache,
389 );
390 assert!(result.is_none(), "gate false must skip dispatch");
391 }
392
393 #[test]
394 fn dispatch_one_emits_when_notify_on_complete_true() {
395 let s = Settings {
396 notify_on_complete: true,
397 ..Default::default()
398 };
399 let hash = fake_hash(0xBB);
400 let cache = HashMap::from([(hash, "my-movie".to_string())]);
401 let (summary, body) = dispatch_one(
402 &s,
403 &AlertKind::TorrentFinished { info_hash: hash },
404 &cache,
405 )
406 .expect("gate true + finished must dispatch");
407 assert_eq!(summary, "IronTide");
408 assert_eq!(body, "my-movie download complete");
409 }
410
411 #[test]
412 fn dispatch_one_emits_error_body_with_sanitised_message() {
413 let s = Settings {
414 notify_on_error: true,
415 ..Default::default()
416 };
417 let hash = fake_hash(0xCC);
418 let cache = HashMap::from([(hash, "<evil>".to_string())]);
419 let (summary, body) = dispatch_one(
420 &s,
421 &AlertKind::TorrentError {
422 info_hash: hash,
423 message: "disk full <again>".to_string(),
424 },
425 &cache,
426 )
427 .expect("gate true + error must dispatch");
428 assert_eq!(summary, "IronTide");
429 assert_eq!(body, "<evil>: disk full <again>");
430 }
431
432 #[test]
433 fn dispatch_one_falls_back_to_hex_prefix_when_cache_miss() {
434 let s = Settings {
435 notify_on_complete: true,
436 ..Default::default()
437 };
438 let hash = fake_hash(0xDE);
439 let cache = HashMap::new();
440 let (_, body) =
441 dispatch_one(&s, &AlertKind::TorrentFinished { info_hash: hash }, &cache).unwrap();
442 assert!(
444 body.starts_with("dededede"),
445 "cache miss must fall back to hex prefix, got: {body}"
446 );
447 }
448
449 #[tokio::test]
450 async fn in_memory_sink_records_and_can_inject_failure() {
451 let sink = InMemorySink::new();
452 sink.show("title", "body").await.unwrap();
453 let snap = sink.snapshot();
454 assert_eq!(snap.len(), 1);
455 assert_eq!(snap[0].summary, "title");
456
457 let failing = InMemorySink::with_failure("boom");
458 let err = failing.show("t", "b").await.unwrap_err();
459 assert!(matches!(err, NotificationError::Test(_)));
460 }
461
462 #[tokio::test]
466 async fn dispatcher_emits_completion_notification_with_cached_name() {
467 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
468 let (shutdown_tx, shutdown_rx) = oneshot::channel();
469 let settings = Settings {
470 notify_on_complete: true,
471 ..Default::default()
472 };
473 let (settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
474 let _ = &settings_tx;
476
477 let sink = InMemorySink::new();
478 let join = spawn_notification_dispatcher(DispatcherOptions {
479 sink: Box::new(sink.clone()),
480 settings_rx,
481 alerts_rx: alert_rx,
482 shutdown_rx,
483 });
484
485 let hash = fake_hash(0xEE);
486 alert_tx
487 .send(Alert::new(AlertKind::TorrentAdded {
488 info_hash: hash,
489 name: "demo-torrent".to_string(),
490 }))
491 .unwrap();
492 alert_tx
493 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
494 .unwrap();
495
496 tokio::time::sleep(Duration::from_millis(50)).await;
498 let _ = shutdown_tx.send(());
499 join.await.unwrap();
500
501 let records = sink.snapshot();
502 assert_eq!(records.len(), 1, "exactly one notification expected");
503 assert_eq!(records[0].summary, "IronTide");
504 assert_eq!(records[0].body, "demo-torrent download complete");
505 }
506
507 #[tokio::test]
509 async fn dispatcher_handles_sink_failure_without_crashing() {
510 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
511 let (shutdown_tx, shutdown_rx) = oneshot::channel();
512 let settings = Settings {
513 notify_on_complete: true,
514 ..Default::default()
515 };
516 let (_settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
517
518 let sink = InMemorySink::with_failure("no dbus session");
519 let join = spawn_notification_dispatcher(DispatcherOptions {
520 sink: Box::new(sink),
521 settings_rx,
522 alerts_rx: alert_rx,
523 shutdown_rx,
524 });
525
526 let hash = fake_hash(0xFF);
527 alert_tx
528 .send(Alert::new(AlertKind::TorrentAdded {
529 info_hash: hash,
530 name: "fail-test".to_string(),
531 }))
532 .unwrap();
533 alert_tx
534 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
535 .unwrap();
536 alert_tx
537 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
538 .unwrap();
539
540 tokio::time::sleep(Duration::from_millis(50)).await;
541 let _ = shutdown_tx.send(());
542 join.await.expect("dispatcher must not panic on sink failure");
545 }
546
547 #[tokio::test]
550 async fn dispatcher_respects_live_settings_toggle() {
551 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
552 let (shutdown_tx, shutdown_rx) = oneshot::channel();
553 let settings = Settings {
554 notify_on_complete: false,
555 ..Default::default()
556 };
557 let (settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
558
559 let sink = InMemorySink::new();
560 let join = spawn_notification_dispatcher(DispatcherOptions {
561 sink: Box::new(sink.clone()),
562 settings_rx,
563 alerts_rx: alert_rx,
564 shutdown_rx,
565 });
566
567 let hash_a = fake_hash(0xA1);
568 alert_tx
569 .send(Alert::new(AlertKind::TorrentAdded {
570 info_hash: hash_a,
571 name: "first".to_string(),
572 }))
573 .unwrap();
574 alert_tx
575 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash_a }))
576 .unwrap();
577 tokio::time::sleep(Duration::from_millis(30)).await;
578 settings_tx.send_modify(|s| s.notify_on_complete = true);
580
581 let hash_b = fake_hash(0xB2);
582 alert_tx
583 .send(Alert::new(AlertKind::TorrentAdded {
584 info_hash: hash_b,
585 name: "second".to_string(),
586 }))
587 .unwrap();
588 alert_tx
589 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash_b }))
590 .unwrap();
591 tokio::time::sleep(Duration::from_millis(50)).await;
592 let _ = shutdown_tx.send(());
593 join.await.unwrap();
594
595 let records = sink.snapshot();
596 assert_eq!(
597 records.len(),
598 1,
599 "only the second TorrentFinished must emit"
600 );
601 assert_eq!(records[0].body, "second download complete");
602 }
603
604 #[tokio::test]
609 async fn dispatcher_evicts_name_cache_on_torrent_removed() {
610 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
611 let (shutdown_tx, shutdown_rx) = oneshot::channel();
612 let settings = Settings {
613 notify_on_complete: true,
614 ..Default::default()
615 };
616 let (_settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
617
618 let sink = InMemorySink::new();
619 let join = spawn_notification_dispatcher(DispatcherOptions {
620 sink: Box::new(sink.clone()),
621 settings_rx,
622 alerts_rx: alert_rx,
623 shutdown_rx,
624 });
625
626 let hash = fake_hash(0xCA);
627 alert_tx
628 .send(Alert::new(AlertKind::TorrentAdded {
629 info_hash: hash,
630 name: "cache-test".to_string(),
631 }))
632 .unwrap();
633 alert_tx
634 .send(Alert::new(AlertKind::TorrentRemoved { info_hash: hash }))
635 .unwrap();
636 alert_tx
638 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
639 .unwrap();
640
641 tokio::time::sleep(Duration::from_millis(50)).await;
642 let _ = shutdown_tx.send(());
643 join.await.unwrap();
644
645 let records = sink.snapshot();
646 assert_eq!(records.len(), 1);
647 assert!(
648 !records[0].body.contains("cache-test"),
649 "after eviction the cached name must NOT appear; got {}",
650 records[0].body
651 );
652 assert!(
654 records[0].body.starts_with("cacacaca"),
655 "expected hex-prefix fallback; got {}",
656 records[0].body
657 );
658 }
659
660 #[tokio::test]
661 async fn dispatcher_exits_cleanly_when_alert_stream_closes() {
662 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(4);
663 let (_shutdown_tx, shutdown_rx) = oneshot::channel();
664 let (_settings_tx, settings_rx) = tokio::sync::watch::channel(Settings::default());
665 let join = spawn_notification_dispatcher(DispatcherOptions {
666 sink: Box::new(InMemorySink::new()),
667 settings_rx,
668 alerts_rx: alert_rx,
669 shutdown_rx,
670 });
671 drop(alert_tx);
673 tokio::time::timeout(Duration::from_secs(1), join)
675 .await
676 .expect("dispatcher must exit after broadcast Sender drops")
677 .unwrap();
678 }
679}