// ts_runtime/peer_tracker/mod.rs

1//! Peer delta update tracking.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    sync::Arc,
7};
8
9use kameo::{
10    actor::ActorRef,
11    message::{Context, Message},
12    reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
/// Actor that tracks peer delta updates and emits new states.
pub struct PeerTracker {
    /// Current peer database. The lookup handlers query it, and a clone of it is placed in every
    /// published [`PeerState`].
    peer_db: PeerDb,
    /// Gate for the lookup handlers: while this is `false` they park their request in
    /// [`pending_requests`](Self::pending_requests) instead of answering. Starts `false` in
    /// `on_start`; presumably set once the first peer state has been applied (the write site is
    /// not in this part of the file — confirm there).
    seen_state_update: bool,
    /// Requests parked while [`seen_state_update`](Self::seen_state_update) was `false`, each with
    /// the reply sender needed to answer it later.
    pending_requests: Vec<Pending>,
    /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
    /// changes ([`WatchNetmap`]).
    peer_watch: watch::Sender<Vec<StatusNode>>,
    /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
    /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
    /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
    /// (only new/changed profiles per response), so this map **accumulates** across updates rather
    /// than being replaced — a peer upserted in one response may reference a profile delivered in an
    /// earlier one.
    user_profiles: HashMap<UserId, UserProfile>,
    /// Tailnet-Lock (TKA) authority enforced at the peer-trust chokepoint, matching Go
    /// `tkaFilterNetmapLocked`. Read on demand from a [`watch`] cell the control runner owns: when it
    /// holds `Some` (a verified lock has been synced from control), enforcement is **active** — every
    /// upserted peer must present a `key_signature` this authority authorizes, or it is dropped
    /// (fail-closed), exactly as Go drops peers with a missing or failing signature. When it holds
    /// `None` (no lock, or the lock was disabled) enforcement is **inactive** and every peer is
    /// upserted, identical to pre-TKA behavior and to Go's `b.tka == nil` early return.
    ///
    /// A `watch::Receiver` (not the bus) is the transport on purpose: the authority is a single
    /// security-critical state cell, and `watch` is last-write-wins, never-dropped, and ordered by
    /// the control runner's own writes — so a disable (`None`) can never be reordered behind or
    /// silently dropped before a stale `Some` (which a best-effort broadcast bus could do, leaving a
    /// defunct lock enforcing forever). The control runner is the sole writer; we only ever read.
    ///
    /// The authority always passes through `VerifiedAumChain::verify` before the control runner
    /// publishes it, so enforcement only engages on a chain we have cryptographically verified.
    /// Connectivity now depends on `ts_tka` verifying genuinely-good signatures correctly (see
    /// SECURITY.md). Self is structurally never filtered here (the self node never enters `peer_db` —
    /// it is routed to the control runner's `self_node` cell), so a node cannot lock itself out of
    /// its own netmap.
    tka_authority: watch::Receiver<Option<Arc<ts_tka::Authority>>>,
    /// Runtime environment handle; this actor uses it to subscribe to control state updates and
    /// to publish [`PeerState`] snapshots.
    env: Env,
}
62
63impl PeerTracker {
64    fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
65        // Canonicalization (case + trailing dot) is handled inside the name index lookup.
66        self.peer_db.get(&name).map(|(_id, node)| node)
67    }
68
69    fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
70        self.peer_db.get(&ip).map(|(_id, node)| node)
71    }
72
73    /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
74    ///
75    /// Connectivity fields (`cur_addr`/`relay`) are left at their `from_node` defaults (`None`) here:
76    /// this is the live-watch/hot path and must stay magicsock-free and synchronous. The explicit
77    /// [`GetStatus`] snapshot enriches them ([`status_peers_with_ids`](Self::status_peers_with_ids)).
78    fn status_peers(&self) -> Vec<StatusNode> {
79        self.peer_db
80            .peers()
81            .values()
82            .map(StatusNode::from_node)
83            .collect()
84    }
85
86    /// Like [`status_peers`](Self::status_peers) but pairs each entry with its [`PeerId`], so the
87    /// caller can join per-peer connectivity (the direct manager's `best_addrs`, keyed by `PeerId`)
88    /// onto the `StatusNode` before returning it. Order is unspecified (a `HashMap` walk).
89    fn status_peers_with_ids(&self) -> Vec<(PeerId, StatusNode)> {
90        self.peer_db
91            .peers()
92            .iter()
93            .map(|(id, node)| (*id, StatusNode::from_node(node)))
94            .collect()
95    }
96
97    fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
98        let ip = crate::status::whois_addr(addr);
99        let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
100        // Join the node's owning user id against the accumulated UserProfiles table to resolve a
101        // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
102        // with no human owner, or a profile not yet delivered).
103        let user = self.resolve_user(node.user_id);
104        Some(crate::status::WhoIs::from_node_with_user(node, user))
105    }
106
107    /// Resolve a user id to its best display label from the accumulated profile table.
108    fn resolve_user(&self, user_id: UserId) -> Option<String> {
109        self.user_profiles
110            .get(&user_id)
111            .and_then(UserProfile::best_label)
112    }
113
114    /// Whether `node` may be admitted to the peer db under Tailnet Lock, matching Go
115    /// `tkaFilterNetmapLocked`'s per-peer verdict (drop unsigned / failed-signature peers).
116    ///
117    /// This consults the live [`tka_authority`](Self::tka_authority) cell on each call (one `borrow`,
118    /// held only for the duration of the verdict). For a `Full` resync — which checks every peer —
119    /// prefer [`tka_authority_snapshot`](Self::tka_authority_snapshot) +
120    /// [`tka_snapshot_admits`](Self::tka_snapshot_admits) to borrow once and verify each peer a single
121    /// time; this method is the convenience wrapper for the single-peer (`Delta`/patch) sites.
122    ///
123    /// Fail-closed and gated:
124    /// - No authority ⇒ no lock synced ⇒ always admit (Go's `b.tka == nil` early return; identical to
125    ///   pre-TKA behavior).
126    /// - **Empty trusted-key state** ⇒ always admit (logged at `error!` — see
127    ///   [`tka_snapshot_admits`](Self::tka_snapshot_admits) for the full rationale).
128    /// - Authority present + peer carries a `key_signature` the authority authorizes for the peer's
129    ///   node key ⇒ admit.
130    /// - Authority present + signature missing or unauthorized/invalid ⇒ **drop** (Go drops peers
131    ///   with a missing signature or failed `NodeKeyAuthorized` under tailnet lock).
132    fn tka_admits(&self, node: &Node) -> bool {
133        // Single-peer sites (`Delta`/patch) only need the admit bool; the rotation details are used
134        // exclusively by the cross-peer `Full` filter (rotation obsolescence is whole-netmap).
135        Self::tka_snapshot_admits(self.tka_authority.borrow().as_deref(), node).admitted
136    }
137
138    /// Borrow the current TKA authority once (cloning the cheap `Arc`) for a batch verdict. Returns
139    /// `None` when no lock is synced (admit-all). Used by the `Full` path so a netmap of N peers
140    /// reads the cell once and runs at most one signature verify per peer (not two).
141    fn tka_authority_snapshot(&self) -> Option<Arc<ts_tka::Authority>> {
142        self.tka_authority.borrow().clone()
143    }
144
145    /// The per-peer Tailnet-Lock verdict against an already-borrowed `authority` snapshot. Factored
146    /// out so both the single-peer [`tka_admits`](Self::tka_admits) and the `Full` batch path share
147    /// one verdict implementation (no divergence) while the batch path verifies each peer exactly
148    /// once.
149    ///
150    /// Returns whether the peer is admitted AND, for an admitted peer signed by a rotation chain, the
151    /// [`RotationDetails`](ts_tka::RotationDetails) of that chain — so the `Full` path can run the
152    /// cross-peer rotation filter (Go's `rotationTracker`) without a second verify per peer. A peer
153    /// that is dropped, unsigned, or signed by a non-rotation chain carries `rotation == None`.
154    ///
155    /// Never logs key/signature bytes — only the `stable_id` and the `TkaError` Display (static
156    /// descriptors). One documented parity gap remains vs Go (under-enforcement, in PARITY_ROADMAP):
157    /// no `UnsignedPeerAPIOnly` exemption (our node model lacks the field).
158    fn tka_snapshot_admits(authority: Option<&ts_tka::Authority>, node: &Node) -> TkaVerdict {
159        let Some(auth) = authority else {
160            return TkaVerdict::admit();
161        };
162
163        // Brick-guard: an authority with no trusted keys would drop every peer. A verified chain is
164        // structurally guaranteed ≥1 key (genesis rejects an empty key set, and the last key cannot
165        // be removed), so reaching here means a `ts_tka` invariant was violated — admit rather than
166        // black-hole the whole netmap, and log at `error!` because it signals a real bug, not an
167        // expected runtime input. This is OUR fail-safe, not a Go behavior. NOTE: it only catches the
168        // empty-keyset shape; a non-empty authority that authorizes none of the offered peers still
169        // (correctly) drops them — that is what a lock that revoked everyone means. The
170        // "authorized-zero-peers" isolation case is surfaced separately by the caller.
171        if auth.state().keys.is_empty() {
172            tracing::error!(
173                "TKA: authority has an empty trusted-key set (verified chains never do — likely a \
174                 ts_tka bug); not enforcing (admitting all) to avoid isolating the node"
175            );
176            return TkaVerdict::admit();
177        }
178
179        if node.key_signature.is_empty() {
180            tracing::warn!(
181                stable_id = ?node.stable_id,
182                "TKA: dropping unsigned peer under tailnet lock"
183            );
184            return TkaVerdict::drop();
185        }
186
187        match auth.node_key_authorized_with_details(&node.node_key.to_bytes(), &node.key_signature)
188        {
189            Ok(rotation) => {
190                tracing::debug!(stable_id = ?node.stable_id, "TKA: peer node-key authorized");
191                TkaVerdict {
192                    admitted: true,
193                    rotation,
194                }
195            }
196            Err(e) => {
197                tracing::warn!(
198                    stable_id = ?node.stable_id,
199                    error = %e,
200                    "TKA: dropping peer with unauthorized node key"
201                );
202                TkaVerdict::drop()
203            }
204        }
205    }
206}
207
208/// The outcome of a per-peer Tailnet-Lock check: whether the peer is admitted, plus (for an admitted
209/// peer signed by a rotation chain) the chain's [`RotationDetails`](ts_tka::RotationDetails) so the
210/// `Full` path can run the cross-peer rotation filter from the SAME verify pass (no second verify).
211struct TkaVerdict {
212    admitted: bool,
213    rotation: Option<ts_tka::RotationDetails>,
214}
215
216impl TkaVerdict {
217    /// Admitted, no rotation details (no lock / brick-guard / non-rotation signature).
218    fn admit() -> Self {
219        Self {
220            admitted: true,
221            rotation: None,
222        }
223    }
224    /// Dropped.
225    fn drop() -> Self {
226        Self {
227            admitted: false,
228            rotation: None,
229        }
230    }
231}
232
/// Cross-peer rotation-obsolescence tracker, mirroring Go `ipnlocal.rotationTracker`. Fed the
/// [`RotationDetails`](ts_tka::RotationDetails) of every admitted, rotation-signed peer in a `Full`
/// netmap; [`obsolete_keys`](Self::obsolete_keys) then returns the node keys to drop on top of the
/// per-peer verdict. Two rules (Go `tkaFilterNetmapLocked` + `rotationTracker.obsoleteKeys`):
///
/// 1. Every prior node key named in any rotation chain is obsolete (a newer chain rotated it away).
/// 2. Among `Direct`-rooted chains sharing one wrapping pubkey (a clone signal), only the
///    longest-chain peer survives; if the two longest are tied, ALL in that group are dropped (we
///    cannot tell which is the latest, so reject for safety). `Credential`-rooted chains are exempt
///    from rule 2 — several nodes can legitimately join under one reusable auth key (same wrapping
///    pubkey), so sharing it is not a clone signal there. (Rule 1 still applies to them.)
///
/// Node keys are tracked as raw `Vec<u8>` (the verified 32-byte node-public bytes).
#[derive(Default)]
struct RotationTracker {
    /// Node keys known to be obsolete so far. `add` puts every chain's prior keys here (rule 1);
    /// `obsolete_keys` adds the rule-2 losers and returns the set.
    obsolete: HashSet<Vec<u8>>,
    /// Rule-2 candidates grouped by the chain's `initial_wrapping_pubkey`. Only chains whose
    /// `initial_sig_kind` is `Direct` are recorded here.
    by_wrapping_key: HashMap<Vec<u8>, Vec<SigRotation>>,
}

/// One admitted peer's rotation entry within a wrapping-key group.
struct SigRotation {
    /// The admitted peer's node key, as passed to `RotationTracker::add`.
    node_key: Vec<u8>,
    /// Length of the chain's `prev_node_keys`; a larger value is treated as a newer chain.
    num_prev_keys: usize,
}
257
258impl RotationTracker {
259    /// Record an admitted peer `node_key` and its rotation `details` (Go `addRotationDetails`).
260    fn add(&mut self, node_key: Vec<u8>, details: &ts_tka::RotationDetails) {
261        // Rule 1: every prior key is obsolete — applied for ALL chains (incl. credential-rooted),
262        // matching Go's ungated `obsolete.AddSlice(d.PrevNodeKeys)`.
263        self.obsolete.extend(details.prev_node_keys.iter().cloned());
264        // Rule 2 (clone-uniqueness) is gated to Direct-rooted chains only.
265        if details.initial_sig_kind != ts_tka::SigKind::Direct {
266            return;
267        }
268        self.by_wrapping_key
269            .entry(details.initial_wrapping_pubkey.clone())
270            .or_default()
271            .push(SigRotation {
272                node_key,
273                num_prev_keys: details.prev_node_keys.len(),
274            });
275    }
276
277    /// Compute the full obsolete node-key set (Go `rotationTracker.obsoleteKeys`). Processes each
278    /// wrapping-key group, mutating the shared `obsolete` set as it goes (so a key obsoleted by one
279    /// group is seen as obsolete by later groups via the `retain` below — Go's
280    /// `slices.DeleteFunc(... Contains)`). Group iteration order (a `HashMap` drain) is
281    /// nondeterministic, but the result is order-INDEPENDENT: this only ever *inserts* into
282    /// `obsolete` (never removes), and rule 1 already obsoleted every prior key before this loop, so
283    /// the final set is a union that does not depend on which group runs first (as in Go).
284    fn obsolete_keys(mut self) -> HashSet<Vec<u8>> {
285        // Drain only the group map so the loop can mutate `self.obsolete` without aliasing it; the
286        // shared `obsolete` set itself is NOT drained, preserving the cross-group visibility above.
287        let groups: Vec<Vec<SigRotation>> = self.by_wrapping_key.drain().map(|(_k, v)| v).collect();
288        for mut group in groups {
289            // Drop entries already obsoleted (rotated away) by another chain.
290            group.retain(|rd| !self.obsolete.contains(&rd.node_key));
291            if group.is_empty() {
292                continue;
293            }
294            // Longest chain (most prior keys) is the newest ⇒ the survivor; sort decreasing.
295            // `sort_by_key` is stable (like Go's `SortStableFunc`); `Reverse` gives descending order.
296            group.sort_by_key(|rd| core::cmp::Reverse(rd.num_prev_keys));
297            if group.len() >= 2 && group[0].num_prev_keys == group[1].num_prev_keys {
298                // Tie for longest ⇒ cannot disambiguate the latest ⇒ drop the WHOLE group.
299                tracing::warn!(
300                    "TKA: multiple peers share a wrapping key with equal rotation depth; dropping all (cannot determine the latest)"
301                );
302                for rd in &group {
303                    self.obsolete.insert(rd.node_key.clone());
304                }
305            } else {
306                // Only the longest-chain peer survives; the rest are obsolete.
307                for rd in &group[1..] {
308                    self.obsolete.insert(rd.node_key.clone());
309                }
310            }
311        }
312        self.obsolete
313    }
314}
315
316impl kameo::Actor for PeerTracker {
317    /// `(env, tka_authority)`: the bus/keys env, plus the read end of the control runner's TKA
318    /// enforcement-authority cell (Go `tkaFilterNetmapLocked`). The control runner is the sole
319    /// writer; it publishes the verified `Authority` after a successful `/machine/tka/sync` and
320    /// `None` when the lock is disabled. A `watch` cell (not a bus message) so the latest value is
321    /// always readable on demand, never dropped, and never reordered (see [`tka_authority`]).
322    type Args = (Env, watch::Receiver<Option<Arc<ts_tka::Authority>>>);
323    type Error = Error;
324
325    async fn on_start(
326        (env, tka_authority): Self::Args,
327        slf: ActorRef<Self>,
328    ) -> Result<Self, Self::Error> {
329        env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
330
331        let (peer_watch, _) = watch::channel(Vec::new());
332
333        Ok(Self {
334            peer_db: PeerDb::default(),
335            pending_requests: Default::default(),
336            seen_state_update: false,
337            peer_watch,
338            user_profiles: HashMap::new(),
339            // The cell starts `None` (no lock synced ⇒ enforcement inactive, admit all, matching
340            // Go's `b.tka == nil`); the control runner flips it to `Some` on the first sync.
341            tka_authority,
342            env,
343        })
344    }
345}
346
/// A request that arrived before any peer state had been seen, parked together with the reply
/// sender needed to answer it later.
enum Pending {
    /// Parked [`PeerByName`] lookup.
    PeerByName(PeerByName, ReplySender<Option<Node>>),
    /// Parked [`PeerByAcceptedRoute`] lookup.
    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
    /// Parked [`PeerByTailnetIp`] lookup.
    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
    /// Parked status request; it has no arguments, so only the reply sender is kept.
    Status(ReplySender<Vec<(PeerId, StatusNode)>>),
    /// Parked [`Whois`] query.
    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
}
354
355// For messages with arguments, a struct is generated with the args as fields. They aren't
356// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
357// docs are turned off everywhere.
358#[allow(missing_docs)]
359mod msg_impl {
360    use std::net::IpAddr;
361
362    use kameo::prelude::DelegatedReply;
363
364    use super::*;
365
366    #[kameo::messages]
367    impl PeerTracker {
368        /// Lookup a peer by name.
369        ///
370        /// Waits until we've received at least one peer update from control.
371        #[message(ctx)]
372        pub async fn peer_by_name(
373            &mut self,
374            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
375            name: String,
376        ) -> DelegatedReply<Option<Node>> {
377            let (deleg, sender) = ctx.reply_sender();
378            let Some(sender) = sender else { return deleg };
379
380            if !self.seen_state_update {
381                tracing::debug!(query = name, "no peer state seen yet, queueing request");
382
383                self.pending_requests
384                    .push(Pending::PeerByName(PeerByName { name }, sender));
385
386                return deleg;
387            }
388
389            sender.send(self.peer_by_name_opt(&name).cloned());
390
391            deleg
392        }
393
394        /// Lookup all peers that accept packets addressed to the given IP.
395        ///
396        /// This includes the peer's tailnet address and any subnet routes it provides. Only
397        /// the peers with the most specific subnet route match that covers `ip` will be
398        /// returned.
399        ///
400        /// E.g., suppose:
401        ///
402        /// - We're querying for `10.1.2.3`
403        /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
404        /// - `PeerC` has an accepted route for `10.1.0.0/16`
405        ///
406        /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
407        /// prefix match.
408        #[message(ctx)]
409        pub fn peer_by_accepted_route(
410            &mut self,
411            ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
412            ip: IpAddr,
413        ) -> DelegatedReply<Vec<Node>> {
414            let (deleg, sender) = ctx.reply_sender();
415            let Some(sender) = sender else { return deleg };
416
417            if !self.seen_state_update {
418                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
419
420                self.pending_requests
421                    .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
422
423                return deleg;
424            }
425
426            sender.send(
427                self.peer_db
428                    .get_route(ip.into())
429                    .map(|(_id, node)| node.clone())
430                    .collect(),
431            );
432
433            deleg
434        }
435
436        /// Lookup the peer that has the given tailnet IP address.
437        #[message(ctx)]
438        pub fn peer_by_tailnet_ip(
439            &mut self,
440            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
441            ip: IpAddr,
442        ) -> DelegatedReply<Option<Node>> {
443            let (deleg, sender) = ctx.reply_sender();
444            let Some(sender) = sender else { return deleg };
445
446            if !self.seen_state_update {
447                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
448
449                self.pending_requests
450                    .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
451
452                return deleg;
453            }
454
455            sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
456
457            deleg
458        }
459
460        /// Build the peer entries of a [`Status`](crate::Status) snapshot, each paired with its
461        /// [`PeerId`] so [`Runtime::status`](crate::Runtime::status) can join per-peer connectivity
462        /// (`cur_addr`/`relay`) from the direct manager before returning. The self node is *not*
463        /// included here (it lives in the control runner); `Runtime::status` combines both and drops
464        /// the ids.
465        ///
466        /// Waits until we've received at least one peer update from control.
467        #[message(ctx)]
468        pub fn get_status(
469            &mut self,
470            ctx: &mut Context<Self, DelegatedReply<Vec<(PeerId, StatusNode)>>>,
471        ) -> DelegatedReply<Vec<(PeerId, StatusNode)>> {
472            let (deleg, sender) = ctx.reply_sender();
473            let Some(sender) = sender else { return deleg };
474
475            if !self.seen_state_update {
476                tracing::debug!("no peer state seen yet, queueing status request");
477                self.pending_requests.push(Pending::Status(sender));
478                return deleg;
479            }
480
481            sender.send(self.status_peers_with_ids());
482
483            deleg
484        }
485
486        /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
487        ///
488        /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
489        /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
490        /// is not included (it lives in the control runner). Returns empty before the first netmap —
491        /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
492        /// that need a populated list await `Running` first).
493        #[message]
494        pub fn all_peers(&self) -> Vec<Node> {
495            self.peer_db.peers().values().cloned().collect()
496        }
497
498        /// Resolve which node owns a tailnet source address.
499        ///
500        /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
501        /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
502        ///
503        /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
504        /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
505        /// [`status`](crate::status) module docs for the gap.
506        ///
507        /// Waits until we've received at least one peer update from control.
508        #[message(ctx)]
509        pub fn whois(
510            &mut self,
511            ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
512            addr: std::net::SocketAddr,
513        ) -> DelegatedReply<Option<crate::status::WhoIs>> {
514            let (deleg, sender) = ctx.reply_sender();
515            let Some(sender) = sender else { return deleg };
516
517            if !self.seen_state_update {
518                tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
519                self.pending_requests
520                    .push(Pending::WhoIs(Whois { addr }, sender));
521                return deleg;
522            }
523
524            sender.send(self.whois_opt(addr));
525
526            deleg
527        }
528
529        /// Subscribe to netmap peer-change events.
530        ///
531        /// Returns a [`watch::Receiver`] whose value is the current set of peer
532        /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
533        /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
534        ///
535        /// The receiver's initial value is the peer set at subscription time (empty before the
536        /// first netmap update). This is a peer-only view; combine with the self node from
537        /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
538        #[message(derive(Clone))]
539        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
540            self.peer_watch.subscribe()
541        }
542    }
543}
544
545pub use msg_impl::*;
546
/// Peer snapshot that [`PeerTracker`] publishes on the bus after applying a netmap update, and
/// again (with empty delta sets) when asked to republish.
#[derive(Debug, Clone)]
pub(crate) struct PeerState {
    /// Peer ids reported as deleted by the update that produced this snapshot; empty for a
    /// liveness-only update or a republish.
    #[allow(unused)]
    pub deletions: HashSet<PeerId>,
    /// Peer ids reported as upserted by the update that produced this snapshot; empty for a
    /// liveness-only update or a republish.
    #[allow(unused)]
    pub upserts: HashSet<PeerId>,
    /// Clone of the tracker's peer db taken at publish time.
    pub peers: Arc<PeerDb>,
}
555
impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
    type Reply = ();

    /// Fold one control [`StateUpdate`](ts_control::StateUpdate) into the tracker and publish the
    /// result.
    ///
    /// Steps, in order:
    /// 1. merge the update's user profiles into the accumulated table;
    /// 2. apply the online / last-seen delta maps;
    /// 3. if the update carries neither a peer set nor patches, publish only when step 2 changed
    ///    something, then return;
    /// 4. otherwise apply the peer set, then the patches, and reconcile the two upsert/deletion
    ///    sets;
    /// 5. call `service_pending_requests`, refresh the `watch` snapshot and publish a
    ///    [`PeerState`] on the bus.
    ///
    /// A failed publish is logged at `error!` and otherwise ignored.
    async fn handle(
        &mut self,
        msg: Arc<ts_control::StateUpdate>,
        _ctx: &mut Context<Self, Self::Reply>,
    ) {
        // Accumulate user profiles first — control sends them incrementally and a response may
        // carry profiles with no peer delta (or peers that reference a profile from an earlier
        // response), so this must happen before the no-peer-update early return below.
        for profile in &msg.user_profiles {
            self.user_profiles.insert(profile.id, profile.clone());
        }

        // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
        // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
        // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
        // *before* the no-peer-update early return — otherwise online status freezes at the last
        // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
        // Wall clock for a `PeerSeenChange: true` (Go uses `clock.Now()`). chrono is built without
        // its `clock` feature in this workspace, so derive it from `SystemTime` the same way the
        // control runner / ssh-policy paths do (unix secs → `DateTime::from_timestamp`).
        // If the system clock reads before the Unix epoch, or the conversion fails, `now` falls
        // back to the timestamp type's `Default` value rather than failing the update.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .ok()
            .and_then(|d| chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos()))
            .unwrap_or_default();
        let liveness_changed =
            self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change, now);

        if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
            // No peer set or patch this response. If a liveness delta still mutated the netmap,
            // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
            if liveness_changed {
                self.service_pending_requests();
                self.peer_watch.send_replace(self.status_peers());
                // Empty delta sets: no peer was added or removed, only liveness fields moved.
                if let Err(e) = self
                    .env
                    .publish(Arc::new(PeerState {
                        upserts: HashSet::default(),
                        deletions: HashSet::default(),
                        peers: Arc::new(self.peer_db.clone()),
                    }))
                    .await
                {
                    tracing::error!(error = %e, "publishing liveness-only peer state update");
                }
            }
            return;
        }

        // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
        // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
        // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
        // so the published `PeerState` reflects every node touched by both passes; a node both
        // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
        let (mut upserts, mut deletions) = msg
            .peer_update
            .as_ref()
            .map(|u| self.apply_peer_update(u))
            .unwrap_or_default();

        if !msg.peer_patches.is_empty() {
            let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
            // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
            // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
            for id in &patch_upserts {
                deletions.remove(id);
            }
            for id in &patch_deletions {
                upserts.remove(id);
            }
            upserts.extend(patch_upserts);
            deletions.extend(patch_deletions);
        }

        tracing::debug!(
            n_upsert = upserts.len(),
            n_delete = deletions.len(),
            peer_count = self.peer_db.peers().len(),
            "new peer state"
        );

        self.service_pending_requests();

        // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
        // value current even when there are no subscribers, so a late subscriber sees fresh state.
        self.peer_watch.send_replace(self.status_peers());

        if let Err(e) = self
            .env
            .publish(Arc::new(PeerState {
                upserts,
                deletions,
                peers: Arc::new(self.peer_db.clone()),
            }))
            .await
        {
            tracing::error!(error = %e, "publishing peer state update");
        }
    }
}
659
660/// Ask the peer tracker to re-broadcast its current peer snapshot on the bus, without any peer
661/// change. Sent after a runtime preference change so the route updater and source filter (both
662/// `Arc<PeerState>` subscribers) re-resolve against the new value immediately, rather than waiting
663/// for the next netmap update: `Device::set_exit_node` (new exit-node selector) and
664/// `Device::set_accept_routes` (new accept-routes flag) both send it.
665#[derive(Debug, Clone, Copy)]
666pub struct RepublishState;
667
668impl Message<RepublishState> for PeerTracker {
669    type Reply = ();
670
671    async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
672        // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
673        // delta. Subscribers recompute their routes/filters against the current peers and the
674        // (just-updated) runtime preferences (exit-node selector, accept-routes flag).
675        if let Err(e) = self
676            .env
677            .publish(Arc::new(PeerState {
678                upserts: HashSet::default(),
679                deletions: HashSet::default(),
680                peers: Arc::new(self.peer_db.clone()),
681            }))
682            .await
683        {
684            tracing::error!(error = %e, "re-publishing peer state after a runtime preference change");
685        }
686    }
687}
688
689impl PeerTracker {
690    /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
691    /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
692    ///
693    /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
694    /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
695    /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
696    ///
697    /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
698    fn apply_peer_update(
699        &mut self,
700        peer_update: &ts_control::PeerUpdate,
701    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
702        let mut upserts = HashSet::default();
703        let mut deletions = HashSet::default();
704
705        match peer_update {
706            ts_control::PeerUpdate::Full(new_nodes) => {
707                tracing::trace!("full peer update");
708
709                // Borrow the authority ONCE for the whole batch and verify each peer EXACTLY once
710                // (Go runs `tkaFilterNetmapLocked` once over the assembled netmap; an earlier draft
711                // verified every peer twice — once for `retained_ids`, once in the upsert loop —
712                // doubling the ed25519 cost on the hot resync path). The per-node verdict vector
713                // `admits` is computed once and drives both the `retain` (evict revoked peers, keyed
714                // by stable_id) and the upsert loop (skip rejected peers, by the node's OWN verdict).
715                // Keeping a per-node verdict (not just a stable_id set) means a node whose own
716                // signature fails is never admitted on the strength of a different node that happens
717                // to share its stable_id — matching the old per-node re-verify for that degenerate
718                // (malformed-control) input.
719                //
720                // Revocation evicts: a peer re-included with a now-invalid/missing signature under an
721                // active authority fails its verdict, so it is excluded from `retained_ids` and
722                // `retain` drops the stale (previously-admitted) entry. With no authority the snapshot
723                // is `None`, so every node passes — byte-for-byte the pre-TKA behavior (no regression).
724                let authority = self.tka_authority_snapshot();
725                let verdicts = new_nodes
726                    .iter()
727                    .map(|node| Self::tka_snapshot_admits(authority.as_deref(), node))
728                    .collect::<Vec<_>>();
729
730                // Cross-peer rotation filter (Go `rotationTracker`): from the SAME verify pass above,
731                // feed every admitted, rotation-signed peer's details to the tracker, then drop any
732                // peer presenting a node key a newer rotation has superseded (or a tied clone). This
733                // is whole-netmap by nature — one peer's chain obsoletes another's key — so it lives
734                // here, not in the per-peer verdict, matching Go's single pass over `nm.Peers`.
735                let mut rotation = RotationTracker::default();
736                for (node, verdict) in new_nodes.iter().zip(&verdicts) {
737                    if verdict.admitted
738                        && let Some(details) = &verdict.rotation
739                    {
740                        rotation.add(node.node_key.to_bytes().to_vec(), details);
741                    }
742                }
743                let obsolete = rotation.obsolete_keys();
744
745                // Final per-node keep verdict: admitted by the per-peer check AND not rotation-obsolete.
746                // Drives both the `retain` (evict) and the upsert loop, so a node whose own signature
747                // fails — or whose key was rotated away — is never admitted on the strength of a
748                // stable_id twin.
749                let keep = new_nodes
750                    .iter()
751                    .zip(&verdicts)
752                    .map(|(node, v)| {
753                        // `contains` takes `&[u8]` (HashSet<Vec<u8>> borrows as a slice) — no alloc.
754                        v.admitted && !obsolete.contains(&node.node_key.to_bytes()[..])
755                    })
756                    .collect::<Vec<bool>>();
757
758                // `retained_ids` is the set of stable_ids that survive (drives `retain` to evict the
759                // rest). It must agree with what the upsert loop below will leave in the db. Control
760                // should never send two distinct nodes with the same `stable_id` in one `Full`, but if
761                // it does, `peer_db.upsert` is last-writer-wins on `stable_id`, so the db ends holding
762                // the LAST kept node for that id. Build `retained_ids` from kept nodes only — a
763                // stable_id is retained iff at least one of its (possibly duplicate) nodes is kept, so
764                // the upsert loop's last-kept node lands and `retain` never evicts a just-upserted id.
765                let retained_ids = new_nodes
766                    .iter()
767                    .zip(keep.iter().copied())
768                    .filter(|(_, k)| *k)
769                    .map(|(node, _)| &node.stable_id)
770                    .collect::<HashSet<_>>();
771
772                // Isolation diagnostic: an ACTIVE lock that authorized none of the offered peers
773                // leaves this node with no peers — surface it loudly so a self-lockout (vs an attack)
774                // is diagnosable. `authority.is_some()` means a real keyed lock (the empty-keyset
775                // brick-guard admits-all, so it never reaches here with zero retained).
776                if authority.is_some() && !new_nodes.is_empty() && retained_ids.is_empty() {
777                    tracing::error!(
778                        offered = new_nodes.len(),
779                        "TKA: active lock authorized ZERO of the offered peers; node is isolated \
780                         (verify the lock state, or disable tailnet lock to recover)"
781                    );
782                }
783
784                self.peer_db.retain(|id, peer| {
785                    let retain = retained_ids.contains(&peer.stable_id);
786
787                    if !retain {
788                        deletions.insert(id);
789                    }
790
791                    retain
792                });
793
794                for (node, k) in new_nodes.iter().zip(keep.iter().copied()) {
795                    if !k {
796                        continue; // fail-CLOSED: rejected by tailnet lock or rotation-obsolete (above)
797                    }
798                    let peer_id = self.peer_db.upsert(node);
799                    upserts.insert(peer_id);
800                }
801            }
802
803            ts_control::PeerUpdate::Delta { remove, upsert } => {
804                tracing::trace!("delta peer update");
805
806                for peer in upsert {
807                    if !self.tka_admits(peer) {
808                        // fail-CLOSED: do not upsert a peer rejected by tailnet lock. If the peer is
809                        // ALREADY in the db (a delta re-upserting an existing peer whose signature is
810                        // now invalid — e.g. revoked between syncs), evict the stale entry rather than
811                        // leaving an unverified peer admitted; Go re-filters the whole netmap each map
812                        // response, so a now-unsigned peer would not survive there either.
813                        if let Some((id, _)) = self.peer_db.remove(&peer.stable_id) {
814                            tracing::warn!(
815                                stable_id = ?peer.stable_id,
816                                "TKA: delta re-upsert rejected; evicting now-unauthorized peer"
817                            );
818                            deletions.insert(id);
819                        }
820                        continue;
821                    }
822                    let id = self.peer_db.upsert(peer);
823
824                    upserts.insert(id);
825                }
826
827                for peer in remove {
828                    let Some((id, _node)) = self.peer_db.remove(peer) else {
829                        // A benign, expected race: the peer may already be gone (dropped in a prior
830                        // `Full`, or fail-closed by TKA — whose now-"unknown" ids commonly reappear in
831                        // a trailing `peers_removed`). Go treats an unknown removal as a no-op; log at
832                        // debug, not error, to avoid false-alarm noise on a healthy node (matches the
833                        // unknown-node handling in `apply_peer_patches`).
834                        tracing::debug!(
835                            control_node_id = peer,
836                            "removed peer was unknown; ignoring"
837                        );
838                        continue;
839                    };
840
841                    deletions.insert(id);
842                }
843            }
844        }
845
846        (upserts, deletions)
847    }
848
849    /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
850    /// deleted [`PeerId`]s.
851    ///
852    /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
853    /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
854    /// response that carries both has the peer set applied first (by the caller) and these patches
855    /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
856    /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
857    fn apply_peer_patches(
858        &mut self,
859        patches: &[ts_control::PeerChange],
860    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
861        let mut upserts = HashSet::default();
862        let mut deletions = HashSet::default();
863
864        tracing::trace!(n = patches.len(), "peer patch update");
865
866        for patch in patches {
867            // Clone the current node, apply the present fields, and re-upsert through the same path
868            // as a delta so indexes/routes stay consistent.
869            let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
870                tracing::debug!(
871                    control_node_id = patch.id,
872                    "peer patch for unknown node; ignoring"
873                );
874                continue;
875            };
876
877            let mut node = existing.clone();
878            if let Some(endpoints) = &patch.underlay_addresses {
879                node.underlay_addresses = endpoints.clone();
880            }
881            if let Some(derp) = patch.derp_region {
882                node.derp_region = Some(derp);
883            }
884            if let Some(cap) = patch.cap {
885                node.cap = cap;
886            }
887            if let Some(cap_map) = &patch.cap_map {
888                node.cap_map = cap_map.clone();
889            }
890            if let Some(disco_key) = patch.disco_key {
891                node.disco_key = Some(disco_key);
892            }
893            if let Some(expiry) = patch.node_key_expiry {
894                node.node_key_expiry = Some(expiry);
895            }
896            // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
897            // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
898            // a value (never patches back to unknown), so apply when present.
899            if let Some(online) = patch.online {
900                node.online = Some(online);
901            }
902            if let Some(last_seen) = patch.last_seen {
903                node.last_seen = Some(last_seen);
904            }
905            // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
906            // together so the trust gate below verifies the new signature against the new key, never
907            // a mismatched pair.
908            if let Some(node_key) = patch.node_key {
909                node.node_key = node_key;
910            }
911            if let Some(sig) = &patch.key_signature {
912                node.key_signature = sig.clone();
913            }
914
915            // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
916            // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
917            // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
918            // evict it rather than keep the stale (now-unverified) entry.
919            if !self.tka_admits(&node) {
920                if let Some((id, _)) = self.peer_db.remove(&patch.id) {
921                    tracing::warn!(
922                        control_node_id = patch.id,
923                        "peer patch rejected by tailnet lock; evicting peer"
924                    );
925                    deletions.insert(id);
926                }
927                continue;
928            }
929
930            let id = self.peer_db.upsert(&node);
931            upserts.insert(id);
932        }
933
934        (upserts, deletions)
935    }
936
937    /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
938    /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
939    /// actually mutated (so the caller knows whether to re-publish).
940    ///
941    /// Mirrors Go `controlclient/map.go:updatePeersStateFromResponse` (the two channels are
942    /// semantically DISTINCT and must not be conflated):
943    /// - `OnlineChange` (channel C) is the sole driver of a peer's `online` flag (`mut.Online = v`).
944    /// - `PeerSeenChange` (channel D) is the sole driver of `last_seen`: `true ⇒ LastSeen = now`,
945    ///   `false ⇒ LastSeen = nil` (cleared). It NEVER touches `online` — "not seen recently" is not
946    ///   the same as "offline", which only `OnlineChange` asserts.
947    ///
948    /// Each entry is keyed by control node id and applies to a peer already in the netmap; an unknown
949    /// node id is ignored (these maps never create a node). `now` is the wall-clock timestamp for a
950    /// `PeerSeenChange: true` (Go uses `clock.Now()`); the caller passes it so this stays a pure
951    /// function of its inputs. Returns `true` if any node was actually mutated.
952    fn apply_liveness_changes(
953        &mut self,
954        online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
955        peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
956        now: chrono::DateTime<chrono::Utc>,
957    ) -> bool {
958        let mut changed = false;
959
960        // Channel C — direct online flips (the only writer of `online`).
961        for (&node_id, &online) in online_change {
962            if let Some((_pid, existing)) = self.peer_db.get(&node_id)
963                && existing.online != Some(online)
964            {
965                let mut node = existing.clone();
966                node.online = Some(online);
967                self.peer_db.upsert(&node);
968                changed = true;
969            }
970        }
971
972        // Channel D — peer-seen flips (the only writer of `last_seen`; never touches `online`).
973        // `true` ⇒ last-seen is now; `false` ⇒ last-seen cleared (Go map.go:820-830).
974        for (&node_id, &seen) in peer_seen_change {
975            let new_last_seen = if seen { Some(now) } else { None };
976            if let Some((_pid, existing)) = self.peer_db.get(&node_id)
977                && existing.last_seen != new_last_seen
978            {
979                let mut node = existing.clone();
980                node.last_seen = new_last_seen;
981                self.peer_db.upsert(&node);
982                changed = true;
983            }
984        }
985
986        changed
987    }
988
/// Test-only constructor: assemble a [`PeerTracker`] with a chosen initial TKA authority,
/// bypassing the actor `on_start` path.
///
/// The tracker is returned together with the **`watch::Sender`** of its enforcement-authority
/// cell, so a test can drive the same enable/disable transitions the control runner drives at
/// runtime (`tx.send_replace(Some(..))` ⇒ enforce, `tx.send_replace(None)` ⇒ clear). Starting
/// with `Some` exercises the fail-closed chokepoint ([`tka_admits`](Self::tka_admits)); starting
/// with `None` is the no-lock admit-all path. Keep the returned sender alive for as long as the
/// tracker should be able to read updated values.
#[cfg(test)]
fn for_test(
    env: Env,
    tka_authority: Option<ts_tka::Authority>,
) -> (Self, watch::Sender<Option<Arc<ts_tka::Authority>>>) {
    let (authority_tx, authority_rx) = watch::channel(tka_authority.map(Arc::new));
    // Only the sending half of the peer-snapshot channel is stored on the tracker.
    let (snapshot_tx, _) = watch::channel(Vec::new());

    (
        Self {
            peer_db: PeerDb::default(),
            seen_state_update: false,
            pending_requests: Vec::new(),
            peer_watch: snapshot_tx,
            user_profiles: HashMap::new(),
            tka_authority: authority_rx,
            env,
        },
        authority_tx,
    )
}
1014
1015    fn service_pending_requests(&mut self) {
1016        if self.seen_state_update {
1017            return;
1018        }
1019
1020        self.seen_state_update = true;
1021
1022        if !self.pending_requests.is_empty() {
1023            tracing::debug!(
1024                n_pending = self.pending_requests.len(),
1025                "state update received, servicing pending requests"
1026            );
1027        }
1028
1029        for req in core::mem::take(&mut self.pending_requests) {
1030            match req {
1031                Pending::PeerByName(PeerByName { name }, reply) => {
1032                    reply.send(self.peer_by_name_opt(&name).cloned());
1033                }
1034                Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
1035                    reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
1036                }
1037                Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
1038                    reply.send(
1039                        self.peer_db
1040                            .get_route(ip.into())
1041                            .map(|(_id, node)| node.clone())
1042                            .collect(),
1043                    );
1044                }
1045                Pending::Status(reply) => {
1046                    reply.send(self.status_peers_with_ids());
1047                }
1048                Pending::WhoIs(Whois { addr }, reply) => {
1049                    reply.send(self.whois_opt(addr));
1050                }
1051            }
1052        }
1053    }
1054}
1055
1056#[cfg(test)]
1057mod tka_tests {
1058    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
1059    //!
1060    //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
1061    //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
1062    //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
1063    //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
1064    //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
1065    //! private `ts_tka` API used).
1066
1067    use ed25519_dalek::{Signer, SigningKey};
1068    use ts_control::{Node, StableNodeId, TailnetAddress};
1069    use ts_tka::{
1070        AumHash, Authority, Key, KeyKind, State,
1071        cbor::{self, Value},
1072    };
1073
1074    use super::*;
1075
/// Wire value of `SigKind::Direct` (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
const SIG_KIND_DIRECT: u64 = 1;

/// The 32-byte node key shared by the signed-peer fixtures (every byte is `7`).
const NODE_KEY_BYTES: [u8; 32] = [7; 32];
1081
1082    /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
1083    /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
1084    fn test_env() -> Env {
1085        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
1086        Env::new(
1087            ts_keys::NodeState::generate(),
1088            shutdown_rx,
1089            crate::env::ForwarderConfig {
1090                accept_routes: false,
1091                accept_dns: true,
1092                exit_node: None,
1093                forward_routes: Vec::new(),
1094                forward_tcp_ports: Vec::new(),
1095                forward_udp_ports: Vec::new(),
1096                forward_all_ports: false,
1097                forward_exit_egress: false,
1098                block_incoming: false,
1099                exit_proxy: None,
1100                peerapi_port: None,
1101                taildrop_dir: None,
1102                enable_ipv6: false,
1103                persistent_keepalive_interval: None,
1104                ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1105            },
1106        )
1107    }
1108
1109    /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
1110    fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
1111        Node {
1112            id: 1,
1113            stable_id: StableNodeId(stable_id.to_string()),
1114            hostname: stable_id.to_string(),
1115            user_id: 0,
1116            tailnet: Some("ts.net".to_string()),
1117            tags: Vec::new(),
1118            tailnet_address: TailnetAddress {
1119                ipv4: "100.64.0.1/32".parse().unwrap(),
1120                ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
1121            },
1122            node_key: node_key.into(),
1123            node_key_expiry: None,
1124            online: None,
1125            last_seen: None,
1126            key_signature,
1127            machine_key: None,
1128            disco_key: None,
1129            accepted_routes: Vec::new(),
1130            underlay_addresses: Vec::new(),
1131            derp_region: None,
1132            cap: Default::default(),
1133            cap_map: Default::default(),
1134            peerapi_port: None,
1135            peerapi_dns_proxy: false,
1136            is_wireguard_only: false,
1137            exit_node_dns_resolvers: Vec::new(),
1138            peer_relay: false,
1139            service_vips: Default::default(),
1140        }
1141    }
1142
1143    /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
1144    /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
1145    /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
1146    /// signing-digest preimage (the `SigHash` form).
1147    fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
1148        let mut pairs = alloc_pairs(node_key, key_id);
1149        if let Some(sig) = signature {
1150            pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
1151        }
1152        cbor::int_map(pairs).to_vec()
1153    }
1154
1155    fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
1156        vec![
1157            (1, Some(Value::Uint(SIG_KIND_DIRECT))),
1158            (2, Some(Value::Bytes(node_key.to_vec()))),
1159            (3, Some(Value::Bytes(key_id.to_vec()))),
1160        ]
1161    }
1162
1163    /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
1164    /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
1165    fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
1166        // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
1167        let signing = SigningKey::from_bytes(&[42u8; 32]);
1168        let trusted_pub = signing.verifying_key().to_bytes().to_vec();
1169
1170        let authority = Authority::from_state(
1171            AumHash([0; 32]),
1172            State {
1173                keys: vec![Key {
1174                    kind: KeyKind::Ed25519,
1175                    votes: 1,
1176                    public: trusted_pub.clone(),
1177                }],
1178            },
1179        );
1180
1181        // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
1182        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
1183        let sig_hash = ts_tka::aum_hash(&preimage).0;
1184        let signature = signing.sign(&sig_hash).to_bytes().to_vec();
1185
1186        let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
1187        // Sanity: the authority accepts the signature we just built (same path the gate uses).
1188        assert!(
1189            authority
1190                .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
1191                .is_ok()
1192        );
1193
1194        (authority, signed_cbor)
1195    }
1196
1197    #[tokio::test]
1198    async fn tka_inactive_upserts_all_peers() {
1199        // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
1200        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1201
1202        let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
1203        let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
1204
1205        assert!(tracker.tka_admits(&signed));
1206        assert!(tracker.tka_admits(&unsigned));
1207
1208        tracker.peer_db.upsert(&signed);
1209        tracker.peer_db.upsert(&unsigned);
1210        assert_eq!(tracker.peer_db.peers().len(), 2);
1211    }
1212
1213    #[tokio::test]
1214    async fn tka_active_rejects_unsigned_peer() {
1215        // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
1216        let (authority, _sig) = authority_and_valid_sig();
1217        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1218
1219        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1220        assert!(!tracker.tka_admits(&unsigned));
1221
1222        // Mirror the handler's `if !tka_admits { continue }` loop.
1223        if tracker.tka_admits(&unsigned) {
1224            tracker.peer_db.upsert(&unsigned);
1225        }
1226        assert_eq!(tracker.peer_db.peers().len(), 0);
1227        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1228    }
1229
1230    #[tokio::test]
1231    async fn tka_active_rejects_bad_signature() {
1232        // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1233        let (authority, mut sig) = authority_and_valid_sig();
1234        // Tamper the last byte (the trailing signature byte) so verification fails.
1235        let last = sig.len() - 1;
1236        sig[last] ^= 0xff;
1237
1238        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1239        let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1240        assert!(!tracker.tka_admits(&bad));
1241
1242        if tracker.tka_admits(&bad) {
1243            tracker.peer_db.upsert(&bad);
1244        }
1245        assert_eq!(tracker.peer_db.peers().len(), 0);
1246    }
1247
1248    #[tokio::test]
1249    async fn tka_active_admits_authorized_peer() {
1250        // Authority present + correctly-signed node key ⇒ admitted and upserted.
1251        let (authority, sig) = authority_and_valid_sig();
1252        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1253
1254        let good = peer_node("good", NODE_KEY_BYTES, sig);
1255        assert!(tracker.tka_admits(&good));
1256
1257        if tracker.tka_admits(&good) {
1258            tracker.peer_db.upsert(&good);
1259        }
1260        assert_eq!(tracker.peer_db.peers().len(), 1);
1261        assert!(tracker.peer_db.get(&good.node_key).is_some());
1262    }
1263
1264    // ---------------------------------------------------------------------------------------------
1265    // Tests that drive REAL `PeerUpdate`s through the shared handler body
1266    // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1267    // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1268    // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1269    // ---------------------------------------------------------------------------------------------
1270
1271    #[tokio::test]
1272    async fn tka_active_delta_upsert_rejects_unauthorized() {
1273        // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1274        // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1275        let (authority, _sig) = authority_and_valid_sig();
1276        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1277
1278        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1279        let update = ts_control::PeerUpdate::Delta {
1280            upsert: vec![unsigned.clone()],
1281            remove: Vec::new(),
1282        };
1283
1284        tracker.apply_peer_update(&update);
1285
1286        assert_eq!(tracker.peer_db.peers().len(), 0);
1287        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1288    }
1289
1290    #[tokio::test]
1291    async fn tka_active_delta_upsert_admits_authorized() {
1292        // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1293        let (authority, sig) = authority_and_valid_sig();
1294        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1295
1296        let good = peer_node("good", NODE_KEY_BYTES, sig);
1297        let update = ts_control::PeerUpdate::Delta {
1298            upsert: vec![good.clone()],
1299            remove: Vec::new(),
1300        };
1301
1302        tracker.apply_peer_update(&update);
1303
1304        assert_eq!(tracker.peer_db.peers().len(), 1);
1305        assert!(tracker.peer_db.get(&good.node_key).is_some());
1306    }
1307
1308    #[tokio::test]
1309    async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1310        // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1311        // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1312        // dropped fail-closed.
1313        let (authority, sig) = authority_and_valid_sig();
1314        // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1315        let mut bad_sig = sig.clone();
1316        let last = bad_sig.len() - 1;
1317        bad_sig[last] ^= 0xff;
1318
1319        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1320
1321        // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1322        // rejected peers use distinct node keys so the survivor is unambiguous.
1323        let good = peer_node("good", NODE_KEY_BYTES, sig);
1324        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1325        let bad = peer_node("bad", [9u8; 32], bad_sig);
1326
1327        let update =
1328            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1329
1330        tracker.apply_peer_update(&update);
1331
1332        assert_eq!(tracker.peer_db.peers().len(), 1);
1333        assert!(tracker.peer_db.get(&good.node_key).is_some());
1334        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1335        assert!(tracker.peer_db.get(&bad.node_key).is_none());
1336    }
1337
1338    /// End-to-end through the REAL enforcement-authority transport (the `watch` cell the control
1339    /// runner writes), not a direct field poke: writing `Some(authority)` flips enforcement on so a
1340    /// mixed batch drops the unsigned/bad peers, and a subsequent `None` (lock disabled) clears
1341    /// enforcement so a peer DROPPED while enforced is re-admitted. Exercises the exact `borrow`-based
1342    /// read path `tka_admits` uses — a broken receiver wiring would pass every for_test-field test but
1343    /// fail here.
1344    #[tokio::test]
1345    async fn tka_authority_watch_enables_then_clears_enforcement() {
1346        let (authority, sig) = authority_and_valid_sig();
1347        let mut bad_sig = sig.clone();
1348        let last = bad_sig.len() - 1;
1349        bad_sig[last] ^= 0xff;
1350
1351        let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1352
1353        // 1) No authority yet ⇒ admit-all (Go b.tka == nil).
1354        let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1355        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1356        let bad = peer_node("bad", [9u8; 32], bad_sig);
1357        let batch = ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1358        tracker.apply_peer_update(&batch);
1359        assert_eq!(tracker.peer_db.peers().len(), 3, "no lock ⇒ admit all");
1360
1361        // 2) Publish the verified authority over the watch cell (exactly what the control runner does
1362        //    on a successful sync) ⇒ enforcement ON. A re-applied Full now drops unsigned + bad.
1363        tka_tx.send_replace(Some(Arc::new(authority)));
1364        tracker.apply_peer_update(&batch);
1365        assert_eq!(
1366            tracker.peer_db.peers().len(),
1367            1,
1368            "lock active ⇒ only the signed peer survives"
1369        );
1370        assert!(tracker.peer_db.get(&good.node_key).is_some());
1371        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1372        assert!(tracker.peer_db.get(&bad.node_key).is_none());
1373
1374        // 3) Lock disabled (None) ⇒ enforcement cleared ⇒ a peer that was DROPPED while enforced is
1375        //    re-admitted by a fresh netmap. Assert the specific previously-dropped key returns (not
1376        //    merely a count), so this proves the drop→clear→re-admit transition, not "admit-all-fresh".
1377        tka_tx.send_replace(None);
1378        tracker.apply_peer_update(&batch);
1379        assert_eq!(
1380            tracker.peer_db.peers().len(),
1381            3,
1382            "lock disabled ⇒ admit all again"
1383        );
1384        assert!(
1385            tracker.peer_db.get(&unsigned.node_key).is_some(),
1386            "the peer dropped under enforcement must come back once the lock is cleared"
1387        );
1388        assert!(tracker.peer_db.get(&bad.node_key).is_some());
1389    }
1390
1391    /// Degenerate input: two DISTINCT nodes sharing one `stable_id` in a single `Full`, one with a
1392    /// valid signature and one unsigned, under an active lock. Each node is judged by its OWN verdict
1393    /// (the per-node `admits` vector), so the unsigned node is never admitted on the strength of its
1394    /// signed twin. The single-verify `Full` refactor keeps this per-node semantics (a stable_id-set
1395    /// alone would have admitted whichever node was upserted last). Malformed control input; asserted
1396    /// only to lock the verdict-per-node behavior against regression.
1397    #[tokio::test]
1398    async fn tka_full_duplicate_stable_id_judges_each_node_on_its_own_signature() {
1399        let (authority, sig) = authority_and_valid_sig();
1400        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1401
1402        // Both carry stable_id "dup"; the signed one authorizes NODE_KEY_BYTES, the other is unsigned
1403        // and uses a different node key. Order them unsigned-last so a last-writer-wins stable_id set
1404        // would (wrongly) leave the unsigned node's key in the db.
1405        let signed = peer_node("dup", NODE_KEY_BYTES, sig);
1406        let unsigned = peer_node("dup", [8u8; 32], vec![]);
1407        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1408            signed.clone(),
1409            unsigned.clone(),
1410        ]));
1411
1412        // The unsigned node's own verdict failed, so its key must NOT be present, regardless of the
1413        // shared stable_id. (The signed twin retained the stable_id; the db holds the signed key.)
1414        assert!(
1415            tracker.peer_db.get(&unsigned.node_key).is_none(),
1416            "a node whose own signature fails must not be admitted via a stable_id twin"
1417        );
1418        assert!(tracker.peer_db.get(&signed.node_key).is_some());
1419    }
1420
1421    /// Full-path consistency under two KEPT nodes sharing a `stable_id`: `peer_db.upsert` is
1422    /// last-writer-wins on `stable_id`, so the db ends holding exactly one node for that id (the last
1423    /// kept), and `retain` never evicts that just-upserted id (`retained_ids` contains the shared id
1424    /// because at least one of its nodes was kept). No lock here, so both nodes are "kept". This pins
1425    /// the published-state invariant the whole-surface audit flagged: `retain` and the upsert loop
1426    /// agree on the surviving stable_id. Malformed control input; asserted for robustness.
1427    #[tokio::test]
1428    async fn tka_full_duplicate_stable_id_both_kept_is_consistent() {
1429        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1430        let first = peer_node("dup", [1u8; 32], vec![]);
1431        let last = peer_node("dup", [2u8; 32], vec![]);
1432        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1433            first.clone(),
1434            last.clone(),
1435        ]));
1436
1437        // Exactly one db entry for the shared stable_id, holding the LAST node (upsert is
1438        // last-writer-wins on stable_id); the first node's key was transparently superseded.
1439        assert_eq!(
1440            tracker.peer_db.peers().len(),
1441            1,
1442            "one entry for the shared stable_id"
1443        );
1444        assert!(
1445            tracker.peer_db.get(&last.node_key).is_some(),
1446            "the db holds the last-upserted node for the shared id"
1447        );
1448        assert!(
1449            tracker.peer_db.get(&first.node_key).is_none(),
1450            "the first node's key was superseded by the last at the shared id"
1451        );
1452    }
1453
1454    /// A peer admitted in one `Full`, then in a later `Full` presenting a key that a co-resident
1455    /// peer's rotation chain has rotated away, is EVICTED — the cross-peer rotation filter applies on
1456    /// every resync, not only at first admission. Exercises the rotation filter through two
1457    /// sequential `Full` updates with real signing.
1458    #[tokio::test]
1459    async fn tka_full_rotation_obsolete_evicts_on_resync() {
1460        use ed25519_dalek::SigningKey;
1461        use ts_tka::NodeKeySignature;
1462
1463        let trusted = SigningKey::from_bytes(&[42u8; 32]);
1464        let trusted_pub = trusted.verifying_key().to_bytes().to_vec();
1465        let authority = Authority::from_state(
1466            AumHash([0; 32]),
1467            State {
1468                keys: vec![Key {
1469                    kind: KeyKind::Ed25519,
1470                    votes: 1,
1471                    public: trusted_pub.clone(),
1472                }],
1473            },
1474        );
1475        let pivot = SigningKey::from_bytes(&[9u8; 32]);
1476        let pivot_pub: [u8; 32] = pivot.verifying_key().to_bytes();
1477
1478        // First Full: the soon-to-be-stale peer presents the pivot key with a valid Direct sig.
1479        let stale_sig = NodeKeySignature::sign_direct(&pivot_pub, &trusted).serialize();
1480        let stale_peer = peer_node("stale", pivot_pub, stale_sig);
1481        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1482        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![stale_peer.clone()]));
1483        assert!(
1484            tracker.peer_db.get(&stale_peer.node_key).is_some(),
1485            "the stale peer is admitted while no rotation has superseded it yet"
1486        );
1487
1488        // Second Full: a freshly-rotated peer (whose chain rotated AWAY the pivot key) joins, and the
1489        // stale peer is re-included. The rotation filter now obsoletes the pivot key ⇒ stale evicted.
1490        let new_key = [4u8; 32];
1491        let new_sig = NodeKeySignature::sign_rotation(&new_key, &trusted, &pivot).serialize();
1492        let new_peer = peer_node("rotated", new_key, new_sig);
1493        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1494            new_peer.clone(),
1495            stale_peer.clone(),
1496        ]));
1497        assert!(
1498            tracker.peer_db.get(&new_peer.node_key).is_some(),
1499            "the freshly-rotated peer is admitted"
1500        );
1501        assert!(
1502            tracker.peer_db.get(&stale_peer.node_key).is_none(),
1503            "the stale peer is EVICTED on the resync once a rotation supersedes its key"
1504        );
1505    }
1506
1507    /// The empty-trusted-key-state brick-guard: an authority with no keys must NOT drop the whole
1508    /// netmap (a `ts_tka` invariant violation / replayer edge). A verified chain always carries ≥1
1509    /// key, so this never weakens a genuine lock — it only prevents a black-hole. Uses ≥2 peers
1510    /// (one signed, one unsigned) to prove it admits **all**, not accidentally just one.
1511    #[tokio::test]
1512    async fn tka_empty_keyset_authority_admits_all() {
1513        use ts_tka::{AumHash, Authority, State};
1514        let empty_auth = Authority::from_state(AumHash([0u8; 32]), State { keys: Vec::new() });
1515        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(empty_auth));
1516        let signed = peer_node("signed", [7u8; 32], vec![0xde, 0xad]);
1517        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1518        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1519            signed.clone(),
1520            unsigned.clone(),
1521        ]));
1522        assert_eq!(
1523            tracker.peer_db.peers().len(),
1524            2,
1525            "an empty-keyset authority must admit ALL peers (brick-guard), not enforce"
1526        );
1527    }
1528
1529    /// Signature-replay / `NodeKeyMismatch`: a structurally-valid signature that authorizes
1530    /// `NODE_KEY_BYTES` must NOT admit a DIFFERENT node key carrying that same signature blob. This is
1531    /// the highest-value bypass — if the sig↔node-key binding in `verify_signature` were dropped, this
1532    /// is the only test that would catch it (the other "bad" peers only flip a byte ⇒ `BadSignature`).
1533    #[tokio::test]
1534    async fn tka_active_rejects_valid_sig_for_wrong_node_key() {
1535        let (authority, sig) = authority_and_valid_sig();
1536        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1537
1538        // The signature authorizes NODE_KEY_BYTES; attach it to an imposter with a different key.
1539        let imposter = peer_node("imposter", [0x55u8; 32], sig);
1540        assert!(
1541            !tracker.tka_admits(&imposter),
1542            "a signature bound to one node key must not authorize a different node key"
1543        );
1544        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![imposter.clone()]));
1545        assert!(tracker.peer_db.get(&imposter.node_key).is_none());
1546    }
1547
1548    /// `UntrustedKey`: a signature produced by a well-formed Ed25519 key that is NOT in the
1549    /// authority's trusted-key state must be rejected — distinct from a tampered-byte `BadSignature`.
1550    #[tokio::test]
1551    async fn tka_active_rejects_sig_from_untrusted_key() {
1552        use ed25519_dalek::{Signer, SigningKey};
1553        let (authority, _sig) = authority_and_valid_sig();
1554        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1555
1556        // Sign a valid CBOR with a DIFFERENT key (not the one the authority trusts). The key_id in
1557        // the signature names this untrusted key, so `get_key` misses ⇒ UntrustedKey.
1558        let rogue = SigningKey::from_bytes(&[99u8; 32]);
1559        let rogue_pub = rogue.verifying_key().to_bytes().to_vec();
1560        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, None);
1561        let sig_hash = ts_tka::aum_hash(&preimage).0;
1562        let signature = rogue.sign(&sig_hash).to_bytes().to_vec();
1563        let rogue_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, Some(&signature));
1564
1565        let peer = peer_node("rogue-signed", NODE_KEY_BYTES, rogue_cbor);
1566        assert!(
1567            !tracker.tka_admits(&peer),
1568            "a signature from a key outside the trusted set must be rejected"
1569        );
1570        // Drive the real upsert path too (match the sibling replay test's depth): an untrusted-key
1571        // signature must keep the peer out of the db, not merely fail the verdict in isolation.
1572        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1573        assert!(tracker.peer_db.get(&peer.node_key).is_none());
1574    }
1575
1576    /// Bus-enable analogue for `Delta`: enforcement engaged via the watch cell must also gate a
1577    /// `Delta { upsert }` (not only `Full`). Closes the "authority arrived over the transport AND the
1578    /// next update is a Delta" combination.
1579    #[tokio::test]
1580    async fn tka_watch_enable_enforces_delta_upsert() {
1581        let (authority, sig) = authority_and_valid_sig();
1582        let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1583        tka_tx.send_replace(Some(Arc::new(authority)));
1584
1585        let good = peer_node("good", NODE_KEY_BYTES, sig);
1586        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1587        tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1588            remove: vec![],
1589            upsert: vec![good.clone(), unsigned.clone()],
1590        });
1591        assert!(tracker.peer_db.get(&good.node_key).is_some());
1592        assert!(
1593            tracker.peer_db.get(&unsigned.node_key).is_none(),
1594            "delta upsert under an active lock must drop the unsigned peer"
1595        );
1596    }
1597
1598    /// A `Delta` re-upsert of an ALREADY-ADMITTED peer whose signature is now invalid must EVICT the
1599    /// stale entry (revocation-via-delta), not leave it admitted. Go re-filters the whole netmap each
1600    /// response, so a now-unsigned peer would not survive there either.
1601    #[tokio::test]
1602    async fn tka_delta_reupsert_with_invalid_sig_evicts_existing() {
1603        let (authority, sig) = authority_and_valid_sig();
1604        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1605
1606        // Admit the signed peer.
1607        let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1608        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1609        assert!(tracker.peer_db.get(&good.node_key).is_some());
1610
1611        // Re-upsert the SAME stable_id (now with no signature) via a delta ⇒ evicted, not retained.
1612        let revoked = peer_node("good", NODE_KEY_BYTES, vec![]);
1613        tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1614            remove: vec![],
1615            upsert: vec![revoked],
1616        });
1617        assert!(
1618            tracker.peer_db.get(&good.node_key).is_none(),
1619            "a delta re-upsert that fails the lock must evict the previously-admitted peer"
1620        );
1621    }
1622
1623    #[tokio::test]
1624    async fn tka_full_resync_revocation_behavior() {
1625        // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1626        // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1627        // (previously-admitted) entry because membership was decided purely by stable_id.
1628        //
1629        // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1630        // stable_ids, so a peer whose re-included signature no longer verifies under the active
1631        // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1632        // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1633        // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1634        let (authority, sig) = authority_and_valid_sig();
1635        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1636
1637        // 1) Admit the peer with a valid signature via a real `Full`.
1638        let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1639        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1640        assert_eq!(tracker.peer_db.peers().len(), 1);
1641        assert!(tracker.peer_db.get(&good.node_key).is_some());
1642
1643        // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1644        let mut bad_sig = sig;
1645        let last = bad_sig.len() - 1;
1646        bad_sig[last] ^= 0xff;
1647        let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1648        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1649
1650        // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1651        assert_eq!(tracker.peer_db.peers().len(), 0);
1652        assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1653    }
1654
1655    #[tokio::test]
1656    async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1657        // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1658        // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1659        // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1660        // branch this wave.
1661        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1662
1663        let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1664        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1665        assert_eq!(tracker.peer_db.peers().len(), 1);
1666
1667        // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1668        let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1669        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1670        assert_eq!(tracker.peer_db.peers().len(), 1);
1671        assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1672    }
1673
1674    /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1675    /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1676    /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1677    /// never re-handshake after it moves.
1678    #[tokio::test]
1679    async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1680        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1681
1682        // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1683        let peer = peer_node("mover", [1u8; 32], vec![]);
1684        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1685        let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1686        assert!(before.underlay_addresses.is_empty());
1687        assert!(before.derp_region.is_none());
1688
1689        // Patch in fresh reachability (the idle-peer-reconnect case).
1690        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1691        let patch = ts_control::PeerChange {
1692            id: 1,
1693            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1694            cap: None,
1695            cap_map: None,
1696            underlay_addresses: Some(vec![new_ep]),
1697            node_key: None,
1698            key_signature: None,
1699            disco_key: None,
1700            node_key_expiry: None,
1701            online: None,
1702            last_seen: None,
1703        };
1704        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1705
1706        assert_eq!(upserts.len(), 1);
1707        assert_eq!(deletions.len(), 0);
1708        // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1709        assert_eq!(tracker.peer_db.peers().len(), 1);
1710        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1711        assert_eq!(after.underlay_addresses, vec![new_ep]);
1712        assert_eq!(
1713            after.derp_region,
1714            Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1715        );
1716        assert_eq!(after.node_key, peer.node_key);
1717    }
1718
1719    /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1720    /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1721    /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1722    /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1723    /// in by the delta kept stale (empty) reachability.
1724    #[tokio::test]
1725    async fn patch_applies_on_top_of_co_occurring_delta() {
1726        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1727
1728        // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1729        let peer = peer_node("mover", [1u8; 32], vec![]);
1730        let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1731            upsert: vec![peer.clone()],
1732            remove: vec![],
1733        });
1734        assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1735
1736        // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1737        // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1738        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1739        let patch = ts_control::PeerChange {
1740            id: 1,
1741            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1742            cap: None,
1743            cap_map: None,
1744            underlay_addresses: Some(vec![new_ep]),
1745            node_key: None,
1746            key_signature: None,
1747            disco_key: None,
1748            node_key_expiry: None,
1749            online: None,
1750            last_seen: None,
1751        };
1752        let (patch_upserts, patch_deletions) =
1753            tracker.apply_peer_patches(std::slice::from_ref(&patch));
1754
1755        assert_eq!(
1756            patch_upserts.len(),
1757            1,
1758            "patch re-upserts the just-added peer"
1759        );
1760        assert_eq!(patch_deletions.len(), 0);
1761        // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1762        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1763        assert_eq!(after.underlay_addresses, vec![new_ep]);
1764        assert_eq!(
1765            after.derp_region,
1766            Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1767        );
1768    }
1769
1770    /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1771    /// never creates a node). No upsert, no deletion, peer set unchanged.
1772    #[tokio::test]
1773    async fn patch_for_unknown_node_is_ignored() {
1774        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1775        let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1776        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1777
1778        let patch = ts_control::PeerChange {
1779            id: 999, // not in the netmap
1780            derp_region: None,
1781            cap: None,
1782            cap_map: None,
1783            underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1784            node_key: None,
1785            key_signature: None,
1786            disco_key: None,
1787            node_key_expiry: None,
1788            online: None,
1789            last_seen: None,
1790        };
1791        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1792
1793        assert_eq!(upserts.len(), 0);
1794        assert_eq!(deletions.len(), 0);
1795        assert_eq!(tracker.peer_db.peers().len(), 1);
1796        assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1797    }
1798
1799    /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1800    /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1801    #[tokio::test]
1802    async fn patch_updates_node_key_expiry() {
1803        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1804        let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1805        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1806
1807        let expiry = "2027-01-01T00:00:00Z"
1808            .parse::<chrono::DateTime<chrono::Utc>>()
1809            .unwrap();
1810        let patch = ts_control::PeerChange {
1811            id: 1,
1812            derp_region: None,
1813            cap: None,
1814            cap_map: None,
1815            underlay_addresses: None,
1816            node_key: None,
1817            key_signature: None,
1818            disco_key: None,
1819            node_key_expiry: Some(expiry),
1820            online: None,
1821            last_seen: None,
1822        };
1823        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1824
1825        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1826        assert_eq!(after.node_key_expiry, Some(expiry));
1827    }
1828
1829    /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1830    #[tokio::test]
1831    async fn patch_updates_online() {
1832        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1833        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1834        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1835        assert_eq!(
1836            tracker
1837                .peer_db
1838                .get(&(1 as ts_control::NodeId))
1839                .unwrap()
1840                .1
1841                .online,
1842            None
1843        );
1844
1845        let mut patch = ts_control::PeerChange {
1846            id: 1,
1847            derp_region: None,
1848            cap: None,
1849            cap_map: None,
1850            underlay_addresses: None,
1851            node_key: None,
1852            key_signature: None,
1853            disco_key: None,
1854            node_key_expiry: None,
1855            online: Some(true),
1856            last_seen: None,
1857        };
1858        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1859        assert_eq!(
1860            tracker
1861                .peer_db
1862                .get(&(1 as ts_control::NodeId))
1863                .unwrap()
1864                .1
1865                .online,
1866            Some(true),
1867            "PeerChange.online=Some(true) marks the peer online"
1868        );
1869
1870        // A subsequent patch flips it offline.
1871        patch.online = Some(false);
1872        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1873        assert_eq!(
1874            tracker
1875                .peer_db
1876                .get(&(1 as ts_control::NodeId))
1877                .unwrap()
1878                .1
1879                .online,
1880            Some(false)
1881        );
1882    }
1883
1884    /// Channel C/D (Go `map.go:updatePeersStateFromResponse`): `online_change` is the sole driver of
1885    /// `online`; `peer_seen_change` is the sole driver of `last_seen` (true ⇒ now, false ⇒ cleared)
1886    /// and must NEVER touch `online`. Both apply to a peer already in the netmap and ignore unknown
1887    /// ids. This pins the fix for the prior bug where channel D wrote `online=false` (conflating
1888    /// "not seen recently" with "offline" — distinct signals in Go).
1889    #[tokio::test]
1890    async fn liveness_change_maps_apply_online() {
1891        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1892        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1893        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1894        // A fixed timestamp (chrono is built without its `clock` feature, so no `Utc::now()`).
1895        let now = chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap();
1896
1897        // Channel C: online_change sets online=true.
1898        let mut online_change = std::collections::BTreeMap::new();
1899        online_change.insert(1 as ts_control::NodeId, true);
1900        online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1901        let changed = tracker.apply_liveness_changes(&online_change, &Default::default(), now);
1902        assert!(changed);
1903        assert_eq!(
1904            tracker
1905                .peer_db
1906                .get(&(1 as ts_control::NodeId))
1907                .unwrap()
1908                .1
1909                .online,
1910            Some(true)
1911        );
1912
1913        // Channel D: peer_seen_change=true sets last_seen=now and leaves online UNTOUCHED.
1914        let mut seen_true = std::collections::BTreeMap::new();
1915        seen_true.insert(1 as ts_control::NodeId, true);
1916        let changed = tracker.apply_liveness_changes(&Default::default(), &seen_true, now);
1917        assert!(changed);
1918        {
1919            let (_id, node) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1920            assert_eq!(
1921                node.last_seen,
1922                Some(now),
1923                "peer_seen_change=true sets last_seen=now"
1924            );
1925            assert_eq!(
1926                node.online,
1927                Some(true),
1928                "channel D must NOT touch online (still true from channel C)"
1929            );
1930        }
1931
1932        // Channel D: peer_seen_change=false clears last_seen, still leaving online untouched.
1933        let mut seen_false = std::collections::BTreeMap::new();
1934        seen_false.insert(1 as ts_control::NodeId, false);
1935        let changed = tracker.apply_liveness_changes(&Default::default(), &seen_false, now);
1936        assert!(changed);
1937        {
1938            let (_id, node) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1939            assert_eq!(
1940                node.last_seen, None,
1941                "peer_seen_change=false clears last_seen"
1942            );
1943            assert_eq!(node.online, Some(true), "channel D must NOT mark offline");
1944        }
1945        assert_eq!(
1946            tracker.peer_db.peers().len(),
1947            1,
1948            "the node is retained, not removed"
1949        );
1950
1951        // No-op when nothing matches / changes.
1952        assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default(), now));
1953    }
1954
1955    /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1956    /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1957    /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1958    /// otherwise be a trust-enforcement bypass via the patch path.
1959    #[tokio::test]
1960    async fn patch_key_rotation_failing_tka_evicts_peer() {
1961        let (authority, sig) = authority_and_valid_sig();
1962        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1963
1964        // Admit a correctly-signed peer (id == 1).
1965        let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1966        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1967        assert_eq!(tracker.peer_db.peers().len(), 1);
1968
1969        // Patch a new node key whose signature is garbage under the active authority.
1970        let patch = ts_control::PeerChange {
1971            id: 1,
1972            derp_region: None,
1973            cap: None,
1974            cap_map: None,
1975            underlay_addresses: None,
1976            node_key: Some([0x33u8; 32].into()),
1977            key_signature: Some(vec![0x00, 0x01, 0x02]),
1978            disco_key: None,
1979            node_key_expiry: None,
1980            online: None,
1981            last_seen: None,
1982        };
1983        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1984
1985        assert_eq!(upserts.len(), 0);
1986        assert_eq!(deletions.len(), 1);
1987        assert_eq!(tracker.peer_db.peers().len(), 0);
1988    }
1989
1990    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1991    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1992    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1993    /// proves the accumulate-then-join path the netmap handler builds.
1994    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1995        ts_control::UserProfile {
1996            id,
1997            login_name: login.to_string(),
1998            display_name: None,
1999        }
2000    }
2001
2002    #[tokio::test]
2003    async fn whois_resolves_user_from_accumulated_profiles() {
2004        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
2005
2006        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
2007        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
2008        peer.user_id = 42;
2009        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
2010        let addr = "100.64.0.1:0".parse().unwrap();
2011
2012        // No profile yet: the node resolves but its owner is unknown.
2013        let who = tracker.whois_opt(addr).expect("peer is known");
2014        assert_eq!(who.user, None);
2015
2016        // Profile for a DIFFERENT user must not match.
2017        tracker
2018            .user_profiles
2019            .insert(7, profile(7, "someone-else@example.com"));
2020        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
2021
2022        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
2023        // login resolves.
2024        tracker
2025            .user_profiles
2026            .insert(42, profile(42, "alice@example.com"));
2027        assert_eq!(
2028            tracker.whois_opt(addr).unwrap().user,
2029            Some("alice@example.com".to_string())
2030        );
2031    }
2032
2033    /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
2034    #[test]
2035    fn user_profile_best_label_prefers_login() {
2036        assert_eq!(
2037            profile(1, "alice@example.com").best_label(),
2038            Some("alice@example.com".to_string())
2039        );
2040        let display_only = ts_control::UserProfile {
2041            id: 2,
2042            login_name: String::new(),
2043            display_name: Some("Bob".to_string()),
2044        };
2045        assert_eq!(display_only.best_label(), Some("Bob".to_string()));
2046        let empty = ts_control::UserProfile {
2047            id: 3,
2048            login_name: String::new(),
2049            display_name: None,
2050        };
2051        assert_eq!(empty.best_label(), None);
2052    }
2053
2054    // ----- tsr-jo1: RotationTracker (Go ipnlocal.rotationTracker.obsoleteKeys) -----
2055
2056    /// A `RotationDetails` for a `Direct`-rooted chain with the given prior keys + wrapping key.
2057    fn rot_details(
2058        prev: &[&[u8]],
2059        wrapping: &[u8],
2060        kind: ts_tka::SigKind,
2061    ) -> ts_tka::RotationDetails {
2062        ts_tka::RotationDetails {
2063            prev_node_keys: prev.iter().map(|p| p.to_vec()).collect(),
2064            initial_sig_kind: kind,
2065            initial_wrapping_pubkey: wrapping.to_vec(),
2066        }
2067    }
2068
2069    /// Rule 1: every prior node key named by any rotation chain is obsolete, regardless of the
2070    /// chain's root kind (Go's ungated `obsolete.AddSlice(d.PrevNodeKeys)`).
2071    #[test]
2072    fn rotation_tracker_prev_keys_always_obsolete() {
2073        let mut t = RotationTracker::default();
2074        // A Direct-rooted chain that rotated away OLD1, and a Credential-rooted one that rotated OLD2.
2075        t.add(
2076            b"newA".to_vec(),
2077            &rot_details(&[b"OLD1"], b"wrapA", ts_tka::SigKind::Direct),
2078        );
2079        t.add(
2080            b"newB".to_vec(),
2081            &rot_details(&[b"OLD2"], b"wrapB", ts_tka::SigKind::Credential),
2082        );
2083        let obsolete = t.obsolete_keys();
2084        assert!(
2085            obsolete.contains(b"OLD1".as_slice()),
2086            "Direct chain's prior key obsolete"
2087        );
2088        assert!(
2089            obsolete.contains(b"OLD2".as_slice()),
2090            "Credential chain's prior key obsolete too (rule 1 is ungated)"
2091        );
2092        // The current keys themselves are not obsolete (only one peer per wrapping key here).
2093        assert!(!obsolete.contains(b"newA".as_slice()));
2094        assert!(!obsolete.contains(b"newB".as_slice()));
2095    }
2096
2097    /// Rule 2: among `Direct`-rooted chains sharing a wrapping key, only the longest survives; the
2098    /// shorter (older) clone's key is obsolete.
2099    #[test]
2100    fn rotation_tracker_unequal_chain_keeps_longest() {
2101        let mut t = RotationTracker::default();
2102        // Same wrapping key; "long" has 2 prior keys, "short" has 1 ⇒ "short" is the older clone.
2103        t.add(
2104            b"long".to_vec(),
2105            &rot_details(&[b"p1", b"p2"], b"wrap", ts_tka::SigKind::Direct),
2106        );
2107        t.add(
2108            b"short".to_vec(),
2109            &rot_details(&[b"q1"], b"wrap", ts_tka::SigKind::Direct),
2110        );
2111        let obsolete = t.obsolete_keys();
2112        assert!(
2113            obsolete.contains(b"short".as_slice()),
2114            "the shorter-chain clone is obsolete"
2115        );
2116        assert!(
2117            !obsolete.contains(b"long".as_slice()),
2118            "the longest-chain peer survives"
2119        );
2120    }
2121
2122    /// Rule 2 tie: two `Direct`-rooted chains sharing a wrapping key with EQUAL chain length cannot
2123    /// be disambiguated ⇒ BOTH are dropped (Go's safety branch).
2124    #[test]
2125    fn rotation_tracker_equal_chain_drops_both() {
2126        let mut t = RotationTracker::default();
2127        t.add(
2128            b"cloneA".to_vec(),
2129            &rot_details(&[b"p1"], b"wrap", ts_tka::SigKind::Direct),
2130        );
2131        t.add(
2132            b"cloneB".to_vec(),
2133            &rot_details(&[b"p2"], b"wrap", ts_tka::SigKind::Direct),
2134        );
2135        let obsolete = t.obsolete_keys();
2136        assert!(
2137            obsolete.contains(b"cloneA".as_slice()),
2138            "tied clone A dropped"
2139        );
2140        assert!(
2141            obsolete.contains(b"cloneB".as_slice()),
2142            "tied clone B dropped"
2143        );
2144    }
2145
2146    /// `Credential`-rooted chains sharing a wrapping key are EXEMPT from rule 2 (reusable-authkey
2147    /// carve-out): both are kept even with equal chain length.
2148    #[test]
2149    fn rotation_tracker_credential_root_clones_both_kept() {
2150        let mut t = RotationTracker::default();
2151        t.add(
2152            b"credA".to_vec(),
2153            &rot_details(&[b"p1"], b"wrap", ts_tka::SigKind::Credential),
2154        );
2155        t.add(
2156            b"credB".to_vec(),
2157            &rot_details(&[b"p2"], b"wrap", ts_tka::SigKind::Credential),
2158        );
2159        let obsolete = t.obsolete_keys();
2160        assert!(
2161            !obsolete.contains(b"credA".as_slice()),
2162            "credential-rooted clone A kept"
2163        );
2164        assert!(
2165            !obsolete.contains(b"credB".as_slice()),
2166            "credential-rooted clone B kept"
2167        );
2168    }
2169
2170    /// A peer that another chain already rotated away does not also act as a surviving clone: it is
2171    /// removed from its wrapping-key group before the longest-survivor pick (Go's `DeleteFunc`).
2172    #[test]
2173    fn rotation_tracker_already_obsolete_peer_not_a_survivor() {
2174        let mut t = RotationTracker::default();
2175        // "victim" is rotated away by "rotator" (different wrapping key), AND shares wrapping key
2176        // "w" with "other". Because "victim" is already obsolete, only "other" is in play for "w" and
2177        // survives (no spurious tie-drop of "other").
2178        t.add(
2179            b"rotator".to_vec(),
2180            &rot_details(&[b"victim"], b"wRot", ts_tka::SigKind::Direct),
2181        );
2182        t.add(
2183            b"victim".to_vec(),
2184            &rot_details(&[b"x"], b"w", ts_tka::SigKind::Direct),
2185        );
2186        t.add(
2187            b"other".to_vec(),
2188            &rot_details(&[b"y"], b"w", ts_tka::SigKind::Direct),
2189        );
2190        let obsolete = t.obsolete_keys();
2191        assert!(
2192            obsolete.contains(b"victim".as_slice()),
2193            "victim rotated away by rotator"
2194        );
2195        assert!(
2196            !obsolete.contains(b"other".as_slice()),
2197            "other survives — victim was removed from the group before the tie check"
2198        );
2199    }
2200
2201    /// Empty tracker (no rotation-signed peers) ⇒ no obsolete keys (the non-rotation netmap path).
2202    #[test]
2203    fn rotation_tracker_empty_is_noop() {
2204        let t = RotationTracker::default();
2205        assert!(t.obsolete_keys().is_empty());
2206    }
2207
2208    /// End-to-end through the real `Full` path: a peer presenting a freshly-rotated key (a Rotation
2209    /// chain) is admitted, while a second peer still presenting the rotated-AWAY pivot key — even with
2210    /// that key's own still-valid Direct signature — is DROPPED by the cross-peer rotation filter.
2211    /// This is the gap closed here: Go `tkaFilterNetmapLocked` drops the stale clone; we used to admit
2212    /// it. Uses real `ts_tka` signing (`sign_direct` + `sign_rotation`) so the whole
2213    /// verify → details → filter pipeline runs.
2214    ///
2215    /// Construction: the trusted key signs an inner `Direct` over the PIVOT keypair's public key; the
2216    /// pivot key then signs an outer `Rotation` authorizing `new_key`. That chain's `prev_node_keys`
2217    /// names the pivot pubkey — so a peer presenting the pivot pubkey as its node key is the
2218    /// rotated-away key the filter must drop.
2219    #[tokio::test]
2220    async fn tka_full_drops_rotated_away_key_e2e() {
2221        use ed25519_dalek::SigningKey;
2222        use ts_tka::NodeKeySignature;
2223
2224        let trusted = SigningKey::from_bytes(&[42u8; 32]);
2225        let trusted_pub = trusted.verifying_key().to_bytes().to_vec();
2226        let authority = Authority::from_state(
2227            AumHash([0; 32]),
2228            State {
2229                keys: vec![Key {
2230                    kind: KeyKind::Ed25519,
2231                    votes: 1,
2232                    public: trusted_pub.clone(),
2233                }],
2234            },
2235        );
2236
2237        // The rotation pivot: a keypair whose public key the inner Direct authorizes and whose
2238        // private key signs the outer rotation wrap. This pivot pubkey IS the key being rotated away.
2239        let pivot = SigningKey::from_bytes(&[9u8; 32]);
2240        let pivot_pub: [u8; 32] = pivot.verifying_key().to_bytes();
2241
2242        let new_key = [4u8; 32]; // the freshly-rotated node key
2243
2244        // Fresh peer: a Rotation chain authorizing `new_key`, inner Direct over the pivot signed by
2245        // trusted, outer wrap signed by the pivot. Its prev_node_keys names `pivot_pub`.
2246        let new_sig = NodeKeySignature::sign_rotation(&new_key, &trusted, &pivot).serialize();
2247        let new_peer = peer_node("rotated", new_key, new_sig);
2248
2249        // Stale peer: still presents the pivot pubkey (the rotated-away key) with its own valid
2250        // Direct signature — valid in isolation, but obsoleted by the fresh peer's rotation chain.
2251        let stale_sig = NodeKeySignature::sign_direct(&pivot_pub, &trusted).serialize();
2252        let stale_peer = peer_node("stale", pivot_pub, stale_sig);
2253
2254        let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
2255        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
2256            new_peer.clone(),
2257            stale_peer.clone(),
2258        ]));
2259
2260        assert!(
2261            tracker.peer_db.get(&new_peer.node_key).is_some(),
2262            "the freshly-rotated peer is admitted"
2263        );
2264        assert!(
2265            tracker.peer_db.get(&stale_peer.node_key).is_none(),
2266            "the peer presenting the rotated-away key is dropped (Go tkaFilterNetmapLocked)"
2267        );
2268    }
2269}