1use std::collections::HashMap;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32
33use async_trait::async_trait;
34use irontide_core::Id20;
35use parking_lot::Mutex;
36use thiserror::Error;
37use tokio::sync::{broadcast, oneshot};
38use tracing::{debug, warn};
39
40use crate::alert::{Alert, AlertKind};
41use irontide_settings::Settings;
42
43#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct NotificationRecord {
48 pub summary: String,
50 pub body: String,
53}
54
55#[derive(Debug, Error)]
59pub enum NotificationError {
60 #[error("notify-rust failed: {0}")]
64 Backend(String),
65 #[error("spawn_blocking join error: {0}")]
68 JoinError(String),
69 #[error("injected test failure: {0}")]
72 Test(String),
73}
74
75#[async_trait]
83pub trait NotificationSink: Send + Sync + 'static {
84 async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError>;
88}
89
90#[derive(Debug, Default, Clone, Copy)]
93pub struct LibNotifySink;
94
95impl LibNotifySink {
96 #[must_use]
99 pub fn new() -> Self {
100 Self
101 }
102}
103
104#[async_trait]
105impl NotificationSink for LibNotifySink {
106 async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError> {
107 let summary = summary.to_string();
108 let body = body.to_string();
109 tokio::task::spawn_blocking(move || {
110 notify_rust::Notification::new()
111 .summary(&summary)
112 .body(&body)
113 .appname("irontide")
114 .show()
115 .map(|_handle| ())
116 .map_err(|e| NotificationError::Backend(e.to_string()))
117 })
118 .await
119 .map_err(|e| NotificationError::JoinError(e.to_string()))?
120 }
121}
122
123#[derive(Debug, Clone, Default)]
127pub struct InMemorySink {
128 pub records: Arc<Mutex<Vec<NotificationRecord>>>,
131 fail_with: Option<String>,
134}
135
136impl InMemorySink {
137 #[must_use]
139 pub fn new() -> Self {
140 Self::default()
141 }
142
143 #[must_use]
146 pub fn with_failure(message: impl Into<String>) -> Self {
147 Self {
148 records: Arc::new(Mutex::new(Vec::new())),
149 fail_with: Some(message.into()),
150 }
151 }
152
153 #[must_use]
156 pub fn snapshot(&self) -> Vec<NotificationRecord> {
157 self.records.lock().clone()
158 }
159}
160
161#[async_trait]
162impl NotificationSink for InMemorySink {
163 async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError> {
164 if let Some(ref msg) = self.fail_with {
165 return Err(NotificationError::Test(msg.clone()));
166 }
167 self.records.lock().push(NotificationRecord {
168 summary: summary.to_string(),
169 body: body.to_string(),
170 });
171 Ok(())
172 }
173}
174
175#[must_use]
188pub fn sanitize_notification_text(s: &str) -> String {
189 let mut out = String::with_capacity(s.len());
190 for c in s.chars() {
191 match c {
192 '<' => out.push_str("<"),
193 '>' => out.push_str(">"),
194 '&' => out.push_str("&"),
195 '"' => out.push_str("""),
196 '\'' => out.push_str("'"),
197 _ => out.push(c),
198 }
199 }
200 out
201}
202
203fn dispatch_one(
209 settings: &Settings,
210 kind: &AlertKind,
211 name_cache: &HashMap<Id20, String>,
212) -> Option<(&'static str, String)> {
213 match kind {
214 AlertKind::TorrentFinished { info_hash } if settings.notify_on_complete => {
215 let raw = name_cache
216 .get(info_hash)
217 .cloned()
218 .unwrap_or_else(|| info_hash_short_hex(*info_hash));
219 let name = sanitize_notification_text(&raw);
220 Some(("IronTide", format!("{name} download complete")))
221 }
222 AlertKind::TorrentError { info_hash, message } if settings.notify_on_error => {
223 let raw = name_cache
224 .get(info_hash)
225 .cloned()
226 .unwrap_or_else(|| info_hash_short_hex(*info_hash));
227 let name = sanitize_notification_text(&raw);
228 let message = sanitize_notification_text(message);
229 Some(("IronTide", format!("{name}: {message}")))
230 }
231 _ => None,
232 }
233}
234
235fn info_hash_short_hex(hash: Id20) -> String {
238 let hex = hash.to_hex();
239 hex.chars().take(8).collect()
242}
243
244pub struct DispatcherOptions {
248 pub sink: Box<dyn NotificationSink>,
251 pub settings_rx: tokio::sync::watch::Receiver<Settings>,
257 pub alerts_rx: broadcast::Receiver<Alert>,
262 pub shutdown_rx: oneshot::Receiver<()>,
265}
266
267#[must_use]
271pub fn spawn_notification_dispatcher(opts: DispatcherOptions) -> tokio::task::JoinHandle<()> {
272 let DispatcherOptions {
273 sink,
274 settings_rx,
275 mut alerts_rx,
276 mut shutdown_rx,
277 } = opts;
278 tokio::spawn(async move {
279 let mut name_cache: HashMap<Id20, String> = HashMap::new();
280 let dbus_failure_logged = AtomicBool::new(false);
281 loop {
282 tokio::select! {
283 _ = &mut shutdown_rx => {
284 debug!("notification dispatcher: shutdown signal received");
285 break;
286 }
287 event = alerts_rx.recv() => {
288 let alert = match event {
289 Ok(alert) => alert,
290 Err(broadcast::error::RecvError::Lagged(n)) => {
291 warn!(lagged = n, "notification dispatcher: alert stream lagged");
298 continue;
299 }
300 Err(broadcast::error::RecvError::Closed) => {
301 debug!("notification dispatcher: alert stream closed");
302 break;
303 }
304 };
305 match &alert.kind {
310 AlertKind::TorrentAdded { info_hash, name } => {
311 name_cache.insert(*info_hash, name.clone());
312 continue;
313 }
314 AlertKind::TorrentRemoved { info_hash } => {
315 name_cache.remove(info_hash);
316 continue;
317 }
318 _ => {}
319 }
320 let settings = settings_rx.borrow().clone();
321 let Some((summary, body)) = dispatch_one(&settings, &alert.kind, &name_cache)
322 else {
323 continue;
324 };
325 if let Err(e) = sink.show(summary, &body).await
326 && !dbus_failure_logged.swap(true, Ordering::Relaxed)
327 {
328 warn!(
329 error = %e,
330 "notification dispatcher: sink failed; degrading silently for the rest of the session"
331 );
332 }
333 }
334 }
335 }
336 })
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342 use crate::alert::Alert;
343 use std::time::Duration;
344
345 fn fake_hash(byte: u8) -> Id20 {
346 Id20([byte; 20])
347 }
348
349 #[test]
350 fn sanitizer_escapes_all_five_html_metacharacters() {
351 let input = r#"<b>"hi" & 'bye'</b>"#;
352 let out = sanitize_notification_text(input);
353 assert_eq!(
354 out,
355 "<b>"hi" & 'bye'</b>"
356 );
357 }
358
359 #[test]
363 fn sanitizer_does_not_double_escape_existing_entities() {
364 assert_eq!(sanitize_notification_text("AT&T"), "AT&T");
366 assert_eq!(sanitize_notification_text("<AT&T>"), "<AT&T>");
368 }
369
370 #[test]
371 fn sanitizer_passes_through_non_ascii_unchanged() {
372 let input = "Façade — 管理 — 🦀";
374 assert_eq!(sanitize_notification_text(input), input);
375 }
376
377 #[test]
378 fn dispatch_one_skips_when_notify_on_complete_false() {
379 let s = Settings {
380 notify_on_complete: false,
381 ..Default::default()
382 };
383 let hash = fake_hash(0xAA);
384 let cache = HashMap::from([(hash, "test-torrent".to_string())]);
385 let result = dispatch_one(&s, &AlertKind::TorrentFinished { info_hash: hash }, &cache);
386 assert!(result.is_none(), "gate false must skip dispatch");
387 }
388
389 #[test]
390 fn dispatch_one_emits_when_notify_on_complete_true() {
391 let s = Settings {
392 notify_on_complete: true,
393 ..Default::default()
394 };
395 let hash = fake_hash(0xBB);
396 let cache = HashMap::from([(hash, "my-movie".to_string())]);
397 let (summary, body) =
398 dispatch_one(&s, &AlertKind::TorrentFinished { info_hash: hash }, &cache)
399 .expect("gate true + finished must dispatch");
400 assert_eq!(summary, "IronTide");
401 assert_eq!(body, "my-movie download complete");
402 }
403
404 #[test]
405 fn dispatch_one_emits_error_body_with_sanitised_message() {
406 let s = Settings {
407 notify_on_error: true,
408 ..Default::default()
409 };
410 let hash = fake_hash(0xCC);
411 let cache = HashMap::from([(hash, "<evil>".to_string())]);
412 let (summary, body) = dispatch_one(
413 &s,
414 &AlertKind::TorrentError {
415 info_hash: hash,
416 message: "disk full <again>".to_string(),
417 },
418 &cache,
419 )
420 .expect("gate true + error must dispatch");
421 assert_eq!(summary, "IronTide");
422 assert_eq!(body, "<evil>: disk full <again>");
423 }
424
425 #[test]
426 fn dispatch_one_falls_back_to_hex_prefix_when_cache_miss() {
427 let s = Settings {
428 notify_on_complete: true,
429 ..Default::default()
430 };
431 let hash = fake_hash(0xDE);
432 let cache = HashMap::new();
433 let (_, body) =
434 dispatch_one(&s, &AlertKind::TorrentFinished { info_hash: hash }, &cache).unwrap();
435 assert!(
437 body.starts_with("dededede"),
438 "cache miss must fall back to hex prefix, got: {body}"
439 );
440 }
441
442 #[tokio::test]
443 async fn in_memory_sink_records_and_can_inject_failure() {
444 let sink = InMemorySink::new();
445 sink.show("title", "body").await.unwrap();
446 let snap = sink.snapshot();
447 assert_eq!(snap.len(), 1);
448 assert_eq!(snap[0].summary, "title");
449
450 let failing = InMemorySink::with_failure("boom");
451 let err = failing.show("t", "b").await.unwrap_err();
452 assert!(matches!(err, NotificationError::Test(_)));
453 }
454
455 #[tokio::test]
459 async fn dispatcher_emits_completion_notification_with_cached_name() {
460 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
461 let (shutdown_tx, shutdown_rx) = oneshot::channel();
462 let settings = Settings {
463 notify_on_complete: true,
464 ..Default::default()
465 };
466 let (settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
467 let _ = &settings_tx;
469
470 let sink = InMemorySink::new();
471 let join = spawn_notification_dispatcher(DispatcherOptions {
472 sink: Box::new(sink.clone()),
473 settings_rx,
474 alerts_rx: alert_rx,
475 shutdown_rx,
476 });
477
478 let hash = fake_hash(0xEE);
479 alert_tx
480 .send(Alert::new(AlertKind::TorrentAdded {
481 info_hash: hash,
482 name: "demo-torrent".to_string(),
483 }))
484 .unwrap();
485 alert_tx
486 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
487 .unwrap();
488
489 tokio::time::sleep(Duration::from_millis(50)).await;
491 let _ = shutdown_tx.send(());
492 join.await.unwrap();
493
494 let records = sink.snapshot();
495 assert_eq!(records.len(), 1, "exactly one notification expected");
496 assert_eq!(records[0].summary, "IronTide");
497 assert_eq!(records[0].body, "demo-torrent download complete");
498 }
499
500 #[tokio::test]
502 async fn dispatcher_handles_sink_failure_without_crashing() {
503 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
504 let (shutdown_tx, shutdown_rx) = oneshot::channel();
505 let settings = Settings {
506 notify_on_complete: true,
507 ..Default::default()
508 };
509 let (_settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
510
511 let sink = InMemorySink::with_failure("no dbus session");
512 let join = spawn_notification_dispatcher(DispatcherOptions {
513 sink: Box::new(sink),
514 settings_rx,
515 alerts_rx: alert_rx,
516 shutdown_rx,
517 });
518
519 let hash = fake_hash(0xFF);
520 alert_tx
521 .send(Alert::new(AlertKind::TorrentAdded {
522 info_hash: hash,
523 name: "fail-test".to_string(),
524 }))
525 .unwrap();
526 alert_tx
527 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
528 .unwrap();
529 alert_tx
530 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
531 .unwrap();
532
533 tokio::time::sleep(Duration::from_millis(50)).await;
534 let _ = shutdown_tx.send(());
535 join.await
538 .expect("dispatcher must not panic on sink failure");
539 }
540
541 #[tokio::test]
544 async fn dispatcher_respects_live_settings_toggle() {
545 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
546 let (shutdown_tx, shutdown_rx) = oneshot::channel();
547 let settings = Settings {
548 notify_on_complete: false,
549 ..Default::default()
550 };
551 let (settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
552
553 let sink = InMemorySink::new();
554 let join = spawn_notification_dispatcher(DispatcherOptions {
555 sink: Box::new(sink.clone()),
556 settings_rx,
557 alerts_rx: alert_rx,
558 shutdown_rx,
559 });
560
561 let hash_a = fake_hash(0xA1);
562 alert_tx
563 .send(Alert::new(AlertKind::TorrentAdded {
564 info_hash: hash_a,
565 name: "first".to_string(),
566 }))
567 .unwrap();
568 alert_tx
569 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash_a }))
570 .unwrap();
571 tokio::time::sleep(Duration::from_millis(30)).await;
572 settings_tx.send_modify(|s| s.notify_on_complete = true);
574
575 let hash_b = fake_hash(0xB2);
576 alert_tx
577 .send(Alert::new(AlertKind::TorrentAdded {
578 info_hash: hash_b,
579 name: "second".to_string(),
580 }))
581 .unwrap();
582 alert_tx
583 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash_b }))
584 .unwrap();
585 tokio::time::sleep(Duration::from_millis(50)).await;
586 let _ = shutdown_tx.send(());
587 join.await.unwrap();
588
589 let records = sink.snapshot();
590 assert_eq!(
591 records.len(),
592 1,
593 "only the second TorrentFinished must emit"
594 );
595 assert_eq!(records[0].body, "second download complete");
596 }
597
598 #[tokio::test]
603 async fn dispatcher_evicts_name_cache_on_torrent_removed() {
604 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
605 let (shutdown_tx, shutdown_rx) = oneshot::channel();
606 let settings = Settings {
607 notify_on_complete: true,
608 ..Default::default()
609 };
610 let (_settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
611
612 let sink = InMemorySink::new();
613 let join = spawn_notification_dispatcher(DispatcherOptions {
614 sink: Box::new(sink.clone()),
615 settings_rx,
616 alerts_rx: alert_rx,
617 shutdown_rx,
618 });
619
620 let hash = fake_hash(0xCA);
621 alert_tx
622 .send(Alert::new(AlertKind::TorrentAdded {
623 info_hash: hash,
624 name: "cache-test".to_string(),
625 }))
626 .unwrap();
627 alert_tx
628 .send(Alert::new(AlertKind::TorrentRemoved { info_hash: hash }))
629 .unwrap();
630 alert_tx
632 .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
633 .unwrap();
634
635 tokio::time::sleep(Duration::from_millis(50)).await;
636 let _ = shutdown_tx.send(());
637 join.await.unwrap();
638
639 let records = sink.snapshot();
640 assert_eq!(records.len(), 1);
641 assert!(
642 !records[0].body.contains("cache-test"),
643 "after eviction the cached name must NOT appear; got {}",
644 records[0].body
645 );
646 assert!(
648 records[0].body.starts_with("cacacaca"),
649 "expected hex-prefix fallback; got {}",
650 records[0].body
651 );
652 }
653
654 #[tokio::test]
655 async fn dispatcher_exits_cleanly_when_alert_stream_closes() {
656 let (alert_tx, alert_rx) = broadcast::channel::<Alert>(4);
657 let (_shutdown_tx, shutdown_rx) = oneshot::channel();
658 let (_settings_tx, settings_rx) = tokio::sync::watch::channel(Settings::default());
659 let join = spawn_notification_dispatcher(DispatcherOptions {
660 sink: Box::new(InMemorySink::new()),
661 settings_rx,
662 alerts_rx: alert_rx,
663 shutdown_rx,
664 });
665 drop(alert_tx);
667 tokio::time::timeout(Duration::from_secs(1), join)
669 .await
670 .expect("dispatcher must exit after broadcast Sender drops")
671 .unwrap();
672 }
673}