//! ant_quic/bootstrap_cache/cache.rs — bootstrap peer cache implementation.

use super::config::BootstrapCacheConfig;
11use super::entry::{CachedPeer, ConnectionOutcome, PeerCapabilities, PeerSource};
12use super::persistence::{CacheData, CachePersistence};
13use super::selection::select_epsilon_greedy;
14use crate::nat_traversal_api::PeerId;
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::sync::{RwLock, broadcast};
19use tracing::{debug, info, warn};
20
/// Notifications broadcast to [`BootstrapCache::subscribe`] listeners when the
/// cache contents change.
#[derive(Debug, Clone)]
pub enum CacheEvent {
    /// A peer was inserted, replaced, or rescored; carries the total peer
    /// count after the update.
    Updated {
        peer_count: usize,
    },
    /// The cache was successfully persisted to disk.
    Saved,
    /// Entries from another source were merged in; `added` is the number of
    /// new peers. NOTE(review): not emitted anywhere in this file — presumably
    /// sent by merge logic elsewhere in the module; confirm.
    Merged {
        added: usize,
    },
    /// Stale peers were purged; `removed` is how many entries were dropped.
    Cleaned {
        removed: usize,
    },
}
42
/// Aggregate snapshot of the cache, computed by [`BootstrapCache::stats`].
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total number of cached peers.
    pub total_peers: usize,
    /// Peers whose capabilities report relay support.
    pub relay_peers: usize,
    /// Peers whose capabilities report coordination support.
    pub coordinator_peers: usize,
    /// Relay-capable peers that also report dual-stack support
    /// (per `PeerCapabilities::supports_dual_stack`).
    pub dual_stack_relay_peers: usize,
    /// Mean `quality_score` across all peers; 0.0 when the cache is empty.
    pub average_quality: f64,
    /// Peers with no recorded connection attempts
    /// (success_count + failure_count == 0).
    pub untested_peers: usize,
}
59
/// Persistent, quality-scored store of known bootstrap peers.
///
/// Peer state lives behind an async `RwLock` so it can be shared with
/// background maintenance tasks; changes are announced on a broadcast channel.
#[derive(Debug)]
pub struct BootstrapCache {
    /// Tunables: capacity, epsilon, save/cleanup intervals, scoring weights.
    config: BootstrapCacheConfig,
    /// In-memory peer table, shared across tasks.
    data: Arc<RwLock<CacheData>>,
    /// Disk-backed load/save layer.
    persistence: CachePersistence,
    /// Broadcast sender for `CacheEvent` notifications (capacity 256).
    event_tx: broadcast::Sender<CacheEvent>,
    /// When the cache was last persisted (refreshed after a successful save).
    last_save: Arc<RwLock<Instant>>,
    /// When stale peers were last purged.
    last_cleanup: Arc<RwLock<Instant>>,
}
74
75impl BootstrapCache {
76 pub async fn open(config: BootstrapCacheConfig) -> std::io::Result<Self> {
81 let persistence = CachePersistence::new(&config.cache_dir, config.enable_file_locking)?;
82 let data = persistence.load()?;
83 let (event_tx, _) = broadcast::channel(256);
84 let now = Instant::now();
85
86 info!("Opened bootstrap cache with {} peers", data.peers.len());
87
88 Ok(Self {
89 config,
90 data: Arc::new(RwLock::new(data)),
91 persistence,
92 event_tx,
93 last_save: Arc::new(RwLock::new(now)),
94 last_cleanup: Arc::new(RwLock::new(now)),
95 })
96 }
97
98 pub fn subscribe(&self) -> broadcast::Receiver<CacheEvent> {
100 self.event_tx.subscribe()
101 }
102
103 pub async fn peer_count(&self) -> usize {
105 self.data.read().await.peers.len()
106 }
107
108 pub async fn get_peer(&self, peer_id: &PeerId) -> Option<CachedPeer> {
110 self.data.read().await.peers.get(&peer_id.0).cloned()
111 }
112
113 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
118 let data = self.data.read().await;
119 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
120
121 select_epsilon_greedy(&peers, count, self.config.epsilon)
122 .into_iter()
123 .cloned()
124 .collect()
125 }
126
127 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
131 let data = self.data.read().await;
132 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
133
134 super::selection::select_with_capabilities(&peers, count, true, false)
135 .into_iter()
136 .cloned()
137 .collect()
138 }
139
140 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
144 let data = self.data.read().await;
145 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
146
147 super::selection::select_with_capabilities(&peers, count, false, true)
148 .into_iter()
149 .cloned()
150 .collect()
151 }
152
153 pub async fn select_relays_for_target(
163 &self,
164 count: usize,
165 target: &std::net::SocketAddr,
166 prefer_dual_stack: bool,
167 ) -> Vec<CachedPeer> {
168 use super::selection::select_relays_for_target;
169
170 let data = self.data.read().await;
171 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
172
173 select_relays_for_target(&peers, count, target.is_ipv4(), prefer_dual_stack)
174 .into_iter()
175 .cloned()
176 .collect()
177 }
178
179 pub async fn select_dual_stack_relays(&self, count: usize) -> Vec<CachedPeer> {
183 use super::selection::select_dual_stack_relays;
184
185 let data = self.data.read().await;
186 let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();
187
188 select_dual_stack_relays(&peers, count)
189 .into_iter()
190 .cloned()
191 .collect()
192 }
193
194 pub async fn upsert(&self, peer: CachedPeer) {
198 let mut data = self.data.write().await;
199
200 if data.peers.len() >= self.config.max_peers && !data.peers.contains_key(&peer.peer_id.0) {
202 self.evict_lowest_quality(&mut data);
203 }
204
205 data.peers.insert(peer.peer_id.0, peer);
206
207 let count = data.peers.len();
208 drop(data);
209
210 let _ = self
211 .event_tx
212 .send(CacheEvent::Updated { peer_count: count });
213 }
214
215 pub async fn add_seed(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
217 let peer = CachedPeer::new(peer_id, addresses, PeerSource::Seed);
218 self.upsert(peer).await;
219 }
220
221 pub async fn add_from_connection(
223 &self,
224 peer_id: PeerId,
225 addresses: Vec<SocketAddr>,
226 caps: Option<PeerCapabilities>,
227 ) {
228 let mut peer = CachedPeer::new(peer_id, addresses, PeerSource::Connection);
229 if let Some(caps) = caps {
230 peer.capabilities = caps;
231 }
232 self.upsert(peer).await;
233 }
234
235 pub async fn record_outcome(&self, peer_id: &PeerId, outcome: ConnectionOutcome) {
237 let mut data = self.data.write().await;
238
239 if let Some(peer) = data.peers.get_mut(&peer_id.0) {
240 if outcome.success {
241 peer.record_success(
242 outcome.rtt_ms.unwrap_or(100),
243 outcome.capabilities_discovered,
244 );
245 } else {
246 peer.record_failure();
247 }
248
249 peer.calculate_quality(&self.config.weights);
251 }
252 }
253
254 pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
256 self.record_outcome(
257 peer_id,
258 ConnectionOutcome {
259 success: true,
260 rtt_ms: Some(rtt_ms),
261 capabilities_discovered: None,
262 },
263 )
264 .await;
265 }
266
267 pub async fn record_failure(&self, peer_id: &PeerId) {
269 self.record_outcome(
270 peer_id,
271 ConnectionOutcome {
272 success: false,
273 rtt_ms: None,
274 capabilities_discovered: None,
275 },
276 )
277 .await;
278 }
279
280 pub async fn update_capabilities(&self, peer_id: &PeerId, caps: PeerCapabilities) {
282 let mut data = self.data.write().await;
283
284 if let Some(peer) = data.peers.get_mut(&peer_id.0) {
285 peer.capabilities = caps;
286 peer.calculate_quality(&self.config.weights);
287 }
288 }
289
290 pub async fn get(&self, peer_id: &PeerId) -> Option<CachedPeer> {
292 self.data.read().await.peers.get(&peer_id.0).cloned()
293 }
294
295 pub async fn update_token(&self, peer_id: PeerId, token: Vec<u8>) {
297 let mut data = self.data.write().await;
298 if let Some(peer) = data.peers.get_mut(&peer_id.0) {
299 peer.token = Some(token);
300 }
301 }
302
303 pub async fn get_all_tokens(&self) -> std::collections::HashMap<PeerId, Vec<u8>> {
305 self.data
306 .read()
307 .await
308 .peers
309 .values()
310 .filter_map(|p| p.token.clone().map(|t| (p.peer_id, t)))
311 .collect()
312 }
313
314 pub async fn contains(&self, peer_id: &PeerId) -> bool {
316 self.data.read().await.peers.contains_key(&peer_id.0)
317 }
318
319 pub async fn remove(&self, peer_id: &PeerId) -> Option<CachedPeer> {
321 self.data.write().await.peers.remove(&peer_id.0)
322 }
323
324 pub async fn save(&self) -> std::io::Result<()> {
326 let mut data = self.data.write().await;
327
328 if data.peers.len() < self.config.min_peers_to_save {
329 debug!(
330 "Skipping save: only {} peers (min: {})",
331 data.peers.len(),
332 self.config.min_peers_to_save
333 );
334 return Ok(());
335 }
336
337 self.persistence.save(&mut data)?;
338
339 drop(data);
340 *self.last_save.write().await = Instant::now();
341 let _ = self.event_tx.send(CacheEvent::Saved);
342
343 Ok(())
344 }
345
346 pub async fn cleanup_stale(&self) -> usize {
351 let mut data = self.data.write().await;
352 let initial_count = data.peers.len();
353
354 data.peers
355 .retain(|_, peer| !peer.is_stale(self.config.stale_threshold));
356
357 let removed = initial_count - data.peers.len();
358
359 if removed > 0 {
360 info!("Cleaned up {} stale peers", removed);
361 let _ = self.event_tx.send(CacheEvent::Cleaned { removed });
362 }
363
364 drop(data);
365 *self.last_cleanup.write().await = Instant::now();
366
367 removed
368 }
369
370 pub async fn recalculate_quality(&self) {
372 let mut data = self.data.write().await;
373
374 for peer in data.peers.values_mut() {
375 peer.calculate_quality(&self.config.weights);
376 }
377
378 let count = data.peers.len();
379 let _ = self
380 .event_tx
381 .send(CacheEvent::Updated { peer_count: count });
382 }
383
384 pub async fn stats(&self) -> CacheStats {
386 let data = self.data.read().await;
387
388 let relay_count = data
389 .peers
390 .values()
391 .filter(|p| p.capabilities.supports_relay)
392 .count();
393 let coord_count = data
394 .peers
395 .values()
396 .filter(|p| p.capabilities.supports_coordination)
397 .count();
398 let dual_stack_count = data
399 .peers
400 .values()
401 .filter(|p| p.capabilities.supports_relay && p.capabilities.supports_dual_stack())
402 .count();
403 let untested = data
404 .peers
405 .values()
406 .filter(|p| p.stats.success_count + p.stats.failure_count == 0)
407 .count();
408 let avg_quality = if data.peers.is_empty() {
409 0.0
410 } else {
411 data.peers.values().map(|p| p.quality_score).sum::<f64>() / data.peers.len() as f64
412 };
413
414 CacheStats {
415 total_peers: data.peers.len(),
416 relay_peers: relay_count,
417 coordinator_peers: coord_count,
418 dual_stack_relay_peers: dual_stack_count,
419 average_quality: avg_quality,
420 untested_peers: untested,
421 }
422 }
423
424 pub fn start_maintenance(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
433 let cache = self;
434
435 tokio::spawn(async move {
436 let mut save_interval = tokio::time::interval(cache.config.save_interval);
437 let mut cleanup_interval = tokio::time::interval(cache.config.cleanup_interval);
438 let mut quality_interval = tokio::time::interval(cache.config.quality_update_interval);
439
440 loop {
441 tokio::select! {
442 _ = save_interval.tick() => {
443 if let Err(e) = cache.save().await {
444 warn!("Failed to save cache: {}", e);
445 }
446 }
447 _ = cleanup_interval.tick() => {
448 cache.cleanup_stale().await;
449 }
450 _ = quality_interval.tick() => {
451 cache.recalculate_quality().await;
452 }
453 }
454 }
455 })
456 }
457
458 pub async fn all_peers(&self) -> Vec<CachedPeer> {
460 self.data.read().await.peers.values().cloned().collect()
461 }
462
463 pub fn config(&self) -> &BootstrapCacheConfig {
465 &self.config
466 }
467
468 fn evict_lowest_quality(&self, data: &mut CacheData) {
469 let evict_count = (self.config.max_peers / 20).max(1); let mut sorted: Vec<_> = data.peers.iter().collect();
472 sorted.sort_by(|a, b| {
473 a.1.quality_score
474 .partial_cmp(&b.1.quality_score)
475 .unwrap_or(std::cmp::Ordering::Equal)
476 });
477
478 let to_remove: Vec<[u8; 32]> = sorted
479 .into_iter()
480 .take(evict_count)
481 .map(|(id, _)| *id)
482 .collect();
483
484 for id in to_remove {
485 data.peers.remove(&id);
486 }
487
488 debug!("Evicted {} lowest quality peers", evict_count);
489 }
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a cache rooted in `temp_dir` with deterministic selection
    /// (epsilon = 0 disables random exploration) and saving enabled from the
    /// very first peer.
    async fn create_test_cache(temp_dir: &TempDir) -> BootstrapCache {
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(100)
            .epsilon(0.0)
            .min_peers_to_save(1)
            .build();

        BootstrapCache::open(config).await.unwrap()
    }

    /// A freshly opened cache in an empty directory starts with no peers.
    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;
        assert_eq!(cache.peer_count().await, 0);
    }

    /// A seed peer is stored and retrievable with its addresses intact.
    #[tokio::test]
    async fn test_add_and_get() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1u8; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        assert_eq!(cache.peer_count().await, 1);
        assert!(cache.contains(&peer_id).await);

        let peer = cache.get(&peer_id).await.unwrap();
        assert_eq!(peer.addresses.len(), 1);
    }

    /// With epsilon = 0, selection is purely greedy: the best-quality peers
    /// come back first, in non-increasing quality order.
    #[tokio::test]
    async fn test_select_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Ten peers with quality scores 0.0, 0.1, ..., 0.9.
        for i in 0..10usize {
            let peer_id = PeerId([i as u8; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        let selected = cache.select_peers(5).await;
        assert_eq!(selected.len(), 5);
        // Greedy order: first selected is at least as good as the last.
        assert!(selected[0].quality_score >= selected[4].quality_score);
    }

    /// Peers saved in one cache instance are visible when the same directory
    /// is reopened by a second instance.
    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // First instance: add one peer and persist it.
        {
            let cache = create_test_cache(&temp_dir).await;
            cache
                .add_seed(PeerId([1; 32]), vec!["127.0.0.1:9000".parse().unwrap()])
                .await;
            cache.save().await.unwrap();
        }

        // Second instance: reopen from disk and find the same peer.
        {
            let cache = create_test_cache(&temp_dir).await;
            assert_eq!(cache.peer_count().await, 1);
            assert!(cache.contains(&PeerId([1; 32])).await);
        }
    }

    /// Recording successful connections raises a peer's quality score and
    /// its success rate.
    #[tokio::test]
    async fn test_quality_scoring() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        // Baseline score before any connection outcomes are recorded.
        let peer = cache.get(&peer_id).await.unwrap();
        let initial_quality = peer.quality_score;

        // Five successes at a 50 ms RTT.
        for _ in 0..5 {
            cache.record_success(&peer_id, 50).await;
        }

        let peer = cache.get(&peer_id).await.unwrap();
        assert!(peer.quality_score > initial_quality);
        assert!(peer.success_rate() > 0.9);
    }

    /// Inserting past max_peers triggers lowest-quality eviction so the
    /// cache never exceeds its configured capacity.
    #[tokio::test]
    async fn test_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(10)
            .build();

        let cache = BootstrapCache::open(config).await.unwrap();

        // 15 peers into a 10-peer cache, with increasing quality scores.
        for i in 0..15u8 {
            let peer_id = PeerId([i; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 15.0;
            cache.upsert(peer).await;
        }

        assert!(cache.peer_count().await <= 10);
    }

    /// stats() counts capability categories and untested peers correctly.
    #[tokio::test]
    async fn test_stats() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // One relay-capable peer.
        let mut peer1 = CachedPeer::new(
            PeerId([1; 32]),
            vec!["127.0.0.1:9001".parse().unwrap()],
            PeerSource::Seed,
        );
        peer1.capabilities.supports_relay = true;
        cache.upsert(peer1).await;

        // One coordination-capable peer.
        let mut peer2 = CachedPeer::new(
            PeerId([2; 32]),
            vec!["127.0.0.1:9002".parse().unwrap()],
            PeerSource::Seed,
        );
        peer2.capabilities.supports_coordination = true;
        cache.upsert(peer2).await;

        // One plain seed peer with no special capabilities.
        cache
            .add_seed(PeerId([3; 32]), vec!["127.0.0.1:9003".parse().unwrap()])
            .await;

        let stats = cache.stats().await;
        assert_eq!(stats.total_peers, 3);
        assert_eq!(stats.relay_peers, 1);
        assert_eq!(stats.coordinator_peers, 1);
        // No outcomes recorded yet, so all three peers count as untested.
        assert_eq!(stats.untested_peers, 3);
    }

    /// select_relay_peers ranks relay-capable peers ahead of the rest when
    /// more peers are requested than have relay capability.
    #[tokio::test]
    async fn test_select_relay_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        // Ten peers; even indices are relay-capable (5 of 10).
        for i in 0..10u8 {
            let mut peer = CachedPeer::new(
                PeerId([i; 32]),
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.capabilities.supports_relay = i % 2 == 0;
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        let relays = cache.select_relay_peers(10).await;
        assert_eq!(relays.len(), 10);
        // The 5 relay-capable peers must occupy the first 5 slots.
        let relay_capable = relays
            .iter()
            .take(5)
            .filter(|p| p.capabilities.supports_relay)
            .count();
        assert_eq!(relay_capable, 5, "Relay-capable peers should be first");
    }
}