Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
17    tka_init_begin, tka_init_finish, tka_submit_signature,
18};
19use ts_magicsock::SelfEndpointType;
20
21use crate::{
22    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
23    direct::EndpointAdvertisement,
24};
25
26/// Actor responsible for maintaining the connection to control.
27///
28/// This actor is responsible for proxying the map response stream onto the message bus.
29pub struct ControlRunner {
30    client: AsyncControlClient,
31    params: Params,
32
33    self_node: watch::Sender<Option<Node>>,
34    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
35    /// server reads this to authorize incoming connections; absent policy means deny-all.
36    ssh_policy: watch::Sender<Option<SshPolicy>>,
37    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
38    tka: watch::Sender<Option<TkaStatus>>,
39    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
40    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
41    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
42    /// the result returns via the [`TkaSynced`] self-message).
43    tka_synced: Option<crate::tka_sync::SyncedTka>,
44    /// The verified TKA [`Authority`](ts_tka::Authority) the peer tracker **enforces** (Go
45    /// `tkaFilterNetmapLocked`). `None` until the first successful sync, and reset to `None` when the
46    /// lock is disabled. This is the SOLE delivery channel to the peer tracker (which holds the
47    /// matching `Receiver` and reads it on every peer upsert): a `watch` cell, not a bus message, so
48    /// the latest value is always readable, never dropped under load, and writes are strictly ordered
49    /// by this actor — a disable (`None`) can never be reordered behind or dropped before a stale
50    /// `Some`. Written only from [`apply_tka_synced`] (enable) and [`maybe_sync_tka`] (disable), both
51    /// on the actor thread. The published `Authority` has always passed `VerifiedAumChain::verify`.
52    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
53    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
54    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
55    tka_syncing: bool,
56    /// Monotonic generation stamped when a disable (or a fresh sync) supersedes any in-flight sync.
57    /// `maybe_sync_tka` bumps this on a disable transition and captures it into each spawned sync;
58    /// [`apply_tka_synced`] discards a sync result whose captured generation is stale, so a lock
59    /// disabled *while a sync was in flight* is never re-enabled by that sync's late `Ok(Some)`
60    /// (the in-flight window the `tka_synced.is_some()` disable guard alone does not cover).
61    tka_generation: u64,
62    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
63    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
64    cert_domains: watch::Sender<Vec<String>>,
65    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
66    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
67    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
68    /// own cell for the narrower TLS-cert use.
69    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
70    /// Latest interactive-login / consent URL control asked this node to open
71    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
72    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
73    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
74    /// `browse_to_url` running-node events.
75    ///
76    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
77    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
78    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
79    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
80    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
81    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
82    /// `None` and coalesce the URL away for a `watch` subscriber.
83    pop_browser_url: watch::Sender<Option<url::Url>>,
84    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
85    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
86    /// daemon's `tnet netcheck`). Empty until the first measurement.
87    netcheck: watch::Sender<crate::status::NetcheckReport>,
88    /// The DERP home region currently selected, with the latency measured for it at selection time.
89    /// `None` until the first home region is chosen. Used to apply selection **hysteresis** (Go
90    /// `netcheck.addReportHistoryAndSetPreferredDERP`): the home region is only switched when a new
91    /// region is *meaningfully* lower-latency than the current one, so jitter between near-equal
92    /// regions does not flap the home relay (which would cause repeated reconnects + brief loss).
93    home_region: Option<(ts_derp::RegionId, core::time::Duration)>,
94    /// Background task that bridges the control client's mid-session re-auth URL cell onto
95    /// [`Self::params`]'s device-state cell (sets [`DeviceState::NeedsLogin`] when control returns
96    /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
97    /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
98    reauth_bridge: tokio::task::JoinHandle<()>,
99}
100
101impl Drop for ControlRunner {
102    fn drop(&mut self) {
103        // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
104        self.reauth_bridge.abort();
105    }
106}
107
108/// Control runner args.
109pub struct Params {
110    /// Control config.
111    pub(crate) config: ts_control::Config,
112
113    /// Auth key (if needed).
114    pub(crate) auth_key: Option<String>,
115
116    /// The [`crate::Env`] for this actor.
117    pub(crate) env: crate::Env,
118
119    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
120    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
121    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
122    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
123    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
124
125    /// Sender for the TKA enforcement-authority cell the peer tracker reads (Go
126    /// `tkaFilterNetmapLocked`). Created in [`Runtime::spawn`](crate::Runtime) and threaded into BOTH
127    /// the peer tracker (the `Receiver`) and this runner (the `Sender`), so the runner is the sole
128    /// writer and the tracker reads the latest verified `Authority` on demand. `None` = no lock /
129    /// disabled (admit all).
130    pub(crate) tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
131}
132
133#[doc(hidden)]
134#[derive(Debug, thiserror::Error)]
135pub enum ControlRunnerError {
136    #[error(transparent)]
137    Control(#[from] ControlError),
138
139    #[error(transparent)]
140    Crate(#[from] crate::Error),
141}
142
143impl kameo::Actor for ControlRunner {
144    type Args = Params;
145    type Error = ControlRunnerError;
146
147    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
148        loop {
149            match AsyncControlClient::check_auth(
150                &params.config,
151                &params.env.keys,
152                params.auth_key.as_deref(),
153            )
154            .await
155            {
156                Ok(()) => break,
157                Err(ControlError::MachineNotAuthorized(u)) => {
158                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
159                    // Surface "interactive login required" so a watcher / `wait_until_running` can
160                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
161                    // keeps retrying (transient), so this is not a terminal `Failed`.
162                    params
163                        .state_tx
164                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
165                    tokio::time::sleep(Duration::from_secs(5)).await;
166                }
167                Err(e) => {
168                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
169                    // specific reason control gave AND publish it as a typed `Failed` state so
170                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
171                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
172                    // stopped actor is next asked. Publishing before `return Err` is why the state
173                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
174                    let reason = crate::RegistrationError::from(&e);
175                    tracing::error!(error = %e, "registration failed; control runner stopping");
176                    params
177                        .state_tx
178                        .send_replace(crate::DeviceState::Failed(reason));
179                    return Err(e.into());
180                }
181            }
182        }
183        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
184        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
185        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
186        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
187        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
188        // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
189        // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
190        // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
191        // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
192        let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
193
194        let bring_up = async {
195            let (client, stream) = AsyncControlClient::connect(
196                &params.config,
197                &params.env.keys,
198                params.auth_key.as_deref(),
199                auth_url_tx,
200            )
201            .await?;
202
203            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
204
205            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
206            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
207            slf.attach_stream(stream.boxed(), (), ());
208            Ok::<_, ControlRunnerError>(client)
209        };
210
211        let client = match bring_up.await {
212            Ok(client) => client,
213            Err(e) => {
214                tracing::error!(error = %e, "bringing up the control session failed");
215                // The control session never came up; surface it as a transient registration
216                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
217                // stuck at `Connecting`.
218                params.state_tx.send_replace(crate::DeviceState::Failed(
219                    crate::RegistrationError::NetworkUnreachable,
220                ));
221                return Err(e);
222            }
223        };
224
225        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
226        // current (and flips to `Expired` if the self-node's key lapses).
227        params.state_tx.send_replace(crate::DeviceState::Running);
228
229        // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
230        // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
231        // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
232        // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
233        // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
234        // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
235        // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
236        let reauth_bridge = {
237            let state_tx = params.state_tx.clone();
238            let mut auth_url_rx = auth_url_rx;
239            tokio::spawn(async move {
240                while auth_url_rx.changed().await.is_ok() {
241                    let url = auth_url_rx.borrow_and_update().clone();
242                    bridge_reauth_url_to_state(&state_tx, url.as_ref());
243                }
244            })
245        };
246
247        // Clone the TKA authority publisher before `params` moves into `Self` below. The matching
248        // `Receiver` lives on the peer tracker; this sender is the sole writer (enforce on sync,
249        // clear on disable).
250        let tka_authority = params.tka_authority.clone();
251
252        Ok(Self {
253            client,
254            params,
255            self_node: Default::default(),
256            ssh_policy: Default::default(),
257            tka: Default::default(),
258            tka_synced: None,
259            tka_authority,
260            tka_syncing: false,
261            tka_generation: 0,
262            cert_domains: Default::default(),
263            dns_config: Default::default(),
264            pop_browser_url: Default::default(),
265            netcheck: Default::default(),
266            home_region: None,
267            reauth_bridge,
268        })
269    }
270}
271
272impl ControlRunner {
273    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
274    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
275    /// round-trip). The result returns via the [`TkaSynced`] self-message.
276    ///
277    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
278    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
279    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
280    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
281    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
282        if !tka.is_enabled() {
283            // Lock disabled (or never enabled): clear enforcement by writing `None` to the authority
284            // cell the peer tracker reads — synchronously, so it can never be reordered behind or
285            // dropped before a stale `Some` (the failure a best-effort broadcast had). Always bump the
286            // generation so ANY sync currently in flight is invalidated: without this, a disable that
287            // races an in-flight sync (whose `take()` already cleared `tka_synced`) would be a no-op
288            // here, and the sync's late `Ok(Some)` would silently re-enable a lock control just turned
289            // off (the in-flight window the `tka_synced.is_some()` guard alone misses). Cheap and
290            // idempotent: clearing an already-`None` cell and bumping the generation are harmless.
291            self.tka_generation = self.tka_generation.wrapping_add(1);
292            if self.tka_synced.is_some() {
293                tracing::info!("TKA lock disabled; clearing enforcement (admitting all peers)");
294                self.tka_synced = None;
295            }
296            self.tka_authority.send_replace(None);
297            return;
298        }
299        if self.tka_syncing {
300            return; // a sync is already in flight; the next netmap will re-trigger if still stale
301        }
302        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
303        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
304        // fail-closes harmlessly).
305        if let Some(synced) = &self.tka_synced
306            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
307            && synced.authority.head_matches(&control_head)
308        {
309            return;
310        }
311
312        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
313        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
314        // `tka_syncing` so we don't spawn a second concurrent sync. Capture the current generation so
315        // `apply_tka_synced` can discard this result if a disable bumped the generation while the sync
316        // was in flight (H1: don't re-enable a lock that was disabled mid-sync).
317        self.tka_syncing = true;
318        let generation = self.tka_generation;
319        let current = self.tka_synced.take();
320        let config = self.params.config.clone();
321        let keys = self.params.env.keys.clone();
322        tokio::spawn(async move {
323            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
324            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
325            // not allowed). A send failure just means the actor is gone — nothing to do.
326            if let Err(e) = self_ref.tell(TkaSynced { result, generation }).await {
327                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
328            }
329        });
330    }
331
332    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
333    /// state + publish the `Authority` to the peer tracker's enforcement cell (or, on inert/failed
334    /// sync, leave peers unaffected). Always clears the in-flight guard.
335    ///
336    /// `generation` is the value captured when the sync was spawned. If it no longer matches
337    /// `self.tka_generation`, the lock was disabled (or re-synced) while this sync was in flight, so
338    /// the result is discarded — never re-enabling an authority control has since turned off.
339    async fn apply_tka_synced(
340        &mut self,
341        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
342        generation: u64,
343    ) {
344        self.tka_syncing = false;
345
346        // H1 guard: a disable (or a superseding sync) bumped the generation while this sync ran. Drop
347        // the stale result — `maybe_sync_tka`'s disable branch already cleared enforcement to `None`,
348        // and re-applying this `Some` would re-enforce a lock that is no longer active.
349        if generation != self.tka_generation {
350            tracing::info!(
351                "TKA sync result superseded (lock disabled or re-synced mid-flight); discarding"
352            );
353            return;
354        }
355
356        match result {
357            Ok(Some(synced)) => {
358                tracing::info!(
359                    head = %synced.authority.head().to_base32(),
360                    "TKA sync succeeded; enforcing verified Authority (Go tkaFilterNetmapLocked)"
361                );
362                // Deliver the verified Authority to the peer tracker's enforcement cell. The tracker
363                // reads it on every peer upsert and drops unauthorized peers. `Some(..)` = enforce; a
364                // `None` is written on disable. `watch` is the sole channel (last-write-wins, never
365                // dropped, ordered by this actor) — no bus, no re-publish-for-replay needed.
366                self.tka_authority
367                    .send_replace(Some(synced.authority.clone()));
368
369                // Observability (Go `tkaFilterNetmapLocked`'s self check → `LockedOut` health
370                // warning): verify SELF's own node-key signature against the freshly-synced
371                // Authority and warn if self is NOT authorized. We never FILTER self (self never
372                // enters the peer db, so enforcement can't lock us out of our own netmap), but Go
373                // raises an operator-facing warning here because a self that the lock does not
374                // authorize means this node's key-signature is missing/invalid for the current lock
375                // — it will be unable to prove itself to locked peers. This fork has no health
376                // subsystem, so the signal is a `tracing::warn!` (its observability channel).
377                //
378                // `self_node` is a sticky cell set on every netmap carrying a self-node; if a sync
379                // somehow lands before the first self-node ever arrived it is `None`, so we skip the
380                // advisory this cycle and re-evaluate on the next sync — fine for observability-only.
381                // The `borrow()` ref is scoped to this `if let` and dropped before the `&mut self`
382                // write below.
383                if let Some(self_node) = self.self_node.borrow().as_ref() {
384                    log_self_lockout(self_node, &synced.authority);
385                }
386
387                self.tka_synced = Some(synced);
388            }
389            Ok(None) => {
390                // Control has no lock for us (no genesis / disabled). Clear any authority we were
391                // previously enforcing — symmetric with the disable path — so a transition to
392                // "no lock" stops dropping peers. Not an error.
393                if self.tka_synced.is_some() {
394                    tracing::info!("TKA sync: control reports no lock; clearing enforcement");
395                    self.tka_synced = None;
396                }
397                self.tka_authority.send_replace(None);
398            }
399            Err(e) => {
400                // Transport or verify failure: log and leave the prior authority in place (a failed
401                // sync must not drop enforcement — that would fail OPEN). NEVER errors the netmap.
402                // The next netmap update re-triggers a sync attempt.
403                tracing::warn!(error = %e, "TKA sync failed; keeping prior enforcement state");
404            }
405        }
406    }
407
408    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
409    where
410        F: FnOnce(&Node) -> R,
411    {
412        let mut sub = self.self_node.subscribe();
413        let mut shutdown = self.params.env.shutdown.clone();
414
415        async move {
416            tokio::select! {
417                _ = shutdown.wait_for(|x| *x) => {
418                    None
419                },
420                node = sub.wait_for(Option::is_some) => {
421                    Some(f(node.ok()?.as_ref()?))
422                },
423            }
424        }
425    }
426}
427
428/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
429///
430/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
431/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
432/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
433/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
434/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
435/// for a `watch`/bus subscriber.
436///
437/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
438/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
439/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
440/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
441/// device-state cell in this handler.
442///
443/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
444/// against a plain `watch` channel without standing up the actor.
445fn sticky_update_pop_browser_url(
446    cell: &watch::Sender<Option<url::Url>>,
447    incoming: Option<&url::Url>,
448) {
449    if let Some(url) = incoming {
450        cell.send_if_modified(|current| {
451            if current.as_ref() == Some(url) {
452                false
453            } else {
454                *current = Some(url.clone());
455                true
456            }
457        });
458    }
459}
460
461/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
462///
463/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
464/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
465/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
466/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
467/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
468/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
469///
470/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
471/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
472/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
473/// independent `None`-clear in this bridge could race that and is unnecessary. The
474/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
475/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
476/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
477/// dedupe in the netmap handler.
478///
479/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
480/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
481pub(crate) fn bridge_reauth_url_to_state(
482    state_tx: &watch::Sender<crate::DeviceState>,
483    incoming: Option<&url::Url>,
484) {
485    if let Some(url) = incoming {
486        let next = crate::DeviceState::NeedsLogin(url.clone());
487        state_tx.send_if_modified(|current| {
488            if *current == next {
489                false
490            } else {
491                *current = next.clone();
492                true
493            }
494        });
495    }
496}
497
/// How this node's own key-signature fares against the active network lock.
///
/// This is the observability counterpart of the self check in Go's `tkaFilterNetmapLocked`, which
/// raises a `LockedOut` health warning.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SelfLockVerdict {
    /// No key-signature is present (empty). The node simply has not been signed for this lock
    /// yet; that is "unsigned", not "locked out".
    Unsigned,
    /// The lock accepts this node's key-signature; no warning needed.
    Authorized,
    /// A key-signature is present, but the lock rejects it; the payload is the verification error
    /// text. This is the operator-facing `LockedOut` case: locked peers will refuse this node.
    LockedOut(String),
}
511
512/// Classify a node key + its key-signature against `authority` (pure: verify-and-classify, no
513/// logging, no I/O). Takes only the two fields it needs — not the whole `Node` — so the decision is
514/// unit-testable without constructing a full `Node` or standing up the actor.
515fn self_lock_verdict(
516    node_key: &ts_keys::NodePublicKey,
517    key_signature: &[u8],
518    authority: &ts_tka::Authority,
519) -> SelfLockVerdict {
520    // Mirror the peer path (`peer_tracker` `tka_snapshot_admits`): treat an empty signature as
521    // "unsigned" rather than the `LockedOut` bucket Go's `NodeKeyAuthorized` would put a nil sig in
522    // (it errors at decode). This is a deliberate, narrow divergence from a literal Go port: it
523    // avoids `warn`-spam on a lock that simply has not signed this node yet, and keeps self and peer
524    // classification consistent.
525    if key_signature.is_empty() {
526        return SelfLockVerdict::Unsigned;
527    }
528    match authority.node_key_authorized(&node_key.to_bytes(), key_signature) {
529        Ok(()) => SelfLockVerdict::Authorized,
530        Err(e) => SelfLockVerdict::LockedOut(e.to_string()),
531    }
532}
533
534/// Emit the self-locked-out observability signal (Go `tkaFilterNetmapLocked`'s self check → a
535/// `LockedOut` health warning): classify SELF against the freshly-synced `authority` and log.
536///
537/// This is **observability, not enforcement** — self never enters the peer db, so the lock can never
538/// filter our own node out of the netmap. But a self the lock does not authorize means this node's
539/// key-signature is absent or invalid for the active lock, so it cannot prove itself to locked peers
540/// (they will drop it); surfacing that lets an operator notice and re-sign. A never-signed node
541/// (empty signature) logs at `info`, distinct from a present-but-invalid signature (`warn`), so the
542/// common unsigned case does not spam a warning. This fork has no health subsystem, so the operator
543/// signal is a `tracing` event (its observability channel).
544fn log_self_lockout(self_node: &Node, authority: &ts_tka::Authority) {
545    match self_lock_verdict(&self_node.node_key, &self_node.key_signature, authority) {
546        SelfLockVerdict::Unsigned => tracing::info!(
547            "TKA: this node has no key-signature for the active lock; it cannot prove itself to \
548             locked peers until control signs it (not locked out, just unsigned)"
549        ),
550        SelfLockVerdict::Authorized => {
551            tracing::debug!("TKA: self node-key is authorized by the active lock")
552        }
553        SelfLockVerdict::LockedOut(error) => tracing::warn!(
554            %error,
555            "TKA self locked out: this node's key-signature is not authorized by the active \
556             network lock; locked peers will reject it until control re-signs this node \
557             (Go LockedOut)"
558        ),
559    }
560}
561
562// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
563// those generated fields carry no doc and can't take attributes, so wrap in a module where
564// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
565// are re-exported so callers keep referencing them at `control_runner::<Name>`.
566pub use msg_impl::*;
567
568#[allow(missing_docs)]
569mod msg_impl {
570    use kameo::{message::Context, reply::DelegatedReply};
571
572    use super::*;
573
574    #[kameo::messages]
575    impl ControlRunner {
576        /// Fetch the IPv4 address for this tailscale device.
577        #[message(ctx)]
578        pub fn ipv4(
579            &self,
580            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
581        ) -> DelegatedReply<Option<Ipv4Addr>> {
582            let (deleg, replier) = ctx.reply_sender();
583
584            if let Some(replier) = replier {
585                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
586
587                tokio::spawn(async move {
588                    let ip = fut.await;
589                    replier.send(ip);
590                });
591            }
592
593            deleg
594        }
595
596        /// Fetch the IPv6 address for this tailscale device.
597        #[message(ctx)]
598        pub fn ipv6(
599            &self,
600            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
601        ) -> DelegatedReply<Option<Ipv6Addr>> {
602            let (deleg, replier) = ctx.reply_sender();
603
604            if let Some(replier) = replier {
605                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
606
607                tokio::spawn(async move {
608                    let ip = fut.await;
609                    replier.send(ip);
610                });
611            }
612
613            deleg
614        }
615
616        /// Fetch the self node for this tailscale device.
617        #[message(ctx)]
618        pub fn self_node(
619            &self,
620            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
621        ) -> DelegatedReply<Option<Node>> {
622            let (deleg, replier) = ctx.reply_sender();
623
624            if let Some(replier) = replier {
625                let node = self.with_self_node(|node| node.clone());
626
627                tokio::spawn(async move {
628                    let node = node.await;
629                    replier.send(node)
630                });
631            }
632
633            deleg
634        }
635
636        /// Fetch the current Tailscale SSH policy, if control has pushed one.
637        ///
638        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
639        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
640        /// for a value: an absent policy is a legitimate, immediate answer.
641        #[message]
642        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
643            self.ssh_policy.borrow().clone()
644        }
645
646        /// Fetch the current Tailnet Lock status, if control has pushed one.
647        ///
648        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
649        #[message]
650        pub fn current_tka_status(&self) -> Option<TkaStatus> {
651            self.tka.borrow().clone()
652        }
653
654        /// Sign `node_key` directly with this node's network-lock key and submit the signature to
655        /// control (Go `tka.sign` for the Direct case → `tkaSubmitSignature`).
656        ///
657        /// Builds a `Direct` [`NodeKeySignature`](ts_tka::NodeKeySignature) via
658        /// [`sign_direct`](ts_tka::NodeKeySignature::sign_direct) over this node's inner ed25519
659        /// network-lock signing key, serializes it (raw CBOR), and POSTs it to `/machine/tka/sign`.
660        /// Mirrors `set_dns`/`get_certificate`: clones the control config + node keys into a spawned
661        /// task (delegated reply, so the round-trip doesn't block the mailbox) over a fresh Noise
662        /// channel.
663        ///
664        /// **Posture: this only *submits* a signature to control — it does NOT mutate the local
665        /// [`Authority`](ts_tka::Authority).** The local trusted-key state advances solely through the
666        /// existing verified-sync path (`sync_tka` → `VerifiedAumChain::verify`); a `tka_sign` success
667        /// is acknowledged to the caller, and the resulting AUM is picked up on the next netmap-driven
668        /// sync. Verify-and-log is unchanged.
669        #[message(ctx)]
670        pub fn tka_sign(
671            &self,
672            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
673            node_key: [u8; 32],
674        ) -> DelegatedReply<Result<(), TkaSyncError>> {
675            let (deleg, replier) = ctx.reply_sender();
676
677            if let Some(replier) = replier {
678                let config = self.params.config.clone();
679                let keys = self.params.env.keys.clone();
680                tokio::spawn(async move {
681                    // Sign the node key with our network-lock key, then submit the raw-CBOR NKS.
682                    let nks = ts_tka::NodeKeySignature::sign_direct(
683                        &node_key,
684                        &keys.network_lock_keys.private.signing_key(),
685                    );
686                    let req = ts_control::TkaSubmitSignatureRequest {
687                        // node_key + version are stamped by the RPC client from `keys`.
688                        version: Default::default(),
689                        node_key: keys.node_keys.public,
690                        signature: nks.serialize(),
691                    };
692                    let result = tka_submit_signature(
693                        &config.server_url,
694                        &keys,
695                        req,
696                        config.allow_http_key_fetch,
697                    )
698                    .await
699                    .map(|_response| ());
700                    replier.send(result);
701                });
702            }
703
704            deleg
705        }
706
707        /// Disable Tailnet Lock by presenting the disablement secret to control (Go
708        /// `tka.disable` → `/machine/tka/disable`).
709        ///
710        /// Targets the **current** authority head (read from the cached [`TkaStatus`]); the caller
711        /// supplies the `disablement_secret` out of band (it is the operator-held capability that
712        /// authorizes turning the lock off). Mirrors `tka_sign`: clones config + keys into a spawned
713        /// task (delegated reply). Returns [`TkaSyncError::Unsupported`] when there is no known TKA
714        /// head (lock not in use / control hasn't pushed a status), since there is nothing to disable.
715        ///
716        /// **Submit-only, like `tka_sign`:** this POSTs the disablement to control and does NOT mutate
717        /// the local [`Authority`](ts_tka::Authority). Control acts on the disablement; this node
718        /// observes the result through the existing verified-sync path. Verify-and-log unchanged.
719        #[message(ctx)]
720        pub fn tka_disable(
721            &self,
722            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
723            disablement_secret: Vec<u8>,
724        ) -> DelegatedReply<Result<(), TkaSyncError>> {
725            let (deleg, replier) = ctx.reply_sender();
726
727            if let Some(replier) = replier {
728                // Read the current head from the cached status BEFORE the spawn (can't borrow &self
729                // across the await). No head ⇒ no lock to disable ⇒ Unsupported.
730                let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
731                let config = self.params.config.clone();
732                let keys = self.params.env.keys.clone();
733                tokio::spawn(async move {
734                    let result = match head {
735                        Some(head) => {
736                            let req = ts_control::TkaDisableRequest {
737                                // node_key + version are stamped by the RPC client from `keys`.
738                                version: Default::default(),
739                                node_key: keys.node_keys.public,
740                                head,
741                                disablement_secret,
742                            };
743                            tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
744                                .await
745                                .map(|_response| ())
746                        }
747                        None => Err(TkaSyncError::Unsupported),
748                    };
749                    replier.send(result);
750                });
751            }
752
753            deleg
754        }
755
756        /// Initialize Tailnet Lock with this node as the sole initial trusted key, gated by
757        /// `disablement_secret` (Go `LocalClient.NetworkLockInit` — the "lock yourself in" case).
758        ///
759        /// Builds + signs a genesis Checkpoint AUM whose only trusted key is this node's network-lock
760        /// public key (votes 1) and whose single DisablementValue is `disablement_value(secret)`, then
761        /// drives the two-phase init: `tka/init/begin` (submit the genesis) → if control needs no
762        /// further node signatures (`NeedSignatures` empty, the case when this node is the only key) →
763        /// `tka/init/finish` carrying the raw `disablement_secret` as `SupportDisablement`. Mirrors
764        /// `tka_sign`/`tka_disable`: cloned config + keys into a spawned task (delegated reply).
765        ///
766        /// If control returns a non-empty `NeedSignatures` (other nodes must be re-signed under the new
767        /// lock — a multi-node tailnet), this returns [`TkaSyncError::Unsupported`]: re-signing each
768        /// listed node (incl. the Rotation-key case) is a larger flow deferred to a fuller
769        /// `tka_init(keys, secrets)` — the single-node lock-init is the shipped subset.
770        ///
771        /// **Submit-only**, like `tka_sign`/`tka_disable`: this creates the lock at control and does
772        /// NOT seed the local [`Authority`](ts_tka::Authority) — the node picks up the new lock through
773        /// the existing verified netmap-sync (control pushes a `TKAInfo`, `maybe_sync_tka` bootstraps
774        /// the genesis through `VerifiedAumChain::verify`). Verify-and-log posture unchanged.
775        #[message(ctx)]
776        pub fn tka_init(
777            &self,
778            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
779            disablement_secret: Vec<u8>,
780        ) -> DelegatedReply<Result<(), TkaSyncError>> {
781            let (deleg, replier) = ctx.reply_sender();
782
783            if let Some(replier) = replier {
784                let config = self.params.config.clone();
785                let keys = self.params.env.keys.clone();
786                tokio::spawn(async move {
787                    let result = tka_init_run(&config, &keys, disablement_secret).await;
788                    replier.send(result);
789                });
790            }
791
792            deleg
793        }
794
795        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
796        ///
797        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
798        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
799        /// does not block waiting for a value).
800        #[message]
801        pub fn cert_domains(&self) -> Vec<String> {
802            self.cert_domains.borrow().clone()
803        }
804
805        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
806        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
807        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
808        #[message]
809        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
810            self.dns_config.borrow().clone()
811        }
812
813        /// The interactive-login / consent URL control last asked this node to open
814        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
815        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
816        #[message]
817        pub fn pop_browser_url(&self) -> Option<url::Url> {
818            self.pop_browser_url.borrow().clone()
819        }
820
821        /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
822        ///
823        /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
824        /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
825        /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
826        /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
827        /// late subscriber sees the current URL. The initial value is `None` until control sends one.
828        #[message(derive(Clone))]
829        pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
830            self.pop_browser_url.subscribe()
831        }
832
833        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
834        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
835        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
836        #[message]
837        pub fn netcheck(&self) -> crate::status::NetcheckReport {
838            self.netcheck.borrow().clone()
839        }
840
841        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
842        ///
843        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
844        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
845        /// for the round-trip.
846        #[message(ctx)]
847        pub fn fetch_id_token(
848            &self,
849            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
850            audience: String,
851        ) -> DelegatedReply<Result<String, IdTokenError>> {
852            let (deleg, replier) = ctx.reply_sender();
853
854            if let Some(replier) = replier {
855                let config = self.params.config.clone();
856                let keys = self.params.env.keys.clone();
857                tokio::spawn(async move {
858                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
859                    replier.send(result);
860                });
861            }
862
863            deleg
864        }
865
866        /// Log this node out of the tailnet: deregister it by expiring its current node key.
867        ///
868        /// Mirrors `fetch_id_token`: clones the control config + node keys
869        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
870        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
871        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
872        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
873        /// on-disk node key, so re-registering with the same key is the re-login path.
874        #[message(ctx)]
875        pub fn logout(
876            &self,
877            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
878        ) -> DelegatedReply<Result<(), LogoutError>> {
879            let (deleg, replier) = ctx.reply_sender();
880
881            if let Some(replier) = replier {
882                let config = self.params.config.clone();
883                let keys = self.params.env.keys.clone();
884                tokio::spawn(async move {
885                    let result = ts_control::logout(&config, &keys).await;
886                    replier.send(result);
887                });
888            }
889
890            deleg
891        }
892
893        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
894        /// `LocalClient.SetDNS`).
895        ///
896        /// Mirrors `fetch_id_token`: clones the control config + node keys
897        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
898        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
899        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
900        /// match, so the surfaced API takes only `name` + `value`.
901        #[message(ctx)]
902        pub fn set_dns(
903            &self,
904            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
905            name: String,
906            value: String,
907        ) -> DelegatedReply<Result<(), SetDnsError>> {
908            let (deleg, replier) = ctx.reply_sender();
909
910            if let Some(replier) = replier {
911                let config = self.params.config.clone();
912                let keys = self.params.env.keys.clone();
913                tokio::spawn(async move {
914                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
915                    replier.send(result);
916                });
917            }
918
919            deleg
920        }
921    }
922
923    /// The reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message: the issued
924    /// `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface) or a [`ts_control::CertError`].
925    /// Aliased so the message's `Context` type stays under clippy's `type_complexity` bar (the
926    /// nested `Result<(String, String), _>` trips it inline).
927    #[cfg(feature = "acme")]
928    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
929
930    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
931    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
932    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
933    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
934    // block keeps the default build clean.
935    #[cfg(feature = "acme")]
936    #[kameo::messages]
937    impl ControlRunner {
938        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
939        /// client-side ACME DNS-01 engine (`acme` feature).
940        ///
941        /// Mirrors `fetch_id_token`: clones the control config + node keys
942        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
943        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
944        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
945        ///
946        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
947        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
948        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
949        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
950        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
951        /// publish 501s.
952        #[message(ctx)]
953        pub fn get_certificate(
954            &self,
955            ctx: &mut Context<
956                Self,
957                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
958            >,
959            name: String,
960        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
961            let (deleg, replier) = ctx.reply_sender();
962
963            if let Some(replier) = replier {
964                let config = self.params.config.clone();
965                let keys = self.params.env.keys.clone();
966                tokio::spawn(async move {
967                    let result = issue_certificate(&config, &keys, &name).await;
968                    replier.send(result);
969                });
970            }
971
972            deleg
973        }
974
975        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
976        /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
977        /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
978        ///
979        /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
980        /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
981        /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
982        /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
983        /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
984        /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
985        #[message(ctx)]
986        pub fn get_cert_pair(
987            &self,
988            ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
989            name: String,
990        ) -> DelegatedReply<CertPairReply> {
991            let (deleg, replier) = ctx.reply_sender();
992
993            if let Some(replier) = replier {
994                let config = self.params.config.clone();
995                let keys = self.params.env.keys.clone();
996                tokio::spawn(async move {
997                    let result = issue_cert_pair(&config, &keys, &name).await;
998                    replier.send(result);
999                });
1000            }
1001
1002            deleg
1003        }
1004    }
1005}
1006
1007/// The `tka_init` body (the genesis-build + two-phase init/begin→init/finish choreography),
1008/// factored out of the actor handler so it runs in the spawned task. See [`ControlRunner::tka_init`].
1009///
1010/// "Lock yourself in": the genesis trusts only this node's network-lock key (votes 1) and stores one
1011/// DisablementValue = `disablement_value(secret)`. On a non-empty `NeedSignatures` (multi-node
1012/// tailnet needing re-signs) it returns [`TkaSyncError::Unsupported`] — the single-node subset.
1013async fn tka_init_run(
1014    config: &ts_control::Config,
1015    keys: &ts_keys::NodeState,
1016    disablement_secret: Vec<u8>,
1017) -> Result<(), TkaSyncError> {
1018    // Build the genesis: this node's NL public key as the sole trusted key, one disablement value.
1019    let nl_public = keys.network_lock_keys.public.to_bytes().to_vec();
1020    let genesis_key = ts_tka::AumKey {
1021        kind: ts_tka::KeyKind::Ed25519,
1022        votes: 1,
1023        public: nl_public,
1024        meta: Vec::new(),
1025    };
1026    let dvalue = ts_tka::disablement_value(&disablement_secret).to_vec();
1027    let mut genesis = ts_tka::Aum::new_genesis_checkpoint(vec![genesis_key], vec![dvalue])
1028        // A malformed genesis is a local construction bug, not a transient RPC failure — surface it as a
1029        // coarse internal error rather than NetworkError (which would invite a pointless retry).
1030        .map_err(|_| TkaSyncError::Internal(ts_control::TkaSyncInternalErrorKind::SerDe))?;
1031    genesis.sign(&keys.network_lock_keys.private.signing_key());
1032
1033    // Phase 1: submit the genesis. node_key + version are stamped by the RPC client from `keys`.
1034    let begin_req = ts_control::TkaInitBeginRequest {
1035        version: Default::default(),
1036        node_key: keys.node_keys.public,
1037        genesis_aum: genesis.serialize(),
1038    };
1039    let begin_resp = tka_init_begin(
1040        &config.server_url,
1041        keys,
1042        begin_req,
1043        config.allow_http_key_fetch,
1044    )
1045    .await?;
1046
1047    // Single-node case only: control must need no further node signatures. A non-empty
1048    // NeedSignatures means other nodes must be re-signed under the new lock — deferred.
1049    if !begin_resp.need_signatures.is_empty() {
1050        tracing::warn!(
1051            need = begin_resp.need_signatures.len(),
1052            "tka_init: control requires re-signing other nodes; the multi-node init is not yet \
1053             implemented (single-node lock-init only)"
1054        );
1055        return Err(TkaSyncError::Unsupported);
1056    }
1057
1058    // Phase 2: finish, carrying the raw disablement secret as SupportDisablement (Go sends the raw
1059    // secret here; only the genesis stores its Argon2i hash).
1060    let finish_req = ts_control::TkaInitFinishRequest {
1061        version: Default::default(),
1062        node_key: keys.node_keys.public,
1063        signatures: std::collections::BTreeMap::new(),
1064        support_disablement: disablement_secret,
1065    };
1066    tka_init_finish(
1067        &config.server_url,
1068        keys,
1069        finish_req,
1070        config.allow_http_key_fetch,
1071    )
1072    .await
1073    .map(|_response| ())
1074}
1075
/// Issue a certificate for `name` via set-dns DNS-01 and return only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) (the `get_certificate` / `ListenTLS` path).
///
/// This is a projection over the shared [`issue_cert_pair_inner`] core. It keeps the `certified`
/// field of the single issuance and drops the PEM pair this caller has no use for. Account-key
/// handling (persisted PKCS#8 key, or an ephemeral per-call key) is documented on
/// [`issue_cert_pair_inner`].
///
/// # Errors
///
/// Propagates any [`ts_control::CertError`] from the issuance core unchanged.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok(issued.certified)
}
1092
/// Issue a certificate for `name` via set-dns DNS-01 and return the **PEM pair**
/// `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt` / `.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// This uses the same single issuance (through `issue_cert_pair_inner`) as
/// [`issue_certificate`]; only the projected result differs. The leaf **private key** PEM is the
/// second element and is NEVER logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
1109
/// Shared issuance core behind [`issue_certificate`] and [`issue_cert_pair`].
///
/// It resolves an ACME account key, targets Let's Encrypt production, and runs one DNS-01 order.
/// The full [`IssuedCert`](ts_control::acme::IssuedCert) is returned so each caller projects what
/// it needs (one ACME order, two consumers).
///
/// Account key: the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused
/// when present, so the same Let's Encrypt account survives renewals. Otherwise an ephemeral
/// per-call key is generated and a debug log notes it: a new ACME account per issuance, with no
/// write-back.
///
/// The directory is always [`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]. The leaf
/// private key is never logged.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        generated.0
    };

    let directory_url = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY;
    let directory = directory_url.parse().map_err(|parse_err| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {parse_err}"))
    })?;

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
1143
1144impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
1145    type Reply = ();
1146
1147    async fn handle(
1148        &mut self,
1149        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
1150        ctx: &mut Context<Self, Self::Reply>,
1151    ) {
1152        match msg {
1153            StreamMessage::Started(_) => {
1154                tracing::trace!("started listening to state updates");
1155            }
1156
1157            StreamMessage::Next(msg) => {
1158                if let Some(node) = msg.node.as_ref() {
1159                    // Reflect node-key expiry into the device state: control delivering a self-node
1160                    // whose key is in the past means the node must re-authenticate. Otherwise the
1161                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
1162                    // prior update had flipped it to Expired).
1163                    let now_unix = std::time::SystemTime::now()
1164                        .duration_since(std::time::UNIX_EPOCH)
1165                        .map(|d| d.as_secs() as i64)
1166                        .unwrap_or(0);
1167                    let next = if node.key_expired_at_unix(now_unix) {
1168                        crate::DeviceState::Expired
1169                    } else {
1170                        crate::DeviceState::Running
1171                    };
1172                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
1173                    // self-node arrives on every netmap update).
1174                    self.params.state_tx.send_if_modified(|s| {
1175                        if *s != next {
1176                            *s = next.clone();
1177                            true
1178                        } else {
1179                            false
1180                        }
1181                    });
1182
1183                    self.self_node.send_replace(Some(node.clone()));
1184                }
1185
1186                if let Some(policy) = msg.ssh_policy.as_ref() {
1187                    self.ssh_policy.send_replace(Some(policy.clone()));
1188                }
1189
1190                if let Some(tka) = msg.tka.as_ref() {
1191                    self.tka.send_replace(Some(tka.clone()));
1192                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
1193                }
1194
1195                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
1196                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
1197                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
1198                let cert_domains = msg
1199                    .dns_config
1200                    .as_ref()
1201                    .map(|d| d.cert_domains.clone())
1202                    .unwrap_or_default();
1203                self.cert_domains.send_replace(cert_domains);
1204
1205                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
1206                // `None` when control sent no DNS config on this update — distinct from a present but
1207                // empty config (Go `netmap.NetworkMap.DNS`).
1208                self.dns_config.send_replace(msg.dns_config.clone());
1209
1210                // Track the interactive-login URL for `Device::pop_browser_url` /
1211                // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
1212                // sticky semantics (update only on a new non-empty URL; never reset to `None`).
1213                sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
1214
1215                if let Err(e) = self.params.env.publish(msg).await {
1216                    tracing::error!(error = %e, "publishing netmap update");
1217                }
1218            }
1219
1220            StreamMessage::Finished(_) => {
1221                tracing::error!("state update stream terminated")
1222            }
1223        }
1224    }
1225}
1226
1227/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
1228/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
1229/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
1230/// [`ControlRunner::apply_tka_synced`](ControlRunner).
1231#[doc(hidden)]
1232pub struct TkaSynced {
1233    pub(crate) result:
1234        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
1235    /// The [`ControlRunner::tka_generation`] captured when this sync was spawned; the handler
1236    /// discards the result if it no longer matches (the lock was disabled/re-synced mid-flight).
1237    pub(crate) generation: u64,
1238}
1239
1240impl Message<TkaSynced> for ControlRunner {
1241    type Reply = ();
1242
1243    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
1244        self.apply_tka_synced(msg.result, msg.generation).await;
1245    }
1246}
1247
1248impl Message<DerpLatencyMeasurement> for ControlRunner {
1249    type Reply = ();
1250
1251    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
1252        let measurements = msg.measurement.as_ref().clone();
1253
1254        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
1255        // the same measurements, before the home-region short-circuit below — an empty set still
1256        // yields a (default/empty) report rather than a stale one.
1257        self.netcheck
1258            .send_replace(crate::status::NetcheckReport::from_region_results(
1259                &measurements,
1260            ));
1261
1262        if measurements.is_empty() {
1263            tracing::debug!("derp latency measurements empty");
1264            return;
1265        };
1266
1267        // Apply selection hysteresis (the pure decision lives in `select_home_region` for testability)
1268        // so jitter between near-equal regions does not flap the home relay. Copy the chosen id +
1269        // latency out of the borrowed result so nothing borrows `measurements` across the `.await`.
1270        let (selected_id, selected_latency) = {
1271            let selected = select_home_region(self.home_region.map(|(id, _)| id), &measurements)
1272                .expect("non-empty measurements always yield a selection");
1273            (selected.id, selected.latency)
1274        };
1275
1276        let iter = measurements.iter().map(|result| {
1277            (
1278                result.latency_map_key.as_str(),
1279                result.latency.as_secs_f64(),
1280            )
1281        });
1282
1283        if self.home_region.map(|(id, _)| id) != Some(selected_id) {
1284            tracing::debug!(selected_region_id = ?selected_id, "updating home region");
1285        }
1286        self.home_region = Some((selected_id, selected_latency));
1287        self.client.set_home_region(selected_id, iter).await;
1288    }
1289}
1290
1291/// Choose the DERP home region from `measurements` (expected sorted by latency ascending, so
1292/// `measurements[0]` is the lowest-latency "best"), applying Go's selection hysteresis
1293/// (`netcheck.addReportHistoryAndSetPreferredDERP`). Pure so the decision is unit-testable.
1294///
1295/// Keeps the `current` home region (when it is still present in `measurements`) unless the new best
1296/// is *meaningfully* lower-latency — switching only when BOTH: the current region's fresh latency
1297/// exceeds the best by at least `PREFERRED_DERP_ABSOLUTE_DIFF` (10ms), AND the best is at most
1298/// two-thirds of the current region's latency (a >~33% improvement). This avoids flapping the home
1299/// relay between regions whose latencies jitter within ~10ms. On the first selection (`current` is
1300/// `None`), when the best already IS the current region, or when the current region dropped out of
1301/// the measurements, returns the best directly. `None` only if `measurements` is empty.
1302fn select_home_region(
1303    current: Option<ts_derp::RegionId>,
1304    measurements: &[ts_netcheck::RegionResult],
1305) -> Option<&ts_netcheck::RegionResult> {
1306    /// Go `netcheck.preferredDERPAbsoluteDiff`.
1307    const PREFERRED_DERP_ABSOLUTE_DIFF: core::time::Duration =
1308        core::time::Duration::from_millis(10);
1309
1310    let best = measurements.first()?;
1311
1312    let Some(old_id) = current.filter(|id| *id != best.id) else {
1313        // First selection, or the best already is the current home region.
1314        return Some(best);
1315    };
1316
1317    // Compare against the current region's FRESH latency (not a stale one), if it is still present.
1318    match measurements.iter().find(|m| m.id == old_id) {
1319        Some(old) => {
1320            let keep_old = old.latency.saturating_sub(best.latency) < PREFERRED_DERP_ABSOLUTE_DIFF
1321                || best.latency.as_secs_f64() > old.latency.as_secs_f64() * 2.0 / 3.0;
1322            Some(if keep_old { old } else { best })
1323        }
1324        // The current region is no longer reachable this cycle: take the new best.
1325        None => Some(best),
1326    }
1327}
1328
1329impl Message<EndpointAdvertisement> for ControlRunner {
1330    type Reply = ();
1331
1332    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
1333        let endpoints: Vec<Endpoint> = msg
1334            .endpoints
1335            .iter()
1336            .map(|ep| Endpoint {
1337                endpoint: ep.addr,
1338                ty: match ep.ty {
1339                    SelfEndpointType::Local => EndpointType::Local,
1340                    SelfEndpointType::Stun => EndpointType::Stun,
1341                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
1342                },
1343            })
1344            .collect();
1345
1346        tracing::debug!(
1347            n_endpoints = endpoints.len(),
1348            "advertising endpoints to control"
1349        );
1350
1351        self.client.set_endpoints(endpoints).await;
1352    }
1353}
1354
1355/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
1356/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
1357/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
1358/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
1359#[derive(Debug)]
1360pub struct SetAdvertiseRoutes {
1361    /// The prefixes to advertise to control (already filtered to the final set).
1362    pub routes: Vec<ipnet::IpNet>,
1363}
1364
1365impl Message<SetAdvertiseRoutes> for ControlRunner {
1366    type Reply = ();
1367
1368    async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
1369        tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
1370        self.client.set_routable_ips(msg.routes).await;
1371    }
1372}
1373
1374/// Update this node's `Hostinfo.Hostname` at control — the wire half of a runtime
1375/// [`Runtime::set_hostname`](crate::Runtime::set_hostname). A direct `ask` from the runtime, so the
1376/// change reaches the live map-poll client.
1377#[derive(Debug)]
1378pub struct SetHostname {
1379    /// The new hostname to report to control.
1380    pub hostname: String,
1381}
1382
1383impl Message<SetHostname> for ControlRunner {
1384    type Reply = ();
1385
1386    async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
1387        tracing::debug!("updating hostname at control");
1388        self.client.set_hostname(msg.hostname).await;
1389    }
1390}
1391
#[cfg(test)]
mod reauth_bridge_tests {
    //! Tests for `bridge_reauth_url_to_state`: a re-auth URL surfaced by control becomes a
    //! `DeviceState::NeedsLogin` transition on the device-state watch, while `None` is ignored.
    //! Every test drives a local `watch` channel directly; no actor is spawned.

    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    /// The re-auth URL shared by the single-URL tests.
    const AUTH_URL: &str = "https://login.example/auth";

    /// Parses a literal test URL, panicking if the literal is malformed.
    fn url(raw: &str) -> url::Url {
        raw.parse().unwrap()
    }

    /// Core of the fix: a mid-session `MachineNotAuthorized` reaches the bridge as `Some(url)` and
    /// must become `NeedsLogin(url)`, the state the IPN bus reports as `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let auth = url(AUTH_URL);
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, Some(&auth));

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth));
    }

    /// The bridge never acts on `None`: recovering to `Running` belongs to the netmap self-node
    /// handler, so the state must be left exactly as it was.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, None);

        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }

    /// A stuck re-auth that keeps surfacing the same URL must not keep waking subscribers: the
    /// second, identical delivery is deduplicated against the cell's current value.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let auth = url(AUTH_URL);
        let (state_tx, mut state_rx) = watch::channel(DeviceState::Running);

        // First delivery is a real transition (Running -> NeedsLogin) and must be observed.
        bridge_reauth_url_to_state(&state_tx, Some(&auth));
        assert!(state_rx.has_changed().unwrap(), "first NeedsLogin fires");

        // Clear the flag, then deliver the identical URL again: nothing new to observe.
        state_rx.mark_unchanged();
        bridge_reauth_url_to_state(&state_tx, Some(&auth));
        assert!(
            !state_rx.has_changed().unwrap(),
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// Deduplication tracks the current value only; a different URL after an earlier one still
    /// replaces it.
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        for next in [&first, &second] {
            bridge_reauth_url_to_state(&state_tx, Some(next));
        }

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(second));
    }

    /// The "clear" half of the contract: once the bridge has set `NeedsLogin`, the self-node path
    /// (modelled here by the same `send_replace(Running)` the `StreamMessage::Next` handler performs
    /// for a good self-node) restores `Running`. This is why the bridge needs no `None` arm.
    #[test]
    fn running_netmap_clears_needs_login() {
        let auth = url(AUTH_URL);
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, Some(&auth));
        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth));

        // Stand-in for the netmap handler's recovery on the next good self-node.
        state_tx.send_replace(DeviceState::Running);
        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }
}
1475
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    //! Tests for `sticky_update_pop_browser_url`: a new non-empty URL is published, an absent URL
    //! leaves the cell untouched, and re-sending the current URL does not wake subscribers.

    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    /// Parses a literal test URL, panicking if the literal is malformed.
    fn url(raw: &str) -> url::Url {
        raw.parse().unwrap()
    }

    /// Publishing a non-empty URL stores it in the cell.
    #[test]
    fn non_empty_url_publishes() {
        let (cell_tx, cell_rx) = watch::channel(None);
        let consent = url("https://login.example/consent");

        sticky_update_pop_browser_url(&cell_tx, Some(&consent));

        assert_eq!(*cell_rx.borrow(), Some(consent));
    }

    /// Regression guard for the thrash bug: the common netmap tick carries no URL, and it must not
    /// reset the cell (resetting every tick would coalesce the URL away on the bus).
    #[test]
    fn absent_update_does_not_reset() {
        let consent = url("https://login.example/consent");
        let (cell_tx, cell_rx) = watch::channel(Some(consent.clone()));

        // A run of netmap updates that carry no URL.
        for _ in 0..5 {
            sticky_update_pop_browser_url(&cell_tx, None);
        }

        assert_eq!(
            *cell_rx.borrow(),
            Some(consent),
            "empty updates must not clear the URL"
        );
    }

    /// Re-sending the URL already in the cell is deduplicated in place, so subscribers are not
    /// woken spuriously; checked via the receiver's change flag.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let consent = url("https://login.example/consent");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&consent));
        assert!(cell_rx.has_changed().unwrap(), "first non-empty URL fires");

        // Reset the flag so only the repeat delivery is under test.
        cell_rx.mark_unchanged();
        sticky_update_pop_browser_url(&cell_tx, Some(&consent));
        assert!(
            !cell_rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// Stickiness still follows changes: a different URL replaces the earlier one.
    #[test]
    fn new_url_after_prior_fires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (cell_tx, cell_rx) = watch::channel(None);

        for next in [&first, &second] {
            sticky_update_pop_browser_url(&cell_tx, Some(next));
        }

        assert_eq!(*cell_rx.borrow(), Some(second));
    }

    /// The realistic control cadence end to end: a URL, a run of empty updates that must not
    /// disturb it, then a different URL that must still land.
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (cell_tx, cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        for _ in 0..3 {
            sticky_update_pop_browser_url(&cell_tx, None);
        }
        assert_eq!(*cell_rx.borrow(), Some(first), "stayed sticky through the None gap");

        sticky_update_pop_browser_url(&cell_tx, Some(&second));
        assert_eq!(
            *cell_rx.borrow(),
            Some(second),
            "a new URL after a None gap still fires"
        );
    }

    /// Deduplication compares against the cell's *current* value, not a history: going A -> B -> A
    /// makes the final A a genuine change.
    #[test]
    fn returning_to_prior_url_refires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        sticky_update_pop_browser_url(&cell_tx, Some(&second));
        cell_rx.mark_unchanged();

        // `first` differs from the current value (`second`), so this must register as a change.
        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        assert!(
            cell_rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*cell_rx.borrow(), Some(first));
    }

    /// End-to-end de-thrash: run a realistic cadence (empty, empty, URL, empty, empty) through the
    /// producer, then drain the watch the way a `run_bus`-style subscriber would. Exactly one change
    /// must be observed and it must carry the URL; the pre-fix `send_replace`-every-tick behaviour
    /// would have let the trailing empties coalesce it away.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let consent = url("https://login.example/consent");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        // Mostly-empty map responses, with a single one carrying the URL.
        let cadence = [None, None, Some(&consent), None, None];
        for incoming in cadence {
            sticky_update_pop_browser_url(&cell_tx, incoming);
        }

        let mut observed = 0;
        while cell_rx.has_changed().unwrap() {
            let value = cell_rx.borrow_and_update().clone();
            observed += 1;
            assert_eq!(
                value,
                Some(consent.clone()),
                "the surviving change carries the URL"
            );
        }
        assert_eq!(observed, 1, "exactly one change survives the None thrash");
    }
}
1603
#[cfg(test)]
mod home_region_hysteresis_tests {
    use core::time::Duration;

    use ts_derp::RegionId;
    use ts_netcheck::RegionResult;

    use super::select_home_region;

    /// Region id `id` (must be non-zero).
    fn rid(id: u32) -> RegionId {
        RegionId(core::num::NonZeroU32::new(id).unwrap())
    }

    /// A measurement for region `id` at `latency_ms`. The map key and remote address are filler:
    /// `select_home_region` only looks at `id` and `latency`.
    fn region(id: u32, latency_ms: u64) -> RegionResult {
        RegionResult {
            latency: Duration::from_millis(latency_ms),
            id: rid(id),
            latency_map_key: format!("region-{id}"),
            connected_remote: "127.0.0.1:0".parse().unwrap(),
        }
    }

    /// Empty measurements yield no selection.
    #[test]
    fn empty_measurements_select_none() {
        assert!(select_home_region(Some(rid(1)), &[]).is_none());
        assert!(select_home_region(None, &[]).is_none());
    }

    /// First selection (no current home region) takes the best (lowest-latency) region directly.
    #[test]
    fn first_selection_takes_best() {
        let m = [region(1, 20), region(2, 50)];
        assert_eq!(select_home_region(None, &m).unwrap().id, rid(1));
    }

    /// Jitter within the 10ms absolute-diff band keeps the current region (no flap). Current=region 2
    /// at 25ms; new best=region 1 at 20ms (only 5ms better) -> keep region 2.
    #[test]
    fn keeps_current_when_within_absolute_diff() {
        let m = [region(1, 20), region(2, 25)];
        let sel = select_home_region(Some(rid(2)), &m).unwrap();
        assert_eq!(
            sel.id,
            rid(2),
            "a 5ms improvement (< 10ms) must not flap the home region"
        );
    }

    /// A meaningful improvement (>10ms AND best <= 2/3 of current) switches. Current=region 2 at
    /// 100ms; new best=region 1 at 20ms -> switch to region 1.
    #[test]
    fn switches_on_meaningful_improvement() {
        let m = [region(1, 20), region(2, 100)];
        assert_eq!(
            select_home_region(Some(rid(2)), &m).unwrap().id,
            rid(1),
            "a large improvement must switch the home region"
        );
    }

    /// The two-thirds rule in isolation: current=region 2 at 60ms, best=region 1 at 45ms. The 15ms
    /// gap clears the 10ms absolute threshold (so that test alone would switch), but 45ms is above
    /// two-thirds of 60ms (40ms), so the current region is kept.
    #[test]
    fn keeps_current_when_two_thirds_rule_not_met() {
        let m = [region(1, 45), region(2, 60)];
        let sel = select_home_region(Some(rid(2)), &m).unwrap();
        assert_eq!(
            sel.id,
            rid(2),
            "best (45ms) is not <= 2/3 of current (40ms), so keep current despite >10ms diff"
        );
    }

    /// When the current home region is no longer present in the measurements, take the new best.
    #[test]
    fn switches_when_current_region_absent() {
        let m = [region(1, 20), region(3, 25)];
        assert_eq!(
            select_home_region(Some(rid(2)), &m).unwrap().id,
            rid(1),
            "a current region absent from the measurements falls through to the best"
        );
    }

    /// When the best already IS the current home region, it is kept (no spurious change).
    #[test]
    fn keeps_current_when_it_is_already_best() {
        let m = [region(2, 20), region(1, 50)];
        assert_eq!(select_home_region(Some(rid(2)), &m).unwrap().id, rid(2));
    }

    /// A latency tie with another region is a 0ms improvement — inside the 10ms band — so the
    /// current region is kept even though the other region is listed first.
    #[test]
    fn keeps_current_on_latency_tie() {
        let m = [region(1, 30), region(2, 30)];
        assert_eq!(select_home_region(Some(rid(2)), &m).unwrap().id, rid(2));
    }
}
1699
#[cfg(test)]
mod self_lockout_tests {
    use ts_tka::{AumHash, Authority, State};

    use super::{SelfLockVerdict, self_lock_verdict};

    /// Authority built from `State::default()` with an all-zero head `AumHash`; shared by every
    /// classifier test here.
    fn empty_authority() -> Authority {
        Authority::from_state(AumHash([0; 32]), State::default())
    }

    /// A freshly generated node public key, so no test depends on a fixed key.
    fn fresh_node_key() -> ts_keys::NodePublicKey {
        ts_keys::NodePrivateKey::random().public_key()
    }

    /// "Not signed yet" (an empty key-signature) must classify as `Unsigned`, never as a lockout,
    /// so a tailnet that simply has not signed this node does not produce lockout warnings.
    #[test]
    fn empty_signature_is_unsigned_not_locked_out() {
        let authority = empty_authority();
        let verdict = self_lock_verdict(&fresh_node_key(), &[], &authority);
        assert_eq!(verdict, SelfLockVerdict::Unsigned);
    }

    /// A non-empty signature that fails verification must classify as `LockedOut`, carrying the
    /// verify error for the log. The bytes below are non-empty (so verification is attempted rather
    /// than short-circuiting to `Unsigned`) but are not valid NodeKeySignature CBOR — `0x01` decodes
    /// as a bare uint followed by trailing bytes — so `node_key_authorized` fails with a decode
    /// error. Cryptographic rejections of well-formed signatures are `ts_tka`'s own test surface;
    /// this only pins that the runtime classifier maps a verify `Err` to `LockedOut`.
    #[test]
    fn unverifiable_signature_is_locked_out() {
        const GARBAGE_SIGNATURE: [u8; 3] = [0x01, 0x02, 0x03];

        let authority = empty_authority();
        let verdict = self_lock_verdict(&fresh_node_key(), &GARBAGE_SIGNATURE, &authority);
        assert!(
            matches!(verdict, SelfLockVerdict::LockedOut(_)),
            "a signature the lock cannot authorize must classify as LockedOut, got {verdict:?}"
        );
    }
}