ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SetDnsError, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus.
///
/// Besides forwarding each update, it keeps the latest value of several netmap-derived items in
/// [`watch`] cells so its message handlers can answer from local state.
pub struct ControlRunner {
    /// Control client returned by `AsyncControlClient::connect` during `on_start`.
    ///
    /// NOTE(review): nothing in the visible part of this file reads this field; presumably it is
    /// held so the control session stays alive for the actor's lifetime — confirm.
    client: AsyncControlClient,
    /// The args this actor was started with: control config, optional auth key, the
    /// [`crate::Env`], and the device-state sender.
    params: Params,

    /// Latest self node delivered on the netmap stream, or `None` until the first update that
    /// carries one. `with_self_node` waits on this cell until it becomes `Some`.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
    /// the result returns via the [`TkaSynced`] self-message).
    ///
    /// Also `None` while a sync is in flight: `maybe_sync_tka` moves the state into the spawned
    /// task and it only comes back through a successful result.
    tka_synced: Option<crate::tka_sync::SyncedTka>,
    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
    /// gated decision).
    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
    tka_syncing: bool,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    ///
    /// The netmap handler overwrites this on every update, so an update without a DNS config
    /// resets it to empty.
    cert_domains: watch::Sender<Vec<String>>,
    /// Latest full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` until
    /// control sends one. The facade reads this for `Device::dns_config` (the daemon's
    /// `tnet dns status`). A superset of [`cert_domains`](Self::cert_domains), which is kept as its
    /// own cell for the narrower TLS-cert use.
    ///
    /// The netmap handler overwrites this on every update, so an update without a DNS config
    /// resets it to `None`.
    dns_config: watch::Sender<Option<ts_control::DnsConfig>>,
    /// Latest interactive-login / consent URL control asked this node to open
    /// (`MapResponse.PopBrowserURL`), or `None` until control sends one. The facade reads this for
    /// `Device::pop_browser_url` (a daemon driving a non-authkey login surfaces it to the user), and
    /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) subscribes to it for the bus's
    /// `browse_to_url` running-node events.
    ///
    /// **Sticky, not per-update** (Go `controlclient` `sess.lastPopBrowserURL`): control sends
    /// `MapResponse.PopBrowserURL` empty on nearly every netmap tick, so this cell is updated ONLY on
    /// a non-empty URL that differs from its current value (`sticky_update_pop_browser_url`, via
    /// `send_if_modified` — the cell's own value is the "last URL seen", so no separate mirror is
    /// needed). It is never reset to `None` by an empty update — matching Go's `direct.go` guard
    /// `u != "" && u != sess.lastPopBrowserURL`. Updating on every tick would thrash the cell to
    /// `None` and coalesce the URL away for a `watch` subscriber.
    pop_browser_url: watch::Sender<Option<url::Url>>,
    /// Latest network-conditions report (preferred DERP region + per-region latencies), updated each
    /// time the DERP-latency measurer reports in. The facade reads this for `Device::netcheck` (the
    /// daemon's `tnet netcheck`). Empty until the first measurement.
    netcheck: watch::Sender<crate::status::NetcheckReport>,
}
78
/// Control runner args.
///
/// This is the actor's `Args` type: it is handed to `on_start` and then stored on the running
/// [`ControlRunner`].
pub struct Params {
    /// Control config. Passed to every control call made by this actor (registration check,
    /// stream connect, and the per-request RPCs).
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Forwarded to both the registration check and the stream connect in
    /// `on_start`; `None` means interactive authorization is expected.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor. Supplies the node keys, the message bus
    /// (`subscribe` / `publish`) and the shutdown signal used in this file.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
96
/// Error type of the [`ControlRunner`] actor (its `kameo::Actor::Error`).
///
/// Both variants are transparent wrappers, so the displayed message is the inner error's.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client, e.g. from the registration check or the stream
    /// connect in `on_start`.
    #[error(transparent)]
    Control(#[from] ControlError),

    /// A crate-level error raised while bringing the actor up.
    ///
    /// NOTE(review): presumably produced by the `Env::subscribe` calls in `on_start` — confirm.
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
106
107impl kameo::Actor for ControlRunner {
108 type Args = Params;
109 type Error = ControlRunnerError;
110
111 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
112 loop {
113 match AsyncControlClient::check_auth(
114 ¶ms.config,
115 ¶ms.env.keys,
116 params.auth_key.as_deref(),
117 )
118 .await
119 {
120 Ok(()) => break,
121 Err(ControlError::MachineNotAuthorized(u)) => {
122 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
123 // Surface "interactive login required" so a watcher / `wait_until_running` can
124 // tell the user to authorize, instead of seeing an opaque timeout. Registration
125 // keeps retrying (transient), so this is not a terminal `Failed`.
126 params
127 .state_tx
128 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
129 tokio::time::sleep(Duration::from_secs(5)).await;
130 }
131 Err(e) => {
132 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
133 // specific reason control gave AND publish it as a typed `Failed` state so
134 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
135 // of the opaque `Internal(Actor)` the caller would otherwise see once the
136 // stopped actor is next asked. Publishing before `return Err` is why the state
137 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
138 let reason = crate::RegistrationError::from(&e);
139 tracing::error!(error = %e, "registration failed; control runner stopping");
140 params
141 .state_tx
142 .send_replace(crate::DeviceState::Failed(reason));
143 return Err(e.into());
144 }
145 }
146 }
147 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
148 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
149 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
150 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
151 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
152 let bring_up = async {
153 let (client, stream) = AsyncControlClient::connect(
154 ¶ms.config,
155 ¶ms.env.keys,
156 params.auth_key.as_deref(),
157 )
158 .await?;
159
160 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
161
162 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
163 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
164 slf.attach_stream(stream.boxed(), (), ());
165 Ok::<_, ControlRunnerError>(client)
166 };
167
168 let client = match bring_up.await {
169 Ok(client) => client,
170 Err(e) => {
171 tracing::error!(error = %e, "bringing up the control session failed");
172 // The control session never came up; surface it as a transient registration
173 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
174 // stuck at `Connecting`.
175 params.state_tx.send_replace(crate::DeviceState::Failed(
176 crate::RegistrationError::NetworkUnreachable,
177 ));
178 return Err(e);
179 }
180 };
181
182 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
183 // current (and flips to `Expired` if the self-node's key lapses).
184 params.state_tx.send_replace(crate::DeviceState::Running);
185
186 Ok(Self {
187 client,
188 params,
189 self_node: Default::default(),
190 ssh_policy: Default::default(),
191 tka: Default::default(),
192 tka_synced: None,
193 tka_authority: Default::default(),
194 tka_syncing: false,
195 cert_domains: Default::default(),
196 dns_config: Default::default(),
197 pop_browser_url: Default::default(),
198 netcheck: Default::default(),
199 })
200 }
201}
202
203impl ControlRunner {
204 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
205 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
206 /// round-trip). The result returns via the [`TkaSynced`] self-message.
207 ///
208 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
209 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
210 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
211 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
212 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
213 if !tka.is_enabled() {
214 // Lock disabled (or never enabled): drop any synced state and stop publishing an
215 // Authority. Never an error; peers are unaffected.
216 if self.tka_synced.is_some() {
217 self.tka_synced = None;
218 self.tka_authority.send_replace(None);
219 }
220 return;
221 }
222 if self.tka_syncing {
223 return; // a sync is already in flight; the next netmap will re-trigger if still stale
224 }
225 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
226 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
227 // fail-closes harmlessly).
228 if let Some(synced) = &self.tka_synced
229 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
230 && synced.authority.head_matches(&control_head)
231 {
232 return;
233 }
234
235 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
236 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
237 // `tka_syncing` so we don't spawn a second concurrent sync.
238 self.tka_syncing = true;
239 let current = self.tka_synced.take();
240 let config = self.params.config.clone();
241 let keys = self.params.env.keys.clone();
242 tokio::spawn(async move {
243 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
244 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
245 // not allowed). A send failure just means the actor is gone — nothing to do.
246 if let Err(e) = self_ref.tell(TkaSynced { result }).await {
247 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
248 }
249 });
250 }
251
252 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
253 /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
254 /// clears the in-flight guard.
255 async fn apply_tka_synced(
256 &mut self,
257 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
258 ) {
259 self.tka_syncing = false;
260 match result {
261 Ok(Some(synced)) => {
262 tracing::info!(
263 head = %synced.authority.head().to_base32(),
264 "TKA sync succeeded; publishing verified Authority (observe-only)"
265 );
266 self.tka_authority
267 .send_replace(Some(synced.authority.clone()));
268 // Deliver the verified Authority to the peer tracker's observe-only verify-and-log
269 // seam (#136) over the bus. Re-published on every successful sync (no bus replay).
270 if let Err(e) = self
271 .params
272 .env
273 .publish(crate::peer_tracker::TkaAuthorityUpdate(
274 synced.authority.clone(),
275 ))
276 .await
277 {
278 tracing::warn!(error = %e, "publishing TKA authority to peer tracker failed");
279 }
280 self.tka_synced = Some(synced);
281 }
282 Ok(None) => {
283 // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
284 tracing::debug!("TKA sync: control reported no lock for this node (inert)");
285 }
286 Err(e) => {
287 // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
288 // peer. The next netmap update re-triggers a sync attempt.
289 tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
290 }
291 }
292 }
293
294 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
295 where
296 F: FnOnce(&Node) -> R,
297 {
298 let mut sub = self.self_node.subscribe();
299 let mut shutdown = self.params.env.shutdown.clone();
300
301 async move {
302 tokio::select! {
303 _ = shutdown.wait_for(|x| *x) => {
304 None
305 },
306 node = sub.wait_for(Option::is_some) => {
307 Some(f(node.ok()?.as_ref()?))
308 },
309 }
310 }
311 }
312}
313
314/// Apply Go's sticky `PopBrowserURL` semantics to the consent-URL `watch` cell.
315///
316/// Control sends `MapResponse.PopBrowserURL` empty on nearly every netmap update, so the cell is
317/// updated ONLY when `incoming` is a non-empty URL that differs from the cell's current value —
318/// Go's `direct.go` guard `u != "" && u != sess.lastPopBrowserURL`. The cell is **never reset to
319/// `None`** by an empty/absent update — the running-node consent URL is sticky for the session.
320/// Updating unconditionally would thrash the cell to `None` on every tick and coalesce the URL away
321/// for a `watch`/bus subscriber.
322///
323/// The dedupe is in-place via [`watch::Sender::send_if_modified`] — the cell's own value is the
324/// "last URL sent" (this sticky path is its only writer), so no separate mirror field is needed and
325/// the watch is woken only on a genuine change (Go's `sess.lastPopBrowserURL` role, for free). This
326/// matches the [`send_if_modified`](watch::Sender::send_if_modified) idiom already used for the
327/// device-state cell in this handler.
328///
329/// Factored out of the netmap-update handler so the (easy-to-regress) sticky logic is unit-testable
330/// against a plain `watch` channel without standing up the actor.
331fn sticky_update_pop_browser_url(
332 cell: &watch::Sender<Option<url::Url>>,
333 incoming: Option<&url::Url>,
334) {
335 if let Some(url) = incoming {
336 cell.send_if_modified(|current| {
337 if current.as_ref() == Some(url) {
338 false
339 } else {
340 *current = Some(url.clone());
341 true
342 }
343 });
344 }
345}
346
347// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
348// those generated fields carry no doc and can't take attributes, so wrap in a module where
349// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
350// are re-exported so callers keep referencing them at `control_runner::<Name>`.
351pub use msg_impl::*;
352
353#[allow(missing_docs)]
354mod msg_impl {
355 use kameo::{message::Context, reply::DelegatedReply};
356
357 use super::*;
358
359 #[kameo::messages]
360 impl ControlRunner {
361 /// Fetch the IPv4 address for this tailscale device.
362 #[message(ctx)]
363 pub fn ipv4(
364 &self,
365 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
366 ) -> DelegatedReply<Option<Ipv4Addr>> {
367 let (deleg, replier) = ctx.reply_sender();
368
369 if let Some(replier) = replier {
370 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
371
372 tokio::spawn(async move {
373 let ip = fut.await;
374 replier.send(ip);
375 });
376 }
377
378 deleg
379 }
380
381 /// Fetch the IPv6 address for this tailscale device.
382 #[message(ctx)]
383 pub fn ipv6(
384 &self,
385 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
386 ) -> DelegatedReply<Option<Ipv6Addr>> {
387 let (deleg, replier) = ctx.reply_sender();
388
389 if let Some(replier) = replier {
390 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
391
392 tokio::spawn(async move {
393 let ip = fut.await;
394 replier.send(ip);
395 });
396 }
397
398 deleg
399 }
400
401 /// Fetch the self node for this tailscale device.
402 #[message(ctx)]
403 pub fn self_node(
404 &self,
405 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
406 ) -> DelegatedReply<Option<Node>> {
407 let (deleg, replier) = ctx.reply_sender();
408
409 if let Some(replier) = replier {
410 let node = self.with_self_node(|node| node.clone());
411
412 tokio::spawn(async move {
413 let node = node.await;
414 replier.send(node)
415 });
416 }
417
418 deleg
419 }
420
421 /// Fetch the current Tailscale SSH policy, if control has pushed one.
422 ///
423 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
424 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
425 /// for a value: an absent policy is a legitimate, immediate answer.
426 #[message]
427 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
428 self.ssh_policy.borrow().clone()
429 }
430
431 /// Fetch the current Tailnet Lock status, if control has pushed one.
432 ///
433 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
434 #[message]
435 pub fn current_tka_status(&self) -> Option<TkaStatus> {
436 self.tka.borrow().clone()
437 }
438
439 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
440 ///
441 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
442 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
443 /// does not block waiting for a value).
444 #[message]
445 pub fn cert_domains(&self) -> Vec<String> {
446 self.cert_domains.borrow().clone()
447 }
448
449 /// The full DNS config from control's netmap (Go `netmap.NetworkMap.DNS`), or `None` when
450 /// control has sent no DNS config yet. An immediate answer (does not block); the facade
451 /// surfaces this for `Device::dns_config` (the daemon's `tnet dns status`).
452 #[message]
453 pub fn dns_config(&self) -> Option<ts_control::DnsConfig> {
454 self.dns_config.borrow().clone()
455 }
456
457 /// The interactive-login / consent URL control last asked this node to open
458 /// (`MapResponse.PopBrowserURL`), or `None` when control has sent none. An immediate answer
459 /// (does not block); the facade surfaces this for `Device::pop_browser_url`.
460 #[message]
461 pub fn pop_browser_url(&self) -> Option<url::Url> {
462 self.pop_browser_url.borrow().clone()
463 }
464
465 /// Subscribe to the interactive-login / consent URL cell (`MapResponse.PopBrowserURL`).
466 ///
467 /// Returns a [`watch::Receiver`] whose value is the latest running-node consent URL, used by
468 /// [`Runtime::watch_ipn_bus`](crate::Runtime::watch_ipn_bus) to surface `browse_to_url`
469 /// events mid-session. The cell is sticky (updated only on a new non-empty URL, never reset
470 /// to `None` by an empty update — see the field docs), so a subscriber is not thrashed and a
471 /// late subscriber sees the current URL. The initial value is `None` until control sends one.
472 #[message(derive(Clone))]
473 pub fn watch_browser_url(&self) -> watch::Receiver<Option<url::Url>> {
474 self.pop_browser_url.subscribe()
475 }
476
477 /// The latest network-conditions report (preferred DERP region + per-region latencies). An
478 /// immediate answer (does not block); empty before the first DERP-latency measurement. The
479 /// facade surfaces this for `Device::netcheck` (the daemon's `tnet netcheck`).
480 #[message]
481 pub fn netcheck(&self) -> crate::status::NetcheckReport {
482 self.netcheck.borrow().clone()
483 }
484
485 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
486 ///
487 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
488 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
489 /// for the round-trip.
490 #[message(ctx)]
491 pub fn fetch_id_token(
492 &self,
493 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
494 audience: String,
495 ) -> DelegatedReply<Result<String, IdTokenError>> {
496 let (deleg, replier) = ctx.reply_sender();
497
498 if let Some(replier) = replier {
499 let config = self.params.config.clone();
500 let keys = self.params.env.keys.clone();
501 tokio::spawn(async move {
502 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
503 replier.send(result);
504 });
505 }
506
507 deleg
508 }
509
510 /// Log this node out of the tailnet: deregister it by expiring its current node key.
511 ///
512 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
513 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
514 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
515 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
516 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
517 /// on-disk node key, so re-registering with the same key is the re-login path.
518 #[message(ctx)]
519 pub fn logout(
520 &self,
521 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
522 ) -> DelegatedReply<Result<(), LogoutError>> {
523 let (deleg, replier) = ctx.reply_sender();
524
525 if let Some(replier) = replier {
526 let config = self.params.config.clone();
527 let keys = self.params.env.keys.clone();
528 tokio::spawn(async move {
529 let result = ts_control::logout(&config, &keys).await;
530 replier.send(result);
531 });
532 }
533
534 deleg
535 }
536
537 /// Publish a DNS record for this node via control's `/machine/set-dns` (Go
538 /// `LocalClient.SetDNS`).
539 ///
540 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
541 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
542 /// POSTs the record over a fresh Noise channel. Go's `SetDNS` is `TXT`-only (its sole use is
543 /// the ACME DNS-01 `_acme-challenge` record); the record type is fixed to `"TXT"` here to
544 /// match, so the surfaced API takes only `name` + `value`.
545 #[message(ctx)]
546 pub fn set_dns(
547 &self,
548 ctx: &mut Context<Self, DelegatedReply<Result<(), SetDnsError>>>,
549 name: String,
550 value: String,
551 ) -> DelegatedReply<Result<(), SetDnsError>> {
552 let (deleg, replier) = ctx.reply_sender();
553
554 if let Some(replier) = replier {
555 let config = self.params.config.clone();
556 let keys = self.params.env.keys.clone();
557 tokio::spawn(async move {
558 let result = ts_control::set_dns(&config, &keys, &name, "TXT", &value).await;
559 replier.send(result);
560 });
561 }
562
563 deleg
564 }
565 }
566
/// The reply type of the [`get_cert_pair`](ControlRunner::get_cert_pair) message: the issued
/// `(cert_chain_pem, key_pem)` PEM pair (the `tnet cert` surface) or a [`ts_control::CertError`].
///
/// The second tuple element is the leaf **private key** PEM; treat it as a secret.
///
/// Aliased so the message's `Context` type stays under clippy's `type_complexity` bar (the
/// nested `Result<(String, String), _>` trips it inline). Only defined with the `acme` feature.
#[cfg(feature = "acme")]
pub type CertPairReply = Result<(String, String), ts_control::CertError>;
573
574 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
575 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
576 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
577 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
578 // block keeps the default build clean.
579 #[cfg(feature = "acme")]
580 #[kameo::messages]
581 impl ControlRunner {
582 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
583 /// client-side ACME DNS-01 engine (`acme` feature).
584 ///
585 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
586 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
587 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
588 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
589 ///
590 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
591 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
592 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
593 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
594 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
595 /// publish 501s.
596 #[message(ctx)]
597 pub fn get_certificate(
598 &self,
599 ctx: &mut Context<
600 Self,
601 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
602 >,
603 name: String,
604 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
605 let (deleg, replier) = ctx.reply_sender();
606
607 if let Some(replier) = replier {
608 let config = self.params.config.clone();
609 let keys = self.params.env.keys.clone();
610 tokio::spawn(async move {
611 let result = issue_certificate(&config, &keys, &name).await;
612 replier.send(result);
613 });
614 }
615
616 deleg
617 }
618
619 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` and return the
620 /// **PEM pair** — `(cert_chain_pem, key_pem)` — for writing the on-disk `.crt` + `.key`
621 /// (the daemon's `tnet cert`, Go's `LocalClient.CertPair`). `acme` feature.
622 ///
623 /// Identical issuance to [`get_certificate`](Self::get_certificate) (same client-side ACME
624 /// DNS-01 flow, same set-dns publish, same account-key handling), only the *shape* of the
625 /// result differs: this surfaces the raw chain + leaf-key PEMs instead of the opaque
626 /// [`CertifiedKey`](ts_control::tls::CertifiedKey). The leaf **private key** PEM is the
627 /// second tuple element and is NEVER logged — the spawned task sends it straight back to the
628 /// replier. SaaS-only: against a self-hosted control plane the set-dns publish 501s.
629 #[message(ctx)]
630 pub fn get_cert_pair(
631 &self,
632 ctx: &mut Context<Self, DelegatedReply<CertPairReply>>,
633 name: String,
634 ) -> DelegatedReply<CertPairReply> {
635 let (deleg, replier) = ctx.reply_sender();
636
637 if let Some(replier) = replier {
638 let config = self.params.config.clone();
639 let keys = self.params.env.keys.clone();
640 tokio::spawn(async move {
641 let result = issue_cert_pair(&config, &keys, &name).await;
642 replier.send(result);
643 });
644 }
645
646 deleg
647 }
648 }
649}
650
/// Issue a cert for `name` via set-dns DNS-01 and return only the ready-to-serve
/// [`CertifiedKey`](ts_control::tls::CertifiedKey) (the `get_certificate` / `ListenTLS` path).
///
/// A thin projection over [`issue_cert_pair_inner`]: one issuance is performed and the PEM
/// strings are dropped because this caller does not need the on-disk pair. The account key is
/// loaded or generated by [`issue_cert_pair_inner`].
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok(issued.certified)
}
667
/// Issue a cert for `name` via set-dns DNS-01 and return the **PEM pair**
/// `(cert_chain_pem, key_pem)` for the daemon's on-disk `.crt`/`.key` (`tnet cert`, Go
/// `LocalClient.CertPair`).
///
/// Performs the same single issuance as [`issue_certificate`] through
/// [`issue_cert_pair_inner`]; only the projected result differs. The second element is the leaf
/// **private key** PEM and is NEVER logged here.
#[cfg(feature = "acme")]
async fn issue_cert_pair(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<(String, String), ts_control::CertError> {
    let issued = issue_cert_pair_inner(config, keys, name).await?;
    Ok((issued.cert_chain_pem, issued.key_pem))
}
684
/// Shared issuance core for [`issue_certificate`] and [`issue_cert_pair`].
///
/// Picks the ACME account key, targets Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]) and runs one DNS-01 issuance. The
/// full [`IssuedCert`](ts_control::acme::IssuedCert) is returned so each caller projects out
/// what it needs (one ACME order, two consumers).
///
/// Account key: the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is reused
/// when present, so the same Let's Encrypt account survives renewals. Otherwise an ephemeral
/// per-call key is generated (logged at debug — a new ACME account each issuance, with no
/// write-back). The leaf private key is never logged.
///
/// # Errors
///
/// Returns a [`ts_control::CertError`] if the persisted key cannot be parsed, key generation
/// fails, the directory URL does not parse, or the issuance itself fails.
#[cfg(feature = "acme")]
async fn issue_cert_pair_inner(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::acme::IssuedCert, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // Only the first element of `generate`'s result (the account key) is kept.
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        generated.0
    };

    let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
        .parse()
        .map_err(|e| {
            ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
        })?;

    ts_control::issue_cert_pair_via_setdns(config, keys, name, &account_key, &directory).await
}
718
719impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
720 type Reply = ();
721
722 async fn handle(
723 &mut self,
724 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
725 ctx: &mut Context<Self, Self::Reply>,
726 ) {
727 match msg {
728 StreamMessage::Started(_) => {
729 tracing::trace!("started listening to state updates");
730 }
731
732 StreamMessage::Next(msg) => {
733 if let Some(node) = msg.node.as_ref() {
734 // Reflect node-key expiry into the device state: control delivering a self-node
735 // whose key is in the past means the node must re-authenticate. Otherwise the
736 // arrival of a fresh self-node confirms we are Running (recovers the state if a
737 // prior update had flipped it to Expired).
738 let now_unix = std::time::SystemTime::now()
739 .duration_since(std::time::UNIX_EPOCH)
740 .map(|d| d.as_secs() as i64)
741 .unwrap_or(0);
742 let next = if node.key_expired_at_unix(now_unix) {
743 crate::DeviceState::Expired
744 } else {
745 crate::DeviceState::Running
746 };
747 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
748 // self-node arrives on every netmap update).
749 self.params.state_tx.send_if_modified(|s| {
750 if *s != next {
751 *s = next.clone();
752 true
753 } else {
754 false
755 }
756 });
757
758 self.self_node.send_replace(Some(node.clone()));
759 }
760
761 if let Some(policy) = msg.ssh_policy.as_ref() {
762 self.ssh_policy.send_replace(Some(policy.clone()));
763 }
764
765 if let Some(tka) = msg.tka.as_ref() {
766 self.tka.send_replace(Some(tka.clone()));
767 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
768 }
769
770 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
771 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
772 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
773 let cert_domains = msg
774 .dns_config
775 .as_ref()
776 .map(|d| d.cert_domains.clone())
777 .unwrap_or_default();
778 self.cert_domains.send_replace(cert_domains);
779
780 // Track the full DNS config for `Device::dns_config` (the daemon's `tnet dns status`).
781 // `None` when control sent no DNS config on this update — distinct from a present but
782 // empty config (Go `netmap.NetworkMap.DNS`).
783 self.dns_config.send_replace(msg.dns_config.clone());
784
785 // Track the interactive-login URL for `Device::pop_browser_url` /
786 // `Runtime::watch_ipn_bus`. See `sticky_update_pop_browser_url` for the Go-faithful
787 // sticky semantics (update only on a new non-empty URL; never reset to `None`).
788 sticky_update_pop_browser_url(&self.pop_browser_url, msg.pop_browser_url.as_ref());
789
790 if let Err(e) = self.params.env.publish(msg).await {
791 tracing::error!(error = %e, "publishing netmap update");
792 }
793 }
794
795 StreamMessage::Finished(_) => {
796 tracing::error!("state update stream terminated")
797 }
798 }
799 }
800}
801
/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
/// [`ControlRunner::apply_tka_synced`](ControlRunner).
///
/// Hidden from the rendered docs: the only field is crate-private, so the message is only
/// meaningful inside this crate.
#[doc(hidden)]
pub struct TkaSynced {
    /// Result of the sync task, handed unchanged to `ControlRunner::apply_tka_synced`.
    /// NOTE(review): the meaning of `Ok(None)` is decided by the sync driver and is not visible
    /// from this definition — confirm against `crate::tka_sync`.
    pub(crate) result:
        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
}
811
812impl Message<TkaSynced> for ControlRunner {
813 type Reply = ();
814
815 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
816 self.apply_tka_synced(msg.result).await;
817 }
818}
819
820impl Message<DerpLatencyMeasurement> for ControlRunner {
821 type Reply = ();
822
823 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
824 let measurements = msg.measurement.as_ref().clone();
825
826 // Publish the net-report snapshot for `Device::netcheck` (the daemon's `tnet netcheck`) from
827 // the same measurements, before the home-region short-circuit below — an empty set still
828 // yields a (default/empty) report rather than a stale one.
829 self.netcheck
830 .send_replace(crate::status::NetcheckReport::from_region_results(
831 &measurements,
832 ));
833
834 let Some(result) = measurements.first() else {
835 tracing::debug!("derp latency measurements empty");
836 return;
837 };
838
839 let iter = measurements.iter().map(|result| {
840 (
841 result.latency_map_key.as_str(),
842 result.latency.as_secs_f64(),
843 )
844 });
845
846 tracing::debug!(selected_region_id = ?result.id, "updating home region");
847
848 self.client.set_home_region(result.id, iter).await;
849 }
850}
851
852impl Message<EndpointAdvertisement> for ControlRunner {
853 type Reply = ();
854
855 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
856 let endpoints: Vec<Endpoint> = msg
857 .endpoints
858 .iter()
859 .map(|ep| Endpoint {
860 endpoint: ep.addr,
861 ty: match ep.ty {
862 SelfEndpointType::Local => EndpointType::Local,
863 SelfEndpointType::Stun => EndpointType::Stun,
864 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
865 },
866 })
867 .collect();
868
869 tracing::debug!(
870 n_endpoints = endpoints.len(),
871 "advertising endpoints to control"
872 );
873
874 self.client.set_endpoints(endpoints).await;
875 }
876}
877
/// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control — the wire
/// half of a runtime [`Runtime::set_advertise_routes`](crate::Runtime::set_advertise_routes). Sent
/// as a direct `ask` from the runtime (not over the bus), so the route change reaches the live
/// map-poll client. `routes` is the final advertised set the caller wants control to grant.
///
/// The handler forwards `routes` to the control client and replies with `()`, so the reply
/// carries no success or failure information.
#[derive(Debug)]
pub struct SetAdvertiseRoutes {
    /// The prefixes to advertise to control (already filtered to the final set).
    pub routes: Vec<ipnet::IpNet>,
}
887
888impl Message<SetAdvertiseRoutes> for ControlRunner {
889 type Reply = ();
890
891 async fn handle(&mut self, msg: SetAdvertiseRoutes, _ctx: &mut Context<Self, Self::Reply>) {
892 tracing::debug!(n_routes = msg.routes.len(), "advertising routes to control");
893 self.client.set_routable_ips(msg.routes).await;
894 }
895}
896
/// Update this node's `Hostinfo.Hostname` at control — the wire half of a runtime
/// [`Runtime::set_hostname`](crate::Runtime::set_hostname). A direct `ask` from the runtime, so the
/// change reaches the live map-poll client.
///
/// The handler forwards `hostname` to the control client and replies with `()`, so the reply
/// carries no success or failure information.
#[derive(Debug)]
pub struct SetHostname {
    /// The new hostname to report to control.
    pub hostname: String,
}
905
906impl Message<SetHostname> for ControlRunner {
907 type Reply = ();
908
909 async fn handle(&mut self, msg: SetHostname, _ctx: &mut Context<Self, Self::Reply>) {
910 tracing::debug!("updating hostname at control");
911 self.client.set_hostname(msg.hostname).await;
912 }
913}
914
#[cfg(test)]
mod sticky_pop_browser_url_tests {
    use tokio::sync::watch;

    use super::sticky_update_pop_browser_url;

    /// Test helper: parses a literal into a [`url::Url`], panicking if it is malformed.
    fn url(raw: &str) -> url::Url {
        url::Url::parse(raw).unwrap()
    }

    /// Feeding a non-empty URL into an empty cell stores that URL.
    #[test]
    fn non_empty_url_publishes() {
        let consent = url("https://login.example/consent");
        let (cell, observer) = watch::channel(None);
        sticky_update_pop_browser_url(&cell, Some(&consent));
        assert_eq!(*observer.borrow(), Some(consent));
    }

    /// `None` updates (the common netmap tick) leave an already-stored URL alone. Regression
    /// guard for the thrash bug, where a reset on every tick would coalesce the URL away on the
    /// bus.
    #[test]
    fn absent_update_does_not_reset() {
        let consent = url("https://login.example/consent");
        let (cell, observer) = watch::channel(Some(consent.clone()));
        // A burst of empty netmap updates.
        for _tick in 0..5 {
            sticky_update_pop_browser_url(&cell, None);
        }
        assert_eq!(
            *observer.borrow(),
            Some(consent),
            "empty updates must not clear the URL"
        );
    }

    /// Re-sending the URL the cell already holds is deduplicated, so the receiver is not flagged
    /// as changed a second time and a subscriber is not woken spuriously.
    #[test]
    fn repeated_same_url_does_not_refire() {
        let consent = url("https://login.example/consent");
        let (cell, mut observer) = watch::channel(None);

        // First delivery: a real change.
        sticky_update_pop_browser_url(&cell, Some(&consent));
        assert!(observer.has_changed().unwrap(), "first non-empty URL fires");
        observer.mark_unchanged();

        // Second delivery of the same value: nothing to report.
        sticky_update_pop_browser_url(&cell, Some(&consent));
        assert!(
            !observer.has_changed().unwrap(),
            "repeating the same URL must not re-fire the watch"
        );
    }

    /// Sticky does not mean frozen: a different URL replaces the one stored before it.
    #[test]
    fn new_url_after_prior_fires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (cell, observer) = watch::channel(None);
        sticky_update_pop_browser_url(&cell, Some(&first));
        sticky_update_pop_browser_url(&cell, Some(&second));
        assert_eq!(*observer.borrow(), Some(second));
    }

    /// The realistic session shape: a URL, then a run of `None` ticks during which it must stay
    /// put, then a different URL that must still land. Chains the steps the other tests check
    /// individually.
    #[test]
    fn sticky_through_none_gap_then_new_url_fires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (cell, observer) = watch::channel(None);

        sticky_update_pop_browser_url(&cell, Some(&first));
        for _tick in 0..3 {
            sticky_update_pop_browser_url(&cell, None);
        }
        assert_eq!(
            *observer.borrow(),
            Some(first),
            "stayed sticky through the None gap"
        );

        sticky_update_pop_browser_url(&cell, Some(&second));
        assert_eq!(
            *observer.borrow(),
            Some(second),
            "a new URL after a None gap still fires"
        );
    }

    /// A → B → A: going back to an earlier URL counts as a change, because the comparison is
    /// against the value currently in the cell rather than against everything seen so far.
    #[test]
    fn returning_to_prior_url_refires() {
        let first = url("https://login.example/a");
        let second = url("https://login.example/b");
        let (cell, mut observer) = watch::channel(None);
        sticky_update_pop_browser_url(&cell, Some(&first));
        sticky_update_pop_browser_url(&cell, Some(&second));
        observer.mark_unchanged();

        // The cell holds B at this point, so A is a genuine change.
        sticky_update_pop_browser_url(&cell, Some(&first));
        assert!(
            observer.has_changed().unwrap(),
            "returning to a prior URL re-fires"
        );
        assert_eq!(*observer.borrow(), Some(first));
    }

    /// End-to-end de-thrash check: push a realistic cadence (empty, empty, URL, empty, empty)
    /// through the producer, then drain the receiver the way a bus subscriber would and count
    /// what it sees. Exactly one change must come out, and it must carry the URL; a producer
    /// that replaced the cell on every tick would instead leave a coalesced `None` behind.
    #[tokio::test]
    async fn end_to_end_one_change_survives_none_thrash() {
        let consent = url("https://login.example/consent");
        let (cell, mut observer) = watch::channel(None);

        // Mostly-empty updates with a single one carrying the URL.
        let cadence = [None, None, Some(&consent), None, None];
        for incoming in cadence {
            sticky_update_pop_browser_url(&cell, incoming);
        }

        // Drain every pending change, checking the value each one exposes.
        let mut changes = 0;
        while observer.has_changed().unwrap() {
            let seen = observer.borrow_and_update().clone();
            changes += 1;
            assert_eq!(
                seen,
                Some(consent.clone()),
                "the surviving change carries the URL"
            );
        }
        assert_eq!(changes, 1, "exactly one change survives the None thrash");
    }
}