Skip to main content

ts_control/tokio/
client.rs

1use alloc::{collections::BTreeMap, sync::Arc};
2
3use futures_util::{Stream, StreamExt};
4use tokio::{
5    sync::{broadcast, mpsc, watch},
6    task::JoinSet,
7};
8use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
9use url::Url;
10
11use crate::{
12    ControlDialer, Error,
13    map_request_builder::MapRequestBuilder,
14    tokio::{
15        map_stream::{StateUpdate, map_stream, send_map_request},
16        ping::handle_ping,
17    },
18};
19
20/// A client to communicate with control.
21#[derive(Debug)]
22pub struct AsyncControlClient {
23    base_url: Url,
24    state_tx: broadcast::Sender<Arc<StateUpdate>>,
25    command_tx: mpsc::Sender<Command>,
26    _tasks: JoinSet<()>,
27}
28
29impl AsyncControlClient {
30    /// Check whether it is possible to login with the given config, node keys, and auth
31    /// key.
32    pub async fn check_auth(
33        config: &crate::Config,
34        node_keys: &ts_keys::NodeState,
35        auth_key: Option<&str>,
36    ) -> Result<(), Error> {
37        let control_url = &config.server_url;
38
39        let h2_client = crate::tokio::connect(
40            control_url,
41            &node_keys.machine_keys,
42            config.allow_http_key_fetch,
43        )
44        .await?;
45
46        crate::tokio::register(config, control_url, auth_key, node_keys, &h2_client).await?;
47
48        Ok(())
49    }
50
51    /// Connects to the control plane, registers this Tailscale node, and starts handling the
52    /// message stream from control.
53    ///
54    /// The second element of the return value is a netmap stream which started listening
55    /// _before_ the client connected, i.e. it will not miss any updates from control.
56    ///
57    /// `auth_url_tx` is the embedder-owned "current pending re-auth URL" cell: if the live
58    /// map-poll loop hits a mid-session re-auth (control returns
59    /// [`MachineNotAuthorized`](crate::Error::MachineNotAuthorized) on a re-register because the
60    /// node key expired or was revoked), `run` publishes that URL here without tearing the loop
61    /// down, so the embedder can prompt the user to re-authorize while registration keeps retrying.
62    /// The caller creates the channel and keeps the [`Receiver`](watch::Receiver) (this crate must
63    /// not depend on the embedder's device-state types, so the cell carries a bare `Option<Url>`).
64    #[tracing::instrument(skip_all, fields(control_url = %config.server_url))]
65    pub async fn connect(
66        config: &crate::Config,
67        node_keys: &ts_keys::NodeState,
68        auth_key: Option<&str>,
69        auth_url_tx: watch::Sender<Option<Url>>,
70    ) -> Result<
71        (
72            Self,
73            impl Stream<Item = Arc<StateUpdate>> + Send + Sync + use<>,
74        ),
75        Error,
76    > {
77        let control_url = &config.server_url;
78        let mut tasks = JoinSet::new();
79
80        let h2_client = crate::tokio::connect(
81            control_url,
82            &node_keys.machine_keys,
83            config.allow_http_key_fetch,
84        )
85        .await?;
86        tracing::info!("connected to control, registering");
87
88        crate::tokio::register(config, control_url, auth_key, node_keys, &h2_client).await?;
89
90        tracing::info!("registered, starting netmap stream");
91
92        let (state_tx, state_rx) = broadcast::channel(32);
93        let (command_tx, command_rx) = mpsc::channel(32);
94
95        tasks.spawn({
96            let state_tx = state_tx.clone();
97            let control_url = control_url.clone();
98            let node_keys = node_keys.clone();
99            let auth_key = auth_key.map(ToOwned::to_owned);
100            let config = config.clone();
101
102            async move {
103                run(
104                    state_tx,
105                    command_rx,
106                    control_url.clone(),
107                    node_keys.clone(),
108                    auth_key,
109                    config,
110                    auth_url_tx,
111                )
112                .await
113            }
114        });
115
116        Ok((
117            Self {
118                base_url: control_url.clone(),
119                state_tx,
120                command_tx,
121                _tasks: tasks,
122            },
123            netmap_stream(state_rx),
124        ))
125    }
126
127    /// Set the DERP home region for this node.
128    #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), %region_id), level = "trace")]
129    pub async fn set_home_region<'c>(
130        &mut self,
131        region_id: ts_derp::RegionId,
132        latencies: impl IntoIterator<Item = (&'c str, f64)>,
133    ) {
134        tracing::trace!(region = %region_id, "reporting home derp to control server");
135
136        if let Err(e) = self
137            .command_tx
138            .send(Command::SetDerpHomeRegion {
139                id: region_id,
140                latencies: latencies
141                    .into_iter()
142                    .map(|(name, sample)| (name.to_owned(), sample))
143                    .collect(),
144            })
145            .await
146        {
147            tracing::error!(error = %e, "setting home derp region");
148        }
149    }
150
151    /// Advertise this node's magicsock UDP endpoints (ip:port candidates) to the control server
152    /// so peers can learn where to attempt direct connections.
153    #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), n_endpoints), level = "trace")]
154    pub async fn set_endpoints(&mut self, endpoints: Vec<ts_control_serde::Endpoint>) {
155        tracing::Span::current().record("n_endpoints", endpoints.len());
156        tracing::trace!("reporting magicsock endpoints to control server");
157
158        if let Err(e) = self
159            .command_tx
160            .send(Command::SetEndpoints { endpoints })
161            .await
162        {
163            tracing::error!(error = %e, "setting endpoints");
164        }
165    }
166
167    /// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) to control mid-session
168    /// — the wire half of a runtime `set_advertise_routes`. `routes` is the final advertised set
169    /// (already filtered); it is sent on the live map-poll connection without tearing down the
170    /// long-poll, exactly like [`set_endpoints`](Self::set_endpoints).
171    #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), n_routes = routes.len()), level = "trace")]
172    pub async fn set_routable_ips(&mut self, routes: Vec<ipnet::IpNet>) {
173        tracing::trace!("reporting routable IPs to control server");
174
175        if let Err(e) = self
176            .command_tx
177            .send(Command::SetRoutableIPs { routes })
178            .await
179        {
180            tracing::error!(error = %e, "setting routable IPs");
181        }
182    }
183
184    /// Update this node's `Hostinfo.Hostname` to `hostname` at control mid-session — the wire half of
185    /// a runtime `set_hostname`. Sent on the live map-poll connection without tearing down the
186    /// long-poll, exactly like [`set_routable_ips`](Self::set_routable_ips).
187    #[tracing::instrument(skip_all, fields(map_url = %self.map_url()), level = "trace")]
188    pub async fn set_hostname(&mut self, hostname: String) {
189        tracing::trace!("reporting hostname to control server");
190
191        if let Err(e) = self
192            .command_tx
193            .send(Command::SetHostname { hostname })
194            .await
195        {
196            tracing::error!(error = %e, "setting hostname");
197        }
198    }
199
200    /// Construct the URL that should be used to fetch the netmap.
201    pub fn map_url(&self) -> Url {
202        self.base_url
203            .join("machine/map")
204            .expect("map_url was parsed without issue before")
205    }
206
207    /// Get a stream of all netmap updates.
208    pub fn netmap_stream(&self) -> impl Stream<Item = Arc<StateUpdate>> + Send + Sync + use<> {
209        netmap_stream(self.state_tx.subscribe())
210    }
211}
212
213// Every variant is a "set X on the next map request" command, so they all legitimately share the
214// `Set` prefix (each mirrors a control-side field a side MapRequest carries). The shared prefix is
215// the point, not an accident — silence the variant-name lint rather than rename to something less
216// clear.
217#[allow(clippy::enum_variant_names)]
218#[derive(Debug)]
219pub enum Command {
220    SetDerpHomeRegion {
221        id: ts_derp::RegionId,
222        latencies: BTreeMap<String, f64>,
223    },
224    SetEndpoints {
225        endpoints: Vec<ts_control_serde::Endpoint>,
226    },
227    /// Re-advertise this node's routable IP prefixes (`Hostinfo.RoutableIPs`) mid-session — the wire
228    /// half of a runtime `set_advertise_routes`. The routes travel IN the command (not read from the
229    /// run-loop's frozen `config` clone), already filtered to the final advertised set the caller
230    /// wants control to see.
231    SetRoutableIPs { routes: Vec<ipnet::IpNet> },
232    /// Update this node's `Hostinfo.Hostname` mid-session — the wire half of a runtime
233    /// `set_hostname`. The hostname travels IN the command (the run-loop's `config` clone is frozen,
234    /// so a runtime change can only reach here through the command). Hostname is display-only, so
235    /// there is no local/dataplane half; control reflects the new name on the next netmap.
236    SetHostname { hostname: String },
237}
238
239/// Identifies a map-poll session so a reconnect can resume the delta stream instead of
240/// cold-restarting. Control assigns the `handle` in the first [`MapResponse`] of a session and
241/// stamps each response with a monotonically increasing `seq`; on reconnect we offer the last
242/// `(handle, seq)` we processed and control either resumes after `seq` or ignores it and starts a
243/// fresh session with a full netmap (both are safe — see [`MapRequestBuilder::map_session`]).
244#[derive(Clone, Default)]
245struct MapSession {
246    handle: String,
247    seq: i64,
248}
249
250/// Upper bound on the control-supplied session handle we will store/echo. The handle is an opaque
251/// token; anything beyond this is rejected to avoid unbounded memory growth and log injection.
252const MAX_SESSION_HANDLE_LEN: usize = 256;
253
254/// Advance the resume cursor from a freshly received [`StateUpdate`]. The handle is assigned once
255/// (first response of a session); `seq` advances on substantive responses and is 0 on keep-alives.
256///
257/// If control issues a *new* handle (a fresh session), `seq` is reset to 0 so we never carry a
258/// stale cursor from the prior session into the new one. A control-supplied handle that is empty,
259/// over [`MAX_SESSION_HANDLE_LEN`], or contains non-`ascii_graphic` bytes is rejected (the cursor
260/// is left unchanged) to bound memory and prevent log injection.
261fn advance_session(session: &mut MapSession, update: &StateUpdate) {
262    if let Some(handle) = &update.session_handle {
263        let valid = !handle.is_empty()
264            && handle.len() <= MAX_SESSION_HANDLE_LEN
265            && handle.bytes().all(|b| b.is_ascii_graphic());
266        if valid && *handle != session.handle {
267            session.handle = handle.clone();
268            session.seq = 0;
269        } else if !valid {
270            tracing::warn!(
271                handle_len = handle.len(),
272                "control sent an invalid map-session handle; ignoring it"
273            );
274        }
275    }
276    if update.seq != 0 {
277        session.seq = update.seq;
278    }
279}
280
281/// Reconnect backoff for the map-poll loop, mirroring Go's `util/backoff` (the schedule
282/// `controlclient`'s `mapRoutine` uses): the delay grows as `n²·10ms`, is capped at
283/// [`MAP_BACKOFF_MAX`], and is jittered to a uniform `[0.5×, 1.5×)` to avoid a thundering herd of
284/// clients reconnecting in lock-step against a control server that just came back. `n` increments
285/// on each consecutive failed/empty poll and resets to 0 once a poll has actually delivered a
286/// response, so a flaky control plane is retried with increasing spacing instead of a flat 2 Hz
287/// storm (or, on the clean-EOF path, an unthrottled hot loop).
288///
289/// This is the same shape as `ts_runtime`'s `DerpBackoff`; it is duplicated here (rather than
290/// shared) because `ts_control` is an upstream crate that cannot depend on `ts_runtime`, and the
291/// cap differs (Go passes `30*time.Second` to `NewBackoff` for `mapRoutine`, vs `5s` for the DERP
292/// readers).
293///
294/// Residual (intentional, matches Go): because *any* received frame — including a bare keep-alive
295/// (`seq == 0`) — resets the schedule, a control server that sends one frame then closes the body
296/// can hold the backoff at the bottom and drive a reconnect every cycle. Go's `mapRoutine` has the
297/// identical property (it resets on any received `MapResponse`) and no max-consecutive-reconnect
298/// cap, relying on the fact that the node already has a machine-key relationship with the control
299/// server. The pre-fix behavior was a *busy* spin (zero handshake); the residual is now one full
300/// connect→TLS→Noise→register per cycle, symmetric in cost to the attacker — a large improvement,
301/// and faithful parity rather than a divergence. Gating the reset on `seq != 0` would punish a
302/// healthy keep-alive-only idle poll, so it is deliberately not done.
303#[derive(Debug, Default)]
304struct ControlBackoff {
305    n: u32,
306}
307
308/// Cap on the map-poll reconnect backoff delay (Go `controlclient` passes `30*time.Second` to
309/// `NewBackoff` for `mapRoutine`).
310const MAP_BACKOFF_MAX: core::time::Duration = core::time::Duration::from_secs(30);
311
312impl ControlBackoff {
313    /// Reset the backoff after a poll that actually received a response, so the next failure starts
314    /// from the bottom of the schedule again. Crucially this is driven by *receiving a frame*, not
315    /// by the poll merely ending: a control server that accepts the request then closes the body
316    /// with zero frames never resets, so the clean-EOF path still backs off and escalates.
317    fn reset(&mut self) {
318        self.n = 0;
319    }
320
321    /// The next backoff delay, advancing the counter. `n²·10ms` capped at [`MAP_BACKOFF_MAX`], then
322    /// scaled by a random factor in `[0.5, 1.5)` (matching Go's `rand.Float64()+0.5`).
323    fn next_delay(&mut self, rng: &mut impl rand::RngExt) -> core::time::Duration {
324        // n² growth on a 10ms base, saturating so a long outage can't overflow the multiply.
325        let base_ms = u64::from(self.n)
326            .saturating_mul(u64::from(self.n))
327            .saturating_mul(10);
328        let capped = core::time::Duration::from_millis(base_ms).min(MAP_BACKOFF_MAX);
329        self.n = self.n.saturating_add(1);
330        let factor = rng.random::<f64>() + 0.5;
331        capped.mul_f64(factor)
332    }
333}
334
335/// Decide how long to wait before the next map-poll reconnect, resetting the schedule when the poll
336/// made progress. This is the **single, tested site of the load-bearing anti-DoS gate**: a poll
337/// that delivered at least one frame (`received_frame`) proves the whole connect→register→poll path
338/// works, so it resets the backoff and the next reconnect is immediate (Go resets its backoff on a
339/// received netmap); a poll that delivered **zero** frames — a clean-EOF hot-loop, a watchdog kill,
340/// or a frame the stream swallowed to `None` — does **not** reset, so a zero-progress control server
341/// escalates up the `n²·10ms` schedule instead of being hammered at full speed.
342///
343/// The gate lives in this named function rather than as a bare `backoff.reset()` buried in the poll
344/// loop precisely so it cannot be silently relocated: moving the reset onto the poll-*end* path
345/// (e.g. resetting unconditionally on `Ok(())`) would reintroduce the clean-EOF hot loop, and
346/// [`reconnect_delay_resets_only_when_a_frame_arrived`] would fail. The reset granularity is
347/// observationally identical to resetting the instant a frame arrives: the backoff is only ever
348/// read here (after the poll returns), so deferring the reset to this point changes nothing the
349/// schedule can observe.
350fn reconnect_delay_after_poll(
351    received_frame: bool,
352    backoff: &mut ControlBackoff,
353    rng: &mut impl rand::RngExt,
354) -> core::time::Duration {
355    if received_frame {
356        backoff.reset();
357    }
358    backoff.next_delay(rng)
359}
360
361/// Surface a mid-session re-auth URL to the embedder without disturbing the retry loop.
362///
363/// On a live map-poll re-register, control returning [`Error::MachineNotAuthorized`] means the
364/// node key lapsed (expiry/revoke) and the user must re-authorize at the carried URL. Unlike the
365/// initial-registration path (which the runtime's `check_auth` loop already surfaces), the live
366/// `run` loop only logs and backs off, dropping the URL — so we publish it into the
367/// embedder-owned `auth_url_tx` cell here (→ the runtime maps it to its "needs login" state). The
368/// caller still propagates the error so `run` backs off and retries; a later successful
369/// re-register clears the state for free (Go's `authRoutine` keeps `urlToVisit` and keeps polling).
370///
371/// **Only `MachineNotAuthorized` sets the cell.** `MachineNotAuthorized(None)` (no auth URL on
372/// offer) maps upstream to [`Error::Internal`]`(MachineAuthorization, _)`, not this variant, so it
373/// correctly does *not* set a (nonexistent) URL. The write is sticky via
374/// [`send_if_modified`](watch::Sender::send_if_modified): the cell is updated only when the URL
375/// actually differs from its current value, so a re-auth URL that persists across several failed
376/// re-register attempts does not thrash the cell or wake the runtime's bridge spuriously.
377///
378/// Factored out of [`run_once`] so this classify-then-surface decision is unit-testable against a
379/// plain `watch` channel without the real network round-trip [`crate::tokio::register`] performs.
380fn surface_reauth_url(err: &Error, auth_url_tx: &watch::Sender<Option<Url>>) {
381    if let Error::MachineNotAuthorized(url) = err {
382        auth_url_tx.send_if_modified(|current| {
383            if current.as_ref() == Some(url) {
384                false
385            } else {
386                *current = Some(url.clone());
387                true
388            }
389        });
390    }
391}
392
393/// Clear any pending re-auth URL (set the cell back to `None`), used when a re-register succeeds or
394/// a poll delivers a frame — both prove the node is authorized again so the surfaced URL is stale.
395/// Sticky `send_if_modified` so an already-`None` cell never wakes the runtime bridge. Clearing at
396/// register-success (rather than only at stream end) is what prevents a recovering poll from leaving
397/// a stale `Some(url)` for the bridge to re-read and clobber the netmap's `Running` flip with.
398fn clear_reauth_url(auth_url_tx: &watch::Sender<Option<Url>>) {
399    auth_url_tx.send_if_modified(|current| {
400        if current.is_some() {
401            *current = None;
402            true
403        } else {
404            false
405        }
406    });
407}
408
409pub async fn run(
410    state_tx: broadcast::Sender<Arc<StateUpdate>>,
411    mut command_rx: mpsc::Receiver<Command>,
412    control_url: Url,
413    node_keys: ts_keys::NodeState,
414    auth_key: Option<String>,
415    config: crate::Config,
416    auth_url_tx: watch::Sender<Option<Url>>,
417) {
418    let mut dialer = ControlDialer::default();
419    let mut session = MapSession::default();
420    let mut backoff = ControlBackoff::default();
421
422    loop {
423        // `run_once` sets this to `true` the moment it receives its first frame on this poll, so
424        // the flag survives an error that occurs *after* frames flowed (a poll that worked then
425        // dropped still counts as progress and reconnects promptly).
426        let mut received_frame = false;
427        let outcome = run_once(
428            &state_tx,
429            &mut command_rx,
430            &control_url,
431            &node_keys,
432            auth_key.as_deref(),
433            &config,
434            &mut dialer,
435            &mut session,
436            &mut received_frame,
437            &auth_url_tx,
438        )
439        .await;
440
441        // A poll that delivered any frame proves the connect→register→poll path works again, so a
442        // re-auth URL surfaced by an earlier failed re-register is stale: clear the cell. The
443        // primary clear is at register-success above (so the cell empties before the bridge can
444        // re-read a stale `Some(url)` on recovery); this is a secondary clear for the case where the
445        // stream itself delivered frames after a register that did not need re-auth. Sticky
446        // `send_if_modified` so we never wake the bridge unless the cell actually changes.
447        if received_frame {
448            clear_reauth_url(&auth_url_tx);
449        }
450
451        // Back off before every reconnect, on BOTH the clean-EOF and error paths — Go's
452        // `mapRoutine` runs `bo.BackOff(ctx, err)` after every poll regardless of how it ended.
453        // The clean-EOF arm (`Ok(())`) previously reconnected with ZERO delay: a control server
454        // that returns 200 then closes the body (or sends one frame the stream swallows to `None`)
455        // would spin a full-speed connect→TLS→Noise→register loop, hammering control and pinning
456        // CPU. The reset is gated on `received_frame` (see `reconnect_delay_after_poll`), so a
457        // healthy long-lived poll that delivered frames reconnects promptly while a zero-progress
458        // server escalates up the n²·10ms schedule.
459        let delay = reconnect_delay_after_poll(received_frame, &mut backoff, &mut rand::rng());
460        match outcome {
461            Ok(()) => {
462                tracing::warn!(
463                    resume_handle = %session.handle,
464                    resume_seq = session.seq,
465                    backoff_ms = delay.as_millis() as u64,
466                    "netmap stream ended without error, attempting restart"
467                );
468            }
469            Err(e) => {
470                tracing::error!(
471                    error = %e,
472                    resume_handle = %session.handle,
473                    resume_seq = session.seq,
474                    backoff_ms = delay.as_millis() as u64,
475                    "netmap stream failed, attempting restart"
476                );
477            }
478        }
479        tokio::time::sleep(delay).await;
480    }
481}
482
483async fn run_once(
484    state_tx: &broadcast::Sender<Arc<StateUpdate>>,
485    command_rx: &mut mpsc::Receiver<Command>,
486    control_url: &Url,
487    node_keys: &ts_keys::NodeState,
488    auth_key: Option<&str>,
489    config: &crate::Config,
490    control_dialer: &mut ControlDialer,
491    session: &mut MapSession,
492    received_frame: &mut bool,
493    auth_url_tx: &watch::Sender<Option<Url>>,
494) -> Result<(), Error> {
495    let h2_client = control_dialer
496        .full_connect_next(
497            control_url,
498            &node_keys.machine_keys,
499            config.allow_http_key_fetch,
500        )
501        .await?;
502
503    // Re-register on every reconnect. On a mid-session re-auth (key expiry/revoke) control answers
504    // `MachineNotAuthorized(Some(url))`: surface that URL to the embedder (→ "needs login") via
505    // `surface_reauth_url`, then still propagate the error so `run` backs off and retries — Go's
506    // `authRoutine` keeps the URL and keeps polling, and a later successful re-register recovers.
507    match crate::tokio::register(config, control_url, auth_key, node_keys, &h2_client).await {
508        Ok(()) => {
509            // Re-register succeeded — clear any pending re-auth URL NOW (not at stream end), so a
510            // recovering poll empties the cell BEFORE the runtime bridge can wake and re-read a
511            // stale `Some(url)`. Without this, the bridge could clobber the netmap's `Running` flip
512            // back to `NeedsLogin` on recovery (a recovered node would show "needs login" until the
513            // next keep-alive).
514            clear_reauth_url(auth_url_tx);
515        }
516        Err(e) => {
517            let err = Error::from(e);
518            surface_reauth_url(&err, auth_url_tx);
519            return Err(err);
520        }
521    }
522
523    let client_name = config.format_client_name();
524    // Advertise-side VIP services: hash the validated hosted-service set into
525    // `HostInfo.ServicesHash`. Empty config -> empty hash -> wire field omitted (unchanged behavior).
526    let advertised_vip_services = config.advertised_vip_services();
527    let services_hash = crate::services_hash(&advertised_vip_services);
528    let builder = MapRequestBuilder::new(node_keys)
529        .keep_alive(true)
530        .omit_peers(false)
531        .stream(true)
532        .routable_ips(config.advertised_routes())
533        .client_info(&client_name, crate::PKG_VERSION)
534        .request_tags(config.tags.iter().map(String::as_str))
535        .services(config.advertised_services())
536        .services_hash(&services_hash)
537        .wire_ingress(config.wire_ingress)
538        .ingress_enabled(
539            config
540                .ingress_active
541                .load(core::sync::atomic::Ordering::Relaxed),
542        )
543        .map_session(&session.handle, session.seq);
544
545    let request = if let Some(hostname) = &config.hostname {
546        builder.hostname(hostname)
547    } else {
548        builder
549    }
550    .build();
551
552    let map_url = control_url.join("machine/map").unwrap();
553
554    let reader = send_map_request(request, &map_url, &h2_client).await?;
555
556    let mut stream = core::pin::pin!(map_stream(reader));
557    tracing::info!("netmap stream started");
558
559    loop {
560        tokio::select! {
561            state_update = stream.next() => {
562                let Some(state_update) = state_update else {
563                    break;
564                };
565
566                // A frame arrived, so the full connect→register→poll path is demonstrably working:
567                // record it so `run` resets the reconnect backoff (Go resets on a received netmap).
568                // This is what makes the clean-EOF backoff in `run` safe — a server that delivers
569                // frames and later drops reconnects promptly, while one that closes the body with
570                // zero frames never reaches here and keeps escalating. Keep-alives (seq 0) count
571                // too: they prove the long poll is live. The reset decision itself lives in
572                // `reconnect_delay_after_poll` (the single tested gate); here we only flag progress.
573                *received_frame = true;
574
575                // Track the session cursor so a reconnect can resume after the last processed
576                // message instead of cold-restarting.
577                advance_session(session, &state_update);
578
579                let _ = handle_ping(&state_update, control_url, &h2_client, config).await;
580
581                if let Some(dial_plan) = &state_update.dial_plan
582                    && control_dialer.update_dial_plan(dial_plan)
583                {
584                    tracing::trace!(new_dial_plan = ?dial_plan);
585                }
586
587                // This errors only if there are no receivers. That's not semantically an error for
588                // us, so just ignore it.
589                let _ignore = state_tx.send(Arc::new(state_update));
590            }
591
592            command = command_rx.recv() => {
593                match command.unwrap() {
594                    Command::SetDerpHomeRegion { id, latencies } => {
595                        let mut builder = MapRequestBuilder::new(node_keys)
596                            .keep_alive(false)
597                            .omit_peers(true)
598                            .stream(false)
599                            .routable_ips(config.advertised_routes())
600                            .preferred_derp(id)
601                            .derp_latencies(latencies.iter().map(|(k, v)| (k.as_str(), *v)));
602
603                        if let Some(hostname) = &config.hostname {
604                            builder = builder.hostname(hostname);
605                        }
606                        let req = builder.build();
607
608                        drop(send_map_request(req, &map_url, &h2_client).await?);
609                    },
610                    Command::SetEndpoints { endpoints } => {
611                        let mut builder = MapRequestBuilder::new(node_keys)
612                            .keep_alive(false)
613                            .omit_peers(true)
614                            .stream(false)
615                            .routable_ips(config.advertised_routes())
616                            .endpoints(endpoints);
617
618                        if let Some(hostname) = &config.hostname {
619                            builder = builder.hostname(hostname);
620                        }
621                        let req = builder.build();
622
623                        drop(send_map_request(req, &map_url, &h2_client).await?);
624                    },
625                    Command::SetRoutableIPs { routes } => {
626                        // The routes come from the command payload, NOT `config.advertised_routes()`:
627                        // `config` is a frozen clone captured when this loop started, so a runtime
628                        // route change can only reach here through the command itself.
629                        let mut builder = MapRequestBuilder::new(node_keys)
630                            .keep_alive(false)
631                            .omit_peers(true)
632                            .stream(false)
633                            .routable_ips(routes);
634
635                        if let Some(hostname) = &config.hostname {
636                            builder = builder.hostname(hostname);
637                        }
638                        let req = builder.build();
639
640                        drop(send_map_request(req, &map_url, &h2_client).await?);
641                    },
642                    Command::SetHostname { hostname } => {
643                        // The hostname comes from the command payload, NOT `config.hostname`: the
644                        // run-loop's `config` is a frozen clone, so a runtime hostname change can only
645                        // reach here through the command. Preserve the advertised routes on this
646                        // request so a hostname update doesn't transiently withdraw them.
647                        let req = MapRequestBuilder::new(node_keys)
648                            .keep_alive(false)
649                            .omit_peers(true)
650                            .stream(false)
651                            .routable_ips(config.advertised_routes())
652                            .hostname(&hostname)
653                            .build();
654
655                        drop(send_map_request(req, &map_url, &h2_client).await?);
656                    },
657                }
658            }
659        }
660    }
661
662    Ok(())
663}
664
665fn netmap_stream(
666    rx: broadcast::Receiver<Arc<StateUpdate>>,
667) -> impl Stream<Item = Arc<StateUpdate>> + Send + Sync {
668    tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(async |x| {
669        if let Err(BroadcastStreamRecvError::Lagged(n)) = &x {
670            tracing::warn!(messages_missed = n, "map_stream lagged");
671        }
672
673        x.ok()
674    })
675}
676
// Unit tests for the pure helpers behind the control run loop: session-cursor tracking
// (`advance_session`), the reconnect backoff schedule (`ControlBackoff`,
// `reconnect_delay_after_poll`) and the re-auth URL cell (`surface_reauth_url`,
// `clear_reauth_url`). None of these tests open a connection; they drive the helpers
// directly with in-memory values and `watch` channels.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal [`StateUpdate`] carrying only a session handle and a sequence number.
    ///
    /// Every other field is empty (`None`, an empty `Vec`, or `Default::default()`), so each
    /// test isolates the `session_handle` / `seq` pair it is exercising.
    fn update(handle: Option<&str>, seq: i64) -> StateUpdate {
        StateUpdate {
            session_handle: handle.map(ToOwned::to_owned),
            seq,
            derp: None,
            node: None,
            peer_update: None,
            peer_patches: Vec::new(),
            user_profiles: Vec::new(),
            ping: None,
            packetfilter: None,
            cap_grants: None,
            pop_browser_url: None,
            dial_plan: None,
            dns_config: None,
            ssh_policy: None,
            tka: None,
            online_change: Default::default(),
            peer_seen_change: Default::default(),
        }
    }

    /// Starting from an empty cursor, the first update's handle and `seq` are adopted as-is.
    #[test]
    fn advance_session_captures_handle_and_seq() {
        let mut session = MapSession::default();

        advance_session(&mut session, &update(Some("sess-1"), 5));

        assert_eq!(session.handle, "sess-1");
        assert_eq!(session.seq, 5);
    }

    /// A keep-alive frame carries no handle and `seq == 0`; it must not move the cursor.
    #[test]
    fn advance_session_keepalive_preserves_cursor() {
        let mut session = MapSession {
            handle: "sess-1".to_owned(),
            seq: 7,
        };

        // Keep-alive: no handle, seq == 0. The cursor must not regress.
        advance_session(&mut session, &update(None, 0));

        assert_eq!(session.handle, "sess-1");
        assert_eq!(session.seq, 7);
    }

    /// A different handle marks a new session, so the cursor's `seq` restarts from the update.
    #[test]
    fn advance_session_resets_seq_on_new_handle() {
        let mut session = MapSession {
            handle: "sess-1".to_owned(),
            seq: 42,
        };

        // Control started a fresh session: a new handle must reset seq so we never carry a stale
        // cursor from the prior session.
        advance_session(&mut session, &update(Some("sess-2"), 0));

        assert_eq!(session.handle, "sess-2");
        assert_eq!(session.seq, 0);
    }

    /// Repeating the current handle with `seq == 0` is not a new session; `seq` is kept.
    #[test]
    fn advance_session_same_handle_keeps_seq() {
        let mut session = MapSession {
            handle: "sess-1".to_owned(),
            seq: 10,
        };

        // Re-issuing the same handle (not a new session) must not reset the cursor.
        advance_session(&mut session, &update(Some("sess-1"), 0));

        assert_eq!(session.handle, "sess-1");
        assert_eq!(session.seq, 10);
    }

    /// A handle one byte over `MAX_SESSION_HANDLE_LEN` is refused, but its `seq` is still taken.
    #[test]
    fn advance_session_rejects_overlong_handle() {
        let mut session = MapSession::default();
        let huge = "a".repeat(MAX_SESSION_HANDLE_LEN + 1);

        advance_session(&mut session, &update(Some(&huge), 3));

        // The handle is rejected (cursor handle stays empty); seq still advances.
        assert_eq!(session.handle, "");
        assert_eq!(session.seq, 3);
    }

    /// A handle containing a newline is refused, but its `seq` is still taken.
    #[test]
    fn advance_session_rejects_non_graphic_handle() {
        let mut session = MapSession::default();

        // A handle with control/whitespace bytes (log-injection risk) is rejected.
        advance_session(&mut session, &update(Some("bad\nhandle"), 1));

        assert_eq!(session.handle, "");
        assert_eq!(session.seq, 1);
    }

    /// The backoff delay for a given `n` must always land in `[0.5, 1.5)` of the unjittered
    /// `min(n²·10ms, MAP_BACKOFF_MAX)` — the Go `util/backoff` envelope. Each `n` is probed
    /// with a fresh fixed-`n` `ControlBackoff` (the same technique `ts_runtime` uses for
    /// `DerpBackoff`). Every draw is therefore checked against a known `n`, and the envelope
    /// bound holds whatever values the process RNG produces.
    #[test]
    fn control_backoff_delay_is_within_the_go_jitter_envelope() {
        let mut rng = rand::rng();
        for n in 0u32..80 {
            // Expected un-jittered base in ms, computed with saturating math and clamped to
            // the cap (the cap's millisecond count is assumed to fit in a u64).
            let unjittered_ms = u64::from(n)
                .saturating_mul(u64::from(n))
                .saturating_mul(10)
                .min(MAP_BACKOFF_MAX.as_millis() as u64);
            let unjittered = core::time::Duration::from_millis(unjittered_ms);

            // 100 draws per n to exercise the jitter range.
            for _ in 0..100 {
                let mut probe = ControlBackoff { n };
                let d = probe.next_delay(&mut rng);
                if unjittered.is_zero() {
                    // n=0: the unjittered base is 0, so any jitter factor still yields exactly 0.
                    assert_eq!(d, core::time::Duration::ZERO, "n=0 delay must be zero");
                } else {
                    assert!(
                        d >= unjittered.mul_f64(0.5) && d < unjittered.mul_f64(1.5),
                        "n={n}: delay {d:?} outside [0.5,1.5) x {unjittered:?}"
                    );
                }
            }
        }
    }

    /// Below the cap the delay grows with `n` (in expectation); past the cap it is bounded by
    /// the cap's jitter envelope. The cap is asserted directly: once `n` is large enough that
    /// `n²·10ms >= 30s`, every draw is `>= 0.5 × 30s` and `< 1.5 × 30s`.
    #[test]
    fn control_backoff_saturates_at_the_cap() {
        let mut rng = rand::rng();
        // 30_000ms / 10ms = 3000, and √3000 ≈ 54.8, so every n >= 55 (55² = 3025) is past the
        // cap. n = 1000 is comfortably beyond that.
        let mut probe = ControlBackoff { n: 1000 };
        let d = probe.next_delay(&mut rng);
        assert!(
            d >= MAP_BACKOFF_MAX.mul_f64(0.5) && d < MAP_BACKOFF_MAX.mul_f64(1.5),
            "saturated delay {d:?} outside the cap's jitter envelope"
        );
        // A huge `n` must not overflow the n²·10 multiply (saturating math).
        let mut probe = ControlBackoff { n: u32::MAX };
        let d = probe.next_delay(&mut rng);
        assert!(d < MAP_BACKOFF_MAX.mul_f64(1.5), "overflowed at u32::MAX");
    }

    /// `reset()` returns the schedule to the bottom. After several advances, a reset makes the
    /// next delay the `n=0` delay, which is zero (`0²·10ms`), and the counter climbs again from
    /// there.
    #[test]
    fn control_backoff_reset_returns_to_bottom() {
        let mut rng = rand::rng();
        let mut bo = ControlBackoff::default();

        // Advance a few times.
        for _ in 0..5 {
            let _ = bo.next_delay(&mut rng);
        }
        assert!(bo.n > 0, "counter advanced");

        bo.reset();
        assert_eq!(bo.n, 0, "reset zeroes the counter");

        // The n=0 draw is 0ms (0²·10ms · jitter == 0), and the counter advances to 1 afterward.
        let d = bo.next_delay(&mut rng);
        assert_eq!(d, core::time::Duration::ZERO, "n=0 delay is zero");
        assert_eq!(bo.n, 1, "counter advances after the n=0 draw");
    }

    /// The load-bearing anti-DoS gate: [`reconnect_delay_after_poll`] resets the schedule ONLY when
    /// the poll delivered a frame. A poll that delivered ZERO frames (the clean-EOF hot-loop, a
    /// watchdog kill, or a frame swallowed to `None`) must NOT reset. A zero-progress control
    /// server therefore escalates up the schedule instead of being hammered at full speed.
    ///
    /// This test fails if a future change resets the backoff on the poll-*end* path (e.g.
    /// unconditionally on `Ok(())`) instead of on frame receipt. In that case the frameless
    /// branch would start returning the `n=0` (zero) delay.
    #[test]
    fn reconnect_delay_resets_only_when_a_frame_arrived() {
        let mut rng = rand::rng();
        let mut backoff = ControlBackoff::default();

        // A run of frameless polls (zero progress) must escalate. Each poll must advance the
        // backoff counter; the delays themselves are jittered, so only the counter is checked
        // strictly. Crucially, no delay after the first collapses back to the n=0 zero delay.
        let mut last_n = backoff.n;
        for i in 0..6 {
            let d = reconnect_delay_after_poll(false, &mut backoff, &mut rng);
            assert!(
                backoff.n > last_n,
                "frameless poll {i} must advance the counter (no reset)"
            );
            last_n = backoff.n;
            if i > 0 {
                // Past n=0, a frameless reconnect is never the zero delay (the hot-loop we fixed).
                assert!(
                    d > core::time::Duration::ZERO,
                    "frameless reconnect {i} must be delayed, not a 0ms spin"
                );
            }
        }

        // Now a poll that DID receive a frame resets the schedule: the next delay is the n=0 zero
        // delay (immediate reconnect for a healthy, progressing poll), and the counter is back to 1.
        let d = reconnect_delay_after_poll(true, &mut backoff, &mut rng);
        assert_eq!(
            d,
            core::time::Duration::ZERO,
            "a poll that delivered a frame resets to the immediate (n=0) reconnect"
        );
        assert_eq!(backoff.n, 1, "reset then one draw leaves the counter at 1");
    }

    /// A fixed, well-formed auth URL used by the re-auth cell tests.
    fn auth_url() -> Url {
        "https://login.example/a/abc123".parse().unwrap()
    }

    /// A mid-session `MachineNotAuthorized(url)` sets the re-auth cell to `Some(url)`. This pins
    /// the regression being guarded: the live `run` loop used to discard this URL and only log
    /// and back off.
    #[test]
    fn mid_session_machine_not_authorized_sets_auth_url_cell() {
        let (tx, rx) = watch::channel(None);
        let url = auth_url();

        surface_reauth_url(&Error::MachineNotAuthorized(url.clone()), &tx);

        assert_eq!(*rx.borrow(), Some(url));
    }

    /// Upstream maps `MachineNotAuthorized(None)` (control offered no auth URL) to
    /// `Error::Internal(MachineAuthorization, _)`, NOT `Error::MachineNotAuthorized`, so the
    /// helper must leave the cell untouched. The error is built from the *exact* upstream
    /// mapping (register.rs `From<RegistrationError> for Error`), so this stays honest if that
    /// mapping ever changes.
    #[test]
    fn machine_not_authorized_none_does_not_set_url_cell() {
        let (tx, rx) = watch::channel(None);
        let err =
            Error::from(crate::tokio::register::RegistrationError::MachineNotAuthorized(None));
        // Confirm the mapping is the non-URL internal variant (the precondition for the assertion).
        assert!(matches!(
            err,
            Error::Internal(crate::InternalErrorKind::MachineAuthorization, _)
        ));

        surface_reauth_url(&err, &tx);

        assert_eq!(
            *rx.borrow(),
            None,
            "no auth URL on offer must not set the cell"
        );
    }

    /// A non-auth error (e.g. a transient network failure) must never set the cell either. Only
    /// `MachineNotAuthorized` is a re-auth signal.
    #[test]
    fn non_auth_error_does_not_set_url_cell() {
        let (tx, rx) = watch::channel(None);

        surface_reauth_url(&Error::NetworkError(crate::Operation::Registration), &tx);

        assert_eq!(*rx.borrow(), None);
    }

    /// The clear path: a successful re-register (or a poll that delivered a frame) makes any
    /// previously surfaced re-auth URL stale, so `clear_reauth_url` resets the cell to `None`.
    /// Clearing at register success (run_once's `Ok` arm) empties the cell before the runtime
    /// bridge can re-read a stale `Some(url)`. Without that, the bridge could overwrite the
    /// netmap's `Running` state with `NeedsLogin` during recovery.
    #[test]
    fn clear_reauth_url_resets_a_pending_url() {
        let (tx, rx) = watch::channel(Some(auth_url()));
        clear_reauth_url(&tx);
        assert_eq!(*rx.borrow(), None);
    }

    /// Clearing an already-`None` cell is a no-op that does NOT notify, so the runtime bridge is
    /// not woken spuriously on every frame of a healthy, never-deauthorized session.
    #[test]
    fn clear_reauth_url_on_empty_cell_does_not_notify() {
        let (tx, rx) = watch::channel::<Option<Url>>(None);
        clear_reauth_url(&tx);
        // No change was published, so the receiver sees nothing new.
        assert!(!rx.has_changed().unwrap());
        assert_eq!(*rx.borrow(), None);
    }

    /// Recovery sequence at the cell level: surface a URL (failed re-register), then clear it
    /// (the next re-register succeeds). The final cell state is `None`, so when the bridge next
    /// reads it there is no stale `Some(url)` to re-assert `NeedsLogin` from.
    #[test]
    fn surface_then_clear_leaves_cell_empty() {
        let (tx, rx) = watch::channel(None);
        let url = auth_url();

        surface_reauth_url(&Error::MachineNotAuthorized(url.clone()), &tx);
        assert_eq!(*rx.borrow(), Some(url));

        clear_reauth_url(&tx); // models run_once's `Ok(())` arm on the recovering poll
        assert_eq!(*rx.borrow(), None);
    }
}