1use super::config::BootstrapCacheConfig;
11use super::entry::{CachedPeer, ConnectionOutcome, PeerCapabilities, PeerSource};
12use super::persistence::{CacheData, CachePersistence};
13use super::selection::select_epsilon_greedy;
14use crate::nat_traversal_api::PeerId;
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::{Instant, SystemTime};
18use tokio::sync::{RwLock, broadcast};
19use tracing::{debug, info, warn};
20
/// Notifications broadcast to subscribers obtained via `BootstrapCache::subscribe`.
#[derive(Debug, Clone)]
pub enum CacheEvent {
    /// The set of cached peers (or their quality scores) changed.
    Updated {
        /// Number of peers held in the cache when the event was sent.
        peer_count: usize,
    },
    /// The cache was written through the persistence layer.
    Saved,
    /// Peers were merged in from another source.
    // NOTE(review): nothing in this file emits `Merged`; presumably it is sent
    // by a merge routine defined elsewhere — confirm.
    Merged {
        /// Number of peers added by the merge.
        added: usize,
    },
    /// Stale peers were removed from the cache.
    Cleaned {
        /// Number of peers removed by the cleanup pass.
        removed: usize,
    },
}
42
/// Aggregate figures describing the current contents of the cache.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total number of cached peers.
    pub total_peers: usize,
    /// Peers whose capabilities report relay support.
    pub relay_peers: usize,
    /// Peers whose capabilities report coordination support.
    pub coordinator_peers: usize,
    /// Relay-capable peers that also report dual-stack support.
    pub dual_stack_relay_peers: usize,
    /// Mean `quality_score` over all peers; `0.0` when the cache is empty.
    pub average_quality: f64,
    /// Peers with neither a recorded success nor a recorded failure.
    pub untested_peers: usize,
}
59
/// Persistent, quality-ranked cache of known peers used for bootstrapping.
///
/// All peer data lives behind an async `RwLock`; changes are announced to
/// subscribers through a broadcast channel of [`CacheEvent`]s.
#[derive(Debug)]
pub struct BootstrapCache {
    /// Settings supplied when the cache was opened.
    config: BootstrapCacheConfig,
    /// The cached peers, shared behind an async read/write lock.
    data: Arc<RwLock<CacheData>>,
    /// Backend used to load and store the cache data.
    persistence: CachePersistence,
    /// Sending half of the event channel; receivers come from `subscribe`.
    event_tx: broadcast::Sender<CacheEvent>,
    /// Instant of the most recent successful `save`.
    last_save: Arc<RwLock<Instant>>,
    /// Instant of the most recent `cleanup_stale` run.
    last_cleanup: Arc<RwLock<Instant>>,
}
74
75impl BootstrapCache {
76 pub async fn open(config: BootstrapCacheConfig) -> std::io::Result<Self> {
81 let persistence = CachePersistence::new(&config.cache_dir, config.enable_file_locking)?;
82 let data = persistence.load()?;
83 let (event_tx, _) = broadcast::channel(256);
84 let now = Instant::now();
85
86 info!("Opened bootstrap cache with {} peers", data.peers.len());
87
88 Ok(Self {
89 config,
90 data: Arc::new(RwLock::new(data)),
91 persistence,
92 event_tx,
93 last_save: Arc::new(RwLock::new(now)),
94 last_cleanup: Arc::new(RwLock::new(now)),
95 })
96 }
97
98 pub fn subscribe(&self) -> broadcast::Receiver<CacheEvent> {
100 self.event_tx.subscribe()
101 }
102
103 pub async fn peer_count(&self) -> usize {
105 self.data.read().await.peers.len()
106 }
107
108 pub async fn get_peer(&self, peer_id: &PeerId) -> Option<CachedPeer> {
110 let mut data = self.data.write().await;
111 let peer = data.peers.get_mut(&peer_id.0)?;
112 peer.capabilities
113 .refresh_direct_capabilities(self.config.reachability_ttl, SystemTime::now());
114 peer.calculate_quality(&self.config.weights);
115 Some(peer.clone())
116 }
117
118 fn refresh_cached_peer(&self, peer: &mut CachedPeer, now: SystemTime) {
119 peer.capabilities
120 .refresh_direct_capabilities(self.config.reachability_ttl, now);
121 peer.calculate_quality(&self.config.weights);
122 }
123
124 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
129 let mut data = self.data.write().await;
130 let now = SystemTime::now();
131 for peer in data.peers.values_mut() {
132 self.refresh_cached_peer(peer, now);
133 }
134 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
135 drop(data);
136
137 select_epsilon_greedy(&peers, count, self.config.epsilon)
138 .into_iter()
139 .cloned()
140 .collect()
141 }
142
143 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
147 let mut data = self.data.write().await;
148 let now = SystemTime::now();
149 for peer in data.peers.values_mut() {
150 self.refresh_cached_peer(peer, now);
151 }
152 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
153 drop(data);
154
155 super::selection::select_with_capabilities(&peers, count, true, false)
156 .into_iter()
157 .cloned()
158 .collect()
159 }
160
161 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
165 let mut data = self.data.write().await;
166 let now = SystemTime::now();
167 for peer in data.peers.values_mut() {
168 self.refresh_cached_peer(peer, now);
169 }
170 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
171 drop(data);
172
173 super::selection::select_with_capabilities(&peers, count, false, true)
174 .into_iter()
175 .cloned()
176 .collect()
177 }
178
179 pub async fn select_relays_for_target(
189 &self,
190 count: usize,
191 target: &std::net::SocketAddr,
192 prefer_dual_stack: bool,
193 ) -> Vec<CachedPeer> {
194 use super::selection::select_relays_for_target;
195
196 let mut data = self.data.write().await;
197 let now = SystemTime::now();
198 for peer in data.peers.values_mut() {
199 self.refresh_cached_peer(peer, now);
200 }
201 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
202 drop(data);
203
204 select_relays_for_target(&peers, count, *target, prefer_dual_stack)
205 .into_iter()
206 .cloned()
207 .collect()
208 }
209
210 pub async fn select_dual_stack_relays(&self, count: usize) -> Vec<CachedPeer> {
214 use super::selection::select_dual_stack_relays;
215
216 let mut data = self.data.write().await;
217 let now = SystemTime::now();
218 for peer in data.peers.values_mut() {
219 self.refresh_cached_peer(peer, now);
220 }
221 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
222 drop(data);
223
224 select_dual_stack_relays(&peers, count)
225 .into_iter()
226 .cloned()
227 .collect()
228 }
229
230 pub async fn upsert(&self, peer: CachedPeer) {
234 let mut data = self.data.write().await;
235
236 if data.peers.len() >= self.config.max_peers && !data.peers.contains_key(&peer.peer_id.0) {
238 self.evict_lowest_quality(&mut data);
239 }
240
241 data.peers.insert(peer.peer_id.0, peer);
242
243 let count = data.peers.len();
244 drop(data);
245
246 let _ = self
247 .event_tx
248 .send(CacheEvent::Updated { peer_count: count });
249 }
250
251 pub async fn add_seed(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
253 let peer = CachedPeer::new(peer_id, addresses, PeerSource::Seed);
254 self.upsert(peer).await;
255 }
256
257 pub async fn add_from_connection(
259 &self,
260 peer_id: PeerId,
261 addresses: Vec<SocketAddr>,
262 caps: Option<PeerCapabilities>,
263 ) {
264 let mut peer = CachedPeer::new(peer_id, addresses, PeerSource::Connection);
265 if let Some(caps) = caps {
266 peer.capabilities = caps;
267 }
268 self.upsert(peer).await;
269 }
270
271 pub async fn record_outcome(&self, peer_id: &PeerId, outcome: ConnectionOutcome) {
273 let mut data = self.data.write().await;
274
275 if let Some(peer) = data.peers.get_mut(&peer_id.0) {
276 if outcome.success {
277 peer.record_success(
278 outcome.rtt_ms.unwrap_or(100),
279 outcome.capabilities_discovered,
280 );
281 } else {
282 peer.record_failure();
283 }
284
285 peer.calculate_quality(&self.config.weights);
287 }
288 }
289
290 pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
292 self.record_outcome(
293 peer_id,
294 ConnectionOutcome {
295 success: true,
296 rtt_ms: Some(rtt_ms),
297 capabilities_discovered: None,
298 },
299 )
300 .await;
301 }
302
303 pub async fn record_failure(&self, peer_id: &PeerId) {
305 self.record_outcome(
306 peer_id,
307 ConnectionOutcome {
308 success: false,
309 rtt_ms: None,
310 capabilities_discovered: None,
311 },
312 )
313 .await;
314 }
315
316 pub async fn update_capabilities(&self, peer_id: &PeerId, caps: PeerCapabilities) {
318 let mut data = self.data.write().await;
319
320 if let Some(peer) = data.peers.get_mut(&peer_id.0) {
321 peer.capabilities = caps;
322 peer.calculate_quality(&self.config.weights);
323 }
324 }
325
326 pub async fn observe_direct_reachability(&self, peer_id: PeerId, address: SocketAddr) {
332 let mut data = self.data.write().await;
333 let now = SystemTime::now();
334
335 let peer = data
336 .peers
337 .entry(peer_id.0)
338 .or_insert_with(|| CachedPeer::new(peer_id, vec![address], PeerSource::Connection));
339
340 if !peer.addresses.contains(&address) {
341 peer.addresses.push(address);
342 }
343
344 peer.last_seen = now;
345 peer.last_attempt = Some(now);
346 peer.stats.success_count = peer.stats.success_count.saturating_add(1);
347 peer.capabilities.record_direct_observation(address, now);
348 self.refresh_cached_peer(peer, now);
349
350 let count = data.peers.len();
351 drop(data);
352
353 let _ = self
354 .event_tx
355 .send(CacheEvent::Updated { peer_count: count });
356 }
357
358 pub async fn get(&self, peer_id: &PeerId) -> Option<CachedPeer> {
360 let mut data = self.data.write().await;
361 let peer = data.peers.get_mut(&peer_id.0)?;
362 self.refresh_cached_peer(peer, SystemTime::now());
363 Some(peer.clone())
364 }
365
366 pub async fn update_token(&self, peer_id: PeerId, token: Vec<u8>) {
368 let mut data = self.data.write().await;
369 if let Some(peer) = data.peers.get_mut(&peer_id.0) {
370 peer.token = Some(token);
371 }
372 }
373
374 pub async fn get_all_tokens(&self) -> std::collections::HashMap<PeerId, Vec<u8>> {
376 self.data
377 .read()
378 .await
379 .peers
380 .values()
381 .filter_map(|p| p.token.clone().map(|t| (p.peer_id, t)))
382 .collect()
383 }
384
385 pub async fn contains(&self, peer_id: &PeerId) -> bool {
387 self.data.read().await.peers.contains_key(&peer_id.0)
388 }
389
390 pub async fn remove(&self, peer_id: &PeerId) -> Option<CachedPeer> {
392 self.data.write().await.peers.remove(&peer_id.0)
393 }
394
395 pub async fn save(&self) -> std::io::Result<()> {
397 let mut data = self.data.write().await;
398
399 if data.peers.len() < self.config.min_peers_to_save {
400 debug!(
401 "Skipping save: only {} peers (min: {})",
402 data.peers.len(),
403 self.config.min_peers_to_save
404 );
405 return Ok(());
406 }
407
408 self.persistence.save(&mut data)?;
409
410 drop(data);
411 *self.last_save.write().await = Instant::now();
412 let _ = self.event_tx.send(CacheEvent::Saved);
413
414 Ok(())
415 }
416
417 pub async fn cleanup_stale(&self) -> usize {
422 let mut data = self.data.write().await;
423 let initial_count = data.peers.len();
424
425 data.peers
426 .retain(|_, peer| !peer.is_stale(self.config.stale_threshold));
427
428 let removed = initial_count - data.peers.len();
429
430 if removed > 0 {
431 info!("Cleaned up {} stale peers", removed);
432 let _ = self.event_tx.send(CacheEvent::Cleaned { removed });
433 }
434
435 drop(data);
436 *self.last_cleanup.write().await = Instant::now();
437
438 removed
439 }
440
441 pub async fn recalculate_quality(&self) {
443 let mut data = self.data.write().await;
444
445 for peer in data.peers.values_mut() {
446 peer.calculate_quality(&self.config.weights);
447 }
448
449 let count = data.peers.len();
450 let _ = self
451 .event_tx
452 .send(CacheEvent::Updated { peer_count: count });
453 }
454
455 pub async fn stats(&self) -> CacheStats {
457 let mut data = self.data.write().await;
458 let now = SystemTime::now();
459 for peer in data.peers.values_mut() {
460 self.refresh_cached_peer(peer, now);
461 }
462
463 let relay_count = data
464 .peers
465 .values()
466 .filter(|p| p.capabilities.supports_relay)
467 .count();
468 let coord_count = data
469 .peers
470 .values()
471 .filter(|p| p.capabilities.supports_coordination)
472 .count();
473 let dual_stack_count = data
474 .peers
475 .values()
476 .filter(|p| p.capabilities.supports_relay && p.capabilities.supports_dual_stack())
477 .count();
478 let untested = data
479 .peers
480 .values()
481 .filter(|p| p.stats.success_count + p.stats.failure_count == 0)
482 .count();
483 let avg_quality = if data.peers.is_empty() {
484 0.0
485 } else {
486 data.peers.values().map(|p| p.quality_score).sum::<f64>() / data.peers.len() as f64
487 };
488
489 CacheStats {
490 total_peers: data.peers.len(),
491 relay_peers: relay_count,
492 coordinator_peers: coord_count,
493 dual_stack_relay_peers: dual_stack_count,
494 average_quality: avg_quality,
495 untested_peers: untested,
496 }
497 }
498
499 pub fn start_maintenance(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
508 let cache = self;
509
510 tokio::spawn(async move {
511 let mut save_interval = tokio::time::interval(cache.config.save_interval);
512 let mut cleanup_interval = tokio::time::interval(cache.config.cleanup_interval);
513 let mut quality_interval = tokio::time::interval(cache.config.quality_update_interval);
514
515 loop {
516 tokio::select! {
517 _ = save_interval.tick() => {
518 if let Err(e) = cache.save().await {
519 warn!("Failed to save cache: {}", e);
520 }
521 }
522 _ = cleanup_interval.tick() => {
523 cache.cleanup_stale().await;
524 }
525 _ = quality_interval.tick() => {
526 cache.recalculate_quality().await;
527 }
528 }
529 }
530 })
531 }
532
533 pub async fn all_peers(&self) -> Vec<CachedPeer> {
535 let mut data = self.data.write().await;
536 let now = SystemTime::now();
537 for peer in data.peers.values_mut() {
538 self.refresh_cached_peer(peer, now);
539 }
540 data.peers.values().cloned().collect()
541 }
542
543 pub fn config(&self) -> &BootstrapCacheConfig {
545 &self.config
546 }
547
548 fn evict_lowest_quality(&self, data: &mut CacheData) {
549 let evict_count = (self.config.max_peers / 20).max(1); let mut sorted: Vec<_> = data.peers.iter().collect();
552 sorted.sort_by(|a, b| {
553 a.1.quality_score
554 .partial_cmp(&b.1.quality_score)
555 .unwrap_or(std::cmp::Ordering::Equal)
556 });
557
558 let to_remove: Vec<[u8; 32]> = sorted
559 .into_iter()
560 .take(evict_count)
561 .map(|(id, _)| *id)
562 .collect();
563
564 for id in to_remove {
565 data.peers.remove(&id);
566 }
567
568 debug!("Evicted {} lowest quality peers", evict_count);
569 }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a cache stored under `temp_dir` with `epsilon` set to `0.0` and a
    /// save threshold of a single peer.
    async fn create_test_cache(temp_dir: &TempDir) -> BootstrapCache {
        let builder = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(100)
            .epsilon(0.0)
            .min_peers_to_save(1);

        BootstrapCache::open(builder.build()).await.unwrap()
    }

    /// A cache opened on an empty directory holds no peers.
    #[tokio::test]
    async fn test_cache_creation() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;

        assert_eq!(cache.peer_count().await, 0);
    }

    /// A seeded peer can be counted, found and fetched.
    #[tokio::test]
    async fn test_add_and_get() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;

        let id = PeerId([1u8; 32]);
        let seed_addresses = vec!["127.0.0.1:9000".parse().unwrap()];
        cache.add_seed(id, seed_addresses).await;

        assert_eq!(cache.peer_count().await, 1);
        assert!(cache.contains(&id).await);

        let fetched = cache.get(&id).await.unwrap();
        assert_eq!(fetched.addresses.len(), 1);
    }

    /// Selection returns the requested number of peers, best first.
    #[tokio::test]
    async fn test_select_peers() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;

        for i in 0..10usize {
            let address = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
            let mut candidate =
                CachedPeer::new(PeerId([i as u8; 32]), vec![address], PeerSource::Seed);
            candidate.quality_score = i as f64 / 10.0;
            cache.upsert(candidate).await;
        }

        let chosen = cache.select_peers(5).await;
        assert_eq!(chosen.len(), 5);
        assert!(chosen[0].quality_score >= chosen[4].quality_score);
    }

    /// Peers saved by one cache instance are visible to the next one.
    #[tokio::test]
    async fn test_persistence() {
        let dir = TempDir::new().unwrap();

        // First instance: add a peer and persist it.
        {
            let writer = create_test_cache(&dir).await;
            writer
                .add_seed(PeerId([1; 32]), vec!["127.0.0.1:9000".parse().unwrap()])
                .await;
            writer.save().await.unwrap();
        }

        // Second instance: the peer must have been reloaded from disk.
        {
            let reader = create_test_cache(&dir).await;
            assert_eq!(reader.peer_count().await, 1);
            assert!(reader.contains(&PeerId([1; 32])).await);
        }
    }

    /// Explicit relay/coordination hints are kept across save and reopen.
    #[tokio::test]
    async fn test_persisted_explicit_assist_hints_survive_reopen() {
        let dir = TempDir::new().unwrap();
        let id = PeerId([9; 32]);
        let address: SocketAddr = "198.51.100.9:9000".parse().unwrap();

        {
            let writer = create_test_cache(&dir).await;
            let mut hinted = CachedPeer::new(id, vec![address], PeerSource::Merge);
            hinted.capabilities.record_assist_hints(true, true);
            writer.upsert(hinted).await;
            writer.save().await.unwrap();
        }

        {
            let reader = create_test_cache(&dir).await;
            let reloaded = reader.get(&id).await.expect("peer should reload");
            let caps = &reloaded.capabilities;
            assert!(caps.hinted_supports_relay);
            assert!(caps.hinted_supports_coordination);
            assert!(caps.supports_relay);
            assert!(caps.supports_coordination);
            assert!(reloaded.addresses.contains(&address));
        }
    }

    /// Repeated successes raise a peer's quality score and success rate.
    #[tokio::test]
    async fn test_quality_scoring() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;

        let id = PeerId([1; 32]);
        cache
            .add_seed(id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        let baseline = cache.get(&id).await.unwrap().quality_score;

        for _ in 0..5 {
            cache.record_success(&id, 50).await;
        }

        let improved = cache.get(&id).await.unwrap();
        assert!(improved.quality_score > baseline);
        assert!(improved.success_rate() > 0.9);
    }

    /// Inserting more peers than `max_peers` never exceeds the bound.
    #[tokio::test]
    async fn test_eviction() {
        let dir = TempDir::new().unwrap();
        let small_config = BootstrapCacheConfig::builder()
            .cache_dir(dir.path())
            .max_peers(10)
            .build();

        let cache = BootstrapCache::open(small_config).await.unwrap();

        for i in 0..15u8 {
            let address = format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap();
            let mut candidate = CachedPeer::new(PeerId([i; 32]), vec![address], PeerSource::Seed);
            candidate.quality_score = i as f64 / 15.0;
            cache.upsert(candidate).await;
        }

        assert!(cache.peer_count().await <= 10);
    }

    /// Statistics count relay/coordinator peers and untested peers.
    #[tokio::test]
    async fn test_stats() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;

        // Two peers with a direct observation on a public address.
        for (id_byte, addr_text) in [(1u8, "203.0.113.1:9001"), (2u8, "198.51.100.2:9002")] {
            let address: SocketAddr = addr_text.parse().unwrap();
            let mut observed =
                CachedPeer::new(PeerId([id_byte; 32]), vec![address], PeerSource::Seed);
            observed
                .capabilities
                .record_direct_observation(address, SystemTime::now());
            cache.upsert(observed).await;
        }

        // One plain seed without any observation.
        cache
            .add_seed(PeerId([3; 32]), vec!["127.0.0.1:9003".parse().unwrap()])
            .await;

        let summary = cache.stats().await;
        assert_eq!(summary.total_peers, 3);
        assert_eq!(summary.relay_peers, 2);
        assert_eq!(summary.coordinator_peers, 2);
        assert_eq!(summary.untested_peers, 3);
    }

    /// Relay selection lists peers with direct evidence ahead of the rest.
    #[tokio::test]
    async fn test_select_relay_peers() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;

        for i in 0..10u8 {
            let address: SocketAddr = format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap();
            let mut candidate = CachedPeer::new(PeerId([i; 32]), vec![address], PeerSource::Seed);
            // Only even-numbered peers get a direct observation.
            if i % 2 == 0 {
                candidate
                    .capabilities
                    .record_direct_observation(address, SystemTime::now());
            }
            candidate.quality_score = i as f64 / 10.0;
            cache.upsert(candidate).await;
        }

        let relays = cache.select_relay_peers(10).await;
        assert_eq!(relays.len(), 10);

        let scoped_in_front = relays
            .iter()
            .take(5)
            .filter(|p| p.capabilities.direct_reachability_scope.is_some())
            .count();
        assert_eq!(
            scoped_in_front, 5,
            "Scoped direct-evidence peers should be first"
        );
    }

    /// A private-network observation records local scope without granting
    /// relay or coordination capability.
    #[tokio::test]
    async fn test_observe_direct_reachability_preserves_local_scope_without_global_promotion() {
        let dir = TempDir::new().unwrap();
        let cache = create_test_cache(&dir).await;
        let id = PeerId([9; 32]);
        let lan_addr: SocketAddr = "192.168.1.50:9000".parse().unwrap();

        cache.observe_direct_reachability(id, lan_addr).await;

        let observed = cache.get(&id).await.expect("peer inserted");
        assert!(!observed.capabilities.supports_relay);
        assert!(!observed.capabilities.supports_coordination);
        assert_eq!(
            observed.capabilities.direct_reachability_scope,
            Some(crate::reachability::ReachabilityScope::LocalNetwork)
        );
        assert!(observed.addresses.contains(&lan_addr));
        assert!(
            observed
                .capabilities
                .reachable_addresses
                .iter()
                .any(|entry| entry.address == lan_addr)
        );
        assert!(observed.success_rate() > 0.0);
    }
}