// ant_quic/bootstrap_cache/cache.rs
// Copyright 2024 Saorsa Labs Ltd.
//
// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
//
// Full details available at https://saorsalabs.com/licenses

//! Main bootstrap cache implementation.
use super::config::BootstrapCacheConfig;
use super::entry::{CachedPeer, ConnectionOutcome, PeerCapabilities, PeerSource};
use super::persistence::{CacheData, CachePersistence};
use super::selection::select_epsilon_greedy;
use crate::nat_traversal_api::PeerId;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use tokio::sync::{RwLock, broadcast};
use tracing::{debug, info, warn};
20
/// Bootstrap cache event for notifications.
///
/// Broadcast to subscribers obtained via [`BootstrapCache::subscribe`].
/// Delivery is best-effort: send errors (no subscribers) are ignored.
#[derive(Debug, Clone)]
pub enum CacheEvent {
    /// Cache was updated (peers added/removed/modified)
    Updated {
        /// Current peer count
        peer_count: usize,
    },
    /// Cache was saved to disk
    Saved,
    /// Cache was merged from another source
    // NOTE(review): not emitted anywhere in this file; presumably sent by
    // merge logic elsewhere in the crate — confirm before removing.
    Merged {
        /// Number of peers added from merge
        added: usize,
    },
    /// Stale peers were cleaned up
    Cleaned {
        /// Number of peers removed
        removed: usize,
    },
}
42
43/// Cache statistics
/// Cache statistics.
///
/// A point-in-time summary computed by [`BootstrapCache::stats`]; all counts
/// reflect capability evidence refreshed at the moment of the call.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total number of cached peers
    pub total_peers: usize,
    /// Peers that support relay
    pub relay_peers: usize,
    /// Peers that support NAT coordination
    pub coordinator_peers: usize,
    /// Peers that support dual-stack (IPv4 + IPv6) bridging
    pub dual_stack_relay_peers: usize,
    /// Average quality score across all peers
    pub average_quality: f64,
    /// Number of untested peers
    pub untested_peers: usize,
}
59
/// Greedy bootstrap cache with quality-based peer selection.
///
/// This cache stores peer information with quality metrics and provides
/// epsilon-greedy selection to balance exploitation (using known-good peers)
/// with exploration (trying new peers to discover potentially better ones).
#[derive(Debug)]
pub struct BootstrapCache {
    /// Tunables: capacity, epsilon, intervals, scoring weights.
    config: BootstrapCacheConfig,
    /// Shared peer table, guarded by an async RwLock so the cache can be
    /// cloned into background tasks via `Arc<Self>`.
    data: Arc<RwLock<CacheData>>,
    /// Disk persistence layer (load/save of the peer table).
    persistence: CachePersistence,
    /// Broadcast channel for advisory `CacheEvent` notifications.
    event_tx: broadcast::Sender<CacheEvent>,
    /// Time of the last successful save (used by maintenance bookkeeping).
    last_save: Arc<RwLock<Instant>>,
    /// Time of the last stale-peer cleanup.
    last_cleanup: Arc<RwLock<Instant>>,
}
74
75impl BootstrapCache {
76    // ... (existing open/subscribe methods)
77    /// Open or create a bootstrap cache.
78    ///
79    /// Loads existing cache data from disk if available, otherwise starts fresh.
80    pub async fn open(config: BootstrapCacheConfig) -> std::io::Result<Self> {
81        let persistence = CachePersistence::new(&config.cache_dir, config.enable_file_locking)?;
82        let data = persistence.load()?;
83        let (event_tx, _) = broadcast::channel(256);
84        let now = Instant::now();
85
86        info!("Opened bootstrap cache with {} peers", data.peers.len());
87
88        Ok(Self {
89            config,
90            data: Arc::new(RwLock::new(data)),
91            persistence,
92            event_tx,
93            last_save: Arc::new(RwLock::new(now)),
94            last_cleanup: Arc::new(RwLock::new(now)),
95        })
96    }
97
98    /// Subscribe to cache events
99    pub fn subscribe(&self) -> broadcast::Receiver<CacheEvent> {
100        self.event_tx.subscribe()
101    }
102
103    /// Get the number of cached peers
104    pub async fn peer_count(&self) -> usize {
105        self.data.read().await.peers.len()
106    }
107
108    /// Get a specific peer from the cache
109    pub async fn get_peer(&self, peer_id: &PeerId) -> Option<CachedPeer> {
110        let mut data = self.data.write().await;
111        let peer = data.peers.get_mut(&peer_id.0)?;
112        peer.capabilities
113            .refresh_direct_capabilities(self.config.reachability_ttl, SystemTime::now());
114        peer.calculate_quality(&self.config.weights);
115        Some(peer.clone())
116    }
117
118    fn refresh_cached_peer(&self, peer: &mut CachedPeer, now: SystemTime) {
119        peer.capabilities
120            .refresh_direct_capabilities(self.config.reachability_ttl, now);
121        peer.calculate_quality(&self.config.weights);
122    }
123
124    /// Select peers for bootstrap using epsilon-greedy strategy.
125    ///
126    /// Returns up to `count` peers, balancing exploitation of known-good peers
127    /// with exploration of untested peers based on the configured epsilon.
128    pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
129        let mut data = self.data.write().await;
130        let now = SystemTime::now();
131        for peer in data.peers.values_mut() {
132            self.refresh_cached_peer(peer, now);
133        }
134        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
135        drop(data);
136
137        select_epsilon_greedy(&peers, count, self.config.epsilon)
138            .into_iter()
139            .cloned()
140            .collect()
141    }
142
143    /// Select peers that support relay functionality.
144    ///
145    /// Returns peers sorted by quality score, preferring observed relay capability.
146    pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
147        let mut data = self.data.write().await;
148        let now = SystemTime::now();
149        for peer in data.peers.values_mut() {
150            self.refresh_cached_peer(peer, now);
151        }
152        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
153        drop(data);
154
155        super::selection::select_with_capabilities(&peers, count, true, false)
156            .into_iter()
157            .cloned()
158            .collect()
159    }
160
161    /// Select peers that support NAT coordination.
162    ///
163    /// Returns peers sorted by quality score, preferring observed coordination capability.
164    pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
165        let mut data = self.data.write().await;
166        let now = SystemTime::now();
167        for peer in data.peers.values_mut() {
168            self.refresh_cached_peer(peer, now);
169        }
170        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
171        drop(data);
172
173        super::selection::select_with_capabilities(&peers, count, false, true)
174            .into_iter()
175            .cloned()
176            .collect()
177    }
178
179    /// Select relay peers that can reach a target IP version.
180    ///
181    /// Returns relays sorted by quality that can bridge traffic to the target.
182    /// Dual-stack relays are preferred as they can reach any target.
183    ///
184    /// # Arguments
185    /// * `count` - Maximum number of relays to return
186    /// * `target` - The target address to reach
187    /// * `prefer_dual_stack` - If true, prioritize dual-stack relays
188    pub async fn select_relays_for_target(
189        &self,
190        count: usize,
191        target: &std::net::SocketAddr,
192        prefer_dual_stack: bool,
193    ) -> Vec<CachedPeer> {
194        use super::selection::select_relays_for_target;
195
196        let mut data = self.data.write().await;
197        let now = SystemTime::now();
198        for peer in data.peers.values_mut() {
199            self.refresh_cached_peer(peer, now);
200        }
201        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
202        drop(data);
203
204        select_relays_for_target(&peers, count, *target, prefer_dual_stack)
205            .into_iter()
206            .cloned()
207            .collect()
208    }
209
210    /// Select relay peers that support dual-stack (IPv4 + IPv6) bridging.
211    ///
212    /// These peers are valuable for bridging between IPv4-only and IPv6-only networks.
213    pub async fn select_dual_stack_relays(&self, count: usize) -> Vec<CachedPeer> {
214        use super::selection::select_dual_stack_relays;
215
216        let mut data = self.data.write().await;
217        let now = SystemTime::now();
218        for peer in data.peers.values_mut() {
219            self.refresh_cached_peer(peer, now);
220        }
221        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
222        drop(data);
223
224        select_dual_stack_relays(&peers, count)
225            .into_iter()
226            .cloned()
227            .collect()
228    }
229
230    /// Add or update a peer in the cache.
231    ///
232    /// If the cache is at capacity, evicts the lowest quality peers.
233    pub async fn upsert(&self, peer: CachedPeer) {
234        let mut data = self.data.write().await;
235
236        // Evict lowest quality if at capacity
237        if data.peers.len() >= self.config.max_peers && !data.peers.contains_key(&peer.peer_id.0) {
238            self.evict_lowest_quality(&mut data);
239        }
240
241        data.peers.insert(peer.peer_id.0, peer);
242
243        let count = data.peers.len();
244        drop(data);
245
246        let _ = self
247            .event_tx
248            .send(CacheEvent::Updated { peer_count: count });
249    }
250
251    /// Add a seed peer (user-provided bootstrap node).
252    pub async fn add_seed(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
253        let peer = CachedPeer::new(peer_id, addresses, PeerSource::Seed);
254        self.upsert(peer).await;
255    }
256
257    /// Add a peer discovered from an active connection.
258    pub async fn add_from_connection(
259        &self,
260        peer_id: PeerId,
261        addresses: Vec<SocketAddr>,
262        caps: Option<PeerCapabilities>,
263    ) {
264        let mut peer = CachedPeer::new(peer_id, addresses, PeerSource::Connection);
265        if let Some(caps) = caps {
266            peer.capabilities = caps;
267        }
268        self.upsert(peer).await;
269    }
270
271    /// Record a connection attempt result.
272    pub async fn record_outcome(&self, peer_id: &PeerId, outcome: ConnectionOutcome) {
273        let mut data = self.data.write().await;
274
275        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
276            if outcome.success {
277                peer.record_success(
278                    outcome.rtt_ms.unwrap_or(100),
279                    outcome.capabilities_discovered,
280                );
281            } else {
282                peer.record_failure();
283            }
284
285            // Recalculate quality score
286            peer.calculate_quality(&self.config.weights);
287        }
288    }
289
290    /// Record successful connection.
291    pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
292        self.record_outcome(
293            peer_id,
294            ConnectionOutcome {
295                success: true,
296                rtt_ms: Some(rtt_ms),
297                capabilities_discovered: None,
298            },
299        )
300        .await;
301    }
302
303    /// Record failed connection.
304    pub async fn record_failure(&self, peer_id: &PeerId) {
305        self.record_outcome(
306            peer_id,
307            ConnectionOutcome {
308                success: false,
309                rtt_ms: None,
310                capabilities_discovered: None,
311            },
312        )
313        .await;
314    }
315
316    /// Update peer capabilities.
317    pub async fn update_capabilities(&self, peer_id: &PeerId, caps: PeerCapabilities) {
318        let mut data = self.data.write().await;
319
320        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
321            peer.capabilities = caps;
322            peer.calculate_quality(&self.config.weights);
323        }
324    }
325
326    /// Record that a peer was directly reachable from this node.
327    ///
328    /// This is observer-scoped evidence. A peer is considered suitable for
329    /// relay/bootstrap/coordinator selection only after a direct connection to
330    /// one of its addresses succeeds without coordinator or relay assistance.
331    pub async fn observe_direct_reachability(&self, peer_id: PeerId, address: SocketAddr) {
332        let mut data = self.data.write().await;
333        let now = SystemTime::now();
334
335        let peer = data
336            .peers
337            .entry(peer_id.0)
338            .or_insert_with(|| CachedPeer::new(peer_id, vec![address], PeerSource::Connection));
339
340        if !peer.addresses.contains(&address) {
341            peer.addresses.push(address);
342        }
343
344        peer.last_seen = now;
345        peer.last_attempt = Some(now);
346        peer.stats.success_count = peer.stats.success_count.saturating_add(1);
347        peer.capabilities.record_direct_observation(address, now);
348        self.refresh_cached_peer(peer, now);
349
350        let count = data.peers.len();
351        drop(data);
352
353        let _ = self
354            .event_tx
355            .send(CacheEvent::Updated { peer_count: count });
356    }
357
358    /// Get a specific peer.
359    pub async fn get(&self, peer_id: &PeerId) -> Option<CachedPeer> {
360        let mut data = self.data.write().await;
361        let peer = data.peers.get_mut(&peer_id.0)?;
362        self.refresh_cached_peer(peer, SystemTime::now());
363        Some(peer.clone())
364    }
365
366    /// Update the address validation token for a peer
367    pub async fn update_token(&self, peer_id: PeerId, token: Vec<u8>) {
368        let mut data = self.data.write().await;
369        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
370            peer.token = Some(token);
371        }
372    }
373
374    /// Get all tokens from cached peers (for initializing TokenStore)
375    pub async fn get_all_tokens(&self) -> std::collections::HashMap<PeerId, Vec<u8>> {
376        self.data
377            .read()
378            .await
379            .peers
380            .values()
381            .filter_map(|p| p.token.clone().map(|t| (p.peer_id, t)))
382            .collect()
383    }
384
385    /// Check if peer exists in cache.
386    pub async fn contains(&self, peer_id: &PeerId) -> bool {
387        self.data.read().await.peers.contains_key(&peer_id.0)
388    }
389
390    /// Remove a peer from cache.
391    pub async fn remove(&self, peer_id: &PeerId) -> Option<CachedPeer> {
392        self.data.write().await.peers.remove(&peer_id.0)
393    }
394
395    /// Save cache to disk.
396    pub async fn save(&self) -> std::io::Result<()> {
397        let mut data = self.data.write().await;
398
399        if data.peers.len() < self.config.min_peers_to_save {
400            debug!(
401                "Skipping save: only {} peers (min: {})",
402                data.peers.len(),
403                self.config.min_peers_to_save
404            );
405            return Ok(());
406        }
407
408        self.persistence.save(&mut data)?;
409
410        drop(data);
411        *self.last_save.write().await = Instant::now();
412        let _ = self.event_tx.send(CacheEvent::Saved);
413
414        Ok(())
415    }
416
417    /// Cleanup stale peers.
418    ///
419    /// Removes peers that haven't been seen within the stale threshold.
420    /// Returns the number of peers removed.
421    pub async fn cleanup_stale(&self) -> usize {
422        let mut data = self.data.write().await;
423        let initial_count = data.peers.len();
424
425        data.peers
426            .retain(|_, peer| !peer.is_stale(self.config.stale_threshold));
427
428        let removed = initial_count - data.peers.len();
429
430        if removed > 0 {
431            info!("Cleaned up {} stale peers", removed);
432            let _ = self.event_tx.send(CacheEvent::Cleaned { removed });
433        }
434
435        drop(data);
436        *self.last_cleanup.write().await = Instant::now();
437
438        removed
439    }
440
441    /// Recalculate quality scores for all peers.
442    pub async fn recalculate_quality(&self) {
443        let mut data = self.data.write().await;
444
445        for peer in data.peers.values_mut() {
446            peer.calculate_quality(&self.config.weights);
447        }
448
449        let count = data.peers.len();
450        let _ = self
451            .event_tx
452            .send(CacheEvent::Updated { peer_count: count });
453    }
454
455    /// Get cache statistics.
456    pub async fn stats(&self) -> CacheStats {
457        let mut data = self.data.write().await;
458        let now = SystemTime::now();
459        for peer in data.peers.values_mut() {
460            self.refresh_cached_peer(peer, now);
461        }
462
463        let relay_count = data
464            .peers
465            .values()
466            .filter(|p| p.capabilities.supports_relay)
467            .count();
468        let coord_count = data
469            .peers
470            .values()
471            .filter(|p| p.capabilities.supports_coordination)
472            .count();
473        let dual_stack_count = data
474            .peers
475            .values()
476            .filter(|p| p.capabilities.supports_relay && p.capabilities.supports_dual_stack())
477            .count();
478        let untested = data
479            .peers
480            .values()
481            .filter(|p| p.stats.success_count + p.stats.failure_count == 0)
482            .count();
483        let avg_quality = if data.peers.is_empty() {
484            0.0
485        } else {
486            data.peers.values().map(|p| p.quality_score).sum::<f64>() / data.peers.len() as f64
487        };
488
489        CacheStats {
490            total_peers: data.peers.len(),
491            relay_peers: relay_count,
492            coordinator_peers: coord_count,
493            dual_stack_relay_peers: dual_stack_count,
494            average_quality: avg_quality,
495            untested_peers: untested,
496        }
497    }
498
499    /// Start background maintenance tasks.
500    ///
501    /// Spawns a task that periodically:
502    /// - Saves the cache to disk
503    /// - Cleans up stale peers
504    /// - Recalculates quality scores
505    ///
506    /// Returns a handle that can be used to cancel the task.
507    pub fn start_maintenance(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
508        let cache = self;
509
510        tokio::spawn(async move {
511            let mut save_interval = tokio::time::interval(cache.config.save_interval);
512            let mut cleanup_interval = tokio::time::interval(cache.config.cleanup_interval);
513            let mut quality_interval = tokio::time::interval(cache.config.quality_update_interval);
514
515            loop {
516                tokio::select! {
517                    _ = save_interval.tick() => {
518                        if let Err(e) = cache.save().await {
519                            warn!("Failed to save cache: {}", e);
520                        }
521                    }
522                    _ = cleanup_interval.tick() => {
523                        cache.cleanup_stale().await;
524                    }
525                    _ = quality_interval.tick() => {
526                        cache.recalculate_quality().await;
527                    }
528                }
529            }
530        })
531    }
532
533    /// Get all cached peers (for export/debug).
534    pub async fn all_peers(&self) -> Vec<CachedPeer> {
535        let mut data = self.data.write().await;
536        let now = SystemTime::now();
537        for peer in data.peers.values_mut() {
538            self.refresh_cached_peer(peer, now);
539        }
540        data.peers.values().cloned().collect()
541    }
542
543    /// Get the configuration.
544    pub fn config(&self) -> &BootstrapCacheConfig {
545        &self.config
546    }
547
548    fn evict_lowest_quality(&self, data: &mut CacheData) {
549        let evict_count = (self.config.max_peers / 20).max(1); // Evict ~5%
550
551        let mut sorted: Vec<_> = data.peers.iter().collect();
552        sorted.sort_by(|a, b| {
553            a.1.quality_score
554                .partial_cmp(&b.1.quality_score)
555                .unwrap_or(std::cmp::Ordering::Equal)
556        });
557
558        let to_remove: Vec<[u8; 32]> = sorted
559            .into_iter()
560            .take(evict_count)
561            .map(|(id, _)| *id)
562            .collect();
563
564        for id in to_remove {
565            data.peers.remove(&id);
566        }
567
568        debug!("Evicted {} lowest quality peers", evict_count);
569    }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a cache rooted in `temp_dir` with deterministic selection
    /// (epsilon = 0 → pure exploitation) and saving enabled from 1 peer.
    async fn create_test_cache(temp_dir: &TempDir) -> BootstrapCache {
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(100)
            .epsilon(0.0) // Pure exploitation for predictable tests
            .min_peers_to_save(1)
            .build();

        BootstrapCache::open(config).await.unwrap()
    }

    /// A freshly opened cache with no persisted data starts empty.
    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;
        assert_eq!(cache.peer_count().await, 0);
    }

    /// add_seed inserts a peer retrievable via contains/get.
    #[tokio::test]
    async fn test_add_and_get() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1u8; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        assert_eq!(cache.peer_count().await, 1);
        assert!(cache.contains(&peer_id).await);

        let peer = cache.get(&peer_id).await.unwrap();
        assert_eq!(peer.addresses.len(), 1);
    }

    /// With epsilon = 0, selection is ordered by quality score (descending).
    #[tokio::test]
    async fn test_select_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Add peers with different quality
        for i in 0..10usize {
            let peer_id = PeerId([i as u8; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        // Select should return highest quality first (epsilon=0)
        let selected = cache.select_peers(5).await;
        assert_eq!(selected.len(), 5);
        assert!(selected[0].quality_score >= selected[4].quality_score);
    }

    /// Saving and reopening the cache round-trips the peer table.
    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // Create and populate cache
        {
            let cache = create_test_cache(&temp_dir).await;
            cache
                .add_seed(PeerId([1; 32]), vec!["127.0.0.1:9000".parse().unwrap()])
                .await;
            cache.save().await.unwrap();
        }

        // Reopen and verify
        {
            let cache = create_test_cache(&temp_dir).await;
            assert_eq!(cache.peer_count().await, 1);
            assert!(cache.contains(&PeerId([1; 32])).await);
        }
    }

    /// Explicit relay/coordination hints survive a save/reload cycle and
    /// still promote the effective supports_* flags.
    #[tokio::test]
    async fn test_persisted_explicit_assist_hints_survive_reopen() {
        let temp_dir = TempDir::new().unwrap();
        let peer_id = PeerId([9; 32]);
        let peer_addr: SocketAddr = "198.51.100.9:9000".parse().unwrap();

        {
            let cache = create_test_cache(&temp_dir).await;
            let mut peer = CachedPeer::new(peer_id, vec![peer_addr], PeerSource::Merge);
            peer.capabilities.record_assist_hints(true, true);
            cache.upsert(peer).await;
            cache.save().await.unwrap();
        }

        {
            let cache = create_test_cache(&temp_dir).await;
            let peer = cache.get(&peer_id).await.expect("peer should reload");
            assert!(peer.capabilities.hinted_supports_relay);
            assert!(peer.capabilities.hinted_supports_coordination);
            assert!(peer.capabilities.supports_relay);
            assert!(peer.capabilities.supports_coordination);
            assert!(peer.addresses.contains(&peer_addr));
        }
    }

    /// Recorded successes raise the quality score above its initial value.
    #[tokio::test]
    async fn test_quality_scoring() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        // Initial quality should be neutral
        let peer = cache.get(&peer_id).await.unwrap();
        let initial_quality = peer.quality_score;

        // Record successes - quality should improve
        for _ in 0..5 {
            cache.record_success(&peer_id, 50).await;
        }

        let peer = cache.get(&peer_id).await.unwrap();
        assert!(peer.quality_score > initial_quality);
        assert!(peer.success_rate() > 0.9);
    }

    /// Inserting past max_peers triggers lowest-quality eviction so the
    /// cache never exceeds capacity.
    #[tokio::test]
    async fn test_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(10)
            .build();

        let cache = BootstrapCache::open(config).await.unwrap();

        // Add 15 peers
        for i in 0..15u8 {
            let peer_id = PeerId([i; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 15.0;
            cache.upsert(peer).await;
        }

        // Should have evicted some
        assert!(cache.peer_count().await <= 10);
    }

    /// stats() counts capability-bearing peers; direct observations on
    /// public addresses grant relay + coordination capability.
    #[tokio::test]
    async fn test_stats() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Add some peers with capabilities
        let mut peer1 = CachedPeer::new(
            PeerId([1; 32]),
            vec!["203.0.113.1:9001".parse().unwrap()],
            PeerSource::Seed,
        );
        peer1
            .capabilities
            .record_direct_observation("203.0.113.1:9001".parse().unwrap(), SystemTime::now());
        cache.upsert(peer1).await;

        let mut peer2 = CachedPeer::new(
            PeerId([2; 32]),
            vec!["198.51.100.2:9002".parse().unwrap()],
            PeerSource::Seed,
        );
        peer2
            .capabilities
            .record_direct_observation("198.51.100.2:9002".parse().unwrap(), SystemTime::now());
        cache.upsert(peer2).await;

        cache
            .add_seed(PeerId([3; 32]), vec!["127.0.0.1:9003".parse().unwrap()])
            .await;

        let stats = cache.stats().await;
        assert_eq!(stats.total_peers, 3);
        assert_eq!(stats.relay_peers, 2);
        assert_eq!(stats.coordinator_peers, 2);
        assert_eq!(stats.untested_peers, 3);
    }

    /// Relay selection returns all candidates but ranks peers with observed
    /// direct-reachability evidence first ("measure, don't trust").
    #[tokio::test]
    async fn test_select_relay_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Add mix of relay and non-relay peers
        for i in 0..10u8 {
            let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap();
            let mut peer = CachedPeer::new(PeerId([i; 32]), vec![addr], PeerSource::Seed);
            if i % 2 == 0 {
                peer.capabilities
                    .record_direct_observation(addr, SystemTime::now());
            }
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        // v0.13.0+: Measure, don't trust - returns all peers but prefers
        // those with observed relay capability.
        let relays = cache.select_relay_peers(10).await;
        assert_eq!(relays.len(), 10); // All peers are candidates

        // First 5 should have direct reachability evidence (prioritized)
        let relay_capable = relays
            .iter()
            .take(5)
            .filter(|p| p.capabilities.direct_reachability_scope.is_some())
            .count();
        assert_eq!(
            relay_capable, 5,
            "Scoped direct-evidence peers should be first"
        );
    }

    /// Observing a private (RFC 1918) address records LocalNetwork scope
    /// without promoting the peer to relay/coordinator status.
    #[tokio::test]
    async fn test_observe_direct_reachability_preserves_local_scope_without_global_promotion() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;
        let peer_id = PeerId([9; 32]);
        let addr: SocketAddr = "192.168.1.50:9000".parse().unwrap();

        cache.observe_direct_reachability(peer_id, addr).await;

        let peer = cache.get(&peer_id).await.expect("peer inserted");
        assert!(!peer.capabilities.supports_relay);
        assert!(!peer.capabilities.supports_coordination);
        assert_eq!(
            peer.capabilities.direct_reachability_scope,
            Some(crate::reachability::ReachabilityScope::LocalNetwork)
        );
        assert!(peer.addresses.contains(&addr));
        assert!(
            peer.capabilities
                .reachable_addresses
                .iter()
                .any(|entry| entry.address == addr)
        );
        assert!(peer.success_rate() > 0.0);
    }
}