// ant_quic/bootstrap_cache/cache.rs

1//! Main bootstrap cache implementation.
2
3use super::config::BootstrapCacheConfig;
4use super::entry::{CachedPeer, ConnectionOutcome, PeerCapabilities, PeerSource};
5use super::persistence::{CacheData, CachePersistence};
6use super::selection::select_epsilon_greedy;
7use crate::nat_traversal_api::PeerId;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::{RwLock, broadcast};
12use tracing::{debug, info, warn};
13
/// Bootstrap cache event for notifications.
///
/// Emitted on the cache's broadcast channel so subscribers can react to
/// mutations without polling. All payloads are plain counters, so events
/// are cheap to clone and compare.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CacheEvent {
    /// Cache was updated (peers added/removed/modified)
    Updated {
        /// Current peer count
        peer_count: usize,
    },
    /// Cache was saved to disk
    Saved,
    /// Cache was merged from another source
    Merged {
        /// Number of peers added from merge
        added: usize,
    },
    /// Stale peers were cleaned up
    Cleaned {
        /// Number of peers removed
        removed: usize,
    },
}
35
/// Aggregate statistics over the cached peer set.
///
/// Produced by [`BootstrapCache::stats`]; all counters refer to the moment
/// of the snapshot.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CacheStats {
    /// Total number of cached peers
    pub total_peers: usize,
    /// Peers that support relay
    pub relay_peers: usize,
    /// Peers that support NAT coordination
    pub coordinator_peers: usize,
    /// Average quality score across all peers (0.0 when the cache is empty)
    pub average_quality: f64,
    /// Number of peers with no recorded connection attempts
    pub untested_peers: usize,
}
50
/// Greedy bootstrap cache with quality-based peer selection.
///
/// This cache stores peer information with quality metrics and provides
/// epsilon-greedy selection to balance exploitation (using known-good peers)
/// with exploration (trying new peers to discover potentially better ones).
pub struct BootstrapCache {
    /// Tuning parameters: capacity, epsilon, maintenance intervals, weights.
    config: BootstrapCacheConfig,
    /// The peer table; shared behind an async RwLock so the background
    /// maintenance task and callers can access it concurrently.
    data: Arc<RwLock<CacheData>>,
    /// Disk persistence backend used by `open` (load) and `save`.
    persistence: CachePersistence,
    /// Broadcast sender for `CacheEvent` notifications (see `subscribe`).
    event_tx: broadcast::Sender<CacheEvent>,
    /// Instant of the last successful save (initialized at `open`).
    last_save: Arc<RwLock<Instant>>,
    /// Instant of the last stale-peer cleanup (initialized at `open`).
    last_cleanup: Arc<RwLock<Instant>>,
}
64
65impl BootstrapCache {
66    /// Open or create a bootstrap cache.
67    ///
68    /// Loads existing cache data from disk if available, otherwise starts fresh.
69    pub async fn open(config: BootstrapCacheConfig) -> std::io::Result<Self> {
70        let persistence = CachePersistence::new(&config.cache_dir, config.enable_file_locking)?;
71        let data = persistence.load()?;
72        let (event_tx, _) = broadcast::channel(256);
73        let now = Instant::now();
74
75        info!("Opened bootstrap cache with {} peers", data.peers.len());
76
77        Ok(Self {
78            config,
79            data: Arc::new(RwLock::new(data)),
80            persistence,
81            event_tx,
82            last_save: Arc::new(RwLock::new(now)),
83            last_cleanup: Arc::new(RwLock::new(now)),
84        })
85    }
86
    /// Subscribe to cache events
    ///
    /// The returned receiver observes events sent after this call; the
    /// channel buffers up to 256 events (capacity set in `open`) before
    /// slow receivers start lagging.
    pub fn subscribe(&self) -> broadcast::Receiver<CacheEvent> {
        self.event_tx.subscribe()
    }

    /// Get the number of cached peers
    pub async fn peer_count(&self) -> usize {
        self.data.read().await.peers.len()
    }
96
97    /// Select peers for bootstrap using epsilon-greedy strategy.
98    ///
99    /// Returns up to `count` peers, balancing exploitation of known-good peers
100    /// with exploration of untested peers based on the configured epsilon.
101    pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
102        let data = self.data.read().await;
103        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
104
105        select_epsilon_greedy(&peers, count, self.config.epsilon)
106            .into_iter()
107            .cloned()
108            .collect()
109    }
110
111    /// Select peers that support relay functionality.
112    ///
113    /// Returns peers sorted by quality score that have relay capability.
114    pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
115        let data = self.data.read().await;
116        let mut relays: Vec<CachedPeer> = data
117            .peers
118            .values()
119            .filter(|p| p.capabilities.supports_relay)
120            .cloned()
121            .collect();
122
123        relays.sort_by(|a, b| {
124            b.quality_score
125                .partial_cmp(&a.quality_score)
126                .unwrap_or(std::cmp::Ordering::Equal)
127        });
128
129        relays.into_iter().take(count).collect()
130    }
131
132    /// Select peers that support NAT coordination.
133    ///
134    /// Returns peers sorted by quality score that have coordination capability.
135    pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
136        let data = self.data.read().await;
137        let mut coordinators: Vec<CachedPeer> = data
138            .peers
139            .values()
140            .filter(|p| p.capabilities.supports_coordination)
141            .cloned()
142            .collect();
143
144        coordinators.sort_by(|a, b| {
145            b.quality_score
146                .partial_cmp(&a.quality_score)
147                .unwrap_or(std::cmp::Ordering::Equal)
148        });
149
150        coordinators.into_iter().take(count).collect()
151    }
152
153    /// Add or update a peer in the cache.
154    ///
155    /// If the cache is at capacity, evicts the lowest quality peers.
156    pub async fn upsert(&self, peer: CachedPeer) {
157        let mut data = self.data.write().await;
158
159        // Evict lowest quality if at capacity
160        if data.peers.len() >= self.config.max_peers && !data.peers.contains_key(&peer.peer_id.0) {
161            self.evict_lowest_quality(&mut data);
162        }
163
164        data.peers.insert(peer.peer_id.0, peer);
165
166        let count = data.peers.len();
167        drop(data);
168
169        let _ = self
170            .event_tx
171            .send(CacheEvent::Updated { peer_count: count });
172    }
173
174    /// Add a seed peer (user-provided bootstrap node).
175    pub async fn add_seed(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
176        let peer = CachedPeer::new(peer_id, addresses, PeerSource::Seed);
177        self.upsert(peer).await;
178    }
179
180    /// Add a peer discovered from an active connection.
181    pub async fn add_from_connection(
182        &self,
183        peer_id: PeerId,
184        addresses: Vec<SocketAddr>,
185        caps: Option<PeerCapabilities>,
186    ) {
187        let mut peer = CachedPeer::new(peer_id, addresses, PeerSource::Connection);
188        if let Some(caps) = caps {
189            peer.capabilities = caps;
190        }
191        self.upsert(peer).await;
192    }
193
194    /// Record a connection attempt result.
195    pub async fn record_outcome(&self, peer_id: &PeerId, outcome: ConnectionOutcome) {
196        let mut data = self.data.write().await;
197
198        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
199            if outcome.success {
200                peer.record_success(
201                    outcome.rtt_ms.unwrap_or(100),
202                    outcome.capabilities_discovered,
203                );
204            } else {
205                peer.record_failure();
206            }
207
208            // Recalculate quality score
209            peer.calculate_quality(&self.config.weights);
210        }
211    }
212
213    /// Record successful connection.
214    pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
215        self.record_outcome(
216            peer_id,
217            ConnectionOutcome {
218                success: true,
219                rtt_ms: Some(rtt_ms),
220                capabilities_discovered: None,
221            },
222        )
223        .await;
224    }
225
226    /// Record failed connection.
227    pub async fn record_failure(&self, peer_id: &PeerId) {
228        self.record_outcome(
229            peer_id,
230            ConnectionOutcome {
231                success: false,
232                rtt_ms: None,
233                capabilities_discovered: None,
234            },
235        )
236        .await;
237    }
238
239    /// Update peer capabilities.
240    pub async fn update_capabilities(&self, peer_id: &PeerId, caps: PeerCapabilities) {
241        let mut data = self.data.write().await;
242
243        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
244            peer.capabilities = caps;
245            peer.calculate_quality(&self.config.weights);
246        }
247    }
248
249    /// Get a specific peer.
250    pub async fn get(&self, peer_id: &PeerId) -> Option<CachedPeer> {
251        self.data.read().await.peers.get(&peer_id.0).cloned()
252    }
253
254    /// Check if peer exists in cache.
255    pub async fn contains(&self, peer_id: &PeerId) -> bool {
256        self.data.read().await.peers.contains_key(&peer_id.0)
257    }
258
259    /// Remove a peer from cache.
260    pub async fn remove(&self, peer_id: &PeerId) -> Option<CachedPeer> {
261        self.data.write().await.peers.remove(&peer_id.0)
262    }
263
264    /// Save cache to disk.
265    pub async fn save(&self) -> std::io::Result<()> {
266        let mut data = self.data.write().await;
267
268        if data.peers.len() < self.config.min_peers_to_save {
269            debug!(
270                "Skipping save: only {} peers (min: {})",
271                data.peers.len(),
272                self.config.min_peers_to_save
273            );
274            return Ok(());
275        }
276
277        self.persistence.save(&mut data)?;
278
279        drop(data);
280        *self.last_save.write().await = Instant::now();
281        let _ = self.event_tx.send(CacheEvent::Saved);
282
283        Ok(())
284    }
285
286    /// Cleanup stale peers.
287    ///
288    /// Removes peers that haven't been seen within the stale threshold.
289    /// Returns the number of peers removed.
290    pub async fn cleanup_stale(&self) -> usize {
291        let mut data = self.data.write().await;
292        let initial_count = data.peers.len();
293
294        data.peers
295            .retain(|_, peer| !peer.is_stale(self.config.stale_threshold));
296
297        let removed = initial_count - data.peers.len();
298
299        if removed > 0 {
300            info!("Cleaned up {} stale peers", removed);
301            let _ = self.event_tx.send(CacheEvent::Cleaned { removed });
302        }
303
304        drop(data);
305        *self.last_cleanup.write().await = Instant::now();
306
307        removed
308    }
309
310    /// Recalculate quality scores for all peers.
311    pub async fn recalculate_quality(&self) {
312        let mut data = self.data.write().await;
313
314        for peer in data.peers.values_mut() {
315            peer.calculate_quality(&self.config.weights);
316        }
317
318        let count = data.peers.len();
319        let _ = self
320            .event_tx
321            .send(CacheEvent::Updated { peer_count: count });
322    }
323
324    /// Get cache statistics.
325    pub async fn stats(&self) -> CacheStats {
326        let data = self.data.read().await;
327
328        let relay_count = data
329            .peers
330            .values()
331            .filter(|p| p.capabilities.supports_relay)
332            .count();
333        let coord_count = data
334            .peers
335            .values()
336            .filter(|p| p.capabilities.supports_coordination)
337            .count();
338        let untested = data
339            .peers
340            .values()
341            .filter(|p| p.stats.success_count + p.stats.failure_count == 0)
342            .count();
343        let avg_quality = if data.peers.is_empty() {
344            0.0
345        } else {
346            data.peers.values().map(|p| p.quality_score).sum::<f64>() / data.peers.len() as f64
347        };
348
349        CacheStats {
350            total_peers: data.peers.len(),
351            relay_peers: relay_count,
352            coordinator_peers: coord_count,
353            average_quality: avg_quality,
354            untested_peers: untested,
355        }
356    }
357
358    /// Start background maintenance tasks.
359    ///
360    /// Spawns a task that periodically:
361    /// - Saves the cache to disk
362    /// - Cleans up stale peers
363    /// - Recalculates quality scores
364    ///
365    /// Returns a handle that can be used to cancel the task.
366    pub fn start_maintenance(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
367        let cache = self;
368
369        tokio::spawn(async move {
370            let mut save_interval = tokio::time::interval(cache.config.save_interval);
371            let mut cleanup_interval = tokio::time::interval(cache.config.cleanup_interval);
372            let mut quality_interval = tokio::time::interval(cache.config.quality_update_interval);
373
374            loop {
375                tokio::select! {
376                    _ = save_interval.tick() => {
377                        if let Err(e) = cache.save().await {
378                            warn!("Failed to save cache: {}", e);
379                        }
380                    }
381                    _ = cleanup_interval.tick() => {
382                        cache.cleanup_stale().await;
383                    }
384                    _ = quality_interval.tick() => {
385                        cache.recalculate_quality().await;
386                    }
387                }
388            }
389        })
390    }
391
392    /// Get all cached peers (for export/debug).
393    pub async fn all_peers(&self) -> Vec<CachedPeer> {
394        self.data.read().await.peers.values().cloned().collect()
395    }
396
397    /// Get the configuration.
398    pub fn config(&self) -> &BootstrapCacheConfig {
399        &self.config
400    }
401
402    fn evict_lowest_quality(&self, data: &mut CacheData) {
403        let evict_count = (self.config.max_peers / 20).max(1); // Evict ~5%
404
405        let mut sorted: Vec<_> = data.peers.iter().collect();
406        sorted.sort_by(|a, b| {
407            a.1.quality_score
408                .partial_cmp(&b.1.quality_score)
409                .unwrap_or(std::cmp::Ordering::Equal)
410        });
411
412        let to_remove: Vec<[u8; 32]> = sorted
413            .into_iter()
414            .take(evict_count)
415            .map(|(id, _)| *id)
416            .collect();
417
418        for id in to_remove {
419            data.peers.remove(&id);
420        }
421
422        debug!("Evicted {} lowest quality peers", evict_count);
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a cache rooted in `temp_dir` with epsilon 0.0 (pure
    /// exploitation, deterministic selection) and saving enabled from
    /// a single peer.
    async fn create_test_cache(temp_dir: &TempDir) -> BootstrapCache {
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(100)
            .epsilon(0.0) // Pure exploitation for predictable tests
            .min_peers_to_save(1)
            .build();

        BootstrapCache::open(config).await.unwrap()
    }

    /// A freshly opened cache over an empty directory holds no peers.
    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;
        assert_eq!(cache.peer_count().await, 0);
    }

    /// `add_seed` makes the peer visible via `peer_count`/`contains`/`get`.
    #[tokio::test]
    async fn test_add_and_get() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1u8; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        assert_eq!(cache.peer_count().await, 1);
        assert!(cache.contains(&peer_id).await);

        let peer = cache.get(&peer_id).await.unwrap();
        assert_eq!(peer.addresses.len(), 1);
    }

    /// With epsilon 0, selection is greedy: higher quality comes first.
    #[tokio::test]
    async fn test_select_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Add peers with different quality
        for i in 0..10usize {
            let peer_id = PeerId([i as u8; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        // Select should return highest quality first (epsilon=0)
        let selected = cache.select_peers(5).await;
        assert_eq!(selected.len(), 5);
        assert!(selected[0].quality_score >= selected[4].quality_score);
    }

    /// Data saved by one cache instance is loaded by the next instance
    /// opened over the same directory.
    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // Create and populate cache
        {
            let cache = create_test_cache(&temp_dir).await;
            cache
                .add_seed(PeerId([1; 32]), vec!["127.0.0.1:9000".parse().unwrap()])
                .await;
            cache.save().await.unwrap();
        }

        // Reopen and verify
        {
            let cache = create_test_cache(&temp_dir).await;
            assert_eq!(cache.peer_count().await, 1);
            assert!(cache.contains(&PeerId([1; 32])).await);
        }
    }

    /// Recording successes raises both the quality score and success rate.
    #[tokio::test]
    async fn test_quality_scoring() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        // Initial quality should be neutral
        let peer = cache.get(&peer_id).await.unwrap();
        let initial_quality = peer.quality_score;

        // Record successes - quality should improve
        for _ in 0..5 {
            cache.record_success(&peer_id, 50).await;
        }

        let peer = cache.get(&peer_id).await.unwrap();
        assert!(peer.quality_score > initial_quality);
        assert!(peer.success_rate() > 0.9);
    }

    /// Inserting past `max_peers` triggers eviction so the cache never
    /// ends up above capacity.
    #[tokio::test]
    async fn test_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(10)
            .build();

        let cache = BootstrapCache::open(config).await.unwrap();

        // Add 15 peers
        for i in 0..15u8 {
            let peer_id = PeerId([i; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 15.0;
            cache.upsert(peer).await;
        }

        // Should have evicted some
        assert!(cache.peer_count().await <= 10);
    }

    /// `stats` counts capability flags and untested (never attempted) peers.
    #[tokio::test]
    async fn test_stats() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Add some peers with capabilities
        let mut peer1 = CachedPeer::new(
            PeerId([1; 32]),
            vec!["127.0.0.1:9001".parse().unwrap()],
            PeerSource::Seed,
        );
        peer1.capabilities.supports_relay = true;
        cache.upsert(peer1).await;

        let mut peer2 = CachedPeer::new(
            PeerId([2; 32]),
            vec!["127.0.0.1:9002".parse().unwrap()],
            PeerSource::Seed,
        );
        peer2.capabilities.supports_coordination = true;
        cache.upsert(peer2).await;

        cache
            .add_seed(PeerId([3; 32]), vec!["127.0.0.1:9003".parse().unwrap()])
            .await;

        let stats = cache.stats().await;
        assert_eq!(stats.total_peers, 3);
        assert_eq!(stats.relay_peers, 1);
        assert_eq!(stats.coordinator_peers, 1);
        assert_eq!(stats.untested_peers, 3);
    }

    /// `select_relay_peers` filters to relay-capable peers only.
    #[tokio::test]
    async fn test_select_relay_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Add mix of relay and non-relay peers
        for i in 0..10u8 {
            let mut peer = CachedPeer::new(
                PeerId([i; 32]),
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.capabilities.supports_relay = i % 2 == 0;
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        let relays = cache.select_relay_peers(10).await;
        assert_eq!(relays.len(), 5); // Only half support relay

        // All selected should support relay
        for peer in &relays {
            assert!(peer.capabilities.supports_relay);
        }
    }
}