1use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24pub struct PeerTracker {
26 peer_db: PeerDb,
27 seen_state_update: bool,
28 pending_requests: Vec<Pending>,
29 peer_watch: watch::Sender<Vec<StatusNode>>,
32 user_profiles: HashMap<UserId, UserProfile>,
39 tka_authority: Option<ts_tka::Authority>,
48 env: Env,
49}
50
51impl PeerTracker {
52 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
53 self.peer_db.get(&name).map(|(_id, node)| node)
55 }
56
57 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
58 self.peer_db.get(&ip).map(|(_id, node)| node)
59 }
60
61 fn status_peers(&self) -> Vec<StatusNode> {
63 self.peer_db
64 .peers()
65 .values()
66 .map(StatusNode::from_node)
67 .collect()
68 }
69
70 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
71 let ip = crate::status::whois_addr(addr);
72 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
73 let user = self.resolve_user(node.user_id);
77 Some(crate::status::WhoIs::from_node_with_user(node, user))
78 }
79
80 fn resolve_user(&self, user_id: UserId) -> Option<String> {
82 self.user_profiles
83 .get(&user_id)
84 .and_then(UserProfile::best_label)
85 }
86
87 fn tka_admits(&self, node: &Node) -> bool {
97 let Some(auth) = &self.tka_authority else {
98 return true;
99 };
100
101 if node.key_signature.is_empty() {
102 tracing::warn!(
105 stable_id = ?node.stable_id,
106 "TKA: rejecting unsigned peer under tailnet lock"
107 );
108 return false;
109 }
110
111 if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
112 tracing::warn!(
113 stable_id = ?node.stable_id,
114 error = %e,
115 "TKA: rejecting peer with unauthorized node key"
116 );
117 return false;
118 }
119
120 true
121 }
122}
123
124impl kameo::Actor for PeerTracker {
125 type Args = Env;
126 type Error = Error;
127
128 async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
129 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
130
131 let (peer_watch, _) = watch::channel(Vec::new());
132
133 Ok(Self {
134 peer_db: PeerDb::default(),
135 pending_requests: Default::default(),
136 seen_state_update: false,
137 peer_watch,
138 user_profiles: HashMap::new(),
139 tka_authority: None,
142 env,
143 })
144 }
145}
146
147enum Pending {
148 PeerByName(PeerByName, ReplySender<Option<Node>>),
149 AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
150 TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
151 Status(ReplySender<Vec<StatusNode>>),
152 WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
153}
154
155#[allow(missing_docs)]
159mod msg_impl {
160 use std::net::IpAddr;
161
162 use kameo::prelude::DelegatedReply;
163
164 use super::*;
165
166 #[kameo::messages]
167 impl PeerTracker {
168 #[message(ctx)]
172 pub async fn peer_by_name(
173 &mut self,
174 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
175 name: String,
176 ) -> DelegatedReply<Option<Node>> {
177 let (deleg, sender) = ctx.reply_sender();
178 let Some(sender) = sender else { return deleg };
179
180 if !self.seen_state_update {
181 tracing::debug!(query = name, "no peer state seen yet, queueing request");
182
183 self.pending_requests
184 .push(Pending::PeerByName(PeerByName { name }, sender));
185
186 return deleg;
187 }
188
189 sender.send(self.peer_by_name_opt(&name).cloned());
190
191 deleg
192 }
193
194 #[message(ctx)]
209 pub fn peer_by_accepted_route(
210 &mut self,
211 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
212 ip: IpAddr,
213 ) -> DelegatedReply<Vec<Node>> {
214 let (deleg, sender) = ctx.reply_sender();
215 let Some(sender) = sender else { return deleg };
216
217 if !self.seen_state_update {
218 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
219
220 self.pending_requests
221 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
222
223 return deleg;
224 }
225
226 sender.send(
227 self.peer_db
228 .get_route(ip.into())
229 .map(|(_id, node)| node.clone())
230 .collect(),
231 );
232
233 deleg
234 }
235
236 #[message(ctx)]
238 pub fn peer_by_tailnet_ip(
239 &mut self,
240 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241 ip: IpAddr,
242 ) -> DelegatedReply<Option<Node>> {
243 let (deleg, sender) = ctx.reply_sender();
244 let Some(sender) = sender else { return deleg };
245
246 if !self.seen_state_update {
247 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
248
249 self.pending_requests
250 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
251
252 return deleg;
253 }
254
255 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
256
257 deleg
258 }
259
260 #[message(ctx)]
267 pub fn get_status(
268 &mut self,
269 ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
270 ) -> DelegatedReply<Vec<StatusNode>> {
271 let (deleg, sender) = ctx.reply_sender();
272 let Some(sender) = sender else { return deleg };
273
274 if !self.seen_state_update {
275 tracing::debug!("no peer state seen yet, queueing status request");
276 self.pending_requests.push(Pending::Status(sender));
277 return deleg;
278 }
279
280 sender.send(self.status_peers());
281
282 deleg
283 }
284
285 #[message]
293 pub fn all_peers(&self) -> Vec<Node> {
294 self.peer_db.peers().values().cloned().collect()
295 }
296
297 #[message(ctx)]
308 pub fn whois(
309 &mut self,
310 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
311 addr: std::net::SocketAddr,
312 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
313 let (deleg, sender) = ctx.reply_sender();
314 let Some(sender) = sender else { return deleg };
315
316 if !self.seen_state_update {
317 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
318 self.pending_requests
319 .push(Pending::WhoIs(Whois { addr }, sender));
320 return deleg;
321 }
322
323 sender.send(self.whois_opt(addr));
324
325 deleg
326 }
327
328 #[message(derive(Clone))]
338 pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
339 self.peer_watch.subscribe()
340 }
341 }
342}
343
344pub use msg_impl::*;
345
346#[derive(Debug, Clone)]
347pub(crate) struct PeerState {
348 #[allow(unused)]
349 pub deletions: HashSet<PeerId>,
350 #[allow(unused)]
351 pub upserts: HashSet<PeerId>,
352 pub peers: Arc<PeerDb>,
353}
354
355impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
356 type Reply = ();
357
358 async fn handle(
359 &mut self,
360 msg: Arc<ts_control::StateUpdate>,
361 _ctx: &mut Context<Self, Self::Reply>,
362 ) {
363 for profile in &msg.user_profiles {
367 self.user_profiles.insert(profile.id, profile.clone());
368 }
369
370 let liveness_changed =
376 self.apply_liveness_changes(&msg.online_change, &msg.peer_seen_change);
377
378 let Some(peer_update) = &msg.peer_update else {
379 if liveness_changed {
382 self.service_pending_requests();
383 self.peer_watch.send_replace(self.status_peers());
384 if let Err(e) = self
385 .env
386 .publish(Arc::new(PeerState {
387 upserts: HashSet::default(),
388 deletions: HashSet::default(),
389 peers: Arc::new(self.peer_db.clone()),
390 }))
391 .await
392 {
393 tracing::error!(error = %e, "publishing liveness-only peer state update");
394 }
395 }
396 return;
397 };
398
399 let (upserts, deletions) = self.apply_peer_update(peer_update);
400
401 tracing::debug!(
402 n_upsert = upserts.len(),
403 n_delete = deletions.len(),
404 peer_count = self.peer_db.peers().len(),
405 "new peer state"
406 );
407
408 self.service_pending_requests();
409
410 self.peer_watch.send_replace(self.status_peers());
413
414 if let Err(e) = self
415 .env
416 .publish(Arc::new(PeerState {
417 upserts,
418 deletions,
419 peers: Arc::new(self.peer_db.clone()),
420 }))
421 .await
422 {
423 tracing::error!(error = %e, "publishing peer state update");
424 }
425 }
426}
427
428#[derive(Debug, Clone, Copy)]
433pub struct RepublishState;
434
435impl Message<RepublishState> for PeerTracker {
436 type Reply = ();
437
438 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
439 if let Err(e) = self
443 .env
444 .publish(Arc::new(PeerState {
445 upserts: HashSet::default(),
446 deletions: HashSet::default(),
447 peers: Arc::new(self.peer_db.clone()),
448 }))
449 .await
450 {
451 tracing::error!(error = %e, "re-publishing peer state after exit-node change");
452 }
453 }
454}
455
456impl PeerTracker {
457 fn apply_peer_update(
466 &mut self,
467 peer_update: &ts_control::PeerUpdate,
468 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
469 let mut upserts = HashSet::default();
470 let mut deletions = HashSet::default();
471
472 match peer_update {
473 ts_control::PeerUpdate::Full(new_nodes) => {
474 tracing::trace!("full peer update");
475
476 let retained_ids = new_nodes
484 .iter()
485 .filter(|node| self.tka_admits(node))
486 .map(|x| &x.stable_id)
487 .collect::<HashSet<_>>();
488
489 self.peer_db.retain(|id, peer| {
490 let retain = retained_ids.contains(&peer.stable_id);
491
492 if !retain {
493 deletions.insert(id);
494 }
495
496 retain
497 });
498
499 for node in new_nodes {
500 if !self.tka_admits(node) {
501 continue; }
503 let peer_id = self.peer_db.upsert(node);
504 upserts.insert(peer_id);
505 }
506 }
507
508 ts_control::PeerUpdate::Delta { remove, upsert } => {
509 tracing::trace!("delta peer update");
510
511 for peer in upsert {
512 if !self.tka_admits(peer) {
513 continue; }
515 let id = self.peer_db.upsert(peer);
516
517 upserts.insert(id);
518 }
519
520 for peer in remove {
521 let Some((id, _node)) = self.peer_db.remove(peer) else {
522 tracing::error!(control_node_id = peer, "removed peer was unknown");
523 continue;
524 };
525
526 deletions.insert(id);
527 }
528 }
529
530 ts_control::PeerUpdate::Patch(patches) => {
531 tracing::trace!(n = patches.len(), "peer patch update");
532
533 for patch in patches {
534 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
539 tracing::debug!(
540 control_node_id = patch.id,
541 "peer patch for unknown node; ignoring"
542 );
543 continue;
544 };
545
546 let mut node = existing.clone();
547 if let Some(endpoints) = &patch.underlay_addresses {
548 node.underlay_addresses = endpoints.clone();
549 }
550 if let Some(derp) = patch.derp_region {
551 node.derp_region = Some(derp);
552 }
553 if let Some(cap) = patch.cap {
554 node.cap = cap;
555 }
556 if let Some(cap_map) = &patch.cap_map {
557 node.cap_map = cap_map.clone();
558 }
559 if let Some(disco_key) = patch.disco_key {
560 node.disco_key = Some(disco_key);
561 }
562 if let Some(expiry) = patch.node_key_expiry {
563 node.node_key_expiry = Some(expiry);
564 }
565 if let Some(online) = patch.online {
569 node.online = Some(online);
570 }
571 if let Some(last_seen) = patch.last_seen {
572 node.last_seen = Some(last_seen);
573 }
574 if let Some(node_key) = patch.node_key {
578 node.node_key = node_key;
579 }
580 if let Some(sig) = &patch.key_signature {
581 node.key_signature = sig.clone();
582 }
583
584 if !self.tka_admits(&node) {
589 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
590 tracing::warn!(
591 control_node_id = patch.id,
592 "peer patch rejected by tailnet lock; evicting peer"
593 );
594 deletions.insert(id);
595 }
596 continue;
597 }
598
599 let id = self.peer_db.upsert(&node);
600 upserts.insert(id);
601 }
602 }
603 }
604
605 (upserts, deletions)
606 }
607
608 fn apply_liveness_changes(
621 &mut self,
622 online_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
623 peer_seen_change: &std::collections::BTreeMap<ts_control::NodeId, bool>,
624 ) -> bool {
625 let mut changed = false;
626
627 for (&node_id, &online) in online_change {
629 if let Some((_pid, existing)) = self.peer_db.get(&node_id)
630 && existing.online != Some(online)
631 {
632 let mut node = existing.clone();
633 node.online = Some(online);
634 self.peer_db.upsert(&node);
635 changed = true;
636 }
637 }
638
639 for (&node_id, &seen) in peer_seen_change {
644 if !seen
645 && let Some((_pid, existing)) = self.peer_db.get(&node_id)
646 && existing.online != Some(false)
647 {
648 let mut node = existing.clone();
649 node.online = Some(false);
650 self.peer_db.upsert(&node);
651 changed = true;
652 }
653 }
654
655 changed
656 }
657
658 #[cfg(test)]
662 fn for_test(env: Env, tka_authority: Option<ts_tka::Authority>) -> Self {
663 let (peer_watch, _) = watch::channel(Vec::new());
664 Self {
665 peer_db: PeerDb::default(),
666 seen_state_update: false,
667 pending_requests: Vec::new(),
668 peer_watch,
669 user_profiles: HashMap::new(),
670 tka_authority,
671 env,
672 }
673 }
674
675 fn service_pending_requests(&mut self) {
676 if self.seen_state_update {
677 return;
678 }
679
680 self.seen_state_update = true;
681
682 if !self.pending_requests.is_empty() {
683 tracing::debug!(
684 n_pending = self.pending_requests.len(),
685 "state update received, servicing pending requests"
686 );
687 }
688
689 for req in core::mem::take(&mut self.pending_requests) {
690 match req {
691 Pending::PeerByName(PeerByName { name }, reply) => {
692 reply.send(self.peer_by_name_opt(&name).cloned());
693 }
694 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
695 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
696 }
697 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
698 reply.send(
699 self.peer_db
700 .get_route(ip.into())
701 .map(|(_id, node)| node.clone())
702 .collect(),
703 );
704 }
705 Pending::Status(reply) => {
706 reply.send(self.status_peers());
707 }
708 Pending::WhoIs(Whois { addr }, reply) => {
709 reply.send(self.whois_opt(addr));
710 }
711 }
712 }
713 }
714}
715
716#[cfg(test)]
717mod tka_tests {
718 use ed25519_dalek::{Signer, SigningKey};
728 use ts_control::{Node, StableNodeId, TailnetAddress};
729 use ts_tka::{
730 AumHash, Authority, Key, KeyKind, State,
731 cbor::{self, Value},
732 };
733
734 use super::*;
735
736 const SIG_KIND_DIRECT: u64 = 1;
738
739 const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
741
742 fn test_env() -> Env {
745 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
746 Env::new(
747 ts_keys::NodeState::generate(),
748 shutdown_rx,
749 crate::env::ForwarderConfig {
750 accept_routes: false,
751 exit_node: None,
752 forward_routes: Vec::new(),
753 forward_tcp_ports: Vec::new(),
754 forward_udp_ports: Vec::new(),
755 forward_all_ports: false,
756 forward_exit_egress: false,
757 exit_proxy: None,
758 peerapi_port: None,
759 taildrop_dir: None,
760 enable_ipv6: false,
761 persistent_keepalive_interval: None,
762 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
763 },
764 )
765 }
766
767 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
769 Node {
770 id: 1,
771 stable_id: StableNodeId(stable_id.to_string()),
772 hostname: stable_id.to_string(),
773 user_id: 0,
774 tailnet: Some("ts.net".to_string()),
775 tags: Vec::new(),
776 tailnet_address: TailnetAddress {
777 ipv4: "100.64.0.1/32".parse().unwrap(),
778 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
779 },
780 node_key: node_key.into(),
781 node_key_expiry: None,
782 online: None,
783 last_seen: None,
784 key_signature,
785 machine_key: None,
786 disco_key: None,
787 accepted_routes: Vec::new(),
788 underlay_addresses: Vec::new(),
789 derp_region: None,
790 cap: Default::default(),
791 cap_map: Default::default(),
792 peerapi_port: None,
793 peerapi_dns_proxy: false,
794 is_wireguard_only: false,
795 exit_node_dns_resolvers: Vec::new(),
796 peer_relay: false,
797 service_vips: Default::default(),
798 }
799 }
800
801 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
806 let mut pairs = alloc_pairs(node_key, key_id);
807 if let Some(sig) = signature {
808 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
809 }
810 cbor::int_map(pairs).to_vec()
811 }
812
813 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
814 vec![
815 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
816 (2, Some(Value::Bytes(node_key.to_vec()))),
817 (3, Some(Value::Bytes(key_id.to_vec()))),
818 ]
819 }
820
821 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
824 let signing = SigningKey::from_bytes(&[42u8; 32]);
826 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
827
828 let authority = Authority::from_state(
829 AumHash([0; 32]),
830 State {
831 keys: vec![Key {
832 kind: KeyKind::Ed25519,
833 votes: 1,
834 public: trusted_pub.clone(),
835 }],
836 },
837 );
838
839 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
841 let sig_hash = ts_tka::aum_hash(&preimage).0;
842 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
843
844 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
845 assert!(
847 authority
848 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
849 .is_ok()
850 );
851
852 (authority, signed_cbor)
853 }
854
855 #[tokio::test]
856 async fn tka_inactive_upserts_all_peers() {
857 let mut tracker = PeerTracker::for_test(test_env(), None);
859
860 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
861 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
862
863 assert!(tracker.tka_admits(&signed));
864 assert!(tracker.tka_admits(&unsigned));
865
866 tracker.peer_db.upsert(&signed);
867 tracker.peer_db.upsert(&unsigned);
868 assert_eq!(tracker.peer_db.peers().len(), 2);
869 }
870
871 #[tokio::test]
872 async fn tka_active_rejects_unsigned_peer() {
873 let (authority, _sig) = authority_and_valid_sig();
875 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
876
877 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
878 assert!(!tracker.tka_admits(&unsigned));
879
880 if tracker.tka_admits(&unsigned) {
882 tracker.peer_db.upsert(&unsigned);
883 }
884 assert_eq!(tracker.peer_db.peers().len(), 0);
885 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
886 }
887
888 #[tokio::test]
889 async fn tka_active_rejects_bad_signature() {
890 let (authority, mut sig) = authority_and_valid_sig();
892 let last = sig.len() - 1;
894 sig[last] ^= 0xff;
895
896 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
897 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
898 assert!(!tracker.tka_admits(&bad));
899
900 if tracker.tka_admits(&bad) {
901 tracker.peer_db.upsert(&bad);
902 }
903 assert_eq!(tracker.peer_db.peers().len(), 0);
904 }
905
906 #[tokio::test]
907 async fn tka_active_admits_authorized_peer() {
908 let (authority, sig) = authority_and_valid_sig();
910 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
911
912 let good = peer_node("good", NODE_KEY_BYTES, sig);
913 assert!(tracker.tka_admits(&good));
914
915 if tracker.tka_admits(&good) {
916 tracker.peer_db.upsert(&good);
917 }
918 assert_eq!(tracker.peer_db.peers().len(), 1);
919 assert!(tracker.peer_db.get(&good.node_key).is_some());
920 }
921
922 #[tokio::test]
930 async fn tka_active_delta_upsert_rejects_unauthorized() {
931 let (authority, _sig) = authority_and_valid_sig();
934 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
935
936 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
937 let update = ts_control::PeerUpdate::Delta {
938 upsert: vec![unsigned.clone()],
939 remove: Vec::new(),
940 };
941
942 tracker.apply_peer_update(&update);
943
944 assert_eq!(tracker.peer_db.peers().len(), 0);
945 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
946 }
947
948 #[tokio::test]
949 async fn tka_active_delta_upsert_admits_authorized() {
950 let (authority, sig) = authority_and_valid_sig();
952 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
953
954 let good = peer_node("good", NODE_KEY_BYTES, sig);
955 let update = ts_control::PeerUpdate::Delta {
956 upsert: vec![good.clone()],
957 remove: Vec::new(),
958 };
959
960 tracker.apply_peer_update(&update);
961
962 assert_eq!(tracker.peer_db.peers().len(), 1);
963 assert!(tracker.peer_db.get(&good.node_key).is_some());
964 }
965
966 #[tokio::test]
967 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
968 let (authority, sig) = authority_and_valid_sig();
972 let mut bad_sig = sig.clone();
974 let last = bad_sig.len() - 1;
975 bad_sig[last] ^= 0xff;
976
977 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
978
979 let good = peer_node("good", NODE_KEY_BYTES, sig);
982 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
983 let bad = peer_node("bad", [9u8; 32], bad_sig);
984
985 let update =
986 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
987
988 tracker.apply_peer_update(&update);
989
990 assert_eq!(tracker.peer_db.peers().len(), 1);
991 assert!(tracker.peer_db.get(&good.node_key).is_some());
992 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
993 assert!(tracker.peer_db.get(&bad.node_key).is_none());
994 }
995
996 #[tokio::test]
997 async fn tka_full_resync_revocation_behavior() {
998 let (authority, sig) = authority_and_valid_sig();
1008 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1009
1010 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
1012 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1013 assert_eq!(tracker.peer_db.peers().len(), 1);
1014 assert!(tracker.peer_db.get(&good.node_key).is_some());
1015
1016 let mut bad_sig = sig;
1018 let last = bad_sig.len() - 1;
1019 bad_sig[last] ^= 0xff;
1020 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
1021 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
1022
1023 assert_eq!(tracker.peer_db.peers().len(), 0);
1025 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
1026 }
1027
1028 #[tokio::test]
1029 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
1030 let mut tracker = PeerTracker::for_test(test_env(), None);
1035
1036 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
1037 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1038 assert_eq!(tracker.peer_db.peers().len(), 1);
1039
1040 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
1042 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
1043 assert_eq!(tracker.peer_db.peers().len(), 1);
1044 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
1045 }
1046
1047 #[tokio::test]
1052 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
1053 let mut tracker = PeerTracker::for_test(test_env(), None);
1054
1055 let peer = peer_node("mover", [1u8; 32], vec![]);
1057 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
1058 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1059 assert!(before.underlay_addresses.is_empty());
1060 assert!(before.derp_region.is_none());
1061
1062 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
1064 let patch = ts_control::PeerChange {
1065 id: 1,
1066 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
1067 cap: None,
1068 cap_map: None,
1069 underlay_addresses: Some(vec![new_ep]),
1070 node_key: None,
1071 key_signature: None,
1072 disco_key: None,
1073 node_key_expiry: None,
1074 online: None,
1075 last_seen: None,
1076 };
1077 let (upserts, deletions) =
1078 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1079
1080 assert_eq!(upserts.len(), 1);
1081 assert_eq!(deletions.len(), 0);
1082 assert_eq!(tracker.peer_db.peers().len(), 1);
1084 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1085 assert_eq!(after.underlay_addresses, vec![new_ep]);
1086 assert_eq!(
1087 after.derp_region,
1088 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
1089 );
1090 assert_eq!(after.node_key, peer.node_key);
1091 }
1092
1093 #[tokio::test]
1096 async fn patch_for_unknown_node_is_ignored() {
1097 let mut tracker = PeerTracker::for_test(test_env(), None);
1098 let known = peer_node("known", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1100
1101 let patch = ts_control::PeerChange {
1102 id: 999, derp_region: None,
1104 cap: None,
1105 cap_map: None,
1106 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1107 node_key: None,
1108 key_signature: None,
1109 disco_key: None,
1110 node_key_expiry: None,
1111 online: None,
1112 last_seen: None,
1113 };
1114 let (upserts, deletions) =
1115 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1116
1117 assert_eq!(upserts.len(), 0);
1118 assert_eq!(deletions.len(), 0);
1119 assert_eq!(tracker.peer_db.peers().len(), 1);
1120 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1121 }
1122
1123 #[tokio::test]
1126 async fn patch_updates_node_key_expiry() {
1127 let mut tracker = PeerTracker::for_test(test_env(), None);
1128 let peer = peer_node("expiring", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1130
1131 let expiry = "2027-01-01T00:00:00Z"
1132 .parse::<chrono::DateTime<chrono::Utc>>()
1133 .unwrap();
1134 let patch = ts_control::PeerChange {
1135 id: 1,
1136 derp_region: None,
1137 cap: None,
1138 cap_map: None,
1139 underlay_addresses: None,
1140 node_key: None,
1141 key_signature: None,
1142 disco_key: None,
1143 node_key_expiry: Some(expiry),
1144 online: None,
1145 last_seen: None,
1146 };
1147 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1148
1149 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1150 assert_eq!(after.node_key_expiry, Some(expiry));
1151 }
1152
1153 #[tokio::test]
1155 async fn patch_updates_online() {
1156 let mut tracker = PeerTracker::for_test(test_env(), None);
1157 let peer = peer_node("p", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1159 assert_eq!(
1160 tracker
1161 .peer_db
1162 .get(&(1 as ts_control::NodeId))
1163 .unwrap()
1164 .1
1165 .online,
1166 None
1167 );
1168
1169 let mut patch = ts_control::PeerChange {
1170 id: 1,
1171 derp_region: None,
1172 cap: None,
1173 cap_map: None,
1174 underlay_addresses: None,
1175 node_key: None,
1176 key_signature: None,
1177 disco_key: None,
1178 node_key_expiry: None,
1179 online: Some(true),
1180 last_seen: None,
1181 };
1182 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch.clone()]));
1183 assert_eq!(
1184 tracker
1185 .peer_db
1186 .get(&(1 as ts_control::NodeId))
1187 .unwrap()
1188 .1
1189 .online,
1190 Some(true),
1191 "PeerChange.online=Some(true) marks the peer online"
1192 );
1193
1194 patch.online = Some(false);
1196 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1197 assert_eq!(
1198 tracker
1199 .peer_db
1200 .get(&(1 as ts_control::NodeId))
1201 .unwrap()
1202 .1
1203 .online,
1204 Some(false)
1205 );
1206 }
1207
1208 #[tokio::test]
1212 async fn liveness_change_maps_apply_online() {
1213 let mut tracker = PeerTracker::for_test(test_env(), None);
1214 let peer = peer_node("p", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1216
1217 let mut online_change = std::collections::BTreeMap::new();
1219 online_change.insert(1 as ts_control::NodeId, true);
1220 online_change.insert(999 as ts_control::NodeId, true); let changed = tracker.apply_liveness_changes(&online_change, &Default::default());
1222 assert!(changed);
1223 assert_eq!(
1224 tracker
1225 .peer_db
1226 .get(&(1 as ts_control::NodeId))
1227 .unwrap()
1228 .1
1229 .online,
1230 Some(true)
1231 );
1232
1233 let mut peer_seen_change = std::collections::BTreeMap::new();
1235 peer_seen_change.insert(1 as ts_control::NodeId, false);
1236 let changed = tracker.apply_liveness_changes(&Default::default(), &peer_seen_change);
1237 assert!(changed);
1238 assert_eq!(
1239 tracker
1240 .peer_db
1241 .get(&(1 as ts_control::NodeId))
1242 .unwrap()
1243 .1
1244 .online,
1245 Some(false),
1246 "peer_seen_change=false marks offline (the node stays in the netmap)"
1247 );
1248 assert_eq!(
1249 tracker.peer_db.peers().len(),
1250 1,
1251 "the node is retained, not removed"
1252 );
1253
1254 assert!(!tracker.apply_liveness_changes(&Default::default(), &Default::default()));
1256 }
1257
1258 #[tokio::test]
1263 async fn patch_key_rotation_failing_tka_evicts_peer() {
1264 let (authority, sig) = authority_and_valid_sig();
1265 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1266
1267 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1269 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1270 assert_eq!(tracker.peer_db.peers().len(), 1);
1271
1272 let patch = ts_control::PeerChange {
1274 id: 1,
1275 derp_region: None,
1276 cap: None,
1277 cap_map: None,
1278 underlay_addresses: None,
1279 node_key: Some([0x33u8; 32].into()),
1280 key_signature: Some(vec![0x00, 0x01, 0x02]),
1281 disco_key: None,
1282 node_key_expiry: None,
1283 online: None,
1284 last_seen: None,
1285 };
1286 let (upserts, deletions) =
1287 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1288
1289 assert_eq!(upserts.len(), 0);
1290 assert_eq!(deletions.len(), 1);
1291 assert_eq!(tracker.peer_db.peers().len(), 0);
1292 }
1293
1294 fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1299 ts_control::UserProfile {
1300 id,
1301 login_name: login.to_string(),
1302 display_name: None,
1303 }
1304 }
1305
1306 #[tokio::test]
1307 async fn whois_resolves_user_from_accumulated_profiles() {
1308 let mut tracker = PeerTracker::for_test(test_env(), None);
1309
1310 let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1312 peer.user_id = 42;
1313 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1314 let addr = "100.64.0.1:0".parse().unwrap();
1315
1316 let who = tracker.whois_opt(addr).expect("peer is known");
1318 assert_eq!(who.user, None);
1319
1320 tracker
1322 .user_profiles
1323 .insert(7, profile(7, "someone-else@example.com"));
1324 assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1325
1326 tracker
1329 .user_profiles
1330 .insert(42, profile(42, "alice@example.com"));
1331 assert_eq!(
1332 tracker.whois_opt(addr).unwrap().user,
1333 Some("alice@example.com".to_string())
1334 );
1335 }
1336
1337 #[test]
1339 fn user_profile_best_label_prefers_login() {
1340 assert_eq!(
1341 profile(1, "alice@example.com").best_label(),
1342 Some("alice@example.com".to_string())
1343 );
1344 let display_only = ts_control::UserProfile {
1345 id: 2,
1346 login_name: String::new(),
1347 display_name: Some("Bob".to_string()),
1348 };
1349 assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1350 let empty = ts_control::UserProfile {
1351 id: 3,
1352 login_name: String::new(),
1353 display_name: None,
1354 };
1355 assert_eq!(empty.best_label(), None);
1356 }
1357}