1use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 sync::Arc,
7};
8
9use kameo::{
10 actor::ActorRef,
11 message::{Context, Message},
12 reply::ReplySender,
13};
14use tokio::sync::watch;
15use ts_control::{Node, UserId, UserProfile};
16use ts_transport::PeerId;
17
18use crate::{Error, env::Env, status::StatusNode};
19
20mod peer_db;
21
22pub use peer_db::PeerDb;
23
/// Actor state that tracks the peers of this node.
///
/// It applies peer updates taken from control-plane state updates to a [`PeerDb`], answers
/// lookup messages against that database, and publishes [`PeerState`] snapshots through `env`.
pub struct PeerTracker {
    /// Peers currently held by the tracker.
    peer_db: PeerDb,
    /// `false` until the first peer update has been applied. Lookups that arrive before then
    /// are parked in `pending_requests` rather than answered from an empty database.
    seen_state_update: bool,
    /// Lookups received before the first peer update, each with its reply channel.
    pending_requests: Vec<Pending>,
    /// Watch channel that receives the full status list after every applied peer update.
    peer_watch: watch::Sender<Vec<StatusNode>>,
    /// User profiles accumulated across state updates, keyed by user id; used to label the
    /// owner of a peer in whois answers.
    user_profiles: HashMap<UserId, UserProfile>,
    /// Tailnet-lock authority. When `Some`, a peer is only admitted if its node-key signature
    /// is accepted by the authority; when `None`, every peer is admitted.
    tka_authority: Option<ts_tka::Authority>,
    /// Environment handle used to subscribe to state updates and to publish peer state.
    env: Env,
}
50
51impl PeerTracker {
52 fn peer_by_name_opt(&self, name: &str) -> Option<&Node> {
53 self.peer_db.get(&name).map(|(_id, node)| node)
55 }
56
57 fn peer_by_tailnet_ip_opt(&self, ip: IpAddr) -> Option<&Node> {
58 self.peer_db.get(&ip).map(|(_id, node)| node)
59 }
60
61 fn status_peers(&self) -> Vec<StatusNode> {
63 self.peer_db
64 .peers()
65 .values()
66 .map(StatusNode::from_node)
67 .collect()
68 }
69
70 fn whois_opt(&self, addr: std::net::SocketAddr) -> Option<crate::status::WhoIs> {
71 let ip = crate::status::whois_addr(addr);
72 let node = self.peer_by_tailnet_ip_opt(ip).cloned()?;
73 let user = self.resolve_user(node.user_id);
77 Some(crate::status::WhoIs::from_node_with_user(node, user))
78 }
79
80 fn resolve_user(&self, user_id: UserId) -> Option<String> {
82 self.user_profiles
83 .get(&user_id)
84 .and_then(UserProfile::best_label)
85 }
86
87 fn tka_admits(&self, node: &Node) -> bool {
97 let Some(auth) = &self.tka_authority else {
98 return true;
99 };
100
101 if node.key_signature.is_empty() {
102 tracing::warn!(
105 stable_id = ?node.stable_id,
106 "TKA: rejecting unsigned peer under tailnet lock"
107 );
108 return false;
109 }
110
111 if let Err(e) = auth.node_key_authorized(&node.node_key.to_bytes(), &node.key_signature) {
112 tracing::warn!(
113 stable_id = ?node.stable_id,
114 error = %e,
115 "TKA: rejecting peer with unauthorized node key"
116 );
117 return false;
118 }
119
120 true
121 }
122}
123
124impl kameo::Actor for PeerTracker {
125 type Args = Env;
126 type Error = Error;
127
128 async fn on_start(env: Self::Args, slf: ActorRef<Self>) -> Result<Self, Self::Error> {
129 env.subscribe::<Arc<ts_control::StateUpdate>>(&slf).await?;
130
131 let (peer_watch, _) = watch::channel(Vec::new());
132
133 Ok(Self {
134 peer_db: PeerDb::default(),
135 pending_requests: Default::default(),
136 seen_state_update: false,
137 peer_watch,
138 user_profiles: HashMap::new(),
139 tka_authority: None,
142 env,
143 })
144 }
145}
146
/// A lookup that arrived before the first peer update was applied.
///
/// Each variant stores the original request together with the reply channel so that
/// `service_pending_requests` can answer it once peer state is available.
enum Pending {
    /// Queued lookup of a peer by name.
    PeerByName(PeerByName, ReplySender<Option<Node>>),
    /// Queued route lookup for an IP address.
    AcceptedRoute(PeerByAcceptedRoute, ReplySender<Vec<Node>>),
    /// Queued lookup of a peer by IP address.
    TailnetIp(PeerByTailnetIp, ReplySender<Option<Node>>),
    /// Queued request for the status list of all peers.
    Status(ReplySender<Vec<StatusNode>>),
    /// Queued whois query for a socket address.
    WhoIs(Whois, ReplySender<Option<crate::status::WhoIs>>),
}
154
155#[allow(missing_docs)]
159mod msg_impl {
160 use std::net::IpAddr;
161
162 use kameo::prelude::DelegatedReply;
163
164 use super::*;
165
166 #[kameo::messages]
167 impl PeerTracker {
168 #[message(ctx)]
172 pub async fn peer_by_name(
173 &mut self,
174 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
175 name: String,
176 ) -> DelegatedReply<Option<Node>> {
177 let (deleg, sender) = ctx.reply_sender();
178 let Some(sender) = sender else { return deleg };
179
180 if !self.seen_state_update {
181 tracing::debug!(query = name, "no peer state seen yet, queueing request");
182
183 self.pending_requests
184 .push(Pending::PeerByName(PeerByName { name }, sender));
185
186 return deleg;
187 }
188
189 sender.send(self.peer_by_name_opt(&name).cloned());
190
191 deleg
192 }
193
194 #[message(ctx)]
209 pub fn peer_by_accepted_route(
210 &mut self,
211 ctx: &mut Context<Self, DelegatedReply<Vec<Node>>>,
212 ip: IpAddr,
213 ) -> DelegatedReply<Vec<Node>> {
214 let (deleg, sender) = ctx.reply_sender();
215 let Some(sender) = sender else { return deleg };
216
217 if !self.seen_state_update {
218 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
219
220 self.pending_requests
221 .push(Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, sender));
222
223 return deleg;
224 }
225
226 sender.send(
227 self.peer_db
228 .get_route(ip.into())
229 .map(|(_id, node)| node.clone())
230 .collect(),
231 );
232
233 deleg
234 }
235
236 #[message(ctx)]
238 pub fn peer_by_tailnet_ip(
239 &mut self,
240 ctx: &mut Context<Self, DelegatedReply<Option<Node>>>,
241 ip: IpAddr,
242 ) -> DelegatedReply<Option<Node>> {
243 let (deleg, sender) = ctx.reply_sender();
244 let Some(sender) = sender else { return deleg };
245
246 if !self.seen_state_update {
247 tracing::debug!(query = %ip, "no peer state seen yet, queueing request");
248
249 self.pending_requests
250 .push(Pending::TailnetIp(PeerByTailnetIp { ip }, sender));
251
252 return deleg;
253 }
254
255 sender.send(self.peer_by_tailnet_ip_opt(ip).cloned());
256
257 deleg
258 }
259
260 #[message(ctx)]
267 pub fn get_status(
268 &mut self,
269 ctx: &mut Context<Self, DelegatedReply<Vec<StatusNode>>>,
270 ) -> DelegatedReply<Vec<StatusNode>> {
271 let (deleg, sender) = ctx.reply_sender();
272 let Some(sender) = sender else { return deleg };
273
274 if !self.seen_state_update {
275 tracing::debug!("no peer state seen yet, queueing status request");
276 self.pending_requests.push(Pending::Status(sender));
277 return deleg;
278 }
279
280 sender.send(self.status_peers());
281
282 deleg
283 }
284
285 #[message(ctx)]
296 pub fn whois(
297 &mut self,
298 ctx: &mut Context<Self, DelegatedReply<Option<crate::status::WhoIs>>>,
299 addr: std::net::SocketAddr,
300 ) -> DelegatedReply<Option<crate::status::WhoIs>> {
301 let (deleg, sender) = ctx.reply_sender();
302 let Some(sender) = sender else { return deleg };
303
304 if !self.seen_state_update {
305 tracing::debug!(query = %addr, "no peer state seen yet, queueing whois request");
306 self.pending_requests
307 .push(Pending::WhoIs(Whois { addr }, sender));
308 return deleg;
309 }
310
311 sender.send(self.whois_opt(addr));
312
313 deleg
314 }
315
        #[message(derive(Clone))]
        /// Returns a new receiver on the peer watch channel, which is refreshed with the full
        /// status list after every applied peer update.
        ///
        /// Unlike the lookup messages, this is answered immediately even before any peer
        /// update has been applied.
        pub fn watch_netmap(&self) -> watch::Receiver<Vec<StatusNode>> {
            self.peer_watch.subscribe()
        }
329 }
330}
331
332pub use msg_impl::*;
333
/// Snapshot of the peer database published after a peer update (or on an explicit
/// [`RepublishState`] request), together with the ids that changed.
#[derive(Debug, Clone)]
pub(crate) struct PeerState {
    /// Ids of peers removed by the update that produced this snapshot.
    // NOTE(review): the `allow(unused)` suggests no consumer reads this field yet — confirm.
    #[allow(unused)]
    pub deletions: HashSet<PeerId>,
    /// Ids of peers inserted or updated by the update that produced this snapshot.
    // NOTE(review): the `allow(unused)` suggests no consumer reads this field yet — confirm.
    #[allow(unused)]
    pub upserts: HashSet<PeerId>,
    /// Copy of the complete peer database at the time of publishing.
    pub peers: Arc<PeerDb>,
}
342
343impl Message<Arc<ts_control::StateUpdate>> for PeerTracker {
344 type Reply = ();
345
346 async fn handle(
347 &mut self,
348 msg: Arc<ts_control::StateUpdate>,
349 _ctx: &mut Context<Self, Self::Reply>,
350 ) {
351 for profile in &msg.user_profiles {
355 self.user_profiles.insert(profile.id, profile.clone());
356 }
357
358 let Some(peer_update) = &msg.peer_update else {
359 return;
360 };
361
362 let (upserts, deletions) = self.apply_peer_update(peer_update);
363
364 tracing::debug!(
365 n_upsert = upserts.len(),
366 n_delete = deletions.len(),
367 peer_count = self.peer_db.peers().len(),
368 "new peer state"
369 );
370
371 self.service_pending_requests();
372
373 self.peer_watch.send_replace(self.status_peers());
376
377 if let Err(e) = self
378 .env
379 .publish(Arc::new(PeerState {
380 upserts,
381 deletions,
382 peers: Arc::new(self.peer_db.clone()),
383 }))
384 .await
385 {
386 tracing::error!(error = %e, "publishing peer state update");
387 }
388 }
389}
390
/// Message asking the [`PeerTracker`] to publish its current peer database again, with empty
/// upsert and deletion sets.
#[derive(Debug, Clone, Copy)]
pub struct RepublishState;
397
398impl Message<RepublishState> for PeerTracker {
399 type Reply = ();
400
401 async fn handle(&mut self, _msg: RepublishState, _ctx: &mut Context<Self, Self::Reply>) {
402 if let Err(e) = self
406 .env
407 .publish(Arc::new(PeerState {
408 upserts: HashSet::default(),
409 deletions: HashSet::default(),
410 peers: Arc::new(self.peer_db.clone()),
411 }))
412 .await
413 {
414 tracing::error!(error = %e, "re-publishing peer state after exit-node change");
415 }
416 }
417}
418
419impl PeerTracker {
420 fn apply_peer_update(
429 &mut self,
430 peer_update: &ts_control::PeerUpdate,
431 ) -> (HashSet<PeerId>, HashSet<PeerId>) {
432 let mut upserts = HashSet::default();
433 let mut deletions = HashSet::default();
434
435 match peer_update {
436 ts_control::PeerUpdate::Full(new_nodes) => {
437 tracing::trace!("full peer update");
438
439 let retained_ids = new_nodes
447 .iter()
448 .filter(|node| self.tka_admits(node))
449 .map(|x| &x.stable_id)
450 .collect::<HashSet<_>>();
451
452 self.peer_db.retain(|id, peer| {
453 let retain = retained_ids.contains(&peer.stable_id);
454
455 if !retain {
456 deletions.insert(id);
457 }
458
459 retain
460 });
461
462 for node in new_nodes {
463 if !self.tka_admits(node) {
464 continue; }
466 let peer_id = self.peer_db.upsert(node);
467 upserts.insert(peer_id);
468 }
469 }
470
471 ts_control::PeerUpdate::Delta { remove, upsert } => {
472 tracing::trace!("delta peer update");
473
474 for peer in upsert {
475 if !self.tka_admits(peer) {
476 continue; }
478 let id = self.peer_db.upsert(peer);
479
480 upserts.insert(id);
481 }
482
483 for peer in remove {
484 let Some((id, _node)) = self.peer_db.remove(peer) else {
485 tracing::error!(control_node_id = peer, "removed peer was unknown");
486 continue;
487 };
488
489 deletions.insert(id);
490 }
491 }
492
493 ts_control::PeerUpdate::Patch(patches) => {
494 tracing::trace!(n = patches.len(), "peer patch update");
495
496 for patch in patches {
497 let Some((_id, existing)) = self.peer_db.get(&patch.id) else {
502 tracing::debug!(
503 control_node_id = patch.id,
504 "peer patch for unknown node; ignoring"
505 );
506 continue;
507 };
508
509 let mut node = existing.clone();
510 if let Some(endpoints) = &patch.underlay_addresses {
511 node.underlay_addresses = endpoints.clone();
512 }
513 if let Some(derp) = patch.derp_region {
514 node.derp_region = Some(derp);
515 }
516 if let Some(cap) = patch.cap {
517 node.cap = cap;
518 }
519 if let Some(cap_map) = &patch.cap_map {
520 node.cap_map = cap_map.clone();
521 }
522 if let Some(disco_key) = patch.disco_key {
523 node.disco_key = Some(disco_key);
524 }
525 if let Some(expiry) = patch.node_key_expiry {
526 node.node_key_expiry = Some(expiry);
527 }
528 if let Some(node_key) = patch.node_key {
532 node.node_key = node_key;
533 }
534 if let Some(sig) = &patch.key_signature {
535 node.key_signature = sig.clone();
536 }
537
538 if !self.tka_admits(&node) {
543 if let Some((id, _)) = self.peer_db.remove(&patch.id) {
544 tracing::warn!(
545 control_node_id = patch.id,
546 "peer patch rejected by tailnet lock; evicting peer"
547 );
548 deletions.insert(id);
549 }
550 continue;
551 }
552
553 let id = self.peer_db.upsert(&node);
554 upserts.insert(id);
555 }
556 }
557 }
558
559 (upserts, deletions)
560 }
561
562 #[cfg(test)]
566 fn for_test(env: Env, tka_authority: Option<ts_tka::Authority>) -> Self {
567 let (peer_watch, _) = watch::channel(Vec::new());
568 Self {
569 peer_db: PeerDb::default(),
570 seen_state_update: false,
571 pending_requests: Vec::new(),
572 peer_watch,
573 user_profiles: HashMap::new(),
574 tka_authority,
575 env,
576 }
577 }
578
579 fn service_pending_requests(&mut self) {
580 if self.seen_state_update {
581 return;
582 }
583
584 self.seen_state_update = true;
585
586 if !self.pending_requests.is_empty() {
587 tracing::debug!(
588 n_pending = self.pending_requests.len(),
589 "state update received, servicing pending requests"
590 );
591 }
592
593 for req in core::mem::take(&mut self.pending_requests) {
594 match req {
595 Pending::PeerByName(PeerByName { name }, reply) => {
596 reply.send(self.peer_by_name_opt(&name).cloned());
597 }
598 Pending::TailnetIp(PeerByTailnetIp { ip }, reply) => {
599 reply.send(self.peer_by_tailnet_ip_opt(ip).cloned());
600 }
601 Pending::AcceptedRoute(PeerByAcceptedRoute { ip }, reply) => {
602 reply.send(
603 self.peer_db
604 .get_route(ip.into())
605 .map(|(_id, node)| node.clone())
606 .collect(),
607 );
608 }
609 Pending::Status(reply) => {
610 reply.send(self.status_peers());
611 }
612 Pending::WhoIs(Whois { addr }, reply) => {
613 reply.send(self.whois_opt(addr));
614 }
615 }
616 }
617 }
618}
619
620#[cfg(test)]
621mod tka_tests {
622 use ed25519_dalek::{Signer, SigningKey};
632 use ts_control::{Node, StableNodeId, TailnetAddress};
633 use ts_tka::{
634 AumHash, Authority, Key, KeyKind, State,
635 cbor::{self, Value},
636 };
637
638 use super::*;
639
    /// Value stored under key 1 of the signature map built by `alloc_pairs`.
    // NOTE(review): presumably the "direct" node-key signature kind — confirm against ts_tka.
    const SIG_KIND_DIRECT: u64 = 1;

    /// Node key bytes covered by the signature that `authority_and_valid_sig` produces.
    const NODE_KEY_BYTES: [u8; 32] = [7u8; 32];
645
646 fn test_env() -> Env {
649 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
650 Env::new(
651 ts_keys::NodeState::generate(),
652 shutdown_rx,
653 crate::env::ForwarderConfig {
654 accept_routes: false,
655 exit_node: None,
656 forward_routes: Vec::new(),
657 forward_tcp_ports: Vec::new(),
658 forward_udp_ports: Vec::new(),
659 forward_all_ports: false,
660 forward_exit_egress: false,
661 exit_proxy: None,
662 peerapi_port: None,
663 taildrop_dir: None,
664 enable_ipv6: false,
665 persistent_keepalive_interval: None,
666 ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
667 },
668 )
669 }
670
671 fn peer_node(stable_id: &str, node_key: [u8; 32], key_signature: Vec<u8>) -> Node {
673 Node {
674 id: 1,
675 stable_id: StableNodeId(stable_id.to_string()),
676 hostname: stable_id.to_string(),
677 user_id: 0,
678 tailnet: Some("ts.net".to_string()),
679 tags: Vec::new(),
680 tailnet_address: TailnetAddress {
681 ipv4: "100.64.0.1/32".parse().unwrap(),
682 ipv6: "fd7a:115c:a1e0::1/128".parse().unwrap(),
683 },
684 node_key: node_key.into(),
685 node_key_expiry: None,
686 key_signature,
687 machine_key: None,
688 disco_key: None,
689 accepted_routes: Vec::new(),
690 underlay_addresses: Vec::new(),
691 derp_region: None,
692 cap: Default::default(),
693 cap_map: Default::default(),
694 peerapi_port: None,
695 peerapi_dns_proxy: false,
696 is_wireguard_only: false,
697 exit_node_dns_resolvers: Vec::new(),
698 peer_relay: false,
699 service_vips: Default::default(),
700 }
701 }
702
703 fn direct_sig_cbor(node_key: &[u8], key_id: &[u8], signature: Option<&[u8]>) -> Vec<u8> {
708 let mut pairs = alloc_pairs(node_key, key_id);
709 if let Some(sig) = signature {
710 pairs.push((4, Some(Value::Bytes(sig.to_vec()))));
711 }
712 cbor::int_map(pairs).to_vec()
713 }
714
715 fn alloc_pairs(node_key: &[u8], key_id: &[u8]) -> Vec<(u64, Option<Value>)> {
716 vec![
717 (1, Some(Value::Uint(SIG_KIND_DIRECT))),
718 (2, Some(Value::Bytes(node_key.to_vec()))),
719 (3, Some(Value::Bytes(key_id.to_vec()))),
720 ]
721 }
722
723 fn authority_and_valid_sig() -> (Authority, Vec<u8>) {
726 let signing = SigningKey::from_bytes(&[42u8; 32]);
728 let trusted_pub = signing.verifying_key().to_bytes().to_vec();
729
730 let authority = Authority::from_state(
731 AumHash([0; 32]),
732 State {
733 keys: vec![Key {
734 kind: KeyKind::Ed25519,
735 votes: 1,
736 public: trusted_pub.clone(),
737 }],
738 },
739 );
740
741 let preimage = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, None);
743 let sig_hash = ts_tka::aum_hash(&preimage).0;
744 let signature = signing.sign(&sig_hash).to_bytes().to_vec();
745
746 let signed_cbor = direct_sig_cbor(&NODE_KEY_BYTES, &trusted_pub, Some(&signature));
747 assert!(
749 authority
750 .node_key_authorized(&NODE_KEY_BYTES, &signed_cbor)
751 .is_ok()
752 );
753
754 (authority, signed_cbor)
755 }
756
757 #[tokio::test]
758 async fn tka_inactive_upserts_all_peers() {
759 let mut tracker = PeerTracker::for_test(test_env(), None);
761
762 let signed = peer_node("signed", [1u8; 32], vec![0xde, 0xad, 0xbe, 0xef]);
763 let unsigned = peer_node("unsigned", [2u8; 32], vec![]);
764
765 assert!(tracker.tka_admits(&signed));
766 assert!(tracker.tka_admits(&unsigned));
767
768 tracker.peer_db.upsert(&signed);
769 tracker.peer_db.upsert(&unsigned);
770 assert_eq!(tracker.peer_db.peers().len(), 2);
771 }
772
773 #[tokio::test]
774 async fn tka_active_rejects_unsigned_peer() {
775 let (authority, _sig) = authority_and_valid_sig();
777 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
778
779 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
780 assert!(!tracker.tka_admits(&unsigned));
781
782 if tracker.tka_admits(&unsigned) {
784 tracker.peer_db.upsert(&unsigned);
785 }
786 assert_eq!(tracker.peer_db.peers().len(), 0);
787 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
788 }
789
790 #[tokio::test]
791 async fn tka_active_rejects_bad_signature() {
792 let (authority, mut sig) = authority_and_valid_sig();
794 let last = sig.len() - 1;
796 sig[last] ^= 0xff;
797
798 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
799 let bad = peer_node("bad", NODE_KEY_BYTES, sig);
800 assert!(!tracker.tka_admits(&bad));
801
802 if tracker.tka_admits(&bad) {
803 tracker.peer_db.upsert(&bad);
804 }
805 assert_eq!(tracker.peer_db.peers().len(), 0);
806 }
807
808 #[tokio::test]
809 async fn tka_active_admits_authorized_peer() {
810 let (authority, sig) = authority_and_valid_sig();
812 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
813
814 let good = peer_node("good", NODE_KEY_BYTES, sig);
815 assert!(tracker.tka_admits(&good));
816
817 if tracker.tka_admits(&good) {
818 tracker.peer_db.upsert(&good);
819 }
820 assert_eq!(tracker.peer_db.peers().len(), 1);
821 assert!(tracker.peer_db.get(&good.node_key).is_some());
822 }
823
824 #[tokio::test]
832 async fn tka_active_delta_upsert_rejects_unauthorized() {
833 let (authority, _sig) = authority_and_valid_sig();
836 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
837
838 let unsigned = peer_node("unsigned", NODE_KEY_BYTES, vec![]);
839 let update = ts_control::PeerUpdate::Delta {
840 upsert: vec![unsigned.clone()],
841 remove: Vec::new(),
842 };
843
844 tracker.apply_peer_update(&update);
845
846 assert_eq!(tracker.peer_db.peers().len(), 0);
847 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
848 }
849
850 #[tokio::test]
851 async fn tka_active_delta_upsert_admits_authorized() {
852 let (authority, sig) = authority_and_valid_sig();
854 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
855
856 let good = peer_node("good", NODE_KEY_BYTES, sig);
857 let update = ts_control::PeerUpdate::Delta {
858 upsert: vec![good.clone()],
859 remove: Vec::new(),
860 };
861
862 tracker.apply_peer_update(&update);
863
864 assert_eq!(tracker.peer_db.peers().len(), 1);
865 assert!(tracker.peer_db.get(&good.node_key).is_some());
866 }
867
868 #[tokio::test]
869 async fn tka_active_full_admits_only_authorized_in_mixed_batch() {
870 let (authority, sig) = authority_and_valid_sig();
874 let mut bad_sig = sig.clone();
876 let last = bad_sig.len() - 1;
877 bad_sig[last] ^= 0xff;
878
879 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
880
881 let good = peer_node("good", NODE_KEY_BYTES, sig);
884 let unsigned = peer_node("unsigned", [8u8; 32], vec![]);
885 let bad = peer_node("bad", [9u8; 32], bad_sig);
886
887 let update =
888 ts_control::PeerUpdate::Full(vec![good.clone(), unsigned.clone(), bad.clone()]);
889
890 tracker.apply_peer_update(&update);
891
892 assert_eq!(tracker.peer_db.peers().len(), 1);
893 assert!(tracker.peer_db.get(&good.node_key).is_some());
894 assert!(tracker.peer_db.get(&unsigned.node_key).is_none());
895 assert!(tracker.peer_db.get(&bad.node_key).is_none());
896 }
897
898 #[tokio::test]
899 async fn tka_full_resync_revocation_behavior() {
900 let (authority, sig) = authority_and_valid_sig();
910 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
911
912 let good = peer_node("revoked", NODE_KEY_BYTES, sig.clone());
914 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
915 assert_eq!(tracker.peer_db.peers().len(), 1);
916 assert!(tracker.peer_db.get(&good.node_key).is_some());
917
918 let mut bad_sig = sig;
920 let last = bad_sig.len() - 1;
921 bad_sig[last] ^= 0xff;
922 let revoked = peer_node("revoked", NODE_KEY_BYTES, bad_sig);
923 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![revoked.clone()]));
924
925 assert_eq!(tracker.peer_db.peers().len(), 0);
927 assert!(tracker.peer_db.get(&revoked.node_key).is_none());
928 }
929
930 #[tokio::test]
931 async fn tka_inactive_full_resync_keeps_reincluded_peer() {
932 let mut tracker = PeerTracker::for_test(test_env(), None);
937
938 let peer = peer_node("p", NODE_KEY_BYTES, vec![0xde, 0xad]);
939 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
940 assert_eq!(tracker.peer_db.peers().len(), 1);
941
942 let resynced = peer_node("p", NODE_KEY_BYTES, vec![0x00]);
944 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![resynced.clone()]));
945 assert_eq!(tracker.peer_db.peers().len(), 1);
946 assert!(tracker.peer_db.get(&resynced.node_key).is_some());
947 }
948
949 #[tokio::test]
954 async fn patch_merges_endpoints_and_derp_into_existing_peer() {
955 let mut tracker = PeerTracker::for_test(test_env(), None);
956
957 let peer = peer_node("mover", [1u8; 32], vec![]);
959 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer.clone()]));
960 let (_pid, before) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
961 assert!(before.underlay_addresses.is_empty());
962 assert!(before.derp_region.is_none());
963
964 let new_ep: std::net::SocketAddr = "203.0.113.7:41641".parse().unwrap();
966 let patch = ts_control::PeerChange {
967 id: 1,
968 derp_region: Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap())),
969 cap: None,
970 cap_map: None,
971 underlay_addresses: Some(vec![new_ep]),
972 node_key: None,
973 key_signature: None,
974 disco_key: None,
975 node_key_expiry: None,
976 };
977 let (upserts, deletions) =
978 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
979
980 assert_eq!(upserts.len(), 1);
981 assert_eq!(deletions.len(), 0);
982 assert_eq!(tracker.peer_db.peers().len(), 1);
984 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
985 assert_eq!(after.underlay_addresses, vec![new_ep]);
986 assert_eq!(
987 after.derp_region,
988 Some(ts_derp::RegionId(core::num::NonZeroU32::new(5).unwrap()))
989 );
990 assert_eq!(after.node_key, peer.node_key);
991 }
992
993 #[tokio::test]
996 async fn patch_for_unknown_node_is_ignored() {
997 let mut tracker = PeerTracker::for_test(test_env(), None);
998 let known = peer_node("known", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![known]));
1000
1001 let patch = ts_control::PeerChange {
1002 id: 999, derp_region: None,
1004 cap: None,
1005 cap_map: None,
1006 underlay_addresses: Some(vec!["198.51.100.9:1".parse().unwrap()]),
1007 node_key: None,
1008 key_signature: None,
1009 disco_key: None,
1010 node_key_expiry: None,
1011 };
1012 let (upserts, deletions) =
1013 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1014
1015 assert_eq!(upserts.len(), 0);
1016 assert_eq!(deletions.len(), 0);
1017 assert_eq!(tracker.peer_db.peers().len(), 1);
1018 assert!(tracker.peer_db.get(&(999 as ts_control::NodeId)).is_none());
1019 }
1020
1021 #[tokio::test]
1024 async fn patch_updates_node_key_expiry() {
1025 let mut tracker = PeerTracker::for_test(test_env(), None);
1026 let peer = peer_node("expiring", [1u8; 32], vec![]); tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1028
1029 let expiry = "2027-01-01T00:00:00Z"
1030 .parse::<chrono::DateTime<chrono::Utc>>()
1031 .unwrap();
1032 let patch = ts_control::PeerChange {
1033 id: 1,
1034 derp_region: None,
1035 cap: None,
1036 cap_map: None,
1037 underlay_addresses: None,
1038 node_key: None,
1039 key_signature: None,
1040 disco_key: None,
1041 node_key_expiry: Some(expiry),
1042 };
1043 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1044
1045 let (_pid, after) = tracker.peer_db.get(&(1 as ts_control::NodeId)).unwrap();
1046 assert_eq!(after.node_key_expiry, Some(expiry));
1047 }
1048
1049 #[tokio::test]
1054 async fn patch_key_rotation_failing_tka_evicts_peer() {
1055 let (authority, sig) = authority_and_valid_sig();
1056 let mut tracker = PeerTracker::for_test(test_env(), Some(authority));
1057
1058 let good = peer_node("rotator", NODE_KEY_BYTES, sig.clone());
1060 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![good.clone()]));
1061 assert_eq!(tracker.peer_db.peers().len(), 1);
1062
1063 let patch = ts_control::PeerChange {
1065 id: 1,
1066 derp_region: None,
1067 cap: None,
1068 cap_map: None,
1069 underlay_addresses: None,
1070 node_key: Some([0x33u8; 32].into()),
1071 key_signature: Some(vec![0x00, 0x01, 0x02]),
1072 disco_key: None,
1073 node_key_expiry: None,
1074 };
1075 let (upserts, deletions) =
1076 tracker.apply_peer_update(&ts_control::PeerUpdate::Patch(vec![patch]));
1077
1078 assert_eq!(upserts.len(), 0);
1079 assert_eq!(deletions.len(), 1);
1080 assert_eq!(tracker.peer_db.peers().len(), 0);
1081 }
1082
1083 fn profile(id: ts_control::UserId, login: &str) -> ts_control::UserProfile {
1088 ts_control::UserProfile {
1089 id,
1090 login_name: login.to_string(),
1091 display_name: None,
1092 }
1093 }
1094
1095 #[tokio::test]
1096 async fn whois_resolves_user_from_accumulated_profiles() {
1097 let mut tracker = PeerTracker::for_test(test_env(), None);
1098
1099 let mut peer = peer_node("p", NODE_KEY_BYTES, Vec::new());
1101 peer.user_id = 42;
1102 tracker.apply_peer_update(&ts_control::PeerUpdate::Full(vec![peer]));
1103 let addr = "100.64.0.1:0".parse().unwrap();
1104
1105 let who = tracker.whois_opt(addr).expect("peer is known");
1107 assert_eq!(who.user, None);
1108
1109 tracker
1111 .user_profiles
1112 .insert(7, profile(7, "someone-else@example.com"));
1113 assert_eq!(tracker.whois_opt(addr).unwrap().user, None);
1114
1115 tracker
1118 .user_profiles
1119 .insert(42, profile(42, "alice@example.com"));
1120 assert_eq!(
1121 tracker.whois_opt(addr).unwrap().user,
1122 Some("alice@example.com".to_string())
1123 );
1124 }
1125
1126 #[test]
1128 fn user_profile_best_label_prefers_login() {
1129 assert_eq!(
1130 profile(1, "alice@example.com").best_label(),
1131 Some("alice@example.com".to_string())
1132 );
1133 let display_only = ts_control::UserProfile {
1134 id: 2,
1135 login_name: String::new(),
1136 display_name: Some("Bob".to_string()),
1137 };
1138 assert_eq!(display_only.best_label(), Some("Bob".to_string()));
1139 let empty = ts_control::UserProfile {
1140 id: 3,
1141 login_name: String::new(),
1142 display_name: None,
1143 };
1144 assert_eq!(empty.best_label(), None);
1145 }
1146}