ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
25/// Actor responsible for maintaining the connection to control.
26///
27/// This actor is responsible for proxying the map response stream onto the message bus.
28pub struct ControlRunner {
29 client: AsyncControlClient,
30 params: Params,
31
32 self_node: watch::Sender<Option<Node>>,
33 /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
34 /// server reads this to authorize incoming connections; absent policy means deny-all.
35 ssh_policy: watch::Sender<Option<SshPolicy>>,
36 /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
37 tka: watch::Sender<Option<TkaStatus>>,
38 /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
39 /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
40 cert_domains: watch::Sender<Vec<String>>,
41}
42
43/// Control runner args.
44pub struct Params {
45 /// Control config.
46 pub(crate) config: ts_control::Config,
47
48 /// Auth key (if needed).
49 pub(crate) auth_key: Option<String>,
50
51 /// The [`crate::Env`] for this actor.
52 pub(crate) env: crate::Env,
53
54 /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
55 /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
56 /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
57 /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
58 pub(crate) state_tx: watch::Sender<crate::DeviceState>,
59}
60
61#[doc(hidden)]
62#[derive(Debug, thiserror::Error)]
63pub enum ControlRunnerError {
64 #[error(transparent)]
65 Control(#[from] ControlError),
66
67 #[error(transparent)]
68 Crate(#[from] crate::Error),
69}
70
71impl kameo::Actor for ControlRunner {
72 type Args = Params;
73 type Error = ControlRunnerError;
74
75 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
76 loop {
77 match AsyncControlClient::check_auth(
78 ¶ms.config,
79 ¶ms.env.keys,
80 params.auth_key.as_deref(),
81 )
82 .await
83 {
84 Ok(()) => break,
85 Err(ControlError::MachineNotAuthorized(u)) => {
86 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
87 // Surface "interactive login required" so a watcher / `wait_until_running` can
88 // tell the user to authorize, instead of seeing an opaque timeout. Registration
89 // keeps retrying (transient), so this is not a terminal `Failed`.
90 params
91 .state_tx
92 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
93 tokio::time::sleep(Duration::from_secs(5)).await;
94 }
95 Err(e) => {
96 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
97 // specific reason control gave AND publish it as a typed `Failed` state so
98 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
99 // of the opaque `Internal(Actor)` the caller would otherwise see once the
100 // stopped actor is next asked. Publishing before `return Err` is why the state
101 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
102 let reason = crate::RegistrationError::from(&e);
103 tracing::error!(error = %e, "registration failed; control runner stopping");
104 params
105 .state_tx
106 .send_replace(crate::DeviceState::Failed(reason));
107 return Err(e.into());
108 }
109 }
110 }
111 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
112 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
113 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
114 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
115 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
116 let bring_up = async {
117 let (client, stream) = AsyncControlClient::connect(
118 ¶ms.config,
119 ¶ms.env.keys,
120 params.auth_key.as_deref(),
121 )
122 .await?;
123
124 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
125
126 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
127 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
128 slf.attach_stream(stream.boxed(), (), ());
129 Ok::<_, ControlRunnerError>(client)
130 };
131
132 let client = match bring_up.await {
133 Ok(client) => client,
134 Err(e) => {
135 tracing::error!(error = %e, "bringing up the control session failed");
136 // The control session never came up; surface it as a transient registration
137 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
138 // stuck at `Connecting`.
139 params.state_tx.send_replace(crate::DeviceState::Failed(
140 crate::RegistrationError::NetworkUnreachable,
141 ));
142 return Err(e);
143 }
144 };
145
146 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
147 // current (and flips to `Expired` if the self-node's key lapses).
148 params.state_tx.send_replace(crate::DeviceState::Running);
149
150 Ok(Self {
151 client,
152 params,
153 self_node: Default::default(),
154 ssh_policy: Default::default(),
155 tka: Default::default(),
156 cert_domains: Default::default(),
157 })
158 }
159}
160
161impl ControlRunner {
162 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
163 where
164 F: FnOnce(&Node) -> R,
165 {
166 let mut sub = self.self_node.subscribe();
167 let mut shutdown = self.params.env.shutdown.clone();
168
169 async move {
170 tokio::select! {
171 _ = shutdown.wait_for(|x| *x) => {
172 None
173 },
174 node = sub.wait_for(Option::is_some) => {
175 Some(f(node.ok()?.as_ref()?))
176 },
177 }
178 }
179 }
180}
181
182// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
183// those generated fields carry no doc and can't take attributes, so wrap in a module where
184// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
185// are re-exported so callers keep referencing them at `control_runner::<Name>`.
186pub use msg_impl::*;
187
188#[allow(missing_docs)]
189mod msg_impl {
190 use kameo::{message::Context, reply::DelegatedReply};
191
192 use super::*;
193
194 #[kameo::messages]
195 impl ControlRunner {
196 /// Fetch the IPv4 address for this tailscale device.
197 #[message(ctx)]
198 pub fn ipv4(
199 &self,
200 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
201 ) -> DelegatedReply<Option<Ipv4Addr>> {
202 let (deleg, replier) = ctx.reply_sender();
203
204 if let Some(replier) = replier {
205 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
206
207 tokio::spawn(async move {
208 let ip = fut.await;
209 replier.send(ip);
210 });
211 }
212
213 deleg
214 }
215
216 /// Fetch the IPv6 address for this tailscale device.
217 #[message(ctx)]
218 pub fn ipv6(
219 &self,
220 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
221 ) -> DelegatedReply<Option<Ipv6Addr>> {
222 let (deleg, replier) = ctx.reply_sender();
223
224 if let Some(replier) = replier {
225 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
226
227 tokio::spawn(async move {
228 let ip = fut.await;
229 replier.send(ip);
230 });
231 }
232
233 deleg
234 }
235
236 /// Fetch the self node for this tailscale device.
237 #[message(ctx)]
238 pub fn self_node(
239 &self,
240 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241 ) -> DelegatedReply<Option<Node>> {
242 let (deleg, replier) = ctx.reply_sender();
243
244 if let Some(replier) = replier {
245 let node = self.with_self_node(|node| node.clone());
246
247 tokio::spawn(async move {
248 let node = node.await;
249 replier.send(node)
250 });
251 }
252
253 deleg
254 }
255
256 /// Fetch the current Tailscale SSH policy, if control has pushed one.
257 ///
258 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
259 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
260 /// for a value: an absent policy is a legitimate, immediate answer.
261 #[message]
262 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
263 self.ssh_policy.borrow().clone()
264 }
265
266 /// Fetch the current Tailnet Lock status, if control has pushed one.
267 ///
268 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
269 #[message]
270 pub fn current_tka_status(&self) -> Option<TkaStatus> {
271 self.tka.borrow().clone()
272 }
273
274 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
275 ///
276 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
277 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
278 /// does not block waiting for a value).
279 #[message]
280 pub fn cert_domains(&self) -> Vec<String> {
281 self.cert_domains.borrow().clone()
282 }
283
284 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
285 ///
286 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
287 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
288 /// for the round-trip.
289 #[message(ctx)]
290 pub fn fetch_id_token(
291 &self,
292 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
293 audience: String,
294 ) -> DelegatedReply<Result<String, IdTokenError>> {
295 let (deleg, replier) = ctx.reply_sender();
296
297 if let Some(replier) = replier {
298 let config = self.params.config.clone();
299 let keys = self.params.env.keys.clone();
300 tokio::spawn(async move {
301 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
302 replier.send(result);
303 });
304 }
305
306 deleg
307 }
308
309 /// Log this node out of the tailnet: deregister it by expiring its current node key.
310 ///
311 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
312 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
313 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
314 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
315 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
316 /// on-disk node key, so re-registering with the same key is the re-login path.
317 #[message(ctx)]
318 pub fn logout(
319 &self,
320 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
321 ) -> DelegatedReply<Result<(), LogoutError>> {
322 let (deleg, replier) = ctx.reply_sender();
323
324 if let Some(replier) = replier {
325 let config = self.params.config.clone();
326 let keys = self.params.env.keys.clone();
327 tokio::spawn(async move {
328 let result = ts_control::logout(&config, &keys).await;
329 replier.send(result);
330 });
331 }
332
333 deleg
334 }
335 }
336
337 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
338 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
339 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
340 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
341 // block keeps the default build clean.
342 #[cfg(feature = "acme")]
343 #[kameo::messages]
344 impl ControlRunner {
345 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
346 /// client-side ACME DNS-01 engine (`acme` feature).
347 ///
348 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
349 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
350 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
351 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
352 ///
353 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
354 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
355 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
356 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
357 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
358 /// publish 501s.
359 #[message(ctx)]
360 pub fn get_certificate(
361 &self,
362 ctx: &mut Context<
363 Self,
364 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
365 >,
366 name: String,
367 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
368 let (deleg, replier) = ctx.reply_sender();
369
370 if let Some(replier) = replier {
371 let config = self.params.config.clone();
372 let keys = self.params.env.keys.clone();
373 tokio::spawn(async move {
374 let result = issue_certificate(&config, &keys, &name).await;
375 replier.send(result);
376 });
377 }
378
379 deleg
380 }
381 }
382}
383
/// Issue a certificate for `name` through set-dns DNS-01, using a persisted or freshly generated
/// ACME account key.
///
/// When [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is present it is reused, so the
/// Let's Encrypt account survives renewals. Otherwise an ephemeral key is generated for this call
/// only, and a debug message is logged; the key is not written back. The ACME directory is always
/// Let's Encrypt production ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Returns a [`ts_control::CertError`] if the stored key cannot be decoded, key generation
/// fails, the directory URL does not parse, or issuance itself fails.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    use ts_control::acme::{AcmeAccountKey, LETS_ENCRYPT_PRODUCTION_DIRECTORY};

    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        AcmeAccountKey::generate()?.0
    };

    let directory = LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse().map_err(|e| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
    })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
413
414impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
415 type Reply = ();
416
417 async fn handle(
418 &mut self,
419 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
420 _ctx: &mut Context<Self, Self::Reply>,
421 ) {
422 match msg {
423 StreamMessage::Started(_) => {
424 tracing::trace!("started listening to state updates");
425 }
426
427 StreamMessage::Next(msg) => {
428 if let Some(node) = msg.node.as_ref() {
429 // Reflect node-key expiry into the device state: control delivering a self-node
430 // whose key is in the past means the node must re-authenticate. Otherwise the
431 // arrival of a fresh self-node confirms we are Running (recovers the state if a
432 // prior update had flipped it to Expired).
433 let now_unix = std::time::SystemTime::now()
434 .duration_since(std::time::UNIX_EPOCH)
435 .map(|d| d.as_secs() as i64)
436 .unwrap_or(0);
437 let next = if node.key_expired_at_unix(now_unix) {
438 crate::DeviceState::Expired
439 } else {
440 crate::DeviceState::Running
441 };
442 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
443 // self-node arrives on every netmap update).
444 self.params.state_tx.send_if_modified(|s| {
445 if *s != next {
446 *s = next.clone();
447 true
448 } else {
449 false
450 }
451 });
452
453 self.self_node.send_replace(Some(node.clone()));
454 }
455
456 if let Some(policy) = msg.ssh_policy.as_ref() {
457 self.ssh_policy.send_replace(Some(policy.clone()));
458 }
459
460 if let Some(tka) = msg.tka.as_ref() {
461 self.tka.send_replace(Some(tka.clone()));
462 }
463
464 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
465 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
466 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
467 let cert_domains = msg
468 .dns_config
469 .as_ref()
470 .map(|d| d.cert_domains.clone())
471 .unwrap_or_default();
472 self.cert_domains.send_replace(cert_domains);
473
474 if let Err(e) = self.params.env.publish(msg).await {
475 tracing::error!(error = %e, "publishing netmap update");
476 }
477 }
478
479 StreamMessage::Finished(_) => {
480 tracing::error!("state update stream terminated")
481 }
482 }
483 }
484}
485
486impl Message<DerpLatencyMeasurement> for ControlRunner {
487 type Reply = ();
488
489 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
490 let measurements = msg.measurement.as_ref().clone();
491
492 let Some(result) = measurements.first() else {
493 tracing::debug!("derp latency measurements empty");
494 return;
495 };
496
497 let iter = measurements.iter().map(|result| {
498 (
499 result.latency_map_key.as_str(),
500 result.latency.as_secs_f64(),
501 )
502 });
503
504 tracing::debug!(selected_region_id = ?result.id, "updating home region");
505
506 self.client.set_home_region(result.id, iter).await;
507 }
508}
509
510impl Message<EndpointAdvertisement> for ControlRunner {
511 type Reply = ();
512
513 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
514 let endpoints: Vec<Endpoint> = msg
515 .endpoints
516 .iter()
517 .map(|ep| Endpoint {
518 endpoint: ep.addr,
519 ty: match ep.ty {
520 SelfEndpointType::Local => EndpointType::Local,
521 SelfEndpointType::Stun => EndpointType::Stun,
522 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
523 },
524 })
525 .collect();
526
527 tracing::debug!(
528 n_endpoints = endpoints.len(),
529 "advertising endpoints to control"
530 );
531
532 self.client.set_endpoints(endpoints).await;
533 }
534}