ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus. It also
/// keeps a "latest value" cell (a `watch` channel) for each piece of control-pushed state so that
/// query messages can be answered from the actor without another round-trip to control.
pub struct ControlRunner {
    /// Client for the control session opened in `on_start`. Used to push the home DERP region and
    /// the advertised endpoints to control.
    client: AsyncControlClient,
    /// Startup arguments, retained for the actor's lifetime (control config, auth key, env and the
    /// device-state sender).
    params: Params,

    /// Latest self node delivered on the netmap stream, or `None` until the first update carrying
    /// one. The `ipv4` / `ipv6` / `self_node` queries wait on this cell until it is populated.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message). Also `None` while a sync is in
    /// flight, because the current state is moved into the sync task.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
    /// gated decision).
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    /// Overwritten on every netmap update, so it becomes empty again when an update carries no DNS
    /// config.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use. Overwritten on every netmap update, so it reverts to
    /// `None` when an update carries no DNS config.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user).
    /// Replaced (not accumulated) on each update, including with `None` when an update carries no
    /// URL.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
}
69
/// Control runner args.
///
/// Passed to the actor's `on_start` and then stored on the actor for its whole lifetime.
pub struct Params {
    /// Control config. Used for the registration check and the netmap connection, and cloned into
    /// the one-shot RPC tasks (ID token, logout, set-dns, TKA sync).
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Handed to control during registration and connect; `None` means
    /// registration relies on interactive authorization instead.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor. Supplies the node keys, the message-bus
    /// subscribe/publish operations and the shutdown signal.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
87
/// Error type of the [`ControlRunner`] actor (its `kameo::Actor::Error`).
///
/// Returned from `on_start` when registration or control-session bring-up fails. Both variants are
/// transparent wrappers, so the displayed message is that of the wrapped error.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client (registration check or connect).
    #[error(transparent)]
    Control(#[from] ControlError),

    /// A crate-level error, e.g. from subscribing to the message bus during bring-up.
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
97
98impl kameo::Actor for ControlRunner {
99 type Args = Params;
100 type Error = ControlRunnerError;
101
102 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
103 loop {
104 match AsyncControlClient::check_auth(
105 ¶ms.config,
106 ¶ms.env.keys,
107 params.auth_key.as_deref(),
108 )
109 .await
110 {
111 Ok(()) => break,
112 Err(ControlError::MachineNotAuthorized(u)) => {
113 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
114 // Surface "interactive login required" so a watcher / `wait_until_running` can
115 // tell the user to authorize, instead of seeing an opaque timeout. Registration
116 // keeps retrying (transient), so this is not a terminal `Failed`.
117 params
118 .state_tx
119 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
120 tokio::time::sleep(Duration::from_secs(5)).await;
121 }
122 Err(e) => {
123 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
124 // specific reason control gave AND publish it as a typed `Failed` state so
125 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
126 // of the opaque `Internal(Actor)` the caller would otherwise see once the
127 // stopped actor is next asked. Publishing before `return Err` is why the state
128 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
129 let reason = crate::RegistrationError::from(&e);
130 tracing::error!(error = %e, "registration failed; control runner stopping");
131 params
132 .state_tx
133 .send_replace(crate::DeviceState::Failed(reason));
134 return Err(e.into());
135 }
136 }
137 }
138 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
139 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
140 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
141 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
142 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
143 let bring_up = async {
144 let (client, stream) = AsyncControlClient::connect(
145 ¶ms.config,
146 ¶ms.env.keys,
147 params.auth_key.as_deref(),
148 )
149 .await?;
150
151 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
152
153 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
154 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
155 slf.attach_stream(stream.boxed(), (), ());
156 Ok::<_, ControlRunnerError>(client)
157 };
158
159 let client = match bring_up.await {
160 Ok(client) => client,
161 Err(e) => {
162 tracing::error!(error = %e, "bringing up the control session failed");
163 // The control session never came up; surface it as a transient registration
164 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
165 // stuck at `Connecting`.
166 params.state_tx.send_replace(crate::DeviceState::Failed(
167 crate::RegistrationError::NetworkUnreachable,
168 ));
169 return Err(e);
170 }
171 };
172
173 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
174 // current (and flips to `Expired` if the self-node's key lapses).
175 params.state_tx.send_replace(crate::DeviceState::Running);
176
177 Ok(Self {
178 client,
179 params,
180 self_node: Default::default(),
181 ssh_policy: Default::default(),
182 tka: Default::default(),
183 tka_synced: None,
184 tka_authority: Default::default(),
185 tka_syncing: false,
186 cert_domains: Default::default(),
187 dns_config: Default::default(),
188 pop_browser_url: Default::default(),
189 netcheck: Default::default(),
190 })
191 }
192}
193
194impl ControlRunner {
195 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
196 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
197 /// round-trip). The result returns via the [`TkaSynced`] self-message.
198 ///
199 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
200 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
201 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
202 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
203 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
204 if !tka.is_enabled() {
205 // Lock disabled (or never enabled): drop any synced state and stop publishing an
206 // Authority. Never an error; peers are unaffected.
207 if self.tka_synced.is_some() {
208 self.tka_synced = None;
209 self.tka_authority.send_replace(None);
210 }
211 return;
212 }
213 if self.tka_syncing {
214 return; // a sync is already in flight; the next netmap will re-trigger if still stale
215 }
216 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
217 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
218 // fail-closes harmlessly).
219 if let Some(synced) = &self.tka_synced
220 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
221 && synced.authority.head_matches(&control_head)
222 {
223 return;
224 }
225
226 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
227 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
228 // `tka_syncing` so we don't spawn a second concurrent sync.
229 self.tka_syncing = true;
230 let current = self.tka_synced.take();
231 let config = self.params.config.clone();
232 let keys = self.params.env.keys.clone();
233 tokio::spawn(async move {
234 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
235 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
236 // not allowed). A send failure just means the actor is gone — nothing to do.
237 if let Err(e) = self_ref.tell(TkaSynced { result }).await {
238 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
239 }
240 });
241 }
242
243 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
244 /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
245 /// clears the in-flight guard.
246 async fn apply_tka_synced(
247 &mut self,
248 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
249 ) {
250 self.tka_syncing = false;
251 match result {
252 Ok(Some(synced)) => {
253 tracing::info!(
254 head = %synced.authority.head().to_base32(),
255 "TKA sync succeeded; publishing verified Authority (observe-only)"
256 );
257 self.tka_authority
258 .send_replace(Some(synced.authority.clone()));
259 // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
260 // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
261 if let Err(e) = self
262 .params
263 .env
264 .publish(crate::peer_tracker::TkaAuthorityUpdate(
265 synced.authority.clone(),
266 ))
267 .await
268 {
269 tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
270 }
271 self.tka_synced = Some(synced);
272 }
273 Ok(None) => {
274 // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
275 tracing::debug!("TKA sync: control reported no lock for this node (inert)");
276 }
277 Err(e) => {
278 // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
279 // peer. The next netmap update re-triggers a sync attempt.
280 tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
281 }
282 }
283 }
284
285 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
286 where
287 F: FnOnce(&Node) -> R,
288 {
289 let mut sub = self.self_node.subscribe();
290 let mut shutdown = self.params.env.shutdown.clone();
291
292 async move {
293 tokio::select! {
294 _ = shutdown.wait_for(|x| *x) => {
295 None
296 },
297 node = sub.wait_for(Option::is_some) => {
298 Some(f(node.ok()?.as_ref()?))
299 },
300 }
301 }
302 }
303}
304
305// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
306// those generated fields carry no doc and can't take attributes, so wrap in a module where
307// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
308// are re-exported so callers keep referencing them at `control_runner::<Name>`.
309pub use msg_impl::*;
310
311#[allow(missing_docs)]
312mod msg_impl {
313 use kameo::{message::Context, reply::DelegatedReply};
314
315 use super::*;
316
317 #[kameo::messages]
318 impl ControlRunner {
319 /// Fetch the IPv4 address for this tailscale device.
320 #[message(ctx)]
321 pub fn ipv4(
322 &self,
323 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
324 ) -> DelegatedReply<Option<Ipv4Addr>> {
325 let (deleg, replier) = ctx.reply_sender();
326
327 if let Some(replier) = replier {
328 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
329
330 tokio::spawn(async move {
331 let ip = fut.await;
332 replier.send(ip);
333 });
334 }
335
336 deleg
337 }
338
339 /// Fetch the IPv6 address for this tailscale device.
340 #[message(ctx)]
341 pub fn ipv6(
342 &self,
343 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
344 ) -> DelegatedReply<Option<Ipv6Addr>> {
345 let (deleg, replier) = ctx.reply_sender();
346
347 if let Some(replier) = replier {
348 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
349
350 tokio::spawn(async move {
351 let ip = fut.await;
352 replier.send(ip);
353 });
354 }
355
356 deleg
357 }
358
359 /// Fetch the self node for this tailscale device.
360 #[message(ctx)]
361 pub fn self_node(
362 &self,
363 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
364 ) -> DelegatedReply<Option<Node>> {
365 let (deleg, replier) = ctx.reply_sender();
366
367 if let Some(replier) = replier {
368 let node = self.with_self_node(|node| node.clone());
369
370 tokio::spawn(async move {
371 let node = node.await;
372 replier.send(node)
373 });
374 }
375
376 deleg
377 }
378
379 /// Fetch the current Tailscale SSH policy, if control has pushed one.
380 ///
381 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
382 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
383 /// for a value: an absent policy is a legitimate, immediate answer.
384 #[message]
385 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
386 self.ssh_policy.borrow().clone()
387 }
388
389 /// Fetch the current Tailnet Lock status, if control has pushed one.
390 ///
391 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
392 #[message]
393 pub fn current_tka_status(&self) -> Option<TkaStatus> {
394 self.tka.borrow().clone()
395 }
396
397 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
398 ///
399 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
400 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
401 /// does not block waiting for a value).
402 #[message]
403 pub fn cert_domains(&self) -> Vec<String> {
404 self.cert_domains.borrow().clone()
405 }
406
407 /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
408 /// control has sent no DNS config yet. An immediate answer (does not block); the facade
409 /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
410 #[message]
411 pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
412 self.dns_config.borrow().clone()
413 }
414
415 /// The interactive-login / consent URL control last asked this node to open
416 /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
417 /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
418 #[message]
419 pub fn pop_browser_url(&self) -> Option<url::Url> {
420 self.pop_browser_url.borrow().clone()
421 }
422
423 /// The latest network-conditions report (preferred DERP region + per-region latencies). An
424 /// immediate answer (does not block); empty before the first DERP-latency measurement. The
425 /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
426 #[message]
427 pub fn netcheck(&self) -> crate::status::NetcheckReport {
428 self.netcheck.borrow().clone()
429 }
430
431 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
432 ///
433 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
434 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
435 /// for the round-trip.
436 #[message(ctx)]
437 pub fn fetch_id_token(
438 &self,
439 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
440 audience: String,
441 ) -> DelegatedReply<Result<String, IdTokenError>> {
442 let (deleg, replier) = ctx.reply_sender();
443
444 if let Some(replier) = replier {
445 let config = self.params.config.clone();
446 let keys = self.params.env.keys.clone();
447 tokio::spawn(async move {
448 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
449 replier.send(result);
450 });
451 }
452
453 deleg
454 }
455
456 /// Log this node out of the tailnet: deregister it by expiring its current node key.
457 ///
458 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
459 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
460 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
461 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
462 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
463 /// on-disk node key, so re-registering with the same key is the re-login path.
464 #[message(ctx)]
465 pub fn logout(
466 &self,
467 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
468 ) -> DelegatedReply<Result<(), LogoutError>> {
469 let (deleg, replier) = ctx.reply_sender();
470
471 if let Some(replier) = replier {
472 let config = self.params.config.clone();
473 let keys = self.params.env.keys.clone();
474 tokio::spawn(async move {
475 let result = ts_control::logout(&config, &keys).await;
476 replier.send(result);
477 });
478 }
479
480 deleg
481 }
482
483 /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
484 /// `LocalClient.SetDNS`).
485 ///
486 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
487 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
488 /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
489 /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
490 /// match, so the surfaced API takes only `name` + `value`.
491 #[message(ctx)]
492 pub fn set_dns(
493 &self,
494 ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
495 name: String,
496 value: String,
497 ) -> DelegatedReply<Result<(), SetDnsError>> {
498 let (deleg, replier) = ctx.reply_sender();
499
500 if let Some(replier) = replier {
501 let config = self.params.config.clone();
502 let keys = self.params.env.keys.clone();
503 tokio::spawn(async move {
504 let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
505 replier.send(result);
506 });
507 }
508
509 deleg
510 }
511 }
512
513 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
514 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
515 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
516 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
517 // block keeps the default build clean.
#[cfg(feature = "acme")]
#[kameo::messages]
impl ControlRunner {
    /// Obtain a real Let's Encrypt certificate for this node's MagicDNS `name` using the
    /// client-side ACME DNS-01 engine (`acme` feature).
    ///
    /// Follows the [`fetch_id_token`](Self::fetch_id_token) scheme: config and node keys are
    /// cloned into a spawned task (delegated reply, so the mailbox is not blocked for the
    /// round-trip). The task loads or generates the ACME account key and runs issuance against
    /// Let's Encrypt production, publishing the DNS-01 challenge TXT via the node's
    /// `POST /machine/set-dns` RPC.
    ///
    /// When [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is present it is reused, so the
    /// same ACME account persists across renewals. Otherwise an ephemeral key is generated for
    /// this call only (a fresh ACME account per issuance — acceptable for v1; LE allows it).
    /// Writing a generated key back into the key file is the embedder's job (there is no
    /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns publish
    /// 501s.
    #[message(ctx)]
    pub fn get_certificate(
        &self,
        ctx: &mut Context<
            Self,
            DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
        >,
        name: String,
    ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
        let (delegated, reply_tx) = ctx.reply_sender();
        // No reply channel: skip the issuance entirely.
        let Some(reply_tx) = reply_tx else {
            return delegated;
        };

        let config = self.params.config.clone();
        let keys = self.params.env.keys.clone();
        tokio::spawn(async move {
            let outcome = issue_certificate(&config, &keys, &name).await;
            reply_tx.send(outcome);
        });

        delegated
    }
}
558}
559
/// Issue a certificate for `name` via set-dns DNS-01, after loading or generating the ACME
/// account key.
///
/// If [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is set it is parsed and reused, so the
/// same Let's Encrypt account survives renewals. If not, an ephemeral key is generated for this
/// call only (logged at debug — a new ACME account each issuance, with no write-back). The
/// directory is always Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Propagates key parsing/generation failures, reports an unparsable directory URL as
/// [`ts_control::CertError::Acme`], and otherwise returns whatever the issuance itself reports.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // Only the first element of the generated tuple (the key itself) is needed here.
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        generated.0
    };

    let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
        .parse()
        .map_err(|parse_err| {
            ts_control::CertError::Acme(format!(
                "parsing Let's Encrypt directory URL: {parse_err}"
            ))
        })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
589
590impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
591 type Reply = ();
592
593 async fn handle(
594 &mut self,
595 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
596 ctx: &mut Context<Self, Self::Reply>,
597 ) {
598 match msg {
599 StreamMessage::Started(_) => {
600 tracing::trace!("started listening to state updates");
601 }
602
603 StreamMessage::Next(msg) => {
604 if let Some(node) = msg.node.as_ref() {
605 // Reflect node-key expiry into the device state: control delivering a self-node
606 // whose key is in the past means the node must re-authenticate. Otherwise the
607 // arrival of a fresh self-node confirms we are Running (recovers the state if a
608 // prior update had flipped it to Expired).
609 let now_unix = std::time::SystemTime::now()
610 .duration_since(std::time::UNIX_EPOCH)
611 .map(|d| d.as_secs() as i64)
612 .unwrap_or(0);
613 let next = if node.key_expired_at_unix(now_unix) {
614 crate::DeviceState::Expired
615 } else {
616 crate::DeviceState::Running
617 };
618 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
619 // self-node arrives on every netmap update).
620 self.params.state_tx.send_if_modified(|s| {
621 if *s != next {
622 *s = next.clone();
623 true
624 } else {
625 false
626 }
627 });
628
629 self.self_node.send_replace(Some(node.clone()));
630 }
631
632 if let Some(policy) = msg.ssh_policy.as_ref() {
633 self.ssh_policy.send_replace(Some(policy.clone()));
634 }
635
636 if let Some(tka) = msg.tka.as_ref() {
637 self.tka.send_replace(Some(tka.clone()));
638 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
639 }
640
641 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
642 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
643 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
644 let cert_domains = msg
645 .dns_config
646 .as_ref()
647 .map(|d| d.cert_domains.clone())
648 .unwrap_or_default();
649 self.cert_domains.send_replace(cert_domains);
650
651 // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
652 // `None` when control sent no DNS config on this update — distinct from a present but
653 // empty config (Go `netmap.NetworkMap.DNS`).
654 self.dns_config.send_replace(msg.dns_config.clone());
655
656 // Track the interactive-login URL for `Device::pop_browser_url`. `None` on updates
657 // that carry none — control sends it only when it wants a browser opened
658 // (`MapResponse.PopBrowserURL`); replace rather than accumulate.
659 self.pop_browser_url
660 .send_replace(msg.pop_browser_url.clone());
661
662 if let Err(e) = self.params.env.publish(msg).await {
663 tracing::error!(error = %e, "publishing netmap update");
664 }
665 }
666
667 StreamMessage::Finished(_) => {
668 tracing::error!("state update stream terminated")
669 }
670 }
671 }
672}
673
/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
/// [`ControlRunner::maybe_sync_tka`]; handled by applying it via `ControlRunner::apply_tka_synced`.
#[doc(hidden)]
pub struct TkaSynced {
    /// What the sync task returned: `Ok(Some(_))` carries the advanced synced state, `Ok(None)`
    /// means control reported no lock for this node, and `Err(_)` is a failed sync.
    pub(crate) result:
        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
}
683
684impl Message<TkaSynced> for ControlRunner {
685 type Reply = ();
686
687 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
688 self.apply_tka_synced(msg.result).await;
689 }
690}
691
692impl Message<DerpLatencyMeasurement> for ControlRunner {
693 type Reply = ();
694
695 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
696 let measurements = msg.measurement.as_ref().clone();
697
698 // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
699 // the same measurements, before the home-region short-circuit below — an empty set still
700 // yields a (default/empty) report rather than a stale one.
701 self.netcheck
702 .send_replace(crate::status::NetcheckReport::from_region_results(
703 &measurements,
704 ));
705
706 let Some(result) = measurements.first() else {
707 tracing::debug!("derp latency measurements empty");
708 return;
709 };
710
711 let iter = measurements.iter().map(|result| {
712 (
713 result.latency_map_key.as_str(),
714 result.latency.as_secs_f64(),
715 )
716 });
717
718 tracing::debug!(selected_region_id = ?result.id, "updating home region");
719
720 self.client.set_home_region(result.id, iter).await;
721 }
722}
723
724impl Message<EndpointAdvertisement> for ControlRunner {
725 type Reply = ();
726
727 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
728 let endpoints: Vec<Endpoint> = msg
729 .endpoints
730 .iter()
731 .map(|ep| Endpoint {
732 endpoint: ep.addr,
733 ty: match ep.ty {
734 SelfEndpointType::Local => EndpointType::Local,
735 SelfEndpointType::Stun => EndpointType::Stun,
736 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
737 },
738 })
739 .collect();
740
741 tracing::debug!(
742 n_endpoints = endpoints.len(),
743 "advertising endpoints to control"
744 );
745
746 self.client.set_endpoints(endpoints).await;
747 }
748}