Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus, TkaSyncError, tka_disable,
17    tka_init_begin, tka_init_finish, tka_submit_signature,
18};
19use ts_magicsock::SelfEndpointType;
20
21use crate::{
22    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
23    direct::EndpointAdvertisement,
24};
25
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus.
///
/// Besides the control client, it owns a set of `watch` cells (self node, SSH policy, Tailnet-Lock
/// status/authority, DNS, consent URL, netcheck) whose latest values other parts of the runtime
/// read, plus the bookkeeping that serializes Tailnet-Lock syncs.
pub struct ControlRunner {
    /// Control client produced by `AsyncControlClient::connect` in `on_start`.
    client: AsyncControlClient,
    /// The arguments this actor was started with (control config, auth key, env, and the
    /// externally-created device-state / TKA-authority senders).
    params: Params,

    /// Latest self node, or `None` until one has been published. `with_self_node` subscribes to
    /// this cell and waits for it to become `Some`.
    // NOTE(review): the writer is not visible in this part of the file — presumably the netmap
    // stream handler; confirm.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message).
    ///
    /// While a sync is in flight this is `None`: `maybe_sync_tka` moves the current state into the
    /// spawned task with `take()`, and `apply_tka_synced` stores the advanced state on success.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// The verified TKA [`Authority`](ts_tka::Authority) the peer tracker **enforces** (Go
    /// `tkaFilterNetmapLocked`). `None` until the first successful sync, and reset to `None` when the
    /// lock is disabled. This is the SOLE delivery channel to the peer tracker (which holds the
    /// matching `Receiver` and reads it on every peer upsert): a `watch` cell, not a bus message, so
    /// the latest value is always readable, never dropped under load, and writes are strictly ordered
    /// by this actor — a disable (`None`) can never be reordered behind or dropped before a stale
    /// `Some`. Written only from [`apply_tka_synced`] (publish on a successful sync, clear when the
    /// sync reports no lock) and [`maybe_sync_tka`] (clear on disable), both on the actor thread.
    /// The published `Authority` has always passed `VerifiedAumChain::verify`.
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Generation counter used to invalidate in-flight syncs. `maybe_sync_tka` bumps it (with
    /// `wrapping_add`) every time it observes a disabled lock status, and captures the current
    /// value into each sync it spawns; [`apply_tka_synced`] discards a sync result whose captured
    /// generation no longer matches, so a lock disabled *while a sync was in flight* is never
    /// re-enabled by that sync's late `Ok(Some)` (the in-flight window the
    /// `tka_synced.is_some()` disable guard alone does not cover).
    tka_generation: u64,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
    /// Background task that bridges the control client's mid-session re-auth URL cell onto
    /// [`Self::params`]'s device-state cell (sets [`DeviceState::NeedsLogin`] when control returns
    /// `MachineNotAuthorized` on a live re-register — see [`bridge_reauth_url_to_state`]). Aborted on
    /// [`Drop`] so it cannot outlive the actor (the [`DataplaneActor`](crate::dataplane) pattern).
    reauth_bridge: tokio::task::JoinHandle<()>,
}
94
95impl Drop for ControlRunner {
96    fn drop(&mut self) {
97        // Stop the re-auth bridge so it does not outlive the actor (mirrors `DataplaneActor`).
98        self.reauth_bridge.abort();
99    }
100}
101
/// Control runner args.
///
/// Passed to `ControlRunner::on_start` as the actor's `Args` and then stored on the actor as its
/// `params` field.
pub struct Params {
    /// Control config. Handed to `AsyncControlClient::check_auth` / `connect` during start-up and
    /// cloned into the spawned Tailnet-Lock RPC tasks.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Forwarded as-is to `AsyncControlClient::check_auth` and `connect`.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,

    /// Sender for the TKA enforcement-authority cell the peer tracker reads (Go
    /// `tkaFilterNetmapLocked`). Created in [`Runtime::spawn`](crate::Runtime) and threaded into BOTH
    /// the peer tracker (the `Receiver`) and this runner (the `Sender`), so the runner is the sole
    /// writer and the tracker reads the latest verified `Authority` on demand. `None` = no lock /
    /// disabled (admit all).
    pub(crate) tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
}
126
/// Error type returned by `ControlRunner`'s actor start-up (`Actor::Error`).
///
/// Both variants are `transparent`: the message and source are those of the wrapped error, and
/// `#[from]` lets `?` convert into this type.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client (registration check or connect).
    #[error(transparent)]
    Control(#[from] ControlError),

    /// An error from this crate.
    // NOTE(review): presumably produced by the `env.subscribe` calls in `on_start` — confirm.
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
136
137impl kameo::Actor for ControlRunner {
138    type Args = Params;
139    type Error = ControlRunnerError;
140
141    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
142        loop {
143            match AsyncControlClient::check_auth(
144                &params.config,
145                &params.env.keys,
146                params.auth_key.as_deref(),
147            )
148            .await
149            {
150                Ok(()) => break,
151                Err(ControlError::MachineNotAuthorized(u)) => {
152                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
153                    // Surface "interactive login required" so a watcher / `wait_until_running` can
154                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
155                    // keeps retrying (transient), so this is not a terminal `Failed`.
156                    params
157                        .state_tx
158                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
159                    tokio::time::sleep(Duration::from_secs(5)).await;
160                }
161                Err(e) => {
162                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
163                    // specific reason control gave AND publish it as a typed `Failed` state so
164                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
165                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
166                    // stopped actor is next asked. Publishing before `return Err` is why the state
167                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
168                    let reason = crate::RegistrationError::from(&e);
169                    tracing::error!(error = %e, "registration failed; control runner stopping");
170                    params
171                        .state_tx
172                        .send_replace(crate::DeviceState::Failed(reason));
173                    return Err(e.into());
174                }
175            }
176        }
177        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
178        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
179        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
180        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
181        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
182        // The control client's live map-poll loop publishes a mid-session re-auth URL here (set when
183        // a re-register returns `MachineNotAuthorized` because the node key expired/was revoked). The
184        // runtime owns the receiver; `connect` takes the sender. Created before `connect` so the
185        // sender is in place for the very first poll, and so the receiver outlives `bring_up`.
186        let (auth_url_tx, auth_url_rx) = watch::channel::<Option<url::Url>>(None);
187
188        let bring_up = async {
189            let (client, stream) = AsyncControlClient::connect(
190                &params.config,
191                &params.env.keys,
192                params.auth_key.as_deref(),
193                auth_url_tx,
194            )
195            .await?;
196
197            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
198
199            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
200            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
201            slf.attach_stream(stream.boxed(), (), ());
202            Ok::<_, ControlRunnerError>(client)
203        };
204
205        let client = match bring_up.await {
206            Ok(client) => client,
207            Err(e) => {
208                tracing::error!(error = %e, "bringing up the control session failed");
209                // The control session never came up; surface it as a transient registration
210                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
211                // stuck at `Connecting`.
212                params.state_tx.send_replace(crate::DeviceState::Failed(
213                    crate::RegistrationError::NetworkUnreachable,
214                ));
215                return Err(e);
216            }
217        };
218
219        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
220        // current (and flips to `Expired` if the self-node's key lapses).
221        params.state_tx.send_replace(crate::DeviceState::Running);
222
223        // Bridge the control client's mid-session re-auth URL cell onto the device-state cell: a
224        // `Some(url)` (control returned `MachineNotAuthorized` on a live re-register) becomes
225        // `DeviceState::NeedsLogin(url)` so the IPN bus surfaces `browse_to_url` and the embedder can
226        // prompt the user — the live-session analogue of the initial `check_auth` loop above. The
227        // recovery to `Running` is the netmap self-node handler's job (next good self-node), so this
228        // bridge only forwards `Some`. The task ends when the sender drops (the client's `run` task
229        // ended) and is aborted on actor `Drop`, so it cannot leak past the actor.
230        let reauth_bridge = {
231            let state_tx = params.state_tx.clone();
232            let mut auth_url_rx = auth_url_rx;
233            tokio::spawn(async move {
234                while auth_url_rx.changed().await.is_ok() {
235                    let url = auth_url_rx.borrow_and_update().clone();
236                    bridge_reauth_url_to_state(&state_tx, url.as_ref());
237                }
238            })
239        };
240
241        // Clone the TKA authority publisher before `params` moves into `Self` below. The matching
242        // `Receiver` lives on the peer tracker; this sender is the sole writer (enforce on sync,
243        // clear on disable).
244        let tka_authority = params.tka_authority.clone();
245
246        Ok(Self {
247            client,
248            params,
249            self_node: Default::default(),
250            ssh_policy: Default::default(),
251            tka: Default::default(),
252            tka_synced: None,
253            tka_authority,
254            tka_syncing: false,
255            tka_generation: 0,
256            cert_domains: Default::default(),
257            dns_config: Default::default(),
258            pop_browser_url: Default::default(),
259            netcheck: Default::default(),
260            reauth_bridge,
261        })
262    }
263}
264
265impl ControlRunner {
266    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
267    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
268    /// round-trip). The result returns via the [`TkaSynced`] self-message.
269    ///
270    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
271    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
272    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
273    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
274    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
275        if !tka.is_enabled() {
276            // Lock disabled (or never enabled): clear enforcement by writing `None` to the authority
277            // cell the peer tracker reads — synchronously, so it can never be reordered behind or
278            // dropped before a stale `Some` (the failure a best-effort broadcast had). Always bump the
279            // generation so ANY sync currently in flight is invalidated: without this, a disable that
280            // races an in-flight sync (whose `take()` already cleared `tka_synced`) would be a no-op
281            // here, and the sync's late `Ok(Some)` would silently re-enable a lock control just turned
282            // off (the in-flight window the `tka_synced.is_some()` guard alone misses). Cheap and
283            // idempotent: clearing an already-`None` cell and bumping the generation are harmless.
284            self.tka_generation = self.tka_generation.wrapping_add(1);
285            if self.tka_synced.is_some() {
286                tracing::info!("TKA lock disabled; clearing enforcement (admitting all peers)");
287                self.tka_synced = None;
288            }
289            self.tka_authority.send_replace(None);
290            return;
291        }
292        if self.tka_syncing {
293            return; // a sync is already in flight; the next netmap will re-trigger if still stale
294        }
295        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
296        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
297        // fail-closes harmlessly).
298        if let Some(synced) = &self.tka_synced
299            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
300            && synced.authority.head_matches(&control_head)
301        {
302            return;
303        }
304
305        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
306        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
307        // `tka_syncing` so we don't spawn a second concurrent sync. Capture the current generation so
308        // `apply_tka_synced` can discard this result if a disable bumped the generation while the sync
309        // was in flight (H1: don't re-enable a lock that was disabled mid-sync).
310        self.tka_syncing = true;
311        let generation = self.tka_generation;
312        let current = self.tka_synced.take();
313        let config = self.params.config.clone();
314        let keys = self.params.env.keys.clone();
315        tokio::spawn(async move {
316            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
317            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
318            // not allowed). A send failure just means the actor is gone — nothing to do.
319            if let Err(e) = self_ref.tell(TkaSynced { result, generation }).await {
320                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
321            }
322        });
323    }
324
325    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
326    /// state + publish the `Authority` to the peer tracker's enforcement cell (or, on inert/failed
327    /// sync, leave peers unaffected). Always clears the in-flight guard.
328    ///
329    /// `generation` is the value captured when the sync was spawned. If it no longer matches
330    /// `self.tka_generation`, the lock was disabled (or re-synced) while this sync was in flight, so
331    /// the result is discarded — never re-enabling an authority control has since turned off.
332    async fn apply_tka_synced(
333        &mut self,
334        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
335        generation: u64,
336    ) {
337        self.tka_syncing = false;
338
339        // H1 guard: a disable (or a superseding sync) bumped the generation while this sync ran. Drop
340        // the stale result — `maybe_sync_tka`'s disable branch already cleared enforcement to `None`,
341        // and re-applying this `Some` would re-enforce a lock that is no longer active.
342        if generation != self.tka_generation {
343            tracing::info!(
344                "TKA sync result superseded (lock disabled or re-synced mid-flight); discarding"
345            );
346            return;
347        }
348
349        match result {
350            Ok(Some(synced)) => {
351                tracing::info!(
352                    head = %synced.authority.head().to_base32(),
353                    "TKA sync succeeded; enforcing verified Authority (Go tkaFilterNetmapLocked)"
354                );
355                // Deliver the verified Authority to the peer tracker's enforcement cell. The tracker
356                // reads it on every peer upsert and drops unauthorized peers. `Some(..)` = enforce; a
357                // `None` is written on disable. `watch` is the sole channel (last-write-wins, never
358                // dropped, ordered by this actor) — no bus, no re-publish-for-replay needed.
359                self.tka_authority
360                    .send_replace(Some(synced.authority.clone()));
361                self.tka_synced = Some(synced);
362            }
363            Ok(None) => {
364                // Control has no lock for us (no genesis / disabled). Clear any authority we were
365                // previously enforcing — symmetric with the disable path — so a transition to
366                // "no lock" stops dropping peers. Not an error.
367                if self.tka_synced.is_some() {
368                    tracing::info!("TKA sync: control reports no lock; clearing enforcement");
369                    self.tka_synced = None;
370                }
371                self.tka_authority.send_replace(None);
372            }
373            Err(e) => {
374                // Transport or verify failure: log and leave the prior authority in place (a failed
375                // sync must not drop enforcement — that would fail OPEN). NEVER errors the netmap.
376                // The next netmap update re-triggers a sync attempt.
377                tracing::warn!(error = %e, "TKA sync failed; keeping prior enforcement state");
378            }
379        }
380    }
381
382    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
383    where
384        F: FnOnce(&Node) -> R,
385    {
386        let mut sub = self.self_node.subscribe();
387        let mut shutdown = self.params.env.shutdown.clone();
388
389        async move {
390            tokio::select! {
391                _ = shutdown.wait_for(|x| *x) => {
392                    None
393                },
394                node = sub.wait_for(Option::is_some) => {
395                    Some(f(node.ok()?.as_ref()?))
396                },
397            }
398        }
399    }
400}
401
402/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
403///
404/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
405/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
406/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
407/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
408/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
409/// for a `watch`/bus subscriber.
410///
411/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
412/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
413/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
414/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
415/// device-state cell in this handler.
416///
417/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
418/// against a plain `watch` channel without standing up the actor.
419fn sticky_update_pop_browser_url(
420    cell: &watch::Sender<Option<url::Url>>,
421    incoming: Option<&url::Url>,
422) {
423    if let Some(url) = incoming {
424        cell.send_if_modified(|current| {
425            if current.as_ref() == Some(url) {
426                false
427            } else {
428                *current = Some(url.clone());
429                true
430            }
431        });
432    }
433}
434
435/// Map a mid-session re-auth URL surfaced by the control client onto the device-state cell.
436///
437/// The control client's live map-poll loop publishes an `Option<url::Url>` into a `watch` cell when
438/// a re-register hits `MachineNotAuthorized` (the node key expired/was revoked mid-session — see
439/// [`ts_control::AsyncControlClient::connect`]'s `auth_url_tx`). `ts_control` cannot name
440/// [`DeviceState`] (it must not depend on this crate), so this bridge fn does the translation:
441/// a `Some(url)` sets [`DeviceState::NeedsLogin`]`(url)` so the IPN bus derives `browse_to_url` and
442/// the embedder can prompt the user, exactly like the initial-registration `check_auth` path.
443///
444/// **Only `Some` drives a transition; `None` is ignored here.** The clear back to
445/// [`DeviceState::Running`] is owned by the netmap self-node handler (the next good self-node flips
446/// it — see the `StreamMessage::Next` arm), which is the authoritative "we are up again" signal; an
447/// independent `None`-clear in this bridge could race that and is unnecessary. The
448/// [`send_if_modified`](watch::Sender::send_if_modified) guard fires the watch only on a genuine
449/// state change (it is a no-op when the cell already holds `NeedsLogin(url)` for the same URL), so a
450/// re-auth URL re-surfaced across retries does not thrash the cell — mirroring the device-state
451/// dedupe in the netmap handler.
452///
453/// Factored out so the (regress-prone) map-and-guard is unit-testable against a plain `watch`
454/// channel without standing up the actor (mirrors [`sticky_update_pop_browser_url`]).
455pub(crate) fn bridge_reauth_url_to_state(
456    state_tx: &watch::Sender<crate::DeviceState>,
457    incoming: Option<&url::Url>,
458) {
459    if let Some(url) = incoming {
460        let next = crate::DeviceState::NeedsLogin(url.clone());
461        state_tx.send_if_modified(|current| {
462            if *current == next {
463                false
464            } else {
465                *current = next.clone();
466                true
467            }
468        });
469    }
470}
471
472// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
473// those generated fields carry no doc and can't take attributes, so wrap in a module where
474// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
475// are re-exported so callers keep referencing them at `control_runner::<Name>`.
476pub use msg_impl::*;
477
478#[allow(missing_docs)]
479mod msg_impl {
480    use kameo::{message::Context, reply::DelegatedReply};
481
482    use super::*;
483
484    #[kameo::messages]
485    impl ControlRunner {
486        /// Fetch the IPv4 address for this tailscale device.
487        #[message(ctx)]
488        pub fn ipv4(
489            &self,
490            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
491        ) -> DelegatedReply<Option<Ipv4Addr>> {
492            let (deleg, replier) = ctx.reply_sender();
493
494            if let Some(replier) = replier {
495                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
496
497                tokio::spawn(async move {
498                    let ip = fut.await;
499                    replier.send(ip);
500                });
501            }
502
503            deleg
504        }
505
506        /// Fetch the IPv6 address for this tailscale device.
507        #[message(ctx)]
508        pub fn ipv6(
509            &self,
510            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
511        ) -> DelegatedReply<Option<Ipv6Addr>> {
512            let (deleg, replier) = ctx.reply_sender();
513
514            if let Some(replier) = replier {
515                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
516
517                tokio::spawn(async move {
518                    let ip = fut.await;
519                    replier.send(ip);
520                });
521            }
522
523            deleg
524        }
525
526        /// Fetch the self node for this tailscale device.
527        #[message(ctx)]
528        pub fn self_node(
529            &self,
530            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
531        ) -> DelegatedReply<Option<Node>> {
532            let (deleg, replier) = ctx.reply_sender();
533
534            if let Some(replier) = replier {
535                let node = self.with_self_node(|node| node.clone());
536
537                tokio::spawn(async move {
538                    let node = node.await;
539                    replier.send(node)
540                });
541            }
542
543            deleg
544        }
545
546        /// Fetch the current Tailscale SSH policy, if control has pushed one.
547        ///
548        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
549        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
550        /// for a value: an absent policy is a legitimate, immediate answer.
551        #[message]
552        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
553            self.ssh_policy.borrow().clone()
554        }
555
556        /// Fetch the current Tailnet Lock status, if control has pushed one.
557        ///
558        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
559        #[message]
560        pub fn current_tka_status(&self) -> Option<TkaStatus> {
561            self.tka.borrow().clone()
562        }
563
564        /// Sign `node_key` directly with this node's network-lock key and submit the signature to
565        /// control (Go `tka.sign` for the Direct case → `tkaSubmitSignature`).
566        ///
567        /// Builds a `Direct` [`NodeKeySignature`](ts_tka::NodeKeySignature) via
568        /// [`sign_direct`](ts_tka::NodeKeySignature::sign_direct) over this node's inner ed25519
569        /// network-lock signing key, serializes it (raw CBOR), and POSTs it to `/machine/tka/sign`.
570        /// Mirrors `set_dns`/`get_certificate`: clones the control config + node keys into a spawned
571        /// task (delegated reply, so the round-trip doesn't block the mailbox) over a fresh Noise
572        /// channel.
573        ///
574        /// **Posture: this only *submits* a signature to control — it does NOT mutate the local
575        /// [`Authority`](ts_tka::Authority).** The local trusted-key state advances solely through the
576        /// existing verified-sync path (`sync_tka` → `VerifiedAumChain::verify`); a `tka_sign` success
577        /// is acknowledged to the caller, and the resulting AUM is picked up on the next netmap-driven
578        /// sync. Verify-and-log is unchanged.
579        #[message(ctx)]
580        pub fn tka_sign(
581            &self,
582            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
583            node_key: [u8; 32],
584        ) -> DelegatedReply<Result<(), TkaSyncError>> {
585            let (deleg, replier) = ctx.reply_sender();
586
587            if let Some(replier) = replier {
588                let config = self.params.config.clone();
589                let keys = self.params.env.keys.clone();
590                tokio::spawn(async move {
591                    // Sign the node key with our network-lock key, then submit the raw-CBOR NKS.
592                    let nks = ts_tka::NodeKeySignature::sign_direct(
593                        &node_key,
594                        &keys.network_lock_keys.private.signing_key(),
595                    );
596                    let req = ts_control::TkaSubmitSignatureRequest {
597                        // node_key + version are stamped by the RPC client from `keys`.
598                        version: Default::default(),
599                        node_key: keys.node_keys.public,
600                        signature: nks.serialize(),
601                    };
602                    let result = tka_submit_signature(
603                        &config.server_url,
604                        &keys,
605                        req,
606                        config.allow_http_key_fetch,
607                    )
608                    .await
609                    .map(|_response| ());
610                    replier.send(result);
611                });
612            }
613
614            deleg
615        }
616
617        /// Disable Tailnet Lock by presenting the disablement secret to control (Go
618        /// `tka.disable` → `/machine/tka/disable`).
619        ///
620        /// Targets the **current** authority head (read from the cached [`TkaStatus`]); the caller
621        /// supplies the `disablement_secret` out of band (it is the operator-held capability that
622        /// authorizes turning the lock off). Mirrors `tka_sign`: clones config + keys into a spawned
623        /// task (delegated reply). Returns [`TkaSyncError::Unsupported`] when there is no known TKA
624        /// head (lock not in use / control hasn't pushed a status), since there is nothing to disable.
625        ///
626        /// **Submit-only, like `tka_sign`:** this POSTs the disablement to control and does NOT mutate
627        /// the local [`Authority`](ts_tka::Authority). Control acts on the disablement; this node
628        /// observes the result through the existing verified-sync path. Verify-and-log unchanged.
629        #[message(ctx)]
630        pub fn tka_disable(
631            &self,
632            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
633            disablement_secret: Vec<u8>,
634        ) -> DelegatedReply<Result<(), TkaSyncError>> {
635            let (deleg, replier) = ctx.reply_sender();
636
637            if let Some(replier) = replier {
638                // Read the current head from the cached status BEFORE the spawn (can't borrow &self
639                // across the await). No head ⇒ no lock to disable ⇒ Unsupported.
640                let head = self.tka.borrow().as_ref().map(|s| s.head.clone());
641                let config = self.params.config.clone();
642                let keys = self.params.env.keys.clone();
643                tokio::spawn(async move {
644                    let result = match head {
645                        Some(head) => {
646                            let req = ts_control::TkaDisableRequest {
647                                // node_key + version are stamped by the RPC client from `keys`.
648                                version: Default::default(),
649                                node_key: keys.node_keys.public,
650                                head,
651                                disablement_secret,
652                            };
653                            tka_disable(&config.server_url, &keys, req, config.allow_http_key_fetch)
654                                .await
655                                .map(|_response| ())
656                        }
657                        None => Err(TkaSyncError::Unsupported),
658                    };
659                    replier.send(result);
660                });
661            }
662
663            deleg
664        }
665
666        /// Initialize Tailnet Lock with this node as the sole initial trusted key, gated by
667        /// `disablement_secret` (Go `LocalClient.NetworkLockInit` — the "lock yourself in" case).
668        ///
669        /// Builds + signs a genesis Checkpoint AUM whose only trusted key is this node's network-lock
670        /// public key (votes 1) and whose single DisablementValue is `disablement_value(secret)`, then
671        /// drives the two-phase init: `tka/init/begin` (submit the genesis) → if control needs no
672        /// further node signatures (`NeedSignatures` empty, the case when this node is the only key) →
673        /// `tka/init/finish` carrying the raw `disablement_secret` as `SupportDisablement`. Mirrors
674        /// `tka_sign`/`tka_disable`: cloned config + keys into a spawned task (delegated reply).
675        ///
676        /// If control returns a non-empty `NeedSignatures` (other nodes must be re-signed under the new
677        /// lock — a multi-node tailnet), this returns [`TkaSyncError::Unsupported`]: re-signing each
678        /// listed node (incl. the Rotation-key case) is a larger flow deferred to a fuller
679        /// `tka_init(keys, secrets)` — the single-node lock-init is the shipped subset.
680        ///
681        /// **Submit-only**, like `tka_sign`/`tka_disable`: this creates the lock at control and does
682        /// NOT seed the local [`Authority`](ts_tka::Authority) — the node picks up the new lock through
683        /// the existing verified netmap-sync (control pushes a `TKAInfo`, `maybe_sync_tka` bootstraps
684        /// the genesis through `VerifiedAumChain::verify`). Verify-and-log posture unchanged.
685        #[message(ctx)]
686        pub fn tka_init(
687            &self,
688            ctx: &mut Context<Self, DelegatedReply<Result<(), TkaSyncError>>>,
689            disablement_secret: Vec<u8>,
690        ) -> DelegatedReply<Result<(), TkaSyncError>> {
691            let (deleg, replier) = ctx.reply_sender();
692
693            if let Some(replier) = replier {
694                let config = self.params.config.clone();
695                let keys = self.params.env.keys.clone();
696                tokio::spawn(async move {
697                    let result = tka_init_run(&config, &keys, disablement_secret).await;
698                    replier.send(result);
699                });
700            }
701
702            deleg
703        }
704
705        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
706        ///
707        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
708        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
709        /// does not block waiting for a value).
710        #[message]
711        pub fn cert_domains(&self) -> Vec<String> {
712            self.cert_domains.borrow().clone()
713        }
714
715        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
716        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
717        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
718        #[message]
719        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
720            self.dns_config.borrow().clone()
721        }
722
723        /// The interactive-login / consent URL control last asked this node to open
724        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
725        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
726        #[message]
727        pub fn pop_browser_url(&self) -> Option<url::Url> {
728            self.pop_browser_url.borrow().clone()
729        }
730
731        /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
732        ///
733        /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
734        /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
735        /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
736        /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
737        /// late subscriber sees the current URL. The initial value is `None` until control sends one.
738        #[message(derive(Clone))]
739        pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
740            self.pop_browser_url.subscribe()
741        }
742
743        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
744        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
745        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
746        #[message]
747        pub fn netcheck(&self) -> crate::status::NetcheckReport {
748            self.netcheck.borrow().clone()
749        }
750
751        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
752        ///
753        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
754        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
755        /// for the round-trip.
756        #[message(ctx)]
757        pub fn fetch_id_token(
758            &self,
759            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
760            audience: String,
761        ) -> DelegatedReply<Result<String, IdTokenError>> {
762            let (deleg, replier) = ctx.reply_sender();
763
764            if let Some(replier) = replier {
765                let config = self.params.config.clone();
766                let keys = self.params.env.keys.clone();
767                tokio::spawn(async move {
768                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
769                    replier.send(result);
770                });
771            }
772
773            deleg
774        }
775
776        /// Log this node out of the tailnet: deregister it by expiring its current node key.
777        ///
778        /// Mirrors `fetch_id_token`: clones the control config + node keys
779        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
780        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
781        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
782        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
783        /// on-disk node key, so re-registering with the same key is the re-login path.
784        #[message(ctx)]
785        pub fn logout(
786            &self,
787            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
788        ) -> DelegatedReply<Result<(), LogoutError>> {
789            let (deleg, replier) = ctx.reply_sender();
790
791            if let Some(replier) = replier {
792                let config = self.params.config.clone();
793                let keys = self.params.env.keys.clone();
794                tokio::spawn(async move {
795                    let result = ts_control::logout(&config, &keys).await;
796                    replier.send(result);
797                });
798            }
799
800            deleg
801        }
802
803        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
804        /// `LocalClient.SetDNS`).
805        ///
806        /// Mirrors `fetch_id_token`: clones the control config + node keys
807        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
808        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
809        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
810        /// match, so the surfaced API takes only `name` + `value`.
811        #[message(ctx)]
812        pub fn set_dns(
813            &self,
814            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
815            name: String,
816            value: String,
817        ) -> DelegatedReply<Result<(), SetDnsError>> {
818            let (deleg, replier) = ctx.reply_sender();
819
820            if let Some(replier) = replier {
821                let config = self.params.config.clone();
822                let keys = self.params.env.keys.clone();
823                tokio::spawn(async move {
824                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
825                    replier.send(result);
826                });
827            }
828
829            deleg
830        }
831    }
832
    /// Reply payload of the [`get_cert_pair`](ControlRunner::get_cert_pair) message.
    ///
    /// `Ok` carries the issued `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface);
    /// `Err` carries a [`ts_control::CertError`].
    ///
    /// Exists as an alias so the message's `Context` type stays under clippy's `type_complexity`
    /// bar — the nested `Result<(String, String), _>` trips it when written inline.
    #[cfg(feature = "acme")]
    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
839
840    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
841    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
842    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
843    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
844    // block keeps the default build clean.
845    #[cfg(feature = "acme")]
846    #[kameo::messages]
847    impl ControlRunner {
848        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
849        /// client-side ACME DNS-01 engine (`acme` feature).
850        ///
851        /// Mirrors `fetch_id_token`: clones the control config + node keys
852        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
853        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
854        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
855        ///
856        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
857        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
858        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
859        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
860        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
861        /// publish 501s.
862        #[message(ctx)]
863        pub fn get_certificate(
864            &self,
865            ctx: &mut Context<
866                Self,
867                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
868            >,
869            name: String,
870        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
871            let (deleg, replier) = ctx.reply_sender();
872
873            if let Some(replier) = replier {
874                let config = self.params.config.clone();
875                let keys = self.params.env.keys.clone();
876                tokio::spawn(async move {
877                    let result = issue_certificate(&config, &keys, &name).await;
878                    replier.send(result);
879                });
880            }
881
882            deleg
883        }
884
885        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
886        /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
887        /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
888        ///
889        /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
890        /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
891        /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
892        /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
893        /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
894        /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
895        #[message(ctx)]
896        pub fn get_cert_pair(
897            &self,
898            ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
899            name: String,
900        ) -> DelegatedReply<CertPairReply> {
901            let (deleg, replier) = ctx.reply_sender();
902
903            if let Some(replier) = replier {
904                let config = self.params.config.clone();
905                let keys = self.params.env.keys.clone();
906                tokio::spawn(async move {
907                    let result = issue_cert_pair(&config, &keys, &name).await;
908                    replier.send(result);
909                });
910            }
911
912            deleg
913        }
914    }
915}
916
917/// The `tka_init` body (the genesis-build + two-phase init/begin→init/finish choreography),
918/// factored out of the actor handler so it runs in the spawned task. See [`ControlRunner::tka_init`].
919///
920/// "Lock yourself in": the genesis trusts only this node's network-lock key (votes 1) and stores one
921/// DisablementValue = `disablement_value(secret)`. On a non-empty `NeedSignatures` (multi-node
922/// tailnet needing re-signs) it returns [`TkaSyncError::Unsupported`] — the single-node subset.
923async fn tka_init_run(
924    config: &ts_control::Config,
925    keys: &ts_keys::NodeState,
926    disablement_secret: Vec<u8>,
927) -> Result<(), TkaSyncError> {
928    // Build the genesis: this node's NL public key as the sole trusted key, one disablement value.
929    let nl_public = keys.network_lock_keys.public.to_bytes().to_vec();
930    let genesis_key = ts_tka::AumKey {
931        kind: ts_tka::KeyKind::Ed25519,
932        votes: 1,
933        public: nl_public,
934        meta: Vec::new(),
935    };
936    let dvalue = ts_tka::disablement_value(&disablement_secret).to_vec();
937    let mut genesis = ts_tka::Aum::new_genesis_checkpoint(vec![genesis_key], vec![dvalue])
938        // A malformed genesis is a local construction bug, not a transient RPC failure — surface it as a
939        // coarse internal error rather than NetworkError (which would invite a pointless retry).
940        .map_err(|_| TkaSyncError::Internal(ts_control::TkaSyncInternalErrorKind::SerDe))?;
941    genesis.sign(&keys.network_lock_keys.private.signing_key());
942
943    // Phase 1: submit the genesis. node_key + version are stamped by the RPC client from `keys`.
944    let begin_req = ts_control::TkaInitBeginRequest {
945        version: Default::default(),
946        node_key: keys.node_keys.public,
947        genesis_aum: genesis.serialize(),
948    };
949    let begin_resp = tka_init_begin(
950        &config.server_url,
951        keys,
952        begin_req,
953        config.allow_http_key_fetch,
954    )
955    .await?;
956
957    // Single-node case only: control must need no further node signatures. A non-empty
958    // NeedSignatures means other nodes must be re-signed under the new lock — deferred.
959    if !begin_resp.need_signatures.is_empty() {
960        tracing::warn!(
961            need = begin_resp.need_signatures.len(),
962            "tka_init: control requires re-signing other nodes; the multi-node init is not yet \
963             implemented (single-node lock-init only)"
964        );
965        return Err(TkaSyncError::Unsupported);
966    }
967
968    // Phase 2: finish, carrying the raw disablement secret as SupportDisablement (Go sends the raw
969    // secret here; only the genesis stores its Argon2i hash).
970    let finish_req = ts_control::TkaInitFinishRequest {
971        version: Default::default(),
972        node_key: keys.node_keys.public,
973        signatures: std::collections::BTreeMap::new(),
974        support_disablement: disablement_secret,
975    };
976    tka_init_finish(
977        &config.server_url,
978        keys,
979        finish_req,
980        config.allow_http_key_fetch,
981    )
982    .await
983    .map(|_response| ())
984}
985
/// Issue a certificate for `name` via set-dns DNS-01 and return only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) (the `get_certificate` / `ListenTLS` path).
///
/// A thin projection over the shared issuance core `issue_cert_pair_inner`: one issuance is
/// performed and the PEMs are dropped because this caller has no use for the on-disk pair. The
/// ACME account key is loaded or generated by that shared core.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok(issued.certified)
}
1002
/// Issue a certificate for `name` via set-dns DNS-01 and return the **PEM pair**
/// `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt`/`.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// Performs the same single issuance as `issue_certificate` (via the shared core
/// `issue_cert_pair_inner`, which loads or generates the ACME account key); only the result shape
/// differs. The leaf **private key** PEM is the second element and is NEVER logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
1019
/// Shared issuance core behind `issue_certificate` and `issue_cert_pair`: picks the ACME account
/// key, targets Let's Encrypt production, and runs one DNS-01 issuance. The full
/// [`IssuedCert`](ts_control::acme::IssuedCert) is returned so each caller projects out what it
/// needs (one ACME order, two consumers).
///
/// Account key: the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused when
/// present, so the same Let's Encrypt account survives renewals. Otherwise an ephemeral per-call
/// key is generated (logged at debug — a new ACME account each issuance, with no write-back).
///
/// Directory: always Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]); a directory URL that fails to parse
/// is reported as [`ts_control::CertError::Acme`]. The leaf private key is never logged.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        generated.0
    };

    let parsed_directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
        .parse()
        .map_err(|e| {
            ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
        });
    let directory = parsed_directory?;

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
1053
1054impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
1055    type Reply = ();
1056
1057    async fn handle(
1058        &mut self,
1059        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
1060        ctx: &mut Context<Self, Self::Reply>,
1061    ) {
1062        match msg {
1063            StreamMessage::Started(_) => {
1064                tracing::trace!("started listening to state updates");
1065            }
1066
1067            StreamMessage::Next(msg) => {
1068                if let Some(node) = msg.node.as_ref() {
1069                    // Reflect node-key expiry into the device state: control delivering a self-node
1070                    // whose key is in the past means the node must re-authenticate. Otherwise the
1071                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
1072                    // prior update had flipped it to Expired).
1073                    let now_unix = std::time::SystemTime::now()
1074                        .duration_since(std::time::UNIX_EPOCH)
1075                        .map(|d| d.as_secs() as i64)
1076                        .unwrap_or(0);
1077                    let next = if node.key_expired_at_unix(now_unix) {
1078                        crate::DeviceState::Expired
1079                    } else {
1080                        crate::DeviceState::Running
1081                    };
1082                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
1083                    // self-node arrives on every netmap update).
1084                    self.params.state_tx.send_if_modified(|s| {
1085                        if *s != next {
1086                            *s = next.clone();
1087                            true
1088                        } else {
1089                            false
1090                        }
1091                    });
1092
1093                    self.self_node.send_replace(Some(node.clone()));
1094                }
1095
1096                if let Some(policy) = msg.ssh_policy.as_ref() {
1097                    self.ssh_policy.send_replace(Some(policy.clone()));
1098                }
1099
1100                if let Some(tka) = msg.tka.as_ref() {
1101                    self.tka.send_replace(Some(tka.clone()));
1102                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
1103                }
1104
1105                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
1106                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
1107                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
1108                let cert_domains = msg
1109                    .dns_config
1110                    .as_ref()
1111                    .map(|d| d.cert_domains.clone())
1112                    .unwrap_or_default();
1113                self.cert_domains.send_replace(cert_domains);
1114
1115                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
1116                // `None` when control sent no DNS config on this update — distinct from a present but
1117                // empty config (Go `netmap.NetworkMap.DNS`).
1118                self.dns_config.send_replace(msg.dns_config.clone());
1119
1120                // Track the interactive-login URL for `Device::pop_browser_url` /
1121                // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
1122                // sticky semantics (update only on a new non-empty URL; never reset to `None`).
1123                sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
1124
1125                if let Err(e) = self.params.env.publish(msg).await {
1126                    tracing::error!(error = %e, "publishing netmap update");
1127                }
1128            }
1129
1130            StreamMessage::Finished(_) => {
1131                tracing::error!("state update stream terminated")
1132            }
1133        }
1134    }
1135}
1136
1137/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
1138/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
1139/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
1140/// [`ControlRunner::apply_tka_synced`](ControlRunner).
1141#[doc(hidden)]
1142pub struct TkaSynced {
1143    pub(crate) result:
1144        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
1145    /// The [`ControlRunner::tka_generation`] captured when this sync was spawned; the handler
1146    /// discards the result if it no longer matches (the lock was disabled/re-synced mid-flight).
1147    pub(crate) generation: u64,
1148}
1149
1150impl Message<TkaSynced> for ControlRunner {
1151    type Reply = ();
1152
1153    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
1154        self.apply_tka_synced(msg.result, msg.generation).await;
1155    }
1156}
1157
1158impl Message<DerpLatencyMeasurement> for ControlRunner {
1159    type Reply = ();
1160
1161    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
1162        let measurements = msg.measurement.as_ref().clone();
1163
1164        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
1165        // the same measurements, before the home-region short-circuit below — an empty set still
1166        // yields a (default/empty) report rather than a stale one.
1167        self.netcheck
1168            .send_replace(crate::status::NetcheckReport::from_region_results(
1169                &measurements,
1170            ));
1171
1172        let Some(result) = measurements.first() else {
1173            tracing::debug!("derp latency measurements empty");
1174            return;
1175        };
1176
1177        let iter = measurements.iter().map(|result| {
1178            (
1179                result.latency_map_key.as_str(),
1180                result.latency.as_secs_f64(),
1181            )
1182        });
1183
1184        tracing::debug!(selected_region_id = ?result.id, "updating home region");
1185
1186        self.client.set_home_region(result.id, iter).await;
1187    }
1188}
1189
1190impl Message<EndpointAdvertisement> for ControlRunner {
1191    type Reply = ();
1192
1193    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
1194        let endpoints: Vec<Endpoint> = msg
1195            .endpoints
1196            .iter()
1197            .map(|ep| Endpoint {
1198                endpoint: ep.addr,
1199                ty: match ep.ty {
1200                    SelfEndpointType::Local => EndpointType::Local,
1201                    SelfEndpointType::Stun => EndpointType::Stun,
1202                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
1203                },
1204            })
1205            .collect();
1206
1207        tracing::debug!(
1208            n_endpoints = endpoints.len(),
1209            "advertising endpoints to control"
1210        );
1211
1212        self.client.set_endpoints(endpoints).await;
1213    }
1214}
1215
1216/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
1217/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
1218/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
1219/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
1220#[derive(Debug)]
1221pub struct SetAdvertiseRoutes {
1222    /// The prefixes to advertise to control (already filtered to the final set).
1223    pub routes: Vec<ipnet::IpNet>,
1224}
1225
1226impl Message<SetAdvertiseRoutes> for ControlRunner {
1227    type Reply = ();
1228
1229    async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
1230        tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
1231        self.client.set_routable_ips(msg.routes).await;
1232    }
1233}
1234
/// Request to change this node's `Hostinfo.Hostname` at control.
///
/// This is the wire half of a runtime [`Runtime::set_hostname`](crate::Runtime::set_hostname). The
/// runtime delivers it as a direct `ask`, so that the change reaches the live map-poll client.
#[derive(Debug)]
pub struct SetHostname {
    /// Hostname that should be reported to control from now on.
    pub hostname: String,
}
1243
1244impl Message<SetHostname> for ControlRunner {
1245    type Reply = ();
1246
1247    async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
1248        tracing::debug!("updating hostname at control");
1249        self.client.set_hostname(msg.hostname).await;
1250    }
1251}
1252
#[cfg(test)]
mod reauth_bridge_tests {
    use tokio::sync::watch;

    use super::bridge_reauth_url_to_state;
    use crate::DeviceState;

    /// Test helper: parse a string literal into a [`url::Url`], panicking if it is malformed.
    fn parsed(raw: &str) -> url::Url {
        raw.parse::<url::Url>().unwrap()
    }

    /// Core of the fix: a surfaced re-auth URL (a mid-session `MachineNotAuthorized`, forwarded by
    /// the control client as `Some(url)`) is bridged onto `DeviceState::NeedsLogin(url)`, the
    /// "needs login" state that the IPN bus turns into `browse_to_url`.
    #[test]
    fn bridge_maps_auth_url_to_needs_login() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);
        let auth_url = parsed("https://login.example/auth");

        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth_url));
    }

    /// A `None` must never cause a transition. Recovering to `Running` belongs to the netmap
    /// self-node handler, so the bridge has to leave the current state as it found it.
    #[test]
    fn bridge_none_leaves_state_unchanged() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);

        bridge_reauth_url_to_state(&state_tx, None);

        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }

    /// Surfacing an identical URL on every retry must not wake subscribers again: the bridge
    /// dedupes against the value currently in the cell (`send_if_modified`), so a stuck re-auth
    /// does not thrash the watch.
    #[test]
    fn bridge_same_url_does_not_refire() {
        let (state_tx, mut state_rx) = watch::channel(DeviceState::Running);
        let auth_url = parsed("https://login.example/auth");

        // First delivery changes the cell, so the receiver must observe a change.
        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert!(state_rx.has_changed().unwrap(), "first NeedsLogin fires");
        state_rx.mark_unchanged();

        // Second delivery carries the same URL and must be deduplicated.
        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert!(
            !state_rx.has_changed().unwrap(),
            "the same re-auth URL must not re-fire the state watch"
        );
    }

    /// The dedupe tracks changes rather than pinning the first URL forever: a different re-auth
    /// URL arriving after an earlier one replaces it.
    #[test]
    fn bridge_new_url_after_prior_fires() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);
        let first = parsed("https://login.example/a");
        let second = parsed("https://login.example/b");

        bridge_reauth_url_to_state(&state_tx, Some(&first));
        bridge_reauth_url_to_state(&state_tx, Some(&second));

        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(second));
    }

    /// The *clear* contract, end to end. Once the bridge has set `NeedsLogin`, the netmap
    /// self-node path returns the state to `Running`; that path is modeled here by a direct
    /// `send_replace(Running)`, the transition the `StreamMessage::Next` handler performs on the
    /// next good self-node. This pins that the bridge does NOT need its own `None`-clear arm,
    /// because recovery is owned elsewhere.
    #[test]
    fn running_netmap_clears_needs_login() {
        let (state_tx, state_rx) = watch::channel(DeviceState::Running);
        let auth_url = parsed("https://login.example/auth");

        bridge_reauth_url_to_state(&state_tx, Some(&auth_url));
        assert_eq!(*state_rx.borrow(), DeviceState::NeedsLogin(auth_url));

        // Stand-in for the self-node handler's recovery (next good netmap self-node → Running).
        state_tx.send_replace(DeviceState::Running);
        assert_eq!(*state_rx.borrow(), DeviceState::Running);
    }
}
1336
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    /// Test helper: parse a string literal into a [`url::Url`], panicking if it is malformed.
    fn parsed(raw: &str) -> url::Url {
        raw.parse::<url::Url>().unwrap()
    }

    /// Feeding a non-empty URL stores it in the cell.
    #[test]
    fn non_empty_url_publishes() {
        let consent = parsed("https://login.example/consent");
        let (cell_tx, cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&consent));

        assert_eq!(*cell_rx.borrow(), Some(consent));
    }

    /// An absent (`None`) update is the common netmap tick and must NOT reset the cell. Regression
    /// guard for the thrash bug, where resetting on every tick coalesced the URL away on the bus.
    #[test]
    fn absent_update_does_not_reset() {
        let consent = parsed("https://login.example/consent");
        let (cell_tx, cell_rx) = watch::channel(Some(consent.clone()));

        // A run of empty netmap updates.
        (0..5).for_each(|_| {
            sticky_update_pop_browser_url(&cell_tx, None);
        });

        assert_eq!(
            *cell_rx.borrow(),
            Some(consent),
            "empty updates must not clear the URL"
        );
    }

    /// Repeating the URL already held does not re-fire the watch (in-place dedupe through
    /// `send_if_modified`), so subscribers are not woken spuriously. Shown by the receiver not
    /// being marked changed after the second call.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let consent = parsed("https://login.example/consent");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        // First delivery: the cell goes from `None` to the URL, so it fires.
        sticky_update_pop_browser_url(&cell_tx, Some(&consent));
        assert!(cell_rx.has_changed().unwrap(), "first non-empty URL fires");
        cell_rx.mark_unchanged();

        // Second delivery: same URL, so it must be deduplicated.
        sticky_update_pop_browser_url(&cell_tx, Some(&consent));
        assert!(
            !cell_rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// Sticky, but still tracks changes: a different URL arriving after an earlier one replaces it.
    #[test]
    fn new_url_after_prior_fires() {
        let first = parsed("https://login.example/a");
        let second = parsed("https://login.example/b");
        let (cell_tx, cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        sticky_update_pop_browser_url(&cell_tx, Some(&second));

        assert_eq!(*cell_rx.borrow(), Some(second));
    }

    /// The realistic session shape: a URL survives a run of `None` ticks, and a *different* URL
    /// after that gap is still published. Chains together the legs the other tests check one at a
    /// time (control's actual cadence is "URL, then many empty updates, then maybe a new URL").
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let first = parsed("https://login.example/a");
        let second = parsed("https://login.example/b");
        let (cell_tx, cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        (0..3).for_each(|_| {
            sticky_update_pop_browser_url(&cell_tx, None);
        });
        assert_eq!(
            *cell_rx.borrow(),
            Some(first),
            "stayed sticky through the None gap"
        );

        sticky_update_pop_browser_url(&cell_tx, Some(&second));
        assert_eq!(
            *cell_rx.borrow(),
            Some(second),
            "a new URL after a None gap still fires"
        );
    }

    /// Going back to an earlier URL (A → B → A) fires again: the dedupe compares against the
    /// cell's *current* value only, not a history, so A following B is a real change.
    #[test]
    fn returning_to_prior_url_refires() {
        let first = parsed("https://login.example/a");
        let second = parsed("https://login.example/b");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        sticky_update_pop_browser_url(&cell_tx, Some(&second));
        cell_rx.mark_unchanged();

        // Back to A while the cell holds B: differs from the current value, so it fires.
        sticky_update_pop_browser_url(&cell_tx, Some(&first));
        assert!(
            cell_rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*cell_rx.borrow(), Some(first));
    }

    /// End-to-end de-thrash check. A realistic netmap cadence (empty, empty, URL, empty, empty) is
    /// pushed through the producer into a cell, then the changes visible to a `run_bus`-style
    /// subscriber are counted. Exactly ONE change must survive the surrounding `None` traffic; the
    /// pre-fix code (`send_replace` on every tick) would have woken the subscriber on each empty
    /// tick and coalesced the URL away. Exercises the producer together with the watch-subscribe
    /// path, which the other tests cover separately.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let consent = parsed("https://login.example/consent");
        let (cell_tx, mut cell_rx) = watch::channel(None);

        // What control actually sends: mostly-empty MapResponses, one of which carries the URL.
        for incoming in [None, None, Some(&consent), None, None] {
            sticky_update_pop_browser_url(&cell_tx, incoming);
        }

        // Drain pending changes: there must be exactly one, and it must hold the URL rather than
        // a coalesced `None`.
        let mut observed_changes = 0;
        while cell_rx.has_changed().unwrap() {
            let seen = cell_rx.borrow_and_update().clone();
            observed_changes += 1;
            assert_eq!(
                seen,
                Some(consent.clone()),
                "the surviving change carries the URL"
            );
        }
        assert_eq!(
            observed_changes, 1,
            "exactly one change survives the None thrash"
        );
    }
}