ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
25/// Actor responsible for maintaining the connection to control.
26///
27/// This actor is responsible for proxying the map response stream onto the message bus.
28pub struct ControlRunner {
29 client: AsyncControlClient,
30 params: Params,
31
32 self_node: watch::Sender<Option<Node>>,
33 /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
34 /// server reads this to authorize incoming connections; absent policy means deny-all.
35 ssh_policy: watch::Sender<Option<SshPolicy>>,
36 /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
37 tka: watch::Sender<Option<TkaStatus>>,
38 /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
39 /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
40 /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
41 /// the result returns via the [`TkaSynced`] self-message).
42 tka_synced: Option<crate::tka_sync::SyncedTka>,
43 /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
44 /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
45 /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
46 /// gated decision).
47 tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
48 /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
49 /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
50 tka_syncing: bool,
51 /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
52 /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
53 cert_domains: watch::Sender<Vec<String>>,
54 /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
55 /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
56 /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
57 /// own cell for the narrower TLS-cert use.
58 dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
59 /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
60 /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
61 /// daemon's `tnet netcheck`). Empty until the first measurement.
62 netcheck: watch::Sender<crate::status::NetcheckReport>,
63}
64
65/// Control runner args.
66pub struct Params {
67 /// Control config.
68 pub(crate) config: ts_control::Config,
69
70 /// Auth key (if needed).
71 pub(crate) auth_key: Option<String>,
72
73 /// The [`crate::Env`] for this actor.
74 pub(crate) env: crate::Env,
75
76 /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
77 /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
78 /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
79 /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
80 pub(crate) state_tx: watch::Sender<crate::DeviceState>,
81}
82
83#[doc(hidden)]
84#[derive(Debug, thiserror::Error)]
85pub enum ControlRunnerError {
86 #[error(transparent)]
87 Control(#[from] ControlError),
88
89 #[error(transparent)]
90 Crate(#[from] crate::Error),
91}
92
93impl kameo::Actor for ControlRunner {
94 type Args = Params;
95 type Error = ControlRunnerError;
96
97 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
98 loop {
99 match AsyncControlClient::check_auth(
100 ¶ms.config,
101 ¶ms.env.keys,
102 params.auth_key.as_deref(),
103 )
104 .await
105 {
106 Ok(()) => break,
107 Err(ControlError::MachineNotAuthorized(u)) => {
108 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
109 // Surface "interactive login required" so a watcher / `wait_until_running` can
110 // tell the user to authorize, instead of seeing an opaque timeout. Registration
111 // keeps retrying (transient), so this is not a terminal `Failed`.
112 params
113 .state_tx
114 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
115 tokio::time::sleep(Duration::from_secs(5)).await;
116 }
117 Err(e) => {
118 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
119 // specific reason control gave AND publish it as a typed `Failed` state so
120 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
121 // of the opaque `Internal(Actor)` the caller would otherwise see once the
122 // stopped actor is next asked. Publishing before `return Err` is why the state
123 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
124 let reason = crate::RegistrationError::from(&e);
125 tracing::error!(error = %e, "registration failed; control runner stopping");
126 params
127 .state_tx
128 .send_replace(crate::DeviceState::Failed(reason));
129 return Err(e.into());
130 }
131 }
132 }
133 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
134 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
135 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
136 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
137 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
138 let bring_up = async {
139 let (client, stream) = AsyncControlClient::connect(
140 ¶ms.config,
141 ¶ms.env.keys,
142 params.auth_key.as_deref(),
143 )
144 .await?;
145
146 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
147
148 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
149 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
150 slf.attach_stream(stream.boxed(), (), ());
151 Ok::<_, ControlRunnerError>(client)
152 };
153
154 let client = match bring_up.await {
155 Ok(client) => client,
156 Err(e) => {
157 tracing::error!(error = %e, "bringing up the control session failed");
158 // The control session never came up; surface it as a transient registration
159 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
160 // stuck at `Connecting`.
161 params.state_tx.send_replace(crate::DeviceState::Failed(
162 crate::RegistrationError::NetworkUnreachable,
163 ));
164 return Err(e);
165 }
166 };
167
168 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
169 // current (and flips to `Expired` if the self-node's key lapses).
170 params.state_tx.send_replace(crate::DeviceState::Running);
171
172 Ok(Self {
173 client,
174 params,
175 self_node: Default::default(),
176 ssh_policy: Default::default(),
177 tka: Default::default(),
178 tka_synced: None,
179 tka_authority: Default::default(),
180 tka_syncing: false,
181 cert_domains: Default::default(),
182 dns_config: Default::default(),
183 netcheck: Default::default(),
184 })
185 }
186}
187
188impl ControlRunner {
189 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
190 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
191 /// round-trip). The result returns via the [`TkaSynced`] self-message.
192 ///
193 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
194 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
195 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
196 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
197 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
198 if !tka.is_enabled() {
199 // Lock disabled (or never enabled): drop any synced state and stop publishing an
200 // Authority. Never an error; peers are unaffected.
201 if self.tka_synced.is_some() {
202 self.tka_synced = None;
203 self.tka_authority.send_replace(None);
204 }
205 return;
206 }
207 if self.tka_syncing {
208 return; // a sync is already in flight; the next netmap will re-trigger if still stale
209 }
210 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
211 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
212 // fail-closes harmlessly).
213 if let Some(synced) = &self.tka_synced
214 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
215 && synced.authority.head_matches(&control_head)
216 {
217 return;
218 }
219
220 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
221 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
222 // `tka_syncing` so we don't spawn a second concurrent sync.
223 self.tka_syncing = true;
224 let current = self.tka_synced.take();
225 let config = self.params.config.clone();
226 let keys = self.params.env.keys.clone();
227 tokio::spawn(async move {
228 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
229 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
230 // not allowed). A send failure just means the actor is gone — nothing to do.
231 if let Err(e) = self_ref.tell(TkaSynced { result }).await {
232 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
233 }
234 });
235 }
236
237 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
238 /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
239 /// clears the in-flight guard.
240 async fn apply_tka_synced(
241 &mut self,
242 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
243 ) {
244 self.tka_syncing = false;
245 match result {
246 Ok(Some(synced)) => {
247 tracing::info!(
248 head = %synced.authority.head().to_base32(),
249 "TKA sync succeeded; publishing verified Authority (observe-only)"
250 );
251 self.tka_authority
252 .send_replace(Some(synced.authority.clone()));
253 // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
254 // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
255 if let Err(e) = self
256 .params
257 .env
258 .publish(crate::peer_tracker::TkaAuthorityUpdate(
259 synced.authority.clone(),
260 ))
261 .await
262 {
263 tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
264 }
265 self.tka_synced = Some(synced);
266 }
267 Ok(None) => {
268 // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
269 tracing::debug!("TKA sync: control reported no lock for this node (inert)");
270 }
271 Err(e) => {
272 // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
273 // peer. The next netmap update re-triggers a sync attempt.
274 tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
275 }
276 }
277 }
278
279 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
280 where
281 F: FnOnce(&Node) -> R,
282 {
283 let mut sub = self.self_node.subscribe();
284 let mut shutdown = self.params.env.shutdown.clone();
285
286 async move {
287 tokio::select! {
288 _ = shutdown.wait_for(|x| *x) => {
289 None
290 },
291 node = sub.wait_for(Option::is_some) => {
292 Some(f(node.ok()?.as_ref()?))
293 },
294 }
295 }
296 }
297}
298
299// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
300// those generated fields carry no doc and can't take attributes, so wrap in a module where
301// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
302// are re-exported so callers keep referencing them at `control_runner::<Name>`.
303pub use msg_impl::*;
304
305#[allow(missing_docs)]
306mod msg_impl {
307 use kameo::{message::Context, reply::DelegatedReply};
308
309 use super::*;
310
311 #[kameo::messages]
312 impl ControlRunner {
313 /// Fetch the IPv4 address for this tailscale device.
314 #[message(ctx)]
315 pub fn ipv4(
316 &self,
317 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
318 ) -> DelegatedReply<Option<Ipv4Addr>> {
319 let (deleg, replier) = ctx.reply_sender();
320
321 if let Some(replier) = replier {
322 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
323
324 tokio::spawn(async move {
325 let ip = fut.await;
326 replier.send(ip);
327 });
328 }
329
330 deleg
331 }
332
333 /// Fetch the IPv6 address for this tailscale device.
334 #[message(ctx)]
335 pub fn ipv6(
336 &self,
337 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
338 ) -> DelegatedReply<Option<Ipv6Addr>> {
339 let (deleg, replier) = ctx.reply_sender();
340
341 if let Some(replier) = replier {
342 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
343
344 tokio::spawn(async move {
345 let ip = fut.await;
346 replier.send(ip);
347 });
348 }
349
350 deleg
351 }
352
353 /// Fetch the self node for this tailscale device.
354 #[message(ctx)]
355 pub fn self_node(
356 &self,
357 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
358 ) -> DelegatedReply<Option<Node>> {
359 let (deleg, replier) = ctx.reply_sender();
360
361 if let Some(replier) = replier {
362 let node = self.with_self_node(|node| node.clone());
363
364 tokio::spawn(async move {
365 let node = node.await;
366 replier.send(node)
367 });
368 }
369
370 deleg
371 }
372
373 /// Fetch the current Tailscale SSH policy, if control has pushed one.
374 ///
375 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
376 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
377 /// for a value: an absent policy is a legitimate, immediate answer.
378 #[message]
379 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
380 self.ssh_policy.borrow().clone()
381 }
382
383 /// Fetch the current Tailnet Lock status, if control has pushed one.
384 ///
385 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
386 #[message]
387 pub fn current_tka_status(&self) -> Option<TkaStatus> {
388 self.tka.borrow().clone()
389 }
390
391 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
392 ///
393 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
394 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
395 /// does not block waiting for a value).
396 #[message]
397 pub fn cert_domains(&self) -> Vec<String> {
398 self.cert_domains.borrow().clone()
399 }
400
401 /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
402 /// control has sent no DNS config yet. An immediate answer (does not block); the facade
403 /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
404 #[message]
405 pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
406 self.dns_config.borrow().clone()
407 }
408
409 /// The latest network-conditions report (preferred DERP region + per-region latencies). An
410 /// immediate answer (does not block); empty before the first DERP-latency measurement. The
411 /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
412 #[message]
413 pub fn netcheck(&self) -> crate::status::NetcheckReport {
414 self.netcheck.borrow().clone()
415 }
416
417 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
418 ///
419 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
420 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
421 /// for the round-trip.
422 #[message(ctx)]
423 pub fn fetch_id_token(
424 &self,
425 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
426 audience: String,
427 ) -> DelegatedReply<Result<String, IdTokenError>> {
428 let (deleg, replier) = ctx.reply_sender();
429
430 if let Some(replier) = replier {
431 let config = self.params.config.clone();
432 let keys = self.params.env.keys.clone();
433 tokio::spawn(async move {
434 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
435 replier.send(result);
436 });
437 }
438
439 deleg
440 }
441
442 /// Log this node out of the tailnet: deregister it by expiring its current node key.
443 ///
444 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
445 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
446 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
447 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
448 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
449 /// on-disk node key, so re-registering with the same key is the re-login path.
450 #[message(ctx)]
451 pub fn logout(
452 &self,
453 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
454 ) -> DelegatedReply<Result<(), LogoutError>> {
455 let (deleg, replier) = ctx.reply_sender();
456
457 if let Some(replier) = replier {
458 let config = self.params.config.clone();
459 let keys = self.params.env.keys.clone();
460 tokio::spawn(async move {
461 let result = ts_control::logout(&config, &keys).await;
462 replier.send(result);
463 });
464 }
465
466 deleg
467 }
468 }
469
470 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
471 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
472 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
473 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
474 // block keeps the default build clean.
475 #[cfg(feature = "acme")]
476 #[kameo::messages]
477 impl ControlRunner {
478 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
479 /// client-side ACME DNS-01 engine (`acme` feature).
480 ///
481 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
482 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
483 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
484 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
485 ///
486 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
487 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
488 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
489 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
490 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
491 /// publish 501s.
492 #[message(ctx)]
493 pub fn get_certificate(
494 &self,
495 ctx: &mut Context<
496 Self,
497 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
498 >,
499 name: String,
500 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
501 let (deleg, replier) = ctx.reply_sender();
502
503 if let Some(replier) = replier {
504 let config = self.params.config.clone();
505 let keys = self.params.env.keys.clone();
506 tokio::spawn(async move {
507 let result = issue_certificate(&config, &keys, &name).await;
508 replier.send(result);
509 });
510 }
511
512 deleg
513 }
514 }
515}
516
517/// Load or generate the ACME account key, then issue a cert for `name` via set-dns DNS-01.
518///
519/// Reuses the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when present so the
520/// same Let's Encrypt account survives renewals; otherwise generates an ephemeral per-call key
521/// (logged at debug — a new ACME account each issuance, with no write-back). Always targets Let's
522/// Encrypt production ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
523#[cfg(feature = "acme")]
524async fn issue_certificate(
525 config: &ts_control::Config,
526 keys: &ts_keys::NodeState,
527 name: &str,
528) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
529 let account_key = match keys.acme_account_key.as_deref() {
530 Some(der) => ts_control::acme::AcmeAccountKey::from_pkcs8(der)?,
531 None => {
532 tracing::debug!(
533 "no persisted ACME account key in key state; generating an ephemeral per-call key \
534 (a new ACME account this issuance — not persisted back)"
535 );
536 ts_control::acme::AcmeAccountKey::generate()?.0
537 }
538 };
539 let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
540 .parse()
541 .map_err(|e| {
542 ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
543 })?;
544 ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
545}
546
547impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
548 type Reply = ();
549
550 async fn handle(
551 &mut self,
552 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
553 ctx: &mut Context<Self, Self::Reply>,
554 ) {
555 match msg {
556 StreamMessage::Started(_) => {
557 tracing::trace!("started listening to state updates");
558 }
559
560 StreamMessage::Next(msg) => {
561 if let Some(node) = msg.node.as_ref() {
562 // Reflect node-key expiry into the device state: control delivering a self-node
563 // whose key is in the past means the node must re-authenticate. Otherwise the
564 // arrival of a fresh self-node confirms we are Running (recovers the state if a
565 // prior update had flipped it to Expired).
566 let now_unix = std::time::SystemTime::now()
567 .duration_since(std::time::UNIX_EPOCH)
568 .map(|d| d.as_secs() as i64)
569 .unwrap_or(0);
570 let next = if node.key_expired_at_unix(now_unix) {
571 crate::DeviceState::Expired
572 } else {
573 crate::DeviceState::Running
574 };
575 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
576 // self-node arrives on every netmap update).
577 self.params.state_tx.send_if_modified(|s| {
578 if *s != next {
579 *s = next.clone();
580 true
581 } else {
582 false
583 }
584 });
585
586 self.self_node.send_replace(Some(node.clone()));
587 }
588
589 if let Some(policy) = msg.ssh_policy.as_ref() {
590 self.ssh_policy.send_replace(Some(policy.clone()));
591 }
592
593 if let Some(tka) = msg.tka.as_ref() {
594 self.tka.send_replace(Some(tka.clone()));
595 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
596 }
597
598 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
599 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
600 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
601 let cert_domains = msg
602 .dns_config
603 .as_ref()
604 .map(|d| d.cert_domains.clone())
605 .unwrap_or_default();
606 self.cert_domains.send_replace(cert_domains);
607
608 // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
609 // `None` when control sent no DNS config on this update — distinct from a present but
610 // empty config (Go `netmap.NetworkMap.DNS`).
611 self.dns_config.send_replace(msg.dns_config.clone());
612
613 if let Err(e) = self.params.env.publish(msg).await {
614 tracing::error!(error = %e, "publishing netmap update");
615 }
616 }
617
618 StreamMessage::Finished(_) => {
619 tracing::error!("state update stream terminated")
620 }
621 }
622 }
623}
624
625/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
626/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
627/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
628/// [`ControlRunner::apply_tka_synced`](ControlRunner).
629#[doc(hidden)]
630pub struct TkaSynced {
631 pub(crate) result:
632 Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
633}
634
635impl Message<TkaSynced> for ControlRunner {
636 type Reply = ();
637
638 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
639 self.apply_tka_synced(msg.result).await;
640 }
641}
642
643impl Message<DerpLatencyMeasurement> for ControlRunner {
644 type Reply = ();
645
646 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
647 let measurements = msg.measurement.as_ref().clone();
648
649 // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
650 // the same measurements, before the home-region short-circuit below — an empty set still
651 // yields a (default/empty) report rather than a stale one.
652 self.netcheck
653 .send_replace(crate::status::NetcheckReport::from_region_results(
654 &measurements,
655 ));
656
657 let Some(result) = measurements.first() else {
658 tracing::debug!("derp latency measurements empty");
659 return;
660 };
661
662 let iter = measurements.iter().map(|result| {
663 (
664 result.latency_map_key.as_str(),
665 result.latency.as_secs_f64(),
666 )
667 });
668
669 tracing::debug!(selected_region_id = ?result.id, "updating home region");
670
671 self.client.set_home_region(result.id, iter).await;
672 }
673}
674
675impl Message<EndpointAdvertisement> for ControlRunner {
676 type Reply = ();
677
678 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
679 let endpoints: Vec<Endpoint> = msg
680 .endpoints
681 .iter()
682 .map(|ep| Endpoint {
683 endpoint: ep.addr,
684 ty: match ep.ty {
685 SelfEndpointType::Local => EndpointType::Local,
686 SelfEndpointType::Stun => EndpointType::Stun,
687 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
688 },
689 })
690 .collect();
691
692 tracing::debug!(
693 n_endpoints = endpoints.len(),
694 "advertising endpoints to control"
695 );
696
697 self.client.set_endpoints(endpoints).await;
698 }
699}