Skip to main content

irontide_session/
notification.rs

1//! M226: engine-side OS notification dispatcher.
2//!
3//! Subscribes to the existing alert broadcast stream and emits OS-level
4//! desktop notifications on `TorrentFinished` and `TorrentError`. No new
5//! alert kinds are introduced — the dispatcher is a pure consumer of
6//! [`AlertKind::TorrentAdded`] / [`AlertKind::TorrentRemoved`] (for the
7//! name cache) plus [`AlertKind::TorrentFinished`] / [`AlertKind::TorrentError`]
8//! (for the notification trigger).
9//!
10//! The dispatcher is split from the OS-side delivery via the
11//! [`NotificationSink`] trait so tests can inject [`InMemorySink`] and
12//! observe the records without ever loading `notify-rust` into the test
13//! binary. Production code uses [`LibNotifySink`], which wraps
14//! `notify_rust::Notification::show()` in `tokio::task::spawn_blocking`
15//! (the underlying D-Bus call is synchronous-blocking; running it on a
16//! tokio worker thread would starve the runtime).
17//!
18//! Live-toggle semantics: the `notify_on_complete` / `notify_on_error`
19//! gates are read **fresh per alert** from a snapshot of `Settings`
20//! handed to the dispatcher via a `tokio::sync::watch` channel, so
21//! `apply_settings` flips take effect on the next alert without
22//! restarting the dispatcher (matches the `classify_immediate` contract).
23//!
24//! D-Bus absence is tolerated: the first `sink.show()` failure logs a
25//! single `tracing::warn!` (gated by an `AtomicBool` flag); subsequent
26//! failures degrade silently so headless deployments don't churn the log
27//! stream.
28
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32
33use async_trait::async_trait;
34use irontide_core::Id20;
35use parking_lot::Mutex;
36use thiserror::Error;
37use tokio::sync::{broadcast, oneshot};
38use tracing::{debug, warn};
39
40use crate::alert::{Alert, AlertKind};
41use crate::settings::Settings;
42
43/// Per-record state captured by [`InMemorySink`] for assertion in tests.
44/// Not exposed in production callers — production callers only consume
45/// the `Result` from [`NotificationSink::show`].
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct NotificationRecord {
48    /// First line / title of the notification (e.g. `"IronTide"`).
49    pub summary: String,
50    /// Body text — torrent name + status, already HTML-escaped via
51    /// [`sanitize_notification_text`].
52    pub body: String,
53}
54
55/// Failure modes for the sink contract. The dispatcher catches every
56/// variant and logs once via `dbus_failure_logged`; nothing propagates
57/// up the alert pipeline.
58#[derive(Debug, Error)]
59pub enum NotificationError {
60    /// The underlying `notify-rust` call returned an error (no D-Bus
61    /// session, daemon refused the message, etc.). String-coerced
62    /// because `notify_rust::error::Error` is not `Clone`.
63    #[error("notify-rust failed: {0}")]
64    Backend(String),
65    /// `tokio::task::spawn_blocking` join failure. Should never happen
66    /// outside of runtime shutdown; we surface it so tests can observe.
67    #[error("spawn_blocking join error: {0}")]
68    JoinError(String),
69    /// Injected by [`InMemorySink::with_failure`] for the
70    /// `notification_dispatcher_handles_sink_failure` test.
71    #[error("injected test failure: {0}")]
72    Test(String),
73}
74
75/// The dispatcher → OS bridge. Production wires this through
76/// [`LibNotifySink`]; tests inject [`InMemorySink`].
77///
78/// Why a trait at all: the F14 alternative (a `#[cfg(test)]`
79/// environment-variable hack) couples the prod code path to test
80/// configuration AND links `notify-rust` into the test binary. A trait
81/// keeps the boundary explicit and decouples test runtime from D-Bus.
82#[async_trait]
83pub trait NotificationSink: Send + Sync + 'static {
84    /// Emit a single notification. `summary` is the title; `body` is
85    /// already-sanitized HTML-escaped text suitable for daemons that
86    /// interpret a subset of HTML markup (KDE, GNOME).
87    async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError>;
88}
89
90/// Production sink: wraps `notify-rust` and runs the blocking call on
91/// the dedicated tokio blocking pool. Cheap to clone (it's a ZST).
92#[derive(Debug, Default, Clone, Copy)]
93pub struct LibNotifySink;
94
95impl LibNotifySink {
96    /// Construct a fresh production sink. Equivalent to `LibNotifySink`
97    /// but reads like a constructor at the call site.
98    #[must_use]
99    pub fn new() -> Self {
100        Self
101    }
102}
103
104#[async_trait]
105impl NotificationSink for LibNotifySink {
106    async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError> {
107        let summary = summary.to_string();
108        let body = body.to_string();
109        tokio::task::spawn_blocking(move || {
110            notify_rust::Notification::new()
111                .summary(&summary)
112                .body(&body)
113                .appname("irontide")
114                .show()
115                .map(|_handle| ())
116                .map_err(|e| NotificationError::Backend(e.to_string()))
117        })
118        .await
119        .map_err(|e| NotificationError::JoinError(e.to_string()))?
120    }
121}
122
123/// Test-only sink: stores every emitted record in an `Arc<Mutex<Vec<_>>>`
124/// so the test can drain + assert without parsing log output. Optionally
125/// returns a stubbed failure to exercise the `dbus_failure_logged` path.
126#[derive(Debug, Clone, Default)]
127pub struct InMemorySink {
128    /// Records accumulated across every [`Self::show`] call. Tests
129    /// `lock()` + `clone()` to snapshot.
130    pub records: Arc<Mutex<Vec<NotificationRecord>>>,
131    /// When set, [`Self::show`] returns this error string verbatim
132    /// (wrapped in [`NotificationError::Test`]) without recording.
133    fail_with: Option<String>,
134}
135
136impl InMemorySink {
137    /// Construct an empty in-memory sink with no failure injection.
138    #[must_use]
139    pub fn new() -> Self {
140        Self::default()
141    }
142
143    /// Construct a sink that fails every `show()` with the given message.
144    /// Used by `notification_dispatcher_handles_sink_failure`.
145    #[must_use]
146    pub fn with_failure(message: impl Into<String>) -> Self {
147        Self {
148            records: Arc::new(Mutex::new(Vec::new())),
149            fail_with: Some(message.into()),
150        }
151    }
152
153    /// Snapshot of the records vector. Convenient for assertions that
154    /// don't want to hold the lock.
155    #[must_use]
156    pub fn snapshot(&self) -> Vec<NotificationRecord> {
157        self.records.lock().clone()
158    }
159}
160
161#[async_trait]
162impl NotificationSink for InMemorySink {
163    async fn show(&self, summary: &str, body: &str) -> Result<(), NotificationError> {
164        if let Some(ref msg) = self.fail_with {
165            return Err(NotificationError::Test(msg.clone()));
166        }
167        self.records.lock().push(NotificationRecord {
168            summary: summary.to_string(),
169            body: body.to_string(),
170        });
171        Ok(())
172    }
173}
174
175/// HTML-escape a single string char-by-char so subsequent passes (e.g.
176/// running `replace("<", "&lt;")` after `replace("&", "&amp;")`)
177/// can't double-encode an already-escaped entity. Single pass, single
178/// allocation. Five characters covered per the WHATWG HTML-fragment
179/// rule (`<`, `>`, `&`, `"`, `'`). Anything else passes through verbatim
180/// — non-ASCII (CJK, accents, emoji) survives the escape.
181///
182/// G4 from the M226 plan: sequential `.replace()` is **not** safe.
183/// `"AT&T".replace("<", "&lt;").replace("&", "&amp;")` produces
184/// `"AT&amp;T"`, but `"<AT&T>".replace("<", "&lt;").replace("&", "&amp;")`
185/// produces `"&amp;lt;AT&amp;T&gt;"` — the second pass re-escapes the
186/// `&` we just emitted. Char-by-char closes that hole.
187#[must_use]
188pub fn sanitize_notification_text(s: &str) -> String {
189    let mut out = String::with_capacity(s.len());
190    for c in s.chars() {
191        match c {
192            '<' => out.push_str("&lt;"),
193            '>' => out.push_str("&gt;"),
194            '&' => out.push_str("&amp;"),
195            '"' => out.push_str("&quot;"),
196            '\'' => out.push_str("&apos;"),
197            _ => out.push(c),
198        }
199    }
200    out
201}
202
203/// Pure-function variant of the per-alert dispatch logic, peeled out
204/// from the dispatcher loop so it can be unit-tested in isolation
205/// without spinning up a real session. Returns the body text to pass to
206/// `sink.show()`, or `None` if the alert should be skipped (gate
207/// disabled or unrelated alert kind).
208fn dispatch_one(
209    settings: &Settings,
210    kind: &AlertKind,
211    name_cache: &HashMap<Id20, String>,
212) -> Option<(&'static str, String)> {
213    match kind {
214        AlertKind::TorrentFinished { info_hash } if settings.notify_on_complete => {
215            let raw = name_cache
216                .get(info_hash)
217                .cloned()
218                .unwrap_or_else(|| info_hash_short_hex(*info_hash));
219            let name = sanitize_notification_text(&raw);
220            Some(("IronTide", format!("{name} download complete")))
221        }
222        AlertKind::TorrentError { info_hash, message } if settings.notify_on_error => {
223            let raw = name_cache
224                .get(info_hash)
225                .cloned()
226                .unwrap_or_else(|| info_hash_short_hex(*info_hash));
227            let name = sanitize_notification_text(&raw);
228            let message = sanitize_notification_text(message);
229            Some(("IronTide", format!("{name}: {message}")))
230        }
231        _ => None,
232    }
233}
234
235/// Fallback name when no `TorrentAdded` event has populated the cache
236/// AND the async `torrent_info` lookup also fails (rare: shutdown race).
237fn info_hash_short_hex(hash: Id20) -> String {
238    let hex = hash.to_hex();
239    // Take the first 8 lowercase-hex chars — same convention the GUI
240    // uses for "unnamed magnet" placeholders before metadata arrives.
241    hex.chars().take(8).collect()
242}
243
244/// Dispatcher options for tests + production. Production callers
245/// invoke [`DispatcherOptions::production`] which carries a
246/// [`LibNotifySink`]; tests construct directly with an [`InMemorySink`].
247pub struct DispatcherOptions {
248    /// Box so the dispatcher stays object-safe across [`InMemorySink`]
249    /// (tests) and [`LibNotifySink`] (production).
250    pub sink: Box<dyn NotificationSink>,
251    /// Live settings snapshot; the dispatcher reads `notify_on_complete`
252    /// / `notify_on_error` from the most-recently-broadcast value on
253    /// every alert. `tokio::sync::watch` is the natural fit — exactly
254    /// one writer (`SessionActor::handle_apply_settings`) + many
255    /// readers (each cheap clone).
256    pub settings_rx: tokio::sync::watch::Receiver<Settings>,
257    /// Broadcast subscription. The dispatcher takes ownership; passing
258    /// it in lets the caller acquire the subscription BEFORE the
259    /// dispatcher task is spawned (avoids the missed-alert race on
260    /// session startup — H5 in the plan).
261    pub alerts_rx: broadcast::Receiver<Alert>,
262    /// Shutdown signal: drop the sender or call `.send(())` to ask the
263    /// dispatcher to exit cleanly.
264    pub shutdown_rx: oneshot::Receiver<()>,
265}
266
267/// Spawn a notification dispatcher task and return its `JoinHandle`. The
268/// caller owns the handle; awaiting it during session shutdown waits for
269/// the dispatcher's last in-flight `sink.show()` to drain.
270#[must_use]
271pub fn spawn_notification_dispatcher(opts: DispatcherOptions) -> tokio::task::JoinHandle<()> {
272    let DispatcherOptions {
273        sink,
274        settings_rx,
275        mut alerts_rx,
276        mut shutdown_rx,
277    } = opts;
278    tokio::spawn(async move {
279        let mut name_cache: HashMap<Id20, String> = HashMap::new();
280        let dbus_failure_logged = AtomicBool::new(false);
281        loop {
282            tokio::select! {
283                _ = &mut shutdown_rx => {
284                    debug!("notification dispatcher: shutdown signal received");
285                    break;
286                }
287                event = alerts_rx.recv() => {
288                    let alert = match event {
289                        Ok(alert) => alert,
290                        Err(broadcast::error::RecvError::Lagged(n)) => {
291                            // Lagged means we missed `n` alerts; the
292                            // name cache may be inconsistent. Log + keep
293                            // going — losing a few notifications is
294                            // strictly preferable to crashing the
295                            // dispatcher (which would silently kill OS
296                            // toasts for the rest of the session).
297                            warn!(lagged = n, "notification dispatcher: alert stream lagged");
298                            continue;
299                        }
300                        Err(broadcast::error::RecvError::Closed) => {
301                            debug!("notification dispatcher: alert stream closed");
302                            break;
303                        }
304                    };
305                    // Maintain the name cache before deciding whether to
306                    // dispatch — a TorrentAdded immediately followed by
307                    // TorrentFinished must observe the name in cache
308                    // (single-thread channel ordering guarantees this).
309                    match &alert.kind {
310                        AlertKind::TorrentAdded { info_hash, name } => {
311                            name_cache.insert(*info_hash, name.clone());
312                            continue;
313                        }
314                        AlertKind::TorrentRemoved { info_hash } => {
315                            name_cache.remove(info_hash);
316                            continue;
317                        }
318                        _ => {}
319                    }
320                    let settings = settings_rx.borrow().clone();
321                    let Some((summary, body)) = dispatch_one(&settings, &alert.kind, &name_cache)
322                    else {
323                        continue;
324                    };
325                    if let Err(e) = sink.show(summary, &body).await
326                        && !dbus_failure_logged.swap(true, Ordering::Relaxed)
327                    {
328                        warn!(
329                            error = %e,
330                            "notification dispatcher: sink failed; degrading silently for the rest of the session"
331                        );
332                    }
333                }
334            }
335        }
336    })
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342    use crate::alert::Alert;
343    use std::time::Duration;
344
345    fn fake_hash(byte: u8) -> Id20 {
346        Id20([byte; 20])
347    }
348
349    #[test]
350    fn sanitizer_escapes_all_five_html_metacharacters() {
351        let input = r#"<b>"hi" & 'bye'</b>"#;
352        let out = sanitize_notification_text(input);
353        assert_eq!(out, "&lt;b&gt;&quot;hi&quot; &amp; &apos;bye&apos;&lt;/b&gt;");
354    }
355
356    /// M226 G4 regression guard: sequential `.replace("<")` then
357    /// `.replace("&")` produces `&amp;lt;` for input `<`. Char-by-char
358    /// must NOT double-escape.
359    #[test]
360    fn sanitizer_does_not_double_escape_existing_entities() {
361        // "AT&T" → "AT&amp;T" (NOT "AT&amp;amp;T")
362        assert_eq!(sanitize_notification_text("AT&T"), "AT&amp;T");
363        // "<AT&T>" → "&lt;AT&amp;T&gt;" (NOT "&amp;lt;AT&amp;amp;T&amp;gt;")
364        assert_eq!(
365            sanitize_notification_text("<AT&T>"),
366            "&lt;AT&amp;T&gt;"
367        );
368    }
369
370    #[test]
371    fn sanitizer_passes_through_non_ascii_unchanged() {
372        // Canadian English orthography + CJK + emoji must survive verbatim.
373        let input = "Façade — 管理 — 🦀";
374        assert_eq!(sanitize_notification_text(input), input);
375    }
376
377    #[test]
378    fn dispatch_one_skips_when_notify_on_complete_false() {
379        let s = Settings {
380            notify_on_complete: false,
381            ..Default::default()
382        };
383        let hash = fake_hash(0xAA);
384        let cache = HashMap::from([(hash, "test-torrent".to_string())]);
385        let result = dispatch_one(
386            &s,
387            &AlertKind::TorrentFinished { info_hash: hash },
388            &cache,
389        );
390        assert!(result.is_none(), "gate false must skip dispatch");
391    }
392
393    #[test]
394    fn dispatch_one_emits_when_notify_on_complete_true() {
395        let s = Settings {
396            notify_on_complete: true,
397            ..Default::default()
398        };
399        let hash = fake_hash(0xBB);
400        let cache = HashMap::from([(hash, "my-movie".to_string())]);
401        let (summary, body) = dispatch_one(
402            &s,
403            &AlertKind::TorrentFinished { info_hash: hash },
404            &cache,
405        )
406        .expect("gate true + finished must dispatch");
407        assert_eq!(summary, "IronTide");
408        assert_eq!(body, "my-movie download complete");
409    }
410
411    #[test]
412    fn dispatch_one_emits_error_body_with_sanitised_message() {
413        let s = Settings {
414            notify_on_error: true,
415            ..Default::default()
416        };
417        let hash = fake_hash(0xCC);
418        let cache = HashMap::from([(hash, "<evil>".to_string())]);
419        let (summary, body) = dispatch_one(
420            &s,
421            &AlertKind::TorrentError {
422                info_hash: hash,
423                message: "disk full <again>".to_string(),
424            },
425            &cache,
426        )
427        .expect("gate true + error must dispatch");
428        assert_eq!(summary, "IronTide");
429        assert_eq!(body, "&lt;evil&gt;: disk full &lt;again&gt;");
430    }
431
432    #[test]
433    fn dispatch_one_falls_back_to_hex_prefix_when_cache_miss() {
434        let s = Settings {
435            notify_on_complete: true,
436            ..Default::default()
437        };
438        let hash = fake_hash(0xDE);
439        let cache = HashMap::new();
440        let (_, body) =
441            dispatch_one(&s, &AlertKind::TorrentFinished { info_hash: hash }, &cache).unwrap();
442        // First 8 chars of the lowercase hex of [0xDE; 20] is "dededede".
443        assert!(
444            body.starts_with("dededede"),
445            "cache miss must fall back to hex prefix, got: {body}"
446        );
447    }
448
449    #[tokio::test]
450    async fn in_memory_sink_records_and_can_inject_failure() {
451        let sink = InMemorySink::new();
452        sink.show("title", "body").await.unwrap();
453        let snap = sink.snapshot();
454        assert_eq!(snap.len(), 1);
455        assert_eq!(snap[0].summary, "title");
456
457        let failing = InMemorySink::with_failure("boom");
458        let err = failing.show("t", "b").await.unwrap_err();
459        assert!(matches!(err, NotificationError::Test(_)));
460    }
461
462    /// Drive the dispatcher loop end-to-end with [`InMemorySink`] + a real
463    /// broadcast channel. Confirms that the alert pump → cache lookup →
464    /// [`dispatch_one`] → `sink.show` chain works.
465    #[tokio::test]
466    async fn dispatcher_emits_completion_notification_with_cached_name() {
467        let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
468        let (shutdown_tx, shutdown_rx) = oneshot::channel();
469        let settings = Settings {
470            notify_on_complete: true,
471            ..Default::default()
472        };
473        let (settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
474        // Suppress unused-var lint for the test-only writer handle.
475        let _ = &settings_tx;
476
477        let sink = InMemorySink::new();
478        let join = spawn_notification_dispatcher(DispatcherOptions {
479            sink: Box::new(sink.clone()),
480            settings_rx,
481            alerts_rx: alert_rx,
482            shutdown_rx,
483        });
484
485        let hash = fake_hash(0xEE);
486        alert_tx
487            .send(Alert::new(AlertKind::TorrentAdded {
488                info_hash: hash,
489                name: "demo-torrent".to_string(),
490            }))
491            .unwrap();
492        alert_tx
493            .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
494            .unwrap();
495
496        // Give the dispatcher loop time to drain.
497        tokio::time::sleep(Duration::from_millis(50)).await;
498        let _ = shutdown_tx.send(());
499        join.await.unwrap();
500
501        let records = sink.snapshot();
502        assert_eq!(records.len(), 1, "exactly one notification expected");
503        assert_eq!(records[0].summary, "IronTide");
504        assert_eq!(records[0].body, "demo-torrent download complete");
505    }
506
507    /// Sink failure must NOT crash the dispatcher and must log-once.
508    #[tokio::test]
509    async fn dispatcher_handles_sink_failure_without_crashing() {
510        let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
511        let (shutdown_tx, shutdown_rx) = oneshot::channel();
512        let settings = Settings {
513            notify_on_complete: true,
514            ..Default::default()
515        };
516        let (_settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
517
518        let sink = InMemorySink::with_failure("no dbus session");
519        let join = spawn_notification_dispatcher(DispatcherOptions {
520            sink: Box::new(sink),
521            settings_rx,
522            alerts_rx: alert_rx,
523            shutdown_rx,
524        });
525
526        let hash = fake_hash(0xFF);
527        alert_tx
528            .send(Alert::new(AlertKind::TorrentAdded {
529                info_hash: hash,
530                name: "fail-test".to_string(),
531            }))
532            .unwrap();
533        alert_tx
534            .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
535            .unwrap();
536        alert_tx
537            .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
538            .unwrap();
539
540        tokio::time::sleep(Duration::from_millis(50)).await;
541        let _ = shutdown_tx.send(());
542        // Awaiting the join handle is the assertion: a panicking
543        // dispatcher would surface here as JoinError.
544        join.await.expect("dispatcher must not panic on sink failure");
545    }
546
547    /// Live toggle: TorrentFinished#1 fires with gate=false (no record),
548    /// then settings flip to true, TorrentFinished#2 fires (record).
549    #[tokio::test]
550    async fn dispatcher_respects_live_settings_toggle() {
551        let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
552        let (shutdown_tx, shutdown_rx) = oneshot::channel();
553        let settings = Settings {
554            notify_on_complete: false,
555            ..Default::default()
556        };
557        let (settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
558
559        let sink = InMemorySink::new();
560        let join = spawn_notification_dispatcher(DispatcherOptions {
561            sink: Box::new(sink.clone()),
562            settings_rx,
563            alerts_rx: alert_rx,
564            shutdown_rx,
565        });
566
567        let hash_a = fake_hash(0xA1);
568        alert_tx
569            .send(Alert::new(AlertKind::TorrentAdded {
570                info_hash: hash_a,
571                name: "first".to_string(),
572            }))
573            .unwrap();
574        alert_tx
575            .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash_a }))
576            .unwrap();
577        tokio::time::sleep(Duration::from_millis(30)).await;
578        // Flip gate to true; subsequent finish must emit.
579        settings_tx.send_modify(|s| s.notify_on_complete = true);
580
581        let hash_b = fake_hash(0xB2);
582        alert_tx
583            .send(Alert::new(AlertKind::TorrentAdded {
584                info_hash: hash_b,
585                name: "second".to_string(),
586            }))
587            .unwrap();
588        alert_tx
589            .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash_b }))
590            .unwrap();
591        tokio::time::sleep(Duration::from_millis(50)).await;
592        let _ = shutdown_tx.send(());
593        join.await.unwrap();
594
595        let records = sink.snapshot();
596        assert_eq!(
597            records.len(),
598            1,
599            "only the second TorrentFinished must emit"
600        );
601        assert_eq!(records[0].body, "second download complete");
602    }
603
604    /// Cache eviction: after [`AlertKind::TorrentRemoved`] the next [`AlertKind::TorrentFinished`]
605    /// for the same hash must fall through to the hex-prefix fallback.
606    /// ([`AlertKind::TorrentFinished`] after removal isn't a normal real-world ordering
607    /// but the dispatcher must tolerate the broadcast race regardless.)
608    #[tokio::test]
609    async fn dispatcher_evicts_name_cache_on_torrent_removed() {
610        let (alert_tx, alert_rx) = broadcast::channel::<Alert>(16);
611        let (shutdown_tx, shutdown_rx) = oneshot::channel();
612        let settings = Settings {
613            notify_on_complete: true,
614            ..Default::default()
615        };
616        let (_settings_tx, settings_rx) = tokio::sync::watch::channel(settings);
617
618        let sink = InMemorySink::new();
619        let join = spawn_notification_dispatcher(DispatcherOptions {
620            sink: Box::new(sink.clone()),
621            settings_rx,
622            alerts_rx: alert_rx,
623            shutdown_rx,
624        });
625
626        let hash = fake_hash(0xCA);
627        alert_tx
628            .send(Alert::new(AlertKind::TorrentAdded {
629                info_hash: hash,
630                name: "cache-test".to_string(),
631            }))
632            .unwrap();
633        alert_tx
634            .send(Alert::new(AlertKind::TorrentRemoved { info_hash: hash }))
635            .unwrap();
636        // Now drive a Finished for the same hash; cache should be empty.
637        alert_tx
638            .send(Alert::new(AlertKind::TorrentFinished { info_hash: hash }))
639            .unwrap();
640
641        tokio::time::sleep(Duration::from_millis(50)).await;
642        let _ = shutdown_tx.send(());
643        join.await.unwrap();
644
645        let records = sink.snapshot();
646        assert_eq!(records.len(), 1);
647        assert!(
648            !records[0].body.contains("cache-test"),
649            "after eviction the cached name must NOT appear; got {}",
650            records[0].body
651        );
652        // hex prefix of [0xCA; 20] starts with "cacacaca".
653        assert!(
654            records[0].body.starts_with("cacacaca"),
655            "expected hex-prefix fallback; got {}",
656            records[0].body
657        );
658    }
659
660    #[tokio::test]
661    async fn dispatcher_exits_cleanly_when_alert_stream_closes() {
662        let (alert_tx, alert_rx) = broadcast::channel::<Alert>(4);
663        let (_shutdown_tx, shutdown_rx) = oneshot::channel();
664        let (_settings_tx, settings_rx) = tokio::sync::watch::channel(Settings::default());
665        let join = spawn_notification_dispatcher(DispatcherOptions {
666            sink: Box::new(InMemorySink::new()),
667            settings_rx,
668            alerts_rx: alert_rx,
669            shutdown_rx,
670        });
671        // Dropping the broadcast Sender closes the stream.
672        drop(alert_tx);
673        // Dispatcher must observe Closed and exit.
674        tokio::time::timeout(Duration::from_secs(1), join)
675            .await
676            .expect("dispatcher must exit after broadcast Sender drops")
677            .unwrap();
678    }
679}