Skip to main content

ts_runtime/peer_tracker/
mod.rs

1//! Peer delta update tracking.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    sync::Arc,
7};
8
9use kameo::{
10    actor::ActorRef,
11    message::{Context, Message},
12    reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26    peer_db: PeerDb,
27    seen_state_update: bool,
28    pending_requests: Vec<Pending>,
29    /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30    /// changes ([`WatchNetmap`]).
31    peer_watch: watch::Sender<Vec<StatusNode>>,
32    /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33    /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34    /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35    /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36    /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37    /// earlier one.
38    user_profiles: HashMap<UserId, UserProfile>,
39    /// Tailnet-Lock (TKA) authority enforced at the peer-trust chokepoint, matching Go
40    /// `tkaFilterNetmapLocked`. Read on demand from a [`watch`] cell the control runner owns: when it
41    /// holds `Some` (a verified lock has been synced from control), enforcement is **active** — every
42    /// upserted peer must present a `key_signature` this authority authorizes, or it is dropped
43    /// (fail-closed), exactly as Go drops peers with a missing or failing signature. When it holds
44    /// `None` (no lock, or the lock was disabled) enforcement is **inactive** and every peer is
45    /// upserted, identical to pre-TKA behavior and to Go's `b.tka == nil` early return.
46    ///
47    /// A `watch::Receiver` (not the bus) is the transport on purpose: the authority is a single
48    /// security-critical state cell, and `watch` is last-write-wins, never-dropped, and ordered by
49    /// the control runner's own writes — so a disable (`None`) can never be reordered behind or
50    /// silently dropped before a stale `Some` (which a best-effort broadcast bus could do, leaving a
51    /// defunct lock enforcing forever). The control runner is the sole writer; we only ever read.
52    ///
53    /// The authority always passes through `VerifiedAumChain::verify` before the control runner
54    /// publishes it, so enforcement only engages on a chain we have cryptographically verified.
55    /// Connectivity now depends on `ts_tka` verifying genuinely-good signatures correctly (see
56    /// SECURITY.md). Self is structurally never filtered here (the self node never enters `peer_db` —
57    /// it is routed to the control runner's `self_node` cell), so a node cannot lock itself out of
58    /// its own netmap.
59    tka_authority: watch::Receiver<Option<Arc<ts_tka::Authority>>>,
60    env: Env,
61}
62
63impl PeerTracker {
64    fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
65        // Canonicalization (case + trailing dot) is handled inside the name index lookup.
66        self.peer_db.get(&name).map(|(_id, node)| node)
67    }
68
69    fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
70        self.peer_db.get(&ip).map(|(_id, node)| node)
71    }
72
73    /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
74    ///
75    /// Connectivity fields (`cur_addr`/`relay`) are left at their `from_node` defaults (`None`) here:
76    /// this is the live-watch/hot path and must stay magicsock-free and synchronous. The explicit
77    /// [`GetStatus`] snapshot enriches them ([`status_peers_with_ids`](Self::status_peers_with_ids)).
78    fn status_peers(&self) -> Vec<StatusNode> {
79        self.peer_db
80            .peers()
81            .values()
82            .map(StatusNode::from_node)
83            .collect()
84    }
85
86    /// Like [`status_peers`](Self::status_peers) but pairs each entry with its [`PeerId`], so the
87    /// caller can join per-peer connectivity (the direct manager's `best_addrs`, keyed by `PeerId`)
88    /// onto the `StatusNode` before returning it. Order is unspecified (a `HashMap` walk).
89    fn status_peers_with_ids(&self) -> Vec<(PeerId, StatusNode)> {
90        self.peer_db
91            .peers()
92            .iter()
93            .map(|(id, node)| (*id, StatusNode::from_node(node)))
94            .collect()
95    }
96
97    fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
98        let ip = crate::status::whois_addr(addr);
99        let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
100        // Join the node's owning user id against the accumulated UserProfiles table to resolve a
101        // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
102        // with no human owner, or a profile not yet delivered).
103        let user = self.resolve_user(node.user_id);
104        Some(crate::status::WhoIs::from_node_with_user(node, user))
105    }
106
107    /// Resolve a user id to its best display label from the accumulated profile table.
108    fn resolve_user(&self, user_id: UserId) -> Option<String> {
109        self.user_profiles
110            .get(&user_id)
111            .and_then(UserProfile::best_label)
112    }
113
114    /// Whether `node` may be admitted to the peer db under Tailnet Lock, matching Go
115    /// `tkaFilterNetmapLocked`'s per-peer verdict (drop unsigned / failed-signature peers).
116    ///
117    /// This consults the live [`tka_authority`](Self::tka_authority) cell on each call (one `borrow`,
118    /// held only for the duration of the verdict). For a `Full` resync — which checks every peer —
119    /// prefer [`tka_authority_snapshot`](Self::tka_authority_snapshot) +
120    /// [`tka_snapshot_admits`](Self::tka_snapshot_admits) to borrow once and verify each peer a single
121    /// time; this method is the convenience wrapper for the single-peer (`Delta`/patch) sites.
122    ///
123    /// Fail-closed and gated:
124    /// - No authority ⇒ no lock synced ⇒ always admit (Go's `b.tka == nil` early return; identical to
125    ///   pre-TKA behavior).
126    /// - **Empty trusted-key state** ⇒ always admit (logged at `error!` — see
127    ///   [`tka_snapshot_admits`](Self::tka_snapshot_admits) for the full rationale).
128    /// - Authority present + peer carries a `key_signature` the authority authorizes for the peer's
129    ///   node key ⇒ admit.
130    /// - Authority present + signature missing or unauthorized/invalid ⇒ **drop** (Go drops peers
131    ///   with a missing signature or failed `NodeKeyAuthorized` under tailnet lock).
132    fn tka_admits(&self, node: &Node) -> bool {
133        Self::tka_snapshot_admits(self.tka_authority.borrow().as_deref(), node)
134    }
135
136    /// Borrow the current TKA authority once (cloning the cheap `Arc`) for a batch verdict. Returns
137    /// `None` when no lock is synced (admit-all). Used by the `Full` path so a netmap of N peers
138    /// reads the cell once and runs at most one signature verify per peer (not two).
139    fn tka_authority_snapshot(&self) -> Option<Arc<ts_tka::Authority>> {
140        self.tka_authority.borrow().clone()
141    }
142
143    /// The per-peer Tailnet-Lock verdict against an already-borrowed `authority` snapshot. Factored
144    /// out so both the single-peer [`tka_admits`](Self::tka_admits) and the `Full` batch path share
145    /// one verdict implementation (no divergence) while the batch path verifies each peer exactly
146    /// once.
147    ///
148    /// Never logs key/signature bytes — only the `stable_id` and the `TkaError` Display (static
149    /// descriptors). Known parity gaps vs Go (both *under*-enforcement, documented in PARITY_ROADMAP):
150    /// no `UnsignedPeerAPIOnly` exemption (our model lacks the field), and no cross-peer
151    /// rotation-obsolete dropping (a rotated-away but still-validly-signed key is admitted — see the
152    /// roadmap; closing it needs a details-returning verify + a whole-netmap rotation pass).
153    fn tka_snapshot_admits(authority: Option<&ts_tka::Authority>, node: &Node) -> bool {
154        let Some(auth) = authority else {
155            return true;
156        };
157
158        // Brick-guard: an authority with no trusted keys would drop every peer. A verified chain is
159        // structurally guaranteed ≥1 key (genesis rejects an empty key set, and the last key cannot
160        // be removed), so reaching here means a `ts_tka` invariant was violated — admit rather than
161        // black-hole the whole netmap, and log at `error!` because it signals a real bug, not an
162        // expected runtime input. This is OUR fail-safe, not a Go behavior. NOTE: it only catches the
163        // empty-keyset shape; a non-empty authority that authorizes none of the offered peers still
164        // (correctly) drops them — that is what a lock that revoked everyone means. The
165        // "authorized-zero-peers" isolation case is surfaced separately by the caller.
166        if auth.state().keys.is_empty() {
167            tracing::error!(
168                "TKA: authority has an empty trusted-key set (verified chains never do — likely a \
169                 ts_tka bug); not enforcing (admitting all) to avoid isolating the node"
170            );
171            return true;
172        }
173
174        if node.key_signature.is_empty() {
175            tracing::warn!(
176                stable_id = ?node.stable_id,
177                "TKA: dropping unsigned peer under tailnet lock"
178            );
179            return false;
180        }
181
182        match auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
183            Ok(()) => {
184                tracing::debug!(stable_id = ?node.stable_id, "TKA: peer node-key authorized");
185                true
186            }
187            Err(e) => {
188                tracing::warn!(
189                    stable_id = ?node.stable_id,
190                    error = %e,
191                    "TKA: dropping peer with unauthorized node key"
192                );
193                false
194            }
195        }
196    }
197}
198
199impl kameo::Actor for PeerTracker {
200    /// `(env, tka_authority)`: the bus/keys env, plus the read end of the control runner's TKA
201    /// enforcement-authority cell (Go `tkaFilterNetmapLocked`). The control runner is the sole
202    /// writer; it publishes the verified `Authority` after a successful `/machine/tka/sync` and
203    /// `None` when the lock is disabled. A `watch` cell (not a bus message) so the latest value is
204    /// always readable on demand, never dropped, and never reordered (see [`tka_authority`]).
205    type Args = (Env, watch::Receiver<Option<Arc<ts_tka::Authority>>>);
206    type Error = Error;
207
208    async fn on_start(
209        (env, tka_authority): Self::Args,
210        slf: ActorRef<Self>,
211    ) -> Result<Self, Self::Error> {
212        env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
213
214        let (peer_watch, _) = watch::channel(Vec::new());
215
216        Ok(Self {
217            peer_db: PeerDb::default(),
218            pending_requests: Default::default(),
219            seen_state_update: false,
220            peer_watch,
221            user_profiles: HashMap::new(),
222            // The cell starts `None` (no lock synced ⇒ enforcement inactive, admit all, matching
223            // Go's `b.tka == nil`); the control runner flips it to `Some` on the first sync.
224            tka_authority,
225            env,
226        })
227    }
228}
229
230enum Pending {
231    PeerByName(PeerByName, ReplySender<Option<Node>>),
232    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
233    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
234    Status(ReplySender<Vec<(PeerId, StatusNode)>>),
235    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
236}
237
// For messages with arguments, `#[kameo::messages]` generates a struct with the args as fields.
// Those fields aren't documented, and we can't apply attributes directly to them. Hence, wrap the
// handlers in a module where the `missing_docs` lint is allowed for everything inside.
241#[allow(missing_docs)]
242mod msg_impl {
243    use std::net::IpAddr;
244
245    use kameo::prelude::DelegatedReply;
246
247    use super::*;
248
249    #[kameo::messages]
250    impl PeerTracker {
251        /// Lookup a peer by name.
252        ///
253        /// Waits until we've received at least one peer update from control.
254        #[message(ctx)]
255        pub async fn peer_by_name(
256            &mut self,
257            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
258            name: String,
259        ) -> DelegatedReply<Option<Node>> {
260            let (deleg, sender) = ctx.reply_sender();
261            let Some(sender) = sender else { return deleg };
262
263            if !self.seen_state_update {
264                tracing::debug!(query = name, "no peer state seen yet, queueing request");
265
266                self.pending_requests
267                    .push(Pending::PeerByName(PeerByName { name }, sender));
268
269                return deleg;
270            }
271
272            sender.send(self.peer_by_name_opt(&name).cloned());
273
274            deleg
275        }
276
277        /// Lookup all peers that accept packets addressed to the given IP.
278        ///
279        /// This includes the peer's tailnet address and any subnet routes it provides. Only
280        /// the peers with the most specific subnet route match that covers `ip` will be
281        /// returned.
282        ///
283        /// E.g., suppose:
284        ///
285        /// - We're querying for `10.1.2.3`
286        /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
287        /// - `PeerC` has an accepted route for `10.1.0.0/16`
288        ///
289        /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
290        /// prefix match.
291        #[message(ctx)]
292        pub fn peer_by_accepted_route(
293            &mut self,
294            ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
295            ip: IpAddr,
296        ) -> DelegatedReply<Vec<Node>> {
297            let (deleg, sender) = ctx.reply_sender();
298            let Some(sender) = sender else { return deleg };
299
300            if !self.seen_state_update {
301                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
302
303                self.pending_requests
304                    .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
305
306                return deleg;
307            }
308
309            sender.send(
310                self.peer_db
311                    .get_route(ip.into())
312                    .map(|(_id, node)| node.clone())
313                    .collect(),
314            );
315
316            deleg
317        }
318
319        /// Lookup the peer that has the given tailnet IP address.
320        #[message(ctx)]
321        pub fn peer_by_tailnet_ip(
322            &mut self,
323            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
324            ip: IpAddr,
325        ) -> DelegatedReply<Option<Node>> {
326            let (deleg, sender) = ctx.reply_sender();
327            let Some(sender) = sender else { return deleg };
328
329            if !self.seen_state_update {
330                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
331
332                self.pending_requests
333                    .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
334
335                return deleg;
336            }
337
338            sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
339
340            deleg
341        }
342
343        /// Build the peer entries of a [`Status`](crate::Status) snapshot, each paired with its
344        /// [`PeerId`] so [`Runtime::status`](crate::Runtime::status) can join per-peer connectivity
345        /// (`cur_addr`/`relay`) from the direct manager before returning. The self node is *not*
346        /// included here (it lives in the control runner); `Runtime::status` combines both and drops
347        /// the ids.
348        ///
349        /// Waits until we've received at least one peer update from control.
350        #[message(ctx)]
351        pub fn get_status(
352            &mut self,
353            ctx: &mut Context<Self, DelegatedReply<Vec<(PeerId, StatusNode)>>>,
354        ) -> DelegatedReply<Vec<(PeerId, StatusNode)>> {
355            let (deleg, sender) = ctx.reply_sender();
356            let Some(sender) = sender else { return deleg };
357
358            if !self.seen_state_update {
359                tracing::debug!("no peer state seen yet, queueing status request");
360                self.pending_requests.push(Pending::Status(sender));
361                return deleg;
362            }
363
364            sender.send(self.status_peers_with_ids());
365
366            deleg
367        }
368
369        /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
370        ///
371        /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
372        /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
373        /// is not included (it lives in the control runner). Returns empty before the first netmap —
374        /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
375        /// that need a populated list await `Running` first).
376        #[message]
377        pub fn all_peers(&self) -> Vec<Node> {
378            self.peer_db.peers().values().cloned().collect()
379        }
380
381        /// Resolve which node owns a tailnet source address.
382        ///
383        /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
384        /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
385        ///
386        /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
387        /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
388        /// [`status`](crate::status) module docs for the gap.
389        ///
390        /// Waits until we've received at least one peer update from control.
391        #[message(ctx)]
392        pub fn whois(
393            &mut self,
394            ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
395            addr: std::net::SocketAddr,
396        ) -> DelegatedReply<Option<crate::status::WhoIs>> {
397            let (deleg, sender) = ctx.reply_sender();
398            let Some(sender) = sender else { return deleg };
399
400            if !self.seen_state_update {
401                tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
402                self.pending_requests
403                    .push(Pending::WhoIs(Whois { addr }, sender));
404                return deleg;
405            }
406
407            sender.send(self.whois_opt(addr));
408
409            deleg
410        }
411
412        /// Subscribe to netmap peer-change events.
413        ///
414        /// Returns a [`watch::Receiver`] whose value is the current set of peer
415        /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
416        /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
417        ///
418        /// The receiver's initial value is the peer set at subscription time (empty before the
419        /// first netmap update). This is a peer-only view; combine with the self node from
420        /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
421        #[message(derive(Clone))]
422        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
423            self.peer_watch.subscribe()
424        }
425    }
426}
427
428pub use msg_impl::*;
429
430#[derive(Debug, Clone)]
431pub(crate) struct PeerState {
432    #[allow(unused)]
433    pub deletions: HashSet<PeerId>,
434    #[allow(unused)]
435    pub upserts: HashSet<PeerId>,
436    pub peers: Arc<PeerDb>,
437}
438
439impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
440    type Reply = ();
441
442    async fn handle(
443        &mut self,
444        msg: Arc<ts_control::StateUpdate>,
445        _ctx: &mut Context<Self, Self::Reply>,
446    ) {
447        // Accumulate user profiles first — control sends them incrementally and a response may
448        // carry profiles with no peer delta (or peers that reference a profile from an earlier
449        // response), so this must happen before the no-peer-update early return below.
450        for profile in &msg.user_profiles {
451            self.user_profiles.insert(profile.id, profile.clone());
452        }
453
454        // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
455        // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
456        // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
457        // *before* the no-peer-update early return — otherwise online status freezes at the last
458        // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
459        let liveness_changed =
460            self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
461
462        if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
463            // No peer set or patch this response. If a liveness delta still mutated the netmap,
464            // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
465            if liveness_changed {
466                self.service_pending_requests();
467                self.peer_watch.send_replace(self.status_peers());
468                if let Err(e) = self
469                    .env
470                    .publish(Arc::new(PeerState {
471                        upserts: HashSet::default(),
472                        deletions: HashSet::default(),
473                        peers: Arc::new(self.peer_db.clone()),
474                    }))
475                    .await
476                {
477                    tracing::error!(error = %e, "publishing liveness-only peer state update");
478                }
479            }
480            return;
481        }
482
483        // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
484        // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
485        // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
486        // so the published `PeerState` reflects every node touched by both passes; a node both
487        // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
488        let (mut upserts, mut deletions) = msg
489            .peer_update
490            .as_ref()
491            .map(|u| self.apply_peer_update(u))
492            .unwrap_or_default();
493
494        if !msg.peer_patches.is_empty() {
495            let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
496            // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
497            // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
498            for id in &patch_upserts {
499                deletions.remove(id);
500            }
501            for id in &patch_deletions {
502                upserts.remove(id);
503            }
504            upserts.extend(patch_upserts);
505            deletions.extend(patch_deletions);
506        }
507
508        tracing::debug!(
509            n_upsert = upserts.len(),
510            n_delete = deletions.len(),
511            peer_count = self.peer_db.peers().len(),
512            "new peer state"
513        );
514
515        self.service_pending_requests();
516
517        // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
518        // value current even when there are no subscribers, so a late subscriber sees fresh state.
519        self.peer_watch.send_replace(self.status_peers());
520
521        if let Err(e) = self
522            .env
523            .publish(Arc::new(PeerState {
524                upserts,
525                deletions,
526                peers: Arc::new(self.peer_db.clone()),
527            }))
528            .await
529        {
530            tracing::error!(error = %e, "publishing peer state update");
531        }
532    }
533}
534
535/// Ask the peer tracker to re-broadcast its current peer snapshot on the bus, without any peer
536/// change. Sent after a runtime preference change so the route updater and source filter (both
537/// `Arc<PeerState>` subscribers) re-resolve against the new value immediately, rather than waiting
538/// for the next netmap update: `Device::set_exit_node` (new exit-node selector) and
539/// `Device::set_accept_routes` (new accept-routes flag) both send it.
540#[derive(Debug, Clone, Copy)]
541pub struct RepublishState;
542
543impl Message<RepublishState> for PeerTracker {
544    type Reply = ();
545
546    async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
547        // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
548        // delta. Subscribers recompute their routes/filters against the current peers and the
549        // (just-updated) runtime preferences (exit-node selector, accept-routes flag).
550        if let Err(e) = self
551            .env
552            .publish(Arc::new(PeerState {
553                upserts: HashSet::default(),
554                deletions: HashSet::default(),
555                peers: Arc::new(self.peer_db.clone()),
556            }))
557            .await
558        {
559            tracing::error!(error = %e, "re-publishing peer state after a runtime preference change");
560        }
561    }
562}
563
564impl PeerTracker {
565    /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
566    /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
567    ///
568    /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
569    /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
570    /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
571    ///
572    /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
573    fn apply_peer_update(
574        &mut self,
575        peer_update: &ts_control::PeerUpdate,
576    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
577        let mut upserts = HashSet::default();
578        let mut deletions = HashSet::default();
579
580        match peer_update {
581            ts_control::PeerUpdate::Full(new_nodes) => {
582                tracing::trace!("full peer update");
583
584                // Borrow the authority ONCE for the whole batch and verify each peer EXACTLY once
585                // (Go runs `tkaFilterNetmapLocked` once over the assembled netmap; an earlier draft
586                // verified every peer twice — once for `retained_ids`, once in the upsert loop —
587                // doubling the ed25519 cost on the hot resync path). The per-node verdict vector
588                // `admits` is computed once and drives both the `retain` (evict revoked peers, keyed
589                // by stable_id) and the upsert loop (skip rejected peers, by the node's OWN verdict).
590                // Keeping a per-node verdict (not just a stable_id set) means a node whose own
591                // signature fails is never admitted on the strength of a different node that happens
592                // to share its stable_id — matching the old per-node re-verify for that degenerate
593                // (malformed-control) input.
594                //
595                // Revocation evicts: a peer re-included with a now-invalid/missing signature under an
596                // active authority fails its verdict, so it is excluded from `retained_ids` and
597                // `retain` drops the stale (previously-admitted) entry. With no authority the snapshot
598                // is `None`, so every node passes — byte-for-byte the pre-TKA behavior (no regression).
599                let authority = self.tka_authority_snapshot();
600                let admits = new_nodes
601                    .iter()
602                    .map(|node| Self::tka_snapshot_admits(authority.as_deref(), node))
603                    .collect::<Vec<_>>();
604                let retained_ids = new_nodes
605                    .iter()
606                    .zip(admits.iter().copied())
607                    .filter(|(_, ok)| *ok)
608                    .map(|(node, _)| &node.stable_id)
609                    .collect::<HashSet<_>>();
610
611                // Isolation diagnostic: an ACTIVE lock that authorized none of the offered peers
612                // leaves this node with no peers — surface it loudly so a self-lockout (vs an attack)
613                // is diagnosable. `authority.is_some()` means a real keyed lock (the empty-keyset
614                // brick-guard admits-all, so it never reaches here with zero retained).
615                if authority.is_some() && !new_nodes.is_empty() && retained_ids.is_empty() {
616                    tracing::error!(
617                        offered = new_nodes.len(),
618                        "TKA: active lock authorized ZERO of the offered peers; node is isolated \
619                         (verify the lock state, or disable tailnet lock to recover)"
620                    );
621                }
622
623                self.peer_db.retain(|id, peer| {
624                    let retain = retained_ids.contains(&peer.stable_id);
625
626                    if !retain {
627                        deletions.insert(id);
628                    }
629
630                    retain
631                });
632
633                for (node, ok) in new_nodes.iter().zip(admits.iter().copied()) {
634                    if !ok {
635                        continue; // fail-CLOSED: peer rejected by tailnet lock (verified once, above)
636                    }
637                    let peer_id = self.peer_db.upsert(node);
638                    upserts.insert(peer_id);
639                }
640            }
641
642            ts_control::PeerUpdate::Delta { remove, upsert } => {
643                tracing::trace!("delta peer update");
644
645                for peer in upsert {
646                    if !self.tka_admits(peer) {
647                        // fail-CLOSED: do not upsert a peer rejected by tailnet lock. If the peer is
648                        // ALREADY in the db (a delta re-upserting an existing peer whose signature is
649                        // now invalid — e.g. revoked between syncs), evict the stale entry rather than
650                        // leaving an unverified peer admitted; Go re-filters the whole netmap each map
651                        // response, so a now-unsigned peer would not survive there either.
652                        if let Some((id, _)) = self.peer_db.remove(&peer.stable_id) {
653                            tracing::warn!(
654                                stable_id = ?peer.stable_id,
655                                "TKA: delta re-upsert rejected; evicting now-unauthorized peer"
656                            );
657                            deletions.insert(id);
658                        }
659                        continue;
660                    }
661                    let id = self.peer_db.upsert(peer);
662
663                    upserts.insert(id);
664                }
665
666                for peer in remove {
667                    let Some((id, _node)) = self.peer_db.remove(peer) else {
668                        tracing::error!(control_node_id = peer, "removed peer was unknown");
669                        continue;
670                    };
671
672                    deletions.insert(id);
673                }
674            }
675        }
676
677        (upserts, deletions)
678    }
679
680    /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
681    /// deleted [`PeerId`]s.
682    ///
683    /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
684    /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
685    /// response that carries both has the peer set applied first (by the caller) and these patches
686    /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
687    /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
688    fn apply_peer_patches(
689        &mut self,
690        patches: &[ts_control::PeerChange],
691    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
692        let mut upserts = HashSet::default();
693        let mut deletions = HashSet::default();
694
695        tracing::trace!(n = patches.len(), "peer patch update");
696
697        for patch in patches {
698            // Clone the current node, apply the present fields, and re-upsert through the same path
699            // as a delta so indexes/routes stay consistent.
700            let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
701                tracing::debug!(
702                    control_node_id = patch.id,
703                    "peer patch for unknown node; ignoring"
704                );
705                continue;
706            };
707
708            let mut node = existing.clone();
709            if let Some(endpoints) = &patch.underlay_addresses {
710                node.underlay_addresses = endpoints.clone();
711            }
712            if let Some(derp) = patch.derp_region {
713                node.derp_region = Some(derp);
714            }
715            if let Some(cap) = patch.cap {
716                node.cap = cap;
717            }
718            if let Some(cap_map) = &patch.cap_map {
719                node.cap_map = cap_map.clone();
720            }
721            if let Some(disco_key) = patch.disco_key {
722                node.disco_key = Some(disco_key);
723            }
724            if let Some(expiry) = patch.node_key_expiry {
725                node.node_key_expiry = Some(expiry);
726            }
727            // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
728            // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
729            // a value (never patches back to unknown), so apply when present.
730            if let Some(online) = patch.online {
731                node.online = Some(online);
732            }
733            if let Some(last_seen) = patch.last_seen {
734                node.last_seen = Some(last_seen);
735            }
736            // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
737            // together so the trust gate below verifies the new signature against the new key, never
738            // a mismatched pair.
739            if let Some(node_key) = patch.node_key {
740                node.node_key = node_key;
741            }
742            if let Some(sig) = &patch.key_signature {
743                node.key_signature = sig.clone();
744            }
745
746            // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
747            // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
748            // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
749            // evict it rather than keep the stale (now-unverified) entry.
750            if !self.tka_admits(&node) {
751                if let Some((id, _)) = self.peer_db.remove(&patch.id) {
752                    tracing::warn!(
753                        control_node_id = patch.id,
754                        "peer patch rejected by tailnet lock; evicting peer"
755                    );
756                    deletions.insert(id);
757                }
758                continue;
759            }
760
761            let id = self.peer_db.upsert(&node);
762            upserts.insert(id);
763        }
764
765        (upserts, deletions)
766    }
767
768    /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
769    /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
770    /// actually mutated (so the caller knows whether to re-publish).
771    ///
772    /// Mirrors Go's post-`peers*` application of these maps. Each entry is keyed by control node id
773    /// and only ever *sets* a value (never back to unknown). An entry for an unknown node id is
774    /// ignored (like a patch — these maps never create a node). `peer_seen_change`'s `false` ("the
775    /// peer is gone") is applied as `online = Some(false)` — the node stays in the netmap, it is
776    /// merely marked offline; the `last_seen = now` update for the `true` case is intentionally not
777    /// performed here (it needs a wall clock this actor does not hold, and `last_seen` is the
778    /// low-value half — `online` is the `tailscale status` column that matters; see the iter-5
779    /// research note §5.5).
780    fn apply_liveness_changes(
781        &mut self,
782        online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
783        peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
784    ) -> bool {
785        let mut changed = false;
786
787        // Channel C — direct online flips.
788        for (&node_id, &online) in online_change {
789            if let Some((_pid, existing)) = self.peer_db.get(&node_id)
790                && existing.online != Some(online)
791            {
792                let mut node = existing.clone();
793                node.online = Some(online);
794                self.peer_db.upsert(&node);
795                changed = true;
796            }
797        }
798
799        // Channel D — peer-seen flips. `false` ⇒ "the peer is gone" ⇒ mark offline (the node is
800        // retained, not removed). `true` ⇒ "seen just now"; the online half is unknown from this
801        // signal alone, so we leave `online` untouched (a `true` here does not assert connectivity to
802        // control, only recent contact) and defer the `last_seen = now` timestamp (no clock here).
803        for (&node_id, &seen) in peer_seen_change {
804            if !seen
805                && let Some((_pid, existing)) = self.peer_db.get(&node_id)
806                && existing.online != Some(false)
807            {
808                let mut node = existing.clone();
809                node.online = Some(false);
810                self.peer_db.upsert(&node);
811                changed = true;
812            }
813        }
814
815        changed
816    }
817
818    /// Test-only constructor: build a [`PeerTracker`] with a chosen initial TKA authority without
819    /// going through the actor `on_start` path. Returns the tracker plus the **`watch::Sender`** for
820    /// its enforcement-authority cell, so a test can drive the exact enable/disable transitions the
821    /// control runner drives at runtime (`tx.send_replace(Some(..))` ⇒ enforce, `tx.send_replace(None)`
822    /// ⇒ clear). The initial `Some` exercises the fail-closed chokepoint
823    /// ([`tka_admits`](Self::tka_admits)); `None` is the no-lock admit-all path. The returned sender
824    /// must be kept alive for the tracker to read updated values.
825    #[cfg(test)]
826    fn for_test(
827        env: Env,
828        tka_authority: Option<ts_tka::Authority>,
829    ) -> (Self, watch::Sender<Option<Arc<ts_tka::Authority>>>) {
830        let (peer_watch, _) = watch::channel(Vec::new());
831        let (tka_tx, tka_rx) = watch::channel(tka_authority.map(Arc::new));
832        let tracker = Self {
833            peer_db: PeerDb::default(),
834            seen_state_update: false,
835            pending_requests: Vec::new(),
836            peer_watch,
837            user_profiles: HashMap::new(),
838            tka_authority: tka_rx,
839            env,
840        };
841        (tracker, tka_tx)
842    }
843
844    fn service_pending_requests(&mut self) {
845        if self.seen_state_update {
846            return;
847        }
848
849        self.seen_state_update = true;
850
851        if !self.pending_requests.is_empty() {
852            tracing::debug!(
853                n_pending = self.pending_requests.len(),
854                "state update received, servicing pending requests"
855            );
856        }
857
858        for req in core::mem::take(&mut self.pending_requests) {
859            match req {
860                Pending::PeerByName(PeerByName { name }, reply) => {
861                    reply.send(self.peer_by_name_opt(&name).cloned());
862                }
863                Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
864                    reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
865                }
866                Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
867                    reply.send(
868                        self.peer_db
869                            .get_route(ip.into())
870                            .map(|(_id, node)| node.clone())
871                            .collect(),
872                    );
873                }
874                Pending::Status(reply) => {
875                    reply.send(self.status_peers_with_ids());
876                }
877                Pending::WhoIs(Whois { addr }, reply) => {
878                    reply.send(self.whois_opt(addr));
879                }
880            }
881        }
882    }
883}
884
885#[cfg(test)]
886mod tka_tests {
887    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
888    //!
889    //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
890    //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
891    //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
892    //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
893    //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
894    //! private `ts_tka` API used).
895
896    use ed25519_dalek::{Signer, SigningKey};
897    use ts_control::{Node, StableNodeId, TailnetAddress};
898    use ts_tka::{
899        AumHash, Authority, Key, KeyKind, State,
900        cbor::{self, Value},
901    };
902
903    use super::*;
904
905    /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
906    const SIG_KIND_DIRECT: u64 = 1;
907
908    /// The 32-byte node key used across the signed-peer fixtures.
909    const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
910
911    /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
912    /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
913    fn test_env() -> Env {
914        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
915        Env::new(
916            ts_keys::NodeState::generate(),
917            shutdown_rx,
918            crate::env::ForwarderConfig {
919                accept_routes: false,
920                accept_dns: true,
921                exit_node: None,
922                forward_routes: Vec::new(),
923                forward_tcp_ports: Vec::new(),
924                forward_udp_ports: Vec::new(),
925                forward_all_ports: false,
926                forward_exit_egress: false,
927                block_incoming: false,
928                exit_proxy: None,
929                peerapi_port: None,
930                taildrop_dir: None,
931                enable_ipv6: false,
932                persistent_keepalive_interval: None,
933                ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
934            },
935        )
936    }
937
938    /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
939    fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
940        Node {
941            id: 1,
942            stable_id: StableNodeId(stable_id.to_string()),
943            hostname: stable_id.to_string(),
944            user_id: 0,
945            tailnet: Some("ts.net".to_string()),
946            tags: Vec::new(),
947            tailnet_address: TailnetAddress {
948                ipv4: "100.64.0.1/32".parse().unwrap(),
949                ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
950            },
951            node_key: node_key.into(),
952            node_key_expiry: None,
953            online: None,
954            last_seen: None,
955            key_signature,
956            machine_key: None,
957            disco_key: None,
958            accepted_routes: Vec::new(),
959            underlay_addresses: Vec::new(),
960            derp_region: None,
961            cap: Default::default(),
962            cap_map: Default::default(),
963            peerapi_port: None,
964            peerapi_dns_proxy: false,
965            is_wireguard_only: false,
966            exit_node_dns_resolvers: Vec::new(),
967            peer_relay: false,
968            service_vips: Default::default(),
969        }
970    }
971
972    /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
973    /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
974    /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
975    /// signing-digest preimage (the `SigHash` form).
976    fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
977        let mut pairs = alloc_pairs(node_key, key_id);
978        if let Some(sig) = signature {
979            pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
980        }
981        cbor::int_map(pairs).to_vec()
982    }
983
984    fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
985        vec![
986            (1, Some(Value::Uint(SIG_KIND_DIRECT))),
987            (2, Some(Value::Bytes(node_key.to_vec()))),
988            (3, Some(Value::Bytes(key_id.to_vec()))),
989        ]
990    }
991
992    /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
993    /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
994    fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
995        // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
996        let signing = SigningKey::from_bytes(&[42u8; 32]);
997        let trusted_pub = signing.verifying_key().to_bytes().to_vec();
998
999        let authority = Authority::from_state(
1000            AumHash([0; 32]),
1001            State {
1002                keys: vec![Key {
1003                    kind: KeyKind::Ed25519,
1004                    votes: 1,
1005                    public: trusted_pub.clone(),
1006                }],
1007            },
1008        );
1009
1010        // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
1011        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
1012        let sig_hash = ts_tka::aum_hash(&preimage).0;
1013        let signature = signing.sign(&sig_hash).to_bytes().to_vec();
1014
1015        let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
1016        // Sanity: the authority accepts the signature we just built (same path the gate uses).
1017        assert!(
1018            authority
1019                .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
1020                .is_ok()
1021        );
1022
1023        (authority, signed_cbor)
1024    }
1025
1026    #[tokio::test]
1027    async fn tka_inactive_upserts_all_peers() {
1028        // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
1029        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1030
1031        let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
1032        let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
1033
1034        assert!(tracker.tka_admits(&signed));
1035        assert!(tracker.tka_admits(&unsigned));
1036
1037        tracker.peer_db.upsert(&signed);
1038        tracker.peer_db.upsert(&unsigned);
1039        assert_eq!(tracker.peer_db.peers().len(), 2);
1040    }
1041
1042    #[tokio::test]
1043    async fn tka_active_rejects_unsigned_peer() {
1044        // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
1045        let (authority, _sig) = authority_and_valid_sig();
1046        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1047
1048        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1049        assert!(!tracker.tka_admits(&unsigned));
1050
1051        // Mirror the handler's `if !tka_admits { continue }` loop.
1052        if tracker.tka_admits(&unsigned) {
1053            tracker.peer_db.upsert(&unsigned);
1054        }
1055        assert_eq!(tracker.peer_db.peers().len(), 0);
1056        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1057    }
1058
1059    #[tokio::test]
1060    async fn tka_active_rejects_bad_signature() {
1061        // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1062        let (authority, mut sig) = authority_and_valid_sig();
1063        // Tamper the last byte (the trailing signature byte) so verification fails.
1064        let last = sig.len() - 1;
1065        sig[last] ^= 0xff;
1066
1067        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1068        let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1069        assert!(!tracker.tka_admits(&bad));
1070
1071        if tracker.tka_admits(&bad) {
1072            tracker.peer_db.upsert(&bad);
1073        }
1074        assert_eq!(tracker.peer_db.peers().len(), 0);
1075    }
1076
1077    #[tokio::test]
1078    async fn tka_active_admits_authorized_peer() {
1079        // Authority present + correctly-signed node key ⇒ admitted and upserted.
1080        let (authority, sig) = authority_and_valid_sig();
1081        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1082
1083        let good = peer_node("good", NODE_KEY_BYTES, sig);
1084        assert!(tracker.tka_admits(&good));
1085
1086        if tracker.tka_admits(&good) {
1087            tracker.peer_db.upsert(&good);
1088        }
1089        assert_eq!(tracker.peer_db.peers().len(), 1);
1090        assert!(tracker.peer_db.get(&good.node_key).is_some());
1091    }
1092
1093    // ---------------------------------------------------------------------------------------------
1094    // Tests that drive REAL `PeerUpdate`s through the shared handler body
1095    // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1096    // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1097    // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1098    // ---------------------------------------------------------------------------------------------
1099
1100    #[tokio::test]
1101    async fn tka_active_delta_upsert_rejects_unauthorized() {
1102        // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1103        // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1104        let (authority, _sig) = authority_and_valid_sig();
1105        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1106
1107        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1108        let update = ts_control::PeerUpdate::Delta {
1109            upsert: vec![unsigned.clone()],
1110            remove: Vec::new(),
1111        };
1112
1113        tracker.apply_peer_update(&update);
1114
1115        assert_eq!(tracker.peer_db.peers().len(), 0);
1116        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1117    }
1118
1119    #[tokio::test]
1120    async fn tka_active_delta_upsert_admits_authorized() {
1121        // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1122        let (authority, sig) = authority_and_valid_sig();
1123        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1124
1125        let good = peer_node("good", NODE_KEY_BYTES, sig);
1126        let update = ts_control::PeerUpdate::Delta {
1127            upsert: vec![good.clone()],
1128            remove: Vec::new(),
1129        };
1130
1131        tracker.apply_peer_update(&update);
1132
1133        assert_eq!(tracker.peer_db.peers().len(), 1);
1134        assert!(tracker.peer_db.get(&good.node_key).is_some());
1135    }
1136
1137    #[tokio::test]
1138    async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1139        // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1140        // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1141        // dropped fail-closed.
1142        let (authority, sig) = authority_and_valid_sig();
1143        // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1144        let mut bad_sig = sig.clone();
1145        let last = bad_sig.len() - 1;
1146        bad_sig[last] ^= 0xff;
1147
1148        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1149
1150        // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1151        // rejected peers use distinct node keys so the survivor is unambiguous.
1152        let good = peer_node("good", NODE_KEY_BYTES, sig);
1153        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1154        let bad = peer_node("bad", [9u8; 32], bad_sig);
1155
1156        let update =
1157            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1158
1159        tracker.apply_peer_update(&update);
1160
1161        assert_eq!(tracker.peer_db.peers().len(), 1);
1162        assert!(tracker.peer_db.get(&good.node_key).is_some());
1163        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1164        assert!(tracker.peer_db.get(&bad.node_key).is_none());
1165    }
1166
1167    /// End-to-end through the REAL enforcement-authority transport (the `watch` cell the control
1168    /// runner writes), not a direct field poke: writing `Some(authority)` flips enforcement on so a
1169    /// mixed batch drops the unsigned/bad peers, and a subsequent `None` (lock disabled) clears
1170    /// enforcement so a peer DROPPED while enforced is re-admitted. Exercises the exact `borrow`-based
1171    /// read path `tka_admits` uses — a broken receiver wiring would pass every for_test-field test but
1172    /// fail here.
1173    #[tokio::test]
1174    async fn tka_authority_watch_enables_then_clears_enforcement() {
1175        let (authority, sig) = authority_and_valid_sig();
1176        let mut bad_sig = sig.clone();
1177        let last = bad_sig.len() - 1;
1178        bad_sig[last] ^= 0xff;
1179
1180        let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1181
1182        // 1) No authority yet ⇒ admit-all (Go b.tka == nil).
1183        let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1184        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1185        let bad = peer_node("bad", [9u8; 32], bad_sig);
1186        let batch = ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1187        tracker.apply_peer_update(&batch);
1188        assert_eq!(tracker.peer_db.peers().len(), 3, "no lock ⇒ admit all");
1189
1190        // 2) Publish the verified authority over the watch cell (exactly what the control runner does
1191        //    on a successful sync) ⇒ enforcement ON. A re-applied Full now drops unsigned + bad.
1192        tka_tx.send_replace(Some(Arc::new(authority)));
1193        tracker.apply_peer_update(&batch);
1194        assert_eq!(
1195            tracker.peer_db.peers().len(),
1196            1,
1197            "lock active ⇒ only the signed peer survives"
1198        );
1199        assert!(tracker.peer_db.get(&good.node_key).is_some());
1200        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1201        assert!(tracker.peer_db.get(&bad.node_key).is_none());
1202
1203        // 3) Lock disabled (None) ⇒ enforcement cleared ⇒ a peer that was DROPPED while enforced is
1204        //    re-admitted by a fresh netmap. Assert the specific previously-dropped key returns (not
1205        //    merely a count), so this proves the drop→clear→re-admit transition, not "admit-all-fresh".
1206        tka_tx.send_replace(None);
1207        tracker.apply_peer_update(&batch);
1208        assert_eq!(
1209            tracker.peer_db.peers().len(),
1210            3,
1211            "lock disabled ⇒ admit all again"
1212        );
1213        assert!(
1214            tracker.peer_db.get(&unsigned.node_key).is_some(),
1215            "the peer dropped under enforcement must come back once the lock is cleared"
1216        );
1217        assert!(tracker.peer_db.get(&bad.node_key).is_some());
1218    }
1219
1220    /// Degenerate input: two DISTINCT nodes sharing one `stable_id` in a single `Full`, one with a
1221    /// valid signature and one unsigned, under an active lock. Each node is judged by its OWN verdict
1222    /// (the per-node `admits` vector), so the unsigned node is never admitted on the strength of its
1223    /// signed twin. The single-verify `Full` refactor keeps this per-node semantics (a stable_id-set
1224    /// alone would have admitted whichever node was upserted last). Malformed control input; asserted
1225    /// only to lock the verdict-per-node behavior against regression.
1226    #[tokio::test]
1227    async fn tka_full_duplicate_stable_id_judges_each_node_on_its_own_signature() {
1228        let (authority, sig) = authority_and_valid_sig();
1229        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1230
1231        // Both carry stable_id "dup"; the signed one authorizes NODE_KEY_BYTES, the other is unsigned
1232        // and uses a different node key. Order them unsigned-last so a last-writer-wins stable_id set
1233        // would (wrongly) leave the unsigned node's key in the db.
1234        let signed = peer_node("dup", NODE_KEY_BYTES, sig);
1235        let unsigned = peer_node("dup", [8u8; 32], vec![]);
1236        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1237            signed.clone(),
1238            unsigned.clone(),
1239        ]));
1240
1241        // The unsigned node's own verdict failed, so its key must NOT be present, regardless of the
1242        // shared stable_id. (The signed twin retained the stable_id; the db holds the signed key.)
1243        assert!(
1244            tracker.peer_db.get(&unsigned.node_key).is_none(),
1245            "a node whose own signature fails must not be admitted via a stable_id twin"
1246        );
1247        assert!(tracker.peer_db.get(&signed.node_key).is_some());
1248    }
1249
1250    /// The empty-trusted-key-state brick-guard: an authority with no keys must NOT drop the whole
1251    /// netmap (a `ts_tka` invariant violation / replayer edge). A verified chain always carries ≥1
1252    /// key, so this never weakens a genuine lock — it only prevents a black-hole. Uses ≥2 peers
1253    /// (one signed, one unsigned) to prove it admits **all**, not accidentally just one.
1254    #[tokio::test]
1255    async fn tka_empty_keyset_authority_admits_all() {
1256        use ts_tka::{AumHash, Authority, State};
1257        let empty_auth = Authority::from_state(AumHash([0u8; 32]), State { keys: Vec::new() });
1258        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(empty_auth));
1259        let signed = peer_node("signed", [7u8; 32], vec![0xde, 0xad]);
1260        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1261        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1262            signed.clone(),
1263            unsigned.clone(),
1264        ]));
1265        assert_eq!(
1266            tracker.peer_db.peers().len(),
1267            2,
1268            "an empty-keyset authority must admit ALL peers (brick-guard), not enforce"
1269        );
1270    }
1271
1272    /// Signature-replay / `NodeKeyMismatch`: a structurally-valid signature that authorizes
1273    /// `NODE_KEY_BYTES` must NOT admit a DIFFERENT node key carrying that same signature blob. This is
1274    /// the highest-value bypass — if the sig↔node-key binding in `verify_signature` were dropped, this
1275    /// is the only test that would catch it (the other "bad" peers only flip a byte ⇒ `BadSignature`).
1276    #[tokio::test]
1277    async fn tka_active_rejects_valid_sig_for_wrong_node_key() {
1278        let (authority, sig) = authority_and_valid_sig();
1279        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1280
1281        // The signature authorizes NODE_KEY_BYTES; attach it to an imposter with a different key.
1282        let imposter = peer_node("imposter", [0x55u8; 32], sig);
1283        assert!(
1284            !tracker.tka_admits(&imposter),
1285            "a signature bound to one node key must not authorize a different node key"
1286        );
1287        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![imposter.clone()]));
1288        assert!(tracker.peer_db.get(&imposter.node_key).is_none());
1289    }
1290
1291    /// `UntrustedKey`: a signature produced by a well-formed Ed25519 key that is NOT in the
1292    /// authority's trusted-key state must be rejected — distinct from a tampered-byte `BadSignature`.
1293    #[tokio::test]
1294    async fn tka_active_rejects_sig_from_untrusted_key() {
1295        use ed25519_dalek::{Signer, SigningKey};
1296        let (authority, _sig) = authority_and_valid_sig();
1297        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1298
1299        // Sign a valid CBOR with a DIFFERENT key (not the one the authority trusts). The key_id in
1300        // the signature names this untrusted key, so `get_key` misses ⇒ UntrustedKey.
1301        let rogue = SigningKey::from_bytes(&[99u8; 32]);
1302        let rogue_pub = rogue.verifying_key().to_bytes().to_vec();
1303        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, None);
1304        let sig_hash = ts_tka::aum_hash(&preimage).0;
1305        let signature = rogue.sign(&sig_hash).to_bytes().to_vec();
1306        let rogue_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, Some(&signature));
1307
1308        let peer = peer_node("rogue-signed", NODE_KEY_BYTES, rogue_cbor);
1309        assert!(
1310            !tracker.tka_admits(&peer),
1311            "a signature from a key outside the trusted set must be rejected"
1312        );
1313        // Drive the real upsert path too (match the sibling replay test's depth): an untrusted-key
1314        // signature must keep the peer out of the db, not merely fail the verdict in isolation.
1315        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1316        assert!(tracker.peer_db.get(&peer.node_key).is_none());
1317    }
1318
1319    /// Bus-enable analogue for `Delta`: enforcement engaged via the watch cell must also gate a
1320    /// `Delta { upsert }` (not only `Full`). Closes the "authority arrived over the transport AND the
1321    /// next update is a Delta" combination.
1322    #[tokio::test]
1323    async fn tka_watch_enable_enforces_delta_upsert() {
1324        let (authority, sig) = authority_and_valid_sig();
1325        let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1326        tka_tx.send_replace(Some(Arc::new(authority)));
1327
1328        let good = peer_node("good", NODE_KEY_BYTES, sig);
1329        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1330        tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1331            remove: vec![],
1332            upsert: vec![good.clone(), unsigned.clone()],
1333        });
1334        assert!(tracker.peer_db.get(&good.node_key).is_some());
1335        assert!(
1336            tracker.peer_db.get(&unsigned.node_key).is_none(),
1337            "delta upsert under an active lock must drop the unsigned peer"
1338        );
1339    }
1340
1341    /// A `Delta` re-upsert of an ALREADY-ADMITTED peer whose signature is now invalid must EVICT the
1342    /// stale entry (revocation-via-delta), not leave it admitted. Go re-filters the whole netmap each
1343    /// response, so a now-unsigned peer would not survive there either.
1344    #[tokio::test]
1345    async fn tka_delta_reupsert_with_invalid_sig_evicts_existing() {
1346        let (authority, sig) = authority_and_valid_sig();
1347        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1348
1349        // Admit the signed peer.
1350        let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1351        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1352        assert!(tracker.peer_db.get(&good.node_key).is_some());
1353
1354        // Re-upsert the SAME stable_id (now with no signature) via a delta ⇒ evicted, not retained.
1355        let revoked = peer_node("good", NODE_KEY_BYTES, vec![]);
1356        tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1357            remove: vec![],
1358            upsert: vec![revoked],
1359        });
1360        assert!(
1361            tracker.peer_db.get(&good.node_key).is_none(),
1362            "a delta re-upsert that fails the lock must evict the previously-admitted peer"
1363        );
1364    }
1365
1366    #[tokio::test]
1367    async fn tka_full_resync_revocation_behavior() {
1368        // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1369        // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1370        // (previously-admitted) entry because membership was decided purely by stable_id.
1371        //
1372        // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1373        // stable_ids, so a peer whose re-included signature no longer verifies under the active
1374        // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1375        // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1376        // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1377        let (authority, sig) = authority_and_valid_sig();
1378        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1379
1380        // 1) Admit the peer with a valid signature via a real `Full`.
1381        let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1382        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1383        assert_eq!(tracker.peer_db.peers().len(), 1);
1384        assert!(tracker.peer_db.get(&good.node_key).is_some());
1385
1386        // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1387        let mut bad_sig = sig;
1388        let last = bad_sig.len() - 1;
1389        bad_sig[last] ^= 0xff;
1390        let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1391        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1392
1393        // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1394        assert_eq!(tracker.peer_db.peers().len(), 0);
1395        assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1396    }
1397
1398    #[tokio::test]
1399    async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1400        // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1401        // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1402        // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1403        // branch this wave.
1404        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1405
1406        let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1407        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1408        assert_eq!(tracker.peer_db.peers().len(), 1);
1409
1410        // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1411        let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1412        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1413        assert_eq!(tracker.peer_db.peers().len(), 1);
1414        assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1415    }
1416
1417    /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1418    /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1419    /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1420    /// never re-handshake after it moves.
1421    #[tokio::test]
1422    async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1423        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1424
1425        // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1426        let peer = peer_node("mover", [1u8; 32], vec![]);
1427        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1428        let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1429        assert!(before.underlay_addresses.is_empty());
1430        assert!(before.derp_region.is_none());
1431
1432        // Patch in fresh reachability (the idle-peer-reconnect case).
1433        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1434        let patch = ts_control::PeerChange {
1435            id: 1,
1436            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1437            cap: None,
1438            cap_map: None,
1439            underlay_addresses: Some(vec![new_ep]),
1440            node_key: None,
1441            key_signature: None,
1442            disco_key: None,
1443            node_key_expiry: None,
1444            online: None,
1445            last_seen: None,
1446        };
1447        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1448
1449        assert_eq!(upserts.len(), 1);
1450        assert_eq!(deletions.len(), 0);
1451        // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1452        assert_eq!(tracker.peer_db.peers().len(), 1);
1453        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1454        assert_eq!(after.underlay_addresses, vec![new_ep]);
1455        assert_eq!(
1456            after.derp_region,
1457            Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1458        );
1459        assert_eq!(after.node_key, peer.node_key);
1460    }
1461
1462    /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1463    /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1464    /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1465    /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1466    /// in by the delta kept stale (empty) reachability.
1467    #[tokio::test]
1468    async fn patch_applies_on_top_of_co_occurring_delta() {
1469        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1470
1471        // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1472        let peer = peer_node("mover", [1u8; 32], vec![]);
1473        let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1474            upsert: vec![peer.clone()],
1475            remove: vec![],
1476        });
1477        assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1478
1479        // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1480        // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1481        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1482        let patch = ts_control::PeerChange {
1483            id: 1,
1484            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1485            cap: None,
1486            cap_map: None,
1487            underlay_addresses: Some(vec![new_ep]),
1488            node_key: None,
1489            key_signature: None,
1490            disco_key: None,
1491            node_key_expiry: None,
1492            online: None,
1493            last_seen: None,
1494        };
1495        let (patch_upserts, patch_deletions) =
1496            tracker.apply_peer_patches(std::slice::from_ref(&patch));
1497
1498        assert_eq!(
1499            patch_upserts.len(),
1500            1,
1501            "patch re-upserts the just-added peer"
1502        );
1503        assert_eq!(patch_deletions.len(), 0);
1504        // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1505        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1506        assert_eq!(after.underlay_addresses, vec![new_ep]);
1507        assert_eq!(
1508            after.derp_region,
1509            Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1510        );
1511    }
1512
1513    /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1514    /// never creates a node). No upsert, no deletion, peer set unchanged.
1515    #[tokio::test]
1516    async fn patch_for_unknown_node_is_ignored() {
1517        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1518        let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1519        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1520
1521        let patch = ts_control::PeerChange {
1522            id: 999, // not in the netmap
1523            derp_region: None,
1524            cap: None,
1525            cap_map: None,
1526            underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1527            node_key: None,
1528            key_signature: None,
1529            disco_key: None,
1530            node_key_expiry: None,
1531            online: None,
1532            last_seen: None,
1533        };
1534        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1535
1536        assert_eq!(upserts.len(), 0);
1537        assert_eq!(deletions.len(), 0);
1538        assert_eq!(tracker.peer_db.peers().len(), 1);
1539        assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1540    }
1541
1542    /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1543    /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1544    #[tokio::test]
1545    async fn patch_updates_node_key_expiry() {
1546        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1547        let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1548        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1549
1550        let expiry = "2027-01-01T00:00:00Z"
1551            .parse::<chrono::DateTime<chrono::Utc>>()
1552            .unwrap();
1553        let patch = ts_control::PeerChange {
1554            id: 1,
1555            derp_region: None,
1556            cap: None,
1557            cap_map: None,
1558            underlay_addresses: None,
1559            node_key: None,
1560            key_signature: None,
1561            disco_key: None,
1562            node_key_expiry: Some(expiry),
1563            online: None,
1564            last_seen: None,
1565        };
1566        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1567
1568        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1569        assert_eq!(after.node_key_expiry, Some(expiry));
1570    }
1571
1572    /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1573    #[tokio::test]
1574    async fn patch_updates_online() {
1575        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1576        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1577        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1578        assert_eq!(
1579            tracker
1580                .peer_db
1581                .get(&(1 as ts_control::NodeId))
1582                .unwrap()
1583                .1
1584                .online,
1585            None
1586        );
1587
1588        let mut patch = ts_control::PeerChange {
1589            id: 1,
1590            derp_region: None,
1591            cap: None,
1592            cap_map: None,
1593            underlay_addresses: None,
1594            node_key: None,
1595            key_signature: None,
1596            disco_key: None,
1597            node_key_expiry: None,
1598            online: Some(true),
1599            last_seen: None,
1600        };
1601        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1602        assert_eq!(
1603            tracker
1604                .peer_db
1605                .get(&(1 as ts_control::NodeId))
1606                .unwrap()
1607                .1
1608                .online,
1609            Some(true),
1610            "PeerChange.online=Some(true) marks the peer online"
1611        );
1612
1613        // A subsequent patch flips it offline.
1614        patch.online = Some(false);
1615        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1616        assert_eq!(
1617            tracker
1618                .peer_db
1619                .get(&(1 as ts_control::NodeId))
1620                .unwrap()
1621                .1
1622                .online,
1623            Some(false)
1624        );
1625    }
1626
1627    /// Channel C/D: the `online_change` map flips online directly; `peer_seen_change: false`
1628    /// ("the peer is gone") marks the peer offline. Both apply to a peer already in the netmap and
1629    /// ignore unknown ids.
1630    #[tokio::test]
1631    async fn liveness_change_maps_apply_online() {
1632        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1633        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1634        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1635
1636        // Channel C: online_change sets online=true.
1637        let mut online_change = std::collections::BTreeMap::new();
1638        online_change.insert(1 as ts_control::NodeId, true);
1639        online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1640        let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1641        assert!(changed);
1642        assert_eq!(
1643            tracker
1644                .peer_db
1645                .get(&(1 as ts_control::NodeId))
1646                .unwrap()
1647                .1
1648                .online,
1649            Some(true)
1650        );
1651
1652        // Channel D: peer_seen_change=false marks the peer offline (gone), node retained.
1653        let mut peer_seen_change = std::collections::BTreeMap::new();
1654        peer_seen_change.insert(1 as ts_control::NodeId, false);
1655        let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1656        assert!(changed);
1657        assert_eq!(
1658            tracker
1659                .peer_db
1660                .get(&(1 as ts_control::NodeId))
1661                .unwrap()
1662                .1
1663                .online,
1664            Some(false),
1665            "peer_seen_change=false marks offline (the node stays in the netmap)"
1666        );
1667        assert_eq!(
1668            tracker.peer_db.peers().len(),
1669            1,
1670            "the node is retained, not removed"
1671        );
1672
1673        // No-op when nothing matches / changes.
1674        assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1675    }
1676
1677    /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1678    /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1679    /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1680    /// otherwise be a trust-enforcement bypass via the patch path.
1681    #[tokio::test]
1682    async fn patch_key_rotation_failing_tka_evicts_peer() {
1683        let (authority, sig) = authority_and_valid_sig();
1684        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1685
1686        // Admit a correctly-signed peer (id == 1).
1687        let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1688        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1689        assert_eq!(tracker.peer_db.peers().len(), 1);
1690
1691        // Patch a new node key whose signature is garbage under the active authority.
1692        let patch = ts_control::PeerChange {
1693            id: 1,
1694            derp_region: None,
1695            cap: None,
1696            cap_map: None,
1697            underlay_addresses: None,
1698            node_key: Some([0x33u8; 32].into()),
1699            key_signature: Some(vec![0x00, 0x01, 0x02]),
1700            disco_key: None,
1701            node_key_expiry: None,
1702            online: None,
1703            last_seen: None,
1704        };
1705        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1706
1707        assert_eq!(upserts.len(), 0);
1708        assert_eq!(deletions.len(), 1);
1709        assert_eq!(tracker.peer_db.peers().len(), 0);
1710    }
1711
1712    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1713    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1714    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1715    /// proves the accumulate-then-join path the netmap handler builds.
1716    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1717        ts_control::UserProfile {
1718            id,
1719            login_name: login.to_string(),
1720            display_name: None,
1721        }
1722    }
1723
1724    #[tokio::test]
1725    async fn whois_resolves_user_from_accumulated_profiles() {
1726        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1727
1728        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1729        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1730        peer.user_id = 42;
1731        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1732        let addr = "100.64.0.1:0".parse().unwrap();
1733
1734        // No profile yet: the node resolves but its owner is unknown.
1735        let who = tracker.whois_opt(addr).expect("peer is known");
1736        assert_eq!(who.user, None);
1737
1738        // Profile for a DIFFERENT user must not match.
1739        tracker
1740            .user_profiles
1741            .insert(7, profile(7, "someone-else@example.com"));
1742        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1743
1744        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1745        // login resolves.
1746        tracker
1747            .user_profiles
1748            .insert(42, profile(42, "alice@example.com"));
1749        assert_eq!(
1750            tracker.whois_opt(addr).unwrap().user,
1751            Some("alice@example.com".to_string())
1752        );
1753    }
1754
1755    /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1756    #[test]
1757    fn user_profile_best_label_prefers_login() {
1758        assert_eq!(
1759            profile(1, "alice@example.com").best_label(),
1760            Some("alice@example.com".to_string())
1761        );
1762        let display_only = ts_control::UserProfile {
1763            id: 2,
1764            login_name: String::new(),
1765            display_name: Some("Bob".to_string()),
1766        };
1767        assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1768        let empty = ts_control::UserProfile {
1769            id: 3,
1770            login_name: String::new(),
1771            display_name: None,
1772        };
1773        assert_eq!(empty.best_label(), None);
1774    }
1775}