ts_runtime/peer_tracker/mod.rs
1//! Peer delta update tracking.
2
3use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26 peer_db: PeerDb,
27 seen_state_update: bool,
28 pending_requests: Vec<Pending>,
29 /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30 /// changes ([`WatchNetmap`]).
31 peer_watch: watch::Sender<Vec<StatusNode>>,
32 /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33 /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34 /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35 /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36 /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37 /// earlier one.
38 user_profiles: HashMap<UserId, UserProfile>,
39 /// Tailnet-Lock (TKA) authority enforced at the peer-trust chokepoint, matching Go
40 /// `tkaFilterNetmapLocked`. Read on demand from a [`watch`] cell the control runner owns: when it
41 /// holds `Some` (a verified lock has been synced from control), enforcement is **active** — every
42 /// upserted peer must present a `key_signature` this authority authorizes, or it is dropped
43 /// (fail-closed), exactly as Go drops peers with a missing or failing signature. When it holds
44 /// `None` (no lock, or the lock was disabled) enforcement is **inactive** and every peer is
45 /// upserted, identical to pre-TKA behavior and to Go's `b.tka == nil` early return.
46 ///
47 /// A `watch::Receiver` (not the bus) is the transport on purpose: the authority is a single
48 /// security-critical state cell, and `watch` is last-write-wins, never-dropped, and ordered by
49 /// the control runner's own writes — so a disable (`None`) can never be reordered behind or
50 /// silently dropped before a stale `Some` (which a best-effort broadcast bus could do, leaving a
51 /// defunct lock enforcing forever). The control runner is the sole writer; we only ever read.
52 ///
53 /// The authority always passes through `VerifiedAumChain::verify` before the control runner
54 /// publishes it, so enforcement only engages on a chain we have cryptographically verified.
55 /// Connectivity now depends on `ts_tka` verifying genuinely-good signatures correctly (see
56 /// SECURITY.md). Self is structurally never filtered here (the self node never enters `peer_db` —
57 /// it is routed to the control runner's `self_node` cell), so a node cannot lock itself out of
58 /// its own netmap.
59 tka_authority: watch::Receiver<Option<Arc<ts_tka::Authority>>>,
60 env: Env,
61}
62
63impl PeerTracker {
64 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
65 // Canonicalization (case + trailing dot) is handled inside the name index lookup.
66 self.peer_db.get(&name).map(|(_id, node)| node)
67 }
68
69 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
70 self.peer_db.get(&ip).map(|(_id, node)| node)
71 }
72
73 /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
74 ///
75 /// Connectivity fields (`cur_addr`/`relay`) are left at their `from_node` defaults (`None`) here:
76 /// this is the live-watch/hot path and must stay magicsock-free and synchronous. The explicit
77 /// [`GetStatus`] snapshot enriches them ([`status_peers_with_ids`](Self::status_peers_with_ids)).
78 fn status_peers(&self) -> Vec<StatusNode> {
79 self.peer_db
80 .peers()
81 .values()
82 .map(StatusNode::from_node)
83 .collect()
84 }
85
86 /// Like [`status_peers`](Self::status_peers) but pairs each entry with its [`PeerId`], so the
87 /// caller can join per-peer connectivity (the direct manager's `best_addrs`, keyed by `PeerId`)
88 /// onto the `StatusNode` before returning it. Order is unspecified (a `HashMap` walk).
89 fn status_peers_with_ids(&self) -> Vec<(PeerId, StatusNode)> {
90 self.peer_db
91 .peers()
92 .iter()
93 .map(|(id, node)| (*id, StatusNode::from_node(node)))
94 .collect()
95 }
96
97 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
98 let ip = crate::status::whois_addr(addr);
99 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
100 // Join the node's owning user id against the accumulated UserProfiles table to resolve a
101 // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
102 // with no human owner, or a profile not yet delivered).
103 let user = self.resolve_user(node.user_id);
104 Some(crate::status::WhoIs::from_node_with_user(node, user))
105 }
106
107 /// Resolve a user id to its best display label from the accumulated profile table.
108 fn resolve_user(&self, user_id: UserId) -> Option<String> {
109 self.user_profiles
110 .get(&user_id)
111 .and_then(UserProfile::best_label)
112 }
113
114 /// Whether `node` may be admitted to the peer db under Tailnet Lock, matching Go
115 /// `tkaFilterNetmapLocked`'s per-peer verdict (drop unsigned / failed-signature peers).
116 ///
117 /// This consults the live [`tka_authority`](Self::tka_authority) cell on each call (one `borrow`,
118 /// held only for the duration of the verdict). For a `Full` resync — which checks every peer —
119 /// prefer [`tka_authority_snapshot`](Self::tka_authority_snapshot) +
120 /// [`tka_snapshot_admits`](Self::tka_snapshot_admits) to borrow once and verify each peer a single
121 /// time; this method is the convenience wrapper for the single-peer (`Delta`/patch) sites.
122 ///
123 /// Fail-closed and gated:
124 /// - No authority ⇒ no lock synced ⇒ always admit (Go's `b.tka == nil` early return; identical to
125 /// pre-TKA behavior).
126 /// - **Empty trusted-key state** ⇒ always admit (logged at `error!` — see
127 /// [`tka_snapshot_admits`](Self::tka_snapshot_admits) for the full rationale).
128 /// - Authority present + peer carries a `key_signature` the authority authorizes for the peer's
129 /// node key ⇒ admit.
130 /// - Authority present + signature missing or unauthorized/invalid ⇒ **drop** (Go drops peers
131 /// with a missing signature or failed `NodeKeyAuthorized` under tailnet lock).
132 fn tka_admits(&self, node: &Node) -> bool {
133 // Single-peer sites (`Delta`/patch) only need the admit bool; the rotation details are used
134 // exclusively by the cross-peer `Full` filter (rotation obsolescence is whole-netmap).
135 Self::tka_snapshot_admits(self.tka_authority.borrow().as_deref(), node).admitted
136 }
137
138 /// Borrow the current TKA authority once (cloning the cheap `Arc`) for a batch verdict. Returns
139 /// `None` when no lock is synced (admit-all). Used by the `Full` path so a netmap of N peers
140 /// reads the cell once and runs at most one signature verify per peer (not two).
141 fn tka_authority_snapshot(&self) -> Option<Arc<ts_tka::Authority>> {
142 self.tka_authority.borrow().clone()
143 }
144
145 /// The per-peer Tailnet-Lock verdict against an already-borrowed `authority` snapshot. Factored
146 /// out so both the single-peer [`tka_admits`](Self::tka_admits) and the `Full` batch path share
147 /// one verdict implementation (no divergence) while the batch path verifies each peer exactly
148 /// once.
149 ///
150 /// Returns whether the peer is admitted AND, for an admitted peer signed by a rotation chain, the
151 /// [`RotationDetails`](ts_tka::RotationDetails) of that chain — so the `Full` path can run the
152 /// cross-peer rotation filter (Go's `rotationTracker`) without a second verify per peer. A peer
153 /// that is dropped, unsigned, or signed by a non-rotation chain carries `rotation == None`.
154 ///
155 /// Never logs key/signature bytes — only the `stable_id` and the `TkaError` Display (static
156 /// descriptors). One documented parity gap remains vs Go (under-enforcement, in PARITY_ROADMAP):
157 /// no `UnsignedPeerAPIOnly` exemption (our node model lacks the field).
158 fn tka_snapshot_admits(authority: Option<&ts_tka::Authority>, node: &Node) -> TkaVerdict {
159 let Some(auth) = authority else {
160 return TkaVerdict::admit();
161 };
162
163 // Brick-guard: an authority with no trusted keys would drop every peer. A verified chain is
164 // structurally guaranteed ≥1 key (genesis rejects an empty key set, and the last key cannot
165 // be removed), so reaching here means a `ts_tka` invariant was violated — admit rather than
166 // black-hole the whole netmap, and log at `error!` because it signals a real bug, not an
167 // expected runtime input. This is OUR fail-safe, not a Go behavior. NOTE: it only catches the
168 // empty-keyset shape; a non-empty authority that authorizes none of the offered peers still
169 // (correctly) drops them — that is what a lock that revoked everyone means. The
170 // "authorized-zero-peers" isolation case is surfaced separately by the caller.
171 if auth.state().keys.is_empty() {
172 tracing::error!(
173 "TKA: authority has an empty trusted-key set (verified chains never do — likely a \
174 ts_tka bug); not enforcing (admitting all) to avoid isolating the node"
175 );
176 return TkaVerdict::admit();
177 }
178
179 if node.key_signature.is_empty() {
180 tracing::warn!(
181 stable_id = ?node.stable_id,
182 "TKA: dropping unsigned peer under tailnet lock"
183 );
184 return TkaVerdict::drop();
185 }
186
187 match auth.node_key_authorized_with_details(&node.node_key.to_bytes(), &node.key_signature)
188 {
189 Ok(rotation) => {
190 tracing::debug!(stable_id = ?node.stable_id, "TKA: peer node-key authorized");
191 TkaVerdict {
192 admitted: true,
193 rotation,
194 }
195 }
196 Err(e) => {
197 tracing::warn!(
198 stable_id = ?node.stable_id,
199 error = %e,
200 "TKA: dropping peer with unauthorized node key"
201 );
202 TkaVerdict::drop()
203 }
204 }
205 }
206}
207
208/// The outcome of a per-peer Tailnet-Lock check: whether the peer is admitted, plus (for an admitted
209/// peer signed by a rotation chain) the chain's [`RotationDetails`](ts_tka::RotationDetails) so the
210/// `Full` path can run the cross-peer rotation filter from the SAME verify pass (no second verify).
211struct TkaVerdict {
212 admitted: bool,
213 rotation: Option<ts_tka::RotationDetails>,
214}
215
216impl TkaVerdict {
217 /// Admitted, no rotation details (no lock / brick-guard / non-rotation signature).
218 fn admit() -> Self {
219 Self {
220 admitted: true,
221 rotation: None,
222 }
223 }
224 /// Dropped.
225 fn drop() -> Self {
226 Self {
227 admitted: false,
228 rotation: None,
229 }
230 }
231}
232
233/// Cross-peer rotation-obsolescence tracker, mirroring Go `ipnlocal.rotationTracker`. Fed the
234/// [`RotationDetails`](ts_tka::RotationDetails) of every admitted, rotation-signed peer in a `Full`
235/// netmap; [`obsolete_keys`](Self::obsolete_keys) then returns the node keys to drop on top of the
236/// per-peer verdict. Two rules (Go `tkaFilterNetmapLocked` + `rotationTracker.obsoleteKeys`):
237///
238/// 1. Every prior node key named in any rotation chain is obsolete (a newer chain rotated it away).
239/// 2. Among `Direct`-rooted chains sharing one wrapping pubkey (a clone signal), only the
240/// longest-chain peer survives; if the two longest are tied, ALL in that group are dropped (we
241/// cannot tell which is the latest, so reject for safety). `Credential`-rooted chains are exempt
242/// from rule 2 — several nodes can legitimately join under one reusable auth key (same wrapping
243/// pubkey), so sharing it is not a clone signal there. (Rule 1 still applies to them.)
244///
245/// Node keys are tracked as raw `Vec<u8>` (the verified 32-byte node-public bytes).
246#[derive(Default)]
247struct RotationTracker {
248 obsolete: HashSet<Vec<u8>>,
249 by_wrapping_key: HashMap<Vec<u8>, Vec<SigRotation>>,
250}
251
252/// One admitted peer's rotation entry within a wrapping-key group.
253struct SigRotation {
254 node_key: Vec<u8>,
255 num_prev_keys: usize,
256}
257
258impl RotationTracker {
259 /// Record an admitted peer `node_key` and its rotation `details` (Go `addRotationDetails`).
260 fn add(&mut self, node_key: Vec<u8>, details: &ts_tka::RotationDetails) {
261 // Rule 1: every prior key is obsolete — applied for ALL chains (incl. credential-rooted),
262 // matching Go's ungated `obsolete.AddSlice(d.PrevNodeKeys)`.
263 self.obsolete.extend(details.prev_node_keys.iter().cloned());
264 // Rule 2 (clone-uniqueness) is gated to Direct-rooted chains only.
265 if details.initial_sig_kind != ts_tka::SigKind::Direct {
266 return;
267 }
268 self.by_wrapping_key
269 .entry(details.initial_wrapping_pubkey.clone())
270 .or_default()
271 .push(SigRotation {
272 node_key,
273 num_prev_keys: details.prev_node_keys.len(),
274 });
275 }
276
277 /// Compute the full obsolete node-key set (Go `rotationTracker.obsoleteKeys`). Processes each
278 /// wrapping-key group, mutating the shared `obsolete` set as it goes (so a key obsoleted by one
279 /// group is seen as obsolete by later groups via the `retain` below — Go's
280 /// `slices.DeleteFunc(... Contains)`). Group iteration order (a `HashMap` drain) is
281 /// nondeterministic, but the result is order-INDEPENDENT: this only ever *inserts* into
282 /// `obsolete` (never removes), and rule 1 already obsoleted every prior key before this loop, so
283 /// the final set is a union that does not depend on which group runs first (as in Go).
284 fn obsolete_keys(mut self) -> HashSet<Vec<u8>> {
285 // Drain only the group map so the loop can mutate `self.obsolete` without aliasing it; the
286 // shared `obsolete` set itself is NOT drained, preserving the cross-group visibility above.
287 let groups: Vec<Vec<SigRotation>> = self.by_wrapping_key.drain().map(|(_k, v)| v).collect();
288 for mut group in groups {
289 // Drop entries already obsoleted (rotated away) by another chain.
290 group.retain(|rd| !self.obsolete.contains(&rd.node_key));
291 if group.is_empty() {
292 continue;
293 }
294 // Longest chain (most prior keys) is the newest ⇒ the survivor; sort decreasing.
295 // `sort_by_key` is stable (like Go's `SortStableFunc`); `Reverse` gives descending order.
296 group.sort_by_key(|rd| core::cmp::Reverse(rd.num_prev_keys));
297 if group.len() >= 2 && group[0].num_prev_keys == group[1].num_prev_keys {
298 // Tie for longest ⇒ cannot disambiguate the latest ⇒ drop the WHOLE group.
299 tracing::warn!(
300 "TKA: multiple peers share a wrapping key with equal rotation depth; dropping all (cannot determine the latest)"
301 );
302 for rd in &group {
303 self.obsolete.insert(rd.node_key.clone());
304 }
305 } else {
306 // Only the longest-chain peer survives; the rest are obsolete.
307 for rd in &group[1..] {
308 self.obsolete.insert(rd.node_key.clone());
309 }
310 }
311 }
312 self.obsolete
313 }
314}
315
316impl kameo::Actor for PeerTracker {
317 /// `(env, tka_authority)`: the bus/keys env, plus the read end of the control runner's TKA
318 /// enforcement-authority cell (Go `tkaFilterNetmapLocked`). The control runner is the sole
319 /// writer; it publishes the verified `Authority` after a successful `/machine/tka/sync` and
320 /// `None` when the lock is disabled. A `watch` cell (not a bus message) so the latest value is
321 /// always readable on demand, never dropped, and never reordered (see [`tka_authority`]).
322 type Args = (Env, watch::Receiver<Option<Arc<ts_tka::Authority>>>);
323 type Error = Error;
324
325 async fn on_start(
326 (env, tka_authority): Self::Args,
327 slf: ActorRef<Self>,
328 ) -> Result<Self, Self::Error> {
329 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
330
331 let (peer_watch, _) = watch::channel(Vec::new());
332
333 Ok(Self {
334 peer_db: PeerDb::default(),
335 pending_requests: Default::default(),
336 seen_state_update: false,
337 peer_watch,
338 user_profiles: HashMap::new(),
339 // The cell starts `None` (no lock synced ⇒ enforcement inactive, admit all, matching
340 // Go's `b.tka == nil`); the control runner flips it to `Some` on the first sync.
341 tka_authority,
342 env,
343 })
344 }
345}
346
347enum Pending {
348 PeerByName(PeerByName, ReplySender<Option<Node>>),
349 AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
350 TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
351 Status(ReplySender<Vec<(PeerId, StatusNode)>>),
352 WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
353}
354
355// For messages with arguments, a struct is generated with the args as fields. They aren't
356// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
357// docs are turned off everywhere.
358#[allow(missing_docs)]
359mod msg_impl {
360 use std::net::IpAddr;
361
362 use kameo::prelude::DelegatedReply;
363
364 use super::*;
365
366 #[kameo::messages]
367 impl PeerTracker {
368 /// Lookup a peer by name.
369 ///
370 /// Waits until we've received at least one peer update from control.
371 #[message(ctx)]
372 pub async fn peer_by_name(
373 &mut self,
374 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
375 name: String,
376 ) -> DelegatedReply<Option<Node>> {
377 let (deleg, sender) = ctx.reply_sender();
378 let Some(sender) = sender else { return deleg };
379
380 if !self.seen_state_update {
381 tracing::debug!(query = name, "no peer state seen yet, queueing request");
382
383 self.pending_requests
384 .push(Pending::PeerByName(PeerByName { name }, sender));
385
386 return deleg;
387 }
388
389 sender.send(self.peer_by_name_opt(&name).cloned());
390
391 deleg
392 }
393
394 /// Lookup all peers that accept packets addressed to the given IP.
395 ///
396 /// This includes the peer's tailnet address and any subnet routes it provides. Only
397 /// the peers with the most specific subnet route match that covers `ip` will be
398 /// returned.
399 ///
400 /// E.g., suppose:
401 ///
402 /// - We're querying for `10.1.2.3`
403 /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
404 /// - `PeerC` has an accepted route for `10.1.0.0/16`
405 ///
406 /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
407 /// prefix match.
408 #[message(ctx)]
409 pub fn peer_by_accepted_route(
410 &mut self,
411 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
412 ip: IpAddr,
413 ) -> DelegatedReply<Vec<Node>> {
414 let (deleg, sender) = ctx.reply_sender();
415 let Some(sender) = sender else { return deleg };
416
417 if !self.seen_state_update {
418 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
419
420 self.pending_requests
421 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
422
423 return deleg;
424 }
425
426 sender.send(
427 self.peer_db
428 .get_route(ip.into())
429 .map(|(_id, node)| node.clone())
430 .collect(),
431 );
432
433 deleg
434 }
435
436 /// Lookup the peer that has the given tailnet IP address.
437 #[message(ctx)]
438 pub fn peer_by_tailnet_ip(
439 &mut self,
440 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
441 ip: IpAddr,
442 ) -> DelegatedReply<Option<Node>> {
443 let (deleg, sender) = ctx.reply_sender();
444 let Some(sender) = sender else { return deleg };
445
446 if !self.seen_state_update {
447 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
448
449 self.pending_requests
450 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
451
452 return deleg;
453 }
454
455 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
456
457 deleg
458 }
459
460 /// Build the peer entries of a [`Status`](crate::Status) snapshot, each paired with its
461 /// [`PeerId`] so [`Runtime::status`](crate::Runtime::status) can join per-peer connectivity
462 /// (`cur_addr`/`relay`) from the direct manager before returning. The self node is *not*
463 /// included here (it lives in the control runner); `Runtime::status` combines both and drops
464 /// the ids.
465 ///
466 /// Waits until we've received at least one peer update from control.
467 #[message(ctx)]
468 pub fn get_status(
469 &mut self,
470 ctx: &mut Context<Self, DelegatedReply<Vec<(PeerId, StatusNode)>>>,
471 ) -> DelegatedReply<Vec<(PeerId, StatusNode)>> {
472 let (deleg, sender) = ctx.reply_sender();
473 let Some(sender) = sender else { return deleg };
474
475 if !self.seen_state_update {
476 tracing::debug!("no peer state seen yet, queueing status request");
477 self.pending_requests.push(Pending::Status(sender));
478 return deleg;
479 }
480
481 sender.send(self.status_peers_with_ids());
482
483 deleg
484 }
485
486 /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
487 ///
488 /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
489 /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
490 /// is not included (it lives in the control runner). Returns empty before the first netmap —
491 /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
492 /// that need a populated list await `Running` first).
493 #[message]
494 pub fn all_peers(&self) -> Vec<Node> {
495 self.peer_db.peers().values().cloned().collect()
496 }
497
498 /// Resolve which node owns a tailnet source address.
499 ///
500 /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
501 /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
502 ///
503 /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
504 /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
505 /// [`status`](crate::status) module docs for the gap.
506 ///
507 /// Waits until we've received at least one peer update from control.
508 #[message(ctx)]
509 pub fn whois(
510 &mut self,
511 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
512 addr: std::net::SocketAddr,
513 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
514 let (deleg, sender) = ctx.reply_sender();
515 let Some(sender) = sender else { return deleg };
516
517 if !self.seen_state_update {
518 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
519 self.pending_requests
520 .push(Pending::WhoIs(Whois { addr }, sender));
521 return deleg;
522 }
523
524 sender.send(self.whois_opt(addr));
525
526 deleg
527 }
528
529 /// Subscribe to netmap peer-change events.
530 ///
531 /// Returns a [`watch::Receiver`] whose value is the current set of peer
532 /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
533 /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
534 ///
535 /// The receiver's initial value is the peer set at subscription time (empty before the
536 /// first netmap update). This is a peer-only view; combine with the self node from
537 /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
538 #[message(derive(Clone))]
539 pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
540 self.peer_watch.subscribe()
541 }
542 }
543}
544
545pub use msg_impl::*;
546
547#[derive(Debug, Clone)]
548pub(crate) struct PeerState {
549 #[allow(unused)]
550 pub deletions: HashSet<PeerId>,
551 #[allow(unused)]
552 pub upserts: HashSet<PeerId>,
553 pub peers: Arc<PeerDb>,
554}
555
556impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
557 type Reply = ();
558
559 async fn handle(
560 &mut self,
561 msg: Arc<ts_control::StateUpdate>,
562 _ctx: &mut Context<Self, Self::Reply>,
563 ) {
564 // Accumulate user profiles first — control sends them incrementally and a response may
565 // carry profiles with no peer delta (or peers that reference a profile from an earlier
566 // response), so this must happen before the no-peer-update early return below.
567 for profile in &msg.user_profiles {
568 self.user_profiles.insert(profile.id, profile.clone());
569 }
570
571 // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
572 // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
573 // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
574 // *before* the no-peer-update early return — otherwise online status freezes at the last
575 // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
576 // Wall clock for a `PeerSeenChange: true` (Go uses `clock.Now()`). chrono is built without
577 // its `clock` feature in this workspace, so derive it from `SystemTime` the same way the
578 // control runner / ssh-policy paths do (unix secs → `DateTime::from_timestamp`).
579 let now = std::time::SystemTime::now()
580 .duration_since(std::time::UNIX_EPOCH)
581 .ok()
582 .and_then(|d| chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos()))
583 .unwrap_or_default();
584 let liveness_changed =
585 self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change, now);
586
587 if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
588 // No peer set or patch this response. If a liveness delta still mutated the netmap,
589 // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
590 if liveness_changed {
591 self.service_pending_requests();
592 self.peer_watch.send_replace(self.status_peers());
593 if let Err(e) = self
594 .env
595 .publish(Arc::new(PeerState {
596 upserts: HashSet::default(),
597 deletions: HashSet::default(),
598 peers: Arc::new(self.peer_db.clone()),
599 }))
600 .await
601 {
602 tracing::error!(error = %e, "publishing liveness-only peer state update");
603 }
604 }
605 return;
606 }
607
608 // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
609 // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
610 // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
611 // so the published `PeerState` reflects every node touched by both passes; a node both
612 // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
613 let (mut upserts, mut deletions) = msg
614 .peer_update
615 .as_ref()
616 .map(|u| self.apply_peer_update(u))
617 .unwrap_or_default();
618
619 if !msg.peer_patches.is_empty() {
620 let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
621 // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
622 // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
623 for id in &patch_upserts {
624 deletions.remove(id);
625 }
626 for id in &patch_deletions {
627 upserts.remove(id);
628 }
629 upserts.extend(patch_upserts);
630 deletions.extend(patch_deletions);
631 }
632
633 tracing::debug!(
634 n_upsert = upserts.len(),
635 n_delete = deletions.len(),
636 peer_count = self.peer_db.peers().len(),
637 "new peer state"
638 );
639
640 self.service_pending_requests();
641
642 // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
643 // value current even when there are no subscribers, so a late subscriber sees fresh state.
644 self.peer_watch.send_replace(self.status_peers());
645
646 if let Err(e) = self
647 .env
648 .publish(Arc::new(PeerState {
649 upserts,
650 deletions,
651 peers: Arc::new(self.peer_db.clone()),
652 }))
653 .await
654 {
655 tracing::error!(error = %e, "publishing peer state update");
656 }
657 }
658}
659
660/// Ask the peer tracker to re-broadcast its current peer snapshot on the bus, without any peer
661/// change. Sent after a runtime preference change so the route updater and source filter (both
662/// `Arc<PeerState>` subscribers) re-resolve against the new value immediately, rather than waiting
663/// for the next netmap update: `Device::set_exit_node` (new exit-node selector) and
664/// `Device::set_accept_routes` (new accept-routes flag) both send it.
665#[derive(Debug, Clone, Copy)]
666pub struct RepublishState;
667
668impl Message<RepublishState> for PeerTracker {
669 type Reply = ();
670
671 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
672 // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
673 // delta. Subscribers recompute their routes/filters against the current peers and the
674 // (just-updated) runtime preferences (exit-node selector, accept-routes flag).
675 if let Err(e) = self
676 .env
677 .publish(Arc::new(PeerState {
678 upserts: HashSet::default(),
679 deletions: HashSet::default(),
680 peers: Arc::new(self.peer_db.clone()),
681 }))
682 .await
683 {
684 tracing::error!(error = %e, "re-publishing peer state after a runtime preference change");
685 }
686 }
687}
688
689impl PeerTracker {
690 /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
691 /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
692 ///
693 /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
694 /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
695 /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
696 ///
697 /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
698 fn apply_peer_update(
699 &mut self,
700 peer_update: &ts_control::PeerUpdate,
701 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
702 let mut upserts = HashSet::default();
703 let mut deletions = HashSet::default();
704
705 match peer_update {
706 ts_control::PeerUpdate::Full(new_nodes) => {
707 tracing::trace!("full peer update");
708
709 // Borrow the authority ONCE for the whole batch and verify each peer EXACTLY once
710 // (Go runs `tkaFilterNetmapLocked` once over the assembled netmap; an earlier draft
711 // verified every peer twice — once for `retained_ids`, once in the upsert loop —
712 // doubling the ed25519 cost on the hot resync path). The per-node verdict vector
713 // `admits` is computed once and drives both the `retain` (evict revoked peers, keyed
714 // by stable_id) and the upsert loop (skip rejected peers, by the node's OWN verdict).
715 // Keeping a per-node verdict (not just a stable_id set) means a node whose own
716 // signature fails is never admitted on the strength of a different node that happens
717 // to share its stable_id — matching the old per-node re-verify for that degenerate
718 // (malformed-control) input.
719 //
720 // Revocation evicts: a peer re-included with a now-invalid/missing signature under an
721 // active authority fails its verdict, so it is excluded from `retained_ids` and
722 // `retain` drops the stale (previously-admitted) entry. With no authority the snapshot
723 // is `None`, so every node passes — byte-for-byte the pre-TKA behavior (no regression).
724 let authority = self.tka_authority_snapshot();
725 let verdicts = new_nodes
726 .iter()
727 .map(|node| Self::tka_snapshot_admits(authority.as_deref(), node))
728 .collect::<Vec<_>>();
729
730 // Cross-peer rotation filter (Go `rotationTracker`): from the SAME verify pass above,
731 // feed every admitted, rotation-signed peer's details to the tracker, then drop any
732 // peer presenting a node key a newer rotation has superseded (or a tied clone). This
733 // is whole-netmap by nature — one peer's chain obsoletes another's key — so it lives
734 // here, not in the per-peer verdict, matching Go's single pass over `nm.Peers`.
735 let mut rotation = RotationTracker::default();
736 for (node, verdict) in new_nodes.iter().zip(&verdicts) {
737 if verdict.admitted
738 && let Some(details) = &verdict.rotation
739 {
740 rotation.add(node.node_key.to_bytes().to_vec(), details);
741 }
742 }
743 let obsolete = rotation.obsolete_keys();
744
745 // Final per-node keep verdict: admitted by the per-peer check AND not rotation-obsolete.
746 // Drives both the `retain` (evict) and the upsert loop, so a node whose own signature
747 // fails — or whose key was rotated away — is never admitted on the strength of a
748 // stable_id twin.
749 let keep = new_nodes
750 .iter()
751 .zip(&verdicts)
752 .map(|(node, v)| {
753 // `contains` takes `&[u8]` (HashSet<Vec<u8>> borrows as a slice) — no alloc.
754 v.admitted && !obsolete.contains(&node.node_key.to_bytes()[..])
755 })
756 .collect::<Vec<bool>>();
757 let retained_ids = new_nodes
758 .iter()
759 .zip(keep.iter().copied())
760 .filter(|(_, k)| *k)
761 .map(|(node, _)| &node.stable_id)
762 .collect::<HashSet<_>>();
763
764 // Isolation diagnostic: an ACTIVE lock that authorized none of the offered peers
765 // leaves this node with no peers — surface it loudly so a self-lockout (vs an attack)
766 // is diagnosable. `authority.is_some()` means a real keyed lock (the empty-keyset
767 // brick-guard admits-all, so it never reaches here with zero retained).
768 if authority.is_some() && !new_nodes.is_empty() && retained_ids.is_empty() {
769 tracing::error!(
770 offered = new_nodes.len(),
771 "TKA: active lock authorized ZERO of the offered peers; node is isolated \
772 (verify the lock state, or disable tailnet lock to recover)"
773 );
774 }
775
776 self.peer_db.retain(|id, peer| {
777 let retain = retained_ids.contains(&peer.stable_id);
778
779 if !retain {
780 deletions.insert(id);
781 }
782
783 retain
784 });
785
786 for (node, k) in new_nodes.iter().zip(keep.iter().copied()) {
787 if !k {
788 continue; // fail-CLOSED: rejected by tailnet lock or rotation-obsolete (above)
789 }
790 let peer_id = self.peer_db.upsert(node);
791 upserts.insert(peer_id);
792 }
793 }
794
795 ts_control::PeerUpdate::Delta { remove, upsert } => {
796 tracing::trace!("delta peer update");
797
798 for peer in upsert {
799 if !self.tka_admits(peer) {
800 // fail-CLOSED: do not upsert a peer rejected by tailnet lock. If the peer is
801 // ALREADY in the db (a delta re-upserting an existing peer whose signature is
802 // now invalid — e.g. revoked between syncs), evict the stale entry rather than
803 // leaving an unverified peer admitted; Go re-filters the whole netmap each map
804 // response, so a now-unsigned peer would not survive there either.
805 if let Some((id, _)) = self.peer_db.remove(&peer.stable_id) {
806 tracing::warn!(
807 stable_id = ?peer.stable_id,
808 "TKA: delta re-upsert rejected; evicting now-unauthorized peer"
809 );
810 deletions.insert(id);
811 }
812 continue;
813 }
814 let id = self.peer_db.upsert(peer);
815
816 upserts.insert(id);
817 }
818
819 for peer in remove {
820 let Some((id, _node)) = self.peer_db.remove(peer) else {
821 tracing::error!(control_node_id = peer, "removed peer was unknown");
822 continue;
823 };
824
825 deletions.insert(id);
826 }
827 }
828 }
829
830 (upserts, deletions)
831 }
832
833 /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
834 /// deleted [`PeerId`]s.
835 ///
836 /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
837 /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
838 /// response that carries both has the peer set applied first (by the caller) and these patches
839 /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
840 /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
841 fn apply_peer_patches(
842 &mut self,
843 patches: &[ts_control::PeerChange],
844 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
845 let mut upserts = HashSet::default();
846 let mut deletions = HashSet::default();
847
848 tracing::trace!(n = patches.len(), "peer patch update");
849
850 for patch in patches {
851 // Clone the current node, apply the present fields, and re-upsert through the same path
852 // as a delta so indexes/routes stay consistent.
853 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
854 tracing::debug!(
855 control_node_id = patch.id,
856 "peer patch for unknown node; ignoring"
857 );
858 continue;
859 };
860
861 let mut node = existing.clone();
862 if let Some(endpoints) = &patch.underlay_addresses {
863 node.underlay_addresses = endpoints.clone();
864 }
865 if let Some(derp) = patch.derp_region {
866 node.derp_region = Some(derp);
867 }
868 if let Some(cap) = patch.cap {
869 node.cap = cap;
870 }
871 if let Some(cap_map) = &patch.cap_map {
872 node.cap_map = cap_map.clone();
873 }
874 if let Some(disco_key) = patch.disco_key {
875 node.disco_key = Some(disco_key);
876 }
877 if let Some(expiry) = patch.node_key_expiry {
878 node.node_key_expiry = Some(expiry);
879 }
880 // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
881 // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
882 // a value (never patches back to unknown), so apply when present.
883 if let Some(online) = patch.online {
884 node.online = Some(online);
885 }
886 if let Some(last_seen) = patch.last_seen {
887 node.last_seen = Some(last_seen);
888 }
889 // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
890 // together so the trust gate below verifies the new signature against the new key, never
891 // a mismatched pair.
892 if let Some(node_key) = patch.node_key {
893 node.node_key = node_key;
894 }
895 if let Some(sig) = &patch.key_signature {
896 node.key_signature = sig.clone();
897 }
898
899 // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
900 // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
901 // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
902 // evict it rather than keep the stale (now-unverified) entry.
903 if !self.tka_admits(&node) {
904 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
905 tracing::warn!(
906 control_node_id = patch.id,
907 "peer patch rejected by tailnet lock; evicting peer"
908 );
909 deletions.insert(id);
910 }
911 continue;
912 }
913
914 let id = self.peer_db.upsert(&node);
915 upserts.insert(id);
916 }
917
918 (upserts, deletions)
919 }
920
921 /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
922 /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
923 /// actually mutated (so the caller knows whether to re-publish).
924 ///
925 /// Mirrors Go `controlclient/map.go:updatePeersStateFromResponse` (the two channels are
926 /// semantically DISTINCT and must not be conflated):
927 /// - `OnlineChange` (channel C) is the sole driver of a peer's `online` flag (`mut.Online = v`).
928 /// - `PeerSeenChange` (channel D) is the sole driver of `last_seen`: `true ⇒ LastSeen = now`,
929 /// `false ⇒ LastSeen = nil` (cleared). It NEVER touches `online` — "not seen recently" is not
930 /// the same as "offline", which only `OnlineChange` asserts.
931 ///
932 /// Each entry is keyed by control node id and applies to a peer already in the netmap; an unknown
933 /// node id is ignored (these maps never create a node). `now` is the wall-clock timestamp for a
934 /// `PeerSeenChange: true` (Go uses `clock.Now()`); the caller passes it so this stays a pure
935 /// function of its inputs. Returns `true` if any node was actually mutated.
936 fn apply_liveness_changes(
937 &mut self,
938 online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
939 peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
940 now: chrono::DateTime<chrono::Utc>,
941 ) -> bool {
942 let mut changed = false;
943
944 // Channel C — direct online flips (the only writer of `online`).
945 for (&node_id, &online) in online_change {
946 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
947 && existing.online != Some(online)
948 {
949 let mut node = existing.clone();
950 node.online = Some(online);
951 self.peer_db.upsert(&node);
952 changed = true;
953 }
954 }
955
956 // Channel D — peer-seen flips (the only writer of `last_seen`; never touches `online`).
957 // `true` ⇒ last-seen is now; `false` ⇒ last-seen cleared (Go map.go:820-830).
958 for (&node_id, &seen) in peer_seen_change {
959 let new_last_seen = if seen { Some(now) } else { None };
960 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
961 && existing.last_seen != new_last_seen
962 {
963 let mut node = existing.clone();
964 node.last_seen = new_last_seen;
965 self.peer_db.upsert(&node);
966 changed = true;
967 }
968 }
969
970 changed
971 }
972
973 /// Test-only constructor: build a [`PeerTracker`] with a chosen initial TKA authority without
974 /// going through the actor `on_start` path. Returns the tracker plus the **`watch::Sender`** for
975 /// its enforcement-authority cell, so a test can drive the exact enable/disable transitions the
976 /// control runner drives at runtime (`tx.send_replace(Some(..))` ⇒ enforce, `tx.send_replace(None)`
977 /// ⇒ clear). The initial `Some` exercises the fail-closed chokepoint
978 /// ([`tka_admits`](Self::tka_admits)); `None` is the no-lock admit-all path. The returned sender
979 /// must be kept alive for the tracker to read updated values.
980 #[cfg(test)]
981 fn for_test(
982 env: Env,
983 tka_authority: Option<ts_tka::Authority>,
984 ) -> (Self, watch::Sender<Option<Arc<ts_tka::Authority>>>) {
985 let (peer_watch, _) = watch::channel(Vec::new());
986 let (tka_tx, tka_rx) = watch::channel(tka_authority.map(Arc::new));
987 let tracker = Self {
988 peer_db: PeerDb::default(),
989 seen_state_update: false,
990 pending_requests: Vec::new(),
991 peer_watch,
992 user_profiles: HashMap::new(),
993 tka_authority: tka_rx,
994 env,
995 };
996 (tracker, tka_tx)
997 }
998
999 fn service_pending_requests(&mut self) {
1000 if self.seen_state_update {
1001 return;
1002 }
1003
1004 self.seen_state_update = true;
1005
1006 if !self.pending_requests.is_empty() {
1007 tracing::debug!(
1008 n_pending = self.pending_requests.len(),
1009 "state update received, servicing pending requests"
1010 );
1011 }
1012
1013 for req in core::mem::take(&mut self.pending_requests) {
1014 match req {
1015 Pending::PeerByName(PeerByName { name }, reply) => {
1016 reply.send(self.peer_by_name_opt(&name).cloned());
1017 }
1018 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
1019 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
1020 }
1021 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
1022 reply.send(
1023 self.peer_db
1024 .get_route(ip.into())
1025 .map(|(_id, node)| node.clone())
1026 .collect(),
1027 );
1028 }
1029 Pending::Status(reply) => {
1030 reply.send(self.status_peers_with_ids());
1031 }
1032 Pending::WhoIs(Whois { addr }, reply) => {
1033 reply.send(self.whois_opt(addr));
1034 }
1035 }
1036 }
1037 }
1038}
1039
1040#[cfg(test)]
1041mod tka_tests {
1042 //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
1043 //!
1044 //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
1045 //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
1046 //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
1047 //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
1048 //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
1049 //! private `ts_tka` API used).
1050
1051 use ed25519_dalek::{Signer, SigningKey};
1052 use ts_control::{Node, StableNodeId, TailnetAddress};
1053 use ts_tka::{
1054 AumHash, Authority, Key, KeyKind, State,
1055 cbor::{self, Value},
1056 };
1057
1058 use super::*;
1059
1060 /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
1061 const SIG_KIND_DIRECT: u64 = 1;
1062
1063 /// The 32-byte node key used across the signed-peer fixtures.
1064 const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
1065
1066 /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
1067 /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
1068 fn test_env() -> Env {
1069 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
1070 Env::new(
1071 ts_keys::NodeState::generate(),
1072 shutdown_rx,
1073 crate::env::ForwarderConfig {
1074 accept_routes: false,
1075 accept_dns: true,
1076 exit_node: None,
1077 forward_routes: Vec::new(),
1078 forward_tcp_ports: Vec::new(),
1079 forward_udp_ports: Vec::new(),
1080 forward_all_ports: false,
1081 forward_exit_egress: false,
1082 block_incoming: false,
1083 exit_proxy: None,
1084 peerapi_port: None,
1085 taildrop_dir: None,
1086 enable_ipv6: false,
1087 persistent_keepalive_interval: None,
1088 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1089 },
1090 )
1091 }
1092
1093 /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
1094 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
1095 Node {
1096 id: 1,
1097 stable_id: StableNodeId(stable_id.to_string()),
1098 hostname: stable_id.to_string(),
1099 user_id: 0,
1100 tailnet: Some("ts.net".to_string()),
1101 tags: Vec::new(),
1102 tailnet_address: TailnetAddress {
1103 ipv4: "100.64.0.1/32".parse().unwrap(),
1104 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
1105 },
1106 node_key: node_key.into(),
1107 node_key_expiry: None,
1108 online: None,
1109 last_seen: None,
1110 key_signature,
1111 machine_key: None,
1112 disco_key: None,
1113 accepted_routes: Vec::new(),
1114 underlay_addresses: Vec::new(),
1115 derp_region: None,
1116 cap: Default::default(),
1117 cap_map: Default::default(),
1118 peerapi_port: None,
1119 peerapi_dns_proxy: false,
1120 is_wireguard_only: false,
1121 exit_node_dns_resolvers: Vec::new(),
1122 peer_relay: false,
1123 service_vips: Default::default(),
1124 }
1125 }
1126
1127 /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
1128 /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
1129 /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
1130 /// signing-digest preimage (the `SigHash` form).
1131 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
1132 let mut pairs = alloc_pairs(node_key, key_id);
1133 if let Some(sig) = signature {
1134 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
1135 }
1136 cbor::int_map(pairs).to_vec()
1137 }
1138
1139 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
1140 vec![
1141 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
1142 (2, Some(Value::Bytes(node_key.to_vec()))),
1143 (3, Some(Value::Bytes(key_id.to_vec()))),
1144 ]
1145 }
1146
1147 /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
1148 /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
1149 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
1150 // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
1151 let signing = SigningKey::from_bytes(&[42u8; 32]);
1152 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
1153
1154 let authority = Authority::from_state(
1155 AumHash([0; 32]),
1156 State {
1157 keys: vec![Key {
1158 kind: KeyKind::Ed25519,
1159 votes: 1,
1160 public: trusted_pub.clone(),
1161 }],
1162 },
1163 );
1164
1165 // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
1166 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
1167 let sig_hash = ts_tka::aum_hash(&preimage).0;
1168 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
1169
1170 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
1171 // Sanity: the authority accepts the signature we just built (same path the gate uses).
1172 assert!(
1173 authority
1174 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
1175 .is_ok()
1176 );
1177
1178 (authority, signed_cbor)
1179 }
1180
1181 #[tokio::test]
1182 async fn tka_inactive_upserts_all_peers() {
1183 // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
1184 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1185
1186 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
1187 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
1188
1189 assert!(tracker.tka_admits(&signed));
1190 assert!(tracker.tka_admits(&unsigned));
1191
1192 tracker.peer_db.upsert(&signed);
1193 tracker.peer_db.upsert(&unsigned);
1194 assert_eq!(tracker.peer_db.peers().len(), 2);
1195 }
1196
1197 #[tokio::test]
1198 async fn tka_active_rejects_unsigned_peer() {
1199 // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
1200 let (authority, _sig) = authority_and_valid_sig();
1201 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1202
1203 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1204 assert!(!tracker.tka_admits(&unsigned));
1205
1206 // Mirror the handler's `if !tka_admits { continue }` loop.
1207 if tracker.tka_admits(&unsigned) {
1208 tracker.peer_db.upsert(&unsigned);
1209 }
1210 assert_eq!(tracker.peer_db.peers().len(), 0);
1211 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1212 }
1213
1214 #[tokio::test]
1215 async fn tka_active_rejects_bad_signature() {
1216 // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1217 let (authority, mut sig) = authority_and_valid_sig();
1218 // Tamper the last byte (the trailing signature byte) so verification fails.
1219 let last = sig.len() - 1;
1220 sig[last] ^= 0xff;
1221
1222 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1223 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1224 assert!(!tracker.tka_admits(&bad));
1225
1226 if tracker.tka_admits(&bad) {
1227 tracker.peer_db.upsert(&bad);
1228 }
1229 assert_eq!(tracker.peer_db.peers().len(), 0);
1230 }
1231
1232 #[tokio::test]
1233 async fn tka_active_admits_authorized_peer() {
1234 // Authority present + correctly-signed node key ⇒ admitted and upserted.
1235 let (authority, sig) = authority_and_valid_sig();
1236 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1237
1238 let good = peer_node("good", NODE_KEY_BYTES, sig);
1239 assert!(tracker.tka_admits(&good));
1240
1241 if tracker.tka_admits(&good) {
1242 tracker.peer_db.upsert(&good);
1243 }
1244 assert_eq!(tracker.peer_db.peers().len(), 1);
1245 assert!(tracker.peer_db.get(&good.node_key).is_some());
1246 }
1247
1248 // ---------------------------------------------------------------------------------------------
1249 // Tests that drive REAL `PeerUpdate`s through the shared handler body
1250 // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1251 // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1252 // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1253 // ---------------------------------------------------------------------------------------------
1254
1255 #[tokio::test]
1256 async fn tka_active_delta_upsert_rejects_unauthorized() {
1257 // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1258 // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1259 let (authority, _sig) = authority_and_valid_sig();
1260 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1261
1262 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1263 let update = ts_control::PeerUpdate::Delta {
1264 upsert: vec![unsigned.clone()],
1265 remove: Vec::new(),
1266 };
1267
1268 tracker.apply_peer_update(&update);
1269
1270 assert_eq!(tracker.peer_db.peers().len(), 0);
1271 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1272 }
1273
1274 #[tokio::test]
1275 async fn tka_active_delta_upsert_admits_authorized() {
1276 // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1277 let (authority, sig) = authority_and_valid_sig();
1278 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1279
1280 let good = peer_node("good", NODE_KEY_BYTES, sig);
1281 let update = ts_control::PeerUpdate::Delta {
1282 upsert: vec![good.clone()],
1283 remove: Vec::new(),
1284 };
1285
1286 tracker.apply_peer_update(&update);
1287
1288 assert_eq!(tracker.peer_db.peers().len(), 1);
1289 assert!(tracker.peer_db.get(&good.node_key).is_some());
1290 }
1291
1292 #[tokio::test]
1293 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1294 // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1295 // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1296 // dropped fail-closed.
1297 let (authority, sig) = authority_and_valid_sig();
1298 // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1299 let mut bad_sig = sig.clone();
1300 let last = bad_sig.len() - 1;
1301 bad_sig[last] ^= 0xff;
1302
1303 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1304
1305 // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1306 // rejected peers use distinct node keys so the survivor is unambiguous.
1307 let good = peer_node("good", NODE_KEY_BYTES, sig);
1308 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1309 let bad = peer_node("bad", [9u8; 32], bad_sig);
1310
1311 let update =
1312 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1313
1314 tracker.apply_peer_update(&update);
1315
1316 assert_eq!(tracker.peer_db.peers().len(), 1);
1317 assert!(tracker.peer_db.get(&good.node_key).is_some());
1318 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1319 assert!(tracker.peer_db.get(&bad.node_key).is_none());
1320 }
1321
1322 /// End-to-end through the REAL enforcement-authority transport (the `watch` cell the control
1323 /// runner writes), not a direct field poke: writing `Some(authority)` flips enforcement on so a
1324 /// mixed batch drops the unsigned/bad peers, and a subsequent `None` (lock disabled) clears
1325 /// enforcement so a peer DROPPED while enforced is re-admitted. Exercises the exact `borrow`-based
1326 /// read path `tka_admits` uses — a broken receiver wiring would pass every for_test-field test but
1327 /// fail here.
1328 #[tokio::test]
1329 async fn tka_authority_watch_enables_then_clears_enforcement() {
1330 let (authority, sig) = authority_and_valid_sig();
1331 let mut bad_sig = sig.clone();
1332 let last = bad_sig.len() - 1;
1333 bad_sig[last] ^= 0xff;
1334
1335 let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1336
1337 // 1) No authority yet ⇒ admit-all (Go b.tka == nil).
1338 let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1339 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1340 let bad = peer_node("bad", [9u8; 32], bad_sig);
1341 let batch = ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1342 tracker.apply_peer_update(&batch);
1343 assert_eq!(tracker.peer_db.peers().len(), 3, "no lock ⇒ admit all");
1344
1345 // 2) Publish the verified authority over the watch cell (exactly what the control runner does
1346 // on a successful sync) ⇒ enforcement ON. A re-applied Full now drops unsigned + bad.
1347 tka_tx.send_replace(Some(Arc::new(authority)));
1348 tracker.apply_peer_update(&batch);
1349 assert_eq!(
1350 tracker.peer_db.peers().len(),
1351 1,
1352 "lock active ⇒ only the signed peer survives"
1353 );
1354 assert!(tracker.peer_db.get(&good.node_key).is_some());
1355 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1356 assert!(tracker.peer_db.get(&bad.node_key).is_none());
1357
1358 // 3) Lock disabled (None) ⇒ enforcement cleared ⇒ a peer that was DROPPED while enforced is
1359 // re-admitted by a fresh netmap. Assert the specific previously-dropped key returns (not
1360 // merely a count), so this proves the drop→clear→re-admit transition, not "admit-all-fresh".
1361 tka_tx.send_replace(None);
1362 tracker.apply_peer_update(&batch);
1363 assert_eq!(
1364 tracker.peer_db.peers().len(),
1365 3,
1366 "lock disabled ⇒ admit all again"
1367 );
1368 assert!(
1369 tracker.peer_db.get(&unsigned.node_key).is_some(),
1370 "the peer dropped under enforcement must come back once the lock is cleared"
1371 );
1372 assert!(tracker.peer_db.get(&bad.node_key).is_some());
1373 }
1374
1375 /// Degenerate input: two DISTINCT nodes sharing one `stable_id` in a single `Full`, one with a
1376 /// valid signature and one unsigned, under an active lock. Each node is judged by its OWN verdict
1377 /// (the per-node `admits` vector), so the unsigned node is never admitted on the strength of its
1378 /// signed twin. The single-verify `Full` refactor keeps this per-node semantics (a stable_id-set
1379 /// alone would have admitted whichever node was upserted last). Malformed control input; asserted
1380 /// only to lock the verdict-per-node behavior against regression.
1381 #[tokio::test]
1382 async fn tka_full_duplicate_stable_id_judges_each_node_on_its_own_signature() {
1383 let (authority, sig) = authority_and_valid_sig();
1384 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1385
1386 // Both carry stable_id "dup"; the signed one authorizes NODE_KEY_BYTES, the other is unsigned
1387 // and uses a different node key. Order them unsigned-last so a last-writer-wins stable_id set
1388 // would (wrongly) leave the unsigned node's key in the db.
1389 let signed = peer_node("dup", NODE_KEY_BYTES, sig);
1390 let unsigned = peer_node("dup", [8u8; 32], vec![]);
1391 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1392 signed.clone(),
1393 unsigned.clone(),
1394 ]));
1395
1396 // The unsigned node's own verdict failed, so its key must NOT be present, regardless of the
1397 // shared stable_id. (The signed twin retained the stable_id; the db holds the signed key.)
1398 assert!(
1399 tracker.peer_db.get(&unsigned.node_key).is_none(),
1400 "a node whose own signature fails must not be admitted via a stable_id twin"
1401 );
1402 assert!(tracker.peer_db.get(&signed.node_key).is_some());
1403 }
1404
1405 /// The empty-trusted-key-state brick-guard: an authority with no keys must NOT drop the whole
1406 /// netmap (a `ts_tka` invariant violation / replayer edge). A verified chain always carries ≥1
1407 /// key, so this never weakens a genuine lock — it only prevents a black-hole. Uses ≥2 peers
1408 /// (one signed, one unsigned) to prove it admits **all**, not accidentally just one.
1409 #[tokio::test]
1410 async fn tka_empty_keyset_authority_admits_all() {
1411 use ts_tka::{AumHash, Authority, State};
1412 let empty_auth = Authority::from_state(AumHash([0u8; 32]), State { keys: Vec::new() });
1413 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(empty_auth));
1414 let signed = peer_node("signed", [7u8; 32], vec![0xde, 0xad]);
1415 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1416 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1417 signed.clone(),
1418 unsigned.clone(),
1419 ]));
1420 assert_eq!(
1421 tracker.peer_db.peers().len(),
1422 2,
1423 "an empty-keyset authority must admit ALL peers (brick-guard), not enforce"
1424 );
1425 }
1426
1427 /// Signature-replay / `NodeKeyMismatch`: a structurally-valid signature that authorizes
1428 /// `NODE_KEY_BYTES` must NOT admit a DIFFERENT node key carrying that same signature blob. This is
1429 /// the highest-value bypass — if the sig↔node-key binding in `verify_signature` were dropped, this
1430 /// is the only test that would catch it (the other "bad" peers only flip a byte ⇒ `BadSignature`).
1431 #[tokio::test]
1432 async fn tka_active_rejects_valid_sig_for_wrong_node_key() {
1433 let (authority, sig) = authority_and_valid_sig();
1434 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1435
1436 // The signature authorizes NODE_KEY_BYTES; attach it to an imposter with a different key.
1437 let imposter = peer_node("imposter", [0x55u8; 32], sig);
1438 assert!(
1439 !tracker.tka_admits(&imposter),
1440 "a signature bound to one node key must not authorize a different node key"
1441 );
1442 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![imposter.clone()]));
1443 assert!(tracker.peer_db.get(&imposter.node_key).is_none());
1444 }
1445
1446 /// `UntrustedKey`: a signature produced by a well-formed Ed25519 key that is NOT in the
1447 /// authority's trusted-key state must be rejected — distinct from a tampered-byte `BadSignature`.
1448 #[tokio::test]
1449 async fn tka_active_rejects_sig_from_untrusted_key() {
1450 use ed25519_dalek::{Signer, SigningKey};
1451 let (authority, _sig) = authority_and_valid_sig();
1452 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1453
1454 // Sign a valid CBOR with a DIFFERENT key (not the one the authority trusts). The key_id in
1455 // the signature names this untrusted key, so `get_key` misses ⇒ UntrustedKey.
1456 let rogue = SigningKey::from_bytes(&[99u8; 32]);
1457 let rogue_pub = rogue.verifying_key().to_bytes().to_vec();
1458 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, None);
1459 let sig_hash = ts_tka::aum_hash(&preimage).0;
1460 let signature = rogue.sign(&sig_hash).to_bytes().to_vec();
1461 let rogue_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, Some(&signature));
1462
1463 let peer = peer_node("rogue-signed", NODE_KEY_BYTES, rogue_cbor);
1464 assert!(
1465 !tracker.tka_admits(&peer),
1466 "a signature from a key outside the trusted set must be rejected"
1467 );
1468 // Drive the real upsert path too (match the sibling replay test's depth): an untrusted-key
1469 // signature must keep the peer out of the db, not merely fail the verdict in isolation.
1470 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1471 assert!(tracker.peer_db.get(&peer.node_key).is_none());
1472 }
1473
1474 /// Bus-enable analogue for `Delta`: enforcement engaged via the watch cell must also gate a
1475 /// `Delta { upsert }` (not only `Full`). Closes the "authority arrived over the transport AND the
1476 /// next update is a Delta" combination.
1477 #[tokio::test]
1478 async fn tka_watch_enable_enforces_delta_upsert() {
1479 let (authority, sig) = authority_and_valid_sig();
1480 let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1481 tka_tx.send_replace(Some(Arc::new(authority)));
1482
1483 let good = peer_node("good", NODE_KEY_BYTES, sig);
1484 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1485 tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1486 remove: vec![],
1487 upsert: vec![good.clone(), unsigned.clone()],
1488 });
1489 assert!(tracker.peer_db.get(&good.node_key).is_some());
1490 assert!(
1491 tracker.peer_db.get(&unsigned.node_key).is_none(),
1492 "delta upsert under an active lock must drop the unsigned peer"
1493 );
1494 }
1495
1496 /// A `Delta` re-upsert of an ALREADY-ADMITTED peer whose signature is now invalid must EVICT the
1497 /// stale entry (revocation-via-delta), not leave it admitted. Go re-filters the whole netmap each
1498 /// response, so a now-unsigned peer would not survive there either.
1499 #[tokio::test]
1500 async fn tka_delta_reupsert_with_invalid_sig_evicts_existing() {
1501 let (authority, sig) = authority_and_valid_sig();
1502 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1503
1504 // Admit the signed peer.
1505 let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1506 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1507 assert!(tracker.peer_db.get(&good.node_key).is_some());
1508
1509 // Re-upsert the SAME stable_id (now with no signature) via a delta ⇒ evicted, not retained.
1510 let revoked = peer_node("good", NODE_KEY_BYTES, vec![]);
1511 tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1512 remove: vec![],
1513 upsert: vec![revoked],
1514 });
1515 assert!(
1516 tracker.peer_db.get(&good.node_key).is_none(),
1517 "a delta re-upsert that fails the lock must evict the previously-admitted peer"
1518 );
1519 }
1520
1521 #[tokio::test]
1522 async fn tka_full_resync_revocation_behavior() {
1523 // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1524 // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1525 // (previously-admitted) entry because membership was decided purely by stable_id.
1526 //
1527 // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1528 // stable_ids, so a peer whose re-included signature no longer verifies under the active
1529 // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1530 // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1531 // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1532 let (authority, sig) = authority_and_valid_sig();
1533 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1534
1535 // 1) Admit the peer with a valid signature via a real `Full`.
1536 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1537 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1538 assert_eq!(tracker.peer_db.peers().len(), 1);
1539 assert!(tracker.peer_db.get(&good.node_key).is_some());
1540
1541 // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1542 let mut bad_sig = sig;
1543 let last = bad_sig.len() - 1;
1544 bad_sig[last] ^= 0xff;
1545 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1546 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1547
1548 // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1549 assert_eq!(tracker.peer_db.peers().len(), 0);
1550 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1551 }
1552
1553 #[tokio::test]
1554 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1555 // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1556 // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1557 // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1558 // branch this wave.
1559 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1560
1561 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1562 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1563 assert_eq!(tracker.peer_db.peers().len(), 1);
1564
1565 // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1566 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1567 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1568 assert_eq!(tracker.peer_db.peers().len(), 1);
1569 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1570 }
1571
1572 /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1573 /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1574 /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1575 /// never re-handshake after it moves.
1576 #[tokio::test]
1577 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1578 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1579
1580 // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1581 let peer = peer_node("mover", [1u8; 32], vec![]);
1582 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1583 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1584 assert!(before.underlay_addresses.is_empty());
1585 assert!(before.derp_region.is_none());
1586
1587 // Patch in fresh reachability (the idle-peer-reconnect case).
1588 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1589 let patch = ts_control::PeerChange {
1590 id: 1,
1591 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1592 cap: None,
1593 cap_map: None,
1594 underlay_addresses: Some(vec![new_ep]),
1595 node_key: None,
1596 key_signature: None,
1597 disco_key: None,
1598 node_key_expiry: None,
1599 online: None,
1600 last_seen: None,
1601 };
1602 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1603
1604 assert_eq!(upserts.len(), 1);
1605 assert_eq!(deletions.len(), 0);
1606 // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1607 assert_eq!(tracker.peer_db.peers().len(), 1);
1608 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1609 assert_eq!(after.underlay_addresses, vec![new_ep]);
1610 assert_eq!(
1611 after.derp_region,
1612 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1613 );
1614 assert_eq!(after.node_key, peer.node_key);
1615 }
1616
1617 /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1618 /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1619 /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1620 /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1621 /// in by the delta kept stale (empty) reachability.
1622 #[tokio::test]
1623 async fn patch_applies_on_top_of_co_occurring_delta() {
1624 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1625
1626 // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1627 let peer = peer_node("mover", [1u8; 32], vec![]);
1628 let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1629 upsert: vec![peer.clone()],
1630 remove: vec![],
1631 });
1632 assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1633
1634 // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1635 // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1636 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1637 let patch = ts_control::PeerChange {
1638 id: 1,
1639 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1640 cap: None,
1641 cap_map: None,
1642 underlay_addresses: Some(vec![new_ep]),
1643 node_key: None,
1644 key_signature: None,
1645 disco_key: None,
1646 node_key_expiry: None,
1647 online: None,
1648 last_seen: None,
1649 };
1650 let (patch_upserts, patch_deletions) =
1651 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1652
1653 assert_eq!(
1654 patch_upserts.len(),
1655 1,
1656 "patch re-upserts the just-added peer"
1657 );
1658 assert_eq!(patch_deletions.len(), 0);
1659 // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1660 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1661 assert_eq!(after.underlay_addresses, vec![new_ep]);
1662 assert_eq!(
1663 after.derp_region,
1664 Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1665 );
1666 }
1667
1668 /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1669 /// never creates a node). No upsert, no deletion, peer set unchanged.
1670 #[tokio::test]
1671 async fn patch_for_unknown_node_is_ignored() {
1672 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1673 let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1674 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1675
1676 let patch = ts_control::PeerChange {
1677 id: 999, // not in the netmap
1678 derp_region: None,
1679 cap: None,
1680 cap_map: None,
1681 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1682 node_key: None,
1683 key_signature: None,
1684 disco_key: None,
1685 node_key_expiry: None,
1686 online: None,
1687 last_seen: None,
1688 };
1689 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1690
1691 assert_eq!(upserts.len(), 0);
1692 assert_eq!(deletions.len(), 0);
1693 assert_eq!(tracker.peer_db.peers().len(), 1);
1694 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1695 }
1696
1697 /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1698 /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1699 #[tokio::test]
1700 async fn patch_updates_node_key_expiry() {
1701 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1702 let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1703 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1704
1705 let expiry = "2027-01-01T00:00:00Z"
1706 .parse::<chrono::DateTime<chrono::Utc>>()
1707 .unwrap();
1708 let patch = ts_control::PeerChange {
1709 id: 1,
1710 derp_region: None,
1711 cap: None,
1712 cap_map: None,
1713 underlay_addresses: None,
1714 node_key: None,
1715 key_signature: None,
1716 disco_key: None,
1717 node_key_expiry: Some(expiry),
1718 online: None,
1719 last_seen: None,
1720 };
1721 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1722
1723 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1724 assert_eq!(after.node_key_expiry, Some(expiry));
1725 }
1726
1727 /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1728 #[tokio::test]
1729 async fn patch_updates_online() {
1730 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1731 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1732 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1733 assert_eq!(
1734 tracker
1735 .peer_db
1736 .get(&(1 as ts_control::NodeId))
1737 .unwrap()
1738 .1
1739 .online,
1740 None
1741 );
1742
1743 let mut patch = ts_control::PeerChange {
1744 id: 1,
1745 derp_region: None,
1746 cap: None,
1747 cap_map: None,
1748 underlay_addresses: None,
1749 node_key: None,
1750 key_signature: None,
1751 disco_key: None,
1752 node_key_expiry: None,
1753 online: Some(true),
1754 last_seen: None,
1755 };
1756 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1757 assert_eq!(
1758 tracker
1759 .peer_db
1760 .get(&(1 as ts_control::NodeId))
1761 .unwrap()
1762 .1
1763 .online,
1764 Some(true),
1765 "PeerChange.online=Some(true) marks the peer online"
1766 );
1767
1768 // A subsequent patch flips it offline.
1769 patch.online = Some(false);
1770 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1771 assert_eq!(
1772 tracker
1773 .peer_db
1774 .get(&(1 as ts_control::NodeId))
1775 .unwrap()
1776 .1
1777 .online,
1778 Some(false)
1779 );
1780 }
1781
1782 /// Channel C/D (Go `map.go:updatePeersStateFromResponse`): `online_change` is the sole driver of
1783 /// `online`; `peer_seen_change` is the sole driver of `last_seen` (true ⇒ now, false ⇒ cleared)
1784 /// and must NEVER touch `online`. Both apply to a peer already in the netmap and ignore unknown
1785 /// ids. This pins the fix for the prior bug where channel D wrote `online=false` (conflating
1786 /// "not seen recently" with "offline" — distinct signals in Go).
1787 #[tokio::test]
1788 async fn liveness_change_maps_apply_online() {
1789 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1790 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1791 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1792 // A fixed timestamp (chrono is built without its `clock` feature, so no `Utc::now()`).
1793 let now = chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap();
1794
1795 // Channel C: online_change sets online=true.
1796 let mut online_change = std::collections::BTreeMap::new();
1797 online_change.insert(1 as ts_control::NodeId, true);
1798 online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1799 let changed = tracker.apply_liveness_changes(&online_change, &Default::default(), now);
1800 assert!(changed);
1801 assert_eq!(
1802 tracker
1803 .peer_db
1804 .get(&(1 as ts_control::NodeId))
1805 .unwrap()
1806 .1
1807 .online,
1808 Some(true)
1809 );
1810
1811 // Channel D: peer_seen_change=true sets last_seen=now and leaves online UNTOUCHED.
1812 let mut seen_true = std::collections::BTreeMap::new();
1813 seen_true.insert(1 as ts_control::NodeId, true);
1814 let changed = tracker.apply_liveness_changes(&Default::default(), &seen_true, now);
1815 assert!(changed);
1816 {
1817 let (_id, node) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1818 assert_eq!(
1819 node.last_seen,
1820 Some(now),
1821 "peer_seen_change=true sets last_seen=now"
1822 );
1823 assert_eq!(
1824 node.online,
1825 Some(true),
1826 "channel D must NOT touch online (still true from channel C)"
1827 );
1828 }
1829
1830 // Channel D: peer_seen_change=false clears last_seen, still leaving online untouched.
1831 let mut seen_false = std::collections::BTreeMap::new();
1832 seen_false.insert(1 as ts_control::NodeId, false);
1833 let changed = tracker.apply_liveness_changes(&Default::default(), &seen_false, now);
1834 assert!(changed);
1835 {
1836 let (_id, node) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1837 assert_eq!(
1838 node.last_seen, None,
1839 "peer_seen_change=false clears last_seen"
1840 );
1841 assert_eq!(node.online, Some(true), "channel D must NOT mark offline");
1842 }
1843 assert_eq!(
1844 tracker.peer_db.peers().len(),
1845 1,
1846 "the node is retained, not removed"
1847 );
1848
1849 // No-op when nothing matches / changes.
1850 assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default(), now));
1851 }
1852
1853 /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1854 /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1855 /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1856 /// otherwise be a trust-enforcement bypass via the patch path.
1857 #[tokio::test]
1858 async fn patch_key_rotation_failing_tka_evicts_peer() {
1859 let (authority, sig) = authority_and_valid_sig();
1860 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1861
1862 // Admit a correctly-signed peer (id == 1).
1863 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1864 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1865 assert_eq!(tracker.peer_db.peers().len(), 1);
1866
1867 // Patch a new node key whose signature is garbage under the active authority.
1868 let patch = ts_control::PeerChange {
1869 id: 1,
1870 derp_region: None,
1871 cap: None,
1872 cap_map: None,
1873 underlay_addresses: None,
1874 node_key: Some([0x33u8; 32].into()),
1875 key_signature: Some(vec![0x00, 0x01, 0x02]),
1876 disco_key: None,
1877 node_key_expiry: None,
1878 online: None,
1879 last_seen: None,
1880 };
1881 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1882
1883 assert_eq!(upserts.len(), 0);
1884 assert_eq!(deletions.len(), 1);
1885 assert_eq!(tracker.peer_db.peers().len(), 0);
1886 }
1887
1888 /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1889 /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1890 /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1891 /// proves the accumulate-then-join path the netmap handler builds.
1892 fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1893 ts_control::UserProfile {
1894 id,
1895 login_name: login.to_string(),
1896 display_name: None,
1897 }
1898 }
1899
1900 #[tokio::test]
1901 async fn whois_resolves_user_from_accumulated_profiles() {
1902 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1903
1904 // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1905 let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1906 peer.user_id = 42;
1907 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1908 let addr = "100.64.0.1:0".parse().unwrap();
1909
1910 // No profile yet: the node resolves but its owner is unknown.
1911 let who = tracker.whois_opt(addr).expect("peer is known");
1912 assert_eq!(who.user, None);
1913
1914 // Profile for a DIFFERENT user must not match.
1915 tracker
1916 .user_profiles
1917 .insert(7, profile(7, "someone-else@example.com"));
1918 assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1919
1920 // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1921 // login resolves.
1922 tracker
1923 .user_profiles
1924 .insert(42, profile(42, "alice@example.com"));
1925 assert_eq!(
1926 tracker.whois_opt(addr).unwrap().user,
1927 Some("alice@example.com".to_string())
1928 );
1929 }
1930
1931 /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1932 #[test]
1933 fn user_profile_best_label_prefers_login() {
1934 assert_eq!(
1935 profile(1, "alice@example.com").best_label(),
1936 Some("alice@example.com".to_string())
1937 );
1938 let display_only = ts_control::UserProfile {
1939 id: 2,
1940 login_name: String::new(),
1941 display_name: Some("Bob".to_string()),
1942 };
1943 assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1944 let empty = ts_control::UserProfile {
1945 id: 3,
1946 login_name: String::new(),
1947 display_name: None,
1948 };
1949 assert_eq!(empty.best_label(), None);
1950 }
1951
1952 // ----- tsr-jo1: RotationTracker (Go ipnlocal.rotationTracker.obsoleteKeys) -----
1953
1954 /// A `RotationDetails` for a `Direct`-rooted chain with the given prior keys + wrapping key.
1955 fn rot_details(
1956 prev: &[&[u8]],
1957 wrapping: &[u8],
1958 kind: ts_tka::SigKind,
1959 ) -> ts_tka::RotationDetails {
1960 ts_tka::RotationDetails {
1961 prev_node_keys: prev.iter().map(|p| p.to_vec()).collect(),
1962 initial_sig_kind: kind,
1963 initial_wrapping_pubkey: wrapping.to_vec(),
1964 }
1965 }
1966
1967 /// Rule 1: every prior node key named by any rotation chain is obsolete, regardless of the
1968 /// chain's root kind (Go's ungated `obsolete.AddSlice(d.PrevNodeKeys)`).
1969 #[test]
1970 fn rotation_tracker_prev_keys_always_obsolete() {
1971 let mut t = RotationTracker::default();
1972 // A Direct-rooted chain that rotated away OLD1, and a Credential-rooted one that rotated OLD2.
1973 t.add(
1974 b"newA".to_vec(),
1975 &rot_details(&[b"OLD1"], b"wrapA", ts_tka::SigKind::Direct),
1976 );
1977 t.add(
1978 b"newB".to_vec(),
1979 &rot_details(&[b"OLD2"], b"wrapB", ts_tka::SigKind::Credential),
1980 );
1981 let obsolete = t.obsolete_keys();
1982 assert!(
1983 obsolete.contains(b"OLD1".as_slice()),
1984 "Direct chain's prior key obsolete"
1985 );
1986 assert!(
1987 obsolete.contains(b"OLD2".as_slice()),
1988 "Credential chain's prior key obsolete too (rule 1 is ungated)"
1989 );
1990 // The current keys themselves are not obsolete (only one peer per wrapping key here).
1991 assert!(!obsolete.contains(b"newA".as_slice()));
1992 assert!(!obsolete.contains(b"newB".as_slice()));
1993 }
1994
1995 /// Rule 2: among `Direct`-rooted chains sharing a wrapping key, only the longest survives; the
1996 /// shorter (older) clone's key is obsolete.
1997 #[test]
1998 fn rotation_tracker_unequal_chain_keeps_longest() {
1999 let mut t = RotationTracker::default();
2000 // Same wrapping key; "long" has 2 prior keys, "short" has 1 ⇒ "short" is the older clone.
2001 t.add(
2002 b"long".to_vec(),
2003 &rot_details(&[b"p1", b"p2"], b"wrap", ts_tka::SigKind::Direct),
2004 );
2005 t.add(
2006 b"short".to_vec(),
2007 &rot_details(&[b"q1"], b"wrap", ts_tka::SigKind::Direct),
2008 );
2009 let obsolete = t.obsolete_keys();
2010 assert!(
2011 obsolete.contains(b"short".as_slice()),
2012 "the shorter-chain clone is obsolete"
2013 );
2014 assert!(
2015 !obsolete.contains(b"long".as_slice()),
2016 "the longest-chain peer survives"
2017 );
2018 }
2019
2020 /// Rule 2 tie: two `Direct`-rooted chains sharing a wrapping key with EQUAL chain length cannot
2021 /// be disambiguated ⇒ BOTH are dropped (Go's safety branch).
2022 #[test]
2023 fn rotation_tracker_equal_chain_drops_both() {
2024 let mut t = RotationTracker::default();
2025 t.add(
2026 b"cloneA".to_vec(),
2027 &rot_details(&[b"p1"], b"wrap", ts_tka::SigKind::Direct),
2028 );
2029 t.add(
2030 b"cloneB".to_vec(),
2031 &rot_details(&[b"p2"], b"wrap", ts_tka::SigKind::Direct),
2032 );
2033 let obsolete = t.obsolete_keys();
2034 assert!(
2035 obsolete.contains(b"cloneA".as_slice()),
2036 "tied clone A dropped"
2037 );
2038 assert!(
2039 obsolete.contains(b"cloneB".as_slice()),
2040 "tied clone B dropped"
2041 );
2042 }
2043
2044 /// `Credential`-rooted chains sharing a wrapping key are EXEMPT from rule 2 (reusable-authkey
2045 /// carve-out): both are kept even with equal chain length.
2046 #[test]
2047 fn rotation_tracker_credential_root_clones_both_kept() {
2048 let mut t = RotationTracker::default();
2049 t.add(
2050 b"credA".to_vec(),
2051 &rot_details(&[b"p1"], b"wrap", ts_tka::SigKind::Credential),
2052 );
2053 t.add(
2054 b"credB".to_vec(),
2055 &rot_details(&[b"p2"], b"wrap", ts_tka::SigKind::Credential),
2056 );
2057 let obsolete = t.obsolete_keys();
2058 assert!(
2059 !obsolete.contains(b"credA".as_slice()),
2060 "credential-rooted clone A kept"
2061 );
2062 assert!(
2063 !obsolete.contains(b"credB".as_slice()),
2064 "credential-rooted clone B kept"
2065 );
2066 }
2067
2068 /// A peer that another chain already rotated away does not also act as a surviving clone: it is
2069 /// removed from its wrapping-key group before the longest-survivor pick (Go's `DeleteFunc`).
2070 #[test]
2071 fn rotation_tracker_already_obsolete_peer_not_a_survivor() {
2072 let mut t = RotationTracker::default();
2073 // "victim" is rotated away by "rotator" (different wrapping key), AND shares wrapping key
2074 // "w" with "other". Because "victim" is already obsolete, only "other" is in play for "w" and
2075 // survives (no spurious tie-drop of "other").
2076 t.add(
2077 b"rotator".to_vec(),
2078 &rot_details(&[b"victim"], b"wRot", ts_tka::SigKind::Direct),
2079 );
2080 t.add(
2081 b"victim".to_vec(),
2082 &rot_details(&[b"x"], b"w", ts_tka::SigKind::Direct),
2083 );
2084 t.add(
2085 b"other".to_vec(),
2086 &rot_details(&[b"y"], b"w", ts_tka::SigKind::Direct),
2087 );
2088 let obsolete = t.obsolete_keys();
2089 assert!(
2090 obsolete.contains(b"victim".as_slice()),
2091 "victim rotated away by rotator"
2092 );
2093 assert!(
2094 !obsolete.contains(b"other".as_slice()),
2095 "other survives — victim was removed from the group before the tie check"
2096 );
2097 }
2098
2099 /// Empty tracker (no rotation-signed peers) ⇒ no obsolete keys (the non-rotation netmap path).
2100 #[test]
2101 fn rotation_tracker_empty_is_noop() {
2102 let t = RotationTracker::default();
2103 assert!(t.obsolete_keys().is_empty());
2104 }
2105
2106 /// End-to-end through the real `Full` path: a peer presenting a freshly-rotated key (a Rotation
2107 /// chain) is admitted, while a second peer still presenting the rotated-AWAY pivot key — even with
2108 /// that key's own still-valid Direct signature — is DROPPED by the cross-peer rotation filter.
2109 /// This is the gap closed here: Go `tkaFilterNetmapLocked` drops the stale clone; we used to admit
2110 /// it. Uses real `ts_tka` signing (`sign_direct` + `sign_rotation`) so the whole
2111 /// verify → details → filter pipeline runs.
2112 ///
2113 /// Construction: the trusted key signs an inner `Direct` over the PIVOT keypair's public key; the
2114 /// pivot key then signs an outer `Rotation` authorizing `new_key`. That chain's `prev_node_keys`
2115 /// names the pivot pubkey — so a peer presenting the pivot pubkey as its node key is the
2116 /// rotated-away key the filter must drop.
2117 #[tokio::test]
2118 async fn tka_full_drops_rotated_away_key_e2e() {
2119 use ed25519_dalek::SigningKey;
2120 use ts_tka::NodeKeySignature;
2121
2122 let trusted = SigningKey::from_bytes(&[42u8; 32]);
2123 let trusted_pub = trusted.verifying_key().to_bytes().to_vec();
2124 let authority = Authority::from_state(
2125 AumHash([0; 32]),
2126 State {
2127 keys: vec![Key {
2128 kind: KeyKind::Ed25519,
2129 votes: 1,
2130 public: trusted_pub.clone(),
2131 }],
2132 },
2133 );
2134
2135 // The rotation pivot: a keypair whose public key the inner Direct authorizes and whose
2136 // private key signs the outer rotation wrap. This pivot pubkey IS the key being rotated away.
2137 let pivot = SigningKey::from_bytes(&[9u8; 32]);
2138 let pivot_pub: [u8; 32] = pivot.verifying_key().to_bytes();
2139
2140 let new_key = [4u8; 32]; // the freshly-rotated node key
2141
2142 // Fresh peer: a Rotation chain authorizing `new_key`, inner Direct over the pivot signed by
2143 // trusted, outer wrap signed by the pivot. Its prev_node_keys names `pivot_pub`.
2144 let new_sig = NodeKeySignature::sign_rotation(&new_key, &trusted, &pivot).serialize();
2145 let new_peer = peer_node("rotated", new_key, new_sig);
2146
2147 // Stale peer: still presents the pivot pubkey (the rotated-away key) with its own valid
2148 // Direct signature — valid in isolation, but obsoleted by the fresh peer's rotation chain.
2149 let stale_sig = NodeKeySignature::sign_direct(&pivot_pub, &trusted).serialize();
2150 let stale_peer = peer_node("stale", pivot_pub, stale_sig);
2151
2152 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
2153 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
2154 new_peer.clone(),
2155 stale_peer.clone(),
2156 ]));
2157
2158 assert!(
2159 tracker.peer_db.get(&new_peer.node_key).is_some(),
2160 "the freshly-rotated peer is admitted"
2161 );
2162 assert!(
2163 tracker.peer_db.get(&stale_peer.node_key).is_none(),
2164 "the peer presenting the rotated-away key is dropped (Go tkaFilterNetmapLocked)"
2165 );
2166 }
2167}