// ts_runtime/control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22    direct::EndpointAdvertisement,
23};
24
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus. It also
/// keeps the most recent self node, SSH policy, Tailnet Lock status and cert-domain list from
/// that stream in `watch` cells so its message handlers can answer queries from them.
pub struct ControlRunner {
    /// Control client obtained from [`AsyncControlClient::connect`] during `on_start`; used to
    /// report the home region and this node's endpoints back to control.
    client: AsyncControlClient,
    /// The arguments this actor was started with (control config, auth key, env, state sender).
    params: Params,

    /// Latest self node delivered by the state-update stream, or `None` until the first update
    /// that carries one arrives.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
    cert_domains: watch::Sender<Vec<String>>,
}
42
/// Control runner args.
///
/// Passed to the actor's `on_start` and then stored on [`ControlRunner`] for the handlers that
/// need the control config, node keys or state sender later on.
pub struct Params {
    /// Control config, handed to [`AsyncControlClient::check_auth`] and
    /// [`AsyncControlClient::connect`] and cloned into the per-request control RPC tasks.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed). Passed to control as `Option<&str>` on every auth check / connect.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor. This file uses its node keys (`keys`), its
    /// `subscribe` / `publish` message bus and its `shutdown` signal.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
60
/// Error type of the [`ControlRunner`] actor (its `kameo::Actor::Error`), returned from
/// `on_start` when the control session cannot be brought up.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client (`ts_control::Error`), e.g. a failed
    /// registration check or connect.
    #[error(transparent)]
    Control(#[from] ControlError),

    /// An error from this crate's own `crate::Error` type.
    // NOTE(review): presumably produced by the `env.subscribe` calls in `on_start` — confirm.
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
70
71impl kameo::Actor for ControlRunner {
72    type Args = Params;
73    type Error = ControlRunnerError;
74
75    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
76        loop {
77            match AsyncControlClient::check_auth(
78                &params.config,
79                &params.env.keys,
80                params.auth_key.as_deref(),
81            )
82            .await
83            {
84                Ok(()) => break,
85                Err(ControlError::MachineNotAuthorized(u)) => {
86                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
87                    // Surface "interactive login required" so a watcher / `wait_until_running` can
88                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
89                    // keeps retrying (transient), so this is not a terminal `Failed`.
90                    params
91                        .state_tx
92                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
93                    tokio::time::sleep(Duration::from_secs(5)).await;
94                }
95                Err(e) => {
96                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
97                    // specific reason control gave AND publish it as a typed `Failed` state so
98                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
99                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
100                    // stopped actor is next asked. Publishing before `return Err` is why the state
101                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
102                    let reason = crate::RegistrationError::from(&e);
103                    tracing::error!(error = %e, "registration failed; control runner stopping");
104                    params
105                        .state_tx
106                        .send_replace(crate::DeviceState::Failed(reason));
107                    return Err(e.into());
108                }
109            }
110        }
111        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
112        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
113        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
114        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
115        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
116        let bring_up = async {
117            let (client, stream) = AsyncControlClient::connect(
118                &params.config,
119                &params.env.keys,
120                params.auth_key.as_deref(),
121            )
122            .await?;
123
124            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
125
126            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
127            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
128            slf.attach_stream(stream.boxed(), (), ());
129            Ok::<_, ControlRunnerError>(client)
130        };
131
132        let client = match bring_up.await {
133            Ok(client) => client,
134            Err(e) => {
135                tracing::error!(error = %e, "bringing up the control session failed");
136                // The control session never came up; surface it as a transient registration
137                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
138                // stuck at `Connecting`.
139                params.state_tx.send_replace(crate::DeviceState::Failed(
140                    crate::RegistrationError::NetworkUnreachable,
141                ));
142                return Err(e);
143            }
144        };
145
146        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
147        // current (and flips to `Expired` if the self-node's key lapses).
148        params.state_tx.send_replace(crate::DeviceState::Running);
149
150        Ok(Self {
151            client,
152            params,
153            self_node: Default::default(),
154            ssh_policy: Default::default(),
155            tka: Default::default(),
156            cert_domains: Default::default(),
157        })
158    }
159}
160
161impl ControlRunner {
162    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
163    where
164        F: FnOnce(&Node) -> R,
165    {
166        let mut sub = self.self_node.subscribe();
167        let mut shutdown = self.params.env.shutdown.clone();
168
169        async move {
170            tokio::select! {
171                _ = shutdown.wait_for(|x| *x) => {
172                    None
173                },
174                node = sub.wait_for(Option::is_some) => {
175                    Some(f(node.ok()?.as_ref()?))
176                },
177            }
178        }
179    }
180}
181
182// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
183// those generated fields carry no doc and can't take attributes, so wrap in a module where
184// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
185// are re-exported so callers keep referencing them at `control_runner::<Name>`.
186pub use msg_impl::*;
187
188#[allow(missing_docs)]
189mod msg_impl {
190    use kameo::{message::Context, reply::DelegatedReply};
191
192    use super::*;
193
194    #[kameo::messages]
195    impl ControlRunner {
196        /// Fetch the IPv4 address for this tailscale device.
197        #[message(ctx)]
198        pub fn ipv4(
199            &self,
200            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
201        ) -> DelegatedReply<Option<Ipv4Addr>> {
202            let (deleg, replier) = ctx.reply_sender();
203
204            if let Some(replier) = replier {
205                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
206
207                tokio::spawn(async move {
208                    let ip = fut.await;
209                    replier.send(ip);
210                });
211            }
212
213            deleg
214        }
215
216        /// Fetch the IPv6 address for this tailscale device.
217        #[message(ctx)]
218        pub fn ipv6(
219            &self,
220            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
221        ) -> DelegatedReply<Option<Ipv6Addr>> {
222            let (deleg, replier) = ctx.reply_sender();
223
224            if let Some(replier) = replier {
225                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
226
227                tokio::spawn(async move {
228                    let ip = fut.await;
229                    replier.send(ip);
230                });
231            }
232
233            deleg
234        }
235
236        /// Fetch the self node for this tailscale device.
237        #[message(ctx)]
238        pub fn self_node(
239            &self,
240            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241        ) -> DelegatedReply<Option<Node>> {
242            let (deleg, replier) = ctx.reply_sender();
243
244            if let Some(replier) = replier {
245                let node = self.with_self_node(|node| node.clone());
246
247                tokio::spawn(async move {
248                    let node = node.await;
249                    replier.send(node)
250                });
251            }
252
253            deleg
254        }
255
256        /// Fetch the current Tailscale SSH policy, if control has pushed one.
257        ///
258        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
259        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
260        /// for a value: an absent policy is a legitimate, immediate answer.
261        #[message]
262        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
263            self.ssh_policy.borrow().clone()
264        }
265
266        /// Fetch the current Tailnet Lock status, if control has pushed one.
267        ///
268        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
269        #[message]
270        pub fn current_tka_status(&self) -> Option<TkaStatus> {
271            self.tka.borrow().clone()
272        }
273
274        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
275        ///
276        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
277        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
278        /// does not block waiting for a value).
279        #[message]
280        pub fn cert_domains(&self) -> Vec<String> {
281            self.cert_domains.borrow().clone()
282        }
283
284        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
285        ///
286        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
287        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
288        /// for the round-trip.
289        #[message(ctx)]
290        pub fn fetch_id_token(
291            &self,
292            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
293            audience: String,
294        ) -> DelegatedReply<Result<String, IdTokenError>> {
295            let (deleg, replier) = ctx.reply_sender();
296
297            if let Some(replier) = replier {
298                let config = self.params.config.clone();
299                let keys = self.params.env.keys.clone();
300                tokio::spawn(async move {
301                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
302                    replier.send(result);
303                });
304            }
305
306            deleg
307        }
308
309        /// Log this node out of the tailnet: deregister it by expiring its current node key.
310        ///
311        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
312        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
313        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
314        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
315        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
316        /// on-disk node key, so re-registering with the same key is the re-login path.
317        #[message(ctx)]
318        pub fn logout(
319            &self,
320            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
321        ) -> DelegatedReply<Result<(), LogoutError>> {
322            let (deleg, replier) = ctx.reply_sender();
323
324            if let Some(replier) = replier {
325                let config = self.params.config.clone();
326                let keys = self.params.env.keys.clone();
327                tokio::spawn(async move {
328                    let result = ts_control::logout(&config, &keys).await;
329                    replier.send(result);
330                });
331            }
332
333            deleg
334        }
335    }
336
337    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
338    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
339    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
340    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
341    // block keeps the default build clean.
342    #[cfg(feature = "acme")]
343    #[kameo::messages]
344    impl ControlRunner {
345        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
346        /// client-side ACME DNS-01 engine (`acme` feature).
347        ///
348        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
349        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
350        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
351        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
352        ///
353        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
354        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
355        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
356        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
357        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
358        /// publish 501s.
359        #[message(ctx)]
360        pub fn get_certificate(
361            &self,
362            ctx: &mut Context<
363                Self,
364                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
365            >,
366            name: String,
367        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
368            let (deleg, replier) = ctx.reply_sender();
369
370            if let Some(replier) = replier {
371                let config = self.params.config.clone();
372                let keys = self.params.env.keys.clone();
373                tokio::spawn(async move {
374                    let result = issue_certificate(&config, &keys, &name).await;
375                    replier.send(result);
376                });
377            }
378
379            deleg
380        }
381    }
382}
383
/// Issue a certificate for `name` via set-dns DNS-01, after loading or generating the ACME
/// account key.
///
/// If the key state carries a persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) it
/// is reused, so the same Let's Encrypt account survives renewals. Otherwise an ephemeral key is
/// generated for this call (logged at debug — a new ACME account each issuance, with no
/// write-back). The directory is always Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Returns a [`ts_control::CertError`] if the persisted key cannot be parsed, key generation
/// fails, the directory URL does not parse, or issuance itself fails.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        // `generate` returns more than the key itself; only the first element is used here.
        ts_control::acme::AcmeAccountKey::generate()?.0
    };

    let parsed_directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse();
    let directory = parsed_directory.map_err(|e| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
    })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
413
414impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
415    type Reply = ();
416
417    async fn handle(
418        &mut self,
419        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
420        _ctx: &mut Context<Self, Self::Reply>,
421    ) {
422        match msg {
423            StreamMessage::Started(_) => {
424                tracing::trace!("started listening to state updates");
425            }
426
427            StreamMessage::Next(msg) => {
428                if let Some(node) = msg.node.as_ref() {
429                    // Reflect node-key expiry into the device state: control delivering a self-node
430                    // whose key is in the past means the node must re-authenticate. Otherwise the
431                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
432                    // prior update had flipped it to Expired).
433                    let now_unix = std::time::SystemTime::now()
434                        .duration_since(std::time::UNIX_EPOCH)
435                        .map(|d| d.as_secs() as i64)
436                        .unwrap_or(0);
437                    let next = if node.key_expired_at_unix(now_unix) {
438                        crate::DeviceState::Expired
439                    } else {
440                        crate::DeviceState::Running
441                    };
442                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
443                    // self-node arrives on every netmap update).
444                    self.params.state_tx.send_if_modified(|s| {
445                        if *s != next {
446                            *s = next.clone();
447                            true
448                        } else {
449                            false
450                        }
451                    });
452
453                    self.self_node.send_replace(Some(node.clone()));
454                }
455
456                if let Some(policy) = msg.ssh_policy.as_ref() {
457                    self.ssh_policy.send_replace(Some(policy.clone()));
458                }
459
460                if let Some(tka) = msg.tka.as_ref() {
461                    self.tka.send_replace(Some(tka.clone()));
462                }
463
464                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
465                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
466                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
467                let cert_domains = msg
468                    .dns_config
469                    .as_ref()
470                    .map(|d| d.cert_domains.clone())
471                    .unwrap_or_default();
472                self.cert_domains.send_replace(cert_domains);
473
474                if let Err(e) = self.params.env.publish(msg).await {
475                    tracing::error!(error = %e, "publishing netmap update");
476                }
477            }
478
479            StreamMessage::Finished(_) => {
480                tracing::error!("state update stream terminated")
481            }
482        }
483    }
484}
485
486impl Message<DerpLatencyMeasurement> for ControlRunner {
487    type Reply = ();
488
489    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
490        let measurements = msg.measurement.as_ref().clone();
491
492        let Some(result) = measurements.first() else {
493            tracing::debug!("derp latency measurements empty");
494            return;
495        };
496
497        let iter = measurements.iter().map(|result| {
498            (
499                result.latency_map_key.as_str(),
500                result.latency.as_secs_f64(),
501            )
502        });
503
504        tracing::debug!(selected_region_id = ?result.id, "updating home region");
505
506        self.client.set_home_region(result.id, iter).await;
507    }
508}
509
510impl Message<EndpointAdvertisement> for ControlRunner {
511    type Reply = ();
512
513    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
514        let endpoints: Vec<Endpoint> = msg
515            .endpoints
516            .iter()
517            .map(|ep| Endpoint {
518                endpoint: ep.addr,
519                ty: match ep.ty {
520                    SelfEndpointType::Local => EndpointType::Local,
521                    SelfEndpointType::Stun => EndpointType::Stun,
522                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
523                },
524            })
525            .collect();
526
527        tracing::debug!(
528            n_endpoints = endpoints.len(),
529            "advertising endpoints to control"
530        );
531
532        self.client.set_endpoints(endpoints).await;
533    }
534}