ts_runtime/peer_tracker/mod.rs
1//! Peer delta update tracking.
2
3use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26 peer_db: PeerDb,
27 seen_state_update: bool,
28 pending_requests: Vec<Pending>,
29 /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30 /// changes ([`WatchNetmap`]).
31 peer_watch: watch::Sender<Vec<StatusNode>>,
32 /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33 /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34 /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35 /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36 /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37 /// earlier one.
38 user_profiles: HashMap<UserId, UserProfile>,
39 /// Tailnet-Lock (TKA) authority enforced at the peer-trust chokepoint, matching Go
40 /// `tkaFilterNetmapLocked`. Read on demand from a [`watch`] cell the control runner owns: when it
41 /// holds `Some` (a verified lock has been synced from control), enforcement is **active** — every
42 /// upserted peer must present a `key_signature` this authority authorizes, or it is dropped
43 /// (fail-closed), exactly as Go drops peers with a missing or failing signature. When it holds
44 /// `None` (no lock, or the lock was disabled) enforcement is **inactive** and every peer is
45 /// upserted, identical to pre-TKA behavior and to Go's `b.tka == nil` early return.
46 ///
47 /// A `watch::Receiver` (not the bus) is the transport on purpose: the authority is a single
48 /// security-critical state cell, and `watch` is last-write-wins, never-dropped, and ordered by
49 /// the control runner's own writes — so a disable (`None`) can never be reordered behind or
50 /// silently dropped before a stale `Some` (which a best-effort broadcast bus could do, leaving a
51 /// defunct lock enforcing forever). The control runner is the sole writer; we only ever read.
52 ///
53 /// The authority always passes through `VerifiedAumChain::verify` before the control runner
54 /// publishes it, so enforcement only engages on a chain we have cryptographically verified.
55 /// Connectivity now depends on `ts_tka` verifying genuinely-good signatures correctly (see
56 /// SECURITY.md). Self is structurally never filtered here (the self node never enters `peer_db` —
57 /// it is routed to the control runner's `self_node` cell), so a node cannot lock itself out of
58 /// its own netmap.
59 tka_authority: watch::Receiver<Option<Arc<ts_tka::Authority>>>,
60 env: Env,
61}
62
63impl PeerTracker {
64 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
65 // Canonicalization (case + trailing dot) is handled inside the name index lookup.
66 self.peer_db.get(&name).map(|(_id, node)| node)
67 }
68
69 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
70 self.peer_db.get(&ip).map(|(_id, node)| node)
71 }
72
73 /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
74 ///
75 /// Connectivity fields (`cur_addr`/`relay`) are left at their `from_node` defaults (`None`) here:
76 /// this is the live-watch/hot path and must stay magicsock-free and synchronous. The explicit
77 /// [`GetStatus`] snapshot enriches them ([`status_peers_with_ids`](Self::status_peers_with_ids)).
78 fn status_peers(&self) -> Vec<StatusNode> {
79 self.peer_db
80 .peers()
81 .values()
82 .map(StatusNode::from_node)
83 .collect()
84 }
85
86 /// Like [`status_peers`](Self::status_peers) but pairs each entry with its [`PeerId`], so the
87 /// caller can join per-peer connectivity (the direct manager's `best_addrs`, keyed by `PeerId`)
88 /// onto the `StatusNode` before returning it. Order is unspecified (a `HashMap` walk).
89 fn status_peers_with_ids(&self) -> Vec<(PeerId, StatusNode)> {
90 self.peer_db
91 .peers()
92 .iter()
93 .map(|(id, node)| (*id, StatusNode::from_node(node)))
94 .collect()
95 }
96
97 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
98 let ip = crate::status::whois_addr(addr);
99 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
100 // Join the node's owning user id against the accumulated UserProfiles table to resolve a
101 // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
102 // with no human owner, or a profile not yet delivered).
103 let user = self.resolve_user(node.user_id);
104 Some(crate::status::WhoIs::from_node_with_user(node, user))
105 }
106
107 /// Resolve a user id to its best display label from the accumulated profile table.
108 fn resolve_user(&self, user_id: UserId) -> Option<String> {
109 self.user_profiles
110 .get(&user_id)
111 .and_then(UserProfile::best_label)
112 }
113
114 /// Whether `node` may be admitted to the peer db under Tailnet Lock, matching Go
115 /// `tkaFilterNetmapLocked`'s per-peer verdict (drop unsigned / failed-signature peers).
116 ///
117 /// This consults the live [`tka_authority`](Self::tka_authority) cell on each call (one `borrow`,
118 /// held only for the duration of the verdict). For a `Full` resync — which checks every peer —
119 /// prefer [`tka_authority_snapshot`](Self::tka_authority_snapshot) +
120 /// [`tka_snapshot_admits`](Self::tka_snapshot_admits) to borrow once and verify each peer a single
121 /// time; this method is the convenience wrapper for the single-peer (`Delta`/patch) sites.
122 ///
123 /// Fail-closed and gated:
124 /// - No authority ⇒ no lock synced ⇒ always admit (Go's `b.tka == nil` early return; identical to
125 /// pre-TKA behavior).
126 /// - **Empty trusted-key state** ⇒ always admit (logged at `error!` — see
127 /// [`tka_snapshot_admits`](Self::tka_snapshot_admits) for the full rationale).
128 /// - Authority present + peer carries a `key_signature` the authority authorizes for the peer's
129 /// node key ⇒ admit.
130 /// - Authority present + signature missing or unauthorized/invalid ⇒ **drop** (Go drops peers
131 /// with a missing signature or failed `NodeKeyAuthorized` under tailnet lock).
132 fn tka_admits(&self, node: &Node) -> bool {
133 Self::tka_snapshot_admits(self.tka_authority.borrow().as_deref(), node)
134 }
135
136 /// Borrow the current TKA authority once (cloning the cheap `Arc`) for a batch verdict. Returns
137 /// `None` when no lock is synced (admit-all). Used by the `Full` path so a netmap of N peers
138 /// reads the cell once and runs at most one signature verify per peer (not two).
139 fn tka_authority_snapshot(&self) -> Option<Arc<ts_tka::Authority>> {
140 self.tka_authority.borrow().clone()
141 }
142
143 /// The per-peer Tailnet-Lock verdict against an already-borrowed `authority` snapshot. Factored
144 /// out so both the single-peer [`tka_admits`](Self::tka_admits) and the `Full` batch path share
145 /// one verdict implementation (no divergence) while the batch path verifies each peer exactly
146 /// once.
147 ///
148 /// Never logs key/signature bytes — only the `stable_id` and the `TkaError` Display (static
149 /// descriptors). Known parity gaps vs Go (both *under*-enforcement, documented in PARITY_ROADMAP):
150 /// no `UnsignedPeerAPIOnly` exemption (our model lacks the field), and no cross-peer
151 /// rotation-obsolete dropping (a rotated-away but still-validly-signed key is admitted — see the
152 /// roadmap; closing it needs a details-returning verify + a whole-netmap rotation pass).
153 fn tka_snapshot_admits(authority: Option<&ts_tka::Authority>, node: &Node) -> bool {
154 let Some(auth) = authority else {
155 return true;
156 };
157
158 // Brick-guard: an authority with no trusted keys would drop every peer. A verified chain is
159 // structurally guaranteed ≥1 key (genesis rejects an empty key set, and the last key cannot
160 // be removed), so reaching here means a `ts_tka` invariant was violated — admit rather than
161 // black-hole the whole netmap, and log at `error!` because it signals a real bug, not an
162 // expected runtime input. This is OUR fail-safe, not a Go behavior. NOTE: it only catches the
163 // empty-keyset shape; a non-empty authority that authorizes none of the offered peers still
164 // (correctly) drops them — that is what a lock that revoked everyone means. The
165 // "authorized-zero-peers" isolation case is surfaced separately by the caller.
166 if auth.state().keys.is_empty() {
167 tracing::error!(
168 "TKA: authority has an empty trusted-key set (verified chains never do — likely a \
169 ts_tka bug); not enforcing (admitting all) to avoid isolating the node"
170 );
171 return true;
172 }
173
174 if node.key_signature.is_empty() {
175 tracing::warn!(
176 stable_id = ?node.stable_id,
177 "TKA: dropping unsigned peer under tailnet lock"
178 );
179 return false;
180 }
181
182 match auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
183 Ok(()) => {
184 tracing::debug!(stable_id = ?node.stable_id, "TKA: peer node-key authorized");
185 true
186 }
187 Err(e) => {
188 tracing::warn!(
189 stable_id = ?node.stable_id,
190 error = %e,
191 "TKA: dropping peer with unauthorized node key"
192 );
193 false
194 }
195 }
196 }
197}
198
199impl kameo::Actor for PeerTracker {
200 /// `(env, tka_authority)`: the bus/keys env, plus the read end of the control runner's TKA
201 /// enforcement-authority cell (Go `tkaFilterNetmapLocked`). The control runner is the sole
202 /// writer; it publishes the verified `Authority` after a successful `/machine/tka/sync` and
203 /// `None` when the lock is disabled. A `watch` cell (not a bus message) so the latest value is
204 /// always readable on demand, never dropped, and never reordered (see [`tka_authority`]).
205 type Args = (Env, watch::Receiver<Option<Arc<ts_tka::Authority>>>);
206 type Error = Error;
207
208 async fn on_start(
209 (env, tka_authority): Self::Args,
210 slf: ActorRef<Self>,
211 ) -> Result<Self, Self::Error> {
212 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
213
214 let (peer_watch, _) = watch::channel(Vec::new());
215
216 Ok(Self {
217 peer_db: PeerDb::default(),
218 pending_requests: Default::default(),
219 seen_state_update: false,
220 peer_watch,
221 user_profiles: HashMap::new(),
222 // The cell starts `None` (no lock synced ⇒ enforcement inactive, admit all, matching
223 // Go's `b.tka == nil`); the control runner flips it to `Some` on the first sync.
224 tka_authority,
225 env,
226 })
227 }
228}
229
230enum Pending {
231 PeerByName(PeerByName, ReplySender<Option<Node>>),
232 AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
233 TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
234 Status(ReplySender<Vec<(PeerId, StatusNode)>>),
235 WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
236}
237
238// For messages with arguments, a struct is generated with the args as fields. They aren't
239// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
240// docs are turned off everywhere.
241#[allow(missing_docs)]
242mod msg_impl {
243 use std::net::IpAddr;
244
245 use kameo::prelude::DelegatedReply;
246
247 use super::*;
248
249 #[kameo::messages]
250 impl PeerTracker {
251 /// Lookup a peer by name.
252 ///
253 /// Waits until we've received at least one peer update from control.
254 #[message(ctx)]
255 pub async fn peer_by_name(
256 &mut self,
257 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
258 name: String,
259 ) -> DelegatedReply<Option<Node>> {
260 let (deleg, sender) = ctx.reply_sender();
261 let Some(sender) = sender else { return deleg };
262
263 if !self.seen_state_update {
264 tracing::debug!(query = name, "no peer state seen yet, queueing request");
265
266 self.pending_requests
267 .push(Pending::PeerByName(PeerByName { name }, sender));
268
269 return deleg;
270 }
271
272 sender.send(self.peer_by_name_opt(&name).cloned());
273
274 deleg
275 }
276
277 /// Lookup all peers that accept packets addressed to the given IP.
278 ///
279 /// This includes the peer's tailnet address and any subnet routes it provides. Only
280 /// the peers with the most specific subnet route match that covers `ip` will be
281 /// returned.
282 ///
283 /// E.g., suppose:
284 ///
285 /// - We're querying for `10.1.2.3`
286 /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
287 /// - `PeerC` has an accepted route for `10.1.0.0/16`
288 ///
289 /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
290 /// prefix match.
291 #[message(ctx)]
292 pub fn peer_by_accepted_route(
293 &mut self,
294 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
295 ip: IpAddr,
296 ) -> DelegatedReply<Vec<Node>> {
297 let (deleg, sender) = ctx.reply_sender();
298 let Some(sender) = sender else { return deleg };
299
300 if !self.seen_state_update {
301 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
302
303 self.pending_requests
304 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
305
306 return deleg;
307 }
308
309 sender.send(
310 self.peer_db
311 .get_route(ip.into())
312 .map(|(_id, node)| node.clone())
313 .collect(),
314 );
315
316 deleg
317 }
318
319 /// Lookup the peer that has the given tailnet IP address.
320 #[message(ctx)]
321 pub fn peer_by_tailnet_ip(
322 &mut self,
323 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
324 ip: IpAddr,
325 ) -> DelegatedReply<Option<Node>> {
326 let (deleg, sender) = ctx.reply_sender();
327 let Some(sender) = sender else { return deleg };
328
329 if !self.seen_state_update {
330 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
331
332 self.pending_requests
333 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
334
335 return deleg;
336 }
337
338 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
339
340 deleg
341 }
342
343 /// Build the peer entries of a [`Status`](crate::Status) snapshot, each paired with its
344 /// [`PeerId`] so [`Runtime::status`](crate::Runtime::status) can join per-peer connectivity
345 /// (`cur_addr`/`relay`) from the direct manager before returning. The self node is *not*
346 /// included here (it lives in the control runner); `Runtime::status` combines both and drops
347 /// the ids.
348 ///
349 /// Waits until we've received at least one peer update from control.
350 #[message(ctx)]
351 pub fn get_status(
352 &mut self,
353 ctx: &mut Context<Self, DelegatedReply<Vec<(PeerId, StatusNode)>>>,
354 ) -> DelegatedReply<Vec<(PeerId, StatusNode)>> {
355 let (deleg, sender) = ctx.reply_sender();
356 let Some(sender) = sender else { return deleg };
357
358 if !self.seen_state_update {
359 tracing::debug!("no peer state seen yet, queueing status request");
360 self.pending_requests.push(Pending::Status(sender));
361 return deleg;
362 }
363
364 sender.send(self.status_peers_with_ids());
365
366 deleg
367 }
368
369 /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
370 ///
371 /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
372 /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
373 /// is not included (it lives in the control runner). Returns empty before the first netmap —
374 /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
375 /// that need a populated list await `Running` first).
376 #[message]
377 pub fn all_peers(&self) -> Vec<Node> {
378 self.peer_db.peers().values().cloned().collect()
379 }
380
381 /// Resolve which node owns a tailnet source address.
382 ///
383 /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
384 /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
385 ///
386 /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
387 /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
388 /// [`status`](crate::status) module docs for the gap.
389 ///
390 /// Waits until we've received at least one peer update from control.
391 #[message(ctx)]
392 pub fn whois(
393 &mut self,
394 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
395 addr: std::net::SocketAddr,
396 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
397 let (deleg, sender) = ctx.reply_sender();
398 let Some(sender) = sender else { return deleg };
399
400 if !self.seen_state_update {
401 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
402 self.pending_requests
403 .push(Pending::WhoIs(Whois { addr }, sender));
404 return deleg;
405 }
406
407 sender.send(self.whois_opt(addr));
408
409 deleg
410 }
411
412 /// Subscribe to netmap peer-change events.
413 ///
414 /// Returns a [`watch::Receiver`] whose value is the current set of peer
415 /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
416 /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
417 ///
418 /// The receiver's initial value is the peer set at subscription time (empty before the
419 /// first netmap update). This is a peer-only view; combine with the self node from
420 /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
421 #[message(derive(Clone))]
422 pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
423 self.peer_watch.subscribe()
424 }
425 }
426}
427
428pub use msg_impl::*;
429
430#[derive(Debug, Clone)]
431pub(crate) struct PeerState {
432 #[allow(unused)]
433 pub deletions: HashSet<PeerId>,
434 #[allow(unused)]
435 pub upserts: HashSet<PeerId>,
436 pub peers: Arc<PeerDb>,
437}
438
439impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
440 type Reply = ();
441
442 async fn handle(
443 &mut self,
444 msg: Arc<ts_control::StateUpdate>,
445 _ctx: &mut Context<Self, Self::Reply>,
446 ) {
447 // Accumulate user profiles first — control sends them incrementally and a response may
448 // carry profiles with no peer delta (or peers that reference a profile from an earlier
449 // response), so this must happen before the no-peer-update early return below.
450 for profile in &msg.user_profiles {
451 self.user_profiles.insert(profile.id, profile.clone());
452 }
453
454 // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
455 // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
456 // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
457 // *before* the no-peer-update early return — otherwise online status freezes at the last
458 // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
459 // Wall clock for a `PeerSeenChange: true` (Go uses `clock.Now()`). chrono is built without
460 // its `clock` feature in this workspace, so derive it from `SystemTime` the same way the
461 // control runner / ssh-policy paths do (unix secs → `DateTime::from_timestamp`).
462 let now = std::time::SystemTime::now()
463 .duration_since(std::time::UNIX_EPOCH)
464 .ok()
465 .and_then(|d| chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos()))
466 .unwrap_or_default();
467 let liveness_changed =
468 self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change, now);
469
470 if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
471 // No peer set or patch this response. If a liveness delta still mutated the netmap,
472 // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
473 if liveness_changed {
474 self.service_pending_requests();
475 self.peer_watch.send_replace(self.status_peers());
476 if let Err(e) = self
477 .env
478 .publish(Arc::new(PeerState {
479 upserts: HashSet::default(),
480 deletions: HashSet::default(),
481 peers: Arc::new(self.peer_db.clone()),
482 }))
483 .await
484 {
485 tracing::error!(error = %e, "publishing liveness-only peer state update");
486 }
487 }
488 return;
489 }
490
491 // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
492 // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
493 // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
494 // so the published `PeerState` reflects every node touched by both passes; a node both
495 // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
496 let (mut upserts, mut deletions) = msg
497 .peer_update
498 .as_ref()
499 .map(|u| self.apply_peer_update(u))
500 .unwrap_or_default();
501
502 if !msg.peer_patches.is_empty() {
503 let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
504 // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
505 // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
506 for id in &patch_upserts {
507 deletions.remove(id);
508 }
509 for id in &patch_deletions {
510 upserts.remove(id);
511 }
512 upserts.extend(patch_upserts);
513 deletions.extend(patch_deletions);
514 }
515
516 tracing::debug!(
517 n_upsert = upserts.len(),
518 n_delete = deletions.len(),
519 peer_count = self.peer_db.peers().len(),
520 "new peer state"
521 );
522
523 self.service_pending_requests();
524
525 // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
526 // value current even when there are no subscribers, so a late subscriber sees fresh state.
527 self.peer_watch.send_replace(self.status_peers());
528
529 if let Err(e) = self
530 .env
531 .publish(Arc::new(PeerState {
532 upserts,
533 deletions,
534 peers: Arc::new(self.peer_db.clone()),
535 }))
536 .await
537 {
538 tracing::error!(error = %e, "publishing peer state update");
539 }
540 }
541}
542
543/// Ask the peer tracker to re-broadcast its current peer snapshot on the bus, without any peer
544/// change. Sent after a runtime preference change so the route updater and source filter (both
545/// `Arc<PeerState>` subscribers) re-resolve against the new value immediately, rather than waiting
546/// for the next netmap update: `Device::set_exit_node` (new exit-node selector) and
547/// `Device::set_accept_routes` (new accept-routes flag) both send it.
548#[derive(Debug, Clone, Copy)]
549pub struct RepublishState;
550
551impl Message<RepublishState> for PeerTracker {
552 type Reply = ();
553
554 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
555 // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
556 // delta. Subscribers recompute their routes/filters against the current peers and the
557 // (just-updated) runtime preferences (exit-node selector, accept-routes flag).
558 if let Err(e) = self
559 .env
560 .publish(Arc::new(PeerState {
561 upserts: HashSet::default(),
562 deletions: HashSet::default(),
563 peers: Arc::new(self.peer_db.clone()),
564 }))
565 .await
566 {
567 tracing::error!(error = %e, "re-publishing peer state after a runtime preference change");
568 }
569 }
570}
571
572impl PeerTracker {
573 /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
574 /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
575 ///
576 /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
577 /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
578 /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
579 ///
580 /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
581 fn apply_peer_update(
582 &mut self,
583 peer_update: &ts_control::PeerUpdate,
584 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
585 let mut upserts = HashSet::default();
586 let mut deletions = HashSet::default();
587
588 match peer_update {
589 ts_control::PeerUpdate::Full(new_nodes) => {
590 tracing::trace!("full peer update");
591
592 // Borrow the authority ONCE for the whole batch and verify each peer EXACTLY once
593 // (Go runs `tkaFilterNetmapLocked` once over the assembled netmap; an earlier draft
594 // verified every peer twice — once for `retained_ids`, once in the upsert loop —
595 // doubling the ed25519 cost on the hot resync path). The per-node verdict vector
596 // `admits` is computed once and drives both the `retain` (evict revoked peers, keyed
597 // by stable_id) and the upsert loop (skip rejected peers, by the node's OWN verdict).
598 // Keeping a per-node verdict (not just a stable_id set) means a node whose own
599 // signature fails is never admitted on the strength of a different node that happens
600 // to share its stable_id — matching the old per-node re-verify for that degenerate
601 // (malformed-control) input.
602 //
603 // Revocation evicts: a peer re-included with a now-invalid/missing signature under an
604 // active authority fails its verdict, so it is excluded from `retained_ids` and
605 // `retain` drops the stale (previously-admitted) entry. With no authority the snapshot
606 // is `None`, so every node passes — byte-for-byte the pre-TKA behavior (no regression).
607 let authority = self.tka_authority_snapshot();
608 let admits = new_nodes
609 .iter()
610 .map(|node| Self::tka_snapshot_admits(authority.as_deref(), node))
611 .collect::<Vec<_>>();
612 let retained_ids = new_nodes
613 .iter()
614 .zip(admits.iter().copied())
615 .filter(|(_, ok)| *ok)
616 .map(|(node, _)| &node.stable_id)
617 .collect::<HashSet<_>>();
618
619 // Isolation diagnostic: an ACTIVE lock that authorized none of the offered peers
620 // leaves this node with no peers — surface it loudly so a self-lockout (vs an attack)
621 // is diagnosable. `authority.is_some()` means a real keyed lock (the empty-keyset
622 // brick-guard admits-all, so it never reaches here with zero retained).
623 if authority.is_some() && !new_nodes.is_empty() && retained_ids.is_empty() {
624 tracing::error!(
625 offered = new_nodes.len(),
626 "TKA: active lock authorized ZERO of the offered peers; node is isolated \
627 (verify the lock state, or disable tailnet lock to recover)"
628 );
629 }
630
631 self.peer_db.retain(|id, peer| {
632 let retain = retained_ids.contains(&peer.stable_id);
633
634 if !retain {
635 deletions.insert(id);
636 }
637
638 retain
639 });
640
641 for (node, ok) in new_nodes.iter().zip(admits.iter().copied()) {
642 if !ok {
643 continue; // fail-CLOSED: peer rejected by tailnet lock (verified once, above)
644 }
645 let peer_id = self.peer_db.upsert(node);
646 upserts.insert(peer_id);
647 }
648 }
649
650 ts_control::PeerUpdate::Delta { remove, upsert } => {
651 tracing::trace!("delta peer update");
652
653 for peer in upsert {
654 if !self.tka_admits(peer) {
655 // fail-CLOSED: do not upsert a peer rejected by tailnet lock. If the peer is
656 // ALREADY in the db (a delta re-upserting an existing peer whose signature is
657 // now invalid — e.g. revoked between syncs), evict the stale entry rather than
658 // leaving an unverified peer admitted; Go re-filters the whole netmap each map
659 // response, so a now-unsigned peer would not survive there either.
660 if let Some((id, _)) = self.peer_db.remove(&peer.stable_id) {
661 tracing::warn!(
662 stable_id = ?peer.stable_id,
663 "TKA: delta re-upsert rejected; evicting now-unauthorized peer"
664 );
665 deletions.insert(id);
666 }
667 continue;
668 }
669 let id = self.peer_db.upsert(peer);
670
671 upserts.insert(id);
672 }
673
674 for peer in remove {
675 let Some((id, _node)) = self.peer_db.remove(peer) else {
676 tracing::error!(control_node_id = peer, "removed peer was unknown");
677 continue;
678 };
679
680 deletions.insert(id);
681 }
682 }
683 }
684
685 (upserts, deletions)
686 }
687
688 /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
689 /// deleted [`PeerId`]s.
690 ///
691 /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
692 /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
693 /// response that carries both has the peer set applied first (by the caller) and these patches
694 /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
695 /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
696 fn apply_peer_patches(
697 &mut self,
698 patches: &[ts_control::PeerChange],
699 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
700 let mut upserts = HashSet::default();
701 let mut deletions = HashSet::default();
702
703 tracing::trace!(n = patches.len(), "peer patch update");
704
705 for patch in patches {
706 // Clone the current node, apply the present fields, and re-upsert through the same path
707 // as a delta so indexes/routes stay consistent.
708 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
709 tracing::debug!(
710 control_node_id = patch.id,
711 "peer patch for unknown node; ignoring"
712 );
713 continue;
714 };
715
716 let mut node = existing.clone();
717 if let Some(endpoints) = &patch.underlay_addresses {
718 node.underlay_addresses = endpoints.clone();
719 }
720 if let Some(derp) = patch.derp_region {
721 node.derp_region = Some(derp);
722 }
723 if let Some(cap) = patch.cap {
724 node.cap = cap;
725 }
726 if let Some(cap_map) = &patch.cap_map {
727 node.cap_map = cap_map.clone();
728 }
729 if let Some(disco_key) = patch.disco_key {
730 node.disco_key = Some(disco_key);
731 }
732 if let Some(expiry) = patch.node_key_expiry {
733 node.node_key_expiry = Some(expiry);
734 }
735 // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
736 // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
737 // a value (never patches back to unknown), so apply when present.
738 if let Some(online) = patch.online {
739 node.online = Some(online);
740 }
741 if let Some(last_seen) = patch.last_seen {
742 node.last_seen = Some(last_seen);
743 }
744 // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
745 // together so the trust gate below verifies the new signature against the new key, never
746 // a mismatched pair.
747 if let Some(node_key) = patch.node_key {
748 node.node_key = node_key;
749 }
750 if let Some(sig) = &patch.key_signature {
751 node.key_signature = sig.clone();
752 }
753
754 // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
755 // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
756 // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
757 // evict it rather than keep the stale (now-unverified) entry.
758 if !self.tka_admits(&node) {
759 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
760 tracing::warn!(
761 control_node_id = patch.id,
762 "peer patch rejected by tailnet lock; evicting peer"
763 );
764 deletions.insert(id);
765 }
766 continue;
767 }
768
769 let id = self.peer_db.upsert(&node);
770 upserts.insert(id);
771 }
772
773 (upserts, deletions)
774 }
775
776 /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
777 /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
778 /// actually mutated (so the caller knows whether to re-publish).
779 ///
780 /// Mirrors Go `controlclient/map.go:updatePeersStateFromResponse` (the two channels are
781 /// semantically DISTINCT and must not be conflated):
782 /// - `OnlineChange` (channel C) is the sole driver of a peer's `online` flag (`mut.Online = v`).
783 /// - `PeerSeenChange` (channel D) is the sole driver of `last_seen`: `true ⇒ LastSeen = now`,
784 /// `false ⇒ LastSeen = nil` (cleared). It NEVER touches `online` — "not seen recently" is not
785 /// the same as "offline", which only `OnlineChange` asserts.
786 ///
787 /// Each entry is keyed by control node id and applies to a peer already in the netmap; an unknown
788 /// node id is ignored (these maps never create a node). `now` is the wall-clock timestamp for a
789 /// `PeerSeenChange: true` (Go uses `clock.Now()`); the caller passes it so this stays a pure
790 /// function of its inputs. Returns `true` if any node was actually mutated.
791 fn apply_liveness_changes(
792 &mut self,
793 online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
794 peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
795 now: chrono::DateTime<chrono::Utc>,
796 ) -> bool {
797 let mut changed = false;
798
799 // Channel C — direct online flips (the only writer of `online`).
800 for (&node_id, &online) in online_change {
801 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
802 && existing.online != Some(online)
803 {
804 let mut node = existing.clone();
805 node.online = Some(online);
806 self.peer_db.upsert(&node);
807 changed = true;
808 }
809 }
810
811 // Channel D — peer-seen flips (the only writer of `last_seen`; never touches `online`).
812 // `true` ⇒ last-seen is now; `false` ⇒ last-seen cleared (Go map.go:820-830).
813 for (&node_id, &seen) in peer_seen_change {
814 let new_last_seen = if seen { Some(now) } else { None };
815 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
816 && existing.last_seen != new_last_seen
817 {
818 let mut node = existing.clone();
819 node.last_seen = new_last_seen;
820 self.peer_db.upsert(&node);
821 changed = true;
822 }
823 }
824
825 changed
826 }
827
/// Test-only constructor that assembles a [`PeerTracker`] directly, bypassing the actor
/// `on_start` path, with `tka_authority` as the initial content of the enforcement cell.
///
/// Returns the tracker together with the **`watch::Sender`** feeding that cell. A test can use
/// it to replay the transitions the control runner performs at runtime:
/// `tx.send_replace(Some(..))` turns enforcement on and `tx.send_replace(None)` turns it off.
/// Starting with `Some` exercises the fail-closed chokepoint
/// ([`tka_admits`](Self::tka_admits)); starting with `None` is the no-lock admit-all path.
/// Keep the returned sender alive for as long as the tracker should observe new values.
#[cfg(test)]
fn for_test(
    env: Env,
    tka_authority: Option<ts_tka::Authority>,
) -> (Self, watch::Sender<Option<Arc<ts_tka::Authority>>>) {
    // The cell stores the authority behind an `Arc`, so wrap the initial value first.
    let initial_authority = tka_authority.map(Arc::new);
    let (authority_tx, authority_rx) = watch::channel(initial_authority);

    // Only the sending half of the peer snapshot channel is kept by the tracker.
    let (peer_watch, _) = watch::channel(Vec::new());

    (
        Self {
            env,
            peer_db: PeerDb::default(),
            peer_watch,
            pending_requests: Vec::new(),
            seen_state_update: false,
            tka_authority: authority_rx,
            user_profiles: HashMap::new(),
        },
        authority_tx,
    )
}
853
854 fn service_pending_requests(&mut self) {
855 if self.seen_state_update {
856 return;
857 }
858
859 self.seen_state_update = true;
860
861 if !self.pending_requests.is_empty() {
862 tracing::debug!(
863 n_pending = self.pending_requests.len(),
864 "state update received, servicing pending requests"
865 );
866 }
867
868 for req in core::mem::take(&mut self.pending_requests) {
869 match req {
870 Pending::PeerByName(PeerByName { name }, reply) => {
871 reply.send(self.peer_by_name_opt(&name).cloned());
872 }
873 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
874 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
875 }
876 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
877 reply.send(
878 self.peer_db
879 .get_route(ip.into())
880 .map(|(_id, node)| node.clone())
881 .collect(),
882 );
883 }
884 Pending::Status(reply) => {
885 reply.send(self.status_peers_with_ids());
886 }
887 Pending::WhoIs(Whois { addr }, reply) => {
888 reply.send(self.whois_opt(addr));
889 }
890 }
891 }
892 }
893}
894
895#[cfg(test)]
896mod tka_tests {
897 //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
898 //!
899 //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
900 //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
901 //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
902 //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
903 //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
904 //! private `ts_tka` API used).
905
906 use ed25519_dalek::{Signer, SigningKey};
907 use ts_control::{Node, StableNodeId, TailnetAddress};
908 use ts_tka::{
909 AumHash, Authority, Key, KeyKind, State,
910 cbor::{self, Value},
911 };
912
913 use super::*;
914
/// Wire value of the `Direct` signature kind (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
const SIG_KIND_DIRECT: u64 = 1;

/// Node key (32 bytes, every byte `7`) shared by the signed-peer fixtures.
const NODE_KEY_BYTES: [u8; 32] = [7; 32];
920
921 /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
922 /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
923 fn test_env() -> Env {
924 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
925 Env::new(
926 ts_keys::NodeState::generate(),
927 shutdown_rx,
928 crate::env::ForwarderConfig {
929 accept_routes: false,
930 accept_dns: true,
931 exit_node: None,
932 forward_routes: Vec::new(),
933 forward_tcp_ports: Vec::new(),
934 forward_udp_ports: Vec::new(),
935 forward_all_ports: false,
936 forward_exit_egress: false,
937 block_incoming: false,
938 exit_proxy: None,
939 peerapi_port: None,
940 taildrop_dir: None,
941 enable_ipv6: false,
942 persistent_keepalive_interval: None,
943 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
944 },
945 )
946 }
947
948 /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
949 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
950 Node {
951 id: 1,
952 stable_id: StableNodeId(stable_id.to_string()),
953 hostname: stable_id.to_string(),
954 user_id: 0,
955 tailnet: Some("ts.net".to_string()),
956 tags: Vec::new(),
957 tailnet_address: TailnetAddress {
958 ipv4: "100.64.0.1/32".parse().unwrap(),
959 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
960 },
961 node_key: node_key.into(),
962 node_key_expiry: None,
963 online: None,
964 last_seen: None,
965 key_signature,
966 machine_key: None,
967 disco_key: None,
968 accepted_routes: Vec::new(),
969 underlay_addresses: Vec::new(),
970 derp_region: None,
971 cap: Default::default(),
972 cap_map: Default::default(),
973 peerapi_port: None,
974 peerapi_dns_proxy: false,
975 is_wireguard_only: false,
976 exit_node_dns_resolvers: Vec::new(),
977 peer_relay: false,
978 service_vips: Default::default(),
979 }
980 }
981
982 /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
983 /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
984 /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
985 /// signing-digest preimage (the `SigHash` form).
986 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
987 let mut pairs = alloc_pairs(node_key, key_id);
988 if let Some(sig) = signature {
989 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
990 }
991 cbor::int_map(pairs).to_vec()
992 }
993
994 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
995 vec![
996 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
997 (2, Some(Value::Bytes(node_key.to_vec()))),
998 (3, Some(Value::Bytes(key_id.to_vec()))),
999 ]
1000 }
1001
1002 /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
1003 /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
1004 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
1005 // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
1006 let signing = SigningKey::from_bytes(&[42u8; 32]);
1007 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
1008
1009 let authority = Authority::from_state(
1010 AumHash([0; 32]),
1011 State {
1012 keys: vec![Key {
1013 kind: KeyKind::Ed25519,
1014 votes: 1,
1015 public: trusted_pub.clone(),
1016 }],
1017 },
1018 );
1019
1020 // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
1021 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
1022 let sig_hash = ts_tka::aum_hash(&preimage).0;
1023 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
1024
1025 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
1026 // Sanity: the authority accepts the signature we just built (same path the gate uses).
1027 assert!(
1028 authority
1029 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
1030 .is_ok()
1031 );
1032
1033 (authority, signed_cbor)
1034 }
1035
1036 #[tokio::test]
1037 async fn tka_inactive_upserts_all_peers() {
1038 // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
1039 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1040
1041 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
1042 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
1043
1044 assert!(tracker.tka_admits(&signed));
1045 assert!(tracker.tka_admits(&unsigned));
1046
1047 tracker.peer_db.upsert(&signed);
1048 tracker.peer_db.upsert(&unsigned);
1049 assert_eq!(tracker.peer_db.peers().len(), 2);
1050 }
1051
1052 #[tokio::test]
1053 async fn tka_active_rejects_unsigned_peer() {
1054 // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
1055 let (authority, _sig) = authority_and_valid_sig();
1056 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1057
1058 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1059 assert!(!tracker.tka_admits(&unsigned));
1060
1061 // Mirror the handler's `if !tka_admits { continue }` loop.
1062 if tracker.tka_admits(&unsigned) {
1063 tracker.peer_db.upsert(&unsigned);
1064 }
1065 assert_eq!(tracker.peer_db.peers().len(), 0);
1066 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1067 }
1068
1069 #[tokio::test]
1070 async fn tka_active_rejects_bad_signature() {
1071 // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1072 let (authority, mut sig) = authority_and_valid_sig();
1073 // Tamper the last byte (the trailing signature byte) so verification fails.
1074 let last = sig.len() - 1;
1075 sig[last] ^= 0xff;
1076
1077 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1078 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1079 assert!(!tracker.tka_admits(&bad));
1080
1081 if tracker.tka_admits(&bad) {
1082 tracker.peer_db.upsert(&bad);
1083 }
1084 assert_eq!(tracker.peer_db.peers().len(), 0);
1085 }
1086
1087 #[tokio::test]
1088 async fn tka_active_admits_authorized_peer() {
1089 // Authority present + correctly-signed node key ⇒ admitted and upserted.
1090 let (authority, sig) = authority_and_valid_sig();
1091 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1092
1093 let good = peer_node("good", NODE_KEY_BYTES, sig);
1094 assert!(tracker.tka_admits(&good));
1095
1096 if tracker.tka_admits(&good) {
1097 tracker.peer_db.upsert(&good);
1098 }
1099 assert_eq!(tracker.peer_db.peers().len(), 1);
1100 assert!(tracker.peer_db.get(&good.node_key).is_some());
1101 }
1102
1103 // ---------------------------------------------------------------------------------------------
1104 // Tests that drive REAL `PeerUpdate`s through the shared handler body
1105 // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1106 // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1107 // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1108 // ---------------------------------------------------------------------------------------------
1109
1110 #[tokio::test]
1111 async fn tka_active_delta_upsert_rejects_unauthorized() {
1112 // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1113 // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1114 let (authority, _sig) = authority_and_valid_sig();
1115 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1116
1117 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1118 let update = ts_control::PeerUpdate::Delta {
1119 upsert: vec![unsigned.clone()],
1120 remove: Vec::new(),
1121 };
1122
1123 tracker.apply_peer_update(&update);
1124
1125 assert_eq!(tracker.peer_db.peers().len(), 0);
1126 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1127 }
1128
1129 #[tokio::test]
1130 async fn tka_active_delta_upsert_admits_authorized() {
1131 // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1132 let (authority, sig) = authority_and_valid_sig();
1133 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1134
1135 let good = peer_node("good", NODE_KEY_BYTES, sig);
1136 let update = ts_control::PeerUpdate::Delta {
1137 upsert: vec![good.clone()],
1138 remove: Vec::new(),
1139 };
1140
1141 tracker.apply_peer_update(&update);
1142
1143 assert_eq!(tracker.peer_db.peers().len(), 1);
1144 assert!(tracker.peer_db.get(&good.node_key).is_some());
1145 }
1146
1147 #[tokio::test]
1148 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1149 // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1150 // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1151 // dropped fail-closed.
1152 let (authority, sig) = authority_and_valid_sig();
1153 // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1154 let mut bad_sig = sig.clone();
1155 let last = bad_sig.len() - 1;
1156 bad_sig[last] ^= 0xff;
1157
1158 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1159
1160 // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1161 // rejected peers use distinct node keys so the survivor is unambiguous.
1162 let good = peer_node("good", NODE_KEY_BYTES, sig);
1163 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1164 let bad = peer_node("bad", [9u8; 32], bad_sig);
1165
1166 let update =
1167 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1168
1169 tracker.apply_peer_update(&update);
1170
1171 assert_eq!(tracker.peer_db.peers().len(), 1);
1172 assert!(tracker.peer_db.get(&good.node_key).is_some());
1173 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1174 assert!(tracker.peer_db.get(&bad.node_key).is_none());
1175 }
1176
1177 /// End-to-end through the REAL enforcement-authority transport (the `watch` cell the control
1178 /// runner writes), not a direct field poke: writing `Some(authority)` flips enforcement on so a
1179 /// mixed batch drops the unsigned/bad peers, and a subsequent `None` (lock disabled) clears
1180 /// enforcement so a peer DROPPED while enforced is re-admitted. Exercises the exact `borrow`-based
1181 /// read path `tka_admits` uses — a broken receiver wiring would pass every for_test-field test but
1182 /// fail here.
1183 #[tokio::test]
1184 async fn tka_authority_watch_enables_then_clears_enforcement() {
1185 let (authority, sig) = authority_and_valid_sig();
1186 let mut bad_sig = sig.clone();
1187 let last = bad_sig.len() - 1;
1188 bad_sig[last] ^= 0xff;
1189
1190 let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1191
1192 // 1) No authority yet ⇒ admit-all (Go b.tka == nil).
1193 let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1194 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1195 let bad = peer_node("bad", [9u8; 32], bad_sig);
1196 let batch = ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1197 tracker.apply_peer_update(&batch);
1198 assert_eq!(tracker.peer_db.peers().len(), 3, "no lock ⇒ admit all");
1199
1200 // 2) Publish the verified authority over the watch cell (exactly what the control runner does
1201 // on a successful sync) ⇒ enforcement ON. A re-applied Full now drops unsigned + bad.
1202 tka_tx.send_replace(Some(Arc::new(authority)));
1203 tracker.apply_peer_update(&batch);
1204 assert_eq!(
1205 tracker.peer_db.peers().len(),
1206 1,
1207 "lock active ⇒ only the signed peer survives"
1208 );
1209 assert!(tracker.peer_db.get(&good.node_key).is_some());
1210 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1211 assert!(tracker.peer_db.get(&bad.node_key).is_none());
1212
1213 // 3) Lock disabled (None) ⇒ enforcement cleared ⇒ a peer that was DROPPED while enforced is
1214 // re-admitted by a fresh netmap. Assert the specific previously-dropped key returns (not
1215 // merely a count), so this proves the drop→clear→re-admit transition, not "admit-all-fresh".
1216 tka_tx.send_replace(None);
1217 tracker.apply_peer_update(&batch);
1218 assert_eq!(
1219 tracker.peer_db.peers().len(),
1220 3,
1221 "lock disabled ⇒ admit all again"
1222 );
1223 assert!(
1224 tracker.peer_db.get(&unsigned.node_key).is_some(),
1225 "the peer dropped under enforcement must come back once the lock is cleared"
1226 );
1227 assert!(tracker.peer_db.get(&bad.node_key).is_some());
1228 }
1229
1230 /// Degenerate input: two DISTINCT nodes sharing one `stable_id` in a single `Full`, one with a
1231 /// valid signature and one unsigned, under an active lock. Each node is judged by its OWN verdict
1232 /// (the per-node `admits` vector), so the unsigned node is never admitted on the strength of its
1233 /// signed twin. The single-verify `Full` refactor keeps this per-node semantics (a stable_id-set
1234 /// alone would have admitted whichever node was upserted last). Malformed control input; asserted
1235 /// only to lock the verdict-per-node behavior against regression.
1236 #[tokio::test]
1237 async fn tka_full_duplicate_stable_id_judges_each_node_on_its_own_signature() {
1238 let (authority, sig) = authority_and_valid_sig();
1239 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1240
1241 // Both carry stable_id "dup"; the signed one authorizes NODE_KEY_BYTES, the other is unsigned
1242 // and uses a different node key. Order them unsigned-last so a last-writer-wins stable_id set
1243 // would (wrongly) leave the unsigned node's key in the db.
1244 let signed = peer_node("dup", NODE_KEY_BYTES, sig);
1245 let unsigned = peer_node("dup", [8u8; 32], vec![]);
1246 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1247 signed.clone(),
1248 unsigned.clone(),
1249 ]));
1250
1251 // The unsigned node's own verdict failed, so its key must NOT be present, regardless of the
1252 // shared stable_id. (The signed twin retained the stable_id; the db holds the signed key.)
1253 assert!(
1254 tracker.peer_db.get(&unsigned.node_key).is_none(),
1255 "a node whose own signature fails must not be admitted via a stable_id twin"
1256 );
1257 assert!(tracker.peer_db.get(&signed.node_key).is_some());
1258 }
1259
1260 /// The empty-trusted-key-state brick-guard: an authority with no keys must NOT drop the whole
1261 /// netmap (a `ts_tka` invariant violation / replayer edge). A verified chain always carries ≥1
1262 /// key, so this never weakens a genuine lock — it only prevents a black-hole. Uses ≥2 peers
1263 /// (one signed, one unsigned) to prove it admits **all**, not accidentally just one.
1264 #[tokio::test]
1265 async fn tka_empty_keyset_authority_admits_all() {
1266 use ts_tka::{AumHash, Authority, State};
1267 let empty_auth = Authority::from_state(AumHash([0u8; 32]), State { keys: Vec::new() });
1268 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(empty_auth));
1269 let signed = peer_node("signed", [7u8; 32], vec![0xde, 0xad]);
1270 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1271 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![
1272 signed.clone(),
1273 unsigned.clone(),
1274 ]));
1275 assert_eq!(
1276 tracker.peer_db.peers().len(),
1277 2,
1278 "an empty-keyset authority must admit ALL peers (brick-guard), not enforce"
1279 );
1280 }
1281
1282 /// Signature-replay / `NodeKeyMismatch`: a structurally-valid signature that authorizes
1283 /// `NODE_KEY_BYTES` must NOT admit a DIFFERENT node key carrying that same signature blob. This is
1284 /// the highest-value bypass — if the sig↔node-key binding in `verify_signature` were dropped, this
1285 /// is the only test that would catch it (the other "bad" peers only flip a byte ⇒ `BadSignature`).
1286 #[tokio::test]
1287 async fn tka_active_rejects_valid_sig_for_wrong_node_key() {
1288 let (authority, sig) = authority_and_valid_sig();
1289 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1290
1291 // The signature authorizes NODE_KEY_BYTES; attach it to an imposter with a different key.
1292 let imposter = peer_node("imposter", [0x55u8; 32], sig);
1293 assert!(
1294 !tracker.tka_admits(&imposter),
1295 "a signature bound to one node key must not authorize a different node key"
1296 );
1297 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![imposter.clone()]));
1298 assert!(tracker.peer_db.get(&imposter.node_key).is_none());
1299 }
1300
1301 /// `UntrustedKey`: a signature produced by a well-formed Ed25519 key that is NOT in the
1302 /// authority's trusted-key state must be rejected — distinct from a tampered-byte `BadSignature`.
1303 #[tokio::test]
1304 async fn tka_active_rejects_sig_from_untrusted_key() {
1305 use ed25519_dalek::{Signer, SigningKey};
1306 let (authority, _sig) = authority_and_valid_sig();
1307 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1308
1309 // Sign a valid CBOR with a DIFFERENT key (not the one the authority trusts). The key_id in
1310 // the signature names this untrusted key, so `get_key` misses ⇒ UntrustedKey.
1311 let rogue = SigningKey::from_bytes(&[99u8; 32]);
1312 let rogue_pub = rogue.verifying_key().to_bytes().to_vec();
1313 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, None);
1314 let sig_hash = ts_tka::aum_hash(&preimage).0;
1315 let signature = rogue.sign(&sig_hash).to_bytes().to_vec();
1316 let rogue_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &rogue_pub, Some(&signature));
1317
1318 let peer = peer_node("rogue-signed", NODE_KEY_BYTES, rogue_cbor);
1319 assert!(
1320 !tracker.tka_admits(&peer),
1321 "a signature from a key outside the trusted set must be rejected"
1322 );
1323 // Drive the real upsert path too (match the sibling replay test's depth): an untrusted-key
1324 // signature must keep the peer out of the db, not merely fail the verdict in isolation.
1325 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1326 assert!(tracker.peer_db.get(&peer.node_key).is_none());
1327 }
1328
1329 /// Bus-enable analogue for `Delta`: enforcement engaged via the watch cell must also gate a
1330 /// `Delta { upsert }` (not only `Full`). Closes the "authority arrived over the transport AND the
1331 /// next update is a Delta" combination.
1332 #[tokio::test]
1333 async fn tka_watch_enable_enforces_delta_upsert() {
1334 let (authority, sig) = authority_and_valid_sig();
1335 let (mut tracker, tka_tx) = PeerTracker::for_test(test_env(), None);
1336 tka_tx.send_replace(Some(Arc::new(authority)));
1337
1338 let good = peer_node("good", NODE_KEY_BYTES, sig);
1339 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1340 tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1341 remove: vec![],
1342 upsert: vec![good.clone(), unsigned.clone()],
1343 });
1344 assert!(tracker.peer_db.get(&good.node_key).is_some());
1345 assert!(
1346 tracker.peer_db.get(&unsigned.node_key).is_none(),
1347 "delta upsert under an active lock must drop the unsigned peer"
1348 );
1349 }
1350
1351 /// A `Delta` re-upsert of an ALREADY-ADMITTED peer whose signature is now invalid must EVICT the
1352 /// stale entry (revocation-via-delta), not leave it admitted. Go re-filters the whole netmap each
1353 /// response, so a now-unsigned peer would not survive there either.
1354 #[tokio::test]
1355 async fn tka_delta_reupsert_with_invalid_sig_evicts_existing() {
1356 let (authority, sig) = authority_and_valid_sig();
1357 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1358
1359 // Admit the signed peer.
1360 let good = peer_node("good", NODE_KEY_BYTES, sig.clone());
1361 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1362 assert!(tracker.peer_db.get(&good.node_key).is_some());
1363
1364 // Re-upsert the SAME stable_id (now with no signature) via a delta ⇒ evicted, not retained.
1365 let revoked = peer_node("good", NODE_KEY_BYTES, vec![]);
1366 tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1367 remove: vec![],
1368 upsert: vec![revoked],
1369 });
1370 assert!(
1371 tracker.peer_db.get(&good.node_key).is_none(),
1372 "a delta re-upsert that fails the lock must evict the previously-admitted peer"
1373 );
1374 }
1375
1376 #[tokio::test]
1377 async fn tka_full_resync_revocation_behavior() {
1378 // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1379 // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1380 // (previously-admitted) entry because membership was decided purely by stable_id.
1381 //
1382 // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1383 // stable_ids, so a peer whose re-included signature no longer verifies under the active
1384 // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1385 // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1386 // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1387 let (authority, sig) = authority_and_valid_sig();
1388 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1389
1390 // 1) Admit the peer with a valid signature via a real `Full`.
1391 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1392 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1393 assert_eq!(tracker.peer_db.peers().len(), 1);
1394 assert!(tracker.peer_db.get(&good.node_key).is_some());
1395
1396 // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1397 let mut bad_sig = sig;
1398 let last = bad_sig.len() - 1;
1399 bad_sig[last] ^= 0xff;
1400 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1401 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1402
1403 // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1404 assert_eq!(tracker.peer_db.peers().len(), 0);
1405 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1406 }
1407
1408 #[tokio::test]
1409 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1410 // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1411 // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1412 // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1413 // branch this wave.
1414 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1415
1416 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1417 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1418 assert_eq!(tracker.peer_db.peers().len(), 1);
1419
1420 // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1421 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1422 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1423 assert_eq!(tracker.peer_db.peers().len(), 1);
1424 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1425 }
1426
1427 /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1428 /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1429 /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1430 /// never re-handshake after it moves.
1431 #[tokio::test]
1432 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1433 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1434
1435 // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1436 let peer = peer_node("mover", [1u8; 32], vec![]);
1437 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1438 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1439 assert!(before.underlay_addresses.is_empty());
1440 assert!(before.derp_region.is_none());
1441
1442 // Patch in fresh reachability (the idle-peer-reconnect case).
1443 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1444 let patch = ts_control::PeerChange {
1445 id: 1,
1446 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1447 cap: None,
1448 cap_map: None,
1449 underlay_addresses: Some(vec![new_ep]),
1450 node_key: None,
1451 key_signature: None,
1452 disco_key: None,
1453 node_key_expiry: None,
1454 online: None,
1455 last_seen: None,
1456 };
1457 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1458
1459 assert_eq!(upserts.len(), 1);
1460 assert_eq!(deletions.len(), 0);
1461 // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1462 assert_eq!(tracker.peer_db.peers().len(), 1);
1463 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1464 assert_eq!(after.underlay_addresses, vec![new_ep]);
1465 assert_eq!(
1466 after.derp_region,
1467 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1468 );
1469 assert_eq!(after.node_key, peer.node_key);
1470 }
1471
1472 /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1473 /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1474 /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1475 /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1476 /// in by the delta kept stale (empty) reachability.
1477 #[tokio::test]
1478 async fn patch_applies_on_top_of_co_occurring_delta() {
1479 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1480
1481 // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1482 let peer = peer_node("mover", [1u8; 32], vec![]);
1483 let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1484 upsert: vec![peer.clone()],
1485 remove: vec![],
1486 });
1487 assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1488
1489 // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1490 // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1491 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1492 let patch = ts_control::PeerChange {
1493 id: 1,
1494 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1495 cap: None,
1496 cap_map: None,
1497 underlay_addresses: Some(vec![new_ep]),
1498 node_key: None,
1499 key_signature: None,
1500 disco_key: None,
1501 node_key_expiry: None,
1502 online: None,
1503 last_seen: None,
1504 };
1505 let (patch_upserts, patch_deletions) =
1506 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1507
1508 assert_eq!(
1509 patch_upserts.len(),
1510 1,
1511 "patch re-upserts the just-added peer"
1512 );
1513 assert_eq!(patch_deletions.len(), 0);
1514 // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1515 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1516 assert_eq!(after.underlay_addresses, vec![new_ep]);
1517 assert_eq!(
1518 after.derp_region,
1519 Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1520 );
1521 }
1522
1523 /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1524 /// never creates a node). No upsert, no deletion, peer set unchanged.
1525 #[tokio::test]
1526 async fn patch_for_unknown_node_is_ignored() {
1527 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1528 let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1529 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1530
1531 let patch = ts_control::PeerChange {
1532 id: 999, // not in the netmap
1533 derp_region: None,
1534 cap: None,
1535 cap_map: None,
1536 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1537 node_key: None,
1538 key_signature: None,
1539 disco_key: None,
1540 node_key_expiry: None,
1541 online: None,
1542 last_seen: None,
1543 };
1544 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1545
1546 assert_eq!(upserts.len(), 0);
1547 assert_eq!(deletions.len(), 0);
1548 assert_eq!(tracker.peer_db.peers().len(), 1);
1549 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1550 }
1551
1552 /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1553 /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1554 #[tokio::test]
1555 async fn patch_updates_node_key_expiry() {
1556 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1557 let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1558 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1559
1560 let expiry = "2027-01-01T00:00:00Z"
1561 .parse::<chrono::DateTime<chrono::Utc>>()
1562 .unwrap();
1563 let patch = ts_control::PeerChange {
1564 id: 1,
1565 derp_region: None,
1566 cap: None,
1567 cap_map: None,
1568 underlay_addresses: None,
1569 node_key: None,
1570 key_signature: None,
1571 disco_key: None,
1572 node_key_expiry: Some(expiry),
1573 online: None,
1574 last_seen: None,
1575 };
1576 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1577
1578 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1579 assert_eq!(after.node_key_expiry, Some(expiry));
1580 }
1581
1582 /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1583 #[tokio::test]
1584 async fn patch_updates_online() {
1585 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1586 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1587 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1588 assert_eq!(
1589 tracker
1590 .peer_db
1591 .get(&(1 as ts_control::NodeId))
1592 .unwrap()
1593 .1
1594 .online,
1595 None
1596 );
1597
1598 let mut patch = ts_control::PeerChange {
1599 id: 1,
1600 derp_region: None,
1601 cap: None,
1602 cap_map: None,
1603 underlay_addresses: None,
1604 node_key: None,
1605 key_signature: None,
1606 disco_key: None,
1607 node_key_expiry: None,
1608 online: Some(true),
1609 last_seen: None,
1610 };
1611 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1612 assert_eq!(
1613 tracker
1614 .peer_db
1615 .get(&(1 as ts_control::NodeId))
1616 .unwrap()
1617 .1
1618 .online,
1619 Some(true),
1620 "PeerChange.online=Some(true) marks the peer online"
1621 );
1622
1623 // A subsequent patch flips it offline.
1624 patch.online = Some(false);
1625 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1626 assert_eq!(
1627 tracker
1628 .peer_db
1629 .get(&(1 as ts_control::NodeId))
1630 .unwrap()
1631 .1
1632 .online,
1633 Some(false)
1634 );
1635 }
1636
1637 /// Channel C/D (Go `map.go:updatePeersStateFromResponse`): `online_change` is the sole driver of
1638 /// `online`; `peer_seen_change` is the sole driver of `last_seen` (true ⇒ now, false ⇒ cleared)
1639 /// and must NEVER touch `online`. Both apply to a peer already in the netmap and ignore unknown
1640 /// ids. This pins the fix for the prior bug where channel D wrote `online=false` (conflating
1641 /// "not seen recently" with "offline" — distinct signals in Go).
1642 #[tokio::test]
1643 async fn liveness_change_maps_apply_online() {
1644 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1645 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1646 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1647 // A fixed timestamp (chrono is built without its `clock` feature, so no `Utc::now()`).
1648 let now = chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap();
1649
1650 // Channel C: online_change sets online=true.
1651 let mut online_change = std::collections::BTreeMap::new();
1652 online_change.insert(1 as ts_control::NodeId, true);
1653 online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1654 let changed = tracker.apply_liveness_changes(&online_change, &Default::default(), now);
1655 assert!(changed);
1656 assert_eq!(
1657 tracker
1658 .peer_db
1659 .get(&(1 as ts_control::NodeId))
1660 .unwrap()
1661 .1
1662 .online,
1663 Some(true)
1664 );
1665
1666 // Channel D: peer_seen_change=true sets last_seen=now and leaves online UNTOUCHED.
1667 let mut seen_true = std::collections::BTreeMap::new();
1668 seen_true.insert(1 as ts_control::NodeId, true);
1669 let changed = tracker.apply_liveness_changes(&Default::default(), &seen_true, now);
1670 assert!(changed);
1671 {
1672 let (_id, node) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1673 assert_eq!(
1674 node.last_seen,
1675 Some(now),
1676 "peer_seen_change=true sets last_seen=now"
1677 );
1678 assert_eq!(
1679 node.online,
1680 Some(true),
1681 "channel D must NOT touch online (still true from channel C)"
1682 );
1683 }
1684
1685 // Channel D: peer_seen_change=false clears last_seen, still leaving online untouched.
1686 let mut seen_false = std::collections::BTreeMap::new();
1687 seen_false.insert(1 as ts_control::NodeId, false);
1688 let changed = tracker.apply_liveness_changes(&Default::default(), &seen_false, now);
1689 assert!(changed);
1690 {
1691 let (_id, node) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1692 assert_eq!(
1693 node.last_seen, None,
1694 "peer_seen_change=false clears last_seen"
1695 );
1696 assert_eq!(node.online, Some(true), "channel D must NOT mark offline");
1697 }
1698 assert_eq!(
1699 tracker.peer_db.peers().len(),
1700 1,
1701 "the node is retained, not removed"
1702 );
1703
1704 // No-op when nothing matches / changes.
1705 assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default(), now));
1706 }
1707
1708 /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1709 /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1710 /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1711 /// otherwise be a trust-enforcement bypass via the patch path.
1712 #[tokio::test]
1713 async fn patch_key_rotation_failing_tka_evicts_peer() {
1714 let (authority, sig) = authority_and_valid_sig();
1715 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), Some(authority));
1716
1717 // Admit a correctly-signed peer (id == 1).
1718 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1719 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1720 assert_eq!(tracker.peer_db.peers().len(), 1);
1721
1722 // Patch a new node key whose signature is garbage under the active authority.
1723 let patch = ts_control::PeerChange {
1724 id: 1,
1725 derp_region: None,
1726 cap: None,
1727 cap_map: None,
1728 underlay_addresses: None,
1729 node_key: Some([0x33u8; 32].into()),
1730 key_signature: Some(vec![0x00, 0x01, 0x02]),
1731 disco_key: None,
1732 node_key_expiry: None,
1733 online: None,
1734 last_seen: None,
1735 };
1736 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1737
1738 assert_eq!(upserts.len(), 0);
1739 assert_eq!(deletions.len(), 1);
1740 assert_eq!(tracker.peer_db.peers().len(), 0);
1741 }
1742
1743 /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1744 /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1745 /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1746 /// proves the accumulate-then-join path the netmap handler builds.
1747 fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1748 ts_control::UserProfile {
1749 id,
1750 login_name: login.to_string(),
1751 display_name: None,
1752 }
1753 }
1754
1755 #[tokio::test]
1756 async fn whois_resolves_user_from_accumulated_profiles() {
1757 let (mut tracker, _tka_tx) = PeerTracker::for_test(test_env(), None);
1758
1759 // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1760 let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1761 peer.user_id = 42;
1762 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1763 let addr = "100.64.0.1:0".parse().unwrap();
1764
1765 // No profile yet: the node resolves but its owner is unknown.
1766 let who = tracker.whois_opt(addr).expect("peer is known");
1767 assert_eq!(who.user, None);
1768
1769 // Profile for a DIFFERENT user must not match.
1770 tracker
1771 .user_profiles
1772 .insert(7, profile(7, "someone-else@example.com"));
1773 assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1774
1775 // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1776 // login resolves.
1777 tracker
1778 .user_profiles
1779 .insert(42, profile(42, "alice@example.com"));
1780 assert_eq!(
1781 tracker.whois_opt(addr).unwrap().user,
1782 Some("alice@example.com".to_string())
1783 );
1784 }
1785
/// `UserProfile::best_label` resolution order: the login name when it is non-empty, otherwise the
/// display name, otherwise `None`.
#[test]
fn user_profile_best_label_prefers_login() {
    // Login name present: it is the label.
    let with_login = profile(1, "alice@example.com");
    assert_eq!(
        with_login.best_label(),
        Some(String::from("alice@example.com"))
    );

    // Empty login name, display name present: fall back to the display name.
    let display_only = ts_control::UserProfile {
        id: 2,
        login_name: String::new(),
        display_name: Some(String::from("Bob")),
    };
    assert_eq!(display_only.best_label(), Some(String::from("Bob")));

    // Empty login name and no display name: there is nothing to show.
    let unlabeled = ts_control::UserProfile {
        id: 3,
        login_name: String::new(),
        display_name: None,
    };
    assert_eq!(unlabeled.best_label(), None);
}
1806}