// ts_runtime/control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22    direct::EndpointAdvertisement,
23};
24
/// Actor that owns this node's session with the control server.
///
/// It proxies the control map-response stream onto the message bus and holds the `watch` cells
/// through which control-pushed state (self node, SSH policy, Tailnet Lock, DNS, consent URL) and
/// the latest netcheck report are exposed to the rest of the runtime.
pub struct ControlRunner {
    /// Control client produced by [`AsyncControlClient::connect`] during `on_start`.
    client: AsyncControlClient,
    /// The spawn arguments, kept for the control config, node keys, and [`crate::Env`] that later
    /// handlers (TKA sync, ID-token, logout, set-DNS) reuse.
    params: Params,

    /// This node's own [`Node`] entry, or `None` until one has been published. The `ipv4` /
    /// `ipv6` / `self_node` queries wait on this cell (see `with_self_node`).
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one. Served by
    /// the `current_tka_status` query.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message). While a sync is in flight this is
    /// `None`, because the current state is moved into the sync task.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
    /// gated decision).
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
    /// Background task that bridges the control client's mid-session re-auth URL cell onto
    /// [`Self::params`]'s device-state cell (sets
    /// [`DeviceState::NeedsLogin`](crate::DeviceState::NeedsLogin) when control returns
    /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
    /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
    reauth_bridge: tokio::task::JoinHandle<()>,
}
83
84impl Drop for ControlRunner {
85    fn drop(&mut self) {
86        // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
87        self.reauth_bridge.abort();
88    }
89}
90
/// Arguments used to spawn a [`ControlRunner`].
pub struct Params {
    /// Control-server configuration. Passed to `check_auth` / `connect` in `on_start`, and cloned
    /// into the spawned TKA-sync, ID-token, logout, and set-DNS calls.
    pub(crate) config: ts_control::Config,

    /// Auth key used when registering with control, if any. Passed to `check_auth` and `connect`;
    /// while control reports the machine as not authorized, `on_start` keeps polling and logs a
    /// hint to authorize the machine or pass an auth key.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor: source of the node keys and the shutdown signal, and the
    /// bus the actor subscribes to and publishes on.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish
    /// [`DeviceState::Failed`](crate::DeviceState::Failed) and then return `Err`, before `Self`
    /// exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
108
/// Error type of the [`ControlRunner`] actor, returned from `on_start` when registration or
/// session bring-up fails. Both variants are transparent wrappers around the underlying error.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// A control-plane failure (e.g. `check_auth` or `connect`).
    #[error(transparent)]
    Control(#[from] ControlError),

    /// A runtime-crate failure (presumably from the bus `subscribe` calls in `on_start` — TODO
    /// confirm those return `crate::Error`).
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
118
119impl kameo::Actor for ControlRunner {
120    type Args = Params;
121    type Error = ControlRunnerError;
122
123    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
124        loop {
125            match AsyncControlClient::check_auth(
126                &params.config,
127                &params.env.keys,
128                params.auth_key.as_deref(),
129            )
130            .await
131            {
132                Ok(()) => break,
133                Err(ControlError::MachineNotAuthorized(u)) => {
134                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
135                    // Surface "interactive login required" so a watcher / `wait_until_running` can
136                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
137                    // keeps retrying (transient), so this is not a terminal `Failed`.
138                    params
139                        .state_tx
140                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
141                    tokio::time::sleep(Duration::from_secs(5)).await;
142                }
143                Err(e) => {
144                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
145                    // specific reason control gave AND publish it as a typed `Failed` state so
146                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
147                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
148                    // stopped actor is next asked. Publishing before `return Err` is why the state
149                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
150                    let reason = crate::RegistrationError::from(&e);
151                    tracing::error!(error = %e, "registration failed; control runner stopping");
152                    params
153                        .state_tx
154                        .send_replace(crate::DeviceState::Failed(reason));
155                    return Err(e.into());
156                }
157            }
158        }
159        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
160        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
161        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
162        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
163        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
164        // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
165        // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
166        // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
167        // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
168        let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
169
170        let bring_up = async {
171            let (client, stream) = AsyncControlClient::connect(
172                &params.config,
173                &params.env.keys,
174                params.auth_key.as_deref(),
175                auth_url_tx,
176            )
177            .await?;
178
179            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
180
181            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
182            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
183            slf.attach_stream(stream.boxed(), (), ());
184            Ok::<_, ControlRunnerError>(client)
185        };
186
187        let client = match bring_up.await {
188            Ok(client) => client,
189            Err(e) => {
190                tracing::error!(error = %e, "bringing up the control session failed");
191                // The control session never came up; surface it as a transient registration
192                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
193                // stuck at `Connecting`.
194                params.state_tx.send_replace(crate::DeviceState::Failed(
195                    crate::RegistrationError::NetworkUnreachable,
196                ));
197                return Err(e);
198            }
199        };
200
201        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
202        // current (and flips to `Expired` if the self-node's key lapses).
203        params.state_tx.send_replace(crate::DeviceState::Running);
204
205        // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
206        // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
207        // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
208        // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
209        // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
210        // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
211        // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
212        let reauth_bridge = {
213            let state_tx = params.state_tx.clone();
214            let mut auth_url_rx = auth_url_rx;
215            tokio::spawn(async move {
216                while auth_url_rx.changed().await.is_ok() {
217                    let url = auth_url_rx.borrow_and_update().clone();
218                    bridge_reauth_url_to_state(&state_tx, url.as_ref());
219                }
220            })
221        };
222
223        Ok(Self {
224            client,
225            params,
226            self_node: Default::default(),
227            ssh_policy: Default::default(),
228            tka: Default::default(),
229            tka_synced: None,
230            tka_authority: Default::default(),
231            tka_syncing: false,
232            cert_domains: Default::default(),
233            dns_config: Default::default(),
234            pop_browser_url: Default::default(),
235            netcheck: Default::default(),
236            reauth_bridge,
237        })
238    }
239}
240
241impl ControlRunner {
242    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
243    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
244    /// round-trip). The result returns via the [`TkaSynced`] self-message.
245    ///
246    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
247    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
248    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
249    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
250    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
251        if !tka.is_enabled() {
252            // Lock disabled (or never enabled): drop any synced state and stop publishing an
253            // Authority. Never an error; peers are unaffected.
254            if self.tka_synced.is_some() {
255                self.tka_synced = None;
256                self.tka_authority.send_replace(None);
257            }
258            return;
259        }
260        if self.tka_syncing {
261            return; // a sync is already in flight; the next netmap will re-trigger if still stale
262        }
263        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
264        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
265        // fail-closes harmlessly).
266        if let Some(synced) = &self.tka_synced
267            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
268            && synced.authority.head_matches(&control_head)
269        {
270            return;
271        }
272
273        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
274        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
275        // `tka_syncing` so we don't spawn a second concurrent sync.
276        self.tka_syncing = true;
277        let current = self.tka_synced.take();
278        let config = self.params.config.clone();
279        let keys = self.params.env.keys.clone();
280        tokio::spawn(async move {
281            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
282            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
283            // not allowed). A send failure just means the actor is gone — nothing to do.
284            if let Err(e) = self_ref.tell(TkaSynced { result }).await {
285                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
286            }
287        });
288    }
289
290    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
291    /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
292    /// clears the in-flight guard.
293    async fn apply_tka_synced(
294        &mut self,
295        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
296    ) {
297        self.tka_syncing = false;
298        match result {
299            Ok(Some(synced)) => {
300                tracing::info!(
301                    head = %synced.authority.head().to_base32(),
302                    "TKA sync succeeded; publishing verified Authority (observe-only)"
303                );
304                self.tka_authority
305                    .send_replace(Some(synced.authority.clone()));
306                // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
307                // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
308                if let Err(e) = self
309                    .params
310                    .env
311                    .publish(crate::peer_tracker::TkaAuthorityUpdate(
312                        synced.authority.clone(),
313                    ))
314                    .await
315                {
316                    tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
317                }
318                self.tka_synced = Some(synced);
319            }
320            Ok(None) => {
321                // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
322                tracing::debug!("TKA sync: control reported no lock for this node (inert)");
323            }
324            Err(e) => {
325                // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
326                // peer. The next netmap update re-triggers a sync attempt.
327                tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
328            }
329        }
330    }
331
332    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
333    where
334        F: FnOnce(&Node) -> R,
335    {
336        let mut sub = self.self_node.subscribe();
337        let mut shutdown = self.params.env.shutdown.clone();
338
339        async move {
340            tokio::select! {
341                _ = shutdown.wait_for(|x| *x) => {
342                    None
343                },
344                node = sub.wait_for(Option::is_some) => {
345                    Some(f(node.ok()?.as_ref()?))
346                },
347            }
348        }
349    }
350}
351
352/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
353///
354/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
355/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
356/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
357/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
358/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
359/// for a `watch`/bus subscriber.
360///
361/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
362/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
363/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
364/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
365/// device-state cell in this handler.
366///
367/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
368/// against a plain `watch` channel without standing up the actor.
369fn sticky_update_pop_browser_url(
370    cell: &watch::Sender<Option<url::Url>>,
371    incoming: Option<&url::Url>,
372) {
373    if let Some(url) = incoming {
374        cell.send_if_modified(|current| {
375            if current.as_ref() == Some(url) {
376                false
377            } else {
378                *current = Some(url.clone());
379                true
380            }
381        });
382    }
383}
384
385/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
386///
387/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
388/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
389/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
390/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
391/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
392/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
393///
394/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
395/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
396/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
397/// independent `None`-clear in this bridge could race that and is unnecessary. The
398/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
399/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
400/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
401/// dedupe in the netmap handler.
402///
403/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
404/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
405pub(crate) fn bridge_reauth_url_to_state(
406    state_tx: &watch::Sender<crate::DeviceState>,
407    incoming: Option<&url::Url>,
408) {
409    if let Some(url) = incoming {
410        let next = crate::DeviceState::NeedsLogin(url.clone());
411        state_tx.send_if_modified(|current| {
412            if *current == next {
413                false
414            } else {
415                *current = next.clone();
416                true
417            }
418        });
419    }
420}
421
422// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
423// those generated fields carry no doc and can't take attributes, so wrap in a module where
424// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
425// are re-exported so callers keep referencing them at `control_runner::<Name>`.
426pub use msg_impl::*;
427
428#[allow(missing_docs)]
429mod msg_impl {
430    use kameo::{message::Context, reply::DelegatedReply};
431
432    use super::*;
433
434    #[kameo::messages]
435    impl ControlRunner {
436        /// Fetch the IPv4 address for this tailscale device.
437        #[message(ctx)]
438        pub fn ipv4(
439            &self,
440            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
441        ) -> DelegatedReply<Option<Ipv4Addr>> {
442            let (deleg, replier) = ctx.reply_sender();
443
444            if let Some(replier) = replier {
445                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
446
447                tokio::spawn(async move {
448                    let ip = fut.await;
449                    replier.send(ip);
450                });
451            }
452
453            deleg
454        }
455
456        /// Fetch the IPv6 address for this tailscale device.
457        #[message(ctx)]
458        pub fn ipv6(
459            &self,
460            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
461        ) -> DelegatedReply<Option<Ipv6Addr>> {
462            let (deleg, replier) = ctx.reply_sender();
463
464            if let Some(replier) = replier {
465                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
466
467                tokio::spawn(async move {
468                    let ip = fut.await;
469                    replier.send(ip);
470                });
471            }
472
473            deleg
474        }
475
476        /// Fetch the self node for this tailscale device.
477        #[message(ctx)]
478        pub fn self_node(
479            &self,
480            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
481        ) -> DelegatedReply<Option<Node>> {
482            let (deleg, replier) = ctx.reply_sender();
483
484            if let Some(replier) = replier {
485                let node = self.with_self_node(|node| node.clone());
486
487                tokio::spawn(async move {
488                    let node = node.await;
489                    replier.send(node)
490                });
491            }
492
493            deleg
494        }
495
496        /// Fetch the current Tailscale SSH policy, if control has pushed one.
497        ///
498        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
499        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
500        /// for a value: an absent policy is a legitimate, immediate answer.
501        #[message]
502        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
503            self.ssh_policy.borrow().clone()
504        }
505
506        /// Fetch the current Tailnet Lock status, if control has pushed one.
507        ///
508        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
509        #[message]
510        pub fn current_tka_status(&self) -> Option<TkaStatus> {
511            self.tka.borrow().clone()
512        }
513
514        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
515        ///
516        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
517        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
518        /// does not block waiting for a value).
519        #[message]
520        pub fn cert_domains(&self) -> Vec<String> {
521            self.cert_domains.borrow().clone()
522        }
523
524        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
525        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
526        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
527        #[message]
528        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
529            self.dns_config.borrow().clone()
530        }
531
532        /// The interactive-login / consent URL control last asked this node to open
533        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
534        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
535        #[message]
536        pub fn pop_browser_url(&self) -> Option<url::Url> {
537            self.pop_browser_url.borrow().clone()
538        }
539
540        /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
541        ///
542        /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
543        /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
544        /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
545        /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
546        /// late subscriber sees the current URL. The initial value is `None` until control sends one.
547        #[message(derive(Clone))]
548        pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
549            self.pop_browser_url.subscribe()
550        }
551
552        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
553        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
554        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
555        #[message]
556        pub fn netcheck(&self) -> crate::status::NetcheckReport {
557            self.netcheck.borrow().clone()
558        }
559
560        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
561        ///
562        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
563        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
564        /// for the round-trip.
565        #[message(ctx)]
566        pub fn fetch_id_token(
567            &self,
568            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
569            audience: String,
570        ) -> DelegatedReply<Result<String, IdTokenError>> {
571            let (deleg, replier) = ctx.reply_sender();
572
573            if let Some(replier) = replier {
574                let config = self.params.config.clone();
575                let keys = self.params.env.keys.clone();
576                tokio::spawn(async move {
577                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
578                    replier.send(result);
579                });
580            }
581
582            deleg
583        }
584
585        /// Log this node out of the tailnet: deregister it by expiring its current node key.
586        ///
587        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
588        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
589        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
590        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
591        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
592        /// on-disk node key, so re-registering with the same key is the re-login path.
593        #[message(ctx)]
594        pub fn logout(
595            &self,
596            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
597        ) -> DelegatedReply<Result<(), LogoutError>> {
598            let (deleg, replier) = ctx.reply_sender();
599
600            if let Some(replier) = replier {
601                let config = self.params.config.clone();
602                let keys = self.params.env.keys.clone();
603                tokio::spawn(async move {
604                    let result = ts_control::logout(&config, &keys).await;
605                    replier.send(result);
606                });
607            }
608
609            deleg
610        }
611
612        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
613        /// `LocalClient.SetDNS`).
614        ///
615        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
616        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
617        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
618        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
619        /// match, so the surfaced API takes only `name` + `value`.
620        #[message(ctx)]
621        pub fn set_dns(
622            &self,
623            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
624            name: String,
625            value: String,
626        ) -> DelegatedReply<Result<(), SetDnsError>> {
627            let (deleg, replier) = ctx.reply_sender();
628
629            if let Some(replier) = replier {
630                let config = self.params.config.clone();
631                let keys = self.params.env.keys.clone();
632                tokio::spawn(async move {
633                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
634                    replier.send(result);
635                });
636            }
637
638            deleg
639        }
640    }
641
642    /// The reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message: the issued
643    /// `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface) or a [`ts_control::CertError`].
644    /// Aliased so the message's `Context` type stays under clippy's `type_complexity` bar (the
645    /// nested `Result<(String, String), _>` trips it inline).
646    #[cfg(feature = "acme")]
647    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
648
649    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
650    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
651    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
652    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
653    // block keeps the default build clean.
654    #[cfg(feature = "acme")]
655    #[kameo::messages]
656    impl ControlRunner {
657        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
658        /// client-side ACME DNS-01 engine (`acme` feature).
659        ///
660        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
661        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
662        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
663        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
664        ///
665        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
666        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
667        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
668        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
669        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
670        /// publish 501s.
671        #[message(ctx)]
672        pub fn get_certificate(
673            &self,
674            ctx: &mut Context<
675                Self,
676                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
677            >,
678            name: String,
679        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
680            let (deleg, replier) = ctx.reply_sender();
681
682            if let Some(replier) = replier {
683                let config = self.params.config.clone();
684                let keys = self.params.env.keys.clone();
685                tokio::spawn(async move {
686                    let result = issue_certificate(&config, &keys, &name).await;
687                    replier.send(result);
688                });
689            }
690
691            deleg
692        }
693
694        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
695        /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
696        /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
697        ///
698        /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
699        /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
700        /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
701        /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
702        /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
703        /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
704        #[message(ctx)]
705        pub fn get_cert_pair(
706            &self,
707            ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
708            name: String,
709        ) -> DelegatedReply<CertPairReply> {
710            let (deleg, replier) = ctx.reply_sender();
711
712            if let Some(replier) = replier {
713                let config = self.params.config.clone();
714                let keys = self.params.env.keys.clone();
715                tokio::spawn(async move {
716                    let result = issue_cert_pair(&config, &keys, &name).await;
717                    replier.send(result);
718                });
719            }
720
721            deleg
722        }
723    }
724}
725
/// Issue a certificate for `name` and return only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) (the `get_certificate` / `ListenTLS` path).
///
/// Thin wrapper over [`issue_cert_pair_inner`]. It keeps the `certified` half of the
/// [`IssuedCert`](ts_control::acme::IssuedCert) and drops the PEMs. There is still one ACME
/// issuance; this caller simply has no use for the on-disk pair. Loading or generating the ACME
/// account key and choosing the Let's Encrypt directory both happen in
/// [`issue_cert_pair_inner`].
///
/// # Errors
///
/// Propagates any [`ts_control::CertError`] from [`issue_cert_pair_inner`] unchanged.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    issue_cert_pair_inner(config, keys, name)
        .await
        .map(|issued| issued.certified)
}
742
/// Issue a certificate for `name` and return its PEM encoding as `(cert_chain_pem, key_pem)`.
///
/// This is the pair the daemon writes to the on-disk `.crt` / `.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// It performs the same single issuance as [`issue_certificate`]; only the projection of the
/// result differs. The second element is the leaf **private key** PEM, and this function never
/// logs it.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
759
/// Shared issuance core behind [`issue_certificate`] and [`issue_cert_pair`].
///
/// It resolves the ACME account key, points at Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]) and runs exactly one DNS-01 issuance
/// through `ts_control::issue_cert_pair_via_setdns`. The full
/// [`IssuedCert`](ts_control::acme::IssuedCert) is handed back so each caller can keep the part
/// it needs (one ACME order, two consumers).
///
/// Account key:
/// - A persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused, so the same Let's
///   Encrypt account survives renewals.
/// - Otherwise a fresh key is generated for this call only. It is logged at debug level and never
///   written back.
///
/// The leaf private key is never logged.
///
/// # Errors
///
/// - A malformed persisted key or a key-generation failure, via `?`.
/// - [`ts_control::CertError::Acme`] if the directory URL fails to parse.
/// - Any error from the issuance itself.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // Only the key itself (field `.0`) is kept; nothing generated here is persisted.
        ts_control::acme::AcmeAccountKey::generate()?.0
    };

    let directory = match ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse() {
        Ok(url) => url,
        Err(e) => {
            return Err(ts_control::CertError::Acme(format!(
                "parsing Let's Encrypt directory URL: {e}"
            )));
        }
    };

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
793
794impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
795    type Reply = ();
796
797    async fn handle(
798        &mut self,
799        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
800        ctx: &mut Context<Self, Self::Reply>,
801    ) {
802        match msg {
803            StreamMessage::Started(_) => {
804                tracing::trace!("started listening to state updates");
805            }
806
807            StreamMessage::Next(msg) => {
808                if let Some(node) = msg.node.as_ref() {
809                    // Reflect node-key expiry into the device state: control delivering a self-node
810                    // whose key is in the past means the node must re-authenticate. Otherwise the
811                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
812                    // prior update had flipped it to Expired).
813                    let now_unix = std::time::SystemTime::now()
814                        .duration_since(std::time::UNIX_EPOCH)
815                        .map(|d| d.as_secs() as i64)
816                        .unwrap_or(0);
817                    let next = if node.key_expired_at_unix(now_unix) {
818                        crate::DeviceState::Expired
819                    } else {
820                        crate::DeviceState::Running
821                    };
822                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
823                    // self-node arrives on every netmap update).
824                    self.params.state_tx.send_if_modified(|s| {
825                        if *s != next {
826                            *s = next.clone();
827                            true
828                        } else {
829                            false
830                        }
831                    });
832
833                    self.self_node.send_replace(Some(node.clone()));
834                }
835
836                if let Some(policy) = msg.ssh_policy.as_ref() {
837                    self.ssh_policy.send_replace(Some(policy.clone()));
838                }
839
840                if let Some(tka) = msg.tka.as_ref() {
841                    self.tka.send_replace(Some(tka.clone()));
842                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
843                }
844
845                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
846                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
847                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
848                let cert_domains = msg
849                    .dns_config
850                    .as_ref()
851                    .map(|d| d.cert_domains.clone())
852                    .unwrap_or_default();
853                self.cert_domains.send_replace(cert_domains);
854
855                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
856                // `None` when control sent no DNS config on this update — distinct from a present but
857                // empty config (Go `netmap.NetworkMap.DNS`).
858                self.dns_config.send_replace(msg.dns_config.clone());
859
860                // Track the interactive-login URL for `Device::pop_browser_url` /
861                // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
862                // sticky semantics (update only on a new non-empty URL; never reset to `None`).
863                sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
864
865                if let Err(e) = self.params.env.publish(msg).await {
866                    tracing::error!(error = %e, "publishing netmap update");
867                }
868            }
869
870            StreamMessage::Finished(_) => {
871                tracing::error!("state update stream terminated")
872            }
873        }
874    }
875}
876
877/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
878/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
879/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
880/// [`ControlRunner::apply_tka_synced`](ControlRunner).
881#[doc(hidden)]
882pub struct TkaSynced {
883    pub(crate) result:
884        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
885}
886
887impl Message<TkaSynced> for ControlRunner {
888    type Reply = ();
889
890    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
891        self.apply_tka_synced(msg.result).await;
892    }
893}
894
895impl Message<DerpLatencyMeasurement> for ControlRunner {
896    type Reply = ();
897
898    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
899        let measurements = msg.measurement.as_ref().clone();
900
901        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
902        // the same measurements, before the home-region short-circuit below — an empty set still
903        // yields a (default/empty) report rather than a stale one.
904        self.netcheck
905            .send_replace(crate::status::NetcheckReport::from_region_results(
906                &measurements,
907            ));
908
909        let Some(result) = measurements.first() else {
910            tracing::debug!("derp latency measurements empty");
911            return;
912        };
913
914        let iter = measurements.iter().map(|result| {
915            (
916                result.latency_map_key.as_str(),
917                result.latency.as_secs_f64(),
918            )
919        });
920
921        tracing::debug!(selected_region_id = ?result.id, "updating home region");
922
923        self.client.set_home_region(result.id, iter).await;
924    }
925}
926
927impl Message<EndpointAdvertisement> for ControlRunner {
928    type Reply = ();
929
930    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
931        let endpoints: Vec<Endpoint> = msg
932            .endpoints
933            .iter()
934            .map(|ep| Endpoint {
935                endpoint: ep.addr,
936                ty: match ep.ty {
937                    SelfEndpointType::Local => EndpointType::Local,
938                    SelfEndpointType::Stun => EndpointType::Stun,
939                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
940                },
941            })
942            .collect();
943
944        tracing::debug!(
945            n_endpoints = endpoints.len(),
946            "advertising endpoints to control"
947        );
948
949        self.client.set_endpoints(endpoints).await;
950    }
951}
952
953/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
954/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
955/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
956/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
957#[derive(Debug)]
958pub struct SetAdvertiseRoutes {
959    /// The prefixes to advertise to control (already filtered to the final set).
960    pub routes: Vec<ipnet::IpNet>,
961}
962
963impl Message<SetAdvertiseRoutes> for ControlRunner {
964    type Reply = ();
965
966    async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
967        tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
968        self.client.set_routable_ips(msg.routes).await;
969    }
970}
971
/// Message asking control to record a new `Hostinfo.Hostname` for this node.
///
/// This is the wire half of [`Runtime::set_hostname`](crate::Runtime::set_hostname). It is sent as
/// a direct `ask` from the runtime, not via the bus, so the live map-poll client picks up the
/// change.
#[derive(Debug)]
pub struct SetHostname {
    /// Hostname to report to control.
    pub hostname: String,
}
980
981impl Message<SetHostname> for ControlRunner {
982    type Reply = ();
983
984    async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
985        tracing::debug!("updating hostname at control");
986        self.client.set_hostname(msg.hostname).await;
987    }
988}
989
#[cfg(test)]
mod reauth_bridge_tests {
    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    /// Parse a fixture URL (test-only; panics on malformed input).
    fn url(s: &str) -> url::Url {
        url::Url::parse(s).unwrap()
    }

    /// A surfaced re-auth URL becomes `DeviceState::NeedsLogin(url)`. This is the core of the fix:
    /// the control client forwards a mid-session `MachineNotAuthorized` as `Some(url)`, and it
    /// must turn into the "needs login" state the IPN bus renders as `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let (tx, rx) = watch::channel(DeviceState::Running);
        let auth = url("https://login.example/auth");

        bridge_reauth_url_to_state(&tx, Some(&auth));

        assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(auth));
    }

    /// `None` never drives a transition. Recovering to `Running` is the netmap self-node handler's
    /// job, so the bridge leaves the state exactly as it found it.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (tx, rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&tx, None);

        assert_eq!(*rx.borrow(), DeviceState::Running);
    }

    /// Re-surfacing the same URL across retries is deduplicated against the cell's current value
    /// (`send_if_modified`), so a stuck re-auth does not keep waking subscribers.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let (tx, mut rx) = watch::channel(DeviceState::Running);
        let auth = url("https://login.example/auth");

        bridge_reauth_url_to_state(&tx, Some(&auth));
        assert!(rx.has_changed().unwrap(), "first NeedsLogin fires");
        rx.mark_unchanged();

        bridge_reauth_url_to_state(&tx, Some(&auth));
        assert!(
            !rx.has_changed().unwrap(),
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// A different re-auth URL after an earlier one is published. The dedupe tracks changes; it
    /// does not pin the first URL forever.
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let (tx, rx) = watch::channel(DeviceState::Running);
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");

        bridge_reauth_url_to_state(&tx, Some(&first));
        bridge_reauth_url_to_state(&tx, Some(&second));

        assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(second));
    }

    /// The *clear* contract, end to end: once the bridge sets `NeedsLogin`, the netmap self-node
    /// path flips the state back to `Running`. That path is modeled here as a direct
    /// `send_replace(Running)`, the same transition the `StreamMessage::Next` handler performs on
    /// the next good self-node. This pins that the bridge needs no `None`-clear arm, because
    /// recovery is owned elsewhere.
    #[test]
    fn running_netmap_clears_needs_login() {
        let (tx, rx) = watch::channel(DeviceState::Running);
        let auth = url("https://login.example/auth");

        bridge_reauth_url_to_state(&tx, Some(&auth));
        assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(auth));

        // The self-node handler's recovery transition (next good netmap self-node → Running).
        tx.send_replace(DeviceState::Running);
        assert_eq!(*rx.borrow(), DeviceState::Running);
    }
}
1073
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    fn url(s: &str) -> url::Url {
        s.parse().unwrap()
    }

    /// A non-empty URL publishes to the cell.
    #[test]
    fn non_empty_url_publishes() {
        let (tx, rx) = watch::channel(None);
        let u = url("https://login.example/consent");
        sticky_update_pop_browser_url(&tx, Some(&u));
        assert_eq!(*rx.borrow(), Some(u));
    }

    /// An absent (`None`) update, the common netmap tick, must NOT reset the cell. This is the
    /// regression guard for the thrash bug: a reset on every tick would coalesce the URL away on
    /// the bus.
    #[test]
    fn absent_update_does_not_reset() {
        let u = url("https://login.example/consent");
        let (tx, rx) = watch::channel(Some(u.clone()));
        // Simulate many empty netmap updates.
        for _ in 0..5 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "empty updates must not clear the URL"
        );
    }

    /// Repeating the same URL does not re-fire the watch (in-place dedupe via `send_if_modified`),
    /// so subscribers are not woken spuriously. Proven by the receiver not being marked changed.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&u)); // first: fires
        assert!(rx.has_changed().unwrap(), "first non-empty URL fires");
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&u)); // same: deduped
        assert!(
            !rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A new URL after an earlier one fires again: sticky, but still tracks changes.
    #[test]
    fn new_url_after_prior_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(*rx.borrow(), Some(b));
    }

    /// The realistic session sequence: a URL stays sticky through a run of `None` ticks, and a
    /// *different* URL after that gap still fires. This chains legs the other tests cover in
    /// isolation; control's actual cadence is "URL, then many empty updates, then maybe a new URL".
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        for _ in 0..3 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(*rx.borrow(), Some(a), "stayed sticky through the None gap");
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(
            *rx.borrow(),
            Some(b),
            "a new URL after a None gap still fires"
        );
    }

    /// Returning to a previously seen URL (A → B → A) re-fires. The dedupe compares against the
    /// cell's *current* value, not a full history, so A after B is a genuine change.
    #[test]
    fn returning_to_prior_url_refires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&a)); // back to A: differs from current (B) → fires
        assert!(
            rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*rx.borrow(), Some(a));
    }

    /// End-to-end de-thrash: feed a realistic netmap cadence (empty, empty, URL, empty, empty)
    /// through the producer and, after EVERY tick, check whether a subscriber would be woken.
    ///
    /// Exactly one tick, the one carrying the URL, may fire. The check has to be per tick because
    /// a `watch` channel coalesces notifications. Counting `has_changed` only after the whole
    /// burst can never observe more than one change, so it would not catch the pre-fix code
    /// (`send_replace` on every tick). That code fires on all five ticks and ends on `None`.
    #[test]
    fn end_to_end_one_change_survives_none_thrash() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        // The cadence control actually sends: mostly-empty MapResponses with one carrying the URL.
        let cadence = [None, None, Some(&u), None, None];
        let mut fires = 0;
        for incoming in cadence {
            sticky_update_pop_browser_url(&tx, incoming);
            if rx.has_changed().unwrap() {
                fires += 1;
                assert_eq!(
                    *rx.borrow_and_update(),
                    Some(u.clone()),
                    "the only tick that wakes the subscriber carries the URL"
                );
            }
        }
        assert_eq!(fires, 1, "exactly one tick wakes the subscriber");
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "the URL is still published after the trailing None ticks"
        );
    }
}