Skip to main content

ts_runtime/peer_tracker/
mod.rs

1//! Peer delta update tracking.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    sync::Arc,
7};
8
9use kameo::{
10    actor::ActorRef,
11    message::{Context, Message},
12    reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26    peer_db: PeerDb,
27    seen_state_update: bool,
28    pending_requests: Vec<Pending>,
29    /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30    /// changes ([`WatchNetmap`]).
31    peer_watch: watch::Sender<Vec<StatusNode>>,
32    /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33    /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34    /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35    /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36    /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37    /// earlier one.
38    user_profiles: HashMap<UserId, UserProfile>,
39    /// Tailnet-Lock (TKA) authority used to verify each peer's `key_signature` at the peer-trust
40    /// chokepoint. When `Some`, enforcement is **active**: every upserted peer must present a
41    /// signature this authority authorizes, or it is rejected (fail-closed). When `None` (always,
42    /// this wave) enforcement is **inactive** and every peer is upserted — identical to pre-TKA
43    /// behavior. There is no live `Authority` source yet: building one requires the
44    /// `/machine/tka/sync` Noise RPC + AUM-chain replayer (deferred, see SECURITY.md). The
45    /// enforcement path below is wired and unit-tested, and flips on the instant an authority is
46    /// supplied; it is explicitly gated, not a silent no-op.
47    tka_authority: Option<ts_tka::Authority>,
48    env: Env,
49}
50
51impl PeerTracker {
52    fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
53        // Canonicalization (case + trailing dot) is handled inside the name index lookup.
54        self.peer_db.get(&name).map(|(_id, node)| node)
55    }
56
57    fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
58        self.peer_db.get(&ip).map(|(_id, node)| node)
59    }
60
61    /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
62    fn status_peers(&self) -> Vec<StatusNode> {
63        self.peer_db
64            .peers()
65            .values()
66            .map(StatusNode::from_node)
67            .collect()
68    }
69
70    fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
71        let ip = crate::status::whois_addr(addr);
72        let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
73        // Join the node's owning user id against the accumulated UserProfiles table to resolve a
74        // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
75        // with no human owner, or a profile not yet delivered).
76        let user = self.resolve_user(node.user_id);
77        Some(crate::status::WhoIs::from_node_with_user(node, user))
78    }
79
80    /// Resolve a user id to its best display label from the accumulated profile table.
81    fn resolve_user(&self, user_id: UserId) -> Option<String> {
82        self.user_profiles
83            .get(&user_id)
84            .and_then(UserProfile::best_label)
85    }
86
87    /// Whether `node` may be admitted to the peer db under the current Tailnet-Lock posture.
88    ///
89    /// Fail-closed and gated:
90    /// - No [`tka_authority`](Self::tka_authority) ⇒ enforcement inactive ⇒ always admit (today's
91    ///   behavior; this is the always-taken branch this wave).
92    /// - Authority present + peer carries a `key_signature` that the authority authorizes for the
93    ///   peer's node key ⇒ admit.
94    /// - Authority present + signature missing or unauthorized/invalid ⇒ **reject** (Go denies
95    ///   network access to unsigned peers under tailnet lock; we do not upsert them).
96    fn tka_admits(&self, node: &Node) -> bool {
97        let Some(auth) = &self.tka_authority else {
98            return true;
99        };
100
101        if node.key_signature.is_empty() {
102            // TKA active but peer presented no signature: reject (Go denies network access to
103            // unsigned peers under tailnet lock, unless UnsignedPeerAPIOnly — out of scope here).
104            tracing::warn!(
105                stable_id = ?node.stable_id,
106                "TKA: rejecting unsigned peer under tailnet lock"
107            );
108            return false;
109        }
110
111        if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
112            tracing::warn!(
113                stable_id = ?node.stable_id,
114                error = %e,
115                "TKA: rejecting peer with unauthorized node key"
116            );
117            return false;
118        }
119
120        true
121    }
122}
123
124impl kameo::Actor for PeerTracker {
125    type Args = Env;
126    type Error = Error;
127
128    async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
129        env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
130
131        let (peer_watch, _) = watch::channel(Vec::new());
132
133        Ok(Self {
134            peer_db: PeerDb::default(),
135            pending_requests: Default::default(),
136            seen_state_update: false,
137            peer_watch,
138            user_profiles: HashMap::new(),
139            // No live TKA authority source this wave (the `/machine/tka/sync` RPC + AUM replayer are
140            // deferred); enforcement stays inactive until one is supplied. See `tka_authority`.
141            tka_authority: None,
142            env,
143        })
144    }
145}
146
147enum Pending {
148    PeerByName(PeerByName, ReplySender<Option<Node>>),
149    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
150    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
151    Status(ReplySender<Vec<StatusNode>>),
152    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
153}
154
155// For messages with arguments, a struct is generated with the args as fields. They aren't
156// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
157// docs are turned off everywhere.
158#[allow(missing_docs)]
159mod msg_impl {
160    use std::net::IpAddr;
161
162    use kameo::prelude::DelegatedReply;
163
164    use super::*;
165
166    #[kameo::messages]
167    impl PeerTracker {
168        /// Lookup a peer by name.
169        ///
170        /// Waits until we've received at least one peer update from control.
171        #[message(ctx)]
172        pub async fn peer_by_name(
173            &mut self,
174            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
175            name: String,
176        ) -> DelegatedReply<Option<Node>> {
177            let (deleg, sender) = ctx.reply_sender();
178            let Some(sender) = sender else { return deleg };
179
180            if !self.seen_state_update {
181                tracing::debug!(query = name, "no peer state seen yet, queueing request");
182
183                self.pending_requests
184                    .push(Pending::PeerByName(PeerByName { name }, sender));
185
186                return deleg;
187            }
188
189            sender.send(self.peer_by_name_opt(&name).cloned());
190
191            deleg
192        }
193
194        /// Lookup all peers that accept packets addressed to the given IP.
195        ///
196        /// This includes the peer's tailnet address and any subnet routes it provides. Only
197        /// the peers with the most specific subnet route match that covers `ip` will be
198        /// returned.
199        ///
200        /// E.g., suppose:
201        ///
202        /// - We're querying for `10.1.2.3`
203        /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
204        /// - `PeerC` has an accepted route for `10.1.0.0/16`
205        ///
206        /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
207        /// prefix match.
208        #[message(ctx)]
209        pub fn peer_by_accepted_route(
210            &mut self,
211            ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
212            ip: IpAddr,
213        ) -> DelegatedReply<Vec<Node>> {
214            let (deleg, sender) = ctx.reply_sender();
215            let Some(sender) = sender else { return deleg };
216
217            if !self.seen_state_update {
218                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
219
220                self.pending_requests
221                    .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
222
223                return deleg;
224            }
225
226            sender.send(
227                self.peer_db
228                    .get_route(ip.into())
229                    .map(|(_id, node)| node.clone())
230                    .collect(),
231            );
232
233            deleg
234        }
235
236        /// Lookup the peer that has the given tailnet IP address.
237        #[message(ctx)]
238        pub fn peer_by_tailnet_ip(
239            &mut self,
240            ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241            ip: IpAddr,
242        ) -> DelegatedReply<Option<Node>> {
243            let (deleg, sender) = ctx.reply_sender();
244            let Some(sender) = sender else { return deleg };
245
246            if !self.seen_state_update {
247                tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
248
249                self.pending_requests
250                    .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
251
252                return deleg;
253            }
254
255            sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
256
257            deleg
258        }
259
260        /// Build the peer entries of a [`Status`](crate::Status) snapshot.
261        ///
262        /// Returns one [`StatusNode`] per known peer. The self node is *not* included here (it
263        /// lives in the control runner); [`Runtime::status`](crate::Runtime::status) combines both.
264        ///
265        /// Waits until we've received at least one peer update from control.
266        #[message(ctx)]
267        pub fn get_status(
268            &mut self,
269            ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
270        ) -> DelegatedReply<Vec<StatusNode>> {
271            let (deleg, sender) = ctx.reply_sender();
272            let Some(sender) = sender else { return deleg };
273
274            if !self.seen_state_update {
275                tracing::debug!("no peer state seen yet, queueing status request");
276                self.pending_requests.push(Pending::Status(sender));
277                return deleg;
278            }
279
280            sender.send(self.status_peers());
281
282            deleg
283        }
284
285        /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
286        ///
287        /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
288        /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
289        /// is not included (it lives in the control runner). Returns empty before the first netmap —
290        /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
291        /// that need a populated list await `Running` first).
292        #[message]
293        pub fn all_peers(&self) -> Vec<Node> {
294            self.peer_db.peers().values().cloned().collect()
295        }
296
297        /// Resolve which node owns a tailnet source address.
298        ///
299        /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
300        /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
301        ///
302        /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
303        /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
304        /// [`status`](crate::status) module docs for the gap.
305        ///
306        /// Waits until we've received at least one peer update from control.
307        #[message(ctx)]
308        pub fn whois(
309            &mut self,
310            ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
311            addr: std::net::SocketAddr,
312        ) -> DelegatedReply<Option<crate::status::WhoIs>> {
313            let (deleg, sender) = ctx.reply_sender();
314            let Some(sender) = sender else { return deleg };
315
316            if !self.seen_state_update {
317                tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
318                self.pending_requests
319                    .push(Pending::WhoIs(Whois { addr }, sender));
320                return deleg;
321            }
322
323            sender.send(self.whois_opt(addr));
324
325            deleg
326        }
327
328        /// Subscribe to netmap peer-change events.
329        ///
330        /// Returns a [`watch::Receiver`] whose value is the current set of peer
331        /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
332        /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
333        ///
334        /// The receiver's initial value is the peer set at subscription time (empty before the
335        /// first netmap update). This is a peer-only view; combine with the self node from
336        /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
337        #[message(derive(Clone))]
338        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
339            self.peer_watch.subscribe()
340        }
341    }
342}
343
344pub use msg_impl::*;
345
346#[derive(Debug, Clone)]
347pub(crate) struct PeerState {
348    #[allow(unused)]
349    pub deletions: HashSet<PeerId>,
350    #[allow(unused)]
351    pub upserts: HashSet<PeerId>,
352    pub peers: Arc<PeerDb>,
353}
354
355impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
356    type Reply = ();
357
358    async fn handle(
359        &mut self,
360        msg: Arc<ts_control::StateUpdate>,
361        _ctx: &mut Context<Self, Self::Reply>,
362    ) {
363        // Accumulate user profiles first — control sends them incrementally and a response may
364        // carry profiles with no peer delta (or peers that reference a profile from an earlier
365        // response), so this must happen before the no-peer-update early return below.
366        for profile in &msg.user_profiles {
367            self.user_profiles.insert(profile.id, profile.clone());
368        }
369
370        // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
371        // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
372        // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
373        // *before* the no-peer-update early return — otherwise online status freezes at the last
374        // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
375        let liveness_changed =
376            self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
377
378        let Some(peer_update) = &msg.peer_update else {
379            // No peer set/patch this response. If a liveness delta still mutated the netmap, publish
380            // the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
381            if liveness_changed {
382                self.service_pending_requests();
383                self.peer_watch.send_replace(self.status_peers());
384                if let Err(e) = self
385                    .env
386                    .publish(Arc::new(PeerState {
387                        upserts: HashSet::default(),
388                        deletions: HashSet::default(),
389                        peers: Arc::new(self.peer_db.clone()),
390                    }))
391                    .await
392                {
393                    tracing::error!(error = %e, "publishing liveness-only peer state update");
394                }
395            }
396            return;
397        };
398
399        let (upserts, deletions) = self.apply_peer_update(peer_update);
400
401        tracing::debug!(
402            n_upsert = upserts.len(),
403            n_delete = deletions.len(),
404            peer_count = self.peer_db.peers().len(),
405            "new peer state"
406        );
407
408        self.service_pending_requests();
409
410        // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
411        // value current even when there are no subscribers, so a late subscriber sees fresh state.
412        self.peer_watch.send_replace(self.status_peers());
413
414        if let Err(e) = self
415            .env
416            .publish(Arc::new(PeerState {
417                upserts,
418                deletions,
419                peers: Arc::new(self.peer_db.clone()),
420            }))
421            .await
422        {
423            tracing::error!(error = %e, "publishing peer state update");
424        }
425    }
426}
427
428/// Ask the peer tracker to re-broadcast its current peer snapshot on the bus, without any peer
429/// change. `Device::set_exit_node` sends this after changing the exit-node selector so the route
430/// updater and source filter (both `Arc<PeerState>` subscribers) re-resolve the new selector
431/// immediately, rather than waiting for the next netmap update.
432#[derive(Debug, Clone, Copy)]
433pub struct RepublishState;
434
435impl Message<RepublishState> for PeerTracker {
436    type Reply = ();
437
438    async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
439        // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
440        // delta. Subscribers recompute their routes/filters against the current peers and the
441        // (just-updated) exit-node selector.
442        if let Err(e) = self
443            .env
444            .publish(Arc::new(PeerState {
445                upserts: HashSet::default(),
446                deletions: HashSet::default(),
447                peers: Arc::new(self.peer_db.clone()),
448            }))
449            .await
450        {
451            tracing::error!(error = %e, "re-publishing peer state after exit-node change");
452        }
453    }
454}
455
456impl PeerTracker {
457    /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
458    /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
459    ///
460    /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
461    /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
462    /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
463    ///
464    /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
465    fn apply_peer_update(
466        &mut self,
467        peer_update: &ts_control::PeerUpdate,
468    ) -> (HashSet<PeerId>, HashSet<PeerId>) {
469        let mut upserts = HashSet::default();
470        let mut deletions = HashSet::default();
471
472        match peer_update {
473            ts_control::PeerUpdate::Full(new_nodes) => {
474                tracing::trace!("full peer update");
475
476                // Only stable_ids that PASS the Tailnet-Lock gate survive a full re-sync. This makes
477                // revocation evict: if a peer is re-included with a now-invalid (or missing)
478                // signature under an active authority, it is excluded from `retained_ids`, so
479                // `retain` drops the stale (previously-admitted) entry rather than leaving it in the
480                // db unverified. With no authority, `tka_admits` is always `true`, so `retained_ids`
481                // is exactly the set of re-included stable_ids — the inactive path is byte-for-byte
482                // the pre-TKA behavior (no regression).
483                let retained_ids = new_nodes
484                    .iter()
485                    .filter(|node| self.tka_admits(node))
486                    .map(|x| &x.stable_id)
487                    .collect::<HashSet<_>>();
488
489                self.peer_db.retain(|id, peer| {
490                    let retain = retained_ids.contains(&peer.stable_id);
491
492                    if !retain {
493                        deletions.insert(id);
494                    }
495
496                    retain
497                });
498
499                for node in new_nodes {
500                    if !self.tka_admits(node) {
501                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
502                    }
503                    let peer_id = self.peer_db.upsert(node);
504                    upserts.insert(peer_id);
505                }
506            }
507
508            ts_control::PeerUpdate::Delta { remove, upsert } => {
509                tracing::trace!("delta peer update");
510
511                for peer in upsert {
512                    if !self.tka_admits(peer) {
513                        continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
514                    }
515                    let id = self.peer_db.upsert(peer);
516
517                    upserts.insert(id);
518                }
519
520                for peer in remove {
521                    let Some((id, _node)) = self.peer_db.remove(peer) else {
522                        tracing::error!(control_node_id = peer, "removed peer was unknown");
523                        continue;
524                    };
525
526                    deletions.insert(id);
527                }
528            }
529
530            ts_control::PeerUpdate::Patch(patches) => {
531                tracing::trace!(n = patches.len(), "peer patch update");
532
533                for patch in patches {
534                    // A patch only mutates a peer already in the netmap; an unknown node id is
535                    // ignored (the wire contract — a patch never creates a node). Clone the current
536                    // node, apply the present fields, and re-upsert through the same path as a
537                    // delta so indexes/routes stay consistent.
538                    let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
539                        tracing::debug!(
540                            control_node_id = patch.id,
541                            "peer patch for unknown node; ignoring"
542                        );
543                        continue;
544                    };
545
546                    let mut node = existing.clone();
547                    if let Some(endpoints) = &patch.underlay_addresses {
548                        node.underlay_addresses = endpoints.clone();
549                    }
550                    if let Some(derp) = patch.derp_region {
551                        node.derp_region = Some(derp);
552                    }
553                    if let Some(cap) = patch.cap {
554                        node.cap = cap;
555                    }
556                    if let Some(cap_map) = &patch.cap_map {
557                        node.cap_map = cap_map.clone();
558                    }
559                    if let Some(disco_key) = patch.disco_key {
560                        node.disco_key = Some(disco_key);
561                    }
562                    if let Some(expiry) = patch.node_key_expiry {
563                        node.node_key_expiry = Some(expiry);
564                    }
565                    // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the
566                    // dominant channel by which peer online transitions arrive mid-session. A patch
567                    // only ever *sets* a value (never patches back to unknown), so apply when present.
568                    if let Some(online) = patch.online {
569                        node.online = Some(online);
570                    }
571                    if let Some(last_seen) = patch.last_seen {
572                        node.last_seen = Some(last_seen);
573                    }
574                    // Key rotation: a patch may swap the node key (and its TKA signature). Apply
575                    // both together so the trust gate below verifies the new signature against the
576                    // new key, never a mismatched pair.
577                    if let Some(node_key) = patch.node_key {
578                        node.node_key = node_key;
579                    }
580                    if let Some(sig) = &patch.key_signature {
581                        node.key_signature = sig.clone();
582                    }
583
584                    // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key
585                    // must satisfy the active authority, exactly like a `Delta` upsert, or it would
586                    // be a trust-enforcement bypass. fail-CLOSED — if the patched node is no longer
587                    // admitted, evict it rather than keep the stale (now-unverified) entry.
588                    if !self.tka_admits(&node) {
589                        if let Some((id, _)) = self.peer_db.remove(&patch.id) {
590                            tracing::warn!(
591                                control_node_id = patch.id,
592                                "peer patch rejected by tailnet lock; evicting peer"
593                            );
594                            deletions.insert(id);
595                        }
596                        continue;
597                    }
598
599                    let id = self.peer_db.upsert(&node);
600                    upserts.insert(id);
601                }
602            }
603        }
604
605        (upserts, deletions)
606    }
607
608    /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
609    /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
610    /// actually mutated (so the caller knows whether to re-publish).
611    ///
612    /// Mirrors Go's post-`peers*` application of these maps. Each entry is keyed by control node id
613    /// and only ever *sets* a value (never back to unknown). An entry for an unknown node id is
614    /// ignored (like a patch — these maps never create a node). `peer_seen_change`'s `false` ("the
615    /// peer is gone") is applied as `online = Some(false)` — the node stays in the netmap, it is
616    /// merely marked offline; the `last_seen = now` update for the `true` case is intentionally not
617    /// performed here (it needs a wall clock this actor does not hold, and `last_seen` is the
618    /// low-value half — `online` is the `tailscale status` column that matters; see the iter-5
619    /// research note §5.5).
620    fn apply_liveness_changes(
621        &mut self,
622        online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
623        peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
624    ) -> bool {
625        let mut changed = false;
626
627        // Channel C — direct online flips.
628        for (&node_id, &online) in online_change {
629            if let Some((_pid, existing)) = self.peer_db.get(&node_id)
630                && existing.online != Some(online)
631            {
632                let mut node = existing.clone();
633                node.online = Some(online);
634                self.peer_db.upsert(&node);
635                changed = true;
636            }
637        }
638
639        // Channel D — peer-seen flips. `false` ⇒ "the peer is gone" ⇒ mark offline (the node is
640        // retained, not removed). `true` ⇒ "seen just now"; the online half is unknown from this
641        // signal alone, so we leave `online` untouched (a `true` here does not assert connectivity to
642        // control, only recent contact) and defer the `last_seen = now` timestamp (no clock here).
643        for (&node_id, &seen) in peer_seen_change {
644            if !seen
645                && let Some((_pid, existing)) = self.peer_db.get(&node_id)
646                && existing.online != Some(false)
647            {
648                let mut node = existing.clone();
649                node.online = Some(false);
650                self.peer_db.upsert(&node);
651                changed = true;
652            }
653        }
654
655        changed
656    }
657
658    /// Test-only constructor: build a [`PeerTracker`] with a chosen [`tka_authority`](Self::tka_authority)
659    /// without going through the actor `on_start` path. Used by the TKA enforcement unit tests to
660    /// exercise the peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) directly.
661    #[cfg(test)]
662    fn for_test(env: Env, tka_authority: Option<ts_tka::Authority>) -> Self {
663        let (peer_watch, _) = watch::channel(Vec::new());
664        Self {
665            peer_db: PeerDb::default(),
666            seen_state_update: false,
667            pending_requests: Vec::new(),
668            peer_watch,
669            user_profiles: HashMap::new(),
670            tka_authority,
671            env,
672        }
673    }
674
675    fn service_pending_requests(&mut self) {
676        if self.seen_state_update {
677            return;
678        }
679
680        self.seen_state_update = true;
681
682        if !self.pending_requests.is_empty() {
683            tracing::debug!(
684                n_pending = self.pending_requests.len(),
685                "state update received, servicing pending requests"
686            );
687        }
688
689        for req in core::mem::take(&mut self.pending_requests) {
690            match req {
691                Pending::PeerByName(PeerByName { name }, reply) => {
692                    reply.send(self.peer_by_name_opt(&name).cloned());
693                }
694                Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
695                    reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
696                }
697                Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
698                    reply.send(
699                        self.peer_db
700                            .get_route(ip.into())
701                            .map(|(_id, node)| node.clone())
702                            .collect(),
703                    );
704                }
705                Pending::Status(reply) => {
706                    reply.send(self.status_peers());
707                }
708                Pending::WhoIs(Whois { addr }, reply) => {
709                    reply.send(self.whois_opt(addr));
710                }
711            }
712        }
713    }
714}
715
716#[cfg(test)]
717mod tka_tests {
718    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
719    //!
720    //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
721    //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
722    //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
723    //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
724    //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
725    //! private `ts_tka` API used).
726
727    use ed25519_dalek::{Signer, SigningKey};
728    use ts_control::{Node, StableNodeId, TailnetAddress};
729    use ts_tka::{
730        AumHash, Authority, Key, KeyKind, State,
731        cbor::{self, Value},
732    };
733
734    use super::*;
735
    /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
    ///
    /// `alloc_pairs` emits this as the value of CBOR map key 1 (the signature kind).
    const SIG_KIND_DIRECT: u64 = 1;
738
    /// The 32-byte node key used across the signed-peer fixtures (every byte is `7`).
    ///
    /// The signature built by `authority_and_valid_sig` authorizes exactly this key.
    const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
741
742    /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
743    /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
744    fn test_env() -> Env {
745        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
746        Env::new(
747            ts_keys::NodeState::generate(),
748            shutdown_rx,
749            crate::env::ForwarderConfig {
750                accept_routes: false,
751                exit_node: None,
752                forward_routes: Vec::new(),
753                forward_tcp_ports: Vec::new(),
754                forward_udp_ports: Vec::new(),
755                forward_all_ports: false,
756                forward_exit_egress: false,
757                exit_proxy: None,
758                peerapi_port: None,
759                taildrop_dir: None,
760                enable_ipv6: false,
761                persistent_keepalive_interval: None,
762                ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
763            },
764        )
765    }
766
767    /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
768    fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
769        Node {
770            id: 1,
771            stable_id: StableNodeId(stable_id.to_string()),
772            hostname: stable_id.to_string(),
773            user_id: 0,
774            tailnet: Some("ts.net".to_string()),
775            tags: Vec::new(),
776            tailnet_address: TailnetAddress {
777                ipv4: "100.64.0.1/32".parse().unwrap(),
778                ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
779            },
780            node_key: node_key.into(),
781            node_key_expiry: None,
782            online: None,
783            last_seen: None,
784            key_signature,
785            machine_key: None,
786            disco_key: None,
787            accepted_routes: Vec::new(),
788            underlay_addresses: Vec::new(),
789            derp_region: None,
790            cap: Default::default(),
791            cap_map: Default::default(),
792            peerapi_port: None,
793            peerapi_dns_proxy: false,
794            is_wireguard_only: false,
795            exit_node_dns_resolvers: Vec::new(),
796            peer_relay: false,
797            service_vips: Default::default(),
798        }
799    }
800
801    /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
802    /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
803    /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
804    /// signing-digest preimage (the `SigHash` form).
805    fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
806        let mut pairs = alloc_pairs(node_key, key_id);
807        if let Some(sig) = signature {
808            pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
809        }
810        cbor::int_map(pairs).to_vec()
811    }
812
813    fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
814        vec![
815            (1, Some(Value::Uint(SIG_KIND_DIRECT))),
816            (2, Some(Value::Bytes(node_key.to_vec()))),
817            (3, Some(Value::Bytes(key_id.to_vec()))),
818        ]
819    }
820
821    /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
822    /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
823    fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
824        // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
825        let signing = SigningKey::from_bytes(&[42u8; 32]);
826        let trusted_pub = signing.verifying_key().to_bytes().to_vec();
827
828        let authority = Authority::from_state(
829            AumHash([0; 32]),
830            State {
831                keys: vec![Key {
832                    kind: KeyKind::Ed25519,
833                    votes: 1,
834                    public: trusted_pub.clone(),
835                }],
836            },
837        );
838
839        // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
840        let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
841        let sig_hash = ts_tka::aum_hash(&preimage).0;
842        let signature = signing.sign(&sig_hash).to_bytes().to_vec();
843
844        let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
845        // Sanity: the authority accepts the signature we just built (same path the gate uses).
846        assert!(
847            authority
848                .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
849                .is_ok()
850        );
851
852        (authority, signed_cbor)
853    }
854
855    #[tokio::test]
856    async fn tka_inactive_upserts_all_peers() {
857        // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
858        let mut tracker = PeerTracker::for_test(test_env(), None);
859
860        let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
861        let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
862
863        assert!(tracker.tka_admits(&signed));
864        assert!(tracker.tka_admits(&unsigned));
865
866        tracker.peer_db.upsert(&signed);
867        tracker.peer_db.upsert(&unsigned);
868        assert_eq!(tracker.peer_db.peers().len(), 2);
869    }
870
871    #[tokio::test]
872    async fn tka_active_rejects_unsigned_peer() {
873        // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
874        let (authority, _sig) = authority_and_valid_sig();
875        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
876
877        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
878        assert!(!tracker.tka_admits(&unsigned));
879
880        // Mirror the handler's `if !tka_admits { continue }` loop.
881        if tracker.tka_admits(&unsigned) {
882            tracker.peer_db.upsert(&unsigned);
883        }
884        assert_eq!(tracker.peer_db.peers().len(), 0);
885        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
886    }
887
888    #[tokio::test]
889    async fn tka_active_rejects_bad_signature() {
890        // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
891        let (authority, mut sig) = authority_and_valid_sig();
892        // Tamper the last byte (the trailing signature byte) so verification fails.
893        let last = sig.len() - 1;
894        sig[last] ^= 0xff;
895
896        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
897        let bad = peer_node("bad", NODE_KEY_BYTES, sig);
898        assert!(!tracker.tka_admits(&bad));
899
900        if tracker.tka_admits(&bad) {
901            tracker.peer_db.upsert(&bad);
902        }
903        assert_eq!(tracker.peer_db.peers().len(), 0);
904    }
905
906    #[tokio::test]
907    async fn tka_active_admits_authorized_peer() {
908        // Authority present + correctly-signed node key ⇒ admitted and upserted.
909        let (authority, sig) = authority_and_valid_sig();
910        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
911
912        let good = peer_node("good", NODE_KEY_BYTES, sig);
913        assert!(tracker.tka_admits(&good));
914
915        if tracker.tka_admits(&good) {
916            tracker.peer_db.upsert(&good);
917        }
918        assert_eq!(tracker.peer_db.peers().len(), 1);
919        assert!(tracker.peer_db.get(&good.node_key).is_some());
920    }
921
922    // ---------------------------------------------------------------------------------------------
923    // Tests that drive REAL `PeerUpdate`s through the shared handler body
924    // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
925    // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
926    // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
927    // ---------------------------------------------------------------------------------------------
928
929    #[tokio::test]
930    async fn tka_active_delta_upsert_rejects_unauthorized() {
931        // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
932        // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
933        let (authority, _sig) = authority_and_valid_sig();
934        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
935
936        let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
937        let update = ts_control::PeerUpdate::Delta {
938            upsert: vec![unsigned.clone()],
939            remove: Vec::new(),
940        };
941
942        tracker.apply_peer_update(&update);
943
944        assert_eq!(tracker.peer_db.peers().len(), 0);
945        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
946    }
947
948    #[tokio::test]
949    async fn tka_active_delta_upsert_admits_authorized() {
950        // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
951        let (authority, sig) = authority_and_valid_sig();
952        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
953
954        let good = peer_node("good", NODE_KEY_BYTES, sig);
955        let update = ts_control::PeerUpdate::Delta {
956            upsert: vec![good.clone()],
957            remove: Vec::new(),
958        };
959
960        tracker.apply_peer_update(&update);
961
962        assert_eq!(tracker.peer_db.peers().len(), 1);
963        assert!(tracker.peer_db.get(&good.node_key).is_some());
964    }
965
966    #[tokio::test]
967    async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
968        // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
969        // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
970        // dropped fail-closed.
971        let (authority, sig) = authority_and_valid_sig();
972        // A bad-sig variant of the same authorized signature (tamper the trailing byte).
973        let mut bad_sig = sig.clone();
974        let last = bad_sig.len() - 1;
975        bad_sig[last] ^= 0xff;
976
977        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
978
979        // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
980        // rejected peers use distinct node keys so the survivor is unambiguous.
981        let good = peer_node("good", NODE_KEY_BYTES, sig);
982        let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
983        let bad = peer_node("bad", [9u8; 32], bad_sig);
984
985        let update =
986            ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
987
988        tracker.apply_peer_update(&update);
989
990        assert_eq!(tracker.peer_db.peers().len(), 1);
991        assert!(tracker.peer_db.get(&good.node_key).is_some());
992        assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
993        assert!(tracker.peer_db.get(&bad.node_key).is_none());
994    }
995
996    #[tokio::test]
997    async fn tka_full_resync_revocation_behavior() {
998        // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
999        // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1000        // (previously-admitted) entry because membership was decided purely by stable_id.
1001        //
1002        // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1003        // stable_ids, so a peer whose re-included signature no longer verifies under the active
1004        // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1005        // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1006        // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1007        let (authority, sig) = authority_and_valid_sig();
1008        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1009
1010        // 1) Admit the peer with a valid signature via a real `Full`.
1011        let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1012        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1013        assert_eq!(tracker.peer_db.peers().len(), 1);
1014        assert!(tracker.peer_db.get(&good.node_key).is_some());
1015
1016        // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1017        let mut bad_sig = sig;
1018        let last = bad_sig.len() - 1;
1019        bad_sig[last] ^= 0xff;
1020        let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1021        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1022
1023        // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1024        assert_eq!(tracker.peer_db.peers().len(), 0);
1025        assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1026    }
1027
1028    #[tokio::test]
1029    async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1030        // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1031        // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1032        // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1033        // branch this wave.
1034        let mut tracker = PeerTracker::for_test(test_env(), None);
1035
1036        let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1037        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1038        assert_eq!(tracker.peer_db.peers().len(), 1);
1039
1040        // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1041        let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1042        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1043        assert_eq!(tracker.peer_db.peers().len(), 1);
1044        assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1045    }
1046
1047    /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1048    /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1049    /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1050    /// never re-handshake after it moves.
1051    #[tokio::test]
1052    async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1053        let mut tracker = PeerTracker::for_test(test_env(), None);
1054
1055        // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1056        let peer = peer_node("mover", [1u8; 32], vec![]);
1057        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1058        let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1059        assert!(before.underlay_addresses.is_empty());
1060        assert!(before.derp_region.is_none());
1061
1062        // Patch in fresh reachability (the idle-peer-reconnect case).
1063        let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1064        let patch = ts_control::PeerChange {
1065            id: 1,
1066            derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1067            cap: None,
1068            cap_map: None,
1069            underlay_addresses: Some(vec![new_ep]),
1070            node_key: None,
1071            key_signature: None,
1072            disco_key: None,
1073            node_key_expiry: None,
1074            online: None,
1075            last_seen: None,
1076        };
1077        let (upserts, deletions) =
1078            tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1079
1080        assert_eq!(upserts.len(), 1);
1081        assert_eq!(deletions.len(), 0);
1082        // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1083        assert_eq!(tracker.peer_db.peers().len(), 1);
1084        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1085        assert_eq!(after.underlay_addresses, vec![new_ep]);
1086        assert_eq!(
1087            after.derp_region,
1088            Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1089        );
1090        assert_eq!(after.node_key, peer.node_key);
1091    }
1092
1093    /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1094    /// never creates a node). No upsert, no deletion, peer set unchanged.
1095    #[tokio::test]
1096    async fn patch_for_unknown_node_is_ignored() {
1097        let mut tracker = PeerTracker::for_test(test_env(), None);
1098        let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1099        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1100
1101        let patch = ts_control::PeerChange {
1102            id: 999, // not in the netmap
1103            derp_region: None,
1104            cap: None,
1105            cap_map: None,
1106            underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1107            node_key: None,
1108            key_signature: None,
1109            disco_key: None,
1110            node_key_expiry: None,
1111            online: None,
1112            last_seen: None,
1113        };
1114        let (upserts, deletions) =
1115            tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1116
1117        assert_eq!(upserts.len(), 0);
1118        assert_eq!(deletions.len(), 0);
1119        assert_eq!(tracker.peer_db.peers().len(), 1);
1120        assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1121    }
1122
1123    /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1124    /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1125    #[tokio::test]
1126    async fn patch_updates_node_key_expiry() {
1127        let mut tracker = PeerTracker::for_test(test_env(), None);
1128        let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1129        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1130
1131        let expiry = "2027-01-01T00:00:00Z"
1132            .parse::<chrono::DateTime<chrono::Utc>>()
1133            .unwrap();
1134        let patch = ts_control::PeerChange {
1135            id: 1,
1136            derp_region: None,
1137            cap: None,
1138            cap_map: None,
1139            underlay_addresses: None,
1140            node_key: None,
1141            key_signature: None,
1142            disco_key: None,
1143            node_key_expiry: Some(expiry),
1144            online: None,
1145            last_seen: None,
1146        };
1147        tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1148
1149        let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1150        assert_eq!(after.node_key_expiry, Some(expiry));
1151    }
1152
1153    /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1154    #[tokio::test]
1155    async fn patch_updates_online() {
1156        let mut tracker = PeerTracker::for_test(test_env(), None);
1157        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1158        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1159        assert_eq!(
1160            tracker
1161                .peer_db
1162                .get(&(1 as ts_control::NodeId))
1163                .unwrap()
1164                .1
1165                .online,
1166            None
1167        );
1168
1169        let mut patch = ts_control::PeerChange {
1170            id: 1,
1171            derp_region: None,
1172            cap: None,
1173            cap_map: None,
1174            underlay_addresses: None,
1175            node_key: None,
1176            key_signature: None,
1177            disco_key: None,
1178            node_key_expiry: None,
1179            online: Some(true),
1180            last_seen: None,
1181        };
1182        tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch.clone()]));
1183        assert_eq!(
1184            tracker
1185                .peer_db
1186                .get(&(1 as ts_control::NodeId))
1187                .unwrap()
1188                .1
1189                .online,
1190            Some(true),
1191            "PeerChange.online=Some(true) marks the peer online"
1192        );
1193
1194        // A subsequent patch flips it offline.
1195        patch.online = Some(false);
1196        tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1197        assert_eq!(
1198            tracker
1199                .peer_db
1200                .get(&(1 as ts_control::NodeId))
1201                .unwrap()
1202                .1
1203                .online,
1204            Some(false)
1205        );
1206    }
1207
1208    /// Channel C/D: the `online_change` map flips online directly; `peer_seen_change: false`
1209    /// ("the peer is gone") marks the peer offline. Both apply to a peer already in the netmap and
1210    /// ignore unknown ids.
1211    #[tokio::test]
1212    async fn liveness_change_maps_apply_online() {
1213        let mut tracker = PeerTracker::for_test(test_env(), None);
1214        let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1215        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1216
1217        // Channel C: online_change sets online=true.
1218        let mut online_change = std::collections::BTreeMap::new();
1219        online_change.insert(1 as ts_control::NodeId, true);
1220        online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1221        let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1222        assert!(changed);
1223        assert_eq!(
1224            tracker
1225                .peer_db
1226                .get(&(1 as ts_control::NodeId))
1227                .unwrap()
1228                .1
1229                .online,
1230            Some(true)
1231        );
1232
1233        // Channel D: peer_seen_change=false marks the peer offline (gone), node retained.
1234        let mut peer_seen_change = std::collections::BTreeMap::new();
1235        peer_seen_change.insert(1 as ts_control::NodeId, false);
1236        let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1237        assert!(changed);
1238        assert_eq!(
1239            tracker
1240                .peer_db
1241                .get(&(1 as ts_control::NodeId))
1242                .unwrap()
1243                .1
1244                .online,
1245            Some(false),
1246            "peer_seen_change=false marks offline (the node stays in the netmap)"
1247        );
1248        assert_eq!(
1249            tracker.peer_db.peers().len(),
1250            1,
1251            "the node is retained, not removed"
1252        );
1253
1254        // No-op when nothing matches / changes.
1255        assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1256    }
1257
1258    /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1259    /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1260    /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1261    /// otherwise be a trust-enforcement bypass via the patch path.
1262    #[tokio::test]
1263    async fn patch_key_rotation_failing_tka_evicts_peer() {
1264        let (authority, sig) = authority_and_valid_sig();
1265        let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1266
1267        // Admit a correctly-signed peer (id == 1).
1268        let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1269        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1270        assert_eq!(tracker.peer_db.peers().len(), 1);
1271
1272        // Patch a new node key whose signature is garbage under the active authority.
1273        let patch = ts_control::PeerChange {
1274            id: 1,
1275            derp_region: None,
1276            cap: None,
1277            cap_map: None,
1278            underlay_addresses: None,
1279            node_key: Some([0x33u8; 32].into()),
1280            key_signature: Some(vec![0x00, 0x01, 0x02]),
1281            disco_key: None,
1282            node_key_expiry: None,
1283            online: None,
1284            last_seen: None,
1285        };
1286        let (upserts, deletions) =
1287            tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1288
1289        assert_eq!(upserts.len(), 0);
1290        assert_eq!(deletions.len(), 1);
1291        assert_eq!(tracker.peer_db.peers().len(), 0);
1292    }
1293
1294    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1295    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1296    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1297    /// proves the accumulate-then-join path the netmap handler builds.
1298    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1299        ts_control::UserProfile {
1300            id,
1301            login_name: login.to_string(),
1302            display_name: None,
1303        }
1304    }
1305
1306    #[tokio::test]
1307    async fn whois_resolves_user_from_accumulated_profiles() {
1308        let mut tracker = PeerTracker::for_test(test_env(), None);
1309
1310        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1311        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1312        peer.user_id = 42;
1313        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1314        let addr = "100.64.0.1:0".parse().unwrap();
1315
1316        // No profile yet: the node resolves but its owner is unknown.
1317        let who = tracker.whois_opt(addr).expect("peer is known");
1318        assert_eq!(who.user, None);
1319
1320        // Profile for a DIFFERENT user must not match.
1321        tracker
1322            .user_profiles
1323            .insert(7, profile(7, "someone-else@example.com"));
1324        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1325
1326        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1327        // login resolves.
1328        tracker
1329            .user_profiles
1330            .insert(42, profile(42, "alice@example.com"));
1331        assert_eq!(
1332            tracker.whois_opt(addr).unwrap().user,
1333            Some("alice@example.com".to_string())
1334        );
1335    }
1336
1337    /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1338    #[test]
1339    fn user_profile_best_label_prefers_login() {
1340        assert_eq!(
1341            profile(1, "alice@example.com").best_label(),
1342            Some("alice@example.com".to_string())
1343        );
1344        let display_only = ts_control::UserProfile {
1345            id: 2,
1346            login_name: String::new(),
1347            display_name: Some("Bob".to_string()),
1348        };
1349        assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1350        let empty = ts_control::UserProfile {
1351            id: 3,
1352            login_name: String::new(),
1353            display_name: None,
1354        };
1355        assert_eq!(empty.best_label(), None);
1356    }
1357}