1use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
24pub struct PeerTracker {
26 peer_db: PeerDb,
27 seen_state_update: bool,
28 pending_requests: Vec<Pending>,
29 peer_watch: watch::Sender<Vec<StatusNode>>,
32 user_profiles: HashMap<UserId, UserProfile>,
39 tka_authority: Option<ts_tka::Authority>,
48 env: Env,
49}
50
51impl PeerTracker {
52 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
53 self.peer_db.get(&name).map(|(_id, node)| node)
55 }
56
57 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
58 self.peer_db.get(&ip).map(|(_id, node)| node)
59 }
60
61 fn status_peers(&self) -> Vec<StatusNode> {
63 self.peer_db
64 .peers()
65 .values()
66 .map(StatusNode::from_node)
67 .collect()
68 }
69
70 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
71 let ip = crate::status::whois_addr(addr);
72 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
73 let user = self.resolve_user(node.user_id);
77 Some(crate::status::WhoIs::from_node_with_user(node, user))
78 }
79
80 fn resolve_user(&self, user_id: UserId) -> Option<String> {
82 self.user_profiles
83 .get(&user_id)
84 .and_then(UserProfile::best_label)
85 }
86
87 fn tka_admits(&self, node: &Node) -> bool {
97 let Some(auth) = &self.tka_authority else {
98 return true;
99 };
100
101 if node.key_signature.is_empty() {
102 tracing::warn!(
105 stable_id = ?node.stable_id,
106 "TKA: rejecting unsigned peer under tailnet lock"
107 );
108 return false;
109 }
110
111 if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
112 tracing::warn!(
113 stable_id = ?node.stable_id,
114 error = %e,
115 "TKA: rejecting peer with unauthorized node key"
116 );
117 return false;
118 }
119
120 true
121 }
122}
123
124impl kameo::Actor for PeerTracker {
125 type Args = Env;
126 type Error = Error;
127
128 async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
129 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
130
131 let (peer_watch, _) = watch::channel(Vec::new());
132
133 Ok(Self {
134 peer_db: PeerDb::default(),
135 pending_requests: Default::default(),
136 seen_state_update: false,
137 peer_watch,
138 user_profiles: HashMap::new(),
139 tka_authority: None,
142 env,
143 })
144 }
145}
146
147enum Pending {
148 PeerByName(PeerByName, ReplySender<Option<Node>>),
149 AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
150 TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
151 Status(ReplySender<Vec<StatusNode>>),
152 WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
153}
154
155#[allow(missing_docs)]
159mod msg_impl {
160 use std::net::IpAddr;
161
162 use kameo::prelude::DelegatedReply;
163
164 use super::*;
165
166 #[kameo::messages]
167 impl PeerTracker {
168 #[message(ctx)]
172 pub async fn peer_by_name(
173 &mut self,
174 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
175 name: String,
176 ) -> DelegatedReply<Option<Node>> {
177 let (deleg, sender) = ctx.reply_sender();
178 let Some(sender) = sender else { return deleg };
179
180 if !self.seen_state_update {
181 tracing::debug!(query = name, "no peer state seen yet, queueing request");
182
183 self.pending_requests
184 .push(Pending::PeerByName(PeerByName { name }, sender));
185
186 return deleg;
187 }
188
189 sender.send(self.peer_by_name_opt(&name).cloned());
190
191 deleg
192 }
193
194 #[message(ctx)]
209 pub fn peer_by_accepted_route(
210 &mut self,
211 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
212 ip: IpAddr,
213 ) -> DelegatedReply<Vec<Node>> {
214 let (deleg, sender) = ctx.reply_sender();
215 let Some(sender) = sender else { return deleg };
216
217 if !self.seen_state_update {
218 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
219
220 self.pending_requests
221 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
222
223 return deleg;
224 }
225
226 sender.send(
227 self.peer_db
228 .get_route(ip.into())
229 .map(|(_id, node)| node.clone())
230 .collect(),
231 );
232
233 deleg
234 }
235
236 #[message(ctx)]
238 pub fn peer_by_tailnet_ip(
239 &mut self,
240 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241 ip: IpAddr,
242 ) -> DelegatedReply<Option<Node>> {
243 let (deleg, sender) = ctx.reply_sender();
244 let Some(sender) = sender else { return deleg };
245
246 if !self.seen_state_update {
247 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
248
249 self.pending_requests
250 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
251
252 return deleg;
253 }
254
255 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
256
257 deleg
258 }
259
260 #[message(ctx)]
267 pub fn get_status(
268 &mut self,
269 ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
270 ) -> DelegatedReply<Vec<StatusNode>> {
271 let (deleg, sender) = ctx.reply_sender();
272 let Some(sender) = sender else { return deleg };
273
274 if !self.seen_state_update {
275 tracing::debug!("no peer state seen yet, queueing status request");
276 self.pending_requests.push(Pending::Status(sender));
277 return deleg;
278 }
279
280 sender.send(self.status_peers());
281
282 deleg
283 }
284
285 #[message(ctx)]
296 pub fn whois(
297 &mut self,
298 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
299 addr: std::net::SocketAddr,
300 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
301 let (deleg, sender) = ctx.reply_sender();
302 let Some(sender) = sender else { return deleg };
303
304 if !self.seen_state_update {
305 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
306 self.pending_requests
307 .push(Pending::WhoIs(Whois { addr }, sender));
308 return deleg;
309 }
310
311 sender.send(self.whois_opt(addr));
312
313 deleg
314 }
315
316 #[message(derive(Clone))]
326 pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
327 self.peer_watch.subscribe()
328 }
329 }
330}
331
332pub use msg_impl::*;
333
334#[derive(Debug, Clone)]
335pub(crate) struct PeerState {
336 #[allow(unused)]
337 pub deletions: HashSet<PeerId>,
338 #[allow(unused)]
339 pub upserts: HashSet<PeerId>,
340 pub peers: Arc<PeerDb>,
341}
342
343impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
344 type Reply = ();
345
346 async fn handle(
347 &mut self,
348 msg: Arc<ts_control::StateUpdate>,
349 _ctx: &mut Context<Self, Self::Reply>,
350 ) {
351 for profile in &msg.user_profiles {
355 self.user_profiles.insert(profile.id, profile.clone());
356 }
357
358 let Some(peer_update) = &msg.peer_update else {
359 return;
360 };
361
362 let (upserts, deletions) = self.apply_peer_update(peer_update);
363
364 tracing::debug!(
365 n_upsert = upserts.len(),
366 n_delete = deletions.len(),
367 peer_count = self.peer_db.peers().len(),
368 "new peer state"
369 );
370
371 self.service_pending_requests();
372
373 self.peer_watch.send_replace(self.status_peers());
376
377 if let Err(e) = self
378 .env
379 .publish(Arc::new(PeerState {
380 upserts,
381 deletions,
382 peers: Arc::new(self.peer_db.clone()),
383 }))
384 .await
385 {
386 tracing::error!(error = %e, "publishing peer state update");
387 }
388 }
389}
390
391#[derive(Debug, Clone, Copy)]
396pub struct RepublishState;
397
398impl Message<RepublishState> for PeerTracker {
399 type Reply = ();
400
401 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
402 if let Err(e) = self
406 .env
407 .publish(Arc::new(PeerState {
408 upserts: HashSet::default(),
409 deletions: HashSet::default(),
410 peers: Arc::new(self.peer_db.clone()),
411 }))
412 .await
413 {
414 tracing::error!(error = %e, "re-publishing peer state after exit-node change");
415 }
416 }
417}
418
419impl PeerTracker {
420 fn apply_peer_update(
429 &mut self,
430 peer_update: &ts_control::PeerUpdate,
431 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
432 let mut upserts = HashSet::default();
433 let mut deletions = HashSet::default();
434
435 match peer_update {
436 ts_control::PeerUpdate::Full(new_nodes) => {
437 tracing::trace!("full peer update");
438
439 let retained_ids = new_nodes
447 .iter()
448 .filter(|node| self.tka_admits(node))
449 .map(|x| &x.stable_id)
450 .collect::<HashSet<_>>();
451
452 self.peer_db.retain(|id, peer| {
453 let retain = retained_ids.contains(&peer.stable_id);
454
455 if !retain {
456 deletions.insert(id);
457 }
458
459 retain
460 });
461
462 for node in new_nodes {
463 if !self.tka_admits(node) {
464 continue; }
466 let peer_id = self.peer_db.upsert(node);
467 upserts.insert(peer_id);
468 }
469 }
470
471 ts_control::PeerUpdate::Delta { remove, upsert } => {
472 tracing::trace!("delta peer update");
473
474 for peer in upsert {
475 if !self.tka_admits(peer) {
476 continue; }
478 let id = self.peer_db.upsert(peer);
479
480 upserts.insert(id);
481 }
482
483 for peer in remove {
484 let Some((id, _node)) = self.peer_db.remove(peer) else {
485 tracing::error!(control_node_id = peer, "removed peer was unknown");
486 continue;
487 };
488
489 deletions.insert(id);
490 }
491 }
492
493 ts_control::PeerUpdate::Patch(patches) => {
494 tracing::trace!(n = patches.len(), "peer patch update");
495
496 for patch in patches {
497 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
502 tracing::debug!(
503 control_node_id = patch.id,
504 "peer patch for unknown node; ignoring"
505 );
506 continue;
507 };
508
509 let mut node = existing.clone();
510 if let Some(endpoints) = &patch.underlay_addresses {
511 node.underlay_addresses = endpoints.clone();
512 }
513 if let Some(derp) = patch.derp_region {
514 node.derp_region = Some(derp);
515 }
516 if let Some(cap) = patch.cap {
517 node.cap = cap;
518 }
519 if let Some(cap_map) = &patch.cap_map {
520 node.cap_map = cap_map.clone();
521 }
522 if let Some(disco_key) = patch.disco_key {
523 node.disco_key = Some(disco_key);
524 }
525 if let Some(expiry) = patch.node_key_expiry {
526 node.node_key_expiry = Some(expiry);
527 }
528 if let Some(node_key) = patch.node_key {
532 node.node_key = node_key;
533 }
534 if let Some(sig) = &patch.key_signature {
535 node.key_signature = sig.clone();
536 }
537
538 if !self.tka_admits(&node) {
543 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
544 tracing::warn!(
545 control_node_id = patch.id,
546 "peer patch rejected by tailnet lock; evicting peer"
547 );
548 deletions.insert(id);
549 }
550 continue;
551 }
552
553 let id = self.peer_db.upsert(&node);
554 upserts.insert(id);
555 }
556 }
557 }
558
559 (upserts, deletions)
560 }
561
562 #[cfg(test)]
566 fn for_test(env: Env, tka_authority: Option<ts_tka::Authority>) -> Self {
567 let (peer_watch, _) = watch::channel(Vec::new());
568 Self {
569 peer_db: PeerDb::default(),
570 seen_state_update: false,
571 pending_requests: Vec::new(),
572 peer_watch,
573 user_profiles: HashMap::new(),
574 tka_authority,
575 env,
576 }
577 }
578
579 fn service_pending_requests(&mut self) {
580 if self.seen_state_update {
581 return;
582 }
583
584 self.seen_state_update = true;
585
586 if !self.pending_requests.is_empty() {
587 tracing::debug!(
588 n_pending = self.pending_requests.len(),
589 "state update received, servicing pending requests"
590 );
591 }
592
593 for req in core::mem::take(&mut self.pending_requests) {
594 match req {
595 Pending::PeerByName(PeerByName { name }, reply) => {
596 reply.send(self.peer_by_name_opt(&name).cloned());
597 }
598 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
599 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
600 }
601 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
602 reply.send(
603 self.peer_db
604 .get_route(ip.into())
605 .map(|(_id, node)| node.clone())
606 .collect(),
607 );
608 }
609 Pending::Status(reply) => {
610 reply.send(self.status_peers());
611 }
612 Pending::WhoIs(Whois { addr }, reply) => {
613 reply.send(self.whois_opt(addr));
614 }
615 }
616 }
617 }
618}
619
620#[cfg(test)]
621mod tka_tests {
622 use ed25519_dalek::{Signer, SigningKey};
632 use ts_control::{Node, StableNodeId, TailnetAddress};
633 use ts_tka::{
634 AumHash, Authority, Key, KeyKind, State,
635 cbor::{self, Value},
636 };
637
638 use super::*;
639
640 const SIG_KIND_DIRECT: u64 = 1;
642
643 const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
645
646 fn test_env() -> Env {
649 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
650 Env::new(
651 ts_keys::NodeState::generate(),
652 shutdown_rx,
653 crate::env::ForwarderConfig {
654 accept_routes: false,
655 exit_node: None,
656 forward_routes: Vec::new(),
657 forward_tcp_ports: Vec::new(),
658 forward_udp_ports: Vec::new(),
659 forward_all_ports: false,
660 forward_exit_egress: false,
661 exit_proxy: None,
662 peerapi_port: None,
663 taildrop_dir: None,
664 enable_ipv6: false,
665 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
666 },
667 )
668 }
669
670 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
672 Node {
673 id: 1,
674 stable_id: StableNodeId(stable_id.to_string()),
675 hostname: stable_id.to_string(),
676 user_id: 0,
677 tailnet: Some("ts.net".to_string()),
678 tags: Vec::new(),
679 tailnet_address: TailnetAddress {
680 ipv4: "100.64.0.1/32".parse().unwrap(),
681 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
682 },
683 node_key: node_key.into(),
684 node_key_expiry: None,
685 key_signature,
686 machine_key: None,
687 disco_key: None,
688 accepted_routes: Vec::new(),
689 underlay_addresses: Vec::new(),
690 derp_region: None,
691 cap: Default::default(),
692 cap_map: Default::default(),
693 peerapi_port: None,
694 peerapi_dns_proxy: false,
695 is_wireguard_only: false,
696 exit_node_dns_resolvers: Vec::new(),
697 peer_relay: false,
698 service_vips: Default::default(),
699 }
700 }
701
702 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
707 let mut pairs = alloc_pairs(node_key, key_id);
708 if let Some(sig) = signature {
709 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
710 }
711 cbor::int_map(pairs).to_vec()
712 }
713
714 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
715 vec![
716 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
717 (2, Some(Value::Bytes(node_key.to_vec()))),
718 (3, Some(Value::Bytes(key_id.to_vec()))),
719 ]
720 }
721
722 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
725 let signing = SigningKey::from_bytes(&[42u8; 32]);
727 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
728
729 let authority = Authority::from_state(
730 AumHash([0; 32]),
731 State {
732 keys: vec![Key {
733 kind: KeyKind::Ed25519,
734 votes: 1,
735 public: trusted_pub.clone(),
736 }],
737 },
738 );
739
740 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
742 let sig_hash = ts_tka::aum_hash(&preimage).0;
743 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
744
745 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
746 assert!(
748 authority
749 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
750 .is_ok()
751 );
752
753 (authority, signed_cbor)
754 }
755
756 #[tokio::test]
757 async fn tka_inactive_upserts_all_peers() {
758 let mut tracker = PeerTracker::for_test(test_env(), None);
760
761 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
762 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
763
764 assert!(tracker.tka_admits(&signed));
765 assert!(tracker.tka_admits(&unsigned));
766
767 tracker.peer_db.upsert(&signed);
768 tracker.peer_db.upsert(&unsigned);
769 assert_eq!(tracker.peer_db.peers().len(), 2);
770 }
771
772 #[tokio::test]
773 async fn tka_active_rejects_unsigned_peer() {
774 let (authority, _sig) = authority_and_valid_sig();
776 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
777
778 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
779 assert!(!tracker.tka_admits(&unsigned));
780
781 if tracker.tka_admits(&unsigned) {
783 tracker.peer_db.upsert(&unsigned);
784 }
785 assert_eq!(tracker.peer_db.peers().len(), 0);
786 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
787 }
788
789 #[tokio::test]
790 async fn tka_active_rejects_bad_signature() {
791 let (authority, mut sig) = authority_and_valid_sig();
793 let last = sig.len() - 1;
795 sig[last] ^= 0xff;
796
797 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
798 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
799 assert!(!tracker.tka_admits(&bad));
800
801 if tracker.tka_admits(&bad) {
802 tracker.peer_db.upsert(&bad);
803 }
804 assert_eq!(tracker.peer_db.peers().len(), 0);
805 }
806
807 #[tokio::test]
808 async fn tka_active_admits_authorized_peer() {
809 let (authority, sig) = authority_and_valid_sig();
811 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
812
813 let good = peer_node("good", NODE_KEY_BYTES, sig);
814 assert!(tracker.tka_admits(&good));
815
816 if tracker.tka_admits(&good) {
817 tracker.peer_db.upsert(&good);
818 }
819 assert_eq!(tracker.peer_db.peers().len(), 1);
820 assert!(tracker.peer_db.get(&good.node_key).is_some());
821 }
822
823 #[tokio::test]
831 async fn tka_active_delta_upsert_rejects_unauthorized() {
832 let (authority, _sig) = authority_and_valid_sig();
835 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
836
837 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
838 let update = ts_control::PeerUpdate::Delta {
839 upsert: vec![unsigned.clone()],
840 remove: Vec::new(),
841 };
842
843 tracker.apply_peer_update(&update);
844
845 assert_eq!(tracker.peer_db.peers().len(), 0);
846 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
847 }
848
849 #[tokio::test]
850 async fn tka_active_delta_upsert_admits_authorized() {
851 let (authority, sig) = authority_and_valid_sig();
853 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
854
855 let good = peer_node("good", NODE_KEY_BYTES, sig);
856 let update = ts_control::PeerUpdate::Delta {
857 upsert: vec![good.clone()],
858 remove: Vec::new(),
859 };
860
861 tracker.apply_peer_update(&update);
862
863 assert_eq!(tracker.peer_db.peers().len(), 1);
864 assert!(tracker.peer_db.get(&good.node_key).is_some());
865 }
866
867 #[tokio::test]
868 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
869 let (authority, sig) = authority_and_valid_sig();
873 let mut bad_sig = sig.clone();
875 let last = bad_sig.len() - 1;
876 bad_sig[last] ^= 0xff;
877
878 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
879
880 let good = peer_node("good", NODE_KEY_BYTES, sig);
883 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
884 let bad = peer_node("bad", [9u8; 32], bad_sig);
885
886 let update =
887 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
888
889 tracker.apply_peer_update(&update);
890
891 assert_eq!(tracker.peer_db.peers().len(), 1);
892 assert!(tracker.peer_db.get(&good.node_key).is_some());
893 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
894 assert!(tracker.peer_db.get(&bad.node_key).is_none());
895 }
896
897 #[tokio::test]
898 async fn tka_full_resync_revocation_behavior() {
899 let (authority, sig) = authority_and_valid_sig();
909 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
910
911 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
913 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
914 assert_eq!(tracker.peer_db.peers().len(), 1);
915 assert!(tracker.peer_db.get(&good.node_key).is_some());
916
917 let mut bad_sig = sig;
919 let last = bad_sig.len() - 1;
920 bad_sig[last] ^= 0xff;
921 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
922 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
923
924 assert_eq!(tracker.peer_db.peers().len(), 0);
926 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
927 }
928
929 #[tokio::test]
930 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
931 let mut tracker = PeerTracker::for_test(test_env(), None);
936
937 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
938 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
939 assert_eq!(tracker.peer_db.peers().len(), 1);
940
941 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
943 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
944 assert_eq!(tracker.peer_db.peers().len(), 1);
945 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
946 }
947
948 #[tokio::test]
953 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
954 let mut tracker = PeerTracker::for_test(test_env(), None);
955
956 let peer = peer_node("mover", [1u8; 32], vec![]);
958 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
959 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
960 assert!(before.underlay_addresses.is_empty());
961 assert!(before.derp_region.is_none());
962
963 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
965 let patch = ts_control::PeerChange {
966 id: 1,
967 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
968 cap: None,
969 cap_map: None,
970 underlay_addresses: Some(vec![new_ep]),
971 node_key: None,
972 key_signature: None,
973 disco_key: None,
974 node_key_expiry: None,
975 };
976 let (upserts, deletions) =
977 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
978
979 assert_eq!(upserts.len(), 1);
980 assert_eq!(deletions.len(), 0);
981 assert_eq!(tracker.peer_db.peers().len(), 1);
983 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
984 assert_eq!(after.underlay_addresses, vec![new_ep]);
985 assert_eq!(
986 after.derp_region,
987 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
988 );
989 assert_eq!(after.node_key, peer.node_key);
990 }
991
992 #[tokio::test]
995 async fn patch_for_unknown_node_is_ignored() {
996 let mut tracker = PeerTracker::for_test(test_env(), None);
997 let known = peer_node("known", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
999
1000 let patch = ts_control::PeerChange {
1001 id: 999, derp_region: None,
1003 cap: None,
1004 cap_map: None,
1005 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1006 node_key: None,
1007 key_signature: None,
1008 disco_key: None,
1009 node_key_expiry: None,
1010 };
1011 let (upserts, deletions) =
1012 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1013
1014 assert_eq!(upserts.len(), 0);
1015 assert_eq!(deletions.len(), 0);
1016 assert_eq!(tracker.peer_db.peers().len(), 1);
1017 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1018 }
1019
1020 #[tokio::test]
1023 async fn patch_updates_node_key_expiry() {
1024 let mut tracker = PeerTracker::for_test(test_env(), None);
1025 let peer = peer_node("expiring", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1027
1028 let expiry = "2027-01-01T00:00:00Z"
1029 .parse::<chrono::DateTime<chrono::Utc>>()
1030 .unwrap();
1031 let patch = ts_control::PeerChange {
1032 id: 1,
1033 derp_region: None,
1034 cap: None,
1035 cap_map: None,
1036 underlay_addresses: None,
1037 node_key: None,
1038 key_signature: None,
1039 disco_key: None,
1040 node_key_expiry: Some(expiry),
1041 };
1042 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1043
1044 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1045 assert_eq!(after.node_key_expiry, Some(expiry));
1046 }
1047
1048 #[tokio::test]
1053 async fn patch_key_rotation_failing_tka_evicts_peer() {
1054 let (authority, sig) = authority_and_valid_sig();
1055 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1056
1057 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1059 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1060 assert_eq!(tracker.peer_db.peers().len(), 1);
1061
1062 let patch = ts_control::PeerChange {
1064 id: 1,
1065 derp_region: None,
1066 cap: None,
1067 cap_map: None,
1068 underlay_addresses: None,
1069 node_key: Some([0x33u8; 32].into()),
1070 key_signature: Some(vec![0x00, 0x01, 0x02]),
1071 disco_key: None,
1072 node_key_expiry: None,
1073 };
1074 let (upserts, deletions) =
1075 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1076
1077 assert_eq!(upserts.len(), 0);
1078 assert_eq!(deletions.len(), 1);
1079 assert_eq!(tracker.peer_db.peers().len(), 0);
1080 }
1081
1082 fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1087 ts_control::UserProfile {
1088 id,
1089 login_name: login.to_string(),
1090 display_name: None,
1091 }
1092 }
1093
1094 #[tokio::test]
1095 async fn whois_resolves_user_from_accumulated_profiles() {
1096 let mut tracker = PeerTracker::for_test(test_env(), None);
1097
1098 let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1100 peer.user_id = 42;
1101 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1102 let addr = "100.64.0.1:0".parse().unwrap();
1103
1104 let who = tracker.whois_opt(addr).expect("peer is known");
1106 assert_eq!(who.user, None);
1107
1108 tracker
1110 .user_profiles
1111 .insert(7, profile(7, "someone-else@example.com"));
1112 assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1113
1114 tracker
1117 .user_profiles
1118 .insert(42, profile(42, "alice@example.com"));
1119 assert_eq!(
1120 tracker.whois_opt(addr).unwrap().user,
1121 Some("alice@example.com".to_string())
1122 );
1123 }
1124
1125 #[test]
1127 fn user_profile_best_label_prefers_login() {
1128 assert_eq!(
1129 profile(1, "alice@example.com").best_label(),
1130 Some("alice@example.com".to_string())
1131 );
1132 let display_only = ts_control::UserProfile {
1133 id: 2,
1134 login_name: String::new(),
1135 display_name: Some("Bob".to_string()),
1136 };
1137 assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1138 let empty = ts_control::UserProfile {
1139 id: 3,
1140 login_name: String::new(),
1141 display_name: None,
1142 };
1143 assert_eq!(empty.best_label(), None);
1144 }
1145}