// ant_quic/bootstrap_cache/cache.rs
// Copyright 2024 Saorsa Labs Ltd.
//
// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
//
// Full details available at https://saorsalabs.com/licenses

//! Main bootstrap cache implementation.
use crate::nat_traversal_api::PeerId;

use super::config::BootstrapCacheConfig;
use super::entry::{CachedPeer, ConnectionOutcome, PeerCapabilities, PeerSource};
use super::persistence::{CacheData, CachePersistence};
use super::selection::select_epsilon_greedy;

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;

use tokio::sync::{RwLock, broadcast};
use tracing::{debug, info, warn};
20
/// Notification emitted when the bootstrap cache changes.
#[derive(Debug, Clone)]
pub enum CacheEvent {
    /// Peers were added, removed, or modified.
    Updated {
        /// Number of peers currently in the cache.
        peer_count: usize,
    },
    /// The cache was persisted to disk.
    Saved,
    /// Entries from another source were merged in.
    Merged {
        /// How many peers the merge contributed.
        added: usize,
    },
    /// Stale entries were pruned.
    Cleaned {
        /// How many peers were removed.
        removed: usize,
    },
}
42
/// Aggregate statistics over the cached peer set.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total number of cached peers.
    pub total_peers: usize,
    /// Count of peers advertising relay support.
    pub relay_peers: usize,
    /// Count of peers advertising NAT coordination support.
    pub coordinator_peers: usize,
    /// Count of relay peers that can bridge IPv4 and IPv6.
    pub dual_stack_relay_peers: usize,
    /// Mean quality score across all peers (0.0 when empty).
    pub average_quality: f64,
    /// Peers with no recorded connection attempts yet.
    pub untested_peers: usize,
}
59
60/// Greedy bootstrap cache with quality-based peer selection.
61///
62/// This cache stores peer information with quality metrics and provides
63/// epsilon-greedy selection to balance exploitation (using known-good peers)
64/// with exploration (trying new peers to discover potentially better ones).
65#[derive(Debug)]
66pub struct BootstrapCache {
67    config: BootstrapCacheConfig,
68    data: Arc<RwLock<CacheData>>,
69    persistence: CachePersistence,
70    event_tx: broadcast::Sender<CacheEvent>,
71    last_save: Arc<RwLock<Instant>>,
72    last_cleanup: Arc<RwLock<Instant>>,
73}
74
75impl BootstrapCache {
76    // ... (existing open/subscribe methods)
77    /// Open or create a bootstrap cache.
78    ///
79    /// Loads existing cache data from disk if available, otherwise starts fresh.
80    pub async fn open(config: BootstrapCacheConfig) -> std::io::Result<Self> {
81        let persistence = CachePersistence::new(&config.cache_dir, config.enable_file_locking)?;
82        let data = persistence.load()?;
83        let (event_tx, _) = broadcast::channel(256);
84        let now = Instant::now();
85
86        info!("Opened bootstrap cache with {} peers", data.peers.len());
87
88        Ok(Self {
89            config,
90            data: Arc::new(RwLock::new(data)),
91            persistence,
92            event_tx,
93            last_save: Arc::new(RwLock::new(now)),
94            last_cleanup: Arc::new(RwLock::new(now)),
95        })
96    }
97
98    /// Subscribe to cache events
99    pub fn subscribe(&self) -> broadcast::Receiver<CacheEvent> {
100        self.event_tx.subscribe()
101    }
102
103    /// Get the number of cached peers
104    pub async fn peer_count(&self) -> usize {
105        self.data.read().await.peers.len()
106    }
107
108    /// Get a specific peer from the cache
109    pub async fn get_peer(&self, peer_id: &PeerId) -> Option<CachedPeer> {
110        self.data.read().await.peers.get(&peer_id.0).cloned()
111    }
112
113    /// Select peers for bootstrap using epsilon-greedy strategy.
114    ///
115    /// Returns up to `count` peers, balancing exploitation of known-good peers
116    /// with exploration of untested peers based on the configured epsilon.
117    pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
118        let data = self.data.read().await;
119        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
120
121        select_epsilon_greedy(&peers, count, self.config.epsilon)
122            .into_iter()
123            .cloned()
124            .collect()
125    }
126
127    /// Select peers that support relay functionality.
128    ///
129    /// Returns peers sorted by quality score, preferring observed relay capability.
130    pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
131        let data = self.data.read().await;
132        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
133
134        super::selection::select_with_capabilities(&peers, count, true, false)
135            .into_iter()
136            .cloned()
137            .collect()
138    }
139
140    /// Select peers that support NAT coordination.
141    ///
142    /// Returns peers sorted by quality score, preferring observed coordination capability.
143    pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
144        let data = self.data.read().await;
145        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
146
147        super::selection::select_with_capabilities(&peers, count, false, true)
148            .into_iter()
149            .cloned()
150            .collect()
151    }
152
153    /// Select relay peers that can reach a target IP version.
154    ///
155    /// Returns relays sorted by quality that can bridge traffic to the target.
156    /// Dual-stack relays are preferred as they can reach any target.
157    ///
158    /// # Arguments
159    /// * `count` - Maximum number of relays to return
160    /// * `target` - The target address to reach
161    /// * `prefer_dual_stack` - If true, prioritize dual-stack relays
162    pub async fn select_relays_for_target(
163        &self,
164        count: usize,
165        target: &std::net::SocketAddr,
166        prefer_dual_stack: bool,
167    ) -> Vec<CachedPeer> {
168        use super::selection::select_relays_for_target;
169
170        let data = self.data.read().await;
171        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
172
173        select_relays_for_target(&peers, count, target.is_ipv4(), prefer_dual_stack)
174            .into_iter()
175            .cloned()
176            .collect()
177    }
178
179    /// Select relay peers that support dual-stack (IPv4 + IPv6) bridging.
180    ///
181    /// These peers are valuable for bridging between IPv4-only and IPv6-only networks.
182    pub async fn select_dual_stack_relays(&self, count: usize) -> Vec<CachedPeer> {
183        use super::selection::select_dual_stack_relays;
184
185        let data = self.data.read().await;
186        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
187
188        select_dual_stack_relays(&peers, count)
189            .into_iter()
190            .cloned()
191            .collect()
192    }
193
194    /// Add or update a peer in the cache.
195    ///
196    /// If the cache is at capacity, evicts the lowest quality peers.
197    pub async fn upsert(&self, peer: CachedPeer) {
198        let mut data = self.data.write().await;
199
200        // Evict lowest quality if at capacity
201        if data.peers.len() >= self.config.max_peers && !data.peers.contains_key(&peer.peer_id.0) {
202            self.evict_lowest_quality(&mut data);
203        }
204
205        data.peers.insert(peer.peer_id.0, peer);
206
207        let count = data.peers.len();
208        drop(data);
209
210        let _ = self
211            .event_tx
212            .send(CacheEvent::Updated { peer_count: count });
213    }
214
215    /// Add a seed peer (user-provided bootstrap node).
216    pub async fn add_seed(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
217        let peer = CachedPeer::new(peer_id, addresses, PeerSource::Seed);
218        self.upsert(peer).await;
219    }
220
221    /// Add a peer discovered from an active connection.
222    pub async fn add_from_connection(
223        &self,
224        peer_id: PeerId,
225        addresses: Vec<SocketAddr>,
226        caps: Option<PeerCapabilities>,
227    ) {
228        let mut peer = CachedPeer::new(peer_id, addresses, PeerSource::Connection);
229        if let Some(caps) = caps {
230            peer.capabilities = caps;
231        }
232        self.upsert(peer).await;
233    }
234
235    /// Record a connection attempt result.
236    pub async fn record_outcome(&self, peer_id: &PeerId, outcome: ConnectionOutcome) {
237        let mut data = self.data.write().await;
238
239        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
240            if outcome.success {
241                peer.record_success(
242                    outcome.rtt_ms.unwrap_or(100),
243                    outcome.capabilities_discovered,
244                );
245            } else {
246                peer.record_failure();
247            }
248
249            // Recalculate quality score
250            peer.calculate_quality(&self.config.weights);
251        }
252    }
253
254    /// Record successful connection.
255    pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
256        self.record_outcome(
257            peer_id,
258            ConnectionOutcome {
259                success: true,
260                rtt_ms: Some(rtt_ms),
261                capabilities_discovered: None,
262            },
263        )
264        .await;
265    }
266
267    /// Record failed connection.
268    pub async fn record_failure(&self, peer_id: &PeerId) {
269        self.record_outcome(
270            peer_id,
271            ConnectionOutcome {
272                success: false,
273                rtt_ms: None,
274                capabilities_discovered: None,
275            },
276        )
277        .await;
278    }
279
280    /// Update peer capabilities.
281    pub async fn update_capabilities(&self, peer_id: &PeerId, caps: PeerCapabilities) {
282        let mut data = self.data.write().await;
283
284        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
285            peer.capabilities = caps;
286            peer.calculate_quality(&self.config.weights);
287        }
288    }
289
290    /// Get a specific peer.
291    pub async fn get(&self, peer_id: &PeerId) -> Option<CachedPeer> {
292        self.data.read().await.peers.get(&peer_id.0).cloned()
293    }
294
295    /// Update the address validation token for a peer
296    pub async fn update_token(&self, peer_id: PeerId, token: Vec<u8>) {
297        let mut data = self.data.write().await;
298        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
299            peer.token = Some(token);
300        }
301    }
302
303    /// Get all tokens from cached peers (for initializing TokenStore)
304    pub async fn get_all_tokens(&self) -> std::collections::HashMap<PeerId, Vec<u8>> {
305        self.data
306            .read()
307            .await
308            .peers
309            .values()
310            .filter_map(|p| p.token.clone().map(|t| (p.peer_id, t)))
311            .collect()
312    }
313
314    /// Check if peer exists in cache.
315    pub async fn contains(&self, peer_id: &PeerId) -> bool {
316        self.data.read().await.peers.contains_key(&peer_id.0)
317    }
318
319    /// Remove a peer from cache.
320    pub async fn remove(&self, peer_id: &PeerId) -> Option<CachedPeer> {
321        self.data.write().await.peers.remove(&peer_id.0)
322    }
323
324    /// Save cache to disk.
325    pub async fn save(&self) -> std::io::Result<()> {
326        let mut data = self.data.write().await;
327
328        if data.peers.len() < self.config.min_peers_to_save {
329            debug!(
330                "Skipping save: only {} peers (min: {})",
331                data.peers.len(),
332                self.config.min_peers_to_save
333            );
334            return Ok(());
335        }
336
337        self.persistence.save(&mut data)?;
338
339        drop(data);
340        *self.last_save.write().await = Instant::now();
341        let _ = self.event_tx.send(CacheEvent::Saved);
342
343        Ok(())
344    }
345
346    /// Cleanup stale peers.
347    ///
348    /// Removes peers that haven't been seen within the stale threshold.
349    /// Returns the number of peers removed.
350    pub async fn cleanup_stale(&self) -> usize {
351        let mut data = self.data.write().await;
352        let initial_count = data.peers.len();
353
354        data.peers
355            .retain(|_, peer| !peer.is_stale(self.config.stale_threshold));
356
357        let removed = initial_count - data.peers.len();
358
359        if removed > 0 {
360            info!("Cleaned up {} stale peers", removed);
361            let _ = self.event_tx.send(CacheEvent::Cleaned { removed });
362        }
363
364        drop(data);
365        *self.last_cleanup.write().await = Instant::now();
366
367        removed
368    }
369
370    /// Recalculate quality scores for all peers.
371    pub async fn recalculate_quality(&self) {
372        let mut data = self.data.write().await;
373
374        for peer in data.peers.values_mut() {
375            peer.calculate_quality(&self.config.weights);
376        }
377
378        let count = data.peers.len();
379        let _ = self
380            .event_tx
381            .send(CacheEvent::Updated { peer_count: count });
382    }
383
384    /// Get cache statistics.
385    pub async fn stats(&self) -> CacheStats {
386        let data = self.data.read().await;
387
388        let relay_count = data
389            .peers
390            .values()
391            .filter(|p| p.capabilities.supports_relay)
392            .count();
393        let coord_count = data
394            .peers
395            .values()
396            .filter(|p| p.capabilities.supports_coordination)
397            .count();
398        let dual_stack_count = data
399            .peers
400            .values()
401            .filter(|p| p.capabilities.supports_relay && p.capabilities.supports_dual_stack())
402            .count();
403        let untested = data
404            .peers
405            .values()
406            .filter(|p| p.stats.success_count + p.stats.failure_count == 0)
407            .count();
408        let avg_quality = if data.peers.is_empty() {
409            0.0
410        } else {
411            data.peers.values().map(|p| p.quality_score).sum::<f64>() / data.peers.len() as f64
412        };
413
414        CacheStats {
415            total_peers: data.peers.len(),
416            relay_peers: relay_count,
417            coordinator_peers: coord_count,
418            dual_stack_relay_peers: dual_stack_count,
419            average_quality: avg_quality,
420            untested_peers: untested,
421        }
422    }
423
424    /// Start background maintenance tasks.
425    ///
426    /// Spawns a task that periodically:
427    /// - Saves the cache to disk
428    /// - Cleans up stale peers
429    /// - Recalculates quality scores
430    ///
431    /// Returns a handle that can be used to cancel the task.
432    pub fn start_maintenance(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
433        let cache = self;
434
435        tokio::spawn(async move {
436            let mut save_interval = tokio::time::interval(cache.config.save_interval);
437            let mut cleanup_interval = tokio::time::interval(cache.config.cleanup_interval);
438            let mut quality_interval = tokio::time::interval(cache.config.quality_update_interval);
439
440            loop {
441                tokio::select! {
442                    _ = save_interval.tick() => {
443                        if let Err(e) = cache.save().await {
444                            warn!("Failed to save cache: {}", e);
445                        }
446                    }
447                    _ = cleanup_interval.tick() => {
448                        cache.cleanup_stale().await;
449                    }
450                    _ = quality_interval.tick() => {
451                        cache.recalculate_quality().await;
452                    }
453                }
454            }
455        })
456    }
457
458    /// Get all cached peers (for export/debug).
459    pub async fn all_peers(&self) -> Vec<CachedPeer> {
460        self.data.read().await.peers.values().cloned().collect()
461    }
462
463    /// Get the configuration.
464    pub fn config(&self) -> &BootstrapCacheConfig {
465        &self.config
466    }
467
468    fn evict_lowest_quality(&self, data: &mut CacheData) {
469        let evict_count = (self.config.max_peers / 20).max(1); // Evict ~5%
470
471        let mut sorted: Vec<_> = data.peers.iter().collect();
472        sorted.sort_by(|a, b| {
473            a.1.quality_score
474                .partial_cmp(&b.1.quality_score)
475                .unwrap_or(std::cmp::Ordering::Equal)
476        });
477
478        let to_remove: Vec<[u8; 32]> = sorted
479            .into_iter()
480            .take(evict_count)
481            .map(|(id, _)| *id)
482            .collect();
483
484        for id in to_remove {
485            data.peers.remove(&id);
486        }
487
488        debug!("Evicted {} lowest quality peers", evict_count);
489    }
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495    use tempfile::TempDir;
496
497    async fn create_test_cache(temp_dir: &TempDir) -> BootstrapCache {
498        let config = BootstrapCacheConfig::builder()
499            .cache_dir(temp_dir.path())
500            .max_peers(100)
501            .epsilon(0.0) // Pure exploitation for predictable tests
502            .min_peers_to_save(1)
503            .build();
504
505        BootstrapCache::open(config).await.unwrap()
506    }
507
508    #[tokio::test]
509    async fn test_cache_creation() {
510        let temp_dir = TempDir::new().unwrap();
511        let cache = create_test_cache(&temp_dir).await;
512        assert_eq!(cache.peer_count().await, 0);
513    }
514
515    #[tokio::test]
516    async fn test_add_and_get() {
517        let temp_dir = TempDir::new().unwrap();
518        let cache = create_test_cache(&temp_dir).await;
519
520        let peer_id = PeerId([1u8; 32]);
521        cache
522            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
523            .await;
524
525        assert_eq!(cache.peer_count().await, 1);
526        assert!(cache.contains(&peer_id).await);
527
528        let peer = cache.get(&peer_id).await.unwrap();
529        assert_eq!(peer.addresses.len(), 1);
530    }
531
532    #[tokio::test]
533    async fn test_select_peers() {
534        let temp_dir = TempDir::new().unwrap();
535        let cache = create_test_cache(&temp_dir).await;
536
537        // Add peers with different quality
538        for i in 0..10usize {
539            let peer_id = PeerId([i as u8; 32]);
540            let mut peer = CachedPeer::new(
541                peer_id,
542                vec![format!("127.0.0.1:{}", 9000 + i).parse().unwrap()],
543                PeerSource::Seed,
544            );
545            peer.quality_score = i as f64 / 10.0;
546            cache.upsert(peer).await;
547        }
548
549        // Select should return highest quality first (epsilon=0)
550        let selected = cache.select_peers(5).await;
551        assert_eq!(selected.len(), 5);
552        assert!(selected[0].quality_score >= selected[4].quality_score);
553    }
554
555    #[tokio::test]
556    async fn test_persistence() {
557        let temp_dir = TempDir::new().unwrap();
558
559        // Create and populate cache
560        {
561            let cache = create_test_cache(&temp_dir).await;
562            cache
563                .add_seed(PeerId([1; 32]), vec!["127.0.0.1:9000".parse().unwrap()])
564                .await;
565            cache.save().await.unwrap();
566        }
567
568        // Reopen and verify
569        {
570            let cache = create_test_cache(&temp_dir).await;
571            assert_eq!(cache.peer_count().await, 1);
572            assert!(cache.contains(&PeerId([1; 32])).await);
573        }
574    }
575
576    #[tokio::test]
577    async fn test_quality_scoring() {
578        let temp_dir = TempDir::new().unwrap();
579        let cache = create_test_cache(&temp_dir).await;
580
581        let peer_id = PeerId([1; 32]);
582        cache
583            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
584            .await;
585
586        // Initial quality should be neutral
587        let peer = cache.get(&peer_id).await.unwrap();
588        let initial_quality = peer.quality_score;
589
590        // Record successes - quality should improve
591        for _ in 0..5 {
592            cache.record_success(&peer_id, 50).await;
593        }
594
595        let peer = cache.get(&peer_id).await.unwrap();
596        assert!(peer.quality_score > initial_quality);
597        assert!(peer.success_rate() > 0.9);
598    }
599
600    #[tokio::test]
601    async fn test_eviction() {
602        let temp_dir = TempDir::new().unwrap();
603        let config = BootstrapCacheConfig::builder()
604            .cache_dir(temp_dir.path())
605            .max_peers(10)
606            .build();
607
608        let cache = BootstrapCache::open(config).await.unwrap();
609
610        // Add 15 peers
611        for i in 0..15u8 {
612            let peer_id = PeerId([i; 32]);
613            let mut peer = CachedPeer::new(
614                peer_id,
615                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
616                PeerSource::Seed,
617            );
618            peer.quality_score = i as f64 / 15.0;
619            cache.upsert(peer).await;
620        }
621
622        // Should have evicted some
623        assert!(cache.peer_count().await <= 10);
624    }
625
626    #[tokio::test]
627    async fn test_stats() {
628        let temp_dir = TempDir::new().unwrap();
629        let cache = create_test_cache(&temp_dir).await;
630
631        // Add some peers with capabilities
632        let mut peer1 = CachedPeer::new(
633            PeerId([1; 32]),
634            vec!["127.0.0.1:9001".parse().unwrap()],
635            PeerSource::Seed,
636        );
637        peer1.capabilities.supports_relay = true;
638        cache.upsert(peer1).await;
639
640        let mut peer2 = CachedPeer::new(
641            PeerId([2; 32]),
642            vec!["127.0.0.1:9002".parse().unwrap()],
643            PeerSource::Seed,
644        );
645        peer2.capabilities.supports_coordination = true;
646        cache.upsert(peer2).await;
647
648        cache
649            .add_seed(PeerId([3; 32]), vec!["127.0.0.1:9003".parse().unwrap()])
650            .await;
651
652        let stats = cache.stats().await;
653        assert_eq!(stats.total_peers, 3);
654        assert_eq!(stats.relay_peers, 1);
655        assert_eq!(stats.coordinator_peers, 1);
656        assert_eq!(stats.untested_peers, 3);
657    }
658
659    #[tokio::test]
660    async fn test_select_relay_peers() {
661        let temp_dir = TempDir::new().unwrap();
662        let cache = create_test_cache(&temp_dir).await;
663
664        // Add mix of relay and non-relay peers
665        for i in 0..10u8 {
666            let mut peer = CachedPeer::new(
667                PeerId([i; 32]),
668                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
669                PeerSource::Seed,
670            );
671            peer.capabilities.supports_relay = i % 2 == 0;
672            peer.quality_score = i as f64 / 10.0;
673            cache.upsert(peer).await;
674        }
675
676        // v0.13.0+: Measure, don't trust - returns all peers but prefers
677        // those with observed relay capability.
678        let relays = cache.select_relay_peers(10).await;
679        assert_eq!(relays.len(), 10); // All peers are candidates
680
681        // First 5 should have relay capability (prioritized)
682        let relay_capable = relays
683            .iter()
684            .take(5)
685            .filter(|p| p.capabilities.supports_relay)
686            .count();
687        assert_eq!(relay_capable, 5, "Relay-capable peers should be first");
688    }
689}