ts_runtime/control_runner.rs
1use core::{
2 net::{Ipv4Addr, Ipv6Addr},
3 time::Duration,
4};
5use std::sync::Arc;
6
7use futures::StreamExt;
8use kameo::{
9 actor::{ActorRef, Spawn},
10 message::{Context, StreamMessage},
11 prelude::Message,
12};
13use tokio::sync::watch;
14use ts_control::{
15 AsyncControlClient, Endpoint, EndpointType, Error as ControlError, IdTokenError, LogoutError,
16 Node, SshPolicy, StateUpdate, TkaStatus,
17};
18use ts_magicsock::SelfEndpointType;
19
20use crate::{
21 derp_latency::{DerpLatencyMeasurement, DerpLatencyMeasurer},
22 direct::EndpointAdvertisement,
23};
24
25/// Actor responsible for maintaining the connection to control.
26///
27/// This actor is responsible for proxying the map response stream onto the message bus.
28pub struct ControlRunner {
29 client: AsyncControlClient,
30 params: Params,
31
32 self_node: watch::Sender<Option<Node>>,
33 /// Latest Tailscale SSH policy pushed by control, or `None` until control sends one. The SSH
34 /// server reads this to authorize incoming connections; absent policy means deny-all.
35 ssh_policy: watch::Sender<Option<SshPolicy>>,
36 /// Latest Tailnet Lock status pushed by control, or `None` until control sends one.
37 tka: watch::Sender<Option<TkaStatus>>,
38 /// The locally-synced Tailnet-Lock state (verified `Authority` + AUM store), or `None` until a
39 /// successful bootstrap+sync. Held here because `ControlRunner` owns the netmap stream that
40 /// triggers resync. Mutated only on the actor thread (the netmap handler spawns the sync RPC and
41 /// the result returns via the [`TkaSynced`] self-message).
42 tka_synced: Option<crate::tka_sync::SyncedTka>,
43 /// Published copy of the synced TKA [`Authority`](ts_tka::Authority) for the verify-and-log
44 /// consumer. `None` until the first successful sync. Observe-only: a reader uses it to *log*
45 /// whether a peer's node-key signature verifies, never to drop a peer (enforcement is a separate
46 /// gated decision).
47 tka_authority: watch::Sender<Option<Arc<ts_tka::Authority>>>,
48 /// In-flight guard: `true` while a sync RPC task is running, so a burst of netmap updates does
49 /// not spawn overlapping syncs (Go serializes sync under `b.mu`).
50 tka_syncing: bool,
51 /// Latest cert-domain list from control's netmap DNS config (Go `nm.DNS.CertDomains`), or empty
52 /// until control sends a DNS config carrying one. The facade reads this for `Device::cert_domains`.
53 cert_domains: watch::Sender<Vec<String>>,
54}
55
56/// Control runner args.
57pub struct Params {
58 /// Control config.
59 pub(crate) config: ts_control::Config,
60
61 /// Auth key (if needed).
62 pub(crate) auth_key: Option<String>,
63
64 /// The [`crate::Env`] for this actor.
65 pub(crate) env: crate::Env,
66
67 /// Sender for the device connection-state cell. Created in [`Runtime::spawn`](crate::Runtime)
68 /// so it outlives the actor's `on_start` (which may publish [`DeviceState::Failed`] and then
69 /// return `Err`, before `Self` exists). The runtime keeps the matching `Receiver` for
70 /// [`watch_state`](crate::Runtime::watch_state) / [`wait_until_running`](crate::Runtime::wait_until_running).
71 pub(crate) state_tx: watch::Sender<crate::DeviceState>,
72}
73
74#[doc(hidden)]
75#[derive(Debug, thiserror::Error)]
76pub enum ControlRunnerError {
77 #[error(transparent)]
78 Control(#[from] ControlError),
79
80 #[error(transparent)]
81 Crate(#[from] crate::Error),
82}
83
84impl kameo::Actor for ControlRunner {
85 type Args = Params;
86 type Error = ControlRunnerError;
87
88 async fn on_start(params: Params, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
89 loop {
90 match AsyncControlClient::check_auth(
91 ¶ms.config,
92 ¶ms.env.keys,
93 params.auth_key.as_deref(),
94 )
95 .await
96 {
97 Ok(()) => break,
98 Err(ControlError::MachineNotAuthorized(u)) => {
99 tracing::info!(auth_url = %u, "please authorize this machine or pass an auth key");
100 // Surface "interactive login required" so a watcher / `wait_until_running` can
101 // tell the user to authorize, instead of seeing an opaque timeout. Registration
102 // keeps retrying (transient), so this is not a terminal `Failed`.
103 params
104 .state_tx
105 .send_replace(crate::DeviceState::NeedsLogin(u.clone()));
106 tokio::time::sleep(Duration::from_secs(5)).await;
107 }
108 Err(e) => {
109 // A hard registration failure (bad/expired/unknown auth key, etc.). Log the
110 // specific reason control gave AND publish it as a typed `Failed` state so
111 // `Device::wait_until_running` returns the actionable reason (tsr-kqj) instead
112 // of the opaque `Internal(Actor)` the caller would otherwise see once the
113 // stopped actor is next asked. Publishing before `return Err` is why the state
114 // sender lives on `Runtime`, not on `Self` (which never gets constructed here).
115 let reason = crate::RegistrationError::from(&e);
116 tracing::error!(error = %e, "registration failed; control runner stopping");
117 params
118 .state_tx
119 .send_replace(crate::DeviceState::Failed(reason));
120 return Err(e.into());
121 }
122 }
123 }
124 // check_auth succeeded, but the node is not "up" until the netmap stream is actually
125 // attached below. Publish `Running` only AFTER `attach_stream` so `wait_until_running` never
126 // resolves `Ok` for a device whose stream connect failed (which would leave a stopped actor
127 // behind). If the connect/subscribe steps fail, publish a transient `Failed` first so the
128 // waiter sees an actionable reason instead of the opaque post-mortem `Internal(Actor)`.
129 let bring_up = async {
130 let (client, stream) = AsyncControlClient::connect(
131 ¶ms.config,
132 ¶ms.env.keys,
133 params.auth_key.as_deref(),
134 )
135 .await?;
136
137 DerpLatencyMeasurer::spawn_link(&slf, params.env.clone()).await;
138
139 params.env.subscribe::<DerpLatencyMeasurement>(&slf).await?;
140 params.env.subscribe::<EndpointAdvertisement>(&slf).await?;
141 slf.attach_stream(stream.boxed(), (), ());
142 Ok::<_, ControlRunnerError>(client)
143 };
144
145 let client = match bring_up.await {
146 Ok(client) => client,
147 Err(e) => {
148 tracing::error!(error = %e, "bringing up the control session failed");
149 // The control session never came up; surface it as a transient registration
150 // failure (a retry / fresh `Device::new` may succeed) rather than leaving the state
151 // stuck at `Connecting`.
152 params.state_tx.send_replace(crate::DeviceState::Failed(
153 crate::RegistrationError::NetworkUnreachable,
154 ));
155 return Err(e);
156 }
157 };
158
159 // The netmap stream is attached: the node is up. The stream `Next` handler keeps this
160 // current (and flips to `Expired` if the self-node's key lapses).
161 params.state_tx.send_replace(crate::DeviceState::Running);
162
163 Ok(Self {
164 client,
165 params,
166 self_node: Default::default(),
167 ssh_policy: Default::default(),
168 tka: Default::default(),
169 tka_synced: None,
170 tka_authority: Default::default(),
171 tka_syncing: false,
172 cert_domains: Default::default(),
173 })
174 }
175}
176
177impl ControlRunner {
178 /// Decide whether the latest netmap's Tailnet-Lock status warrants a (re)sync and, if so, spawn
179 /// the bootstrap+sync RPC off the actor thread (so the netmap stream never blocks on a control
180 /// round-trip). The result returns via the [`TkaSynced`] self-message.
181 ///
182 /// Triggers when control reports TKA enabled (`is_enabled`) AND we are not already syncing AND
183 /// either we hold no `Authority` yet (→ bootstrap) or control's head differs from ours (→ catch
184 /// up). When TKA is disabled, clears any synced state (the lock was turned off). Mirrors Go's
185 /// `tkaSyncIfNeeded`: a no-op when our head already matches.
186 fn maybe_sync_tka(&mut self, tka: &TkaStatus, self_ref: ActorRef<Self>) {
187 if !tka.is_enabled() {
188 // Lock disabled (or never enabled): drop any synced state and stop publishing an
189 // Authority. Never an error; peers are unaffected.
190 if self.tka_synced.is_some() {
191 self.tka_synced = None;
192 self.tka_authority.send_replace(None);
193 }
194 return;
195 }
196 if self.tka_syncing {
197 return; // a sync is already in flight; the next netmap will re-trigger if still stale
198 }
199 // Up-to-date check: if we already have an Authority whose head matches control's, nothing to
200 // do. A malformed control head is treated as "different" (we'll attempt a sync, which
201 // fail-closes harmlessly).
202 if let Some(synced) = &self.tka_synced
203 && let Some(control_head) = ts_tka::AumHash::from_base32(&tka.head)
204 && synced.authority.head_matches(&control_head)
205 {
206 return;
207 }
208
209 // Spawn the sync. Move the current synced state out (the driver takes it by value and returns
210 // the advanced state); `tka_synced` stays `None` until the result lands, guarded by
211 // `tka_syncing` so we don't spawn a second concurrent sync.
212 self.tka_syncing = true;
213 let current = self.tka_synced.take();
214 let config = self.params.config.clone();
215 let keys = self.params.env.keys.clone();
216 tokio::spawn(async move {
217 let result = crate::tka_sync::sync_tka(&config, &keys, current).await;
218 // Hand the outcome back to the actor thread to apply (mutating actor state off-thread is
219 // not allowed). A send failure just means the actor is gone — nothing to do.
220 if let Err(e) = self_ref.tell(TkaSynced { result }).await {
221 tracing::debug!(error = ?e, "TKA sync result not delivered (actor gone)");
222 }
223 });
224 }
225
226 /// Apply the outcome of a spawned [`maybe_sync_tka`] task on the actor thread: store the advanced
227 /// state + publish the `Authority` (or, on inert/failed sync, leave peers unaffected). Always
228 /// clears the in-flight guard.
229 fn apply_tka_synced(
230 &mut self,
231 result: Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
232 ) {
233 self.tka_syncing = false;
234 match result {
235 Ok(Some(synced)) => {
236 tracing::info!(
237 head = %synced.authority.head().to_base32(),
238 "TKA sync succeeded; publishing verified Authority (observe-only)"
239 );
240 self.tka_authority
241 .send_replace(Some(synced.authority.clone()));
242 self.tka_synced = Some(synced);
243 }
244 Ok(None) => {
245 // Control has no lock for us (no genesis / disabled): stay inert. Not an error.
246 tracing::debug!("TKA sync: control reported no lock for this node (inert)");
247 }
248 Err(e) => {
249 // Transport or verify failure: log and stay inert. NEVER errors the netmap or drops a
250 // peer. The next netmap update re-triggers a sync attempt.
251 tracing::warn!(error = %e, "TKA sync failed; staying inert (no peer impact)");
252 }
253 }
254 }
255
256 fn with_self_node<F, R>(&self, f: F) -> impl Future<Output = Option<R>> + use<F, R>
257 where
258 F: FnOnce(&Node) -> R,
259 {
260 let mut sub = self.self_node.subscribe();
261 let mut shutdown = self.params.env.shutdown.clone();
262
263 async move {
264 tokio::select! {
265 _ = shutdown.wait_for(|x| *x) => {
266 None
267 },
268 node = sub.wait_for(Option::is_some) => {
269 Some(f(node.ok()?.as_ref()?))
270 },
271 }
272 }
273 }
274}
275
276// The `#[kameo::messages]` macro generates message structs whose fields mirror the method params;
277// those generated fields carry no doc and can't take attributes, so wrap in a module where
278// missing-docs is allowed (same pattern as PeerTracker's `msg_impl`). The generated message structs
279// are re-exported so callers keep referencing them at `control_runner::<Name>`.
280pub use msg_impl::*;
281
282#[allow(missing_docs)]
283mod msg_impl {
284 use kameo::{message::Context, reply::DelegatedReply};
285
286 use super::*;
287
288 #[kameo::messages]
289 impl ControlRunner {
290 /// Fetch the IPv4 address for this tailscale device.
291 #[message(ctx)]
292 pub fn ipv4(
293 &self,
294 ctx: &mut Context<Self, DelegatedReply<Option<Ipv4Addr>>>,
295 ) -> DelegatedReply<Option<Ipv4Addr>> {
296 let (deleg, replier) = ctx.reply_sender();
297
298 if let Some(replier) = replier {
299 let fut = self.with_self_node(|node| node.tailnet_address.ipv4.addr());
300
301 tokio::spawn(async move {
302 let ip = fut.await;
303 replier.send(ip);
304 });
305 }
306
307 deleg
308 }
309
310 /// Fetch the IPv6 address for this tailscale device.
311 #[message(ctx)]
312 pub fn ipv6(
313 &self,
314 ctx: &mut Context<Self, DelegatedReply<Option<Ipv6Addr>>>,
315 ) -> DelegatedReply<Option<Ipv6Addr>> {
316 let (deleg, replier) = ctx.reply_sender();
317
318 if let Some(replier) = replier {
319 let fut = self.with_self_node(|node| node.tailnet_address.ipv6.addr());
320
321 tokio::spawn(async move {
322 let ip = fut.await;
323 replier.send(ip);
324 });
325 }
326
327 deleg
328 }
329
330 /// Fetch the self node for this tailscale device.
331 #[message(ctx)]
332 pub fn self_node(
333 &self,
334 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
335 ) -> DelegatedReply<Option<Node>> {
336 let (deleg, replier) = ctx.reply_sender();
337
338 if let Some(replier) = replier {
339 let node = self.with_self_node(|node| node.clone());
340
341 tokio::spawn(async move {
342 let node = node.await;
343 replier.send(node)
344 });
345 }
346
347 deleg
348 }
349
350 /// Fetch the current Tailscale SSH policy, if control has pushed one.
351 ///
352 /// Returns `None` when control has not sent an SSH policy (the SSH server treats this as
353 /// deny-all — fail-closed). Unlike `self_node` this does not block waiting
354 /// for a value: an absent policy is a legitimate, immediate answer.
355 #[message]
356 pub fn current_ssh_policy(&self) -> Option<SshPolicy> {
357 self.ssh_policy.borrow().clone()
358 }
359
360 /// Fetch the current Tailnet Lock status, if control has pushed one.
361 ///
362 /// Returns `None` when control has sent no `TKAInfo` (tailnet lock not in use / no change seen).
363 #[message]
364 pub fn current_tka_status(&self) -> Option<TkaStatus> {
365 self.tka.borrow().clone()
366 }
367
368 /// The cert-eligible DNS names from control's netmap DNS config (Go `nm.DNS.CertDomains`).
369 ///
370 /// Returns an empty `Vec` when control has sent no DNS config, or one carrying no cert
371 /// domains (an empty list is a legitimate, immediate answer — like `current_ssh_policy`, this
372 /// does not block waiting for a value).
373 #[message]
374 pub fn cert_domains(&self) -> Vec<String> {
375 self.cert_domains.borrow().clone()
376 }
377
378 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
379 ///
380 /// Opens a fresh Noise channel and POSTs `/machine/id-token`; returns the signed JWT or an
381 /// [`IdTokenError`]. Runs on a spawned task (delegated reply) so the actor mailbox isn't blocked
382 /// for the round-trip.
383 #[message(ctx)]
384 pub fn fetch_id_token(
385 &self,
386 ctx: &mut Context<Self, DelegatedReply<Result<String, IdTokenError>>>,
387 audience: String,
388 ) -> DelegatedReply<Result<String, IdTokenError>> {
389 let (deleg, replier) = ctx.reply_sender();
390
391 if let Some(replier) = replier {
392 let config = self.params.config.clone();
393 let keys = self.params.env.keys.clone();
394 tokio::spawn(async move {
395 let result = ts_control::fetch_id_token(&config, &keys, &audience).await;
396 replier.send(result);
397 });
398 }
399
400 deleg
401 }
402
403 /// Log this node out of the tailnet: deregister it by expiring its current node key.
404 ///
405 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
406 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox) and
407 /// re-POSTs `/machine/register` with a past expiry over a fresh Noise channel. This is a
408 /// control-plane state change only — it does NOT stop this actor or tear down the datapath
409 /// (the caller follows up with the normal runtime shutdown), and it does not touch the
410 /// on-disk node key, so re-registering with the same key is the re-login path.
411 #[message(ctx)]
412 pub fn logout(
413 &self,
414 ctx: &mut Context<Self, DelegatedReply<Result<(), LogoutError>>>,
415 ) -> DelegatedReply<Result<(), LogoutError>> {
416 let (deleg, replier) = ctx.reply_sender();
417
418 if let Some(replier) = replier {
419 let config = self.params.config.clone();
420 let keys = self.params.env.keys.clone();
421 tokio::spawn(async move {
422 let result = ts_control::logout(&config, &keys).await;
423 replier.send(result);
424 });
425 }
426
427 deleg
428 }
429 }
430
431 // The `acme`-gated cert-issuance message lives in its own `#[kameo::messages]` impl block so the
432 // proc-macro never sees it in a non-`acme` build (a `#[cfg]` *inside* a single messages-impl
433 // block is not honored by the macro's generated dispatch — it would emit a `GetCertificate`
434 // handler calling a `get_certificate` method that the same `#[cfg]` strips). A separate gated
435 // block keeps the default build clean.
436 #[cfg(feature = "acme")]
437 #[kameo::messages]
438 impl ControlRunner {
439 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` via the
440 /// client-side ACME DNS-01 engine (`acme` feature).
441 ///
442 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): clones the control config + node keys
443 /// into a spawned task (delegated reply, so the round-trip doesn't block the mailbox), loads
444 /// or generates the ACME account key, and runs issuance against Let's Encrypt production,
445 /// publishing the DNS-01 challenge TXT through the node's `POST /machine/set-dns` RPC.
446 ///
447 /// The account key is loaded from [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) when
448 /// present, so the same ACME account persists across renewals; otherwise an ephemeral key is
449 /// generated for this call only (a fresh ACME account each issuance — acceptable for v1; LE
450 /// allows it). Persisting a generated key back into the key file is the embedder's job (no
451 /// write-back path here). SaaS-only: against a self-hosted control plane the set-dns
452 /// publish 501s.
453 #[message(ctx)]
454 pub fn get_certificate(
455 &self,
456 ctx: &mut Context<
457 Self,
458 DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>>,
459 >,
460 name: String,
461 ) -> DelegatedReply<Result<ts_control::tls::CertifiedKey, ts_control::CertError>> {
462 let (deleg, replier) = ctx.reply_sender();
463
464 if let Some(replier) = replier {
465 let config = self.params.config.clone();
466 let keys = self.params.env.keys.clone();
467 tokio::spawn(async move {
468 let result = issue_certificate(&config, &keys, &name).await;
469 replier.send(result);
470 });
471 }
472
473 deleg
474 }
475 }
476}
477
/// Obtain a certificate for `name` through set-dns DNS-01, using a loaded or freshly generated
/// ACME account key.
///
/// If [`ts_keys::NodeState::acme_account_key`] (PKCS#8 DER) is present it is reused, so the same
/// Let's Encrypt account survives renewals. Otherwise an ephemeral key is generated for this call
/// (logged at debug — a new ACME account each issuance, with no write-back). The directory is
/// always Let's Encrypt production
/// ([`ts_control::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY`]).
///
/// # Errors
///
/// Returns a [`ts_control::CertError`] if the account key cannot be loaded or generated, if the
/// directory URL fails to parse, or if issuance itself fails.
#[cfg(feature = "acme")]
async fn issue_certificate(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    name: &str,
) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
    use ts_control::acme::{AcmeAccountKey, LETS_ENCRYPT_PRODUCTION_DIRECTORY};

    // Resolve the account key first, so its errors take precedence over a directory parse error.
    let account_key = if let Some(der) = keys.acme_account_key.as_deref() {
        AcmeAccountKey::from_pkcs8(der)?
    } else {
        tracing::debug!(
            "no persisted ACME account key in key state; generating an ephemeral per-call key \
             (a new ACME account this issuance — not persisted back)"
        );
        AcmeAccountKey::generate()?.0
    };

    let directory = LETS_ENCRYPT_PRODUCTION_DIRECTORY.parse().map_err(|e| {
        ts_control::CertError::Acme(format!("parsing Let's Encrypt directory URL: {e}"))
    })?;

    ts_control::issue_certificate_via_setdns(config, keys, name, &account_key, &directory).await
}
507
508impl Message<StreamMessage<Arc<StateUpdate>, (), ()>> for ControlRunner {
509 type Reply = ();
510
511 async fn handle(
512 &mut self,
513 msg: StreamMessage<Arc<StateUpdate>, (), ()>,
514 ctx: &mut Context<Self, Self::Reply>,
515 ) {
516 match msg {
517 StreamMessage::Started(_) => {
518 tracing::trace!("started listening to state updates");
519 }
520
521 StreamMessage::Next(msg) => {
522 if let Some(node) = msg.node.as_ref() {
523 // Reflect node-key expiry into the device state: control delivering a self-node
524 // whose key is in the past means the node must re-authenticate. Otherwise the
525 // arrival of a fresh self-node confirms we are Running (recovers the state if a
526 // prior update had flipped it to Expired).
527 let now_unix = std::time::SystemTime::now()
528 .duration_since(std::time::UNIX_EPOCH)
529 .map(|d| d.as_secs() as i64)
530 .unwrap_or(0);
531 let next = if node.key_expired_at_unix(now_unix) {
532 crate::DeviceState::Expired
533 } else {
534 crate::DeviceState::Running
535 };
536 // `send_if_modified` avoids waking watchers when the state is unchanged (a fresh
537 // self-node arrives on every netmap update).
538 self.params.state_tx.send_if_modified(|s| {
539 if *s != next {
540 *s = next.clone();
541 true
542 } else {
543 false
544 }
545 });
546
547 self.self_node.send_replace(Some(node.clone()));
548 }
549
550 if let Some(policy) = msg.ssh_policy.as_ref() {
551 self.ssh_policy.send_replace(Some(policy.clone()));
552 }
553
554 if let Some(tka) = msg.tka.as_ref() {
555 self.tka.send_replace(Some(tka.clone()));
556 self.maybe_sync_tka(tka, ctx.actor_ref().clone());
557 }
558
559 // Track the cert-domain list from the netmap DNS config (Go `nm.DNS.CertDomains`).
560 // An update with no DNS config, or one carrying no cert domains, means "none" — Go
561 // reads an empty slice off an absent config too, so mirror that as an empty `Vec`.
562 let cert_domains = msg
563 .dns_config
564 .as_ref()
565 .map(|d| d.cert_domains.clone())
566 .unwrap_or_default();
567 self.cert_domains.send_replace(cert_domains);
568
569 if let Err(e) = self.params.env.publish(msg).await {
570 tracing::error!(error = %e, "publishing netmap update");
571 }
572 }
573
574 StreamMessage::Finished(_) => {
575 tracing::error!("state update stream terminated")
576 }
577 }
578 }
579}
580
581/// The outcome of a spawned TKA bootstrap+sync task, delivered back to the actor thread so the
582/// result can be applied to actor state (which a spawned task cannot touch directly). Sent by
583/// [`ControlRunner::maybe_sync_tka`]; handled by applying via
584/// [`ControlRunner::apply_tka_synced`](ControlRunner).
585#[doc(hidden)]
586pub struct TkaSynced {
587 pub(crate) result:
588 Result<Option<crate::tka_sync::SyncedTka>, crate::tka_sync::TkaSyncDriverError>,
589}
590
591impl Message<TkaSynced> for ControlRunner {
592 type Reply = ();
593
594 async fn handle(&mut self, msg: TkaSynced, _ctx: &mut Context<Self, Self::Reply>) {
595 self.apply_tka_synced(msg.result);
596 }
597}
598
599impl Message<DerpLatencyMeasurement> for ControlRunner {
600 type Reply = ();
601
602 async fn handle(&mut self, msg: DerpLatencyMeasurement, _ctx: &mut Context<Self, Self::Reply>) {
603 let measurements = msg.measurement.as_ref().clone();
604
605 let Some(result) = measurements.first() else {
606 tracing::debug!("derp latency measurements empty");
607 return;
608 };
609
610 let iter = measurements.iter().map(|result| {
611 (
612 result.latency_map_key.as_str(),
613 result.latency.as_secs_f64(),
614 )
615 });
616
617 tracing::debug!(selected_region_id = ?result.id, "updating home region");
618
619 self.client.set_home_region(result.id, iter).await;
620 }
621}
622
623impl Message<EndpointAdvertisement> for ControlRunner {
624 type Reply = ();
625
626 async fn handle(&mut self, msg: EndpointAdvertisement, _ctx: &mut Context<Self, Self::Reply>) {
627 let endpoints: Vec<Endpoint> = msg
628 .endpoints
629 .iter()
630 .map(|ep| Endpoint {
631 endpoint: ep.addr,
632 ty: match ep.ty {
633 SelfEndpointType::Local => EndpointType::Local,
634 SelfEndpointType::Stun => EndpointType::Stun,
635 SelfEndpointType::Stun4LocalPort => EndpointType::Stun4LocalPort,
636 },
637 })
638 .collect();
639
640 tracing::debug!(
641 n_endpoints = endpoints.len(),
642 "advertising endpoints to control"
643 );
644
645 self.client.set_endpoints(endpoints).await;
646 }
647}