Skip to main content

ts_runtime/
lib.rs

1#![doc = include_str!("../README.md")]
2
3extern crate ts_netstack_smoltcp as netstack;
4
5use core::time::Duration;
6use std::sync::Arc;
7
8use kameo::{
9    actor::{ActorRef, Spawn, WeakActorRef},
10    mailbox::Signal,
11};
12use netstack::netcore::Channel;
13use tokio::sync::watch;
14
15use crate::{
16    control_runner::ControlRunner, dataplane::DataplaneActor, direct::DirectManager,
17    forwarder_actor::ForwarderActor, multiderp::Multiderp, netstack_actor::NetstackActor,
18};
19
20/// Pcap stream framer for debug packet capture (`CapturePcap`).
21pub mod capture;
22/// Control runner.
23pub mod control_runner;
24mod dataplane;
25mod derp_latency;
26/// Device connection-state tracking ([`DeviceState`]) and typed registration outcome
27/// ([`RegistrationError`]).
28pub mod device_state;
29mod direct;
30mod env;
31mod error;
32/// Fallback TCP handler registry (`tsnet.Server.RegisterFallbackTCPHandler` parity).
33pub mod fallback_tcp;
34mod forwarder_actor;
35/// Client-side Funnel ingress termination (`tsnet`'s `ListenFunnel` data path).
36pub mod funnel;
37mod magic_dns;
38mod multiderp;
39mod netstack_actor;
40mod packetfilter;
41pub mod peer_tracker;
42mod peerapi;
43mod peerapi_doh;
44mod route_updater;
45/// Stored Serve config + accept-loop runtime (`tsnet`'s `Get/SetServeConfig` + serving runtime).
46pub mod serve;
47mod src_filter;
48/// Netmap status snapshot, WhoIs, and watcher types.
49pub mod status;
50/// Taildrop peer-to-peer file transfer store.
51pub mod taildrop;
52pub mod taildrop_send;
53/// Tailnet-Lock (TKA) chain-sync orchestration: bootstrap + offer/send driver (the runtime layer
54/// that bridges the `ts_control` sync RPCs and the `ts_tka` chain logic).
55mod tka_sync;
56#[cfg(feature = "tun")]
57mod tun_actor;
58
59pub use device_state::{DeviceState, RegistrationError};
60pub(crate) use env::Env;
61pub use error::{Error, ErrorKind};
62pub use status::{FileTarget, NetcheckReport, RegionLatency, Status, StatusNode, WhoIs};
63pub use ts_dataplane::{CaptureHook, CapturePath};
64
65use crate::peer_tracker::PeerTracker;
66
/// The runtime for a tailscale device.
///
/// Holds the actor references, `watch` endpoints and shared environment created by
/// [`Runtime::spawn`].
pub struct Runtime {
    /// Reference to the control actor.
    pub control: ActorRef<ControlRunner>,
    /// Reference to the dataplane actor. [`Runtime::install_capture`] sends its capture-hook
    /// request to this actor.
    dataplane: ActorRef<DataplaneActor>,
    /// Reference to the direct (disco/UDP underlay) manager, retained so [`Runtime::rebind`] can
    /// ask it to re-bind the underlay socket on a network/link change.
    direct: ActorRef<DirectManager>,
    /// Reference to the application netstack actor. `None` in TUN transport mode, where there is
    /// no userspace application netstack (the application data path is a real kernel TUN device).
    netstack: Option<WeakActorRef<NetstackActor>>,
    /// Reference to the peer tracker for peer lookups.
    pub peer_tracker: WeakActorRef<PeerTracker>,
    /// Fallback TCP handler registry, bound to the application netstack. `None` in TUN transport
    /// mode (no application netstack exists to attach it to).
    fallback_tcp: Option<fallback_tcp::FallbackTcpManager>,
    /// Reference to the forwarder actor, retained so [`Runtime::set_advertise_routes`] can push a
    /// new accept/dial route table onto the running forwarder (the local half of advertising
    /// routes). Without this the strong ref would drop after the startup `GetChannel` and the
    /// forwarder would be reachable only via the message bus.
    forwarder: ActorRef<ForwarderActor>,
    /// Reference to the multiderp manager, retained so [`Runtime::status`] can resolve each
    /// relayed peer's DERP region id to its region **code** (`ipnstate.PeerStatus.Relay`). Without
    /// this the strong ref would drop after startup (it is cloned into the direct manager + route
    /// updater) and the region-code map would be unreachable.
    multiderp: ActorRef<Multiderp>,
    /// Shared environment. [`Runtime::spawn`] hands a clone of it to each actor it starts; the
    /// accessors on `Runtime` (Taildrop store, Funnel ingress slot, ingress flag) read from it.
    env: Env,
    /// Sender side of the shutdown `watch` cell. [`Runtime::spawn`] creates the cell with the
    /// value `false` and gives the receiving side to `Env`.
    shutdown: watch::Sender<bool>,
    /// Sender side of the exit-node selector `watch` cell. Held privately here (not on the cloned
    /// `Env`, which keeps only the read side) so that only `Runtime::set_exit_node` can mutate the
    /// selection; the route updater and source filter re-read it via [`Env::exit_node`].
    exit_node_tx: watch::Sender<Option<ts_control::ExitNodeSelector>>,
    /// Receiver mirroring the *active* (resolved + fail-closed) exit node's stable id, fed by the
    /// route updater. Read by [`Runtime::status`] / [`Runtime::active_exit_node`] to report which
    /// exit node traffic is actually egressing through (vs. the merely-configured selector).
    active_exit_rx: watch::Receiver<Option<ts_control::StableNodeId>>,
    /// Receiver for the device connection-state cell, fed by the control runner. Read by
    /// [`Runtime::watch_state`] and [`Runtime::wait_until_running`].
    state_rx: watch::Receiver<DeviceState>,
}
107
108impl Runtime {
    /// Spawn a new runtime with the given parameters for connecting to a tailnet.
    ///
    /// Actors are started in a fixed order: dataplane, multiderp, direct manager, forwarder, route
    /// updater, packet-filter and source-filter updaters, peer tracker, the application data path
    /// selected by `config.transport_mode`, and finally the control runner. The order matters: see
    /// the comments on each step.
    ///
    /// # Errors
    ///
    /// Propagates the failure of any startup `ask` (the two `NewOverlayTransport` requests and the
    /// forwarder / netstack `GetChannel` requests), and returns [`ErrorKind::TunUnavailable`] when
    /// `config.transport_mode` is `Tun` but the crate was built without the `tun` feature.
    pub async fn spawn(
        config: ts_control::Config,
        auth_key: Option<String>,
        keys: ts_keys::NodeState,
    ) -> Result<Self, Error> {
        // Shutdown cell: the sender is kept on the `Runtime`, the receiver goes into `Env`.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // The exit-node selector is a live `watch` cell so `Device::set_exit_node` can change it at
        // runtime. `new_with_exit_tx` returns the `Sender` (mutation capability) separately so it is
        // retained privately on the `Runtime`, while only the `Receiver` (the readers' contract)
        // lives on the cloned `Env`. The initial value comes from `ForwarderConfig.exit_node`.
        let (env, exit_node_tx) = Env::new_with_exit_tx(
            keys,
            shutdown_rx,
            env::ForwarderConfig::from_control_config(&config),
        );

        // Both userspace netstacks (application + forwarder) share one netstack config. Honor the
        // per-deployment TCP buffer knob when set, otherwise fall back to the netstack default.
        let netstack_config = netstack_config_from(config.tcp_buffer_size);

        let dataplane = DataplaneActor::spawn(env.clone());

        // First overlay transport: the application-side seam. It feeds either the application
        // netstack or the TUN actor, depending on the transport mode selected further down.
        let (netstack_id, netstack_up, netstack_down) =
            dataplane.ask(dataplane::NewOverlayTransport).await?;

        // A second overlay transport feeds the dedicated any-IP forwarder netstack. Inbound packets
        // for advertised subnet routes / the exit-node default route are routed here (see
        // `route_updater`), keeping forwarded flows off the application netstack.
        let (forwarder_id, forwarder_up, forwarder_down) =
            dataplane.ask(dataplane::NewOverlayTransport).await?;

        let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));

        // Spawn the direct (disco) underlay manager before the route updater. Its `on_start`
        // binds the UDP socket and registers its transport synchronously, so by the time the
        // route updater asks it for the direct transport id it is guaranteed to be available.
        let direct = DirectManager::spawn((env.clone(), dataplane.clone(), multiderp.clone()));

        // Spawn the forwarder before the route updater. Its `on_start` builds the forwarder
        // netstack, enables any-IP acceptance, and starts the per-port accept loops synchronously,
        // so by the time the route updater begins delivering advertised prefixes to
        // `forwarder_id` the netstack is already draining its transport.
        let forwarder = ForwarderActor::spawn((
            env.clone(),
            netstack_config.clone(),
            forwarder_up,
            forwarder_down,
        ));
        // Force `on_start` to finish (any-IP enabled, accept loops live) before the route updater
        // can route the first inbound flow to `forwarder_id`: an `ask` blocks until the actor has
        // started.
        //
        // The forwarder netstack's overlay `Channel` is reused by the TUN application path for
        // recursive / exit-node-DoH MagicDNS forwarding (TUN mode has no application netstack of its
        // own, but the forwarder netstack runs in both modes and egresses over the overlay — the
        // anti-leak property `forward_query`/`forward_doh` require). Only the `tun` Tun arm consumes
        // it, so it is unused when the `tun` feature is off — allow that without warn-as-error.
        #[cfg_attr(not(feature = "tun"), allow(unused_variables))]
        let (forwarder_channel,) = forwarder.ask(forwarder_actor::GetChannel).await?;

        // The route updater is the single authoritative resolver of the active (resolved,
        // fail-closed) exit node; it publishes the resolved stable id into this watch cell so
        // `Runtime::status` can report which exit is actually engaged (not just configured).
        let (active_exit_tx, active_exit_rx) = watch::channel(None);
        // The updaters below are spawned without keeping their actor refs on the `Runtime`.
        route_updater::RouteUpdater::spawn((
            multiderp.clone(),
            direct.clone(),
            env.clone(),
            netstack_id,
            forwarder_id,
            active_exit_tx,
        ));
        packetfilter::PacketfilterUpdater::spawn(env.clone());
        src_filter::SourceFilterUpdater::spawn(env.clone());
        // Only a weak reference to the peer tracker is retained on the `Runtime`.
        let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();

        // Select the application data path from the transport mode. The forwarder/egress path
        // above is UNCHANGED in both modes — TUN mode only swaps the application data path, never
        // the forwarder. `config` is moved into `ControlRunner::spawn` below, so branch on a
        // borrow and clone the small `TunConfig` where needed before the move.
        //
        // - Netstack (the default, and the only reachable arm when the `tun` feature is off):
        //   spawn the application netstack + MagicDNS responder + fallback-TCP registry, all on
        //   the `netstack_up`/`netstack_down` overlay seam.
        // - Tun: spawn `TunActor` on that same overlay seam instead; no application netstack and
        //   no MagicDNS responder exist, and `netstack`/`fallback_tcp` are `None`.
        // - Tun requested but built without the `tun` feature: hard-error (a config/build
        //   mismatch knowable at spawn time). NEVER silently fall back to netstack.
        let (netstack, fallback_tcp) = match &config.transport_mode {
            ts_control::TransportMode::Netstack => {
                let netstack = NetstackActor::spawn((
                    env.clone(),
                    netstack_config,
                    netstack_up,
                    netstack_down,
                ));

                // Fetch the netstack channel while we still hold the strong ActorRef, then spawn
                // the MagicDNS responder on it. Fire-and-forget: like src_filter/route_updater,
                // it's owned by the message bus and isn't stored on `Runtime`.
                let (channel,) = netstack.ask(netstack_actor::GetChannel).await?;
                // The fallback-TCP registry attaches to the application netstack — the same one
                // that carries the embedder's explicit `Device::tcp_listen` sockets — so a
                // fallback handler sees exactly the inbound flows no explicit listener matched.
                let fallback_tcp = fallback_tcp::FallbackTcpManager::new(channel.clone());
                magic_dns::MagicDnsActor::spawn((env.clone(), channel));

                // The `Runtime` keeps only a weak ref to the netstack actor.
                (Some(netstack.downgrade()), Some(fallback_tcp))
            }

            #[cfg(feature = "tun")]
            ts_control::TransportMode::Tun(tun_cfg) => {
                // Reuse the same `netstack_up`/`netstack_down` overlay-transport pair that would
                // have fed the netstack — it is just the application-side overlay seam (the name
                // is historical). No NetstackActor / MagicDnsActor is spawned.
                tun_actor::TunActor::spawn((
                    env.clone(),
                    tun_cfg.clone(),
                    netstack_up,
                    netstack_down,
                    // Host-route gating inputs derived from `Env`: subnet routes are only steered
                    // into the TUN when `--accept-routes` is set, and the host `/0` only when the
                    // embedder configured an exit node. See `tun_actor::host_routes_from_node`.
                    tun_actor::HostRouteGating {
                        accept_routes: env.accept_routes,
                        exit_node_configured: env.exit_node().is_some(),
                    },
                    // Reuse the forwarder netstack's overlay `Channel` for recursive / exit-node-DoH
                    // MagicDNS forwarding in the TUN datapath (TUN mode has no application netstack
                    // Channel of its own). Egresses over the overlay — anti-leak preserved.
                    forwarder_channel.clone(),
                ));

                (None, None)
            }

            #[cfg(not(feature = "tun"))]
            ts_control::TransportMode::Tun(_) => {
                return Err(Error {
                    kind: ErrorKind::TunUnavailable,
                    target_actor: None,
                    message_ty: None,
                });
            }
        };

        // Device connection-state cell. Created here (not inside the actor) so the control runner's
        // `on_start` can publish `Failed`/`NeedsLogin` and still return `Err` without the sender
        // being tied to a `Self` that never gets constructed on a hard registration failure.
        let (state_tx, state_rx) = watch::channel(DeviceState::Connecting);

        // Started last: `config` and `auth_key` are moved into the control runner here.
        let control = ControlRunner::spawn(control_runner::Params {
            config,
            auth_key,
            env: env.clone(),
            state_tx,
        });

        Ok(Self {
            control,
            dataplane,
            direct,
            peer_tracker,
            fallback_tcp,
            forwarder,
            multiderp,
            netstack,
            env,
            shutdown: shutdown_tx,
            exit_node_tx,
            active_exit_rx,
            state_rx,
        })
    }
285
286    /// Register a fallback TCP handler consulted for every inbound TCP flow that matches no
287    /// explicit listener (`tsnet.Server.RegisterFallbackTCPHandler` parity).
288    ///
289    /// The returned [`fallback_tcp::FallbackTcpHandle`] deregisters the handler when dropped. See
290    /// [`fallback_tcp`] for the dispatch contract and anti-leak guarantees.
291    ///
292    /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
293    /// application netstack to attach a fallback handler to.
294    pub fn register_fallback_tcp_handler(
295        &self,
296        cb: Arc<
297            dyn Fn(core::net::SocketAddr, core::net::SocketAddr) -> fallback_tcp::FallbackDecision
298                + Send
299                + Sync,
300        >,
301    ) -> Result<fallback_tcp::FallbackTcpHandle, Error> {
302        Ok(self
303            .fallback_tcp
304            .as_ref()
305            .ok_or(Error {
306                kind: ErrorKind::UnsupportedInTunMode,
307                target_actor: None,
308                message_ty: None,
309            })?
310            .register(cb))
311    }
312
313    /// Get a channel to send commands to the netstack.
314    ///
315    /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
316    /// application netstack.
317    pub async fn channel(&self) -> Result<Channel, Error> {
318        let (channel,) = self
319            .netstack
320            .as_ref()
321            .ok_or(Error {
322                kind: ErrorKind::UnsupportedInTunMode,
323                target_actor: None,
324                message_ty: None,
325            })?
326            .upgrade()
327            .ok_or(Error {
328                kind: ErrorKind::ActorGone,
329                target_actor: None,
330                message_ty: None,
331            })?
332            .ask(netstack_actor::GetChannel)
333            .await?;
334
335        Ok(channel)
336    }
337
338    /// The Taildrop file store, if Taildrop is enabled (`taildrop_dir` configured and the store
339    /// initialized). `None` when disabled — fail-closed. Shared with the peerAPI Taildrop server so
340    /// the embedder's read APIs and the receive path see the same on-disk store.
341    pub fn taildrop_store(&self) -> Option<Arc<crate::taildrop::TaildropStore>> {
342        self.env.taildrop_store.clone()
343    }
344
345    /// The shared Funnel ingress slot the peerAPI `/v0/ingress` route reads per connection.
346    ///
347    /// `Device::listen_funnel` installs a [`FunnelManager`](crate::funnel::FunnelManager)'s sink here
348    /// to make the route live (the peerAPI server is already running from startup). Returns a clone of
349    /// the runtime-lifetime `Arc` so the device can write the slot without restarting the server. See
350    /// [`crate::funnel`] for the ingress data path.
351    pub fn funnel_ingress_slot(&self) -> crate::funnel::FunnelIngressSlot {
352        self.env.funnel_ingress.clone()
353    }
354
355    /// The shared "Funnel ingress listener active" flag (the same `Arc` the control session reads to
356    /// set `HostInfo.IngressEnabled`). `Device::listen_funnel` flips it `true` while a funnel listener
357    /// is up so control routes Funnel traffic to this node; clearing it advertises no live endpoint.
358    pub fn ingress_active_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
359        self.env.ingress_active.clone()
360    }
361
362    /// Install (`Some`) or clear (`None`) the debug packet-capture hook on the running dataplane.
363    /// `Some(hook)` tees every plaintext packet crossing the datapath to `hook` until it is cleared;
364    /// `None` stops capture. Mirrors Go `tstun.Wrapper.InstallCaptureHook` / `ClearCaptureSink`.
365    pub async fn install_capture(
366        &self,
367        hook: Option<ts_dataplane::CaptureHook>,
368    ) -> Result<(), Error> {
369        self.dataplane
370            .ask(dataplane::InstallCapture { hook })
371            .await
372            .map_err(Into::into)
373    }
374
375    /// Re-bind the underlay UDP socket after a network/link change (Wi-Fi switch, sleep/wake). The
376    /// embedder's own link monitor calls this (the engine owns the socket re-bind; the embedder owns
377    /// OS netmon). Re-binds the socket (same-port-preferred, IPv4-only invariant preserved) and
378    /// resets the now-stale local NAT mapping — clearing learned reflexive addresses and every
379    /// confirmed direct path while keeping candidate endpoints, so peers re-probe over the new socket
380    /// and relay over DERP (never a direct host dial) until a path re-confirms. Peers, control, the
381    /// netmap, disco state, and DERP are untouched. A no-op when the underlay is inert (bind failed
382    /// at startup, DERP-only). Mirrors Go magicsock `Conn.Rebind` + `resetEndpointStates`.
383    pub async fn rebind(&self) -> Result<(), Error> {
384        self.direct.ask(direct::Rebind).await.map_err(Error::from)
385    }
386
387    /// A snapshot of the local netmap: this node plus every known peer.
388    ///
389    /// Combines the self node held by the control runner with the peer set held by the peer
390    /// tracker. Mirrors tsnet's `LocalClient::Status`.
391    ///
392    /// `self_node` is `None` until the first netmap update has been received from control. Peer
393    /// entries carry no online/user/capability data (see the [`status`] module docs for that gap).
394    pub async fn status(&self) -> Result<Status, Error> {
395        let self_node_domain = self.control.ask(control_runner::SelfNode).await?;
396        // The MagicDNS suffix is the self node's FQDN minus its host label — already split into
397        // `Node.tailnet` at decode time (Go derives it the same way in `NetworkMap.MagicDNSSuffix`).
398        // Capture it before the domain `Node` is mapped away into a `StatusNode`.
399        let magic_dns_suffix = self_node_domain.as_ref().and_then(|n| n.tailnet.clone());
400        let self_node = self_node_domain.as_ref().map(StatusNode::from_node);
401
402        let peers_with_ids = self
403            .peer_tracker
404            .upgrade()
405            .ok_or(Error {
406                kind: ErrorKind::ActorGone,
407                target_actor: None,
408                message_ty: None,
409            })?
410            .ask(peer_tracker::GetStatus)
411            .await?;
412
413        // Join per-peer connectivity (Go `PeerStatus.CurAddr`): one batched query to the direct
414        // manager for every peer's current trusted direct endpoint, then fill `cur_addr` on each
415        // `StatusNode`. A peer absent from the map is relayed via DERP (`cur_addr = None`). This is a
416        // live snapshot — the direct path can expire/re-confirm between calls (matches Go's snapshot
417        // semantics). The `watch_netmap` stream intentionally carries no connectivity (it is a netmap
418        // watch, not a path-state watch, and does not re-fire on direct↔relay flips).
419        let ids: Vec<ts_transport::PeerId> = peers_with_ids.iter().map(|(id, _)| *id).collect();
420        let best_addrs = self
421            .direct
422            .ask(direct::BestAddrs { ids: ids.clone() })
423            .await
424            .unwrap_or_default();
425
426        // For the peers with NO direct path (relayed via DERP), resolve the region CODE they relay
427        // through (Go `PeerStatus.Relay`). One batched ask to multiderp; `cur_addr` and `relay` are
428        // mutually exclusive for a routed peer, mirroring Go's empty-vs-set strings.
429        let relay_ids: Vec<ts_transport::PeerId> = ids
430            .into_iter()
431            .filter(|id| !best_addrs.contains_key(id))
432            .collect();
433        let relay_codes = if relay_ids.is_empty() {
434            Default::default()
435        } else {
436            self.multiderp
437                .ask(multiderp::RelayCodesForPeers { ids: relay_ids })
438                .await
439                .unwrap_or_default()
440        };
441
442        let peers = peers_with_ids
443            .into_iter()
444            .map(|(id, mut node)| match best_addrs.get(&id).copied() {
445                Some(addr) => {
446                    node.cur_addr = Some(addr);
447                    node
448                }
449                None => {
450                    node.relay = relay_codes.get(&id).cloned();
451                    node
452                }
453            })
454            .collect();
455
456        Ok(Status {
457            self_node,
458            peers,
459            active_exit_node: self.active_exit_node(),
460            magic_dns_suffix,
461        })
462    }
463
464    /// List the tailnet peers this node can Taildrop a file *to* (Go LocalAPI `FileTargets`).
465    ///
466    /// Mirrors the upstream send-path filter (`feature/taildrop` `Extension::FileTargets`): a peer
467    /// qualifies when it advertises a reachable peerAPI **and** is either owned by the same user as
468    /// this node **or** explicitly granted the file-sharing-target capability. The whole list is
469    /// gated on this node holding the file-sharing capability (control sets it when the admin enables
470    /// Taildrop) — absent that, an empty list (fail-closed, not an error, matching how the receive
471    /// store returns empty when disabled). Results are sorted by the peer's MagicDNS name.
472    ///
473    /// Targets are listed regardless of current online state (upstream's `FileTargets` does not gate
474    /// on online either; an offline target's send will simply time out). The self node is never
475    /// included. Returns empty before the first netmap.
476    ///
477    /// Divergence from Go: the upstream filter also excludes `tvOS` peers, which this fork cannot
478    /// reproduce (the domain node carries no OS string); the impact is negligible — the actual send
479    /// fail-closes if such a peer refused the transfer.
480    pub async fn file_targets(&self) -> Result<Vec<FileTarget>, Error> {
481        // Node-level gate: this node must hold the file-sharing capability (Taildrop enabled by the
482        // admin). Read it off the self node's cap map, like Go's `hasCapFileSharing()`.
483        let self_node = self.control.ask(control_runner::SelfNode).await?;
484        let Some(self_node) = self_node else {
485            return Ok(Vec::new()); // no netmap yet
486        };
487        if !self_node.can_share_files() {
488            return Ok(Vec::new()); // Taildrop not enabled for the tailnet — fail-closed
489        }
490        let self_user_id = self_node.user_id;
491
492        let peers = self
493            .peer_tracker
494            .upgrade()
495            .ok_or(Error {
496                kind: ErrorKind::ActorGone,
497                target_actor: None,
498                message_ty: None,
499            })?
500            .ask(peer_tracker::AllPeers)
501            .await?;
502
503        // Eligibility + ordering live in `build_file_targets` (pure, unit-tested in `status`).
504        Ok(status::build_file_targets(peers, self_user_id))
505    }
506
507    /// The stable id of the exit node traffic is currently egressing through, or `None` if none is
508    /// engaged. This is the route updater's resolved + fail-closed answer (see
509    /// [`Status::active_exit_node`](crate::status::Status::active_exit_node)): it differs from the
510    /// configured [`exit_node`](Self::exit_node) selector, which may name a peer that is absent or
511    /// no longer advertising a default route (in which case egress is dropped and this returns
512    /// `None`).
513    pub fn active_exit_node(&self) -> Option<ts_control::StableNodeId> {
514        self.active_exit_rx.borrow().clone()
515    }
516
517    /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
518    ///
519    /// Returns the signed JWT, or the token RPC's own [`ts_control::IdTokenError`]. The kameo
520    /// delegated-reply send error is flattened: a handler error carries the real `IdTokenError`,
521    /// any other send failure (actor shutdown / mailbox closed) is surfaced as
522    /// [`ts_control::IdTokenError::NetworkError`].
523    pub async fn fetch_id_token(
524        &self,
525        audience: String,
526    ) -> Result<String, ts_control::IdTokenError> {
527        self.control
528            .ask(control_runner::FetchIdToken { audience })
529            .await
530            .map_err(flatten_send_err)
531    }
532
533    /// Log this node out of the tailnet: deregister it by expiring its current node key.
534    ///
535    /// Forwards to the control runner, which re-POSTs `/machine/register` with a past expiry over a
536    /// fresh Noise channel. This is a control-plane state change only — it does NOT shut the runtime
537    /// down (the caller follows with [`graceful_shutdown`](Self::graceful_shutdown)) and does not
538    /// touch the on-disk node key. The kameo delegated-reply send error is flattened the same way as
539    /// [`fetch_id_token`](Self::fetch_id_token): a handler error carries the real
540    /// [`ts_control::LogoutError`]; any other send failure (actor shutdown / mailbox closed) is
541    /// surfaced as [`ts_control::LogoutError::NetworkError`].
542    pub async fn logout(&self) -> Result<(), ts_control::LogoutError> {
543        self.control
544            .ask(control_runner::Logout)
545            .await
546            .map_err(flatten_logout_send_err)
547    }
548
549    /// Publish a `TXT` DNS record for this node via control's `/machine/set-dns` (Go
550    /// `LocalClient.SetDNS`).
551    ///
552    /// Forwards to the control runner, which POSTs the record over a fresh Noise channel. The kameo
553    /// delegated-reply send error is flattened the same way as [`fetch_id_token`](Self::fetch_id_token):
554    /// a handler error carries the real [`ts_control::SetDnsError`]; any other send failure (actor
555    /// shutdown / mailbox closed) is surfaced as [`ts_control::SetDnsError::NetworkError`].
556    pub async fn set_dns(
557        &self,
558        name: String,
559        value: String,
560    ) -> Result<(), ts_control::SetDnsError> {
561        self.control
562            .ask(control_runner::SetDns { name, value })
563            .await
564            .map_err(flatten_set_dns_send_err)
565    }
566
567    /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` (`acme` feature).
568    ///
569    /// Mirrors [`fetch_id_token`](Self::fetch_id_token): forwards to the control runner, which runs
570    /// the client-side ACME DNS-01 flow on a spawned task and publishes the challenge TXT via the
571    /// node's set-dns RPC. The kameo delegated-reply send error is flattened — a handler error
572    /// carries the real [`ts_control::CertError`]; any other send failure (actor shutdown / mailbox
573    /// closed) is surfaced as a [`ts_control::CertError::Io`]. SaaS-only: a self-hosted control
574    /// plane 501s on set-dns.
575    #[cfg(feature = "acme")]
576    pub async fn get_certificate(
577        &self,
578        name: String,
579    ) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
580        self.control
581            .ask(control_runner::GetCertificate { name })
582            .await
583            .map_err(flatten_cert_send_err)
584    }
585
586    /// Resolve which node owns a tailnet source address.
587    ///
588    /// Maps the source IP of `addr` to its owning node. Mirrors tsnet's `LocalClient::WhoIs`.
589    /// Returns `None` if no peer holds that tailnet IP. The returned [`WhoIs`] carries no
590    /// user/login or capability data in this fork (see the [`status`] module docs).
591    pub async fn whois(&self, addr: core::net::SocketAddr) -> Result<Option<WhoIs>, Error> {
592        self.peer_tracker
593            .upgrade()
594            .ok_or(Error {
595                kind: ErrorKind::ActorGone,
596                target_actor: None,
597                message_ty: None,
598            })?
599            .ask(peer_tracker::Whois { addr })
600            .await
601            .map_err(Into::into)
602    }
603
604    /// The current direct-path status to the peer holding tailnet IP `dst`: its confirmed direct UDP
605    /// endpoint and that path's last-measured RTT, or `None` when there is no direct path right now
606    /// (the peer is relayed via DERP, is unknown, or has no disco key).
607    ///
608    /// The latency is the RTT of the most recent disco ping/pong that confirmed the path — a live
609    /// snapshot up to one probe interval stale, NOT a fresh on-demand round-trip (that is a separate,
610    /// heavier capability). Mirrors the direct-path latency Go surfaces for `ipnstate.PeerStatus`.
611    pub async fn direct_path(
612        &self,
613        dst: core::net::IpAddr,
614    ) -> Result<Option<(core::net::SocketAddr, Duration)>, Error> {
615        let peer_tracker = self.peer_tracker.upgrade().ok_or(Error {
616            kind: ErrorKind::ActorGone,
617            target_actor: None,
618            message_ty: None,
619        })?;
620
621        // Resolve the tailnet IP to its node, then to its disco key. No node / no disco key ⇒ no
622        // direct path is possible (a peer with no disco key can only be reached via DERP).
623        let Some(node) = peer_tracker
624            .ask(peer_tracker::PeerByTailnetIp { ip: dst })
625            .await?
626        else {
627            return Ok(None);
628        };
629        let Some(disco) = node.disco_key else {
630            return Ok(None);
631        };
632
633        self.direct
634            .ask(direct::DirectPathLatency { disco })
635            .await
636            .map_err(Into::into)
637    }
638
639    /// Send a disco ping to the peer holding tailnet IP `dst` **now** and await the pong, returning
640    /// the fresh round-trip latency and the endpoint that answered, or `None` if no pong arrives
641    /// within `timeout` (or the peer is unknown / has no disco key / no candidate path). This is the
642    /// true on-demand `PingType::Disco` (Go `tailscale ping`), as opposed to
643    /// [`direct_path`](Self::direct_path) which reports the last periodic probe's RTT.
644    ///
645    /// The ping round-trip is awaited OFF the direct manager's mailbox (we take a `MagicSock` handle
646    /// and await on it directly), so a slow/timing-out ping never blocks the actor.
647    pub async fn ping_disco(
648        &self,
649        dst: core::net::IpAddr,
650        timeout: Duration,
651    ) -> Result<Option<(core::net::SocketAddr, Duration)>, Error> {
652        let peer_tracker = self.peer_tracker.upgrade().ok_or(Error {
653            kind: ErrorKind::ActorGone,
654            target_actor: None,
655            message_ty: None,
656        })?;
657
658        let Some(node) = peer_tracker
659            .ask(peer_tracker::PeerByTailnetIp { ip: dst })
660            .await?
661        else {
662            return Ok(None);
663        };
664        let Some(disco) = node.disco_key else {
665            return Ok(None);
666        };
667
668        // Cheap synchronous handle fetch, then await the ping OFF the actor mailbox.
669        let Some(sock) = self.direct.ask(direct::SockHandle).await? else {
670            return Ok(None);
671        };
672        // A `ping_now` error is an underlay UDP send failure (not an actor problem); surface it as a
673        // reply-level error. A timed-out / unanswered ping is `Ok(None)`, not an error.
674        sock.ping_now(&disco, timeout).await.map_err(|_| Error {
675            kind: ErrorKind::ReplyErr,
676            target_actor: None,
677            message_ty: None,
678        })
679    }
680
681    /// Change the selected exit node at runtime (the equivalent of Go `tsnet`'s
682    /// `LocalClient.EditPrefs(ExitNodeID/ExitNodeIP)`), without recreating the device.
683    ///
684    /// Updates the live exit-node selector, then asks the peer tracker to re-broadcast the current
685    /// peer set so the route updater and source filter re-resolve the new selector immediately.
686    /// `None` clears the exit node (internet-bound traffic is then dropped, fail-closed, unless this
687    /// node egresses directly). The selection is re-resolved against the live peer set, so passing a
688    /// selector for a peer not yet in the netmap simply takes effect once that peer appears.
689    pub async fn set_exit_node(
690        &self,
691        selector: Option<ts_control::ExitNodeSelector>,
692    ) -> Result<(), Error> {
693        // Update the live cell every reader borrows from. `send_replace` keeps the value current
694        // even with no active receivers (none can have dropped while the runtime is up, but it is
695        // the right non-failing primitive here).
696        self.exit_node_tx.send_replace(selector);
697
698        // Trigger an immediate re-resolution: the route updater (outbound routes + DoH delegation)
699        // and the source filter (inbound validation) both recompute on an `Arc<PeerState>`, so a
700        // re-broadcast applies the new exit without waiting for the next netmap update.
701        self.peer_tracker
702            .upgrade()
703            .ok_or(Error {
704                kind: ErrorKind::ActorGone,
705                target_actor: None,
706                message_ty: None,
707            })?
708            .ask(peer_tracker::RepublishState)
709            .await
710            .map_err(Into::into)
711    }
712
713    /// The currently-selected exit node, or `None` if none is selected.
714    pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
715        self.env.exit_node()
716    }
717
718    /// Change the set of subnet routes this node advertises at runtime (Go `tailscale set
719    /// --advertise-routes`). Applies BOTH halves together so the wire and the data path agree:
720    ///
721    /// 1. **Wire** — re-advertise `Hostinfo.RoutableIPs` to control on the live map-poll connection
722    ///    (so control grants the node the subnet-router role for exactly these prefixes).
723    /// 2. **Local** — swap the forwarder's accept/dial route table (so the node actually forwards the
724    ///    prefixes it advertises). New flows see the new set; in-flight flows keep their routing.
725    ///
726    /// `routes` is filtered to the IPv4-only, deduplicated set this fork can honor (IPv6 prefixes are
727    /// dropped under the IPv6-off posture — we never advertise a route we won't forward), so the wire
728    /// and forwarder are fed the identical final set. This sets the explicit subnet prefixes only; it
729    /// does NOT touch the exit-node `0.0.0.0/0` advertisement (a separate concern).
730    pub async fn set_advertise_routes(&self, routes: Vec<ipnet::IpNet>) -> Result<(), Error> {
731        // IPv4-only + dedup, mirroring `ts_control::Config::advertised_routes` so the wire grant and
732        // the forwarder accept set never disagree.
733        let filtered = filter_advertise_routes(routes);
734
735        // Local half first: start forwarding the prefixes before control grants them, so there is no
736        // window where control has granted a route the node black-holes. (The reverse order would
737        // briefly advertise a route we don't yet forward.) New flows pick up the table immediately.
738        self.forwarder
739            .ask(forwarder_actor::UpdateRoutes {
740                routes: filtered.clone(),
741            })
742            .await?;
743
744        // Wire half: re-advertise to control on the live map-poll connection.
745        self.control
746            .ask(control_runner::SetAdvertiseRoutes { routes: filtered })
747            .await
748            .map_err(Into::into)
749    }
750
751    /// Subscribe to netmap peer-change events.
752    ///
753    /// Returns a [`watch::Receiver`] whose value is the current set of peer [`StatusNode`]s,
754    /// updated on every netmap state update from control. Mirrors tsnet's `WatchIPNBus`. Await
755    /// [`watch::Receiver::changed`](tokio::sync::watch::Receiver::changed) to react to peers
756    /// joining, leaving, or changing.
757    pub async fn watch_netmap(&self) -> Result<watch::Receiver<Vec<StatusNode>>, Error> {
758        self.peer_tracker
759            .upgrade()
760            .ok_or(Error {
761                kind: ErrorKind::ActorGone,
762                target_actor: None,
763                message_ty: None,
764            })?
765            .ask(peer_tracker::WatchNetmap)
766            .await
767            .map_err(Into::into)
768    }
769
770    /// The current device connection-[`DeviceState`].
771    pub fn device_state(&self) -> DeviceState {
772        self.state_rx.borrow().clone()
773    }
774
775    /// Watch the device connection-[`DeviceState`] (`Connecting` → `Running` / `NeedsLogin` /
776    /// `Expired` / `Failed`).
777    ///
778    /// Returns a [`watch::Receiver`]; await
779    /// [`changed`](tokio::sync::watch::Receiver::changed) to react push-style to control connection
780    /// transitions instead of polling [`status`](Self::status). The initial value is the current
781    /// state. Note: a transient per-reconnect dip back to `Connecting` is **not** currently
782    /// emitted (control transparently reconnects below this layer); the state reflects registration
783    /// outcome and node-key expiry.
784    pub fn watch_state(&self) -> watch::Receiver<DeviceState> {
785        self.state_rx.clone()
786    }
787
788    /// Wait until the device finishes registering, returning a typed outcome.
789    ///
790    /// Resolves `Ok(())` once the device reaches [`DeviceState::Running`]. Returns a typed
791    /// [`RegistrationError`] otherwise — the actionable distinction between "retry", "re-pair", and
792    /// "drive interactive login" that replaces polling [`ipv4_addr`](Self::ipv4_addr) in a loop:
793    /// - `AuthRejected` — bad/expired/unknown auth key. **Permanent** (re-pair).
794    /// - `NeedsLogin(url)` — interactive authorization required (no usable auth key). **Not
795    ///   permanent**: the runtime keeps retrying and will reach `Running` once the user authorizes
796    ///   the URL. An **auth-key** caller should treat this as a failure; an **interactive** caller
797    ///   should ignore this return and instead drive the flow via [`watch_state`](Self::watch_state)
798    ///   (this method returns the URL eagerly rather than blocking for the whole login).
799    /// - `NetworkUnreachable` — control unreachable. **Transient** (retry).
800    /// - `Timeout` — no settled state within `timeout`.
801    ///
802    /// `KeyExpired` is not produced by this initial wait (a node key expires only *after* it has
803    /// come up); observe post-registration expiry via [`watch_state`](Self::watch_state).
804    /// `timeout` of `None` waits indefinitely for a settled state.
805    pub async fn wait_until_running(
806        &self,
807        timeout: Option<Duration>,
808    ) -> Result<(), RegistrationError> {
809        device_state::wait_for_running(self.state_rx.clone(), timeout).await
810    }
811
812    /// Attempt to shut down the runtime gracefully.
813    ///
814    /// Returns false if the shutdown timed out. It is still shut down if it timed out, just
815    /// more violently and with possible resource leaks.
816    pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
817        self.shutdown.send_replace(true);
818
819        async fn _shutdown_all(runtime: Runtime) {
820            // See the note in `Drop` for why we only need to stop these actors to bring down the
821            // whole runtime.
822
823            let _ignore = runtime.control.stop_gracefully().await;
824            let _ignore = runtime.dataplane.stop_gracefully().await;
825            let _ignore = runtime.env.bus.stop_gracefully().await;
826
827            tokio::join![
828                runtime.control.wait_for_shutdown(),
829                runtime.dataplane.wait_for_shutdown(),
830                runtime.env.bus.wait_for_shutdown(),
831            ];
832        }
833
834        let fut = _shutdown_all(self);
835
836        match timeout {
837            Some(timeout) => tokio::time::timeout(timeout, fut).await.is_ok(),
838            None => {
839                fut.await;
840                true
841            }
842        }
843    }
844}
845
846impl Drop for Runtime {
847    fn drop(&mut self) {
848        // We must have already run `graceful_shutdown`: on the happy path, this does nothing, but
849        // if it timed out, we need to make sure the actors are dead so we don't leak them and their
850        // dependents.
851        if *self.shutdown.borrow() {
852            self.control.kill();
853            self.dataplane.kill();
854            self.env.bus.kill();
855            return;
856        }
857
858        self.shutdown.send_replace(true);
859
860        // Actors shut down when the last ActorRef to them is dropped (as nothing can send them
861        // messages anymore). If we don't hold an ActorRef in Runtime, in general the only thing
862        // that has one is the MessageBus, which each actor subscribes to for a subset of messages.
863        // Hence, if we shut down the bus, most actors die as well.
864
865        // First shut down the actors we have an ActorRef to:
866        try_shutdown(&self.control);
867        try_shutdown(&self.dataplane);
868
869        // Then shutdown the message bus, stopping the rest of the actors:
870        try_shutdown(&self.env.bus);
871    }
872}
873
874fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
875    if let Err(e) = a.mailbox_sender().try_send(Signal::Stop) {
876        tracing::error!(error = %e, "graceful shutdown failed, killing actor");
877        a.kill();
878    }
879}
880
881/// Build the netstack config shared by both userspace netstacks (application + forwarder) from the
882/// per-deployment `tcp_buffer_size` knob.
883///
884/// `None` keeps the netstack default (256 KiB/direction); `Some(n)` overrides it (e.g. a smaller
885/// window on a memory-constrained exit node forwarding many concurrent flows — see
886/// [`netstack::netcore::Config::tcp_buffer_size`]). Factored out of [`Runtime::spawn`] so the
887/// None-default / Some-override mapping is unit-testable without standing up the actor system.
888fn netstack_config_from(tcp_buffer_size: Option<usize>) -> netstack::netcore::Config {
889    let mut c = netstack::netcore::Config::default();
890    if let Some(tcp_buffer_size) = tcp_buffer_size {
891        c.tcp_buffer_size = tcp_buffer_size;
892    }
893    c
894}
895
896/// Filter a requested advertise-route set to the IPv4-only, deduplicated set this fork can honor,
897/// mirroring [`ts_control::Config::advertised_routes`] so a runtime `set_advertise_routes` feeds the
898/// wire (control grant) and the forwarder (accept/dial table) the identical final set. IPv6 prefixes
899/// are dropped under the IPv6-off posture — we never advertise a route we won't forward. Order is
900/// preserved (first occurrence wins). Factored out so the filter is unit-testable without an actor.
901fn filter_advertise_routes(routes: Vec<ipnet::IpNet>) -> Vec<ipnet::IpNet> {
902    let mut filtered: Vec<ipnet::IpNet> = Vec::new();
903    for net in routes {
904        if matches!(net, ipnet::IpNet::V4(_)) {
905            if !filtered.contains(&net) {
906                filtered.push(net);
907            }
908        } else {
909            tracing::warn!(prefix = %net, "dropping IPv6 advertise route (IPv6-off posture)");
910        }
911    }
912    filtered
913}
914
915/// Flatten a kameo delegated-reply [`SendError`] for the id-token RPC into the RPC's own
916/// [`ts_control::IdTokenError`].
917///
918/// A [`SendError::HandlerError`](kameo::error::SendError::HandlerError) carries the real
919/// `IdTokenError` produced by the handler and is surfaced verbatim. Any other send failure (actor
920/// not running / stopped, mailbox full, send timeout) is a delivery problem rather than an RPC
921/// result, so it collapses to a transient [`ts_control::IdTokenError::NetworkError`]. Factored out
922/// of [`Runtime::fetch_id_token`] so this mapping is unit-testable without standing up an actor.
923fn flatten_send_err<M>(
924    e: kameo::error::SendError<M, ts_control::IdTokenError>,
925) -> ts_control::IdTokenError {
926    match e {
927        kameo::error::SendError::HandlerError(err) => err,
928        _ => ts_control::IdTokenError::NetworkError,
929    }
930}
931
932/// Flatten a kameo `SendError` from the `Logout` ask into a [`ts_control::LogoutError`].
933///
934/// A `HandlerError` carries the real `LogoutError` from the control RPC and is surfaced verbatim;
935/// any other send failure (actor not running / stopped, mailbox full, send timeout) — a delivery
936/// problem, not a logout result — collapses to the transient [`ts_control::LogoutError::NetworkError`]
937/// (logout is idempotent, so a retry after a delivery failure is safe). Factored out of
938/// [`Runtime::logout`] so the mapping is unit-testable without standing up an actor.
939fn flatten_logout_send_err<M>(
940    e: kameo::error::SendError<M, ts_control::LogoutError>,
941) -> ts_control::LogoutError {
942    match e {
943        kameo::error::SendError::HandlerError(err) => err,
944        _ => ts_control::LogoutError::NetworkError,
945    }
946}
947
948/// Flatten a kameo `SendError` from the `SetDns` ask into a [`ts_control::SetDnsError`].
949///
950/// A `HandlerError` carries the real `SetDnsError` from the set-dns RPC and is surfaced verbatim;
951/// any other send failure (actor not running / stopped, mailbox full, send timeout) — a delivery
952/// problem, not a publish result — collapses to the transient
953/// [`ts_control::SetDnsError::NetworkError`]. Factored out of [`Runtime::set_dns`] so the mapping is
954/// unit-testable without standing up an actor.
955fn flatten_set_dns_send_err<M>(
956    e: kameo::error::SendError<M, ts_control::SetDnsError>,
957) -> ts_control::SetDnsError {
958    match e {
959        kameo::error::SendError::HandlerError(err) => err,
960        _ => ts_control::SetDnsError::NetworkError,
961    }
962}
963
/// Collapse a kameo `SendError` from the `GetCertificate` ask into a [`ts_control::CertError`].
///
/// A `HandlerError` carries the real `CertError` produced by the ACME issuance and is returned
/// verbatim. `CertError` has no transient-network variant, so every other send failure (actor
/// not running / stopped, mailbox full, send timeout) — a delivery problem rather than an
/// issuance result — becomes a [`ts_control::CertError::Io`] wrapping a fixed message. Kept
/// separate from [`Runtime::get_certificate`] so this mapping is unit-testable without standing
/// up an actor.
#[cfg(feature = "acme")]
fn flatten_cert_send_err<M>(
    e: kameo::error::SendError<M, ts_control::CertError>,
) -> ts_control::CertError {
    let kameo::error::SendError::HandlerError(issuance_err) = e else {
        return ts_control::CertError::Io(std::io::Error::other(
            "control runner unavailable for certificate issuance",
        ));
    };
    issuance_err
}
982
/// Unit tests for the actor-free helpers in this file: the netstack config mapping, the
/// advertise-route filter, and the `SendError` flatteners (including the ACME certificate one,
/// which is compiled only with the `acme` feature).
#[cfg(test)]
mod tests {
    use super::*;

    /// `None` must leave the netstack's own default TCP window in place (the 256 KiB throughput
    /// default), and must not silently coerce to some other value.
    #[test]
    fn netstack_config_none_uses_netstack_default() {
        let default = netstack::netcore::Config::default();
        let built = netstack_config_from(None);
        assert_eq!(
            built.tcp_buffer_size, default.tcp_buffer_size,
            "None must inherit the netstack default TCP buffer size"
        );
    }

    /// `Some(n)` must override the TCP window (the memory-vs-throughput knob exit-node operators
    /// reach for), reaching the config that both netstacks are built from.
    #[test]
    fn netstack_config_some_overrides_buffer() {
        let built = netstack_config_from(Some(64 * 1024));
        assert_eq!(
            built.tcp_buffer_size,
            64 * 1024,
            "Some(n) must override the TCP buffer size that both netstacks use"
        );
    }

    /// `set_advertise_routes` must feed the wire and the forwarder the IDENTICAL filtered set:
    /// IPv4-only (IPv6 dropped under the IPv6-off posture), deduplicated, order preserved.
    #[test]
    fn filter_advertise_routes_keeps_v4_dedups_drops_v6() {
        let v4a: ipnet::IpNet = "10.0.0.0/24".parse().unwrap();
        let v4b: ipnet::IpNet = "192.168.1.0/24".parse().unwrap();
        let v6: ipnet::IpNet = "2001:db8::/32".parse().unwrap();

        // Mixed input with a duplicate v4 and a v6 prefix.
        let out = filter_advertise_routes(vec![v4a, v6, v4b, v4a]);

        assert_eq!(
            out,
            vec![v4a, v4b],
            "v6 dropped, duplicate v4 collapsed, first-occurrence order preserved"
        );
    }

    /// An all-IPv6 request filters to empty (we never advertise a route we won't forward) rather
    /// than erroring — clearing the advertised set is a legitimate outcome.
    #[test]
    fn filter_advertise_routes_all_v6_is_empty() {
        let v6: ipnet::IpNet = "2001:db8::/32".parse().unwrap();
        assert!(filter_advertise_routes(vec![v6]).is_empty());
    }

    /// A `HandlerError` carries the real `IdTokenError` from the RPC handler and must pass through
    /// verbatim, not be flattened to a generic network error. Using an `Internal(_)` payload (not
    /// `NetworkError`) makes the passthrough observable: a buggy flatten that always returned
    /// `NetworkError` would fail this assertion.
    #[test]
    fn flatten_send_err_handler_error_passes_through() {
        // Build an `Internal(_)` payload via the public `From<Utf8Error>` conversion (no extra
        // deps): it is distinct from the `_ => NetworkError` fallback, so a buggy flatten that
        // always returned `NetworkError` would fail this assertion.
        // Route the invalid bytes through a runtime Vec so the `invalid_from_utf8` lint (which only
        // fires on compile-time-known literals) doesn't flag this intentional bad input.
        let bytes = vec![0xffu8, 0xfe];
        let utf8_err = core::str::from_utf8(&bytes).unwrap_err();
        let inner = ts_control::IdTokenError::from(utf8_err);
        assert!(matches!(inner, ts_control::IdTokenError::Internal(_)));
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::HandlerError(inner.clone());
        assert_eq!(flatten_send_err(e), inner);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not an RPC result, so it
    /// must collapse to a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_stopped_is_network_error() {
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::ActorStopped;
        assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
    }

    /// `ActorNotRunning` (the message bounces back undelivered) is likewise a delivery failure and
    /// must map to a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_not_running_is_network_error() {
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::ActorNotRunning(control_runner::FetchIdToken {
                audience: "sts.amazonaws.com".to_string(),
            });
        assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
    }

    /// A `HandlerError` from the logout RPC carries the real `LogoutError` and must pass through
    /// verbatim. An `Internal(_)` payload (distinct from the `_ => NetworkError` fallback) makes the
    /// passthrough observable.
    #[test]
    fn flatten_logout_send_err_handler_error_passes_through() {
        let inner = ts_control::LogoutError::Internal(ts_control::LogoutInternalErrorKind::Http);
        assert!(matches!(inner, ts_control::LogoutError::Internal(_)));
        let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
            kameo::error::SendError::HandlerError(inner.clone());
        assert_eq!(flatten_logout_send_err(e), inner);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not a logout result, and
    /// collapses to a transient `NetworkError` (logout is idempotent, so a retry is safe).
    #[test]
    fn flatten_logout_send_err_actor_stopped_is_network_error() {
        let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
            kameo::error::SendError::ActorStopped;
        assert_eq!(
            flatten_logout_send_err(e),
            ts_control::LogoutError::NetworkError
        );
    }

    /// A `HandlerError` from the set-dns RPC carries the real `SetDnsError` and must pass through
    /// verbatim. An `Internal(_)` payload (distinct from the `_ => NetworkError` fallback) makes the
    /// passthrough observable.
    #[test]
    fn flatten_set_dns_send_err_handler_error_passes_through() {
        let inner = ts_control::SetDnsError::Internal(ts_control::SetDnsInternalErrorKind::Http);
        assert!(matches!(inner, ts_control::SetDnsError::Internal(_)));
        let e: kameo::error::SendError<control_runner::SetDns, ts_control::SetDnsError> =
            kameo::error::SendError::HandlerError(inner.clone());
        assert_eq!(flatten_set_dns_send_err(e), inner);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not a publish result, and
    /// collapses to a transient `NetworkError`.
    #[test]
    fn flatten_set_dns_send_err_actor_stopped_is_network_error() {
        let e: kameo::error::SendError<control_runner::SetDns, ts_control::SetDnsError> =
            kameo::error::SendError::ActorStopped;
        assert_eq!(
            flatten_set_dns_send_err(e),
            ts_control::SetDnsError::NetworkError
        );
    }

    /// A `HandlerError` from certificate issuance carries the real `CertError` and must pass
    /// through verbatim. The payload's message differs from the delivery-failure fallback text,
    /// so a buggy flatten that always produced the fallback would fail this assertion. The
    /// message type is `()` because the flattener is generic over it and never inspects it.
    #[cfg(feature = "acme")]
    #[test]
    fn flatten_cert_send_err_handler_error_passes_through() {
        let inner = ts_control::CertError::Io(std::io::Error::other("acme order failed"));
        let e: kameo::error::SendError<(), ts_control::CertError> =
            kameo::error::SendError::HandlerError(inner);
        assert!(matches!(
            flatten_cert_send_err(e),
            ts_control::CertError::Io(err) if err.to_string() == "acme order failed"
        ));
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not an issuance result.
    /// `CertError` has no transient-network variant, so it must collapse to `CertError::Io`
    /// carrying the fixed "control runner unavailable" message.
    #[cfg(feature = "acme")]
    #[test]
    fn flatten_cert_send_err_actor_stopped_is_io_error() {
        let e: kameo::error::SendError<(), ts_control::CertError> =
            kameo::error::SendError::ActorStopped;
        assert!(matches!(
            flatten_cert_send_err(e),
            ts_control::CertError::Io(err)
                if err.to_string() == "control runner unavailable for certificate issuance"
        ));
    }
}