Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22    direct::EndpointAdvertisement,
23};
24
/// Actor responsible for maintaining the connection to control.
///
/// This actor is responsible for proxying the map response stream onto the message bus.
pub struct ControlRunner {
    /// Control client returned by [`AsyncControlClient::connect`] during `on_start`. The
    /// DERP-latency and endpoint-advertisement handlers call `set_home_region` /
    /// `set_endpoints` on it.
    client: AsyncControlClient,
    /// The arguments this actor was started with; handlers read the control config, node keys
    /// (via `env`), the shutdown signal and the device-state sender from here.
    params: Params,

    /// Latest self node received on the state-update stream, or `None` until the first one
    /// arrives. `with_self_node` subscribes to this cell and waits for it to become `Some`.
    self_node: watch::Sender<Option<Node>>,
    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
    /// server reads this to authorize incoming connections; absent policy means deny-all.
    ssh_policy: watch::Sender<Option<SshPolicy>>,
    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
    tka: watch::Sender<Option<TkaStatus>>,
}
39
/// Control runner args.
///
/// This is the `Args` type of the [`ControlRunner`] actor: it is passed to `on_start` and, once
/// start-up succeeds, stored on the actor itself.
pub struct Params {
    /// Control config.
    ///
    /// Passed to `check_auth` and `connect` during start-up, and cloned into the spawned tasks
    /// that serve the id-token / logout / certificate requests.
    pub(crate) config: ts_control::Config,

    /// Auth key (if needed).
    ///
    /// Forwarded as `Option<&str>` to both `check_auth` and `connect`.
    pub(crate) auth_key: Option<String>,

    /// The [`crate::Env`] for this actor.
    ///
    /// Supplies the node keys (`env.keys`), the shutdown signal (`env.shutdown`) and the
    /// `subscribe` / `publish` message-bus calls used by this actor.
    pub(crate) env: crate::Env,

    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
}
57
/// Error type of the [`ControlRunner`] actor (its `kameo::Actor::Error`).
///
/// Both variants are transparent wrappers: the message and source are those of the inner error.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ControlRunnerError {
    /// An error reported by the control client (`ts_control::Error`), e.g. from `check_auth` or
    /// `connect` during start-up.
    #[error(transparent)]
    Control(#[from] ControlError),

    /// An error originating in this crate ([`crate::Error`]).
    #[error(transparent)]
    Crate(#[from] crate::Error),
}
67
68impl kameo::Actor for ControlRunner {
69    type Args = Params;
70    type Error = ControlRunnerError;
71
72    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
73        loop {
74            match AsyncControlClient::check_auth(
75                &params.config,
76                &params.env.keys,
77                params.auth_key.as_deref(),
78            )
79            .await
80            {
81                Ok(()) => break,
82                Err(ControlError::MachineNotAuthorized(u)) => {
83                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
84                    // Surface "interactive login required" so a watcher / `wait_until_running` can
85                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
86                    // keeps retrying (transient), so this is not a terminal `Failed`.
87                    params
88                        .state_tx
89                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
90                    tokio::time::sleep(Duration::from_secs(5)).await;
91                }
92                Err(e) => {
93                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
94                    // specific reason control gave AND publish it as a typed `Failed` state so
95                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
96                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
97                    // stopped actor is next asked. Publishing before `return Err` is why the state
98                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
99                    let reason = crate::RegistrationError::from(&e);
100                    tracing::error!(error = %e, "registration failed; control runner stopping");
101                    params
102                        .state_tx
103                        .send_replace(crate::DeviceState::Failed(reason));
104                    return Err(e.into());
105                }
106            }
107        }
108        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
109        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
110        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
111        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
112        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
113        let bring_up = async {
114            let (client, stream) = AsyncControlClient::connect(
115                &params.config,
116                &params.env.keys,
117                params.auth_key.as_deref(),
118            )
119            .await?;
120
121            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
122
123            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
124            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
125            slf.attach_stream(stream.boxed(), (), ());
126            Ok::<_, ControlRunnerError>(client)
127        };
128
129        let client = match bring_up.await {
130            Ok(client) => client,
131            Err(e) => {
132                tracing::error!(error = %e, "bringing up the control session failed");
133                // The control session never came up; surface it as a transient registration
134                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
135                // stuck at `Connecting`.
136                params.state_tx.send_replace(crate::DeviceState::Failed(
137                    crate::RegistrationError::NetworkUnreachable,
138                ));
139                return Err(e);
140            }
141        };
142
143        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
144        // current (and flips to `Expired` if the self-node's key lapses).
145        params.state_tx.send_replace(crate::DeviceState::Running);
146
147        Ok(Self {
148            client,
149            params,
150            self_node: Default::default(),
151            ssh_policy: Default::default(),
152            tka: Default::default(),
153        })
154    }
155}
156
157impl ControlRunner {
158    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
159    where
160        F: FnOnce(&Node) -> R,
161    {
162        let mut sub = self.self_node.subscribe();
163        let mut shutdown = self.params.env.shutdown.clone();
164
165        async move {
166            tokio::select! {
167                _ = shutdown.wait_for(|x| *x) => {
168                    None
169                },
170                node = sub.wait_for(Option::is_some) => {
171                    Some(f(node.ok()?.as_ref()?))
172                },
173            }
174        }
175    }
176}
177
178// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
179// those generated fields carry no doc and can't take attributes, so wrap in a module where
180// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
181// are re-exported so callers keep referencing them at `control_runner::<Name>`.
182pub use msg_impl::*;
183
184#[allow(missing_docs)]
185mod msg_impl {
186    use kameo::{message::Context, reply::DelegatedReply};
187
188    use super::*;
189
190    #[kameo::messages]
191    impl ControlRunner {
192        /// Fetch the IPv4 address for this tailscale device.
193        #[message(ctx)]
194        pub fn ipv4(
195            &self,
196            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
197        ) -> DelegatedReply<Option<Ipv4Addr>> {
198            let (deleg, replier) = ctx.reply_sender();
199
200            if let Some(replier) = replier {
201                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
202
203                tokio::spawn(async move {
204                    let ip = fut.await;
205                    replier.send(ip);
206                });
207            }
208
209            deleg
210        }
211
212        /// Fetch the IPv6 address for this tailscale device.
213        #[message(ctx)]
214        pub fn ipv6(
215            &self,
216            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
217        ) -> DelegatedReply<Option<Ipv6Addr>> {
218            let (deleg, replier) = ctx.reply_sender();
219
220            if let Some(replier) = replier {
221                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
222
223                tokio::spawn(async move {
224                    let ip = fut.await;
225                    replier.send(ip);
226                });
227            }
228
229            deleg
230        }
231
232        /// Fetch the self node for this tailscale device.
233        #[message(ctx)]
234        pub fn self_node(
235            &self,
236            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
237        ) -> DelegatedReply<Option<Node>> {
238            let (deleg, replier) = ctx.reply_sender();
239
240            if let Some(replier) = replier {
241                let node = self.with_self_node(|node| node.clone());
242
243                tokio::spawn(async move {
244                    let node = node.await;
245                    replier.send(node)
246                });
247            }
248
249            deleg
250        }
251
252        /// Fetch the current Tailscale SSH policy, if control has pushed one.
253        ///
254        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
255        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
256        /// for a value: an absent policy is a legitimate, immediate answer.
257        #[message]
258        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
259            self.ssh_policy.borrow().clone()
260        }
261
262        /// Fetch the current Tailnet Lock status, if control has pushed one.
263        ///
264        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
265        #[message]
266        pub fn current_tka_status(&self) -> Option<TkaStatus> {
267            self.tka.borrow().clone()
268        }
269
270        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
271        ///
272        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
273        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
274        /// for the round-trip.
275        #[message(ctx)]
276        pub fn fetch_id_token(
277            &self,
278            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
279            audience: String,
280        ) -> DelegatedReply<Result<String, IdTokenError>> {
281            let (deleg, replier) = ctx.reply_sender();
282
283            if let Some(replier) = replier {
284                let config = self.params.config.clone();
285                let keys = self.params.env.keys.clone();
286                tokio::spawn(async move {
287                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
288                    replier.send(result);
289                });
290            }
291
292            deleg
293        }
294
295        /// Log this node out of the tailnet: deregister it by expiring its current node key.
296        ///
297        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
298        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
299        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
300        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
301        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
302        /// on-disk node key, so re-registering with the same key is the re-login path.
303        #[message(ctx)]
304        pub fn logout(
305            &self,
306            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
307        ) -> DelegatedReply<Result<(), LogoutError>> {
308            let (deleg, replier) = ctx.reply_sender();
309
310            if let Some(replier) = replier {
311                let config = self.params.config.clone();
312                let keys = self.params.env.keys.clone();
313                tokio::spawn(async move {
314                    let result = ts_control::logout(&config, &keys).await;
315                    replier.send(result);
316                });
317            }
318
319            deleg
320        }
321    }
322
323    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
324    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
325    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
326    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
327    // block keeps the default build clean.
328    #[cfg(feature = "acme")]
329    #[kameo::messages]
330    impl ControlRunner {
331        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
332        /// client-side ACME DNS-01 engine (`acme` feature).
333        ///
334        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
335        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
336        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
337        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
338        ///
339        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
340        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
341        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
342        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
343        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
344        /// publish 501s.
345        #[message(ctx)]
346        pub fn get_certificate(
347            &self,
348            ctx: &mut Context<
349                Self,
350                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
351            >,
352            name: String,
353        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
354            let (deleg, replier) = ctx.reply_sender();
355
356            if let Some(replier) = replier {
357                let config = self.params.config.clone();
358                let keys = self.params.env.keys.clone();
359                tokio::spawn(async move {
360                    let result = issue_certificate(&config, &keys, &name).await;
361                    replier.send(result);
362                });
363            }
364
365            deleg
366        }
367    }
368}
369
/// Obtain an ACME account key, then issue a certificate for `name` via set-dns DNS-01.
///
/// When [`ts_keys::NodeState::acme_account_key`] holds a persisted key (PKCS#8 DER) it is parsed
/// and reused, so the same Let's Encrypt account survives renewals. Otherwise a key is generated
/// for this call only (logged at debug) and is not written back anywhere. Issuance always targets
/// Let's Encrypt production ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Propagates failures from parsing or generating the account key, returns
/// [`ts_control::CertError::Acme`] if the directory URL constant fails to parse, and otherwise
/// returns whatever `issue_certificate_via_setdns` reports.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        ts_control::acme::AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        let generated = ts_control::acme::AcmeAccountKey::generate()?;
        // Only the first element (the key itself) is needed here.
        generated.0
    };

    let parsed_directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse();
    let directory = parsed_directory.map_err(|e| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
    })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
399
400impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
401    type Reply = ();
402
403    async fn handle(
404        &mut self,
405        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
406        _ctx: &mut Context<Self, Self::Reply>,
407    ) {
408        match msg {
409            StreamMessage::Started(_) => {
410                tracing::trace!("started listening to state updates");
411            }
412
413            StreamMessage::Next(msg) => {
414                if let Some(node) = msg.node.as_ref() {
415                    // Reflect node-key expiry into the device state: control delivering a self-node
416                    // whose key is in the past means the node must re-authenticate. Otherwise the
417                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
418                    // prior update had flipped it to Expired).
419                    let now_unix = std::time::SystemTime::now()
420                        .duration_since(std::time::UNIX_EPOCH)
421                        .map(|d| d.as_secs() as i64)
422                        .unwrap_or(0);
423                    let next = if node.key_expired_at_unix(now_unix) {
424                        crate::DeviceState::Expired
425                    } else {
426                        crate::DeviceState::Running
427                    };
428                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
429                    // self-node arrives on every netmap update).
430                    self.params.state_tx.send_if_modified(|s| {
431                        if *s != next {
432                            *s = next.clone();
433                            true
434                        } else {
435                            false
436                        }
437                    });
438
439                    self.self_node.send_replace(Some(node.clone()));
440                }
441
442                if let Some(policy) = msg.ssh_policy.as_ref() {
443                    self.ssh_policy.send_replace(Some(policy.clone()));
444                }
445
446                if let Some(tka) = msg.tka.as_ref() {
447                    self.tka.send_replace(Some(tka.clone()));
448                }
449
450                if let Err(e) = self.params.env.publish(msg).await {
451                    tracing::error!(error = %e, "publishing netmap update");
452                }
453            }
454
455            StreamMessage::Finished(_) => {
456                tracing::error!("state update stream terminated")
457            }
458        }
459    }
460}
461
462impl Message<DerpLatencyMeasurement> for ControlRunner {
463    type Reply = ();
464
465    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
466        let measurements = msg.measurement.as_ref().clone();
467
468        let Some(result) = measurements.first() else {
469            tracing::debug!("derp latency measurements empty");
470            return;
471        };
472
473        let iter = measurements.iter().map(|result| {
474            (
475                result.latency_map_key.as_str(),
476                result.latency.as_secs_f64(),
477            )
478        });
479
480        tracing::debug!(selected_region_id = ?result.id, "updating home region");
481
482        self.client.set_home_region(result.id, iter).await;
483    }
484}
485
486impl Message<EndpointAdvertisement> for ControlRunner {
487    type Reply = ();
488
489    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
490        let endpoints: Vec<Endpoint> = msg
491            .endpoints
492            .iter()
493            .map(|ep| Endpoint {
494                endpoint: ep.addr,
495                ty: match ep.ty {
496                    SelfEndpointType::Local => EndpointType::Local,
497                    SelfEndpointType::Stun => EndpointType::Stun,
498                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
499                },
500            })
501            .collect();
502
503        tracing::debug!(
504            n_endpoints = endpoints.len(),
505            "advertising endpoints to control"
506        );
507
508        self.client.set_endpoints(endpoints).await;
509    }
510}