Skip to main content

ts_runtime/
ipn_bus.rs

1//! Unified IPN notification bus: a single push-style stream that coalesces the device's
2//! connection-[`DeviceState`] and netmap peer-set changes into one [`Notify`] feed, mirroring Go
3//! `ipn` `LocalBackend.WatchNotifications` / the `WatchIPNBus` LocalAPI.
4//!
5//! Go delivers one `ipn.Notify` struct per event in which **only the changed fields are populated**
6//! (a nil field means "unchanged"); an optional subscribe-time mask ([`NotifyWatchOpt`]) front-loads
7//! an initial snapshot of the current state. [`Notify`] is the faithful Rust shape of that struct —
8//! a struct of `Option`s, not a per-event enum.
9//!
10//! # Coalescing: initial snapshot vs. streamed events
11//!
12//! The struct-of-`Option`s shape lets one `Notify` carry several changed fields at once. This bus
13//! exploits that **for the initial snapshot only**: the subscribe-time snapshot reads every source
14//! cell synchronously and packs the masked fields into one `Notify`. Post-subscribe, the merge loop
15//! is per-source — each source cell's change produces its own single-field `Notify` (a state change
16//! yields `state: Some`, a peer change yields `net_map: Some`), because the cells are independent
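//! `watch` channels with no cross-cell synchronization point to coalesce on. A consumer therefore
//! sees at most one coalesced snapshot followed by single-source deltas (a `NeedsLogin` transition
//! still carries `browse_to_url` next to `state`, both read from the state cell). (Go can pack
//! several fields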
19//! into one streamed `Notify` because a single `MapResponse` updates several things together under
20//! one lock; the fork has already split those into separate cells, so the equivalent streamed events
21//! arrive separately here. The `Option` shape is still the right type — it keeps the snapshot
22//! faithful and leaves room for a future single source to set multiple fields.)
23//!
24//! # Why these sources
25//!
26//! The fork already decomposes Go's single notification channel into separate, individually-correct
27//! `watch` surfaces ([`Runtime::watch_state`](crate::Runtime::watch_state),
28//! [`Runtime::watch_netmap`](crate::Runtime::watch_netmap)). This bus *composes* the same cells (one
29//! source of truth — it cannot diverge from the narrow views) into the merged feed an embedder
//! porting from Go's `WatchIPNBus` expects. The three cells it reads map onto Go `Notify` fields:
31//!
32//! - [`DeviceState`] → `Notify.State`, and the **registration-time** interactive-login URL carried
33//!   by [`DeviceState::NeedsLogin`] (`Notify.browse_to_url`, derived from that state — control's
34//!   `MachineNotAuthorized`).
35//! - the running-node consent URL (`MapResponse.PopBrowserURL`) → `Notify.browse_to_url` as a
36//!   mid-session event. Go also forwards this `BrowseToURL` for an already-`Running` node (re-auth /
37//!   forced-re-login nudges). The fork's backing cell is **sticky** (the producer updates it only on
38//!   a new non-empty URL, never resets it to `None` on an empty update — Go's `direct.go` guard
39//!   `u != "" && u != sess.lastPopBrowserURL`), so a `watch` subscriber is not thrashed. It is
40//!   streamed post-subscribe but **not** front-loaded into the initial snapshot — Go replays only the
41//!   registration `b.authURL` (the `NeedsLogin`-derived URL above) on a new watcher, never the
42//!   running-node `PopBrowserURL`; a consumer wanting the current pending URL at subscribe time reads
43//!   the sticky `pop_browser_url` pull API.
44//! - the peer set (`Vec<StatusNode>`) → `Notify.NetMap` (the embedder-facing peer view).
45//!
46//! Go's `Notify` has no packet-filter cap-grant field (caps are an internal `WhoIs` input, not an
47//! embedder notification), so the retained cap-grants cell is intentionally **not** surfaced here.
48//!
49//! # Lossy by design
50//!
51//! Like Go's bus (a bounded 128-deep channel drained with a non-blocking `select { case ch<-n:
52//! default: drop }`), delivery is best-effort: the per-watcher [`mpsc`] is bounded at
53//! [`NOTIFY_BUFFER`] and a notification for a watcher whose buffer is full is **dropped**, never
54//! blocking the producer. The underlying `watch` cells are themselves coalescing, so a slow consumer
55//! observes the latest state, not every intermediate — the right semantics for state/netmap
56//! snapshots (and the reason this bus is not used for any at-least-once delivery).
57
58use tokio::sync::{mpsc, watch};
59
60use crate::{device_state::DeviceState, status::StatusNode};
61
/// Capacity of each watcher's bounded [`mpsc`] queue, chosen to equal the depth of Go's `ipn` bus
/// channel (`make(chan *ipn.Notify, 128)`). The bus never waits for room: when a watcher's queue
/// is already full the notification is dropped (see the module docs on lossy delivery).
pub const NOTIFY_BUFFER: usize = 128;
66
/// Selects which initial-state fields are front-loaded into the first [`Notify`] when a watcher
/// subscribes (Go `ipn.NotifyWatchOpt`). A bitfield; combine with `|`.
///
/// The numeric values match Go's `NotifyWatchOpt` literals exactly (`NotifyInitialState = 1 << 1`,
/// `NotifyInitialNetMap = 1 << 3`), so a mask built from Go's integer constants (via
/// [`from_bits_retain`](Self::from_bits_retain)) is wire-compatible. Bits Go defines but this bus
/// does not yet surface (initial prefs/health/etc.) are retained in the mask but simply not
/// honored — passing them is harmless.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NotifyWatchOpt(u64);

impl NotifyWatchOpt {
    /// Front-load the current [`DeviceState`] (and, when it is [`DeviceState::NeedsLogin`], the
    /// auth URL as `browse_to_url`) into the first [`Notify`]. Go `NotifyInitialState` (`1 << 1`).
    pub const INITIAL_STATE: Self = Self(1 << 1);

    /// Front-load the current peer set (`net_map`) into the first [`Notify`]. Go
    /// `NotifyInitialNetMap` (`1 << 3`).
    pub const INITIAL_NETMAP: Self = Self(1 << 3);

    /// No initial snapshot: the watcher receives only changes that occur after it subscribes.
    pub const fn empty() -> Self {
        Self(0)
    }

    /// Build a mask from a raw Go-style `NotifyWatchOpt` integer.
    ///
    /// Every bit is kept as given, including bits this bus does not recognize; unrecognized bits
    /// have no effect because the bus only ever tests the named constants with
    /// [`contains`](Self::contains).
    pub const fn from_bits_retain(bits: u64) -> Self {
        Self(bits)
    }

    /// The raw integer value of this mask, in Go's `NotifyWatchOpt` bit layout.
    pub const fn bits(self) -> u64 {
        self.0
    }

    /// Whether all bits in `other` are set in `self`. An empty `other` is contained in every mask.
    pub const fn contains(self, other: Self) -> bool {
        self.0 & other.0 == other.0
    }
}
96
97impl core::ops::BitOr for NotifyWatchOpt {
98    type Output = Self;
99    fn bitor(self, rhs: Self) -> Self {
100        Self(self.0 | rhs.0)
101    }
102}
103
104/// A single notification from the [IPN bus](self), mirroring Go `ipn.Notify`: each field is `Some`
105/// only when it changed in this event (a `None` field means "unchanged"). One event may populate
106/// several fields at once (e.g. a netmap update that also moves the device state).
107///
108/// `#[non_exhaustive]` so future Go-parity fields (prefs, engine status, health) can be added
109/// without breaking embedders that match on it.
110#[derive(Debug, Clone, Default, PartialEq, Eq)]
111#[non_exhaustive]
112pub struct Notify {
113    /// The new device connection-state, if it changed (Go `Notify.State`).
114    pub state: Option<DeviceState>,
115    /// The new peer set, if the netmap changed (Go `Notify.NetMap`, embedder-facing peer view).
116    pub net_map: Option<Vec<StatusNode>>,
117    /// An interactive-login / consent URL the embedder should open (Go `Notify.BrowseToURL`). Two
118    /// sources feed it: the **registration-time** auth URL, derived from [`DeviceState::NeedsLogin`]
119    /// and set alongside `state` when the device enters that state; and the **mid-session**
120    /// `MapResponse.PopBrowserURL` (re-auth / consent on an already-running node), streamed on its own
121    /// as a standalone event. See the module docs for which is front-loaded into the initial snapshot
122    /// (only the registration URL) vs. streamed (both).
123    pub browse_to_url: Option<url::Url>,
124}
125
126impl Notify {
127    /// Whether this notification carries no populated field. An all-`None` `Notify` is never
128    /// delivered (the bus skips it), so observing one from [`IpnBusWatcher::next`] is impossible;
129    /// the predicate exists for the bus's own "is there anything to send?" check.
130    fn is_empty(&self) -> bool {
131        self.state.is_none() && self.net_map.is_none() && self.browse_to_url.is_none()
132    }
133}
134
135/// A handle to a live [IPN bus](self) subscription, mirroring Go's `IPNBusWatcher`. Await
136/// [`next`](Self::next) to receive [`Notify`] events; it returns `None` when the stream ends (the
137/// runtime shut down, or this watcher was dropped).
138#[derive(Debug)]
139pub struct IpnBusWatcher {
140    rx: mpsc::Receiver<Notify>,
141}
142
143impl IpnBusWatcher {
144    /// Await the next [`Notify`]. Returns `None` once the bus has terminated (runtime shutdown or
145    /// every source cell's sender dropped) — the clean end-of-stream signal, like Go's watcher
146    /// channel closing.
147    pub async fn next(&mut self) -> Option<Notify> {
148        self.rx.recv().await
149    }
150}
151
152/// Spawn the bus task feeding `tx` and return the consumer handle. Reads cloned `watch` receivers
153/// (so it never contends with the runtime's own readers) and a `shutdown` receiver that terminates
154/// the task. The task self-terminates on shutdown, on any source sender dropping, or when the
155/// returned [`IpnBusWatcher`] is dropped (the `tx` send then reports the channel closed) — so it
156/// cannot leak past the runtime or a discarded watcher.
157pub(crate) fn spawn_watcher(
158    mask: NotifyWatchOpt,
159    state_rx: watch::Receiver<DeviceState>,
160    peer_rx: watch::Receiver<Vec<StatusNode>>,
161    browser_rx: watch::Receiver<Option<url::Url>>,
162    shutdown_rx: watch::Receiver<bool>,
163) -> IpnBusWatcher {
164    let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
165    tokio::spawn(run_bus(
166        mask,
167        state_rx,
168        peer_rx,
169        browser_rx,
170        shutdown_rx,
171        tx,
172    ));
173    IpnBusWatcher { rx }
174}
175
176/// Try to deliver `n`, returning `true` when the bus should stop (the consumer is gone).
177///
178/// Mirrors Go's non-blocking `select { case ch <- n: default: /* drop */ }`: a `Full` buffer drops
179/// the notification and keeps streaming (best-effort delivery, never block the producer); a `Closed`
180/// channel means the watcher was dropped, so the task is done.
181fn deliver(tx: &mpsc::Sender<Notify>, n: Notify) -> bool {
182    match tx.try_send(n) {
183        Ok(()) => false,
184        Err(mpsc::error::TrySendError::Full(_)) => false,
185        Err(mpsc::error::TrySendError::Closed(_)) => true,
186    }
187}
188
189/// The interactive-login URL implied by a device state: `Some` only for [`DeviceState::NeedsLogin`].
190/// The single derivation rule for `browse_to_url`, shared by the initial snapshot and the streaming
191/// state arm so the two can never drift (see module docs on the registration-time URL).
192fn browse_url_for(state: &DeviceState) -> Option<url::Url> {
193    match state {
194        DeviceState::NeedsLogin(u) => Some(u.clone()),
195        _ => None,
196    }
197}
198
199/// Build the `Notify` for a device-state transition: the state plus its derived `browse_to_url`.
200fn state_notify(state: DeviceState) -> Notify {
201    let browse_to_url = browse_url_for(&state);
202    Notify {
203        state: Some(state),
204        net_map: None,
205        browse_to_url,
206    }
207}
208
209/// The bus loop, factored out of [`spawn_watcher`] so the (subtle) ordering — the masked initial
210/// snapshot, the `borrow_and_update` that prevents an initial-value busy-loop, the shutdown arm, and
211/// sender-drop termination — is unit-testable against plain `watch`/`mpsc` channels without standing
212/// up a runtime (mirrors [`device_state::wait_for_running`](crate::device_state::wait_for_running)).
213pub(crate) async fn run_bus(
214    mask: NotifyWatchOpt,
215    mut state_rx: watch::Receiver<DeviceState>,
216    mut peer_rx: watch::Receiver<Vec<StatusNode>>,
217    mut browser_rx: watch::Receiver<Option<url::Url>>,
218    mut shutdown_rx: watch::Receiver<bool>,
219    tx: mpsc::Sender<Notify>,
220) {
221    // If the runtime is already shutting down, end before doing anything. This also marks the
222    // shutdown cell's initial `false` as *seen* so the `select!` arm below doesn't fire spuriously
223    // on the unobserved initial value (the classic `watch`-in-`select!` busy-loop).
224    if *shutdown_rx.borrow_and_update() {
225        return;
226    }
227
228    // Initial snapshot: ONE coalesced `Notify` carrying whichever masked fields are requested
229    // (Go front-loads State+NetMap into a single `ini` struct). `borrow_and_update` reads the
230    // current value AND marks it seen, so the streaming loop's first `changed()` waits for a real
231    // transition instead of re-emitting the value we just snapshotted.
232    let mut initial = Notify::default();
233    {
234        let state = state_rx.borrow_and_update();
235        if mask.contains(NotifyWatchOpt::INITIAL_STATE) {
236            initial.browse_to_url = browse_url_for(&state);
237            initial.state = Some(state.clone());
238        }
239    }
240    {
241        let peers = peer_rx.borrow_and_update();
242        if mask.contains(NotifyWatchOpt::INITIAL_NETMAP) {
243            initial.net_map = Some(peers.clone());
244        }
245    }
246    // Mark the running-node browser-URL cell's initial value seen so the streaming arm waits for a
247    // real post-subscribe change (busy-loop prevention, same as the cells above). Its current value
248    // is deliberately NOT front-loaded into the initial snapshot: Go replays only the
249    // registration-time auth URL (the `NeedsLogin`-derived `browse_to_url` above), never the
250    // running-node `MapResponse.PopBrowserURL`, on a new watcher's initial state. A consumer wanting
251    // the current pending consent URL at subscribe time reads the sticky `pop_browser_url` pull API;
252    // the bus streams future transitions.
253    browser_rx.borrow_and_update();
254    if !initial.is_empty() && deliver(&tx, initial) {
255        return;
256    }
257
258    // Stream subsequent changes. `biased` makes shutdown take priority over data so a teardown is
259    // observed promptly. Each data arm re-reads with `borrow_and_update().clone()` into an owned
260    // value and drops the borrow guard *before* the next await — never holding a `watch` read guard
261    // across `.changed()` (which would deadlock). A sender-drop (`changed()` => `Err`) ends the
262    // stream, exactly as `wait_for_running` treats it.
263    loop {
264        tokio::select! {
265            biased;
266            _ = shutdown_rx.changed() => return,
267            // The consumer dropped its `IpnBusWatcher`: reclaim the task immediately rather than
268            // waiting for the next source change to surface a `Closed` on the next `deliver`. On an
269            // idle (quiet) device that next change might be far off, so without this arm a dropped
270            // watcher would leave the task parked until shutdown. `Sender::closed()` resolves once
271            // every receiver is gone.
272            _ = tx.closed() => return,
273            changed = state_rx.changed() => {
274                if changed.is_err() {
275                    return;
276                }
277                let state = state_rx.borrow_and_update().clone();
278                if deliver(&tx, state_notify(state)) {
279                    return;
280                }
281            }
282            changed = peer_rx.changed() => {
283                if changed.is_err() {
284                    return;
285                }
286                let peers = peer_rx.borrow_and_update().clone();
287                let notify = Notify {
288                    state: None,
289                    net_map: Some(peers),
290                    browse_to_url: None,
291                };
292                if deliver(&tx, notify) {
293                    return;
294                }
295            }
296            changed = browser_rx.changed() => {
297                if changed.is_err() {
298                    return;
299                }
300                // The running-node consent URL (`MapResponse.PopBrowserURL`). The producer cell is
301                // de-thrashed (updated only on a new non-empty URL, never reset to `None`), so a
302                // change here carries a fresh `Some(url)`; skip the defensive `None` case rather than
303                // emit an empty `browse_to_url`.
304                let url = browser_rx.borrow_and_update().clone();
305                if let Some(url) = url {
306                    let notify = Notify {
307                        state: None,
308                        net_map: None,
309                        browse_to_url: Some(url),
310                    };
311                    if deliver(&tx, notify) {
312                        return;
313                    }
314                }
315            }
316        }
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use core::time::Duration;
323
324    use tokio::sync::{mpsc, watch};
325
326    use super::*;
327
328    /// The hand-made channel senders (state, peer, browser-URL, shutdown) plus the consumer handle
329    /// that [`harness`] returns — the four source senders let a test drive `run_bus`, and the
330    /// `IpnBusWatcher` observes what it emits.
331    type Harness = (
332        watch::Sender<DeviceState>,
333        watch::Sender<Vec<StatusNode>>,
334        watch::Sender<Option<url::Url>>,
335        watch::Sender<bool>,
336        IpnBusWatcher,
337    );
338
339    /// Drive `run_bus` on a task against hand-made channels, returning the senders (state, peer,
340    /// browser-URL, shutdown) and the consumer handle. Mirrors how `device_state` tests drive
341    /// `wait_for_running` off a plain `watch`.
342    fn harness(mask: NotifyWatchOpt, state: DeviceState, peers: Vec<StatusNode>) -> Harness {
343        let (state_tx, state_rx) = watch::channel(state);
344        let (peer_tx, peer_rx) = watch::channel(peers);
345        let (browser_tx, browser_rx) = watch::channel(None);
346        let (shutdown_tx, shutdown_rx) = watch::channel(false);
347        let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
348        tokio::spawn(run_bus(
349            mask,
350            state_rx,
351            peer_rx,
352            browser_rx,
353            shutdown_rx,
354            tx,
355        ));
356        (
357            state_tx,
358            peer_tx,
359            browser_tx,
360            shutdown_tx,
361            IpnBusWatcher { rx },
362        )
363    }
364
365    fn login_url() -> url::Url {
366        "https://login.example/auth".parse().unwrap()
367    }
368
369    fn consent_url() -> url::Url {
370        "https://login.example/consent".parse().unwrap()
371    }
372
373    /// A minimal non-empty peer, so a `net_map` payload assertion exercises a real value rather than
374    /// the degenerate empty-vec round-trip.
375    fn peer(id: &str) -> StatusNode {
376        use core::net::{IpAddr, Ipv4Addr, Ipv6Addr};
377        StatusNode {
378            stable_id: ts_control::StableNodeId(id.to_owned()),
379            display_name: id.to_owned(),
380            ipv4: IpAddr::V4(Ipv4Addr::new(100, 64, 0, 1)),
381            ipv6: IpAddr::V6(Ipv6Addr::LOCALHOST),
382            online: Some(true),
383            last_seen: None,
384            allowed_routes: Vec::new(),
385            is_exit_node: false,
386            cur_addr: None,
387            relay: None,
388        }
389    }
390
391    /// A negative-assertion window: long enough that a real-but-slow event would still arrive within
392    /// it on a loaded CI box (so "nothing arrived" is trustworthy, not just "nothing arrived *yet*").
393    const QUIET_WINDOW: Duration = Duration::from_millis(250);
394
395    /// `NotifyWatchOpt` is a faithful bitfield: Go's literal values, `contains`, and `|` compose.
396    #[test]
397    fn mask_bitfield_semantics() {
398        assert!(NotifyWatchOpt::empty().contains(NotifyWatchOpt::empty()));
399        assert!(!NotifyWatchOpt::empty().contains(NotifyWatchOpt::INITIAL_STATE));
400        let both = NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP;
401        assert!(both.contains(NotifyWatchOpt::INITIAL_STATE));
402        assert!(both.contains(NotifyWatchOpt::INITIAL_NETMAP));
403        // Wire-compatible with Go's NotifyWatchOpt integer literals.
404        assert_eq!(NotifyWatchOpt::INITIAL_STATE, NotifyWatchOpt(1 << 1));
405        assert_eq!(NotifyWatchOpt::INITIAL_NETMAP, NotifyWatchOpt(1 << 3));
406    }
407
408    /// `NotifyInitialState` front-loads the current state into the first `Notify` (state only, no
409    /// net_map).
410    #[tokio::test]
411    async fn initial_state_snapshot_emitted_when_masked() {
412        let (_s, _p, _b, _sd, mut w) = harness(
413            NotifyWatchOpt::INITIAL_STATE,
414            DeviceState::Running,
415            Vec::new(),
416        );
417        let n = w.next().await.expect("initial snapshot");
418        assert_eq!(n.state, Some(DeviceState::Running));
419        assert_eq!(n.net_map, None);
420        assert_eq!(n.browse_to_url, None);
421    }
422
423    /// `NotifyInitialNetMap` front-loads the current peer set (net_map only, no state).
424    #[tokio::test]
425    async fn initial_netmap_snapshot_emitted_when_masked() {
426        let (_s, _p, _b, _sd, mut w) = harness(
427            NotifyWatchOpt::INITIAL_NETMAP,
428            DeviceState::Running,
429            Vec::new(),
430        );
431        let n = w.next().await.expect("initial snapshot");
432        assert_eq!(n.net_map, Some(Vec::new()));
433        assert_eq!(n.state, None);
434    }
435
436    /// Both initial bits coalesce into ONE `Notify` (Go builds a single `ini` struct), not two
437    /// separate events.
438    #[tokio::test]
439    async fn initial_snapshot_coalesces_both_fields() {
440        let (_s, _p, _b, _sd, mut w) = harness(
441            NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
442            DeviceState::Running,
443            Vec::new(),
444        );
445        let n = w.next().await.expect("initial snapshot");
446        assert_eq!(n.state, Some(DeviceState::Running));
447        assert_eq!(n.net_map, Some(Vec::new()));
448    }
449
450    /// An empty mask sends NO initial snapshot; the watcher then receives the next real transition.
451    #[tokio::test]
452    async fn empty_mask_skips_initial_then_streams_change() {
453        let (state_tx, _p, _b, _sd, mut w) =
454            harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
455        // No initial snapshot: nothing within the quiet window.
456        assert!(
457            tokio::time::timeout(QUIET_WINDOW, w.next()).await.is_err(),
458            "empty mask must not emit an initial snapshot"
459        );
460        // Positive anchor: the watcher is still live and delivers the next real transition (so the
461        // negative assertion above was "nothing to send", not "stream already dead").
462        state_tx.send_replace(DeviceState::Running);
463        let n = w.next().await.expect("change after subscribe");
464        assert_eq!(n.state, Some(DeviceState::Running));
465    }
466
467    /// A `NeedsLogin` transition derives `browse_to_url` alongside `state` — one source of truth for
468    /// the auth URL.
469    #[tokio::test]
470    async fn needs_login_transition_derives_browse_to_url() {
471        // Subscribe with INITIAL_STATE so awaiting the first `next()` (the snapshot) is a
472        // deterministic barrier proving the bus task has finished its init borrows and entered the
473        // streaming loop — only then is a post-subscribe send guaranteed to be observed (no sleeps,
474        // no spawn-vs-send race). Any change after `.changed()`'s seen-version is detected even if
475        // the loop is not yet parked on `.changed()`.
476        let (state_tx, _p, _b, _sd, mut w) = harness(
477            NotifyWatchOpt::INITIAL_STATE,
478            DeviceState::Connecting,
479            Vec::new(),
480        );
481        let snap = w.next().await.expect("initial snapshot");
482        assert_eq!(snap.state, Some(DeviceState::Connecting));
483        assert_eq!(snap.browse_to_url, None);
484        state_tx.send_replace(DeviceState::NeedsLogin(login_url()));
485        let n = w.next().await.expect("needs-login event");
486        assert_eq!(n.state, Some(DeviceState::NeedsLogin(login_url())));
487        assert_eq!(n.browse_to_url, Some(login_url()));
488    }
489
490    /// `NeedsLogin` present at subscribe is front-loaded with its `browse_to_url` (matches Go: the
491    /// initial snapshot carries `BrowseToURL` only when `state == NeedsLogin`).
492    #[tokio::test]
493    async fn initial_needs_login_includes_browse_to_url() {
494        let (_s, _p, _b, _sd, mut w) = harness(
495            NotifyWatchOpt::INITIAL_STATE,
496            DeviceState::NeedsLogin(login_url()),
497            Vec::new(),
498        );
499        let n = w.next().await.expect("initial snapshot");
500        assert_eq!(n.browse_to_url, Some(login_url()));
501    }
502
503    /// A peer-set change streams as a `net_map` notification (no state field), carrying the actual
504    /// new peer payload (not just the degenerate empty round-trip).
505    #[tokio::test]
506    async fn peer_change_streams_netmap() {
507        // INITIAL_NETMAP snapshot is the barrier (proves the task finished its init borrows and is
508        // in the streaming loop) before we send — avoids the spawn-vs-send race.
509        let (_s, peer_tx, _b, _sd, mut w) = harness(
510            NotifyWatchOpt::INITIAL_NETMAP,
511            DeviceState::Running,
512            Vec::new(),
513        );
514        let snap = w.next().await.expect("initial netmap snapshot");
515        assert_eq!(snap.net_map, Some(Vec::new()));
516        // Send a NON-EMPTY peer set so the assertion proves the payload is actually carried through,
517        // not merely that a notification fires.
518        let peers = vec![peer("peer-a"), peer("peer-b")];
519        peer_tx.send_replace(peers.clone());
520        let n = w.next().await.expect("netmap change");
521        assert_eq!(n.net_map, Some(peers));
522        assert_eq!(n.state, None);
523    }
524
525    /// After the initial snapshot, with no further changes, the bus does NOT re-emit — proving the
526    /// `borrow_and_update` correctly marks the snapshotted values seen (no initial-value busy-loop).
527    #[tokio::test]
528    async fn no_spurious_reemit_after_initial() {
529        let (state_tx, _p, _b, _sd, mut w) = harness(
530            NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
531            DeviceState::Running,
532            Vec::new(),
533        );
534        let _initial = w.next().await.expect("initial snapshot");
535        assert!(
536            tokio::time::timeout(QUIET_WINDOW, w.next()).await.is_err(),
537            "no change occurred, so no further notification must arrive"
538        );
539        // Positive liveness anchor: prove the watcher was genuinely alive during the quiet window
540        // (not dropped/dead, which would ALSO deliver nothing and make the assertion above vacuous).
541        // A real transition after the silence must still be delivered.
542        state_tx.send_replace(DeviceState::Expired);
543        let n = w
544            .next()
545            .await
546            .expect("watcher still live after the quiet window");
547        assert_eq!(n.state, Some(DeviceState::Expired));
548    }
549
550    /// Flipping the shutdown cell terminates the stream: `next()` returns `None`.
551    #[tokio::test]
552    async fn shutdown_terminates_stream() {
553        let (_s, _p, _b, shutdown_tx, mut w) =
554            harness(NotifyWatchOpt::empty(), DeviceState::Running, Vec::new());
555        shutdown_tx.send_replace(true);
556        assert_eq!(w.next().await, None, "shutdown must end the stream");
557    }
558
559    /// If the runtime is already shutting down at subscribe time, the stream ends immediately.
560    #[tokio::test]
561    async fn already_shutdown_ends_immediately() {
562        let (state_tx, state_rx) = watch::channel(DeviceState::Running);
563        let (peer_tx, peer_rx) = watch::channel(Vec::new());
564        let (browser_tx, browser_rx) = watch::channel(None);
565        let (_shutdown_tx, shutdown_rx) = watch::channel(true);
566        let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
567        tokio::spawn(run_bus(
568            NotifyWatchOpt::INITIAL_STATE,
569            state_rx,
570            peer_rx,
571            browser_rx,
572            shutdown_rx,
573            tx,
574        ));
575        let mut w = IpnBusWatcher { rx };
576        assert_eq!(w.next().await, None, "already-shutdown must emit nothing");
577        // Keep the source senders alive until after the assertion so termination is attributable to
578        // the shutdown flag, not a sender drop.
579        drop((state_tx, peer_tx, browser_tx));
580    }
581
582    /// Dropping every source sender (runtime tearing down without the graceful flag) also ends the
583    /// stream rather than hanging.
584    #[tokio::test]
585    async fn source_sender_drop_terminates_stream() {
586        let (state_tx, _p, _b, _sd, mut w) =
587            harness(NotifyWatchOpt::empty(), DeviceState::Running, Vec::new());
588        drop((state_tx, _p, _b, _sd));
589        assert_eq!(w.next().await, None, "all senders gone must end the stream");
590    }
591
592    /// Streamed (post-subscribe) events are delivered per-source: a state change and a peer change
593    /// arrive as TWO single-field `Notify`s, not one coalesced event. This pins the documented
594    /// contract (only the *initial snapshot* coalesces; the loop is per-cell) so a future change to
595    /// the merge loop can't silently alter it.
596    #[tokio::test]
597    async fn streamed_events_are_per_source_not_coalesced() {
598        let (state_tx, peer_tx, _b, _sd, mut w) = harness(
599            NotifyWatchOpt::INITIAL_STATE,
600            DeviceState::Connecting,
601            Vec::new(),
602        );
603        let _snap = w.next().await.expect("initial snapshot barrier");
604        // Move two distinct sources. They are independent watch cells, so the bus emits one Notify
605        // per source — never a single Notify carrying both `state` and `net_map`.
606        state_tx.send_replace(DeviceState::Running);
607        peer_tx.send_replace(vec![peer("peer-a")]);
608        let first = w.next().await.expect("first event");
609        let second = w.next().await.expect("second event");
610        for n in [&first, &second] {
611            assert!(
612                n.state.is_some() ^ n.net_map.is_some(),
613                "each streamed Notify carries exactly one of state / net_map, got {n:?}"
614            );
615        }
616        // Both fields were delivered, just across two events (order is biased-but-unspecified here).
617        assert!(
618            first.state.is_some() || second.state.is_some(),
619            "a state event arrived"
620        );
621        assert!(
622            first.net_map.is_some() || second.net_map.is_some(),
623            "a net_map event arrived"
624        );
625    }
626
627    /// A sequence of state transitions yields one ordered `Notify` per transition, with
628    /// `browse_to_url` set only on the `NeedsLogin` one — proving the loop re-arms correctly across
629    /// more than a single cycle and preserves order.
630    #[tokio::test]
631    async fn sequential_state_transitions_stream_in_order() {
632        let (state_tx, _p, _b, _sd, mut w) = harness(
633            NotifyWatchOpt::INITIAL_STATE,
634            DeviceState::Connecting,
635            Vec::new(),
636        );
637        assert_eq!(
638            w.next().await.expect("snapshot").state,
639            Some(DeviceState::Connecting)
640        );
641        for next in [
642            DeviceState::Running,
643            DeviceState::NeedsLogin(login_url()),
644            DeviceState::Expired,
645        ] {
646            state_tx.send_replace(next.clone());
647            let n = w.next().await.expect("transition");
648            assert_eq!(n.state, Some(next.clone()));
649            assert_eq!(n.net_map, None);
650            let expect_url = matches!(next, DeviceState::NeedsLogin(_)).then(login_url);
651            assert_eq!(n.browse_to_url, expect_url);
652        }
653    }
654
655    /// Each non-login state flows through as `state: Some(..)` with `browse_to_url: None` — closes
656    /// the enum (the earlier tests only exercised Connecting / Running / NeedsLogin).
657    #[tokio::test]
658    async fn expired_and_failed_states_stream_without_url() {
659        for state in [
660            DeviceState::Expired,
661            DeviceState::Failed(crate::RegistrationError::AuthRejected("bad key".into())),
662        ] {
663            let (state_tx, _p, _b, _sd, mut w) = harness(
664                NotifyWatchOpt::INITIAL_STATE,
665                DeviceState::Connecting,
666                Vec::new(),
667            );
668            let _snap = w.next().await.expect("snapshot barrier");
669            state_tx.send_replace(state.clone());
670            let n = w.next().await.expect("state event");
671            assert_eq!(n.state, Some(state));
672            assert_eq!(n.browse_to_url, None);
673        }
674    }
675
676    /// "Lossy by design": when the consumer never drains, a flood of changes fills the bounded
677    /// buffer and excess notifications are DROPPED — the producer (`send_replace` on the source
678    /// cell + the bus task) must never block. If `deliver` were changed to a blocking `send().await`,
679    /// the bus task would wedge and the subsequent shutdown would never be observed → this test would
680    /// hang (caught by the suite timeout). Proves the non-blocking `try_send` contract.
681    #[tokio::test]
682    async fn full_buffer_drops_and_never_blocks_producer() {
683        let (state_tx, _p, _b, shutdown_tx, mut w) =
684            harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
685        // Never call w.next(): the per-watcher mpsc fills to NOTIFY_BUFFER then drops the rest.
686        // Push well past the buffer depth; yield so the bus task runs each send.
687        for _ in 0..(NOTIFY_BUFFER * 2 + 16) {
688            state_tx.send_replace(DeviceState::Running);
689            state_tx.send_replace(DeviceState::Connecting);
690            tokio::task::yield_now().await;
691        }
692        // The producer never blocked (we got here). The bus task is also not wedged: a shutdown is
693        // still observed promptly and ends the stream once the buffer drains.
694        shutdown_tx.send_replace(true);
695        // Drain whatever buffered (≤ NOTIFY_BUFFER) then the stream must terminate with None.
696        let mut drained = 0usize;
697        while let Some(_n) = w.next().await {
698            drained += 1;
699            assert!(
700                drained <= NOTIFY_BUFFER,
701                "buffer must be bounded at NOTIFY_BUFFER ({NOTIFY_BUFFER}), drained {drained}"
702            );
703        }
704    }
705
706    /// Dropping the `IpnBusWatcher` reclaims the bus task PROMPTLY via the `tx.closed()` select arm —
707    /// no subsequent source change is needed (the regression guard for the idle-device leak the
708    /// `tx.closed()` arm fixes). Proven by observing the task drop its cloned `state_rx`, which falls
709    /// the sender's `receiver_count` back to 0 once the task returns.
710    #[tokio::test]
711    async fn consumer_drop_terminates_task() {
712        let (state_tx, _p, _b, _sd, w) =
713            harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
714        // Sanity: the bus task is live and holds a clone of the state receiver.
715        assert_eq!(
716            state_tx.receiver_count(),
717            1,
718            "bus task holds the source receiver"
719        );
720        // Drop the consumer with NO further change: its mpsc Receiver is gone, so `tx.closed()`
721        // resolves and the task must return on its own (not wait for an event).
722        drop(w);
723        // Poll until the task has returned (and thus dropped its state_rx). Bounded: a real leak
724        // never reaches 0 and fails by timing out under the suite cap. yield_now lets the task run.
725        while state_tx.receiver_count() != 0 {
726            tokio::task::yield_now().await;
727        }
728        assert_eq!(
729            state_tx.receiver_count(),
730            0,
731            "bus task must reclaim (drop its source receiver) once the consumer is gone"
732        );
733    }
734
735    /// A running-node consent URL (`MapResponse.PopBrowserURL`, via the de-thrashed browser cell)
736    /// streams as a standalone `browse_to_url` event — no `state`, no `net_map`.
737    #[tokio::test]
738    async fn running_node_browser_url_streams_standalone() {
739        // INITIAL_STATE snapshot is the barrier proving the task is in its streaming loop.
740        let (_s, _p, browser_tx, _sd, mut w) = harness(
741            NotifyWatchOpt::INITIAL_STATE,
742            DeviceState::Running,
743            Vec::new(),
744        );
745        let snap = w.next().await.expect("initial snapshot");
746        assert_eq!(snap.state, Some(DeviceState::Running));
747        assert_eq!(
748            snap.browse_to_url, None,
749            "running-node URL is not front-loaded"
750        );
751        // Control pushes a consent URL mid-session (the producer sends Some on a new URL).
752        browser_tx.send_replace(Some(consent_url()));
753        let n = w.next().await.expect("browse-to-url event");
754        assert_eq!(n.browse_to_url, Some(consent_url()));
755        assert_eq!(n.state, None);
756        assert_eq!(n.net_map, None);
757    }
758
759    /// The running-node consent URL is NOT front-loaded into the initial snapshot even when present
760    /// at subscribe time (Go replays only the registration `b.authURL`, never `PopBrowserURL`). The
761    /// sticky value is reachable via the pull API, not the bus snapshot.
762    #[tokio::test]
763    async fn running_node_browser_url_not_in_initial_snapshot() {
764        let (state_tx, state_rx) = watch::channel(DeviceState::Running);
765        let (peer_tx, peer_rx) = watch::channel(Vec::new());
766        // Browser cell already holds a URL at subscribe time.
767        let (browser_tx, browser_rx) = watch::channel(Some(consent_url()));
768        let (shutdown_tx, shutdown_rx) = watch::channel(false);
769        let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
770        tokio::spawn(run_bus(
771            NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
772            state_rx,
773            peer_rx,
774            browser_rx,
775            shutdown_rx,
776            tx,
777        ));
778        let mut w = IpnBusWatcher { rx };
779        let snap = w.next().await.expect("initial snapshot");
780        // The snapshot carries state + net_map (masked) but NOT the pre-existing browser URL.
781        assert_eq!(snap.state, Some(DeviceState::Running));
782        assert_eq!(snap.net_map, Some(Vec::new()));
783        assert_eq!(
784            snap.browse_to_url, None,
785            "pre-existing running-node URL must not be front-loaded"
786        );
787        // It only arrives once it CHANGES post-subscribe.
788        let next = consent_url();
789        let mut next2 = next.clone();
790        next2.set_path("/consent2");
791        browser_tx.send_replace(Some(next2.clone()));
792        let n = w.next().await.expect("browser-url change after subscribe");
793        assert_eq!(n.browse_to_url, Some(next2));
794        drop((state_tx, peer_tx, shutdown_tx));
795    }
796
797    /// Mid-session re-auth, end to end through the bus: control returns `MachineNotAuthorized` on a
798    /// live re-register, the control client surfaces the URL, the runtime bridge sets
799    /// [`DeviceState::NeedsLogin`] — which the bus turns into a `browse_to_url` event — and then a
800    /// successful re-register flips the device back to `Running`, clearing `browse_to_url`. This is
801    /// the user-visible contract of the fix (the dropped re-auth URL now reaches the embedder, and
802    /// goes away once the node recovers), exercised over the same `state_tx` the bridge writes.
803    #[tokio::test]
804    async fn mid_session_reauth_surfaces_browse_to_url_then_clears() {
805        // Subscribe with INITIAL_STATE so the first `next()` (the snapshot) is the barrier proving
806        // the bus task is in its streaming loop before we drive transitions.
807        let (state_tx, _p, _b, _sd, mut w) = harness(
808            NotifyWatchOpt::INITIAL_STATE,
809            DeviceState::Running,
810            Vec::new(),
811        );
812        let snap = w.next().await.expect("initial snapshot");
813        assert_eq!(snap.state, Some(DeviceState::Running));
814        assert_eq!(snap.browse_to_url, None);
815
816        // Mid-session re-auth: the bridge sets NeedsLogin(url) on the state cell.
817        state_tx.send_replace(DeviceState::NeedsLogin(login_url()));
818        let n = w.next().await.expect("needs-login event");
819        assert_eq!(n.state, Some(DeviceState::NeedsLogin(login_url())));
820        assert_eq!(
821            n.browse_to_url,
822            Some(login_url()),
823            "the re-auth URL must reach the embedder as browse_to_url"
824        );
825
826        // A later successful re-register: the netmap self-node handler flips back to Running, and
827        // the bus reports the state change with browse_to_url cleared.
828        state_tx.send_replace(DeviceState::Running);
829        let n = w.next().await.expect("recovery event");
830        assert_eq!(n.state, Some(DeviceState::Running));
831        assert_eq!(
832            n.browse_to_url, None,
833            "recovering to Running clears the browse_to_url"
834        );
835    }
836
837    /// Two distinct consent URLs in sequence stream as two `browse_to_url` events.
838    #[tokio::test]
839    async fn sequential_browser_urls_stream_each() {
840        let (_s, _p, browser_tx, _sd, mut w) = harness(
841            NotifyWatchOpt::INITIAL_STATE,
842            DeviceState::Running,
843            Vec::new(),
844        );
845        let _snap = w.next().await.expect("snapshot barrier");
846        let url_a = consent_url();
847        let mut url_b = consent_url();
848        url_b.set_path("/consent-b");
849        browser_tx.send_replace(Some(url_a.clone()));
850        assert_eq!(
851            w.next().await.expect("first url").browse_to_url,
852            Some(url_a)
853        );
854        browser_tx.send_replace(Some(url_b.clone()));
855        assert_eq!(
856            w.next().await.expect("second url").browse_to_url,
857            Some(url_b)
858        );
859    }
860
861    /// A browser-URL change and a state change arrive as TWO distinct single-field events (the new
862    /// browser arm doesn't coalesce into, or clobber, a concurrent state transition). Companion to
863    /// `streamed_events_are_per_source_not_coalesced` (state+peer), for the browser+state pair.
864    #[tokio::test]
865    async fn browser_url_and_state_change_interleave() {
866        let (state_tx, _p, browser_tx, _sd, mut w) = harness(
867            NotifyWatchOpt::INITIAL_STATE,
868            DeviceState::Running,
869            Vec::new(),
870        );
871        let _snap = w.next().await.expect("snapshot barrier");
872        browser_tx.send_replace(Some(consent_url()));
873        state_tx.send_replace(DeviceState::Expired);
874        let a = w.next().await.expect("first event");
875        let b = w.next().await.expect("second event");
876        for n in [&a, &b] {
877            assert!(
878                n.state.is_some() ^ n.browse_to_url.is_some(),
879                "each streamed event carries exactly one of state / browse_to_url, got {n:?}"
880            );
881            assert_eq!(n.net_map, None);
882        }
883        assert!(
884            a.browse_to_url.is_some() || b.browse_to_url.is_some(),
885            "a browse_to_url event arrived"
886        );
887        assert!(
888            a.state.is_some() || b.state.is_some(),
889            "a state event arrived"
890        );
891    }
892}