ts_runtime/peer_tracker/mod.rs
1//! Peer delta update tracking.
2
3use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24/// Actor that tracks peer delta updates and emits new states.
25pub struct PeerTracker {
26 peer_db: PeerDb,
27 seen_state_update: bool,
28 pending_requests: Vec<Pending>,
29 /// Latest peer snapshot, published on every netmap update so embedders can watch for peer
30 /// changes ([`WatchNetmap`]).
31 peer_watch: watch::Sender<Vec<StatusNode>>,
32 /// Accumulated netmap user profiles (`MapResponse.UserProfiles`), keyed by user id, joined
33 /// against a node's [`Node::user_id`](ts_control::Node::user_id) to resolve the owning user's
34 /// login/display name for a [`WhoIs`](crate::status::WhoIs). Control sends these incrementally
35 /// (only new/changed profiles per response), so this map **accumulates** across updates rather
36 /// than being replaced — a peer upserted in one response may reference a profile delivered in an
37 /// earlier one.
38 user_profiles: HashMap<UserId, UserProfile>,
39 /// Tailnet-Lock (TKA) authority used to verify each peer's `key_signature` at the peer-trust
40 /// chokepoint. When `Some`, enforcement is **active**: every upserted peer must present a
41 /// signature this authority authorizes, or it is rejected (fail-closed). When `None` (always,
42 /// this wave) enforcement is **inactive** and every peer is upserted — identical to pre-TKA
43 /// behavior. There is no live `Authority` source yet: building one requires the
44 /// `/machine/tka/sync` Noise RPC + AUM-chain replayer (deferred, see SECURITY.md). The
45 /// enforcement path below is wired and unit-tested, and flips on the instant an authority is
46 /// supplied; it is explicitly gated, not a silent no-op.
47 tka_authority: Option<ts_tka::Authority>,
48 /// Tailnet-Lock authority used **observe-only** (verify-and-LOG, issue #136): the live
49 /// `Authority` synced from control (delivered over the bus via [`TkaAuthorityUpdate`]). Distinct
50 /// from [`tka_authority`](Self::tka_authority) on purpose — populating *that* would flip the
51 /// runtime to fail-closed enforcement, whereas this field only feeds
52 /// [`tka_observe_log`](Self::tka_observe_log), which logs each peer's signature verdict and
53 /// **never** drops a peer. The current posture (per SECURITY.md / PARITY_ROADMAP): verify-and-log
54 /// while the `ts_tka` crypto is unaudited and control is treated as trusted; flipping to enforce
55 /// is a separate, gated decision.
56 tka_observe: Option<ts_tka::Authority>,
57 env: Env,
58}
59
60impl PeerTracker {
61 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
62 // Canonicalization (case + trailing dot) is handled inside the name index lookup.
63 self.peer_db.get(&name).map(|(_id, node)| node)
64 }
65
66 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
67 self.peer_db.get(&ip).map(|(_id, node)| node)
68 }
69
70 /// Build the peer entries for a [`Status`](crate::Status) snapshot from the current peer db.
71 fn status_peers(&self) -> Vec<StatusNode> {
72 self.peer_db
73 .peers()
74 .values()
75 .map(StatusNode::from_node)
76 .collect()
77 }
78
79 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
80 let ip = crate::status::whois_addr(addr);
81 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
82 // Join the node's owning user id against the accumulated UserProfiles table to resolve a
83 // login/display name. `None` when control sent no profile for that user (e.g. tagged nodes
84 // with no human owner, or a profile not yet delivered).
85 let user = self.resolve_user(node.user_id);
86 Some(crate::status::WhoIs::from_node_with_user(node, user))
87 }
88
89 /// Resolve a user id to its best display label from the accumulated profile table.
90 fn resolve_user(&self, user_id: UserId) -> Option<String> {
91 self.user_profiles
92 .get(&user_id)
93 .and_then(UserProfile::best_label)
94 }
95
96 /// Whether `node` may be admitted to the peer db under the current Tailnet-Lock posture.
97 ///
98 /// Fail-closed and gated:
99 /// - No [`tka_authority`](Self::tka_authority) ⇒ enforcement inactive ⇒ always admit (today's
100 /// behavior; this is the always-taken branch this wave).
101 /// - Authority present + peer carries a `key_signature` that the authority authorizes for the
102 /// peer's node key ⇒ admit.
103 /// - Authority present + signature missing or unauthorized/invalid ⇒ **reject** (Go denies
104 /// network access to unsigned peers under tailnet lock; we do not upsert them).
105 fn tka_admits(&self, node: &Node) -> bool {
106 let Some(auth) = &self.tka_authority else {
107 return true;
108 };
109
110 if node.key_signature.is_empty() {
111 // TKA active but peer presented no signature: reject (Go denies network access to
112 // unsigned peers under tailnet lock, unless UnsignedPeerAPIOnly — out of scope here).
113 tracing::warn!(
114 stable_id = ?node.stable_id,
115 "TKA: rejecting unsigned peer under tailnet lock"
116 );
117 return false;
118 }
119
120 if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
121 tracing::warn!(
122 stable_id = ?node.stable_id,
123 error = %e,
124 "TKA: rejecting peer with unauthorized node key"
125 );
126 return false;
127 }
128
129 true
130 }
131
132 /// Verify `node`'s Tailnet-Lock signature against the **observe-only** authority and LOG the
133 /// verdict — issue #136. This is the verify-and-log seam: it returns `()` (NOT a bool), so it is
134 /// structurally impossible to wire as an admission gate, and it is called *adjacent* to each
135 /// upsert site without affecting whether the peer is admitted. Every peer is upserted exactly as
136 /// it would be with this call absent.
137 ///
138 /// A no-op when no observe authority has been synced yet. Logs `verified` / `failed` / `unsigned`
139 /// with the peer's `stable_id` and, on failure, the `TkaError` Display (static descriptors —
140 /// "bad sig len" etc.). NEVER logs the node-key or signature bytes.
141 fn tka_observe_log(&self, node: &Node) {
142 let Some(auth) = &self.tka_observe else {
143 return;
144 };
145 if node.key_signature.is_empty() {
146 tracing::info!(
147 stable_id = ?node.stable_id,
148 tka_verdict = "unsigned",
149 "TKA observe: peer presented no key-signature (advisory, NOT enforced)"
150 );
151 return;
152 }
153 match auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
154 Ok(()) => tracing::info!(
155 stable_id = ?node.stable_id,
156 tka_verdict = "verified",
157 "TKA observe: peer node-key authorized (advisory, NOT enforced)"
158 ),
159 Err(e) => tracing::warn!(
160 stable_id = ?node.stable_id,
161 tka_verdict = "failed",
162 reason = %e,
163 "TKA observe: peer key-signature did not verify (advisory, NOT enforced)"
164 ),
165 }
166 }
167}
168
169impl kameo::Actor for PeerTracker {
170 type Args = Env;
171 type Error = Error;
172
173 async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
174 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
175 // Observe-only TKA (#136): the control runner publishes the verified `Authority` here after a
176 // successful `/machine/tka/sync`; we use it to verify-and-LOG each peer's signature, never to
177 // enforce. The bus has no replay, so the control runner re-publishes on every sync.
178 env.subscribe::<TkaAuthorityUpdate>(&slf).await?;
179
180 let (peer_watch, _) = watch::channel(Vec::new());
181
182 Ok(Self {
183 peer_db: PeerDb::default(),
184 pending_requests: Default::default(),
185 seen_state_update: false,
186 peer_watch,
187 user_profiles: HashMap::new(),
188 // No live TKA *enforcement* authority this wave (fail-closed path stays gated off; see
189 // `tka_authority`). The observe-only authority (`tka_observe`) is supplied over the bus.
190 tka_authority: None,
191 tka_observe: None,
192 env,
193 })
194 }
195}
196
197enum Pending {
198 PeerByName(PeerByName, ReplySender<Option<Node>>),
199 AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
200 TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
201 Status(ReplySender<Vec<StatusNode>>),
202 WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
203}
204
205// For messages with arguments, a struct is generated with the args as fields. They aren't
206// documented, and we can't apply attributes directly to the fields. Hence, wrap in a module where
207// docs are turned off everywhere.
208#[allow(missing_docs)]
209mod msg_impl {
210 use std::net::IpAddr;
211
212 use kameo::prelude::DelegatedReply;
213
214 use super::*;
215
216 #[kameo::messages]
217 impl PeerTracker {
218 /// Lookup a peer by name.
219 ///
220 /// Waits until we've received at least one peer update from control.
221 #[message(ctx)]
222 pub async fn peer_by_name(
223 &mut self,
224 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
225 name: String,
226 ) -> DelegatedReply<Option<Node>> {
227 let (deleg, sender) = ctx.reply_sender();
228 let Some(sender) = sender else { return deleg };
229
230 if !self.seen_state_update {
231 tracing::debug!(query = name, "no peer state seen yet, queueing request");
232
233 self.pending_requests
234 .push(Pending::PeerByName(PeerByName { name }, sender));
235
236 return deleg;
237 }
238
239 sender.send(self.peer_by_name_opt(&name).cloned());
240
241 deleg
242 }
243
244 /// Lookup all peers that accept packets addressed to the given IP.
245 ///
246 /// This includes the peer's tailnet address and any subnet routes it provides. Only
247 /// the peers with the most specific subnet route match that covers `ip` will be
248 /// returned.
249 ///
250 /// E.g., suppose:
251 ///
252 /// - We're querying for `10.1.2.3`
253 /// - `PeerA` and `PeerB` have accepted routes for `10.1.2.0/24`
254 /// - `PeerC` has an accepted route for `10.1.0.0/16`
255 ///
256 /// Only `PeerA` and `PeerB` will be returned, since they have the most specific
257 /// prefix match.
258 #[message(ctx)]
259 pub fn peer_by_accepted_route(
260 &mut self,
261 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
262 ip: IpAddr,
263 ) -> DelegatedReply<Vec<Node>> {
264 let (deleg, sender) = ctx.reply_sender();
265 let Some(sender) = sender else { return deleg };
266
267 if !self.seen_state_update {
268 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
269
270 self.pending_requests
271 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
272
273 return deleg;
274 }
275
276 sender.send(
277 self.peer_db
278 .get_route(ip.into())
279 .map(|(_id, node)| node.clone())
280 .collect(),
281 );
282
283 deleg
284 }
285
286 /// Lookup the peer that has the given tailnet IP address.
287 #[message(ctx)]
288 pub fn peer_by_tailnet_ip(
289 &mut self,
290 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
291 ip: IpAddr,
292 ) -> DelegatedReply<Option<Node>> {
293 let (deleg, sender) = ctx.reply_sender();
294 let Some(sender) = sender else { return deleg };
295
296 if !self.seen_state_update {
297 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
298
299 self.pending_requests
300 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
301
302 return deleg;
303 }
304
305 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
306
307 deleg
308 }
309
310 /// Build the peer entries of a [`Status`](crate::Status) snapshot.
311 ///
312 /// Returns one [`StatusNode`] per known peer. The self node is *not* included here (it
313 /// lives in the control runner); [`Runtime::status`](crate::Runtime::status) combines both.
314 ///
315 /// Waits until we've received at least one peer update from control.
316 #[message(ctx)]
317 pub fn get_status(
318 &mut self,
319 ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
320 ) -> DelegatedReply<Vec<StatusNode>> {
321 let (deleg, sender) = ctx.reply_sender();
322 let Some(sender) = sender else { return deleg };
323
324 if !self.seen_state_update {
325 tracing::debug!("no peer state seen yet, queueing status request");
326 self.pending_requests.push(Pending::Status(sender));
327 return deleg;
328 }
329
330 sender.send(self.status_peers());
331
332 deleg
333 }
334
335 /// Return every known peer's full domain [`Node`] (not the lossy [`StatusNode`]).
336 ///
337 /// Used by [`Runtime::file_targets`](crate::Runtime::file_targets), which needs the full node
338 /// (peerAPI address, owning user id, cap map) to compute Taildrop send targets. The self node
339 /// is not included (it lives in the control runner). Returns empty before the first netmap —
340 /// the natural "not connected yet" analog (an immediate answer, no queueing needed: callers
341 /// that need a populated list await `Running` first).
342 #[message]
343 pub fn all_peers(&self) -> Vec<Node> {
344 self.peer_db.peers().values().cloned().collect()
345 }
346
347 /// Resolve which node owns a tailnet source address.
348 ///
349 /// Maps the source IP of `addr` to the owning node via the tailnet-IP index, returning a
350 /// [`WhoIs`](crate::WhoIs). The port is ignored (a tailnet IP uniquely identifies a node).
351 ///
352 /// The resulting [`WhoIs`](crate::WhoIs) carries no user/login or capability data: this
353 /// fork's domain [`Node`](ts_control::Node) does not retain those wire fields. See the
354 /// [`status`](crate::status) module docs for the gap.
355 ///
356 /// Waits until we've received at least one peer update from control.
357 #[message(ctx)]
358 pub fn whois(
359 &mut self,
360 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
361 addr: std::net::SocketAddr,
362 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
363 let (deleg, sender) = ctx.reply_sender();
364 let Some(sender) = sender else { return deleg };
365
366 if !self.seen_state_update {
367 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
368 self.pending_requests
369 .push(Pending::WhoIs(Whois { addr }, sender));
370 return deleg;
371 }
372
373 sender.send(self.whois_opt(addr));
374
375 deleg
376 }
377
378 /// Subscribe to netmap peer-change events.
379 ///
380 /// Returns a [`watch::Receiver`] whose value is the current set of peer
381 /// [`StatusNode`]s, updated on every netmap state update from control. Embedders can await
382 /// changes via [`watch::Receiver::changed`] to react to peers joining, leaving, or changing.
383 ///
384 /// The receiver's initial value is the peer set at subscription time (empty before the
385 /// first netmap update). This is a peer-only view; combine with the self node from
386 /// [`Runtime::status`](crate::Runtime::status) when a full snapshot is needed.
387 #[message(derive(Clone))]
388 pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
389 self.peer_watch.subscribe()
390 }
391 }
392}
393
394pub use msg_impl::*;
395
396#[derive(Debug, Clone)]
397pub(crate) struct PeerState {
398 #[allow(unused)]
399 pub deletions: HashSet<PeerId>,
400 #[allow(unused)]
401 pub upserts: HashSet<PeerId>,
402 pub peers: Arc<PeerDb>,
403}
404
405impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
406 type Reply = ();
407
408 async fn handle(
409 &mut self,
410 msg: Arc<ts_control::StateUpdate>,
411 _ctx: &mut Context<Self, Self::Reply>,
412 ) {
413 // Accumulate user profiles first — control sends them incrementally and a response may
414 // carry profiles with no peer delta (or peers that reference a profile from an earlier
415 // response), so this must happen before the no-peer-update early return below.
416 for profile in &msg.user_profiles {
417 self.user_profiles.insert(profile.id, profile.clone());
418 }
419
420 // Apply the standalone online/last-seen delta maps (channels C/D, `MapResponse.OnlineChange`
421 // / `PeerSeenChange`). These arrive keyed by control node id and may ride a response that
422 // carries NO `peer_update` (a bare online flip is the common case), so they must be applied
423 // *before* the no-peer-update early return — otherwise online status freezes at the last
424 // full-node/patch value. Each entry only ever *sets* a value (never back to unknown).
425 let liveness_changed =
426 self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
427
428 let Some(peer_update) = &msg.peer_update else {
429 // No peer set/patch this response. If a liveness delta still mutated the netmap, publish
430 // the refreshed snapshot so watchers (and `GetStatus`) see the new online state.
431 if liveness_changed {
432 self.service_pending_requests();
433 self.peer_watch.send_replace(self.status_peers());
434 if let Err(e) = self
435 .env
436 .publish(Arc::new(PeerState {
437 upserts: HashSet::default(),
438 deletions: HashSet::default(),
439 peers: Arc::new(self.peer_db.clone()),
440 }))
441 .await
442 {
443 tracing::error!(error = %e, "publishing liveness-only peer state update");
444 }
445 }
446 return;
447 };
448
449 let (upserts, deletions) = self.apply_peer_update(peer_update);
450
451 tracing::debug!(
452 n_upsert = upserts.len(),
453 n_delete = deletions.len(),
454 peer_count = self.peer_db.peers().len(),
455 "new peer state"
456 );
457
458 self.service_pending_requests();
459
460 // Publish the latest peer snapshot to netmap watchers. `send_replace` keeps the receiver's
461 // value current even when there are no subscribers, so a late subscriber sees fresh state.
462 self.peer_watch.send_replace(self.status_peers());
463
464 if let Err(e) = self
465 .env
466 .publish(Arc::new(PeerState {
467 upserts,
468 deletions,
469 peers: Arc::new(self.peer_db.clone()),
470 }))
471 .await
472 {
473 tracing::error!(error = %e, "publishing peer state update");
474 }
475 }
476}
477
478/// Bus message delivering the latest verified Tailnet-Lock [`Authority`](ts_tka::Authority) from the
479/// control runner (after a successful `/machine/tka/sync`) to the peer tracker for **observe-only**
480/// verify-and-logging (issue #136). Cloned onto the bus (`Authority` is `Clone`); the control runner
481/// re-publishes on every successful sync since the bus has no replay for a late subscriber.
482#[derive(Clone)]
483pub struct TkaAuthorityUpdate(pub Arc<ts_tka::Authority>);
484
485impl Message<TkaAuthorityUpdate> for PeerTracker {
486 type Reply = ();
487
488 async fn handle(&mut self, msg: TkaAuthorityUpdate, _ctx: &mut Context<Self, Self::Reply>) {
489 // Store as the OBSERVE-ONLY authority — never `tka_authority` (which would enforce). From
490 // here on, each upserted peer's signature verdict is logged; admission is unchanged.
491 tracing::info!(
492 head = %msg.0.head().to_base32(),
493 "TKA observe authority updated (verify-and-log active; not enforcing)"
494 );
495 self.tka_observe = Some((*msg.0).clone());
496 }
497}
498
/// Request that the peer tracker re-broadcast its current peer snapshot on the bus even though
/// no peer changed.
///
/// `Device::set_exit_node` sends this right after changing the exit-node selector. The route
/// updater and the source filter (both `Arc<PeerState>` subscribers) then re-resolve the new
/// selector immediately instead of waiting for the next netmap update.
#[derive(Debug, Clone, Copy)]
pub struct RepublishState;
505
506impl Message<RepublishState> for PeerTracker {
507 type Reply = ();
508
509 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
510 // An empty upsert/deletion set: this is a re-broadcast of the unchanged peer set, not a
511 // delta. Subscribers recompute their routes/filters against the current peers and the
512 // (just-updated) exit-node selector.
513 if let Err(e) = self
514 .env
515 .publish(Arc::new(PeerState {
516 upserts: HashSet::default(),
517 deletions: HashSet::default(),
518 peers: Arc::new(self.peer_db.clone()),
519 }))
520 .await
521 {
522 tracing::error!(error = %e, "re-publishing peer state after exit-node change");
523 }
524 }
525}
526
527impl PeerTracker {
528 /// Apply a single [`PeerUpdate`](ts_control::PeerUpdate) to the peer db, enforcing the
529 /// Tailnet-Lock peer-trust chokepoint ([`tka_admits`](Self::tka_admits)) at every upsert site.
530 ///
531 /// This is the **single source of truth** for the peer-trust enforcement loop: the actor's
532 /// netmap [`handle`](Message::handle) calls it, and so do the TKA enforcement tests, so the two
533 /// real upsert sites (`Full` and `Delta { upsert }`) cannot diverge from what is tested.
534 ///
535 /// Returns `(upserts, deletions)` — the [`PeerId`]s touched — for downstream bookkeeping.
536 fn apply_peer_update(
537 &mut self,
538 peer_update: &ts_control::PeerUpdate,
539 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
540 let mut upserts = HashSet::default();
541 let mut deletions = HashSet::default();
542
543 match peer_update {
544 ts_control::PeerUpdate::Full(new_nodes) => {
545 tracing::trace!("full peer update");
546
547 // Only stable_ids that PASS the Tailnet-Lock gate survive a full re-sync. This makes
548 // revocation evict: if a peer is re-included with a now-invalid (or missing)
549 // signature under an active authority, it is excluded from `retained_ids`, so
550 // `retain` drops the stale (previously-admitted) entry rather than leaving it in the
551 // db unverified. With no authority, `tka_admits` is always `true`, so `retained_ids`
552 // is exactly the set of re-included stable_ids — the inactive path is byte-for-byte
553 // the pre-TKA behavior (no regression).
554 let retained_ids = new_nodes
555 .iter()
556 .filter(|node| self.tka_admits(node))
557 .map(|x| &x.stable_id)
558 .collect::<HashSet<_>>();
559
560 self.peer_db.retain(|id, peer| {
561 let retain = retained_ids.contains(&peer.stable_id);
562
563 if !retain {
564 deletions.insert(id);
565 }
566
567 retain
568 });
569
570 for node in new_nodes {
571 if !self.tka_admits(node) {
572 continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
573 }
574 self.tka_observe_log(node); // verify-and-LOG (#136); never gates admission
575 let peer_id = self.peer_db.upsert(node);
576 upserts.insert(peer_id);
577 }
578 }
579
580 ts_control::PeerUpdate::Delta { remove, upsert } => {
581 tracing::trace!("delta peer update");
582
583 for peer in upsert {
584 if !self.tka_admits(peer) {
585 continue; // fail-CLOSED: do not upsert a peer rejected by tailnet lock
586 }
587 self.tka_observe_log(peer); // verify-and-LOG (#136); never gates admission
588 let id = self.peer_db.upsert(peer);
589
590 upserts.insert(id);
591 }
592
593 for peer in remove {
594 let Some((id, _node)) = self.peer_db.remove(peer) else {
595 tracing::error!(control_node_id = peer, "removed peer was unknown");
596 continue;
597 };
598
599 deletions.insert(id);
600 }
601 }
602
603 ts_control::PeerUpdate::Patch(patches) => {
604 tracing::trace!(n = patches.len(), "peer patch update");
605
606 for patch in patches {
607 // A patch only mutates a peer already in the netmap; an unknown node id is
608 // ignored (the wire contract — a patch never creates a node). Clone the current
609 // node, apply the present fields, and re-upsert through the same path as a
610 // delta so indexes/routes stay consistent.
611 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
612 tracing::debug!(
613 control_node_id = patch.id,
614 "peer patch for unknown node; ignoring"
615 );
616 continue;
617 };
618
619 let mut node = existing.clone();
620 if let Some(endpoints) = &patch.underlay_addresses {
621 node.underlay_addresses = endpoints.clone();
622 }
623 if let Some(derp) = patch.derp_region {
624 node.derp_region = Some(derp);
625 }
626 if let Some(cap) = patch.cap {
627 node.cap = cap;
628 }
629 if let Some(cap_map) = &patch.cap_map {
630 node.cap_map = cap_map.clone();
631 }
632 if let Some(disco_key) = patch.disco_key {
633 node.disco_key = Some(disco_key);
634 }
635 if let Some(expiry) = patch.node_key_expiry {
636 node.node_key_expiry = Some(expiry);
637 }
638 // Online/last-seen liveness deltas (`PeerChange.Online`/`LastSeen`) — the
639 // dominant channel by which peer online transitions arrive mid-session. A patch
640 // only ever *sets* a value (never patches back to unknown), so apply when present.
641 if let Some(online) = patch.online {
642 node.online = Some(online);
643 }
644 if let Some(last_seen) = patch.last_seen {
645 node.last_seen = Some(last_seen);
646 }
647 // Key rotation: a patch may swap the node key (and its TKA signature). Apply
648 // both together so the trust gate below verifies the new signature against the
649 // new key, never a mismatched pair.
650 if let Some(node_key) = patch.node_key {
651 node.node_key = node_key;
652 }
653 if let Some(sig) = &patch.key_signature {
654 node.key_signature = sig.clone();
655 }
656
657 // Re-run the tailnet-lock gate on the patched node: a patch that rotates the key
658 // must satisfy the active authority, exactly like a `Delta` upsert, or it would
659 // be a trust-enforcement bypass. fail-CLOSED — if the patched node is no longer
660 // admitted, evict it rather than keep the stale (now-unverified) entry.
661 if !self.tka_admits(&node) {
662 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
663 tracing::warn!(
664 control_node_id = patch.id,
665 "peer patch rejected by tailnet lock; evicting peer"
666 );
667 deletions.insert(id);
668 }
669 continue;
670 }
671
672 self.tka_observe_log(&node); // verify-and-LOG (#136); never gates admission
673 let id = self.peer_db.upsert(&node);
674 upserts.insert(id);
675 }
676 }
677 }
678
679 (upserts, deletions)
680 }
681
682 /// Apply the standalone online/last-seen delta maps (`MapResponse.OnlineChange` /
683 /// `PeerSeenChange`, channels C/D) onto the retained netmap. Returns `true` if any node was
684 /// actually mutated (so the caller knows whether to re-publish).
685 ///
686 /// Mirrors Go's post-`peers*` application of these maps. Each entry is keyed by control node id
687 /// and only ever *sets* a value (never back to unknown). An entry for an unknown node id is
688 /// ignored (like a patch — these maps never create a node). `peer_seen_change`'s `false` ("the
689 /// peer is gone") is applied as `online = Some(false)` — the node stays in the netmap, it is
690 /// merely marked offline; the `last_seen = now` update for the `true` case is intentionally not
691 /// performed here (it needs a wall clock this actor does not hold, and `last_seen` is the
692 /// low-value half — `online` is the `tailscale status` column that matters; see the iter-5
693 /// research note §5.5).
694 fn apply_liveness_changes(
695 &mut self,
696 online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
697 peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
698 ) -> bool {
699 let mut changed = false;
700
701 // Channel C — direct online flips.
702 for (&node_id, &online) in online_change {
703 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
704 && existing.online != Some(online)
705 {
706 let mut node = existing.clone();
707 node.online = Some(online);
708 self.peer_db.upsert(&node);
709 changed = true;
710 }
711 }
712
713 // Channel D — peer-seen flips. `false` ⇒ "the peer is gone" ⇒ mark offline (the node is
714 // retained, not removed). `true` ⇒ "seen just now"; the online half is unknown from this
715 // signal alone, so we leave `online` untouched (a `true` here does not assert connectivity to
716 // control, only recent contact) and defer the `last_seen = now` timestamp (no clock here).
717 for (&node_id, &seen) in peer_seen_change {
718 if !seen
719 && let Some((_pid, existing)) = self.peer_db.get(&node_id)
720 && existing.online != Some(false)
721 {
722 let mut node = existing.clone();
723 node.online = Some(false);
724 self.peer_db.upsert(&node);
725 changed = true;
726 }
727 }
728
729 changed
730 }
731
/// Test-only constructor that assembles a [`PeerTracker`] with the given TKA authorities,
/// bypassing the actor `on_start` path.
///
/// `tka_authority` drives the fail-closed enforcement chokepoint
/// ([`tka_admits`](Self::tka_admits)); `tka_observe` drives the observe-only verify-and-log seam
/// ([`tka_observe_log`](Self::tka_observe_log)).
#[cfg(test)]
fn for_test(
    env: Env,
    tka_authority: Option<ts_tka::Authority>,
    tka_observe: Option<ts_tka::Authority>,
) -> Self {
    Self {
        peer_db: PeerDb::default(),
        seen_state_update: false,
        pending_requests: Vec::new(),
        // Only the sender half is kept.
        peer_watch: watch::channel(Vec::new()).0,
        user_profiles: HashMap::new(),
        tka_authority,
        tka_observe,
        env,
    }
}
754
755 fn service_pending_requests(&mut self) {
756 if self.seen_state_update {
757 return;
758 }
759
760 self.seen_state_update = true;
761
762 if !self.pending_requests.is_empty() {
763 tracing::debug!(
764 n_pending = self.pending_requests.len(),
765 "state update received, servicing pending requests"
766 );
767 }
768
769 for req in core::mem::take(&mut self.pending_requests) {
770 match req {
771 Pending::PeerByName(PeerByName { name }, reply) => {
772 reply.send(self.peer_by_name_opt(&name).cloned());
773 }
774 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
775 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
776 }
777 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
778 reply.send(
779 self.peer_db
780 .get_route(ip.into())
781 .map(|(_id, node)| node.clone())
782 .collect(),
783 );
784 }
785 Pending::Status(reply) => {
786 reply.send(self.status_peers());
787 }
788 Pending::WhoIs(Whois { addr }, reply) => {
789 reply.send(self.whois_opt(addr));
790 }
791 }
792 }
793 }
794}
795
796#[cfg(test)]
797mod tka_tests {
798 //! Tailnet-Lock (TKA) enforcement tests for the peer-trust chokepoint.
799 //!
800 //! These exercise [`PeerTracker::tka_admits`] and the `tka_admits ⇒ upsert` loop the netmap
801 //! handler runs. The test [`ts_tka::Authority`] is built with [`ts_tka::Authority::from_state`]
802 //! over a known Ed25519 trusted key, and the signed node-key signature CBOR is produced through
803 //! `ts_tka`'s public `cbor` encoder + `aum_hash` (the exact same canonical bytes `ts_tka`'s own
804 //! `direct_signature_verifies_end_to_end` test signs, with no new crypto vectors invented and no
805 //! private `ts_tka` API used).
806
807 use ed25519_dalek::{Signer, SigningKey};
808 use ts_control::{Node, StableNodeId, TailnetAddress};
809 use ts_tka::{
810 AumHash, Authority, Key, KeyKind, State,
811 cbor::{self, Value},
812 };
813
814 use super::*;
815
/// Wire value of `SigKind::Direct` (Go `SigKind`; `ts_tka::SigKind::Direct = 1`).
const SIG_KIND_DIRECT: u64 = 1;

/// Node key (32 bytes, every byte `7`) shared by the signed-peer fixtures.
const NODE_KEY_BYTES: [u8; 32] = [7; 32];
821
822 /// Build a real [`Env`] for the tracker. Only the bus/keys/shutdown plumbing matters here; the
823 /// TKA gate reads neither, so the forwarding preferences are all benign defaults.
824 fn test_env() -> Env {
825 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
826 Env::new(
827 ts_keys::NodeState::generate(),
828 shutdown_rx,
829 crate::env::ForwarderConfig {
830 accept_routes: false,
831 exit_node: None,
832 forward_routes: Vec::new(),
833 forward_tcp_ports: Vec::new(),
834 forward_udp_ports: Vec::new(),
835 forward_all_ports: false,
836 forward_exit_egress: false,
837 block_incoming: false,
838 exit_proxy: None,
839 peerapi_port: None,
840 taildrop_dir: None,
841 enable_ipv6: false,
842 persistent_keepalive_interval: None,
843 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
844 },
845 )
846 }
847
848 /// A minimal peer [`Node`] carrying `node_key` and the given `key_signature`.
849 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
850 Node {
851 id: 1,
852 stable_id: StableNodeId(stable_id.to_string()),
853 hostname: stable_id.to_string(),
854 user_id: 0,
855 tailnet: Some("ts.net".to_string()),
856 tags: Vec::new(),
857 tailnet_address: TailnetAddress {
858 ipv4: "100.64.0.1/32".parse().unwrap(),
859 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
860 },
861 node_key: node_key.into(),
862 node_key_expiry: None,
863 online: None,
864 last_seen: None,
865 key_signature,
866 machine_key: None,
867 disco_key: None,
868 accepted_routes: Vec::new(),
869 underlay_addresses: Vec::new(),
870 derp_region: None,
871 cap: Default::default(),
872 cap_map: Default::default(),
873 peerapi_port: None,
874 peerapi_dns_proxy: false,
875 is_wireguard_only: false,
876 exit_node_dns_resolvers: Vec::new(),
877 peer_relay: false,
878 service_vips: Default::default(),
879 }
880 }
881
882 /// Encode a `Direct` [`ts_tka::NodeKeySignature`] CBOR exactly as `ts_tka`'s private `to_cbor`
883 /// does (int-map keys: 1=kind, 2=pubkey, 3=key_id, 4=signature; empty byte fields omitted),
884 /// using only the crate's *public* `cbor` encoder. `signature` of `None` produces the
885 /// signing-digest preimage (the `SigHash` form).
886 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
887 let mut pairs = alloc_pairs(node_key, key_id);
888 if let Some(sig) = signature {
889 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
890 }
891 cbor::int_map(pairs).to_vec()
892 }
893
894 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
895 vec![
896 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
897 (2, Some(Value::Bytes(node_key.to_vec()))),
898 (3, Some(Value::Bytes(key_id.to_vec()))),
899 ]
900 }
901
902 /// Build a TKA [`Authority`] that trusts `signing.verifying_key()`, plus a valid `Direct`
903 /// node-key signature CBOR authorizing [`NODE_KEY_BYTES`] under it.
904 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
905 // A fixed, known Ed25519 trusted key (mirrors ts_tka's own end-to-end test seed).
906 let signing = SigningKey::from_bytes(&[42u8; 32]);
907 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
908
909 let authority = Authority::from_state(
910 AumHash([0; 32]),
911 State {
912 keys: vec![Key {
913 kind: KeyKind::Ed25519,
914 votes: 1,
915 public: trusted_pub.clone(),
916 }],
917 },
918 );
919
920 // SigHash preimage = canonical CBOR with the signature field omitted; sign its blake2s hash.
921 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
922 let sig_hash = ts_tka::aum_hash(&preimage).0;
923 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
924
925 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
926 // Sanity: the authority accepts the signature we just built (same path the gate uses).
927 assert!(
928 authority
929 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
930 .is_ok()
931 );
932
933 (authority, signed_cbor)
934 }
935
936 #[tokio::test]
937 async fn tka_inactive_upserts_all_peers() {
938 // No authority ⇒ enforcement inactive ⇒ both a signed and an unsigned peer are admitted.
939 let mut tracker = PeerTracker::for_test(test_env(), None, None);
940
941 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
942 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
943
944 assert!(tracker.tka_admits(&signed));
945 assert!(tracker.tka_admits(&unsigned));
946
947 tracker.peer_db.upsert(&signed);
948 tracker.peer_db.upsert(&unsigned);
949 assert_eq!(tracker.peer_db.peers().len(), 2);
950 }
951
952 #[tokio::test]
953 async fn tka_active_rejects_unsigned_peer() {
954 // Authority present + peer presents no signature ⇒ rejected (fail-closed), not in peer_db.
955 let (authority, _sig) = authority_and_valid_sig();
956 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
957
958 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
959 assert!(!tracker.tka_admits(&unsigned));
960
961 // Mirror the handler's `if !tka_admits { continue }` loop.
962 if tracker.tka_admits(&unsigned) {
963 tracker.peer_db.upsert(&unsigned);
964 }
965 assert_eq!(tracker.peer_db.peers().len(), 0);
966 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
967 }
968
969 #[tokio::test]
970 async fn tka_active_rejects_bad_signature() {
971 // Authority present + a signature that fails to verify ⇒ rejected, not in peer_db.
972 let (authority, mut sig) = authority_and_valid_sig();
973 // Tamper the last byte (the trailing signature byte) so verification fails.
974 let last = sig.len() - 1;
975 sig[last] ^= 0xff;
976
977 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
978 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
979 assert!(!tracker.tka_admits(&bad));
980
981 if tracker.tka_admits(&bad) {
982 tracker.peer_db.upsert(&bad);
983 }
984 assert_eq!(tracker.peer_db.peers().len(), 0);
985 }
986
987 #[tokio::test]
988 async fn tka_active_admits_authorized_peer() {
989 // Authority present + correctly-signed node key ⇒ admitted and upserted.
990 let (authority, sig) = authority_and_valid_sig();
991 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
992
993 let good = peer_node("good", NODE_KEY_BYTES, sig);
994 assert!(tracker.tka_admits(&good));
995
996 if tracker.tka_admits(&good) {
997 tracker.peer_db.upsert(&good);
998 }
999 assert_eq!(tracker.peer_db.peers().len(), 1);
1000 assert!(tracker.peer_db.get(&good.node_key).is_some());
1001 }
1002
1003 // ---------------------------------------------------------------------------------------------
1004 // Tests that drive REAL `PeerUpdate`s through the shared handler body
1005 // ([`PeerTracker::apply_peer_update`], the single source of truth the actor's netmap `handle`
1006 // also calls), so the two real upsert sites (`Full` and `Delta { upsert }`) are exercised via
1007 // the actual enforcement path — not by hand-mirroring `if !tka_admits { continue }`.
1008 // ---------------------------------------------------------------------------------------------
1009
1010 #[tokio::test]
1011 async fn tka_active_delta_upsert_rejects_unauthorized() {
1012 // Drive a real `Delta { upsert }` whose peer carries no signature. The Delta upsert site
1013 // must reject it under an active authority ⇒ not present in peer_db after the handler runs.
1014 let (authority, _sig) = authority_and_valid_sig();
1015 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1016
1017 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
1018 let update = ts_control::PeerUpdate::Delta {
1019 upsert: vec![unsigned.clone()],
1020 remove: Vec::new(),
1021 };
1022
1023 tracker.apply_peer_update(&update);
1024
1025 assert_eq!(tracker.peer_db.peers().len(), 0);
1026 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1027 }
1028
1029 #[tokio::test]
1030 async fn tka_active_delta_upsert_admits_authorized() {
1031 // Drive a real `Delta { upsert }` with a correctly-signed peer ⇒ present in peer_db.
1032 let (authority, sig) = authority_and_valid_sig();
1033 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1034
1035 let good = peer_node("good", NODE_KEY_BYTES, sig);
1036 let update = ts_control::PeerUpdate::Delta {
1037 upsert: vec![good.clone()],
1038 remove: Vec::new(),
1039 };
1040
1041 tracker.apply_peer_update(&update);
1042
1043 assert_eq!(tracker.peer_db.peers().len(), 1);
1044 assert!(tracker.peer_db.get(&good.node_key).is_some());
1045 }
1046
1047 #[tokio::test]
1048 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
1049 // Drive a real `Full` carrying a MIX of authorized + unauthorized peers. Only the
1050 // correctly-signed peer survives the Full upsert site; the unsigned and bad-sig peers are
1051 // dropped fail-closed.
1052 let (authority, sig) = authority_and_valid_sig();
1053 // A bad-sig variant of the same authorized signature (tamper the trailing byte).
1054 let mut bad_sig = sig.clone();
1055 let last = bad_sig.len() - 1;
1056 bad_sig[last] ^= 0xff;
1057
1058 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1059
1060 // Only the authorized peer carries NODE_KEY_BYTES (the key the authority signed); the
1061 // rejected peers use distinct node keys so the survivor is unambiguous.
1062 let good = peer_node("good", NODE_KEY_BYTES, sig);
1063 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1064 let bad = peer_node("bad", [9u8; 32], bad_sig);
1065
1066 let update =
1067 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1068
1069 tracker.apply_peer_update(&update);
1070
1071 assert_eq!(tracker.peer_db.peers().len(), 1);
1072 assert!(tracker.peer_db.get(&good.node_key).is_some());
1073 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
1074 assert!(tracker.peer_db.get(&bad.node_key).is_none());
1075 }
1076
1077 #[tokio::test]
1078 async fn tka_observe_only_admits_all_peers_in_mixed_batch() {
1079 // #136 observe-only contract: with the OBSERVE authority set (and `tka_authority = None`, so
1080 // enforcement is OFF), the exact mixed batch that the fail-closed test above prunes to 1 must
1081 // instead admit ALL THREE peers. The verify-and-log seam logs each verdict (verified /
1082 // unsigned / failed) but never gates admission. This locks observe-only against a future
1083 // refactor that accidentally wires `tka_observe` into a drop path.
1084 let (authority, sig) = authority_and_valid_sig();
1085 let mut bad_sig = sig.clone();
1086 let last = bad_sig.len() - 1;
1087 bad_sig[last] ^= 0xff;
1088
1089 // Authority in the OBSERVE slot, enforcement slot empty.
1090 let mut tracker = PeerTracker::for_test(test_env(), None, Some(authority));
1091
1092 let good = peer_node("good", NODE_KEY_BYTES, sig);
1093 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
1094 let bad = peer_node("bad", [9u8; 32], bad_sig);
1095
1096 let update =
1097 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
1098
1099 tracker.apply_peer_update(&update);
1100
1101 // ALL THREE survive — observe-only never drops a peer.
1102 assert_eq!(
1103 tracker.peer_db.peers().len(),
1104 3,
1105 "observe-only must admit every peer regardless of signature verdict"
1106 );
1107 assert!(tracker.peer_db.get(&good.node_key).is_some());
1108 assert!(tracker.peer_db.get(&unsigned.node_key).is_some());
1109 assert!(tracker.peer_db.get(&bad.node_key).is_some());
1110 }
1111
1112 #[tokio::test]
1113 async fn tka_full_resync_revocation_behavior() {
1114 // Revocation-on-resync: admit a peer, then re-include the SAME stable_id in a `Full` with a
1115 // now-invalid signature. Per the Logic review finding, the pre-fix `retain` kept the stale
1116 // (previously-admitted) entry because membership was decided purely by stable_id.
1117 //
1118 // FIXED (not merely documented): the `Full` `retain` now keys on `tka_admits`-passing
1119 // stable_ids, so a peer whose re-included signature no longer verifies under the active
1120 // authority is EVICTED. This test asserts eviction. The inactive (authority=None) path is
1121 // provably unchanged — `tka_admits` always returns `true` there, so the retained set equals
1122 // the set of re-included stable_ids exactly (see `tka_inactive_full_resync_keeps_*`).
1123 let (authority, sig) = authority_and_valid_sig();
1124 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1125
1126 // 1) Admit the peer with a valid signature via a real `Full`.
1127 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1128 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1129 assert_eq!(tracker.peer_db.peers().len(), 1);
1130 assert!(tracker.peer_db.get(&good.node_key).is_some());
1131
1132 // 2) Re-sync the SAME stable_id, but with a now-invalid signature (tamper trailing byte).
1133 let mut bad_sig = sig;
1134 let last = bad_sig.len() - 1;
1135 bad_sig[last] ^= 0xff;
1136 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1137 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1138
1139 // Eviction: the stale entry is dropped because its re-included signature fails the gate.
1140 assert_eq!(tracker.peer_db.peers().len(), 0);
1141 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1142 }
1143
1144 #[tokio::test]
1145 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1146 // Guard the inactive (authority=None) path against the revocation fix: with no authority,
1147 // a peer re-included in a `Full` survives regardless of its signature bytes — byte-for-byte
1148 // pre-TKA behavior, proving the `Full` `retain` change does not regress the always-taken
1149 // branch this wave.
1150 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1151
1152 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1153 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1154 assert_eq!(tracker.peer_db.peers().len(), 1);
1155
1156 // Re-sync the same stable_id with garbage signature bytes; inactive enforcement keeps it.
1157 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1158 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1159 assert_eq!(tracker.peer_db.peers().len(), 1);
1160 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1161 }
1162
1163 /// A `Patch` for a peer already in the netmap merges only the fields it carries — here new UDP
1164 /// endpoints and a new home DERP — leaving the rest of the node intact. This is the fix for
1165 /// dropped `peers_changed_patch`: without it the netmap keeps stale endpoints and the peer can
1166 /// never re-handshake after it moves.
1167 #[tokio::test]
1168 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1169 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1170
1171 // Seed a peer (id == 1, per `peer_node`) with no endpoints / no DERP.
1172 let peer = peer_node("mover", [1u8; 32], vec![]);
1173 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1174 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1175 assert!(before.underlay_addresses.is_empty());
1176 assert!(before.derp_region.is_none());
1177
1178 // Patch in fresh reachability (the idle-peer-reconnect case).
1179 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1180 let patch = ts_control::PeerChange {
1181 id: 1,
1182 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1183 cap: None,
1184 cap_map: None,
1185 underlay_addresses: Some(vec![new_ep]),
1186 node_key: None,
1187 key_signature: None,
1188 disco_key: None,
1189 node_key_expiry: None,
1190 online: None,
1191 last_seen: None,
1192 };
1193 let (upserts, deletions) =
1194 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1195
1196 assert_eq!(upserts.len(), 1);
1197 assert_eq!(deletions.len(), 0);
1198 // Same peer, now carrying the patched endpoint + DERP; node key untouched.
1199 assert_eq!(tracker.peer_db.peers().len(), 1);
1200 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1201 assert_eq!(after.underlay_addresses, vec![new_ep]);
1202 assert_eq!(
1203 after.derp_region,
1204 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1205 );
1206 assert_eq!(after.node_key, peer.node_key);
1207 }
1208
1209 /// A `Patch` whose node id is not in the current netmap is ignored (the wire contract: a patch
1210 /// never creates a node). No upsert, no deletion, peer set unchanged.
1211 #[tokio::test]
1212 async fn patch_for_unknown_node_is_ignored() {
1213 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1214 let known = peer_node("known", [1u8; 32], vec![]); // id == 1
1215 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1216
1217 let patch = ts_control::PeerChange {
1218 id: 999, // not in the netmap
1219 derp_region: None,
1220 cap: None,
1221 cap_map: None,
1222 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1223 node_key: None,
1224 key_signature: None,
1225 disco_key: None,
1226 node_key_expiry: None,
1227 online: None,
1228 last_seen: None,
1229 };
1230 let (upserts, deletions) =
1231 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1232
1233 assert_eq!(upserts.len(), 0);
1234 assert_eq!(deletions.len(), 0);
1235 assert_eq!(tracker.peer_db.peers().len(), 1);
1236 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1237 }
1238
1239 /// An expiry-only `Patch` updates `node_key_expiry` on the matching peer (Go
1240 /// `PeerChange.KeyExpiry`), rather than being silently dropped until the next full resync.
1241 #[tokio::test]
1242 async fn patch_updates_node_key_expiry() {
1243 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1244 let peer = peer_node("expiring", [1u8; 32], vec![]); // id == 1, node_key_expiry: None
1245 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1246
1247 let expiry = "2027-01-01T00:00:00Z"
1248 .parse::<chrono::DateTime<chrono::Utc>>()
1249 .unwrap();
1250 let patch = ts_control::PeerChange {
1251 id: 1,
1252 derp_region: None,
1253 cap: None,
1254 cap_map: None,
1255 underlay_addresses: None,
1256 node_key: None,
1257 key_signature: None,
1258 disco_key: None,
1259 node_key_expiry: Some(expiry),
1260 online: None,
1261 last_seen: None,
1262 };
1263 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1264
1265 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1266 assert_eq!(after.node_key_expiry, Some(expiry));
1267 }
1268
1269 /// Channel B: a `PeerChange.online` patch flips a peer's online state without a full node.
1270 #[tokio::test]
1271 async fn patch_updates_online() {
1272 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1273 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1, online: None
1274 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1275 assert_eq!(
1276 tracker
1277 .peer_db
1278 .get(&(1 as ts_control::NodeId))
1279 .unwrap()
1280 .1
1281 .online,
1282 None
1283 );
1284
1285 let mut patch = ts_control::PeerChange {
1286 id: 1,
1287 derp_region: None,
1288 cap: None,
1289 cap_map: None,
1290 underlay_addresses: None,
1291 node_key: None,
1292 key_signature: None,
1293 disco_key: None,
1294 node_key_expiry: None,
1295 online: Some(true),
1296 last_seen: None,
1297 };
1298 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch.clone()]));
1299 assert_eq!(
1300 tracker
1301 .peer_db
1302 .get(&(1 as ts_control::NodeId))
1303 .unwrap()
1304 .1
1305 .online,
1306 Some(true),
1307 "PeerChange.online=Some(true) marks the peer online"
1308 );
1309
1310 // A subsequent patch flips it offline.
1311 patch.online = Some(false);
1312 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1313 assert_eq!(
1314 tracker
1315 .peer_db
1316 .get(&(1 as ts_control::NodeId))
1317 .unwrap()
1318 .1
1319 .online,
1320 Some(false)
1321 );
1322 }
1323
1324 /// Channel C/D: the `online_change` map flips online directly; `peer_seen_change: false`
1325 /// ("the peer is gone") marks the peer offline. Both apply to a peer already in the netmap and
1326 /// ignore unknown ids.
1327 #[tokio::test]
1328 async fn liveness_change_maps_apply_online() {
1329 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1330 let peer = peer_node("p", [1u8; 32], vec![]); // id == 1
1331 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1332
1333 // Channel C: online_change sets online=true.
1334 let mut online_change = std::collections::BTreeMap::new();
1335 online_change.insert(1 as ts_control::NodeId, true);
1336 online_change.insert(999 as ts_control::NodeId, true); // unknown id — ignored
1337 let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1338 assert!(changed);
1339 assert_eq!(
1340 tracker
1341 .peer_db
1342 .get(&(1 as ts_control::NodeId))
1343 .unwrap()
1344 .1
1345 .online,
1346 Some(true)
1347 );
1348
1349 // Channel D: peer_seen_change=false marks the peer offline (gone), node retained.
1350 let mut peer_seen_change = std::collections::BTreeMap::new();
1351 peer_seen_change.insert(1 as ts_control::NodeId, false);
1352 let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1353 assert!(changed);
1354 assert_eq!(
1355 tracker
1356 .peer_db
1357 .get(&(1 as ts_control::NodeId))
1358 .unwrap()
1359 .1
1360 .online,
1361 Some(false),
1362 "peer_seen_change=false marks offline (the node stays in the netmap)"
1363 );
1364 assert_eq!(
1365 tracker.peer_db.peers().len(),
1366 1,
1367 "the node is retained, not removed"
1368 );
1369
1370 // No-op when nothing matches / changes.
1371 assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1372 }
1373
1374 /// Security: a `Patch` that rotates the node key must re-satisfy the tailnet-lock authority,
1375 /// exactly like a `Delta` upsert. A key-rotation patch whose new signature does NOT verify
1376 /// evicts the peer (fail-closed) rather than leaving a now-unverified entry — closing what would
1377 /// otherwise be a trust-enforcement bypass via the patch path.
1378 #[tokio::test]
1379 async fn patch_key_rotation_failing_tka_evicts_peer() {
1380 let (authority, sig) = authority_and_valid_sig();
1381 let mut tracker = PeerTracker::for_test(test_env(), Some(authority), None);
1382
1383 // Admit a correctly-signed peer (id == 1).
1384 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1385 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1386 assert_eq!(tracker.peer_db.peers().len(), 1);
1387
1388 // Patch a new node key whose signature is garbage under the active authority.
1389 let patch = ts_control::PeerChange {
1390 id: 1,
1391 derp_region: None,
1392 cap: None,
1393 cap_map: None,
1394 underlay_addresses: None,
1395 node_key: Some([0x33u8; 32].into()),
1396 key_signature: Some(vec![0x00, 0x01, 0x02]),
1397 disco_key: None,
1398 node_key_expiry: None,
1399 online: None,
1400 last_seen: None,
1401 };
1402 let (upserts, deletions) =
1403 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1404
1405 assert_eq!(upserts.len(), 0);
1406 assert_eq!(deletions.len(), 1);
1407 assert_eq!(tracker.peer_db.peers().len(), 0);
1408 }
1409
1410 /// A node's `user_id` joins against the accumulated UserProfiles table to resolve the owning
1411 /// user's login name in `WhoIs.user`. With no matching profile, `user` is `None` (the
1412 /// pre-existing behavior); once a profile arrives, the same node resolves to its login. This
1413 /// proves the accumulate-then-join path the netmap handler builds.
1414 fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1415 ts_control::UserProfile {
1416 id,
1417 login_name: login.to_string(),
1418 display_name: None,
1419 }
1420 }
1421
1422 #[tokio::test]
1423 async fn whois_resolves_user_from_accumulated_profiles() {
1424 let mut tracker = PeerTracker::for_test(test_env(), None, None);
1425
1426 // A peer owned by user id 42 at 100.64.0.1 (the peer_node fixture's address).
1427 let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1428 peer.user_id = 42;
1429 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1430 let addr = "100.64.0.1:0".parse().unwrap();
1431
1432 // No profile yet: the node resolves but its owner is unknown.
1433 let who = tracker.whois_opt(addr).expect("peer is known");
1434 assert_eq!(who.user, None);
1435
1436 // Profile for a DIFFERENT user must not match.
1437 tracker
1438 .user_profiles
1439 .insert(7, profile(7, "someone-else@example.com"));
1440 assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1441
1442 // The owning user's profile arrives (as the netmap handler would accumulate it): now the
1443 // login resolves.
1444 tracker
1445 .user_profiles
1446 .insert(42, profile(42, "alice@example.com"));
1447 assert_eq!(
1448 tracker.whois_opt(addr).unwrap().user,
1449 Some("alice@example.com".to_string())
1450 );
1451 }
1452
/// `UserProfile::best_label` prefers the login name, falls back to the display name when the
/// login is empty, and yields `None` when neither is available.
#[test]
fn user_profile_best_label_prefers_login() {
    // Login present: it wins.
    let with_login = profile(1, "alice@example.com");
    assert_eq!(
        with_login.best_label(),
        Some("alice@example.com".to_string())
    );

    // Empty login, display name present: fall back to the display name.
    let mut display_only = profile(2, "");
    display_only.display_name = Some("Bob".to_string());
    assert_eq!(display_only.best_label(), Some("Bob".to_string()));

    // Empty login and no display name: nothing to show.
    let empty = profile(3, "");
    assert_eq!(empty.best_label(), None);
}
1473}