ts_runtime/peer_tracker/mod.rs
1//! Peer delta update tracking.
2
3use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26 peer_db: PeerDb,
27 seen_state_update: bool,
28 pending_requests: Vec<Pending>,
29 /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30 /// changes ([`WatchNetmap`]).
31 peer_watch: watch::Sender<Vec<StatusNode>>,
32 /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33 /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34 /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35 /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36 /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37 /// earlier one.
38 user_profiles: HashMap<UserId, UserProfile>,
39 /// Tailnet-Lock (TKA) authority used to verify each peer's `key_signature` at the peer-trust
40 /// chokepoint. When `Some`, enforcement is **active**: every upserted peer must present a
41 /// signature this authority authorizes, or it is rejected (fail-closed). When `None` (always,
42 /// this wave) enforcement is **inactive** and every peer is upserted — identical to pre-TKA
43 /// behavior. There is no live `Authority` source yet: building one requires the
44 /// `/machine/tka/sync` Noise RPC + AUM-chain replayer (deferred, see SECURITY.md). The
45 /// enforcement path below is wired and unit-tested, and flips on the instant an authority is
46 /// supplied; it is explicitly gated, not a silent no-op.
47 tka_authority: Option<ts_tka::Authority>,
48 /// Tailnet-Lock authority used **observe-only** (verify-and-LOG, issue #136): the live
49 /// `Authority` synced from control (delivered over the bus via [`TkaAuthorityUpdate`]). Distinct
50 /// from [`tka_authority`](Self::tka_authority) on purpose — populating *that* would flip the
51 /// runtime to fail-closed enforcement, whereas this field only feeds
52 /// [`tka_observe_log`](Self::tka_observe_log), which logs each peer's signature verdict and
53 /// **never** drops a peer. The current posture (per SECURITY.md / PARITY_ROADMAP): verify-and-log
54 /// while the `ts_tka` crypto is unaudited and control is treated as trusted; flipping to enforce
55 /// is a separate, gated decision.
56 tka_observe: Option<ts_tka::Authority>,
57 env: Env,
58}
59
60impl PeerTracker {
61 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
62 // Canonicalization (case + trailing dot) is handled inside the name index lookup.
63 self.peer_db.get(&name).map(|(_id, node)| node)
64 }
65
66 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
67 self.peer_db.get(&ip).map(|(_id, node)| node)
68 }
69
70 /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
71 ///
72 /// Connectivity fields (`cur_addr`/`relay`) are left at their `from_node` defaults (`None`) here:
73 /// this is the live-watch/hot path and must stay magicsock-free and synchronous. The explicit
74 /// [`GetStatus`] snapshot enriches them ([`status_peers_with_ids`](Self::status_peers_with_ids)).
75 fn status_peers(&self) -> Vec<StatusNode> {
76 self.peer_db
77 .peers()
78 .values()
79 .map(StatusNode::from_node)
80 .collect()
81 }
82
83 /// Like [`status_peers`](Self::status_peers) but pairs each entry with its [`PeerId`], so the
84 /// caller can join per-peer connectivity (the direct manager's `best_addrs`, keyed by `PeerId`)
85 /// onto the `StatusNode` before returning it. Order is unspecified (a `HashMap` walk).
86 fn status_peers_with_ids(&self) -> Vec<(PeerId, StatusNode)> {
87 self.peer_db
88 .peers()
89 .iter()
90 .map(|(id, node)| (*id, StatusNode::from_node(node)))
91 .collect()
92 }
93
94 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
95 let ip = crate::status::whois_addr(addr);
96 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
97 // Join the node's owning user id against the accumulated UserProfiles table to resolve a
98 // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
99 // with no human owner, or a profile not yet delivered).
100 let user = self.resolve_user(node.user_id);
101 Some(crate::status::WhoIs::from_node_with_user(node, user))
102 }
103
104 /// Resolve a user id to its best display label from the accumulated profile table.
105 fn resolve_user(&self, user_id: UserId) -> Option<String> {
106 self.user_profiles
107 .get(&user_id)
108 .and_then(UserProfile::best_label)
109 }
110
111 /// Whether `node` may be admitted to the peer db under the current Tailnet-Lock posture.
112 ///
113 /// Fail-closed and gated:
114 /// - No [`tka_authority`](Self::tka_authority) ⇒ enforcement inactive ⇒ always admit (today's
115 /// behavior; this is the always-taken branch this wave).
116 /// - Authority present + peer carries a `key_signature` that the authority authorizes for the
117 /// peer's node key ⇒ admit.
118 /// - Authority present + signature missing or unauthorized/invalid ⇒ **reject** (Go denies
119 /// network access to unsigned peers under tailnet lock; we do not upsert them).
120 fn tka_admits(&self, node: &Node) -> bool {
121 let Some(auth) = &self.tka_authority else {
122 return true;
123 };
124
125 if node.key_signature.is_empty() {
126 // TKA active but peer presented no signature: reject (Go denies network access to
127 // unsigned peers under tailnet lock, unless UnsignedPeerAPIOnly — out of scope here).
128 tracing::warn!(
129 stable_id = ?node.stable_id,
130 "TKA: rejecting unsigned peer under tailnet lock"
131 );
132 return false;
133 }
134
135 if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
136 tracing::warn!(
137 stable_id = ?node.stable_id,
138 error = %e,
139 "TKA: rejecting peer with unauthorized node key"
140 );
141 return false;
142 }
143
144 true
145 }
146
147 /// Verify `node`'s Tailnet-Lock signature against the **observe-only** authority and LOG the
148 /// verdict — issue #136. This is the verify-and-log seam: it returns `()` (NOT a bool), so it is
149 /// structurally impossible to wire as an admission gate, and it is called *adjacent* to each
150 /// upsert site without affecting whether the peer is admitted. Every peer is upserted exactly as
151 /// it would be with this call absent.
152 ///
153 /// A no-op when no observe authority has been synced yet. Logs `verified` / `failed` / `unsigned`
154 /// with the peer's `stable_id` and, on failure, the `TkaError` Display (static descriptors —
155 /// "bad sig len" etc.). NEVER logs the node-key or signature bytes.
156 fn tka_observe_log(&self, node: &Node) {
157 let Some(auth) = &self.tka_observe else {
158 return;
159 };
160 if node.key_signature.is_empty() {
161 tracing::info!(
162 stable_id = ?node.stable_id,
163 tka_verdict = "unsigned",
164 "TKA observe: peer presented no key-signature (advisory, NOT enforced)"
165 );
166 return;
167 }
168 match auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
169 Ok(()) => tracing::info!(
170 stable_id = ?node.stable_id,
171 tka_verdict = "verified",
172 "TKA observe: peer node-key authorized (advisory, NOT enforced)"
173 ),
174 Err(e) => tracing::warn!(
175 stable_id = ?node.stable_id,
176 tka_verdict = "failed",
177 reason = %e,
178 "TKA observe: peer key-signature did not verify (advisory, NOT enforced)"
179 ),
180 }
181 }
182}
183
184impl kameo::Actor for PeerTracker {
185 type Args = Env;
186 type Error = Error;
187
188 async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
189 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
190 // Observe-only TKA (#136): the control runner publishes the verified `Authority` here after a
191 // successful `/machine/tka/sync`; we use it to verify-and-LOG each peer's signature, never to
192 // enforce. The bus has no replay, so the control runner re-publishes on every sync.
193 env.subscribe::<TkaAuthorityUpdate>(&slf).await?;
194
195 let (peer_watch, _) = watch::channel(Vec::new());
196
197 Ok(Self {
198 peer_db: PeerDb::default(),
199 pending_requests: Default::default(),
200 seen_state_update: false,
201 peer_watch,
202 user_profiles: HashMap::new(),
203 // No live TKA *enforcement* authority this wave (fail-closed path stays gated off; see
204 // `tka_authority`). The observe-only authority (`tka_observe`) is supplied over the bus.
205 tka_authority: None,
206 tka_observe: None,
207 env,
208 })
209 }
210}
211
212enum Pending {
213 PeerByName(PeerByName, ReplySender<Option<Node>>),
214 AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
215 TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
216 Status(ReplySender<Vec<(PeerId, StatusNode)>>),
217 WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
218}
219
220// For messages with arguments, a struct is generated with the args as fields. They aren't
221// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
222// docs are turned off everywhere.
223#[allow(missing_docs)]
224mod msg_impl {
225 use std::net::IpAddr;
226
227 use kameo::prelude::DelegatedReply;
228
229 use super::*;
230
231 #[kameo::messages]
232 impl PeerTracker {
233 /// Lookup a peer by name.
234 ///
235 /// Waits until we've received at least one peer update from control.
236 #[message(ctx)]
237 pub async fn peer_by_name(
238 &mut self,
239 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
240 name: String,
241 ) -> DelegatedReply<Option<Node>> {
242 let (deleg, sender) = ctx.reply_sender();
243 let Some(sender) = sender else { return deleg };
244
245 if !self.seen_state_update {
246 tracing::debug!(query = name, "no peer state seen yet, queueing request");
247
248 self.pending_requests
249 .push(Pending::PeerByName(PeerByName { name }, sender));
250
251 return deleg;
252 }
253
254 sender.send(self.peer_by_name_opt(&name).cloned());
255
256 deleg
257 }
258
259 /// Lookup all peers that accept packets addressed to the given IP.
260 ///
261 /// This includes the peer's tailnet address and any subnet routes it provides. Only
262 /// the peers with the most specific subnet route match that covers `ip` will be
263 /// returned.
264 ///
265 /// E.g., suppose:
266 ///
267 /// - We're querying for `10.1.2.3`
268 /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
269 /// - `PeerC` has an accepted route for `10.1.0.0/16`
270 ///
271 /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
272 /// prefix match.
273 #[message(ctx)]
274 pub fn peer_by_accepted_route(
275 &mut self,
276 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
277 ip: IpAddr,
278 ) -> DelegatedReply<Vec<Node>> {
279 let (deleg, sender) = ctx.reply_sender();
280 let Some(sender) = sender else { return deleg };
281
282 if !self.seen_state_update {
283 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
284
285 self.pending_requests
286 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
287
288 return deleg;
289 }
290
291 sender.send(
292 self.peer_db
293 .get_route(ip.into())
294 .map(|(_id, node)| node.clone())
295 .collect(),
296 );
297
298 deleg
299 }
300
301 /// Lookup the peer that has the given tailnet IP address.
302 #[message(ctx)]
303 pub fn peer_by_tailnet_ip(
304 &mut self,
305 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
306 ip: IpAddr,
307 ) -> DelegatedReply<Option<Node>> {
308 let (deleg, sender) = ctx.reply_sender();
309 let Some(sender) = sender else { return deleg };
310
311 if !self.seen_state_update {
312 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
313
314 self.pending_requests
315 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
316
317 return deleg;
318 }
319
320 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
321
322 deleg
323 }
324
325 /// Build the peer entries of a [`Status`](crate::Status) snapshot, each paired with its
326 /// [`PeerId`] so [`Runtime::status`](crate::Runtime::status) can join per-peer connectivity
327 /// (`cur_addr`/`relay`) from the direct manager before returning. The self node is *not*
328 /// included here (it lives in the control runner); `Runtime::status` combines both and drops
329 /// the ids.
330 ///
331 /// Waits until we've received at least one peer update from control.
332 #[message(ctx)]
333 pub fn get_status(
334 &mut self,
335 ctx: &mut Context<Self, DelegatedReply<Vec<(PeerId, StatusNode)>>>,
336 ) -> DelegatedReply<Vec<(PeerId, StatusNode)>> {
337 let (deleg, sender) = ctx.reply_sender();
338 let Some(sender) = sender else { return deleg };
339
340 if !self.seen_state_update {
341 tracing::debug!("no peer state seen yet, queueing status request");
342 self.pending_requests.push(Pending::Status(sender));
343 return deleg;
344 }
345
346 sender.send(self.status_peers_with_ids());
347
348 deleg
349 }
350
351 /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
352 ///
353 /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
354 /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
355 /// is not included (it lives in the control runner). Returns empty before the first netmap —
356 /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
357 /// that need a populated list await `Running` first).
358 #[message]
359 pub fn all_peers(&self) -> Vec<Node> {
360 self.peer_db.peers().values().cloned().collect()
361 }
362
363 /// Resolve which node owns a tailnet source address.
364 ///
365 /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
366 /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
367 ///
368 /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
369 /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
370 /// [`status`](crate::status) module docs for the gap.
371 ///
372 /// Waits until we've received at least one peer update from control.
373 #[message(ctx)]
374 pub fn whois(
375 &mut self,
376 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
377 addr: std::net::SocketAddr,
378 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
379 let (deleg, sender) = ctx.reply_sender();
380 let Some(sender) = sender else { return deleg };
381
382 if !self.seen_state_update {
383 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
384 self.pending_requests
385 .push(Pending::WhoIs(Whois { addr }, sender));
386 return deleg;
387 }
388
389 sender.send(self.whois_opt(addr));
390
391 deleg
392 }
393
394 /// Subscribe to netmap peer-change events.
395 ///
396 /// Returns a [`watch::Receiver`] whose value is the current set of peer
397 /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
398 /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
399 ///
400 /// The receiver's initial value is the peer set at subscription time (empty before the
401 /// first netmap update). This is a peer-only view; combine with the self node from
402 /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
403 #[message(derive(Clone))]
404 pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
405 self.peer_watch.subscribe()
406 }
407 }
408}
409
410pub use msg_impl::*;
411
412#[derive(Debug, Clone)]
413pub(crate) struct PeerState {
414 #[allow(unused)]
415 pub deletions: HashSet<PeerId>,
416 #[allow(unused)]
417 pub upserts: HashSet<PeerId>,
418 pub peers: Arc<PeerDb>,
419}
420
421impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
422 type Reply = ();
423
424 async fn handle(
425 &mut self,
426 msg: Arc<ts_control::StateUpdate>,
427 _ctx: &mut Context<Self, Self::Reply>,
428 ) {
429 // Accumulate user profiles first — control sends them incrementally and a response may
430 // carry profiles with no peer delta (or peers that reference a profile from an earlier
431 // response), so this must happen before the no-peer-update early return below.
432 for profile in &msg.user_profiles {
433 self.user_profiles.insert(profile.id, profile.clone());
434 }
435
436 // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
437 // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
438 // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
439 // *before* the no-peer-update early return — otherwise online status freezes at the last
440 // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
441 let liveness_changed =
442 self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
443
444 if msg.peer_update.is_none() && msg.peer_patches.is_empty() {
445 // No peer set or patch this response. If a liveness delta still mutated the netmap,
446 // publish the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
447 if liveness_changed {
448 self.service_pending_requests();
449 self.peer_watch.send_replace(self.status_peers());
450 if let Err(e) = self
451 .env
452 .publish(Arc::new(PeerState {
453 upserts: HashSet::default(),
454 deletions: HashSet::default(),
455 peers: Arc::new(self.peer_db.clone()),
456 }))
457 .await
458 {
459 tracing::error!(error = %e, "publishing liveness-only peer state update");
460 }
461 }
462 return;
463 }
464
465 // Apply the whole-node peer set (if any) FIRST, then the field-level patches on top —
466 // mirroring Go's `controlclient` order (`Peers*` then `PeersChangedPatch`). A response may
467 // carry either, both, or (with a liveness-only delta) neither. Merge the upsert/deletion sets
468 // so the published `PeerState` reflects every node touched by both passes; a node both
469 // upserted by the set and patched stays in `upserts` (the patch removes it from `deletions`).
470 let (mut upserts, mut deletions) = msg
471 .peer_update
472 .as_ref()
473 .map(|u| self.apply_peer_update(u))
474 .unwrap_or_default();
475
476 if !msg.peer_patches.is_empty() {
477 let (patch_upserts, patch_deletions) = self.apply_peer_patches(&msg.peer_patches);
478 // A patch can evict a node the set just upserted (TKA rejection after key rotation), or
479 // re-admit/patch one not in the set — reconcile so each id lands in exactly one set.
480 for id in &patch_upserts {
481 deletions.remove(id);
482 }
483 for id in &patch_deletions {
484 upserts.remove(id);
485 }
486 upserts.extend(patch_upserts);
487 deletions.extend(patch_deletions);
488 }
489
490 tracing::debug!(
491 n_upsert = upserts.len(),
492 n_delete = deletions.len(),
493 peer_count = self.peer_db.peers().len(),
494 "new peer state"
495 );
496
497 self.service_pending_requests();
498
499 // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
500 // value current even when there are no subscribers, so a late subscriber sees fresh state.
501 self.peer_watch.send_replace(self.status_peers());
502
503 if let Err(e) = self
504 .env
505 .publish(Arc::new(PeerState {
506 upserts,
507 deletions,
508 peers: Arc::new(self.peer_db.clone()),
509 }))
510 .await
511 {
512 tracing::error!(error = %e, "publishing peer state update");
513 }
514 }
515}
516
517/// Bus message delivering the latest verified Tailnet-Lock [`Authority`](ts_tka::Authority) from the
518/// control runner (after a successful `/machine/tka/sync`) to the peer tracker for **observe-only**
519/// verify-and-logging (issue #136). Cloned onto the bus (`Authority` is `Clone`); the control runner
520/// re-publishes on every successful sync since the bus has no replay for a late subscriber.
521#[derive(Clone)]
522pub struct TkaAuthorityUpdate(pub Arc<ts_tka::Authority>);
523
524impl Message<TkaAuthorityUpdate> for PeerTracker {
525 type Reply = ();
526
527 async fn handle(&mut self, msg: TkaAuthorityUpdate, _ctx: &mut Context<Self, Self::Reply>) {
528 // Store as the OBSERVE-ONLY authority — never `tka_authority` (which would enforce). From
529 // here on, each upserted peer's signature verdict is logged; admission is unchanged.
530 tracing::info!(
531 head = %msg.0.head().to_base32(),
532 "TKA observe authority updated (verify-and-log active; not enforcing)"
533 );
534 self.tka_observe = Some((*msg.0).clone());
535 }
536}
537
/// Request that the peer tracker broadcast its current peer snapshot on the bus again, even though
/// no peer changed.
///
/// It is sent after a runtime preference change, by `Device::set_exit_node` (new exit-node
/// selector) and by `Device::set_accept_routes` (new accept-routes flag). The route updater and
/// the source filter both subscribe to `Arc<PeerState>`, so the re-broadcast makes them
/// re-resolve against the new preference right away instead of waiting for the next netmap
/// update.
#[derive(Debug, Clone, Copy)]
pub struct RepublishState;
545
546impl Message<RepublishState> for PeerTracker {
547 type Reply = ();
548
549 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
550 // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
551 // delta. Subscribers recompute their routes/filters against the current peers and the
552 // (just-updated) runtime preferences (exit-node selector, accept-routes flag).
553 if let Err(e) = self
554 .env
555 .publish(Arc::new(PeerState {
556 upserts: HashSet::default(),
557 deletions: HashSet::default(),
558 peers: Arc::new(self.peer_db.clone()),
559 }))
560 .await
561 {
562 tracing::error!(error = %e, "re-publishing peer state after a runtime preference change");
563 }
564 }
565}
566
567impl PeerTracker {
568 /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
569 /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
570 ///
571 /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
572 /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
573 /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
574 ///
575 /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
576 fn apply_peer_update(
577 &mut self,
578 peer_update: &ts_control::PeerUpdate,
579 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
580 let mut upserts = HashSet::default();
581 let mut deletions = HashSet::default();
582
583 match peer_update {
584 ts_control::PeerUpdate::Full(new_nodes) => {
585 tracing::trace!("full peer update");
586
587 // Only stable_ids that PASS the Tailnet-Lock gate survive a full re-sync. This makes
588 // revocation evict: if a peer is re-included with a now-invalid (or missing)
589 // signature under an active authority, it is excluded from `retained_ids`, so
590 // `retain` drops the stale (previously-admitted) entry rather than leaving it in the
591 // db unverified. With no authority, `tka_admits` is always `true`, so `retained_ids`
592 // is exactly the set of re-included stable_ids — the inactive path is byte-for-byte
593 // the pre-TKA behavior (no regression).
594 let retained_ids = new_nodes
595 .iter()
596 .filter(|node| self.tka_admits(node))
597 .map(|x| &x.stable_id)
598 .collect::<HashSet<_>>();
599
600 self.peer_db.retain(|id, peer| {
601 let retain = retained_ids.contains(&peer.stable_id);
602
603 if !retain {
604 deletions.insert(id);
605 }
606
607 retain
608 });
609
610 for node in new_nodes {
611 if !self.tka_admits(node) {
612 continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
613 }
614 self.tka_observe_log(node); // verify-and-LOG (#136); never gates admission
615 let peer_id = self.peer_db.upsert(node);
616 upserts.insert(peer_id);
617 }
618 }
619
620 ts_control::PeerUpdate::Delta { remove, upsert } => {
621 tracing::trace!("delta peer update");
622
623 for peer in upsert {
624 if !self.tka_admits(peer) {
625 continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
626 }
627 self.tka_observe_log(peer); // verify-and-LOG (#136); never gates admission
628 let id = self.peer_db.upsert(peer);
629
630 upserts.insert(id);
631 }
632
633 for peer in remove {
634 let Some((id, _node)) = self.peer_db.remove(peer) else {
635 tracing::error!(control_node_id = peer, "removed peer was unknown");
636 continue;
637 };
638
639 deletions.insert(id);
640 }
641 }
642 }
643
644 (upserts, deletions)
645 }
646
647 /// Apply field-level peer patches (`MapResponse.PeersChangedPatch`), returning the upserted /
648 /// deleted [`PeerId`]s.
649 ///
650 /// This is a SEPARATE channel from [`apply_peer_update`](Self::apply_peer_update): Go's
651 /// `controlclient` applies the whole-node `Peers*` set first and then `PeersChangedPatch`, so a
652 /// response that carries both has the peer set applied first (by the caller) and these patches
653 /// applied second, on top of the freshly-synced nodes. A patch only mutates a peer already in the
654 /// netmap; an unknown node id is ignored (the wire contract — a patch never creates a node).
655 fn apply_peer_patches(
656 &mut self,
657 patches: &[ts_control::PeerChange],
658 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
659 let mut upserts = HashSet::default();
660 let mut deletions = HashSet::default();
661
662 tracing::trace!(n = patches.len(), "peer patch update");
663
664 for patch in patches {
665 // Clone the current node, apply the present fields, and re-upsert through the same path
666 // as a delta so indexes/routes stay consistent.
667 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
668 tracing::debug!(
669 control_node_id = patch.id,
670 "peer patch for unknown node; ignoring"
671 );
672 continue;
673 };
674
675 let mut node = existing.clone();
676 if let Some(endpoints) = &patch.underlay_addresses {
677 node.underlay_addresses = endpoints.clone();
678 }
679 if let Some(derp) = patch.derp_region {
680 node.derp_region = Some(derp);
681 }
682 if let Some(cap) = patch.cap {
683 node.cap = cap;
684 }
685 if let Some(cap_map) = &patch.cap_map {
686 node.cap_map = cap_map.clone();
687 }
688 if let Some(disco_key) = patch.disco_key {
689 node.disco_key = Some(disco_key);
690 }
691 if let Some(expiry) = patch.node_key_expiry {
692 node.node_key_expiry = Some(expiry);
693 }
694 // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the dominant
695 // channel by which peer online transitions arrive mid-session. A patch only ever *sets*
696 // a value (never patches back to unknown), so apply when present.
697 if let Some(online) = patch.online {
698 node.online = Some(online);
699 }
700 if let Some(last_seen) = patch.last_seen {
701 node.last_seen = Some(last_seen);
702 }
703 // Key rotation: a patch may swap the node key (and its TKA signature). Apply both
704 // together so the trust gate below verifies the new signature against the new key, never
705 // a mismatched pair.
706 if let Some(node_key) = patch.node_key {
707 node.node_key = node_key;
708 }
709 if let Some(sig) = &patch.key_signature {
710 node.key_signature = sig.clone();
711 }
712
713 // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key must
714 // satisfy the active authority, exactly like a `Delta` upsert, or it would be a
715 // trust-enforcement bypass. fail-CLOSED — if the patched node is no longer admitted,
716 // evict it rather than keep the stale (now-unverified) entry.
717 if !self.tka_admits(&node) {
718 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
719 tracing::warn!(
720 control_node_id = patch.id,
721 "peer patch rejected by tailnet lock; evicting peer"
722 );
723 deletions.insert(id);
724 }
725 continue;
726 }
727
728 self.tka_observe_log(&node); // verify-and-LOG (#136); never gates admission
729 let id = self.peer_db.upsert(&node);
730 upserts.insert(id);
731 }
732
733 (upserts, deletions)
734 }
735
736 /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
737 /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
738 /// actually mutated (so the caller knows whether to re-publish).
739 ///
740 /// Mirrors Go's post-`peers*` application of these maps. Each entry is keyed by control node id
741 /// and only ever *sets* a value (never back to unknown). An entry for an unknown node id is
742 /// ignored (like a patch — these maps never create a node). `peer_seen_change`'s `false` ("the
743 /// peer is gone") is applied as `online = Some(false)` — the node stays in the netmap, it is
744 /// merely marked offline; the `last_seen = now` update for the `true` case is intentionally not
745 /// performed here (it needs a wall clock this actor does not hold, and `last_seen` is the
746 /// low-value half — `online` is the `tailscale status` column that matters; see the iter-5
747 /// research note §5.5).
748 fn apply_liveness_changes(
749 &mut self,
750 online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
751 peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
752 ) -> bool {
753 let mut changed = false;
754
755 // Channel C — direct online flips.
756 for (&node_id, &online) in online_change {
757 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
758 && existing.online != Some(online)
759 {
760 let mut node = existing.clone();
761 node.online = Some(online);
762 self.peer_db.upsert(&node);
763 changed = true;
764 }
765 }
766
767 // Channel D — peer-seen flips. `false` ⇒ "the peer is gone" ⇒ mark offline (the node is
768 // retained, not removed). `true` ⇒ "seen just now"; the online half is unknown from this
769 // signal alone, so we leave `online` untouched (a `true` here does not assert connectivity to
770 // control, only recent contact) and defer the `last_seen = now` timestamp (no clock here).
771 for (&node_id, &seen) in peer_seen_change {
772 if !seen
773 && let Some((_pid, existing)) = self.peer_db.get(&node_id)
774 && existing.online != Some(false)
775 {
776 let mut node = existing.clone();
777 node.online = Some(false);
778 self.peer_db.upsert(&node);
779 changed = true;
780 }
781 }
782
783 changed
784 }
785
786 /// Test-only constructor: build a [`PeerTracker`] with chosen TKA authorities without going
787 /// through the actor `on_start` path. `tka_authority` exercises the fail-closed enforcement
788 /// chokepoint ([`tka_admits`](Self::tka_admits)); `tka_observe` exercises the observe-only
789 /// verify-and-log seam ([`tka_observe_log`](Self::tka_observe_log)).
790 #[cfg(test)]
791 fn for_test(
792 env: Env,
793 tka_authority: Option<ts_tka::Authority>,
794 tka_observe: Option<ts_tka::Authority>,
795 ) -> Self {
796 let (peer_watch, _) = watch::channel(Vec::new());
797 Self {
798 peer_db: PeerDb::default(),
799 seen_state_update: false,
800 pending_requests: Vec::new(),
801 peer_watch,
802 user_profiles: HashMap::new(),
803 tka_authority,
804 tka_observe,
805 env,
806 }
807 }
808
809 fn service_pending_requests(&mut self) {
810 if self.seen_state_update {
811 return;
812 }
813
814 self.seen_state_update = true;
815
816 if !self.pending_requests.is_empty() {
817 tracing::debug!(
818 n_pending = self.pending_requests.len(),
819 "state update received, servicing pending requests"
820 );
821 }
822
823 for req in core::mem::take(&mut self.pending_requests) {
824 match req {
825 Pending::PeerByName(PeerByName { name }, reply) => {
826 reply.send(self.peer_by_name_opt(&name).cloned());
827 }
828 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
829 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
830 }
831 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
832 reply.send(
833 self.peer_db
834 .get_route(ip.into())
835 .map(|(_id, node)| node.clone())
836 .collect(),
837 );
838 }
839 Pending::Status(reply) => {
840 reply.send(self.status_peers_with_ids());
841 }
842 Pending::WhoIs(Whois { addr }, reply) => {
843 reply.send(self.whois_opt(addr));
844 }
845 }
846 }
847 }
848}
849
850#[cfg(test)]
851mod tka_tests {
    //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint, together with the
    //! peer-patch, liveness-change, and `WhoIs` user-profile tests that reuse the same fixtures.
    //!
    //! The TKA tests exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the
    //! netmap handler runs. The test [`ts_tka::Authority`] is built with
    //! [`ts_tka::Authority::from_state`] over a known Ed25519 trusted key, and the signed node-key
    //! signature CBOR is produced through `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact
    //! same canonical bytes `ts_tka`'s own `direct_signature_verifies_end_to_end` test signs, with
    //! no new crypto vectors invented and no private `ts_tka` API used).
860
861 use ed25519_dalek::{Signer, SigningKey};
862 use ts_control::{Node, StableNodeId, TailnetAddress};
863 use ts_tka::{
864 AumHash, Authority, Key, KeyKind, State,
865 cbor::{self, Value},
866 };
867
868 use super::*;
869
870 /// `SigKind::Direct` wire value (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
871 const SIG_KIND_DIRECT: u64 = 1;
872
873 /// The 32-byte node key used across the signed-peer fixtures.
874 const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
875
876 /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
877 /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
878 fn test_env() -> Env {
879 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
880 Env::new(
881 ts_keys::NodeState::generate(),
882 shutdown_rx,
883 crate::env::ForwarderConfig {
884 accept_routes: false,
885 exit_node: None,
886 forward_routes: Vec::new(),
887 forward_tcp_ports: Vec::new(),
888 forward_udp_ports: Vec::new(),
889 forward_all_ports: false,
890 forward_exit_egress: false,
891 block_incoming: false,
892 exit_proxy: None,
893 peerapi_port: None,
894 taildrop_dir: None,
895 enable_ipv6: false,
896 persistent_keepalive_interval: None,
897 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
898 },
899 )
900 }
901
902 /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
903 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
904 Node {
905 id: 1,
906 stable_id: StableNodeId(stable_id.to_string()),
907 hostname: stable_id.to_string(),
908 user_id: 0,
909 tailnet: Some("ts.net".to_string()),
910 tags: Vec::new(),
911 tailnet_address: TailnetAddress {
912 ipv4: "100.64.0.1/32".parse().unwrap(),
913 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
914 },
915 node_key: node_key.into(),
916 node_key_expiry: None,
917 online: None,
918 last_seen: None,
919 key_signature,
920 machine_key: None,
921 disco_key: None,
922 accepted_routes: Vec::new(),
923 underlay_addresses: Vec::new(),
924 derp_region: None,
925 cap: Default::default(),
926 cap_map: Default::default(),
927 peerapi_port: None,
928 peerapi_dns_proxy: false,
929 is_wireguard_only: false,
930 exit_node_dns_resolvers: Vec::new(),
931 peer_relay: false,
932 service_vips: Default::default(),
933 }
934 }
935
936 /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
937 /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
938 /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
939 /// signing-digest preimage (the `SigHash` form).
940 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
941 let mut pairs = alloc_pairs(node_key, key_id);
942 if let Some(sig) = signature {
943 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
944 }
945 cbor::int_map(pairs).to_vec()
946 }
947
948 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
949 vec![
950 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
951 (2, Some(Value::Bytes(node_key.to_vec()))),
952 (3, Some(Value::Bytes(key_id.to_vec()))),
953 ]
954 }
955
956 /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
957 /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
958 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
959 // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
960 let signing = SigningKey::from_bytes(&[42u8; 32]);
961 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
962
963 let authority = Authority::from_state(
964 AumHash([0; 32]),
965 State {
966 keys: vec![Key {
967 kind: KeyKind::Ed25519,
968 votes: 1,
969 public: trusted_pub.clone(),
970 }],
971 },
972 );
973
974 // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
975 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
976 let sig_hash = ts_tka::aum_hash(&preimage).0;
977 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
978
979 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
980 // Sanity: the authority accepts the signature we just built (same path the gate uses).
981 assert!(
982 authority
983 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
984 .is_ok()
985 );
986
987 (authority, signed_cbor)
988 }
989
990 #[tokio::test]
991 async fn tka_inactive_upserts_all_peers() {
992 // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
993 let mut tracker = PeerTracker::for_test(test_env(), None, None);
994
995 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
996 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
997
998 assert!(tracker.tka_admits(&signed));
999 assert!(tracker.tka_admits(&unsigned));
1000
1001 tracker.peer_db.upsert(&signed);
1002 tracker.peer_db.upsert(&unsigned);
1003 assert_eq!(tracker.peer_db.peers().len(), 2);
1004 }
1005
1006 #[tokio::test]
1007 async fn tka_active_rejects_unsigned_peer() {
1008 // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
1009 let (authority, _sig) = authority_and_valid_sig();
1010 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1011
1012 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1013 assert!(!tracker.tka_admits(&unsigned));
1014
1015 // Mirror the handler's `if !tka_admits { continue }` loop.
1016 if tracker.tka_admits(&unsigned) {
1017 tracker.peer_db.upsert(&unsigned);
1018 }
1019 assert_eq!(tracker.peer_db.peers().len(), 0);
1020 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1021 }
1022
1023 #[tokio::test]
1024 async fn tka_active_rejects_bad_signature() {
1025 // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
1026 let (authority, mut sig) = authority_and_valid_sig();
1027 // Tamper the last byte (the trailing signature byte) so verification fails.
1028 let last = sig.len() - 1;
1029 sig[last] ^= 0xff;
1030
1031 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1032 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
1033 assert!(!tracker.tka_admits(&bad));
1034
1035 if tracker.tka_admits(&bad) {
1036 tracker.peer_db.upsert(&bad);
1037 }
1038 assert_eq!(tracker.peer_db.peers().len(), 0);
1039 }
1040
1041 #[tokio::test]
1042 async fn tka_active_admits_authorized_peer() {
1043 // Authority present + correctly-signed node key ⇒ admitted and upserted.
1044 let (authority, sig) = authority_and_valid_sig();
1045 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1046
1047 let good = peer_node("good", NODE_KEY_BYTES, sig);
1048 assert!(tracker.tka_admits(&good));
1049
1050 if tracker.tka_admits(&good) {
1051 tracker.peer_db.upsert(&good);
1052 }
1053 assert_eq!(tracker.peer_db.peers().len(), 1);
1054 assert!(tracker.peer_db.get(&good.node_key).is_some());
1055 }
1056
1057 // ---------------------------------------------------------------------------------------------
1058 // Tests that drive REAL `PeerUpdate`s through the shared handler body
1059 // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1060 // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1061 // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1062 // ---------------------------------------------------------------------------------------------
1063
1064 #[tokio::test]
1065 async fn tka_active_delta_upsert_rejects_unauthorized() {
1066 // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1067 // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1068 let (authority, _sig) = authority_and_valid_sig();
1069 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1070
1071 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1072 let update = ts_control::PeerUpdate::Delta {
1073 upsert: vec![unsigned.clone()],
1074 remove: Vec::new(),
1075 };
1076
1077 tracker.apply_peer_update(&update);
1078
1079 assert_eq!(tracker.peer_db.peers().len(), 0);
1080 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1081 }
1082
1083 #[tokio::test]
1084 async fn tka_active_delta_upsert_admits_authorized() {
1085 // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1086 let (authority, sig) = authority_and_valid_sig();
1087 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1088
1089 let good = peer_node("good", NODE_KEY_BYTES, sig);
1090 let update = ts_control::PeerUpdate::Delta {
1091 upsert: vec![good.clone()],
1092 remove: Vec::new(),
1093 };
1094
1095 tracker.apply_peer_update(&update);
1096
1097 assert_eq!(tracker.peer_db.peers().len(), 1);
1098 assert!(tracker.peer_db.get(&good.node_key).is_some());
1099 }
1100
1101 #[tokio::test]
1102 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1103 // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1104 // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1105 // dropped fail-closed.
1106 let (authority, sig) = authority_and_valid_sig();
1107 // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1108 let mut bad_sig = sig.clone();
1109 let last = bad_sig.len() - 1;
1110 bad_sig[last] ^= 0xff;
1111
1112 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1113
1114 // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1115 // rejected peers use distinct node keys so the survivor is unambiguous.
1116 let good = peer_node("good", NODE_KEY_BYTES, sig);
1117 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1118 let bad = peer_node("bad", [9u8; 32], bad_sig);
1119
1120 let update =
1121 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1122
1123 tracker.apply_peer_update(&update);
1124
1125 assert_eq!(tracker.peer_db.peers().len(), 1);
1126 assert!(tracker.peer_db.get(&good.node_key).is_some());
1127 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1128 assert!(tracker.peer_db.get(&bad.node_key).is_none());
1129 }
1130
1131 #[tokio::test]
1132 async fn tka_observe_only_admits_all_peers_in_mixed_batch() {
1133 // #136 observe-only contract: with the OBSERVE authority set (and `tka_authority = None`, so
1134 // enforcement is OFF), the exact mixed batch that the fail-closed test above prunes to 1 must
1135 // instead admit ALL THREE peers. The verify-and-log seam logs each verdict (verified /
1136 // unsigned / failed) but never gates admission. This locks observe-only against a future
1137 // refactor that accidentally wires `tka_observe` into a drop path.
1138 let (authority, sig) = authority_and_valid_sig();
1139 let mut bad_sig = sig.clone();
1140 let last = bad_sig.len() - 1;
1141 bad_sig[last] ^= 0xff;
1142
1143 // Authority in the OBSERVE slot, enforcement slot empty.
1144 let mut tracker = PeerTracker::for_test(test_env(), None, Some(authority));
1145
1146 let good = peer_node("good", NODE_KEY_BYTES, sig);
1147 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1148 let bad = peer_node("bad", [9u8; 32], bad_sig);
1149
1150 let update =
1151 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1152
1153 tracker.apply_peer_update(&update);
1154
1155 // ALL THREE survive — observe-only never drops a peer.
1156 assert_eq!(
1157 tracker.peer_db.peers().len(),
1158 3,
1159 "observe-only must admit every peer regardless of signature verdict"
1160 );
1161 assert!(tracker.peer_db.get(&good.node_key).is_some());
1162 assert!(tracker.peer_db.get(&unsigned.node_key).is_some());
1163 assert!(tracker.peer_db.get(&bad.node_key).is_some());
1164 }
1165
1166 #[tokio::test]
1167 async fn tka_full_resync_revocation_behavior() {
1168 // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1169 // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1170 // (previously-admitted) entry because membership was decided purely by stable_id.
1171 //
1172 // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1173 // stable_ids, so a peer whose re-included signature no longer verifies under the active
1174 // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1175 // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1176 // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1177 let (authority, sig) = authority_and_valid_sig();
1178 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1179
1180 // 1) Admit the peer with a valid signature via a real `Full`.
1181 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1182 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1183 assert_eq!(tracker.peer_db.peers().len(), 1);
1184 assert!(tracker.peer_db.get(&good.node_key).is_some());
1185
1186 // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1187 let mut bad_sig = sig;
1188 let last = bad_sig.len() - 1;
1189 bad_sig[last] ^= 0xff;
1190 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1191 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1192
1193 // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1194 assert_eq!(tracker.peer_db.peers().len(), 0);
1195 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1196 }
1197
1198 #[tokio::test]
1199 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1200 // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1201 // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1202 // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1203 // branch this wave.
1204 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1205
1206 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1207 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1208 assert_eq!(tracker.peer_db.peers().len(), 1);
1209
1210 // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1211 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1212 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1213 assert_eq!(tracker.peer_db.peers().len(), 1);
1214 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1215 }
1216
1217 /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1218 /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1219 /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1220 /// never re-handshake after it moves.
1221 #[tokio::test]
1222 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1223 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1224
1225 // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1226 let peer = peer_node("mover", [1u8; 32], vec![]);
1227 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1228 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1229 assert!(before.underlay_addresses.is_empty());
1230 assert!(before.derp_region.is_none());
1231
1232 // Patch in fresh reachability (the idle-peer-reconnect case).
1233 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1234 let patch = ts_control::PeerChange {
1235 id: 1,
1236 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1237 cap: None,
1238 cap_map: None,
1239 underlay_addresses: Some(vec![new_ep]),
1240 node_key: None,
1241 key_signature: None,
1242 disco_key: None,
1243 node_key_expiry: None,
1244 online: None,
1245 last_seen: None,
1246 };
1247 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1248
1249 assert_eq!(upserts.len(), 1);
1250 assert_eq!(deletions.len(), 0);
1251 // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1252 assert_eq!(tracker.peer_db.peers().len(), 1);
1253 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1254 assert_eq!(after.underlay_addresses, vec![new_ep]);
1255 assert_eq!(
1256 after.derp_region,
1257 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1258 );
1259 assert_eq!(after.node_key, peer.node_key);
1260 }
1261
1262 /// Regression for `tsr-5u0`: when a whole-node set (`Delta`/`Full`) and a patch co-occur in one
1263 /// response, the patch is applied *on top of* the node the set just upserted — mirroring the
1264 /// handler's apply-order (peer set first, then `peer_patches`). Before the fix the patch shared
1265 /// the single `peer_update` slot and the co-occurring set silently dropped it, so a peer brought
1266 /// in by the delta kept stale (empty) reachability.
1267 #[tokio::test]
1268 async fn patch_applies_on_top_of_co_occurring_delta() {
1269 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1270
1271 // The whole-node delta upserts a brand-new peer (id == 1) with no reachability.
1272 let peer = peer_node("mover", [1u8; 32], vec![]);
1273 let (set_upserts, _) = tracker.apply_peer_update(&ts_control::PeerUpdate::Delta {
1274 upsert: vec![peer.clone()],
1275 remove: vec![],
1276 });
1277 assert_eq!(set_upserts.len(), 1, "delta upserts the new peer");
1278
1279 // The patch from the SAME response then sets that peer's endpoints + DERP. This is exactly
1280 // the consumer order the handler runs (apply_peer_update then apply_peer_patches).
1281 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1282 let patch = ts_control::PeerChange {
1283 id: 1,
1284 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap())),
1285 cap: None,
1286 cap_map: None,
1287 underlay_addresses: Some(vec![new_ep]),
1288 node_key: None,
1289 key_signature: None,
1290 disco_key: None,
1291 node_key_expiry: None,
1292 online: None,
1293 last_seen: None,
1294 };
1295 let (patch_upserts, patch_deletions) =
1296 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1297
1298 assert_eq!(
1299 patch_upserts.len(),
1300 1,
1301 "patch re-upserts the just-added peer"
1302 );
1303 assert_eq!(patch_deletions.len(), 0);
1304 // The peer added by the delta now carries the patched reachability — the patch was NOT lost.
1305 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1306 assert_eq!(after.underlay_addresses, vec![new_ep]);
1307 assert_eq!(
1308 after.derp_region,
1309 Some(ts_derp::RegionId(core::num::NonZeroU32::new(7).unwrap()))
1310 );
1311 }
1312
1313 /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1314 /// never creates a node). No upsert, no deletion, peer set unchanged.
1315 #[tokio::test]
1316 async fn patch_for_unknown_node_is_ignored() {
1317 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1318 let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1319 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1320
1321 let patch = ts_control::PeerChange {
1322 id: 999, // not in the netmap
1323 derp_region: None,
1324 cap: None,
1325 cap_map: None,
1326 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1327 node_key: None,
1328 key_signature: None,
1329 disco_key: None,
1330 node_key_expiry: None,
1331 online: None,
1332 last_seen: None,
1333 };
1334 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1335
1336 assert_eq!(upserts.len(), 0);
1337 assert_eq!(deletions.len(), 0);
1338 assert_eq!(tracker.peer_db.peers().len(), 1);
1339 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1340 }
1341
1342 /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1343 /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1344 #[tokio::test]
1345 async fn patch_updates_node_key_expiry() {
1346 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1347 let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1348 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1349
1350 let expiry = "2027-01-01T00:00:00Z"
1351 .parse::<chrono::DateTime<chrono::Utc>>()
1352 .unwrap();
1353 let patch = ts_control::PeerChange {
1354 id: 1,
1355 derp_region: None,
1356 cap: None,
1357 cap_map: None,
1358 underlay_addresses: None,
1359 node_key: None,
1360 key_signature: None,
1361 disco_key: None,
1362 node_key_expiry: Some(expiry),
1363 online: None,
1364 last_seen: None,
1365 };
1366 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1367
1368 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1369 assert_eq!(after.node_key_expiry, Some(expiry));
1370 }
1371
1372 /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1373 #[tokio::test]
1374 async fn patch_updates_online() {
1375 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1376 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1377 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1378 assert_eq!(
1379 tracker
1380 .peer_db
1381 .get(&(1 as ts_control::NodeId))
1382 .unwrap()
1383 .1
1384 .online,
1385 None
1386 );
1387
1388 let mut patch = ts_control::PeerChange {
1389 id: 1,
1390 derp_region: None,
1391 cap: None,
1392 cap_map: None,
1393 underlay_addresses: None,
1394 node_key: None,
1395 key_signature: None,
1396 disco_key: None,
1397 node_key_expiry: None,
1398 online: Some(true),
1399 last_seen: None,
1400 };
1401 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1402 assert_eq!(
1403 tracker
1404 .peer_db
1405 .get(&(1 as ts_control::NodeId))
1406 .unwrap()
1407 .1
1408 .online,
1409 Some(true),
1410 "PeerChange.online=Some(true) marks the peer online"
1411 );
1412
1413 // A subsequent patch flips it offline.
1414 patch.online = Some(false);
1415 tracker.apply_peer_patches(std::slice::from_ref(&patch));
1416 assert_eq!(
1417 tracker
1418 .peer_db
1419 .get(&(1 as ts_control::NodeId))
1420 .unwrap()
1421 .1
1422 .online,
1423 Some(false)
1424 );
1425 }
1426
1427 /// Channel C/D: the `online_change` map flips online directly; `peer_seen_change: false`
1428 /// ("the peer is gone") marks the peer offline. Both apply to a peer already in the netmap and
1429 /// ignore unknown ids.
1430 #[tokio::test]
1431 async fn liveness_change_maps_apply_online() {
1432 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1433 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1434 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1435
1436 // Channel C: online_change sets online=true.
1437 let mut online_change = std::collections::BTreeMap::new();
1438 online_change.insert(1 as ts_control::NodeId, true);
1439 online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1440 let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1441 assert!(changed);
1442 assert_eq!(
1443 tracker
1444 .peer_db
1445 .get(&(1 as ts_control::NodeId))
1446 .unwrap()
1447 .1
1448 .online,
1449 Some(true)
1450 );
1451
1452 // Channel D: peer_seen_change=false marks the peer offline (gone), node retained.
1453 let mut peer_seen_change = std::collections::BTreeMap::new();
1454 peer_seen_change.insert(1 as ts_control::NodeId, false);
1455 let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1456 assert!(changed);
1457 assert_eq!(
1458 tracker
1459 .peer_db
1460 .get(&(1 as ts_control::NodeId))
1461 .unwrap()
1462 .1
1463 .online,
1464 Some(false),
1465 "peer_seen_change=false marks offline (the node stays in the netmap)"
1466 );
1467 assert_eq!(
1468 tracker.peer_db.peers().len(),
1469 1,
1470 "the node is retained, not removed"
1471 );
1472
1473 // No-op when nothing matches / changes.
1474 assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1475 }
1476
1477 /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1478 /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1479 /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1480 /// otherwise be a trust-enforcement bypass via the patch path.
1481 #[tokio::test]
1482 async fn patch_key_rotation_failing_tka_evicts_peer() {
1483 let (authority, sig) = authority_and_valid_sig();
1484 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1485
1486 // Admit a correctly-signed peer (id == 1).
1487 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1488 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1489 assert_eq!(tracker.peer_db.peers().len(), 1);
1490
1491 // Patch a new node key whose signature is garbage under the active authority.
1492 let patch = ts_control::PeerChange {
1493 id: 1,
1494 derp_region: None,
1495 cap: None,
1496 cap_map: None,
1497 underlay_addresses: None,
1498 node_key: Some([0x33u8; 32].into()),
1499 key_signature: Some(vec![0x00, 0x01, 0x02]),
1500 disco_key: None,
1501 node_key_expiry: None,
1502 online: None,
1503 last_seen: None,
1504 };
1505 let (upserts, deletions) = tracker.apply_peer_patches(std::slice::from_ref(&patch));
1506
1507 assert_eq!(upserts.len(), 0);
1508 assert_eq!(deletions.len(), 1);
1509 assert_eq!(tracker.peer_db.peers().len(), 0);
1510 }
1511
    /// Fixture: a [`ts_control::UserProfile`] with the given `id` and login name and no display
    /// name.
    fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
        ts_control::UserProfile {
            id,
            login_name: login.to_string(),
            display_name: None,
        }
    }

    /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
    /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
    /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
    /// proves the accumulate-then-join path the netmap handler builds.
    #[tokio::test]
    async fn whois_resolves_user_from_accumulated_profiles() {
        let mut tracker = PeerTracker::for_test(test_env(), None, None);

        // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
        let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
        peer.user_id = 42;
        tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
        let addr = "100.64.0.1:0".parse().unwrap();

        // No profile yet: the node resolves but its owner is unknown.
        let who = tracker.whois_opt(addr).expect("peer is known");
        assert_eq!(who.user, None);

        // Profile for a DIFFERENT user must not match.
        tracker
            .user_profiles
            .insert(7, profile(7, "someone-else@example.com"));
        assert_eq!(tracker.whois_opt(addr).unwrap().user, None);

        // The owning user's profile arrives (as the netmap handler would accumulate it): now the
        // login resolves.
        tracker
            .user_profiles
            .insert(42, profile(42, "alice@example.com"));
        assert_eq!(
            tracker.whois_opt(addr).unwrap().user,
            Some("alice@example.com".to_string())
        );
    }
1554
1555 /// `UserProfile::best_label` prefers the login name, falling back to display name, else `None`.
1556 #[test]
1557 fn user_profile_best_label_prefers_login() {
1558 assert_eq!(
1559 profile(1, "alice@example.com").best_label(),
1560 Some("alice@example.com".to_string())
1561 );
1562 let display_only = ts_control::UserProfile {
1563 id: 2,
1564 login_name: String::new(),
1565 display_name: Some("Bob".to_string()),
1566 };
1567 assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1568 let empty = ts_control::UserProfile {
1569 id: 3,
1570 login_name: String::new(),
1571 display_name: None,
1572 };
1573 assert_eq!(empty.best_label(), None);
1574 }
1575}