Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
17    tka_init_begin, tka_init_finish, tka_submit_signature,
18};
19use ts_magicsock::SelfEndpointType;
20
21use crate::{
22    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
23    direct::EndpointAdvertisement,
24};
25
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus. It also
/// owns the `watch` cells through which control-pushed state (self node, SSH policy, Tailnet
/// Lock, DNS, consent URL) and the latest netcheck report are published to the rest of the
/// runtime.
pub struct ControlRunner {
    /// The connected control client returned by `AsyncControlClient::connect` in `on_start`.
    client: AsyncControlClient,
    /// The arguments this actor was started with. Kept so later handlers can reach the control
    /// config, node keys, env (bus / shutdown signal) and device-state sender.
    params: Params,

    /// Latest self node from control, or `None` until one arrives. `with_self_node` (which backs
    /// the `ipv4` / `ipv6` / `self_node` messages) waits on this cell.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message). While a sync is in flight this is
    /// `None`: the current state has been moved into the sync task.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
    /// gated decision).
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
    /// Background task that bridges the control client's mid-session re-auth URL cell onto
    /// [`Self::params`]'s device-state cell (sets
    /// [`DeviceState::NeedsLogin`](crate::DeviceState::NeedsLogin) when control returns
    /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
    /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
    reauth_bridge: tokio::task::JoinHandle<()>,
}
84
85impl Drop for ControlRunner {
86    fn drop(&mut self) {
87        // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
88        self.reauth_bridge.abort();
89    }
90}
91
/// Control runner args.
///
/// Passed to [`ControlRunner`] as its actor `Args` and stored on the actor once it has started.
pub struct Params {
    /// Control config (server URL and connection options used for registration, the map poll and
    /// the Tailnet-Lock RPCs).
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Forwarded to `check_auth` / `connect`; without one, registration may
    /// require interactive authorization (control answers `MachineNotAuthorized` with a URL).
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor (node keys, shutdown signal, and message-bus access).
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish
    /// [`DeviceState::Failed`](crate::DeviceState::Failed) and then return `Err`, before `Self`
    /// exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
109
110#[doc(hidden)]
111#[derive(Debug, thiserror::Error)]
112pub enum ControlRunnerError {
113    #[error(transparent)]
114    Control(#[from] ControlError),
115
116    #[error(transparent)]
117    Crate(#[from] crate::Error),
118}
119
120impl kameo::Actor for ControlRunner {
121    type Args = Params;
122    type Error = ControlRunnerError;
123
124    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
125        loop {
126            match AsyncControlClient::check_auth(
127                &params.config,
128                &params.env.keys,
129                params.auth_key.as_deref(),
130            )
131            .await
132            {
133                Ok(()) => break,
134                Err(ControlError::MachineNotAuthorized(u)) => {
135                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
136                    // Surface "interactive login required" so a watcher / `wait_until_running` can
137                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
138                    // keeps retrying (transient), so this is not a terminal `Failed`.
139                    params
140                        .state_tx
141                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
142                    tokio::time::sleep(Duration::from_secs(5)).await;
143                }
144                Err(e) => {
145                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
146                    // specific reason control gave AND publish it as a typed `Failed` state so
147                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
148                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
149                    // stopped actor is next asked. Publishing before `return Err` is why the state
150                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
151                    let reason = crate::RegistrationError::from(&e);
152                    tracing::error!(error = %e, "registration failed; control runner stopping");
153                    params
154                        .state_tx
155                        .send_replace(crate::DeviceState::Failed(reason));
156                    return Err(e.into());
157                }
158            }
159        }
160        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
161        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
162        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
163        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
164        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
165        // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
166        // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
167        // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
168        // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
169        let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
170
171        let bring_up = async {
172            let (client, stream) = AsyncControlClient::connect(
173                &params.config,
174                &params.env.keys,
175                params.auth_key.as_deref(),
176                auth_url_tx,
177            )
178            .await?;
179
180            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
181
182            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
183            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
184            slf.attach_stream(stream.boxed(), (), ());
185            Ok::<_, ControlRunnerError>(client)
186        };
187
188        let client = match bring_up.await {
189            Ok(client) => client,
190            Err(e) => {
191                tracing::error!(error = %e, "bringing up the control session failed");
192                // The control session never came up; surface it as a transient registration
193                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
194                // stuck at `Connecting`.
195                params.state_tx.send_replace(crate::DeviceState::Failed(
196                    crate::RegistrationError::NetworkUnreachable,
197                ));
198                return Err(e);
199            }
200        };
201
202        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
203        // current (and flips to `Expired` if the self-node's key lapses).
204        params.state_tx.send_replace(crate::DeviceState::Running);
205
206        // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
207        // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
208        // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
209        // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
210        // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
211        // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
212        // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
213        let reauth_bridge = {
214            let state_tx = params.state_tx.clone();
215            let mut auth_url_rx = auth_url_rx;
216            tokio::spawn(async move {
217                while auth_url_rx.changed().await.is_ok() {
218                    let url = auth_url_rx.borrow_and_update().clone();
219                    bridge_reauth_url_to_state(&state_tx, url.as_ref());
220                }
221            })
222        };
223
224        Ok(Self {
225            client,
226            params,
227            self_node: Default::default(),
228            ssh_policy: Default::default(),
229            tka: Default::default(),
230            tka_synced: None,
231            tka_authority: Default::default(),
232            tka_syncing: false,
233            cert_domains: Default::default(),
234            dns_config: Default::default(),
235            pop_browser_url: Default::default(),
236            netcheck: Default::default(),
237            reauth_bridge,
238        })
239    }
240}
241
242impl ControlRunner {
243    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
244    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
245    /// round-trip). The result returns via the [`TkaSynced`] self-message.
246    ///
247    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
248    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
249    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
250    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
251    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
252        if !tka.is_enabled() {
253            // Lock disabled (or never enabled): drop any synced state and stop publishing an
254            // Authority. Never an error; peers are unaffected.
255            if self.tka_synced.is_some() {
256                self.tka_synced = None;
257                self.tka_authority.send_replace(None);
258            }
259            return;
260        }
261        if self.tka_syncing {
262            return; // a sync is already in flight; the next netmap will re-trigger if still stale
263        }
264        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
265        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
266        // fail-closes harmlessly).
267        if let Some(synced) = &self.tka_synced
268            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
269            && synced.authority.head_matches(&control_head)
270        {
271            return;
272        }
273
274        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
275        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
276        // `tka_syncing` so we don't spawn a second concurrent sync.
277        self.tka_syncing = true;
278        let current = self.tka_synced.take();
279        let config = self.params.config.clone();
280        let keys = self.params.env.keys.clone();
281        tokio::spawn(async move {
282            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
283            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
284            // not allowed). A send failure just means the actor is gone — nothing to do.
285            if let Err(e) = self_ref.tell(TkaSynced { result }).await {
286                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
287            }
288        });
289    }
290
291    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
292    /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
293    /// clears the in-flight guard.
294    async fn apply_tka_synced(
295        &mut self,
296        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
297    ) {
298        self.tka_syncing = false;
299        match result {
300            Ok(Some(synced)) => {
301                tracing::info!(
302                    head = %synced.authority.head().to_base32(),
303                    "TKA sync succeeded; publishing verified Authority (observe-only)"
304                );
305                self.tka_authority
306                    .send_replace(Some(synced.authority.clone()));
307                // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
308                // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
309                if let Err(e) = self
310                    .params
311                    .env
312                    .publish(crate::peer_tracker::TkaAuthorityUpdate(
313                        synced.authority.clone(),
314                    ))
315                    .await
316                {
317                    tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
318                }
319                self.tka_synced = Some(synced);
320            }
321            Ok(None) => {
322                // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
323                tracing::debug!("TKA sync: control reported no lock for this node (inert)");
324            }
325            Err(e) => {
326                // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
327                // peer. The next netmap update re-triggers a sync attempt.
328                tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
329            }
330        }
331    }
332
333    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
334    where
335        F: FnOnce(&Node) -> R,
336    {
337        let mut sub = self.self_node.subscribe();
338        let mut shutdown = self.params.env.shutdown.clone();
339
340        async move {
341            tokio::select! {
342                _ = shutdown.wait_for(|x| *x) => {
343                    None
344                },
345                node = sub.wait_for(Option::is_some) => {
346                    Some(f(node.ok()?.as_ref()?))
347                },
348            }
349        }
350    }
351}
352
353/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
354///
355/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
356/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
357/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
358/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
359/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
360/// for a `watch`/bus subscriber.
361///
362/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
363/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
364/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
365/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
366/// device-state cell in this handler.
367///
368/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
369/// against a plain `watch` channel without standing up the actor.
370fn sticky_update_pop_browser_url(
371    cell: &watch::Sender<Option<url::Url>>,
372    incoming: Option<&url::Url>,
373) {
374    if let Some(url) = incoming {
375        cell.send_if_modified(|current| {
376            if current.as_ref() == Some(url) {
377                false
378            } else {
379                *current = Some(url.clone());
380                true
381            }
382        });
383    }
384}
385
386/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
387///
388/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
389/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
390/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
391/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
392/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
393/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
394///
395/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
396/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
397/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
398/// independent `None`-clear in this bridge could race that and is unnecessary. The
399/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
400/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
401/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
402/// dedupe in the netmap handler.
403///
404/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
405/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
406pub(crate) fn bridge_reauth_url_to_state(
407    state_tx: &watch::Sender<crate::DeviceState>,
408    incoming: Option<&url::Url>,
409) {
410    if let Some(url) = incoming {
411        let next = crate::DeviceState::NeedsLogin(url.clone());
412        state_tx.send_if_modified(|current| {
413            if *current == next {
414                false
415            } else {
416                *current = next.clone();
417                true
418            }
419        });
420    }
421}
422
423// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
424// those generated fields carry no doc and can't take attributes, so wrap in a module where
425// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
426// are re-exported so callers keep referencing them at `control_runner::<Name>`.
427pub use msg_impl::*;
428
429#[allow(missing_docs)]
430mod msg_impl {
431    use kameo::{message::Context, reply::DelegatedReply};
432
433    use super::*;
434
435    #[kameo::messages]
436    impl ControlRunner {
437        /// Fetch the IPv4 address for this tailscale device.
438        #[message(ctx)]
439        pub fn ipv4(
440            &self,
441            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
442        ) -> DelegatedReply<Option<Ipv4Addr>> {
443            let (deleg, replier) = ctx.reply_sender();
444
445            if let Some(replier) = replier {
446                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
447
448                tokio::spawn(async move {
449                    let ip = fut.await;
450                    replier.send(ip);
451                });
452            }
453
454            deleg
455        }
456
457        /// Fetch the IPv6 address for this tailscale device.
458        #[message(ctx)]
459        pub fn ipv6(
460            &self,
461            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
462        ) -> DelegatedReply<Option<Ipv6Addr>> {
463            let (deleg, replier) = ctx.reply_sender();
464
465            if let Some(replier) = replier {
466                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
467
468                tokio::spawn(async move {
469                    let ip = fut.await;
470                    replier.send(ip);
471                });
472            }
473
474            deleg
475        }
476
477        /// Fetch the self node for this tailscale device.
478        #[message(ctx)]
479        pub fn self_node(
480            &self,
481            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
482        ) -> DelegatedReply<Option<Node>> {
483            let (deleg, replier) = ctx.reply_sender();
484
485            if let Some(replier) = replier {
486                let node = self.with_self_node(|node| node.clone());
487
488                tokio::spawn(async move {
489                    let node = node.await;
490                    replier.send(node)
491                });
492            }
493
494            deleg
495        }
496
497        /// Fetch the current Tailscale SSH policy, if control has pushed one.
498        ///
499        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
500        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
501        /// for a value: an absent policy is a legitimate, immediate answer.
502        #[message]
503        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
504            self.ssh_policy.borrow().clone()
505        }
506
507        /// Fetch the current Tailnet Lock status, if control has pushed one.
508        ///
509        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
510        #[message]
511        pub fn current_tka_status(&self) -> Option<TkaStatus> {
512            self.tka.borrow().clone()
513        }
514
515        /// Sign `node_key` directly with this node's network-lock key and submit the signature to
516        /// control (Go `tka.sign` for the Direct case → `tkaSubmitSignature`).
517        ///
518        /// Builds a `Direct` [`NodeKeySignature`](ts_tka::NodeKeySignature) via
519        /// [`sign_direct`](ts_tka::NodeKeySignature::sign_direct) over this node's inner ed25519
520        /// network-lock signing key, serializes it (raw CBOR), and POSTs it to `/machine/tka/sign`.
521        /// Mirrors `set_dns`/`get_certificate`: clones the control config + node keys into a spawned
522        /// task (delegated reply, so the round-trip doesn't block the mailbox) over a fresh Noise
523        /// channel.
524        ///
525        /// **Posture: this only *submits* a signature to control — it does NOT mutate the local
526        /// [`Authority`](ts_tka::Authority).** The local trusted-key state advances solely through the
527        /// existing verified-sync path (`sync_tka` → `VerifiedAumChain::verify`); a `tka_sign` success
528        /// is acknowledged to the caller, and the resulting AUM is picked up on the next netmap-driven
529        /// sync. Verify-and-log is unchanged.
530        #[message(ctx)]
531        pub fn tka_sign(
532            &self,
533            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
534            node_key: [u8; 32],
535        ) -> DelegatedReply<Result<(), TkaSyncError>> {
536            let (deleg, replier) = ctx.reply_sender();
537
538            if let Some(replier) = replier {
539                let config = self.params.config.clone();
540                let keys = self.params.env.keys.clone();
541                tokio::spawn(async move {
542                    // Sign the node key with our network-lock key, then submit the raw-CBOR NKS.
543                    let nks = ts_tka::NodeKeySignature::sign_direct(
544                        &node_key,
545                        &keys.network_lock_keys.private.signing_key(),
546                    );
547                    let req = ts_control::TkaSubmitSignatureRequest {
548                        // node_key + version are stamped by the RPC client from `keys`.
549                        version: Default::default(),
550                        node_key: keys.node_keys.public,
551                        signature: nks.serialize(),
552                    };
553                    let result = tka_submit_signature(
554                        &config.server_url,
555                        &keys,
556                        req,
557                        config.allow_http_key_fetch,
558                    )
559                    .await
560                    .map(|_response| ());
561                    replier.send(result);
562                });
563            }
564
565            deleg
566        }
567
568        /// Disable Tailnet Lock by presenting the disablement secret to control (Go
569        /// `tka.disable` → `/machine/tka/disable`).
570        ///
571        /// Targets the **current** authority head (read from the cached [`TkaStatus`]); the caller
572        /// supplies the `disablement_secret` out of band (it is the operator-held capability that
573        /// authorizes turning the lock off). Mirrors `tka_sign`: clones config + keys into a spawned
574        /// task (delegated reply). Returns [`TkaSyncError::Unsupported`] when there is no known TKA
575        /// head (lock not in use / control hasn't pushed a status), since there is nothing to disable.
576        ///
577        /// **Submit-only, like `tka_sign`:** this POSTs the disablement to control and does NOT mutate
578        /// the local [`Authority`](ts_tka::Authority). Control acts on the disablement; this node
579        /// observes the result through the existing verified-sync path. Verify-and-log unchanged.
580        #[message(ctx)]
581        pub fn tka_disable(
582            &self,
583            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
584            disablement_secret: Vec<u8>,
585        ) -> DelegatedReply<Result<(), TkaSyncError>> {
586            let (deleg, replier) = ctx.reply_sender();
587
588            if let Some(replier) = replier {
589                // Read the current head from the cached status BEFORE the spawn (can't borrow &self
590                // across the await). No head ⇒ no lock to disable ⇒ Unsupported.
591                let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
592                let config = self.params.config.clone();
593                let keys = self.params.env.keys.clone();
594                tokio::spawn(async move {
595                    let result = match head {
596                        Some(head) => {
597                            let req = ts_control::TkaDisableRequest {
598                                // node_key + version are stamped by the RPC client from `keys`.
599                                version: Default::default(),
600                                node_key: keys.node_keys.public,
601                                head,
602                                disablement_secret,
603                            };
604                            tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
605                                .await
606                                .map(|_response| ())
607                        }
608                        None => Err(TkaSyncError::Unsupported),
609                    };
610                    replier.send(result);
611                });
612            }
613
614            deleg
615        }
616
617        /// Initialize Tailnet Lock with this node as the sole initial trusted key, gated by
618        /// `disablement_secret` (Go `LocalClient.NetworkLockInit` — the "lock yourself in" case).
619        ///
620        /// Builds + signs a genesis Checkpoint AUM whose only trusted key is this node's network-lock
621        /// public key (votes 1) and whose single DisablementValue is `disablement_value(secret)`, then
622        /// drives the two-phase init: `tka/init/begin` (submit the genesis) → if control needs no
623        /// further node signatures (`NeedSignatures` empty, the case when this node is the only key) →
624        /// `tka/init/finish` carrying the raw `disablement_secret` as `SupportDisablement`. Mirrors
625        /// `tka_sign`/`tka_disable`: cloned config + keys into a spawned task (delegated reply).
626        ///
627        /// If control returns a non-empty `NeedSignatures` (other nodes must be re-signed under the new
628        /// lock — a multi-node tailnet), this returns [`TkaSyncError::Unsupported`]: re-signing each
629        /// listed node (incl. the Rotation-key case) is a larger flow deferred to a fuller
630        /// `tka_init(keys, secrets)` — the single-node lock-init is the shipped subset.
631        ///
632        /// **Submit-only**, like `tka_sign`/`tka_disable`: this creates the lock at control and does
633        /// NOT seed the local [`Authority`](ts_tka::Authority) — the node picks up the new lock through
634        /// the existing verified netmap-sync (control pushes a `TKAInfo`, `maybe_sync_tka` bootstraps
635        /// the genesis through `VerifiedAumChain::verify`). Verify-and-log posture unchanged.
636        #[message(ctx)]
637        pub fn tka_init(
638            &self,
639            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
640            disablement_secret: Vec<u8>,
641        ) -> DelegatedReply<Result<(), TkaSyncError>> {
642            let (deleg, replier) = ctx.reply_sender();
643
644            if let Some(replier) = replier {
645                let config = self.params.config.clone();
646                let keys = self.params.env.keys.clone();
647                tokio::spawn(async move {
648                    let result = tka_init_run(&config, &keys, disablement_secret).await;
649                    replier.send(result);
650                });
651            }
652
653            deleg
654        }
655
656        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
657        ///
658        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
659        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
660        /// does not block waiting for a value).
661        #[message]
662        pub fn cert_domains(&self) -> Vec<String> {
663            self.cert_domains.borrow().clone()
664        }
665
666        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
667        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
668        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
669        #[message]
670        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
671            self.dns_config.borrow().clone()
672        }
673
674        /// The interactive-login / consent URL control last asked this node to open
675        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
676        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
677        #[message]
678        pub fn pop_browser_url(&self) -> Option<url::Url> {
679            self.pop_browser_url.borrow().clone()
680        }
681
682        /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
683        ///
684        /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
685        /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
686        /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
687        /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
688        /// late subscriber sees the current URL. The initial value is `None` until control sends one.
689        #[message(derive(Clone))]
690        pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
691            self.pop_browser_url.subscribe()
692        }
693
694        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
695        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
696        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
697        #[message]
698        pub fn netcheck(&self) -> crate::status::NetcheckReport {
699            self.netcheck.borrow().clone()
700        }
701
702        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
703        ///
704        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
705        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
706        /// for the round-trip.
707        #[message(ctx)]
708        pub fn fetch_id_token(
709            &self,
710            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
711            audience: String,
712        ) -> DelegatedReply<Result<String, IdTokenError>> {
713            let (deleg, replier) = ctx.reply_sender();
714
715            if let Some(replier) = replier {
716                let config = self.params.config.clone();
717                let keys = self.params.env.keys.clone();
718                tokio::spawn(async move {
719                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
720                    replier.send(result);
721                });
722            }
723
724            deleg
725        }
726
727        /// Log this node out of the tailnet: deregister it by expiring its current node key.
728        ///
729        /// Mirrors `fetch_id_token`: clones the control config + node keys
730        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
731        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
732        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
733        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
734        /// on-disk node key, so re-registering with the same key is the re-login path.
735        #[message(ctx)]
736        pub fn logout(
737            &self,
738            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
739        ) -> DelegatedReply<Result<(), LogoutError>> {
740            let (deleg, replier) = ctx.reply_sender();
741
742            if let Some(replier) = replier {
743                let config = self.params.config.clone();
744                let keys = self.params.env.keys.clone();
745                tokio::spawn(async move {
746                    let result = ts_control::logout(&config, &keys).await;
747                    replier.send(result);
748                });
749            }
750
751            deleg
752        }
753
754        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
755        /// `LocalClient.SetDNS`).
756        ///
757        /// Mirrors `fetch_id_token`: clones the control config + node keys
758        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
759        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
760        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
761        /// match, so the surfaced API takes only `name` + `value`.
762        #[message(ctx)]
763        pub fn set_dns(
764            &self,
765            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
766            name: String,
767            value: String,
768        ) -> DelegatedReply<Result<(), SetDnsError>> {
769            let (deleg, replier) = ctx.reply_sender();
770
771            if let Some(replier) = replier {
772                let config = self.params.config.clone();
773                let keys = self.params.env.keys.clone();
774                tokio::spawn(async move {
775                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
776                    replier.send(result);
777                });
778            }
779
780            deleg
781        }
782    }
783
    /// Reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message: either the issued
    /// `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface) or a [`ts_control::CertError`].
    ///
    /// It exists as a named alias so that the message's `Context` type stays under clippy's
    /// `type_complexity` threshold. Spelled inline, the nested `Result<(String, String), _>` trips
    /// that lint.
    #[cfg(feature = "acme")]
    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
790
791    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
792    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
793    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
794    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
795    // block keeps the default build clean.
796    #[cfg(feature = "acme")]
797    #[kameo::messages]
798    impl ControlRunner {
799        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
800        /// client-side ACME DNS-01 engine (`acme` feature).
801        ///
802        /// Mirrors `fetch_id_token`: clones the control config + node keys
803        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
804        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
805        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
806        ///
807        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
808        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
809        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
810        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
811        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
812        /// publish 501s.
813        #[message(ctx)]
814        pub fn get_certificate(
815            &self,
816            ctx: &mut Context<
817                Self,
818                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
819            >,
820            name: String,
821        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
822            let (deleg, replier) = ctx.reply_sender();
823
824            if let Some(replier) = replier {
825                let config = self.params.config.clone();
826                let keys = self.params.env.keys.clone();
827                tokio::spawn(async move {
828                    let result = issue_certificate(&config, &keys, &name).await;
829                    replier.send(result);
830                });
831            }
832
833            deleg
834        }
835
836        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
837        /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
838        /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
839        ///
840        /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
841        /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
842        /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
843        /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
844        /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
845        /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
846        #[message(ctx)]
847        pub fn get_cert_pair(
848            &self,
849            ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
850            name: String,
851        ) -> DelegatedReply<CertPairReply> {
852            let (deleg, replier) = ctx.reply_sender();
853
854            if let Some(replier) = replier {
855                let config = self.params.config.clone();
856                let keys = self.params.env.keys.clone();
857                tokio::spawn(async move {
858                    let result = issue_cert_pair(&config, &keys, &name).await;
859                    replier.send(result);
860                });
861            }
862
863            deleg
864        }
865    }
866}
867
868/// The `tka_init` body (the genesis-build + two-phase init/begin→init/finish choreography),
869/// factored out of the actor handler so it runs in the spawned task. See [`ControlRunner::tka_init`].
870///
871/// "Lock yourself in": the genesis trusts only this node's network-lock key (votes 1) and stores one
872/// DisablementValue = `disablement_value(secret)`. On a non-empty `NeedSignatures` (multi-node
873/// tailnet needing re-signs) it returns [`TkaSyncError::Unsupported`] — the single-node subset.
874async fn tka_init_run(
875    config: &ts_control::Config,
876    keys: &ts_keys::NodeState,
877    disablement_secret: Vec<u8>,
878) -> Result<(), TkaSyncError> {
879    // Build the genesis: this node's NL public key as the sole trusted key, one disablement value.
880    let nl_public = keys.network_lock_keys.public.to_bytes().to_vec();
881    let genesis_key = ts_tka::AumKey {
882        kind: ts_tka::KeyKind::Ed25519,
883        votes: 1,
884        public: nl_public,
885        meta: Vec::new(),
886    };
887    let dvalue = ts_tka::disablement_value(&disablement_secret).to_vec();
888    let mut genesis = ts_tka::Aum::new_genesis_checkpoint(vec![genesis_key], vec![dvalue])
889        // A malformed genesis is a local construction bug, not a transient RPC failure — surface it as a
890        // coarse internal error rather than NetworkError (which would invite a pointless retry).
891        .map_err(|_| TkaSyncError::Internal(ts_control::TkaSyncInternalErrorKind::SerDe))?;
892    genesis.sign(&keys.network_lock_keys.private.signing_key());
893
894    // Phase 1: submit the genesis. node_key + version are stamped by the RPC client from `keys`.
895    let begin_req = ts_control::TkaInitBeginRequest {
896        version: Default::default(),
897        node_key: keys.node_keys.public,
898        genesis_aum: genesis.serialize(),
899    };
900    let begin_resp = tka_init_begin(
901        &config.server_url,
902        keys,
903        begin_req,
904        config.allow_http_key_fetch,
905    )
906    .await?;
907
908    // Single-node case only: control must need no further node signatures. A non-empty
909    // NeedSignatures means other nodes must be re-signed under the new lock — deferred.
910    if !begin_resp.need_signatures.is_empty() {
911        tracing::warn!(
912            need = begin_resp.need_signatures.len(),
913            "tka_init: control requires re-signing other nodes; the multi-node init is not yet \
914             implemented (single-node lock-init only)"
915        );
916        return Err(TkaSyncError::Unsupported);
917    }
918
919    // Phase 2: finish, carrying the raw disablement secret as SupportDisablement (Go sends the raw
920    // secret here; only the genesis stores its Argon2i hash).
921    let finish_req = ts_control::TkaInitFinishRequest {
922        version: Default::default(),
923        node_key: keys.node_keys.public,
924        signatures: std::collections::BTreeMap::new(),
925        support_disablement: disablement_secret,
926    };
927    tka_init_finish(
928        &config.server_url,
929        keys,
930        finish_req,
931        config.allow_http_key_fetch,
932    )
933    .await
934    .map(|_response| ())
935}
936
/// Load or generate the ACME account key, then issue a cert for `name` via set-dns DNS-01,
/// returning just the ready-to-serve [`CertifiedKey`](ts_control::tls::CertifiedKey) (the
/// `get_certificate` / `ListenTLS` path).
///
/// Thin wrapper over the shared core [`issue_cert_pair_inner`] that keeps only the
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) and drops the PEM strings. It performs a single
/// issuance; this caller simply has no use for the on-disk pair. See [`issue_cert_pair_inner`] for
/// the account-key handling and ACME directory used.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    issue_cert_pair_inner(config, keys, name)
        .await
        .map(|issued| issued.certified)
}
953
/// Issue a cert for `name` via set-dns DNS-01 and return it as the **PEM pair**
/// `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt`/`.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// This is the same single issuance as [`issue_certificate`] (both delegate to
/// `issue_cert_pair_inner`); only the projected result differs. The second element is the leaf
/// **private key** PEM, and it is never logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
970
/// Issuance core shared by [`issue_certificate`] and [`issue_cert_pair`].
///
/// It obtains an ACME account key, targets Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]), and runs one DNS-01 issuance via
/// `ts_control::issue_cert_pair_via_setdns`. It returns the whole
/// [`IssuedCert`](ts_control::acme::IssuedCert) so each caller can project what it needs: one ACME
/// order, two consumers.
///
/// The persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused when present, so
/// the same Let's Encrypt account survives renewals. Otherwise a per-call key is generated (noted
/// at debug level), meaning a new ACME account each issuance, with no write-back. A directory URL
/// that fails to parse becomes [`ts_control::CertError::Acme`]. The leaf private key is never
/// logged.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        ts_control::acme::AcmeAccountKey::generate()?.0
    };

    let directory = match ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse() {
        Ok(url) => url,
        Err(e) => {
            return Err(ts_control::CertError::Acme(format!(
                "parsing Let's Encrypt directory URL: {e}"
            )));
        }
    };

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
1004
1005impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
1006    type Reply = ();
1007
1008    async fn handle(
1009        &mut self,
1010        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
1011        ctx: &mut Context<Self, Self::Reply>,
1012    ) {
1013        match msg {
1014            StreamMessage::Started(_) => {
1015                tracing::trace!("started listening to state updates");
1016            }
1017
1018            StreamMessage::Next(msg) => {
1019                if let Some(node) = msg.node.as_ref() {
1020                    // Reflect node-key expiry into the device state: control delivering a self-node
1021                    // whose key is in the past means the node must re-authenticate. Otherwise the
1022                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
1023                    // prior update had flipped it to Expired).
1024                    let now_unix = std::time::SystemTime::now()
1025                        .duration_since(std::time::UNIX_EPOCH)
1026                        .map(|d| d.as_secs() as i64)
1027                        .unwrap_or(0);
1028                    let next = if node.key_expired_at_unix(now_unix) {
1029                        crate::DeviceState::Expired
1030                    } else {
1031                        crate::DeviceState::Running
1032                    };
1033                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
1034                    // self-node arrives on every netmap update).
1035                    self.params.state_tx.send_if_modified(|s| {
1036                        if *s != next {
1037                            *s = next.clone();
1038                            true
1039                        } else {
1040                            false
1041                        }
1042                    });
1043
1044                    self.self_node.send_replace(Some(node.clone()));
1045                }
1046
1047                if let Some(policy) = msg.ssh_policy.as_ref() {
1048                    self.ssh_policy.send_replace(Some(policy.clone()));
1049                }
1050
1051                if let Some(tka) = msg.tka.as_ref() {
1052                    self.tka.send_replace(Some(tka.clone()));
1053                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
1054                }
1055
1056                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
1057                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
1058                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
1059                let cert_domains = msg
1060                    .dns_config
1061                    .as_ref()
1062                    .map(|d| d.cert_domains.clone())
1063                    .unwrap_or_default();
1064                self.cert_domains.send_replace(cert_domains);
1065
1066                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
1067                // `None` when control sent no DNS config on this update — distinct from a present but
1068                // empty config (Go `netmap.NetworkMap.DNS`).
1069                self.dns_config.send_replace(msg.dns_config.clone());
1070
1071                // Track the interactive-login URL for `Device::pop_browser_url` /
1072                // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
1073                // sticky semantics (update only on a new non-empty URL; never reset to `None`).
1074                sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
1075
1076                if let Err(e) = self.params.env.publish(msg).await {
1077                    tracing::error!(error = %e, "publishing netmap update");
1078                }
1079            }
1080
1081            StreamMessage::Finished(_) => {
1082                tracing::error!("state update stream terminated")
1083            }
1084        }
1085    }
1086}
1087
/// The outcome of a spawned TKA bootstrap+sync task, carried back to the actor thread as a
/// self-message so it can be applied to actor state (a spawned task cannot touch that state
/// directly). Sent by [`ControlRunner::maybe_sync_tka`]; its handler passes `result` on to
/// [`ControlRunner::apply_tka_synced`](ControlRunner).
#[doc(hidden)]
pub struct TkaSynced {
    /// What the sync driver produced: the synced Tailnet-Lock state (or `None`), or the driver's
    /// error. NOTE(review): the meaning of `Ok(None)` is defined in `tka_sync` — confirm there.
    pub(crate) result:
        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
}
1097
1098impl Message<TkaSynced> for ControlRunner {
1099    type Reply = ();
1100
1101    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
1102        self.apply_tka_synced(msg.result).await;
1103    }
1104}
1105
1106impl Message<DerpLatencyMeasurement> for ControlRunner {
1107    type Reply = ();
1108
1109    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
1110        let measurements = msg.measurement.as_ref().clone();
1111
1112        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
1113        // the same measurements, before the home-region short-circuit below — an empty set still
1114        // yields a (default/empty) report rather than a stale one.
1115        self.netcheck
1116            .send_replace(crate::status::NetcheckReport::from_region_results(
1117                &measurements,
1118            ));
1119
1120        let Some(result) = measurements.first() else {
1121            tracing::debug!("derp latency measurements empty");
1122            return;
1123        };
1124
1125        let iter = measurements.iter().map(|result| {
1126            (
1127                result.latency_map_key.as_str(),
1128                result.latency.as_secs_f64(),
1129            )
1130        });
1131
1132        tracing::debug!(selected_region_id = ?result.id, "updating home region");
1133
1134        self.client.set_home_region(result.id, iter).await;
1135    }
1136}
1137
1138impl Message<EndpointAdvertisement> for ControlRunner {
1139    type Reply = ();
1140
1141    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
1142        let endpoints: Vec<Endpoint> = msg
1143            .endpoints
1144            .iter()
1145            .map(|ep| Endpoint {
1146                endpoint: ep.addr,
1147                ty: match ep.ty {
1148                    SelfEndpointType::Local => EndpointType::Local,
1149                    SelfEndpointType::Stun => EndpointType::Stun,
1150                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
1151                },
1152            })
1153            .collect();
1154
1155        tracing::debug!(
1156            n_endpoints = endpoints.len(),
1157            "advertising endpoints to control"
1158        );
1159
1160        self.client.set_endpoints(endpoints).await;
1161    }
1162}
1163
/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes).
///
/// The runtime sends this as a direct `ask` rather than over the bus, so the route change reaches
/// the live map-poll client. `routes` is the final advertised set the caller wants control to
/// grant.
#[derive(Debug)]
pub struct SetAdvertiseRoutes {
    /// The prefixes to advertise to control (already filtered to the final set by the caller).
    pub routes: Vec<ipnet::IpNet>,
}
1173
1174impl Message<SetAdvertiseRoutes> for ControlRunner {
1175    type Reply = ();
1176
1177    async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
1178        tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
1179        self.client.set_routable_ips(msg.routes).await;
1180    }
1181}
1182
/// Update this node's `Hostinfo.Hostname` at control — the wire half of a runtime
/// [`Runtime::set_hostname`](crate::Runtime::set_hostname).
///
/// The runtime sends this as a direct `ask`, so the change reaches the live map-poll client.
#[derive(Debug)]
pub struct SetHostname {
    /// The new hostname to report to control.
    pub hostname: String,
}
1191
1192impl Message<SetHostname> for ControlRunner {
1193    type Reply = ();
1194
1195    async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
1196        tracing::debug!("updating hostname at control");
1197        self.client.set_hostname(msg.hostname).await;
1198    }
1199}
1200
#[cfg(test)]
mod reauth_bridge_tests {
    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    /// Parse a fixture URL literal; a malformed fixture is a test bug, so panic.
    fn parse_url(raw: &str) -> url::Url {
        raw.parse().unwrap()
    }

    /// Core of the fix: a surfaced re-auth URL becomes `DeviceState::NeedsLogin(url)`. A
    /// mid-session `MachineNotAuthorized` (forwarded by the control client as `Some(url)`) thereby
    /// turns into the "needs login" state that the IPN bus reports as `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let auth = parse_url("https://login.example/auth");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, Some(&auth));

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth));
    }

    /// A `None` never causes a transition: recovering to `Running` belongs to the netmap self-node
    /// handler, so the bridge leaves the state alone.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, None);

        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }

    /// Surfacing the same URL again on a retry does not re-fire the watch (`send_if_modified`
    /// compares against the current value), so a stuck re-auth does not thrash subscribers.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let auth = parse_url("https://login.example/auth");
        let (state_tx, mut state_rx) = watch::channel(DeviceState::Running);

        // The first URL must notify.
        bridge_reauth_url_to_state(&state_tx, Some(&auth));
        assert!(state_rx.has_changed().unwrap(), "first NeedsLogin fires");
        state_rx.mark_unchanged();

        // The identical URL must be deduplicated.
        bridge_reauth_url_to_state(&state_tx, Some(&auth));
        let refired = state_rx.has_changed().unwrap();
        assert!(
            !refired,
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// A genuinely different URL after an earlier one is published (the dedupe follows changes
    /// instead of pinning the first URL forever).
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let first = parse_url("https://login.example/a");
        let second = parse_url("https://login.example/b");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        for auth in [&first, &second] {
            bridge_reauth_url_to_state(&state_tx, Some(auth));
        }

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(second));
    }

    /// End-to-end *clear* contract. After the bridge sets `NeedsLogin`, the netmap self-node path
    /// flips the state back to `Running`. That path is modeled here as a direct
    /// `send_replace(Running)`, the same transition the `StreamMessage::Next` handler makes on the
    /// next good self-node. This pins that the bridge needs no `None`-clears arm, because recovery
    /// is handled elsewhere.
    #[test]
    fn running_netmap_clears_needs_login() {
        let auth = parse_url("https://login.example/auth");
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, Some(&auth));
        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth));

        // Recovery transition performed by the self-node handler (next good self-node → Running).
        state_tx.send_replace(DeviceState::Running);
        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }
}
1284
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    //! Tests for `sticky_update_pop_browser_url`. They check both the published value and
    //! whether the `watch` receiver was notified (`has_changed`). Value-only checks cannot tell
    //! a producer that stays quiet apart from one that wakes subscribers on every tick.

    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    fn url(s: &str) -> url::Url {
        s.parse().unwrap()
    }

    /// A non-empty URL publishes to the cell and notifies subscribers.
    #[test]
    fn non_empty_url_publishes() {
        let (tx, rx) = watch::channel(None);
        let u = url("https://login.example/consent");
        sticky_update_pop_browser_url(&tx, Some(&u));
        assert!(rx.has_changed().unwrap(), "a non-empty URL fires");
        assert_eq!(*rx.borrow(), Some(u));
    }

    /// An absent (`None`) update, which is the common netmap tick, must NOT reset the cell, and must
    /// not wake subscribers either. This is the regression guard for the thrash bug: a
    /// reset-every-tick producer would coalesce the URL away on the bus.
    #[test]
    fn absent_update_does_not_reset() {
        let u = url("https://login.example/consent");
        let (tx, rx) = watch::channel(Some(u.clone()));
        // Simulate many empty netmap updates.
        for _ in 0..5 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert!(
            !rx.has_changed().unwrap(),
            "empty updates must not wake subscribers"
        );
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "empty updates must not clear the URL"
        );
    }

    /// The same URL repeated does not re-fire the watch (in-place dedupe via `send_if_modified`), so
    /// a subscriber isn't woken spuriously. This is proven by the receiver not being marked changed.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&u)); // first: fires
        assert!(rx.has_changed().unwrap(), "first non-empty URL fires");
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&u)); // same: deduped
        assert!(
            !rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A genuinely new URL after a prior one fires again (sticky but tracks changes). The
    /// notification is checked as well as the value, so a producer that swapped the URL in
    /// without waking subscribers fails this test.
    #[test]
    fn new_url_after_prior_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert!(rx.has_changed().unwrap(), "a new URL re-fires the watch");
        assert_eq!(*rx.borrow(), Some(b));
    }

    /// The realistic session sequence: a URL stays sticky (and quiet) through a run of `None`
    /// ticks, and a *different* URL after that gap still fires. This chains the legs the other
    /// tests cover in isolation (the actual control cadence is "URL, then many empty updates, then
    /// maybe a new URL").
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        rx.mark_unchanged();
        for _ in 0..3 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert!(
            !rx.has_changed().unwrap(),
            "the None gap must not wake subscribers"
        );
        assert_eq!(*rx.borrow(), Some(a), "stayed sticky through the None gap");
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert!(
            rx.has_changed().unwrap(),
            "a new URL after a None gap still fires"
        );
        assert_eq!(
            *rx.borrow(),
            Some(b),
            "a new URL after a None gap is published"
        );
    }

    /// Returning to a previously-seen URL (A → B → A) re-fires. The dedupe compares against the
    /// cell's *current* value, not a full history, so A after B is a genuine change.
    #[test]
    fn returning_to_prior_url_refires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&a)); // back to A: differs from current (B) → fires
        assert!(
            rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*rx.borrow(), Some(a));
    }

    /// End-to-end de-thrash: feed a realistic netmap cadence (empty, empty, URL, empty, empty)
    /// through the producer and record every wake-up a subscriber that keeps pace would observe.
    ///
    /// A `watch` receiver coalesces, so the receiver is sampled after *every* tick. Draining
    /// `has_changed` once at the end could never see more than one change, which would make the
    /// count meaningless. The fix must produce exactly ONE wake-up, and it must carry the URL. The
    /// pre-fix code (`send_replace` every tick) woke the subscriber on every tick and coalesced the
    /// URL away under the trailing `None`s.
    #[test]
    fn end_to_end_one_change_survives_none_thrash() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        // The cadence control actually sends: mostly-empty MapResponses with one carrying the URL.
        let cadence = [None, None, Some(&u), None, None];
        let mut wakeups = Vec::new();
        for incoming in cadence {
            sticky_update_pop_browser_url(&tx, incoming);
            // Sample like a subscriber that processes each notification before the next tick.
            if rx.has_changed().unwrap() {
                wakeups.push(rx.borrow_and_update().clone());
            }
        }
        assert_eq!(
            wakeups,
            vec![Some(u.clone())],
            "exactly one change survives the None thrash, and it carries the URL"
        );
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "the URL is still published after the trailing empty ticks"
        );
    }
}