Skip to main content

ts_runtime/peer_tracker/
mod.rs

1//! Peer delta update tracking.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    sync::Arc,
7};
8
9use kameo::{
10    actor::ActorRef,
11    message::{Context, Message},
12    reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26    peer_db: PeerDb,
27    seen_state_update: bool,
28    pending_requests: Vec<Pending>,
29    /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30    /// changes ([`WatchNetmap`]).
31    peer_watch: watch::Sender<Vec<StatusNode>>,
32    /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33    /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34    /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35    /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36    /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37    /// earlier one.
38    user_profiles: HashMap<UserId, UserProfile>,
39    /// Tailnet-Lock (TKA) authority used to verify each peer's `key_signature` at the peer-trust
40    /// chokepoint. When `Some`, enforcement is **active**: every upserted peer must present a
41    /// signature this authority authorizes, or it is rejected (fail-closed). When `None` (always,
42    /// this wave) enforcement is **inactive** and every peer is upserted — identical to pre-TKA
43    /// behavior. There is no live `Authority` source yet: building one requires the
44    /// `/machine/tka/sync` Noise RPC + AUM-chain replayer (deferred, see SECURITY.md). The
45    /// enforcement path below is wired and unit-tested, and flips on the instant an authority is
46    /// supplied; it is explicitly gated, not a silent no-op.
47    tka_authority: Option<ts_tka::Authority>,
48    /// Tailnet-Lock authority used **observe-only** (verify-and-LOG, issue #136): the live
49    /// `Authority` synced from control (delivered over the bus via [`TkaAuthorityUpdate`]). Distinct
50    /// from [`tka_authority`](Self::tka_authority) on purpose — populating *that* would flip the
51    /// runtime to fail-closed enforcement, whereas this field only feeds
52    /// [`tka_observe_log`](Self::tka_observe_log), which logs each peer's signature verdict and
53    /// **never** drops a peer. The current posture (per SECURITY.md / PARITY_ROADMAP): verify-and-log
54    /// while the `ts_tka` crypto is unaudited and control is treated as trusted; flipping to enforce
55    /// is a separate, gated decision.
56    tka_observe: Option<ts_tka::Authority>,
57    env: Env,
58}
59
60impl PeerTracker {
61    fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
62        // Canonicalization (case + trailing dot) is handled inside the name index lookup.
63        self.peer_db.get(&name).map(|(_id, node)| node)
64    }
65
66    fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
67        self.peer_db.get(&ip).map(|(_id, node)| node)
68    }
69
70    /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
71    ///
72    /// Connectivity fields (`cur_addr`/`relay`) are left at their `from_node` defaults (`None`) here:
73    /// this is the live-watch/hot path and must stay magicsock-free and synchronous. The explicit
74    /// [`GetStatus`] snapshot enriches them ([`status_peers_with_ids`](Self::status_peers_with_ids)).
75    fn status_peers(&self) -> Vec<StatusNode> {
76        self.peer_db
77            .peers()
78            .values()
79            .map(StatusNode::from_node)
80            .collect()
81    }
82
83    /// Like [`status_peers`](Self::status_peers) but pairs each entry with its [`PeerId`], so the
84    /// caller can join per-peer connectivity (the direct manager's `best_addrs`, keyed by `PeerId`)
85    /// onto the `StatusNode` before returning it. Order is unspecified (a `HashMap` walk).
86    fn status_peers_with_ids(&self) -> Vec<(PeerId, StatusNode)> {
87        self.peer_db
88            .peers()
89            .iter()
90            .map(|(id, node)| (*id, StatusNode::from_node(node)))
91            .collect()
92    }
93
94    fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
95        let ip = crate::status::whois_addr(addr);
96        let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
97        // Join the node's owning user id against the accumulated UserProfiles table to resolve a
98        // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
99        // with no human owner, or a profile not yet delivered).
100        let user = self.resolve_user(node.user_id);
101        Some(crate::status::WhoIs::from_node_with_user(node, user))
102    }
103
104    /// Resolve a user id to its best display label from the accumulated profile table.
105    fn resolve_user(&self, user_id: UserId) -> Option<String> {
106        self.user_profiles
107            .get(&user_id)
108            .and_then(UserProfile::best_label)
109    }
110
111    /// Whether `node` may be admitted to the peer db under the current Tailnet-Lock posture.
112    ///
113    /// Fail-closed and gated:
114    /// - No [`tka_authority`](Self::tka_authority) ⇒ enforcement inactive ⇒ always admit (today's
115    ///   behavior; this is the always-taken branch this wave).
116    /// - Authority present + peer carries a `key_signature` that the authority authorizes for the
117    ///   peer's node key ⇒ admit.
118    /// - Authority present + signature missing or unauthorized/invalid ⇒ **reject** (Go denies
119    ///   network access to unsigned peers under tailnet lock; we do not upsert them).
120    fn tka_admits(&self, node: &Node) -> bool {
121        let Some(auth) = &self.tka_authority else {
122            return true;
123        };
124
125        if node.key_signature.is_empty() {
126            // TKA active but peer presented no signature: reject (Go denies network access to
127            // unsigned peers under tailnet lock, unless UnsignedPeerAPIOnly — out of scope here).
128            tracing::warn!(
129                stable_id = ?node.stable_id,
130                "TKA: rejecting unsigned peer under tailnet lock"
131            );
132            return false;
133        }
134
135        if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
136            tracing::warn!(
137                stable_id = ?node.stable_id,
138                error = %e,
139                "TKA: rejecting peer with unauthorized node key"
140            );
141            return false;
142        }
143
144        true
145    }
146
147    /// Verify `node`'s Tailnet-Lock signature against the **observe-only** authority and LOG the
148    /// verdict — issue #136. This is the verify-and-log seam: it returns `()` (NOT a bool), so it is
149    /// structurally impossible to wire as an admission gate, and it is called *adjacent* to each
150    /// upsert site without affecting whether the peer is admitted. Every peer is upserted exactly as
151    /// it would be with this call absent.
152    ///
153    /// A no-op when no observe authority has been synced yet. Logs `verified` / `failed` / `unsigned`
154    /// with the peer's `stable_id` and, on failure, the `TkaError` Display (static descriptors —
155    /// "bad sig len" etc.). NEVER logs the node-key or signature bytes.
156    fn tka_observe_log(&self, node: &Node) {
157        let Some(auth) = &self.tka_observe else {
158            return;
159        };
160        if node.key_signature.is_empty() {
161            tracing::info!(
162                stable_id = ?node.stable_id,
163                tka_verdict = "unsigned",
164                "TKA observe: peer presented no key-signature (advisory, NOT enforced)"
165            );
166            return;
167        }
168        match auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
169            Ok(()) => tracing::info!(
170                stable_id = ?node.stable_id,
171                tka_verdict = "verified",
172                "TKA observe: peer node-key authorized (advisory, NOT enforced)"
173            ),
174            Err(e) => tracing::warn!(
175                stable_id = ?node.stable_id,
176                tka_verdict = "failed",
177                reason = %e,
178                "TKA observe: peer key-signature did not verify (advisory, NOT enforced)"
179            ),
180        }
181    }
182}
183
184impl kameo::Actor for PeerTracker {
185    type Args = Env;
186    type Error = Error;
187
188    async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
189        env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
190        // Observe-only TKA (#136): the control runner publishes the verified `Authority` here after a
191        // successful `/machine/tka/sync`; we use it to verify-and-LOG each peer's signature, never to
192        // enforce. The bus has no replay, so the control runner re-publishes on every sync.
193        env.subscribe::<TkaAuthorityUpdate>(&slf).await?;
194
195        let (peer_watch, _) = watch::channel(Vec::new());
196
197        Ok(Self {
198            peer_db: PeerDb::default(),
199            pending_requests: Default::default(),
200            seen_state_update: false,
201            peer_watch,
202            user_profiles: HashMap::new(),
203            // No live TKA *enforcement* authority this wave (fail-closed path stays gated off; see
204            // `tka_authority`). The observe-only authority (`tka_observe`) is supplied over the bus.
205            tka_authority: None,
206            tka_observe: None,
207            env,
208        })
209    }
210}
211
212enum Pending {
213    PeerByName(PeerByName, ReplySender<Option<Node>>),
214    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
215    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
216    Status(ReplySender<Vec<(PeerId, StatusNode)>>),
217    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
218}
219
220// For messages with arguments, a struct is generated with the args as fields. They aren't
221// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
222// docs are turned off everywhere.
223#[allow(missing_docs)]
224mod msg_impl {
225    use std::net::IpAddr;
226
227    use kameo::prelude::DelegatedReply;
228
229    use super::*;
230
231    #[kameo::messages]
232    impl PeerTracker {
233        /// Lookup a peer by name.
234        ///
235        /// Waits until we've received at least one peer update from control.
236        #[message(ctx)]
237        pub async fn peer_by_name(
238            &mut self,
239            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
240            name: String,
241        ) -> DelegatedReply<Option<Node>> {
242            let (deleg, sender) = ctx.reply_sender();
243            let Some(sender) = sender else { return deleg };
244
245            if !self.seen_state_update {
246                tracing::debug!(query = name, "no peer state seen yet, queueing request");
247
248                self.pending_requests
249                    .push(Pending::PeerByName(PeerByName { name }, sender));
250
251                return deleg;
252            }
253
254            sender.send(self.peer_by_name_opt(&name).cloned());
255
256            deleg
257        }
258
259        /// Lookup all peers that accept packets addressed to the given IP.
260        ///
261        /// This includes the peer's tailnet address and any subnet routes it provides. Only
262        /// the peers with the most specific subnet route match that covers `ip` will be
263        /// returned.
264        ///
265        /// E.g., suppose:
266        ///
267        /// - We're querying for `10.1.2.3`
268        /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
269        /// - `PeerC` has an accepted route for `10.1.0.0/16`
270        ///
271        /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
272        /// prefix match.
273        #[message(ctx)]
274        pub fn peer_by_accepted_route(
275            &mut self,
276            ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
277            ip: IpAddr,
278        ) -> DelegatedReply<Vec<Node>> {
279            let (deleg, sender) = ctx.reply_sender();
280            let Some(sender) = sender else { return deleg };
281
282            if !self.seen_state_update {
283                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
284
285                self.pending_requests
286                    .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
287
288                return deleg;
289            }
290
291            sender.send(
292                self.peer_db
293                    .get_route(ip.into())
294                    .map(|(_id, node)| node.clone())
295                    .collect(),
296            );
297
298            deleg
299        }
300
301        /// Lookup the peer that has the given tailnet IP address.
302        #[message(ctx)]
303        pub fn peer_by_tailnet_ip(
304            &mut self,
305            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
306            ip: IpAddr,
307        ) -> DelegatedReply<Option<Node>> {
308            let (deleg, sender) = ctx.reply_sender();
309            let Some(sender) = sender else { return deleg };
310
311            if !self.seen_state_update {
312                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
313
314                self.pending_requests
315                    .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
316
317                return deleg;
318            }
319
320            sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
321
322            deleg
323        }
324
325        /// Build the peer entries of a [`Status`](crate::Status) snapshot, each paired with its
326        /// [`PeerId`] so [`Runtime::status`](crate::Runtime::status) can join per-peer connectivity
327        /// (`cur_addr`/`relay`) from the direct manager before returning. The self node is *not*
328        /// included here (it lives in the control runner); `Runtime::status` combines both and drops
329        /// the ids.
330        ///
331        /// Waits until we've received at least one peer update from control.
332        #[message(ctx)]
333        pub fn get_status(
334            &mut self,
335            ctx: &mut Context<Self, DelegatedReply<Vec<(PeerId, StatusNode)>>>,
336        ) -> DelegatedReply<Vec<(PeerId, StatusNode)>> {
337            let (deleg, sender) = ctx.reply_sender();
338            let Some(sender) = sender else { return deleg };
339
340            if !self.seen_state_update {
341                tracing::debug!("no peer state seen yet, queueing status request");
342                self.pending_requests.push(Pending::Status(sender));
343                return deleg;
344            }
345
346            sender.send(self.status_peers_with_ids());
347
348            deleg
349        }
350
351        /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
352        ///
353        /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
354        /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
355        /// is not included (it lives in the control runner). Returns empty before the first netmap —
356        /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
357        /// that need a populated list await `Running` first).
358        #[message]
359        pub fn all_peers(&self) -> Vec<Node> {
360            self.peer_db.peers().values().cloned().collect()
361        }
362
363        /// Resolve which node owns a tailnet source address.
364        ///
365        /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
366        /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
367        ///
368        /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
369        /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
370        /// [`status`](crate::status) module docs for the gap.
371        ///
372        /// Waits until we've received at least one peer update from control.
373        #[message(ctx)]
374        pub fn whois(
375            &mut self,
376            ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
377            addr: std::net::SocketAddr,
378        ) -> DelegatedReply<Option<crate::status::WhoIs>> {
379            let (deleg, sender) = ctx.reply_sender();
380            let Some(sender) = sender else { return deleg };
381
382            if !self.seen_state_update {
383                tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
384                self.pending_requests
385                    .push(Pending::WhoIs(Whois { addr }, sender));
386                return deleg;
387            }
388
389            sender.send(self.whois_opt(addr));
390
391            deleg
392        }
393
394        /// Subscribe to netmap peer-change events.
395        ///
396        /// Returns a [`watch::Receiver`] whose value is the current set of peer
397        /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
398        /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
399        ///
400        /// The receiver's initial value is the peer set at subscription time (empty before the
401        /// first netmap update). This is a peer-only view; combine with the self node from
402        /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
403        #[message(derive(Clone))]
404        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
405            self.peer_watch.subscribe()
406        }
407    }
408}
409
410pub use msg_impl::*;
411
412#[derive(Debug, Clone)]
413pub(crate) struct PeerState {
414    #[allow(unused)]
415    pub deletions: HashSet<PeerId>,
416    #[allow(unused)]
417    pub upserts: HashSet<PeerId>,
418    pub peers: Arc<PeerDb>,
419}
420
421impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
422    type Reply = ();
423
424    async fn handle(
425        &mut self,
426        msg: Arc<ts_control::StateUpdate>,
427        _ctx: &mut Context<Self, Self::Reply>,
428    ) {
429        // Accumulate user profiles first — control sends them incrementally and a response may
430        // carry profiles with no peer delta (or peers that reference a profile from an earlier
431        // response), so this must happen before the no-peer-update early return below.
432        for profile in &msg.user_profiles {
433            self.user_profiles.insert(profile.id, profile.clone());
434        }
435
436        // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
437        // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
438        // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
439        // *before* the no-peer-update early return — otherwise online status freezes at the last
440        // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
441        let liveness_changed =
442            self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
443
444        if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
445            // No peer set or patch this response. If a liveness delta still mutated the netmap,
446            // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
447            if liveness_changed {
448                self.service_pending_requests();
449                self.peer_watch.send_replace(self.status_peers());
450                if let Err(e) = self
451                    .env
452                    .publish(Arc::new(PeerState {
453                        upserts: HashSet::default(),
454                        deletions: HashSet::default(),
455                        peers: Arc::new(self.peer_db.clone()),
456                    }))
457                    .await
458                {
459                    tracing::error!(error = %e, "publishing liveness-only peer state update");
460                }
461            }
462            return;
463        }
464
465        // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
466        // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
467        // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
468        // so the published `PeerState` reflects every node touched by both passes; a node both
469        // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
470        let (mut upserts, mut deletions) = msg
471            .peer_update
472            .as_ref()
473            .map(|u| self.apply_peer_update(u))
474            .unwrap_or_default();
475
476        if !msg.peer_patches.is_empty() {
477            let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
478            // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
479            // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
480            for id in &patch_upserts {
481                deletions.remove(id);
482            }
483            for id in &patch_deletions {
484                upserts.remove(id);
485            }
486            upserts.extend(patch_upserts);
487            deletions.extend(patch_deletions);
488        }
489
490        tracing::debug!(
491            n_upsert = upserts.len(),
492            n_delete = deletions.len(),
493            peer_count = self.peer_db.peers().len(),
494            "new peer state"
495        );
496
497        self.service_pending_requests();
498
499        // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
500        // value current even when there are no subscribers, so a late subscriber sees fresh state.
501        self.peer_watch.send_replace(self.status_peers());
502
503        if let Err(e) = self
504            .env
505            .publish(Arc::new(PeerState {
506                upserts,
507                deletions,
508                peers: Arc::new(self.peer_db.clone()),
509            }))
510            .await
511        {
512            tracing::error!(error = %e, "publishing peer state update");
513        }
514    }
515}
516
517/// Bus message delivering the latest verified Tailnet-Lock [`Authority`](ts_tka::Authority) from the
518/// control runner (after a successful `/machine/tka/sync`) to the peer tracker for **observe-only**
519/// verify-and-logging (issue #136). Cloned onto the bus (`Authority` is `Clone`); the control runner
520/// re-publishes on every successful sync since the bus has no replay for a late subscriber.
521#[derive(Clone)]
522pub struct TkaAuthorityUpdate(pub Arc<ts_tka::Authority>);
523
524impl Message<TkaAuthorityUpdate> for PeerTracker {
525    type Reply = ();
526
527    async fn handle(&mut self, msg: TkaAuthorityUpdate, _ctx: &mut Context<Self, Self::Reply>) {
528        // Store as the OBSERVE-ONLY authority — never `tka_authority` (which would enforce). From
529        // here on, each upserted peer's signature verdict is logged; admission is unchanged.
530        tracing::info!(
531            head = %msg.0.head().to_base32(),
532            "TKA observe authority updated (verify-and-log active; not enforcing)"
533        );
534        self.tka_observe = Some((*msg.0).clone());
535    }
536}
537
/// Request that the peer tracker re-broadcast its current peer snapshot on the bus even though no
/// peer has changed.
///
/// Runtime preference changes send this so that the `Arc<PeerState>` subscribers — the route
/// updater and the source filter — re-resolve against the new value immediately instead of at the
/// next netmap update. Current senders: `Device::set_exit_node` (new exit-node selector) and
/// `Device::set_accept_routes` (new accept-routes flag).
#[derive(Debug, Clone, Copy)]
pub struct RepublishState;
545
546impl Message<RepublishState> for PeerTracker {
547    type Reply = ();
548
549    async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
550        // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
551        // delta. Subscribers recompute their routes/filters against the current peers and the
552        // (just-updated) runtime preferences (exit-node selector, accept-routes flag).
553        if let Err(e) = self
554            .env
555            .publish(Arc::new(PeerState {
556                upserts: HashSet::default(),
557                deletions: HashSet::default(),
558                peers: Arc::new(self.peer_db.clone()),
559            }))
560            .await
561        {
562            tracing::error!(error = %e, "re-publishing peer state after a runtime preference change");
563        }
564    }
565}
566
567impl PeerTracker {
568    /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
569    /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
570    ///
571    /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
572    /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
573    /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
574    ///
575    /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
576    fn apply_peer_update(
577        &mut self,
578        peer_update: &ts_control::PeerUpdate,
579    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
580        let mut upserts = HashSet::default();
581        let mut deletions = HashSet::default();
582
583        match peer_update {
584            ts_control::PeerUpdate::Full(new_nodes) => {
585                tracing::trace!("full peer update");
586
587                // Only stable_ids that PASS the Tailnet-Lock gate survive a full re-sync. This makes
588                // revocation evict: if a peer is re-included with a now-invalid (or missing)
589                // signature under an active authority, it is excluded from `retained_ids`, so
590                // `retain` drops the stale (previously-admitted) entry rather than leaving it in the
591                // db unverified. With no authority, `tka_admits` is always `true`, so `retained_ids`
592                // is exactly the set of re-included stable_ids — the inactive path is byte-for-byte
593                // the pre-TKA behavior (no regression).
594                let retained_ids = new_nodes
595                    .iter()
596                    .filter(|node| self.tka_admits(node))
597                    .map(|x| &x.stable_id)
598                    .collect::<HashSet<_>>();
599
600                self.peer_db.retain(|id, peer| {
601                    let retain = retained_ids.contains(&peer.stable_id);
602
603                    if !retain {
604                        deletions.insert(id);
605                    }
606
607                    retain
608                });
609
610                for node in new_nodes {
611                    if !self.tka_admits(node) {
612                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
613                    }
614                    self.tka_observe_log(node); // verify-and-LOG (#136); never gates admission
615                    let peer_id = self.peer_db.upsert(node);
616                    upserts.insert(peer_id);
617                }
618            }
619
620            ts_control::PeerUpdate::Delta { remove, upsert } => {
621                tracing::trace!("delta peer update");
622
623                for peer in upsert {
624                    if !self.tka_admits(peer) {
625                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
626                    }
627                    self.tka_observe_log(peer); // verify-and-LOG (#136); never gates admission
628                    let id = self.peer_db.upsert(peer);
629
630                    upserts.insert(id);
631                }
632
633                for peer in remove {
634                    let Some((id, _node)) = self.peer_db.remove(peer) else {
635                        tracing::error!(control_node_id = peer, "removed peer was unknown");
636                        continue;
637                    };
638
639                    deletions.insert(id);
640                }
641            }
642        }
643
644        (upserts, deletions)
645    }
646
647    /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
648    /// deleted [`PeerId`]s.
649    ///
650    /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
651    /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
652    /// response that carries both has the peer set applied first (by the caller) and these patches
653    /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
654    /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
655    fn apply_peer_patches(
656        &mut self,
657        patches: &[ts_control::PeerChange],
658    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
659        let mut upserts = HashSet::default();
660        let mut deletions = HashSet::default();
661
662        tracing::trace!(n = patches.len(), "peer patch update");
663
664        for patch in patches {
665            // Clone the current node, apply the present fields, and re-upsert through the same path
666            // as a delta so indexes/routes stay consistent.
667            let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
668                tracing::debug!(
669                    control_node_id = patch.id,
670                    "peer patch for unknown node; ignoring"
671                );
672                continue;
673            };
674
675            let mut node = existing.clone();
676            if let Some(endpoints) = &patch.underlay_addresses {
677                node.underlay_addresses = endpoints.clone();
678            }
679            if let Some(derp) = patch.derp_region {
680                node.derp_region = Some(derp);
681            }
682            if let Some(cap) = patch.cap {
683                node.cap = cap;
684            }
685            if let Some(cap_map) = &patch.cap_map {
686                node.cap_map = cap_map.clone();
687            }
688            if let Some(disco_key) = patch.disco_key {
689                node.disco_key = Some(disco_key);
690            }
691            if let Some(expiry) = patch.node_key_expiry {
692                node.node_key_expiry = Some(expiry);
693            }
694            // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
695            // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
696            // a value (never patches back to unknown), so apply when present.
697            if let Some(online) = patch.online {
698                node.online = Some(online);
699            }
700            if let Some(last_seen) = patch.last_seen {
701                node.last_seen = Some(last_seen);
702            }
703            // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
704            // together so the trust gate below verifies the new signature against the new key, never
705            // a mismatched pair.
706            if let Some(node_key) = patch.node_key {
707                node.node_key = node_key;
708            }
709            if let Some(sig) = &patch.key_signature {
710                node.key_signature = sig.clone();
711            }
712
713            // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
714            // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
715            // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
716            // evict it rather than keep the stale (now-unverified) entry.
717            if !self.tka_admits(&node) {
718                if let Some((id, _)) = self.peer_db.remove(&patch.id) {
719                    tracing::warn!(
720                        control_node_id = patch.id,
721                        "peer patch rejected by tailnet lock; evicting peer"
722                    );
723                    deletions.insert(id);
724                }
725                continue;
726            }
727
728            self.tka_observe_log(&node); // verify-and-LOG (#136); never gates admission
729            let id = self.peer_db.upsert(&node);
730            upserts.insert(id);
731        }
732
733        (upserts, deletions)
734    }
735
736    /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
737    /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
738    /// actually mutated (so the caller knows whether to re-publish).
739    ///
740    /// Mirrors Go's post-`peers*` application of these maps. Each entry is keyed by control node id
741    /// and only ever *sets* a value (never back to unknown). An entry for an unknown node id is
742    /// ignored (like a patch — these maps never create a node). `peer_seen_change`'s `false` ("the
743    /// peer is gone") is applied as `online = Some(false)` — the node stays in the netmap, it is
744    /// merely marked offline; the `last_seen = now` update for the `true` case is intentionally not
745    /// performed here (it needs a wall clock this actor does not hold, and `last_seen` is the
746    /// low-value half — `online` is the `tailscale status` column that matters; see the iter-5
747    /// research note §5.5).
748    fn apply_liveness_changes(
749        &mut self,
750        online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
751        peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
752    ) -> bool {
753        let mut changed = false;
754
755        // Channel C — direct online flips.
756        for (&node_id, &online) in online_change {
757            if let Some((_pid, existing)) = self.peer_db.get(&node_id)
758                && existing.online != Some(online)
759            {
760                let mut node = existing.clone();
761                node.online = Some(online);
762                self.peer_db.upsert(&node);
763                changed = true;
764            }
765        }
766
767        // Channel D — peer-seen flips. `false` ⇒ "the peer is gone" ⇒ mark offline (the node is
768        // retained, not removed). `true` ⇒ "seen just now"; the online half is unknown from this
769        // signal alone, so we leave `online` untouched (a `true` here does not assert connectivity to
770        // control, only recent contact) and defer the `last_seen = now` timestamp (no clock here).
771        for (&node_id, &seen) in peer_seen_change {
772            if !seen
773                && let Some((_pid, existing)) = self.peer_db.get(&node_id)
774                && existing.online != Some(false)
775            {
776                let mut node = existing.clone();
777                node.online = Some(false);
778                self.peer_db.upsert(&node);
779                changed = true;
780            }
781        }
782
783        changed
784    }
785
786    /// Test-only constructor: build a [`PeerTracker`] with chosen TKA authorities without going
787    /// through the actor `on_start` path. `tka_authority` exercises the fail-closed enforcement
788    /// chokepoint ([`tka_admits`](Self::tka_admits)); `tka_observe` exercises the observe-only
789    /// verify-and-log seam ([`tka_observe_log`](Self::tka_observe_log)).
790    #[cfg(test)]
791    fn for_test(
792        env: Env,
793        tka_authority: Option<ts_tka::Authority>,
794        tka_observe: Option<ts_tka::Authority>,
795    ) -> Self {
796        let (peer_watch, _) = watch::channel(Vec::new());
797        Self {
798            peer_db: PeerDb::default(),
799            seen_state_update: false,
800            pending_requests: Vec::new(),
801            peer_watch,
802            user_profiles: HashMap::new(),
803            tka_authority,
804            tka_observe,
805            env,
806        }
807    }
808
809    fn service_pending_requests(&mut self) {
810        if self.seen_state_update {
811            return;
812        }
813
814        self.seen_state_update = true;
815
816        if !self.pending_requests.is_empty() {
817            tracing::debug!(
818                n_pending = self.pending_requests.len(),
819                "state update received, servicing pending requests"
820            );
821        }
822
823        for req in core::mem::take(&mut self.pending_requests) {
824            match req {
825                Pending::PeerByName(PeerByName { name }, reply) => {
826                    reply.send(self.peer_by_name_opt(&name).cloned());
827                }
828                Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
829                    reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
830                }
831                Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
832                    reply.send(
833                        self.peer_db
834                            .get_route(ip.into())
835                            .map(|(_id, node)| node.clone())
836                            .collect(),
837                    );
838                }
839                Pending::Status(reply) => {
840                    reply.send(self.status_peers_with_ids());
841                }
842                Pending::WhoIs(Whois { addr }, reply) => {
843                    reply.send(self.whois_opt(addr));
844                }
845            }
846        }
847    }
848}
849
850#[cfg(test)]
851mod tka_tests {
852    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
853    //!
854    //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
855    //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
856    //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
857    //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
858    //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
859    //! private `ts_tka` API used).
860
861    use ed25519_dalek::{Signer, SigningKey};
862    use ts_control::{Node, StableNodeId, TailnetAddress};
863    use ts_tka::{
864        AumHash, Authority, Key, KeyKind, State,
865        cbor::{self, Value},
866    };
867
868    use super::*;
869
870    /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
871    const SIG_KIND_DIRECT: u64 = 1;
872
873    /// The 32-byte node key used across the signed-peer fixtures.
874    const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
875
876    /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
877    /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
878    fn test_env() -> Env {
879        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
880        Env::new(
881            ts_keys::NodeState::generate(),
882            shutdown_rx,
883            crate::env::ForwarderConfig {
884                accept_routes: false,
885                accept_dns: true,
886                exit_node: None,
887                forward_routes: Vec::new(),
888                forward_tcp_ports: Vec::new(),
889                forward_udp_ports: Vec::new(),
890                forward_all_ports: false,
891                forward_exit_egress: false,
892                block_incoming: false,
893                exit_proxy: None,
894                peerapi_port: None,
895                taildrop_dir: None,
896                enable_ipv6: false,
897                persistent_keepalive_interval: None,
898                ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
899            },
900        )
901    }
902
903    /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
904    fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
905        Node {
906            id: 1,
907            stable_id: StableNodeId(stable_id.to_string()),
908            hostname: stable_id.to_string(),
909            user_id: 0,
910            tailnet: Some("ts.net".to_string()),
911            tags: Vec::new(),
912            tailnet_address: TailnetAddress {
913                ipv4: "100.64.0.1/32".parse().unwrap(),
914                ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
915            },
916            node_key: node_key.into(),
917            node_key_expiry: None,
918            online: None,
919            last_seen: None,
920            key_signature,
921            machine_key: None,
922            disco_key: None,
923            accepted_routes: Vec::new(),
924            underlay_addresses: Vec::new(),
925            derp_region: None,
926            cap: Default::default(),
927            cap_map: Default::default(),
928            peerapi_port: None,
929            peerapi_dns_proxy: false,
930            is_wireguard_only: false,
931            exit_node_dns_resolvers: Vec::new(),
932            peer_relay: false,
933            service_vips: Default::default(),
934        }
935    }
936
937    /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
938    /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
939    /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
940    /// signing-digest preimage (the `SigHash` form).
941    fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
942        let mut pairs = alloc_pairs(node_key, key_id);
943        if let Some(sig) = signature {
944            pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
945        }
946        cbor::int_map(pairs).to_vec()
947    }
948
949    fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
950        vec![
951            (1, Some(Value::Uint(SIG_KIND_DIRECT))),
952            (2, Some(Value::Bytes(node_key.to_vec()))),
953            (3, Some(Value::Bytes(key_id.to_vec()))),
954        ]
955    }
956
957    /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
958    /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
959    fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
960        // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
961        let signing = SigningKey::from_bytes(&[42u8; 32]);
962        let trusted_pub = signing.verifying_key().to_bytes().to_vec();
963
964        let authority = Authority::from_state(
965            AumHash([0; 32]),
966            State {
967                keys: vec![Key {
968                    kind: KeyKind::Ed25519,
969                    votes: 1,
970                    public: trusted_pub.clone(),
971                }],
972            },
973        );
974
975        // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
976        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
977        let sig_hash = ts_tka::aum_hash(&preimage).0;
978        let signature = signing.sign(&sig_hash).to_bytes().to_vec();
979
980        let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
981        // Sanity: the authority accepts the signature we just built (same path the gate uses).
982        assert!(
983            authority
984                .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
985                .is_ok()
986        );
987
988        (authority, signed_cbor)
989    }
990
991    #[tokio::test]
992    async fn tka_inactive_upserts_all_peers() {
993        // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
994        let mut tracker = PeerTracker::for_test(test_env(), None, None);
995
996        let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
997        let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
998
999        assert!(tracker.tka_admits(&signed));
1000        assert!(tracker.tka_admits(&unsigned));
1001
1002        tracker.peer_db.upsert(&signed);
1003        tracker.peer_db.upsert(&unsigned);
1004        assert_eq!(tracker.peer_db.peers().len(), 2);
1005    }
1006
1007    #[tokio::test]
1008    async fn tka_active_rejects_unsigned_peer() {
1009        // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
1010        let (authority, _sig) = authority_and_valid_sig();
1011        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1012
1013        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1014        assert!(!tracker.tka_admits(&unsigned));
1015
1016        // Mirror the handler's `if !tka_admits { continue }` loop.
1017        if tracker.tka_admits(&unsigned) {
1018            tracker.peer_db.upsert(&unsigned);
1019        }
1020        assert_eq!(tracker.peer_db.peers().len(), 0);
1021        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1022    }
1023
1024    #[tokio::test]
1025    async fn tka_active_rejects_bad_signature() {
1026        // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1027        let (authority, mut sig) = authority_and_valid_sig();
1028        // Tamper the last byte (the trailing signature byte) so verification fails.
1029        let last = sig.len() - 1;
1030        sig[last] ^= 0xff;
1031
1032        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1033        let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1034        assert!(!tracker.tka_admits(&bad));
1035
1036        if tracker.tka_admits(&bad) {
1037            tracker.peer_db.upsert(&bad);
1038        }
1039        assert_eq!(tracker.peer_db.peers().len(), 0);
1040    }
1041
1042    #[tokio::test]
1043    async fn tka_active_admits_authorized_peer() {
1044        // Authority present + correctly-signed node key ⇒ admitted and upserted.
1045        let (authority, sig) = authority_and_valid_sig();
1046        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1047
1048        let good = peer_node("good", NODE_KEY_BYTES, sig);
1049        assert!(tracker.tka_admits(&good));
1050
1051        if tracker.tka_admits(&good) {
1052            tracker.peer_db.upsert(&good);
1053        }
1054        assert_eq!(tracker.peer_db.peers().len(), 1);
1055        assert!(tracker.peer_db.get(&good.node_key).is_some());
1056    }
1057
1058    // ---------------------------------------------------------------------------------------------
1059    // Tests that drive REAL `PeerUpdate`s through the shared handler body
1060    // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1061    // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1062    // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1063    // ---------------------------------------------------------------------------------------------
1064
1065    #[tokio::test]
1066    async fn tka_active_delta_upsert_rejects_unauthorized() {
1067        // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1068        // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1069        let (authority, _sig) = authority_and_valid_sig();
1070        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1071
1072        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1073        let update = ts_control::PeerUpdate::Delta {
1074            upsert: vec![unsigned.clone()],
1075            remove: Vec::new(),
1076        };
1077
1078        tracker.apply_peer_update(&update);
1079
1080        assert_eq!(tracker.peer_db.peers().len(), 0);
1081        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1082    }
1083
1084    #[tokio::test]
1085    async fn tka_active_delta_upsert_admits_authorized() {
1086        // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1087        let (authority, sig) = authority_and_valid_sig();
1088        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1089
1090        let good = peer_node("good", NODE_KEY_BYTES, sig);
1091        let update = ts_control::PeerUpdate::Delta {
1092            upsert: vec![good.clone()],
1093            remove: Vec::new(),
1094        };
1095
1096        tracker.apply_peer_update(&update);
1097
1098        assert_eq!(tracker.peer_db.peers().len(), 1);
1099        assert!(tracker.peer_db.get(&good.node_key).is_some());
1100    }
1101
1102    #[tokio::test]
1103    async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1104        // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1105        // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1106        // dropped fail-closed.
1107        let (authority, sig) = authority_and_valid_sig();
1108        // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1109        let mut bad_sig = sig.clone();
1110        let last = bad_sig.len() - 1;
1111        bad_sig[last] ^= 0xff;
1112
1113        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1114
1115        // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1116        // rejected peers use distinct node keys so the survivor is unambiguous.
1117        let good = peer_node("good", NODE_KEY_BYTES, sig);
1118        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1119        let bad = peer_node("bad", [9u8; 32], bad_sig);
1120
1121        let update =
1122            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1123
1124        tracker.apply_peer_update(&update);
1125
1126        assert_eq!(tracker.peer_db.peers().len(), 1);
1127        assert!(tracker.peer_db.get(&good.node_key).is_some());
1128        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1129        assert!(tracker.peer_db.get(&bad.node_key).is_none());
1130    }
1131
1132    #[tokio::test]
1133    async fn tka_observe_only_admits_all_peers_in_mixed_batch() {
1134        // #136 observe-only contract: with the OBSERVE authority set (and `tka_authority = None`, so
1135        // enforcement is OFF), the exact mixed batch that the fail-closed test above prunes to 1 must
1136        // instead admit ALL THREE peers. The verify-and-log seam logs each verdict (verified /
1137        // unsigned / failed) but never gates admission. This locks observe-only against a future
1138        // refactor that accidentally wires `tka_observe` into a drop path.
1139        let (authority, sig) = authority_and_valid_sig();
1140        let mut bad_sig = sig.clone();
1141        let last = bad_sig.len() - 1;
1142        bad_sig[last] ^= 0xff;
1143
1144        // Authority in the OBSERVE slot, enforcement slot empty.
1145        let mut tracker = PeerTracker::for_test(test_env(), None, Some(authority));
1146
1147        let good = peer_node("good", NODE_KEY_BYTES, sig);
1148        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1149        let bad = peer_node("bad", [9u8; 32], bad_sig);
1150
1151        let update =
1152            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1153
1154        tracker.apply_peer_update(&update);
1155
1156        // ALL THREE survive — observe-only never drops a peer.
1157        assert_eq!(
1158            tracker.peer_db.peers().len(),
1159            3,
1160            "observe-only must admit every peer regardless of signature verdict"
1161        );
1162        assert!(tracker.peer_db.get(&good.node_key).is_some());
1163        assert!(tracker.peer_db.get(&unsigned.node_key).is_some());
1164        assert!(tracker.peer_db.get(&bad.node_key).is_some());
1165    }
1166
1167    #[tokio::test]
1168    async fn tka_full_resync_revocation_behavior() {
1169        // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1170        // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1171        // (previously-admitted) entry because membership was decided purely by stable_id.
1172        //
1173        // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1174        // stable_ids, so a peer whose re-included signature no longer verifies under the active
1175        // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1176        // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1177        // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1178        let (authority, sig) = authority_and_valid_sig();
1179        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1180
1181        // 1) Admit the peer with a valid signature via a real `Full`.
1182        let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1183        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1184        assert_eq!(tracker.peer_db.peers().len(), 1);
1185        assert!(tracker.peer_db.get(&good.node_key).is_some());
1186
1187        // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1188        let mut bad_sig = sig;
1189        let last = bad_sig.len() - 1;
1190        bad_sig[last] ^= 0xff;
1191        let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1192        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1193
1194        // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1195        assert_eq!(tracker.peer_db.peers().len(), 0);
1196        assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1197    }
1198
1199    #[tokio::test]
1200    async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1201        // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1202        // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1203        // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1204        // branch this wave.
1205        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1206
1207        let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1208        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1209        assert_eq!(tracker.peer_db.peers().len(), 1);
1210
1211        // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1212        let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1213        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1214        assert_eq!(tracker.peer_db.peers().len(), 1);
1215        assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1216    }
1217
1218    /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1219    /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1220    /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1221    /// never re-handshake after it moves.
1222    #[tokio::test]
1223    async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1224        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1225
1226        // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1227        let peer = peer_node("mover", [1u8; 32], vec![]);
1228        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1229        let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1230        assert!(before.underlay_addresses.is_empty());
1231        assert!(before.derp_region.is_none());
1232
1233        // Patch in fresh reachability (the idle-peer-reconnect case).
1234        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1235        let patch = ts_control::PeerChange {
1236            id: 1,
1237            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1238            cap: None,
1239            cap_map: None,
1240            underlay_addresses: Some(vec![new_ep]),
1241            node_key: None,
1242            key_signature: None,
1243            disco_key: None,
1244            node_key_expiry: None,
1245            online: None,
1246            last_seen: None,
1247        };
1248        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1249
1250        assert_eq!(upserts.len(), 1);
1251        assert_eq!(deletions.len(), 0);
1252        // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1253        assert_eq!(tracker.peer_db.peers().len(), 1);
1254        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1255        assert_eq!(after.underlay_addresses, vec![new_ep]);
1256        assert_eq!(
1257            after.derp_region,
1258            Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1259        );
1260        assert_eq!(after.node_key, peer.node_key);
1261    }
1262
1263    /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1264    /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1265    /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1266    /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1267    /// in by the delta kept stale (empty) reachability.
1268    #[tokio::test]
1269    async fn patch_applies_on_top_of_co_occurring_delta() {
1270        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1271
1272        // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1273        let peer = peer_node("mover", [1u8; 32], vec![]);
1274        let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1275            upsert: vec![peer.clone()],
1276            remove: vec![],
1277        });
1278        assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1279
1280        // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1281        // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1282        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1283        let patch = ts_control::PeerChange {
1284            id: 1,
1285            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1286            cap: None,
1287            cap_map: None,
1288            underlay_addresses: Some(vec![new_ep]),
1289            node_key: None,
1290            key_signature: None,
1291            disco_key: None,
1292            node_key_expiry: None,
1293            online: None,
1294            last_seen: None,
1295        };
1296        let (patch_upserts, patch_deletions) =
1297            tracker.apply_peer_patches(std::slice::from_ref(&patch));
1298
1299        assert_eq!(
1300            patch_upserts.len(),
1301            1,
1302            "patch re-upserts the just-added peer"
1303        );
1304        assert_eq!(patch_deletions.len(), 0);
1305        // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1306        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1307        assert_eq!(after.underlay_addresses, vec![new_ep]);
1308        assert_eq!(
1309            after.derp_region,
1310            Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1311        );
1312    }
1313
1314    /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1315    /// never creates a node). No upsert, no deletion, peer set unchanged.
1316    #[tokio::test]
1317    async fn patch_for_unknown_node_is_ignored() {
1318        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1319        let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1320        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1321
1322        let patch = ts_control::PeerChange {
1323            id: 999, // not in the netmap
1324            derp_region: None,
1325            cap: None,
1326            cap_map: None,
1327            underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1328            node_key: None,
1329            key_signature: None,
1330            disco_key: None,
1331            node_key_expiry: None,
1332            online: None,
1333            last_seen: None,
1334        };
1335        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1336
1337        assert_eq!(upserts.len(), 0);
1338        assert_eq!(deletions.len(), 0);
1339        assert_eq!(tracker.peer_db.peers().len(), 1);
1340        assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1341    }
1342
1343    /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1344    /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1345    #[tokio::test]
1346    async fn patch_updates_node_key_expiry() {
1347        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1348        let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1349        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1350
1351        let expiry = "2027-01-01T00:00:00Z"
1352            .parse::<chrono::DateTime<chrono::Utc>>()
1353            .unwrap();
1354        let patch = ts_control::PeerChange {
1355            id: 1,
1356            derp_region: None,
1357            cap: None,
1358            cap_map: None,
1359            underlay_addresses: None,
1360            node_key: None,
1361            key_signature: None,
1362            disco_key: None,
1363            node_key_expiry: Some(expiry),
1364            online: None,
1365            last_seen: None,
1366        };
1367        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1368
1369        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1370        assert_eq!(after.node_key_expiry, Some(expiry));
1371    }
1372
1373    /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1374    #[tokio::test]
1375    async fn patch_updates_online() {
1376        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1377        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1378        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1379        assert_eq!(
1380            tracker
1381                .peer_db
1382                .get(&(1 as ts_control::NodeId))
1383                .unwrap()
1384                .1
1385                .online,
1386            None
1387        );
1388
1389        let mut patch = ts_control::PeerChange {
1390            id: 1,
1391            derp_region: None,
1392            cap: None,
1393            cap_map: None,
1394            underlay_addresses: None,
1395            node_key: None,
1396            key_signature: None,
1397            disco_key: None,
1398            node_key_expiry: None,
1399            online: Some(true),
1400            last_seen: None,
1401        };
1402        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1403        assert_eq!(
1404            tracker
1405                .peer_db
1406                .get(&(1 as ts_control::NodeId))
1407                .unwrap()
1408                .1
1409                .online,
1410            Some(true),
1411            "PeerChange.online=Some(true) marks the peer online"
1412        );
1413
1414        // A subsequent patch flips it offline.
1415        patch.online = Some(false);
1416        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1417        assert_eq!(
1418            tracker
1419                .peer_db
1420                .get(&(1 as ts_control::NodeId))
1421                .unwrap()
1422                .1
1423                .online,
1424            Some(false)
1425        );
1426    }
1427
1428    /// Channel C/D: the `online_change` map flips online directly; `peer_seen_change: false`
1429    /// ("the peer is gone") marks the peer offline. Both apply to a peer already in the netmap and
1430    /// ignore unknown ids.
1431    #[tokio::test]
1432    async fn liveness_change_maps_apply_online() {
1433        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1434        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1435        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1436
1437        // Channel C: online_change sets online=true.
1438        let mut online_change = std::collections::BTreeMap::new();
1439        online_change.insert(1 as ts_control::NodeId, true);
1440        online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1441        let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1442        assert!(changed);
1443        assert_eq!(
1444            tracker
1445                .peer_db
1446                .get(&(1 as ts_control::NodeId))
1447                .unwrap()
1448                .1
1449                .online,
1450            Some(true)
1451        );
1452
1453        // Channel D: peer_seen_change=false marks the peer offline (gone), node retained.
1454        let mut peer_seen_change = std::collections::BTreeMap::new();
1455        peer_seen_change.insert(1 as ts_control::NodeId, false);
1456        let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1457        assert!(changed);
1458        assert_eq!(
1459            tracker
1460                .peer_db
1461                .get(&(1 as ts_control::NodeId))
1462                .unwrap()
1463                .1
1464                .online,
1465            Some(false),
1466            "peer_seen_change=false marks offline (the node stays in the netmap)"
1467        );
1468        assert_eq!(
1469            tracker.peer_db.peers().len(),
1470            1,
1471            "the node is retained, not removed"
1472        );
1473
1474        // No-op when nothing matches / changes.
1475        assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1476    }
1477
1478    /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1479    /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1480    /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1481    /// otherwise be a trust-enforcement bypass via the patch path.
1482    #[tokio::test]
1483    async fn patch_key_rotation_failing_tka_evicts_peer() {
1484        let (authority, sig) = authority_and_valid_sig();
1485        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1486
1487        // Admit a correctly-signed peer (id == 1).
1488        let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1489        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1490        assert_eq!(tracker.peer_db.peers().len(), 1);
1491
1492        // Patch a new node key whose signature is garbage under the active authority.
1493        let patch = ts_control::PeerChange {
1494            id: 1,
1495            derp_region: None,
1496            cap: None,
1497            cap_map: None,
1498            underlay_addresses: None,
1499            node_key: Some([0x33u8; 32].into()),
1500            key_signature: Some(vec![0x00, 0x01, 0x02]),
1501            disco_key: None,
1502            node_key_expiry: None,
1503            online: None,
1504            last_seen: None,
1505        };
1506        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1507
1508        assert_eq!(upserts.len(), 0);
1509        assert_eq!(deletions.len(), 1);
1510        assert_eq!(tracker.peer_db.peers().len(), 0);
1511    }
1512
1513    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1514    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1515    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1516    /// proves the accumulate-then-join path the netmap handler builds.
1517    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1518        ts_control::UserProfile {
1519            id,
1520            login_name: login.to_string(),
1521            display_name: None,
1522        }
1523    }
1524
1525    #[tokio::test]
1526    async fn whois_resolves_user_from_accumulated_profiles() {
1527        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1528
1529        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1530        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1531        peer.user_id = 42;
1532        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1533        let addr = "100.64.0.1:0".parse().unwrap();
1534
1535        // No profile yet: the node resolves but its owner is unknown.
1536        let who = tracker.whois_opt(addr).expect("peer is known");
1537        assert_eq!(who.user, None);
1538
1539        // Profile for a DIFFERENT user must not match.
1540        tracker
1541            .user_profiles
1542            .insert(7, profile(7, "someone-else@example.com"));
1543        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1544
1545        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1546        // login resolves.
1547        tracker
1548            .user_profiles
1549            .insert(42, profile(42, "alice@example.com"));
1550        assert_eq!(
1551            tracker.whois_opt(addr).unwrap().user,
1552            Some("alice@example.com".to_string())
1553        );
1554    }
1555
1556    /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1557    #[test]
1558    fn user_profile_best_label_prefers_login() {
1559        assert_eq!(
1560            profile(1, "alice@example.com").best_label(),
1561            Some("alice@example.com".to_string())
1562        );
1563        let display_only = ts_control::UserProfile {
1564            id: 2,
1565            login_name: String::new(),
1566            display_name: Some("Bob".to_string()),
1567        };
1568        assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1569        let empty = ts_control::UserProfile {
1570            id: 3,
1571            login_name: String::new(),
1572            display_name: None,
1573        };
1574        assert_eq!(empty.best_label(), None);
1575    }
1576}