Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22    direct::EndpointAdvertisement,
23};
24
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus. It also
/// owns the `watch` cells holding the latest control-pushed state (self node, SSH policy, Tailnet
/// Lock status, DNS config, consent URL, netcheck report) that the `msg_impl` query handlers read.
///
/// NOTE(review): fields drop in declaration order and dropping a `watch::Sender` closes that
/// channel for its subscribers, so reordering fields changes teardown order.
pub struct ControlRunner {
    /// Control-plane client returned by `AsyncControlClient::connect` during `on_start`.
    client: AsyncControlClient,
    /// The startup arguments, kept for later use: the control config and node keys are cloned
    /// into spawned RPC tasks, and `env` supplies the bus and the shutdown flag.
    params: Params,

    /// Latest self node, or `None` until one is known (presumably filled from the netmap stream).
    /// The self-node queries (`ipv4`, `ipv6`, `self_node`) wait for this cell to become `Some`.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message). Also `None` while a sync is in
    /// flight, because the state is moved into the sync task.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
    /// gated decision).
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
}
78
/// Control runner args.
///
/// This is the `kameo::Actor::Args` of [`ControlRunner`]: it is consumed by `on_start` and then
/// kept on the actor as its `params` field.
pub struct Params {
    /// Control config. Used for registration (`check_auth`) and `connect` in `on_start`, and
    /// cloned into every spawned control RPC task.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Passed to both `check_auth` and `connect`. Without one, control may
    /// answer `MachineNotAuthorized`, which `on_start` surfaces as `DeviceState::NeedsLogin` and
    /// keeps retrying.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor. Supplies the node keys, the message bus
    /// (subscribe/publish) and the runtime shutdown flag.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
96
/// Error type of the [`ControlRunner`] actor (`kameo::Actor::Error`).
///
/// In this file it is produced by `on_start`, which stops the actor after publishing a matching
/// `DeviceState::Failed`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// A control-plane error, e.g. a hard registration failure from `check_auth` or a failed
    /// `connect`. Displayed transparently as the underlying error.
    #[error(transparent)]
    Control(#[from] ControlError),

    /// A crate-level error, presumably from the bus subscriptions made during bring-up.
    /// Displayed transparently as the underlying error.
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
106
107impl kameo::Actor for ControlRunner {
108    type Args = Params;
109    type Error = ControlRunnerError;
110
111    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
112        loop {
113            match AsyncControlClient::check_auth(
114                &params.config,
115                &params.env.keys,
116                params.auth_key.as_deref(),
117            )
118            .await
119            {
120                Ok(()) => break,
121                Err(ControlError::MachineNotAuthorized(u)) => {
122                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
123                    // Surface "interactive login required" so a watcher / `wait_until_running` can
124                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
125                    // keeps retrying (transient), so this is not a terminal `Failed`.
126                    params
127                        .state_tx
128                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
129                    tokio::time::sleep(Duration::from_secs(5)).await;
130                }
131                Err(e) => {
132                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
133                    // specific reason control gave AND publish it as a typed `Failed` state so
134                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
135                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
136                    // stopped actor is next asked. Publishing before `return Err` is why the state
137                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
138                    let reason = crate::RegistrationError::from(&e);
139                    tracing::error!(error = %e, "registration failed; control runner stopping");
140                    params
141                        .state_tx
142                        .send_replace(crate::DeviceState::Failed(reason));
143                    return Err(e.into());
144                }
145            }
146        }
147        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
148        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
149        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
150        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
151        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
152        let bring_up = async {
153            let (client, stream) = AsyncControlClient::connect(
154                &params.config,
155                &params.env.keys,
156                params.auth_key.as_deref(),
157            )
158            .await?;
159
160            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
161
162            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
163            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
164            slf.attach_stream(stream.boxed(), (), ());
165            Ok::<_, ControlRunnerError>(client)
166        };
167
168        let client = match bring_up.await {
169            Ok(client) => client,
170            Err(e) => {
171                tracing::error!(error = %e, "bringing up the control session failed");
172                // The control session never came up; surface it as a transient registration
173                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
174                // stuck at `Connecting`.
175                params.state_tx.send_replace(crate::DeviceState::Failed(
176                    crate::RegistrationError::NetworkUnreachable,
177                ));
178                return Err(e);
179            }
180        };
181
182        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
183        // current (and flips to `Expired` if the self-node's key lapses).
184        params.state_tx.send_replace(crate::DeviceState::Running);
185
186        Ok(Self {
187            client,
188            params,
189            self_node: Default::default(),
190            ssh_policy: Default::default(),
191            tka: Default::default(),
192            tka_synced: None,
193            tka_authority: Default::default(),
194            tka_syncing: false,
195            cert_domains: Default::default(),
196            dns_config: Default::default(),
197            pop_browser_url: Default::default(),
198            netcheck: Default::default(),
199        })
200    }
201}
202
203impl ControlRunner {
204    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
205    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
206    /// round-trip). The result returns via the [`TkaSynced`] self-message.
207    ///
208    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
209    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
210    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
211    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
212    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
213        if !tka.is_enabled() {
214            // Lock disabled (or never enabled): drop any synced state and stop publishing an
215            // Authority. Never an error; peers are unaffected.
216            if self.tka_synced.is_some() {
217                self.tka_synced = None;
218                self.tka_authority.send_replace(None);
219            }
220            return;
221        }
222        if self.tka_syncing {
223            return; // a sync is already in flight; the next netmap will re-trigger if still stale
224        }
225        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
226        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
227        // fail-closes harmlessly).
228        if let Some(synced) = &self.tka_synced
229            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
230            && synced.authority.head_matches(&control_head)
231        {
232            return;
233        }
234
235        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
236        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
237        // `tka_syncing` so we don't spawn a second concurrent sync.
238        self.tka_syncing = true;
239        let current = self.tka_synced.take();
240        let config = self.params.config.clone();
241        let keys = self.params.env.keys.clone();
242        tokio::spawn(async move {
243            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
244            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
245            // not allowed). A send failure just means the actor is gone — nothing to do.
246            if let Err(e) = self_ref.tell(TkaSynced { result }).await {
247                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
248            }
249        });
250    }
251
252    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
253    /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
254    /// clears the in-flight guard.
255    async fn apply_tka_synced(
256        &mut self,
257        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
258    ) {
259        self.tka_syncing = false;
260        match result {
261            Ok(Some(synced)) => {
262                tracing::info!(
263                    head = %synced.authority.head().to_base32(),
264                    "TKA sync succeeded; publishing verified Authority (observe-only)"
265                );
266                self.tka_authority
267                    .send_replace(Some(synced.authority.clone()));
268                // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
269                // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
270                if let Err(e) = self
271                    .params
272                    .env
273                    .publish(crate::peer_tracker::TkaAuthorityUpdate(
274                        synced.authority.clone(),
275                    ))
276                    .await
277                {
278                    tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
279                }
280                self.tka_synced = Some(synced);
281            }
282            Ok(None) => {
283                // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
284                tracing::debug!("TKA sync: control reported no lock for this node (inert)");
285            }
286            Err(e) => {
287                // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
288                // peer. The next netmap update re-triggers a sync attempt.
289                tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
290            }
291        }
292    }
293
294    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
295    where
296        F: FnOnce(&Node) -> R,
297    {
298        let mut sub = self.self_node.subscribe();
299        let mut shutdown = self.params.env.shutdown.clone();
300
301        async move {
302            tokio::select! {
303                _ = shutdown.wait_for(|x| *x) => {
304                    None
305                },
306                node = sub.wait_for(Option::is_some) => {
307                    Some(f(node.ok()?.as_ref()?))
308                },
309            }
310        }
311    }
312}
313
314/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
315///
316/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
317/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
318/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
319/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
320/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
321/// for a `watch`/bus subscriber.
322///
323/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
324/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
325/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
326/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
327/// device-state cell in this handler.
328///
329/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
330/// against a plain `watch` channel without standing up the actor.
331fn sticky_update_pop_browser_url(
332    cell: &watch::Sender<Option<url::Url>>,
333    incoming: Option<&url::Url>,
334) {
335    if let Some(url) = incoming {
336        cell.send_if_modified(|current| {
337            if current.as_ref() == Some(url) {
338                false
339            } else {
340                *current = Some(url.clone());
341                true
342            }
343        });
344    }
345}
346
347// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
348// those generated fields carry no doc and can't take attributes, so wrap in a module where
349// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
350// are re-exported so callers keep referencing them at `control_runner::<Name>`.
351pub use msg_impl::*;
352
353#[allow(missing_docs)]
354mod msg_impl {
355    use kameo::{message::Context, reply::DelegatedReply};
356
357    use super::*;
358
359    #[kameo::messages]
360    impl ControlRunner {
361        /// Fetch the IPv4 address for this tailscale device.
362        #[message(ctx)]
363        pub fn ipv4(
364            &self,
365            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
366        ) -> DelegatedReply<Option<Ipv4Addr>> {
367            let (deleg, replier) = ctx.reply_sender();
368
369            if let Some(replier) = replier {
370                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
371
372                tokio::spawn(async move {
373                    let ip = fut.await;
374                    replier.send(ip);
375                });
376            }
377
378            deleg
379        }
380
381        /// Fetch the IPv6 address for this tailscale device.
382        #[message(ctx)]
383        pub fn ipv6(
384            &self,
385            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
386        ) -> DelegatedReply<Option<Ipv6Addr>> {
387            let (deleg, replier) = ctx.reply_sender();
388
389            if let Some(replier) = replier {
390                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
391
392                tokio::spawn(async move {
393                    let ip = fut.await;
394                    replier.send(ip);
395                });
396            }
397
398            deleg
399        }
400
401        /// Fetch the self node for this tailscale device.
402        #[message(ctx)]
403        pub fn self_node(
404            &self,
405            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
406        ) -> DelegatedReply<Option<Node>> {
407            let (deleg, replier) = ctx.reply_sender();
408
409            if let Some(replier) = replier {
410                let node = self.with_self_node(|node| node.clone());
411
412                tokio::spawn(async move {
413                    let node = node.await;
414                    replier.send(node)
415                });
416            }
417
418            deleg
419        }
420
421        /// Fetch the current Tailscale SSH policy, if control has pushed one.
422        ///
423        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
424        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
425        /// for a value: an absent policy is a legitimate, immediate answer.
426        #[message]
427        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
428            self.ssh_policy.borrow().clone()
429        }
430
431        /// Fetch the current Tailnet Lock status, if control has pushed one.
432        ///
433        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
434        #[message]
435        pub fn current_tka_status(&self) -> Option<TkaStatus> {
436            self.tka.borrow().clone()
437        }
438
439        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
440        ///
441        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
442        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
443        /// does not block waiting for a value).
444        #[message]
445        pub fn cert_domains(&self) -> Vec<String> {
446            self.cert_domains.borrow().clone()
447        }
448
449        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
450        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
451        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
452        #[message]
453        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
454            self.dns_config.borrow().clone()
455        }
456
457        /// The interactive-login / consent URL control last asked this node to open
458        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
459        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
460        #[message]
461        pub fn pop_browser_url(&self) -> Option<url::Url> {
462            self.pop_browser_url.borrow().clone()
463        }
464
465        /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
466        ///
467        /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
468        /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
469        /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
470        /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
471        /// late subscriber sees the current URL. The initial value is `None` until control sends one.
472        #[message(derive(Clone))]
473        pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
474            self.pop_browser_url.subscribe()
475        }
476
477        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
478        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
479        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
480        #[message]
481        pub fn netcheck(&self) -> crate::status::NetcheckReport {
482            self.netcheck.borrow().clone()
483        }
484
485        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
486        ///
487        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
488        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
489        /// for the round-trip.
490        #[message(ctx)]
491        pub fn fetch_id_token(
492            &self,
493            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
494            audience: String,
495        ) -> DelegatedReply<Result<String, IdTokenError>> {
496            let (deleg, replier) = ctx.reply_sender();
497
498            if let Some(replier) = replier {
499                let config = self.params.config.clone();
500                let keys = self.params.env.keys.clone();
501                tokio::spawn(async move {
502                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
503                    replier.send(result);
504                });
505            }
506
507            deleg
508        }
509
510        /// Log this node out of the tailnet: deregister it by expiring its current node key.
511        ///
512        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
513        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
514        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
515        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
516        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
517        /// on-disk node key, so re-registering with the same key is the re-login path.
518        #[message(ctx)]
519        pub fn logout(
520            &self,
521            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
522        ) -> DelegatedReply<Result<(), LogoutError>> {
523            let (deleg, replier) = ctx.reply_sender();
524
525            if let Some(replier) = replier {
526                let config = self.params.config.clone();
527                let keys = self.params.env.keys.clone();
528                tokio::spawn(async move {
529                    let result = ts_control::logout(&config, &keys).await;
530                    replier.send(result);
531                });
532            }
533
534            deleg
535        }
536
537        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
538        /// `LocalClient.SetDNS`).
539        ///
540        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
541        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
542        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
543        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
544        /// match, so the surfaced API takes only `name` + `value`.
545        #[message(ctx)]
546        pub fn set_dns(
547            &self,
548            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
549            name: String,
550            value: String,
551        ) -> DelegatedReply<Result<(), SetDnsError>> {
552            let (deleg, replier) = ctx.reply_sender();
553
554            if let Some(replier) = replier {
555                let config = self.params.config.clone();
556                let keys = self.params.env.keys.clone();
557                tokio::spawn(async move {
558                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
559                    replier.send(result);
560                });
561            }
562
563            deleg
564        }
565    }
566
567    /// The reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message: the issued
568    /// `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface) or a [`ts_control::CertError`].
569    /// Aliased so the message's `Context` type stays under clippy's `type_complexity` bar (the
570    /// nested `Result<(String, String), _>` trips it inline).
571    #[cfg(feature = "acme")]
572    pub type CertPairReply = Result<(String, String), ts_control::CertError>;
573
574    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
575    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
576    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
577    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
578    // block keeps the default build clean.
579    #[cfg(feature = "acme")]
580    #[kameo::messages]
581    impl ControlRunner {
582        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
583        /// client-side ACME DNS-01 engine (`acme` feature).
584        ///
585        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
586        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
587        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
588        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
589        ///
590        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
591        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
592        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
593        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
594        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
595        /// publish 501s.
596        #[message(ctx)]
597        pub fn get_certificate(
598            &self,
599            ctx: &mut Context<
600                Self,
601                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
602            >,
603            name: String,
604        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
605            let (deleg, replier) = ctx.reply_sender();
606
607            if let Some(replier) = replier {
608                let config = self.params.config.clone();
609                let keys = self.params.env.keys.clone();
610                tokio::spawn(async move {
611                    let result = issue_certificate(&config, &keys, &name).await;
612                    replier.send(result);
613                });
614            }
615
616            deleg
617        }
618
619        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
620        /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
621        /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
622        ///
623        /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
624        /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
625        /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
626        /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
627        /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
628        /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
629        #[message(ctx)]
630        pub fn get_cert_pair(
631            &self,
632            ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
633            name: String,
634        ) -> DelegatedReply<CertPairReply> {
635            let (deleg, replier) = ctx.reply_sender();
636
637            if let Some(replier) = replier {
638                let config = self.params.config.clone();
639                let keys = self.params.env.keys.clone();
640                tokio::spawn(async move {
641                    let result = issue_cert_pair(&config, &keys, &name).await;
642                    replier.send(result);
643                });
644            }
645
646            deleg
647        }
648    }
649}
650
/// Issue a certificate for `name` via set-dns DNS-01 and return only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey). This is the `get_certificate` /
/// `ListenTLS` path.
///
/// A thin projection over [`issue_cert_pair_inner`]. It performs the same single issuance that
/// [`issue_cert_pair`] does, but discards the PEM strings because this caller never writes them
/// to disk. See [`issue_cert_pair_inner`] for how the ACME account key is loaded or generated.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok(issued.certified)
}
667
/// Issue a certificate for `name` via set-dns DNS-01 and return its **PEM pair**
/// `(cert_chain_pem, key_pem)`. These are written to the daemon's on-disk `.crt`/`.key`
/// (`tnet cert`, Go `LocalClient.CertPair`).
///
/// The issuance itself lives in [`issue_cert_pair_inner`] and is shared with
/// [`issue_certificate`]; only the projection of the result differs. The second element is the
/// leaf **private key** PEM and is NEVER logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
684
/// Shared issuance core behind [`issue_certificate`] and [`issue_cert_pair`].
///
/// Runs exactly one ACME DNS-01 order against Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]). It returns the whole
/// [`IssuedCert`](ts_control::acme::IssuedCert) so each caller can keep the parts it needs.
///
/// Account key:
/// - If [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is present, it is reused, so the
///   same Let's Encrypt account carries across renewals.
/// - Otherwise a fresh key is generated for this call only and the event is logged at debug.
///   That key is not written back, so each such issuance uses a new ACME account.
///
/// The leaf private key is never logged.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    use ts_control::acme::{AcmeAccountKey, LETS_ENCRYPT_PRODUCTION_DIRECTORY};

    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        AcmeAccountKey::generate()?.0
    };

    let directory = LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse().map_err(|err| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {err}"))
    })?;

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
718
719impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
720    type Reply = ();
721
722    async fn handle(
723        &mut self,
724        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
725        ctx: &mut Context<Self, Self::Reply>,
726    ) {
727        match msg {
728            StreamMessage::Started(_) => {
729                tracing::trace!("started listening to state updates");
730            }
731
732            StreamMessage::Next(msg) => {
733                if let Some(node) = msg.node.as_ref() {
734                    // Reflect node-key expiry into the device state: control delivering a self-node
735                    // whose key is in the past means the node must re-authenticate. Otherwise the
736                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
737                    // prior update had flipped it to Expired).
738                    let now_unix = std::time::SystemTime::now()
739                        .duration_since(std::time::UNIX_EPOCH)
740                        .map(|d| d.as_secs() as i64)
741                        .unwrap_or(0);
742                    let next = if node.key_expired_at_unix(now_unix) {
743                        crate::DeviceState::Expired
744                    } else {
745                        crate::DeviceState::Running
746                    };
747                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
748                    // self-node arrives on every netmap update).
749                    self.params.state_tx.send_if_modified(|s| {
750                        if *s != next {
751                            *s = next.clone();
752                            true
753                        } else {
754                            false
755                        }
756                    });
757
758                    self.self_node.send_replace(Some(node.clone()));
759                }
760
761                if let Some(policy) = msg.ssh_policy.as_ref() {
762                    self.ssh_policy.send_replace(Some(policy.clone()));
763                }
764
765                if let Some(tka) = msg.tka.as_ref() {
766                    self.tka.send_replace(Some(tka.clone()));
767                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
768                }
769
770                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
771                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
772                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
773                let cert_domains = msg
774                    .dns_config
775                    .as_ref()
776                    .map(|d| d.cert_domains.clone())
777                    .unwrap_or_default();
778                self.cert_domains.send_replace(cert_domains);
779
780                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
781                // `None` when control sent no DNS config on this update — distinct from a present but
782                // empty config (Go `netmap.NetworkMap.DNS`).
783                self.dns_config.send_replace(msg.dns_config.clone());
784
785                // Track the interactive-login URL for `Device::pop_browser_url` /
786                // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
787                // sticky semantics (update only on a new non-empty URL; never reset to `None`).
788                sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
789
790                if let Err(e) = self.params.env.publish(msg).await {
791                    tracing::error!(error = %e, "publishing netmap update");
792                }
793            }
794
795            StreamMessage::Finished(_) => {
796                tracing::error!("state update stream terminated")
797            }
798        }
799    }
800}
801
802/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
803/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
804/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
805/// [`ControlRunner::apply_tka_synced`](ControlRunner).
806#[doc(hidden)]
807pub struct TkaSynced {
808    pub(crate) result:
809        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
810}
811
812impl Message<TkaSynced> for ControlRunner {
813    type Reply = ();
814
815    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
816        self.apply_tka_synced(msg.result).await;
817    }
818}
819
820impl Message<DerpLatencyMeasurement> for ControlRunner {
821    type Reply = ();
822
823    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
824        let measurements = msg.measurement.as_ref().clone();
825
826        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
827        // the same measurements, before the home-region short-circuit below — an empty set still
828        // yields a (default/empty) report rather than a stale one.
829        self.netcheck
830            .send_replace(crate::status::NetcheckReport::from_region_results(
831                &measurements,
832            ));
833
834        let Some(result) = measurements.first() else {
835            tracing::debug!("derp latency measurements empty");
836            return;
837        };
838
839        let iter = measurements.iter().map(|result| {
840            (
841                result.latency_map_key.as_str(),
842                result.latency.as_secs_f64(),
843            )
844        });
845
846        tracing::debug!(selected_region_id = ?result.id, "updating home region");
847
848        self.client.set_home_region(result.id, iter).await;
849    }
850}
851
852impl Message<EndpointAdvertisement> for ControlRunner {
853    type Reply = ();
854
855    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
856        let endpoints: Vec<Endpoint> = msg
857            .endpoints
858            .iter()
859            .map(|ep| Endpoint {
860                endpoint: ep.addr,
861                ty: match ep.ty {
862                    SelfEndpointType::Local => EndpointType::Local,
863                    SelfEndpointType::Stun => EndpointType::Stun,
864                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
865                },
866            })
867            .collect();
868
869        tracing::debug!(
870            n_endpoints = endpoints.len(),
871            "advertising endpoints to control"
872        );
873
874        self.client.set_endpoints(endpoints).await;
875    }
876}
877
878/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
879/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
880/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
881/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
882#[derive(Debug)]
883pub struct SetAdvertiseRoutes {
884    /// The prefixes to advertise to control (already filtered to the final set).
885    pub routes: Vec<ipnet::IpNet>,
886}
887
888impl Message<SetAdvertiseRoutes> for ControlRunner {
889    type Reply = ();
890
891    async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
892        tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
893        self.client.set_routable_ips(msg.routes).await;
894    }
895}
896
897/// Update this node's `Hostinfo.Hostname` at control — the wire half of a runtime
898/// [`Runtime::set_hostname`](crate::Runtime::set_hostname). A direct `ask` from the runtime, so the
899/// change reaches the live map-poll client.
900#[derive(Debug)]
901pub struct SetHostname {
902    /// The new hostname to report to control.
903    pub hostname: String,
904}
905
906impl Message<SetHostname> for ControlRunner {
907    type Reply = ();
908
909    async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
910        tracing::debug!("updating hostname at control");
911        self.client.set_hostname(msg.hostname).await;
912    }
913}
914
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    fn url(s: &str) -> url::Url {
        s.parse().unwrap()
    }

    /// A non-empty URL publishes to the cell.
    #[test]
    fn non_empty_url_publishes() {
        let (tx, rx) = watch::channel(None);
        let u = url("https://login.example/consent");
        sticky_update_pop_browser_url(&tx, Some(&u));
        assert_eq!(*rx.borrow(), Some(u));
    }

    /// An absent (`None`) update — the common netmap tick — must NOT reset the cell. This is the
    /// regression guard for the thrash bug (a reset-every-tick would coalesce the URL away on the bus).
    #[test]
    fn absent_update_does_not_reset() {
        let u = url("https://login.example/consent");
        let (tx, rx) = watch::channel(Some(u.clone()));
        // Simulate many empty netmap updates.
        for _ in 0..5 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "empty updates must not clear the URL"
        );
    }

    /// The same URL repeated does not re-fire the watch (in-place dedupe via `send_if_modified`), so
    /// a subscriber isn't woken spuriously. Proven by the borrow not having been marked changed.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&u)); // first: fires
        assert!(rx.has_changed().unwrap(), "first non-empty URL fires");
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&u)); // same: deduped
        assert!(
            !rx.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A genuinely new URL after a prior one fires again (sticky but tracks changes).
    #[test]
    fn new_url_after_prior_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(*rx.borrow(), Some(b));
    }

    /// The realistic session sequence: a URL stays sticky through a run of `None` ticks, and a
    /// *different* URL after that gap still fires. Chains the legs the other tests cover in isolation
    /// (the actual control cadence is "URL, then many empty updates, then maybe a new URL").
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        for _ in 0..3 {
            sticky_update_pop_browser_url(&tx, None);
        }
        assert_eq!(*rx.borrow(), Some(a), "stayed sticky through the None gap");
        sticky_update_pop_browser_url(&tx, Some(&b));
        assert_eq!(
            *rx.borrow(),
            Some(b),
            "a new URL after a None gap still fires"
        );
    }

    /// Returning to a previously-seen URL (A → B → A) re-fires: the dedupe is against the cell's
    /// *current* value, not a full history, so A after B is a genuine change.
    #[test]
    fn returning_to_prior_url_refires() {
        let a = url("https://login.example/a");
        let b = url("https://login.example/b");
        let (tx, mut rx) = watch::channel(None);
        sticky_update_pop_browser_url(&tx, Some(&a));
        sticky_update_pop_browser_url(&tx, Some(&b));
        rx.mark_unchanged();
        sticky_update_pop_browser_url(&tx, Some(&a)); // back to A: differs from current (B) → fires
        assert!(
            rx.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*rx.borrow(), Some(a));
    }

    /// End-to-end de-thrash: feed a realistic netmap cadence (empty, empty, URL, empty, empty)
    /// through the producer into a cell.
    ///
    /// After *each* tick, the test checks whether a `run_bus`-style subscriber would be woken
    /// (`has_changed`). Exactly one tick, the one carrying the URL, may wake it. Once the cadence
    /// is over, the cell must still hold the URL.
    ///
    /// Why per tick: a `watch` channel coalesces every send made between two reads. Draining
    /// `has_changed()` only once at the end can never observe more than one change, so it could
    /// not catch the pre-fix producer (`send_replace` every tick) waking the subscriber on empty
    /// ticks.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let u = url("https://login.example/consent");
        let (tx, mut rx) = watch::channel(None);
        // The cadence control actually sends: mostly-empty MapResponses with one carrying the URL.
        let cadence = [None, None, Some(&u), None, None];
        let mut wakeups = 0;
        for incoming in cadence {
            sticky_update_pop_browser_url(&tx, incoming);
            // Observe after every tick so watch-channel coalescing cannot hide extra fires.
            if rx.has_changed().unwrap() {
                wakeups += 1;
                let seen = rx.borrow_and_update().clone();
                assert_eq!(seen, Some(u.clone()), "every wake-up carries the URL");
            }
        }
        assert_eq!(wakeups, 1, "exactly one tick wakes the subscriber");
        assert_eq!(
            *rx.borrow(),
            Some(u),
            "the URL survives the trailing None ticks"
        );
    }
}