ts_control/tokio/client.rs
1use alloc::{collections::BTreeMap, sync::Arc};
2
3use futures_util::{Stream, StreamExt};
4use tokio::{
5 sync::{broadcast, mpsc, watch},
6 task::JoinSet,
7};
8use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
9use url::Url;
10
11use crate::{
12 ControlDialer, Error,
13 map_request_builder::MapRequestBuilder,
14 tokio::{
15 map_stream::{StateUpdate, map_stream, send_map_request},
16 ping::handle_ping,
17 },
18};
19
20/// A client to communicate with control.
21#[derive(Debug)]
22pub struct AsyncControlClient {
23 base_url: Url,
24 state_tx: broadcast::Sender<Arc<StateUpdate>>,
25 command_tx: mpsc::Sender<Command>,
26 _tasks: JoinSet<()>,
27}
28
29impl AsyncControlClient {
30 /// Check whether it is possible to login with the given config, node keys, and auth
31 /// key.
32 pub async fn check_auth(
33 config: &crate::Config,
34 node_keys: &ts_keys::NodeState,
35 auth_key: Option<&str>,
36 ) -> Result<(), Error> {
37 let control_url = &config.server_url;
38
39 let h2_client = crate::tokio::connect(
40 control_url,
41 &node_keys.machine_keys,
42 config.allow_http_key_fetch,
43 )
44 .await?;
45
46 crate::tokio::register(config, control_url, auth_key, node_keys, &h2_client).await?;
47
48 Ok(())
49 }
50
51 /// Connects to the control plane, registers this Tailscale node, and starts handling the
52 /// message stream from control.
53 ///
54 /// The second element of the return value is a netmap stream which started listening
55 /// _before_ the client connected, i.e. it will not miss any updates from control.
56 ///
57 /// `auth_url_tx` is the embedder-owned "current pending re-auth URL" cell: if the live
58 /// map-poll loop hits a mid-session re-auth (control returns
59 /// [`MachineNotAuthorized`](crate::Error::MachineNotAuthorized) on a re-register because the
60 /// node key expired or was revoked), `run` publishes that URL here without tearing the loop
61 /// down, so the embedder can prompt the user to re-authorize while registration keeps retrying.
62 /// The caller creates the channel and keeps the [`Receiver`](watch::Receiver) (this crate must
63 /// not depend on the embedder's device-state types, so the cell carries a bare `Option<Url>`).
64 #[tracing::instrument(skip_all, fields(control_url = %config.server_url))]
65 pub async fn connect(
66 config: &crate::Config,
67 node_keys: &ts_keys::NodeState,
68 auth_key: Option<&str>,
69 auth_url_tx: watch::Sender<Option<Url>>,
70 ) -> Result<
71 (
72 Self,
73 impl Stream<Item = Arc<StateUpdate>> + Send + Sync + use<>,
74 ),
75 Error,
76 > {
77 let control_url = &config.server_url;
78 let mut tasks = JoinSet::new();
79
80 let h2_client = crate::tokio::connect(
81 control_url,
82 &node_keys.machine_keys,
83 config.allow_http_key_fetch,
84 )
85 .await?;
86 tracing::info!("connected to control, registering");
87
88 crate::tokio::register(config, control_url, auth_key, node_keys, &h2_client).await?;
89
90 tracing::info!("registered, starting netmap stream");
91
92 let (state_tx, state_rx) = broadcast::channel(32);
93 let (command_tx, command_rx) = mpsc::channel(32);
94
95 tasks.spawn({
96 let state_tx = state_tx.clone();
97 let control_url = control_url.clone();
98 let node_keys = node_keys.clone();
99 let auth_key = auth_key.map(ToOwned::to_owned);
100 let config = config.clone();
101
102 async move {
103 run(
104 state_tx,
105 command_rx,
106 control_url.clone(),
107 node_keys.clone(),
108 auth_key,
109 config,
110 auth_url_tx,
111 )
112 .await
113 }
114 });
115
116 Ok((
117 Self {
118 base_url: control_url.clone(),
119 state_tx,
120 command_tx,
121 _tasks: tasks,
122 },
123 netmap_stream(state_rx),
124 ))
125 }
126
127 /// Set the DERP home region for this node.
128 #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), %region_id), level = "trace")]
129 pub async fn set_home_region<'c>(
130 &mut self,
131 region_id: ts_derp::RegionId,
132 latencies: impl IntoIterator<Item = (&'c str, f64)>,
133 ) {
134 tracing::trace!(region = %region_id, "reporting home derp to control server");
135
136 if let Err(e) = self
137 .command_tx
138 .send(Command::SetDerpHomeRegion {
139 id: region_id,
140 latencies: latencies
141 .into_iter()
142 .map(|(name, sample)| (name.to_owned(), sample))
143 .collect(),
144 })
145 .await
146 {
147 tracing::error!(error = %e, "setting home derp region");
148 }
149 }
150
151 /// Advertise this node's magicsock UDP endpoints (ip:port candidates) to the control server
152 /// so peers can learn where to attempt direct connections.
153 #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), n_endpoints), level = "trace")]
154 pub async fn set_endpoints(&mut self, endpoints: Vec<ts_control_serde::Endpoint>) {
155 tracing::Span::current().record("n_endpoints", endpoints.len());
156 tracing::trace!("reporting magicsock endpoints to control server");
157
158 if let Err(e) = self
159 .command_tx
160 .send(Command::SetEndpoints { endpoints })
161 .await
162 {
163 tracing::error!(error = %e, "setting endpoints");
164 }
165 }
166
167 /// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control mid-session
168 /// — the wire half of a runtime `set_advertise_routes`. `routes` is the final advertised set
169 /// (already filtered); it is sent on the live map-poll connection without tearing down the
170 /// long-poll, exactly like [`set_endpoints`](Self::set_endpoints).
171 #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), n_routes = routes.len()), level = "trace")]
172 pub async fn set_routable_ips(&mut self, routes: Vec<ipnet::IpNet>) {
173 tracing::trace!("reporting routable IPs to control server");
174
175 if let Err(e) = self
176 .command_tx
177 .send(Command::SetRoutableIPs { routes })
178 .await
179 {
180 tracing::error!(error = %e, "setting routable IPs");
181 }
182 }
183
184 /// Update this node's `Hostinfo.Hostname` to `hostname` at control mid-session — the wire half of
185 /// a runtime `set_hostname`. Sent on the live map-poll connection without tearing down the
186 /// long-poll, exactly like [`set_routable_ips`](Self::set_routable_ips).
187 #[tracing::instrument(skip_all, fields(map_url = %self.map_url()), level = "trace")]
188 pub async fn set_hostname(&mut self, hostname: String) {
189 tracing::trace!("reporting hostname to control server");
190
191 if let Err(e) = self
192 .command_tx
193 .send(Command::SetHostname { hostname })
194 .await
195 {
196 tracing::error!(error = %e, "setting hostname");
197 }
198 }
199
200 /// Construct the URL that should be used to fetch the netmap.
201 pub fn map_url(&self) -> Url {
202 self.base_url
203 .join("machine/map")
204 .expect("map_url was parsed without issue before")
205 }
206
207 /// Get a stream of all netmap updates.
208 pub fn netmap_stream(&self) -> impl Stream<Item = Arc<StateUpdate>> + Send + Sync + use<> {
209 netmap_stream(self.state_tx.subscribe())
210 }
211}
212
213// Every variant is a "set X on the next map request" command, so they all legitimately share the
214// `Set` prefix (each mirrors a control-side field a side MapRequest carries). The shared prefix is
215// the point, not an accident — silence the variant-name lint rather than rename to something less
216// clear.
217#[allow(clippy::enum_variant_names)]
218#[derive(Debug)]
219pub enum Command {
220 SetDerpHomeRegion {
221 id: ts_derp::RegionId,
222 latencies: BTreeMap<String, f64>,
223 },
224 SetEndpoints {
225 endpoints: Vec<ts_control_serde::Endpoint>,
226 },
227 /// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) mid-session — the wire
228 /// half of a runtime `set_advertise_routes`. The routes travel IN the command (not read from the
229 /// run-loop's frozen `config` clone), already filtered to the final advertised set the caller
230 /// wants control to see.
231 SetRoutableIPs { routes: Vec<ipnet::IpNet> },
232 /// Update this node's `Hostinfo.Hostname` mid-session — the wire half of a runtime
233 /// `set_hostname`. The hostname travels IN the command (the run-loop's `config` clone is frozen,
234 /// so a runtime change can only reach here through the command). Hostname is display-only, so
235 /// there is no local/dataplane half; control reflects the new name on the next netmap.
236 SetHostname { hostname: String },
237}
238
/// Resume cursor for the map-poll delta stream.
///
/// Control hands out `handle` in the first [`MapResponse`] of a session and numbers subsequent
/// responses with an increasing `seq`. After a reconnect we offer back the last `(handle, seq)` we
/// processed; control either resumes after `seq` or ignores the offer and starts a new session
/// with a full netmap. Both outcomes are safe (see [`MapRequestBuilder::map_session`]). The
/// default value (empty handle, `seq == 0`) means "no session yet".
#[derive(Default, Clone)]
struct MapSession {
    /// Opaque session token from control; empty until one has been accepted.
    handle: String,
    /// Sequence number of the last substantive response processed in this session.
    seq: i64,
}
249
/// Longest control-supplied session handle (in bytes) that we are willing to store and echo back.
/// The handle is opaque to us; capping it bounds memory use and limits what control can inject
/// into our logs.
const MAX_SESSION_HANDLE_LEN: usize = 1 << 8;
253
254/// Advance the resume cursor from a freshly received [`StateUpdate`]. The handle is assigned once
255/// (first response of a session); `seq` advances on substantive responses and is 0 on keep-alives.
256///
257/// If control issues a *new* handle (a fresh session), `seq` is reset to 0 so we never carry a
258/// stale cursor from the prior session into the new one. A control-supplied handle that is empty,
259/// over [`MAX_SESSION_HANDLE_LEN`], or contains non-`ascii_graphic` bytes is rejected (the cursor
260/// is left unchanged) to bound memory and prevent log injection.
261fn advance_session(session: &mut MapSession, update: &StateUpdate) {
262 if let Some(handle) = &update.session_handle {
263 let valid = !handle.is_empty()
264 && handle.len() <= MAX_SESSION_HANDLE_LEN
265 && handle.bytes().all(|b| b.is_ascii_graphic());
266 if valid && *handle != session.handle {
267 session.handle = handle.clone();
268 session.seq = 0;
269 } else if !valid {
270 tracing::warn!(
271 handle_len = handle.len(),
272 "control sent an invalid map-session handle; ignoring it"
273 );
274 }
275 }
276 if update.seq != 0 {
277 session.seq = update.seq;
278 }
279}
280
/// Reconnect backoff for the map-poll loop, modelled on Go's `util/backoff` as used by
/// `controlclient`'s `mapRoutine`.
///
/// The delay grows as `n²·10ms`, is clamped to [`MAP_BACKOFF_MAX`], and is then jittered by a
/// uniform factor in `[0.5, 1.5)`. The jitter keeps a fleet of clients from reconnecting in
/// lock-step to a control server that has just recovered. `n` counts consecutive polls that failed
/// or delivered nothing, and drops back to 0 once a poll actually delivers a response. A flaky
/// control plane is therefore retried at growing intervals, rather than in a flat 2 Hz storm or
/// (on the clean-EOF path) an unthrottled hot loop.
///
/// `ts_runtime`'s `DerpBackoff` has the same shape. It is duplicated rather than shared because
/// `ts_control` sits upstream of `ts_runtime` and cannot depend on it, and because the cap
/// differs: Go passes `30*time.Second` to `NewBackoff` for `mapRoutine` and `5s` for the DERP
/// readers.
///
/// Known residual, kept on purpose for Go parity: *any* received frame resets the schedule,
/// including a bare keep-alive (`seq == 0`). A server that sends one frame and then closes the
/// body can therefore hold the backoff at the bottom and force a reconnect every cycle. Go's
/// `mapRoutine` behaves identically: it resets on any received `MapResponse`, has no cap on
/// consecutive reconnects, and relies on the node's existing machine-key relationship with
/// control. Before this fix the loop busy-spun with no handshake at all. Now each cycle costs a
/// full connect→TLS→Noise→register, which is symmetric in cost to the attacker. Resetting only on
/// `seq != 0` would instead penalise a healthy, keep-alive-only idle poll, so it is deliberately
/// not done.
#[derive(Default, Debug)]
struct ControlBackoff {
    /// Number of consecutive unproductive polls; the exponent-like step of the schedule.
    n: u32,
}
307
/// Upper bound on the unjittered map-poll reconnect delay: 30 seconds, the value Go's
/// `controlclient` passes to `NewBackoff` for `mapRoutine`.
const MAP_BACKOFF_MAX: core::time::Duration = core::time::Duration::from_millis(30_000);
311
312impl ControlBackoff {
313 /// Reset the backoff after a poll that actually received a response, so the next failure starts
314 /// from the bottom of the schedule again. Crucially this is driven by *receiving a frame*, not
315 /// by the poll merely ending: a control server that accepts the request then closes the body
316 /// with zero frames never resets, so the clean-EOF path still backs off and escalates.
317 fn reset(&mut self) {
318 self.n = 0;
319 }
320
321 /// The next backoff delay, advancing the counter. `n²·10ms` capped at [`MAP_BACKOFF_MAX`], then
322 /// scaled by a random factor in `[0.5, 1.5)` (matching Go's `rand.Float64()+0.5`).
323 fn next_delay(&mut self, rng: &mut impl rand::RngExt) -> core::time::Duration {
324 // n² growth on a 10ms base, saturating so a long outage can't overflow the multiply.
325 let base_ms = u64::from(self.n)
326 .saturating_mul(u64::from(self.n))
327 .saturating_mul(10);
328 let capped = core::time::Duration::from_millis(base_ms).min(MAP_BACKOFF_MAX);
329 self.n = self.n.saturating_add(1);
330 let factor = rng.random::<f64>() + 0.5;
331 capped.mul_f64(factor)
332 }
333}
334
335/// Decide how long to wait before the next map-poll reconnect, resetting the schedule when the poll
336/// made progress. This is the **single, tested site of the load-bearing anti-DoS gate**: a poll
337/// that delivered at least one frame (`received_frame`) proves the whole connect→register→poll path
338/// works, so it resets the backoff and the next reconnect is immediate (Go resets its backoff on a
339/// received netmap); a poll that delivered **zero** frames — a clean-EOF hot-loop, a watchdog kill,
340/// or a frame the stream swallowed to `None` — does **not** reset, so a zero-progress control server
341/// escalates up the `n²·10ms` schedule instead of being hammered at full speed.
342///
343/// The gate lives in this named function rather than as a bare `backoff.reset()` buried in the poll
344/// loop precisely so it cannot be silently relocated: moving the reset onto the poll-*end* path
345/// (e.g. resetting unconditionally on `Ok(())`) would reintroduce the clean-EOF hot loop, and
346/// [`reconnect_delay_resets_only_when_a_frame_arrived`] would fail. The reset granularity is
347/// observationally identical to resetting the instant a frame arrives: the backoff is only ever
348/// read here (after the poll returns), so deferring the reset to this point changes nothing the
349/// schedule can observe.
350fn reconnect_delay_after_poll(
351 received_frame: bool,
352 backoff: &mut ControlBackoff,
353 rng: &mut impl rand::RngExt,
354) -> core::time::Duration {
355 if received_frame {
356 backoff.reset();
357 }
358 backoff.next_delay(rng)
359}
360
361/// Surface a mid-session re-auth URL to the embedder without disturbing the retry loop.
362///
363/// On a live map-poll re-register, control returning [`Error::MachineNotAuthorized`] means the
364/// node key lapsed (expiry/revoke) and the user must re-authorize at the carried URL. Unlike the
365/// initial-registration path (which the runtime's `check_auth` loop already surfaces), the live
366/// `run` loop only logs and backs off, dropping the URL — so we publish it into the
367/// embedder-owned `auth_url_tx` cell here (→ the runtime maps it to its "needs login" state). The
368/// caller still propagates the error so `run` backs off and retries; a later successful
369/// re-register clears the state for free (Go's `authRoutine` keeps `urlToVisit` and keeps polling).
370///
371/// **Only `MachineNotAuthorized` sets the cell.** `MachineNotAuthorized(None)` (no auth URL on
372/// offer) maps upstream to [`Error::Internal`]`(MachineAuthorization, _)`, not this variant, so it
373/// correctly does *not* set a (nonexistent) URL. The write is sticky via
374/// [`send_if_modified`](watch::Sender::send_if_modified): the cell is updated only when the URL
375/// actually differs from its current value, so a re-auth URL that persists across several failed
376/// re-register attempts does not thrash the cell or wake the runtime's bridge spuriously.
377///
378/// Factored out of [`run_once`] so this classify-then-surface decision is unit-testable against a
379/// plain `watch` channel without the real network round-trip [`crate::tokio::register`] performs.
380fn surface_reauth_url(err: &Error, auth_url_tx: &watch::Sender<Option<Url>>) {
381 if let Error::MachineNotAuthorized(url) = err {
382 auth_url_tx.send_if_modified(|current| {
383 if current.as_ref() == Some(url) {
384 false
385 } else {
386 *current = Some(url.clone());
387 true
388 }
389 });
390 }
391}
392
393/// Clear any pending re-auth URL (set the cell back to `None`), used when a re-register succeeds or
394/// a poll delivers a frame — both prove the node is authorized again so the surfaced URL is stale.
395/// Sticky `send_if_modified` so an already-`None` cell never wakes the runtime bridge. Clearing at
396/// register-success (rather than only at stream end) is what prevents a recovering poll from leaving
397/// a stale `Some(url)` for the bridge to re-read and clobber the netmap's `Running` flip with.
398fn clear_reauth_url(auth_url_tx: &watch::Sender<Option<Url>>) {
399 auth_url_tx.send_if_modified(|current| {
400 if current.is_some() {
401 *current = None;
402 true
403 } else {
404 false
405 }
406 });
407}
408
409pub async fn run(
410 state_tx: broadcast::Sender<Arc<StateUpdate>>,
411 mut command_rx: mpsc::Receiver<Command>,
412 control_url: Url,
413 node_keys: ts_keys::NodeState,
414 auth_key: Option<String>,
415 config: crate::Config,
416 auth_url_tx: watch::Sender<Option<Url>>,
417) {
418 let mut dialer = ControlDialer::default();
419 let mut session = MapSession::default();
420 let mut backoff = ControlBackoff::default();
421
422 loop {
423 // `run_once` sets this to `true` the moment it receives its first frame on this poll, so
424 // the flag survives an error that occurs *after* frames flowed (a poll that worked then
425 // dropped still counts as progress and reconnects promptly).
426 let mut received_frame = false;
427 let outcome = run_once(
428 &state_tx,
429 &mut command_rx,
430 &control_url,
431 &node_keys,
432 auth_key.as_deref(),
433 &config,
434 &mut dialer,
435 &mut session,
436 &mut received_frame,
437 &auth_url_tx,
438 )
439 .await;
440
441 // A poll that delivered any frame proves the connect→register→poll path works again, so a
442 // re-auth URL surfaced by an earlier failed re-register is stale: clear the cell. The
443 // primary clear is at register-success above (so the cell empties before the bridge can
444 // re-read a stale `Some(url)` on recovery); this is a secondary clear for the case where the
445 // stream itself delivered frames after a register that did not need re-auth. Sticky
446 // `send_if_modified` so we never wake the bridge unless the cell actually changes.
447 if received_frame {
448 clear_reauth_url(&auth_url_tx);
449 }
450
451 // Back off before every reconnect, on BOTH the clean-EOF and error paths — Go's
452 // `mapRoutine` runs `bo.BackOff(ctx, err)` after every poll regardless of how it ended.
453 // The clean-EOF arm (`Ok(())`) previously reconnected with ZERO delay: a control server
454 // that returns 200 then closes the body (or sends one frame the stream swallows to `None`)
455 // would spin a full-speed connect→TLS→Noise→register loop, hammering control and pinning
456 // CPU. The reset is gated on `received_frame` (see `reconnect_delay_after_poll`), so a
457 // healthy long-lived poll that delivered frames reconnects promptly while a zero-progress
458 // server escalates up the n²·10ms schedule.
459 let delay = reconnect_delay_after_poll(received_frame, &mut backoff, &mut rand::rng());
460 match outcome {
461 Ok(()) => {
462 tracing::warn!(
463 resume_handle = %session.handle,
464 resume_seq = session.seq,
465 backoff_ms = delay.as_millis() as u64,
466 "netmap stream ended without error, attempting restart"
467 );
468 }
469 Err(e) => {
470 tracing::error!(
471 error = %e,
472 resume_handle = %session.handle,
473 resume_seq = session.seq,
474 backoff_ms = delay.as_millis() as u64,
475 "netmap stream failed, attempting restart"
476 );
477 }
478 }
479 tokio::time::sleep(delay).await;
480 }
481}
482
483async fn run_once(
484 state_tx: &broadcast::Sender<Arc<StateUpdate>>,
485 command_rx: &mut mpsc::Receiver<Command>,
486 control_url: &Url,
487 node_keys: &ts_keys::NodeState,
488 auth_key: Option<&str>,
489 config: &crate::Config,
490 control_dialer: &mut ControlDialer,
491 session: &mut MapSession,
492 received_frame: &mut bool,
493 auth_url_tx: &watch::Sender<Option<Url>>,
494) -> Result<(), Error> {
495 let h2_client = control_dialer
496 .full_connect_next(
497 control_url,
498 &node_keys.machine_keys,
499 config.allow_http_key_fetch,
500 )
501 .await?;
502
503 // Re-register on every reconnect. On a mid-session re-auth (key expiry/revoke) control answers
504 // `MachineNotAuthorized(Some(url))`: surface that URL to the embedder (→ "needs login") via
505 // `surface_reauth_url`, then still propagate the error so `run` backs off and retries — Go's
506 // `authRoutine` keeps the URL and keeps polling, and a later successful re-register recovers.
507 match crate::tokio::register(config, control_url, auth_key, node_keys, &h2_client).await {
508 Ok(()) => {
509 // Re-register succeeded — clear any pending re-auth URL NOW (not at stream end), so a
510 // recovering poll empties the cell BEFORE the runtime bridge can wake and re-read a
511 // stale `Some(url)`. Without this, the bridge could clobber the netmap's `Running` flip
512 // back to `NeedsLogin` on recovery (a recovered node would show "needs login" until the
513 // next keep-alive).
514 clear_reauth_url(auth_url_tx);
515 }
516 Err(e) => {
517 let err = Error::from(e);
518 surface_reauth_url(&err, auth_url_tx);
519 return Err(err);
520 }
521 }
522
523 let client_name = config.format_client_name();
524 // Advertise-side VIP services: hash the validated hosted-service set into
525 // `HostInfo.ServicesHash`. Empty config -> empty hash -> wire field omitted (unchanged behavior).
526 let advertised_vip_services = config.advertised_vip_services();
527 let services_hash = crate::services_hash(&advertised_vip_services);
528 let builder = MapRequestBuilder::new(node_keys)
529 .keep_alive(true)
530 .omit_peers(false)
531 .stream(true)
532 .routable_ips(config.advertised_routes())
533 .client_info(&client_name, crate::PKG_VERSION)
534 .request_tags(config.tags.iter().map(String::as_str))
535 .services(config.advertised_services())
536 .services_hash(&services_hash)
537 .wire_ingress(config.wire_ingress)
538 .ingress_enabled(
539 config
540 .ingress_active
541 .load(core::sync::atomic::Ordering::Relaxed),
542 )
543 .map_session(&session.handle, session.seq);
544
545 let request = if let Some(hostname) = &config.hostname {
546 builder.hostname(hostname)
547 } else {
548 builder
549 }
550 .build();
551
552 let map_url = control_url.join("machine/map").unwrap();
553
554 let reader = send_map_request(request, &map_url, &h2_client).await?;
555
556 let mut stream = core::pin::pin!(map_stream(reader));
557 tracing::info!("netmap stream started");
558
559 loop {
560 tokio::select! {
561 state_update = stream.next() => {
562 let Some(state_update) = state_update else {
563 break;
564 };
565
566 // A frame arrived, so the full connect→register→poll path is demonstrably working:
567 // record it so `run` resets the reconnect backoff (Go resets on a received netmap).
568 // This is what makes the clean-EOF backoff in `run` safe — a server that delivers
569 // frames and later drops reconnects promptly, while one that closes the body with
570 // zero frames never reaches here and keeps escalating. Keep-alives (seq 0) count
571 // too: they prove the long poll is live. The reset decision itself lives in
572 // `reconnect_delay_after_poll` (the single tested gate); here we only flag progress.
573 *received_frame = true;
574
575 // Track the session cursor so a reconnect can resume after the last processed
576 // message instead of cold-restarting.
577 advance_session(session, &state_update);
578
579 let _ = handle_ping(&state_update, control_url, &h2_client, config).await;
580
581 if let Some(dial_plan) = &state_update.dial_plan
582 && control_dialer.update_dial_plan(dial_plan)
583 {
584 tracing::trace!(new_dial_plan = ?dial_plan);
585 }
586
587 // This errors only if there are no receivers. That's not semantically an error for
588 // us, so just ignore it.
589 let _ignore = state_tx.send(Arc::new(state_update));
590 }
591
592 command = command_rx.recv() => {
593 match command.unwrap() {
594 Command::SetDerpHomeRegion { id, latencies } => {
595 let mut builder = MapRequestBuilder::new(node_keys)
596 .keep_alive(false)
597 .omit_peers(true)
598 .stream(false)
599 .routable_ips(config.advertised_routes())
600 .preferred_derp(id)
601 .derp_latencies(latencies.iter().map(|(k, v)| (k.as_str(), *v)));
602
603 if let Some(hostname) = &config.hostname {
604 builder = builder.hostname(hostname);
605 }
606 let req = builder.build();
607
608 drop(send_map_request(req, &map_url, &h2_client).await?);
609 },
610 Command::SetEndpoints { endpoints } => {
611 let mut builder = MapRequestBuilder::new(node_keys)
612 .keep_alive(false)
613 .omit_peers(true)
614 .stream(false)
615 .routable_ips(config.advertised_routes())
616 .endpoints(endpoints);
617
618 if let Some(hostname) = &config.hostname {
619 builder = builder.hostname(hostname);
620 }
621 let req = builder.build();
622
623 drop(send_map_request(req, &map_url, &h2_client).await?);
624 },
625 Command::SetRoutableIPs { routes } => {
626 // The routes come from the command payload, NOT `config.advertised_routes()`:
627 // `config` is a frozen clone captured when this loop started, so a runtime
628 // route change can only reach here through the command itself.
629 let mut builder = MapRequestBuilder::new(node_keys)
630 .keep_alive(false)
631 .omit_peers(true)
632 .stream(false)
633 .routable_ips(routes);
634
635 if let Some(hostname) = &config.hostname {
636 builder = builder.hostname(hostname);
637 }
638 let req = builder.build();
639
640 drop(send_map_request(req, &map_url, &h2_client).await?);
641 },
642 Command::SetHostname { hostname } => {
643 // The hostname comes from the command payload, NOT `config.hostname`: the
644 // run-loop's `config` is a frozen clone, so a runtime hostname change can only
645 // reach here through the command. Preserve the advertised routes on this
646 // request so a hostname update doesn't transiently withdraw them.
647 let req = MapRequestBuilder::new(node_keys)
648 .keep_alive(false)
649 .omit_peers(true)
650 .stream(false)
651 .routable_ips(config.advertised_routes())
652 .hostname(&hostname)
653 .build();
654
655 drop(send_map_request(req, &map_url, &h2_client).await?);
656 },
657 }
658 }
659 }
660 }
661
662 Ok(())
663}
664
665fn netmap_stream(
666 rx: broadcast::Receiver<Arc<StateUpdate>>,
667) -> impl Stream<Item = Arc<StateUpdate>> + Send + Sync {
668 tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(async |x| {
669 if let Err(BroadcastStreamRecvError::Lagged(n)) = &x {
670 tracing::warn!(messages_missed = n, "map_stream lagged");
671 }
672
673 x.ok()
674 })
675}
676
677#[cfg(test)]
678mod tests {
679 use super::*;
680
681 fn update(handle: Option<&str>, seq: i64) -> StateUpdate {
682 StateUpdate {
683 session_handle: handle.map(ToOwned::to_owned),
684 seq,
685 derp: None,
686 node: None,
687 peer_update: None,
688 peer_patches: Vec::new(),
689 user_profiles: Vec::new(),
690 ping: None,
691 packetfilter: None,
692 cap_grants: None,
693 pop_browser_url: None,
694 dial_plan: None,
695 dns_config: None,
696 ssh_policy: None,
697 tka: None,
698 online_change: Default::default(),
699 peer_seen_change: Default::default(),
700 }
701 }
702
703 #[test]
704 fn advance_session_captures_handle_and_seq() {
705 let mut session = MapSession::default();
706
707 advance_session(&mut session, &update(Some("sess-1"), 5));
708
709 assert_eq!(session.handle, "sess-1");
710 assert_eq!(session.seq, 5);
711 }
712
713 #[test]
714 fn advance_session_keepalive_preserves_cursor() {
715 let mut session = MapSession {
716 handle: "sess-1".to_owned(),
717 seq: 7,
718 };
719
720 // Keep-alive: no handle, seq == 0. The cursor must not regress.
721 advance_session(&mut session, &update(None, 0));
722
723 assert_eq!(session.handle, "sess-1");
724 assert_eq!(session.seq, 7);
725 }
726
727 #[test]
728 fn advance_session_resets_seq_on_new_handle() {
729 let mut session = MapSession {
730 handle: "sess-1".to_owned(),
731 seq: 42,
732 };
733
734 // Control started a fresh session: a new handle must reset seq so we never carry a stale
735 // cursor from the prior session.
736 advance_session(&mut session, &update(Some("sess-2"), 0));
737
738 assert_eq!(session.handle, "sess-2");
739 assert_eq!(session.seq, 0);
740 }
741
742 #[test]
743 fn advance_session_same_handle_keeps_seq() {
744 let mut session = MapSession {
745 handle: "sess-1".to_owned(),
746 seq: 10,
747 };
748
749 // Re-issuing the same handle (not a new session) must not reset the cursor.
750 advance_session(&mut session, &update(Some("sess-1"), 0));
751
752 assert_eq!(session.handle, "sess-1");
753 assert_eq!(session.seq, 10);
754 }
755
756 #[test]
757 fn advance_session_rejects_overlong_handle() {
758 let mut session = MapSession::default();
759 let huge = "a".repeat(MAX_SESSION_HANDLE_LEN + 1);
760
761 advance_session(&mut session, &update(Some(&huge), 3));
762
763 // The handle is rejected (cursor handle stays empty); seq still advances.
764 assert_eq!(session.handle, "");
765 assert_eq!(session.seq, 3);
766 }
767
768 #[test]
769 fn advance_session_rejects_non_graphic_handle() {
770 let mut session = MapSession::default();
771
772 // A handle with control/whitespace bytes (log-injection risk) is rejected.
773 advance_session(&mut session, &update(Some("bad\nhandle"), 1));
774
775 assert_eq!(session.handle, "");
776 assert_eq!(session.seq, 1);
777 }
778
779 /// The backoff delay for a given `n` must always land in `[0.5, 1.5)` of the unjittered
780 /// `min(n²·10ms, MAP_BACKOFF_MAX)` — the Go `util/backoff` envelope. Probing each `n` with a
781 /// fresh fixed-`n` `ControlBackoff` (the same technique `ts_runtime` uses for `DerpBackoff`)
782 /// keeps the assertion independent of the process RNG.
783 #[test]
784 fn control_backoff_delay_is_within_the_go_jitter_envelope() {
785 let mut rng = rand::rng();
786 for n in 0u32..80 {
787 let unjittered_ms = u64::from(n)
788 .saturating_mul(u64::from(n))
789 .saturating_mul(10)
790 .min(MAP_BACKOFF_MAX.as_millis() as u64);
791 let unjittered = core::time::Duration::from_millis(unjittered_ms);
792
793 // 100 draws per n to exercise the jitter range.
794 for _ in 0..100 {
795 let mut probe = ControlBackoff { n };
796 let d = probe.next_delay(&mut rng);
797 if unjittered.is_zero() {
798 // n=0: the unjittered base is 0, so any jitter factor still yields exactly 0.
799 assert_eq!(d, core::time::Duration::ZERO, "n=0 delay must be zero");
800 } else {
801 assert!(
802 d >= unjittered.mul_f64(0.5) && d < unjittered.mul_f64(1.5),
803 "n={n}: delay {d:?} outside [0.5,1.5) x {unjittered:?}"
804 );
805 }
806 }
807 }
808 }
809
/// Once `n²·10ms` reaches `MAP_BACKOFF_MAX`, the base stops growing. Every draw must then stay
/// inside the cap's own jitter envelope `[0.5, 1.5) × MAP_BACKOFF_MAX`.
///
/// Both bounds are checked for every probe. The lower bound is what gives the `u32::MAX` probe
/// teeth: `u32::MAX² · 10` does not fit in a `u64`. A wrapping (non-saturating) multiply in a
/// release build could yield an arbitrary base below the cap, which an upper-bound-only check
/// would silently accept.
#[test]
fn control_backoff_saturates_at_the_cap() {
    let mut rng = rand::rng();
    let lower = MAP_BACKOFF_MAX.mul_f64(0.5);
    let upper = MAP_BACKOFF_MAX.mul_f64(1.5);

    // With the 30s cap: 30_000ms / 10ms = 3000 ≈ 54.77², so every n >= 55 (55² = 3025) is past
    // the cap. `1000` is comfortably saturated; `u32::MAX` additionally exercises the overflow
    // path, which must saturate rather than wrap.
    for n in [1000, u32::MAX] {
        let mut probe = ControlBackoff { n };
        let d = probe.next_delay(&mut rng);
        assert!(
            d >= lower && d < upper,
            "n={n}: saturated delay {d:?} outside the cap's jitter envelope [{lower:?}, {upper:?})"
        );
    }
}
828
/// After `reset()` the schedule starts over. The counter drops to zero, and the very next delay
/// is the `n=0` one (`0²·10ms`, i.e. zero). The counter then resumes climbing from 1.
#[test]
fn control_backoff_reset_returns_to_bottom() {
    let mut rng = rand::rng();
    let mut backoff = ControlBackoff::default();

    // Climb a few rungs first so the reset has something to undo.
    for _ in 0..5 {
        let _ = backoff.next_delay(&mut rng);
    }
    assert!(backoff.n > 0, "counter advanced");

    backoff.reset();
    assert_eq!(backoff.n, 0, "reset zeroes the counter");

    // First draw after the reset: the base is zero, so the delay is zero regardless of jitter.
    let first = backoff.next_delay(&mut rng);
    assert_eq!(first, core::time::Duration::ZERO, "n=0 delay is zero");
    assert_eq!(backoff.n, 1, "counter advances after the n=0 draw");
}
850
/// Pins the anti-DoS gate in [`reconnect_delay_after_poll`]: the backoff schedule is reset only
/// when the finished poll delivered at least one frame. A frameless poll (clean-EOF hot loop,
/// watchdog kill, or a frame swallowed to `None`) must keep escalating. That way a control
/// server making no progress is not reconnected to at full speed.
///
/// A future change that resets on poll *end* (e.g. unconditionally on `Ok(())`) instead of on
/// frame receipt trips this test. The frameless branch would fall back to the zero `n=0` delay.
#[test]
fn reconnect_delay_resets_only_when_a_frame_arrived() {
    let mut rng = rand::rng();
    let mut backoff = ControlBackoff::default();

    // Phase 1: a run of zero-progress polls. Each must advance the counter, and after the first
    // draw (taken at n=0) none may collapse back to a zero delay.
    let mut prev_n = backoff.n;
    for i in 0..6 {
        let delay = reconnect_delay_after_poll(false, &mut backoff, &mut rng);
        assert!(
            backoff.n > prev_n,
            "frameless poll {i} must advance the counter (no reset)"
        );
        prev_n = backoff.n;

        let at_bottom = i == 0;
        assert!(
            at_bottom || delay > core::time::Duration::ZERO,
            "frameless reconnect {i} must be delayed, not a 0ms spin"
        );
    }

    // Phase 2: a poll that did deliver a frame resets the schedule. The returned delay is the
    // immediate n=0 one, and the single draw leaves the counter at 1.
    let delay = reconnect_delay_after_poll(true, &mut backoff, &mut rng);
    assert_eq!(
        delay,
        core::time::Duration::ZERO,
        "a poll that delivered a frame resets to the immediate (n=0) reconnect"
    );
    assert_eq!(backoff.n, 1, "reset then one draw leaves the counter at 1");
}
893
894 fn auth_url() -> Url {
895 "https://login.example/a/abc123".parse().unwrap()
896 }
897
/// Control may reject a mid-session re-register with `MachineNotAuthorized(url)`. In that case
/// the URL must be published into the re-auth cell as `Some(url)`. This is the exact URL the live
/// `run` loop used to drop after merely logging and backing off.
#[test]
fn mid_session_machine_not_authorized_sets_auth_url_cell() {
    let expected = auth_url();
    let (tx, rx) = watch::channel(None);
    let err = Error::MachineNotAuthorized(expected.clone());

    surface_reauth_url(&err, &tx);

    assert_eq!(*rx.borrow(), Some(expected));
}
909
/// `RegistrationError::MachineNotAuthorized(None)` means control offered no URL. Upstream maps it
/// to `Error::Internal(MachineAuthorization, _)` rather than `Error::MachineNotAuthorized`, so the
/// helper must not touch the cell.
///
/// The error is produced through the real `From<RegistrationError> for Error` conversion
/// (register.rs), so this test keeps tracking that mapping if it ever changes.
#[test]
fn machine_not_authorized_none_does_not_set_url_cell() {
    let err: Error =
        crate::tokio::register::RegistrationError::MachineNotAuthorized(None).into();

    // Precondition: the conversion really yields the URL-less internal variant.
    assert!(
        matches!(
            err,
            Error::Internal(crate::InternalErrorKind::MachineAuthorization, _)
        ),
        "None must map to the internal MachineAuthorization variant"
    );

    let (tx, rx) = watch::channel(None);
    surface_reauth_url(&err, &tx);

    assert_eq!(
        *rx.borrow(),
        None,
        "no auth URL on offer must not set the cell"
    );
}
933
/// Errors unrelated to authorization are not re-auth signals and must leave the cell empty.
/// Here the error is a transient registration network failure. Only `MachineNotAuthorized` may
/// set the cell.
#[test]
fn non_auth_error_does_not_set_url_cell() {
    let transient = Error::NetworkError(crate::Operation::Registration);
    let (tx, rx) = watch::channel(None);

    surface_reauth_url(&transient, &tx);

    assert!(rx.borrow().is_none());
}
944
/// Once a re-register succeeds (or a poll delivers a frame), any previously published re-auth
/// URL is stale, so `clear_reauth_url` must reset the cell to `None`. Clearing on register success
/// (the `Ok` arm in `run_once`) empties the cell before the runtime bridge can re-read an old
/// `Some(url)`. Otherwise the bridge would flip the netmap's `Running` state back to `NeedsLogin`.
#[test]
fn clear_reauth_url_resets_a_pending_url() {
    let (tx, rx) = watch::channel(Some(auth_url()));

    clear_reauth_url(&tx);

    assert!(rx.borrow().is_none());
}
956
/// Clearing a cell that already holds `None` is a silent no-op. Nothing is published, so the
/// runtime bridge is not woken on every frame of a session that was never deauthorized.
#[test]
fn clear_reauth_url_on_empty_cell_does_not_notify() {
    let (tx, rx) = watch::channel::<Option<Url>>(None);

    clear_reauth_url(&tx);

    let notified = rx.has_changed().expect("sender is still alive");
    assert!(!notified, "clearing an empty cell must not publish a change");
    assert!(rx.borrow().is_none());
}
967
/// Recovery at the cell level: a failed re-register publishes the URL, and the next successful
/// one clears it. The cell finishes at `None`, leaving nothing stale for the bridge to turn back
/// into `NeedsLogin` on its next read.
#[test]
fn surface_then_clear_leaves_cell_empty() {
    let url = auth_url();
    let (tx, rx) = watch::channel(None);

    // Failed re-register: control hands back an auth URL.
    surface_reauth_url(&Error::MachineNotAuthorized(url.clone()), &tx);
    assert_eq!(*rx.borrow(), Some(url), "failed re-register publishes the auth URL");

    // Recovering poll: models run_once's `Ok(())` arm clearing the cell.
    clear_reauth_url(&tx);
    assert!(rx.borrow().is_none());
}
982}