Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22    direct::EndpointAdvertisement,
23};
24
25/// Actor responsible for maintaining the connection to control.
26///
27/// This actor is responsible for proxying the map response stream onto the message bus.
28pub struct ControlRunner {
29    client: AsyncControlClient,
30    params: Params,
31
32    self_node: watch::Sender<Option<Node>>,
33    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
34    /// server reads this to authorize incoming connections; absent policy means deny-all.
35    ssh_policy: watch::Sender<Option<SshPolicy>>,
36    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
37    tka: watch::Sender<Option<TkaStatus>>,
38    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
39    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
40    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
41    /// the result returns via the [`TkaSynced`] self-message).
42    tka_synced: Option<crate::tka_sync::SyncedTka>,
43    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
44    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
45    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
46    /// gated decision).
47    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
48    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
49    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
50    tka_syncing: bool,
51    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
52    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
53    cert_domains: watch::Sender<Vec<String>>,
54    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
55    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
56    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
57    /// own cell for the narrower TLS-cert use.
58    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
59    /// Latest interactive-login / consent URL control asked this node to open
60    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
61    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user).
62    /// Replaced (not accumulated) on each update.
63    pop_browser_url: watch::Sender<Option<url::Url>>,
64    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
65    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
66    /// daemon's `tnet netcheck`). Empty until the first measurement.
67    netcheck: watch::Sender<crate::status::NetcheckReport>,
68}
69
70/// Control runner args.
71pub struct Params {
72    /// Control config.
73    pub(crate) config: ts_control::Config,
74
75    /// Auth key (if needed).
76    pub(crate) auth_key: Option<String>,
77
78    /// The [`crate::Env`] for this actor.
79    pub(crate) env: crate::Env,
80
81    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
82    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
83    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
84    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
85    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
86}
87
88#[doc(hidden)]
89#[derive(Debug, thiserror::Error)]
90pub enum ControlRunnerError {
91    #[error(transparent)]
92    Control(#[from] ControlError),
93
94    #[error(transparent)]
95    Crate(#[from] crate::Error),
96}
97
98impl kameo::Actor for ControlRunner {
99    type Args = Params;
100    type Error = ControlRunnerError;
101
102    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
103        loop {
104            match AsyncControlClient::check_auth(
105                &params.config,
106                &params.env.keys,
107                params.auth_key.as_deref(),
108            )
109            .await
110            {
111                Ok(()) => break,
112                Err(ControlError::MachineNotAuthorized(u)) => {
113                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
114                    // Surface "interactive login required" so a watcher / `wait_until_running` can
115                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
116                    // keeps retrying (transient), so this is not a terminal `Failed`.
117                    params
118                        .state_tx
119                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
120                    tokio::time::sleep(Duration::from_secs(5)).await;
121                }
122                Err(e) => {
123                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
124                    // specific reason control gave AND publish it as a typed `Failed` state so
125                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
126                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
127                    // stopped actor is next asked. Publishing before `return Err` is why the state
128                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
129                    let reason = crate::RegistrationError::from(&e);
130                    tracing::error!(error = %e, "registration failed; control runner stopping");
131                    params
132                        .state_tx
133                        .send_replace(crate::DeviceState::Failed(reason));
134                    return Err(e.into());
135                }
136            }
137        }
138        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
139        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
140        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
141        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
142        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
143        let bring_up = async {
144            let (client, stream) = AsyncControlClient::connect(
145                &params.config,
146                &params.env.keys,
147                params.auth_key.as_deref(),
148            )
149            .await?;
150
151            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
152
153            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
154            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
155            slf.attach_stream(stream.boxed(), (), ());
156            Ok::<_, ControlRunnerError>(client)
157        };
158
159        let client = match bring_up.await {
160            Ok(client) => client,
161            Err(e) => {
162                tracing::error!(error = %e, "bringing up the control session failed");
163                // The control session never came up; surface it as a transient registration
164                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
165                // stuck at `Connecting`.
166                params.state_tx.send_replace(crate::DeviceState::Failed(
167                    crate::RegistrationError::NetworkUnreachable,
168                ));
169                return Err(e);
170            }
171        };
172
173        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
174        // current (and flips to `Expired` if the self-node's key lapses).
175        params.state_tx.send_replace(crate::DeviceState::Running);
176
177        Ok(Self {
178            client,
179            params,
180            self_node: Default::default(),
181            ssh_policy: Default::default(),
182            tka: Default::default(),
183            tka_synced: None,
184            tka_authority: Default::default(),
185            tka_syncing: false,
186            cert_domains: Default::default(),
187            dns_config: Default::default(),
188            pop_browser_url: Default::default(),
189            netcheck: Default::default(),
190        })
191    }
192}
193
194impl ControlRunner {
195    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
196    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
197    /// round-trip). The result returns via the [`TkaSynced`] self-message.
198    ///
199    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
200    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
201    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
202    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
203    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
204        if !tka.is_enabled() {
205            // Lock disabled (or never enabled): drop any synced state and stop publishing an
206            // Authority. Never an error; peers are unaffected.
207            if self.tka_synced.is_some() {
208                self.tka_synced = None;
209                self.tka_authority.send_replace(None);
210            }
211            return;
212        }
213        if self.tka_syncing {
214            return; // a sync is already in flight; the next netmap will re-trigger if still stale
215        }
216        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
217        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
218        // fail-closes harmlessly).
219        if let Some(synced) = &self.tka_synced
220            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
221            && synced.authority.head_matches(&control_head)
222        {
223            return;
224        }
225
226        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
227        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
228        // `tka_syncing` so we don't spawn a second concurrent sync.
229        self.tka_syncing = true;
230        let current = self.tka_synced.take();
231        let config = self.params.config.clone();
232        let keys = self.params.env.keys.clone();
233        tokio::spawn(async move {
234            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
235            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
236            // not allowed). A send failure just means the actor is gone — nothing to do.
237            if let Err(e) = self_ref.tell(TkaSynced { result }).await {
238                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
239            }
240        });
241    }
242
243    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
244    /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
245    /// clears the in-flight guard.
246    async fn apply_tka_synced(
247        &mut self,
248        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
249    ) {
250        self.tka_syncing = false;
251        match result {
252            Ok(Some(synced)) => {
253                tracing::info!(
254                    head = %synced.authority.head().to_base32(),
255                    "TKA sync succeeded; publishing verified Authority (observe-only)"
256                );
257                self.tka_authority
258                    .send_replace(Some(synced.authority.clone()));
259                // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
260                // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
261                if let Err(e) = self
262                    .params
263                    .env
264                    .publish(crate::peer_tracker::TkaAuthorityUpdate(
265                        synced.authority.clone(),
266                    ))
267                    .await
268                {
269                    tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
270                }
271                self.tka_synced = Some(synced);
272            }
273            Ok(None) => {
274                // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
275                tracing::debug!("TKA sync: control reported no lock for this node (inert)");
276            }
277            Err(e) => {
278                // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
279                // peer. The next netmap update re-triggers a sync attempt.
280                tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
281            }
282        }
283    }
284
285    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
286    where
287        F: FnOnce(&Node) -> R,
288    {
289        let mut sub = self.self_node.subscribe();
290        let mut shutdown = self.params.env.shutdown.clone();
291
292        async move {
293            tokio::select! {
294                _ = shutdown.wait_for(|x| *x) => {
295                    None
296                },
297                node = sub.wait_for(Option::is_some) => {
298                    Some(f(node.ok()?.as_ref()?))
299                },
300            }
301        }
302    }
303}
304
305// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
306// those generated fields carry no doc and can't take attributes, so wrap in a module where
307// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
308// are re-exported so callers keep referencing them at `control_runner::<Name>`.
309pub use msg_impl::*;
310
311#[allow(missing_docs)]
312mod msg_impl {
313    use kameo::{message::Context, reply::DelegatedReply};
314
315    use super::*;
316
317    #[kameo::messages]
318    impl ControlRunner {
319        /// Fetch the IPv4 address for this tailscale device.
320        #[message(ctx)]
321        pub fn ipv4(
322            &self,
323            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
324        ) -> DelegatedReply<Option<Ipv4Addr>> {
325            let (deleg, replier) = ctx.reply_sender();
326
327            if let Some(replier) = replier {
328                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
329
330                tokio::spawn(async move {
331                    let ip = fut.await;
332                    replier.send(ip);
333                });
334            }
335
336            deleg
337        }
338
339        /// Fetch the IPv6 address for this tailscale device.
340        #[message(ctx)]
341        pub fn ipv6(
342            &self,
343            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
344        ) -> DelegatedReply<Option<Ipv6Addr>> {
345            let (deleg, replier) = ctx.reply_sender();
346
347            if let Some(replier) = replier {
348                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
349
350                tokio::spawn(async move {
351                    let ip = fut.await;
352                    replier.send(ip);
353                });
354            }
355
356            deleg
357        }
358
359        /// Fetch the self node for this tailscale device.
360        #[message(ctx)]
361        pub fn self_node(
362            &self,
363            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
364        ) -> DelegatedReply<Option<Node>> {
365            let (deleg, replier) = ctx.reply_sender();
366
367            if let Some(replier) = replier {
368                let node = self.with_self_node(|node| node.clone());
369
370                tokio::spawn(async move {
371                    let node = node.await;
372                    replier.send(node)
373                });
374            }
375
376            deleg
377        }
378
379        /// Fetch the current Tailscale SSH policy, if control has pushed one.
380        ///
381        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
382        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
383        /// for a value: an absent policy is a legitimate, immediate answer.
384        #[message]
385        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
386            self.ssh_policy.borrow().clone()
387        }
388
389        /// Fetch the current Tailnet Lock status, if control has pushed one.
390        ///
391        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
392        #[message]
393        pub fn current_tka_status(&self) -> Option<TkaStatus> {
394            self.tka.borrow().clone()
395        }
396
397        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
398        ///
399        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
400        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
401        /// does not block waiting for a value).
402        #[message]
403        pub fn cert_domains(&self) -> Vec<String> {
404            self.cert_domains.borrow().clone()
405        }
406
407        /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
408        /// control has sent no DNS config yet. An immediate answer (does not block); the facade
409        /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
410        #[message]
411        pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
412            self.dns_config.borrow().clone()
413        }
414
415        /// The interactive-login / consent URL control last asked this node to open
416        /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
417        /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
418        #[message]
419        pub fn pop_browser_url(&self) -> Option<url::Url> {
420            self.pop_browser_url.borrow().clone()
421        }
422
423        /// The latest network-conditions report (preferred DERP region + per-region latencies). An
424        /// immediate answer (does not block); empty before the first DERP-latency measurement. The
425        /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
426        #[message]
427        pub fn netcheck(&self) -> crate::status::NetcheckReport {
428            self.netcheck.borrow().clone()
429        }
430
431        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
432        ///
433        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
434        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
435        /// for the round-trip.
436        #[message(ctx)]
437        pub fn fetch_id_token(
438            &self,
439            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
440            audience: String,
441        ) -> DelegatedReply<Result<String, IdTokenError>> {
442            let (deleg, replier) = ctx.reply_sender();
443
444            if let Some(replier) = replier {
445                let config = self.params.config.clone();
446                let keys = self.params.env.keys.clone();
447                tokio::spawn(async move {
448                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
449                    replier.send(result);
450                });
451            }
452
453            deleg
454        }
455
456        /// Log this node out of the tailnet: deregister it by expiring its current node key.
457        ///
458        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
459        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
460        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
461        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
462        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
463        /// on-disk node key, so re-registering with the same key is the re-login path.
464        #[message(ctx)]
465        pub fn logout(
466            &self,
467            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
468        ) -> DelegatedReply<Result<(), LogoutError>> {
469            let (deleg, replier) = ctx.reply_sender();
470
471            if let Some(replier) = replier {
472                let config = self.params.config.clone();
473                let keys = self.params.env.keys.clone();
474                tokio::spawn(async move {
475                    let result = ts_control::logout(&config, &keys).await;
476                    replier.send(result);
477                });
478            }
479
480            deleg
481        }
482
483        /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
484        /// `LocalClient.SetDNS`).
485        ///
486        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
487        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
488        /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
489        /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
490        /// match, so the surfaced API takes only `name` + `value`.
491        #[message(ctx)]
492        pub fn set_dns(
493            &self,
494            ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
495            name: String,
496            value: String,
497        ) -> DelegatedReply<Result<(), SetDnsError>> {
498            let (deleg, replier) = ctx.reply_sender();
499
500            if let Some(replier) = replier {
501                let config = self.params.config.clone();
502                let keys = self.params.env.keys.clone();
503                tokio::spawn(async move {
504                    let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
505                    replier.send(result);
506                });
507            }
508
509            deleg
510        }
511    }
512
513    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
514    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
515    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
516    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
517    // block keeps the default build clean.
518    #[cfg(feature = "acme")]
519    #[kameo::messages]
520    impl ControlRunner {
521        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
522        /// client-side ACME DNS-01 engine (`acme` feature).
523        ///
524        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
525        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
526        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
527        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
528        ///
529        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
530        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
531        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
532        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
533        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
534        /// publish 501s.
535        #[message(ctx)]
536        pub fn get_certificate(
537            &self,
538            ctx: &mut Context<
539                Self,
540                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
541            >,
542            name: String,
543        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
544            let (deleg, replier) = ctx.reply_sender();
545
546            if let Some(replier) = replier {
547                let config = self.params.config.clone();
548                let keys = self.params.env.keys.clone();
549                tokio::spawn(async move {
550                    let result = issue_certificate(&config, &keys, &name).await;
551                    replier.send(result);
552                });
553            }
554
555            deleg
556        }
557    }
558}
559
/// Obtain an ACME account key, then issue a certificate for `name` via set-dns DNS-01.
///
/// If [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is present it is reused, so the same
/// Let's Encrypt account survives renewals. If it is absent, an ephemeral key is generated for this
/// call (logged at debug — a new ACME account each issuance, with no write-back). Issuance always
/// targets Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Propagates failures from loading or generating the account key and from issuance itself; a
/// directory URL that fails to parse is reported as [`ts_control::CertError::Acme`].
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        ts_control::acme::AcmeAccountKey::generate()?.0
    };

    let parsed = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse();
    let directory = match parsed {
        Ok(directory) => directory,
        Err(e) => {
            return Err(ts_control::CertError::Acme(format!(
                "parsing Let's Encrypt directory URL: {e}"
            )));
        }
    };

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
589
590impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
591    type Reply = ();
592
593    async fn handle(
594        &mut self,
595        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
596        ctx: &mut Context<Self, Self::Reply>,
597    ) {
598        match msg {
599            StreamMessage::Started(_) => {
600                tracing::trace!("started listening to state updates");
601            }
602
603            StreamMessage::Next(msg) => {
604                if let Some(node) = msg.node.as_ref() {
605                    // Reflect node-key expiry into the device state: control delivering a self-node
606                    // whose key is in the past means the node must re-authenticate. Otherwise the
607                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
608                    // prior update had flipped it to Expired).
609                    let now_unix = std::time::SystemTime::now()
610                        .duration_since(std::time::UNIX_EPOCH)
611                        .map(|d| d.as_secs() as i64)
612                        .unwrap_or(0);
613                    let next = if node.key_expired_at_unix(now_unix) {
614                        crate::DeviceState::Expired
615                    } else {
616                        crate::DeviceState::Running
617                    };
618                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
619                    // self-node arrives on every netmap update).
620                    self.params.state_tx.send_if_modified(|s| {
621                        if *s != next {
622                            *s = next.clone();
623                            true
624                        } else {
625                            false
626                        }
627                    });
628
629                    self.self_node.send_replace(Some(node.clone()));
630                }
631
632                if let Some(policy) = msg.ssh_policy.as_ref() {
633                    self.ssh_policy.send_replace(Some(policy.clone()));
634                }
635
636                if let Some(tka) = msg.tka.as_ref() {
637                    self.tka.send_replace(Some(tka.clone()));
638                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
639                }
640
641                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
642                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
643                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
644                let cert_domains = msg
645                    .dns_config
646                    .as_ref()
647                    .map(|d| d.cert_domains.clone())
648                    .unwrap_or_default();
649                self.cert_domains.send_replace(cert_domains);
650
651                // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
652                // `None` when control sent no DNS config on this update — distinct from a present but
653                // empty config (Go `netmap.NetworkMap.DNS`).
654                self.dns_config.send_replace(msg.dns_config.clone());
655
656                // Track the interactive-login URL for `Device::pop_browser_url`. `None` on updates
657                // that carry none — control sends it only when it wants a browser opened
658                // (`MapResponse.PopBrowserURL`); replace rather than accumulate.
659                self.pop_browser_url
660                    .send_replace(msg.pop_browser_url.clone());
661
662                if let Err(e) = self.params.env.publish(msg).await {
663                    tracing::error!(error = %e, "publishing netmap update");
664                }
665            }
666
667            StreamMessage::Finished(_) => {
668                tracing::error!("state update stream terminated")
669            }
670        }
671    }
672}
673
674/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
675/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
676/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
677/// [`ControlRunner::apply_tka_synced`](ControlRunner).
678#[doc(hidden)]
679pub struct TkaSynced {
680    pub(crate) result:
681        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
682}
683
684impl Message<TkaSynced> for ControlRunner {
685    type Reply = ();
686
687    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
688        self.apply_tka_synced(msg.result).await;
689    }
690}
691
692impl Message<DerpLatencyMeasurement> for ControlRunner {
693    type Reply = ();
694
695    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
696        let measurements = msg.measurement.as_ref().clone();
697
698        // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
699        // the same measurements, before the home-region short-circuit below — an empty set still
700        // yields a (default/empty) report rather than a stale one.
701        self.netcheck
702            .send_replace(crate::status::NetcheckReport::from_region_results(
703                &measurements,
704            ));
705
706        let Some(result) = measurements.first() else {
707            tracing::debug!("derp latency measurements empty");
708            return;
709        };
710
711        let iter = measurements.iter().map(|result| {
712            (
713                result.latency_map_key.as_str(),
714                result.latency.as_secs_f64(),
715            )
716        });
717
718        tracing::debug!(selected_region_id = ?result.id, "updating home region");
719
720        self.client.set_home_region(result.id, iter).await;
721    }
722}
723
724impl Message<EndpointAdvertisement> for ControlRunner {
725    type Reply = ();
726
727    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
728        let endpoints: Vec<Endpoint> = msg
729            .endpoints
730            .iter()
731            .map(|ep| Endpoint {
732                endpoint: ep.addr,
733                ty: match ep.ty {
734                    SelfEndpointType::Local => EndpointType::Local,
735                    SelfEndpointType::Stun => EndpointType::Stun,
736                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
737                },
738            })
739            .collect();
740
741        tracing::debug!(
742            n_endpoints = endpoints.len(),
743            "advertising endpoints to control"
744        );
745
746        self.client.set_endpoints(endpoints).await;
747    }
748}