geiserx_ts_runtime 0.29.0

tailscale runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
use std::sync::Arc;

use kameo::{
    actor::{ActorRef, Spawn},
    message::Message,
};
use kameo_actors::message_bus::{MessageBus, Publish, Register};
use tokio::sync::watch;

use crate::{Error, error::ResultExt};

/// The forwarding / routing preferences that flow from [`ts_control::Config`] into the runtime's
/// dataplane actors, grouped into one named-field struct.
///
/// These are pure client-side dataplane preferences: `ts_control` does not read them (control
/// always sends the full advertised route set; the runtime trims and forwards). Grouping them
/// here removes the positional-argument hazard of the old `Env::new` — in particular the two
/// adjacent `bool`s [`forward_all_ports`](ForwarderConfig::forward_all_ports) and
/// [`forward_exit_egress`](ForwarderConfig::forward_exit_egress), which a positional constructor
/// could silently swap. Named fields make a swap a compile error.
///
/// Three of the fields ([`accept_routes`](ForwarderConfig::accept_routes),
/// [`accept_dns`](ForwarderConfig::accept_dns) and [`exit_node`](ForwarderConfig::exit_node)) are
/// only *initial* values: [`Env::new_with_runtime_txs`] uses them to seed `watch` cells that can be
/// changed afterwards. Every other field is carried into [`Env`] as-is.
#[derive(Clone)]
pub struct ForwarderConfig {
    /// Whether to accept subnet routes advertised by peers (`--accept-routes` / `RouteAll`).
    ///
    /// The initial value seeds a live `watch` cell ([`Env::accept_routes_rx`]):
    /// `Device::set_accept_routes` toggles it at runtime, and the route updater (outbound routing)
    /// and the source filter (inbound source validation) re-read it via
    /// [`Env::accept_routes`](Env::accept_routes). Both readers must agree so a peer can only
    /// source traffic from subnets we actually route to it.
    pub accept_routes: bool,

    /// Whether to accept the tailnet's DNS configuration (`--accept-dns` / `CorpDNS`).
    ///
    /// The initial value seeds a live `watch` cell (mirroring [`accept_routes`](ForwarderConfig::accept_routes)):
    /// `Device::set_accept_dns` toggles it at runtime and the MagicDNS responder re-reads it via
    /// [`Env::accept_dns`](Env::accept_dns) on every view rebuild. When `false`, the responder ignores
    /// the control-pushed DNS config and serves nothing (`REFUSED`), mirroring Go applying an empty
    /// `dns.Config` when `CorpDNS` is off. See [`ts_control::Config::accept_dns`].
    pub accept_dns: bool,

    /// Which peer (if any) is selected as this node's exit node (`ExitNodeID`).
    ///
    /// The initial value seeds a live `watch` cell ([`Env::exit_node_rx`]) that
    /// `Device::set_exit_node` can change at runtime. The selector is stored *unresolved*: the
    /// route updater and the source filter each call
    /// [`ExitNodeSelector::resolve`](ts_control::ExitNodeSelector::resolve)
    /// against the live peer set on every rebuild, so an IP/name selection follows the peer across
    /// netmap changes. Because `resolve` is deterministic, both actors resolve to the same stable
    /// id and stay coupled: only the selected exit peer gets a default route installed (outbound)
    /// and may legitimately source arbitrary internet IPs (inbound). `None` (or an unresolvable
    /// selector) means no exit node — internet-bound traffic is dropped (fail-closed).
    pub exit_node: Option<ts_control::ExitNodeSelector>,

    /// The set of prefixes the inbound forwarder accepts and dials to real OS sockets.
    ///
    /// This is exactly [`Config::advertised_routes`](ts_control::Config::advertised_routes): we
    /// forward precisely what we advertise (advertise == forward), so there is no prefix we
    /// advertise but won't forward (which would be a black hole) and none we forward but didn't
    /// advertise. v4-only (IPv6-off posture). Empty means "subnet-router/exit-node forwarding
    /// disabled" — the forwarder netstack still exists but its route table is empty.
    pub forward_routes: Vec<ipnet::IpNet>,

    /// TCP ports the inbound forwarder splices per advertised route. See
    /// [`Config::forward_tcp_ports`](ts_control::Config::forward_tcp_ports).
    pub forward_tcp_ports: Vec<u16>,

    /// UDP ports the inbound forwarder splices per advertised route. See
    /// [`Config::forward_udp_ports`](ts_control::Config::forward_udp_ports).
    pub forward_udp_ports: Vec<u16>,

    /// Whether the inbound forwarder forwards **all** TCP/UDP ports per advertised route.
    ///
    /// When `true`, the explicit [`forward_tcp_ports`](ForwarderConfig::forward_tcp_ports) /
    /// [`forward_udp_ports`](ForwarderConfig::forward_udp_ports) sets are ignored and the forwarder
    /// runs in all-port mode (driven by a raw-socket port observer). See
    /// [`Config::forward_all_ports`](ts_control::Config::forward_all_ports).
    pub forward_all_ports: bool,

    /// Whether exit-node (`0.0.0.0/0`) inbound flows are egressed via this host's real origin IP.
    ///
    /// Anti-leak opt-in. When `false` (the default, fail-closed), the forwarder is wired with a
    /// dialer that structurally refuses exit-node egress, so a `0.0.0.0/0` flow is dropped at dial
    /// time rather than leaking out our real IP. See
    /// [`Config::forward_exit_egress`](ts_control::Config::forward_exit_egress).
    pub forward_exit_egress: bool,

    /// Shields-up: when `true`, the packet-filter updater wraps the live filter so inbound packets
    /// destined to this node's own addresses are dropped (refuse inbound peer connections).
    /// See [`Config::block_incoming`](ts_control::Config::block_incoming).
    pub block_incoming: bool,

    /// Optional upstream proxy that exit-node egress is routed through (product capability beyond
    /// strict tsnet parity — residential-proxy egress).
    ///
    /// Only consulted when [`forward_exit_egress`](ForwarderConfig::forward_exit_egress) is `true`.
    /// When `Some`, the forwarder is wired with a [`ProxyExitDialer`](ts_forwarder::ProxyExitDialer)
    /// that tunnels exit-node flows through the proxy and **fails closed** (never falls back to a
    /// direct host-IP dial). When `None`, exit egress (if enabled) uses this host's real IP. This is
    /// already the `ts_forwarder` type: the conversion from the transport-only
    /// [`ts_control::ExitProxyConfig`] happens in [`from_control_config`](ForwarderConfig::from_control_config),
    /// since `ts_control` must not depend on `ts_forwarder`.
    pub exit_proxy: Option<ts_forwarder::ProxyConfig>,

    /// The IPv4 peerAPI port this node binds to serve exit-node DoH (`/dns-query`) to peers, if any.
    ///
    /// See [`Config::peerapi_port`](ts_control::Config::peerapi_port). `None` (the default) means
    /// this node advertises no peerAPI service and runs no DoH server — peers can't use it as a DNS
    /// proxy. The same value is advertised (`PeerApi4` service) and used to bind the server, so the
    /// advertised port always matches the actual bind.
    pub peerapi_port: Option<u16>,

    /// Filesystem directory received Taildrop files land in, or `None` to disable Taildrop.
    ///
    /// See [`Config::taildrop_dir`](ts_control::Config::taildrop_dir). When `Some`, the runtime
    /// constructs the [`TaildropStore`](crate::taildrop::TaildropStore) from it; the store is then
    /// served on the shared peerAPI listener (only if [`peerapi_port`](ForwarderConfig::peerapi_port)
    /// is also set) and exposed to the embedder's read APIs. `None` (the default) is fail-closed:
    /// no store, no Taildrop server.
    pub taildrop_dir: Option<std::path::PathBuf>,

    /// Whether IPv6 is enabled on the tailnet overlay. Defaults to `false` (IPv4-only).
    ///
    /// See [`Config::enable_ipv6`](ts_control::Config::enable_ipv6). Governs the underlay socket
    /// bind, disco candidate filtering, netstack overlay-address assignment, and MagicDNS AAAA
    /// handling. It NEVER governs the forwarder exit/subnet egress path, which stays IPv4-only
    /// regardless to uphold the real-origin-IP isolation invariant.
    pub enable_ipv6: bool,

    /// The WireGuard persistent-keepalive interval applied to every peer, or `None` to disable.
    ///
    /// See [`Config::persistent_keepalive_interval`](ts_control::Config::persistent_keepalive_interval).
    /// Threaded into the dataplane actor, which sets it on every upserted
    /// [`ts_tunnel::PeerConfig`] so an idle (typically DERP-relayed) session keeps its path warm and
    /// doesn't age out and wedge the next dial.
    pub persistent_keepalive_interval: Option<std::time::Duration>,

    /// The shared "Funnel ingress listener active" flag, the same `Arc` as
    /// [`Config::ingress_active`](ts_control::Config::ingress_active).
    ///
    /// `Device::listen_funnel` flips this `true` when its listener starts (and the dropped manager
    /// flips it back `false`); the control session reads it on each map request to set
    /// `HostInfo.IngressEnabled`. Cloned from the control config at `from_control_config` so the
    /// runtime and `ts_control` share one flag.
    pub ingress_active: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl ForwarderConfig {
    /// Extract the runtime's forwarding preferences from a [`ts_control::Config`].
    ///
    /// `ts_control::Config` carries these dataplane fields for transport only (it never reads
    /// them); this is the boundary where they are grouped for the runtime.
    pub fn from_control_config(config: &ts_control::Config) -> Self {
        Self {
            accept_routes: config.accept_routes,
            accept_dns: config.accept_dns,
            exit_node: config.exit_node.clone(),
            forward_routes: config.advertised_routes(),
            forward_tcp_ports: config.forward_tcp_ports.clone(),
            forward_udp_ports: config.forward_udp_ports.clone(),
            forward_all_ports: config.forward_all_ports,
            forward_exit_egress: config.forward_exit_egress,
            block_incoming: config.block_incoming,
            exit_proxy: config.exit_proxy.as_ref().map(exit_proxy_to_forwarder),
            peerapi_port: config.peerapi_port,
            taildrop_dir: config.taildrop_dir.clone(),
            enable_ipv6: config.enable_ipv6,
            persistent_keepalive_interval: config.persistent_keepalive_interval,
            ingress_active: config.ingress_active.clone(),
        }
    }
}

/// Translate a transport-only [`ts_control::ExitProxyConfig`] into the
/// [`ts_forwarder::ProxyConfig`] that the dialer consumes.
///
/// `ts_control` must not depend on `ts_forwarder` (the two crates meet only here, in
/// `ts_runtime`), so the proxy settings are mirrored field by field: the address is copied, the
/// scheme is mapped variant-for-variant, and the credentials are cloned.
fn exit_proxy_to_forwarder(cfg: &ts_control::ExitProxyConfig) -> ts_forwarder::ProxyConfig {
    // Exhaustive on purpose: a new control-side scheme must be given a forwarder counterpart here.
    let scheme = match cfg.scheme {
        ts_control::ExitProxyScheme::HttpConnect => ts_forwarder::ProxyScheme::HttpConnect,
        ts_control::ExitProxyScheme::Socks5 => ts_forwarder::ProxyScheme::Socks5,
    };

    ts_forwarder::ProxyConfig {
        addr: cfg.addr,
        scheme,
        auth: cfg.auth.clone(),
    }
}

/// The sender (mutation) halves of the runtime-mutable preference `watch` cells, returned by
/// [`Env::new_with_runtime_txs`] grouped in one named struct.
///
/// Each `Sender` is the *mutation capability* for a live preference the runtime can change without
/// recreating the device; it is retained privately on the `Runtime` while only the matching
/// `Receiver` is cloned into the `Env` the actors hold (the readers' contract). Grouping them as a
/// named-field struct (rather than a positional tuple) means adding a new preference cell is an
/// additive field, and the two same-typed `bool` senders ([`accept_routes`](RuntimePrefCells::accept_routes)
/// and [`accept_dns`](RuntimePrefCells::accept_dns)) cannot be silently transposed at the call site.
///
/// Dropping a sender does not invalidate the value already stored in its cell: [`Env::new`]
/// discards all three and the seeded values remain readable through `Env`.
pub struct RuntimePrefCells {
    /// Sender for the exit-node selector cell; mutated by `Device::set_exit_node`.
    ///
    /// Seeded from [`ForwarderConfig::exit_node`]; the read side is [`Env::exit_node_rx`].
    pub exit_node: watch::Sender<Option<ts_control::ExitNodeSelector>>,
    /// Sender for the accept-routes preference cell; toggled by `Device::set_accept_routes`.
    ///
    /// Seeded from [`ForwarderConfig::accept_routes`]; the read side is [`Env::accept_routes_rx`].
    pub accept_routes: watch::Sender<bool>,
    /// Sender for the accept-dns preference cell; toggled by `Device::set_accept_dns`.
    ///
    /// Seeded from [`ForwarderConfig::accept_dns`]; the read side is [`Env::accept_dns_rx`].
    pub accept_dns: watch::Sender<bool>,
}

/// The shared environment handed to the runtime's actors.
///
/// It bundles the message bus, the node's key state, the *read* halves of the runtime-mutable
/// preference cells, the forwarding preferences taken from [`ForwarderConfig`], and the shutdown
/// signal. `Env` is `Clone` and is cloned into many actors, so it deliberately carries only
/// receivers for the mutable preferences; the matching senders are returned separately by
/// [`Env::new_with_runtime_txs`] in a [`RuntimePrefCells`].
#[derive(Clone)]
pub struct Env {
    /// The message bus actor that [`Env::subscribe`] registers recipients on and
    /// [`Env::publish`] sends messages through.
    pub bus: ActorRef<MessageBus>,
    /// The node's [`ts_keys::NodeState`], behind an `Arc` so every clone of `Env` shares one copy.
    pub keys: Arc<ts_keys::NodeState>,

    /// Whether to accept subnet routes advertised by peers (`--accept-routes` / `RouteAll`).
    ///
    /// A live cell rather than a snapshot (mirroring [`exit_node_rx`](Env::exit_node_rx)):
    /// `Device::set_accept_routes` updates the backing [`watch::Sender`] (held privately on the
    /// runtime, not here) at runtime, and both readers (the route updater and the source filter)
    /// re-read it via [`Env::accept_routes`](Env::accept_routes) on their next recompute, so the
    /// preference can change without recreating the device. `accept-routes` is a purely *local*
    /// preference (unlike advertised routes it is never reported to control), so flipping it only
    /// re-runs the local route/source-filter recompute. See [`ForwarderConfig::accept_routes`].
    pub accept_routes_rx: watch::Receiver<bool>,

    /// Whether to accept the tailnet's DNS configuration (`--accept-dns` / `CorpDNS`).
    ///
    /// A live cell rather than a snapshot (mirroring [`accept_routes_rx`](Env::accept_routes_rx)):
    /// `Device::set_accept_dns` updates the backing [`watch::Sender`] (held privately on the runtime,
    /// not here) at runtime, and the MagicDNS responder re-reads it via [`Env::accept_dns`](Env::accept_dns)
    /// on every view rebuild, so the preference can change without recreating the device. Like
    /// `accept-routes` it is a purely *local* preference (never reported to control): flipping it only
    /// re-runs the local view rebuild that re-applies the gate. See [`ForwarderConfig::accept_dns`].
    pub accept_dns_rx: watch::Receiver<bool>,

    /// Which peer (if any) is selected as this node's exit node (`ExitNodeID`).
    ///
    /// A live cell rather than a snapshot: `Device::set_exit_node` updates the backing
    /// [`watch::Sender`] (held privately on the runtime, not here) at runtime, and both readers
    /// (the route updater and the source filter) re-read it via [`Env::exit_node`](Env::exit_node)
    /// on their next recompute, so the selected exit can change without recreating the device. This
    /// is the readers' contract: `Env` is cloned into many actors, so only the read side lives here
    /// while the mutation capability stays narrowed to the runtime. See [`ForwarderConfig::exit_node`].
    pub exit_node_rx: watch::Receiver<Option<ts_control::ExitNodeSelector>>,

    /// The set of prefixes the inbound forwarder accepts and dials to real OS sockets.
    ///
    /// See [`ForwarderConfig::forward_routes`].
    pub forward_routes: Arc<Vec<ipnet::IpNet>>,

    /// TCP ports the inbound forwarder splices per advertised route. See
    /// [`ForwarderConfig::forward_tcp_ports`].
    pub forward_tcp_ports: Arc<Vec<u16>>,

    /// UDP ports the inbound forwarder splices per advertised route. See
    /// [`ForwarderConfig::forward_udp_ports`].
    pub forward_udp_ports: Arc<Vec<u16>>,

    /// Whether the inbound forwarder forwards **all** TCP/UDP ports per advertised route.
    ///
    /// See [`ForwarderConfig::forward_all_ports`].
    pub forward_all_ports: bool,

    /// Whether exit-node (`0.0.0.0/0`) inbound flows are egressed via this host's real origin IP.
    ///
    /// See [`ForwarderConfig::forward_exit_egress`].
    pub forward_exit_egress: bool,

    /// Shields-up: drop inbound peer connections terminating on this node (read by the
    /// packet-filter updater). See [`ForwarderConfig::block_incoming`].
    pub block_incoming: bool,

    /// Optional upstream proxy that exit-node egress is routed through.
    ///
    /// See [`ForwarderConfig::exit_proxy`].
    pub exit_proxy: Option<ts_forwarder::ProxyConfig>,

    /// The IPv4 peerAPI port this node binds to serve exit-node DoH to peers, if any.
    ///
    /// See [`ForwarderConfig::peerapi_port`].
    pub peerapi_port: Option<u16>,

    /// The Taildrop file store, constructed once at startup when
    /// [`ForwarderConfig::taildrop_dir`] is `Some` (and the on-disk root could be created), else
    /// `None` (Taildrop disabled — fail-closed). Shared (`Arc`) between the peerAPI Taildrop server
    /// (which writes received files) and the embedder's read APIs on the device.
    pub taildrop_store: Option<Arc<crate::taildrop::TaildropStore>>,

    /// Whether IPv6 is enabled on the tailnet overlay (default `false`, IPv4-only).
    ///
    /// See [`ForwarderConfig::enable_ipv6`]. Read by the underlay socket, disco candidate filter,
    /// netstack address assignment, and MagicDNS; never by the forwarder egress path.
    pub enable_ipv6: bool,

    /// The WireGuard persistent-keepalive interval applied to every peer, or `None` to disable.
    ///
    /// See [`ForwarderConfig::persistent_keepalive_interval`]. Read by the dataplane actor when it
    /// upserts peers.
    pub persistent_keepalive_interval: Option<std::time::Duration>,

    /// The shared "Funnel ingress listener active" flag, the same `Arc` as
    /// [`Config::ingress_active`](ts_control::Config::ingress_active).
    ///
    /// `Device::listen_funnel` flips this `true` when its listener starts; the control session reads
    /// it on each map request to set `HostInfo.IngressEnabled`. See [`ForwarderConfig::ingress_active`].
    pub ingress_active: std::sync::Arc<std::sync::atomic::AtomicBool>,

    /// The active Funnel ingress sink, shared (runtime-lifetime) between the peerAPI server and
    /// `Device::listen_funnel`.
    ///
    /// The peerAPI server (spawned at startup, before any `listen_funnel`) holds a clone of this
    /// `Arc` and reads it per connection: when a `FunnelManager` is registered (the embedder called
    /// `Device::listen_funnel`) the slot holds its [`FunnelIngressSink`](crate::funnel::FunnelIngressSink)
    /// and a `POST /v0/ingress` is membership-gated, `101`-hijacked, and pushed to the sink; when
    /// `None` (the default, no funnel listener active) the route fails closed (`404`) without
    /// hijacking. Installing the sink here at `listen_funnel` time makes the route live without
    /// restarting the peerAPI server.
    pub funnel_ingress: crate::funnel::FunnelIngressSlot,

    /// Whether the runtime has shut down.
    ///
    /// This is provided so that actors can check whether a message send has failed because
    /// the runtime is closing, or if it's because the peer has panicked.
    ///
    /// It's not a bus message because we need a value that is guaranteed to be delivered
    /// to anyone who's interested. The bus is by definition unreliable during shutdown, so
    /// we need this independent mechanism.
    pub shutdown: watch::Receiver<bool>,
}

impl Env {
    /// Returns a copy of the exit-node selector currently stored in the live cell.
    ///
    /// The cell can be changed at runtime (`Device::set_exit_node`), so every call re-reads it;
    /// callers are expected to resolve the selector against the live peer set each time.
    pub fn exit_node(&self) -> Option<ts_control::ExitNodeSelector> {
        let selector = self.exit_node_rx.borrow();
        selector.clone()
    }

    /// Returns whether peer-advertised subnet routes are accepted right now
    /// (`--accept-routes` / `RouteAll`).
    ///
    /// Re-read from the live cell on every call, because `Device::set_accept_routes` may flip it
    /// at runtime. The route updater and the source filter both call this on each recompute, so a
    /// toggle takes effect on the next netmap update / republish.
    pub fn accept_routes(&self) -> bool {
        // The guard derefs to a `Copy` bool, so there is nothing to clone.
        let guard = self.accept_routes_rx.borrow();
        *guard
    }

    /// Returns whether the tailnet's DNS configuration is accepted right now
    /// (`--accept-dns` / `CorpDNS`).
    ///
    /// Re-read from the live cell on every call, because `Device::set_accept_dns` may flip it at
    /// runtime. The MagicDNS responder calls this when it rebuilds its
    /// [`DnsView`](crate::magic_dns) on each control/peer update, so a toggle takes effect on the
    /// next view rebuild (driven by the `set_accept_dns` republish).
    pub fn accept_dns(&self) -> bool {
        // The guard derefs to a `Copy` bool, so there is nothing to clone.
        let guard = self.accept_dns_rx.borrow();
        *guard
    }

    /// Builds an [`Env`] together with the [`watch::Sender`]s of its runtime-mutable preferences.
    ///
    /// The senders (the mutation capability) come back separately in a [`RuntimePrefCells`] so the
    /// runtime can keep them private, while only the read sides (`exit_node_rx`,
    /// `accept_routes_rx`, `accept_dns_rx`) travel with the `Env` that is cloned into the actors.
    /// The cells are seeded from [`ForwarderConfig::exit_node`], [`ForwarderConfig::accept_routes`]
    /// and [`ForwarderConfig::accept_dns`]. Returning a named struct rather than a positional tuple
    /// keeps a future preference cell an additive field and stops the two same-typed `bool`
    /// senders from being transposed.
    ///
    /// The runtime uses this constructor; callers that never mutate the preferences (e.g. tests)
    /// use [`Env::new`], which discards the senders.
    pub fn new_with_runtime_txs(
        keys: ts_keys::NodeState,
        shutdown: watch::Receiver<bool>,
        forwarding: ForwarderConfig,
    ) -> (Self, RuntimePrefCells) {
        // Take the config apart exhaustively (no `..`): a field added to `ForwarderConfig` then
        // fails to compile here until it is wired into `Env`.
        let ForwarderConfig {
            accept_routes,
            accept_dns,
            exit_node,
            forward_routes,
            forward_tcp_ports,
            forward_udp_ports,
            forward_all_ports,
            forward_exit_egress,
            block_incoming,
            exit_proxy,
            peerapi_port,
            taildrop_dir,
            enable_ipv6,
            persistent_keepalive_interval,
            ingress_active,
        } = forwarding;

        // The three runtime-mutable preferences only seed their `watch` cells. Each receiver goes
        // into `Env`; each sender is handed back so mutation stays with the runtime.
        let (exit_node_tx, exit_node_rx) = watch::channel(exit_node);
        let (accept_routes_tx, accept_routes_rx) = watch::channel(accept_routes);
        let (accept_dns_tx, accept_dns_rx) = watch::channel(accept_dns);

        // Build the Taildrop store once, and only when a directory is configured. A failure to
        // build it (e.g. the root can't be created) is logged and leaves the store `None`
        // (Taildrop disabled, fail-closed) instead of preventing the runtime from starting.
        let taildrop_store = taildrop_dir.and_then(|dir| {
            crate::taildrop::TaildropStore::new(&dir)
                .map(Arc::new)
                .map_err(|e| {
                    tracing::error!(error = %e, dir = %dir.display(), "taildrop: store init failed; disabled");
                })
                .ok()
        });

        let env = Self {
            bus: MessageBus::spawn_default(),
            keys: Arc::new(keys),
            accept_routes_rx,
            accept_dns_rx,
            exit_node_rx,
            forward_routes: Arc::new(forward_routes),
            forward_tcp_ports: Arc::new(forward_tcp_ports),
            forward_udp_ports: Arc::new(forward_udp_ports),
            forward_all_ports,
            forward_exit_egress,
            block_incoming,
            exit_proxy,
            peerapi_port,
            taildrop_store,
            enable_ipv6,
            persistent_keepalive_interval,
            ingress_active,
            // No funnel listener is registered at construction time.
            funnel_ingress: Arc::new(std::sync::Mutex::new(None)),
            shutdown,
        };

        let cells = RuntimePrefCells {
            exit_node: exit_node_tx,
            accept_routes: accept_routes_tx,
            accept_dns: accept_dns_tx,
        };

        (env, cells)
    }

    /// Builds an [`Env`] and throws away the runtime-mutable preference [`watch::Sender`]s.
    ///
    /// Meant for callers that only read the preferences and never change them (e.g. tests). The
    /// exit node, accept-routes and accept-dns values are still seeded from [`ForwarderConfig`],
    /// but with the senders dropped they can no longer be changed; the seeded values stay
    /// readable.
    pub fn new(
        keys: ts_keys::NodeState,
        shutdown: watch::Receiver<bool>,
        forwarding: ForwarderConfig,
    ) -> Self {
        let (env, _cells) = Self::new_with_runtime_txs(keys, shutdown, forwarding);
        env
    }

    /// Registers the actor `slf` on the bus as a recipient of messages of type `M`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the `Register` request cannot be delivered to the bus; the
    /// failure is passed through `with_actor_info(&self.bus)` before being returned.
    pub async fn subscribe<M>(&self, slf: &ActorRef<impl Message<M>>) -> Result<(), Error>
    where
        M: Clone + Send + 'static,
    {
        let recipient = slf.clone().recipient::<M>();

        self.bus
            .tell(Register(recipient))
            .await
            .with_actor_info(&self.bus)?;

        Ok(())
    }

    /// Sends `msg` to the bus as a `Publish` request.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the `Publish` request cannot be delivered to the bus; the
    /// failure is passed through `with_actor_info(&self.bus)` before being returned.
    pub async fn publish<M>(&self, msg: M) -> Result<(), Error>
    where
        M: Clone + Send + 'static,
    {
        let envelope = Publish(msg);

        self.bus
            .tell(envelope)
            .await
            .with_actor_info(&self.bus)?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline `ForwarderConfig` for the live-preference-cell tests: nothing is forwarded and no
    /// optional feature is configured; only the `accept_routes` seed varies.
    fn cfg(accept_routes: bool) -> ForwarderConfig {
        let ingress_active = Arc::new(std::sync::atomic::AtomicBool::new(false));

        ForwarderConfig {
            accept_routes,
            accept_dns: true,
            exit_node: None,
            forward_routes: Vec::new(),
            forward_tcp_ports: Vec::new(),
            forward_udp_ports: Vec::new(),
            forward_all_ports: false,
            forward_exit_egress: false,
            block_incoming: false,
            exit_proxy: None,
            peerapi_port: None,
            taildrop_dir: None,
            enable_ipv6: false,
            persistent_keepalive_interval: None,
            ingress_active,
        }
    }

    /// Builds an `Env` seeded with `accept_routes` and hands back the accept-routes sender so a
    /// test can play the runtime's role and toggle the preference.
    fn env_with(accept_routes: bool) -> (Env, watch::Sender<bool>) {
        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
        let node_keys = ts_keys::NodeState::generate();
        let (env, pref_cells) =
            Env::new_with_runtime_txs(node_keys, shutdown_rx, cfg(accept_routes));
        (env, pref_cells.accept_routes)
    }

    /// The value passed in through `ForwarderConfig` is what `Env::accept_routes()` reports.
    // A Tokio reactor is required: `Env::new_with_runtime_txs` spawns the `MessageBus` actor.
    #[tokio::test]
    async fn accept_routes_seeded_from_config() {
        let (seeded_on, _tx_on) = env_with(true);
        assert!(seeded_on.accept_routes(), "seeded true");

        let (seeded_off, _tx_off) = env_with(false);
        assert!(!seeded_off.accept_routes(), "seeded false");
    }

    /// The accessor reads the LIVE cell: a `send_replace` on the runtime-held sender shows up in
    /// `Env::accept_routes()` (what `Device::set_accept_routes` depends on), and a clone of the
    /// `Env` (which is what the actors hold) observes the same update.
    #[tokio::test]
    async fn accept_routes_accessor_tracks_live_toggle() {
        let (env, toggle) = env_with(false);
        let actor_view = env.clone();
        assert!(!env.accept_routes());

        toggle.send_replace(true);
        assert!(env.accept_routes(), "accessor reflects toggle ON");
        assert!(
            actor_view.accept_routes(),
            "a cloned Env (held by actors) sees the same live toggle"
        );

        toggle.send_replace(false);
        assert!(!env.accept_routes(), "accessor reflects toggle OFF");

        // Back ON once more: the cell can be flipped repeatedly, it is not a one-way latch.
        toggle.send_replace(true);
        assert!(
            env.accept_routes(),
            "accessor reflects a repeated toggle ON"
        );
    }

    /// `Env::new` drops the accept-routes `Sender`, yet the seeded value has to stay readable (a
    /// `watch` `Receiver::borrow` keeps returning the last value after the only sender is gone).
    /// Actor unit tests that construct their `Env` through `Env::new` depend on this
    /// immutable-but-readable behavior.
    #[tokio::test]
    async fn accept_routes_survives_dropped_sender_via_new() {
        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
        let node_keys = ts_keys::NodeState::generate();
        let env = Env::new(node_keys, shutdown_rx, cfg(true));
        assert!(
            env.accept_routes(),
            "seed survives the dropped sender (Env::new path)"
        );
    }
}