1use crate::nat_traversal_api::PeerId;
11use crate::reachability::{ReachabilityScope, socket_addr_scope};
12use serde::{Deserialize, Serialize};
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::time::{Duration, SystemTime};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CachedPeer {
20 #[serde(with = "peer_id_serde")]
22 pub peer_id: PeerId,
23
24 pub addresses: Vec<SocketAddr>,
26
27 pub capabilities: PeerCapabilities,
29
30 pub first_seen: SystemTime,
32
33 pub last_seen: SystemTime,
35
36 pub last_attempt: Option<SystemTime>,
38
39 pub stats: ConnectionStats,
41
42 #[serde(default = "default_quality_score")]
44 pub quality_score: f64,
45
46 pub source: PeerSource,
48
49 #[serde(default)]
51 pub relay_paths: Vec<RelayPathHint>,
52
53 #[serde(default)]
55 pub token: Option<Vec<u8>>,
56}
57
58fn default_quality_score() -> f64 {
59 0.5
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ReachableAddressRecord {
65 pub address: SocketAddr,
67 pub scope: ReachabilityScope,
69 pub verified_at: SystemTime,
71}
72
73#[derive(Debug, Clone, Default, Serialize, Deserialize)]
75pub struct PeerCapabilities {
76 pub supports_relay: bool,
85
86 pub supports_coordination: bool,
91
92 #[serde(default)]
97 pub hinted_supports_relay: bool,
98
99 #[serde(default)]
104 pub hinted_supports_coordination: bool,
105
106 #[serde(default)]
108 pub protocols: HashSet<String>,
109
110 pub nat_type: Option<NatType>,
112
113 #[serde(default)]
115 pub external_addresses: Vec<SocketAddr>,
116
117 #[serde(default)]
123 pub reachable_addresses: Vec<ReachableAddressRecord>,
124
125 pub direct_reachability_scope: Option<ReachabilityScope>,
127}
128
129impl PeerCapabilities {
130 fn refresh_effective_helper_flags(&mut self) {
131 let globally_reachable = self.has_global_direct_reachability();
132 self.supports_relay = globally_reachable || self.hinted_supports_relay;
133 self.supports_coordination = globally_reachable || self.hinted_supports_coordination;
134 }
135
136 pub fn record_assist_hints(&mut self, supports_relay: bool, supports_coordination: bool) {
138 if supports_relay {
139 self.hinted_supports_relay = true;
140 }
141 if supports_coordination {
142 self.hinted_supports_coordination = true;
143 }
144 self.refresh_effective_helper_flags();
145 }
146
147 pub fn record_external_address(&mut self, addr: SocketAddr) {
149 if !self.external_addresses.contains(&addr) {
150 self.external_addresses.push(addr);
151 }
152 }
153
154 pub fn record_direct_observation(&mut self, addr: SocketAddr, observed_at: SystemTime) {
156 let scope = socket_addr_scope(addr).unwrap_or(ReachabilityScope::LocalNetwork);
157 if let Some(existing) = self
158 .reachable_addresses
159 .iter_mut()
160 .find(|entry| entry.address == addr)
161 {
162 existing.verified_at = observed_at;
163 existing.scope = scope;
164 } else {
165 self.reachable_addresses.push(ReachableAddressRecord {
166 address: addr,
167 scope,
168 verified_at: observed_at,
169 });
170 }
171 self.direct_reachability_scope = self
172 .reachable_addresses
173 .iter()
174 .map(|entry| entry.scope)
175 .max();
176
177 self.refresh_effective_helper_flags();
178 }
179
180 pub fn has_fresh_direct_reachability(&self, ttl: Duration, now: SystemTime) -> bool {
182 self.reachable_addresses.iter().any(|entry| {
183 now.duration_since(entry.verified_at)
184 .map(|age| age <= ttl)
185 .unwrap_or(false)
186 })
187 }
188
189 pub fn has_global_direct_reachability(&self) -> bool {
191 self.reachable_addresses
192 .iter()
193 .any(|entry| entry.scope == ReachabilityScope::Global)
194 }
195
196 pub fn refresh_direct_capabilities(&mut self, ttl: Duration, now: SystemTime) {
198 self.reachable_addresses.retain(|entry| {
199 now.duration_since(entry.verified_at)
200 .map(|age| age <= ttl)
201 .unwrap_or(false)
202 });
203
204 self.direct_reachability_scope = self
205 .reachable_addresses
206 .iter()
207 .map(|entry| entry.scope)
208 .max();
209
210 self.refresh_effective_helper_flags();
211 }
212
213 pub fn known_addresses(&self) -> Vec<SocketAddr> {
215 let mut addrs: Vec<SocketAddr> = self
216 .reachable_addresses
217 .iter()
218 .map(|entry| entry.address)
219 .collect();
220 for addr in &self.external_addresses {
221 if !addrs.contains(addr) {
222 addrs.push(*addr);
223 }
224 }
225 addrs
226 }
227
228 pub fn has_ipv4(&self) -> bool {
230 self.known_addresses().iter().any(|addr| addr.is_ipv4())
231 }
232
233 pub fn has_ipv6(&self) -> bool {
235 self.known_addresses().iter().any(|addr| addr.is_ipv6())
236 }
237
238 pub fn supports_dual_stack(&self) -> bool {
243 self.has_ipv4() && self.has_ipv6()
244 }
245
246 pub fn addresses_by_version(&self, ipv4: bool) -> Vec<SocketAddr> {
248 self.known_addresses()
249 .into_iter()
250 .filter(|addr| addr.is_ipv4() == ipv4)
251 .collect()
252 }
253
254 pub fn can_bridge(&self, source: &SocketAddr, target: &SocketAddr) -> bool {
256 let source_v4 = source.is_ipv4();
257 let target_v4 = target.is_ipv4();
258
259 if source_v4 == target_v4 {
261 return true;
262 }
263
264 self.supports_dual_stack()
266 }
267}
268
269#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
271pub enum NatType {
272 None,
274 FullCone,
276 AddressRestrictedCone,
278 PortRestrictedCone,
280 Symmetric,
282 Unknown,
284}
285
286#[derive(Debug, Clone, Default, Serialize, Deserialize)]
288pub struct ConnectionStats {
289 pub success_count: u32,
291
292 pub failure_count: u32,
294
295 pub avg_rtt_ms: u32,
297
298 pub min_rtt_ms: u32,
300
301 pub max_rtt_ms: u32,
303
304 pub bytes_relayed: u64,
306
307 pub coordinations_completed: u32,
309}
310
311#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
313pub enum PeerSource {
314 Seed,
316 Connection,
318 Relay,
320 Coordination,
322 Merge,
324 #[default]
326 Unknown,
327}
328
329#[derive(Debug, Clone)]
331pub struct ConnectionOutcome {
332 pub success: bool,
334 pub rtt_ms: Option<u32>,
336 pub capabilities_discovered: Option<PeerCapabilities>,
338}
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
345pub struct RelayPathHint {
346 #[serde(with = "peer_id_serde")]
348 pub relay_endpoint_id: PeerId,
349
350 pub relay_locators: Vec<SocketAddr>,
352
353 pub observed_latency_ms: Option<u32>,
355
356 pub last_used: SystemTime,
358}
359
360impl CachedPeer {
361 pub fn new(peer_id: PeerId, addresses: Vec<SocketAddr>, source: PeerSource) -> Self {
363 let now = SystemTime::now();
364 Self {
365 peer_id,
366 addresses,
367 capabilities: PeerCapabilities::default(),
368 first_seen: now,
369 last_seen: now,
370 last_attempt: None,
371 stats: ConnectionStats::default(),
372 quality_score: 0.5, source,
374 relay_paths: Vec::new(),
375 token: None,
376 }
377 }
378
379 pub fn record_success(&mut self, rtt_ms: u32, caps: Option<PeerCapabilities>) {
381 self.last_seen = SystemTime::now();
382 self.last_attempt = Some(SystemTime::now());
383 self.stats.success_count = self.stats.success_count.saturating_add(1);
384
385 if self.stats.avg_rtt_ms == 0 {
387 self.stats.avg_rtt_ms = rtt_ms;
388 self.stats.min_rtt_ms = rtt_ms;
389 self.stats.max_rtt_ms = rtt_ms;
390 } else {
391 self.stats.avg_rtt_ms = (self.stats.avg_rtt_ms * 7 + rtt_ms) / 8;
392 self.stats.min_rtt_ms = self.stats.min_rtt_ms.min(rtt_ms);
393 self.stats.max_rtt_ms = self.stats.max_rtt_ms.max(rtt_ms);
394 }
395
396 if let Some(caps) = caps {
397 self.capabilities = caps;
398 }
399 }
400
401 pub fn record_failure(&mut self) {
403 self.last_attempt = Some(SystemTime::now());
404 self.stats.failure_count = self.stats.failure_count.saturating_add(1);
405 }
406
407 pub fn calculate_quality(&mut self, weights: &super::config::QualityWeights) {
409 let total_attempts = self.stats.success_count + self.stats.failure_count;
410
411 let success_rate = if total_attempts > 0 {
413 self.stats.success_count as f64 / total_attempts as f64
414 } else {
415 0.5 };
417
418 let rtt_score = if self.stats.avg_rtt_ms > 0 {
421 1.0 - (self.stats.avg_rtt_ms as f64 / 1000.0).min(1.0)
422 } else {
423 0.5 };
425
426 let age_secs = self
428 .last_seen
429 .duration_since(SystemTime::UNIX_EPOCH)
430 .ok()
431 .and_then(|last_seen_epoch| {
432 SystemTime::now()
433 .duration_since(SystemTime::UNIX_EPOCH)
434 .ok()
435 .map(|now_epoch| {
436 now_epoch
437 .as_secs()
438 .saturating_sub(last_seen_epoch.as_secs())
439 })
440 })
441 .unwrap_or(0) as f64;
442
443 let freshness = (-age_secs * 0.693 / 86400.0).exp();
445
446 let mut cap_bonus: f64 = 0.0;
448 if self.capabilities.supports_relay {
449 cap_bonus += 0.25;
450 }
451 if self.capabilities.supports_coordination {
452 cap_bonus += 0.25;
453 }
454 if self.capabilities.supports_dual_stack() {
455 cap_bonus += 0.2; }
457 let cap_score = cap_bonus.min(1.0);
458
459 self.quality_score = (success_rate * weights.success_rate
461 + rtt_score * weights.rtt
462 + freshness * weights.freshness
463 + cap_score * weights.capabilities)
464 .clamp(0.0, 1.0);
465 }
466
467 pub fn is_stale(&self, threshold: Duration) -> bool {
469 self.last_seen
470 .elapsed()
471 .map(|age| age > threshold)
472 .unwrap_or(true)
473 }
474
475 pub fn success_rate(&self) -> f64 {
477 let total = self.stats.success_count + self.stats.failure_count;
478 if total == 0 {
479 0.5
480 } else {
481 self.stats.success_count as f64 / total as f64
482 }
483 }
484
485 pub fn preferred_addresses(&self) -> Vec<SocketAddr> {
491 let mut addrs = self.capabilities.known_addresses();
492 for addr in &self.addresses {
493 if !addrs.contains(addr) {
494 addrs.push(*addr);
495 }
496 }
497 addrs
498 }
499
500 pub fn merge_addresses(&mut self, other: &CachedPeer) {
502 for addr in &other.addresses {
503 if !self.addresses.contains(addr) {
504 self.addresses.push(*addr);
505 }
506 }
507 if self.addresses.len() > 10 {
509 self.addresses.truncate(10);
510 }
511 }
512}
513
514mod peer_id_serde {
516 use super::PeerId;
517 use serde::{Deserialize, Deserializer, Serialize, Serializer};
518
519 pub fn serialize<S>(peer_id: &PeerId, serializer: S) -> Result<S::Ok, S::Error>
520 where
521 S: Serializer,
522 {
523 hex::encode(peer_id.0).serialize(serializer)
524 }
525
526 pub fn deserialize<'de, D>(deserializer: D) -> Result<PeerId, D::Error>
527 where
528 D: Deserializer<'de>,
529 {
530 let s = String::deserialize(deserializer)?;
531 let bytes = hex::decode(&s).map_err(serde::de::Error::custom)?;
532 if bytes.len() != 32 {
533 return Err(serde::de::Error::custom("PeerId must be 32 bytes"));
534 }
535 let mut arr = [0u8; 32];
536 arr.copy_from_slice(&bytes);
537 Ok(PeerId(arr))
538 }
539}
540
541#[cfg(test)]
542mod tests {
543 use super::*;
544
545 #[test]
546 fn test_cached_peer_new() {
547 let peer_id = PeerId([1u8; 32]);
548 let peer = CachedPeer::new(
549 peer_id,
550 vec!["127.0.0.1:9000".parse().unwrap()],
551 PeerSource::Seed,
552 );
553
554 assert_eq!(peer.peer_id, peer_id);
555 assert_eq!(peer.addresses.len(), 1);
556 assert_eq!(peer.source, PeerSource::Seed);
557 assert!((peer.quality_score - 0.5).abs() < f64::EPSILON);
558 }
559
560 #[test]
561 fn test_record_success() {
562 let mut peer = CachedPeer::new(
563 PeerId([1u8; 32]),
564 vec!["127.0.0.1:9000".parse().unwrap()],
565 PeerSource::Seed,
566 );
567
568 peer.record_success(100, None);
569 assert_eq!(peer.stats.success_count, 1);
570 assert_eq!(peer.stats.avg_rtt_ms, 100);
571 assert_eq!(peer.stats.min_rtt_ms, 100);
572 assert_eq!(peer.stats.max_rtt_ms, 100);
573
574 peer.record_success(200, None);
575 assert_eq!(peer.stats.success_count, 2);
576 assert_eq!(peer.stats.avg_rtt_ms, 112);
578 assert_eq!(peer.stats.min_rtt_ms, 100);
579 assert_eq!(peer.stats.max_rtt_ms, 200);
580 }
581
582 #[test]
583 fn test_record_failure() {
584 let mut peer = CachedPeer::new(
585 PeerId([1u8; 32]),
586 vec!["127.0.0.1:9000".parse().unwrap()],
587 PeerSource::Seed,
588 );
589
590 peer.record_failure();
591 assert_eq!(peer.stats.failure_count, 1);
592 assert!(peer.last_attempt.is_some());
593 }
594
595 #[test]
596 fn test_success_rate() {
597 let mut peer = CachedPeer::new(
598 PeerId([1u8; 32]),
599 vec!["127.0.0.1:9000".parse().unwrap()],
600 PeerSource::Seed,
601 );
602
603 assert!((peer.success_rate() - 0.5).abs() < f64::EPSILON);
605
606 peer.record_success(100, None);
607 assert!((peer.success_rate() - 1.0).abs() < f64::EPSILON);
608
609 peer.record_failure();
610 assert!((peer.success_rate() - 0.5).abs() < f64::EPSILON);
611 }
612
613 #[test]
614 fn test_quality_calculation() {
615 let weights = super::super::config::QualityWeights::default();
616 let mut peer = CachedPeer::new(
617 PeerId([1u8; 32]),
618 vec!["127.0.0.1:9000".parse().unwrap()],
619 PeerSource::Seed,
620 );
621
622 peer.calculate_quality(&weights);
624 assert!(peer.quality_score > 0.3 && peer.quality_score < 0.7);
625
626 for _ in 0..5 {
628 peer.record_success(50, None); }
630 peer.calculate_quality(&weights);
631 assert!(peer.quality_score > 0.6);
632 }
633
634 #[test]
635 fn test_peer_serialization() {
636 let peer = CachedPeer::new(
637 PeerId([0xab; 32]),
638 vec!["127.0.0.1:9000".parse().unwrap()],
639 PeerSource::Seed,
640 );
641
642 let json = serde_json::to_string(&peer).unwrap();
643 let deserialized: CachedPeer = serde_json::from_str(&json).unwrap();
644
645 assert_eq!(deserialized.peer_id, peer.peer_id);
646 assert_eq!(deserialized.addresses, peer.addresses);
647 assert_eq!(deserialized.source, peer.source);
648 }
649
650 #[test]
651 fn test_peer_capabilities_dual_stack() {
652 let mut caps = PeerCapabilities::default();
653
654 assert!(!caps.supports_dual_stack());
656 assert!(!caps.has_ipv4());
657 assert!(!caps.has_ipv6());
658
659 caps.external_addresses
661 .push("127.0.0.1:9000".parse().unwrap());
662 assert!(!caps.supports_dual_stack());
663 assert!(caps.has_ipv4());
664 assert!(!caps.has_ipv6());
665
666 caps.external_addresses.push("[::1]:9001".parse().unwrap());
668 assert!(caps.supports_dual_stack());
669 assert!(caps.has_ipv4());
670 assert!(caps.has_ipv6());
671 }
672
673 #[test]
674 fn test_peer_capabilities_ipv6_only() {
675 let mut caps = PeerCapabilities::default();
676 caps.external_addresses.push("[::1]:9000".parse().unwrap());
677 caps.external_addresses.push("[::1]:9001".parse().unwrap());
678
679 assert!(!caps.supports_dual_stack());
680 assert!(!caps.has_ipv4());
681 assert!(caps.has_ipv6());
682 }
683
684 #[test]
685 fn test_peer_capabilities_can_bridge() {
686 let mut caps = PeerCapabilities::default();
687 caps.external_addresses
688 .push("127.0.0.1:9000".parse().unwrap());
689 caps.external_addresses.push("[::1]:9001".parse().unwrap());
690
691 let v4_src: SocketAddr = "192.168.1.1:1000".parse().unwrap();
692 let v4_dst: SocketAddr = "192.168.1.2:2000".parse().unwrap();
693 let v6_src: SocketAddr = "[2001:db8::1]:1000".parse().unwrap();
694 let v6_dst: SocketAddr = "[2001:db8::2]:2000".parse().unwrap();
695
696 assert!(caps.can_bridge(&v4_src, &v4_dst));
698 assert!(caps.can_bridge(&v6_src, &v6_dst));
699
700 assert!(caps.can_bridge(&v4_src, &v6_dst));
702 assert!(caps.can_bridge(&v6_src, &v4_dst));
703 }
704
705 #[test]
706 fn test_peer_capabilities_cannot_bridge_ipv4_only() {
707 let mut caps = PeerCapabilities::default();
708 caps.external_addresses
709 .push("127.0.0.1:9000".parse().unwrap());
710
711 let v4_addr: SocketAddr = "192.168.1.1:1000".parse().unwrap();
712 let v6_addr: SocketAddr = "[2001:db8::1]:1000".parse().unwrap();
713
714 assert!(caps.can_bridge(&v4_addr, &v4_addr));
716
717 assert!(!caps.can_bridge(&v4_addr, &v6_addr));
719 assert!(!caps.can_bridge(&v6_addr, &v4_addr));
720 }
721
722 #[test]
723 fn test_addresses_by_version() {
724 let mut caps = PeerCapabilities::default();
725 caps.external_addresses
726 .push("127.0.0.1:9000".parse().unwrap());
727 caps.external_addresses
728 .push("10.0.0.1:9001".parse().unwrap());
729 caps.external_addresses.push("[::1]:9002".parse().unwrap());
730
731 let v4_addrs = caps.addresses_by_version(true);
732 assert_eq!(v4_addrs.len(), 2);
733
734 let v6_addrs = caps.addresses_by_version(false);
735 assert_eq!(v6_addrs.len(), 1);
736 }
737
738 #[test]
739 fn test_known_addresses_prefer_directly_reachable_addresses() {
740 let mut caps = PeerCapabilities::default();
741 let direct: SocketAddr = "192.168.1.20:9000".parse().unwrap();
742 let external: SocketAddr = "203.0.113.10:9000".parse().unwrap();
743
744 caps.record_direct_observation(direct, SystemTime::now());
745 caps.record_external_address(external);
746 caps.record_external_address(direct);
747
748 let known = caps.known_addresses();
749 assert_eq!(known[0], direct);
750 assert!(known.contains(&external));
751 assert_eq!(known.iter().filter(|addr| **addr == direct).count(), 1);
752 }
753
754 #[test]
755 fn test_local_direct_observation_does_not_claim_global_helper_capability() {
756 let mut caps = PeerCapabilities::default();
757 let direct: SocketAddr = "192.168.1.20:9000".parse().unwrap();
758
759 caps.record_direct_observation(direct, SystemTime::now());
760 caps.refresh_direct_capabilities(Duration::from_secs(60), SystemTime::now());
761
762 assert_eq!(
763 caps.direct_reachability_scope,
764 Some(ReachabilityScope::LocalNetwork)
765 );
766 assert!(!caps.supports_relay);
767 assert!(!caps.supports_coordination);
768 }
769
770 #[test]
771 fn test_preferred_addresses_include_cached_fallbacks() {
772 let mut peer = CachedPeer::new(
773 PeerId([7; 32]),
774 vec!["198.51.100.7:9000".parse().unwrap()],
775 PeerSource::Seed,
776 );
777 peer.capabilities
778 .record_direct_observation("192.168.1.20:9000".parse().unwrap(), SystemTime::now());
779 peer.capabilities
780 .record_external_address("203.0.113.20:9000".parse().unwrap());
781
782 let preferred = peer.preferred_addresses();
783 assert_eq!(preferred[0], "192.168.1.20:9000".parse().unwrap());
784 assert!(preferred.contains(&"203.0.113.20:9000".parse().unwrap()));
785 assert!(preferred.contains(&"198.51.100.7:9000".parse().unwrap()));
786 }
787
788 #[test]
789 fn test_explicit_assist_hints_survive_direct_refresh() {
790 let mut caps = PeerCapabilities::default();
791 let now = SystemTime::now();
792
793 caps.record_assist_hints(true, true);
794 caps.record_direct_observation("203.0.113.20:9000".parse().unwrap(), now);
795 caps.refresh_direct_capabilities(Duration::from_secs(60), now + Duration::from_secs(120));
796
797 assert!(caps.reachable_addresses.is_empty());
798 assert!(caps.hinted_supports_relay);
799 assert!(caps.hinted_supports_coordination);
800 assert!(caps.supports_relay);
801 assert!(caps.supports_coordination);
802 assert_eq!(caps.direct_reachability_scope, None);
803 }
804
805 #[test]
806 fn test_refresh_direct_capabilities_prunes_stale_addresses() {
807 let mut caps = PeerCapabilities::default();
808 let direct: SocketAddr = "192.168.1.20:9000".parse().unwrap();
809 let now = SystemTime::now();
810
811 caps.record_direct_observation(direct, now - Duration::from_secs(120));
812 caps.refresh_direct_capabilities(Duration::from_secs(60), now);
813
814 assert!(caps.reachable_addresses.is_empty());
815 assert!(!caps.supports_relay);
816 assert!(!caps.supports_coordination);
817 assert_eq!(caps.direct_reachability_scope, None);
818 }
819}