// ts_runtime/peer_tracker/mod.rs

1//! Peer delta update tracking.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    sync::Arc,
7};
8
9use kameo::{
10    actor::ActorRef,
11    message::{Context, Message},
12    reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26    peer_db: PeerDb,
27    seen_state_update: bool,
28    pending_requests: Vec<Pending>,
29    /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30    /// changes ([`WatchNetmap`]).
31    peer_watch: watch::Sender<Vec<StatusNode>>,
32    /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33    /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34    /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35    /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36    /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37    /// earlier one.
38    user_profiles: HashMap<UserId, UserProfile>,
39    /// Tailnet-Lock (TKA) authority used to verify each peer's `key_signature` at the peer-trust
40    /// chokepoint. When `Some`, enforcement is **active**: every upserted peer must present a
41    /// signature this authority authorizes, or it is rejected (fail-closed). When `None` (always,
42    /// this wave) enforcement is **inactive** and every peer is upserted — identical to pre-TKA
43    /// behavior. There is no live `Authority` source yet: building one requires the
44    /// `/machine/tka/sync` Noise RPC + AUM-chain replayer (deferred, see SECURITY.md). The
45    /// enforcement path below is wired and unit-tested, and flips on the instant an authority is
46    /// supplied; it is explicitly gated, not a silent no-op.
47    tka_authority: Option<ts_tka::Authority>,
48    /// Tailnet-Lock authority used **observe-only** (verify-and-LOG, issue #136): the live
49    /// `Authority` synced from control (delivered over the bus via [`TkaAuthorityUpdate`]). Distinct
50    /// from [`tka_authority`](Self::tka_authority) on purpose — populating *that* would flip the
51    /// runtime to fail-closed enforcement, whereas this field only feeds
52    /// [`tka_observe_log`](Self::tka_observe_log), which logs each peer's signature verdict and
53    /// **never** drops a peer. The current posture (per SECURITY.md / PARITY_ROADMAP): verify-and-log
54    /// while the `ts_tka` crypto is unaudited and control is treated as trusted; flipping to enforce
55    /// is a separate, gated decision.
56    tka_observe: Option<ts_tka::Authority>,
57    env: Env,
58}
59
60impl PeerTracker {
61    fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
62        // Canonicalization (case + trailing dot) is handled inside the name index lookup.
63        self.peer_db.get(&name).map(|(_id, node)| node)
64    }
65
66    fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
67        self.peer_db.get(&ip).map(|(_id, node)| node)
68    }
69
70    /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
71    fn status_peers(&self) -> Vec<StatusNode> {
72        self.peer_db
73            .peers()
74            .values()
75            .map(StatusNode::from_node)
76            .collect()
77    }
78
79    fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
80        let ip = crate::status::whois_addr(addr);
81        let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
82        // Join the node's owning user id against the accumulated UserProfiles table to resolve a
83        // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
84        // with no human owner, or a profile not yet delivered).
85        let user = self.resolve_user(node.user_id);
86        Some(crate::status::WhoIs::from_node_with_user(node, user))
87    }
88
89    /// Resolve a user id to its best display label from the accumulated profile table.
90    fn resolve_user(&self, user_id: UserId) -> Option<String> {
91        self.user_profiles
92            .get(&user_id)
93            .and_then(UserProfile::best_label)
94    }
95
96    /// Whether `node` may be admitted to the peer db under the current Tailnet-Lock posture.
97    ///
98    /// Fail-closed and gated:
99    /// - No [`tka_authority`](Self::tka_authority) ⇒ enforcement inactive ⇒ always admit (today's
100    ///   behavior; this is the always-taken branch this wave).
101    /// - Authority present + peer carries a `key_signature` that the authority authorizes for the
102    ///   peer's node key ⇒ admit.
103    /// - Authority present + signature missing or unauthorized/invalid ⇒ **reject** (Go denies
104    ///   network access to unsigned peers under tailnet lock; we do not upsert them).
105    fn tka_admits(&self, node: &Node) -> bool {
106        let Some(auth) = &self.tka_authority else {
107            return true;
108        };
109
110        if node.key_signature.is_empty() {
111            // TKA active but peer presented no signature: reject (Go denies network access to
112            // unsigned peers under tailnet lock, unless UnsignedPeerAPIOnly — out of scope here).
113            tracing::warn!(
114                stable_id = ?node.stable_id,
115                "TKA: rejecting unsigned peer under tailnet lock"
116            );
117            return false;
118        }
119
120        if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
121            tracing::warn!(
122                stable_id = ?node.stable_id,
123                error = %e,
124                "TKA: rejecting peer with unauthorized node key"
125            );
126            return false;
127        }
128
129        true
130    }
131
132    /// Verify `node`'s Tailnet-Lock signature against the **observe-only** authority and LOG the
133    /// verdict — issue #136. This is the verify-and-log seam: it returns `()` (NOT a bool), so it is
134    /// structurally impossible to wire as an admission gate, and it is called *adjacent* to each
135    /// upsert site without affecting whether the peer is admitted. Every peer is upserted exactly as
136    /// it would be with this call absent.
137    ///
138    /// A no-op when no observe authority has been synced yet. Logs `verified` / `failed` / `unsigned`
139    /// with the peer's `stable_id` and, on failure, the `TkaError` Display (static descriptors —
140    /// "bad sig len" etc.). NEVER logs the node-key or signature bytes.
141    fn tka_observe_log(&self, node: &Node) {
142        let Some(auth) = &self.tka_observe else {
143            return;
144        };
145        if node.key_signature.is_empty() {
146            tracing::info!(
147                stable_id = ?node.stable_id,
148                tka_verdict = "unsigned",
149                "TKA observe: peer presented no key-signature (advisory, NOT enforced)"
150            );
151            return;
152        }
153        match auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
154            Ok(()) => tracing::info!(
155                stable_id = ?node.stable_id,
156                tka_verdict = "verified",
157                "TKA observe: peer node-key authorized (advisory, NOT enforced)"
158            ),
159            Err(e) => tracing::warn!(
160                stable_id = ?node.stable_id,
161                tka_verdict = "failed",
162                reason = %e,
163                "TKA observe: peer key-signature did not verify (advisory, NOT enforced)"
164            ),
165        }
166    }
167}
168
169impl kameo::Actor for PeerTracker {
170    type Args = Env;
171    type Error = Error;
172
173    async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
174        env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
175        // Observe-only TKA (#136): the control runner publishes the verified `Authority` here after a
176        // successful `/machine/tka/sync`; we use it to verify-and-LOG each peer's signature, never to
177        // enforce. The bus has no replay, so the control runner re-publishes on every sync.
178        env.subscribe::<TkaAuthorityUpdate>(&slf).await?;
179
180        let (peer_watch, _) = watch::channel(Vec::new());
181
182        Ok(Self {
183            peer_db: PeerDb::default(),
184            pending_requests: Default::default(),
185            seen_state_update: false,
186            peer_watch,
187            user_profiles: HashMap::new(),
188            // No live TKA *enforcement* authority this wave (fail-closed path stays gated off; see
189            // `tka_authority`). The observe-only authority (`tka_observe`) is supplied over the bus.
190            tka_authority: None,
191            tka_observe: None,
192            env,
193        })
194    }
195}
196
197enum Pending {
198    PeerByName(PeerByName, ReplySender<Option<Node>>),
199    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
200    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
201    Status(ReplySender<Vec<StatusNode>>),
202    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
203}
204
205// For messages with arguments, a struct is generated with the args as fields. They aren't
206// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
207// docs are turned off everywhere.
208#[allow(missing_docs)]
209mod msg_impl {
210    use std::net::IpAddr;
211
212    use kameo::prelude::DelegatedReply;
213
214    use super::*;
215
216    #[kameo::messages]
217    impl PeerTracker {
218        /// Lookup a peer by name.
219        ///
220        /// Waits until we've received at least one peer update from control.
221        #[message(ctx)]
222        pub async fn peer_by_name(
223            &mut self,
224            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
225            name: String,
226        ) -> DelegatedReply<Option<Node>> {
227            let (deleg, sender) = ctx.reply_sender();
228            let Some(sender) = sender else { return deleg };
229
230            if !self.seen_state_update {
231                tracing::debug!(query = name, "no peer state seen yet, queueing request");
232
233                self.pending_requests
234                    .push(Pending::PeerByName(PeerByName { name }, sender));
235
236                return deleg;
237            }
238
239            sender.send(self.peer_by_name_opt(&name).cloned());
240
241            deleg
242        }
243
244        /// Lookup all peers that accept packets addressed to the given IP.
245        ///
246        /// This includes the peer's tailnet address and any subnet routes it provides. Only
247        /// the peers with the most specific subnet route match that covers `ip` will be
248        /// returned.
249        ///
250        /// E.g., suppose:
251        ///
252        /// - We're querying for `10.1.2.3`
253        /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
254        /// - `PeerC` has an accepted route for `10.1.0.0/16`
255        ///
256        /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
257        /// prefix match.
258        #[message(ctx)]
259        pub fn peer_by_accepted_route(
260            &mut self,
261            ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
262            ip: IpAddr,
263        ) -> DelegatedReply<Vec<Node>> {
264            let (deleg, sender) = ctx.reply_sender();
265            let Some(sender) = sender else { return deleg };
266
267            if !self.seen_state_update {
268                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
269
270                self.pending_requests
271                    .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
272
273                return deleg;
274            }
275
276            sender.send(
277                self.peer_db
278                    .get_route(ip.into())
279                    .map(|(_id, node)| node.clone())
280                    .collect(),
281            );
282
283            deleg
284        }
285
286        /// Lookup the peer that has the given tailnet IP address.
287        #[message(ctx)]
288        pub fn peer_by_tailnet_ip(
289            &mut self,
290            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
291            ip: IpAddr,
292        ) -> DelegatedReply<Option<Node>> {
293            let (deleg, sender) = ctx.reply_sender();
294            let Some(sender) = sender else { return deleg };
295
296            if !self.seen_state_update {
297                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
298
299                self.pending_requests
300                    .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
301
302                return deleg;
303            }
304
305            sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
306
307            deleg
308        }
309
310        /// Build the peer entries of a [`Status`](crate::Status) snapshot.
311        ///
312        /// Returns one [`StatusNode`] per known peer. The self node is *not* included here (it
313        /// lives in the control runner); [`Runtime::status`](crate::Runtime::status) combines both.
314        ///
315        /// Waits until we've received at least one peer update from control.
316        #[message(ctx)]
317        pub fn get_status(
318            &mut self,
319            ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
320        ) -> DelegatedReply<Vec<StatusNode>> {
321            let (deleg, sender) = ctx.reply_sender();
322            let Some(sender) = sender else { return deleg };
323
324            if !self.seen_state_update {
325                tracing::debug!("no peer state seen yet, queueing status request");
326                self.pending_requests.push(Pending::Status(sender));
327                return deleg;
328            }
329
330            sender.send(self.status_peers());
331
332            deleg
333        }
334
335        /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
336        ///
337        /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
338        /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
339        /// is not included (it lives in the control runner). Returns empty before the first netmap —
340        /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
341        /// that need a populated list await `Running` first).
342        #[message]
343        pub fn all_peers(&self) -> Vec<Node> {
344            self.peer_db.peers().values().cloned().collect()
345        }
346
347        /// Resolve which node owns a tailnet source address.
348        ///
349        /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
350        /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
351        ///
352        /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
353        /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
354        /// [`status`](crate::status) module docs for the gap.
355        ///
356        /// Waits until we've received at least one peer update from control.
357        #[message(ctx)]
358        pub fn whois(
359            &mut self,
360            ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
361            addr: std::net::SocketAddr,
362        ) -> DelegatedReply<Option<crate::status::WhoIs>> {
363            let (deleg, sender) = ctx.reply_sender();
364            let Some(sender) = sender else { return deleg };
365
366            if !self.seen_state_update {
367                tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
368                self.pending_requests
369                    .push(Pending::WhoIs(Whois { addr }, sender));
370                return deleg;
371            }
372
373            sender.send(self.whois_opt(addr));
374
375            deleg
376        }
377
378        /// Subscribe to netmap peer-change events.
379        ///
380        /// Returns a [`watch::Receiver`] whose value is the current set of peer
381        /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
382        /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
383        ///
384        /// The receiver's initial value is the peer set at subscription time (empty before the
385        /// first netmap update). This is a peer-only view; combine with the self node from
386        /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
387        #[message(derive(Clone))]
388        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
389            self.peer_watch.subscribe()
390        }
391    }
392}
393
394pub use msg_impl::*;
395
396#[derive(Debug, Clone)]
397pub(crate) struct PeerState {
398    #[allow(unused)]
399    pub deletions: HashSet<PeerId>,
400    #[allow(unused)]
401    pub upserts: HashSet<PeerId>,
402    pub peers: Arc<PeerDb>,
403}
404
405impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
406    type Reply = ();
407
408    async fn handle(
409        &mut self,
410        msg: Arc<ts_control::StateUpdate>,
411        _ctx: &mut Context<Self, Self::Reply>,
412    ) {
413        // Accumulate user profiles first — control sends them incrementally and a response may
414        // carry profiles with no peer delta (or peers that reference a profile from an earlier
415        // response), so this must happen before the no-peer-update early return below.
416        for profile in &msg.user_profiles {
417            self.user_profiles.insert(profile.id, profile.clone());
418        }
419
420        // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
421        // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
422        // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
423        // *before* the no-peer-update early return — otherwise online status freezes at the last
424        // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
425        let liveness_changed =
426            self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
427
428        if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
429            // No peer set or patch this response. If a liveness delta still mutated the netmap,
430            // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
431            if liveness_changed {
432                self.service_pending_requests();
433                self.peer_watch.send_replace(self.status_peers());
434                if let Err(e) = self
435                    .env
436                    .publish(Arc::new(PeerState {
437                        upserts: HashSet::default(),
438                        deletions: HashSet::default(),
439                        peers: Arc::new(self.peer_db.clone()),
440                    }))
441                    .await
442                {
443                    tracing::error!(error = %e, "publishing liveness-only peer state update");
444                }
445            }
446            return;
447        }
448
449        // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
450        // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
451        // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
452        // so the published `PeerState` reflects every node touched by both passes; a node both
453        // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
454        let (mut upserts, mut deletions) = msg
455            .peer_update
456            .as_ref()
457            .map(|u| self.apply_peer_update(u))
458            .unwrap_or_default();
459
460        if !msg.peer_patches.is_empty() {
461            let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
462            // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
463            // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
464            for id in &patch_upserts {
465                deletions.remove(id);
466            }
467            for id in &patch_deletions {
468                upserts.remove(id);
469            }
470            upserts.extend(patch_upserts);
471            deletions.extend(patch_deletions);
472        }
473
474        tracing::debug!(
475            n_upsert = upserts.len(),
476            n_delete = deletions.len(),
477            peer_count = self.peer_db.peers().len(),
478            "new peer state"
479        );
480
481        self.service_pending_requests();
482
483        // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
484        // value current even when there are no subscribers, so a late subscriber sees fresh state.
485        self.peer_watch.send_replace(self.status_peers());
486
487        if let Err(e) = self
488            .env
489            .publish(Arc::new(PeerState {
490                upserts,
491                deletions,
492                peers: Arc::new(self.peer_db.clone()),
493            }))
494            .await
495        {
496            tracing::error!(error = %e, "publishing peer state update");
497        }
498    }
499}
500
501/// Bus message delivering the latest verified Tailnet-Lock [`Authority`](ts_tka::Authority) from the
502/// control runner (after a successful `/machine/tka/sync`) to the peer tracker for **observe-only**
503/// verify-and-logging (issue #136). Cloned onto the bus (`Authority` is `Clone`); the control runner
504/// re-publishes on every successful sync since the bus has no replay for a late subscriber.
505#[derive(Clone)]
506pub struct TkaAuthorityUpdate(pub Arc<ts_tka::Authority>);
507
508impl Message<TkaAuthorityUpdate> for PeerTracker {
509    type Reply = ();
510
511    async fn handle(&mut self, msg: TkaAuthorityUpdate, _ctx: &mut Context<Self, Self::Reply>) {
512        // Store as the OBSERVE-ONLY authority — never `tka_authority` (which would enforce). From
513        // here on, each upserted peer's signature verdict is logged; admission is unchanged.
514        tracing::info!(
515            head = %msg.0.head().to_base32(),
516            "TKA observe authority updated (verify-and-log active; not enforcing)"
517        );
518        self.tka_observe = Some((*msg.0).clone());
519    }
520}
521
/// Request that the peer tracker re-broadcast its current peer snapshot on the bus even though
/// no peer changed.
///
/// `Device::set_exit_node` sends this after changing the exit-node selector, so that the route
/// updater and the source filter (both `Arc<PeerState>` subscribers) re-resolve the new selector
/// right away instead of waiting for the next netmap update.
#[derive(Debug, Clone, Copy)]
pub struct RepublishState;
528
529impl Message<RepublishState> for PeerTracker {
530    type Reply = ();
531
532    async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
533        // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
534        // delta. Subscribers recompute their routes/filters against the current peers and the
535        // (just-updated) exit-node selector.
536        if let Err(e) = self
537            .env
538            .publish(Arc::new(PeerState {
539                upserts: HashSet::default(),
540                deletions: HashSet::default(),
541                peers: Arc::new(self.peer_db.clone()),
542            }))
543            .await
544        {
545            tracing::error!(error = %e, "re-publishing peer state after exit-node change");
546        }
547    }
548}
549
550impl PeerTracker {
551    /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
552    /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
553    ///
554    /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
555    /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
556    /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
557    ///
558    /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
559    fn apply_peer_update(
560        &mut self,
561        peer_update: &ts_control::PeerUpdate,
562    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
563        let mut upserts = HashSet::default();
564        let mut deletions = HashSet::default();
565
566        match peer_update {
567            ts_control::PeerUpdate::Full(new_nodes) => {
568                tracing::trace!("full peer update");
569
570                // Only stable_ids that PASS the Tailnet-Lock gate survive a full re-sync. This makes
571                // revocation evict: if a peer is re-included with a now-invalid (or missing)
572                // signature under an active authority, it is excluded from `retained_ids`, so
573                // `retain` drops the stale (previously-admitted) entry rather than leaving it in the
574                // db unverified. With no authority, `tka_admits` is always `true`, so `retained_ids`
575                // is exactly the set of re-included stable_ids — the inactive path is byte-for-byte
576                // the pre-TKA behavior (no regression).
577                let retained_ids = new_nodes
578                    .iter()
579                    .filter(|node| self.tka_admits(node))
580                    .map(|x| &x.stable_id)
581                    .collect::<HashSet<_>>();
582
583                self.peer_db.retain(|id, peer| {
584                    let retain = retained_ids.contains(&peer.stable_id);
585
586                    if !retain {
587                        deletions.insert(id);
588                    }
589
590                    retain
591                });
592
593                for node in new_nodes {
594                    if !self.tka_admits(node) {
595                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
596                    }
597                    self.tka_observe_log(node); // verify-and-LOG (#136); never gates admission
598                    let peer_id = self.peer_db.upsert(node);
599                    upserts.insert(peer_id);
600                }
601            }
602
603            ts_control::PeerUpdate::Delta { remove, upsert } => {
604                tracing::trace!("delta peer update");
605
606                for peer in upsert {
607                    if !self.tka_admits(peer) {
608                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
609                    }
610                    self.tka_observe_log(peer); // verify-and-LOG (#136); never gates admission
611                    let id = self.peer_db.upsert(peer);
612
613                    upserts.insert(id);
614                }
615
616                for peer in remove {
617                    let Some((id, _node)) = self.peer_db.remove(peer) else {
618                        tracing::error!(control_node_id = peer, "removed peer was unknown");
619                        continue;
620                    };
621
622                    deletions.insert(id);
623                }
624            }
625        }
626
627        (upserts, deletions)
628    }
629
630    /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
631    /// deleted [`PeerId`]s.
632    ///
633    /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
634    /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
635    /// response that carries both has the peer set applied first (by the caller) and these patches
636    /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
637    /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
638    fn apply_peer_patches(
639        &mut self,
640        patches: &[ts_control::PeerChange],
641    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
642        let mut upserts = HashSet::default();
643        let mut deletions = HashSet::default();
644
645        tracing::trace!(n = patches.len(), "peer patch update");
646
647        for patch in patches {
648            // Clone the current node, apply the present fields, and re-upsert through the same path
649            // as a delta so indexes/routes stay consistent.
650            let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
651                tracing::debug!(
652                    control_node_id = patch.id,
653                    "peer patch for unknown node; ignoring"
654                );
655                continue;
656            };
657
658            let mut node = existing.clone();
659            if let Some(endpoints) = &patch.underlay_addresses {
660                node.underlay_addresses = endpoints.clone();
661            }
662            if let Some(derp) = patch.derp_region {
663                node.derp_region = Some(derp);
664            }
665            if let Some(cap) = patch.cap {
666                node.cap = cap;
667            }
668            if let Some(cap_map) = &patch.cap_map {
669                node.cap_map = cap_map.clone();
670            }
671            if let Some(disco_key) = patch.disco_key {
672                node.disco_key = Some(disco_key);
673            }
674            if let Some(expiry) = patch.node_key_expiry {
675                node.node_key_expiry = Some(expiry);
676            }
677            // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
678            // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
679            // a value (never patches back to unknown), so apply when present.
680            if let Some(online) = patch.online {
681                node.online = Some(online);
682            }
683            if let Some(last_seen) = patch.last_seen {
684                node.last_seen = Some(last_seen);
685            }
686            // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
687            // together so the trust gate below verifies the new signature against the new key, never
688            // a mismatched pair.
689            if let Some(node_key) = patch.node_key {
690                node.node_key = node_key;
691            }
692            if let Some(sig) = &patch.key_signature {
693                node.key_signature = sig.clone();
694            }
695
696            // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
697            // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
698            // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
699            // evict it rather than keep the stale (now-unverified) entry.
700            if !self.tka_admits(&node) {
701                if let Some((id, _)) = self.peer_db.remove(&patch.id) {
702                    tracing::warn!(
703                        control_node_id = patch.id,
704                        "peer patch rejected by tailnet lock; evicting peer"
705                    );
706                    deletions.insert(id);
707                }
708                continue;
709            }
710
711            self.tka_observe_log(&node); // verify-and-LOG (#136); never gates admission
712            let id = self.peer_db.upsert(&node);
713            upserts.insert(id);
714        }
715
716        (upserts, deletions)
717    }
718
719    /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
720    /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
721    /// actually mutated (so the caller knows whether to re-publish).
722    ///
723    /// Mirrors Go's post-`peers*` application of these maps. Each entry is keyed by control node id
724    /// and only ever *sets* a value (never back to unknown). An entry for an unknown node id is
725    /// ignored (like a patch — these maps never create a node). `peer_seen_change`'s `false` ("the
726    /// peer is gone") is applied as `online = Some(false)` — the node stays in the netmap, it is
727    /// merely marked offline; the `last_seen = now` update for the `true` case is intentionally not
728    /// performed here (it needs a wall clock this actor does not hold, and `last_seen` is the
729    /// low-value half — `online` is the `tailscale status` column that matters; see the iter-5
730    /// research note §5.5).
731    fn apply_liveness_changes(
732        &mut self,
733        online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
734        peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
735    ) -> bool {
736        let mut changed = false;
737
738        // Channel C — direct online flips.
739        for (&node_id, &online) in online_change {
740            if let Some((_pid, existing)) = self.peer_db.get(&node_id)
741                && existing.online != Some(online)
742            {
743                let mut node = existing.clone();
744                node.online = Some(online);
745                self.peer_db.upsert(&node);
746                changed = true;
747            }
748        }
749
750        // Channel D — peer-seen flips. `false` ⇒ "the peer is gone" ⇒ mark offline (the node is
751        // retained, not removed). `true` ⇒ "seen just now"; the online half is unknown from this
752        // signal alone, so we leave `online` untouched (a `true` here does not assert connectivity to
753        // control, only recent contact) and defer the `last_seen = now` timestamp (no clock here).
754        for (&node_id, &seen) in peer_seen_change {
755            if !seen
756                && let Some((_pid, existing)) = self.peer_db.get(&node_id)
757                && existing.online != Some(false)
758            {
759                let mut node = existing.clone();
760                node.online = Some(false);
761                self.peer_db.upsert(&node);
762                changed = true;
763            }
764        }
765
766        changed
767    }
768
769    /// Test-only constructor: build a [`PeerTracker`] with chosen TKA authorities without going
770    /// through the actor `on_start` path. `tka_authority` exercises the fail-closed enforcement
771    /// chokepoint ([`tka_admits`](Self::tka_admits)); `tka_observe` exercises the observe-only
772    /// verify-and-log seam ([`tka_observe_log`](Self::tka_observe_log)).
773    #[cfg(test)]
774    fn for_test(
775        env: Env,
776        tka_authority: Option<ts_tka::Authority>,
777        tka_observe: Option<ts_tka::Authority>,
778    ) -> Self {
779        let (peer_watch, _) = watch::channel(Vec::new());
780        Self {
781            peer_db: PeerDb::default(),
782            seen_state_update: false,
783            pending_requests: Vec::new(),
784            peer_watch,
785            user_profiles: HashMap::new(),
786            tka_authority,
787            tka_observe,
788            env,
789        }
790    }
791
792    fn service_pending_requests(&mut self) {
793        if self.seen_state_update {
794            return;
795        }
796
797        self.seen_state_update = true;
798
799        if !self.pending_requests.is_empty() {
800            tracing::debug!(
801                n_pending = self.pending_requests.len(),
802                "state update received, servicing pending requests"
803            );
804        }
805
806        for req in core::mem::take(&mut self.pending_requests) {
807            match req {
808                Pending::PeerByName(PeerByName { name }, reply) => {
809                    reply.send(self.peer_by_name_opt(&name).cloned());
810                }
811                Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
812                    reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
813                }
814                Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
815                    reply.send(
816                        self.peer_db
817                            .get_route(ip.into())
818                            .map(|(_id, node)| node.clone())
819                            .collect(),
820                    );
821                }
822                Pending::Status(reply) => {
823                    reply.send(self.status_peers());
824                }
825                Pending::WhoIs(Whois { addr }, reply) => {
826                    reply.send(self.whois_opt(addr));
827                }
828            }
829        }
830    }
831}
832
833#[cfg(test)]
834mod tka_tests {
835    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
836    //!
837    //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
838    //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
839    //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
840    //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
841    //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
842    //! private `ts_tka` API used).
843
844    use ed25519_dalek::{Signer, SigningKey};
845    use ts_control::{Node, StableNodeId, TailnetAddress};
846    use ts_tka::{
847        AumHash, Authority, Key, KeyKind, State,
848        cbor::{self, Value},
849    };
850
851    use super::*;
852
853    /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
854    const SIG_KIND_DIRECT: u64 = 1;
855
856    /// The 32-byte node key used across the signed-peer fixtures.
857    const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
858
859    /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
860    /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
861    fn test_env() -> Env {
862        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
863        Env::new(
864            ts_keys::NodeState::generate(),
865            shutdown_rx,
866            crate::env::ForwarderConfig {
867                accept_routes: false,
868                exit_node: None,
869                forward_routes: Vec::new(),
870                forward_tcp_ports: Vec::new(),
871                forward_udp_ports: Vec::new(),
872                forward_all_ports: false,
873                forward_exit_egress: false,
874                block_incoming: false,
875                exit_proxy: None,
876                peerapi_port: None,
877                taildrop_dir: None,
878                enable_ipv6: false,
879                persistent_keepalive_interval: None,
880                ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
881            },
882        )
883    }
884
885    /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
886    fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
887        Node {
888            id: 1,
889            stable_id: StableNodeId(stable_id.to_string()),
890            hostname: stable_id.to_string(),
891            user_id: 0,
892            tailnet: Some("ts.net".to_string()),
893            tags: Vec::new(),
894            tailnet_address: TailnetAddress {
895                ipv4: "100.64.0.1/32".parse().unwrap(),
896                ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
897            },
898            node_key: node_key.into(),
899            node_key_expiry: None,
900            online: None,
901            last_seen: None,
902            key_signature,
903            machine_key: None,
904            disco_key: None,
905            accepted_routes: Vec::new(),
906            underlay_addresses: Vec::new(),
907            derp_region: None,
908            cap: Default::default(),
909            cap_map: Default::default(),
910            peerapi_port: None,
911            peerapi_dns_proxy: false,
912            is_wireguard_only: false,
913            exit_node_dns_resolvers: Vec::new(),
914            peer_relay: false,
915            service_vips: Default::default(),
916        }
917    }
918
919    /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
920    /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
921    /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
922    /// signing-digest preimage (the `SigHash` form).
923    fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
924        let mut pairs = alloc_pairs(node_key, key_id);
925        if let Some(sig) = signature {
926            pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
927        }
928        cbor::int_map(pairs).to_vec()
929    }
930
931    fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
932        vec![
933            (1, Some(Value::Uint(SIG_KIND_DIRECT))),
934            (2, Some(Value::Bytes(node_key.to_vec()))),
935            (3, Some(Value::Bytes(key_id.to_vec()))),
936        ]
937    }
938
939    /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
940    /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
941    fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
942        // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
943        let signing = SigningKey::from_bytes(&[42u8; 32]);
944        let trusted_pub = signing.verifying_key().to_bytes().to_vec();
945
946        let authority = Authority::from_state(
947            AumHash([0; 32]),
948            State {
949                keys: vec![Key {
950                    kind: KeyKind::Ed25519,
951                    votes: 1,
952                    public: trusted_pub.clone(),
953                }],
954            },
955        );
956
957        // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
958        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
959        let sig_hash = ts_tka::aum_hash(&preimage).0;
960        let signature = signing.sign(&sig_hash).to_bytes().to_vec();
961
962        let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
963        // Sanity: the authority accepts the signature we just built (same path the gate uses).
964        assert!(
965            authority
966                .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
967                .is_ok()
968        );
969
970        (authority, signed_cbor)
971    }
972
973    #[tokio::test]
974    async fn tka_inactive_upserts_all_peers() {
975        // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
976        let mut tracker = PeerTracker::for_test(test_env(), None, None);
977
978        let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
979        let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
980
981        assert!(tracker.tka_admits(&signed));
982        assert!(tracker.tka_admits(&unsigned));
983
984        tracker.peer_db.upsert(&signed);
985        tracker.peer_db.upsert(&unsigned);
986        assert_eq!(tracker.peer_db.peers().len(), 2);
987    }
988
989    #[tokio::test]
990    async fn tka_active_rejects_unsigned_peer() {
991        // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
992        let (authority, _sig) = authority_and_valid_sig();
993        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
994
995        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
996        assert!(!tracker.tka_admits(&unsigned));
997
998        // Mirror the handler's `if !tka_admits { continue }` loop.
999        if tracker.tka_admits(&unsigned) {
1000            tracker.peer_db.upsert(&unsigned);
1001        }
1002        assert_eq!(tracker.peer_db.peers().len(), 0);
1003        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1004    }
1005
1006    #[tokio::test]
1007    async fn tka_active_rejects_bad_signature() {
1008        // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1009        let (authority, mut sig) = authority_and_valid_sig();
1010        // Tamper the last byte (the trailing signature byte) so verification fails.
1011        let last = sig.len() - 1;
1012        sig[last] ^= 0xff;
1013
1014        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1015        let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1016        assert!(!tracker.tka_admits(&bad));
1017
1018        if tracker.tka_admits(&bad) {
1019            tracker.peer_db.upsert(&bad);
1020        }
1021        assert_eq!(tracker.peer_db.peers().len(), 0);
1022    }
1023
1024    #[tokio::test]
1025    async fn tka_active_admits_authorized_peer() {
1026        // Authority present + correctly-signed node key ⇒ admitted and upserted.
1027        let (authority, sig) = authority_and_valid_sig();
1028        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1029
1030        let good = peer_node("good", NODE_KEY_BYTES, sig);
1031        assert!(tracker.tka_admits(&good));
1032
1033        if tracker.tka_admits(&good) {
1034            tracker.peer_db.upsert(&good);
1035        }
1036        assert_eq!(tracker.peer_db.peers().len(), 1);
1037        assert!(tracker.peer_db.get(&good.node_key).is_some());
1038    }
1039
1040    // ---------------------------------------------------------------------------------------------
1041    // Tests that drive REAL `PeerUpdate`s through the shared handler body
1042    // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1043    // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1044    // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1045    // ---------------------------------------------------------------------------------------------
1046
1047    #[tokio::test]
1048    async fn tka_active_delta_upsert_rejects_unauthorized() {
1049        // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1050        // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1051        let (authority, _sig) = authority_and_valid_sig();
1052        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1053
1054        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1055        let update = ts_control::PeerUpdate::Delta {
1056            upsert: vec![unsigned.clone()],
1057            remove: Vec::new(),
1058        };
1059
1060        tracker.apply_peer_update(&update);
1061
1062        assert_eq!(tracker.peer_db.peers().len(), 0);
1063        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1064    }
1065
1066    #[tokio::test]
1067    async fn tka_active_delta_upsert_admits_authorized() {
1068        // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1069        let (authority, sig) = authority_and_valid_sig();
1070        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1071
1072        let good = peer_node("good", NODE_KEY_BYTES, sig);
1073        let update = ts_control::PeerUpdate::Delta {
1074            upsert: vec![good.clone()],
1075            remove: Vec::new(),
1076        };
1077
1078        tracker.apply_peer_update(&update);
1079
1080        assert_eq!(tracker.peer_db.peers().len(), 1);
1081        assert!(tracker.peer_db.get(&good.node_key).is_some());
1082    }
1083
1084    #[tokio::test]
1085    async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1086        // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1087        // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1088        // dropped fail-closed.
1089        let (authority, sig) = authority_and_valid_sig();
1090        // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1091        let mut bad_sig = sig.clone();
1092        let last = bad_sig.len() - 1;
1093        bad_sig[last] ^= 0xff;
1094
1095        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1096
1097        // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1098        // rejected peers use distinct node keys so the survivor is unambiguous.
1099        let good = peer_node("good", NODE_KEY_BYTES, sig);
1100        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1101        let bad = peer_node("bad", [9u8; 32], bad_sig);
1102
1103        let update =
1104            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1105
1106        tracker.apply_peer_update(&update);
1107
1108        assert_eq!(tracker.peer_db.peers().len(), 1);
1109        assert!(tracker.peer_db.get(&good.node_key).is_some());
1110        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1111        assert!(tracker.peer_db.get(&bad.node_key).is_none());
1112    }
1113
1114    #[tokio::test]
1115    async fn tka_observe_only_admits_all_peers_in_mixed_batch() {
1116        // #136 observe-only contract: with the OBSERVE authority set (and `tka_authority = None`, so
1117        // enforcement is OFF), the exact mixed batch that the fail-closed test above prunes to 1 must
1118        // instead admit ALL THREE peers. The verify-and-log seam logs each verdict (verified /
1119        // unsigned / failed) but never gates admission. This locks observe-only against a future
1120        // refactor that accidentally wires `tka_observe` into a drop path.
1121        let (authority, sig) = authority_and_valid_sig();
1122        let mut bad_sig = sig.clone();
1123        let last = bad_sig.len() - 1;
1124        bad_sig[last] ^= 0xff;
1125
1126        // Authority in the OBSERVE slot, enforcement slot empty.
1127        let mut tracker = PeerTracker::for_test(test_env(), None, Some(authority));
1128
1129        let good = peer_node("good", NODE_KEY_BYTES, sig);
1130        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1131        let bad = peer_node("bad", [9u8; 32], bad_sig);
1132
1133        let update =
1134            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1135
1136        tracker.apply_peer_update(&update);
1137
1138        // ALL THREE survive — observe-only never drops a peer.
1139        assert_eq!(
1140            tracker.peer_db.peers().len(),
1141            3,
1142            "observe-only must admit every peer regardless of signature verdict"
1143        );
1144        assert!(tracker.peer_db.get(&good.node_key).is_some());
1145        assert!(tracker.peer_db.get(&unsigned.node_key).is_some());
1146        assert!(tracker.peer_db.get(&bad.node_key).is_some());
1147    }
1148
1149    #[tokio::test]
1150    async fn tka_full_resync_revocation_behavior() {
1151        // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1152        // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1153        // (previously-admitted) entry because membership was decided purely by stable_id.
1154        //
1155        // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1156        // stable_ids, so a peer whose re-included signature no longer verifies under the active
1157        // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1158        // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1159        // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1160        let (authority, sig) = authority_and_valid_sig();
1161        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1162
1163        // 1) Admit the peer with a valid signature via a real `Full`.
1164        let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1165        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1166        assert_eq!(tracker.peer_db.peers().len(), 1);
1167        assert!(tracker.peer_db.get(&good.node_key).is_some());
1168
1169        // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1170        let mut bad_sig = sig;
1171        let last = bad_sig.len() - 1;
1172        bad_sig[last] ^= 0xff;
1173        let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1174        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1175
1176        // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1177        assert_eq!(tracker.peer_db.peers().len(), 0);
1178        assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1179    }
1180
1181    #[tokio::test]
1182    async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1183        // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1184        // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1185        // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1186        // branch this wave.
1187        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1188
1189        let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1190        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1191        assert_eq!(tracker.peer_db.peers().len(), 1);
1192
1193        // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1194        let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1195        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1196        assert_eq!(tracker.peer_db.peers().len(), 1);
1197        assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1198    }
1199
1200    /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1201    /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1202    /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1203    /// never re-handshake after it moves.
1204    #[tokio::test]
1205    async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1206        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1207
1208        // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1209        let peer = peer_node("mover", [1u8; 32], vec![]);
1210        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1211        let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1212        assert!(before.underlay_addresses.is_empty());
1213        assert!(before.derp_region.is_none());
1214
1215        // Patch in fresh reachability (the idle-peer-reconnect case).
1216        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1217        let patch = ts_control::PeerChange {
1218            id: 1,
1219            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1220            cap: None,
1221            cap_map: None,
1222            underlay_addresses: Some(vec![new_ep]),
1223            node_key: None,
1224            key_signature: None,
1225            disco_key: None,
1226            node_key_expiry: None,
1227            online: None,
1228            last_seen: None,
1229        };
1230        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1231
1232        assert_eq!(upserts.len(), 1);
1233        assert_eq!(deletions.len(), 0);
1234        // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1235        assert_eq!(tracker.peer_db.peers().len(), 1);
1236        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1237        assert_eq!(after.underlay_addresses, vec![new_ep]);
1238        assert_eq!(
1239            after.derp_region,
1240            Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1241        );
1242        assert_eq!(after.node_key, peer.node_key);
1243    }
1244
1245    /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1246    /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1247    /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1248    /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1249    /// in by the delta kept stale (empty) reachability.
1250    #[tokio::test]
1251    async fn patch_applies_on_top_of_co_occurring_delta() {
1252        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1253
1254        // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1255        let peer = peer_node("mover", [1u8; 32], vec![]);
1256        let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1257            upsert: vec![peer.clone()],
1258            remove: vec![],
1259        });
1260        assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1261
1262        // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1263        // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1264        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1265        let patch = ts_control::PeerChange {
1266            id: 1,
1267            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1268            cap: None,
1269            cap_map: None,
1270            underlay_addresses: Some(vec![new_ep]),
1271            node_key: None,
1272            key_signature: None,
1273            disco_key: None,
1274            node_key_expiry: None,
1275            online: None,
1276            last_seen: None,
1277        };
1278        let (patch_upserts, patch_deletions) =
1279            tracker.apply_peer_patches(std::slice::from_ref(&patch));
1280
1281        assert_eq!(
1282            patch_upserts.len(),
1283            1,
1284            "patch re-upserts the just-added peer"
1285        );
1286        assert_eq!(patch_deletions.len(), 0);
1287        // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1288        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1289        assert_eq!(after.underlay_addresses, vec![new_ep]);
1290        assert_eq!(
1291            after.derp_region,
1292            Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1293        );
1294    }
1295
1296    /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1297    /// never creates a node). No upsert, no deletion, peer set unchanged.
1298    #[tokio::test]
1299    async fn patch_for_unknown_node_is_ignored() {
1300        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1301        let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1302        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1303
1304        let patch = ts_control::PeerChange {
1305            id: 999, // not in the netmap
1306            derp_region: None,
1307            cap: None,
1308            cap_map: None,
1309            underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1310            node_key: None,
1311            key_signature: None,
1312            disco_key: None,
1313            node_key_expiry: None,
1314            online: None,
1315            last_seen: None,
1316        };
1317        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1318
1319        assert_eq!(upserts.len(), 0);
1320        assert_eq!(deletions.len(), 0);
1321        assert_eq!(tracker.peer_db.peers().len(), 1);
1322        assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1323    }
1324
1325    /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1326    /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1327    #[tokio::test]
1328    async fn patch_updates_node_key_expiry() {
1329        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1330        let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1331        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1332
1333        let expiry = "2027-01-01T00:00:00Z"
1334            .parse::<chrono::DateTime<chrono::Utc>>()
1335            .unwrap();
1336        let patch = ts_control::PeerChange {
1337            id: 1,
1338            derp_region: None,
1339            cap: None,
1340            cap_map: None,
1341            underlay_addresses: None,
1342            node_key: None,
1343            key_signature: None,
1344            disco_key: None,
1345            node_key_expiry: Some(expiry),
1346            online: None,
1347            last_seen: None,
1348        };
1349        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1350
1351        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1352        assert_eq!(after.node_key_expiry, Some(expiry));
1353    }
1354
1355    /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1356    #[tokio::test]
1357    async fn patch_updates_online() {
1358        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1359        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1360        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1361        assert_eq!(
1362            tracker
1363                .peer_db
1364                .get(&(1 as ts_control::NodeId))
1365                .unwrap()
1366                .1
1367                .online,
1368            None
1369        );
1370
1371        let mut patch = ts_control::PeerChange {
1372            id: 1,
1373            derp_region: None,
1374            cap: None,
1375            cap_map: None,
1376            underlay_addresses: None,
1377            node_key: None,
1378            key_signature: None,
1379            disco_key: None,
1380            node_key_expiry: None,
1381            online: Some(true),
1382            last_seen: None,
1383        };
1384        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1385        assert_eq!(
1386            tracker
1387                .peer_db
1388                .get(&(1 as ts_control::NodeId))
1389                .unwrap()
1390                .1
1391                .online,
1392            Some(true),
1393            "PeerChange.online=Some(true) marks the peer online"
1394        );
1395
1396        // A subsequent patch flips it offline.
1397        patch.online = Some(false);
1398        tracker.apply_peer_patches(std::slice::from_ref(&patch));
1399        assert_eq!(
1400            tracker
1401                .peer_db
1402                .get(&(1 as ts_control::NodeId))
1403                .unwrap()
1404                .1
1405                .online,
1406            Some(false)
1407        );
1408    }
1409
1410    /// Channel C/D: the `online_change` map flips online directly; `peer_seen_change: false`
1411    /// ("the peer is gone") marks the peer offline. Both apply to a peer already in the netmap and
1412    /// ignore unknown ids.
1413    #[tokio::test]
1414    async fn liveness_change_maps_apply_online() {
1415        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1416        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1417        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1418
1419        // Channel C: online_change sets online=true.
1420        let mut online_change = std::collections::BTreeMap::new();
1421        online_change.insert(1 as ts_control::NodeId, true);
1422        online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1423        let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1424        assert!(changed);
1425        assert_eq!(
1426            tracker
1427                .peer_db
1428                .get(&(1 as ts_control::NodeId))
1429                .unwrap()
1430                .1
1431                .online,
1432            Some(true)
1433        );
1434
1435        // Channel D: peer_seen_change=false marks the peer offline (gone), node retained.
1436        let mut peer_seen_change = std::collections::BTreeMap::new();
1437        peer_seen_change.insert(1 as ts_control::NodeId, false);
1438        let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1439        assert!(changed);
1440        assert_eq!(
1441            tracker
1442                .peer_db
1443                .get(&(1 as ts_control::NodeId))
1444                .unwrap()
1445                .1
1446                .online,
1447            Some(false),
1448            "peer_seen_change=false marks offline (the node stays in the netmap)"
1449        );
1450        assert_eq!(
1451            tracker.peer_db.peers().len(),
1452            1,
1453            "the node is retained, not removed"
1454        );
1455
1456        // No-op when nothing matches / changes.
1457        assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1458    }
1459
1460    /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1461    /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1462    /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1463    /// otherwise be a trust-enforcement bypass via the patch path.
1464    #[tokio::test]
1465    async fn patch_key_rotation_failing_tka_evicts_peer() {
1466        let (authority, sig) = authority_and_valid_sig();
1467        let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1468
1469        // Admit a correctly-signed peer (id == 1).
1470        let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1471        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1472        assert_eq!(tracker.peer_db.peers().len(), 1);
1473
1474        // Patch a new node key whose signature is garbage under the active authority.
1475        let patch = ts_control::PeerChange {
1476            id: 1,
1477            derp_region: None,
1478            cap: None,
1479            cap_map: None,
1480            underlay_addresses: None,
1481            node_key: Some([0x33u8; 32].into()),
1482            key_signature: Some(vec![0x00, 0x01, 0x02]),
1483            disco_key: None,
1484            node_key_expiry: None,
1485            online: None,
1486            last_seen: None,
1487        };
1488        let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1489
1490        assert_eq!(upserts.len(), 0);
1491        assert_eq!(deletions.len(), 1);
1492        assert_eq!(tracker.peer_db.peers().len(), 0);
1493    }
1494
1495    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1496    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1497    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1498    /// proves the accumulate-then-join path the netmap handler builds.
1499    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1500        ts_control::UserProfile {
1501            id,
1502            login_name: login.to_string(),
1503            display_name: None,
1504        }
1505    }
1506
1507    #[tokio::test]
1508    async fn whois_resolves_user_from_accumulated_profiles() {
1509        let mut tracker = PeerTracker::for_test(test_env(), None, None);
1510
1511        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1512        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1513        peer.user_id = 42;
1514        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1515        let addr = "100.64.0.1:0".parse().unwrap();
1516
1517        // No profile yet: the node resolves but its owner is unknown.
1518        let who = tracker.whois_opt(addr).expect("peer is known");
1519        assert_eq!(who.user, None);
1520
1521        // Profile for a DIFFERENT user must not match.
1522        tracker
1523            .user_profiles
1524            .insert(7, profile(7, "someone-else@example.com"));
1525        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1526
1527        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1528        // login resolves.
1529        tracker
1530            .user_profiles
1531            .insert(42, profile(42, "alice@example.com"));
1532        assert_eq!(
1533            tracker.whois_opt(addr).unwrap().user,
1534            Some("alice@example.com".to_string())
1535        );
1536    }
1537
1538    /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1539    #[test]
1540    fn user_profile_best_label_prefers_login() {
1541        assert_eq!(
1542            profile(1, "alice@example.com").best_label(),
1543            Some("alice@example.com".to_string())
1544        );
1545        let display_only = ts_control::UserProfile {
1546            id: 2,
1547            login_name: String::new(),
1548            display_name: Some("Bob".to_string()),
1549        };
1550        assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1551        let empty = ts_control::UserProfile {
1552            id: 3,
1553            login_name: String::new(),
1554            display_name: None,
1555        };
1556        assert_eq!(empty.best_label(), None);
1557    }
1558}