ant_quic/bootstrap_cache/cache.rs

use super::config::BootstrapCacheConfig;
use super::entry::{CachedPeer, ConnectionOutcome, PeerCapabilities, PeerSource};
use super::persistence::{CacheData, CachePersistence};
use super::selection::select_epsilon_greedy;
use crate::nat_traversal_api::PeerId;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{RwLock, broadcast};
use tracing::{debug, info, warn};

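/// Events broadcast to subscribers whenever the cache contents change.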
#[derive(Debug, Clone)]
pub enum CacheEvent {
    /// The peer set changed; carries the new total peer count.
    Updated { peer_count: usize },
    /// The cache was persisted to disk.
    Saved,
    /// Peers from another source were merged in; carries how many were added.
    Merged { added: usize },
    /// Stale peers were removed; carries how many were dropped.
    Cleaned { removed: usize },
}

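/// Aggregate statistics over the cached peer set.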
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total number of cached peers.
    pub total_peers: usize,
    /// Peers advertising relay support.
    pub relay_peers: usize,
    /// Peers advertising coordination support.
    pub coordinator_peers: usize,
    /// Mean quality score across all peers (0.0 when the cache is empty).
    pub average_quality: f64,
    /// Peers with no recorded connection attempts.
    pub untested_peers: usize,
}

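/// Persistent, quality-scored cache of bootstrap peers.
///
/// The in-memory peer table is held behind an `Arc<RwLock<..>>` so the cache can
/// be shared across tasks, and `CacheEvent`s are broadcast to subscribers as the
/// cache changes.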
pub struct BootstrapCache {
    config: BootstrapCacheConfig,
    data: Arc<RwLock<CacheData>>,
    persistence: CachePersistence,
    event_tx: broadcast::Sender<CacheEvent>,
    last_save: Arc<RwLock<Instant>>,
    last_cleanup: Arc<RwLock<Instant>>,
}

impl BootstrapCache {
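    /// Opens the cache at `config.cache_dir`, loading any previously persisted peers.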
    pub async fn open(config: BootstrapCacheConfig) -> std::io::Result<Self> {
        let persistence = CachePersistence::new(&config.cache_dir, config.enable_file_locking)?;
        let data = persistence.load()?;
        let (event_tx, _) = broadcast::channel(256);
        let now = Instant::now();

        info!("Opened bootstrap cache with {} peers", data.peers.len());

        Ok(Self {
            config,
            data: Arc::new(RwLock::new(data)),
            persistence,
            event_tx,
            last_save: Arc::new(RwLock::new(now)),
            last_cleanup: Arc::new(RwLock::new(now)),
        })
    }

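    /// Subscribes to cache change events.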
    pub fn subscribe(&self) -> broadcast::Receiver<CacheEvent> {
        self.event_tx.subscribe()
    }

    pub async fn peer_count(&self) -> usize {
        self.data.read().await.peers.len()
    }

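    /// Selects up to `count` peers via `select_epsilon_greedy`, using the configured
    /// `epsilon` to balance exploring untested peers against exploiting high-quality ones.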
    pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
        let data = self.data.read().await;
        let peers: Vec<CachedPeer> = data.peers.values().cloned().collect();

        select_epsilon_greedy(&peers, count, self.config.epsilon)
            .into_iter()
            .cloned()
            .collect()
    }

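    /// Returns up to `count` relay-capable peers, ordered by descending quality score.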
    pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
        let data = self.data.read().await;
        let mut relays: Vec<CachedPeer> = data
            .peers
            .values()
            .filter(|p| p.capabilities.supports_relay)
            .cloned()
            .collect();

        relays.sort_by(|a, b| {
            b.quality_score
                .partial_cmp(&a.quality_score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        relays.into_iter().take(count).collect()
    }

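    /// Returns up to `count` coordination-capable peers, ordered by descending quality score.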
    pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
        let data = self.data.read().await;
        let mut coordinators: Vec<CachedPeer> = data
            .peers
            .values()
            .filter(|p| p.capabilities.supports_coordination)
            .cloned()
            .collect();

        coordinators.sort_by(|a, b| {
            b.quality_score
                .partial_cmp(&a.quality_score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        coordinators.into_iter().take(count).collect()
    }

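    /// Inserts or updates a peer, evicting the lowest-quality peers first when the
    /// cache is full, and broadcasts an `Updated` event with the new peer count.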
    pub async fn upsert(&self, peer: CachedPeer) {
        let mut data = self.data.write().await;

        if data.peers.len() >= self.config.max_peers && !data.peers.contains_key(&peer.peer_id.0) {
            self.evict_lowest_quality(&mut data);
        }

        data.peers.insert(peer.peer_id.0, peer);

        let count = data.peers.len();
        drop(data);

        let _ = self
            .event_tx
            .send(CacheEvent::Updated { peer_count: count });
    }

    /// Adds a peer learned from a seed list.
    pub async fn add_seed(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
        let peer = CachedPeer::new(peer_id, addresses, PeerSource::Seed);
        self.upsert(peer).await;
    }

    /// Adds a peer discovered through a live connection, applying any capabilities
    /// learned during that connection.
    pub async fn add_from_connection(
        &self,
        peer_id: PeerId,
        addresses: Vec<SocketAddr>,
        caps: Option<PeerCapabilities>,
    ) {
        let mut peer = CachedPeer::new(peer_id, addresses, PeerSource::Connection);
        if let Some(caps) = caps {
            peer.capabilities = caps;
        }
        self.upsert(peer).await;
    }

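    /// Records the outcome of a connection attempt and recomputes the peer's quality
    /// score. Outcomes for unknown peers are ignored.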
    pub async fn record_outcome(&self, peer_id: &PeerId, outcome: ConnectionOutcome) {
        let mut data = self.data.write().await;

        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
            if outcome.success {
                peer.record_success(
                    // Fall back to 100 ms when no RTT was measured.
                    outcome.rtt_ms.unwrap_or(100),
                    outcome.capabilities_discovered,
                );
            } else {
                peer.record_failure();
            }

            peer.calculate_quality(&self.config.weights);
        }
    }

    pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
        self.record_outcome(
            peer_id,
            ConnectionOutcome {
                success: true,
                rtt_ms: Some(rtt_ms),
                capabilities_discovered: None,
            },
        )
        .await;
    }

    pub async fn record_failure(&self, peer_id: &PeerId) {
        self.record_outcome(
            peer_id,
            ConnectionOutcome {
                success: false,
                rtt_ms: None,
                capabilities_discovered: None,
            },
        )
        .await;
    }

    pub async fn update_capabilities(&self, peer_id: &PeerId, caps: PeerCapabilities) {
        let mut data = self.data.write().await;

        if let Some(peer) = data.peers.get_mut(&peer_id.0) {
            peer.capabilities = caps;
            peer.calculate_quality(&self.config.weights);
        }
    }

    pub async fn get(&self, peer_id: &PeerId) -> Option<CachedPeer> {
        self.data.read().await.peers.get(&peer_id.0).cloned()
    }

    pub async fn contains(&self, peer_id: &PeerId) -> bool {
        self.data.read().await.peers.contains_key(&peer_id.0)
    }

    pub async fn remove(&self, peer_id: &PeerId) -> Option<CachedPeer> {
        self.data.write().await.peers.remove(&peer_id.0)
    }

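    /// Persists the cache to disk, skipping the write when fewer than
    /// `config.min_peers_to_save` peers are cached, and broadcasts `Saved` on success.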
    pub async fn save(&self) -> std::io::Result<()> {
        let mut data = self.data.write().await;

        if data.peers.len() < self.config.min_peers_to_save {
            debug!(
                "Skipping save: only {} peers (min: {})",
                data.peers.len(),
                self.config.min_peers_to_save
            );
            return Ok(());
        }

        self.persistence.save(&mut data)?;

        drop(data);
        *self.last_save.write().await = Instant::now();
        let _ = self.event_tx.send(CacheEvent::Saved);

        Ok(())
    }

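    /// Removes peers that have gone stale per `config.stale_threshold`, broadcasting a
    /// `Cleaned` event if any were removed, and returns how many were dropped.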
    pub async fn cleanup_stale(&self) -> usize {
        let mut data = self.data.write().await;
        let initial_count = data.peers.len();

        data.peers
            .retain(|_, peer| !peer.is_stale(self.config.stale_threshold));

        let removed = initial_count - data.peers.len();

        if removed > 0 {
            info!("Cleaned up {} stale peers", removed);
            let _ = self.event_tx.send(CacheEvent::Cleaned { removed });
        }

        drop(data);
        *self.last_cleanup.write().await = Instant::now();

        removed
    }

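    /// Recomputes every peer's quality score using the configured weights and
    /// broadcasts an `Updated` event.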
    pub async fn recalculate_quality(&self) {
        let mut data = self.data.write().await;

        for peer in data.peers.values_mut() {
            peer.calculate_quality(&self.config.weights);
        }

        let count = data.peers.len();
        let _ = self
            .event_tx
            .send(CacheEvent::Updated { peer_count: count });
    }

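    /// Computes aggregate statistics over the current peer set.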
    pub async fn stats(&self) -> CacheStats {
        let data = self.data.read().await;

        let relay_count = data
            .peers
            .values()
            .filter(|p| p.capabilities.supports_relay)
            .count();
        let coord_count = data
            .peers
            .values()
            .filter(|p| p.capabilities.supports_coordination)
            .count();
        let untested = data
            .peers
            .values()
            .filter(|p| p.stats.success_count + p.stats.failure_count == 0)
            .count();
        let avg_quality = if data.peers.is_empty() {
            0.0
        } else {
            data.peers.values().map(|p| p.quality_score).sum::<f64>() / data.peers.len() as f64
        };

        CacheStats {
            total_peers: data.peers.len(),
            relay_peers: relay_count,
            coordinator_peers: coord_count,
            average_quality: avg_quality,
            untested_peers: untested,
        }
    }

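    /// Spawns a background task that periodically saves the cache, removes stale peers,
    /// and recalculates quality scores at the intervals given in the config. The task
    /// runs until the returned `JoinHandle` is aborted.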
    pub fn start_maintenance(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        let cache = self;

        tokio::spawn(async move {
            let mut save_interval = tokio::time::interval(cache.config.save_interval);
            let mut cleanup_interval = tokio::time::interval(cache.config.cleanup_interval);
            let mut quality_interval = tokio::time::interval(cache.config.quality_update_interval);

            loop {
                tokio::select! {
                    _ = save_interval.tick() => {
                        if let Err(e) = cache.save().await {
                            warn!("Failed to save cache: {}", e);
                        }
                    }
                    _ = cleanup_interval.tick() => {
                        cache.cleanup_stale().await;
                    }
                    _ = quality_interval.tick() => {
                        cache.recalculate_quality().await;
                    }
                }
            }
        })
    }

    pub async fn all_peers(&self) -> Vec<CachedPeer> {
        self.data.read().await.peers.values().cloned().collect()
    }

    pub fn config(&self) -> &BootstrapCacheConfig {
        &self.config
    }

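    /// Evicts roughly 5% of `max_peers` (at least one peer), lowest quality scores
    /// first. Called from `upsert` when the cache is full.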
    fn evict_lowest_quality(&self, data: &mut CacheData) {
        let evict_count = (self.config.max_peers / 20).max(1);

        let mut sorted: Vec<_> = data.peers.iter().collect();
        sorted.sort_by(|a, b| {
            a.1.quality_score
                .partial_cmp(&b.1.quality_score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let to_remove: Vec<[u8; 32]> = sorted
            .into_iter()
            .take(evict_count)
            .map(|(id, _)| *id)
            .collect();

        for id in to_remove {
            data.peers.remove(&id);
        }

        debug!("Evicted {} lowest quality peers", evict_count);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    async fn create_test_cache(temp_dir: &TempDir) -> BootstrapCache {
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(100)
            .epsilon(0.0) // no exploration: selection is deterministic by quality
            .min_peers_to_save(1)
            .build();

        BootstrapCache::open(config).await.unwrap()
    }

    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;
        assert_eq!(cache.peer_count().await, 0);
    }

    #[tokio::test]
    async fn test_add_and_get() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1u8; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        assert_eq!(cache.peer_count().await, 1);
        assert!(cache.contains(&peer_id).await);

        let peer = cache.get(&peer_id).await.unwrap();
        assert_eq!(peer.addresses.len(), 1);
    }

    #[tokio::test]
    async fn test_select_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        for i in 0..10usize {
            let peer_id = PeerId([i as u8; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        let selected = cache.select_peers(5).await;
        assert_eq!(selected.len(), 5);
        assert!(selected[0].quality_score >= selected[4].quality_score);
    }

    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();

        {
            let cache = create_test_cache(&temp_dir).await;
            cache
                .add_seed(PeerId([1; 32]), vec!["127.0.0.1:9000".parse().unwrap()])
                .await;
            cache.save().await.unwrap();
        }

        {
            let cache = create_test_cache(&temp_dir).await;
            assert_eq!(cache.peer_count().await, 1);
            assert!(cache.contains(&PeerId([1; 32])).await);
        }
    }

    #[tokio::test]
    async fn test_quality_scoring() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let peer_id = PeerId([1; 32]);
        cache
            .add_seed(peer_id, vec!["127.0.0.1:9000".parse().unwrap()])
            .await;

        let peer = cache.get(&peer_id).await.unwrap();
        let initial_quality = peer.quality_score;

        for _ in 0..5 {
            cache.record_success(&peer_id, 50).await;
        }

        let peer = cache.get(&peer_id).await.unwrap();
        assert!(peer.quality_score > initial_quality);
        assert!(peer.success_rate() > 0.9);
    }

    #[tokio::test]
    async fn test_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let config = BootstrapCacheConfig::builder()
            .cache_dir(temp_dir.path())
            .max_peers(10)
            .build();

        let cache = BootstrapCache::open(config).await.unwrap();

        for i in 0..15u8 {
            let peer_id = PeerId([i; 32]);
            let mut peer = CachedPeer::new(
                peer_id,
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.quality_score = i as f64 / 15.0;
            cache.upsert(peer).await;
        }

        assert!(cache.peer_count().await <= 10);
    }

    #[tokio::test]
    async fn test_stats() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        let mut peer1 = CachedPeer::new(
            PeerId([1; 32]),
            vec!["127.0.0.1:9001".parse().unwrap()],
            PeerSource::Seed,
        );
        peer1.capabilities.supports_relay = true;
        cache.upsert(peer1).await;

        let mut peer2 = CachedPeer::new(
            PeerId([2; 32]),
            vec!["127.0.0.1:9002".parse().unwrap()],
            PeerSource::Seed,
        );
        peer2.capabilities.supports_coordination = true;
        cache.upsert(peer2).await;

        cache
            .add_seed(PeerId([3; 32]), vec!["127.0.0.1:9003".parse().unwrap()])
            .await;

        let stats = cache.stats().await;
        assert_eq!(stats.total_peers, 3);
        assert_eq!(stats.relay_peers, 1);
        assert_eq!(stats.coordinator_peers, 1);
        assert_eq!(stats.untested_peers, 3);
    }

    #[tokio::test]
    async fn test_select_relay_peers() {
        let temp_dir = TempDir::new().unwrap();
        let cache = create_test_cache(&temp_dir).await;

        for i in 0..10u8 {
            let mut peer = CachedPeer::new(
                PeerId([i; 32]),
                vec![format!("127.0.0.1:{}", 9000 + i as u16).parse().unwrap()],
                PeerSource::Seed,
            );
            peer.capabilities.supports_relay = i % 2 == 0;
            peer.quality_score = i as f64 / 10.0;
            cache.upsert(peer).await;
        }

        let relays = cache.select_relay_peers(10).await;
        // Only the even-indexed peers were marked relay-capable, so exactly 5 qualify.
        assert_eq!(relays.len(), 5);
        for peer in &relays {
            assert!(peer.capabilities.supports_relay);
        }
    }
}