Skip to main content

ts_runtime/
control_runner.rs

1use core::{
2    net::{Ipv4Addr, Ipv6Addr},
3    time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9    actor::{ActorRef, Spawn},
10    message::{Context, StreamMessage},
11    prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15    AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16    Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21    derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22    direct::EndpointAdvertisement,
23};
24
25/// Actor responsible for maintaining the connection to control.
26///
27/// This actor is responsible for proxying the map response stream onto the message bus.
28pub struct ControlRunner {
29    client: AsyncControlClient,
30    params: Params,
31
32    self_node: watch::Sender<Option<Node>>,
33    /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
34    /// server reads this to authorize incoming connections; absent policy means deny-all.
35    ssh_policy: watch::Sender<Option<SshPolicy>>,
36    /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
37    tka: watch::Sender<Option<TkaStatus>>,
38    /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
39    /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
40    /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
41    /// the result returns via the [`TkaSynced`] self-message).
42    tka_synced: Option<crate::tka_sync::SyncedTka>,
43    /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
44    /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
45    /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
46    /// gated decision).
47    tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
48    /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
49    /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
50    tka_syncing: bool,
51    /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
52    /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
53    cert_domains: watch::Sender<Vec<String>>,
54}
55
56/// Control runner args.
57pub struct Params {
58    /// Control config.
59    pub(crate) config: ts_control::Config,
60
61    /// Auth key (if needed).
62    pub(crate) auth_key: Option<String>,
63
64    /// The [`crate::Env`] for this actor.
65    pub(crate) env: crate::Env,
66
67    /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
68    /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
69    /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
70    /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
71    pub(crate) state_tx: watch::Sender<crate::DeviceState>,
72}
73
74#[doc(hidden)]
75#[derive(Debug, thiserror::Error)]
76pub enum ControlRunnerError {
77    #[error(transparent)]
78    Control(#[from] ControlError),
79
80    #[error(transparent)]
81    Crate(#[from] crate::Error),
82}
83
84impl kameo::Actor for ControlRunner {
85    type Args = Params;
86    type Error = ControlRunnerError;
87
88    async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
89        loop {
90            match AsyncControlClient::check_auth(
91                &params.config,
92                &params.env.keys,
93                params.auth_key.as_deref(),
94            )
95            .await
96            {
97                Ok(()) => break,
98                Err(ControlError::MachineNotAuthorized(u)) => {
99                    tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
100                    // Surface "interactive login required" so a watcher / `wait_until_running` can
101                    // tell the user to authorize, instead of seeing an opaque timeout. Registration
102                    // keeps retrying (transient), so this is not a terminal `Failed`.
103                    params
104                        .state_tx
105                        .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
106                    tokio::time::sleep(Duration::from_secs(5)).await;
107                }
108                Err(e) => {
109                    // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
110                    // specific reason control gave AND publish it as a typed `Failed` state so
111                    // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
112                    // of the opaque `Internal(Actor)` the caller would otherwise see once the
113                    // stopped actor is next asked. Publishing before `return Err` is why the state
114                    // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
115                    let reason = crate::RegistrationError::from(&e);
116                    tracing::error!(error = %e, "registration failed; control runner stopping");
117                    params
118                        .state_tx
119                        .send_replace(crate::DeviceState::Failed(reason));
120                    return Err(e.into());
121                }
122            }
123        }
124        // check_auth succeeded, but the node is not "up" until the netmap stream is actually
125        // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
126        // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
127        // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
128        // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
129        let bring_up = async {
130            let (client, stream) = AsyncControlClient::connect(
131                &params.config,
132                &params.env.keys,
133                params.auth_key.as_deref(),
134            )
135            .await?;
136
137            DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
138
139            params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
140            params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
141            slf.attach_stream(stream.boxed(), (), ());
142            Ok::<_, ControlRunnerError>(client)
143        };
144
145        let client = match bring_up.await {
146            Ok(client) => client,
147            Err(e) => {
148                tracing::error!(error = %e, "bringing up the control session failed");
149                // The control session never came up; surface it as a transient registration
150                // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
151                // stuck at `Connecting`.
152                params.state_tx.send_replace(crate::DeviceState::Failed(
153                    crate::RegistrationError::NetworkUnreachable,
154                ));
155                return Err(e);
156            }
157        };
158
159        // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
160        // current (and flips to `Expired` if the self-node's key lapses).
161        params.state_tx.send_replace(crate::DeviceState::Running);
162
163        Ok(Self {
164            client,
165            params,
166            self_node: Default::default(),
167            ssh_policy: Default::default(),
168            tka: Default::default(),
169            tka_synced: None,
170            tka_authority: Default::default(),
171            tka_syncing: false,
172            cert_domains: Default::default(),
173        })
174    }
175}
176
177impl ControlRunner {
178    /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
179    /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
180    /// round-trip). The result returns via the [`TkaSynced`] self-message.
181    ///
182    /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
183    /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
184    /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
185    /// `tkaSyncIfNeeded`: a no-op when our head already matches.
186    fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
187        if !tka.is_enabled() {
188            // Lock disabled (or never enabled): drop any synced state and stop publishing an
189            // Authority. Never an error; peers are unaffected.
190            if self.tka_synced.is_some() {
191                self.tka_synced = None;
192                self.tka_authority.send_replace(None);
193            }
194            return;
195        }
196        if self.tka_syncing {
197            return; // a sync is already in flight; the next netmap will re-trigger if still stale
198        }
199        // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
200        // do. A malformed control head is treated as "different" (we'll attempt a sync, which
201        // fail-closes harmlessly).
202        if let Some(synced) = &self.tka_synced
203            && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
204            && synced.authority.head_matches(&control_head)
205        {
206            return;
207        }
208
209        // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
210        // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
211        // `tka_syncing` so we don't spawn a second concurrent sync.
212        self.tka_syncing = true;
213        let current = self.tka_synced.take();
214        let config = self.params.config.clone();
215        let keys = self.params.env.keys.clone();
216        tokio::spawn(async move {
217            let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
218            // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
219            // not allowed). A send failure just means the actor is gone — nothing to do.
220            if let Err(e) = self_ref.tell(TkaSynced { result }).await {
221                tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
222            }
223        });
224    }
225
226    /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
227    /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
228    /// clears the in-flight guard.
229    fn apply_tka_synced(
230        &mut self,
231        result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
232    ) {
233        self.tka_syncing = false;
234        match result {
235            Ok(Some(synced)) => {
236                tracing::info!(
237                    head = %synced.authority.head().to_base32(),
238                    "TKA sync succeeded; publishing verified Authority (observe-only)"
239                );
240                self.tka_authority
241                    .send_replace(Some(synced.authority.clone()));
242                self.tka_synced = Some(synced);
243            }
244            Ok(None) => {
245                // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
246                tracing::debug!("TKA sync: control reported no lock for this node (inert)");
247            }
248            Err(e) => {
249                // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
250                // peer. The next netmap update re-triggers a sync attempt.
251                tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
252            }
253        }
254    }
255
256    fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
257    where
258        F: FnOnce(&Node) -> R,
259    {
260        let mut sub = self.self_node.subscribe();
261        let mut shutdown = self.params.env.shutdown.clone();
262
263        async move {
264            tokio::select! {
265                _ = shutdown.wait_for(|x| *x) => {
266                    None
267                },
268                node = sub.wait_for(Option::is_some) => {
269                    Some(f(node.ok()?.as_ref()?))
270                },
271            }
272        }
273    }
274}
275
276// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
277// those generated fields carry no doc and can't take attributes, so wrap in a module where
278// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
279// are re-exported so callers keep referencing them at `control_runner::<Name>`.
280pub use msg_impl::*;
281
282#[allow(missing_docs)]
283mod msg_impl {
284    use kameo::{message::Context, reply::DelegatedReply};
285
286    use super::*;
287
288    #[kameo::messages]
289    impl ControlRunner {
290        /// Fetch the IPv4 address for this tailscale device.
291        #[message(ctx)]
292        pub fn ipv4(
293            &self,
294            ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
295        ) -> DelegatedReply<Option<Ipv4Addr>> {
296            let (deleg, replier) = ctx.reply_sender();
297
298            if let Some(replier) = replier {
299                let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
300
301                tokio::spawn(async move {
302                    let ip = fut.await;
303                    replier.send(ip);
304                });
305            }
306
307            deleg
308        }
309
310        /// Fetch the IPv6 address for this tailscale device.
311        #[message(ctx)]
312        pub fn ipv6(
313            &self,
314            ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
315        ) -> DelegatedReply<Option<Ipv6Addr>> {
316            let (deleg, replier) = ctx.reply_sender();
317
318            if let Some(replier) = replier {
319                let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
320
321                tokio::spawn(async move {
322                    let ip = fut.await;
323                    replier.send(ip);
324                });
325            }
326
327            deleg
328        }
329
330        /// Fetch the self node for this tailscale device.
331        #[message(ctx)]
332        pub fn self_node(
333            &self,
334            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
335        ) -> DelegatedReply<Option<Node>> {
336            let (deleg, replier) = ctx.reply_sender();
337
338            if let Some(replier) = replier {
339                let node = self.with_self_node(|node| node.clone());
340
341                tokio::spawn(async move {
342                    let node = node.await;
343                    replier.send(node)
344                });
345            }
346
347            deleg
348        }
349
350        /// Fetch the current Tailscale SSH policy, if control has pushed one.
351        ///
352        /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
353        /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
354        /// for a value: an absent policy is a legitimate, immediate answer.
355        #[message]
356        pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
357            self.ssh_policy.borrow().clone()
358        }
359
360        /// Fetch the current Tailnet Lock status, if control has pushed one.
361        ///
362        /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
363        #[message]
364        pub fn current_tka_status(&self) -> Option<TkaStatus> {
365            self.tka.borrow().clone()
366        }
367
368        /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
369        ///
370        /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
371        /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
372        /// does not block waiting for a value).
373        #[message]
374        pub fn cert_domains(&self) -> Vec<String> {
375            self.cert_domains.borrow().clone()
376        }
377
378        /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
379        ///
380        /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
381        /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
382        /// for the round-trip.
383        #[message(ctx)]
384        pub fn fetch_id_token(
385            &self,
386            ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
387            audience: String,
388        ) -> DelegatedReply<Result<String, IdTokenError>> {
389            let (deleg, replier) = ctx.reply_sender();
390
391            if let Some(replier) = replier {
392                let config = self.params.config.clone();
393                let keys = self.params.env.keys.clone();
394                tokio::spawn(async move {
395                    let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
396                    replier.send(result);
397                });
398            }
399
400            deleg
401        }
402
403        /// Log this node out of the tailnet: deregister it by expiring its current node key.
404        ///
405        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
406        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
407        /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
408        /// control-plane state change only — it does NOT stop this actor or tear down the datapath
409        /// (the caller follows up with the normal runtime shutdown), and it does not touch the
410        /// on-disk node key, so re-registering with the same key is the re-login path.
411        #[message(ctx)]
412        pub fn logout(
413            &self,
414            ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
415        ) -> DelegatedReply<Result<(), LogoutError>> {
416            let (deleg, replier) = ctx.reply_sender();
417
418            if let Some(replier) = replier {
419                let config = self.params.config.clone();
420                let keys = self.params.env.keys.clone();
421                tokio::spawn(async move {
422                    let result = ts_control::logout(&config, &keys).await;
423                    replier.send(result);
424                });
425            }
426
427            deleg
428        }
429    }
430
431    // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
432    // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
433    // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
434    // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
435    // block keeps the default build clean.
436    #[cfg(feature = "acme")]
437    #[kameo::messages]
438    impl ControlRunner {
439        /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
440        /// client-side ACME DNS-01 engine (`acme` feature).
441        ///
442        /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
443        /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
444        /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
445        /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
446        ///
447        /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
448        /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
449        /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
450        /// allows it). Persisting a generated key back into the key file is the embedder's job (no
451        /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
452        /// publish 501s.
453        #[message(ctx)]
454        pub fn get_certificate(
455            &self,
456            ctx: &mut Context<
457                Self,
458                DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
459            >,
460            name: String,
461        ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
462            let (deleg, replier) = ctx.reply_sender();
463
464            if let Some(replier) = replier {
465                let config = self.params.config.clone();
466                let keys = self.params.env.keys.clone();
467                tokio::spawn(async move {
468                    let result = issue_certificate(&config, &keys, &name).await;
469                    replier.send(result);
470                });
471            }
472
473            deleg
474        }
475    }
476}
477
478/// Load or generate the ACME account key, then issue a cert for `name` via set-dns DNS-01.
479///
480/// Reuses the persisted [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when present so the
481/// same Let's Encrypt account survives renewals; otherwise generates an ephemeral per-call key
482/// (logged at debug — a new ACME account each issuance, with no write-back). Always targets Let's
483/// Encrypt production ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
484#[cfg(feature = "acme")]
485async fn issue_certificate(
486    config: &ts_control::Config,
487    keys: &ts_keys::NodeState,
488    name: &str,
489) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
490    let account_key = match keys.acme_account_key.as_deref() {
491        Some(der) => ts_control::acme::AcmeAccountKey::from_pkcs8(der)?,
492        None => {
493            tracing::debug!(
494                "no persisted ACME account key in key state; generating an ephemeral per-call key \
495                 (a new ACME account this issuance — not persisted back)"
496            );
497            ts_control::acme::AcmeAccountKey::generate()?.0
498        }
499    };
500    let directory = ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
501        .parse()
502        .map_err(|e| {
503            ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
504        })?;
505    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
506}
507
508impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
509    type Reply = ();
510
511    async fn handle(
512        &mut self,
513        msg: StreamMessage<Arc<StateUpdate>, (), ()>,
514        ctx: &mut Context<Self, Self::Reply>,
515    ) {
516        match msg {
517            StreamMessage::Started(_) => {
518                tracing::trace!("started listening to state updates");
519            }
520
521            StreamMessage::Next(msg) => {
522                if let Some(node) = msg.node.as_ref() {
523                    // Reflect node-key expiry into the device state: control delivering a self-node
524                    // whose key is in the past means the node must re-authenticate. Otherwise the
525                    // arrival of a fresh self-node confirms we are Running (recovers the state if a
526                    // prior update had flipped it to Expired).
527                    let now_unix = std::time::SystemTime::now()
528                        .duration_since(std::time::UNIX_EPOCH)
529                        .map(|d| d.as_secs() as i64)
530                        .unwrap_or(0);
531                    let next = if node.key_expired_at_unix(now_unix) {
532                        crate::DeviceState::Expired
533                    } else {
534                        crate::DeviceState::Running
535                    };
536                    // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
537                    // self-node arrives on every netmap update).
538                    self.params.state_tx.send_if_modified(|s| {
539                        if *s != next {
540                            *s = next.clone();
541                            true
542                        } else {
543                            false
544                        }
545                    });
546
547                    self.self_node.send_replace(Some(node.clone()));
548                }
549
550                if let Some(policy) = msg.ssh_policy.as_ref() {
551                    self.ssh_policy.send_replace(Some(policy.clone()));
552                }
553
554                if let Some(tka) = msg.tka.as_ref() {
555                    self.tka.send_replace(Some(tka.clone()));
556                    self.maybe_sync_tka(tka, ctx.actor_ref().clone());
557                }
558
559                // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
560                // An update with no DNS config, or one carrying no cert domains, means "none" — Go
561                // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
562                let cert_domains = msg
563                    .dns_config
564                    .as_ref()
565                    .map(|d| d.cert_domains.clone())
566                    .unwrap_or_default();
567                self.cert_domains.send_replace(cert_domains);
568
569                if let Err(e) = self.params.env.publish(msg).await {
570                    tracing::error!(error = %e, "publishing netmap update");
571                }
572            }
573
574            StreamMessage::Finished(_) => {
575                tracing::error!("state update stream terminated")
576            }
577        }
578    }
579}
580
581/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
582/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
583/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
584/// [`ControlRunner::apply_tka_synced`](ControlRunner).
585#[doc(hidden)]
586pub struct TkaSynced {
587    pub(crate) result:
588        Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
589}
590
591impl Message<TkaSynced> for ControlRunner {
592    type Reply = ();
593
594    async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
595        self.apply_tka_synced(msg.result);
596    }
597}
598
599impl Message<DerpLatencyMeasurement> for ControlRunner {
600    type Reply = ();
601
602    async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
603        let measurements = msg.measurement.as_ref().clone();
604
605        let Some(result) = measurements.first() else {
606            tracing::debug!("derp latency measurements empty");
607            return;
608        };
609
610        let iter = measurements.iter().map(|result| {
611            (
612                result.latency_map_key.as_str(),
613                result.latency.as_secs_f64(),
614            )
615        });
616
617        tracing::debug!(selected_region_id = ?result.id, "updating home region");
618
619        self.client.set_home_region(result.id, iter).await;
620    }
621}
622
623impl Message<EndpointAdvertisement> for ControlRunner {
624    type Reply = ();
625
626    async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
627        let endpoints: Vec<Endpoint> = msg
628            .endpoints
629            .iter()
630            .map(|ep| Endpoint {
631                endpoint: ep.addr,
632                ty: match ep.ty {
633                    SelfEndpointType::Local => EndpointType::Local,
634                    SelfEndpointType::Stun => EndpointType::Stun,
635                    SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
636                },
637            })
638            .collect();
639
640        tracing::debug!(
641            n_endpoints = endpoints.len(),
642            "advertising endpoints to control"
643        );
644
645        self.client.set_endpoints(endpoints).await;
646    }
647}