ts_runtime/lib.rs
1#![doc = include_str!("../README.md")]
2
3extern crate ts_netstack_smoltcp as netstack;
4
5use core::time::Duration;
6use std::sync::Arc;
7
8use kameo::{
9 actor::{ActorRef, Spawn, WeakActorRef},
10 mailbox::Signal,
11};
12use netstack::netcore::Channel;
13use tokio::sync::watch;
14
15use crate::{
16 control_runner::ControlRunner, dataplane::DataplaneActor, direct::DirectManager,
17 forwarder_actor::ForwarderActor, multiderp::Multiderp, netstack_actor::NetstackActor,
18};
19
20/// Pcap stream framer for debug packet capture (`CapturePcap`).
21pub mod capture;
22/// Control runner.
23pub mod control_runner;
24mod dataplane;
25mod derp_latency;
26/// Device connection-state tracking ([`DeviceState`]) and typed registration outcome
27/// ([`RegistrationError`]).
28pub mod device_state;
29mod direct;
30mod env;
31mod error;
32/// Fallback TCP handler registry (`tsnet.Server.RegisterFallbackTCPHandler` parity).
33pub mod fallback_tcp;
34mod forwarder_actor;
35/// Client-side Funnel ingress termination (`tsnet`'s `ListenFunnel` data path).
36pub mod funnel;
37mod magic_dns;
38mod multiderp;
39mod netstack_actor;
40mod packetfilter;
41pub mod peer_tracker;
42mod peerapi;
43mod peerapi_doh;
44mod route_updater;
45/// Stored Serve config + accept-loop runtime (`tsnet`'s `Get/SetServeConfig` + serving runtime).
46pub mod serve;
47mod src_filter;
48/// Netmap status snapshot, WhoIs, and watcher types.
49pub mod status;
50/// Taildrop peer-to-peer file transfer store.
51pub mod taildrop;
52pub mod taildrop_send;
53#[cfg(feature = "tun")]
54mod tun_actor;
55
56pub use device_state::{DeviceState, RegistrationError};
57pub(crate) use env::Env;
58pub use error::{Error, ErrorKind};
59pub use status::{Status, StatusNode, WhoIs};
60pub use ts_dataplane::{CaptureHook, CapturePath};
61
62use crate::peer_tracker::PeerTracker;
63
/// The runtime for a tailscale device.
///
/// Holds the handles the embedder needs to talk to the actor system started by
/// [`Runtime::spawn`] and to tear it down again (see [`Runtime::graceful_shutdown`] and the
/// `Drop` impl).
pub struct Runtime {
    /// Reference to the control actor.
    pub control: ActorRef<ControlRunner>,
    /// Strong reference to the dataplane actor. Used to install the debug capture hook and
    /// stopped explicitly when the runtime shuts down.
    dataplane: ActorRef<DataplaneActor>,
    /// Reference to the application netstack actor. `None` in TUN transport mode, where there is
    /// no userspace application netstack (the application data path is a real kernel TUN device).
    netstack: Option<WeakActorRef<NetstackActor>>,
    /// Reference to the peer tracker for peer lookups.
    pub peer_tracker: WeakActorRef<PeerTracker>,
    /// Fallback TCP handler registry, bound to the application netstack. `None` in TUN transport
    /// mode (no application netstack exists to attach it to).
    fallback_tcp: Option<fallback_tcp::FallbackTcpManager>,
    /// Shared environment; a clone is handed to every actor started by [`Runtime::spawn`]. The
    /// runtime itself reads the message bus, the Taildrop store, the Funnel ingress slot/flag and
    /// the exit-node selector through it.
    env: Env,
    /// Sender side of the shutdown flag (the receiver is given to `Env` at construction). Set to
    /// `true` by [`Runtime::graceful_shutdown`] and by `Drop`.
    shutdown: watch::Sender<bool>,
    /// Sender side of the exit-node selector `watch` cell. Held privately here (not on the cloned
    /// `Env`, which keeps only the read side) so that only `Runtime::set_exit_node` can mutate the
    /// selection; the route updater and source filter re-read it via [`Env::exit_node`].
    exit_node_tx: watch::Sender<Option<ts_control::ExitNodeSelector>>,
    /// Receiver mirroring the *active* (resolved + fail-closed) exit node's stable id, fed by the
    /// route updater. Read by [`Runtime::status`] / [`Runtime::active_exit_node`] to report which
    /// exit node traffic is actually egressing through (vs. the merely-configured selector).
    active_exit_rx: watch::Receiver<Option<ts_control::StableNodeId>>,
    /// Receiver for the device connection-state cell, fed by the control runner. Read by
    /// [`Runtime::watch_state`] and [`Runtime::wait_until_running`].
    state_rx: watch::Receiver<DeviceState>,
}
91
92impl Runtime {
93 /// Spawn a new runtime with the given parameters for connecting to a tailnet.
94 pub async fn spawn(
95 config: ts_control::Config,
96 auth_key: Option<String>,
97 keys: ts_keys::NodeState,
98 ) -> Result<Self, Error> {
99 let (shutdown_tx, shutdown_rx) = watch::channel(false);
100
101 // The exit-node selector is a live `watch` cell so `Device::set_exit_node` can change it at
102 // runtime. `new_with_exit_tx` returns the `Sender` (mutation capability) separately so it is
103 // retained privately on the `Runtime`, while only the `Receiver` (the readers' contract)
104 // lives on the cloned `Env`. The initial value comes from `ForwarderConfig.exit_node`.
105 let (env, exit_node_tx) = Env::new_with_exit_tx(
106 keys,
107 shutdown_rx,
108 env::ForwarderConfig::from_control_config(&config),
109 );
110
111 // Both userspace netstacks (application + forwarder) share one netstack config. Honor the
112 // per-deployment TCP buffer knob when set, otherwise fall back to the netstack default.
113 let netstack_config = netstack_config_from(config.tcp_buffer_size);
114
115 let dataplane = DataplaneActor::spawn(env.clone());
116
117 let (netstack_id, netstack_up, netstack_down) =
118 dataplane.ask(dataplane::NewOverlayTransport).await?;
119
120 // A second overlay transport feeds the dedicated any-IP forwarder netstack. Inbound packets
121 // for advertised subnet routes / the exit-node default route are routed here (see
122 // `route_updater`), keeping forwarded flows off the application netstack.
123 let (forwarder_id, forwarder_up, forwarder_down) =
124 dataplane.ask(dataplane::NewOverlayTransport).await?;
125
126 let multiderp = Multiderp::spawn((env.clone(), dataplane.clone()));
127
128 // Spawn the direct (disco) underlay manager before the route updater. Its `on_start`
129 // binds the UDP socket and registers its transport synchronously, so by the time the
130 // route updater asks it for the direct transport id it is guaranteed to be available.
131 let direct = DirectManager::spawn((env.clone(), dataplane.clone(), multiderp.clone()));
132
133 // Spawn the forwarder before the route updater. Its `on_start` builds the forwarder
134 // netstack, enables any-IP acceptance, and starts the per-port accept loops synchronously,
135 // so by the time the route updater begins delivering advertised prefixes to
136 // `forwarder_id` the netstack is already draining its transport.
137 let forwarder = ForwarderActor::spawn((
138 env.clone(),
139 netstack_config.clone(),
140 forwarder_up,
141 forwarder_down,
142 ));
143 // Force `on_start` to finish (any-IP enabled, accept loops live) before the route updater
144 // can route the first inbound flow to `forwarder_id`: an `ask` blocks until the actor has
145 // started.
146 //
147 // The forwarder netstack's overlay `Channel` is reused by the TUN application path for
148 // recursive / exit-node-DoH MagicDNS forwarding (TUN mode has no application netstack of its
149 // own, but the forwarder netstack runs in both modes and egresses over the overlay — the
150 // anti-leak property `forward_query`/`forward_doh` require). Only the `tun` Tun arm consumes
151 // it, so it is unused when the `tun` feature is off — allow that without warn-as-error.
152 #[cfg_attr(not(feature = "tun"), allow(unused_variables))]
153 let (forwarder_channel,) = forwarder.ask(forwarder_actor::GetChannel).await?;
154
155 // The route updater is the single authoritative resolver of the active (resolved,
156 // fail-closed) exit node; it publishes the resolved stable id into this watch cell so
157 // `Runtime::status` can report which exit is actually engaged (not just configured).
158 let (active_exit_tx, active_exit_rx) = watch::channel(None);
159 route_updater::RouteUpdater::spawn((
160 multiderp.clone(),
161 direct.clone(),
162 env.clone(),
163 netstack_id,
164 forwarder_id,
165 active_exit_tx,
166 ));
167 packetfilter::PacketfilterUpdater::spawn(env.clone());
168 src_filter::SourceFilterUpdater::spawn(env.clone());
169 let peer_tracker = PeerTracker::spawn(env.clone()).downgrade();
170
171 // Select the application data path from the transport mode. The forwarder/egress path
172 // above is UNCHANGED in both modes — TUN mode only swaps the application data path, never
173 // the forwarder. `config` is moved into `ControlRunner::spawn` below, so branch on a
174 // borrow and clone the small `TunConfig` where needed before the move.
175 //
176 // - Netstack (the default, and the only reachable arm when the `tun` feature is off):
177 // spawn the application netstack + MagicDNS responder + fallback-TCP registry, all on
178 // the `netstack_up`/`netstack_down` overlay seam.
179 // - Tun: spawn `TunActor` on that same overlay seam instead; no application netstack and
180 // no MagicDNS responder exist, and `netstack`/`fallback_tcp` are `None`.
181 // - Tun requested but built without the `tun` feature: hard-error (a config/build
182 // mismatch knowable at spawn time). NEVER silently fall back to netstack.
183 let (netstack, fallback_tcp) = match &config.transport_mode {
184 ts_control::TransportMode::Netstack => {
185 let netstack = NetstackActor::spawn((
186 env.clone(),
187 netstack_config,
188 netstack_up,
189 netstack_down,
190 ));
191
192 // Fetch the netstack channel while we still hold the strong ActorRef, then spawn
193 // the MagicDNS responder on it. Fire-and-forget: like src_filter/route_updater,
194 // it's owned by the message bus and isn't stored on `Runtime`.
195 let (channel,) = netstack.ask(netstack_actor::GetChannel).await?;
196 // The fallback-TCP registry attaches to the application netstack — the same one
197 // that carries the embedder's explicit `Device::tcp_listen` sockets — so a
198 // fallback handler sees exactly the inbound flows no explicit listener matched.
199 let fallback_tcp = fallback_tcp::FallbackTcpManager::new(channel.clone());
200 magic_dns::MagicDnsActor::spawn((env.clone(), channel));
201
202 (Some(netstack.downgrade()), Some(fallback_tcp))
203 }
204
205 #[cfg(feature = "tun")]
206 ts_control::TransportMode::Tun(tun_cfg) => {
207 // Reuse the same `netstack_up`/`netstack_down` overlay-transport pair that would
208 // have fed the netstack — it is just the application-side overlay seam (the name
209 // is historical). No NetstackActor / MagicDnsActor is spawned.
210 tun_actor::TunActor::spawn((
211 env.clone(),
212 tun_cfg.clone(),
213 netstack_up,
214 netstack_down,
215 // Host-route gating inputs derived from `Env`: subnet routes are only steered
216 // into the TUN when `--accept-routes` is set, and the host `/0` only when the
217 // embedder configured an exit node. See `tun_actor::host_routes_from_node`.
218 tun_actor::HostRouteGating {
219 accept_routes: env.accept_routes,
220 exit_node_configured: env.exit_node().is_some(),
221 },
222 // Reuse the forwarder netstack's overlay `Channel` for recursive / exit-node-DoH
223 // MagicDNS forwarding in the TUN datapath (TUN mode has no application netstack
224 // Channel of its own). Egresses over the overlay — anti-leak preserved.
225 forwarder_channel.clone(),
226 ));
227
228 (None, None)
229 }
230
231 #[cfg(not(feature = "tun"))]
232 ts_control::TransportMode::Tun(_) => {
233 return Err(Error {
234 kind: ErrorKind::TunUnavailable,
235 target_actor: None,
236 message_ty: None,
237 });
238 }
239 };
240
241 // Device connection-state cell. Created here (not inside the actor) so the control runner's
242 // `on_start` can publish `Failed`/`NeedsLogin` and still return `Err` without the sender
243 // being tied to a `Self` that never gets constructed on a hard registration failure.
244 let (state_tx, state_rx) = watch::channel(DeviceState::Connecting);
245
246 let control = ControlRunner::spawn(control_runner::Params {
247 config,
248 auth_key,
249 env: env.clone(),
250 state_tx,
251 });
252
253 Ok(Self {
254 control,
255 dataplane,
256 peer_tracker,
257 fallback_tcp,
258 netstack,
259 env,
260 shutdown: shutdown_tx,
261 exit_node_tx,
262 active_exit_rx,
263 state_rx,
264 })
265 }
266
267 /// Register a fallback TCP handler consulted for every inbound TCP flow that matches no
268 /// explicit listener (`tsnet.Server.RegisterFallbackTCPHandler` parity).
269 ///
270 /// The returned [`fallback_tcp::FallbackTcpHandle`] deregisters the handler when dropped. See
271 /// [`fallback_tcp`] for the dispatch contract and anti-leak guarantees.
272 ///
273 /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
274 /// application netstack to attach a fallback handler to.
275 pub fn register_fallback_tcp_handler(
276 &self,
277 cb: Arc<
278 dyn Fn(core::net::SocketAddr, core::net::SocketAddr) -> fallback_tcp::FallbackDecision
279 + Send
280 + Sync,
281 >,
282 ) -> Result<fallback_tcp::FallbackTcpHandle, Error> {
283 Ok(self
284 .fallback_tcp
285 .as_ref()
286 .ok_or(Error {
287 kind: ErrorKind::UnsupportedInTunMode,
288 target_actor: None,
289 message_ty: None,
290 })?
291 .register(cb))
292 }
293
294 /// Get a channel to send commands to the netstack.
295 ///
296 /// Returns [`ErrorKind::UnsupportedInTunMode`] in TUN transport mode, where there is no
297 /// application netstack.
298 pub async fn channel(&self) -> Result<Channel, Error> {
299 let (channel,) = self
300 .netstack
301 .as_ref()
302 .ok_or(Error {
303 kind: ErrorKind::UnsupportedInTunMode,
304 target_actor: None,
305 message_ty: None,
306 })?
307 .upgrade()
308 .ok_or(Error {
309 kind: ErrorKind::ActorGone,
310 target_actor: None,
311 message_ty: None,
312 })?
313 .ask(netstack_actor::GetChannel)
314 .await?;
315
316 Ok(channel)
317 }
318
319 /// The Taildrop file store, if Taildrop is enabled (`taildrop_dir` configured and the store
320 /// initialized). `None` when disabled — fail-closed. Shared with the peerAPI Taildrop server so
321 /// the embedder's read APIs and the receive path see the same on-disk store.
322 pub fn taildrop_store(&self) -> Option<Arc<crate::taildrop::TaildropStore>> {
323 self.env.taildrop_store.clone()
324 }
325
326 /// The shared Funnel ingress slot the peerAPI `/v0/ingress` route reads per connection.
327 ///
328 /// `Device::listen_funnel` installs a [`FunnelManager`](crate::funnel::FunnelManager)'s sink here
329 /// to make the route live (the peerAPI server is already running from startup). Returns a clone of
330 /// the runtime-lifetime `Arc` so the device can write the slot without restarting the server. See
331 /// [`crate::funnel`] for the ingress data path.
332 pub fn funnel_ingress_slot(&self) -> crate::funnel::FunnelIngressSlot {
333 self.env.funnel_ingress.clone()
334 }
335
336 /// The shared "Funnel ingress listener active" flag (the same `Arc` the control session reads to
337 /// set `HostInfo.IngressEnabled`). `Device::listen_funnel` flips it `true` while a funnel listener
338 /// is up so control routes Funnel traffic to this node; clearing it advertises no live endpoint.
339 pub fn ingress_active_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
340 self.env.ingress_active.clone()
341 }
342
343 /// Install (`Some`) or clear (`None`) the debug packet-capture hook on the running dataplane.
344 /// `Some(hook)` tees every plaintext packet crossing the datapath to `hook` until it is cleared;
345 /// `None` stops capture. Mirrors Go `tstun.Wrapper.InstallCaptureHook` / `ClearCaptureSink`.
346 pub async fn install_capture(
347 &self,
348 hook: Option<ts_dataplane::CaptureHook>,
349 ) -> Result<(), Error> {
350 self.dataplane
351 .ask(dataplane::InstallCapture { hook })
352 .await
353 .map_err(Into::into)
354 }
355
356 /// A snapshot of the local netmap: this node plus every known peer.
357 ///
358 /// Combines the self node held by the control runner with the peer set held by the peer
359 /// tracker. Mirrors tsnet's `LocalClient::Status`.
360 ///
361 /// `self_node` is `None` until the first netmap update has been received from control. Peer
362 /// entries carry no online/user/capability data (see the [`status`] module docs for that gap).
363 pub async fn status(&self) -> Result<Status, Error> {
364 let self_node = self
365 .control
366 .ask(control_runner::SelfNode)
367 .await?
368 .as_ref()
369 .map(StatusNode::from_node);
370
371 let peers = self
372 .peer_tracker
373 .upgrade()
374 .ok_or(Error {
375 kind: ErrorKind::ActorGone,
376 target_actor: None,
377 message_ty: None,
378 })?
379 .ask(peer_tracker::GetStatus)
380 .await?;
381
382 Ok(Status {
383 self_node,
384 peers,
385 active_exit_node: self.active_exit_node(),
386 })
387 }
388
389 /// The stable id of the exit node traffic is currently egressing through, or `None` if none is
390 /// engaged. This is the route updater's resolved + fail-closed answer (see
391 /// [`Status::active_exit_node`](crate::status::Status::active_exit_node)): it differs from the
392 /// configured [`exit_node`](Self::exit_node) selector, which may name a peer that is absent or
393 /// no longer advertising a default route (in which case egress is dropped and this returns
394 /// `None`).
395 pub fn active_exit_node(&self) -> Option<ts_control::StableNodeId> {
396 self.active_exit_rx.borrow().clone()
397 }
398
399 /// Request an OIDC ID token from control scoped to `audience` (workload-identity federation).
400 ///
401 /// Returns the signed JWT, or the token RPC's own [`ts_control::IdTokenError`]. The kameo
402 /// delegated-reply send error is flattened: a handler error carries the real `IdTokenError`,
403 /// any other send failure (actor shutdown / mailbox closed) is surfaced as
404 /// [`ts_control::IdTokenError::NetworkError`].
405 pub async fn fetch_id_token(
406 &self,
407 audience: String,
408 ) -> Result<String, ts_control::IdTokenError> {
409 self.control
410 .ask(control_runner::FetchIdToken { audience })
411 .await
412 .map_err(flatten_send_err)
413 }
414
415 /// Log this node out of the tailnet: deregister it by expiring its current node key.
416 ///
417 /// Forwards to the control runner, which re-POSTs `/machine/register` with a past expiry over a
418 /// fresh Noise channel. This is a control-plane state change only — it does NOT shut the runtime
419 /// down (the caller follows with [`graceful_shutdown`](Self::graceful_shutdown)) and does not
420 /// touch the on-disk node key. The kameo delegated-reply send error is flattened the same way as
421 /// [`fetch_id_token`](Self::fetch_id_token): a handler error carries the real
422 /// [`ts_control::LogoutError`]; any other send failure (actor shutdown / mailbox closed) is
423 /// surfaced as [`ts_control::LogoutError::NetworkError`].
424 pub async fn logout(&self) -> Result<(), ts_control::LogoutError> {
425 self.control
426 .ask(control_runner::Logout)
427 .await
428 .map_err(flatten_logout_send_err)
429 }
430
431 /// Issue a real Let's Encrypt certificate for this node's MagicDNS `name` (`acme` feature).
432 ///
433 /// Mirrors [`fetch_id_token`](Self::fetch_id_token): forwards to the control runner, which runs
434 /// the client-side ACME DNS-01 flow on a spawned task and publishes the challenge TXT via the
435 /// node's set-dns RPC. The kameo delegated-reply send error is flattened — a handler error
436 /// carries the real [`ts_control::CertError`]; any other send failure (actor shutdown / mailbox
437 /// closed) is surfaced as a [`ts_control::CertError::Io`]. SaaS-only: a self-hosted control
438 /// plane 501s on set-dns.
439 #[cfg(feature = "acme")]
440 pub async fn get_certificate(
441 &self,
442 name: String,
443 ) -> Result<ts_control::tls::CertifiedKey, ts_control::CertError> {
444 self.control
445 .ask(control_runner::GetCertificate { name })
446 .await
447 .map_err(flatten_cert_send_err)
448 }
449
450 /// Resolve which node owns a tailnet source address.
451 ///
452 /// Maps the source IP of `addr` to its owning node. Mirrors tsnet's `LocalClient::WhoIs`.
453 /// Returns `None` if no peer holds that tailnet IP. The returned [`WhoIs`] carries no
454 /// user/login or capability data in this fork (see the [`status`] module docs).
455 pub async fn whois(&self, addr: core::net::SocketAddr) -> Result<Option<WhoIs>, Error> {
456 self.peer_tracker
457 .upgrade()
458 .ok_or(Error {
459 kind: ErrorKind::ActorGone,
460 target_actor: None,
461 message_ty: None,
462 })?
463 .ask(peer_tracker::Whois { addr })
464 .await
465 .map_err(Into::into)
466 }
467
468 /// Change the selected exit node at runtime (the equivalent of Go `tsnet`'s
469 /// `LocalClient.EditPrefs(ExitNodeID/ExitNodeIP)`), without recreating the device.
470 ///
471 /// Updates the live exit-node selector, then asks the peer tracker to re-broadcast the current
472 /// peer set so the route updater and source filter re-resolve the new selector immediately.
473 /// `None` clears the exit node (internet-bound traffic is then dropped, fail-closed, unless this
474 /// node egresses directly). The selection is re-resolved against the live peer set, so passing a
475 /// selector for a peer not yet in the netmap simply takes effect once that peer appears.
476 pub async fn set_exit_node(
477 &self,
478 selector: Option<ts_control::ExitNodeSelector>,
479 ) -> Result<(), Error> {
480 // Update the live cell every reader borrows from. `send_replace` keeps the value current
481 // even with no active receivers (none can have dropped while the runtime is up, but it is
482 // the right non-failing primitive here).
483 self.exit_node_tx.send_replace(selector);
484
485 // Trigger an immediate re-resolution: the route updater (outbound routes + DoH delegation)
486 // and the source filter (inbound validation) both recompute on an `Arc<PeerState>`, so a
487 // re-broadcast applies the new exit without waiting for the next netmap update.
488 self.peer_tracker
489 .upgrade()
490 .ok_or(Error {
491 kind: ErrorKind::ActorGone,
492 target_actor: None,
493 message_ty: None,
494 })?
495 .ask(peer_tracker::RepublishState)
496 .await
497 .map_err(Into::into)
498 }
499
500 /// The currently-selected exit node, or `None` if none is selected.
501 pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
502 self.env.exit_node()
503 }
504
505 /// Subscribe to netmap peer-change events.
506 ///
507 /// Returns a [`watch::Receiver`] whose value is the current set of peer [`StatusNode`]s,
508 /// updated on every netmap state update from control. Mirrors tsnet's `WatchIPNBus`. Await
509 /// [`watch::Receiver::changed`](tokio::sync::watch::Receiver::changed) to react to peers
510 /// joining, leaving, or changing.
511 pub async fn watch_netmap(&self) -> Result<watch::Receiver<Vec<StatusNode>>, Error> {
512 self.peer_tracker
513 .upgrade()
514 .ok_or(Error {
515 kind: ErrorKind::ActorGone,
516 target_actor: None,
517 message_ty: None,
518 })?
519 .ask(peer_tracker::WatchNetmap)
520 .await
521 .map_err(Into::into)
522 }
523
524 /// The current device connection-[`DeviceState`].
525 pub fn device_state(&self) -> DeviceState {
526 self.state_rx.borrow().clone()
527 }
528
529 /// Watch the device connection-[`DeviceState`] (`Connecting` → `Running` / `NeedsLogin` /
530 /// `Expired` / `Failed`).
531 ///
532 /// Returns a [`watch::Receiver`]; await
533 /// [`changed`](tokio::sync::watch::Receiver::changed) to react push-style to control connection
534 /// transitions instead of polling [`status`](Self::status). The initial value is the current
535 /// state. Note: a transient per-reconnect dip back to `Connecting` is **not** currently
536 /// emitted (control transparently reconnects below this layer); the state reflects registration
537 /// outcome and node-key expiry.
538 pub fn watch_state(&self) -> watch::Receiver<DeviceState> {
539 self.state_rx.clone()
540 }
541
542 /// Wait until the device finishes registering, returning a typed outcome.
543 ///
544 /// Resolves `Ok(())` once the device reaches [`DeviceState::Running`]. Returns a typed
545 /// [`RegistrationError`] otherwise — the actionable distinction between "retry", "re-pair", and
546 /// "drive interactive login" that replaces polling [`ipv4_addr`](Self::ipv4_addr) in a loop:
547 /// - `AuthRejected` — bad/expired/unknown auth key. **Permanent** (re-pair).
548 /// - `NeedsLogin(url)` — interactive authorization required (no usable auth key). **Not
549 /// permanent**: the runtime keeps retrying and will reach `Running` once the user authorizes
550 /// the URL. An **auth-key** caller should treat this as a failure; an **interactive** caller
551 /// should ignore this return and instead drive the flow via [`watch_state`](Self::watch_state)
552 /// (this method returns the URL eagerly rather than blocking for the whole login).
553 /// - `NetworkUnreachable` — control unreachable. **Transient** (retry).
554 /// - `Timeout` — no settled state within `timeout`.
555 ///
556 /// `KeyExpired` is not produced by this initial wait (a node key expires only *after* it has
557 /// come up); observe post-registration expiry via [`watch_state`](Self::watch_state).
558 /// `timeout` of `None` waits indefinitely for a settled state.
559 pub async fn wait_until_running(
560 &self,
561 timeout: Option<Duration>,
562 ) -> Result<(), RegistrationError> {
563 device_state::wait_for_running(self.state_rx.clone(), timeout).await
564 }
565
566 /// Attempt to shut down the runtime gracefully.
567 ///
568 /// Returns false if the shutdown timed out. It is still shut down if it timed out, just
569 /// more violently and with possible resource leaks.
570 pub async fn graceful_shutdown(self, timeout: Option<Duration>) -> bool {
571 self.shutdown.send_replace(true);
572
573 async fn _shutdown_all(runtime: Runtime) {
574 // See the note in `Drop` for why we only need to stop these actors to bring down the
575 // whole runtime.
576
577 let _ignore = runtime.control.stop_gracefully().await;
578 let _ignore = runtime.dataplane.stop_gracefully().await;
579 let _ignore = runtime.env.bus.stop_gracefully().await;
580
581 tokio::join![
582 runtime.control.wait_for_shutdown(),
583 runtime.dataplane.wait_for_shutdown(),
584 runtime.env.bus.wait_for_shutdown(),
585 ];
586 }
587
588 let fut = _shutdown_all(self);
589
590 match timeout {
591 Some(timeout) => tokio::time::timeout(timeout, fut).await.is_ok(),
592 None => {
593 fut.await;
594 true
595 }
596 }
597 }
598}
599
600impl Drop for Runtime {
601 fn drop(&mut self) {
602 // We must have already run `graceful_shutdown`: on the happy path, this does nothing, but
603 // if it timed out, we need to make sure the actors are dead so we don't leak them and their
604 // dependents.
605 if *self.shutdown.borrow() {
606 self.control.kill();
607 self.dataplane.kill();
608 self.env.bus.kill();
609 return;
610 }
611
612 self.shutdown.send_replace(true);
613
614 // Actors shut down when the last ActorRef to them is dropped (as nothing can send them
615 // messages anymore). If we don't hold an ActorRef in Runtime, in general the only thing
616 // that has one is the MessageBus, which each actor subscribes to for a subset of messages.
617 // Hence, if we shut down the bus, most actors die as well.
618
619 // First shut down the actors we have an ActorRef to:
620 try_shutdown(&self.control);
621 try_shutdown(&self.dataplane);
622
623 // Then shutdown the message bus, stopping the rest of the actors:
624 try_shutdown(&self.env.bus);
625 }
626}
627
628fn try_shutdown(a: &ActorRef<impl kameo::Actor>) {
629 if let Err(e) = a.mailbox_sender().try_send(Signal::Stop) {
630 tracing::error!(error = %e, "graceful shutdown failed, killing actor");
631 a.kill();
632 }
633}
634
635/// Build the netstack config shared by both userspace netstacks (application + forwarder) from the
636/// per-deployment `tcp_buffer_size` knob.
637///
638/// `None` keeps the netstack default (256 KiB/direction); `Some(n)` overrides it (e.g. a smaller
639/// window on a memory-constrained exit node forwarding many concurrent flows — see
640/// [`netstack::netcore::Config::tcp_buffer_size`]). Factored out of [`Runtime::spawn`] so the
641/// None-default / Some-override mapping is unit-testable without standing up the actor system.
642fn netstack_config_from(tcp_buffer_size: Option<usize>) -> netstack::netcore::Config {
643 let mut c = netstack::netcore::Config::default();
644 if let Some(tcp_buffer_size) = tcp_buffer_size {
645 c.tcp_buffer_size = tcp_buffer_size;
646 }
647 c
648}
649
650/// Flatten a kameo delegated-reply [`SendError`] for the id-token RPC into the RPC's own
651/// [`ts_control::IdTokenError`].
652///
653/// A [`SendError::HandlerError`](kameo::error::SendError::HandlerError) carries the real
654/// `IdTokenError` produced by the handler and is surfaced verbatim. Any other send failure (actor
655/// not running / stopped, mailbox full, send timeout) is a delivery problem rather than an RPC
656/// result, so it collapses to a transient [`ts_control::IdTokenError::NetworkError`]. Factored out
657/// of [`Runtime::fetch_id_token`] so this mapping is unit-testable without standing up an actor.
658fn flatten_send_err<M>(
659 e: kameo::error::SendError<M, ts_control::IdTokenError>,
660) -> ts_control::IdTokenError {
661 match e {
662 kameo::error::SendError::HandlerError(err) => err,
663 _ => ts_control::IdTokenError::NetworkError,
664 }
665}
666
667/// Flatten a kameo `SendError` from the `Logout` ask into a [`ts_control::LogoutError`].
668///
669/// A `HandlerError` carries the real `LogoutError` from the control RPC and is surfaced verbatim;
670/// any other send failure (actor not running / stopped, mailbox full, send timeout) — a delivery
671/// problem, not a logout result — collapses to the transient [`ts_control::LogoutError::NetworkError`]
672/// (logout is idempotent, so a retry after a delivery failure is safe). Factored out of
673/// [`Runtime::logout`] so the mapping is unit-testable without standing up an actor.
674fn flatten_logout_send_err<M>(
675 e: kameo::error::SendError<M, ts_control::LogoutError>,
676) -> ts_control::LogoutError {
677 match e {
678 kameo::error::SendError::HandlerError(err) => err,
679 _ => ts_control::LogoutError::NetworkError,
680 }
681}
682
/// Collapse a kameo `SendError` from the `GetCertificate` ask into a [`ts_control::CertError`].
///
/// A `HandlerError` wraps the real `CertError` produced by the ACME issuance and is returned
/// unchanged. `CertError` has no transient-network variant, so every other send failure (actor
/// not running / stopped, mailbox full, send timeout) — a delivery problem rather than an
/// issuance result — becomes a [`ts_control::CertError::Io`]. Kept separate from
/// [`Runtime::get_certificate`] so the mapping can be unit-tested without an actor.
#[cfg(feature = "acme")]
fn flatten_cert_send_err<M>(
    e: kameo::error::SendError<M, ts_control::CertError>,
) -> ts_control::CertError {
    if let kameo::error::SendError::HandlerError(issuance_err) = e {
        return issuance_err;
    }

    let unavailable = std::io::Error::other("control runner unavailable for certificate issuance");
    ts_control::CertError::Io(unavailable)
}
701
#[cfg(test)]
mod tests {
    use super::*;

    /// Send-error type produced by the `FetchIdToken` ask.
    type IdTokenSendError =
        kameo::error::SendError<control_runner::FetchIdToken, ts_control::IdTokenError>;

    /// Send-error type produced by the `Logout` ask.
    type LogoutSendError = kameo::error::SendError<control_runner::Logout, ts_control::LogoutError>;

    /// With `None`, the netstack's own default TCP window (the 256 KiB throughput default) must be
    /// kept as-is rather than silently coerced to some other value.
    #[test]
    fn netstack_config_none_uses_netstack_default() {
        let expected = netstack::netcore::Config::default().tcp_buffer_size;
        let actual = netstack_config_from(None).tcp_buffer_size;
        assert_eq!(
            actual, expected,
            "None must inherit the netstack default TCP buffer size"
        );
    }

    /// With `Some(n)`, the TCP window (the memory-vs-throughput knob exit-node operators reach
    /// for) must be overridden in the config both netstacks are built from.
    #[test]
    fn netstack_config_some_overrides_buffer() {
        let config = netstack_config_from(Some(64 * 1024));
        assert_eq!(
            config.tcp_buffer_size,
            64 * 1024,
            "Some(n) must override the TCP buffer size that both netstacks use"
        );
    }

    /// A `HandlerError` wraps the real `IdTokenError` from the RPC handler and must come back
    /// verbatim rather than being flattened to a generic network error. The payload is an
    /// `Internal(_)` (not `NetworkError`) so that a buggy flatten which always returned
    /// `NetworkError` would fail the assertion.
    #[test]
    fn flatten_send_err_handler_error_passes_through() {
        // The `Internal(_)` payload is built through the public `From<Utf8Error>` conversion (no
        // extra deps). The invalid bytes go through a runtime Vec so the `invalid_from_utf8` lint
        // (which only fires on compile-time-known literals) does not flag this intentional bad
        // input.
        let bytes = vec![0xffu8, 0xfe];
        let utf8_err = core::str::from_utf8(&bytes).unwrap_err();
        let payload = ts_control::IdTokenError::from(utf8_err);
        assert!(matches!(payload, ts_control::IdTokenError::Internal(_)));

        let send_err = IdTokenSendError::HandlerError(payload.clone());
        assert_eq!(flatten_send_err(send_err), payload);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not an RPC result, so it
    /// must collapse to a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_stopped_is_network_error() {
        let send_err = IdTokenSendError::ActorStopped;
        assert_eq!(
            flatten_send_err(send_err),
            ts_control::IdTokenError::NetworkError
        );
    }

    /// `ActorNotRunning` (the message bounces back undelivered) is also a delivery failure and
    /// must map to a transient `NetworkError`.
    #[test]
    fn flatten_send_err_actor_not_running_is_network_error() {
        let bounced = control_runner::FetchIdToken {
            audience: "sts.amazonaws.com".to_string(),
        };
        let send_err = IdTokenSendError::ActorNotRunning(bounced);
        assert_eq!(
            flatten_send_err(send_err),
            ts_control::IdTokenError::NetworkError
        );
    }

    /// A `HandlerError` from the logout RPC wraps the real `LogoutError` and must come back
    /// verbatim. The `Internal(_)` payload (distinct from the `NetworkError` fallback) makes the
    /// passthrough observable.
    #[test]
    fn flatten_logout_send_err_handler_error_passes_through() {
        let payload = ts_control::LogoutError::Internal(ts_control::LogoutInternalErrorKind::Http);
        assert!(matches!(payload, ts_control::LogoutError::Internal(_)));

        let send_err = LogoutSendError::HandlerError(payload.clone());
        assert_eq!(flatten_logout_send_err(send_err), payload);
    }

    /// A non-handler send failure (actor stopped) is a delivery problem, not a logout result, and
    /// collapses to a transient `NetworkError` (logout is idempotent, so a retry is safe).
    #[test]
    fn flatten_logout_send_err_actor_stopped_is_network_error() {
        let send_err = LogoutSendError::ActorStopped;
        assert_eq!(
            flatten_logout_send_err(send_err),
            ts_control::LogoutError::NetworkError
        );
    }
}
793}