ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
25/// Actor responsible for maintaining the connection to control.
26///
27/// This actor is responsible for proxying the map response stream onto the message bus.
28pub struct ControlRunner {
29 client: AsyncControlClient,
30 params: Params,
31
32 self_node: watch::Sender<Option<Node>>,
33 /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
34 /// server reads this to authorize incoming connections; absent policy means deny-all.
35 ssh_policy: watch::Sender<Option<SshPolicy>>,
36 /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
37 tka: watch::Sender<Option<TkaStatus>>,
38 /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
39 /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
40 /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
41 /// the result returns via the [`TkaSynced`] self-message).
42 tka_synced: Option<crate::tka_sync::SyncedTka>,
43 /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
44 /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
45 /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
46 /// gated decision).
47 tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
48 /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
49 /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
50 tka_syncing: bool,
51 /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
52 /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
53 cert_domains: watch::Sender<Vec<String>>,
54 /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
55 /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
56 /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
57 /// own cell for the narrower TLS-cert use.
58 dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
59 /// Latest interactive-login / consent URL control asked this node to open
60 /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
61 /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
62 /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
63 /// `browse_to_url` running-node events.
64 ///
65 /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
66 /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
67 /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
68 /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
69 /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
70 /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
71 /// `None` and coalesce the URL away for a `watch` subscriber.
72 pop_browser_url: watch::Sender<Option<url::Url>>,
73 /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
74 /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
75 /// daemon's `tnet netcheck`). Empty until the first measurement.
76 netcheck: watch::Sender<crate::status::NetcheckReport>,
77}
78
79/// Control runner args.
80pub struct Params {
81 /// Control config.
82 pub(crate) config: ts_control::Config,
83
84 /// Auth key (if needed).
85 pub(crate) auth_key: Option<String>,
86
87 /// The [`crate::Env`] for this actor.
88 pub(crate) env: crate::Env,
89
90 /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
91 /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
92 /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
93 /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
94 pub(crate) state_tx: watch::Sender<crate::DeviceState>,
95}
96
97#[doc(hidden)]
98#[derive(Debug, thiserror::Error)]
99pub enum ControlRunnerError {
100 #[error(transparent)]
101 Control(#[from] ControlError),
102
103 #[error(transparent)]
104 Crate(#[from] crate::Error),
105}
106
107impl kameo::Actor for ControlRunner {
108 type Args = Params;
109 type Error = ControlRunnerError;
110
111 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
112 loop {
113 match AsyncControlClient::check_auth(
114 ¶ms.config,
115 ¶ms.env.keys,
116 params.auth_key.as_deref(),
117 )
118 .await
119 {
120 Ok(()) => break,
121 Err(ControlError::MachineNotAuthorized(u)) => {
122 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
123 // Surface "interactive login required" so a watcher / `wait_until_running` can
124 // tell the user to authorize, instead of seeing an opaque timeout. Registration
125 // keeps retrying (transient), so this is not a terminal `Failed`.
126 params
127 .state_tx
128 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
129 tokio::time::sleep(Duration::from_secs(5)).await;
130 }
131 Err(e) => {
132 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
133 // specific reason control gave AND publish it as a typed `Failed` state so
134 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
135 // of the opaque `Internal(Actor)` the caller would otherwise see once the
136 // stopped actor is next asked. Publishing before `return Err` is why the state
137 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
138 let reason = crate::RegistrationError::from(&e);
139 tracing::error!(error = %e, "registration failed; control runner stopping");
140 params
141 .state_tx
142 .send_replace(crate::DeviceState::Failed(reason));
143 return Err(e.into());
144 }
145 }
146 }
147 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
148 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
149 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
150 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
151 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
152 let bring_up = async {
153 let (client, stream) = AsyncControlClient::connect(
154 ¶ms.config,
155 ¶ms.env.keys,
156 params.auth_key.as_deref(),
157 )
158 .await?;
159
160 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
161
162 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
163 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
164 slf.attach_stream(stream.boxed(), (), ());
165 Ok::<_, ControlRunnerError>(client)
166 };
167
168 let client = match bring_up.await {
169 Ok(client) => client,
170 Err(e) => {
171 tracing::error!(error = %e, "bringing up the control session failed");
172 // The control session never came up; surface it as a transient registration
173 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
174 // stuck at `Connecting`.
175 params.state_tx.send_replace(crate::DeviceState::Failed(
176 crate::RegistrationError::NetworkUnreachable,
177 ));
178 return Err(e);
179 }
180 };
181
182 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
183 // current (and flips to `Expired` if the self-node's key lapses).
184 params.state_tx.send_replace(crate::DeviceState::Running);
185
186 Ok(Self {
187 client,
188 params,
189 self_node: Default::default(),
190 ssh_policy: Default::default(),
191 tka: Default::default(),
192 tka_synced: None,
193 tka_authority: Default::default(),
194 tka_syncing: false,
195 cert_domains: Default::default(),
196 dns_config: Default::default(),
197 pop_browser_url: Default::default(),
198 netcheck: Default::default(),
199 })
200 }
201}
202
203impl ControlRunner {
204 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
205 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
206 /// round-trip). The result returns via the [`TkaSynced`] self-message.
207 ///
208 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
209 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
210 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
211 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
212 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
213 if !tka.is_enabled() {
214 // Lock disabled (or never enabled): drop any synced state and stop publishing an
215 // Authority. Never an error; peers are unaffected.
216 if self.tka_synced.is_some() {
217 self.tka_synced = None;
218 self.tka_authority.send_replace(None);
219 }
220 return;
221 }
222 if self.tka_syncing {
223 return; // a sync is already in flight; the next netmap will re-trigger if still stale
224 }
225 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
226 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
227 // fail-closes harmlessly).
228 if let Some(synced) = &self.tka_synced
229 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
230 && synced.authority.head_matches(&control_head)
231 {
232 return;
233 }
234
235 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
236 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
237 // `tka_syncing` so we don't spawn a second concurrent sync.
238 self.tka_syncing = true;
239 let current = self.tka_synced.take();
240 let config = self.params.config.clone();
241 let keys = self.params.env.keys.clone();
242 tokio::spawn(async move {
243 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
244 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
245 // not allowed). A send failure just means the actor is gone — nothing to do.
246 if let Err(e) = self_ref.tell(TkaSynced { result }).await {
247 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
248 }
249 });
250 }
251
252 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
253 /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
254 /// clears the in-flight guard.
255 async fn apply_tka_synced(
256 &mut self,
257 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
258 ) {
259 self.tka_syncing = false;
260 match result {
261 Ok(Some(synced)) => {
262 tracing::info!(
263 head = %synced.authority.head().to_base32(),
264 "TKA sync succeeded; publishing verified Authority (observe-only)"
265 );
266 self.tka_authority
267 .send_replace(Some(synced.authority.clone()));
268 // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
269 // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
270 if let Err(e) = self
271 .params
272 .env
273 .publish(crate::peer_tracker::TkaAuthorityUpdate(
274 synced.authority.clone(),
275 ))
276 .await
277 {
278 tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
279 }
280 self.tka_synced = Some(synced);
281 }
282 Ok(None) => {
283 // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
284 tracing::debug!("TKA sync: control reported no lock for this node (inert)");
285 }
286 Err(e) => {
287 // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
288 // peer. The next netmap update re-triggers a sync attempt.
289 tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
290 }
291 }
292 }
293
294 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
295 where
296 F: FnOnce(&Node) -> R,
297 {
298 let mut sub = self.self_node.subscribe();
299 let mut shutdown = self.params.env.shutdown.clone();
300
301 async move {
302 tokio::select! {
303 _ = shutdown.wait_for(|x| *x) => {
304 None
305 },
306 node = sub.wait_for(Option::is_some) => {
307 Some(f(node.ok()?.as_ref()?))
308 },
309 }
310 }
311 }
312}
313
314/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
315///
316/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
317/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
318/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
319/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
320/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
321/// for a `watch`/bus subscriber.
322///
323/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
324/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
325/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
326/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
327/// device-state cell in this handler.
328///
329/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
330/// against a plain `watch` channel without standing up the actor.
331fn sticky_update_pop_browser_url(
332 cell: &watch::Sender<Option<url::Url>>,
333 incoming: Option<&url::Url>,
334) {
335 if let Some(url) = incoming {
336 cell.send_if_modified(|current| {
337 if current.as_ref() == Some(url) {
338 false
339 } else {
340 *current = Some(url.clone());
341 true
342 }
343 });
344 }
345}
346
347// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
348// those generated fields carry no doc and can't take attributes, so wrap in a module where
349// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
350// are re-exported so callers keep referencing them at `control_runner::<Name>`.
351pub use msg_impl::*;
352
353#[allow(missing_docs)]
354mod msg_impl {
355 use kameo::{message::Context, reply::DelegatedReply};
356
357 use super::*;
358
359 #[kameo::messages]
360 impl ControlRunner {
361 /// Fetch the IPv4 address for this tailscale device.
362 #[message(ctx)]
363 pub fn ipv4(
364 &self,
365 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
366 ) -> DelegatedReply<Option<Ipv4Addr>> {
367 let (deleg, replier) = ctx.reply_sender();
368
369 if let Some(replier) = replier {
370 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
371
372 tokio::spawn(async move {
373 let ip = fut.await;
374 replier.send(ip);
375 });
376 }
377
378 deleg
379 }
380
381 /// Fetch the IPv6 address for this tailscale device.
382 #[message(ctx)]
383 pub fn ipv6(
384 &self,
385 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
386 ) -> DelegatedReply<Option<Ipv6Addr>> {
387 let (deleg, replier) = ctx.reply_sender();
388
389 if let Some(replier) = replier {
390 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
391
392 tokio::spawn(async move {
393 let ip = fut.await;
394 replier.send(ip);
395 });
396 }
397
398 deleg
399 }
400
401 /// Fetch the self node for this tailscale device.
402 #[message(ctx)]
403 pub fn self_node(
404 &self,
405 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
406 ) -> DelegatedReply<Option<Node>> {
407 let (deleg, replier) = ctx.reply_sender();
408
409 if let Some(replier) = replier {
410 let node = self.with_self_node(|node| node.clone());
411
412 tokio::spawn(async move {
413 let node = node.await;
414 replier.send(node)
415 });
416 }
417
418 deleg
419 }
420
421 /// Fetch the current Tailscale SSH policy, if control has pushed one.
422 ///
423 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
424 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
425 /// for a value: an absent policy is a legitimate, immediate answer.
426 #[message]
427 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
428 self.ssh_policy.borrow().clone()
429 }
430
431 /// Fetch the current Tailnet Lock status, if control has pushed one.
432 ///
433 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
434 #[message]
435 pub fn current_tka_status(&self) -> Option<TkaStatus> {
436 self.tka.borrow().clone()
437 }
438
439 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
440 ///
441 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
442 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
443 /// does not block waiting for a value).
444 #[message]
445 pub fn cert_domains(&self) -> Vec<String> {
446 self.cert_domains.borrow().clone()
447 }
448
449 /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
450 /// control has sent no DNS config yet. An immediate answer (does not block); the facade
451 /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
452 #[message]
453 pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
454 self.dns_config.borrow().clone()
455 }
456
457 /// The interactive-login / consent URL control last asked this node to open
458 /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
459 /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
460 #[message]
461 pub fn pop_browser_url(&self) -> Option<url::Url> {
462 self.pop_browser_url.borrow().clone()
463 }
464
465 /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
466 ///
467 /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
468 /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
469 /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
470 /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
471 /// late subscriber sees the current URL. The initial value is `None` until control sends one.
472 #[message(derive(Clone))]
473 pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
474 self.pop_browser_url.subscribe()
475 }
476
477 /// The latest network-conditions report (preferred DERP region + per-region latencies). An
478 /// immediate answer (does not block); empty before the first DERP-latency measurement. The
479 /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
480 #[message]
481 pub fn netcheck(&self) -> crate::status::NetcheckReport {
482 self.netcheck.borrow().clone()
483 }
484
485 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
486 ///
487 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
488 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
489 /// for the round-trip.
490 #[message(ctx)]
491 pub fn fetch_id_token(
492 &self,
493 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
494 audience: String,
495 ) -> DelegatedReply<Result<String, IdTokenError>> {
496 let (deleg, replier) = ctx.reply_sender();
497
498 if let Some(replier) = replier {
499 let config = self.params.config.clone();
500 let keys = self.params.env.keys.clone();
501 tokio::spawn(async move {
502 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
503 replier.send(result);
504 });
505 }
506
507 deleg
508 }
509
510 /// Log this node out of the tailnet: deregister it by expiring its current node key.
511 ///
512 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
513 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
514 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
515 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
516 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
517 /// on-disk node key, so re-registering with the same key is the re-login path.
518 #[message(ctx)]
519 pub fn logout(
520 &self,
521 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
522 ) -> DelegatedReply<Result<(), LogoutError>> {
523 let (deleg, replier) = ctx.reply_sender();
524
525 if let Some(replier) = replier {
526 let config = self.params.config.clone();
527 let keys = self.params.env.keys.clone();
528 tokio::spawn(async move {
529 let result = ts_control::logout(&config, &keys).await;
530 replier.send(result);
531 });
532 }
533
534 deleg
535 }
536
537 /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
538 /// `LocalClient.SetDNS`).
539 ///
540 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
541 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
542 /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
543 /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
544 /// match, so the surfaced API takes only `name` + `value`.
545 #[message(ctx)]
546 pub fn set_dns(
547 &self,
548 ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
549 name: String,
550 value: String,
551 ) -> DelegatedReply<Result<(), SetDnsError>> {
552 let (deleg, replier) = ctx.reply_sender();
553
554 if let Some(replier) = replier {
555 let config = self.params.config.clone();
556 let keys = self.params.env.keys.clone();
557 tokio::spawn(async move {
558 let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
559 replier.send(result);
560 });
561 }
562
563 deleg
564 }
565 }
566
567 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
568 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
569 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
570 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
571 // block keeps the default build clean.
572 #[cfg(feature = "acme")]
573 #[kameo::messages]
574 impl ControlRunner {
575 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
576 /// client-side ACME DNS-01 engine (`acme` feature).
577 ///
578 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
579 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
580 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
581 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
582 ///
583 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
584 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
585 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
586 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
587 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
588 /// publish 501s.
589 #[message(ctx)]
590 pub fn get_certificate(
591 &self,
592 ctx: &mut Context<
593 Self,
594 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
595 >,
596 name: String,
597 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
598 let (deleg, replier) = ctx.reply_sender();
599
600 if let Some(replier) = replier {
601 let config = self.params.config.clone();
602 let keys = self.params.env.keys.clone();
603 tokio::spawn(async move {
604 let result = issue_certificate(&config, &keys, &name).await;
605 replier.send(result);
606 });
607 }
608
609 deleg
610 }
611 }
612}
613
/// Obtain an ACME account key, then issue a certificate for `name` via set-dns DNS-01.
///
/// If [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is persisted, it is reused so the
/// same Let's Encrypt account survives renewals. Otherwise an ephemeral per-call key is generated
/// (logged at debug — that means a new ACME account each issuance, with no write-back). Issuance
/// always targets Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Propagates key-parsing / key-generation failures, reports an unparsable directory URL as
/// [`ts_control::CertError::Acme`], and otherwise returns whatever the issuance call yields.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        generated.0
    };

    let parsed_directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse();
    let directory = parsed_directory.map_err(|e| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
    })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
643
644impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
645 type Reply = ();
646
647 async fn handle(
648 &mut self,
649 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
650 ctx: &mut Context<Self, Self::Reply>,
651 ) {
652 match msg {
653 StreamMessage::Started(_) => {
654 tracing::trace!("started listening to state updates");
655 }
656
657 StreamMessage::Next(msg) => {
658 if let Some(node) = msg.node.as_ref() {
659 // Reflect node-key expiry into the device state: control delivering a self-node
660 // whose key is in the past means the node must re-authenticate. Otherwise the
661 // arrival of a fresh self-node confirms we are Running (recovers the state if a
662 // prior update had flipped it to Expired).
663 let now_unix = std::time::SystemTime::now()
664 .duration_since(std::time::UNIX_EPOCH)
665 .map(|d| d.as_secs() as i64)
666 .unwrap_or(0);
667 let next = if node.key_expired_at_unix(now_unix) {
668 crate::DeviceState::Expired
669 } else {
670 crate::DeviceState::Running
671 };
672 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
673 // self-node arrives on every netmap update).
674 self.params.state_tx.send_if_modified(|s| {
675 if *s != next {
676 *s = next.clone();
677 true
678 } else {
679 false
680 }
681 });
682
683 self.self_node.send_replace(Some(node.clone()));
684 }
685
686 if let Some(policy) = msg.ssh_policy.as_ref() {
687 self.ssh_policy.send_replace(Some(policy.clone()));
688 }
689
690 if let Some(tka) = msg.tka.as_ref() {
691 self.tka.send_replace(Some(tka.clone()));
692 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
693 }
694
695 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
696 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
697 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
698 let cert_domains = msg
699 .dns_config
700 .as_ref()
701 .map(|d| d.cert_domains.clone())
702 .unwrap_or_default();
703 self.cert_domains.send_replace(cert_domains);
704
705 // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
706 // `None` when control sent no DNS config on this update — distinct from a present but
707 // empty config (Go `netmap.NetworkMap.DNS`).
708 self.dns_config.send_replace(msg.dns_config.clone());
709
710 // Track the interactive-login URL for `Device::pop_browser_url` /
711 // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
712 // sticky semantics (update only on a new non-empty URL; never reset to `None`).
713 sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
714
715 if let Err(e) = self.params.env.publish(msg).await {
716 tracing::error!(error = %e, "publishing netmap update");
717 }
718 }
719
720 StreamMessage::Finished(_) => {
721 tracing::error!("state update stream terminated")
722 }
723 }
724 }
725}
726
727/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
728/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
729/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
730/// [`ControlRunner::apply_tka_synced`](ControlRunner).
731#[doc(hidden)]
732pub struct TkaSynced {
733 pub(crate) result:
734 Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
735}
736
737impl Message<TkaSynced> for ControlRunner {
738 type Reply = ();
739
740 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
741 self.apply_tka_synced(msg.result).await;
742 }
743}
744
745impl Message<DerpLatencyMeasurement> for ControlRunner {
746 type Reply = ();
747
748 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
749 let measurements = msg.measurement.as_ref().clone();
750
751 // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
752 // the same measurements, before the home-region short-circuit below — an empty set still
753 // yields a (default/empty) report rather than a stale one.
754 self.netcheck
755 .send_replace(crate::status::NetcheckReport::from_region_results(
756 &measurements,
757 ));
758
759 let Some(result) = measurements.first() else {
760 tracing::debug!("derp latency measurements empty");
761 return;
762 };
763
764 let iter = measurements.iter().map(|result| {
765 (
766 result.latency_map_key.as_str(),
767 result.latency.as_secs_f64(),
768 )
769 });
770
771 tracing::debug!(selected_region_id = ?result.id, "updating home region");
772
773 self.client.set_home_region(result.id, iter).await;
774 }
775}
776
777impl Message<EndpointAdvertisement> for ControlRunner {
778 type Reply = ();
779
780 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
781 let endpoints: Vec<Endpoint> = msg
782 .endpoints
783 .iter()
784 .map(|ep| Endpoint {
785 endpoint: ep.addr,
786 ty: match ep.ty {
787 SelfEndpointType::Local => EndpointType::Local,
788 SelfEndpointType::Stun => EndpointType::Stun,
789 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
790 },
791 })
792 .collect();
793
794 tracing::debug!(
795 n_endpoints = endpoints.len(),
796 "advertising endpoints to control"
797 );
798
799 self.client.set_endpoints(endpoints).await;
800 }
801}
802
/// Request to re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control.
///
/// This is the wire half of a runtime
/// [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). It is sent as a direct
/// `ask` from the runtime (not over the bus), so the route change reaches the live map-poll
/// client. The handler passes `routes` to the control client's `set_routable_ips`.
#[derive(Debug)]
pub struct SetAdvertiseRoutes {
    /// The prefixes to advertise to control: the final set the caller wants control to grant
    /// (already filtered by the caller).
    pub routes: Vec<ipnet::IpNet>,
}
812
813impl Message<SetAdvertiseRoutes> for ControlRunner {
814 type Reply = ();
815
816 async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
817 tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
818 self.client.set_routable_ips(msg.routes).await;
819 }
820}
821
/// Request to update this node's `Hostinfo.Hostname` at control.
///
/// This is the wire half of a runtime [`Runtime::set_hostname`](crate::Runtime::set_hostname).
/// It is sent as a direct `ask` from the runtime, so the change reaches the live map-poll client.
/// The handler passes `hostname` to the control client's `set_hostname`.
#[derive(Debug)]
pub struct SetHostname {
    /// The new hostname to report to control.
    pub hostname: String,
}
830
831impl Message<SetHostname> for ControlRunner {
832 type Reply = ();
833
834 async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
835 tracing::debug!("updating hostname at control");
836 self.client.set_hostname(msg.hostname).await;
837 }
838}
839
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    /// Fixture: the consent URL used by the single-URL tests.
    const CONSENT: &str = "https://login.example/consent";
    /// Fixture: first of two distinct URLs used by the change-tracking tests.
    const URL_A: &str = "https://login.example/a";
    /// Fixture: second of two distinct URLs used by the change-tracking tests.
    const URL_B: &str = "https://login.example/b";

    /// Parses a fixture string into a URL, panicking if it is malformed.
    fn url(s: &str) -> url::Url {
        s.parse::<url::Url>().unwrap()
    }

    /// Publishing a present URL into an empty cell stores that URL.
    #[test]
    fn non_empty_url_publishes() {
        let consent = url(CONSENT);
        let (cell, reader) = watch::channel(None);

        sticky_update_pop_browser_url(&cell, Some(&consent));

        assert_eq!(*reader.borrow(), Some(consent));
    }

    /// A run of absent (`None`) updates — the common netmap tick — leaves a stored URL intact.
    /// Regression guard for the thrash bug, where resetting on every tick coalesced the URL away
    /// on the bus.
    #[test]
    fn absent_update_does_not_reset() {
        let consent = url(CONSENT);
        let (cell, reader) = watch::channel(Some(consent.clone()));

        // Stand-in for a burst of empty netmap updates.
        for _tick in 0..5 {
            sticky_update_pop_browser_url(&cell, None);
        }

        assert_eq!(
            *reader.borrow(),
            Some(consent),
            "empty updates must not clear the URL"
        );
    }

    /// Re-publishing the URL the cell already holds must not mark the watch as changed, so
    /// subscribers are not woken for nothing.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let consent = url(CONSENT);
        let (cell, mut reader) = watch::channel(None);

        // First publish: the cell goes from empty to the URL, so the watch fires.
        sticky_update_pop_browser_url(&cell, Some(&consent));
        let fired = reader.has_changed().unwrap();
        assert!(fired, "first non-empty URL fires");

        // Second publish of the identical URL: must be deduplicated.
        reader.mark_unchanged();
        sticky_update_pop_browser_url(&cell, Some(&consent));
        let refired = reader.has_changed().unwrap();
        assert!(
            !refired,
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// A different URL published after an earlier one replaces it (sticky, but tracks changes).
    #[test]
    fn new_url_after_prior_fires() {
        let first = url(URL_A);
        let second = url(URL_B);
        let (cell, reader) = watch::channel(None);

        for next in [&first, &second] {
            sticky_update_pop_browser_url(&cell, Some(next));
        }

        assert_eq!(*reader.borrow(), Some(second));
    }

    /// Realistic session sequence: a URL, then several empty updates, then a different URL. The
    /// first URL must survive the gap and the second must still replace it afterwards.
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let first = url(URL_A);
        let second = url(URL_B);
        let (cell, reader) = watch::channel(None);

        sticky_update_pop_browser_url(&cell, Some(&first));
        for _tick in 0..3 {
            sticky_update_pop_browser_url(&cell, None);
        }
        assert_eq!(
            *reader.borrow(),
            Some(first),
            "stayed sticky through the None gap"
        );

        sticky_update_pop_browser_url(&cell, Some(&second));
        assert_eq!(
            *reader.borrow(),
            Some(second),
            "a new URL after a None gap still fires"
        );
    }

    /// Going A → B → A fires on the return to A: deduplication compares against the cell's
    /// current value only, not against everything seen before.
    #[test]
    fn returning_to_prior_url_refires() {
        let first = url(URL_A);
        let second = url(URL_B);
        let (cell, mut reader) = watch::channel(None);

        for next in [&first, &second] {
            sticky_update_pop_browser_url(&cell, Some(next));
        }
        reader.mark_unchanged();

        // Back to A while the cell holds B: a genuine change.
        sticky_update_pop_browser_url(&cell, Some(&first));

        let refired = reader.has_changed().unwrap();
        assert!(refired, "returning to a prior URL re-fires");
        assert_eq!(*reader.borrow(), Some(first));
    }

    /// End-to-end de-thrash check: push a realistic cadence (empty, empty, URL, empty, empty)
    /// through the producer, then drain the receiver by polling `has_changed()` and record every
    /// value it observes. Exactly one change must be seen, and it must carry the URL rather than
    /// a coalesced `None`.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let consent = url(CONSENT);
        let (cell, mut reader) = watch::channel(None);

        // Mostly-empty updates with a single one carrying the URL.
        let cadence = [None, None, Some(&consent), None, None];
        for incoming in cadence {
            sticky_update_pop_browser_url(&cell, incoming);
        }

        // Drain everything a subscriber would observe.
        let mut observed = Vec::new();
        while reader.has_changed().unwrap() {
            observed.push(reader.borrow_and_update().clone());
        }

        for seen in &observed {
            assert_eq!(
                *seen,
                Some(consent.clone()),
                "the surviving change carries the URL"
            );
        }
        assert_eq!(
            observed.len(),
            1,
            "exactly one change survives the None thrash"
        );
    }
}