Skip to main content

ts_runtime/
lib.rs

1#![doc = include_str!("../README.md")]
2
3extern crate ts_netstack_smoltcp as netstack;
4
5use core::time::Duration;
6use std::sync::Arc;
7
8use kameo::{
9    actor::{ActorRef, Spawn, WeakActorRef},
10    mailbox::Signal,
11};
12use netstack::netcore::Channel;
13use tokio::sync::watch;
14
15use crate::{
16    control_runner::ControlRunner, dataplane::DataplaneActor, direct::DirectManager,
17    forwarder_actor::ForwarderActor, multiderp::Multiderp, netstack_actor::NetstackActor,
18};
19
20/// Pcap stream framer for debug packet capture (`CapturePcap`).
21pub mod capture;
22/// Control runner.
23pub mod control_runner;
24mod dataplane;
25mod derp_latency;
26/// Device connection-state tracking ([`DeviceState`]) and typed registration outcome
27/// ([`RegistrationError`]).
28pub mod device_state;
29mod direct;
30mod env;
31mod error;
32/// Fallback TCP handler registry (`tsnet.Server.RegisterFallbackTCPHandler` parity).
33pub mod fallback_tcp;
34mod forwarder_actor;
35/// Client-side Funnel ingress termination (`tsnet`'s `ListenFunnel` data path).
36pub mod funnel;
37mod magic_dns;
38mod multiderp;
39mod netstack_actor;
40mod packetfilter;
41pub mod peer_tracker;
42mod peerapi;
43mod peerapi_doh;
44mod route_updater;
45/// Stored Serve config + accept-loop runtime (`tsnet`'s `Get/SetServeConfig` + serving runtime).
46pub mod serve;
47mod src_filter;
48/// Netmap status snapshot, WhoIs, and watcher types.
49pub mod status;
50/// Taildrop peer-to-peer file transfer store.
51pub mod taildrop;
52pub mod taildrop_send;
53#[cfg(feature = "tun")]
54mod tun_actor;
55
56pub use device_state::{DeviceState, RegistrationError};
57pub(crate) use env::Env;
58pub use error::{Error, ErrorKind};
59pub use status::{Status, StatusNode, WhoIs};
60pub use ts_dataplane::{CaptureHook, CapturePath};
61
62use crate::peer_tracker::PeerTracker;
63
/// The runtime for a tailscale device.
///
/// Holds the actor references and `watch` cells created by [`Runtime::spawn`]. Dropping the
/// value (or calling [`Runtime::graceful_shutdown`]) tears the actor system down.
pub struct Runtime {
    /// Reference to the control actor.
    pub control: ActorRef<ControlRunner>,
    /// Strong reference to the dataplane actor. Used to install the debug capture hook and
    /// stopped explicitly on shutdown.
    dataplane: ActorRef<DataplaneActor>,
    /// Reference to the application netstack actor. `None` in TUN transport mode, where there is
    /// no userspace application netstack (the application data path is a real kernel TUN device).
    netstack: Option<WeakActorRef<NetstackActor>>,
    /// Reference to the peer tracker for peer lookups.
    pub peer_tracker: WeakActorRef<PeerTracker>,
    /// Fallback TCP handler registry, bound to the application netstack. `None` in TUN transport
    /// mode (no application netstack exists to attach it to).
    fallback_tcp: Option<fallback_tcp::FallbackTcpManager>,
    /// Shared environment that is cloned into the actors at spawn time. Also gives access to
    /// the message bus, which is stopped on shutdown.
    env: Env,
    /// Sender side of the shutdown flag. Set to `true` by `graceful_shutdown` and by `Drop`; the
    /// receiver side is handed to `Env` in `spawn`.
    shutdown: watch::Sender<bool>,
    /// Sender side of the exit-node selector `watch` cell. Held privately here (not on the cloned
    /// `Env`, which keeps only the read side) so that only `Runtime::set_exit_node` can mutate the
    /// selection; the route updater and source filter re-read it via [`Env::exit_node`].
    exit_node_tx: watch::Sender<Option<ts_control::ExitNodeSelector>>,
    /// Receiver mirroring the *active* (resolved + fail-closed) exit node's stable id, fed by the
    /// route updater. Read by [`Runtime::status`] / [`Runtime::active_exit_node`] to report which
    /// exit node traffic is actually egressing through (vs. the merely-configured selector).
    active_exit_rx: watch::Receiver<Option<ts_control::StableNodeId>>,
    /// Receiver for the device connection-state cell, fed by the control runner. Read by
    /// [`Runtime::watch_state`] and [`Runtime::wait_until_running`].
    state_rx: watch::Receiver<DeviceState>,
}
91
92impl Runtime {
93    /// Spawn a new runtime with the given parameters for connecting to a tailnet.
94    pub async fn spawn(
95        config: ts_control::Config,
96        auth_key: Option<String>,
97        keys: ts_keys::NodeState,
98    ) -> Result<Self, Error> {
99        let (shutdown_tx, shutdown_rx) = watch::channel(false);
100
101        // The exit-node selector is a live `watch` cell so `Device::set_exit_node` can change it at
102        // runtime. `new_with_exit_tx` returns the `Sender` (mutation capability) separately so it is
103        // retained privately on the `Runtime`, while only the `Receiver` (the readers' contract)
104        // lives on the cloned `Env`. The initial value comes from `ForwarderConfig.exit_node`.
105        let (env, exit_node_tx) = Env::new_with_exit_tx(
106            keys,
107            shutdown_rx,
108            env::ForwarderConfig::from_control_config(&config),
109        );
110
111        // Both userspace netstacks (application + forwarder) share one netstack config. Honor the
112        // per-deployment TCP buffer knob when set, otherwise fall back to the netstack default.
113        let netstack_config = netstack_config_from(config.tcp_buffer_size);
114
115        let dataplane = DataplaneActor::spawn(env.clone());
116
117        let (netstack_id, netstack_up, netstack_down) =
118            dataplane.ask(dataplane::NewOverlayTransport).await?;
119
120        // A second overlay transport feeds the dedicated any-IP forwarder netstack. Inbound packets
121        // for advertised subnet routes / the exit-node default route are routed here (see
122        // `route_updater`), keeping forwarded flows off the application netstack.
123        let (forwarder_id, forwarder_up, forwarder_down) =
124            dataplane.ask(dataplane::NewOverlayTransport).await?;
125
126        let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));
127
128        // Spawn the direct (disco) underlay manager before the route updater. Its `on_start`
129        // binds the UDP socket and registers its transport synchronously, so by the time the
130        // route updater asks it for the direct transport id it is guaranteed to be available.
131        let direct = DirectManager::spawn((env.clone(), dataplane.clone(), multiderp.clone()));
132
133        // Spawn the forwarder before the route updater. Its `on_start` builds the forwarder
134        // netstack, enables any-IP acceptance, and starts the per-port accept loops synchronously,
135        // so by the time the route updater begins delivering advertised prefixes to
136        // `forwarder_id` the netstack is already draining its transport.
137        let forwarder = ForwarderActor::spawn((
138            env.clone(),
139            netstack_config.clone(),
140            forwarder_up,
141            forwarder_down,
142        ));
143        // Force `on_start` to finish (any-IP enabled, accept loops live) before the route updater
144        // can route the first inbound flow to `forwarder_id`: an `ask` blocks until the actor has
145        // started.
146        //
147        // The forwarder netstack's overlay `Channel` is reused by the TUN application path for
148        // recursive / exit-node-DoH MagicDNS forwarding (TUN mode has no application netstack of its
149        // own, but the forwarder netstack runs in both modes and egresses over the overlay — the
150        // anti-leak property `forward_query`/`forward_doh` require). Only the `tun` Tun arm consumes
151        // it, so it is unused when the `tun` feature is off — allow that without warn-as-error.
152        #[cfg_attr(not(feature = "tun"), allow(unused_variables))]
153        let (forwarder_channel,) = forwarder.ask(forwarder_actor::GetChannel).await?;
154
155        // The route updater is the single authoritative resolver of the active (resolved,
156        // fail-closed) exit node; it publishes the resolved stable id into this watch cell so
157        // `Runtime::status` can report which exit is actually engaged (not just configured).
158        let (active_exit_tx, active_exit_rx) = watch::channel(None);
159        route_updater::RouteUpdater::spawn((
160            multiderp.clone(),
161            direct.clone(),
162            env.clone(),
163            netstack_id,
164            forwarder_id,
165            active_exit_tx,
166        ));
167        packetfilter::PacketfilterUpdater::spawn(env.clone());
168        src_filter::SourceFilterUpdater::spawn(env.clone());
169        let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();
170
171        // Select the application data path from the transport mode. The forwarder/egress path
172        // above is UNCHANGED in both modes — TUN mode only swaps the application data path, never
173        // the forwarder. `config` is moved into `ControlRunner::spawn` below, so branch on a
174        // borrow and clone the small `TunConfig` where needed before the move.
175        //
176        // - Netstack (the default, and the only reachable arm when the `tun` feature is off):
177        //   spawn the application netstack + MagicDNS responder + fallback-TCP registry, all on
178        //   the `netstack_up`/`netstack_down` overlay seam.
179        // - Tun: spawn `TunActor` on that same overlay seam instead; no application netstack and
180        //   no MagicDNS responder exist, and `netstack`/`fallback_tcp` are `None`.
181        // - Tun requested but built without the `tun` feature: hard-error (a config/build
182        //   mismatch knowable at spawn time). NEVER silently fall back to netstack.
183        let (netstack, fallback_tcp) = match &config.transport_mode {
184            ts_control::TransportMode::Netstack => {
185                let netstack = NetstackActor::spawn((
186                    env.clone(),
187                    netstack_config,
188                    netstack_up,
189                    netstack_down,
190                ));
191
192                // Fetch the netstack channel while we still hold the strong ActorRef, then spawn
193                // the MagicDNS responder on it. Fire-and-forget: like src_filter/route_updater,
194                // it's owned by the message bus and isn't stored on `Runtime`.
195                let (channel,) = netstack.ask(netstack_actor::GetChannel).await?;
196                // The fallback-TCP registry attaches to the application netstack — the same one
197                // that carries the embedder's explicit `Device::tcp_listen` sockets — so a
198                // fallback handler sees exactly the inbound flows no explicit listener matched.
199                let fallback_tcp = fallback_tcp::FallbackTcpManager::new(channel.clone());
200                magic_dns::MagicDnsActor::spawn((env.clone(), channel));
201
202                (Some(netstack.downgrade()), Some(fallback_tcp))
203            }
204
205            #[cfg(feature = "tun")]
206            ts_control::TransportMode::Tun(tun_cfg) => {
207                // Reuse the same `netstack_up`/`netstack_down` overlay-transport pair that would
208                // have fed the netstack — it is just the application-side overlay seam (the name
209                // is historical). No NetstackActor / MagicDnsActor is spawned.
210                tun_actor::TunActor::spawn((
211                    env.clone(),
212                    tun_cfg.clone(),
213                    netstack_up,
214                    netstack_down,
215                    // Host-route gating inputs derived from `Env`: subnet routes are only steered
216                    // into the TUN when `--accept-routes` is set, and the host `/0` only when the
217                    // embedder configured an exit node. See `tun_actor::host_routes_from_node`.
218                    tun_actor::HostRouteGating {
219                        accept_routes: env.accept_routes,
220                        exit_node_configured: env.exit_node().is_some(),
221                    },
222                    // Reuse the forwarder netstack's overlay `Channel` for recursive / exit-node-DoH
223                    // MagicDNS forwarding in the TUN datapath (TUN mode has no application netstack
224                    // Channel of its own). Egresses over the overlay — anti-leak preserved.
225                    forwarder_channel.clone(),
226                ));
227
228                (None, None)
229            }
230
231            #[cfg(not(feature = "tun"))]
232            ts_control::TransportMode::Tun(_) => {
233                return Err(Error {
234                    kind: ErrorKind::TunUnavailable,
235                    target_actor: None,
236                    message_ty: None,
237                });
238            }
239        };
240
241        // Device connection-state cell. Created here (not inside the actor) so the control runner's
242        // `on_start` can publish `Failed`/`NeedsLogin` and still return `Err` without the sender
243        // being tied to a `Self` that never gets constructed on a hard registration failure.
244        let (state_tx, state_rx) = watch::channel(DeviceState::Connecting);
245
246        let control = ControlRunner::spawn(control_runner::Params {
247            config,
248            auth_key,
249            env: env.clone(),
250            state_tx,
251        });
252
253        Ok(Self {
254            control,
255            dataplane,
256            peer_tracker,
257            fallback_tcp,
258            netstack,
259            env,
260            shutdown: shutdown_tx,
261            exit_node_tx,
262            active_exit_rx,
263            state_rx,
264        })
265    }
266
267    /// Register a fallback TCP handler consulted for every inbound TCP flow that matches no
268    /// explicit listener (`tsnet.Server.RegisterFallbackTCPHandler` parity).
269    ///
270    /// The returned [`fallback_tcp::FallbackTcpHandle`] deregisters the handler when dropped. See
271    /// [`fallback_tcp`] for the dispatch contract and anti-leak guarantees.
272    ///
273    /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
274    /// application netstack to attach a fallback handler to.
275    pub fn register_fallback_tcp_handler(
276        &self,
277        cb: Arc<
278            dyn Fn(core::net::SocketAddr, core::net::SocketAddr) -> fallback_tcp::FallbackDecision
279                + Send
280                + Sync,
281        >,
282    ) -> Result<fallback_tcp::FallbackTcpHandle, Error> {
283        Ok(self
284            .fallback_tcp
285            .as_ref()
286            .ok_or(Error {
287                kind: ErrorKind::UnsupportedInTunMode,
288                target_actor: None,
289                message_ty: None,
290            })?
291            .register(cb))
292    }
293
294    /// Get a channel to send commands to the netstack.
295    ///
296    /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
297    /// application netstack.
298    pub async fn channel(&self) -> Result<Channel, Error> {
299        let (channel,) = self
300            .netstack
301            .as_ref()
302            .ok_or(Error {
303                kind: ErrorKind::UnsupportedInTunMode,
304                target_actor: None,
305                message_ty: None,
306            })?
307            .upgrade()
308            .ok_or(Error {
309                kind: ErrorKind::ActorGone,
310                target_actor: None,
311                message_ty: None,
312            })?
313            .ask(netstack_actor::GetChannel)
314            .await?;
315
316        Ok(channel)
317    }
318
319    /// The Taildrop file store, if Taildrop is enabled (`taildrop_dir` configured and the store
320    /// initialized). `None` when disabled — fail-closed. Shared with the peerAPI Taildrop server so
321    /// the embedder's read APIs and the receive path see the same on-disk store.
322    pub fn taildrop_store(&self) -> Option<Arc<crate::taildrop::TaildropStore>> {
323        self.env.taildrop_store.clone()
324    }
325
326    /// The shared Funnel ingress slot the peerAPI `/v0/ingress` route reads per connection.
327    ///
328    /// `Device::listen_funnel` installs a [`FunnelManager`](crate::funnel::FunnelManager)'s sink here
329    /// to make the route live (the peerAPI server is already running from startup). Returns a clone of
330    /// the runtime-lifetime `Arc` so the device can write the slot without restarting the server. See
331    /// [`crate::funnel`] for the ingress data path.
332    pub fn funnel_ingress_slot(&self) -> crate::funnel::FunnelIngressSlot {
333        self.env.funnel_ingress.clone()
334    }
335
336    /// The shared "Funnel ingress listener active" flag (the same `Arc` the control session reads to
337    /// set `HostInfo.IngressEnabled`). `Device::listen_funnel` flips it `true` while a funnel listener
338    /// is up so control routes Funnel traffic to this node; clearing it advertises no live endpoint.
339    pub fn ingress_active_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
340        self.env.ingress_active.clone()
341    }
342
343    /// Install (`Some`) or clear (`None`) the debug packet-capture hook on the running dataplane.
344    /// `Some(hook)` tees every plaintext packet crossing the datapath to `hook` until it is cleared;
345    /// `None` stops capture. Mirrors Go `tstun.Wrapper.InstallCaptureHook` / `ClearCaptureSink`.
346    pub async fn install_capture(
347        &self,
348        hook: Option<ts_dataplane::CaptureHook>,
349    ) -> Result<(), Error> {
350        self.dataplane
351            .ask(dataplane::InstallCapture { hook })
352            .await
353            .map_err(Into::into)
354    }
355
356    /// A snapshot of the local netmap: this node plus every known peer.
357    ///
358    /// Combines the self node held by the control runner with the peer set held by the peer
359    /// tracker. Mirrors tsnet's `LocalClient::Status`.
360    ///
361    /// `self_node` is `None` until the first netmap update has been received from control. Peer
362    /// entries carry no online/user/capability data (see the [`status`] module docs for that gap).
363    pub async fn status(&self) -> Result<Status, Error> {
364        let self_node = self
365            .control
366            .ask(control_runner::SelfNode)
367            .await?
368            .as_ref()
369            .map(StatusNode::from_node);
370
371        let peers = self
372            .peer_tracker
373            .upgrade()
374            .ok_or(Error {
375                kind: ErrorKind::ActorGone,
376                target_actor: None,
377                message_ty: None,
378            })?
379            .ask(peer_tracker::GetStatus)
380            .await?;
381
382        Ok(Status {
383            self_node,
384            peers,
385            active_exit_node: self.active_exit_node(),
386        })
387    }
388
389    /// The stable id of the exit node traffic is currently egressing through, or `None` if none is
390    /// engaged. This is the route updater's resolved + fail-closed answer (see
391    /// [`Status::active_exit_node`](crate::status::Status::active_exit_node)): it differs from the
392    /// configured [`exit_node`](Self::exit_node) selector, which may name a peer that is absent or
393    /// no longer advertising a default route (in which case egress is dropped and this returns
394    /// `None`).
395    pub fn active_exit_node(&self) -> Option<ts_control::StableNodeId> {
396        self.active_exit_rx.borrow().clone()
397    }
398
399    /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
400    ///
401    /// Returns the signed JWT, or the token RPC's own [`ts_control::IdTokenError`]. The kameo
402    /// delegated-reply send error is flattened: a handler error carries the real `IdTokenError`,
403    /// any other send failure (actor shutdown / mailbox closed) is surfaced as
404    /// [`ts_control::IdTokenError::NetworkError`].
405    pub async fn fetch_id_token(
406        &self,
407        audience: String,
408    ) -> Result<String, ts_control::IdTokenError> {
409        self.control
410            .ask(control_runner::FetchIdToken { audience })
411            .await
412            .map_err(flatten_send_err)
413    }
414
415    /// Log this node out of the tailnet: deregister it by expiring its current node key.
416    ///
417    /// Forwards to the control runner, which re-POSTs `/machine/register` with a past expiry over a
418    /// fresh Noise channel. This is a control-plane state change only — it does NOT shut the runtime
419    /// down (the caller follows with [`graceful_shutdown`](Self::graceful_shutdown)) and does not
420    /// touch the on-disk node key. The kameo delegated-reply send error is flattened the same way as
421    /// [`fetch_id_token`](Self::fetch_id_token): a handler error carries the real
422    /// [`ts_control::LogoutError`]; any other send failure (actor shutdown / mailbox closed) is
423    /// surfaced as [`ts_control::LogoutError::NetworkError`].
424    pub async fn logout(&self) -> Result<(), ts_control::LogoutError> {
425        self.control
426            .ask(control_runner::Logout)
427            .await
428            .map_err(flatten_logout_send_err)
429    }
430
431    /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` (`acme` feature).
432    ///
433    /// Mirrors [`fetch_id_token`](Self::fetch_id_token): forwards to the control runner, which runs
434    /// the client-side ACME DNS-01 flow on a spawned task and publishes the challenge TXT via the
435    /// node's set-dns RPC. The kameo delegated-reply send error is flattened — a handler error
436    /// carries the real [`ts_control::CertError`]; any other send failure (actor shutdown / mailbox
437    /// closed) is surfaced as a [`ts_control::CertError::Io`]. SaaS-only: a self-hosted control
438    /// plane 501s on set-dns.
439    #[cfg(feature = "acme")]
440    pub async fn get_certificate(
441        &self,
442        name: String,
443    ) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
444        self.control
445            .ask(control_runner::GetCertificate { name })
446            .await
447            .map_err(flatten_cert_send_err)
448    }
449
450    /// Resolve which node owns a tailnet source address.
451    ///
452    /// Maps the source IP of `addr` to its owning node. Mirrors tsnet's `LocalClient::WhoIs`.
453    /// Returns `None` if no peer holds that tailnet IP. The returned [`WhoIs`] carries no
454    /// user/login or capability data in this fork (see the [`status`] module docs).
455    pub async fn whois(&self, addr: core::net::SocketAddr) -> Result<Option<WhoIs>, Error> {
456        self.peer_tracker
457            .upgrade()
458            .ok_or(Error {
459                kind: ErrorKind::ActorGone,
460                target_actor: None,
461                message_ty: None,
462            })?
463            .ask(peer_tracker::Whois { addr })
464            .await
465            .map_err(Into::into)
466    }
467
468    /// Change the selected exit node at runtime (the equivalent of Go `tsnet`'s
469    /// `LocalClient.EditPrefs(ExitNodeID/ExitNodeIP)`), without recreating the device.
470    ///
471    /// Updates the live exit-node selector, then asks the peer tracker to re-broadcast the current
472    /// peer set so the route updater and source filter re-resolve the new selector immediately.
473    /// `None` clears the exit node (internet-bound traffic is then dropped, fail-closed, unless this
474    /// node egresses directly). The selection is re-resolved against the live peer set, so passing a
475    /// selector for a peer not yet in the netmap simply takes effect once that peer appears.
476    pub async fn set_exit_node(
477        &self,
478        selector: Option<ts_control::ExitNodeSelector>,
479    ) -> Result<(), Error> {
480        // Update the live cell every reader borrows from. `send_replace` keeps the value current
481        // even with no active receivers (none can have dropped while the runtime is up, but it is
482        // the right non-failing primitive here).
483        self.exit_node_tx.send_replace(selector);
484
485        // Trigger an immediate re-resolution: the route updater (outbound routes + DoH delegation)
486        // and the source filter (inbound validation) both recompute on an `Arc<PeerState>`, so a
487        // re-broadcast applies the new exit without waiting for the next netmap update.
488        self.peer_tracker
489            .upgrade()
490            .ok_or(Error {
491                kind: ErrorKind::ActorGone,
492                target_actor: None,
493                message_ty: None,
494            })?
495            .ask(peer_tracker::RepublishState)
496            .await
497            .map_err(Into::into)
498    }
499
500    /// The currently-selected exit node, or `None` if none is selected.
501    pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
502        self.env.exit_node()
503    }
504
505    /// Subscribe to netmap peer-change events.
506    ///
507    /// Returns a [`watch::Receiver`] whose value is the current set of peer [`StatusNode`]s,
508    /// updated on every netmap state update from control. Mirrors tsnet's `WatchIPNBus`. Await
509    /// [`watch::Receiver::changed`](tokio::sync::watch::Receiver::changed) to react to peers
510    /// joining, leaving, or changing.
511    pub async fn watch_netmap(&self) -> Result<watch::Receiver<Vec<StatusNode>>, Error> {
512        self.peer_tracker
513            .upgrade()
514            .ok_or(Error {
515                kind: ErrorKind::ActorGone,
516                target_actor: None,
517                message_ty: None,
518            })?
519            .ask(peer_tracker::WatchNetmap)
520            .await
521            .map_err(Into::into)
522    }
523
524    /// The current device connection-[`DeviceState`].
525    pub fn device_state(&self) -> DeviceState {
526        self.state_rx.borrow().clone()
527    }
528
529    /// Watch the device connection-[`DeviceState`] (`Connecting` → `Running` / `NeedsLogin` /
530    /// `Expired` / `Failed`).
531    ///
532    /// Returns a [`watch::Receiver`]; await
533    /// [`changed`](tokio::sync::watch::Receiver::changed) to react push-style to control connection
534    /// transitions instead of polling [`status`](Self::status). The initial value is the current
535    /// state. Note: a transient per-reconnect dip back to `Connecting` is **not** currently
536    /// emitted (control transparently reconnects below this layer); the state reflects registration
537    /// outcome and node-key expiry.
538    pub fn watch_state(&self) -> watch::Receiver<DeviceState> {
539        self.state_rx.clone()
540    }
541
542    /// Wait until the device finishes registering, returning a typed outcome.
543    ///
544    /// Resolves `Ok(())` once the device reaches [`DeviceState::Running`]. Returns a typed
545    /// [`RegistrationError`] otherwise — the actionable distinction between "retry", "re-pair", and
546    /// "drive interactive login" that replaces polling [`ipv4_addr`](Self::ipv4_addr) in a loop:
547    /// - `AuthRejected` — bad/expired/unknown auth key. **Permanent** (re-pair).
548    /// - `NeedsLogin(url)` — interactive authorization required (no usable auth key). **Not
549    ///   permanent**: the runtime keeps retrying and will reach `Running` once the user authorizes
550    ///   the URL. An **auth-key** caller should treat this as a failure; an **interactive** caller
551    ///   should ignore this return and instead drive the flow via [`watch_state`](Self::watch_state)
552    ///   (this method returns the URL eagerly rather than blocking for the whole login).
553    /// - `NetworkUnreachable` — control unreachable. **Transient** (retry).
554    /// - `Timeout` — no settled state within `timeout`.
555    ///
556    /// `KeyExpired` is not produced by this initial wait (a node key expires only *after* it has
557    /// come up); observe post-registration expiry via [`watch_state`](Self::watch_state).
558    /// `timeout` of `None` waits indefinitely for a settled state.
559    pub async fn wait_until_running(
560        &self,
561        timeout: Option<Duration>,
562    ) -> Result<(), RegistrationError> {
563        device_state::wait_for_running(self.state_rx.clone(), timeout).await
564    }
565
566    /// Attempt to shut down the runtime gracefully.
567    ///
568    /// Returns false if the shutdown timed out. It is still shut down if it timed out, just
569    /// more violently and with possible resource leaks.
570    pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
571        self.shutdown.send_replace(true);
572
573        async fn _shutdown_all(runtime: Runtime) {
574            // See the note in `Drop` for why we only need to stop these actors to bring down the
575            // whole runtime.
576
577            let _ignore = runtime.control.stop_gracefully().await;
578            let _ignore = runtime.dataplane.stop_gracefully().await;
579            let _ignore = runtime.env.bus.stop_gracefully().await;
580
581            tokio::join![
582                runtime.control.wait_for_shutdown(),
583                runtime.dataplane.wait_for_shutdown(),
584                runtime.env.bus.wait_for_shutdown(),
585            ];
586        }
587
588        let fut = _shutdown_all(self);
589
590        match timeout {
591            Some(timeout) => tokio::time::timeout(timeout, fut).await.is_ok(),
592            None => {
593                fut.await;
594                true
595            }
596        }
597    }
598}
599
600impl Drop for Runtime {
601    fn drop(&mut self) {
602        // We must have already run `graceful_shutdown`: on the happy path, this does nothing, but
603        // if it timed out, we need to make sure the actors are dead so we don't leak them and their
604        // dependents.
605        if *self.shutdown.borrow() {
606            self.control.kill();
607            self.dataplane.kill();
608            self.env.bus.kill();
609            return;
610        }
611
612        self.shutdown.send_replace(true);
613
614        // Actors shut down when the last ActorRef to them is dropped (as nothing can send them
615        // messages anymore). If we don't hold an ActorRef in Runtime, in general the only thing
616        // that has one is the MessageBus, which each actor subscribes to for a subset of messages.
617        // Hence, if we shut down the bus, most actors die as well.
618
619        // First shut down the actors we have an ActorRef to:
620        try_shutdown(&self.control);
621        try_shutdown(&self.dataplane);
622
623        // Then shutdown the message bus, stopping the rest of the actors:
624        try_shutdown(&self.env.bus);
625    }
626}
627
628fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
629    if let Err(e) = a.mailbox_sender().try_send(Signal::Stop) {
630        tracing::error!(error = %e, "graceful shutdown failed, killing actor");
631        a.kill();
632    }
633}
634
635/// Build the netstack config shared by both userspace netstacks (application + forwarder) from the
636/// per-deployment `tcp_buffer_size` knob.
637///
638/// `None` keeps the netstack default (256 KiB/direction); `Some(n)` overrides it (e.g. a smaller
639/// window on a memory-constrained exit node forwarding many concurrent flows — see
640/// [`netstack::netcore::Config::tcp_buffer_size`]). Factored out of [`Runtime::spawn`] so the
641/// None-default / Some-override mapping is unit-testable without standing up the actor system.
642fn netstack_config_from(tcp_buffer_size: Option<usize>) -> netstack::netcore::Config {
643    let mut c = netstack::netcore::Config::default();
644    if let Some(tcp_buffer_size) = tcp_buffer_size {
645        c.tcp_buffer_size = tcp_buffer_size;
646    }
647    c
648}
649
650/// Flatten a kameo delegated-reply [`SendError`] for the id-token RPC into the RPC's own
651/// [`ts_control::IdTokenError`].
652///
653/// A [`SendError::HandlerError`](kameo::error::SendError::HandlerError) carries the real
654/// `IdTokenError` produced by the handler and is surfaced verbatim. Any other send failure (actor
655/// not running / stopped, mailbox full, send timeout) is a delivery problem rather than an RPC
656/// result, so it collapses to a transient [`ts_control::IdTokenError::NetworkError`]. Factored out
657/// of [`Runtime::fetch_id_token`] so this mapping is unit-testable without standing up an actor.
658fn flatten_send_err<M>(
659    e: kameo::error::SendError<M, ts_control::IdTokenError>,
660) -> ts_control::IdTokenError {
661    match e {
662        kameo::error::SendError::HandlerError(err) => err,
663        _ => ts_control::IdTokenError::NetworkError,
664    }
665}
666
667/// Flatten a kameo `SendError` from the `Logout` ask into a [`ts_control::LogoutError`].
668///
669/// A `HandlerError` carries the real `LogoutError` from the control RPC and is surfaced verbatim;
670/// any other send failure (actor not running / stopped, mailbox full, send timeout) — a delivery
671/// problem, not a logout result — collapses to the transient [`ts_control::LogoutError::NetworkError`]
672/// (logout is idempotent, so a retry after a delivery failure is safe). Factored out of
673/// [`Runtime::logout`] so the mapping is unit-testable without standing up an actor.
674fn flatten_logout_send_err<M>(
675    e: kameo::error::SendError<M, ts_control::LogoutError>,
676) -> ts_control::LogoutError {
677    match e {
678        kameo::error::SendError::HandlerError(err) => err,
679        _ => ts_control::LogoutError::NetworkError,
680    }
681}
682
/// Collapse a kameo `SendError` from the `GetCertificate` ask into a [`ts_control::CertError`].
///
/// A `HandlerError` wraps the genuine `CertError` produced by the ACME issuance and is handed
/// back unchanged. Because `CertError` has no transient-network variant, every other send
/// failure (actor not running / stopped, mailbox full, send timeout) — a delivery problem rather
/// than an issuance result — is reported as a [`ts_control::CertError::Io`]. Kept separate from
/// [`Runtime::get_certificate`] so the mapping can be unit-tested without standing up an actor.
#[cfg(feature = "acme")]
fn flatten_cert_send_err<M>(
    e: kameo::error::SendError<M, ts_control::CertError>,
) -> ts_control::CertError {
    if let kameo::error::SendError::HandlerError(issuance_err) = e {
        return issuance_err;
    }

    // Delivery failed: synthesize an I/O error since there is no dedicated variant for it.
    let unavailable =
        std::io::Error::other("control runner unavailable for certificate issuance");
    ts_control::CertError::Io(unavailable)
}
701
#[cfg(test)]
mod tests {
    use kameo::error::SendError;
    use ts_control::{IdTokenError, LogoutError};

    use super::*;
    use crate::control_runner::{FetchIdToken, Logout};

    /// Send-error type fed to [`flatten_send_err`] in these tests.
    type IdTokenSendError = SendError<FetchIdToken, IdTokenError>;
    /// Send-error type fed to [`flatten_logout_send_err`] in these tests.
    type LogoutSendError = SendError<Logout, LogoutError>;

    /// With `None`, the netstack's own default TCP window (the 256 KiB throughput default) must
    /// be kept as-is rather than silently replaced by some other value.
    #[test]
    fn netstack_config_none_uses_netstack_default() {
        let expected = netstack::netcore::Config::default().tcp_buffer_size;
        let actual = netstack_config_from(None).tcp_buffer_size;
        assert_eq!(
            actual, expected,
            "None must inherit the netstack default TCP buffer size"
        );
    }

    /// With `Some(n)`, the TCP window (the memory-vs-throughput knob exit-node operators reach
    /// for) must be overridden in the config that both netstacks are built from.
    #[test]
    fn netstack_config_some_overrides_buffer() {
        const REQUESTED: usize = 64 * 1024;
        let actual = netstack_config_from(Some(REQUESTED)).tcp_buffer_size;
        assert_eq!(
            actual, REQUESTED,
            "Some(n) must override the TCP buffer size that both netstacks use"
        );
    }

    /// The `IdTokenError` inside a `HandlerError` comes from the RPC handler and must be returned
    /// unchanged instead of being flattened to a generic network error.
    ///
    /// The payload is an `Internal(_)` built through the public `From<Utf8Error>` conversion (no
    /// extra deps). Since it differs from the `NetworkError` fallback, a buggy flatten that
    /// always returned `NetworkError` would fail the final assertion.
    #[test]
    fn flatten_send_err_handler_error_passes_through() {
        // The invalid bytes live in a runtime Vec so the `invalid_from_utf8` lint (which only
        // fires on compile-time-known literals) does not flag this deliberate bad input.
        let invalid_utf8: Vec<u8> = vec![0xff, 0xfe];
        let decode_failure = core::str::from_utf8(&invalid_utf8).unwrap_err();
        let expected = IdTokenError::from(decode_failure);
        assert!(matches!(expected, IdTokenError::Internal(_)));

        let send_err: IdTokenSendError = SendError::HandlerError(expected.clone());
        assert_eq!(flatten_send_err(send_err), expected);
    }

    /// An actor-stopped send failure is a delivery problem rather than an RPC result, so it must
    /// be reported as a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_stopped_is_network_error() {
        let send_err: IdTokenSendError = SendError::ActorStopped;
        assert_eq!(flatten_send_err(send_err), IdTokenError::NetworkError);
    }

    /// `ActorNotRunning` (the message bounces back undelivered) is also a delivery failure and
    /// must be reported as a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_not_running_is_network_error() {
        let bounced = FetchIdToken {
            audience: "sts.amazonaws.com".to_string(),
        };
        let send_err: IdTokenSendError = SendError::ActorNotRunning(bounced);
        assert_eq!(flatten_send_err(send_err), IdTokenError::NetworkError);
    }

    /// The `LogoutError` inside a `HandlerError` comes from the logout RPC and must be returned
    /// unchanged. The `Internal(_)` payload differs from the `NetworkError` fallback, which makes
    /// the passthrough observable.
    #[test]
    fn flatten_logout_send_err_handler_error_passes_through() {
        let expected = LogoutError::Internal(ts_control::LogoutInternalErrorKind::Http);
        assert!(matches!(expected, LogoutError::Internal(_)));

        let send_err: LogoutSendError = SendError::HandlerError(expected.clone());
        assert_eq!(flatten_logout_send_err(send_err), expected);
    }

    /// An actor-stopped send failure is a delivery problem rather than a logout result, so it is
    /// reported as a transient `NetworkError` (logout is idempotent, so a retry is safe).
    #[test]
    fn flatten_logout_send_err_actor_stopped_is_network_error() {
        let send_err: LogoutSendError = SendError::ActorStopped;
        assert_eq!(
            flatten_logout_send_err(send_err),
            LogoutError::NetworkError
        );
    }
}