Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::{collections::HashMap, sync::Arc, time::Instant};
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
17    tka_init_begin, tka_init_finish, tka_submit_signature,
18};
19use ts_magicsock::SelfEndpointType;
20
21use crate::{
22    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
23    direct::EndpointAdvertisement,
24};
25
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus. It also
/// holds the `watch` cells that publish control-derived state (self-node, SSH policy, Tailnet Lock,
/// DNS, consent URL, netcheck) and drives Tailnet-Lock sync.
pub struct ControlRunner {
    /// Control-plane client returned by `AsyncControlClient::connect` during `on_start`.
    client: AsyncControlClient,
    /// The spawn arguments (config, auth key, env, and the runtime-owned state / TKA-authority /
    /// home-region senders).
    params: Params,

    /// Latest self-node from control's netmap, or `None` until one arrives. Awaited by
    /// [`Self::with_self_node`].
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message).
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// The verified TKA [`Authority`](ts_tka::Authority) the peer tracker **enforces** (Go
    /// `tkaFilterNetmapLocked`). `None` until the first successful sync, and reset to `None` when the
    /// lock is disabled or a sync reports that control has no lock. This is the SOLE delivery
    /// channel to the peer tracker (which holds the matching `Receiver` and reads it on every peer
    /// upsert): a `watch` cell, not a bus message, so the latest value is always readable, never
    /// dropped under load, and writes are strictly ordered by this actor — a disable (`None`) can
    /// never be reordered behind or dropped before a stale `Some`. Written only on the actor thread,
    /// from [`Self::apply_tka_synced`] (`Some` on a successful sync, `None` when control reports no
    /// lock) and [`Self::maybe_sync_tka`] (`None` on disable). The published `Authority` has always
    /// passed `VerifiedAumChain::verify`.
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Wrapping generation counter that invalidates in-flight syncs. `maybe_sync_tka` bumps it on
    /// every netmap whose TKA status is disabled, and captures the current value into each sync it
    /// spawns. [`Self::apply_tka_synced`] discards a sync result whose captured generation is stale,
    /// so a lock disabled *while a sync was in flight* is never re-enabled by that sync's late
    /// `Ok(Some)` (the in-flight window the `tka_synced.is_some()` disable guard alone does not
    /// cover). Spawning a sync does NOT bump it; `tka_syncing` already prevents overlapping syncs.
    tka_generation: u64,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
    /// The DERP home region currently selected, with the latency measured for it at selection time.
    /// `None` until the first home region is chosen. Used to apply selection **hysteresis** (Go
    /// `netcheck.addReportHistoryAndSetPreferredDERP`): the home region is only switched when a new
    /// region is *meaningfully* lower-latency than the current one, so jitter between near-equal
    /// regions does not flap the home relay (which would cause repeated reconnects + brief loss).
    home_region: Option<(ts_derp::RegionId, core::time::Duration)>,
    /// Rolling history of per-cycle DERP-latency reports within the last [`DERP_HISTORY_MAX_AGE`]
    /// (Go `netcheck` `maxAge = 5 * time.Minute`), each stamped with its arrival `Instant`. Feeds the
    /// `bestRecent` smoothing (Go `addReportHistoryAndSetPreferredDERP`): the new home candidate is
    /// chosen by each region's **minimum** latency over this window, not its raw current sample, so a
    /// best region whose latency oscillates across the switch boundary does not flap the home relay.
    /// Aged entries are evicted on each measurement; the buffer is therefore bounded by the netcheck
    /// cadence × the window.
    derp_report_history: Vec<(Instant, Arc<Vec<ts_netcheck::RegionResult>>)>,
    /// Consecutive automatic-reauth attempts that have NOT yet recovered the node to a good (non-
    /// expired) self-node. The circuit breaker for [`expiry_action`]'s `Reauthenticate` path: a
    /// one-shot / already-consumed auth key cannot re-register, so without a bound an expired node
    /// would sit in [`DeviceState::Reauthenticating`] indefinitely (the rotated re-register keeps
    /// failing or control keeps returning a still-expired self-node).
    ///
    /// Incremented once per expired self-node seen while *already* `Reauthenticating` (each such
    /// arrival is evidence the prior reauth did not recover); reset to `0` whenever a good
    /// (non-expired) self-node arrives (the node recovered) — see the [`StreamMessage::Next`] handler.
    /// At [`MAX_REAUTH_ATTEMPTS`] the runner stops re-arming reauth and flips the cell to the terminal
    /// [`DeviceState::Expired`], giving the cell a stable terminal state (a genuinely good self-node
    /// can still recover it later). The one-shot `Command::Reauth` is fired ONLY on the transition
    /// INTO `Reauthenticating`, never re-fired while already reauthenticating, so the node key is
    /// rotated at most once per episode (a second rotation would lose the original `OldNodeKey`
    /// anchor control needs to link the rotation).
    reauth_attempts: u32,
    /// Background task that bridges the control client's mid-session re-auth URL cell onto
    /// [`Self::params`]'s device-state cell (sets [`DeviceState::NeedsLogin`] when control returns
    /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
    /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
    reauth_bridge: tokio::task::JoinHandle<()>,
}
124
/// Circuit-breaker limit for automatic re-authentication.
///
/// Once this many consecutive automatic reauth attempts have failed to recover the node, the
/// runner stops re-arming reauth and moves the device to the terminal [`DeviceState::Expired`]
/// (the design's deferred question Q1). The case this bounds is a one-shot auth key. The first
/// registration consumed that key, so an automatic re-register with it can never succeed. Without
/// a cap the node would sit in `Reauthenticating` indefinitely, instead of reaching today's
/// terminal behavior.
///
/// Counting: the first expired self-node fires the reauth. Every further expired self-node seen
/// while still reauthenticating counts toward this limit.
const MAX_REAUTH_ATTEMPTS: u32 = 3;
132
133impl Drop for ControlRunner {
134    fn drop(&mut self) {
135        // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
136        self.reauth_bridge.abort();
137    }
138}
139
/// Spawn arguments for [`ControlRunner`] (its `kameo::Actor::Args`).
pub struct Params {
    /// Control-plane client configuration, used for registration, session bring-up, and the
    /// Tailnet-Lock sync RPCs.
    pub(crate) config: ts_control::Config,

    /// Auth key passed to `check_auth` / `connect`, if any. Without one, control may answer
    /// `MachineNotAuthorized` with a URL for the user to authorize this machine interactively.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor. It supplies the node keys, the message-bus
    /// subscriptions, and the shutdown flag.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,

    /// Sender for the TKA enforcement-authority cell the peer tracker reads (Go
    /// `tkaFilterNetmapLocked`). Created in [`Runtime::spawn`](crate::Runtime) and threaded into BOTH
    /// the peer tracker (the `Receiver`) and this runner (the `Sender`), so the runner is the sole
    /// writer and the tracker reads the latest verified `Authority` on demand. `None` = no lock /
    /// disabled (admit all).
    pub(crate) tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,

    /// Sender for the selected DERP home region (the **smoothed** `bestRecent` + hysteresis choice,
    /// Go `report.PreferredDERP`). Created in [`Runtime::spawn`](crate::Runtime); the runner is the
    /// sole writer and [`Multiderp`](crate::multiderp) holds the `Receiver`, so the local DERP relay
    /// follows the SAME home the runner advertises to control — not the raw per-cycle latency
    /// minimum (which would flap on jitter and disagree with the advertised home). `None` until the
    /// first home is chosen.
    pub(crate) home_region: watch::Sender<Option<ts_derp::RegionId>>,
}
172
/// Why the [`ControlRunner`] actor stopped: its `kameo::Actor::Error`, returned from `on_start`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// A control-plane operation failed (e.g. `check_auth` or `connect`). Displayed transparently.
    #[error(transparent)]
    Control(#[from] ControlError),

    /// A runtime-internal [`crate::Error`]. Displayed transparently.
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
182
183impl kameo::Actor for ControlRunner {
184    type Args = Params;
185    type Error = ControlRunnerError;
186
187    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
188        loop {
189            match AsyncControlClient::check_auth(
190                &params.config,
191                &params.env.keys,
192                params.auth_key.as_deref(),
193            )
194            .await
195            {
196                Ok(()) => break,
197                Err(ControlError::MachineNotAuthorized(u)) => {
198                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
199                    // Surface "interactive login required" so a watcher / `wait_until_running` can
200                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
201                    // keeps retrying (transient), so this is not a terminal `Failed`.
202                    params
203                        .state_tx
204                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
205                    tokio::time::sleep(Duration::from_secs(5)).await;
206                }
207                Err(ControlError::NeedsMachineAuth) => {
208                    // The node is registered with a valid key but awaiting ADMIN APPROVAL on an
209                    // approval-gated tailnet, and control offered NO interactive URL. This is
210                    // TRANSIENT (Go's `NeedsMachineAuth`): poll registration until an admin approves,
211                    // then `check_auth` returns `Ok(())` → the loop breaks and the node comes up with
212                    // no re-registration. Publishing the (no-URL) `NeedsMachineAuth` state — NOT a
213                    // terminal `Failed` and NOT `NeedsLogin` (there is no URL to open) — lets a
214                    // watcher / `wait_until_running` see "awaiting approval" instead of an opaque
215                    // timeout. Same 5s poll cadence as the `MachineNotAuthorized(url)` arm.
216                    tracing::info!(
217                        "machine awaiting admin approval to join the tailnet; polling until approved"
218                    );
219                    params
220                        .state_tx
221                        .send_replace(crate::DeviceState::NeedsMachineAuth);
222                    tokio::time::sleep(Duration::from_secs(5)).await;
223                }
224                Err(ControlError::RateLimited(retry_after)) => {
225                    // Control asked us to slow down (HTTP 429). Wait exactly the server-requested
226                    // cooldown and retry — this is transient, NOT a terminal `Failed`, so we must
227                    // not stop the runner (mirrors Go's `authRoutine` sleeping `rle.retryAfter`).
228                    tracing::warn!(
229                        ?retry_after,
230                        "control rate-limited registration; waiting before retry"
231                    );
232                    tokio::time::sleep(retry_after).await;
233                }
234                Err(e) => {
235                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
236                    // specific reason control gave AND publish it as a typed `Failed` state so
237                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
238                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
239                    // stopped actor is next asked. Publishing before `return Err` is why the state
240                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
241                    let reason = crate::RegistrationError::from(&e);
242                    tracing::error!(error = %e, "registration failed; control runner stopping");
243                    params
244                        .state_tx
245                        .send_replace(crate::DeviceState::Failed(reason));
246                    return Err(e.into());
247                }
248            }
249        }
250        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
251        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
252        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
253        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
254        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
255        // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
256        // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
257        // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
258        // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
259        let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
260
261        // `connect` issues its own `machine/register` POST (a second one after `check_auth`'s), so
262        // it too can hit a 429. Wrap it in the same honor-`Retry-After` retry as the `check_auth`
263        // loop above: a rate-limit is transient — sleeping the server-requested cooldown and
264        // retrying must NOT stop the runtime (a 429 here previously fell into the `Err(e)` arm →
265        // `Failed` → actor stop). `connect` consumes a `watch::Sender` by value (and drops it on a
266        // failed register, before it would be moved into the live-poll task), so we keep the original
267        // `auth_url_tx` alive here across all attempts and hand `connect` a CLONE each time. That
268        // keeps the runtime's `auth_url_rx` (the `reauth_bridge` receiver) paired with a live sender:
269        // recreating a fresh sender per attempt instead would orphan the bridge the moment the
270        // original dropped, silently killing mid-session re-auth-URL delivery.
271        let client = loop {
272            let bring_up = async {
273                let (client, stream) = AsyncControlClient::connect(
274                    &params.config,
275                    &params.env.keys,
276                    params.auth_key.as_deref(),
277                    auth_url_tx.clone(),
278                )
279                .await?;
280
281                DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
282
283                params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
284                params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
285                slf.attach_stream(stream.boxed(), (), ());
286                Ok::<_, ControlRunnerError>(client)
287            };
288
289            match bring_up.await {
290                Ok(client) => break client,
291                Err(ControlRunnerError::Control(ControlError::RateLimited(retry_after))) => {
292                    tracing::warn!(
293                        ?retry_after,
294                        "control rate-limited the session bring-up; waiting before retry"
295                    );
296                    tokio::time::sleep(retry_after).await;
297                }
298                Err(ControlRunnerError::Control(ControlError::NeedsMachineAuth)) => {
299                    // `connect` issues its OWN `machine/register` POST (a second one after
300                    // `check_auth`'s), so it too can come back "awaiting admin approval, no URL". In
301                    // the normal flow the `check_auth` loop above already gated this (it only breaks
302                    // once registration returns `Ok`, and approval is monotonic), so this arm is the
303                    // defensive twin for a node de-authorized in the race window between the two POSTs:
304                    // treat it as TRANSIENT exactly like the `check_auth` arm — publish the (no-URL)
305                    // `NeedsMachineAuth` state, poll the same 5s, and retry the bring-up — rather than
306                    // collapsing into the terminal `Failed` arm below (which would permanently stop
307                    // the runner on a recoverable await-approval). Mirrors Go's `NeedsMachineAuth`.
308                    tracing::info!(
309                        "machine awaiting admin approval during session bring-up; polling until \
310                         approved"
311                    );
312                    params
313                        .state_tx
314                        .send_replace(crate::DeviceState::NeedsMachineAuth);
315                    tokio::time::sleep(Duration::from_secs(5)).await;
316                }
317                Err(e) => {
318                    tracing::error!(error = %e, "bringing up the control session failed");
319                    // The control session never came up; surface it as a transient registration
320                    // failure (a retry / fresh `Device::new` may succeed) rather than leaving the
321                    // state stuck at `Connecting`.
322                    params.state_tx.send_replace(crate::DeviceState::Failed(
323                        crate::RegistrationError::NetworkUnreachable,
324                    ));
325                    return Err(e);
326                }
327            }
328        };
329
330        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
331        // current (and flips to `Expired` if the self-node's key lapses).
332        params.state_tx.send_replace(crate::DeviceState::Running);
333
334        // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
335        // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
336        // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
337        // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
338        // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
339        // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
340        // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
341        let reauth_bridge = {
342            let state_tx = params.state_tx.clone();
343            let mut auth_url_rx = auth_url_rx;
344            tokio::spawn(async move {
345                while auth_url_rx.changed().await.is_ok() {
346                    let url = auth_url_rx.borrow_and_update().clone();
347                    bridge_reauth_url_to_state(&state_tx, url.as_ref());
348                }
349            })
350        };
351
352        // Clone the TKA authority publisher before `params` moves into `Self` below. The matching
353        // `Receiver` lives on the peer tracker; this sender is the sole writer (enforce on sync,
354        // clear on disable).
355        let tka_authority = params.tka_authority.clone();
356
357        Ok(Self {
358            client,
359            params,
360            self_node: Default::default(),
361            ssh_policy: Default::default(),
362            tka: Default::default(),
363            tka_synced: None,
364            tka_authority,
365            tka_syncing: false,
366            tka_generation: 0,
367            cert_domains: Default::default(),
368            dns_config: Default::default(),
369            pop_browser_url: Default::default(),
370            netcheck: Default::default(),
371            home_region: None,
372            derp_report_history: Vec::new(),
373            reauth_attempts: 0,
374            reauth_bridge,
375        })
376    }
377}
378
impl ControlRunner {
    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
    /// round-trip). The result returns via the [`TkaSynced`] self-message.
    ///
    /// A sync is spawned only when all of these hold:
    /// - control reports TKA enabled (`is_enabled`);
    /// - no sync is already in flight;
    /// - either we hold no `Authority` yet (→ bootstrap), or control's head differs from ours, or
    ///   control's head fails to parse (→ catch up).
    ///
    /// While a sync is in flight, further enabled statuses are ignored rather than queued; the next
    /// netmap re-evaluates.
    ///
    /// When TKA is disabled, this:
    /// - bumps `tka_generation`, invalidating any in-flight sync;
    /// - drops the synced state;
    /// - writes `None` to the enforcement cell.
    ///
    /// Mirrors Go's `tkaSyncIfNeeded`: a no-op when our head already matches.
    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
        if !tka.is_enabled() {
            // Lock disabled (or never enabled): clear enforcement by writing `None` to the authority
            // cell the peer tracker reads — synchronously, so it can never be reordered behind or
            // dropped before a stale `Some` (the failure a best-effort broadcast had). Always bump the
            // generation so ANY sync currently in flight is invalidated: without this, a disable that
            // races an in-flight sync (whose `take()` already cleared `tka_synced`) would be a no-op
            // here, and the sync's late `Ok(Some)` would silently re-enable a lock control just turned
            // off (the in-flight window the `tka_synced.is_some()` guard alone misses). Cheap and
            // idempotent: clearing an already-`None` cell and bumping the generation are harmless.
            self.tka_generation = self.tka_generation.wrapping_add(1);
            if self.tka_synced.is_some() {
                tracing::info!("TKA lock disabled; clearing enforcement (admitting all peers)");
                self.tka_synced = None;
            }
            self.tka_authority.send_replace(None);
            return;
        }
        if self.tka_syncing {
            return; // a sync is already in flight; the next netmap will re-trigger if still stale
        }
        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
        // do. A malformed control head (`from_base32` → `None`) is treated as "different" (we'll
        // attempt a sync, which fail-closes harmlessly).
        if let Some(synced) = &self.tka_synced
            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
            && synced.authority.head_matches(&control_head)
        {
            return;
        }

        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
        // `tka_syncing` so we don't spawn a second concurrent sync. Capture the current generation so
        // `apply_tka_synced` can discard this result if a disable bumped the generation while the sync
        // was in flight (H1: don't re-enable a lock that was disabled mid-sync).
        self.tka_syncing = true;
        let generation = self.tka_generation;
        let current = self.tka_synced.take();
        let config = self.params.config.clone();
        let keys = self.params.env.keys.clone();
        tokio::spawn(async move {
            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
            // not allowed). A send failure just means the actor is gone — nothing to do.
            if let Err(e) = self_ref.tell(TkaSynced { result, generation }).await {
                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
            }
        });
    }

    /// Apply the outcome of a spawned [`Self::maybe_sync_tka`] task on the actor thread. Always
    /// clears the in-flight guard first, then:
    ///
    /// - `Ok(Some)`: stores the advanced state and publishes the `Authority` to the peer tracker's
    ///   enforcement cell.
    /// - `Ok(None)` (control has no lock): clears the enforcement cell.
    /// - `Err`: leaves the enforcement cell untouched.
    ///
    /// `generation` is the value captured when the sync was spawned. If it no longer matches
    /// `self.tka_generation`, the lock was disabled while this sync was in flight (only the disable
    /// path of `maybe_sync_tka` bumps the generation). The result is then discarded, so an authority
    /// control has since turned off is never re-enabled. A discarded result also drops the synced
    /// state it carried, so the next enabled-status netmap re-bootstraps.
    ///
    /// Declared `async` although the body contains no `.await`; every step runs synchronously.
    async fn apply_tka_synced(
        &mut self,
        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
        generation: u64,
    ) {
        self.tka_syncing = false;

        // H1 guard: a disable bumped the generation while this sync ran. Drop the stale result —
        // `maybe_sync_tka`'s disable branch already cleared enforcement to `None`, and re-applying
        // this `Some` would re-enforce a lock that is no longer active.
        if generation != self.tka_generation {
            tracing::info!(
                "TKA sync result superseded (lock disabled or re-synced mid-flight); discarding"
            );
            return;
        }

        match result {
            Ok(Some(synced)) => {
                tracing::info!(
                    head = %synced.authority.head().to_base32(),
                    "TKA sync succeeded; enforcing verified Authority (Go tkaFilterNetmapLocked)"
                );
                // Deliver the verified Authority to the peer tracker's enforcement cell. The tracker
                // reads it on every peer upsert and drops unauthorized peers. `Some(..)` = enforce; a
                // `None` is written on disable. `watch` is the sole channel (last-write-wins, never
                // dropped, ordered by this actor) — no bus, no re-publish-for-replay needed.
                self.tka_authority
                    .send_replace(Some(synced.authority.clone()));

                // Observability (Go `tkaFilterNetmapLocked`'s self check → `LockedOut` health
                // warning): verify SELF's own node-key signature against the freshly-synced
                // Authority and warn if self is NOT authorized. We never FILTER self (self never
                // enters the peer db, so enforcement can't lock us out of our own netmap), but Go
                // raises an operator-facing warning here because a self that the lock does not
                // authorize means this node's key-signature is missing/invalid for the current lock
                // — it will be unable to prove itself to locked peers. This fork has no health
                // subsystem, so the signal is a `tracing::warn!` (its observability channel).
                //
                // `self_node` is a sticky cell set on every netmap carrying a self-node; if a sync
                // somehow lands before the first self-node ever arrived it is `None`, so we skip the
                // advisory this cycle and re-evaluate on the next sync — fine for observability-only.
                // The `borrow()` ref is scoped to this `if let` and dropped before the `&mut self`
                // write below.
                if let Some(self_node) = self.self_node.borrow().as_ref() {
                    log_self_lockout(self_node, &synced.authority);
                }

                self.tka_synced = Some(synced);
            }
            Ok(None) => {
                // Control has no lock for us (no genesis / disabled). Clear any authority we were
                // previously enforcing — symmetric with the disable path — so a transition to
                // "no lock" stops dropping peers. Not an error.
                if self.tka_synced.is_some() {
                    tracing::info!("TKA sync: control reports no lock; clearing enforcement");
                    self.tka_synced = None;
                }
                self.tka_authority.send_replace(None);
            }
            Err(e) => {
                // Transport or verify failure: log and leave whatever is currently published in the
                // enforcement cell (a failed sync must not drop enforcement — that would fail OPEN).
                // NEVER errors the netmap. Note the local synced state was moved into the driver by
                // `maybe_sync_tka`'s `take()` and is not restored here, so `tka_synced` stays `None`
                // and the next netmap update re-triggers a full bootstrap rather than a catch-up.
                tracing::warn!(error = %e, "TKA sync failed; keeping prior enforcement state");
            }
        }
    }

    /// Return a future that resolves to `Some(f(&node))` once the `self_node` cell holds a node
    /// (immediately if it already does).
    ///
    /// The future resolves to `None` instead if the env's shutdown flag becomes `true` first, or
    /// if either `watch` channel closes. It owns its own receivers and does not borrow `self`
    /// (`use<F, R>`), so it can be awaited after the current handler returns.
    ///
    /// `f` runs while the `watch` read guard on the self-node is held, so it should be quick and
    /// must not write to the `self_node` cell.
    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
    where
        F: FnOnce(&Node) -> R,
    {
        let mut sub = self.self_node.subscribe();
        let mut shutdown = self.params.env.shutdown.clone();

        async move {
            tokio::select! {
                // Shutdown requested (or its sender dropped): give up without a node.
                _ = shutdown.wait_for(|x| *x) => {
                    None
                },
                // A channel-closed error or an (unexpected) `None` short-circuits to `None` via `?`.
                node = sub.wait_for(Option::is_some) => {
                    Some(f(node.ok()?.as_ref()?))
                },
            }
        }
    }
}
534
535/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
536///
537/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
538/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
539/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
540/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
541/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
542/// for a `watch`/bus subscriber.
543///
544/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
545/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
546/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
547/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
548/// device-state cell in this handler.
549///
550/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
551/// against a plain `watch` channel without standing up the actor.
552fn sticky_update_pop_browser_url(
553    cell: &watch::Sender<Option<url::Url>>,
554    incoming: Option<&url::Url>,
555) {
556    if let Some(url) = incoming {
557        cell.send_if_modified(|current| {
558            if current.as_ref() == Some(url) {
559                false
560            } else {
561                *current = Some(url.clone());
562                true
563            }
564        });
565    }
566}
567
568/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
569///
570/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
571/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
572/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
573/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
574/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
575/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
576///
577/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
578/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
579/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
580/// independent `None`-clear in this bridge could race that and is unnecessary. The
581/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
582/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
583/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
584/// dedupe in the netmap handler.
585///
586/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
587/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
588/// **Auto-reauth ownership.** While an automatic re-auth is in flight (the cell holds
589/// [`DeviceState::Reauthenticating`]), this bridge must NOT downgrade it to `NeedsLogin`. An
590/// auto-reauth's rotated re-register surfaces an auth URL through the SAME `auth_url_tx` cell this
591/// bridge watches, but on a headless / auth-key node there is no human to visit it — flipping to
592/// `NeedsLogin` would surface a misleading `browse_to_url` AND, by moving the cell off
593/// `Reauthenticating`, let the next expired self-node re-fire reauth (a second node-key rotation that
594/// loses the original `OldNodeKey` anchor). The auto-reauth path owns the cell until it recovers to
595/// `Running` (the netmap self-node handler) or its circuit breaker trips it to `Expired`; this bridge
596/// stands down for the duration.
597pub(crate) fn bridge_reauth_url_to_state(
598    state_tx: &watch::Sender<crate::DeviceState>,
599    incoming: Option<&url::Url>,
600) {
601    if let Some(url) = incoming {
602        let next = crate::DeviceState::NeedsLogin(url.clone());
603        state_tx.send_if_modified(|current| {
604            // Do not clobber an in-flight automatic re-auth (see the ownership note above): leave
605            // `Reauthenticating` untouched so it can recover to `Running` or trip to `Expired` on its
606            // own terms.
607            if *current == crate::DeviceState::Reauthenticating || *current == next {
608                false
609            } else {
610                *current = next.clone();
611                true
612            }
613        });
614    }
615}
616
/// The verdict for a self-node delivered by control whose node-key expiry may have passed.
///
/// This drives the expiry branch of the [`StreamMessage::Next`] handler. It is produced by a pure
/// function so every input combination can be unit-tested, in the same spirit as
/// [`bridge_reauth_url_to_state`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExpiryAction {
    /// The key has not expired and the node is up (maps to `DeviceState::Running`).
    Running,
    /// The key expired and auto-reauth is allowed: an auth key is retained, reauth is enabled, and
    /// TKA is NOT enforcing. Rotate the node key and re-register (maps to
    /// `DeviceState::Reauthenticating` plus `Command::Reauth`).
    Reauthenticate,
    /// The key expired and auto-reauth is NOT allowed (no auth key, reauth disabled, or TKA
    /// enforcing). Keep today's terminal behavior (maps to `DeviceState::Expired`).
    Expired,
}
632
633/// Decide the action for an expired-or-not self node (pure; the live handler at
634/// [`StreamMessage::Next`] applies it). Go's `ipnlocal` runs `doLogin` (rotate the node key +
635/// re-register with the stored auth key) when an auth-key node's key expires; this fork does the
636/// same, gated by three safety conditions:
637///
638/// - `key_expired` — control reported the self-node's key expiry is in the past.
639/// - `has_auth_key` — a usable auth key is retained for a non-interactive re-register (without one
640///   there is nothing to re-register with → fall back to `Expired`, today's behavior).
641/// - `reauth_enabled` — the `reauth_on_expiry` config opt-out is on (default true).
642/// - `tka_active` — Tailnet Lock enforcement is currently active. **Hard safety gate:** a node-key
643///   rotation on a locked tailnet would install an UNSIGNED key, locking the node out of locked
644///   peers (the TKA re-sign is a separate follow-up — see `keystate.rs` `rotate_node_key`). So when
645///   the lock is enforcing, never rotate — fall back to `Expired`.
646///
647/// Truth table: not expired → `Running`; expired AND auth-key AND reauth-enabled AND NOT TKA →
648/// `Reauthenticate`; otherwise → `Expired`.
649pub(crate) fn expiry_action(
650    key_expired: bool,
651    has_auth_key: bool,
652    reauth_enabled: bool,
653    tka_active: bool,
654) -> ExpiryAction {
655    if !key_expired {
656        return ExpiryAction::Running;
657    }
658    if has_auth_key && reauth_enabled && !tka_active {
659        ExpiryAction::Reauthenticate
660    } else {
661        ExpiryAction::Expired
662    }
663}
664
665/// The outcome of one step of the bounded reauth sub-state-machine: the device state to publish, the
666/// updated consecutive-attempt counter, and whether to fire the one-shot `Command::Reauth` this step.
667#[derive(Debug, Clone, Copy, PartialEq, Eq)]
668pub(crate) struct ReauthStep {
669    /// The device state to publish for this self-node.
670    pub next: ReauthState,
671    /// The consecutive-failed-reauth counter after this step (the caller stores it back).
672    pub attempts: u32,
673    /// Whether to fire the one-shot `Command::Reauth` (rotate + re-register) this step. Set ONLY on
674    /// the transition INTO reauthenticating, so the node key rotates at most once per episode.
675    pub fire_reauth: bool,
676}
677
/// The device states the reauth circuit breaker can resolve to when producing a [`ReauthStep`].
///
/// This is a deliberately narrow subset of [`DeviceState`](crate::DeviceState). It has its own enum
/// so the breaker stays pure: it never touches the URL-carrying `DeviceState` variants, and its
/// truth table can be tested exhaustively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ReauthState {
    /// Node is up; published as `DeviceState::Running`.
    Running,
    /// An automatic re-auth is in flight; published as `DeviceState::Reauthenticating`.
    Reauthenticating,
    /// Terminal expiry, published as `DeviceState::Expired`. Reached either via the non-reauth path
    /// or by the breaker tripping.
    Expired,
}
690
691/// One step of the bounded reauth sub-state-machine (pure; the [`StreamMessage::Next`] handler stores
692/// the result back into [`ControlRunner::reauth_attempts`] and publishes [`ReauthStep::next`]). This
693/// is the circuit breaker for the design's deferred-question Q1: a one-shot / already-consumed auth
694/// key cannot re-register, so the `Reauthenticate` path must settle rather than loop forever.
695///
696/// Inputs:
697/// - `action` — the [`expiry_action`] verdict for this self-node.
698/// - `already_reauthing` — whether the published state is currently
699///   [`DeviceState::Reauthenticating`](crate::DeviceState::Reauthenticating) (the prior reauth has
700///   not yet recovered the node).
701/// - `attempts` — the current consecutive-failed-reauth counter.
702///
703/// Rules:
704/// - `Running` → reset the counter to `0` (the node recovered) and report `Running`.
705/// - `Reauthenticate` while NOT already reauthing → ENTER reauthenticating: counter `= 1`, fire the
706///   one-shot reauth.
707/// - `Reauthenticate` while ALREADY reauthing → the prior reauth did not recover; COUNT it (counter
708///   `+= 1`) and do NOT re-fire (a second rotation would lose the original `OldNodeKey` anchor). At
709///   [`MAX_REAUTH_ATTEMPTS`] the breaker trips to terminal `Expired`; below the cap, stay reauthing.
710/// - `Expired` → terminal; the counter is irrelevant (this path never armed the reauth machine), so
711///   it is reset to `0`.
712pub(crate) fn reauth_circuit_step(
713    action: ExpiryAction,
714    already_reauthing: bool,
715    attempts: u32,
716) -> ReauthStep {
717    match action {
718        ExpiryAction::Running => ReauthStep {
719            next: ReauthState::Running,
720            attempts: 0,
721            fire_reauth: false,
722        },
723        ExpiryAction::Reauthenticate if !already_reauthing => ReauthStep {
724            next: ReauthState::Reauthenticating,
725            attempts: 1,
726            fire_reauth: true,
727        },
728        ExpiryAction::Reauthenticate => {
729            let attempts = attempts.saturating_add(1);
730            if attempts >= MAX_REAUTH_ATTEMPTS {
731                ReauthStep {
732                    next: ReauthState::Expired,
733                    attempts,
734                    fire_reauth: false,
735                }
736            } else {
737                ReauthStep {
738                    next: ReauthState::Reauthenticating,
739                    attempts,
740                    fire_reauth: false,
741                }
742            }
743        }
744        ExpiryAction::Expired => ReauthStep {
745            next: ReauthState::Expired,
746            attempts: 0,
747            fire_reauth: false,
748        },
749    }
750}
751
/// How SELF stands against the active network lock.
///
/// This is the observability counterpart of the self check in Go's `tkaFilterNetmapLocked`, which
/// raises a `LockedOut` health warning.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SelfLockVerdict {
    /// No key-signature is present at all (it is empty). This is the ordinary "not signed yet" case:
    /// the lock simply has not signed this node. The node is unsigned, not locked out.
    Unsigned,
    /// The active lock authorizes self's key-signature. There is nothing to report.
    Authorized,
    /// A key-signature is present but the lock rejects it; the payload is the verification error
    /// text. This is the operator-facing `LockedOut` condition, and locked peers will refuse this
    /// node.
    LockedOut(String),
}
765
766/// Classify a node key + its key-signature against `authority` (pure: verify-and-classify, no
767/// logging, no I/O). Takes only the two fields it needs — not the whole `Node` — so the decision is
768/// unit-testable without constructing a full `Node` or standing up the actor.
769fn self_lock_verdict(
770    node_key: &ts_keys::NodePublicKey,
771    key_signature: &[u8],
772    authority: &ts_tka::Authority,
773) -> SelfLockVerdict {
774    // Mirror the peer path (`peer_tracker` `tka_snapshot_admits`): treat an empty signature as
775    // "unsigned" rather than the `LockedOut` bucket Go's `NodeKeyAuthorized` would put a nil sig in
776    // (it errors at decode). This is a deliberate, narrow divergence from a literal Go port: it
777    // avoids `warn`-spam on a lock that simply has not signed this node yet, and keeps self and peer
778    // classification consistent.
779    if key_signature.is_empty() {
780        return SelfLockVerdict::Unsigned;
781    }
782    match authority.node_key_authorized(&node_key.to_bytes(), key_signature) {
783        Ok(()) => SelfLockVerdict::Authorized,
784        Err(e) => SelfLockVerdict::LockedOut(e.to_string()),
785    }
786}
787
788/// Emit the self-locked-out observability signal (Go `tkaFilterNetmapLocked`'s self check → a
789/// `LockedOut` health warning): classify SELF against the freshly-synced `authority` and log.
790///
791/// This is **observability, not enforcement** — self never enters the peer db, so the lock can never
792/// filter our own node out of the netmap. But a self the lock does not authorize means this node's
793/// key-signature is absent or invalid for the active lock, so it cannot prove itself to locked peers
794/// (they will drop it); surfacing that lets an operator notice and re-sign. A never-signed node
795/// (empty signature) logs at `info`, distinct from a present-but-invalid signature (`warn`), so the
796/// common unsigned case does not spam a warning. This fork has no health subsystem, so the operator
797/// signal is a `tracing` event (its observability channel).
798fn log_self_lockout(self_node: &Node, authority: &ts_tka::Authority) {
799    match self_lock_verdict(&self_node.node_key, &self_node.key_signature, authority) {
800        SelfLockVerdict::Unsigned => tracing::info!(
801            "TKA: this node has no key-signature for the active lock; it cannot prove itself to \
802             locked peers until control signs it (not locked out, just unsigned)"
803        ),
804        SelfLockVerdict::Authorized => {
805            tracing::debug!("TKA: self node-key is authorized by the active lock")
806        }
807        SelfLockVerdict::LockedOut(error) => tracing::warn!(
808            %error,
809            "TKA self locked out: this node's key-signature is not authorized by the active \
810             network lock; locked peers will reject it until control re-signs this node \
811             (Go LockedOut)"
812        ),
813    }
814}
815
816// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
817// those generated fields carry no doc and can't take attributes, so wrap in a module where
818// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
819// are re-exported so callers keep referencing them at `control_runner::<Name>`.
820pub use msg_impl::*;
821
822#[allow(missing_docs)]
823mod msg_impl {
824    use kameo::{message::Context, reply::DelegatedReply};
825
826    use super::*;
827
828    #[kameo::messages]
829    impl ControlRunner {
830        /// Fetch the IPv4 address for this tailscale device.
831        #[message(ctx)]
832        pub fn ipv4(
833            &self,
834            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
835        ) -> DelegatedReply<Option<Ipv4Addr>> {
836            let (deleg, replier) = ctx.reply_sender();
837
838            if let Some(replier) = replier {
839                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
840
841                tokio::spawn(async move {
842                    let ip = fut.await;
843                    replier.send(ip);
844                });
845            }
846
847            deleg
848        }
849
850        /// Fetch the IPv6 address for this tailscale device.
851        #[message(ctx)]
852        pub fn ipv6(
853            &self,
854            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
855        ) -> DelegatedReply<Option<Ipv6Addr>> {
856            let (deleg, replier) = ctx.reply_sender();
857
858            if let Some(replier) = replier {
859                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
860
861                tokio::spawn(async move {
862                    let ip = fut.await;
863                    replier.send(ip);
864                });
865            }
866
867            deleg
868        }
869
870        /// Fetch the self node for this tailscale device.
871        #[message(ctx)]
872        pub fn self_node(
873            &self,
874            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
875        ) -> DelegatedReply<Option<Node>> {
876            let (deleg, replier) = ctx.reply_sender();
877
878            if let Some(replier) = replier {
879                let node = self.with_self_node(|node| node.clone());
880
881                tokio::spawn(async move {
882                    let node = node.await;
883                    replier.send(node)
884                });
885            }
886
887            deleg
888        }
889
890        /// Fetch the current Tailscale SSH policy, if control has pushed one.
891        ///
892        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
893        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
894        /// for a value: an absent policy is a legitimate, immediate answer.
895        #[message]
896        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
897            self.ssh_policy.borrow().clone()
898        }
899
900        /// Fetch the current Tailnet Lock status, if control has pushed one.
901        ///
902        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
903        #[message]
904        pub fn current_tka_status(&self) -> Option<TkaStatus> {
905            self.tka.borrow().clone()
906        }
907
908        /// Read up to `limit` entries of the Tailnet-Lock update-chain log, **head-first** (newest →
909        /// oldest), from the locally-synced AUM chain (Go `NetworkLockLog`).
910        ///
911        /// A **pure local read** — no crypto, no mutation, no control round-trip: it walks the
912        /// already-verified `SyncedTka` store this actor owns. Returns an empty `Vec` when no lock is
913        /// synced (Go's `b.tka == nil`). Synchronous (no spawn), like `current_tka_status` — the
914        /// chain is in memory.
915        #[message]
916        pub fn tka_log(&self, limit: usize) -> Vec<crate::tka_sync::TkaLogEntry> {
917            let Some(synced) = self.tka_synced.as_ref() else {
918                return Vec::new();
919            };
920            crate::tka_sync::tka_log_entries(&synced.store, synced.oldest, limit)
921        }
922
923        /// Sign `node_key` directly with this node's network-lock key and submit the signature to
924        /// control (Go `tka.sign` for the Direct case → `tkaSubmitSignature`).
925        ///
926        /// Builds a `Direct` [`NodeKeySignature`](ts_tka::NodeKeySignature) via
927        /// [`sign_direct`](ts_tka::NodeKeySignature::sign_direct) over this node's inner ed25519
928        /// network-lock signing key, serializes it (raw CBOR), and POSTs it to `/machine/tka/sign`.
929        /// Mirrors `set_dns`/`get_certificate`: clones the control config + node keys into a spawned
930        /// task (delegated reply, so the round-trip doesn't block the mailbox) over a fresh Noise
931        /// channel.
932        ///
933        /// **Posture: this only *submits* a signature to control — it does NOT mutate the local
934        /// [`Authority`](ts_tka::Authority).** The local trusted-key state advances solely through the
935        /// existing verified-sync path (`sync_tka` → `VerifiedAumChain::verify`); a `tka_sign` success
936        /// is acknowledged to the caller, and the resulting AUM is picked up on the next netmap-driven
937        /// sync. Verify-and-log is unchanged.
938        #[message(ctx)]
939        pub fn tka_sign(
940            &self,
941            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
942            node_key: [u8; 32],
943        ) -> DelegatedReply<Result<(), TkaSyncError>> {
944            let (deleg, replier) = ctx.reply_sender();
945
946            if let Some(replier) = replier {
947                let config = self.params.config.clone();
948                let keys = self.params.env.keys.clone();
949                tokio::spawn(async move {
950                    // Sign the node key with our network-lock key, then submit the raw-CBOR NKS.
951                    let nks = ts_tka::NodeKeySignature::sign_direct(
952                        &node_key,
953                        &keys.network_lock_keys.private.signing_key(),
954                    );
955                    let req = ts_control::TkaSubmitSignatureRequest {
956                        // node_key + version are stamped by the RPC client from `keys`.
957                        version: Default::default(),
958                        node_key: keys.node_keys.public,
959                        signature: nks.serialize(),
960                    };
961                    let result = tka_submit_signature(
962                        &config.server_url,
963                        &keys,
964                        req,
965                        config.allow_http_key_fetch,
966                    )
967                    .await
968                    .map(|_response| ());
969                    replier.send(result);
970                });
971            }
972
973            deleg
974        }
975
976        /// Disable Tailnet Lock by presenting the disablement secret to control (Go
977        /// `tka.disable` → `/machine/tka/disable`).
978        ///
979        /// Targets the **current** authority head (read from the cached [`TkaStatus`]); the caller
980        /// supplies the `disablement_secret` out of band (it is the operator-held capability that
981        /// authorizes turning the lock off). Mirrors `tka_sign`: clones config + keys into a spawned
982        /// task (delegated reply). Returns [`TkaSyncError::Unsupported`] when there is no known TKA
983        /// head (lock not in use / control hasn't pushed a status), since there is nothing to disable.
984        ///
985        /// **Submit-only, like `tka_sign`:** this POSTs the disablement to control and does NOT mutate
986        /// the local [`Authority`](ts_tka::Authority). Control acts on the disablement; this node
987        /// observes the result through the existing verified-sync path. Verify-and-log unchanged.
988        #[message(ctx)]
989        pub fn tka_disable(
990            &self,
991            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
992            disablement_secret: Vec<u8>,
993        ) -> DelegatedReply<Result<(), TkaSyncError>> {
994            let (deleg, replier) = ctx.reply_sender();
995
996            if let Some(replier) = replier {
997                // Read the current head from the cached status BEFORE the spawn (can't borrow &self
998                // across the await). No head ⇒ no lock to disable ⇒ Unsupported.
999                let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
1000                let config = self.params.config.clone();
1001                let keys = self.params.env.keys.clone();
1002                tokio::spawn(async move {
1003                    let result = match head {
1004                        Some(head) => {
1005                            let req = ts_control::TkaDisableRequest {
1006                                // node_key + version are stamped by the RPC client from `keys`.
1007                                version: Default::default(),
1008                                node_key: keys.node_keys.public,
1009                                head,
1010                                disablement_secret,
1011                            };
1012                            tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
1013                                .await
1014                                .map(|_response| ())
1015                        }
1016                        None => Err(TkaSyncError::Unsupported),
1017                    };
1018                    replier.send(result);
1019                });
1020            }
1021
1022            deleg
1023        }
1024
1025        /// Initialize Tailnet Lock with this node as the sole initial trusted key, gated by
1026        /// `disablement_secret` (Go `LocalClient.NetworkLockInit` — the "lock yourself in" case).
1027        ///
1028        /// Builds + signs a genesis Checkpoint AUM whose only trusted key is this node's network-lock
1029        /// public key (votes 1) and whose single DisablementValue is `disablement_value(secret)`, then
1030        /// drives the two-phase init: `tka/init/begin` (submit the genesis) → if control needs no
1031        /// further node signatures (`NeedSignatures` empty, the case when this node is the only key) →
1032        /// `tka/init/finish` carrying the raw `disablement_secret` as `SupportDisablement`. Mirrors
1033        /// `tka_sign`/`tka_disable`: cloned config + keys into a spawned task (delegated reply).
1034        ///
1035        /// If control returns a non-empty `NeedSignatures` (other nodes must be re-signed under the new
1036        /// lock — a multi-node tailnet), this returns [`TkaSyncError::Unsupported`]: re-signing each
1037        /// listed node (incl. the Rotation-key case) is a larger flow deferred to a fuller
1038        /// `tka_init(keys, secrets)` — the single-node lock-init is the shipped subset.
1039        ///
1040        /// **Submit-only**, like `tka_sign`/`tka_disable`: this creates the lock at control and does
1041        /// NOT seed the local [`Authority`](ts_tka::Authority) — the node picks up the new lock through
1042        /// the existing verified netmap-sync (control pushes a `TKAInfo`, `maybe_sync_tka` bootstraps
1043        /// the genesis through `VerifiedAumChain::verify`). Verify-and-log posture unchanged.
1044        #[message(ctx)]
1045        pub fn tka_init(
1046            &self,
1047            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
1048            disablement_secret: Vec<u8>,
1049        ) -> DelegatedReply<Result<(), TkaSyncError>> {
1050            let (deleg, replier) = ctx.reply_sender();
1051
1052            if let Some(replier) = replier {
1053                let config = self.params.config.clone();
1054                let keys = self.params.env.keys.clone();
1055                tokio::spawn(async move {
1056                    let result = tka_init_run(&config, &keys, disablement_secret).await;
1057                    replier.send(result);
1058                });
1059            }
1060
1061            deleg
1062        }
1063
1064        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
1065        ///
1066        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
1067        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
1068        /// does not block waiting for a value).
1069        #[message]
1070        pub fn cert_domains(&self) -> Vec<String> {
1071            self.cert_domains.borrow().clone()
1072        }
1073
1074        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
1075        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
1076        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
1077        #[message]
1078        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
1079            self.dns_config.borrow().clone()
1080        }
1081
1082        /// The interactive-login / consent URL control last asked this node to open
1083        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
1084        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
1085        #[message]
1086        pub fn pop_browser_url(&self) -> Option<url::Url> {
1087            self.pop_browser_url.borrow().clone()
1088        }
1089
1090        /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
1091        ///
1092        /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
1093        /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
1094        /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
1095        /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
1096        /// late subscriber sees the current URL. The initial value is `None` until control sends one.
1097        #[message(derive(Clone))]
1098        pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
1099            self.pop_browser_url.subscribe()
1100        }
1101
1102        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
1103        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
1104        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
1105        #[message]
1106        pub fn netcheck(&self) -> crate::status::NetcheckReport {
1107            self.netcheck.borrow().clone()
1108        }
1109
1110        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
1111        ///
1112        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
1113        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
1114        /// for the round-trip.
1115        #[message(ctx)]
1116        pub fn fetch_id_token(
1117            &self,
1118            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
1119            audience: String,
1120        ) -> DelegatedReply<Result<String, IdTokenError>> {
1121            let (deleg, replier) = ctx.reply_sender();
1122
1123            if let Some(replier) = replier {
1124                let config = self.params.config.clone();
1125                let keys = self.params.env.keys.clone();
1126                tokio::spawn(async move {
1127                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
1128                    replier.send(result);
1129                });
1130            }
1131
1132            deleg
1133        }
1134
1135        /// Log this node out of the tailnet: deregister it by expiring its current node key.
1136        ///
1137        /// Mirrors `fetch_id_token`: clones the control config + node keys
1138        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
1139        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
1140        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
1141        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
1142        /// on-disk node key, so re-registering with the same key is the re-login path.
1143        #[message(ctx)]
1144        pub fn logout(
1145            &self,
1146            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
1147        ) -> DelegatedReply<Result<(), LogoutError>> {
1148            let (deleg, replier) = ctx.reply_sender();
1149
1150            if let Some(replier) = replier {
1151                let config = self.params.config.clone();
1152                let keys = self.params.env.keys.clone();
1153                tokio::spawn(async move {
1154                    let result = ts_control::logout(&config, &keys).await;
1155                    replier.send(result);
1156                });
1157            }
1158
1159            deleg
1160        }
1161
1162        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
1163        /// `LocalClient.SetDNS`).
1164        ///
1165        /// Mirrors `fetch_id_token`: clones the control config + node keys
1166        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
1167        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
1168        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
1169        /// match, so the surfaced API takes only `name` + `value`.
1170        #[message(ctx)]
1171        pub fn set_dns(
1172            &self,
1173            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
1174            name: String,
1175            value: String,
1176        ) -> DelegatedReply<Result<(), SetDnsError>> {
1177            let (deleg, replier) = ctx.reply_sender();
1178
1179            if let Some(replier) = replier {
1180                let config = self.params.config.clone();
1181                let keys = self.params.env.keys.clone();
1182                tokio::spawn(async move {
1183                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
1184                    replier.send(result);
1185                });
1186            }
1187
1188            deleg
1189        }
1190    }
1191
    /// Reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message.
    ///
    /// On success it carries the issued `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert`
    /// surface); on failure, a [`ts_control::CertError`]. The second tuple element is the leaf
    /// private key in PEM form.
    ///
    /// This exists only as an alias so that the message's `Context` type stays under clippy's
    /// `type_complexity` threshold. Spelling out the nested `Result<(String, String), _>` inline
    /// trips that lint.
    #[cfg(feature = "acme")]
    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
1198
1199    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
1200    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
1201    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
1202    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
1203    // block keeps the default build clean.
1204    #[cfg(feature = "acme")]
1205    #[kameo::messages]
1206    impl ControlRunner {
1207        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
1208        /// client-side ACME DNS-01 engine (`acme` feature).
1209        ///
1210        /// Mirrors `fetch_id_token`: clones the control config + node keys
1211        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
1212        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
1213        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
1214        ///
1215        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
1216        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
1217        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
1218        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
1219        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
1220        /// publish 501s.
1221        #[message(ctx)]
1222        pub fn get_certificate(
1223            &self,
1224            ctx: &mut Context<
1225                Self,
1226                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
1227            >,
1228            name: String,
1229        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
1230            let (deleg, replier) = ctx.reply_sender();
1231
1232            if let Some(replier) = replier {
1233                let config = self.params.config.clone();
1234                let keys = self.params.env.keys.clone();
1235                tokio::spawn(async move {
1236                    let result = issue_certificate(&config, &keys, &name).await;
1237                    replier.send(result);
1238                });
1239            }
1240
1241            deleg
1242        }
1243
1244        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
1245        /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
1246        /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
1247        ///
1248        /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
1249        /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
1250        /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
1251        /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
1252        /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
1253        /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
1254        #[message(ctx)]
1255        pub fn get_cert_pair(
1256            &self,
1257            ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
1258            name: String,
1259        ) -> DelegatedReply<CertPairReply> {
1260            let (deleg, replier) = ctx.reply_sender();
1261
1262            if let Some(replier) = replier {
1263                let config = self.params.config.clone();
1264                let keys = self.params.env.keys.clone();
1265                tokio::spawn(async move {
1266                    let result = issue_cert_pair(&config, &keys, &name).await;
1267                    replier.send(result);
1268                });
1269            }
1270
1271            deleg
1272        }
1273    }
1274}
1275
1276/// The `tka_init` body (the genesis-build + two-phase init/begin→init/finish choreography),
1277/// factored out of the actor handler so it runs in the spawned task. See [`ControlRunner::tka_init`].
1278///
1279/// "Lock yourself in": the genesis trusts only this node's network-lock key (votes 1) and stores one
1280/// DisablementValue = `disablement_value(secret)`. On a non-empty `NeedSignatures` (multi-node
1281/// tailnet needing re-signs) it returns [`TkaSyncError::Unsupported`] — the single-node subset.
1282async fn tka_init_run(
1283    config: &ts_control::Config,
1284    keys: &ts_keys::NodeState,
1285    disablement_secret: Vec<u8>,
1286) -> Result<(), TkaSyncError> {
1287    // Build the genesis: this node's NL public key as the sole trusted key, one disablement value.
1288    let nl_public = keys.network_lock_keys.public.to_bytes().to_vec();
1289    let genesis_key = ts_tka::AumKey {
1290        kind: ts_tka::KeyKind::Ed25519,
1291        votes: 1,
1292        public: nl_public,
1293        meta: Vec::new(),
1294    };
1295    let dvalue = ts_tka::disablement_value(&disablement_secret).to_vec();
1296    let mut genesis = ts_tka::Aum::new_genesis_checkpoint(vec![genesis_key], vec![dvalue])
1297        // A malformed genesis is a local construction bug, not a transient RPC failure — surface it as a
1298        // coarse internal error rather than NetworkError (which would invite a pointless retry).
1299        .map_err(|_| TkaSyncError::Internal(ts_control::TkaSyncInternalErrorKind::SerDe))?;
1300    genesis.sign(&keys.network_lock_keys.private.signing_key());
1301
1302    // Phase 1: submit the genesis. node_key + version are stamped by the RPC client from `keys`.
1303    let begin_req = ts_control::TkaInitBeginRequest {
1304        version: Default::default(),
1305        node_key: keys.node_keys.public,
1306        genesis_aum: genesis.serialize(),
1307    };
1308    let begin_resp = tka_init_begin(
1309        &config.server_url,
1310        keys,
1311        begin_req,
1312        config.allow_http_key_fetch,
1313    )
1314    .await?;
1315
1316    // Single-node case only: control must need no further node signatures. A non-empty
1317    // NeedSignatures means other nodes must be re-signed under the new lock — deferred.
1318    if !begin_resp.need_signatures.is_empty() {
1319        tracing::warn!(
1320            need = begin_resp.need_signatures.len(),
1321            "tka_init: control requires re-signing other nodes; the multi-node init is not yet \
1322             implemented (single-node lock-init only)"
1323        );
1324        return Err(TkaSyncError::Unsupported);
1325    }
1326
1327    // Phase 2: finish, carrying the raw disablement secret as SupportDisablement (Go sends the raw
1328    // secret here; only the genesis stores its Argon2i hash).
1329    let finish_req = ts_control::TkaInitFinishRequest {
1330        version: Default::default(),
1331        node_key: keys.node_keys.public,
1332        signatures: std::collections::BTreeMap::new(),
1333        support_disablement: disablement_secret,
1334    };
1335    tka_init_finish(
1336        &config.server_url,
1337        keys,
1338        finish_req,
1339        config.allow_http_key_fetch,
1340    )
1341    .await
1342    .map(|_response| ())
1343}
1344
1345/// Load or generate the ACME account key, then issue a cert for `name` via set-dns DNS-01,
1346/// returning just the ready-to-serve [`CertifiedKey`](ts_control::tls::CertifiedKey) (the
1347/// `get_certificate` / `ListenTLS` path).
1348///
1349/// Thin wrapper over [`issue_cert_pair`] that drops the PEMs — one issuance, this caller just
1350/// doesn't need the on-disk pair. See [`issue_cert_pair`] for the account-key handling.
1351#[cfg(feature = "acme")]
1352async fn issue_certificate(
1353    config: &ts_control::Config,
1354    keys: &ts_keys::NodeState,
1355    name: &str,
1356) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
1357    issue_cert_pair_inner(config, keys, name)
1358        .await
1359        .map(|issued| issued.certified)
1360}
1361
1362/// Load or generate the ACME account key, then issue a cert for `name` via set-dns DNS-01,
1363/// returning the **PEM pair** `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt`/`.key`
1364/// (`tnet cert`, Go `LocalClient.CertPair`).
1365///
1366/// Same single issuance as [`issue_certificate`]; only the result shape differs. The leaf
1367/// **private key** PEM is the second element and is NEVER logged here.
1368#[cfg(feature = "acme")]
1369async fn issue_cert_pair(
1370    config: &ts_control::Config,
1371    keys: &ts_keys::NodeState,
1372    name: &str,
1373) -> Result<(String, String), ts_control::CertError> {
1374    issue_cert_pair_inner(config, keys, name)
1375        .await
1376        .map(|issued| (issued.cert_chain_pem, issued.key_pem))
1377}
1378
1379/// Shared issuance core for [`issue_certificate`] and [`issue_cert_pair`]: load (or generate) the
1380/// ACME account key, target Let's Encrypt production, and run one DNS-01 issuance, returning the
1381/// full [`IssuedCert`](ts_control::acme::IssuedCert) so each caller projects out what it needs (one
1382/// ACME order, two consumers).
1383///
1384/// Reuses the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when present so the
1385/// same Let's Encrypt account survives renewals; otherwise generates an ephemeral per-call key
1386/// (logged at debug — a new ACME account each issuance, with no write-back). Always targets Let's
1387/// Encrypt production ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]). Never logs the leaf
1388/// private key.
1389#[cfg(feature = "acme")]
1390async fn issue_cert_pair_inner(
1391    config: &ts_control::Config,
1392    keys: &ts_keys::NodeState,
1393    name: &str,
1394) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
1395    let account_key = match keys.acme_account_key.as_deref() {
1396        Some(der) => ts_control::acme::AcmeAccountKey::from_pkcs8(der)?,
1397        None => {
1398            tracing::debug!(
1399                "no persisted ACME account key in key state; generating an ephemeral per-call key \
1400                 (a new ACME account this issuance — not persisted back)"
1401            );
1402            ts_control::acme::AcmeAccountKey::generate()?.0
1403        }
1404    };
1405    let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
1406        .parse()
1407        .map_err(|e| {
1408            ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
1409        })?;
1410    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
1411}
1412
1413impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
1414    type Reply = ();
1415
1416    async fn handle(
1417        &mut self,
1418        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
1419        ctx: &mut Context<Self, Self::Reply>,
1420    ) {
1421        match msg {
1422            StreamMessage::Started(_) => {
1423                tracing::trace!("started listening to state updates");
1424            }
1425
1426            StreamMessage::Next(msg) => {
1427                if let Some(node) = msg.node.as_ref() {
1428                    // Reflect node-key expiry into the device state. Control delivering a self-node
1429                    // whose key is in the past means the node must re-authenticate; the arrival of a
1430                    // fresh (non-expired) self-node confirms we are Running (recovering the state if a
1431                    // prior update had flipped it to Expired/Reauthenticating). On expiry, decide
1432                    // between an automatic re-auth (Go `doLogin`: rotate key + re-register with the
1433                    // stored auth key) and the terminal Expired state via the pure `expiry_action`:
1434                    //   - auth key retained, reauth enabled, and TKA NOT enforcing → Reauthenticate.
1435                    //   - otherwise → Expired (no auth key / reauth disabled / TKA-locked).
1436                    // The TKA gate is a hard safety constraint: rotating on a locked tailnet would
1437                    // install an unsigned key and lock this node out of locked peers (the TKA re-sign
1438                    // is a separate follow-up). Recovery from Reauthenticating is automatic — the next
1439                    // good self-node flips back to Running at this same handler.
1440                    let now_unix = std::time::SystemTime::now()
1441                        .duration_since(std::time::UNIX_EPOCH)
1442                        .map(|d| d.as_secs() as i64)
1443                        .unwrap_or(0);
1444                    let action = expiry_action(
1445                        node.key_expired_at_unix(now_unix),
1446                        self.params.auth_key.is_some(),
1447                        self.params.config.reauth_on_expiry,
1448                        self.tka_authority.borrow().is_some(),
1449                    );
1450
1451                    // Bounded reauth sub-state-machine (circuit breaker), evaluated by the pure
1452                    // `reauth_circuit_step`. The `Reauthenticate` path must settle — a one-shot /
1453                    // already-consumed auth key cannot re-register, so an unbounded reauth would sit in
1454                    // `Reauthenticating` forever. The step takes the `expiry_action` verdict, whether we
1455                    // are ALREADY reauthenticating (i.e. the prior reauth has not recovered), and the
1456                    // current attempt counter, and returns the target state, the new counter, and
1457                    // whether to fire the one-shot reauth (fired ONLY on entry, so the node key rotates
1458                    // at most once per episode — a second rotation would lose the original `OldNodeKey`
1459                    // anchor). At `MAX_REAUTH_ATTEMPTS` it trips to terminal `Expired`.
1460                    //
1461                    // NOTE: we may act (count, and eventually flip to `Expired`) even when the published
1462                    // state does NOT change — a repeated `Reauthenticating` self-node is exactly the
1463                    // "prior reauth didn't recover" signal we must tally, so the counting reads the
1464                    // pre-step state directly here and `send_if_modified`'s `changed` only gates the
1465                    // log/firing below, never the counting.
1466                    let already_reauthing = matches!(
1467                        &*self.params.state_tx.borrow(),
1468                        crate::DeviceState::Reauthenticating
1469                    );
1470                    let step = reauth_circuit_step(action, already_reauthing, self.reauth_attempts);
1471                    self.reauth_attempts = step.attempts;
1472                    if step.next == ReauthState::Expired
1473                        && action == ExpiryAction::Reauthenticate
1474                        && already_reauthing
1475                    {
1476                        tracing::warn!(
1477                            attempts = step.attempts,
1478                            "automatic re-auth did not recover the node after {MAX_REAUTH_ATTEMPTS} \
1479                             attempts (auth key likely one-shot / consumed); falling back to terminal \
1480                             Expired"
1481                        );
1482                    }
1483                    let next = match step.next {
1484                        ReauthState::Running => crate::DeviceState::Running,
1485                        ReauthState::Reauthenticating => crate::DeviceState::Reauthenticating,
1486                        ReauthState::Expired => crate::DeviceState::Expired,
1487                    };
1488
1489                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
1490                    // self-node arrives on every netmap update). Returns whether the state changed.
1491                    let changed = self.params.state_tx.send_if_modified(|s| {
1492                        if *s != next {
1493                            *s = next.clone();
1494                            true
1495                        } else {
1496                            false
1497                        }
1498                    });
1499
1500                    if changed && step.fire_reauth {
1501                        tracing::info!(
1502                            "self node-key expired; starting automatic re-auth (rotate node key + \
1503                             re-register with stored auth key)"
1504                        );
1505                        self.client.reauth().await;
1506                    }
1507
1508                    self.self_node.send_replace(Some(node.clone()));
1509                }
1510
1511                if let Some(policy) = msg.ssh_policy.as_ref() {
1512                    self.ssh_policy.send_replace(Some(policy.clone()));
1513                }
1514
1515                if let Some(tka) = msg.tka.as_ref() {
1516                    self.tka.send_replace(Some(tka.clone()));
1517                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
1518                }
1519
1520                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
1521                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
1522                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
1523                let cert_domains = msg
1524                    .dns_config
1525                    .as_ref()
1526                    .map(|d| d.cert_domains.clone())
1527                    .unwrap_or_default();
1528                self.cert_domains.send_replace(cert_domains);
1529
1530                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
1531                // `None` when control sent no DNS config on this update — distinct from a present but
1532                // empty config (Go `netmap.NetworkMap.DNS`).
1533                self.dns_config.send_replace(msg.dns_config.clone());
1534
1535                // Track the interactive-login URL for `Device::pop_browser_url` /
1536                // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
1537                // sticky semantics (update only on a new non-empty URL; never reset to `None`).
1538                sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
1539
1540                if let Err(e) = self.params.env.publish(msg).await {
1541                    tracing::error!(error = %e, "publishing netmap update");
1542                }
1543            }
1544
1545            StreamMessage::Finished(_) => {
1546                tracing::error!("state update stream terminated")
1547            }
1548        }
1549    }
1550}
1551
1552/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
1553/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
1554/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
1555/// [`ControlRunner::apply_tka_synced`](ControlRunner).
1556#[doc(hidden)]
1557pub struct TkaSynced {
1558    pub(crate) result:
1559        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
1560    /// The [`ControlRunner::tka_generation`] captured when this sync was spawned; the handler
1561    /// discards the result if it no longer matches (the lock was disabled/re-synced mid-flight).
1562    pub(crate) generation: u64,
1563}
1564
1565impl Message<TkaSynced> for ControlRunner {
1566    type Reply = ();
1567
1568    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
1569        self.apply_tka_synced(msg.result, msg.generation).await;
1570    }
1571}
1572
1573impl Message<DerpLatencyMeasurement> for ControlRunner {
1574    type Reply = ();
1575
1576    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
1577        let measurements = msg.measurement.as_ref().clone();
1578
1579        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
1580        // the same measurements, before the home-region short-circuit below — an empty set still
1581        // yields a (default/empty) report rather than a stale one.
1582        self.netcheck
1583            .send_replace(crate::status::NetcheckReport::from_region_results(
1584                &measurements,
1585            ));
1586
1587        if measurements.is_empty() {
1588            tracing::debug!("derp latency measurements empty");
1589            return;
1590        };
1591
1592        // Record this cycle into the rolling history and evict reports older than the smoothing
1593        // window, then compute each region's `bestRecent` (5-min min). `Instant::now()` is the
1594        // arrival stamp; `best_recent` takes it as a param so the decision stays unit-testable.
1595        let now = Instant::now();
1596        self.derp_report_history
1597            .push((now, msg.measurement.clone()));
1598        self.derp_report_history
1599            .retain(|(stamp, _)| now.saturating_duration_since(*stamp) <= DERP_HISTORY_MAX_AGE);
1600        let best_recent = best_recent(&self.derp_report_history, now, DERP_HISTORY_MAX_AGE);
1601
1602        // Apply selection hysteresis (the pure decision lives in `select_home_region` for testability)
1603        // so jitter between near-equal regions does not flap the home relay. Go's asymmetric
1604        // smoothed-best vs raw-old comparison lives in `select_home_region`; here we just resolve the
1605        // chosen id back to its current-cycle latency for the home-region record + control update.
1606        let selected_id = select_home_region(
1607            self.home_region.map(|(id, _)| id),
1608            &measurements,
1609            &best_recent,
1610        )
1611        .expect("non-empty measurements always yield a selection");
1612        // `select_home_region` only ever returns an id drawn from `measurements`, so this lookup
1613        // always succeeds (same invariant the prior impl relied on when it returned the result by
1614        // reference). We record the current-cycle (raw) latency for the chosen region.
1615        let selected_latency = measurements
1616            .iter()
1617            .find(|m| m.id == selected_id)
1618            .expect("the selected region id is always one of the measurements")
1619            .latency;
1620
1621        let iter = measurements.iter().map(|result| {
1622            (
1623                result.latency_map_key.as_str(),
1624                result.latency.as_secs_f64(),
1625            )
1626        });
1627
1628        if self.home_region.map(|(id, _)| id) != Some(selected_id) {
1629            tracing::debug!(selected_region_id = ?selected_id, "updating home region");
1630        }
1631        self.home_region = Some((selected_id, selected_latency));
1632        // Advertise the smoothed home to control AND drive the local DERP relay to the same region
1633        // (Go `report.PreferredDERP` feeds both). `send_replace` wakes the watch on every send (it
1634        // does NOT coalesce same-value writes), so Multiderp's bridge sees a `SetHomeRegion` each
1635        // cycle; the de-dup is one layer down — `home_transition` returns `Unchanged` for a
1636        // re-selection of the current home, so the relay only churns on an actual home change.
1637        self.client.set_home_region(selected_id, iter).await;
1638        self.params.home_region.send_replace(Some(selected_id));
1639    }
1640}
1641
1642/// The window over which `best_recent` smooths per-region DERP latency (Go `netcheck` `maxAge`).
1643const DERP_HISTORY_MAX_AGE: Duration = Duration::from_secs(5 * 60);
1644
1645/// Compute each region's `bestRecent` — its **minimum** latency over the reports within
1646/// `max_age` of `now` (Go `addReportHistoryAndSetPreferredDERP`'s `bestRecent` map). Reports older
1647/// than the window are ignored. `now` and `max_age` are parameters (not clock-read) so this is
1648/// deterministically unit-testable. A region absent from every in-window report is absent from the
1649/// result.
1650fn best_recent(
1651    history: &[(Instant, Arc<Vec<ts_netcheck::RegionResult>>)],
1652    now: Instant,
1653    max_age: Duration,
1654) -> HashMap<ts_derp::RegionId, Duration> {
1655    let mut best: HashMap<ts_derp::RegionId, Duration> = HashMap::new();
1656    for (stamp, report) in history {
1657        // Skip reports outside the window. `saturating_duration_since` guards a `stamp` that is
1658        // somehow after `now` (clock skew): age 0, always in-window.
1659        if now.saturating_duration_since(*stamp) > max_age {
1660            continue;
1661        }
1662        for r in report.iter() {
1663            best.entry(r.id)
1664                .and_modify(|d| {
1665                    if r.latency < *d {
1666                        *d = r.latency;
1667                    }
1668                })
1669                .or_insert(r.latency);
1670        }
1671    }
1672    best
1673}
1674
1675/// Choose the DERP home region id, applying Go's selection hysteresis
1676/// (`netcheck.addReportHistoryAndSetPreferredDERP`). Pure so the decision is unit-testable.
1677///
1678/// `measurements` is the current cycle sorted by latency ascending (so `measurements[0]` is the
1679/// raw-current best). `best_recent` is each region's smoothed (5-min-min) latency. Matching Go's
1680/// **asymmetric** comparison exactly: the new best candidate is chosen by the *smoothed* `best_recent`
1681/// latency (`bestAny`), while the old/home region is compared using its *current-cycle* (raw)
1682/// latency (`oldRegionCurLatency`). Smoothing the best damps oscillation of the best region across
1683/// the switch boundary that the raw-vs-raw comparison (the prior impl) would still flap on.
1684///
1685/// Keeps the `current` home region unless the new best is *meaningfully* lower-latency — switching
1686/// only when BOTH the current region's raw latency exceeds the smoothed-best by at least
1687/// `PREFERRED_DERP_ABSOLUTE_DIFF` (10ms) AND the smoothed-best is at most two-thirds of the current
1688/// region's raw latency (a >~33% improvement). On the first selection (`current` is `None`), when the
1689/// smoothed-best already IS the current region, or when the current region dropped out of the
1690/// measurements, returns the best directly. `None` only if `measurements` is empty.
1691fn select_home_region(
1692    current: Option<ts_derp::RegionId>,
1693    measurements: &[ts_netcheck::RegionResult],
1694    best_recent: &HashMap<ts_derp::RegionId, Duration>,
1695) -> Option<ts_derp::RegionId> {
1696    /// Go `netcheck.preferredDERPAbsoluteDiff`.
1697    const PREFERRED_DERP_ABSOLUTE_DIFF: Duration = Duration::from_millis(10);
1698
1699    // The smoothed latency for a region: its `best_recent` if present, else its current sample (a
1700    // region seen only this cycle has a 1-sample history, so its min == its current latency anyway).
1701    let smoothed = |m: &ts_netcheck::RegionResult| -> Duration {
1702        best_recent.get(&m.id).copied().unwrap_or(m.latency)
1703    };
1704
1705    // Pick the best candidate by SMOOTHED latency (Go `bestAny = min over regions of bestRecent`).
1706    // `measurements` is sorted by raw latency, but smoothing can reorder, so scan for the smoothed
1707    // minimum explicitly rather than trusting `measurements[0]`.
1708    let best = measurements.iter().min_by_key(|m| smoothed(m))?;
1709    let best_any = smoothed(best);
1710
1711    let Some(old_id) = current.filter(|id| *id != best.id) else {
1712        // First selection, or the smoothed-best already is the current home region.
1713        return Some(best.id);
1714    };
1715
1716    // Compare against the old region's CURRENT (raw) latency this cycle, if it is still present —
1717    // Go's `oldRegionCurLatency`, deliberately unsmoothed (the asymmetry).
1718    match measurements.iter().find(|m| m.id == old_id) {
1719        Some(old) => {
1720            // Byte-faithful to Go: `oldRegionCurLatency - bestAny < 10ms || bestAny >
1721            // oldRegionCurLatency/3*2`. `saturating_sub` matches Go's signed subtraction for the
1722            // `< 10ms` test (when `old < best_any` Go is negative → `< 10ms` true; saturating_sub
1723            // floors to 0 → also true). The two-thirds rule uses INTEGER `Duration` division
1724            // `(old/3)*2` — NOT float `* 2.0/3.0`: Go computes the threshold in integer nanoseconds
1725            // (`oldNs/3` truncates), and float arithmetic diverges from it at the exact 2/3 boundary
1726            // with whole-millisecond inputs (e.g. old=36ms, best=24ms: Go's `24ms > 24ms` is false →
1727            // switch, but float `0.024 > 0.0239999997` is true → keep). `Duration / u32` truncates
1728            // nanos exactly like Go and `* u32` is exact, reproducing `oldRegionCurLatency/3*2`.
1729            let keep_old = old.latency.saturating_sub(best_any) < PREFERRED_DERP_ABSOLUTE_DIFF
1730                || best_any > (old.latency / 3) * 2;
1731            Some(if keep_old { old.id } else { best.id })
1732        }
1733        // The current region is no longer reachable this cycle: take the new best.
1734        None => Some(best.id),
1735    }
1736}
1737
1738impl Message<EndpointAdvertisement> for ControlRunner {
1739    type Reply = ();
1740
1741    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
1742        let endpoints: Vec<Endpoint> = msg
1743            .endpoints
1744            .iter()
1745            .map(|ep| Endpoint {
1746                endpoint: ep.addr,
1747                ty: match ep.ty {
1748                    SelfEndpointType::Local => EndpointType::Local,
1749                    SelfEndpointType::Stun => EndpointType::Stun,
1750                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
1751                },
1752            })
1753            .collect();
1754
1755        tracing::debug!(
1756            n_endpoints = endpoints.len(),
1757            "advertising endpoints to control"
1758        );
1759
1760        self.client.set_endpoints(endpoints).await;
1761    }
1762}
1763
1764/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
1765/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
1766/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
1767/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
1768#[derive(Debug)]
1769pub struct SetAdvertiseRoutes {
1770    /// The prefixes to advertise to control (already filtered to the final set).
1771    pub routes: Vec<ipnet::IpNet>,
1772}
1773
1774impl Message<SetAdvertiseRoutes> for ControlRunner {
1775    type Reply = ();
1776
1777    async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
1778        tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
1779        self.client.set_routable_ips(msg.routes).await;
1780    }
1781}
1782
1783/// Update this node's `Hostinfo.Hostname` at control — the wire half of a runtime
1784/// [`Runtime::set_hostname`](crate::Runtime::set_hostname). A direct `ask` from the runtime, so the
1785/// change reaches the live map-poll client.
1786#[derive(Debug)]
1787pub struct SetHostname {
1788    /// The new hostname to report to control.
1789    pub hostname: String,
1790}
1791
1792impl Message<SetHostname> for ControlRunner {
1793    type Reply = ();
1794
1795    async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
1796        tracing::debug!("updating hostname at control");
1797        self.client.set_hostname(msg.hostname).await;
1798    }
1799}
1800
#[cfg(test)]
mod reauth_bridge_tests {
    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    fn url(s: &str) -> url::Url {
        s.parse().unwrap()
    }

    /// The bridge maps a surfaced re-auth URL onto `DeviceState::NeedsLogin(url)` — the fix's core:
    /// a mid-session `MachineNotAuthorized` (forwarded by the control client as `Some(url)`) becomes
    /// the "needs login" state the IPN bus turns into `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let u = url("https://login.example/auth");
        let (tx, rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&tx, Some(&u));

        assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(u));
    }

    /// `None` never drives a transition — the recovery to `Running` is the netmap self-node
    /// handler's job, so the bridge ignores a `None` and leaves the state untouched.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (tx, rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&tx, None);

        assert_eq!(*rx.borrow(), DeviceState::Running);
    }

    /// Re-surfacing the same URL across retries does not re-fire the watch (`send_if_modified`
    /// dedupe against the cell's current value), so a stuck re-auth does not thrash subscribers.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let u = url("https://login.example/auth");
        let (tx, mut rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&tx, Some(&u)); // first: fires
        assert!(rx.has_changed().unwrap(), "first NeedsLogin fires");
        rx.mark_unchanged();
        bridge_reauth_url_to_state(&tx, Some(&u)); // same URL: deduped
        assert!(
            !rx.has_changed().unwrap(),
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// A genuinely different re-auth URL after a prior one fires again (the dedupe tracks changes,
    /// it does not pin the first URL forever).
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&tx, Some(&a));
        bridge_reauth_url_to_state(&tx, Some(&b));

        assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(b));
    }

    /// End-to-end of the *clear* contract: after the bridge sets `NeedsLogin`, the netmap self-node
    /// path (modeled here as a direct `send_replace(Running)`, the exact transition the
    /// `StreamMessage::Next` handler performs on the next good self-node) flips back to `Running`.
    /// This pins that the bridge does NOT need a `None`-clear arm — recovery is owned elsewhere.
    #[test]
    fn running_netmap_clears_needs_login() {
        let u = url("https://login.example/auth");
        let (tx, rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&tx, Some(&u));
        assert_eq!(*rx.borrow(), DeviceState::NeedsLogin(u));

        // The self-node handler's recovery transition (next good netmap self-node → Running).
        tx.send_replace(DeviceState::Running);
        assert_eq!(*rx.borrow(), DeviceState::Running);
    }

    /// Fix 2 — the bridge must NOT clobber an in-flight automatic re-auth. While the cell holds
    /// `Reauthenticating`, an auth URL surfaced by the rotated re-register (over the same
    /// `auth_url_tx` cell) must leave the state UNTOUCHED: flipping to `NeedsLogin` would surface a
    /// misleading `browse_to_url` on a headless node and, by moving the cell off `Reauthenticating`,
    /// re-arm a second node-key rotation that loses the original `OldNodeKey` anchor. The auto-reauth
    /// path owns the cell until it recovers to `Running` or trips to `Expired`.
    #[test]
    fn bridge_does_not_clobber_reauthenticating() {
        let u = url("https://login.example/auth");
        let (tx, rx) = watch::channel(DeviceState::Reauthenticating);

        bridge_reauth_url_to_state(&tx, Some(&u));

        assert_eq!(
            *rx.borrow(),
            DeviceState::Reauthenticating,
            "a surfaced auth URL must not downgrade an in-flight auto-reauth to NeedsLogin"
        );
    }

    /// The no-clobber guard is scoped to `Reauthenticating` only — it does not change the bridge's
    /// behavior in any other state. From `Running`, `Connecting`, `Expired`, or an earlier
    /// `NeedsLogin` carrying a different URL, a surfaced URL still drives `NeedsLogin(url)` as
    /// before, so an ordinary interactive re-auth is unaffected.
    #[test]
    fn bridge_still_sets_needs_login_from_non_reauthenticating() {
        let u = url("https://login.example/auth");
        for start in [
            DeviceState::Running,
            DeviceState::Connecting,
            DeviceState::Expired,
            DeviceState::NeedsLogin(url("https://login.example/prior")),
        ] {
            // Captured before `start` moves into the channel, so a failure names the start state.
            let label = format!("{start:?}");
            let (tx, rx) = watch::channel(start);
            bridge_reauth_url_to_state(&tx, Some(&u));
            assert_eq!(
                *rx.borrow(),
                DeviceState::NeedsLogin(u.clone()),
                "a surfaced URL must drive NeedsLogin from {label}"
            );
        }
    }
}
1922
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    fn url(s: &str) -> url::Url {
        s.parse().unwrap()
    }

    /// A non-empty URL publishes to the cell.
    #[test]
    fn non_empty_url_publishes() {
        let (tx, rx) = watch::channel(None);
        let u = url("https://login.example/consent");
        sticky_update_pop_browser_url(&tx, Some(&u));
        assert_eq!(*rx.borrow(), Some(u));
    }

    /// An absent (`None`) update — the common netmap tick — must NOT reset the cell. This is the
    /// regression guard for the thrash bug (a reset-every-tick would coalesce the URL away on the bus).
    #[test]
    fn absent_update_does_not_reset() {
        let u = url("https://login.example/consent");
        let (tx, rx) = watch::channel(Some(u.clone()));
        // Simulate many empty netmap updates.
        for _ in 0..5 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "empty updates must not clear the URL"
        );
    }

    /// The same URL repeated does not re-fire the watch (in-place dedupe via `send_if_modified`), so
    /// a subscriber isn't woken spuriously. Proven by the borrow not having been marked changed.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&u)); // first: fires
        assert!(rx.has_changed().unwrap(), "first non-empty URL fires");
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&u)); // same: deduped
        assert!(
            !rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A genuinely new URL after a prior one fires again (sticky but tracks changes).
    #[test]
    fn new_url_after_prior_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(*rx.borrow(), Some(b));
    }

    /// The realistic session sequence: a URL stays sticky through a run of `None` ticks, and a
    /// *different* URL after that gap still fires — both updating the value and waking
    /// subscribers. This chains the legs the other tests cover in isolation (the actual control
    /// cadence is "URL, then many empty updates, then maybe a new URL").
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        for _ in 0..3 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(*rx.borrow(), Some(a), "stayed sticky through the None gap");
        // Start from a clean "seen" state so the assertion below isolates the wake caused by `b`.
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert!(
            rx.has_changed().unwrap(),
            "a new URL after a None gap must wake subscribers"
        );
        assert_eq!(
            *rx.borrow(),
            Some(b),
            "a new URL after a None gap still fires"
        );
    }

    /// Returning to a previously-seen URL (A → B → A) re-fires: the dedupe is against the cell's
    /// *current* value, not a full history, so A after B is a genuine change.
    #[test]
    fn returning_to_prior_url_refires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&a)); // back to A: differs from current (B) → fires
        assert!(
            rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*rx.borrow(), Some(a));
    }

    /// End-to-end de-thrash: feed a realistic netmap cadence (empty, empty, URL, empty, empty)
    /// through the producer into a cell. After EVERY tick, check whether a subscriber would have
    /// been woken (`has_changed`), and consume the value the way a `run_bus`-style loop does.
    ///
    /// Exactly ONE wake must occur, and it must carry the URL. The per-tick check matters: the
    /// watch channel coalesces versions, so inspecting only once after the whole cadence can never
    /// count more than one change and cannot detect spurious wakes on the empty ticks. The pre-fix
    /// code (`send_replace` every tick) would both wake on every tick and coalesce the URL away to
    /// `None`.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        // The cadence control actually sends: mostly-empty MapResponses with one carrying the URL.
        let cadence = [None, None, Some(&u), None, None];
        let mut changes = 0;
        for incoming in cadence {
            sticky_update_pop_browser_url(&tx, incoming);
            if rx.has_changed().unwrap() {
                changes += 1;
                let v = rx.borrow_and_update().clone();
                assert_eq!(v, Some(u.clone()), "the surviving change carries the URL");
            }
        }
        assert_eq!(changes, 1, "exactly one change survives the None thrash");
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "the URL stays sticky through the trailing empty ticks"
        );
    }
}
2050
#[cfg(test)]
mod home_region_hysteresis_tests {
    use core::time::Duration;
    use std::{collections::HashMap, sync::Arc, time::Instant};

    use ts_derp::RegionId;
    use ts_netcheck::RegionResult;

    use super::{DERP_HISTORY_MAX_AGE, best_recent, select_home_region};

    fn region(id: u32, latency_ms: u64) -> RegionResult {
        RegionResult {
            latency: Duration::from_millis(latency_ms),
            id: RegionId(core::num::NonZeroU32::new(id).unwrap()),
            latency_map_key: format!("region-{id}"),
            connected_remote: "127.0.0.1:0".parse().unwrap(),
        }
    }

    fn rid(id: u32) -> RegionId {
        RegionId(core::num::NonZeroU32::new(id).unwrap())
    }

    /// Call `select_home_region` with NO smoothing history — `best_recent` empty, so each region's
    /// smoothed latency falls back to its current sample, reproducing the original raw-vs-raw
    /// hysteresis these tests pin. (The smoothing-specific tests below pass a populated map.)
    fn sel(current: Option<RegionId>, m: &[RegionResult]) -> Option<RegionId> {
        select_home_region(current, m, &HashMap::new())
    }

    /// Empty measurements yield no selection.
    #[test]
    fn empty_measurements_select_none() {
        assert!(sel(Some(rid(1)), &[]).is_none());
        assert!(sel(None, &[]).is_none());
    }

    /// First selection (no current home region) takes the best (lowest-latency) region directly.
    #[test]
    fn first_selection_takes_best() {
        let m = [region(1, 20), region(2, 50)];
        assert_eq!(sel(None, &m).unwrap(), rid(1));
    }

    /// Jitter within the 10ms absolute-diff band keeps the current region (no flap). Current=region 2
    /// at 25ms; new best=region 1 at 20ms (only 5ms better) -> keep region 2.
    #[test]
    fn keeps_current_when_within_absolute_diff() {
        let m = [region(1, 20), region(2, 25)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(2),
            "a 5ms improvement (< 10ms) must not flap the home region"
        );
    }

    /// A meaningful improvement (>10ms AND best <= 2/3 of current) switches. Current=region 2 at
    /// 100ms; new best=region 1 at 20ms -> switch to region 1.
    #[test]
    fn switches_on_meaningful_improvement() {
        let m = [region(1, 20), region(2, 100)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(1),
            "a large improvement must switch the home region"
        );
    }

    /// The two-thirds rule: even past the 10ms absolute diff, an improvement that does not beat 2/3
    /// of the current latency keeps the current region. current=60ms, best=45ms: diff=15ms (>10ms,
    /// so the absolute test alone would switch), but 45 > 60*2/3=40, so keep.
    #[test]
    fn keeps_current_when_two_thirds_rule_not_met() {
        let m = [region(1, 45), region(2, 60)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(2),
            "best (45ms) is not <= 2/3 of current (40ms), so keep current despite >10ms diff"
        );
    }

    /// When the current home region is no longer present in the measurements, take the new best.
    #[test]
    fn switches_when_current_region_absent() {
        let m = [region(1, 20), region(3, 25)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(1),
            "a current region absent from the measurements falls through to the best"
        );
    }

    /// When the best already IS the current home region, it is kept (no spurious change).
    #[test]
    fn keeps_current_when_it_is_already_best() {
        let m = [region(2, 20), region(1, 50)];
        assert_eq!(sel(Some(rid(2)), &m).unwrap(), rid(2));
    }

    /// `best_recent` is each region's MINIMUM latency over the in-window reports; a report older than
    /// `max_age` is excluded.
    #[test]
    fn best_recent_is_min_over_window_and_evicts_aged() {
        let now = Instant::now();
        // Two in-window reports for region 1 (50ms then 20ms) → min 20ms; region 2 once at 30ms.
        // One aged report (region 1 at 5ms) outside the window must be ignored.
        let history = vec![
            (
                now - Duration::from_secs(10 * 60), // aged out (> 5min)
                Arc::new(vec![region(1, 5)]),
            ),
            (
                now - Duration::from_secs(60),
                Arc::new(vec![region(1, 50), region(2, 30)]),
            ),
            (now, Arc::new(vec![region(1, 20)])),
        ];
        let br = best_recent(&history, now, DERP_HISTORY_MAX_AGE);
        assert_eq!(
            br.get(&rid(1)).copied(),
            Some(Duration::from_millis(20)),
            "region 1 min over the window is 20ms (the aged 5ms is excluded)"
        );
        assert_eq!(br.get(&rid(2)).copied(), Some(Duration::from_millis(30)));
    }

    /// The asymmetric comparison: the new best is chosen by its SMOOTHED (best_recent) latency while
    /// the old region is compared on its RAW current latency. A best region whose CURRENT sample
    /// looks much better but whose 5-min MIN is only marginally better must NOT flap the home region
    /// — exactly the oscillation the raw-vs-raw comparison would have switched on.
    #[test]
    fn smoothed_best_damps_oscillation_across_boundary() {
        // Current home = region 2, raw 60ms this cycle. Region 1's CURRENT sample is 20ms (a >2/3,
        // >10ms improvement → raw-vs-raw would SWITCH), but its 5-min MIN (best_recent) is 50ms
        // (it oscillates). Smoothed-best 50ms vs raw-old 60ms: diff 10ms is NOT < 10ms, but
        // 50 > 60*2/3=40 → keepOld. So we KEEP region 2, where the raw comparison would have flapped.
        let m = [region(1, 20), region(2, 60)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(50)); // smoothed best is worse than its raw sample
        br.insert(rid(2), Duration::from_millis(60));
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &br).unwrap(),
            rid(2),
            "a best region whose 5-min min is only marginally better must not flap the home region"
        );

        // Sanity: with NO smoothing (raw 20ms best), the same inputs WOULD switch — proving the
        // smoothing is what holds it.
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &HashMap::new()).unwrap(),
            rid(1),
            "raw-vs-raw (no smoothing) switches on the 20ms-vs-60ms current samples"
        );
    }

    /// Smoothing can reorder which region is "best": `measurements` is sorted by raw latency, but the
    /// smoothed minimum may favor a different region. `select_home_region` must pick by smoothed
    /// latency, not blindly trust `measurements[0]`.
    #[test]
    fn smoothed_best_may_differ_from_raw_first() {
        // Raw order: region 1 (10ms) is first. But region 2's 5-min min is 5ms while region 1's is
        // 40ms (region 1's 10ms was a lucky low sample). Smoothed-best is region 2. First selection.
        let m = [region(1, 10), region(2, 12)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(40));
        br.insert(rid(2), Duration::from_millis(5));
        assert_eq!(
            select_home_region(None, &m, &br).unwrap(),
            rid(2),
            "the smoothed-best region wins even when it is not the raw-latency first"
        );
    }

    /// A tie on SMOOTHED latency is broken in favor of the region listed first in `measurements`
    /// (i.e. the lower raw latency, since the slice is sorted by raw latency ascending). This
    /// mirrors Go's tie-break on current latency when two regions share the same `bestRecent`.
    #[test]
    fn smoothed_tie_breaks_toward_lower_raw_latency() {
        let m = [region(1, 10), region(2, 12)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(5));
        br.insert(rid(2), Duration::from_millis(5));
        assert_eq!(
            select_home_region(None, &m, &br).unwrap(),
            rid(1),
            "equal smoothed latency must resolve to the lower-raw-latency region"
        );
    }

    /// Byte-faithful integer two-thirds boundary (the float-vs-integer divergence): at exactly
    /// `best == old * 2/3` (old=36ms, best=24ms), Go's integer `bestAny > old/3*2` = `24ms > 24ms`
    /// is FALSE, so it does NOT keep on the 2/3 arm; and `cond_a` `36-24=12ms < 10ms` is also false,
    /// so Go SWITCHES. A float `0.024 > 0.036*2.0/3.0 = 0.0239999997` would wrongly KEEP. This test
    /// pins the integer math: it must switch to the best.
    #[test]
    fn two_thirds_boundary_is_integer_not_float() {
        let m = [region(1, 24), region(2, 36)];
        // No smoothing (raw == smoothed): isolates the 2/3 arithmetic at the exact boundary.
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(1),
            "at best == old*2/3 the integer rule does NOT keep (Go switches); a float rule would keep"
        );
    }

    /// The `cond_a` (absolute-diff) arm when the subtraction UNDERFLOWS: the old region's RAW
    /// current latency (20ms) is FASTER than the smoothed-best (50ms), so `old - best_any` is
    /// negative. Go's signed subtraction yields a negative duration (`< 10ms` → keepOld), and
    /// `saturating_sub` floors it to 0 (`< 10ms` → keepOld) — the same outcome. A plain `Duration`
    /// subtraction would panic here, so this pins the saturating behavior. (The two-thirds arm
    /// also keeps for these inputs; the point is that the underflowing path is reached safely.)
    #[test]
    fn old_faster_than_smoothed_best_keeps_via_absolute_diff() {
        // Current home = region 2, raw 20ms, smoothed 60ms. Region 1 is raw 15ms, smoothed 50ms.
        // Smoothed-best is region 1 (50 < 60), so best != old and the hysteresis comparison runs
        // with best_any = 50ms against old raw = 20ms.
        let m = [region(1, 15), region(2, 20)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(50));
        br.insert(rid(2), Duration::from_millis(60));
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &br).unwrap(),
            rid(2),
            "old raw (20ms) faster than smoothed-best (50ms) must keep the old region without panicking"
        );
    }

    /// The absolute-diff arm measured against a SMOOTHED best: old raw 20ms vs smoothed-best 18ms
    /// is a 2ms gap (< 10ms), so the old region is kept.
    #[test]
    fn old_within_absolute_diff_of_smoothed_best_keeps() {
        // best_any = 18ms (region 1's smoothed min). Old (region 2) RAW = 20ms. 20 - 18 = 2ms.
        let m = [region(1, 15), region(2, 20)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(18));
        br.insert(rid(2), Duration::from_millis(25));
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &br).unwrap(),
            rid(2),
            "old raw (20ms) within 10ms of smoothed-best (18ms) keeps via the absolute-diff arm"
        );
    }
}
2263
#[cfg(test)]
mod expiry_action_tests {
    use super::{ExpiryAction, expiry_action};

    /// Every `(has_auth_key, reauth_enabled, tka_active)` gate combination. The visiting order is
    /// the same as a triple-nested `for` over `[false, true]`, outermost first.
    fn gate_matrix() -> impl Iterator<Item = (bool, bool, bool)> {
        const BOTH: [bool; 2] = [false, true];
        BOTH.into_iter().flat_map(|has_auth_key| {
            BOTH.into_iter().flat_map(move |reauth_enabled| {
                BOTH.into_iter()
                    .map(move |tka_active| (has_auth_key, reauth_enabled, tka_active))
            })
        })
    }

    /// A key that has not expired is `Running` no matter how the gates are set. The gates only
    /// come into play once expiry has happened.
    #[test]
    fn not_expired_is_always_running() {
        for (has_auth_key, reauth_enabled, tka_active) in gate_matrix() {
            let action = expiry_action(false, has_auth_key, reauth_enabled, tka_active);
            assert_eq!(
                action,
                ExpiryAction::Running,
                "a non-expired key is Running for any gate combination"
            );
        }
    }

    /// The sole auto-reauth cell: expired, auth key retained, reauth enabled, and no enforcing TKA.
    #[test]
    fn expired_with_authkey_reauth_enabled_and_no_tka_reauthenticates() {
        let action = expiry_action(true, true, true, false);
        assert_eq!(action, ExpiryAction::Reauthenticate);
    }

    /// Across the whole expired matrix, only the single reauth cell deviates from terminal
    /// `Expired`. A missing auth key, disabled reauth, or an enforcing TKA each independently
    /// forces `Expired` (the pre-existing behavior).
    #[test]
    fn expired_falls_back_to_expired_for_every_other_combination() {
        for (has_auth_key, reauth_enabled, tka_active) in gate_matrix() {
            let action = expiry_action(true, has_auth_key, reauth_enabled, tka_active);
            let is_reauth_cell = has_auth_key && reauth_enabled && !tka_active;
            if is_reauth_cell {
                // Also pinned on its own by the dedicated test above.
                assert_eq!(action, ExpiryAction::Reauthenticate);
                continue;
            }
            assert_eq!(
                action,
                ExpiryAction::Expired,
                "expired with has_auth_key={has_auth_key}, \
                 reauth_enabled={reauth_enabled}, tka_active={tka_active} must be Expired"
            );
        }
    }

    /// The TKA safety gate in isolation. An ACTIVE lock vetoes auto-reauth even when an auth key
    /// is present and reauth is enabled, because an unsigned key must never be rotated in on a
    /// locked tailnet. This is kept as a standalone test so that dropping the `!tka_active` term
    /// fails loudly.
    #[test]
    fn tka_active_forces_expired_even_when_reauth_would_otherwise_fire() {
        let action = expiry_action(true, true, true, true);
        assert_eq!(
            action,
            ExpiryAction::Expired,
            "an enforcing Tailnet Lock must veto auto-reauth (unsigned-key lockout safety gate)"
        );
    }

    /// Without an auth key there is nothing to re-register with non-interactively. The node
    /// therefore goes terminal even with reauth enabled and no lock.
    #[test]
    fn no_auth_key_forces_expired() {
        let action = expiry_action(true, false, true, false);
        assert_eq!(action, ExpiryAction::Expired);
    }

    /// Opting out via `reauth_on_expiry=false` yields `Expired` even with an auth key and no lock.
    /// This is the conservative, historical posture.
    #[test]
    fn reauth_disabled_forces_expired() {
        let action = expiry_action(true, true, false, false);
        assert_eq!(action, ExpiryAction::Expired);
    }
}
2353
#[cfg(test)]
mod reauth_circuit_tests {
    use super::{ExpiryAction, MAX_REAUTH_ATTEMPTS, ReauthState, reauth_circuit_step};

    /// Entering reauth (a `Reauthenticate` verdict while NOT already reauthenticating) fires the
    /// one-shot reauth and starts the counter at 1.
    #[test]
    fn enter_reauth_fires_once_and_starts_counter() {
        let step = reauth_circuit_step(ExpiryAction::Reauthenticate, false, 0);
        assert_eq!(step.next, ReauthState::Reauthenticating);
        assert_eq!(
            step.attempts, 1,
            "the entering attempt starts the counter at 1"
        );
        assert!(step.fire_reauth, "entry fires the one-shot Command::Reauth");
    }

    /// A further expired self-node while ALREADY reauthenticating (prior reauth not recovered)
    /// counts up but does NOT re-fire. Re-firing would rotate the node key a second time and lose
    /// the original `OldNodeKey` anchor.
    #[test]
    fn still_reauthing_counts_but_does_not_refire() {
        let step = reauth_circuit_step(ExpiryAction::Reauthenticate, true, 1);
        assert_eq!(step.next, ReauthState::Reauthenticating);
        assert_eq!(
            step.attempts, 2,
            "a non-recovering reauth increments the counter"
        );
        assert!(
            !step.fire_reauth,
            "must NOT re-fire reauth while already reauthenticating (no second rotation)"
        );
    }

    /// At `MAX_REAUTH_ATTEMPTS` consecutive non-recovering reauths, the breaker trips to the
    /// terminal `Expired` and stops re-arming reauth. The one-shot auth-key case settles instead
    /// of looping.
    #[test]
    fn trips_to_expired_at_cap() {
        // One step below the cap still stays reauthenticating, having counted up to cap - 1.
        let below =
            reauth_circuit_step(ExpiryAction::Reauthenticate, true, MAX_REAUTH_ATTEMPTS - 2);
        assert_eq!(below.next, ReauthState::Reauthenticating);
        assert_eq!(below.attempts, MAX_REAUTH_ATTEMPTS - 1);
        assert!(!below.fire_reauth);

        // The step that reaches the cap flips to terminal Expired and does not fire.
        let at_cap =
            reauth_circuit_step(ExpiryAction::Reauthenticate, true, MAX_REAUTH_ATTEMPTS - 1);
        assert_eq!(at_cap.attempts, MAX_REAUTH_ATTEMPTS);
        assert_eq!(
            at_cap.next,
            ReauthState::Expired,
            "at the cap the circuit breaker trips to terminal Expired"
        );
        assert!(
            !at_cap.fire_reauth,
            "a tripped breaker never fires another reauth"
        );
    }

    /// A good (non-expired) self-node means the node recovered, so the counter resets to 0. A
    /// later, genuine expiry cycle then gets a fresh full budget of attempts, never a stale
    /// leftover count.
    #[test]
    fn running_resets_counter() {
        let step = reauth_circuit_step(ExpiryAction::Running, true, MAX_REAUTH_ATTEMPTS);
        assert_eq!(step.next, ReauthState::Running);
        assert_eq!(
            step.attempts, 0,
            "recovery to Running resets the attempt counter"
        );
        assert!(!step.fire_reauth);
    }

    /// The non-reauth terminal path (`expiry_action` → `Expired`: no auth key / reauth disabled /
    /// TKA-locked) reports `Expired` and never fires reauth. The counter is irrelevant here (this
    /// path never armed the machine), so it resets to 0.
    #[test]
    fn expired_action_is_terminal_without_firing() {
        let step = reauth_circuit_step(ExpiryAction::Expired, false, 0);
        assert_eq!(step.next, ReauthState::Expired);
        assert!(!step.fire_reauth);
        assert_eq!(step.attempts, 0);
    }

    /// Runs the full episode as the handler drives it, feeding each step's `attempts` into the
    /// next. A one-shot auth key that never recovers fires reauth exactly ONCE and counts each
    /// repeated expired self-node. It then settles on terminal `Expired` at the cap, with no
    /// indefinite `Reauthenticating` spell and no second rotation.
    #[test]
    fn full_episode_one_shot_key_settles_at_expired_after_one_fire() {
        // Step 1: first expired self-node (state was Running → not already reauthing): enter + fire.
        let s1 = reauth_circuit_step(ExpiryAction::Reauthenticate, false, 0);
        assert_eq!(s1.next, ReauthState::Reauthenticating);
        assert!(s1.fire_reauth);
        let mut attempts = s1.attempts;
        // Count the entry fire from the step itself rather than assuming it.
        let mut fires = u32::from(s1.fire_reauth);

        // Steps 2..: still expired while reauthenticating — count only, never re-fire — until the
        // cap. The loop is bounded by the cap itself. A regression that stops advancing the
        // counter therefore fails the `settled` assertion instead of hanging the test run, as an
        // unbounded `loop` would.
        let mut settled = false;
        for _ in 0..MAX_REAUTH_ATTEMPTS {
            let s = reauth_circuit_step(ExpiryAction::Reauthenticate, true, attempts);
            if s.fire_reauth {
                fires += 1;
            }
            attempts = s.attempts;
            if s.next == ReauthState::Expired {
                settled = true;
                break;
            }
            assert_eq!(s.next, ReauthState::Reauthenticating);
            assert!(attempts < MAX_REAUTH_ATTEMPTS);
        }

        assert!(
            settled,
            "the circuit breaker must trip to Expired within MAX_REAUTH_ATTEMPTS steps"
        );
        assert_eq!(attempts, MAX_REAUTH_ATTEMPTS, "settles exactly at the cap");
        assert_eq!(
            fires, 1,
            "reauth (and thus a node-key rotation) fires exactly once across the whole episode"
        );
    }

    /// Checks recovery followed by a fresh failing episode: `Reauthenticating → Running` (reset),
    /// then a later expiry re-enters and re-fires. This proves the reset gives the next episode a
    /// full attempt budget, rather than a stale count that would trip the breaker early.
    #[test]
    fn recovery_then_new_episode_re_fires() {
        // Episode 1 entry.
        let e1 = reauth_circuit_step(ExpiryAction::Reauthenticate, false, 0);
        assert!(e1.fire_reauth);
        // A non-recovering step, then recovery to Running resets.
        let mid = reauth_circuit_step(ExpiryAction::Reauthenticate, true, e1.attempts);
        assert_eq!(mid.attempts, 2);
        let recovered = reauth_circuit_step(ExpiryAction::Running, true, mid.attempts);
        assert_eq!(recovered.attempts, 0);

        // Episode 2: a fresh expiry (state is Running again → not already reauthing) re-enters + fires.
        let e2 = reauth_circuit_step(ExpiryAction::Reauthenticate, false, recovered.attempts);
        assert_eq!(e2.next, ReauthState::Reauthenticating);
        assert_eq!(e2.attempts, 1, "the new episode starts fresh at 1");
        assert!(
            e2.fire_reauth,
            "a new episode after recovery fires reauth again"
        );
    }
}
2495
#[cfg(test)]
mod self_lockout_tests {
    use ts_tka::{AumHash, Authority, State};

    use super::{SelfLockVerdict, self_lock_verdict};

    /// Returns the public half of a freshly generated random node private key.
    fn node_key() -> ts_keys::NodePublicKey {
        let private = ts_keys::NodePrivateKey::random();
        private.public_key()
    }

    /// Builds the authority shared by these classifier tests: `State::default()` anchored at an
    /// all-zero head `AumHash`.
    fn zero_head_authority() -> Authority {
        Authority::from_state(AumHash([0; 32]), State::default())
    }

    /// An empty key-signature means "not signed yet". It must classify as `Unsigned`, never as a
    /// lockout, so a tailnet that simply has not signed this node does not spam a `warn`.
    #[test]
    fn empty_signature_is_unsigned_not_locked_out() {
        let authority = zero_head_authority();
        let key = node_key();
        let verdict = self_lock_verdict(&key, &[], &authority);
        assert_eq!(verdict, SelfLockVerdict::Unsigned);
    }

    /// A non-empty key-signature that fails to authorize self must classify as `LockedOut`, the
    /// operator-facing condition. The verdict carries the verify error string for the log.
    ///
    /// The three-byte blob is non-empty, so verification is attempted instead of short-circuiting
    /// to `Unsigned`. It is also not valid NodeKeySignature CBOR (`0x01` decodes as a bare uint
    /// followed by trailing bytes), so `node_key_authorized` returns a `Decode` error. The
    /// cryptographic-rejection arms (`UntrustedKey` / `BadSignature` for a well-formed but
    /// untrusted NKS) are covered by `ts_tka`'s own `node_key_authorized` tests. This test only
    /// proves that the runtime classifier routes a verify `Err` to `LockedOut`.
    #[test]
    fn unverifiable_signature_is_locked_out() {
        let authority = zero_head_authority();
        let key = node_key();
        let verdict = self_lock_verdict(&key, &[0x01, 0x02, 0x03], &authority);
        let locked_out = matches!(verdict, SelfLockVerdict::LockedOut(_));
        assert!(
            locked_out,
            "a signature the lock cannot authorize must classify as LockedOut, got {verdict:?}"
        );
    }
}