ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
17 tka_submit_signature,
18};
19use ts_magicsock::SelfEndpointType;
20
21use crate::{
22 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
23 direct::EndpointAdvertisement,
24};
25
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus. It also
/// owns a set of `watch` cells (self node, SSH policy, Tailnet Lock status, DNS config, …) that
/// the message handlers read to answer queries.
pub struct ControlRunner {
    /// The control client returned by [`AsyncControlClient::connect`] during `on_start`.
    client: AsyncControlClient,
    /// The startup arguments, retained so message handlers can clone the control config and node
    /// keys for their own RPCs and reach the shared [`crate::Env`].
    params: Params,

    /// Latest self node. Starts out `None` (`Default`); `with_self_node` waits on this cell until
    /// it holds `Some`.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message).
    ///
    /// Note that this is also `None` while a sync is in flight: `maybe_sync_tka` moves the current
    /// state into the spawned sync task.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
    /// gated decision).
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
    /// Background task that bridges the control client's mid-session re-auth URL cell onto
    /// [`Self::params`]'s device-state cell (sets [`DeviceState::NeedsLogin`] when control returns
    /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
    /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
    reauth_bridge: tokio::task::JoinHandle<()>,
}
84
/// Tears down the background re-auth bridge task together with the actor.
impl Drop for ControlRunner {
    fn drop(&mut self) {
        // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
        // The task was spawned with `tokio::spawn` in `on_start`; its `JoinHandle` is kept in
        // `reauth_bridge` for exactly this abort.
        self.reauth_bridge.abort();
    }
}
91
/// Control runner args: the [`kameo::Actor::Args`] handed to [`ControlRunner`]'s `on_start`.
///
/// The whole value is retained on the actor after startup (as its `params` field).
pub struct Params {
    /// Control config.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Passed to control as `Option<&str>` when checking auth and connecting.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
109
/// Error type of the [`ControlRunner`] actor (its [`kameo::Actor::Error`]), returned from
/// `on_start` when registration or bringing up the control session fails.
///
/// Both variants are `transparent`: they display and source-chain exactly like the wrapped error.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client (`ts_control::Error`).
    #[error(transparent)]
    Control(#[from] ControlError),

    /// An error originating in this crate (for example from the `Env` subscribe calls).
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
119
120impl kameo::Actor for ControlRunner {
121 type Args = Params;
122 type Error = ControlRunnerError;
123
124 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
125 loop {
126 match AsyncControlClient::check_auth(
127 ¶ms.config,
128 ¶ms.env.keys,
129 params.auth_key.as_deref(),
130 )
131 .await
132 {
133 Ok(()) => break,
134 Err(ControlError::MachineNotAuthorized(u)) => {
135 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
136 // Surface "interactive login required" so a watcher / `wait_until_running` can
137 // tell the user to authorize, instead of seeing an opaque timeout. Registration
138 // keeps retrying (transient), so this is not a terminal `Failed`.
139 params
140 .state_tx
141 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
142 tokio::time::sleep(Duration::from_secs(5)).await;
143 }
144 Err(e) => {
145 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
146 // specific reason control gave AND publish it as a typed `Failed` state so
147 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
148 // of the opaque `Internal(Actor)` the caller would otherwise see once the
149 // stopped actor is next asked. Publishing before `return Err` is why the state
150 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
151 let reason = crate::RegistrationError::from(&e);
152 tracing::error!(error = %e, "registration failed; control runner stopping");
153 params
154 .state_tx
155 .send_replace(crate::DeviceState::Failed(reason));
156 return Err(e.into());
157 }
158 }
159 }
160 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
161 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
162 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
163 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
164 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
165 // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
166 // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
167 // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
168 // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
169 let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
170
171 let bring_up = async {
172 let (client, stream) = AsyncControlClient::connect(
173 ¶ms.config,
174 ¶ms.env.keys,
175 params.auth_key.as_deref(),
176 auth_url_tx,
177 )
178 .await?;
179
180 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
181
182 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
183 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
184 slf.attach_stream(stream.boxed(), (), ());
185 Ok::<_, ControlRunnerError>(client)
186 };
187
188 let client = match bring_up.await {
189 Ok(client) => client,
190 Err(e) => {
191 tracing::error!(error = %e, "bringing up the control session failed");
192 // The control session never came up; surface it as a transient registration
193 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
194 // stuck at `Connecting`.
195 params.state_tx.send_replace(crate::DeviceState::Failed(
196 crate::RegistrationError::NetworkUnreachable,
197 ));
198 return Err(e);
199 }
200 };
201
202 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
203 // current (and flips to `Expired` if the self-node's key lapses).
204 params.state_tx.send_replace(crate::DeviceState::Running);
205
206 // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
207 // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
208 // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
209 // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
210 // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
211 // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
212 // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
213 let reauth_bridge = {
214 let state_tx = params.state_tx.clone();
215 let mut auth_url_rx = auth_url_rx;
216 tokio::spawn(async move {
217 while auth_url_rx.changed().await.is_ok() {
218 let url = auth_url_rx.borrow_and_update().clone();
219 bridge_reauth_url_to_state(&state_tx, url.as_ref());
220 }
221 })
222 };
223
224 Ok(Self {
225 client,
226 params,
227 self_node: Default::default(),
228 ssh_policy: Default::default(),
229 tka: Default::default(),
230 tka_synced: None,
231 tka_authority: Default::default(),
232 tka_syncing: false,
233 cert_domains: Default::default(),
234 dns_config: Default::default(),
235 pop_browser_url: Default::default(),
236 netcheck: Default::default(),
237 reauth_bridge,
238 })
239 }
240}
241
242impl ControlRunner {
243 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
244 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
245 /// round-trip). The result returns via the [`TkaSynced`] self-message.
246 ///
247 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
248 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
249 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
250 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
251 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
252 if !tka.is_enabled() {
253 // Lock disabled (or never enabled): drop any synced state and stop publishing an
254 // Authority. Never an error; peers are unaffected.
255 if self.tka_synced.is_some() {
256 self.tka_synced = None;
257 self.tka_authority.send_replace(None);
258 }
259 return;
260 }
261 if self.tka_syncing {
262 return; // a sync is already in flight; the next netmap will re-trigger if still stale
263 }
264 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
265 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
266 // fail-closes harmlessly).
267 if let Some(synced) = &self.tka_synced
268 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
269 && synced.authority.head_matches(&control_head)
270 {
271 return;
272 }
273
274 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
275 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
276 // `tka_syncing` so we don't spawn a second concurrent sync.
277 self.tka_syncing = true;
278 let current = self.tka_synced.take();
279 let config = self.params.config.clone();
280 let keys = self.params.env.keys.clone();
281 tokio::spawn(async move {
282 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
283 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
284 // not allowed). A send failure just means the actor is gone — nothing to do.
285 if let Err(e) = self_ref.tell(TkaSynced { result }).await {
286 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
287 }
288 });
289 }
290
291 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
292 /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
293 /// clears the in-flight guard.
294 async fn apply_tka_synced(
295 &mut self,
296 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
297 ) {
298 self.tka_syncing = false;
299 match result {
300 Ok(Some(synced)) => {
301 tracing::info!(
302 head = %synced.authority.head().to_base32(),
303 "TKA sync succeeded; publishing verified Authority (observe-only)"
304 );
305 self.tka_authority
306 .send_replace(Some(synced.authority.clone()));
307 // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
308 // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
309 if let Err(e) = self
310 .params
311 .env
312 .publish(crate::peer_tracker::TkaAuthorityUpdate(
313 synced.authority.clone(),
314 ))
315 .await
316 {
317 tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
318 }
319 self.tka_synced = Some(synced);
320 }
321 Ok(None) => {
322 // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
323 tracing::debug!("TKA sync: control reported no lock for this node (inert)");
324 }
325 Err(e) => {
326 // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
327 // peer. The next netmap update re-triggers a sync attempt.
328 tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
329 }
330 }
331 }
332
333 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
334 where
335 F: FnOnce(&Node) -> R,
336 {
337 let mut sub = self.self_node.subscribe();
338 let mut shutdown = self.params.env.shutdown.clone();
339
340 async move {
341 tokio::select! {
342 _ = shutdown.wait_for(|x| *x) => {
343 None
344 },
345 node = sub.wait_for(Option::is_some) => {
346 Some(f(node.ok()?.as_ref()?))
347 },
348 }
349 }
350 }
351}
352
353/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
354///
355/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
356/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
357/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
358/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
359/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
360/// for a `watch`/bus subscriber.
361///
362/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
363/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
364/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
365/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
366/// device-state cell in this handler.
367///
368/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
369/// against a plain `watch` channel without standing up the actor.
370fn sticky_update_pop_browser_url(
371 cell: &watch::Sender<Option<url::Url>>,
372 incoming: Option<&url::Url>,
373) {
374 if let Some(url) = incoming {
375 cell.send_if_modified(|current| {
376 if current.as_ref() == Some(url) {
377 false
378 } else {
379 *current = Some(url.clone());
380 true
381 }
382 });
383 }
384}
385
386/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
387///
388/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
389/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
390/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
391/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
392/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
393/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
394///
395/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
396/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
397/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
398/// independent `None`-clear in this bridge could race that and is unnecessary. The
399/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
400/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
401/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
402/// dedupe in the netmap handler.
403///
404/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
405/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
406pub(crate) fn bridge_reauth_url_to_state(
407 state_tx: &watch::Sender<crate::DeviceState>,
408 incoming: Option<&url::Url>,
409) {
410 if let Some(url) = incoming {
411 let next = crate::DeviceState::NeedsLogin(url.clone());
412 state_tx.send_if_modified(|current| {
413 if *current == next {
414 false
415 } else {
416 *current = next.clone();
417 true
418 }
419 });
420 }
421}
422
423// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
424// those generated fields carry no doc and can't take attributes, so wrap in a module where
425// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
426// are re-exported so callers keep referencing them at `control_runner::<Name>`.
427pub use msg_impl::*;
428
429#[allow(missing_docs)]
430mod msg_impl {
431 use kameo::{message::Context, reply::DelegatedReply};
432
433 use super::*;
434
435 #[kameo::messages]
436 impl ControlRunner {
437 /// Fetch the IPv4 address for this tailscale device.
438 #[message(ctx)]
439 pub fn ipv4(
440 &self,
441 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
442 ) -> DelegatedReply<Option<Ipv4Addr>> {
443 let (deleg, replier) = ctx.reply_sender();
444
445 if let Some(replier) = replier {
446 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
447
448 tokio::spawn(async move {
449 let ip = fut.await;
450 replier.send(ip);
451 });
452 }
453
454 deleg
455 }
456
457 /// Fetch the IPv6 address for this tailscale device.
458 #[message(ctx)]
459 pub fn ipv6(
460 &self,
461 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
462 ) -> DelegatedReply<Option<Ipv6Addr>> {
463 let (deleg, replier) = ctx.reply_sender();
464
465 if let Some(replier) = replier {
466 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
467
468 tokio::spawn(async move {
469 let ip = fut.await;
470 replier.send(ip);
471 });
472 }
473
474 deleg
475 }
476
477 /// Fetch the self node for this tailscale device.
478 #[message(ctx)]
479 pub fn self_node(
480 &self,
481 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
482 ) -> DelegatedReply<Option<Node>> {
483 let (deleg, replier) = ctx.reply_sender();
484
485 if let Some(replier) = replier {
486 let node = self.with_self_node(|node| node.clone());
487
488 tokio::spawn(async move {
489 let node = node.await;
490 replier.send(node)
491 });
492 }
493
494 deleg
495 }
496
497 /// Fetch the current Tailscale SSH policy, if control has pushed one.
498 ///
499 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
500 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
501 /// for a value: an absent policy is a legitimate, immediate answer.
502 #[message]
503 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
504 self.ssh_policy.borrow().clone()
505 }
506
507 /// Fetch the current Tailnet Lock status, if control has pushed one.
508 ///
509 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
510 #[message]
511 pub fn current_tka_status(&self) -> Option<TkaStatus> {
512 self.tka.borrow().clone()
513 }
514
515 /// Sign `node_key` directly with this node's network-lock key and submit the signature to
516 /// control (Go `tka.sign` for the Direct case → `tkaSubmitSignature`).
517 ///
518 /// Builds a `Direct` [`NodeKeySignature`](ts_tka::NodeKeySignature) via
519 /// [`sign_direct`](ts_tka::NodeKeySignature::sign_direct) over this node's inner ed25519
520 /// network-lock signing key, serializes it (raw CBOR), and POSTs it to `/machine/tka/sign`.
521 /// Mirrors `set_dns`/`get_certificate`: clones the control config + node keys into a spawned
522 /// task (delegated reply, so the round-trip doesn't block the mailbox) over a fresh Noise
523 /// channel.
524 ///
525 /// **Posture: this only *submits* a signature to control — it does NOT mutate the local
526 /// [`Authority`](ts_tka::Authority).** The local trusted-key state advances solely through the
527 /// existing verified-sync path (`sync_tka` → `VerifiedAumChain::verify`); a `tka_sign` success
528 /// is acknowledged to the caller, and the resulting AUM is picked up on the next netmap-driven
529 /// sync. Verify-and-log is unchanged.
530 #[message(ctx)]
531 pub fn tka_sign(
532 &self,
533 ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
534 node_key: [u8; 32],
535 ) -> DelegatedReply<Result<(), TkaSyncError>> {
536 let (deleg, replier) = ctx.reply_sender();
537
538 if let Some(replier) = replier {
539 let config = self.params.config.clone();
540 let keys = self.params.env.keys.clone();
541 tokio::spawn(async move {
542 // Sign the node key with our network-lock key, then submit the raw-CBOR NKS.
543 let nks = ts_tka::NodeKeySignature::sign_direct(
544 &node_key,
545 &keys.network_lock_keys.private.signing_key(),
546 );
547 let req = ts_control::TkaSubmitSignatureRequest {
548 // node_key + version are stamped by the RPC client from `keys`.
549 version: Default::default(),
550 node_key: keys.node_keys.public,
551 signature: nks.serialize(),
552 };
553 let result = tka_submit_signature(
554 &config.server_url,
555 &keys,
556 req,
557 config.allow_http_key_fetch,
558 )
559 .await
560 .map(|_response| ());
561 replier.send(result);
562 });
563 }
564
565 deleg
566 }
567
568 /// Disable Tailnet Lock by presenting the disablement secret to control (Go
569 /// `tka.disable` → `/machine/tka/disable`).
570 ///
571 /// Targets the **current** authority head (read from the cached [`TkaStatus`]); the caller
572 /// supplies the `disablement_secret` out of band (it is the operator-held capability that
573 /// authorizes turning the lock off). Mirrors `tka_sign`: clones config + keys into a spawned
574 /// task (delegated reply). Returns [`TkaSyncError::Unsupported`] when there is no known TKA
575 /// head (lock not in use / control hasn't pushed a status), since there is nothing to disable.
576 ///
577 /// **Submit-only, like `tka_sign`:** this POSTs the disablement to control and does NOT mutate
578 /// the local [`Authority`](ts_tka::Authority). Control acts on the disablement; this node
579 /// observes the result through the existing verified-sync path. Verify-and-log unchanged.
580 #[message(ctx)]
581 pub fn tka_disable(
582 &self,
583 ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
584 disablement_secret: Vec<u8>,
585 ) -> DelegatedReply<Result<(), TkaSyncError>> {
586 let (deleg, replier) = ctx.reply_sender();
587
588 if let Some(replier) = replier {
589 // Read the current head from the cached status BEFORE the spawn (can't borrow &self
590 // across the await). No head ⇒ no lock to disable ⇒ Unsupported.
591 let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
592 let config = self.params.config.clone();
593 let keys = self.params.env.keys.clone();
594 tokio::spawn(async move {
595 let result = match head {
596 Some(head) => {
597 let req = ts_control::TkaDisableRequest {
598 // node_key + version are stamped by the RPC client from `keys`.
599 version: Default::default(),
600 node_key: keys.node_keys.public,
601 head,
602 disablement_secret,
603 };
604 tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
605 .await
606 .map(|_response| ())
607 }
608 None => Err(TkaSyncError::Unsupported),
609 };
610 replier.send(result);
611 });
612 }
613
614 deleg
615 }
616
617 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
618 ///
619 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
620 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
621 /// does not block waiting for a value).
622 #[message]
623 pub fn cert_domains(&self) -> Vec<String> {
624 self.cert_domains.borrow().clone()
625 }
626
627 /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
628 /// control has sent no DNS config yet. An immediate answer (does not block); the facade
629 /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
630 #[message]
631 pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
632 self.dns_config.borrow().clone()
633 }
634
635 /// The interactive-login / consent URL control last asked this node to open
636 /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
637 /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
638 #[message]
639 pub fn pop_browser_url(&self) -> Option<url::Url> {
640 self.pop_browser_url.borrow().clone()
641 }
642
643 /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
644 ///
645 /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
646 /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
647 /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
648 /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
649 /// late subscriber sees the current URL. The initial value is `None` until control sends one.
650 #[message(derive(Clone))]
651 pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
652 self.pop_browser_url.subscribe()
653 }
654
655 /// The latest network-conditions report (preferred DERP region + per-region latencies). An
656 /// immediate answer (does not block); empty before the first DERP-latency measurement. The
657 /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
658 #[message]
659 pub fn netcheck(&self) -> crate::status::NetcheckReport {
660 self.netcheck.borrow().clone()
661 }
662
663 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
664 ///
665 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
666 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
667 /// for the round-trip.
668 #[message(ctx)]
669 pub fn fetch_id_token(
670 &self,
671 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
672 audience: String,
673 ) -> DelegatedReply<Result<String, IdTokenError>> {
674 let (deleg, replier) = ctx.reply_sender();
675
676 if let Some(replier) = replier {
677 let config = self.params.config.clone();
678 let keys = self.params.env.keys.clone();
679 tokio::spawn(async move {
680 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
681 replier.send(result);
682 });
683 }
684
685 deleg
686 }
687
688 /// Log this node out of the tailnet: deregister it by expiring its current node key.
689 ///
690 /// Mirrors `fetch_id_token`: clones the control config + node keys
691 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
692 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
693 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
694 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
695 /// on-disk node key, so re-registering with the same key is the re-login path.
696 #[message(ctx)]
697 pub fn logout(
698 &self,
699 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
700 ) -> DelegatedReply<Result<(), LogoutError>> {
701 let (deleg, replier) = ctx.reply_sender();
702
703 if let Some(replier) = replier {
704 let config = self.params.config.clone();
705 let keys = self.params.env.keys.clone();
706 tokio::spawn(async move {
707 let result = ts_control::logout(&config, &keys).await;
708 replier.send(result);
709 });
710 }
711
712 deleg
713 }
714
715 /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
716 /// `LocalClient.SetDNS`).
717 ///
718 /// Mirrors `fetch_id_token`: clones the control config + node keys
719 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
720 /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
721 /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
722 /// match, so the surfaced API takes only `name` + `value`.
723 #[message(ctx)]
724 pub fn set_dns(
725 &self,
726 ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
727 name: String,
728 value: String,
729 ) -> DelegatedReply<Result<(), SetDnsError>> {
730 let (deleg, replier) = ctx.reply_sender();
731
732 if let Some(replier) = replier {
733 let config = self.params.config.clone();
734 let keys = self.params.env.keys.clone();
735 tokio::spawn(async move {
736 let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
737 replier.send(result);
738 });
739 }
740
741 deleg
742 }
743 }
744
    /// The reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message: on success the
    /// issued PEM pair, in the order `(cert_chain_pem, key_pem)` (the `tnet cert` surface); on
    /// failure a [`ts_control::CertError`].
    ///
    /// Aliased so the message's `Context` type stays under clippy's `type_complexity` bar (the
    /// nested `Result<(String, String), _>` trips it inline). Only compiled with the `acme` feature.
    #[cfg(feature = "acme")]
    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
751
752 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
753 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
754 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
755 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
756 // block keeps the default build clean.
757 #[cfg(feature = "acme")]
758 #[kameo::messages]
759 impl ControlRunner {
760 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
761 /// client-side ACME DNS-01 engine (`acme` feature).
762 ///
763 /// Mirrors `fetch_id_token`: clones the control config + node keys
764 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
765 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
766 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
767 ///
768 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
769 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
770 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
771 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
772 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
773 /// publish 501s.
774 #[message(ctx)]
775 pub fn get_certificate(
776 &self,
777 ctx: &mut Context<
778 Self,
779 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
780 >,
781 name: String,
782 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
783 let (deleg, replier) = ctx.reply_sender();
784
785 if let Some(replier) = replier {
786 let config = self.params.config.clone();
787 let keys = self.params.env.keys.clone();
788 tokio::spawn(async move {
789 let result = issue_certificate(&config, &keys, &name).await;
790 replier.send(result);
791 });
792 }
793
794 deleg
795 }
796
797 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
798 /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
799 /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
800 ///
801 /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
802 /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
803 /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
804 /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
805 /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
806 /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
807 #[message(ctx)]
808 pub fn get_cert_pair(
809 &self,
810 ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
811 name: String,
812 ) -> DelegatedReply<CertPairReply> {
813 let (deleg, replier) = ctx.reply_sender();
814
815 if let Some(replier) = replier {
816 let config = self.params.config.clone();
817 let keys = self.params.env.keys.clone();
818 tokio::spawn(async move {
819 let result = issue_cert_pair(&config, &keys, &name).await;
820 replier.send(result);
821 });
822 }
823
824 deleg
825 }
826 }
827}
828
/// Issues a cert for `name` via set-dns DNS-01 and returns only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) (the `get_certificate` / `ListenTLS` path).
///
/// Thin projection over [`issue_cert_pair_inner`]: one issuance is run and the PEMs are dropped,
/// because this caller does not need the on-disk pair. The account key is loaded or generated by
/// [`issue_cert_pair_inner`]; see there for the details.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok(issued.certified)
}
845
/// Issues a cert for `name` via set-dns DNS-01 and returns the **PEM pair**
/// `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt`/`.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// Runs the same single issuance as [`issue_certificate`] through [`issue_cert_pair_inner`]; only
/// the result shape differs. The leaf **private key** PEM is the second element and is NEVER
/// logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
862
/// Issuance core shared by [`issue_certificate`] and [`issue_cert_pair`].
///
/// Obtains the ACME account key, targets Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]) and runs one DNS-01 issuance. The full
/// [`IssuedCert`](ts_control::acme::IssuedCert) is returned so each caller projects out what it
/// needs (one ACME order, two consumers).
///
/// The persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused when present so
/// the same Let's Encrypt account survives renewals. Otherwise an ephemeral per-call key is
/// generated (logged at debug — a new ACME account each issuance, with no write-back).
///
/// Never logs the leaf private key.
///
/// # Errors
///
/// Propagates account-key parse/generation failures and issuance failures, and returns
/// [`ts_control::CertError::Acme`] if the directory URL does not parse.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    use ts_control::acme::AcmeAccountKey;

    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // Only the first element of `generate`'s return value is needed here.
        AcmeAccountKey::generate()?.0
    };

    // The target type of `parse` is inferred from the issuance call below.
    let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
        .parse()
        .map_err(|parse_err| {
            ts_control::CertError::Acme(format!(
                "parsing Let's Encrypt directory URL: {parse_err}"
            ))
        })?;

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
896
897impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
898 type Reply = ();
899
900 async fn handle(
901 &mut self,
902 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
903 ctx: &mut Context<Self, Self::Reply>,
904 ) {
905 match msg {
906 StreamMessage::Started(_) => {
907 tracing::trace!("started listening to state updates");
908 }
909
910 StreamMessage::Next(msg) => {
911 if let Some(node) = msg.node.as_ref() {
912 // Reflect node-key expiry into the device state: control delivering a self-node
913 // whose key is in the past means the node must re-authenticate. Otherwise the
914 // arrival of a fresh self-node confirms we are Running (recovers the state if a
915 // prior update had flipped it to Expired).
916 let now_unix = std::time::SystemTime::now()
917 .duration_since(std::time::UNIX_EPOCH)
918 .map(|d| d.as_secs() as i64)
919 .unwrap_or(0);
920 let next = if node.key_expired_at_unix(now_unix) {
921 crate::DeviceState::Expired
922 } else {
923 crate::DeviceState::Running
924 };
925 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
926 // self-node arrives on every netmap update).
927 self.params.state_tx.send_if_modified(|s| {
928 if *s != next {
929 *s = next.clone();
930 true
931 } else {
932 false
933 }
934 });
935
936 self.self_node.send_replace(Some(node.clone()));
937 }
938
939 if let Some(policy) = msg.ssh_policy.as_ref() {
940 self.ssh_policy.send_replace(Some(policy.clone()));
941 }
942
943 if let Some(tka) = msg.tka.as_ref() {
944 self.tka.send_replace(Some(tka.clone()));
945 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
946 }
947
948 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
949 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
950 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
951 let cert_domains = msg
952 .dns_config
953 .as_ref()
954 .map(|d| d.cert_domains.clone())
955 .unwrap_or_default();
956 self.cert_domains.send_replace(cert_domains);
957
958 // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
959 // `None` when control sent no DNS config on this update — distinct from a present but
960 // empty config (Go `netmap.NetworkMap.DNS`).
961 self.dns_config.send_replace(msg.dns_config.clone());
962
963 // Track the interactive-login URL for `Device::pop_browser_url` /
964 // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
965 // sticky semantics (update only on a new non-empty URL; never reset to `None`).
966 sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
967
968 if let Err(e) = self.params.env.publish(msg).await {
969 tracing::error!(error = %e, "publishing netmap update");
970 }
971 }
972
973 StreamMessage::Finished(_) => {
974 tracing::error!("state update stream terminated")
975 }
976 }
977 }
978}
979
/// Outcome of a spawned TKA bootstrap+sync task, delivered back to the actor as a self-message.
///
/// A spawned task cannot touch actor state directly, so [`ControlRunner::maybe_sync_tka`] sends
/// the result in this message and the `Message<TkaSynced>` handler applies it on the actor thread
/// through `ControlRunner::apply_tka_synced`.
#[doc(hidden)]
pub struct TkaSynced {
    /// The sync task's result, forwarded unchanged to `ControlRunner::apply_tka_synced`.
    pub(crate) result:
        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
}
989
990impl Message<TkaSynced> for ControlRunner {
991 type Reply = ();
992
993 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
994 self.apply_tka_synced(msg.result).await;
995 }
996}
997
998impl Message<DerpLatencyMeasurement> for ControlRunner {
999 type Reply = ();
1000
1001 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
1002 let measurements = msg.measurement.as_ref().clone();
1003
1004 // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
1005 // the same measurements, before the home-region short-circuit below — an empty set still
1006 // yields a (default/empty) report rather than a stale one.
1007 self.netcheck
1008 .send_replace(crate::status::NetcheckReport::from_region_results(
1009 &measurements,
1010 ));
1011
1012 let Some(result) = measurements.first() else {
1013 tracing::debug!("derp latency measurements empty");
1014 return;
1015 };
1016
1017 let iter = measurements.iter().map(|result| {
1018 (
1019 result.latency_map_key.as_str(),
1020 result.latency.as_secs_f64(),
1021 )
1022 });
1023
1024 tracing::debug!(selected_region_id = ?result.id, "updating home region");
1025
1026 self.client.set_home_region(result.id, iter).await;
1027 }
1028}
1029
1030impl Message<EndpointAdvertisement> for ControlRunner {
1031 type Reply = ();
1032
1033 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
1034 let endpoints: Vec<Endpoint> = msg
1035 .endpoints
1036 .iter()
1037 .map(|ep| Endpoint {
1038 endpoint: ep.addr,
1039 ty: match ep.ty {
1040 SelfEndpointType::Local => EndpointType::Local,
1041 SelfEndpointType::Stun => EndpointType::Stun,
1042 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
1043 },
1044 })
1045 .collect();
1046
1047 tracing::debug!(
1048 n_endpoints = endpoints.len(),
1049 "advertising endpoints to control"
1050 );
1051
1052 self.client.set_endpoints(endpoints).await;
1053 }
1054}
1055
/// Message asking the runner to re-advertise this node's routable IP prefixes
/// (`Hostinfo.RoutableIPs`) to control.
///
/// This is the wire half of a runtime
/// [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). It is sent as a direct
/// `ask` from the runtime (not over the bus), so the route change reaches the live map-poll
/// client. `routes` is the final advertised set the caller wants control to grant.
#[derive(Debug)]
pub struct SetAdvertiseRoutes {
    /// The prefixes to advertise to control (already filtered to the final set).
    pub routes: Vec<ipnet::IpNet>,
}
1065
1066impl Message<SetAdvertiseRoutes> for ControlRunner {
1067 type Reply = ();
1068
1069 async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
1070 tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
1071 self.client.set_routable_ips(msg.routes).await;
1072 }
1073}
1074
/// Message asking the runner to update this node's `Hostinfo.Hostname` at control.
///
/// This is the wire half of a runtime [`Runtime::set_hostname`](crate::Runtime::set_hostname). It
/// is a direct `ask` from the runtime, so the change reaches the live map-poll client.
#[derive(Debug)]
pub struct SetHostname {
    /// The new hostname to report to control.
    pub hostname: String,
}
1083
1084impl Message<SetHostname> for ControlRunner {
1085 type Reply = ();
1086
1087 async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
1088 tracing::debug!("updating hostname at control");
1089 self.client.set_hostname(msg.hostname).await;
1090 }
1091}
1092
#[cfg(test)]
mod reauth_bridge_tests {
    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    /// Parses a URL literal for a test, panicking if it is malformed.
    fn test_url(raw: &str) -> url::Url {
        url::Url::parse(raw).unwrap()
    }

    /// A surfaced re-auth URL becomes `DeviceState::NeedsLogin(url)`. This is the core of the fix:
    /// a mid-session `MachineNotAuthorized` (forwarded by the control client as `Some(url)`) turns
    /// into the "needs login" state that the IPN bus maps to `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let auth_url = test_url("https://login.example/auth");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth_url));
    }

    /// A `None` never causes a transition: recovering to `Running` belongs to the netmap
    /// self-node handler, so the bridge leaves the state as it found it.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, None);

        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }

    /// Surfacing the same URL again across retries must not re-fire the watch (the
    /// `send_if_modified` dedupe compares against the cell's current value), so a stuck re-auth
    /// does not thrash subscribers.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let auth_url = test_url("https://login.example/auth");
        let (state_tx, mut state_rx) = watch::channel(DeviceState::Running);

        // First delivery: the state changes, so the watch fires.
        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert!(state_rx.has_changed().unwrap(), "first NeedsLogin fires");
        state_rx.mark_unchanged();

        // Second delivery of the identical URL: deduplicated.
        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert!(
            !state_rx.has_changed().unwrap(),
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// A different re-auth URL arriving after an earlier one replaces it: the dedupe tracks
    /// changes and does not pin the first URL forever.
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let first = test_url("https://login.example/a");
        let second = test_url("https://login.example/b");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        for incoming in [&first, &second] {
            bridge_reauth_url_to_state(&state_tx, Some(incoming));
        }

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(second));
    }

    /// The *clear* contract end to end. After the bridge sets `NeedsLogin`, the netmap self-node
    /// path flips the state back to `Running`. That path is modeled here as a direct
    /// `send_replace(Running)`, the transition the `StreamMessage::Next` handler performs on the
    /// next good self-node. This pins that the bridge needs no `None`-clear arm of its own:
    /// recovery is owned elsewhere.
    #[test]
    fn running_netmap_clears_needs_login() {
        let auth_url = test_url("https://login.example/auth");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth_url));

        // Stand-in for the self-node handler's recovery (next good netmap self-node → Running).
        state_tx.send_replace(DeviceState::Running);
        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }
}
1176
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    /// Parses a URL literal for a test, panicking if it is malformed.
    fn test_url(raw: &str) -> url::Url {
        url::Url::parse(raw).unwrap()
    }

    /// A non-empty URL is published to the cell.
    #[test]
    fn non_empty_url_publishes() {
        let consent = test_url("https://login.example/consent");
        let (cell_tx, cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&consent));

        assert_eq!(*cell_rx.borrow(), Some(consent));
    }

    /// An absent (`None`) update — the common netmap tick — must NOT reset the cell. Regression
    /// guard for the thrash bug (resetting on every tick would coalesce the URL away on the bus).
    #[test]
    fn absent_update_does_not_reset() {
        let consent = test_url("https://login.example/consent");
        let (cell_tx, cell_rx) = watch::channel(Some(consent.clone()));

        // A run of empty netmap updates.
        (0..5).for_each(|_| sticky_update_pop_browser_url(&cell_tx, None));

        assert_eq!(
            *cell_rx.borrow(),
            Some(consent),
            "empty updates must not clear the URL"
        );
    }

    /// Repeating the same URL does not re-fire the watch (in-place dedupe through
    /// `send_if_modified`), so a subscriber is not woken spuriously. Checked through the
    /// receiver's `has_changed` flag.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let consent = test_url("https://login.example/consent");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        // First delivery: the cell changes, so the watch fires.
        sticky_update_pop_browser_url(&cell_tx, Some(&consent));
        assert!(cell_rx.has_changed().unwrap(), "first non-empty URL fires");
        cell_rx.mark_unchanged();

        // Identical URL again: deduplicated.
        sticky_update_pop_browser_url(&cell_tx, Some(&consent));
        assert!(
            !cell_rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A different URL arriving after an earlier one replaces it (sticky, but tracks changes).
    #[test]
    fn new_url_after_prior_fires() {
        let first = test_url("https://login.example/a");
        let second = test_url("https://login.example/b");
        let (cell_tx, cell_rx) = watch::channel(None);

        for incoming in [&first, &second] {
            sticky_update_pop_browser_url(&cell_tx, Some(incoming));
        }

        assert_eq!(*cell_rx.borrow(), Some(second));
    }

    /// The realistic session sequence: a URL stays sticky through a run of `None` ticks, and a
    /// *different* URL after that gap still lands. Chains the legs that the other tests cover in
    /// isolation (the actual control cadence is "URL, then many empty updates, then maybe a new
    /// URL").
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let first = test_url("https://login.example/a");
        let second = test_url("https://login.example/b");
        let (cell_tx, cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        (0..3).for_each(|_| sticky_update_pop_browser_url(&cell_tx, None));
        assert_eq!(
            *cell_rx.borrow(),
            Some(first),
            "stayed sticky through the None gap"
        );

        sticky_update_pop_browser_url(&cell_tx, Some(&second));
        assert_eq!(
            *cell_rx.borrow(),
            Some(second),
            "a new URL after a None gap still fires"
        );
    }

    /// Going back to a previously seen URL (A → B → A) re-fires: the dedupe compares against the
    /// cell's *current* value, not a history, so A after B is a real change.
    #[test]
    fn returning_to_prior_url_refires() {
        let first = test_url("https://login.example/a");
        let second = test_url("https://login.example/b");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        sticky_update_pop_browser_url(&cell_tx, Some(&second));
        cell_rx.mark_unchanged();

        // Back to A: it differs from the current value (B), so the watch fires.
        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        assert!(
            cell_rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*cell_rx.borrow(), Some(first));
    }

    /// End-to-end de-thrash. A realistic netmap cadence (empty, empty, URL, empty, empty) is fed
    /// through the producer into a cell, and the changes a subscriber would observe are counted
    /// by polling `has_changed()` and consuming with `borrow_and_update()`. The point of the fix
    /// is that exactly ONE change survives the surrounding `None` thrash: the pre-fix code
    /// (`send_replace` on every tick) would have woken the subscriber on every empty tick and
    /// coalesced the URL away. This exercises the producer and the watch-subscribe path together,
    /// the two halves the unit tests cover in isolation.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let consent = test_url("https://login.example/consent");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        // The cadence control actually sends: mostly-empty MapResponses, one carrying the URL.
        let cadence = [None, None, Some(&consent), None, None];
        for incoming in cadence {
            sticky_update_pop_browser_url(&cell_tx, incoming);
        }

        // The subscriber sees exactly one change, and it carries the URL (not a coalesced `None`).
        let mut observed_changes = 0;
        while cell_rx.has_changed().unwrap() {
            let seen = cell_rx.borrow_and_update().clone();
            observed_changes += 1;
            assert_eq!(
                seen,
                Some(consent.clone()),
                "the surviving change carries the URL"
            );
        }
        assert_eq!(
            observed_changes, 1,
            "exactly one change survives the None thrash"
        );
    }
}