Skip to main content

ts_runtime/peer_tracker/
mod.rs

1//! Peer delta update tracking.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    sync::Arc,
7};
8
9use kameo::{
10    actor::ActorRef,
11    message::{Context, Message},
12    reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
/// Actor that tracks peer delta updates and emits new states.
///
/// It subscribes to [`ts_control::StateUpdate`] on startup, folds each peer update into its
/// [`PeerDb`], answers peer lookups, and publishes a `PeerState` plus a watch-channel snapshot
/// after every applied peer update.
pub struct PeerTracker {
    /// The current peer set, kept up to date from control's `Full`/`Delta`/`Patch` peer updates.
    peer_db: PeerDb,
    /// `false` until the first peer update has been applied. While `false`, lookup messages are
    /// parked in [`pending_requests`](Self::pending_requests) instead of being answered.
    seen_state_update: bool,
    /// Lookups received before the first peer update; drained and answered once it arrives.
    pending_requests: Vec<Pending>,
    /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
    /// changes ([`WatchNetmap`]).
    peer_watch: watch::Sender<Vec<StatusNode>>,
    /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
    /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
    /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
    /// (only new/changed profiles per response), so this map **accumulates** across updates rather
    /// than being replaced — a peer upserted in one response may reference a profile delivered in an
    /// earlier one.
    user_profiles: HashMap<UserId, UserProfile>,
    /// Tailnet-Lock (TKA) authority used to verify each peer's `key_signature` at the peer-trust
    /// chokepoint. When `Some`, enforcement is **active**: every upserted peer must present a
    /// signature this authority authorizes, or it is rejected (fail-closed). When `None` (always,
    /// this wave) enforcement is **inactive** and every peer is upserted — identical to pre-TKA
    /// behavior. There is no live `Authority` source yet: building one requires the
    /// `/machine/tka/sync` Noise RPC + AUM-chain replayer (deferred, see SECURITY.md). The
    /// enforcement path below is wired and unit-tested, and flips on the instant an authority is
    /// supplied; it is explicitly gated, not a silent no-op.
    tka_authority: Option<ts_tka::Authority>,
    /// Shared runtime environment; used here to subscribe to state updates and to publish
    /// `PeerState` messages.
    env: Env,
}
50
51impl PeerTracker {
52    fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
53        // Canonicalization (case + trailing dot) is handled inside the name index lookup.
54        self.peer_db.get(&name).map(|(_id, node)| node)
55    }
56
57    fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
58        self.peer_db.get(&ip).map(|(_id, node)| node)
59    }
60
61    /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
62    fn status_peers(&self) -> Vec<StatusNode> {
63        self.peer_db
64            .peers()
65            .values()
66            .map(StatusNode::from_node)
67            .collect()
68    }
69
70    fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
71        let ip = crate::status::whois_addr(addr);
72        let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
73        // Join the node's owning user id against the accumulated UserProfiles table to resolve a
74        // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
75        // with no human owner, or a profile not yet delivered).
76        let user = self.resolve_user(node.user_id);
77        Some(crate::status::WhoIs::from_node_with_user(node, user))
78    }
79
80    /// Resolve a user id to its best display label from the accumulated profile table.
81    fn resolve_user(&self, user_id: UserId) -> Option<String> {
82        self.user_profiles
83            .get(&user_id)
84            .and_then(UserProfile::best_label)
85    }
86
87    /// Whether `node` may be admitted to the peer db under the current Tailnet-Lock posture.
88    ///
89    /// Fail-closed and gated:
90    /// - No [`tka_authority`](Self::tka_authority) ⇒ enforcement inactive ⇒ always admit (today's
91    ///   behavior; this is the always-taken branch this wave).
92    /// - Authority present + peer carries a `key_signature` that the authority authorizes for the
93    ///   peer's node key ⇒ admit.
94    /// - Authority present + signature missing or unauthorized/invalid ⇒ **reject** (Go denies
95    ///   network access to unsigned peers under tailnet lock; we do not upsert them).
96    fn tka_admits(&self, node: &Node) -> bool {
97        let Some(auth) = &self.tka_authority else {
98            return true;
99        };
100
101        if node.key_signature.is_empty() {
102            // TKA active but peer presented no signature: reject (Go denies network access to
103            // unsigned peers under tailnet lock, unless UnsignedPeerAPIOnly — out of scope here).
104            tracing::warn!(
105                stable_id = ?node.stable_id,
106                "TKA: rejecting unsigned peer under tailnet lock"
107            );
108            return false;
109        }
110
111        if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
112            tracing::warn!(
113                stable_id = ?node.stable_id,
114                error = %e,
115                "TKA: rejecting peer with unauthorized node key"
116            );
117            return false;
118        }
119
120        true
121    }
122}
123
124impl kameo::Actor for PeerTracker {
125    type Args = Env;
126    type Error = Error;
127
128    async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
129        env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
130
131        let (peer_watch, _) = watch::channel(Vec::new());
132
133        Ok(Self {
134            peer_db: PeerDb::default(),
135            pending_requests: Default::default(),
136            seen_state_update: false,
137            peer_watch,
138            user_profiles: HashMap::new(),
139            // No live TKA authority source this wave (the `/machine/tka/sync` RPC + AUM replayer are
140            // deferred); enforcement stays inactive until one is supplied. See `tka_authority`.
141            tka_authority: None,
142            env,
143        })
144    }
145}
146
/// A lookup that arrived before the first peer update and is parked until one is applied.
///
/// Each variant carries the original request (where it has arguments) and the reply sender used
/// to answer it once peer state is available.
enum Pending {
    /// Deferred `peer_by_name` lookup.
    PeerByName(PeerByName, ReplySender<Option<Node>>),
    /// Deferred `peer_by_accepted_route` lookup.
    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
    /// Deferred `peer_by_tailnet_ip` lookup.
    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
    /// Deferred `get_status` request (no arguments).
    Status(ReplySender<Vec<StatusNode>>),
    /// Deferred `whois` lookup.
    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
}
154
155// For messages with arguments, a struct is generated with the args as fields. They aren't
156// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
157// docs are turned off everywhere.
158#[allow(missing_docs)]
159mod msg_impl {
160    use std::net::IpAddr;
161
162    use kameo::prelude::DelegatedReply;
163
164    use super::*;
165
166    #[kameo::messages]
167    impl PeerTracker {
168        /// Lookup a peer by name.
169        ///
170        /// Waits until we've received at least one peer update from control.
171        #[message(ctx)]
172        pub async fn peer_by_name(
173            &mut self,
174            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
175            name: String,
176        ) -> DelegatedReply<Option<Node>> {
177            let (deleg, sender) = ctx.reply_sender();
178            let Some(sender) = sender else { return deleg };
179
180            if !self.seen_state_update {
181                tracing::debug!(query = name, "no peer state seen yet, queueing request");
182
183                self.pending_requests
184                    .push(Pending::PeerByName(PeerByName { name }, sender));
185
186                return deleg;
187            }
188
189            sender.send(self.peer_by_name_opt(&name).cloned());
190
191            deleg
192        }
193
194        /// Lookup all peers that accept packets addressed to the given IP.
195        ///
196        /// This includes the peer's tailnet address and any subnet routes it provides. Only
197        /// the peers with the most specific subnet route match that covers `ip` will be
198        /// returned.
199        ///
200        /// E.g., suppose:
201        ///
202        /// - We're querying for `10.1.2.3`
203        /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
204        /// - `PeerC` has an accepted route for `10.1.0.0/16`
205        ///
206        /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
207        /// prefix match.
208        #[message(ctx)]
209        pub fn peer_by_accepted_route(
210            &mut self,
211            ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
212            ip: IpAddr,
213        ) -> DelegatedReply<Vec<Node>> {
214            let (deleg, sender) = ctx.reply_sender();
215            let Some(sender) = sender else { return deleg };
216
217            if !self.seen_state_update {
218                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
219
220                self.pending_requests
221                    .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
222
223                return deleg;
224            }
225
226            sender.send(
227                self.peer_db
228                    .get_route(ip.into())
229                    .map(|(_id, node)| node.clone())
230                    .collect(),
231            );
232
233            deleg
234        }
235
236        /// Lookup the peer that has the given tailnet IP address.
237        #[message(ctx)]
238        pub fn peer_by_tailnet_ip(
239            &mut self,
240            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241            ip: IpAddr,
242        ) -> DelegatedReply<Option<Node>> {
243            let (deleg, sender) = ctx.reply_sender();
244            let Some(sender) = sender else { return deleg };
245
246            if !self.seen_state_update {
247                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
248
249                self.pending_requests
250                    .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
251
252                return deleg;
253            }
254
255            sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
256
257            deleg
258        }
259
260        /// Build the peer entries of a [`Status`](crate::Status) snapshot.
261        ///
262        /// Returns one [`StatusNode`] per known peer. The self node is *not* included here (it
263        /// lives in the control runner); [`Runtime::status`](crate::Runtime::status) combines both.
264        ///
265        /// Waits until we've received at least one peer update from control.
266        #[message(ctx)]
267        pub fn get_status(
268            &mut self,
269            ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
270        ) -> DelegatedReply<Vec<StatusNode>> {
271            let (deleg, sender) = ctx.reply_sender();
272            let Some(sender) = sender else { return deleg };
273
274            if !self.seen_state_update {
275                tracing::debug!("no peer state seen yet, queueing status request");
276                self.pending_requests.push(Pending::Status(sender));
277                return deleg;
278            }
279
280            sender.send(self.status_peers());
281
282            deleg
283        }
284
        /// Resolve which node owns a tailnet source address.
        ///
        /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
        /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
        /// Replies with `None` when no known peer owns that address.
        ///
        /// The owning user's label is resolved by joining the node's `user_id` against the user
        /// profiles accumulated from control and is handed to the [`WhoIs`](crate::WhoIs)
        /// constructor; it is absent when no profile has been received for that user. See the
        /// [`status`](crate::status) module docs for which fields a [`WhoIs`](crate::WhoIs)
        /// exposes.
        ///
        /// Waits until we've received at least one peer update from control.
        #[message(ctx)]
        pub fn whois(
            &mut self,
            ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
            addr: std::net::SocketAddr,
        ) -> DelegatedReply<Option<crate::status::WhoIs>> {
            let (deleg, sender) = ctx.reply_sender();
            // No reply sender means there is nothing to answer; hand back the delegation marker.
            let Some(sender) = sender else { return deleg };

            if !self.seen_state_update {
                tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
                self.pending_requests
                    .push(Pending::WhoIs(Whois { addr }, sender));
                return deleg;
            }

            sender.send(self.whois_opt(addr));

            deleg
        }
315
316        /// Subscribe to netmap peer-change events.
317        ///
318        /// Returns a [`watch::Receiver`] whose value is the current set of peer
319        /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
320        /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
321        ///
322        /// The receiver's initial value is the peer set at subscription time (empty before the
323        /// first netmap update). This is a peer-only view; combine with the self node from
324        /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
325        #[message(derive(Clone))]
326        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
327            self.peer_watch.subscribe()
328        }
329    }
330}
331
332pub use msg_impl::*;
333
/// Peer-set snapshot published on the bus by [`PeerTracker`] after a peer update is applied, or
/// on an explicit re-broadcast request.
#[derive(Debug, Clone)]
pub(crate) struct PeerState {
    /// Ids of peers removed by the update that produced this state (empty on a re-broadcast).
    // NOTE(review): `allow(unused)` presumably because no subscriber reads this field yet —
    // confirm before removing the attribute.
    #[allow(unused)]
    pub deletions: HashSet<PeerId>,
    /// Ids of peers inserted or modified by the update that produced this state (empty on a
    /// re-broadcast).
    #[allow(unused)]
    pub upserts: HashSet<PeerId>,
    /// Copy of the full peer db taken at publish time.
    pub peers: Arc<PeerDb>,
}
342
343impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
344    type Reply = ();
345
346    async fn handle(
347        &mut self,
348        msg: Arc<ts_control::StateUpdate>,
349        _ctx: &mut Context<Self, Self::Reply>,
350    ) {
351        // Accumulate user profiles first — control sends them incrementally and a response may
352        // carry profiles with no peer delta (or peers that reference a profile from an earlier
353        // response), so this must happen before the no-peer-update early return below.
354        for profile in &msg.user_profiles {
355            self.user_profiles.insert(profile.id, profile.clone());
356        }
357
358        let Some(peer_update) = &msg.peer_update else {
359            return;
360        };
361
362        let (upserts, deletions) = self.apply_peer_update(peer_update);
363
364        tracing::debug!(
365            n_upsert = upserts.len(),
366            n_delete = deletions.len(),
367            peer_count = self.peer_db.peers().len(),
368            "new peer state"
369        );
370
371        self.service_pending_requests();
372
373        // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
374        // value current even when there are no subscribers, so a late subscriber sees fresh state.
375        self.peer_watch.send_replace(self.status_peers());
376
377        if let Err(e) = self
378            .env
379            .publish(Arc::new(PeerState {
380                upserts,
381                deletions,
382                peers: Arc::new(self.peer_db.clone()),
383            }))
384            .await
385        {
386            tracing::error!(error = %e, "publishing peer state update");
387        }
388    }
389}
390
391/// Ask the peer tracker to re-broadcast its current peer snapshot on the bus, without any peer
392/// change. `Device::set_exit_node` sends this after changing the exit-node selector so the route
393/// updater and source filter (both `Arc<PeerState>` subscribers) re-resolve the new selector
394/// immediately, rather than waiting for the next netmap update.
395#[derive(Debug, Clone, Copy)]
396pub struct RepublishState;
397
398impl Message<RepublishState> for PeerTracker {
399    type Reply = ();
400
401    async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
402        // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
403        // delta. Subscribers recompute their routes/filters against the current peers and the
404        // (just-updated) exit-node selector.
405        if let Err(e) = self
406            .env
407            .publish(Arc::new(PeerState {
408                upserts: HashSet::default(),
409                deletions: HashSet::default(),
410                peers: Arc::new(self.peer_db.clone()),
411            }))
412            .await
413        {
414            tracing::error!(error = %e, "re-publishing peer state after exit-node change");
415        }
416    }
417}
418
419impl PeerTracker {
420    /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
421    /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
422    ///
423    /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
424    /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
425    /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
426    ///
427    /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
428    fn apply_peer_update(
429        &mut self,
430        peer_update: &ts_control::PeerUpdate,
431    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
432        let mut upserts = HashSet::default();
433        let mut deletions = HashSet::default();
434
435        match peer_update {
436            ts_control::PeerUpdate::Full(new_nodes) => {
437                tracing::trace!("full peer update");
438
439                // Only stable_ids that PASS the Tailnet-Lock gate survive a full re-sync. This makes
440                // revocation evict: if a peer is re-included with a now-invalid (or missing)
441                // signature under an active authority, it is excluded from `retained_ids`, so
442                // `retain` drops the stale (previously-admitted) entry rather than leaving it in the
443                // db unverified. With no authority, `tka_admits` is always `true`, so `retained_ids`
444                // is exactly the set of re-included stable_ids — the inactive path is byte-for-byte
445                // the pre-TKA behavior (no regression).
446                let retained_ids = new_nodes
447                    .iter()
448                    .filter(|node| self.tka_admits(node))
449                    .map(|x| &x.stable_id)
450                    .collect::<HashSet<_>>();
451
452                self.peer_db.retain(|id, peer| {
453                    let retain = retained_ids.contains(&peer.stable_id);
454
455                    if !retain {
456                        deletions.insert(id);
457                    }
458
459                    retain
460                });
461
462                for node in new_nodes {
463                    if !self.tka_admits(node) {
464                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
465                    }
466                    let peer_id = self.peer_db.upsert(node);
467                    upserts.insert(peer_id);
468                }
469            }
470
471            ts_control::PeerUpdate::Delta { remove, upsert } => {
472                tracing::trace!("delta peer update");
473
474                for peer in upsert {
475                    if !self.tka_admits(peer) {
476                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
477                    }
478                    let id = self.peer_db.upsert(peer);
479
480                    upserts.insert(id);
481                }
482
483                for peer in remove {
484                    let Some((id, _node)) = self.peer_db.remove(peer) else {
485                        tracing::error!(control_node_id = peer, "removed peer was unknown");
486                        continue;
487                    };
488
489                    deletions.insert(id);
490                }
491            }
492
493            ts_control::PeerUpdate::Patch(patches) => {
494                tracing::trace!(n = patches.len(), "peer patch update");
495
496                for patch in patches {
497                    // A patch only mutates a peer already in the netmap; an unknown node id is
498                    // ignored (the wire contract — a patch never creates a node). Clone the current
499                    // node, apply the present fields, and re-upsert through the same path as a
500                    // delta so indexes/routes stay consistent.
501                    let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
502                        tracing::debug!(
503                            control_node_id = patch.id,
504                            "peer patch for unknown node; ignoring"
505                        );
506                        continue;
507                    };
508
509                    let mut node = existing.clone();
510                    if let Some(endpoints) = &patch.underlay_addresses {
511                        node.underlay_addresses = endpoints.clone();
512                    }
513                    if let Some(derp) = patch.derp_region {
514                        node.derp_region = Some(derp);
515                    }
516                    if let Some(cap) = patch.cap {
517                        node.cap = cap;
518                    }
519                    if let Some(cap_map) = &patch.cap_map {
520                        node.cap_map = cap_map.clone();
521                    }
522                    if let Some(disco_key) = patch.disco_key {
523                        node.disco_key = Some(disco_key);
524                    }
525                    if let Some(expiry) = patch.node_key_expiry {
526                        node.node_key_expiry = Some(expiry);
527                    }
528                    // Key rotation: a patch may swap the node key (and its TKA signature). Apply
529                    // both together so the trust gate below verifies the new signature against the
530                    // new key, never a mismatched pair.
531                    if let Some(node_key) = patch.node_key {
532                        node.node_key = node_key;
533                    }
534                    if let Some(sig) = &patch.key_signature {
535                        node.key_signature = sig.clone();
536                    }
537
538                    // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key
539                    // must satisfy the active authority, exactly like a `Delta` upsert, or it would
540                    // be a trust-enforcement bypass. fail-CLOSED — if the patched node is no longer
541                    // admitted, evict it rather than keep the stale (now-unverified) entry.
542                    if !self.tka_admits(&node) {
543                        if let Some((id, _)) = self.peer_db.remove(&patch.id) {
544                            tracing::warn!(
545                                control_node_id = patch.id,
546                                "peer patch rejected by tailnet lock; evicting peer"
547                            );
548                            deletions.insert(id);
549                        }
550                        continue;
551                    }
552
553                    let id = self.peer_db.upsert(&node);
554                    upserts.insert(id);
555                }
556            }
557        }
558
559        (upserts, deletions)
560    }
561
562    /// Test-only constructor: build a [`PeerTracker`] with a chosen [`tka_authority`](Self::tka_authority)
563    /// without going through the actor `on_start` path. Used by the TKA enforcement unit tests to
564    /// exercise the peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) directly.
565    #[cfg(test)]
566    fn for_test(env: Env, tka_authority: Option<ts_tka::Authority>) -> Self {
567        let (peer_watch, _) = watch::channel(Vec::new());
568        Self {
569            peer_db: PeerDb::default(),
570            seen_state_update: false,
571            pending_requests: Vec::new(),
572            peer_watch,
573            user_profiles: HashMap::new(),
574            tka_authority,
575            env,
576        }
577    }
578
579    fn service_pending_requests(&mut self) {
580        if self.seen_state_update {
581            return;
582        }
583
584        self.seen_state_update = true;
585
586        if !self.pending_requests.is_empty() {
587            tracing::debug!(
588                n_pending = self.pending_requests.len(),
589                "state update received, servicing pending requests"
590            );
591        }
592
593        for req in core::mem::take(&mut self.pending_requests) {
594            match req {
595                Pending::PeerByName(PeerByName { name }, reply) => {
596                    reply.send(self.peer_by_name_opt(&name).cloned());
597                }
598                Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
599                    reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
600                }
601                Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
602                    reply.send(
603                        self.peer_db
604                            .get_route(ip.into())
605                            .map(|(_id, node)| node.clone())
606                            .collect(),
607                    );
608                }
609                Pending::Status(reply) => {
610                    reply.send(self.status_peers());
611                }
612                Pending::WhoIs(Whois { addr }, reply) => {
613                    reply.send(self.whois_opt(addr));
614                }
615            }
616        }
617    }
618}
619
620#[cfg(test)]
621mod tka_tests {
622    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
623    //!
624    //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
625    //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
626    //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
627    //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
628    //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
629    //! private `ts_tka` API used).
630
631    use ed25519_dalek::{Signer, SigningKey};
632    use ts_control::{Node, StableNodeId, TailnetAddress};
633    use ts_tka::{
634        AumHash, Authority, Key, KeyKind, State,
635        cbor::{self, Value},
636    };
637
638    use super::*;
639
    /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`). Written as
    /// field 1 of the signature CBOR built by the fixtures below.
    const SIG_KIND_DIRECT: u64 = 1;

    /// The 32-byte node key used across the signed-peer fixtures (every byte is `7`).
    const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
645
    /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
    /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
    ///
    /// Each call generates a fresh node state and a fresh shutdown channel.
    fn test_env() -> Env {
        // The sender half is a local, so it is dropped when this function returns; only the
        // receiver is handed to the `Env`.
        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
        Env::new(
            ts_keys::NodeState::generate(),
            shutdown_rx,
            // Every forwarding feature is switched off or left unset.
            crate::env::ForwarderConfig {
                accept_routes: false,
                exit_node: None,
                forward_routes: Vec::new(),
                forward_tcp_ports: Vec::new(),
                forward_udp_ports: Vec::new(),
                forward_all_ports: false,
                forward_exit_egress: false,
                exit_proxy: None,
                peerapi_port: None,
                taildrop_dir: None,
                enable_ipv6: false,
                persistent_keepalive_interval: None,
                ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
            },
        )
    }
670
    /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
    ///
    /// `stable_id` is used for both the stable id and the hostname. Every other field is a fixed
    /// value: note that the numeric `id` and the tailnet addresses are the same for every node
    /// this helper returns, so fixtures differ only in stable id, hostname, key and signature.
    fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
        Node {
            id: 1,
            stable_id: StableNodeId(stable_id.to_string()),
            hostname: stable_id.to_string(),
            user_id: 0,
            tailnet: Some("ts.net".to_string()),
            tags: Vec::new(),
            tailnet_address: TailnetAddress {
                ipv4: "100.64.0.1/32".parse().unwrap(),
                ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
            },
            node_key: node_key.into(),
            node_key_expiry: None,
            // The only field the TKA gate inspects besides `node_key`; empty means "unsigned".
            key_signature,
            machine_key: None,
            disco_key: None,
            accepted_routes: Vec::new(),
            underlay_addresses: Vec::new(),
            derp_region: None,
            cap: Default::default(),
            cap_map: Default::default(),
            peerapi_port: None,
            peerapi_dns_proxy: false,
            is_wireguard_only: false,
            exit_node_dns_resolvers: Vec::new(),
            peer_relay: false,
            service_vips: Default::default(),
        }
    }
702
703    /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
704    /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
705    /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
706    /// signing-digest preimage (the `SigHash` form).
707    fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
708        let mut pairs = alloc_pairs(node_key, key_id);
709        if let Some(sig) = signature {
710            pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
711        }
712        cbor::int_map(pairs).to_vec()
713    }
714
715    fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
716        vec![
717            (1, Some(Value::Uint(SIG_KIND_DIRECT))),
718            (2, Some(Value::Bytes(node_key.to_vec()))),
719            (3, Some(Value::Bytes(key_id.to_vec()))),
720        ]
721    }
722
723    /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
724    /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
725    fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
726        // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
727        let signing = SigningKey::from_bytes(&[42u8; 32]);
728        let trusted_pub = signing.verifying_key().to_bytes().to_vec();
729
730        let authority = Authority::from_state(
731            AumHash([0; 32]),
732            State {
733                keys: vec![Key {
734                    kind: KeyKind::Ed25519,
735                    votes: 1,
736                    public: trusted_pub.clone(),
737                }],
738            },
739        );
740
741        // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
742        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
743        let sig_hash = ts_tka::aum_hash(&preimage).0;
744        let signature = signing.sign(&sig_hash).to_bytes().to_vec();
745
746        let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
747        // Sanity: the authority accepts the signature we just built (same path the gate uses).
748        assert!(
749            authority
750                .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
751                .is_ok()
752        );
753
754        (authority, signed_cbor)
755    }
756
757    #[tokio::test]
758    async fn tka_inactive_upserts_all_peers() {
759        // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
760        let mut tracker = PeerTracker::for_test(test_env(), None);
761
762        let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
763        let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
764
765        assert!(tracker.tka_admits(&signed));
766        assert!(tracker.tka_admits(&unsigned));
767
768        tracker.peer_db.upsert(&signed);
769        tracker.peer_db.upsert(&unsigned);
770        assert_eq!(tracker.peer_db.peers().len(), 2);
771    }
772
773    #[tokio::test]
774    async fn tka_active_rejects_unsigned_peer() {
775        // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
776        let (authority, _sig) = authority_and_valid_sig();
777        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
778
779        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
780        assert!(!tracker.tka_admits(&unsigned));
781
782        // Mirror the handler's `if !tka_admits { continue }` loop.
783        if tracker.tka_admits(&unsigned) {
784            tracker.peer_db.upsert(&unsigned);
785        }
786        assert_eq!(tracker.peer_db.peers().len(), 0);
787        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
788    }
789
790    #[tokio::test]
791    async fn tka_active_rejects_bad_signature() {
792        // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
793        let (authority, mut sig) = authority_and_valid_sig();
794        // Tamper the last byte (the trailing signature byte) so verification fails.
795        let last = sig.len() - 1;
796        sig[last] ^= 0xff;
797
798        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
799        let bad = peer_node("bad", NODE_KEY_BYTES, sig);
800        assert!(!tracker.tka_admits(&bad));
801
802        if tracker.tka_admits(&bad) {
803            tracker.peer_db.upsert(&bad);
804        }
805        assert_eq!(tracker.peer_db.peers().len(), 0);
806    }
807
808    #[tokio::test]
809    async fn tka_active_admits_authorized_peer() {
810        // Authority present + correctly-signed node key ⇒ admitted and upserted.
811        let (authority, sig) = authority_and_valid_sig();
812        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
813
814        let good = peer_node("good", NODE_KEY_BYTES, sig);
815        assert!(tracker.tka_admits(&good));
816
817        if tracker.tka_admits(&good) {
818            tracker.peer_db.upsert(&good);
819        }
820        assert_eq!(tracker.peer_db.peers().len(), 1);
821        assert!(tracker.peer_db.get(&good.node_key).is_some());
822    }
823
824    // ---------------------------------------------------------------------------------------------
825    // Tests that drive REAL `PeerUpdate`s through the shared handler body
826    // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
827    // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
828    // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
829    // ---------------------------------------------------------------------------------------------
830
831    #[tokio::test]
832    async fn tka_active_delta_upsert_rejects_unauthorized() {
833        // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
834        // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
835        let (authority, _sig) = authority_and_valid_sig();
836        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
837
838        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
839        let update = ts_control::PeerUpdate::Delta {
840            upsert: vec![unsigned.clone()],
841            remove: Vec::new(),
842        };
843
844        tracker.apply_peer_update(&update);
845
846        assert_eq!(tracker.peer_db.peers().len(), 0);
847        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
848    }
849
850    #[tokio::test]
851    async fn tka_active_delta_upsert_admits_authorized() {
852        // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
853        let (authority, sig) = authority_and_valid_sig();
854        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
855
856        let good = peer_node("good", NODE_KEY_BYTES, sig);
857        let update = ts_control::PeerUpdate::Delta {
858            upsert: vec![good.clone()],
859            remove: Vec::new(),
860        };
861
862        tracker.apply_peer_update(&update);
863
864        assert_eq!(tracker.peer_db.peers().len(), 1);
865        assert!(tracker.peer_db.get(&good.node_key).is_some());
866    }
867
868    #[tokio::test]
869    async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
870        // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
871        // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
872        // dropped fail-closed.
873        let (authority, sig) = authority_and_valid_sig();
874        // A bad-sig variant of the same authorized signature (tamper the trailing byte).
875        let mut bad_sig = sig.clone();
876        let last = bad_sig.len() - 1;
877        bad_sig[last] ^= 0xff;
878
879        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
880
881        // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
882        // rejected peers use distinct node keys so the survivor is unambiguous.
883        let good = peer_node("good", NODE_KEY_BYTES, sig);
884        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
885        let bad = peer_node("bad", [9u8; 32], bad_sig);
886
887        let update =
888            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
889
890        tracker.apply_peer_update(&update);
891
892        assert_eq!(tracker.peer_db.peers().len(), 1);
893        assert!(tracker.peer_db.get(&good.node_key).is_some());
894        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
895        assert!(tracker.peer_db.get(&bad.node_key).is_none());
896    }
897
898    #[tokio::test]
899    async fn tka_full_resync_revocation_behavior() {
900        // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
901        // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
902        // (previously-admitted) entry because membership was decided purely by stable_id.
903        //
904        // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
905        // stable_ids, so a peer whose re-included signature no longer verifies under the active
906        // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
907        // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
908        // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
909        let (authority, sig) = authority_and_valid_sig();
910        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
911
912        // 1) Admit the peer with a valid signature via a real `Full`.
913        let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
914        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
915        assert_eq!(tracker.peer_db.peers().len(), 1);
916        assert!(tracker.peer_db.get(&good.node_key).is_some());
917
918        // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
919        let mut bad_sig = sig;
920        let last = bad_sig.len() - 1;
921        bad_sig[last] ^= 0xff;
922        let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
923        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
924
925        // Eviction: the stale entry is dropped because its re-included signature fails the gate.
926        assert_eq!(tracker.peer_db.peers().len(), 0);
927        assert!(tracker.peer_db.get(&revoked.node_key).is_none());
928    }
929
930    #[tokio::test]
931    async fn tka_inactive_full_resync_keeps_reincluded_peer() {
932        // Guard the inactive (authority=None) path against the revocation fix: with no authority,
933        // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
934        // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
935        // branch this wave.
936        let mut tracker = PeerTracker::for_test(test_env(), None);
937
938        let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
939        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
940        assert_eq!(tracker.peer_db.peers().len(), 1);
941
942        // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
943        let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
944        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
945        assert_eq!(tracker.peer_db.peers().len(), 1);
946        assert!(tracker.peer_db.get(&resynced.node_key).is_some());
947    }
948
949    /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
950    /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
951    /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
952    /// never re-handshake after it moves.
953    #[tokio::test]
954    async fn patch_merges_endpoints_and_derp_into_existing_peer() {
955        let mut tracker = PeerTracker::for_test(test_env(), None);
956
957        // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
958        let peer = peer_node("mover", [1u8; 32], vec![]);
959        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
960        let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
961        assert!(before.underlay_addresses.is_empty());
962        assert!(before.derp_region.is_none());
963
964        // Patch in fresh reachability (the idle-peer-reconnect case).
965        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
966        let patch = ts_control::PeerChange {
967            id: 1,
968            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
969            cap: None,
970            cap_map: None,
971            underlay_addresses: Some(vec![new_ep]),
972            node_key: None,
973            key_signature: None,
974            disco_key: None,
975            node_key_expiry: None,
976        };
977        let (upserts, deletions) =
978            tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
979
980        assert_eq!(upserts.len(), 1);
981        assert_eq!(deletions.len(), 0);
982        // Same peer, now carrying the patched endpoint + DERP; node key untouched.
983        assert_eq!(tracker.peer_db.peers().len(), 1);
984        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
985        assert_eq!(after.underlay_addresses, vec![new_ep]);
986        assert_eq!(
987            after.derp_region,
988            Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
989        );
990        assert_eq!(after.node_key, peer.node_key);
991    }
992
993    /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
994    /// never creates a node). No upsert, no deletion, peer set unchanged.
995    #[tokio::test]
996    async fn patch_for_unknown_node_is_ignored() {
997        let mut tracker = PeerTracker::for_test(test_env(), None);
998        let known = peer_node("known", [1u8; 32], vec![]); // id == 1
999        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1000
1001        let patch = ts_control::PeerChange {
1002            id: 999, // not in the netmap
1003            derp_region: None,
1004            cap: None,
1005            cap_map: None,
1006            underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1007            node_key: None,
1008            key_signature: None,
1009            disco_key: None,
1010            node_key_expiry: None,
1011        };
1012        let (upserts, deletions) =
1013            tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1014
1015        assert_eq!(upserts.len(), 0);
1016        assert_eq!(deletions.len(), 0);
1017        assert_eq!(tracker.peer_db.peers().len(), 1);
1018        assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1019    }
1020
1021    /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1022    /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1023    #[tokio::test]
1024    async fn patch_updates_node_key_expiry() {
1025        let mut tracker = PeerTracker::for_test(test_env(), None);
1026        let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1027        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1028
1029        let expiry = "2027-01-01T00:00:00Z"
1030            .parse::<chrono::DateTime<chrono::Utc>>()
1031            .unwrap();
1032        let patch = ts_control::PeerChange {
1033            id: 1,
1034            derp_region: None,
1035            cap: None,
1036            cap_map: None,
1037            underlay_addresses: None,
1038            node_key: None,
1039            key_signature: None,
1040            disco_key: None,
1041            node_key_expiry: Some(expiry),
1042        };
1043        tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1044
1045        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1046        assert_eq!(after.node_key_expiry, Some(expiry));
1047    }
1048
1049    /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1050    /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1051    /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1052    /// otherwise be a trust-enforcement bypass via the patch path.
1053    #[tokio::test]
1054    async fn patch_key_rotation_failing_tka_evicts_peer() {
1055        let (authority, sig) = authority_and_valid_sig();
1056        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1057
1058        // Admit a correctly-signed peer (id == 1).
1059        let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1060        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1061        assert_eq!(tracker.peer_db.peers().len(), 1);
1062
1063        // Patch a new node key whose signature is garbage under the active authority.
1064        let patch = ts_control::PeerChange {
1065            id: 1,
1066            derp_region: None,
1067            cap: None,
1068            cap_map: None,
1069            underlay_addresses: None,
1070            node_key: Some([0x33u8; 32].into()),
1071            key_signature: Some(vec![0x00, 0x01, 0x02]),
1072            disco_key: None,
1073            node_key_expiry: None,
1074        };
1075        let (upserts, deletions) =
1076            tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1077
1078        assert_eq!(upserts.len(), 0);
1079        assert_eq!(deletions.len(), 1);
1080        assert_eq!(tracker.peer_db.peers().len(), 0);
1081    }
1082
1083    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1084    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1085    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1086    /// proves the accumulate-then-join path the netmap handler builds.
1087    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1088        ts_control::UserProfile {
1089            id,
1090            login_name: login.to_string(),
1091            display_name: None,
1092        }
1093    }
1094
1095    #[tokio::test]
1096    async fn whois_resolves_user_from_accumulated_profiles() {
1097        let mut tracker = PeerTracker::for_test(test_env(), None);
1098
1099        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1100        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1101        peer.user_id = 42;
1102        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1103        let addr = "100.64.0.1:0".parse().unwrap();
1104
1105        // No profile yet: the node resolves but its owner is unknown.
1106        let who = tracker.whois_opt(addr).expect("peer is known");
1107        assert_eq!(who.user, None);
1108
1109        // Profile for a DIFFERENT user must not match.
1110        tracker
1111            .user_profiles
1112            .insert(7, profile(7, "someone-else@example.com"));
1113        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1114
1115        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1116        // login resolves.
1117        tracker
1118            .user_profiles
1119            .insert(42, profile(42, "alice@example.com"));
1120        assert_eq!(
1121            tracker.whois_opt(addr).unwrap().user,
1122            Some("alice@example.com".to_string())
1123        );
1124    }
1125
1126    /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1127    #[test]
1128    fn user_profile_best_label_prefers_login() {
1129        assert_eq!(
1130            profile(1, "alice@example.com").best_label(),
1131            Some("alice@example.com".to_string())
1132        );
1133        let display_only = ts_control::UserProfile {
1134            id: 2,
1135            login_name: String::new(),
1136            display_name: Some("Bob".to_string()),
1137        };
1138        assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1139        let empty = ts_control::UserProfile {
1140            id: 3,
1141            login_name: String::new(),
1142            display_name: None,
1143        };
1144        assert_eq!(empty.best_label(), None);
1145    }
1146}