ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus.
pub struct ControlRunner {
    /// Control client produced by `AsyncControlClient::connect` in `on_start`; used to report the
    /// home DERP region and the advertised endpoints back to control.
    client: AsyncControlClient,
    /// The arguments this actor was started with (control config, auth key, env, state sender).
    params: Params,

    /// Latest self node delivered on the state-update stream, or `None` until the first one
    /// arrives. Queries such as `ipv4` / `ipv6` / `self_node` wait on this cell.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
}
39
/// Startup arguments for [`ControlRunner`] (its `kameo::Actor::Args`).
pub struct Params {
    /// Control config.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed).
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
57
/// Error type returned by [`ControlRunner`]'s `on_start`.
///
/// Both variants are transparent wrappers: the display text and source are those of the wrapped
/// error, and `?` converts into them via the generated `From` impls.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client (`ts_control::Error`).
    #[error(transparent)]
    Control(#[from] ControlError),

    /// An error originating in this crate (`crate::Error`).
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
67
68impl kameo::Actor for ControlRunner {
69 type Args = Params;
70 type Error = ControlRunnerError;
71
72 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
73 loop {
74 match AsyncControlClient::check_auth(
75 ¶ms.config,
76 ¶ms.env.keys,
77 params.auth_key.as_deref(),
78 )
79 .await
80 {
81 Ok(()) => break,
82 Err(ControlError::MachineNotAuthorized(u)) => {
83 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
84 // Surface "interactive login required" so a watcher / `wait_until_running` can
85 // tell the user to authorize, instead of seeing an opaque timeout. Registration
86 // keeps retrying (transient), so this is not a terminal `Failed`.
87 params
88 .state_tx
89 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
90 tokio::time::sleep(Duration::from_secs(5)).await;
91 }
92 Err(e) => {
93 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
94 // specific reason control gave AND publish it as a typed `Failed` state so
95 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
96 // of the opaque `Internal(Actor)` the caller would otherwise see once the
97 // stopped actor is next asked. Publishing before `return Err` is why the state
98 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
99 let reason = crate::RegistrationError::from(&e);
100 tracing::error!(error = %e, "registration failed; control runner stopping");
101 params
102 .state_tx
103 .send_replace(crate::DeviceState::Failed(reason));
104 return Err(e.into());
105 }
106 }
107 }
108 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
109 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
110 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
111 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
112 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
113 let bring_up = async {
114 let (client, stream) = AsyncControlClient::connect(
115 ¶ms.config,
116 ¶ms.env.keys,
117 params.auth_key.as_deref(),
118 )
119 .await?;
120
121 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
122
123 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
124 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
125 slf.attach_stream(stream.boxed(), (), ());
126 Ok::<_, ControlRunnerError>(client)
127 };
128
129 let client = match bring_up.await {
130 Ok(client) => client,
131 Err(e) => {
132 tracing::error!(error = %e, "bringing up the control session failed");
133 // The control session never came up; surface it as a transient registration
134 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
135 // stuck at `Connecting`.
136 params.state_tx.send_replace(crate::DeviceState::Failed(
137 crate::RegistrationError::NetworkUnreachable,
138 ));
139 return Err(e);
140 }
141 };
142
143 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
144 // current (and flips to `Expired` if the self-node's key lapses).
145 params.state_tx.send_replace(crate::DeviceState::Running);
146
147 Ok(Self {
148 client,
149 params,
150 self_node: Default::default(),
151 ssh_policy: Default::default(),
152 tka: Default::default(),
153 })
154 }
155}
156
157impl ControlRunner {
158 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
159 where
160 F: FnOnce(&Node) -> R,
161 {
162 let mut sub = self.self_node.subscribe();
163 let mut shutdown = self.params.env.shutdown.clone();
164
165 async move {
166 tokio::select! {
167 _ = shutdown.wait_for(|x| *x) => {
168 None
169 },
170 node = sub.wait_for(Option::is_some) => {
171 Some(f(node.ok()?.as_ref()?))
172 },
173 }
174 }
175 }
176}
177
178// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
179// those generated fields carry no doc and can't take attributes, so wrap in a module where
180// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
181// are re-exported so callers keep referencing them at `control_runner::<Name>`.
182pub use msg_impl::*;
183
184#[allow(missing_docs)]
185mod msg_impl {
186 use kameo::{message::Context, reply::DelegatedReply};
187
188 use super::*;
189
190 #[kameo::messages]
191 impl ControlRunner {
192 /// Fetch the IPv4 address for this tailscale device.
193 #[message(ctx)]
194 pub fn ipv4(
195 &self,
196 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
197 ) -> DelegatedReply<Option<Ipv4Addr>> {
198 let (deleg, replier) = ctx.reply_sender();
199
200 if let Some(replier) = replier {
201 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
202
203 tokio::spawn(async move {
204 let ip = fut.await;
205 replier.send(ip);
206 });
207 }
208
209 deleg
210 }
211
212 /// Fetch the IPv6 address for this tailscale device.
213 #[message(ctx)]
214 pub fn ipv6(
215 &self,
216 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
217 ) -> DelegatedReply<Option<Ipv6Addr>> {
218 let (deleg, replier) = ctx.reply_sender();
219
220 if let Some(replier) = replier {
221 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
222
223 tokio::spawn(async move {
224 let ip = fut.await;
225 replier.send(ip);
226 });
227 }
228
229 deleg
230 }
231
232 /// Fetch the self node for this tailscale device.
233 #[message(ctx)]
234 pub fn self_node(
235 &self,
236 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
237 ) -> DelegatedReply<Option<Node>> {
238 let (deleg, replier) = ctx.reply_sender();
239
240 if let Some(replier) = replier {
241 let node = self.with_self_node(|node| node.clone());
242
243 tokio::spawn(async move {
244 let node = node.await;
245 replier.send(node)
246 });
247 }
248
249 deleg
250 }
251
252 /// Fetch the current Tailscale SSH policy, if control has pushed one.
253 ///
254 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
255 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
256 /// for a value: an absent policy is a legitimate, immediate answer.
257 #[message]
258 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
259 self.ssh_policy.borrow().clone()
260 }
261
262 /// Fetch the current Tailnet Lock status, if control has pushed one.
263 ///
264 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
265 #[message]
266 pub fn current_tka_status(&self) -> Option<TkaStatus> {
267 self.tka.borrow().clone()
268 }
269
270 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
271 ///
272 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
273 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
274 /// for the round-trip.
275 #[message(ctx)]
276 pub fn fetch_id_token(
277 &self,
278 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
279 audience: String,
280 ) -> DelegatedReply<Result<String, IdTokenError>> {
281 let (deleg, replier) = ctx.reply_sender();
282
283 if let Some(replier) = replier {
284 let config = self.params.config.clone();
285 let keys = self.params.env.keys.clone();
286 tokio::spawn(async move {
287 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
288 replier.send(result);
289 });
290 }
291
292 deleg
293 }
294
295 /// Log this node out of the tailnet: deregister it by expiring its current node key.
296 ///
297 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
298 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
299 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
300 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
301 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
302 /// on-disk node key, so re-registering with the same key is the re-login path.
303 #[message(ctx)]
304 pub fn logout(
305 &self,
306 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
307 ) -> DelegatedReply<Result<(), LogoutError>> {
308 let (deleg, replier) = ctx.reply_sender();
309
310 if let Some(replier) = replier {
311 let config = self.params.config.clone();
312 let keys = self.params.env.keys.clone();
313 tokio::spawn(async move {
314 let result = ts_control::logout(&config, &keys).await;
315 replier.send(result);
316 });
317 }
318
319 deleg
320 }
321 }
322
323 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
324 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
325 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
326 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
327 // block keeps the default build clean.
328 #[cfg(feature = "acme")]
329 #[kameo::messages]
330 impl ControlRunner {
331 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
332 /// client-side ACME DNS-01 engine (`acme` feature).
333 ///
334 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
335 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
336 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
337 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
338 ///
339 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
340 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
341 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
342 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
343 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
344 /// publish 501s.
345 #[message(ctx)]
346 pub fn get_certificate(
347 &self,
348 ctx: &mut Context<
349 Self,
350 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
351 >,
352 name: String,
353 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
354 let (deleg, replier) = ctx.reply_sender();
355
356 if let Some(replier) = replier {
357 let config = self.params.config.clone();
358 let keys = self.params.env.keys.clone();
359 tokio::spawn(async move {
360 let result = issue_certificate(&config, &keys, &name).await;
361 replier.send(result);
362 });
363 }
364
365 deleg
366 }
367 }
368}
369
/// Obtain a certificate for `name` through the set-dns DNS-01 flow.
///
/// The ACME account key comes from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when one
/// is persisted, so the same Let's Encrypt account survives renewals. Otherwise an ephemeral key
/// is generated for this call only (logged at debug; nothing is written back). Issuance always
/// targets Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Propagates failures from loading/generating the account key and from the issuance call, and
/// returns `CertError::Acme` if the directory URL constant fails to parse.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    use ts_control::acme::{AcmeAccountKey, LETS_ENCRYPT_PRODUCTION_DIRECTORY};

    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // `generate` yields a tuple; only its first element (the key) is used here.
        AcmeAccountKey::generate()?.0
    };

    let directory = LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse().map_err(|e| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
    })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
399
400impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
401 type Reply = ();
402
403 async fn handle(
404 &mut self,
405 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
406 _ctx: &mut Context<Self, Self::Reply>,
407 ) {
408 match msg {
409 StreamMessage::Started(_) => {
410 tracing::trace!("started listening to state updates");
411 }
412
413 StreamMessage::Next(msg) => {
414 if let Some(node) = msg.node.as_ref() {
415 // Reflect node-key expiry into the device state: control delivering a self-node
416 // whose key is in the past means the node must re-authenticate. Otherwise the
417 // arrival of a fresh self-node confirms we are Running (recovers the state if a
418 // prior update had flipped it to Expired).
419 let now_unix = std::time::SystemTime::now()
420 .duration_since(std::time::UNIX_EPOCH)
421 .map(|d| d.as_secs() as i64)
422 .unwrap_or(0);
423 let next = if node.key_expired_at_unix(now_unix) {
424 crate::DeviceState::Expired
425 } else {
426 crate::DeviceState::Running
427 };
428 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
429 // self-node arrives on every netmap update).
430 self.params.state_tx.send_if_modified(|s| {
431 if *s != next {
432 *s = next.clone();
433 true
434 } else {
435 false
436 }
437 });
438
439 self.self_node.send_replace(Some(node.clone()));
440 }
441
442 if let Some(policy) = msg.ssh_policy.as_ref() {
443 self.ssh_policy.send_replace(Some(policy.clone()));
444 }
445
446 if let Some(tka) = msg.tka.as_ref() {
447 self.tka.send_replace(Some(tka.clone()));
448 }
449
450 if let Err(e) = self.params.env.publish(msg).await {
451 tracing::error!(error = %e, "publishing netmap update");
452 }
453 }
454
455 StreamMessage::Finished(_) => {
456 tracing::error!("state update stream terminated")
457 }
458 }
459 }
460}
461
462impl Message<DerpLatencyMeasurement> for ControlRunner {
463 type Reply = ();
464
465 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
466 let measurements = msg.measurement.as_ref().clone();
467
468 let Some(result) = measurements.first() else {
469 tracing::debug!("derp latency measurements empty");
470 return;
471 };
472
473 let iter = measurements.iter().map(|result| {
474 (
475 result.latency_map_key.as_str(),
476 result.latency.as_secs_f64(),
477 )
478 });
479
480 tracing::debug!(selected_region_id = ?result.id, "updating home region");
481
482 self.client.set_home_region(result.id, iter).await;
483 }
484}
485
486impl Message<EndpointAdvertisement> for ControlRunner {
487 type Reply = ();
488
489 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
490 let endpoints: Vec<Endpoint> = msg
491 .endpoints
492 .iter()
493 .map(|ep| Endpoint {
494 endpoint: ep.addr,
495 ty: match ep.ty {
496 SelfEndpointType::Local => EndpointType::Local,
497 SelfEndpointType::Stun => EndpointType::Stun,
498 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
499 },
500 })
501 .collect();
502
503 tracing::debug!(
504 n_endpoints = endpoints.len(),
505 "advertising endpoints to control"
506 );
507
508 self.client.set_endpoints(endpoints).await;
509 }
510}