Skip to main content

ts_runtime/
lib.rs

1#![doc = include_str!("../README.md")]
2
3extern crate ts_netstack_smoltcp as netstack;
4
5use core::time::Duration;
6use std::sync::Arc;
7
8use kameo::{
9    actor::{ActorRef, Spawn, WeakActorRef},
10    mailbox::Signal,
11};
12use netstack::netcore::Channel;
13use tokio::sync::watch;
14
15use crate::{
16    control_runner::ControlRunner, dataplane::DataplaneActor, direct::DirectManager,
17    forwarder_actor::ForwarderActor, multiderp::Multiderp, netstack_actor::NetstackActor,
18};
19
20/// Pcap stream framer for debug packet capture (`CapturePcap`).
21pub mod capture;
22/// Control runner.
23pub mod control_runner;
24mod dataplane;
25mod derp_latency;
26/// Device connection-state tracking ([`DeviceState`]) and typed registration outcome
27/// ([`RegistrationError`]).
28pub mod device_state;
29mod direct;
30mod env;
31mod error;
32/// Fallback TCP handler registry (`tsnet.Server.RegisterFallbackTCPHandler` parity).
33pub mod fallback_tcp;
34mod forwarder_actor;
35/// Client-side Funnel ingress termination (`tsnet`'s `ListenFunnel` data path).
36pub mod funnel;
37mod magic_dns;
38mod multiderp;
39mod netstack_actor;
40mod packetfilter;
41pub mod peer_tracker;
42mod peerapi;
43mod peerapi_doh;
44mod route_updater;
45/// Stored Serve config + accept-loop runtime (`tsnet`'s `Get/SetServeConfig` + serving runtime).
46pub mod serve;
47mod src_filter;
48/// Netmap status snapshot, WhoIs, and watcher types.
49pub mod status;
50/// Taildrop peer-to-peer file transfer store.
51pub mod taildrop;
52pub mod taildrop_send;
53#[cfg(feature = "tun")]
54mod tun_actor;
55
56pub use device_state::{DeviceState, RegistrationError};
57pub(crate) use env::Env;
58pub use error::{Error, ErrorKind};
59pub use status::{Status, StatusNode, WhoIs};
60pub use ts_dataplane::{CaptureHook, CapturePath};
61
62use crate::peer_tracker::PeerTracker;
63
/// The runtime for a tailscale device.
///
/// NOTE(review): Rust drops struct fields in declaration order after [`Drop::drop`] has run,
/// so the order of the fields below is part of teardown behavior — avoid reordering casually.
pub struct Runtime {
    /// Reference to the control actor.
    pub control: ActorRef<ControlRunner>,
    /// Reference to the dataplane actor. Used by [`Runtime::install_capture`] to install or clear
    /// the packet-capture hook, and stopped (or killed) by [`Runtime::graceful_shutdown`] / `Drop`.
    dataplane: ActorRef<DataplaneActor>,
    /// Reference to the direct (disco/UDP underlay) manager, retained so [`Runtime::rebind`] can
    /// ask it to re-bind the underlay socket on a network/link change.
    direct: ActorRef<DirectManager>,
    /// Reference to the application netstack actor. `None` in TUN transport mode, where there is
    /// no userspace application netstack (the application data path is a real kernel TUN device).
    netstack: Option<WeakActorRef<NetstackActor>>,
    /// Reference to the peer tracker for peer lookups.
    pub peer_tracker: WeakActorRef<PeerTracker>,
    /// Fallback TCP handler registry, bound to the application netstack. `None` in TUN transport
    /// mode (no application netstack exists to attach it to).
    fallback_tcp: Option<fallback_tcp::FallbackTcpManager>,
    /// Shared runtime environment; clones of it are handed to the spawned actors. Its `bus` is
    /// stopped during shutdown to bring down the actors this struct holds no `ActorRef` to, and it
    /// carries the Taildrop store and Funnel ingress state exposed by the accessor methods.
    env: Env,
    /// Shutdown signal. Set to `true` by [`Runtime::graceful_shutdown`] or by `Drop`; `Drop` reads
    /// it to distinguish a timed-out graceful shutdown (kill everything) from a plain drop
    /// (request a graceful stop). The matching receiver was handed to `Env` at spawn.
    shutdown: watch::Sender<bool>,
    /// Sender side of the exit-node selector `watch` cell. Held privately here (not on the cloned
    /// `Env`, which keeps only the read side) so that only `Runtime::set_exit_node` can mutate the
    /// selection; the route updater and source filter re-read it via [`Env::exit_node`].
    exit_node_tx: watch::Sender<Option<ts_control::ExitNodeSelector>>,
    /// Receiver mirroring the *active* (resolved + fail-closed) exit node's stable id, fed by the
    /// route updater. Read by [`Runtime::status`] / [`Runtime::active_exit_node`] to report which
    /// exit node traffic is actually egressing through (vs. the merely-configured selector).
    active_exit_rx: watch::Receiver<Option<ts_control::StableNodeId>>,
    /// Receiver for the device connection-state cell, fed by the control runner. Read by
    /// [`Runtime::watch_state`] and [`Runtime::wait_until_running`].
    state_rx: watch::Receiver<DeviceState>,
}
94
95impl Runtime {
96    /// Spawn a new runtime with the given parameters for connecting to a tailnet.
97    pub async fn spawn(
98        config: ts_control::Config,
99        auth_key: Option<String>,
100        keys: ts_keys::NodeState,
101    ) -> Result<Self, Error> {
102        let (shutdown_tx, shutdown_rx) = watch::channel(false);
103
104        // The exit-node selector is a live `watch` cell so `Device::set_exit_node` can change it at
105        // runtime. `new_with_exit_tx` returns the `Sender` (mutation capability) separately so it is
106        // retained privately on the `Runtime`, while only the `Receiver` (the readers' contract)
107        // lives on the cloned `Env`. The initial value comes from `ForwarderConfig.exit_node`.
108        let (env, exit_node_tx) = Env::new_with_exit_tx(
109            keys,
110            shutdown_rx,
111            env::ForwarderConfig::from_control_config(&config),
112        );
113
114        // Both userspace netstacks (application + forwarder) share one netstack config. Honor the
115        // per-deployment TCP buffer knob when set, otherwise fall back to the netstack default.
116        let netstack_config = netstack_config_from(config.tcp_buffer_size);
117
118        let dataplane = DataplaneActor::spawn(env.clone());
119
120        let (netstack_id, netstack_up, netstack_down) =
121            dataplane.ask(dataplane::NewOverlayTransport).await?;
122
123        // A second overlay transport feeds the dedicated any-IP forwarder netstack. Inbound packets
124        // for advertised subnet routes / the exit-node default route are routed here (see
125        // `route_updater`), keeping forwarded flows off the application netstack.
126        let (forwarder_id, forwarder_up, forwarder_down) =
127            dataplane.ask(dataplane::NewOverlayTransport).await?;
128
129        let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));
130
131        // Spawn the direct (disco) underlay manager before the route updater. Its `on_start`
132        // binds the UDP socket and registers its transport synchronously, so by the time the
133        // route updater asks it for the direct transport id it is guaranteed to be available.
134        let direct = DirectManager::spawn((env.clone(), dataplane.clone(), multiderp.clone()));
135
136        // Spawn the forwarder before the route updater. Its `on_start` builds the forwarder
137        // netstack, enables any-IP acceptance, and starts the per-port accept loops synchronously,
138        // so by the time the route updater begins delivering advertised prefixes to
139        // `forwarder_id` the netstack is already draining its transport.
140        let forwarder = ForwarderActor::spawn((
141            env.clone(),
142            netstack_config.clone(),
143            forwarder_up,
144            forwarder_down,
145        ));
146        // Force `on_start` to finish (any-IP enabled, accept loops live) before the route updater
147        // can route the first inbound flow to `forwarder_id`: an `ask` blocks until the actor has
148        // started.
149        //
150        // The forwarder netstack's overlay `Channel` is reused by the TUN application path for
151        // recursive / exit-node-DoH MagicDNS forwarding (TUN mode has no application netstack of its
152        // own, but the forwarder netstack runs in both modes and egresses over the overlay — the
153        // anti-leak property `forward_query`/`forward_doh` require). Only the `tun` Tun arm consumes
154        // it, so it is unused when the `tun` feature is off — allow that without warn-as-error.
155        #[cfg_attr(not(feature = "tun"), allow(unused_variables))]
156        let (forwarder_channel,) = forwarder.ask(forwarder_actor::GetChannel).await?;
157
158        // The route updater is the single authoritative resolver of the active (resolved,
159        // fail-closed) exit node; it publishes the resolved stable id into this watch cell so
160        // `Runtime::status` can report which exit is actually engaged (not just configured).
161        let (active_exit_tx, active_exit_rx) = watch::channel(None);
162        route_updater::RouteUpdater::spawn((
163            multiderp.clone(),
164            direct.clone(),
165            env.clone(),
166            netstack_id,
167            forwarder_id,
168            active_exit_tx,
169        ));
170        packetfilter::PacketfilterUpdater::spawn(env.clone());
171        src_filter::SourceFilterUpdater::spawn(env.clone());
172        let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();
173
174        // Select the application data path from the transport mode. The forwarder/egress path
175        // above is UNCHANGED in both modes — TUN mode only swaps the application data path, never
176        // the forwarder. `config` is moved into `ControlRunner::spawn` below, so branch on a
177        // borrow and clone the small `TunConfig` where needed before the move.
178        //
179        // - Netstack (the default, and the only reachable arm when the `tun` feature is off):
180        //   spawn the application netstack + MagicDNS responder + fallback-TCP registry, all on
181        //   the `netstack_up`/`netstack_down` overlay seam.
182        // - Tun: spawn `TunActor` on that same overlay seam instead; no application netstack and
183        //   no MagicDNS responder exist, and `netstack`/`fallback_tcp` are `None`.
184        // - Tun requested but built without the `tun` feature: hard-error (a config/build
185        //   mismatch knowable at spawn time). NEVER silently fall back to netstack.
186        let (netstack, fallback_tcp) = match &config.transport_mode {
187            ts_control::TransportMode::Netstack => {
188                let netstack = NetstackActor::spawn((
189                    env.clone(),
190                    netstack_config,
191                    netstack_up,
192                    netstack_down,
193                ));
194
195                // Fetch the netstack channel while we still hold the strong ActorRef, then spawn
196                // the MagicDNS responder on it. Fire-and-forget: like src_filter/route_updater,
197                // it's owned by the message bus and isn't stored on `Runtime`.
198                let (channel,) = netstack.ask(netstack_actor::GetChannel).await?;
199                // The fallback-TCP registry attaches to the application netstack — the same one
200                // that carries the embedder's explicit `Device::tcp_listen` sockets — so a
201                // fallback handler sees exactly the inbound flows no explicit listener matched.
202                let fallback_tcp = fallback_tcp::FallbackTcpManager::new(channel.clone());
203                magic_dns::MagicDnsActor::spawn((env.clone(), channel));
204
205                (Some(netstack.downgrade()), Some(fallback_tcp))
206            }
207
208            #[cfg(feature = "tun")]
209            ts_control::TransportMode::Tun(tun_cfg) => {
210                // Reuse the same `netstack_up`/`netstack_down` overlay-transport pair that would
211                // have fed the netstack — it is just the application-side overlay seam (the name
212                // is historical). No NetstackActor / MagicDnsActor is spawned.
213                tun_actor::TunActor::spawn((
214                    env.clone(),
215                    tun_cfg.clone(),
216                    netstack_up,
217                    netstack_down,
218                    // Host-route gating inputs derived from `Env`: subnet routes are only steered
219                    // into the TUN when `--accept-routes` is set, and the host `/0` only when the
220                    // embedder configured an exit node. See `tun_actor::host_routes_from_node`.
221                    tun_actor::HostRouteGating {
222                        accept_routes: env.accept_routes,
223                        exit_node_configured: env.exit_node().is_some(),
224                    },
225                    // Reuse the forwarder netstack's overlay `Channel` for recursive / exit-node-DoH
226                    // MagicDNS forwarding in the TUN datapath (TUN mode has no application netstack
227                    // Channel of its own). Egresses over the overlay — anti-leak preserved.
228                    forwarder_channel.clone(),
229                ));
230
231                (None, None)
232            }
233
234            #[cfg(not(feature = "tun"))]
235            ts_control::TransportMode::Tun(_) => {
236                return Err(Error {
237                    kind: ErrorKind::TunUnavailable,
238                    target_actor: None,
239                    message_ty: None,
240                });
241            }
242        };
243
244        // Device connection-state cell. Created here (not inside the actor) so the control runner's
245        // `on_start` can publish `Failed`/`NeedsLogin` and still return `Err` without the sender
246        // being tied to a `Self` that never gets constructed on a hard registration failure.
247        let (state_tx, state_rx) = watch::channel(DeviceState::Connecting);
248
249        let control = ControlRunner::spawn(control_runner::Params {
250            config,
251            auth_key,
252            env: env.clone(),
253            state_tx,
254        });
255
256        Ok(Self {
257            control,
258            dataplane,
259            direct,
260            peer_tracker,
261            fallback_tcp,
262            netstack,
263            env,
264            shutdown: shutdown_tx,
265            exit_node_tx,
266            active_exit_rx,
267            state_rx,
268        })
269    }
270
271    /// Register a fallback TCP handler consulted for every inbound TCP flow that matches no
272    /// explicit listener (`tsnet.Server.RegisterFallbackTCPHandler` parity).
273    ///
274    /// The returned [`fallback_tcp::FallbackTcpHandle`] deregisters the handler when dropped. See
275    /// [`fallback_tcp`] for the dispatch contract and anti-leak guarantees.
276    ///
277    /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
278    /// application netstack to attach a fallback handler to.
279    pub fn register_fallback_tcp_handler(
280        &self,
281        cb: Arc<
282            dyn Fn(core::net::SocketAddr, core::net::SocketAddr) -> fallback_tcp::FallbackDecision
283                + Send
284                + Sync,
285        >,
286    ) -> Result<fallback_tcp::FallbackTcpHandle, Error> {
287        Ok(self
288            .fallback_tcp
289            .as_ref()
290            .ok_or(Error {
291                kind: ErrorKind::UnsupportedInTunMode,
292                target_actor: None,
293                message_ty: None,
294            })?
295            .register(cb))
296    }
297
298    /// Get a channel to send commands to the netstack.
299    ///
300    /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
301    /// application netstack.
302    pub async fn channel(&self) -> Result<Channel, Error> {
303        let (channel,) = self
304            .netstack
305            .as_ref()
306            .ok_or(Error {
307                kind: ErrorKind::UnsupportedInTunMode,
308                target_actor: None,
309                message_ty: None,
310            })?
311            .upgrade()
312            .ok_or(Error {
313                kind: ErrorKind::ActorGone,
314                target_actor: None,
315                message_ty: None,
316            })?
317            .ask(netstack_actor::GetChannel)
318            .await?;
319
320        Ok(channel)
321    }
322
323    /// The Taildrop file store, if Taildrop is enabled (`taildrop_dir` configured and the store
324    /// initialized). `None` when disabled — fail-closed. Shared with the peerAPI Taildrop server so
325    /// the embedder's read APIs and the receive path see the same on-disk store.
326    pub fn taildrop_store(&self) -> Option<Arc<crate::taildrop::TaildropStore>> {
327        self.env.taildrop_store.clone()
328    }
329
330    /// The shared Funnel ingress slot the peerAPI `/v0/ingress` route reads per connection.
331    ///
332    /// `Device::listen_funnel` installs a [`FunnelManager`](crate::funnel::FunnelManager)'s sink here
333    /// to make the route live (the peerAPI server is already running from startup). Returns a clone of
334    /// the runtime-lifetime `Arc` so the device can write the slot without restarting the server. See
335    /// [`crate::funnel`] for the ingress data path.
336    pub fn funnel_ingress_slot(&self) -> crate::funnel::FunnelIngressSlot {
337        self.env.funnel_ingress.clone()
338    }
339
340    /// The shared "Funnel ingress listener active" flag (the same `Arc` the control session reads to
341    /// set `HostInfo.IngressEnabled`). `Device::listen_funnel` flips it `true` while a funnel listener
342    /// is up so control routes Funnel traffic to this node; clearing it advertises no live endpoint.
343    pub fn ingress_active_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
344        self.env.ingress_active.clone()
345    }
346
347    /// Install (`Some`) or clear (`None`) the debug packet-capture hook on the running dataplane.
348    /// `Some(hook)` tees every plaintext packet crossing the datapath to `hook` until it is cleared;
349    /// `None` stops capture. Mirrors Go `tstun.Wrapper.InstallCaptureHook` / `ClearCaptureSink`.
350    pub async fn install_capture(
351        &self,
352        hook: Option<ts_dataplane::CaptureHook>,
353    ) -> Result<(), Error> {
354        self.dataplane
355            .ask(dataplane::InstallCapture { hook })
356            .await
357            .map_err(Into::into)
358    }
359
360    /// Re-bind the underlay UDP socket after a network/link change (Wi-Fi switch, sleep/wake). The
361    /// embedder's own link monitor calls this (the engine owns the socket re-bind; the embedder owns
362    /// OS netmon). Re-binds the socket (same-port-preferred, IPv4-only invariant preserved) and
363    /// resets the now-stale local NAT mapping — clearing learned reflexive addresses and every
364    /// confirmed direct path while keeping candidate endpoints, so peers re-probe over the new socket
365    /// and relay over DERP (never a direct host dial) until a path re-confirms. Peers, control, the
366    /// netmap, disco state, and DERP are untouched. A no-op when the underlay is inert (bind failed
367    /// at startup, DERP-only). Mirrors Go magicsock `Conn.Rebind` + `resetEndpointStates`.
368    pub async fn rebind(&self) -> Result<(), Error> {
369        self.direct.ask(direct::Rebind).await.map_err(Error::from)
370    }
371
372    /// A snapshot of the local netmap: this node plus every known peer.
373    ///
374    /// Combines the self node held by the control runner with the peer set held by the peer
375    /// tracker. Mirrors tsnet's `LocalClient::Status`.
376    ///
377    /// `self_node` is `None` until the first netmap update has been received from control. Peer
378    /// entries carry no online/user/capability data (see the [`status`] module docs for that gap).
379    pub async fn status(&self) -> Result<Status, Error> {
380        let self_node = self
381            .control
382            .ask(control_runner::SelfNode)
383            .await?
384            .as_ref()
385            .map(StatusNode::from_node);
386
387        let peers = self
388            .peer_tracker
389            .upgrade()
390            .ok_or(Error {
391                kind: ErrorKind::ActorGone,
392                target_actor: None,
393                message_ty: None,
394            })?
395            .ask(peer_tracker::GetStatus)
396            .await?;
397
398        Ok(Status {
399            self_node,
400            peers,
401            active_exit_node: self.active_exit_node(),
402        })
403    }
404
405    /// The stable id of the exit node traffic is currently egressing through, or `None` if none is
406    /// engaged. This is the route updater's resolved + fail-closed answer (see
407    /// [`Status::active_exit_node`](crate::status::Status::active_exit_node)): it differs from the
408    /// configured [`exit_node`](Self::exit_node) selector, which may name a peer that is absent or
409    /// no longer advertising a default route (in which case egress is dropped and this returns
410    /// `None`).
411    pub fn active_exit_node(&self) -> Option<ts_control::StableNodeId> {
412        self.active_exit_rx.borrow().clone()
413    }
414
415    /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
416    ///
417    /// Returns the signed JWT, or the token RPC's own [`ts_control::IdTokenError`]. The kameo
418    /// delegated-reply send error is flattened: a handler error carries the real `IdTokenError`,
419    /// any other send failure (actor shutdown / mailbox closed) is surfaced as
420    /// [`ts_control::IdTokenError::NetworkError`].
421    pub async fn fetch_id_token(
422        &self,
423        audience: String,
424    ) -> Result<String, ts_control::IdTokenError> {
425        self.control
426            .ask(control_runner::FetchIdToken { audience })
427            .await
428            .map_err(flatten_send_err)
429    }
430
431    /// Log this node out of the tailnet: deregister it by expiring its current node key.
432    ///
433    /// Forwards to the control runner, which re-POSTs `/machine/register` with a past expiry over a
434    /// fresh Noise channel. This is a control-plane state change only — it does NOT shut the runtime
435    /// down (the caller follows with [`graceful_shutdown`](Self::graceful_shutdown)) and does not
436    /// touch the on-disk node key. The kameo delegated-reply send error is flattened the same way as
437    /// [`fetch_id_token`](Self::fetch_id_token): a handler error carries the real
438    /// [`ts_control::LogoutError`]; any other send failure (actor shutdown / mailbox closed) is
439    /// surfaced as [`ts_control::LogoutError::NetworkError`].
440    pub async fn logout(&self) -> Result<(), ts_control::LogoutError> {
441        self.control
442            .ask(control_runner::Logout)
443            .await
444            .map_err(flatten_logout_send_err)
445    }
446
447    /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` (`acme` feature).
448    ///
449    /// Mirrors [`fetch_id_token`](Self::fetch_id_token): forwards to the control runner, which runs
450    /// the client-side ACME DNS-01 flow on a spawned task and publishes the challenge TXT via the
451    /// node's set-dns RPC. The kameo delegated-reply send error is flattened — a handler error
452    /// carries the real [`ts_control::CertError`]; any other send failure (actor shutdown / mailbox
453    /// closed) is surfaced as a [`ts_control::CertError::Io`]. SaaS-only: a self-hosted control
454    /// plane 501s on set-dns.
455    #[cfg(feature = "acme")]
456    pub async fn get_certificate(
457        &self,
458        name: String,
459    ) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
460        self.control
461            .ask(control_runner::GetCertificate { name })
462            .await
463            .map_err(flatten_cert_send_err)
464    }
465
466    /// Resolve which node owns a tailnet source address.
467    ///
468    /// Maps the source IP of `addr` to its owning node. Mirrors tsnet's `LocalClient::WhoIs`.
469    /// Returns `None` if no peer holds that tailnet IP. The returned [`WhoIs`] carries no
470    /// user/login or capability data in this fork (see the [`status`] module docs).
471    pub async fn whois(&self, addr: core::net::SocketAddr) -> Result<Option<WhoIs>, Error> {
472        self.peer_tracker
473            .upgrade()
474            .ok_or(Error {
475                kind: ErrorKind::ActorGone,
476                target_actor: None,
477                message_ty: None,
478            })?
479            .ask(peer_tracker::Whois { addr })
480            .await
481            .map_err(Into::into)
482    }
483
484    /// Change the selected exit node at runtime (the equivalent of Go `tsnet`'s
485    /// `LocalClient.EditPrefs(ExitNodeID/ExitNodeIP)`), without recreating the device.
486    ///
487    /// Updates the live exit-node selector, then asks the peer tracker to re-broadcast the current
488    /// peer set so the route updater and source filter re-resolve the new selector immediately.
489    /// `None` clears the exit node (internet-bound traffic is then dropped, fail-closed, unless this
490    /// node egresses directly). The selection is re-resolved against the live peer set, so passing a
491    /// selector for a peer not yet in the netmap simply takes effect once that peer appears.
492    pub async fn set_exit_node(
493        &self,
494        selector: Option<ts_control::ExitNodeSelector>,
495    ) -> Result<(), Error> {
496        // Update the live cell every reader borrows from. `send_replace` keeps the value current
497        // even with no active receivers (none can have dropped while the runtime is up, but it is
498        // the right non-failing primitive here).
499        self.exit_node_tx.send_replace(selector);
500
501        // Trigger an immediate re-resolution: the route updater (outbound routes + DoH delegation)
502        // and the source filter (inbound validation) both recompute on an `Arc<PeerState>`, so a
503        // re-broadcast applies the new exit without waiting for the next netmap update.
504        self.peer_tracker
505            .upgrade()
506            .ok_or(Error {
507                kind: ErrorKind::ActorGone,
508                target_actor: None,
509                message_ty: None,
510            })?
511            .ask(peer_tracker::RepublishState)
512            .await
513            .map_err(Into::into)
514    }
515
516    /// The currently-selected exit node, or `None` if none is selected.
517    pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
518        self.env.exit_node()
519    }
520
521    /// Subscribe to netmap peer-change events.
522    ///
523    /// Returns a [`watch::Receiver`] whose value is the current set of peer [`StatusNode`]s,
524    /// updated on every netmap state update from control. Mirrors tsnet's `WatchIPNBus`. Await
525    /// [`watch::Receiver::changed`](tokio::sync::watch::Receiver::changed) to react to peers
526    /// joining, leaving, or changing.
527    pub async fn watch_netmap(&self) -> Result<watch::Receiver<Vec<StatusNode>>, Error> {
528        self.peer_tracker
529            .upgrade()
530            .ok_or(Error {
531                kind: ErrorKind::ActorGone,
532                target_actor: None,
533                message_ty: None,
534            })?
535            .ask(peer_tracker::WatchNetmap)
536            .await
537            .map_err(Into::into)
538    }
539
540    /// The current device connection-[`DeviceState`].
541    pub fn device_state(&self) -> DeviceState {
542        self.state_rx.borrow().clone()
543    }
544
545    /// Watch the device connection-[`DeviceState`] (`Connecting` → `Running` / `NeedsLogin` /
546    /// `Expired` / `Failed`).
547    ///
548    /// Returns a [`watch::Receiver`]; await
549    /// [`changed`](tokio::sync::watch::Receiver::changed) to react push-style to control connection
550    /// transitions instead of polling [`status`](Self::status). The initial value is the current
551    /// state. Note: a transient per-reconnect dip back to `Connecting` is **not** currently
552    /// emitted (control transparently reconnects below this layer); the state reflects registration
553    /// outcome and node-key expiry.
554    pub fn watch_state(&self) -> watch::Receiver<DeviceState> {
555        self.state_rx.clone()
556    }
557
558    /// Wait until the device finishes registering, returning a typed outcome.
559    ///
560    /// Resolves `Ok(())` once the device reaches [`DeviceState::Running`]. Returns a typed
561    /// [`RegistrationError`] otherwise — the actionable distinction between "retry", "re-pair", and
562    /// "drive interactive login" that replaces polling [`ipv4_addr`](Self::ipv4_addr) in a loop:
563    /// - `AuthRejected` — bad/expired/unknown auth key. **Permanent** (re-pair).
564    /// - `NeedsLogin(url)` — interactive authorization required (no usable auth key). **Not
565    ///   permanent**: the runtime keeps retrying and will reach `Running` once the user authorizes
566    ///   the URL. An **auth-key** caller should treat this as a failure; an **interactive** caller
567    ///   should ignore this return and instead drive the flow via [`watch_state`](Self::watch_state)
568    ///   (this method returns the URL eagerly rather than blocking for the whole login).
569    /// - `NetworkUnreachable` — control unreachable. **Transient** (retry).
570    /// - `Timeout` — no settled state within `timeout`.
571    ///
572    /// `KeyExpired` is not produced by this initial wait (a node key expires only *after* it has
573    /// come up); observe post-registration expiry via [`watch_state`](Self::watch_state).
574    /// `timeout` of `None` waits indefinitely for a settled state.
575    pub async fn wait_until_running(
576        &self,
577        timeout: Option<Duration>,
578    ) -> Result<(), RegistrationError> {
579        device_state::wait_for_running(self.state_rx.clone(), timeout).await
580    }
581
582    /// Attempt to shut down the runtime gracefully.
583    ///
584    /// Returns false if the shutdown timed out. It is still shut down if it timed out, just
585    /// more violently and with possible resource leaks.
586    pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
587        self.shutdown.send_replace(true);
588
589        async fn _shutdown_all(runtime: Runtime) {
590            // See the note in `Drop` for why we only need to stop these actors to bring down the
591            // whole runtime.
592
593            let _ignore = runtime.control.stop_gracefully().await;
594            let _ignore = runtime.dataplane.stop_gracefully().await;
595            let _ignore = runtime.env.bus.stop_gracefully().await;
596
597            tokio::join![
598                runtime.control.wait_for_shutdown(),
599                runtime.dataplane.wait_for_shutdown(),
600                runtime.env.bus.wait_for_shutdown(),
601            ];
602        }
603
604        let fut = _shutdown_all(self);
605
606        match timeout {
607            Some(timeout) => tokio::time::timeout(timeout, fut).await.is_ok(),
608            None => {
609                fut.await;
610                true
611            }
612        }
613    }
614}
615
616impl Drop for Runtime {
617    fn drop(&mut self) {
618        // We must have already run `graceful_shutdown`: on the happy path, this does nothing, but
619        // if it timed out, we need to make sure the actors are dead so we don't leak them and their
620        // dependents.
621        if *self.shutdown.borrow() {
622            self.control.kill();
623            self.dataplane.kill();
624            self.env.bus.kill();
625            return;
626        }
627
628        self.shutdown.send_replace(true);
629
630        // Actors shut down when the last ActorRef to them is dropped (as nothing can send them
631        // messages anymore). If we don't hold an ActorRef in Runtime, in general the only thing
632        // that has one is the MessageBus, which each actor subscribes to for a subset of messages.
633        // Hence, if we shut down the bus, most actors die as well.
634
635        // First shut down the actors we have an ActorRef to:
636        try_shutdown(&self.control);
637        try_shutdown(&self.dataplane);
638
639        // Then shutdown the message bus, stopping the rest of the actors:
640        try_shutdown(&self.env.bus);
641    }
642}
643
644fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
645    if let Err(e) = a.mailbox_sender().try_send(Signal::Stop) {
646        tracing::error!(error = %e, "graceful shutdown failed, killing actor");
647        a.kill();
648    }
649}
650
651/// Build the netstack config shared by both userspace netstacks (application + forwarder) from the
652/// per-deployment `tcp_buffer_size` knob.
653///
654/// `None` keeps the netstack default (256 KiB/direction); `Some(n)` overrides it (e.g. a smaller
655/// window on a memory-constrained exit node forwarding many concurrent flows — see
656/// [`netstack::netcore::Config::tcp_buffer_size`]). Factored out of [`Runtime::spawn`] so the
657/// None-default / Some-override mapping is unit-testable without standing up the actor system.
658fn netstack_config_from(tcp_buffer_size: Option<usize>) -> netstack::netcore::Config {
659    let mut c = netstack::netcore::Config::default();
660    if let Some(tcp_buffer_size) = tcp_buffer_size {
661        c.tcp_buffer_size = tcp_buffer_size;
662    }
663    c
664}
665
666/// Flatten a kameo delegated-reply [`SendError`] for the id-token RPC into the RPC's own
667/// [`ts_control::IdTokenError`].
668///
669/// A [`SendError::HandlerError`](kameo::error::SendError::HandlerError) carries the real
670/// `IdTokenError` produced by the handler and is surfaced verbatim. Any other send failure (actor
671/// not running / stopped, mailbox full, send timeout) is a delivery problem rather than an RPC
672/// result, so it collapses to a transient [`ts_control::IdTokenError::NetworkError`]. Factored out
673/// of [`Runtime::fetch_id_token`] so this mapping is unit-testable without standing up an actor.
674fn flatten_send_err<M>(
675    e: kameo::error::SendError<M, ts_control::IdTokenError>,
676) -> ts_control::IdTokenError {
677    match e {
678        kameo::error::SendError::HandlerError(err) => err,
679        _ => ts_control::IdTokenError::NetworkError,
680    }
681}
682
683/// Flatten a kameo `SendError` from the `Logout` ask into a [`ts_control::LogoutError`].
684///
685/// A `HandlerError` carries the real `LogoutError` from the control RPC and is surfaced verbatim;
686/// any other send failure (actor not running / stopped, mailbox full, send timeout) — a delivery
687/// problem, not a logout result — collapses to the transient [`ts_control::LogoutError::NetworkError`]
688/// (logout is idempotent, so a retry after a delivery failure is safe). Factored out of
689/// [`Runtime::logout`] so the mapping is unit-testable without standing up an actor.
690fn flatten_logout_send_err<M>(
691    e: kameo::error::SendError<M, ts_control::LogoutError>,
692) -> ts_control::LogoutError {
693    match e {
694        kameo::error::SendError::HandlerError(err) => err,
695        _ => ts_control::LogoutError::NetworkError,
696    }
697}
698
/// Collapse a kameo `SendError` from the `GetCertificate` ask into a [`ts_control::CertError`].
///
/// A `HandlerError` holds the genuine `CertError` from ACME issuance and is returned unchanged.
/// `CertError` has no transient-network variant. Every other send failure (actor not running /
/// stopped, mailbox full, send timeout) is therefore reported as [`ts_control::CertError::Io`]
/// wrapping an [`std::io::ErrorKind::Other`] error.
///
/// Kept out of [`Runtime::get_certificate`] so the mapping is unit-testable without an actor.
#[cfg(feature = "acme")]
fn flatten_cert_send_err<M>(
    e: kameo::error::SendError<M, ts_control::CertError>,
) -> ts_control::CertError {
    const UNAVAILABLE: &str = "control runner unavailable for certificate issuance";

    match e {
        kameo::error::SendError::HandlerError(cert_err) => cert_err,
        _ => ts_control::CertError::Io(std::io::Error::other(UNAVAILABLE)),
    }
}
717
#[cfg(test)]
mod tests {
    use super::*;

    /// `None` must leave the netstack's own default TCP window in place (the 256 KiB throughput
    /// default), and must not silently coerce to some other value.
    #[test]
    fn netstack_config_none_uses_netstack_default() {
        let default = netstack::netcore::Config::default();
        let built = netstack_config_from(None);
        assert_eq!(
            built.tcp_buffer_size, default.tcp_buffer_size,
            "None must inherit the netstack default TCP buffer size"
        );
    }

    /// `Some(n)` must override the TCP window (the memory-vs-throughput knob exit-node operators
    /// reach for), reaching the config that both netstacks are built from.
    #[test]
    fn netstack_config_some_overrides_buffer() {
        let built = netstack_config_from(Some(64 * 1024));
        assert_eq!(
            built.tcp_buffer_size,
            64 * 1024,
            "Some(n) must override the TCP buffer size that both netstacks use"
        );
    }

    /// A `HandlerError` carries the real `IdTokenError` from the RPC handler and must pass through
    /// verbatim, not be flattened to a generic network error. The test uses an `Internal(_)`
    /// payload rather than `NetworkError`, so a buggy flatten that always returned `NetworkError`
    /// would fail this assertion.
    #[test]
    fn flatten_send_err_handler_error_passes_through() {
        // Build the `Internal(_)` payload via the public `From<Utf8Error>` conversion (no extra
        // deps). The invalid bytes go through a runtime Vec so the `invalid_from_utf8` lint,
        // which only fires on compile-time-known literals, doesn't flag this intentional bad input.
        let bytes = vec![0xffu8, 0xfe];
        let utf8_err = core::str::from_utf8(&bytes).unwrap_err();
        let inner = ts_control::IdTokenError::from(utf8_err);
        assert!(matches!(inner, ts_control::IdTokenError::Internal(_)));
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::HandlerError(inner.clone());
        assert_eq!(flatten_send_err(e), inner);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not an RPC result, so it
    /// must collapse to a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_stopped_is_network_error() {
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::ActorStopped;
        assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
    }

    /// `ActorNotRunning` (the message bounces back undelivered) is likewise a delivery failure and
    /// must map to a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_not_running_is_network_error() {
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::ActorNotRunning(control_runner::FetchIdToken {
                audience: "sts.amazonaws.com".to_string(),
            });
        assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
    }

    /// A reply timeout never produced an RPC result either, so it must also map to the transient
    /// `NetworkError`.
    #[test]
    fn flatten_send_err_timeout_is_network_error() {
        let e: kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError> =
            kameo::error::SendError::Timeout(None);
        assert_eq!(flatten_send_err(e), ts_control::IdTokenError::NetworkError);
    }

    /// A `HandlerError` from the logout RPC carries the real `LogoutError` and must pass through
    /// verbatim. An `Internal(_)` payload (distinct from the `_ => NetworkError` fallback) makes the
    /// passthrough observable.
    #[test]
    fn flatten_logout_send_err_handler_error_passes_through() {
        let inner = ts_control::LogoutError::Internal(ts_control::LogoutInternalErrorKind::Http);
        assert!(matches!(inner, ts_control::LogoutError::Internal(_)));
        let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
            kameo::error::SendError::HandlerError(inner.clone());
        assert_eq!(flatten_logout_send_err(e), inner);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not a logout result, and
    /// collapses to a transient `NetworkError` (logout is idempotent, so a retry is safe).
    #[test]
    fn flatten_logout_send_err_actor_stopped_is_network_error() {
        let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
            kameo::error::SendError::ActorStopped;
        assert_eq!(
            flatten_logout_send_err(e),
            ts_control::LogoutError::NetworkError
        );
    }

    /// A logout reply timeout is a delivery failure and must collapse to `NetworkError` as well.
    #[test]
    fn flatten_logout_send_err_timeout_is_network_error() {
        let e: kameo::error::SendError<control_runner::Logout, ts_control::LogoutError> =
            kameo::error::SendError::Timeout(None);
        assert_eq!(
            flatten_logout_send_err(e),
            ts_control::LogoutError::NetworkError
        );
    }

    /// A `HandlerError` from ACME issuance must pass through untouched. The sentinel kind and
    /// message differ from the fallback's `Other` kind and fixed text, so a buggy flatten that
    /// always returned the fallback would fail this assertion. The message type parameter is
    /// irrelevant to this path, so `()` stands in for it.
    #[cfg(feature = "acme")]
    #[test]
    fn flatten_cert_send_err_handler_error_passes_through() {
        let e: kameo::error::SendError<(), ts_control::CertError> =
            kameo::error::SendError::HandlerError(ts_control::CertError::Io(
                std::io::Error::new(std::io::ErrorKind::PermissionDenied, "acme sentinel"),
            ));
        assert!(matches!(
            flatten_cert_send_err(e),
            ts_control::CertError::Io(io)
                if io.kind() == std::io::ErrorKind::PermissionDenied
                    && io.to_string() == "acme sentinel"
        ));
    }

    /// A delivery failure (actor stopped) has no network variant to map onto in `CertError`, so it
    /// must surface as an `Io` error of kind `Other`.
    #[cfg(feature = "acme")]
    #[test]
    fn flatten_cert_send_err_actor_stopped_is_io_other() {
        let e: kameo::error::SendError<(), ts_control::CertError> =
            kameo::error::SendError::ActorStopped;
        assert!(matches!(
            flatten_cert_send_err(e),
            ts_control::CertError::Io(io) if io.kind() == std::io::ErrorKind::Other
        ));
    }
}