ts_runtime/ipn_bus.rs
1//! Unified IPN notification bus: a single push-style stream that coalesces the device's
2//! connection-[`DeviceState`] and netmap peer-set changes into one [`Notify`] feed, mirroring Go
3//! `ipn` `LocalBackend.WatchNotifications` / the `WatchIPNBus` LocalAPI.
4//!
5//! Go delivers one `ipn.Notify` struct per event in which **only the changed fields are populated**
6//! (a nil field means "unchanged"); an optional subscribe-time mask ([`NotifyWatchOpt`]) front-loads
7//! an initial snapshot of the current state. [`Notify`] is the faithful Rust shape of that struct —
8//! a struct of `Option`s, not a per-event enum.
9//!
10//! # Coalescing: initial snapshot vs. streamed events
11//!
12//! The struct-of-`Option`s shape lets one `Notify` carry several changed fields at once. This bus
13//! exploits that **for the initial snapshot only**: the subscribe-time snapshot reads every source
//! cell synchronously and packs the masked fields into one `Notify`. Post-subscribe, the merge loop
//! is per-source — each source cell's change produces its own `Notify` (a state change yields
//! `state: Some`, plus the derived `browse_to_url` when the new state is `NeedsLogin`; a peer change
//! yields `net_map: Some`), because the cells are independent `watch` channels with no cross-cell
//! synchronization point to coalesce on. A consumer therefore sees at most one coalesced snapshot
//! followed by single-source deltas. (Go can pack several fields
19//! into one streamed `Notify` because a single `MapResponse` updates several things together under
20//! one lock; the fork has already split those into separate cells, so the equivalent streamed events
21//! arrive separately here. The `Option` shape is still the right type — it keeps the snapshot
22//! faithful and leaves room for a future single source to set multiple fields.)
23//!
24//! # Why these sources
25//!
26//! The fork already decomposes Go's single notification channel into separate, individually-correct
27//! `watch` surfaces ([`Runtime::watch_state`](crate::Runtime::watch_state),
28//! [`Runtime::watch_netmap`](crate::Runtime::watch_netmap)). This bus *composes* the same cells (one
29//! source of truth — it cannot diverge from the narrow views) into the merged feed an embedder
//! porting from Go's `WatchIPNBus` expects. The three cells it reads map onto Go `Notify`
//! fields:
31//!
32//! - [`DeviceState`] → `Notify.State`, and the **registration-time** interactive-login URL carried
33//! by [`DeviceState::NeedsLogin`] (`Notify.browse_to_url`, derived from that state — control's
34//! `MachineNotAuthorized`).
35//! - the running-node consent URL (`MapResponse.PopBrowserURL`) → `Notify.browse_to_url` as a
36//! mid-session event. Go also forwards this `BrowseToURL` for an already-`Running` node (re-auth /
37//! forced-re-login nudges). The fork's backing cell is **sticky** (the producer updates it only on
38//! a new non-empty URL, never resets it to `None` on an empty update — Go's `direct.go` guard
39//! `u != "" && u != sess.lastPopBrowserURL`), so a `watch` subscriber is not thrashed. It is
40//! streamed post-subscribe but **not** front-loaded into the initial snapshot — Go replays only the
41//! registration `b.authURL` (the `NeedsLogin`-derived URL above) on a new watcher, never the
42//! running-node `PopBrowserURL`; a consumer wanting the current pending URL at subscribe time reads
43//! the sticky `pop_browser_url` pull API.
44//! - the peer set (`Vec<StatusNode>`) → `Notify.NetMap` (the embedder-facing peer view).
45//!
46//! Go's `Notify` has no packet-filter cap-grant field (caps are an internal `WhoIs` input, not an
47//! embedder notification), so the retained cap-grants cell is intentionally **not** surfaced here.
48//!
49//! # Lossy by design
50//!
//! Like Go's bus (a bounded 128-deep channel fed with a non-blocking `select { case ch<-n:
//! default: drop }`), delivery is best-effort: the per-watcher [`mpsc`](tokio::sync::mpsc) channel
//! is bounded at [`NOTIFY_BUFFER`](crate::ipn_bus::NOTIFY_BUFFER) and a notification for a watcher
//! whose buffer is full is **dropped**, never blocking the producer. The underlying `watch` cells
//! are themselves coalescing, so a bus task that falls behind reads the latest value of each cell,
//! not every intermediate — the right semantics for state/netmap snapshots. A consumer whose
//! buffer overflowed can, however, miss the newest notification (the drop discards the incoming
//! event, not a queued one), which is the reason this bus is not used for any at-least-once
//! delivery.
58
59use tokio::sync::{mpsc, watch};
60
61use crate::{device_state::DeviceState, status::StatusNode};
62
/// Per-watcher notification buffer depth: the capacity of the bounded `mpsc` channel that
/// `spawn_watcher` creates for each subscription. Matches Go's `ipn` bus channel size
/// (`make(chan *ipn.Notify, 128)`): a bounded queue that the producer never blocks on — a full
/// buffer drops the notification (see module docs).
pub const NOTIFY_BUFFER: usize = 128;
67
/// Selects which initial-state fields are front-loaded into the first [`Notify`] when a watcher
/// subscribes (Go `ipn.NotifyWatchOpt`). A bitfield; combine with `|`.
///
/// The numeric values match Go's `NotifyWatchOpt` literals exactly (`NotifyInitialState = 1 << 1`,
/// `NotifyInitialNetMap = 1 << 3`), so a mask built from Go's integer constants via
/// [`from_bits`](Self::from_bits) is wire-compatible. Bits Go defines but this bus does not yet
/// surface (initial prefs/health/etc.) are retained in the mask but simply not honored — passing
/// them is harmless, exactly as an unrecognized bit is in Go.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NotifyWatchOpt(u64);

impl NotifyWatchOpt {
    /// No initial snapshot: the watcher receives only changes that occur after it subscribes.
    pub const fn empty() -> Self {
        Self(0)
    }

    /// Front-load the current [`DeviceState`] (and, when it is [`DeviceState::NeedsLogin`], the
    /// auth URL as `browse_to_url`) into the first [`Notify`]. Go `NotifyInitialState` (`1 << 1`).
    pub const INITIAL_STATE: Self = Self(1 << 1);

    /// Front-load the current peer set (`net_map`) into the first [`Notify`]. Go
    /// `NotifyInitialNetMap` (`1 << 3`).
    pub const INITIAL_NETMAP: Self = Self(1 << 3);

    /// Build a mask from a raw Go `NotifyWatchOpt` integer. Every bit is kept as-is, including
    /// bits this bus does not act on, so a value round-trips through [`bits`](Self::bits)
    /// unchanged.
    pub const fn from_bits(bits: u64) -> Self {
        Self(bits)
    }

    /// The raw bit pattern of this mask, in Go's `NotifyWatchOpt` numbering.
    pub const fn bits(self) -> u64 {
        self.0
    }

    /// Whether all bits in `other` are set in `self`. An empty `other` is contained in every
    /// mask (including the empty one).
    pub const fn contains(self, other: Self) -> bool {
        self.0 & other.0 == other.0
    }
}

impl core::ops::BitOr for NotifyWatchOpt {
    type Output = Self;

    /// Union of the two masks: a bit is set in the result when it is set in either operand.
    fn bitor(self, rhs: Self) -> Self {
        Self(self.0 | rhs.0)
    }
}
104
105/// A single notification from the [IPN bus](self), mirroring Go `ipn.Notify`: each field is `Some`
106/// only when it changed in this event (a `None` field means "unchanged"). One event may populate
107/// several fields at once (e.g. a netmap update that also moves the device state).
108///
109/// `#[non_exhaustive]` so future Go-parity fields (prefs, engine status, health) can be added
110/// without breaking embedders that match on it.
111#[derive(Debug, Clone, Default, PartialEq, Eq)]
112#[non_exhaustive]
113pub struct Notify {
114 /// The new device connection-state, if it changed (Go `Notify.State`).
115 pub state: Option<DeviceState>,
116 /// The new peer set, if the netmap changed (Go `Notify.NetMap`, embedder-facing peer view).
117 pub net_map: Option<Vec<StatusNode>>,
118 /// An interactive-login / consent URL the embedder should open (Go `Notify.BrowseToURL`). Two
119 /// sources feed it: the **registration-time** auth URL, derived from [`DeviceState::NeedsLogin`]
120 /// and set alongside `state` when the device enters that state; and the **mid-session**
121 /// `MapResponse.PopBrowserURL` (re-auth / consent on an already-running node), streamed on its own
122 /// as a standalone event. See the module docs for which is front-loaded into the initial snapshot
123 /// (only the registration URL) vs. streamed (both).
124 pub browse_to_url: Option<url::Url>,
125}
126
127impl Notify {
128 /// Whether this notification carries no populated field. An all-`None` `Notify` is never
129 /// delivered (the bus skips it), so observing one from [`IpnBusWatcher::next`] is impossible;
130 /// the predicate exists for the bus's own "is there anything to send?" check.
131 fn is_empty(&self) -> bool {
132 self.state.is_none() && self.net_map.is_none() && self.browse_to_url.is_none()
133 }
134}
135
/// A handle to a live [IPN bus](self) subscription, mirroring Go's `IPNBusWatcher`. Await
/// [`next`](Self::next) to receive [`Notify`] events; it returns `None` when the stream ends (the
/// runtime shut down, or a source cell's sender was dropped). Dropping this handle closes the
/// channel, which in turn lets the bus task stop.
#[derive(Debug)]
pub struct IpnBusWatcher {
    /// Receiving half of the bounded per-watcher channel that the bus task sends into.
    rx: mpsc::Receiver<Notify>,
}

impl IpnBusWatcher {
    /// Await the next [`Notify`]. Returns `None` once the bus task has terminated (runtime
    /// shutdown, or any source cell's sender dropped) and every already-buffered event has been
    /// received — the clean end-of-stream signal, like Go's watcher channel closing.
    pub async fn next(&mut self) -> Option<Notify> {
        self.rx.recv().await
    }
}
152
153/// Spawn the bus task feeding `tx` and return the consumer handle. Reads cloned `watch` receivers
154/// (so it never contends with the runtime's own readers) and a `shutdown` receiver that terminates
155/// the task. The task self-terminates on shutdown, on any source sender dropping, or when the
156/// returned [`IpnBusWatcher`] is dropped (the `tx` send then reports the channel closed) — so it
157/// cannot leak past the runtime or a discarded watcher.
158pub(crate) fn spawn_watcher(
159 mask: NotifyWatchOpt,
160 state_rx: watch::Receiver<DeviceState>,
161 peer_rx: watch::Receiver<Vec<StatusNode>>,
162 browser_rx: watch::Receiver<Option<url::Url>>,
163 shutdown_rx: watch::Receiver<bool>,
164) -> IpnBusWatcher {
165 let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
166 tokio::spawn(run_bus(
167 mask,
168 state_rx,
169 peer_rx,
170 browser_rx,
171 shutdown_rx,
172 tx,
173 ));
174 IpnBusWatcher { rx }
175}
176
177/// Try to deliver `n`, returning `true` when the bus should stop (the consumer is gone).
178///
179/// Mirrors Go's non-blocking `select { case ch <- n: default: /* drop */ }`: a `Full` buffer drops
180/// the notification and keeps streaming (best-effort delivery, never block the producer); a `Closed`
181/// channel means the watcher was dropped, so the task is done.
182fn deliver(tx: &mpsc::Sender<Notify>, n: Notify) -> bool {
183 match tx.try_send(n) {
184 Ok(()) => false,
185 Err(mpsc::error::TrySendError::Full(_)) => false,
186 Err(mpsc::error::TrySendError::Closed(_)) => true,
187 }
188}
189
190/// The interactive-login URL implied by a device state: `Some` only for [`DeviceState::NeedsLogin`].
191/// The single derivation rule for `browse_to_url`, shared by the initial snapshot and the streaming
192/// state arm so the two can never drift (see module docs on the registration-time URL).
193fn browse_url_for(state: &DeviceState) -> Option<url::Url> {
194 match state {
195 DeviceState::NeedsLogin(u) => Some(u.clone()),
196 _ => None,
197 }
198}
199
200/// Build the `Notify` for a device-state transition: the state plus its derived `browse_to_url`.
201fn state_notify(state: DeviceState) -> Notify {
202 let browse_to_url = browse_url_for(&state);
203 Notify {
204 state: Some(state),
205 net_map: None,
206 browse_to_url,
207 }
208}
209
210/// The bus loop, factored out of [`spawn_watcher`] so the (subtle) ordering — the masked initial
211/// snapshot, the `borrow_and_update` that prevents an initial-value busy-loop, the shutdown arm, and
212/// sender-drop termination — is unit-testable against plain `watch`/`mpsc` channels without standing
213/// up a runtime (mirrors [`device_state::wait_for_running`](crate::device_state::wait_for_running)).
214pub(crate) async fn run_bus(
215 mask: NotifyWatchOpt,
216 mut state_rx: watch::Receiver<DeviceState>,
217 mut peer_rx: watch::Receiver<Vec<StatusNode>>,
218 mut browser_rx: watch::Receiver<Option<url::Url>>,
219 mut shutdown_rx: watch::Receiver<bool>,
220 tx: mpsc::Sender<Notify>,
221) {
222 // If the runtime is already shutting down, end before doing anything. This also marks the
223 // shutdown cell's initial `false` as *seen* so the `select!` arm below doesn't fire spuriously
224 // on the unobserved initial value (the classic `watch`-in-`select!` busy-loop).
225 if *shutdown_rx.borrow_and_update() {
226 return;
227 }
228
229 // Initial snapshot: ONE coalesced `Notify` carrying whichever masked fields are requested
230 // (Go front-loads State+NetMap into a single `ini` struct). `borrow_and_update` reads the
231 // current value AND marks it seen, so the streaming loop's first `changed()` waits for a real
232 // transition instead of re-emitting the value we just snapshotted.
233 let mut initial = Notify::default();
234 {
235 let state = state_rx.borrow_and_update();
236 if mask.contains(NotifyWatchOpt::INITIAL_STATE) {
237 initial.browse_to_url = browse_url_for(&state);
238 initial.state = Some(state.clone());
239 }
240 }
241 {
242 let peers = peer_rx.borrow_and_update();
243 if mask.contains(NotifyWatchOpt::INITIAL_NETMAP) {
244 initial.net_map = Some(peers.clone());
245 }
246 }
247 // Mark the running-node browser-URL cell's initial value seen so the streaming arm waits for a
248 // real post-subscribe change (busy-loop prevention, same as the cells above). Its current value
249 // is deliberately NOT front-loaded into the initial snapshot: Go replays only the
250 // registration-time auth URL (the `NeedsLogin`-derived `browse_to_url` above), never the
251 // running-node `MapResponse.PopBrowserURL`, on a new watcher's initial state. A consumer wanting
252 // the current pending consent URL at subscribe time reads the sticky `pop_browser_url` pull API;
253 // the bus streams future transitions.
254 browser_rx.borrow_and_update();
255 if !initial.is_empty() && deliver(&tx, initial) {
256 return;
257 }
258
259 // Stream subsequent changes. `biased` makes shutdown take priority over data so a teardown is
260 // observed promptly. Each data arm re-reads with `borrow_and_update().clone()` into an owned
261 // value and drops the borrow guard *before* the next await — never holding a `watch` read guard
262 // across `.changed()` (which would deadlock). A sender-drop (`changed()` => `Err`) ends the
263 // stream, exactly as `wait_for_running` treats it.
264 loop {
265 tokio::select! {
266 biased;
267 _ = shutdown_rx.changed() => return,
268 // The consumer dropped its `IpnBusWatcher`: reclaim the task immediately rather than
269 // waiting for the next source change to surface a `Closed` on the next `deliver`. On an
270 // idle (quiet) device that next change might be far off, so without this arm a dropped
271 // watcher would leave the task parked until shutdown. `Sender::closed()` resolves once
272 // every receiver is gone.
273 _ = tx.closed() => return,
274 changed = state_rx.changed() => {
275 if changed.is_err() {
276 return;
277 }
278 let state = state_rx.borrow_and_update().clone();
279 if deliver(&tx, state_notify(state)) {
280 return;
281 }
282 }
283 changed = peer_rx.changed() => {
284 if changed.is_err() {
285 return;
286 }
287 let peers = peer_rx.borrow_and_update().clone();
288 let notify = Notify {
289 state: None,
290 net_map: Some(peers),
291 browse_to_url: None,
292 };
293 if deliver(&tx, notify) {
294 return;
295 }
296 }
297 changed = browser_rx.changed() => {
298 if changed.is_err() {
299 return;
300 }
301 // The running-node consent URL (`MapResponse.PopBrowserURL`). The producer cell is
302 // de-thrashed (updated only on a new non-empty URL, never reset to `None`), so a
303 // change here carries a fresh `Some(url)`; skip the defensive `None` case rather than
304 // emit an empty `browse_to_url`.
305 let url = browser_rx.borrow_and_update().clone();
306 if let Some(url) = url {
307 let notify = Notify {
308 state: None,
309 net_map: None,
310 browse_to_url: Some(url),
311 };
312 if deliver(&tx, notify) {
313 return;
314 }
315 }
316 }
317 }
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use core::time::Duration;
324
325 use tokio::sync::{mpsc, watch};
326
327 use super::*;
328
    /// Everything [`harness`] hands back to a test, in order: the `watch` senders for the device
    /// state, the peer set, the running-node browser URL and the shutdown flag (the four inputs a
    /// test uses to drive `run_bus`), followed by the `IpnBusWatcher` that observes what the bus
    /// emits.
    type Harness = (
        watch::Sender<DeviceState>,
        watch::Sender<Vec<StatusNode>>,
        watch::Sender<Option<url::Url>>,
        watch::Sender<bool>,
        IpnBusWatcher,
    );
339
340 /// Drive `run_bus` on a task against hand-made channels, returning the senders (state, peer,
341 /// browser-URL, shutdown) and the consumer handle. Mirrors how `device_state` tests drive
342 /// `wait_for_running` off a plain `watch`.
343 fn harness(mask: NotifyWatchOpt, state: DeviceState, peers: Vec<StatusNode>) -> Harness {
344 let (state_tx, state_rx) = watch::channel(state);
345 let (peer_tx, peer_rx) = watch::channel(peers);
346 let (browser_tx, browser_rx) = watch::channel(None);
347 let (shutdown_tx, shutdown_rx) = watch::channel(false);
348 let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
349 tokio::spawn(run_bus(
350 mask,
351 state_rx,
352 peer_rx,
353 browser_rx,
354 shutdown_rx,
355 tx,
356 ));
357 (
358 state_tx,
359 peer_tx,
360 browser_tx,
361 shutdown_tx,
362 IpnBusWatcher { rx },
363 )
364 }
365
366 fn login_url() -> url::Url {
367 "https://login.example/auth".parse().unwrap()
368 }
369
370 fn consent_url() -> url::Url {
371 "https://login.example/consent".parse().unwrap()
372 }
373
374 /// A minimal non-empty peer, so a `net_map` payload assertion exercises a real value rather than
375 /// the degenerate empty-vec round-trip.
376 fn peer(id: &str) -> StatusNode {
377 use core::net::{IpAddr, Ipv4Addr, Ipv6Addr};
378 StatusNode {
379 stable_id: ts_control::StableNodeId(id.to_owned()),
380 display_name: id.to_owned(),
381 ipv4: IpAddr::V4(Ipv4Addr::new(100, 64, 0, 1)),
382 ipv6: IpAddr::V6(Ipv6Addr::LOCALHOST),
383 online: Some(true),
384 last_seen: None,
385 allowed_routes: Vec::new(),
386 is_exit_node: false,
387 cur_addr: None,
388 relay: None,
389 ssh_host_keys: Vec::new(),
390 }
391 }
392
    /// How long a negative assertion waits (via `tokio::time::timeout`) before concluding that no
    /// notification is coming: long enough that a real-but-slow event would still arrive within it
    /// on a loaded CI box (so "nothing arrived" is trustworthy, not just "nothing arrived *yet*").
    const QUIET_WINDOW: Duration = Duration::from_millis(250);
396
397 /// `NotifyWatchOpt` is a faithful bitfield: Go's literal values, `contains`, and `|` compose.
398 #[test]
399 fn mask_bitfield_semantics() {
400 assert!(NotifyWatchOpt::empty().contains(NotifyWatchOpt::empty()));
401 assert!(!NotifyWatchOpt::empty().contains(NotifyWatchOpt::INITIAL_STATE));
402 let both = NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP;
403 assert!(both.contains(NotifyWatchOpt::INITIAL_STATE));
404 assert!(both.contains(NotifyWatchOpt::INITIAL_NETMAP));
405 // Wire-compatible with Go's NotifyWatchOpt integer literals.
406 assert_eq!(NotifyWatchOpt::INITIAL_STATE, NotifyWatchOpt(1 << 1));
407 assert_eq!(NotifyWatchOpt::INITIAL_NETMAP, NotifyWatchOpt(1 << 3));
408 }
409
410 /// `NotifyInitialState` front-loads the current state into the first `Notify` (state only, no
411 /// net_map).
412 #[tokio::test]
413 async fn initial_state_snapshot_emitted_when_masked() {
414 let (_s, _p, _b, _sd, mut w) = harness(
415 NotifyWatchOpt::INITIAL_STATE,
416 DeviceState::Running,
417 Vec::new(),
418 );
419 let n = w.next().await.expect("initial snapshot");
420 assert_eq!(n.state, Some(DeviceState::Running));
421 assert_eq!(n.net_map, None);
422 assert_eq!(n.browse_to_url, None);
423 }
424
425 /// `NotifyInitialNetMap` front-loads the current peer set (net_map only, no state).
426 #[tokio::test]
427 async fn initial_netmap_snapshot_emitted_when_masked() {
428 let (_s, _p, _b, _sd, mut w) = harness(
429 NotifyWatchOpt::INITIAL_NETMAP,
430 DeviceState::Running,
431 Vec::new(),
432 );
433 let n = w.next().await.expect("initial snapshot");
434 assert_eq!(n.net_map, Some(Vec::new()));
435 assert_eq!(n.state, None);
436 }
437
438 /// Both initial bits coalesce into ONE `Notify` (Go builds a single `ini` struct), not two
439 /// separate events.
440 #[tokio::test]
441 async fn initial_snapshot_coalesces_both_fields() {
442 let (_s, _p, _b, _sd, mut w) = harness(
443 NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
444 DeviceState::Running,
445 Vec::new(),
446 );
447 let n = w.next().await.expect("initial snapshot");
448 assert_eq!(n.state, Some(DeviceState::Running));
449 assert_eq!(n.net_map, Some(Vec::new()));
450 }
451
452 /// An empty mask sends NO initial snapshot; the watcher then receives the next real transition.
453 #[tokio::test]
454 async fn empty_mask_skips_initial_then_streams_change() {
455 let (state_tx, _p, _b, _sd, mut w) =
456 harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
457 // No initial snapshot: nothing within the quiet window.
458 assert!(
459 tokio::time::timeout(QUIET_WINDOW, w.next()).await.is_err(),
460 "empty mask must not emit an initial snapshot"
461 );
462 // Positive anchor: the watcher is still live and delivers the next real transition (so the
463 // negative assertion above was "nothing to send", not "stream already dead").
464 state_tx.send_replace(DeviceState::Running);
465 let n = w.next().await.expect("change after subscribe");
466 assert_eq!(n.state, Some(DeviceState::Running));
467 }
468
469 /// A `NeedsLogin` transition derives `browse_to_url` alongside `state` — one source of truth for
470 /// the auth URL.
471 #[tokio::test]
472 async fn needs_login_transition_derives_browse_to_url() {
473 // Subscribe with INITIAL_STATE so awaiting the first `next()` (the snapshot) is a
474 // deterministic barrier proving the bus task has finished its init borrows and entered the
475 // streaming loop — only then is a post-subscribe send guaranteed to be observed (no sleeps,
476 // no spawn-vs-send race). Any change after `.changed()`'s seen-version is detected even if
477 // the loop is not yet parked on `.changed()`.
478 let (state_tx, _p, _b, _sd, mut w) = harness(
479 NotifyWatchOpt::INITIAL_STATE,
480 DeviceState::Connecting,
481 Vec::new(),
482 );
483 let snap = w.next().await.expect("initial snapshot");
484 assert_eq!(snap.state, Some(DeviceState::Connecting));
485 assert_eq!(snap.browse_to_url, None);
486 state_tx.send_replace(DeviceState::NeedsLogin(login_url()));
487 let n = w.next().await.expect("needs-login event");
488 assert_eq!(n.state, Some(DeviceState::NeedsLogin(login_url())));
489 assert_eq!(n.browse_to_url, Some(login_url()));
490 }
491
492 /// `NeedsLogin` present at subscribe is front-loaded with its `browse_to_url` (matches Go: the
493 /// initial snapshot carries `BrowseToURL` only when `state == NeedsLogin`).
494 #[tokio::test]
495 async fn initial_needs_login_includes_browse_to_url() {
496 let (_s, _p, _b, _sd, mut w) = harness(
497 NotifyWatchOpt::INITIAL_STATE,
498 DeviceState::NeedsLogin(login_url()),
499 Vec::new(),
500 );
501 let n = w.next().await.expect("initial snapshot");
502 assert_eq!(n.browse_to_url, Some(login_url()));
503 }
504
505 /// A peer-set change streams as a `net_map` notification (no state field), carrying the actual
506 /// new peer payload (not just the degenerate empty round-trip).
507 #[tokio::test]
508 async fn peer_change_streams_netmap() {
509 // INITIAL_NETMAP snapshot is the barrier (proves the task finished its init borrows and is
510 // in the streaming loop) before we send — avoids the spawn-vs-send race.
511 let (_s, peer_tx, _b, _sd, mut w) = harness(
512 NotifyWatchOpt::INITIAL_NETMAP,
513 DeviceState::Running,
514 Vec::new(),
515 );
516 let snap = w.next().await.expect("initial netmap snapshot");
517 assert_eq!(snap.net_map, Some(Vec::new()));
518 // Send a NON-EMPTY peer set so the assertion proves the payload is actually carried through,
519 // not merely that a notification fires.
520 let peers = vec![peer("peer-a"), peer("peer-b")];
521 peer_tx.send_replace(peers.clone());
522 let n = w.next().await.expect("netmap change");
523 assert_eq!(n.net_map, Some(peers));
524 assert_eq!(n.state, None);
525 }
526
527 /// After the initial snapshot, with no further changes, the bus does NOT re-emit — proving the
528 /// `borrow_and_update` correctly marks the snapshotted values seen (no initial-value busy-loop).
529 #[tokio::test]
530 async fn no_spurious_reemit_after_initial() {
531 let (state_tx, _p, _b, _sd, mut w) = harness(
532 NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
533 DeviceState::Running,
534 Vec::new(),
535 );
536 let _initial = w.next().await.expect("initial snapshot");
537 assert!(
538 tokio::time::timeout(QUIET_WINDOW, w.next()).await.is_err(),
539 "no change occurred, so no further notification must arrive"
540 );
541 // Positive liveness anchor: prove the watcher was genuinely alive during the quiet window
542 // (not dropped/dead, which would ALSO deliver nothing and make the assertion above vacuous).
543 // A real transition after the silence must still be delivered.
544 state_tx.send_replace(DeviceState::Expired);
545 let n = w
546 .next()
547 .await
548 .expect("watcher still live after the quiet window");
549 assert_eq!(n.state, Some(DeviceState::Expired));
550 }
551
552 /// Flipping the shutdown cell terminates the stream: `next()` returns `None`.
553 #[tokio::test]
554 async fn shutdown_terminates_stream() {
555 let (_s, _p, _b, shutdown_tx, mut w) =
556 harness(NotifyWatchOpt::empty(), DeviceState::Running, Vec::new());
557 shutdown_tx.send_replace(true);
558 assert_eq!(w.next().await, None, "shutdown must end the stream");
559 }
560
561 /// If the runtime is already shutting down at subscribe time, the stream ends immediately.
562 #[tokio::test]
563 async fn already_shutdown_ends_immediately() {
564 let (state_tx, state_rx) = watch::channel(DeviceState::Running);
565 let (peer_tx, peer_rx) = watch::channel(Vec::new());
566 let (browser_tx, browser_rx) = watch::channel(None);
567 let (_shutdown_tx, shutdown_rx) = watch::channel(true);
568 let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
569 tokio::spawn(run_bus(
570 NotifyWatchOpt::INITIAL_STATE,
571 state_rx,
572 peer_rx,
573 browser_rx,
574 shutdown_rx,
575 tx,
576 ));
577 let mut w = IpnBusWatcher { rx };
578 assert_eq!(w.next().await, None, "already-shutdown must emit nothing");
579 // Keep the source senders alive until after the assertion so termination is attributable to
580 // the shutdown flag, not a sender drop.
581 drop((state_tx, peer_tx, browser_tx));
582 }
583
584 /// Dropping every source sender (runtime tearing down without the graceful flag) also ends the
585 /// stream rather than hanging.
586 #[tokio::test]
587 async fn source_sender_drop_terminates_stream() {
588 let (state_tx, _p, _b, _sd, mut w) =
589 harness(NotifyWatchOpt::empty(), DeviceState::Running, Vec::new());
590 drop((state_tx, _p, _b, _sd));
591 assert_eq!(w.next().await, None, "all senders gone must end the stream");
592 }
593
594 /// Streamed (post-subscribe) events are delivered per-source: a state change and a peer change
595 /// arrive as TWO single-field `Notify`s, not one coalesced event. This pins the documented
596 /// contract (only the *initial snapshot* coalesces; the loop is per-cell) so a future change to
597 /// the merge loop can't silently alter it.
598 #[tokio::test]
599 async fn streamed_events_are_per_source_not_coalesced() {
600 let (state_tx, peer_tx, _b, _sd, mut w) = harness(
601 NotifyWatchOpt::INITIAL_STATE,
602 DeviceState::Connecting,
603 Vec::new(),
604 );
605 let _snap = w.next().await.expect("initial snapshot barrier");
606 // Move two distinct sources. They are independent watch cells, so the bus emits one Notify
607 // per source — never a single Notify carrying both `state` and `net_map`.
608 state_tx.send_replace(DeviceState::Running);
609 peer_tx.send_replace(vec![peer("peer-a")]);
610 let first = w.next().await.expect("first event");
611 let second = w.next().await.expect("second event");
612 for n in [&first, &second] {
613 assert!(
614 n.state.is_some() ^ n.net_map.is_some(),
615 "each streamed Notify carries exactly one of state / net_map, got {n:?}"
616 );
617 }
618 // Both fields were delivered, just across two events (order is biased-but-unspecified here).
619 assert!(
620 first.state.is_some() || second.state.is_some(),
621 "a state event arrived"
622 );
623 assert!(
624 first.net_map.is_some() || second.net_map.is_some(),
625 "a net_map event arrived"
626 );
627 }
628
629 /// A sequence of state transitions yields one ordered `Notify` per transition, with
630 /// `browse_to_url` set only on the `NeedsLogin` one — proving the loop re-arms correctly across
631 /// more than a single cycle and preserves order.
632 #[tokio::test]
633 async fn sequential_state_transitions_stream_in_order() {
634 let (state_tx, _p, _b, _sd, mut w) = harness(
635 NotifyWatchOpt::INITIAL_STATE,
636 DeviceState::Connecting,
637 Vec::new(),
638 );
639 assert_eq!(
640 w.next().await.expect("snapshot").state,
641 Some(DeviceState::Connecting)
642 );
643 for next in [
644 DeviceState::Running,
645 DeviceState::NeedsLogin(login_url()),
646 DeviceState::Expired,
647 ] {
648 state_tx.send_replace(next.clone());
649 let n = w.next().await.expect("transition");
650 assert_eq!(n.state, Some(next.clone()));
651 assert_eq!(n.net_map, None);
652 let expect_url = matches!(next, DeviceState::NeedsLogin(_)).then(login_url);
653 assert_eq!(n.browse_to_url, expect_url);
654 }
655 }
656
657 /// Each non-login state flows through as `state: Some(..)` with `browse_to_url: None` — closes
658 /// the enum (the earlier tests only exercised Connecting / Running / NeedsLogin).
659 #[tokio::test]
660 async fn expired_and_failed_states_stream_without_url() {
661 for state in [
662 DeviceState::Expired,
663 DeviceState::Failed(crate::RegistrationError::AuthRejected("bad key".into())),
664 ] {
665 let (state_tx, _p, _b, _sd, mut w) = harness(
666 NotifyWatchOpt::INITIAL_STATE,
667 DeviceState::Connecting,
668 Vec::new(),
669 );
670 let _snap = w.next().await.expect("snapshot barrier");
671 state_tx.send_replace(state.clone());
672 let n = w.next().await.expect("state event");
673 assert_eq!(n.state, Some(state));
674 assert_eq!(n.browse_to_url, None);
675 }
676 }
677
678 /// "Lossy by design": when the consumer never drains, a flood of changes fills the bounded
679 /// buffer and excess notifications are DROPPED — the producer (`send_replace` on the source
680 /// cell + the bus task) must never block. If `deliver` were changed to a blocking `send().await`,
681 /// the bus task would wedge and the subsequent shutdown would never be observed → this test would
682 /// hang (caught by the suite timeout). Proves the non-blocking `try_send` contract.
683 #[tokio::test]
684 async fn full_buffer_drops_and_never_blocks_producer() {
685 let (state_tx, _p, _b, shutdown_tx, mut w) =
686 harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
687 // Never call w.next(): the per-watcher mpsc fills to NOTIFY_BUFFER then drops the rest.
688 // Push well past the buffer depth; yield so the bus task runs each send.
689 for _ in 0..(NOTIFY_BUFFER * 2 + 16) {
690 state_tx.send_replace(DeviceState::Running);
691 state_tx.send_replace(DeviceState::Connecting);
692 tokio::task::yield_now().await;
693 }
694 // The producer never blocked (we got here). The bus task is also not wedged: a shutdown is
695 // still observed promptly and ends the stream once the buffer drains.
696 shutdown_tx.send_replace(true);
697 // Drain whatever buffered (≤ NOTIFY_BUFFER) then the stream must terminate with None.
698 let mut drained = 0usize;
699 while let Some(_n) = w.next().await {
700 drained += 1;
701 assert!(
702 drained <= NOTIFY_BUFFER,
703 "buffer must be bounded at NOTIFY_BUFFER ({NOTIFY_BUFFER}), drained {drained}"
704 );
705 }
706 }
707
708 /// Dropping the `IpnBusWatcher` reclaims the bus task PROMPTLY via the `tx.closed()` select arm —
709 /// no subsequent source change is needed (the regression guard for the idle-device leak the
710 /// `tx.closed()` arm fixes). Proven by observing the task drop its cloned `state_rx`, which falls
711 /// the sender's `receiver_count` back to 0 once the task returns.
712 #[tokio::test]
713 async fn consumer_drop_terminates_task() {
714 let (state_tx, _p, _b, _sd, w) =
715 harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
716 // Sanity: the bus task is live and holds a clone of the state receiver.
717 assert_eq!(
718 state_tx.receiver_count(),
719 1,
720 "bus task holds the source receiver"
721 );
722 // Drop the consumer with NO further change: its mpsc Receiver is gone, so `tx.closed()`
723 // resolves and the task must return on its own (not wait for an event).
724 drop(w);
725 // Poll until the task has returned (and thus dropped its state_rx). Bounded: a real leak
726 // never reaches 0 and fails by timing out under the suite cap. yield_now lets the task run.
727 while state_tx.receiver_count() != 0 {
728 tokio::task::yield_now().await;
729 }
730 assert_eq!(
731 state_tx.receiver_count(),
732 0,
733 "bus task must reclaim (drop its source receiver) once the consumer is gone"
734 );
735 }
736
737 /// A running-node consent URL (`MapResponse.PopBrowserURL`, via the de-thrashed browser cell)
738 /// streams as a standalone `browse_to_url` event — no `state`, no `net_map`.
739 #[tokio::test]
740 async fn running_node_browser_url_streams_standalone() {
741 // INITIAL_STATE snapshot is the barrier proving the task is in its streaming loop.
742 let (_s, _p, browser_tx, _sd, mut w) = harness(
743 NotifyWatchOpt::INITIAL_STATE,
744 DeviceState::Running,
745 Vec::new(),
746 );
747 let snap = w.next().await.expect("initial snapshot");
748 assert_eq!(snap.state, Some(DeviceState::Running));
749 assert_eq!(
750 snap.browse_to_url, None,
751 "running-node URL is not front-loaded"
752 );
753 // Control pushes a consent URL mid-session (the producer sends Some on a new URL).
754 browser_tx.send_replace(Some(consent_url()));
755 let n = w.next().await.expect("browse-to-url event");
756 assert_eq!(n.browse_to_url, Some(consent_url()));
757 assert_eq!(n.state, None);
758 assert_eq!(n.net_map, None);
759 }
760
761 /// The running-node consent URL is NOT front-loaded into the initial snapshot even when present
762 /// at subscribe time (Go replays only the registration `b.authURL`, never `PopBrowserURL`). The
763 /// sticky value is reachable via the pull API, not the bus snapshot.
764 #[tokio::test]
765 async fn running_node_browser_url_not_in_initial_snapshot() {
766 let (state_tx, state_rx) = watch::channel(DeviceState::Running);
767 let (peer_tx, peer_rx) = watch::channel(Vec::new());
768 // Browser cell already holds a URL at subscribe time.
769 let (browser_tx, browser_rx) = watch::channel(Some(consent_url()));
770 let (shutdown_tx, shutdown_rx) = watch::channel(false);
771 let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
772 tokio::spawn(run_bus(
773 NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
774 state_rx,
775 peer_rx,
776 browser_rx,
777 shutdown_rx,
778 tx,
779 ));
780 let mut w = IpnBusWatcher { rx };
781 let snap = w.next().await.expect("initial snapshot");
782 // The snapshot carries state + net_map (masked) but NOT the pre-existing browser URL.
783 assert_eq!(snap.state, Some(DeviceState::Running));
784 assert_eq!(snap.net_map, Some(Vec::new()));
785 assert_eq!(
786 snap.browse_to_url, None,
787 "pre-existing running-node URL must not be front-loaded"
788 );
789 // It only arrives once it CHANGES post-subscribe.
790 let next = consent_url();
791 let mut next2 = next.clone();
792 next2.set_path("/consent2");
793 browser_tx.send_replace(Some(next2.clone()));
794 let n = w.next().await.expect("browser-url change after subscribe");
795 assert_eq!(n.browse_to_url, Some(next2));
796 drop((state_tx, peer_tx, shutdown_tx));
797 }
798
799 /// Mid-session re-auth, end to end through the bus: control returns `MachineNotAuthorized` on a
800 /// live re-register, the control client surfaces the URL, the runtime bridge sets
801 /// [`DeviceState::NeedsLogin`] — which the bus turns into a `browse_to_url` event — and then a
802 /// successful re-register flips the device back to `Running`, clearing `browse_to_url`. This is
803 /// the user-visible contract of the fix (the dropped re-auth URL now reaches the embedder, and
804 /// goes away once the node recovers), exercised over the same `state_tx` the bridge writes.
805 #[tokio::test]
806 async fn mid_session_reauth_surfaces_browse_to_url_then_clears() {
807 // Subscribe with INITIAL_STATE so the first `next()` (the snapshot) is the barrier proving
808 // the bus task is in its streaming loop before we drive transitions.
809 let (state_tx, _p, _b, _sd, mut w) = harness(
810 NotifyWatchOpt::INITIAL_STATE,
811 DeviceState::Running,
812 Vec::new(),
813 );
814 let snap = w.next().await.expect("initial snapshot");
815 assert_eq!(snap.state, Some(DeviceState::Running));
816 assert_eq!(snap.browse_to_url, None);
817
818 // Mid-session re-auth: the bridge sets NeedsLogin(url) on the state cell.
819 state_tx.send_replace(DeviceState::NeedsLogin(login_url()));
820 let n = w.next().await.expect("needs-login event");
821 assert_eq!(n.state, Some(DeviceState::NeedsLogin(login_url())));
822 assert_eq!(
823 n.browse_to_url,
824 Some(login_url()),
825 "the re-auth URL must reach the embedder as browse_to_url"
826 );
827
828 // A later successful re-register: the netmap self-node handler flips back to Running, and
829 // the bus reports the state change with browse_to_url cleared.
830 state_tx.send_replace(DeviceState::Running);
831 let n = w.next().await.expect("recovery event");
832 assert_eq!(n.state, Some(DeviceState::Running));
833 assert_eq!(
834 n.browse_to_url, None,
835 "recovering to Running clears the browse_to_url"
836 );
837 }
838
839 /// Two distinct consent URLs in sequence stream as two `browse_to_url` events.
840 #[tokio::test]
841 async fn sequential_browser_urls_stream_each() {
842 let (_s, _p, browser_tx, _sd, mut w) = harness(
843 NotifyWatchOpt::INITIAL_STATE,
844 DeviceState::Running,
845 Vec::new(),
846 );
847 let _snap = w.next().await.expect("snapshot barrier");
848 let url_a = consent_url();
849 let mut url_b = consent_url();
850 url_b.set_path("/consent-b");
851 browser_tx.send_replace(Some(url_a.clone()));
852 assert_eq!(
853 w.next().await.expect("first url").browse_to_url,
854 Some(url_a)
855 );
856 browser_tx.send_replace(Some(url_b.clone()));
857 assert_eq!(
858 w.next().await.expect("second url").browse_to_url,
859 Some(url_b)
860 );
861 }
862
863 /// A browser-URL change and a state change arrive as TWO distinct single-field events (the new
864 /// browser arm doesn't coalesce into, or clobber, a concurrent state transition). Companion to
865 /// `streamed_events_are_per_source_not_coalesced` (state+peer), for the browser+state pair.
866 #[tokio::test]
867 async fn browser_url_and_state_change_interleave() {
868 let (state_tx, _p, browser_tx, _sd, mut w) = harness(
869 NotifyWatchOpt::INITIAL_STATE,
870 DeviceState::Running,
871 Vec::new(),
872 );
873 let _snap = w.next().await.expect("snapshot barrier");
874 browser_tx.send_replace(Some(consent_url()));
875 state_tx.send_replace(DeviceState::Expired);
876 let a = w.next().await.expect("first event");
877 let b = w.next().await.expect("second event");
878 for n in [&a, &b] {
879 assert!(
880 n.state.is_some() ^ n.browse_to_url.is_some(),
881 "each streamed event carries exactly one of state / browse_to_url, got {n:?}"
882 );
883 assert_eq!(n.net_map, None);
884 }
885 assert!(
886 a.browse_to_url.is_some() || b.browse_to_url.is_some(),
887 "a browse_to_url event arrived"
888 );
889 assert!(
890 a.state.is_some() || b.state.is_some(),
891 "a state event arrived"
892 );
893 }
894}