ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::{collections::HashMap, sync::Arc, time::Instant};
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
17 tka_init_begin, tka_init_finish, tka_submit_signature,
18};
19use ts_magicsock::SelfEndpointType;
20
21use crate::{
22 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
23 direct::EndpointAdvertisement,
24};
25
26/// Actor responsible for maintaining the connection to control.
27///
28/// This actor is responsible for proxying the map response stream onto the message bus.
29pub struct ControlRunner {
30 client: AsyncControlClient,
31 params: Params,
32
33 self_node: watch::Sender<Option<Node>>,
34 /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
35 /// server reads this to authorize incoming connections; absent policy means deny-all.
36 ssh_policy: watch::Sender<Option<SshPolicy>>,
37 /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
38 tka: watch::Sender<Option<TkaStatus>>,
39 /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
40 /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
41 /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
42 /// the result returns via the [`TkaSynced`] self-message).
43 tka_synced: Option<crate::tka_sync::SyncedTka>,
44 /// The verified TKA [`Authority`](ts_tka::Authority) the peer tracker **enforces** (Go
45 /// `tkaFilterNetmapLocked`). `None` until the first successful sync, and reset to `None` when the
46 /// lock is disabled. This is the SOLE delivery channel to the peer tracker (which holds the
47 /// matching `Receiver` and reads it on every peer upsert): a `watch` cell, not a bus message, so
48 /// the latest value is always readable, never dropped under load, and writes are strictly ordered
49 /// by this actor — a disable (`None`) can never be reordered behind or dropped before a stale
50 /// `Some`. Written only from [`apply_tka_synced`] (enable) and [`maybe_sync_tka`] (disable), both
51 /// on the actor thread. The published `Authority` has always passed `VerifiedAumChain::verify`.
52 tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
53 /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
54 /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
55 tka_syncing: bool,
56 /// Monotonic generation stamped when a disable (or a fresh sync) supersedes any in-flight sync.
57 /// `maybe_sync_tka` bumps this on a disable transition and captures it into each spawned sync;
58 /// [`apply_tka_synced`] discards a sync result whose captured generation is stale, so a lock
59 /// disabled *while a sync was in flight* is never re-enabled by that sync's late `Ok(Some)`
60 /// (the in-flight window the `tka_synced.is_some()` disable guard alone does not cover).
61 tka_generation: u64,
62 /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
63 /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
64 cert_domains: watch::Sender<Vec<String>>,
65 /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
66 /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
67 /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
68 /// own cell for the narrower TLS-cert use.
69 dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
70 /// Latest interactive-login / consent URL control asked this node to open
71 /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
72 /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
73 /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
74 /// `browse_to_url` running-node events.
75 ///
76 /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
77 /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
78 /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
79 /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
80 /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
81 /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
82 /// `None` and coalesce the URL away for a `watch` subscriber.
83 pop_browser_url: watch::Sender<Option<url::Url>>,
84 /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
85 /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
86 /// daemon's `tnet netcheck`). Empty until the first measurement.
87 netcheck: watch::Sender<crate::status::NetcheckReport>,
88 /// The DERP home region currently selected, with the latency measured for it at selection time.
89 /// `None` until the first home region is chosen. Used to apply selection **hysteresis** (Go
90 /// `netcheck.addReportHistoryAndSetPreferredDERP`): the home region is only switched when a new
91 /// region is *meaningfully* lower-latency than the current one, so jitter between near-equal
92 /// regions does not flap the home relay (which would cause repeated reconnects + brief loss).
93 home_region: Option<(ts_derp::RegionId, core::time::Duration)>,
94 /// Rolling history of per-cycle DERP-latency reports within the last [`DERP_HISTORY_MAX_AGE`]
95 /// (Go `netcheck` `maxAge = 5 * time.Minute`), each stamped with its arrival `Instant`. Feeds the
96 /// `bestRecent` smoothing (Go `addReportHistoryAndSetPreferredDERP`): the new home candidate is
97 /// chosen by each region's **minimum** latency over this window, not its raw current sample, so a
98 /// best region whose latency oscillates across the switch boundary does not flap the home relay.
99 /// Aged entries are evicted on each measurement; the buffer is therefore bounded by the netcheck
100 /// cadence × the window.
101 derp_report_history: Vec<(Instant, Arc<Vec<ts_netcheck::RegionResult>>)>,
102 /// Background task that bridges the control client's mid-session re-auth URL cell onto
103 /// [`Self::params`]'s device-state cell (sets [`DeviceState::NeedsLogin`] when control returns
104 /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
105 /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
106 reauth_bridge: tokio::task::JoinHandle<()>,
107}
108
109impl Drop for ControlRunner {
110 fn drop(&mut self) {
111 // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
112 self.reauth_bridge.abort();
113 }
114}
115
/// Startup arguments for [`ControlRunner`] (its `kameo::Actor::Args`).
pub struct Params {
    /// Control config, passed to the control client when authenticating and connecting.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). `None` means registration relies on interactive authorization.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,

    /// Sender for the TKA enforcement-authority cell the peer tracker reads (Go
    /// `tkaFilterNetmapLocked`). Created in [`Runtime::spawn`](crate::Runtime) and threaded into BOTH
    /// the peer tracker (the `Receiver`) and this runner (the `Sender`), so the runner is the sole
    /// writer and the tracker reads the latest verified `Authority` on demand. `None` = no lock /
    /// disabled (admit all).
    pub(crate) tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
}
140
/// Error type of the [`ControlRunner`] actor (its `kameo::Actor::Error`).
///
/// Both variants are transparent wrappers: display and source are those of the wrapped error,
/// and `?` converts into them through the derived `From` impls.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error returned by the control client (`ts_control::Error`).
    #[error(transparent)]
    Control(#[from] ControlError),

    /// An error originating in this crate (`crate::Error`).
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
150
151impl kameo::Actor for ControlRunner {
152 type Args = Params;
153 type Error = ControlRunnerError;
154
155 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
156 loop {
157 match AsyncControlClient::check_auth(
158 ¶ms.config,
159 ¶ms.env.keys,
160 params.auth_key.as_deref(),
161 )
162 .await
163 {
164 Ok(()) => break,
165 Err(ControlError::MachineNotAuthorized(u)) => {
166 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
167 // Surface "interactive login required" so a watcher / `wait_until_running` can
168 // tell the user to authorize, instead of seeing an opaque timeout. Registration
169 // keeps retrying (transient), so this is not a terminal `Failed`.
170 params
171 .state_tx
172 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
173 tokio::time::sleep(Duration::from_secs(5)).await;
174 }
175 Err(e) => {
176 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
177 // specific reason control gave AND publish it as a typed `Failed` state so
178 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
179 // of the opaque `Internal(Actor)` the caller would otherwise see once the
180 // stopped actor is next asked. Publishing before `return Err` is why the state
181 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
182 let reason = crate::RegistrationError::from(&e);
183 tracing::error!(error = %e, "registration failed; control runner stopping");
184 params
185 .state_tx
186 .send_replace(crate::DeviceState::Failed(reason));
187 return Err(e.into());
188 }
189 }
190 }
191 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
192 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
193 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
194 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
195 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
196 // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
197 // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
198 // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
199 // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
200 let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
201
202 let bring_up = async {
203 let (client, stream) = AsyncControlClient::connect(
204 ¶ms.config,
205 ¶ms.env.keys,
206 params.auth_key.as_deref(),
207 auth_url_tx,
208 )
209 .await?;
210
211 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
212
213 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
214 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
215 slf.attach_stream(stream.boxed(), (), ());
216 Ok::<_, ControlRunnerError>(client)
217 };
218
219 let client = match bring_up.await {
220 Ok(client) => client,
221 Err(e) => {
222 tracing::error!(error = %e, "bringing up the control session failed");
223 // The control session never came up; surface it as a transient registration
224 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
225 // stuck at `Connecting`.
226 params.state_tx.send_replace(crate::DeviceState::Failed(
227 crate::RegistrationError::NetworkUnreachable,
228 ));
229 return Err(e);
230 }
231 };
232
233 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
234 // current (and flips to `Expired` if the self-node's key lapses).
235 params.state_tx.send_replace(crate::DeviceState::Running);
236
237 // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
238 // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
239 // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
240 // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
241 // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
242 // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
243 // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
244 let reauth_bridge = {
245 let state_tx = params.state_tx.clone();
246 let mut auth_url_rx = auth_url_rx;
247 tokio::spawn(async move {
248 while auth_url_rx.changed().await.is_ok() {
249 let url = auth_url_rx.borrow_and_update().clone();
250 bridge_reauth_url_to_state(&state_tx, url.as_ref());
251 }
252 })
253 };
254
255 // Clone the TKA authority publisher before `params` moves into `Self` below. The matching
256 // `Receiver` lives on the peer tracker; this sender is the sole writer (enforce on sync,
257 // clear on disable).
258 let tka_authority = params.tka_authority.clone();
259
260 Ok(Self {
261 client,
262 params,
263 self_node: Default::default(),
264 ssh_policy: Default::default(),
265 tka: Default::default(),
266 tka_synced: None,
267 tka_authority,
268 tka_syncing: false,
269 tka_generation: 0,
270 cert_domains: Default::default(),
271 dns_config: Default::default(),
272 pop_browser_url: Default::default(),
273 netcheck: Default::default(),
274 home_region: None,
275 derp_report_history: Vec::new(),
276 reauth_bridge,
277 })
278 }
279}
280
281impl ControlRunner {
282 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
283 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
284 /// round-trip). The result returns via the [`TkaSynced`] self-message.
285 ///
286 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
287 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
288 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
289 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
290 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
291 if !tka.is_enabled() {
292 // Lock disabled (or never enabled): clear enforcement by writing `None` to the authority
293 // cell the peer tracker reads — synchronously, so it can never be reordered behind or
294 // dropped before a stale `Some` (the failure a best-effort broadcast had). Always bump the
295 // generation so ANY sync currently in flight is invalidated: without this, a disable that
296 // races an in-flight sync (whose `take()` already cleared `tka_synced`) would be a no-op
297 // here, and the sync's late `Ok(Some)` would silently re-enable a lock control just turned
298 // off (the in-flight window the `tka_synced.is_some()` guard alone misses). Cheap and
299 // idempotent: clearing an already-`None` cell and bumping the generation are harmless.
300 self.tka_generation = self.tka_generation.wrapping_add(1);
301 if self.tka_synced.is_some() {
302 tracing::info!("TKA lock disabled; clearing enforcement (admitting all peers)");
303 self.tka_synced = None;
304 }
305 self.tka_authority.send_replace(None);
306 return;
307 }
308 if self.tka_syncing {
309 return; // a sync is already in flight; the next netmap will re-trigger if still stale
310 }
311 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
312 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
313 // fail-closes harmlessly).
314 if let Some(synced) = &self.tka_synced
315 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
316 && synced.authority.head_matches(&control_head)
317 {
318 return;
319 }
320
321 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
322 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
323 // `tka_syncing` so we don't spawn a second concurrent sync. Capture the current generation so
324 // `apply_tka_synced` can discard this result if a disable bumped the generation while the sync
325 // was in flight (H1: don't re-enable a lock that was disabled mid-sync).
326 self.tka_syncing = true;
327 let generation = self.tka_generation;
328 let current = self.tka_synced.take();
329 let config = self.params.config.clone();
330 let keys = self.params.env.keys.clone();
331 tokio::spawn(async move {
332 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
333 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
334 // not allowed). A send failure just means the actor is gone — nothing to do.
335 if let Err(e) = self_ref.tell(TkaSynced { result, generation }).await {
336 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
337 }
338 });
339 }
340
341 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
342 /// state + publish the `Authority` to the peer tracker's enforcement cell (or, on inert/failed
343 /// sync, leave peers unaffected). Always clears the in-flight guard.
344 ///
345 /// `generation` is the value captured when the sync was spawned. If it no longer matches
346 /// `self.tka_generation`, the lock was disabled (or re-synced) while this sync was in flight, so
347 /// the result is discarded — never re-enabling an authority control has since turned off.
348 async fn apply_tka_synced(
349 &mut self,
350 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
351 generation: u64,
352 ) {
353 self.tka_syncing = false;
354
355 // H1 guard: a disable (or a superseding sync) bumped the generation while this sync ran. Drop
356 // the stale result — `maybe_sync_tka`'s disable branch already cleared enforcement to `None`,
357 // and re-applying this `Some` would re-enforce a lock that is no longer active.
358 if generation != self.tka_generation {
359 tracing::info!(
360 "TKA sync result superseded (lock disabled or re-synced mid-flight); discarding"
361 );
362 return;
363 }
364
365 match result {
366 Ok(Some(synced)) => {
367 tracing::info!(
368 head = %synced.authority.head().to_base32(),
369 "TKA sync succeeded; enforcing verified Authority (Go tkaFilterNetmapLocked)"
370 );
371 // Deliver the verified Authority to the peer tracker's enforcement cell. The tracker
372 // reads it on every peer upsert and drops unauthorized peers. `Some(..)` = enforce; a
373 // `None` is written on disable. `watch` is the sole channel (last-write-wins, never
374 // dropped, ordered by this actor) — no bus, no re-publish-for-replay needed.
375 self.tka_authority
376 .send_replace(Some(synced.authority.clone()));
377
378 // Observability (Go `tkaFilterNetmapLocked`'s self check → `LockedOut` health
379 // warning): verify SELF's own node-key signature against the freshly-synced
380 // Authority and warn if self is NOT authorized. We never FILTER self (self never
381 // enters the peer db, so enforcement can't lock us out of our own netmap), but Go
382 // raises an operator-facing warning here because a self that the lock does not
383 // authorize means this node's key-signature is missing/invalid for the current lock
384 // — it will be unable to prove itself to locked peers. This fork has no health
385 // subsystem, so the signal is a `tracing::warn!` (its observability channel).
386 //
387 // `self_node` is a sticky cell set on every netmap carrying a self-node; if a sync
388 // somehow lands before the first self-node ever arrived it is `None`, so we skip the
389 // advisory this cycle and re-evaluate on the next sync — fine for observability-only.
390 // The `borrow()` ref is scoped to this `if let` and dropped before the `&mut self`
391 // write below.
392 if let Some(self_node) = self.self_node.borrow().as_ref() {
393 log_self_lockout(self_node, &synced.authority);
394 }
395
396 self.tka_synced = Some(synced);
397 }
398 Ok(None) => {
399 // Control has no lock for us (no genesis / disabled). Clear any authority we were
400 // previously enforcing — symmetric with the disable path — so a transition to
401 // "no lock" stops dropping peers. Not an error.
402 if self.tka_synced.is_some() {
403 tracing::info!("TKA sync: control reports no lock; clearing enforcement");
404 self.tka_synced = None;
405 }
406 self.tka_authority.send_replace(None);
407 }
408 Err(e) => {
409 // Transport or verify failure: log and leave the prior authority in place (a failed
410 // sync must not drop enforcement — that would fail OPEN). NEVER errors the netmap.
411 // The next netmap update re-triggers a sync attempt.
412 tracing::warn!(error = %e, "TKA sync failed; keeping prior enforcement state");
413 }
414 }
415 }
416
417 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
418 where
419 F: FnOnce(&Node) -> R,
420 {
421 let mut sub = self.self_node.subscribe();
422 let mut shutdown = self.params.env.shutdown.clone();
423
424 async move {
425 tokio::select! {
426 _ = shutdown.wait_for(|x| *x) => {
427 None
428 },
429 node = sub.wait_for(Option::is_some) => {
430 Some(f(node.ok()?.as_ref()?))
431 },
432 }
433 }
434 }
435}
436
437/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
438///
439/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
440/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
441/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
442/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
443/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
444/// for a `watch`/bus subscriber.
445///
446/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
447/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
448/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
449/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
450/// device-state cell in this handler.
451///
452/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
453/// against a plain `watch` channel without standing up the actor.
454fn sticky_update_pop_browser_url(
455 cell: &watch::Sender<Option<url::Url>>,
456 incoming: Option<&url::Url>,
457) {
458 if let Some(url) = incoming {
459 cell.send_if_modified(|current| {
460 if current.as_ref() == Some(url) {
461 false
462 } else {
463 *current = Some(url.clone());
464 true
465 }
466 });
467 }
468}
469
470/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
471///
472/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
473/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
474/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
475/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
476/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
477/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
478///
479/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
480/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
481/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
482/// independent `None`-clear in this bridge could race that and is unnecessary. The
483/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
484/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
485/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
486/// dedupe in the netmap handler.
487///
488/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
489/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
490pub(crate) fn bridge_reauth_url_to_state(
491 state_tx: &watch::Sender<crate::DeviceState>,
492 incoming: Option<&url::Url>,
493) {
494 if let Some(url) = incoming {
495 let next = crate::DeviceState::NeedsLogin(url.clone());
496 state_tx.send_if_modified(|current| {
497 if *current == next {
498 false
499 } else {
500 *current = next.clone();
501 true
502 }
503 });
504 }
505}
506
/// The classification of SELF against the active network lock — the observability analog of Go
/// `tkaFilterNetmapLocked`'s self check (which raises a `LockedOut` health warning).
///
/// Produced by `self_lock_verdict` and turned into a log event by `log_self_lockout`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SelfLockVerdict {
    /// Self carries no key-signature at all (empty). The common "not signed yet" case: the node
    /// simply has not been signed for this lock — not locked out, just unsigned.
    Unsigned,
    /// Self's key-signature is authorized by the active lock; nothing to warn about.
    Authorized,
    /// Self has a key-signature but the lock does NOT authorize it (the message is the verify
    /// error, rendered with `to_string`). The operator-facing `LockedOut` condition: locked peers
    /// will reject this node.
    LockedOut(String),
}
520
521/// Classify a node key + its key-signature against `authority` (pure: verify-and-classify, no
522/// logging, no I/O). Takes only the two fields it needs — not the whole `Node` — so the decision is
523/// unit-testable without constructing a full `Node` or standing up the actor.
524fn self_lock_verdict(
525 node_key: &ts_keys::NodePublicKey,
526 key_signature: &[u8],
527 authority: &ts_tka::Authority,
528) -> SelfLockVerdict {
529 // Mirror the peer path (`peer_tracker` `tka_snapshot_admits`): treat an empty signature as
530 // "unsigned" rather than the `LockedOut` bucket Go's `NodeKeyAuthorized` would put a nil sig in
531 // (it errors at decode). This is a deliberate, narrow divergence from a literal Go port: it
532 // avoids `warn`-spam on a lock that simply has not signed this node yet, and keeps self and peer
533 // classification consistent.
534 if key_signature.is_empty() {
535 return SelfLockVerdict::Unsigned;
536 }
537 match authority.node_key_authorized(&node_key.to_bytes(), key_signature) {
538 Ok(()) => SelfLockVerdict::Authorized,
539 Err(e) => SelfLockVerdict::LockedOut(e.to_string()),
540 }
541}
542
543/// Emit the self-locked-out observability signal (Go `tkaFilterNetmapLocked`'s self check → a
544/// `LockedOut` health warning): classify SELF against the freshly-synced `authority` and log.
545///
546/// This is **observability, not enforcement** — self never enters the peer db, so the lock can never
547/// filter our own node out of the netmap. But a self the lock does not authorize means this node's
548/// key-signature is absent or invalid for the active lock, so it cannot prove itself to locked peers
549/// (they will drop it); surfacing that lets an operator notice and re-sign. A never-signed node
550/// (empty signature) logs at `info`, distinct from a present-but-invalid signature (`warn`), so the
551/// common unsigned case does not spam a warning. This fork has no health subsystem, so the operator
552/// signal is a `tracing` event (its observability channel).
553fn log_self_lockout(self_node: &Node, authority: &ts_tka::Authority) {
554 match self_lock_verdict(&self_node.node_key, &self_node.key_signature, authority) {
555 SelfLockVerdict::Unsigned => tracing::info!(
556 "TKA: this node has no key-signature for the active lock; it cannot prove itself to \
557 locked peers until control signs it (not locked out, just unsigned)"
558 ),
559 SelfLockVerdict::Authorized => {
560 tracing::debug!("TKA: self node-key is authorized by the active lock")
561 }
562 SelfLockVerdict::LockedOut(error) => tracing::warn!(
563 %error,
564 "TKA self locked out: this node's key-signature is not authorized by the active \
565 network lock; locked peers will reject it until control re-signs this node \
566 (Go LockedOut)"
567 ),
568 }
569}
570
571// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
572// those generated fields carry no doc and can't take attributes, so wrap in a module where
573// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
574// are re-exported so callers keep referencing them at `control_runner::<Name>`.
575pub use msg_impl::*;
576
577#[allow(missing_docs)]
578mod msg_impl {
579 use kameo::{message::Context, reply::DelegatedReply};
580
581 use super::*;
582
583 #[kameo::messages]
584 impl ControlRunner {
585 /// Fetch the IPv4 address for this tailscale device.
586 #[message(ctx)]
587 pub fn ipv4(
588 &self,
589 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
590 ) -> DelegatedReply<Option<Ipv4Addr>> {
591 let (deleg, replier) = ctx.reply_sender();
592
593 if let Some(replier) = replier {
594 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
595
596 tokio::spawn(async move {
597 let ip = fut.await;
598 replier.send(ip);
599 });
600 }
601
602 deleg
603 }
604
605 /// Fetch the IPv6 address for this tailscale device.
606 #[message(ctx)]
607 pub fn ipv6(
608 &self,
609 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
610 ) -> DelegatedReply<Option<Ipv6Addr>> {
611 let (deleg, replier) = ctx.reply_sender();
612
613 if let Some(replier) = replier {
614 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
615
616 tokio::spawn(async move {
617 let ip = fut.await;
618 replier.send(ip);
619 });
620 }
621
622 deleg
623 }
624
625 /// Fetch the self node for this tailscale device.
626 #[message(ctx)]
627 pub fn self_node(
628 &self,
629 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
630 ) -> DelegatedReply<Option<Node>> {
631 let (deleg, replier) = ctx.reply_sender();
632
633 if let Some(replier) = replier {
634 let node = self.with_self_node(|node| node.clone());
635
636 tokio::spawn(async move {
637 let node = node.await;
638 replier.send(node)
639 });
640 }
641
642 deleg
643 }
644
645 /// Fetch the current Tailscale SSH policy, if control has pushed one.
646 ///
647 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
648 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
649 /// for a value: an absent policy is a legitimate, immediate answer.
650 #[message]
651 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
652 self.ssh_policy.borrow().clone()
653 }
654
655 /// Fetch the current Tailnet Lock status, if control has pushed one.
656 ///
657 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
658 #[message]
659 pub fn current_tka_status(&self) -> Option<TkaStatus> {
660 self.tka.borrow().clone()
661 }
662
663 /// Sign `node_key` directly with this node's network-lock key and submit the signature to
664 /// control (Go `tka.sign` for the Direct case → `tkaSubmitSignature`).
665 ///
666 /// Builds a `Direct` [`NodeKeySignature`](ts_tka::NodeKeySignature) via
667 /// [`sign_direct`](ts_tka::NodeKeySignature::sign_direct) over this node's inner ed25519
668 /// network-lock signing key, serializes it (raw CBOR), and POSTs it to `/machine/tka/sign`.
669 /// Mirrors `set_dns`/`get_certificate`: clones the control config + node keys into a spawned
670 /// task (delegated reply, so the round-trip doesn't block the mailbox) over a fresh Noise
671 /// channel.
672 ///
673 /// **Posture: this only *submits* a signature to control — it does NOT mutate the local
674 /// [`Authority`](ts_tka::Authority).** The local trusted-key state advances solely through the
675 /// existing verified-sync path (`sync_tka` → `VerifiedAumChain::verify`); a `tka_sign` success
676 /// is acknowledged to the caller, and the resulting AUM is picked up on the next netmap-driven
677 /// sync. Verify-and-log is unchanged.
678 #[message(ctx)]
679 pub fn tka_sign(
680 &self,
681 ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
682 node_key: [u8; 32],
683 ) -> DelegatedReply<Result<(), TkaSyncError>> {
684 let (deleg, replier) = ctx.reply_sender();
685
686 if let Some(replier) = replier {
687 let config = self.params.config.clone();
688 let keys = self.params.env.keys.clone();
689 tokio::spawn(async move {
690 // Sign the node key with our network-lock key, then submit the raw-CBOR NKS.
691 let nks = ts_tka::NodeKeySignature::sign_direct(
692 &node_key,
693 &keys.network_lock_keys.private.signing_key(),
694 );
695 let req = ts_control::TkaSubmitSignatureRequest {
696 // node_key + version are stamped by the RPC client from `keys`.
697 version: Default::default(),
698 node_key: keys.node_keys.public,
699 signature: nks.serialize(),
700 };
701 let result = tka_submit_signature(
702 &config.server_url,
703 &keys,
704 req,
705 config.allow_http_key_fetch,
706 )
707 .await
708 .map(|_response| ());
709 replier.send(result);
710 });
711 }
712
713 deleg
714 }
715
716 /// Disable Tailnet Lock by presenting the disablement secret to control (Go
717 /// `tka.disable` → `/machine/tka/disable`).
718 ///
719 /// Targets the **current** authority head (read from the cached [`TkaStatus`]); the caller
720 /// supplies the `disablement_secret` out of band (it is the operator-held capability that
721 /// authorizes turning the lock off). Mirrors `tka_sign`: clones config + keys into a spawned
722 /// task (delegated reply). Returns [`TkaSyncError::Unsupported`] when there is no known TKA
723 /// head (lock not in use / control hasn't pushed a status), since there is nothing to disable.
724 ///
725 /// **Submit-only, like `tka_sign`:** this POSTs the disablement to control and does NOT mutate
726 /// the local [`Authority`](ts_tka::Authority). Control acts on the disablement; this node
727 /// observes the result through the existing verified-sync path. Verify-and-log unchanged.
728 #[message(ctx)]
729 pub fn tka_disable(
730 &self,
731 ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
732 disablement_secret: Vec<u8>,
733 ) -> DelegatedReply<Result<(), TkaSyncError>> {
734 let (deleg, replier) = ctx.reply_sender();
735
736 if let Some(replier) = replier {
737 // Read the current head from the cached status BEFORE the spawn (can't borrow &self
738 // across the await). No head ⇒ no lock to disable ⇒ Unsupported.
739 let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
740 let config = self.params.config.clone();
741 let keys = self.params.env.keys.clone();
742 tokio::spawn(async move {
743 let result = match head {
744 Some(head) => {
745 let req = ts_control::TkaDisableRequest {
746 // node_key + version are stamped by the RPC client from `keys`.
747 version: Default::default(),
748 node_key: keys.node_keys.public,
749 head,
750 disablement_secret,
751 };
752 tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
753 .await
754 .map(|_response| ())
755 }
756 None => Err(TkaSyncError::Unsupported),
757 };
758 replier.send(result);
759 });
760 }
761
762 deleg
763 }
764
765 /// Initialize Tailnet Lock with this node as the sole initial trusted key, gated by
766 /// `disablement_secret` (Go `LocalClient.NetworkLockInit` — the "lock yourself in" case).
767 ///
768 /// Builds + signs a genesis Checkpoint AUM whose only trusted key is this node's network-lock
769 /// public key (votes 1) and whose single DisablementValue is `disablement_value(secret)`, then
770 /// drives the two-phase init: `tka/init/begin` (submit the genesis) → if control needs no
771 /// further node signatures (`NeedSignatures` empty, the case when this node is the only key) →
772 /// `tka/init/finish` carrying the raw `disablement_secret` as `SupportDisablement`. Mirrors
773 /// `tka_sign`/`tka_disable`: cloned config + keys into a spawned task (delegated reply).
774 ///
775 /// If control returns a non-empty `NeedSignatures` (other nodes must be re-signed under the new
776 /// lock — a multi-node tailnet), this returns [`TkaSyncError::Unsupported`]: re-signing each
777 /// listed node (incl. the Rotation-key case) is a larger flow deferred to a fuller
778 /// `tka_init(keys, secrets)` — the single-node lock-init is the shipped subset.
779 ///
780 /// **Submit-only**, like `tka_sign`/`tka_disable`: this creates the lock at control and does
781 /// NOT seed the local [`Authority`](ts_tka::Authority) — the node picks up the new lock through
782 /// the existing verified netmap-sync (control pushes a `TKAInfo`, `maybe_sync_tka` bootstraps
783 /// the genesis through `VerifiedAumChain::verify`). Verify-and-log posture unchanged.
784 #[message(ctx)]
785 pub fn tka_init(
786 &self,
787 ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
788 disablement_secret: Vec<u8>,
789 ) -> DelegatedReply<Result<(), TkaSyncError>> {
790 let (deleg, replier) = ctx.reply_sender();
791
792 if let Some(replier) = replier {
793 let config = self.params.config.clone();
794 let keys = self.params.env.keys.clone();
795 tokio::spawn(async move {
796 let result = tka_init_run(&config, &keys, disablement_secret).await;
797 replier.send(result);
798 });
799 }
800
801 deleg
802 }
803
804 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
805 ///
806 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
807 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
808 /// does not block waiting for a value).
809 #[message]
810 pub fn cert_domains(&self) -> Vec<String> {
811 self.cert_domains.borrow().clone()
812 }
813
814 /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
815 /// control has sent no DNS config yet. An immediate answer (does not block); the facade
816 /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
817 #[message]
818 pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
819 self.dns_config.borrow().clone()
820 }
821
822 /// The interactive-login / consent URL control last asked this node to open
823 /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
824 /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
825 #[message]
826 pub fn pop_browser_url(&self) -> Option<url::Url> {
827 self.pop_browser_url.borrow().clone()
828 }
829
830 /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
831 ///
832 /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
833 /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
834 /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
835 /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
836 /// late subscriber sees the current URL. The initial value is `None` until control sends one.
837 #[message(derive(Clone))]
838 pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
839 self.pop_browser_url.subscribe()
840 }
841
842 /// The latest network-conditions report (preferred DERP region + per-region latencies). An
843 /// immediate answer (does not block); empty before the first DERP-latency measurement. The
844 /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
845 #[message]
846 pub fn netcheck(&self) -> crate::status::NetcheckReport {
847 self.netcheck.borrow().clone()
848 }
849
850 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
851 ///
852 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
853 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
854 /// for the round-trip.
855 #[message(ctx)]
856 pub fn fetch_id_token(
857 &self,
858 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
859 audience: String,
860 ) -> DelegatedReply<Result<String, IdTokenError>> {
861 let (deleg, replier) = ctx.reply_sender();
862
863 if let Some(replier) = replier {
864 let config = self.params.config.clone();
865 let keys = self.params.env.keys.clone();
866 tokio::spawn(async move {
867 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
868 replier.send(result);
869 });
870 }
871
872 deleg
873 }
874
875 /// Log this node out of the tailnet: deregister it by expiring its current node key.
876 ///
877 /// Mirrors `fetch_id_token`: clones the control config + node keys
878 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
879 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
880 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
881 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
882 /// on-disk node key, so re-registering with the same key is the re-login path.
883 #[message(ctx)]
884 pub fn logout(
885 &self,
886 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
887 ) -> DelegatedReply<Result<(), LogoutError>> {
888 let (deleg, replier) = ctx.reply_sender();
889
890 if let Some(replier) = replier {
891 let config = self.params.config.clone();
892 let keys = self.params.env.keys.clone();
893 tokio::spawn(async move {
894 let result = ts_control::logout(&config, &keys).await;
895 replier.send(result);
896 });
897 }
898
899 deleg
900 }
901
902 /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
903 /// `LocalClient.SetDNS`).
904 ///
905 /// Mirrors `fetch_id_token`: clones the control config + node keys
906 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
907 /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
908 /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
909 /// match, so the surfaced API takes only `name` + `value`.
910 #[message(ctx)]
911 pub fn set_dns(
912 &self,
913 ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
914 name: String,
915 value: String,
916 ) -> DelegatedReply<Result<(), SetDnsError>> {
917 let (deleg, replier) = ctx.reply_sender();
918
919 if let Some(replier) = replier {
920 let config = self.params.config.clone();
921 let keys = self.params.env.keys.clone();
922 tokio::spawn(async move {
923 let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
924 replier.send(result);
925 });
926 }
927
928 deleg
929 }
930 }
931
/// The reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message.
///
/// `Ok` carries the issued `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface), in
/// that order; `Err` carries a [`ts_control::CertError`].
///
/// Aliased so the message's `Context` type stays under clippy's `type_complexity` bar (the
/// nested `Result<(String, String), _>` trips it inline). Only compiled with the `acme`
/// feature.
#[cfg(feature = "acme")]
pub type CertPairReply = Result<(String, String), ts_control::CertError>;
938
// Cert issuance is `acme`-only and therefore gets a `#[kameo::messages]` impl block of its own,
// gated as a whole. A `#[cfg]` placed on a method *inside* a shared messages-impl block is not
// honored by the macro's generated dispatch: it would still emit a `GetCertificate` handler that
// calls a `get_certificate` method the same `#[cfg]` has stripped. Gating the entire block means
// the proc-macro never sees these messages in a non-`acme` build, keeping the default build
// clean.
#[cfg(feature = "acme")]
#[kameo::messages]
impl ControlRunner {
    /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
    /// client-side ACME DNS-01 engine (`acme` feature).
    ///
    /// Same shape as `fetch_id_token`: the control config and node keys are cloned into a
    /// spawned task (delegated reply, so the round-trip doesn't block the mailbox) which runs
    /// `issue_certificate`. That path loads or generates the ACME account key and runs issuance
    /// against Let's Encrypt production, publishing the DNS-01 challenge TXT through the node's
    /// `POST /machine/set-dns` RPC.
    ///
    /// The account key comes from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
    /// present, so the same ACME account persists across renewals. Otherwise an ephemeral key
    /// is generated for this call only (a fresh ACME account each issuance — acceptable for v1;
    /// LE allows it); there is no write-back path here, so persisting a generated key is the
    /// embedder's job. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
    #[message(ctx)]
    pub fn get_certificate(
        &self,
        ctx: &mut Context<
            Self,
            DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
        >,
        name: String,
    ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
        let (delegated, reply_tx) = ctx.reply_sender();

        let Some(reply_tx) = reply_tx else {
            return delegated;
        };

        let control_cfg = self.params.config.clone();
        let node_state = self.params.env.keys.clone();
        tokio::spawn(async move {
            let issued = issue_certificate(&control_cfg, &node_state, &name).await;
            reply_tx.send(issued);
        });

        delegated
    }

    /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
    /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
    /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
    ///
    /// Issuance is identical to [`get_certificate`](Self::get_certificate) (same client-side
    /// ACME DNS-01 flow, same set-dns publish, same account-key handling); only the *shape* of
    /// the result differs: the raw chain + leaf-key PEMs instead of the opaque
    /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
    /// second tuple element and is NEVER logged — the spawned task sends it straight back to
    /// the replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
    #[message(ctx)]
    pub fn get_cert_pair(
        &self,
        ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
        name: String,
    ) -> DelegatedReply<CertPairReply> {
        let (delegated, reply_tx) = ctx.reply_sender();

        let Some(reply_tx) = reply_tx else {
            return delegated;
        };

        let control_cfg = self.params.config.clone();
        let node_state = self.params.env.keys.clone();
        tokio::spawn(async move {
            let pem_pair = issue_cert_pair(&control_cfg, &node_state, &name).await;
            reply_tx.send(pem_pair);
        });

        delegated
    }
}
1014}
1015
1016/// The `tka_init` body (the genesis-build + two-phase init/begin→init/finish choreography),
1017/// factored out of the actor handler so it runs in the spawned task. See [`ControlRunner::tka_init`].
1018///
1019/// "Lock yourself in": the genesis trusts only this node's network-lock key (votes 1) and stores one
1020/// DisablementValue = `disablement_value(secret)`. On a non-empty `NeedSignatures` (multi-node
1021/// tailnet needing re-signs) it returns [`TkaSyncError::Unsupported`] — the single-node subset.
1022async fn tka_init_run(
1023 config: &ts_control::Config,
1024 keys: &ts_keys::NodeState,
1025 disablement_secret: Vec<u8>,
1026) -> Result<(), TkaSyncError> {
1027 // Build the genesis: this node's NL public key as the sole trusted key, one disablement value.
1028 let nl_public = keys.network_lock_keys.public.to_bytes().to_vec();
1029 let genesis_key = ts_tka::AumKey {
1030 kind: ts_tka::KeyKind::Ed25519,
1031 votes: 1,
1032 public: nl_public,
1033 meta: Vec::new(),
1034 };
1035 let dvalue = ts_tka::disablement_value(&disablement_secret).to_vec();
1036 let mut genesis = ts_tka::Aum::new_genesis_checkpoint(vec![genesis_key], vec![dvalue])
1037 // A malformed genesis is a local construction bug, not a transient RPC failure — surface it as a
1038 // coarse internal error rather than NetworkError (which would invite a pointless retry).
1039 .map_err(|_| TkaSyncError::Internal(ts_control::TkaSyncInternalErrorKind::SerDe))?;
1040 genesis.sign(&keys.network_lock_keys.private.signing_key());
1041
1042 // Phase 1: submit the genesis. node_key + version are stamped by the RPC client from `keys`.
1043 let begin_req = ts_control::TkaInitBeginRequest {
1044 version: Default::default(),
1045 node_key: keys.node_keys.public,
1046 genesis_aum: genesis.serialize(),
1047 };
1048 let begin_resp = tka_init_begin(
1049 &config.server_url,
1050 keys,
1051 begin_req,
1052 config.allow_http_key_fetch,
1053 )
1054 .await?;
1055
1056 // Single-node case only: control must need no further node signatures. A non-empty
1057 // NeedSignatures means other nodes must be re-signed under the new lock — deferred.
1058 if !begin_resp.need_signatures.is_empty() {
1059 tracing::warn!(
1060 need = begin_resp.need_signatures.len(),
1061 "tka_init: control requires re-signing other nodes; the multi-node init is not yet \
1062 implemented (single-node lock-init only)"
1063 );
1064 return Err(TkaSyncError::Unsupported);
1065 }
1066
1067 // Phase 2: finish, carrying the raw disablement secret as SupportDisablement (Go sends the raw
1068 // secret here; only the genesis stores its Argon2i hash).
1069 let finish_req = ts_control::TkaInitFinishRequest {
1070 version: Default::default(),
1071 node_key: keys.node_keys.public,
1072 signatures: std::collections::BTreeMap::new(),
1073 support_disablement: disablement_secret,
1074 };
1075 tka_init_finish(
1076 &config.server_url,
1077 keys,
1078 finish_req,
1079 config.allow_http_key_fetch,
1080 )
1081 .await
1082 .map(|_response| ())
1083}
1084
/// Issue a cert for `name` via set-dns DNS-01 and return only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) (the `get_certificate` / `ListenTLS` path).
///
/// A thin projection over [`issue_cert_pair_inner`]: one issuance whose PEMs are dropped because
/// this caller doesn't need the on-disk pair. Account-key handling (load or generate) lives in
/// that shared core.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok(issued.certified)
}
1101
/// Issue a cert for `name` via set-dns DNS-01 and return the **PEM pair**
/// `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt`/`.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// The same single issuance as [`issue_certificate`] (both go through
/// [`issue_cert_pair_inner`]); only the projected result differs. The leaf **private key** PEM
/// is the second element and is NEVER logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
1118
/// Shared issuance core behind [`issue_certificate`] and [`issue_cert_pair`]: obtains the ACME
/// account key, targets Let's Encrypt production, and runs one DNS-01 issuance. The full
/// [`IssuedCert`](ts_control::acme::IssuedCert) is returned so each caller projects out what it
/// needs (one ACME order, two consumers).
///
/// Account key: the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused when
/// present so the same Let's Encrypt account survives renewals. Otherwise an ephemeral per-call
/// key is generated (logged at debug — a new ACME account each issuance, with no write-back).
///
/// The directory is always [`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]; a failure to
/// parse it is reported as `CertError::Acme`. The leaf private key is never logged.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // `generate` yields a tuple; only its first element (the key) is needed here.
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        generated.0
    };

    let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
        .parse()
        .map_err(|e| {
            ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
        })?;

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
1152
1153impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
1154 type Reply = ();
1155
1156 async fn handle(
1157 &mut self,
1158 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
1159 ctx: &mut Context<Self, Self::Reply>,
1160 ) {
1161 match msg {
1162 StreamMessage::Started(_) => {
1163 tracing::trace!("started listening to state updates");
1164 }
1165
1166 StreamMessage::Next(msg) => {
1167 if let Some(node) = msg.node.as_ref() {
1168 // Reflect node-key expiry into the device state: control delivering a self-node
1169 // whose key is in the past means the node must re-authenticate. Otherwise the
1170 // arrival of a fresh self-node confirms we are Running (recovers the state if a
1171 // prior update had flipped it to Expired).
1172 let now_unix = std::time::SystemTime::now()
1173 .duration_since(std::time::UNIX_EPOCH)
1174 .map(|d| d.as_secs() as i64)
1175 .unwrap_or(0);
1176 let next = if node.key_expired_at_unix(now_unix) {
1177 crate::DeviceState::Expired
1178 } else {
1179 crate::DeviceState::Running
1180 };
1181 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
1182 // self-node arrives on every netmap update).
1183 self.params.state_tx.send_if_modified(|s| {
1184 if *s != next {
1185 *s = next.clone();
1186 true
1187 } else {
1188 false
1189 }
1190 });
1191
1192 self.self_node.send_replace(Some(node.clone()));
1193 }
1194
1195 if let Some(policy) = msg.ssh_policy.as_ref() {
1196 self.ssh_policy.send_replace(Some(policy.clone()));
1197 }
1198
1199 if let Some(tka) = msg.tka.as_ref() {
1200 self.tka.send_replace(Some(tka.clone()));
1201 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
1202 }
1203
1204 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
1205 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
1206 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
1207 let cert_domains = msg
1208 .dns_config
1209 .as_ref()
1210 .map(|d| d.cert_domains.clone())
1211 .unwrap_or_default();
1212 self.cert_domains.send_replace(cert_domains);
1213
1214 // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
1215 // `None` when control sent no DNS config on this update — distinct from a present but
1216 // empty config (Go `netmap.NetworkMap.DNS`).
1217 self.dns_config.send_replace(msg.dns_config.clone());
1218
1219 // Track the interactive-login URL for `Device::pop_browser_url` /
1220 // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
1221 // sticky semantics (update only on a new non-empty URL; never reset to `None`).
1222 sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
1223
1224 if let Err(e) = self.params.env.publish(msg).await {
1225 tracing::error!(error = %e, "publishing netmap update");
1226 }
1227 }
1228
1229 StreamMessage::Finished(_) => {
1230 tracing::error!("state update stream terminated")
1231 }
1232 }
1233 }
1234}
1235
/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
/// [`ControlRunner::apply_tka_synced`](ControlRunner).
#[doc(hidden)]
pub struct TkaSynced {
    /// What the sync task produced; the handler passes it unchanged to `apply_tka_synced`.
    // NOTE(review): what `Ok(None)` signifies is not visible from here — confirm in `tka_sync`.
    pub(crate) result:
        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
    /// The [`ControlRunner::tka_generation`] captured when this sync was spawned; the handler
    /// discards the result if it no longer matches (the lock was disabled/re-synced mid-flight).
    pub(crate) generation: u64,
}
1248
1249impl Message<TkaSynced> for ControlRunner {
1250 type Reply = ();
1251
1252 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
1253 self.apply_tka_synced(msg.result, msg.generation).await;
1254 }
1255}
1256
1257impl Message<DerpLatencyMeasurement> for ControlRunner {
1258 type Reply = ();
1259
1260 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
1261 let measurements = msg.measurement.as_ref().clone();
1262
1263 // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
1264 // the same measurements, before the home-region short-circuit below — an empty set still
1265 // yields a (default/empty) report rather than a stale one.
1266 self.netcheck
1267 .send_replace(crate::status::NetcheckReport::from_region_results(
1268 &measurements,
1269 ));
1270
1271 if measurements.is_empty() {
1272 tracing::debug!("derp latency measurements empty");
1273 return;
1274 };
1275
1276 // Record this cycle into the rolling history and evict reports older than the smoothing
1277 // window, then compute each region's `bestRecent` (5-min min). `Instant::now()` is the
1278 // arrival stamp; `best_recent` takes it as a param so the decision stays unit-testable.
1279 let now = Instant::now();
1280 self.derp_report_history
1281 .push((now, msg.measurement.clone()));
1282 self.derp_report_history
1283 .retain(|(stamp, _)| now.saturating_duration_since(*stamp) <= DERP_HISTORY_MAX_AGE);
1284 let best_recent = best_recent(&self.derp_report_history, now, DERP_HISTORY_MAX_AGE);
1285
1286 // Apply selection hysteresis (the pure decision lives in `select_home_region` for testability)
1287 // so jitter between near-equal regions does not flap the home relay. Go's asymmetric
1288 // smoothed-best vs raw-old comparison lives in `select_home_region`; here we just resolve the
1289 // chosen id back to its current-cycle latency for the home-region record + control update.
1290 let selected_id = select_home_region(
1291 self.home_region.map(|(id, _)| id),
1292 &measurements,
1293 &best_recent,
1294 )
1295 .expect("non-empty measurements always yield a selection");
1296 // `select_home_region` only ever returns an id drawn from `measurements`, so this lookup
1297 // always succeeds (same invariant the prior impl relied on when it returned the result by
1298 // reference). We record the current-cycle (raw) latency for the chosen region.
1299 let selected_latency = measurements
1300 .iter()
1301 .find(|m| m.id == selected_id)
1302 .expect("the selected region id is always one of the measurements")
1303 .latency;
1304
1305 let iter = measurements.iter().map(|result| {
1306 (
1307 result.latency_map_key.as_str(),
1308 result.latency.as_secs_f64(),
1309 )
1310 });
1311
1312 if self.home_region.map(|(id, _)| id) != Some(selected_id) {
1313 tracing::debug!(selected_region_id = ?selected_id, "updating home region");
1314 }
1315 self.home_region = Some((selected_id, selected_latency));
1316 self.client.set_home_region(selected_id, iter).await;
1317 }
1318}
1319
/// Smoothing window for per-region DERP latency: 300 seconds, i.e. five minutes (Go `netcheck`
/// `maxAge`). Passed to `best_recent` and used to evict old reports from the history.
const DERP_HISTORY_MAX_AGE: Duration = Duration::from_secs(300);
1322
1323/// Compute each region's `bestRecent` — its **minimum** latency over the reports within
1324/// `max_age` of `now` (Go `addReportHistoryAndSetPreferredDERP`'s `bestRecent` map). Reports older
1325/// than the window are ignored. `now` and `max_age` are parameters (not clock-read) so this is
1326/// deterministically unit-testable. A region absent from every in-window report is absent from the
1327/// result.
1328fn best_recent(
1329 history: &[(Instant, Arc<Vec<ts_netcheck::RegionResult>>)],
1330 now: Instant,
1331 max_age: Duration,
1332) -> HashMap<ts_derp::RegionId, Duration> {
1333 let mut best: HashMap<ts_derp::RegionId, Duration> = HashMap::new();
1334 for (stamp, report) in history {
1335 // Skip reports outside the window. `saturating_duration_since` guards a `stamp` that is
1336 // somehow after `now` (clock skew): age 0, always in-window.
1337 if now.saturating_duration_since(*stamp) > max_age {
1338 continue;
1339 }
1340 for r in report.iter() {
1341 best.entry(r.id)
1342 .and_modify(|d| {
1343 if r.latency < *d {
1344 *d = r.latency;
1345 }
1346 })
1347 .or_insert(r.latency);
1348 }
1349 }
1350 best
1351}
1352
1353/// Choose the DERP home region id, applying Go's selection hysteresis
1354/// (`netcheck.addReportHistoryAndSetPreferredDERP`). Pure so the decision is unit-testable.
1355///
1356/// `measurements` is the current cycle sorted by latency ascending (so `measurements[0]` is the
1357/// raw-current best). `best_recent` is each region's smoothed (5-min-min) latency. Matching Go's
1358/// **asymmetric** comparison exactly: the new best candidate is chosen by the *smoothed* `best_recent`
1359/// latency (`bestAny`), while the old/home region is compared using its *current-cycle* (raw)
1360/// latency (`oldRegionCurLatency`). Smoothing the best damps oscillation of the best region across
1361/// the switch boundary that the raw-vs-raw comparison (the prior impl) would still flap on.
1362///
1363/// Keeps the `current` home region unless the new best is *meaningfully* lower-latency — switching
1364/// only when BOTH the current region's raw latency exceeds the smoothed-best by at least
1365/// `PREFERRED_DERP_ABSOLUTE_DIFF` (10ms) AND the smoothed-best is at most two-thirds of the current
1366/// region's raw latency (a >~33% improvement). On the first selection (`current` is `None`), when the
1367/// smoothed-best already IS the current region, or when the current region dropped out of the
1368/// measurements, returns the best directly. `None` only if `measurements` is empty.
1369fn select_home_region(
1370 current: Option<ts_derp::RegionId>,
1371 measurements: &[ts_netcheck::RegionResult],
1372 best_recent: &HashMap<ts_derp::RegionId, Duration>,
1373) -> Option<ts_derp::RegionId> {
1374 /// Go `netcheck.preferredDERPAbsoluteDiff`.
1375 const PREFERRED_DERP_ABSOLUTE_DIFF: Duration = Duration::from_millis(10);
1376
1377 // The smoothed latency for a region: its `best_recent` if present, else its current sample (a
1378 // region seen only this cycle has a 1-sample history, so its min == its current latency anyway).
1379 let smoothed = |m: &ts_netcheck::RegionResult| -> Duration {
1380 best_recent.get(&m.id).copied().unwrap_or(m.latency)
1381 };
1382
1383 // Pick the best candidate by SMOOTHED latency (Go `bestAny = min over regions of bestRecent`).
1384 // `measurements` is sorted by raw latency, but smoothing can reorder, so scan for the smoothed
1385 // minimum explicitly rather than trusting `measurements[0]`.
1386 let best = measurements.iter().min_by_key(|m| smoothed(m))?;
1387 let best_any = smoothed(best);
1388
1389 let Some(old_id) = current.filter(|id| *id != best.id) else {
1390 // First selection, or the smoothed-best already is the current home region.
1391 return Some(best.id);
1392 };
1393
1394 // Compare against the old region's CURRENT (raw) latency this cycle, if it is still present —
1395 // Go's `oldRegionCurLatency`, deliberately unsmoothed (the asymmetry).
1396 match measurements.iter().find(|m| m.id == old_id) {
1397 Some(old) => {
1398 // Byte-faithful to Go: `oldRegionCurLatency - bestAny < 10ms || bestAny >
1399 // oldRegionCurLatency/3*2`. `saturating_sub` matches Go's signed subtraction for the
1400 // `< 10ms` test (when `old < best_any` Go is negative → `< 10ms` true; saturating_sub
1401 // floors to 0 → also true). The two-thirds rule uses INTEGER `Duration` division
1402 // `(old/3)*2` — NOT float `* 2.0/3.0`: Go computes the threshold in integer nanoseconds
1403 // (`oldNs/3` truncates), and float arithmetic diverges from it at the exact 2/3 boundary
1404 // with whole-millisecond inputs (e.g. old=36ms, best=24ms: Go's `24ms > 24ms` is false →
1405 // switch, but float `0.024 > 0.0239999997` is true → keep). `Duration / u32` truncates
1406 // nanos exactly like Go and `* u32` is exact, reproducing `oldRegionCurLatency/3*2`.
1407 let keep_old = old.latency.saturating_sub(best_any) < PREFERRED_DERP_ABSOLUTE_DIFF
1408 || best_any > (old.latency / 3) * 2;
1409 Some(if keep_old { old.id } else { best.id })
1410 }
1411 // The current region is no longer reachable this cycle: take the new best.
1412 None => Some(best.id),
1413 }
1414}
1415
1416impl Message<EndpointAdvertisement> for ControlRunner {
1417 type Reply = ();
1418
1419 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
1420 let endpoints: Vec<Endpoint> = msg
1421 .endpoints
1422 .iter()
1423 .map(|ep| Endpoint {
1424 endpoint: ep.addr,
1425 ty: match ep.ty {
1426 SelfEndpointType::Local => EndpointType::Local,
1427 SelfEndpointType::Stun => EndpointType::Stun,
1428 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
1429 },
1430 })
1431 .collect();
1432
1433 tracing::debug!(
1434 n_endpoints = endpoints.len(),
1435 "advertising endpoints to control"
1436 );
1437
1438 self.client.set_endpoints(endpoints).await;
1439 }
1440}
1441
1442/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
1443/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
1444/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
1445/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
1446#[derive(Debug)]
1447pub struct SetAdvertiseRoutes {
1448 /// The prefixes to advertise to control (already filtered to the final set).
1449 pub routes: Vec<ipnet::IpNet>,
1450}
1451
1452impl Message<SetAdvertiseRoutes> for ControlRunner {
1453 type Reply = ();
1454
1455 async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
1456 tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
1457 self.client.set_routable_ips(msg.routes).await;
1458 }
1459}
1460
1461/// Update this node's `Hostinfo.Hostname` at control — the wire half of a runtime
1462/// [`Runtime::set_hostname`](crate::Runtime::set_hostname). A direct `ask` from the runtime, so the
1463/// change reaches the live map-poll client.
1464#[derive(Debug)]
1465pub struct SetHostname {
1466 /// The new hostname to report to control.
1467 pub hostname: String,
1468}
1469
1470impl Message<SetHostname> for ControlRunner {
1471 type Reply = ();
1472
1473 async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
1474 tracing::debug!("updating hostname at control");
1475 self.client.set_hostname(msg.hostname).await;
1476 }
1477}
1478
#[cfg(test)]
mod reauth_bridge_tests {
    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    /// Parse a URL literal for a test, panicking on a malformed one.
    fn url(raw: &str) -> url::Url {
        raw.parse::<url::Url>().unwrap()
    }

    /// A device-state cell that starts out `Running`, as it would mid-session.
    fn running_cell() -> (watch::Sender<DeviceState>, watch::Receiver<DeviceState>) {
        watch::channel(DeviceState::Running)
    }

    /// Core of the fix: a surfaced re-auth URL (a mid-session `MachineNotAuthorized`, forwarded by
    /// the control client as `Some(url)`) lands in the cell as `DeviceState::NeedsLogin(url)`, the
    /// state the IPN bus turns into `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let (state_tx, state_rx) = running_cell();
        let auth_url = url("https://login.example/auth");

        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth_url));
    }

    /// A `None` is ignored by the bridge: recovery to `Running` belongs to the netmap self-node
    /// handler, so the cell must be left exactly as it was.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (state_tx, state_rx) = running_cell();

        bridge_reauth_url_to_state(&state_tx, None);

        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }

    /// Surfacing the same URL again across retries must not wake subscribers a second time (the
    /// bridge dedupes against the cell's current value), so a stuck re-auth does not thrash them.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let (state_tx, mut state_rx) = running_cell();
        let auth_url = url("https://login.example/auth");

        // First delivery is a real change.
        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        let fired_first = state_rx.has_changed().unwrap();
        assert!(fired_first, "first NeedsLogin fires");
        state_rx.mark_unchanged();

        // Second delivery of the identical URL is deduped.
        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        let fired_again = state_rx.has_changed().unwrap();
        assert!(
            !fired_again,
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// The dedupe follows the current value rather than pinning the first URL: a genuinely
    /// different URL replaces the earlier one.
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let (state_tx, state_rx) = running_cell();
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");

        for surfaced in [&first, &second] {
            bridge_reauth_url_to_state(&state_tx, Some(surfaced));
        }

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(second));
    }

    /// The *clear* contract end to end: once the bridge has set `NeedsLogin`, the netmap self-node
    /// path flips the cell back to `Running`. That path is modeled here by a direct
    /// `send_replace(Running)`, the transition the `StreamMessage::Next` handler performs on the
    /// next good self-node. This pins that the bridge needs no `None`-clear arm of its own.
    #[test]
    fn running_netmap_clears_needs_login() {
        let (state_tx, state_rx) = running_cell();
        let auth_url = url("https://login.example/auth");

        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth_url));

        // Recovery transition owned by the self-node handler (next good netmap → Running).
        state_tx.send_replace(DeviceState::Running);
        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }
}
1562
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    /// Parse a URL literal for a test, panicking on a malformed one.
    fn url(s: &str) -> url::Url {
        s.parse().unwrap()
    }

    /// A non-empty URL publishes to the cell.
    #[test]
    fn non_empty_url_publishes() {
        let (tx, rx) = watch::channel(None);
        let u = url("https://login.example/consent");
        sticky_update_pop_browser_url(&tx, Some(&u));
        assert_eq!(*rx.borrow(), Some(u));
    }

    /// An absent (`None`) update — the common netmap tick — must NOT reset the cell. This is the
    /// regression guard for the thrash bug (a reset-every-tick would coalesce the URL away on the
    /// bus).
    #[test]
    fn absent_update_does_not_reset() {
        let u = url("https://login.example/consent");
        let (tx, rx) = watch::channel(Some(u.clone()));
        // Simulate many empty netmap updates.
        for _ in 0..5 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "empty updates must not clear the URL"
        );
    }

    /// The same URL repeated does not re-fire the watch (in-place dedupe via `send_if_modified`),
    /// so a subscriber isn't woken spuriously. Proven by the receiver not being marked changed.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&u)); // first: fires
        assert!(rx.has_changed().unwrap(), "first non-empty URL fires");
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&u)); // same: deduped
        assert!(
            !rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A genuinely new URL after a prior one fires again (sticky but tracks changes).
    #[test]
    fn new_url_after_prior_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(*rx.borrow(), Some(b));
    }

    /// The realistic session sequence: a URL stays sticky through a run of `None` ticks, and a
    /// *different* URL after that gap still fires. Chains the legs the other tests cover in
    /// isolation (the actual control cadence is "URL, then many empty updates, then maybe a new
    /// URL").
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        for _ in 0..3 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(*rx.borrow(), Some(a), "stayed sticky through the None gap");
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(
            *rx.borrow(),
            Some(b),
            "a new URL after a None gap still fires"
        );
    }

    /// Returning to a previously-seen URL (A → B → A) re-fires: the dedupe is against the cell's
    /// *current* value, not a full history, so A after B is a genuine change.
    #[test]
    fn returning_to_prior_url_refires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&a)); // back to A: differs from current (B) → fires
        assert!(
            rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*rx.borrow(), Some(a));
    }

    /// End-to-end de-thrash: feed a realistic netmap cadence (empty, empty, URL, empty, empty)
    /// through the producer into a cell, then drain the receiver the way a subscriber would and
    /// count the changes it sees. The whole point of the fix is that exactly ONE change survives
    /// the surrounding `None` thrash — the pre-fix code (`send_replace` every tick) would have
    /// woken the subscriber on every empty tick and coalesced the URL away.
    ///
    /// The receiver is polled synchronously (`has_changed` + `borrow_and_update`), so nothing is
    /// awaited and the test is a plain `#[test]` rather than an async one on a runtime.
    #[test]
    fn end_to_end_one_change_survives_none_thrash() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        // The cadence control actually sends: mostly-empty MapResponses with one carrying the URL.
        let cadence = [None, None, Some(&u), None, None];
        for incoming in cadence {
            sticky_update_pop_browser_url(&tx, incoming);
        }
        // A subscriber sees exactly one change, and it carries the URL (not a coalesced `None`).
        // `borrow_and_update` marks the value seen, so the loop ends once the cell is drained.
        let mut changes = 0;
        while rx.has_changed().unwrap() {
            let v = rx.borrow_and_update().clone();
            changes += 1;
            assert_eq!(v, Some(u.clone()), "the surviving change carries the URL");
        }
        assert_eq!(changes, 1, "exactly one change survives the None thrash");
    }
}
1690
#[cfg(test)]
mod home_region_hysteresis_tests {
    use core::time::Duration;
    use std::{collections::HashMap, sync::Arc, time::Instant};

    use ts_derp::RegionId;
    use ts_netcheck::RegionResult;

    use super::{DERP_HISTORY_MAX_AGE, best_recent, select_home_region};

    /// Build a measurement for region `id` with the given raw latency; the remaining fields are
    /// placeholders these tests never inspect.
    fn region(id: u32, latency_ms: u64) -> RegionResult {
        RegionResult {
            latency: Duration::from_millis(latency_ms),
            id: rid(id),
            latency_map_key: format!("region-{id}"),
            connected_remote: "127.0.0.1:0".parse().unwrap(),
        }
    }

    /// Build a `RegionId` from a non-zero integer (panics on 0).
    fn rid(id: u32) -> RegionId {
        RegionId(core::num::NonZeroU32::new(id).unwrap())
    }

    /// Call `select_home_region` with NO smoothing history — `best_recent` empty, so each region's
    /// smoothed latency falls back to its current sample, reproducing the original raw-vs-raw
    /// hysteresis these tests pin. (The smoothing-specific tests below pass a populated map.)
    fn sel(current: Option<RegionId>, m: &[RegionResult]) -> Option<RegionId> {
        select_home_region(current, m, &HashMap::new())
    }

    /// Empty measurements yield no selection.
    #[test]
    fn empty_measurements_select_none() {
        assert!(sel(Some(rid(1)), &[]).is_none());
        assert!(sel(None, &[]).is_none());
    }

    /// First selection (no current home region) takes the best (lowest-latency) region directly.
    #[test]
    fn first_selection_takes_best() {
        let m = [region(1, 20), region(2, 50)];
        assert_eq!(sel(None, &m).unwrap(), rid(1));
    }

    /// Jitter within the 10ms absolute-diff band keeps the current region (no flap).
    /// Current=region 2 at 25ms; new best=region 1 at 20ms (only 5ms better) -> keep region 2.
    #[test]
    fn keeps_current_when_within_absolute_diff() {
        let m = [region(1, 20), region(2, 25)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(2),
            "a 5ms improvement (< 10ms) must not flap the home region"
        );
    }

    /// A meaningful improvement (>10ms AND best <= 2/3 of current) switches. Current=region 2 at
    /// 100ms; new best=region 1 at 20ms -> switch to region 1.
    #[test]
    fn switches_on_meaningful_improvement() {
        let m = [region(1, 20), region(2, 100)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(1),
            "a large improvement must switch the home region"
        );
    }

    /// The two-thirds rule: even past the 10ms absolute diff, an improvement that does not beat
    /// 2/3 of the current latency keeps the current region. current=60ms, best=45ms: diff=15ms
    /// (>10ms, so the absolute test alone would switch), but 45 > 60*2/3=40, so keep.
    #[test]
    fn keeps_current_when_two_thirds_rule_not_met() {
        let m = [region(1, 45), region(2, 60)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(2),
            "best (45ms) is not <= 2/3 of current (40ms), so keep current despite >10ms diff"
        );
    }

    /// When the current home region is no longer present in the measurements, take the new best.
    #[test]
    fn switches_when_current_region_absent() {
        let m = [region(1, 20), region(3, 25)];
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(1),
            "a current region absent from the measurements falls through to the best"
        );
    }

    /// When the best already IS the current home region, it is kept (no spurious change).
    #[test]
    fn keeps_current_when_it_is_already_best() {
        let m = [region(2, 20), region(1, 50)];
        assert_eq!(sel(Some(rid(2)), &m).unwrap(), rid(2));
    }

    /// `best_recent` is each region's MINIMUM latency over the in-window reports; a report older
    /// than `max_age` is excluded.
    #[test]
    fn best_recent_is_min_over_window_and_evicts_aged() {
        // `best_recent` only compares stamps against the `now` it is given, so anchor `now` an
        // hour ahead of the real clock: the subtractions below then stay representable even where
        // the monotonic clock is close to its origin (`Instant - Duration` may panic otherwise).
        let now = Instant::now() + Duration::from_secs(60 * 60);
        // Two in-window reports for region 1 (50ms then 20ms) → min 20ms; region 2 once at 30ms.
        // One aged report (region 1 at 5ms) outside the window must be ignored.
        let history = vec![
            (
                now - Duration::from_secs(10 * 60), // aged out (> 5min)
                Arc::new(vec![region(1, 5)]),
            ),
            (
                now - Duration::from_secs(60),
                Arc::new(vec![region(1, 50), region(2, 30)]),
            ),
            (now, Arc::new(vec![region(1, 20)])),
        ];
        let br = best_recent(&history, now, DERP_HISTORY_MAX_AGE);
        assert_eq!(
            br.get(&rid(1)).copied(),
            Some(Duration::from_millis(20)),
            "region 1 min over the window is 20ms (the aged 5ms is excluded)"
        );
        assert_eq!(br.get(&rid(2)).copied(), Some(Duration::from_millis(30)));
    }

    /// The asymmetric comparison: the new best is chosen by its SMOOTHED (best_recent) latency
    /// while the old region is compared on its RAW current latency. A best region whose CURRENT
    /// sample looks much better but whose 5-min MIN is only marginally better must NOT flap the
    /// home region — exactly the oscillation the raw-vs-raw comparison would have switched on.
    #[test]
    fn smoothed_best_damps_oscillation_across_boundary() {
        // Current home = region 2, raw 60ms this cycle. Region 1's CURRENT sample is 20ms (a >2/3,
        // >10ms improvement → raw-vs-raw would SWITCH), but its 5-min MIN (best_recent) is 50ms
        // (it oscillates). Smoothed-best 50ms vs raw-old 60ms: diff 10ms is NOT < 10ms, but
        // 50 > 60*2/3=40 → keepOld. So we KEEP region 2, where the raw comparison would have
        // flapped.
        let m = [region(1, 20), region(2, 60)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(50)); // smoothed best is worse than its raw sample
        br.insert(rid(2), Duration::from_millis(60));
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &br).unwrap(),
            rid(2),
            "a best region whose 5-min min is only marginally better must not flap the home region"
        );

        // Sanity: with NO smoothing (raw 20ms best), the same inputs WOULD switch — proving the
        // smoothing is what holds it.
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &HashMap::new()).unwrap(),
            rid(1),
            "raw-vs-raw (no smoothing) switches on the 20ms-vs-60ms current samples"
        );
    }

    /// Smoothing can reorder which region is "best": `measurements` is sorted by raw latency, but
    /// the smoothed minimum may favor a different region. `select_home_region` must pick by
    /// smoothed latency, not blindly trust `measurements[0]`.
    #[test]
    fn smoothed_best_may_differ_from_raw_first() {
        // Raw order: region 1 (10ms) is first. But region 2's 5-min min is 5ms while region 1's is
        // 40ms (region 1's 10ms was a lucky low sample). Smoothed-best is region 2. First
        // selection.
        let m = [region(1, 10), region(2, 12)];
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(40));
        br.insert(rid(2), Duration::from_millis(5));
        assert_eq!(
            select_home_region(None, &m, &br).unwrap(),
            rid(2),
            "the smoothed-best region wins even when it is not the raw-latency first"
        );
    }

    /// Byte-faithful integer two-thirds boundary (the float-vs-integer divergence): at exactly
    /// `best == old * 2/3` (old=36ms, best=24ms), Go's integer `bestAny > old/3*2` = `24ms > 24ms`
    /// is FALSE, so it does NOT keep on the 2/3 arm; and `cond_a` `36-24=12ms < 10ms` is also
    /// false, so Go SWITCHES. A float `0.024 > 0.036*2.0/3.0 = 0.0239999997` would wrongly KEEP.
    /// This test pins the integer math: it must switch to the best.
    #[test]
    fn two_thirds_boundary_is_integer_not_float() {
        let m = [region(1, 24), region(2, 36)];
        // No smoothing (raw == smoothed): isolates the 2/3 arithmetic at the exact boundary.
        assert_eq!(
            sel(Some(rid(2)), &m).unwrap(),
            rid(1),
            "at best == old*2/3 the integer rule does NOT keep (Go switches); a float rule would keep"
        );
    }

    /// The `cond_a` (absolute-diff) arm via `saturating_sub`: when the old region's RAW current
    /// latency is FASTER than the smoothed-best (old=20ms raw, smoothed-best=50ms), `old - best_any`
    /// underflows. Go's signed subtraction is negative (`< 10ms` → keepOld); `saturating_sub`
    /// floors to 0 (`< 10ms` → keepOld) — same outcome. The old region is kept. A plain `Duration`
    /// subtraction would panic on these inputs, so this test pins the saturating form.
    #[test]
    fn old_faster_than_smoothed_best_keeps_via_absolute_diff() {
        // Current home = region 2, raw 20ms this cycle; region 1 is the raw-best at 15ms.
        let m = [region(1, 15), region(2, 20)];

        // Underflow case. Both smoothed values sit ABOVE the old region's raw sample: region 1 →
        // 50ms, region 2 → 60ms. The smoothed-best is therefore region 1 (best != old, so the
        // comparison arm runs) with best_any = 50ms, while old raw = 20ms: 20 - 50 underflows.
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(50));
        br.insert(rid(2), Duration::from_millis(60));
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &br).unwrap(),
            rid(2),
            "old raw (20ms) faster than smoothed-best (50ms) must keep the old region, not underflow"
        );

        // Companion case without underflow: smoothed-best = region 1 at 18ms, old raw = 20ms.
        // 20 - 18 = 2ms < 10ms → keepOld through the same arm.
        let mut br = HashMap::new();
        br.insert(rid(1), Duration::from_millis(18)); // smoothed-best = region 1 at 18ms
        br.insert(rid(2), Duration::from_millis(25)); // region 2 smoothed worse than its raw 20ms
        assert_eq!(
            select_home_region(Some(rid(2)), &m, &br).unwrap(),
            rid(2),
            "old raw (20ms) within 10ms of smoothed-best (18ms) keeps via the absolute-diff arm"
        );
    }
}
1903
#[cfg(test)]
mod self_lockout_tests {
    use ts_tka::{AumHash, Authority, State};

    use super::{SelfLockVerdict, self_lock_verdict};

    /// A freshly generated node public key standing in for "self".
    fn node_key() -> ts_keys::NodePublicKey {
        let private = ts_keys::NodePrivateKey::random();
        private.public_key()
    }

    /// An authority built from an all-zero head hash and the default state.
    fn blank_authority() -> Authority {
        Authority::from_state(AumHash([0; 32]), State::default())
    }

    /// No key-signature at all means "not signed yet": the verdict is `Unsigned`, never a lockout,
    /// so a tailnet that simply has not signed this node does not spam a `warn`.
    #[test]
    fn empty_signature_is_unsigned_not_locked_out() {
        let authority = blank_authority();
        let verdict = self_lock_verdict(&node_key(), &[], &authority);
        assert_eq!(verdict, SelfLockVerdict::Unsigned);
    }

    /// A non-empty key-signature that does not authorize self is the operator-facing `LockedOut`
    /// condition, and the verdict carries the verify error string for the log. The blob here is
    /// non-empty (so verification is attempted instead of short-circuiting to `Unsigned`) but is
    /// not a valid NodeKeySignature CBOR (`0x01` decodes as a bare uint with trailing bytes), so
    /// `node_key_authorized` returns a `Decode` error → `LockedOut`. The cryptographic-rejection
    /// arms (`UntrustedKey` / `BadSignature` for a well-formed-but-untrusted NKS) are covered by
    /// `ts_tka`'s own `node_key_authorized` tests; this only needs to prove the runtime classifier
    /// routes a verify `Err` to `LockedOut`.
    #[test]
    fn unverifiable_signature_is_locked_out() {
        let authority = blank_authority();
        let malformed_signature: [u8; 3] = [0x01, 0x02, 0x03];

        let verdict = self_lock_verdict(&node_key(), &malformed_signature, &authority);

        let locked_out = matches!(verdict, SelfLockVerdict::LockedOut(_));
        assert!(
            locked_out,
            "a signature the lock cannot authorize must classify as LockedOut, got {verdict:?}"
        );
    }
}
1942}