1use crate::error::BootstrapError;
24use crate::rate_limit::{JoinRateLimiter, JoinRateLimiterConfig};
25use crate::security::{IPDiversityConfig, IPDiversityEnforcer};
26use crate::{P2PError, PeerId, Result};
27use ant_quic::bootstrap_cache::{
28 BootstrapCache as AntBootstrapCache, BootstrapCacheConfig, CachedPeer, PeerCapabilities,
29};
30use parking_lot::Mutex;
31use serde::{Deserialize, Serialize};
32use std::net::{IpAddr, Ipv6Addr, SocketAddr};
33use std::path::PathBuf;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::task::JoinHandle;
37use tracing::{info, warn};
38
39pub const DEFAULT_MAX_CONTACTS: usize = 30_000;
41pub const DEFAULT_MERGE_INTERVAL: Duration = Duration::from_secs(60);
42pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
43pub const DEFAULT_QUALITY_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct CacheConfig {
50 pub cache_dir: PathBuf,
52 pub max_contacts: usize,
54 pub merge_interval: Duration,
56 pub cleanup_interval: Duration,
58 pub quality_update_interval: Duration,
60 pub stale_threshold: Duration,
62 pub connectivity_check_interval: Duration,
64 pub connectivity_check_count: usize,
66}
67
68impl Default for CacheConfig {
69 fn default() -> Self {
70 Self {
71 cache_dir: PathBuf::from(".cache/saorsa"),
72 max_contacts: DEFAULT_MAX_CONTACTS,
73 merge_interval: DEFAULT_MERGE_INTERVAL,
74 cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
75 quality_update_interval: DEFAULT_QUALITY_UPDATE_INTERVAL,
76 stale_threshold: Duration::from_secs(86400 * 7), connectivity_check_interval: Duration::from_secs(900), connectivity_check_count: 100,
79 }
80 }
81}
82
83#[derive(Debug, Clone)]
85pub struct BootstrapConfig {
86 pub cache_dir: PathBuf,
88 pub max_peers: usize,
90 pub epsilon: f64,
92 pub rate_limit: JoinRateLimiterConfig,
94 pub diversity: IPDiversityConfig,
96}
97
98impl Default for BootstrapConfig {
99 fn default() -> Self {
100 Self {
101 cache_dir: default_cache_dir(),
102 max_peers: 20_000,
103 epsilon: 0.1,
104 rate_limit: JoinRateLimiterConfig::default(),
105 diversity: IPDiversityConfig::default(),
106 }
107 }
108}
109
110pub struct BootstrapManager {
115 cache: Arc<AntBootstrapCache>,
116 rate_limiter: JoinRateLimiter,
117 diversity_enforcer: Mutex<IPDiversityEnforcer>,
118 diversity_config: IPDiversityConfig,
119 maintenance_handle: Option<JoinHandle<()>>,
120}
121
122impl BootstrapManager {
123 pub async fn new() -> Result<Self> {
125 Self::with_config(BootstrapConfig::default()).await
126 }
127
128 pub async fn with_config(config: BootstrapConfig) -> Result<Self> {
130 let ant_config = BootstrapCacheConfig::builder()
131 .cache_dir(&config.cache_dir)
132 .max_peers(config.max_peers)
133 .epsilon(config.epsilon)
134 .build();
135
136 let cache = AntBootstrapCache::open(ant_config).await.map_err(|e| {
137 P2PError::Bootstrap(BootstrapError::CacheError(
138 format!("Failed to open bootstrap cache: {e}").into(),
139 ))
140 })?;
141
142 Ok(Self {
143 cache: Arc::new(cache),
144 rate_limiter: JoinRateLimiter::new(config.rate_limit),
145 diversity_enforcer: Mutex::new(IPDiversityEnforcer::new(config.diversity.clone())),
146 diversity_config: config.diversity,
147 maintenance_handle: None,
148 })
149 }
150
151 pub async fn with_full_config(
156 cache_config: CacheConfig,
157 rate_limit_config: JoinRateLimiterConfig,
158 diversity_config: IPDiversityConfig,
159 ) -> Result<Self> {
160 let config = BootstrapConfig {
161 cache_dir: cache_config.cache_dir,
162 max_peers: cache_config.max_contacts,
163 epsilon: 0.1, rate_limit: rate_limit_config,
165 diversity: diversity_config,
166 };
167 Self::with_config(config).await
168 }
169
170 pub fn start_maintenance(&mut self) -> Result<()> {
172 if self.maintenance_handle.is_some() {
173 return Ok(()); }
175
176 let handle = self.cache.clone().start_maintenance();
177 self.maintenance_handle = Some(handle);
178 info!("Started bootstrap cache maintenance tasks");
179 Ok(())
180 }
181
182 pub async fn add_peer(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) -> Result<()> {
188 if addresses.is_empty() {
189 return Err(P2PError::Bootstrap(BootstrapError::InvalidData(
190 "No addresses provided".to_string().into(),
191 )));
192 }
193
194 let ip = addresses[0].ip();
195
196 self.rate_limiter.check_join_allowed(&ip).map_err(|e| {
198 warn!("Rate limit exceeded for {}: {}", ip, e);
199 P2PError::Bootstrap(BootstrapError::RateLimited(e.to_string().into()))
200 })?;
201
202 let ipv6 = ip_to_ipv6(&ip);
204 {
205 let mut diversity = self.diversity_enforcer.lock();
206 let analysis = diversity.analyze_ip(ipv6).map_err(|e| {
207 warn!("IP analysis failed for {}: {}", ip, e);
208 P2PError::Bootstrap(BootstrapError::InvalidData(
209 format!("IP analysis failed: {e}").into(),
210 ))
211 })?;
212
213 if !diversity.can_accept_node(&analysis) {
214 warn!("IP diversity limit exceeded for {}", ip);
215 return Err(P2PError::Bootstrap(BootstrapError::RateLimited(
216 "IP diversity limits exceeded".to_string().into(),
217 )));
218 }
219
220 if let Err(e) = diversity.add_node(&analysis) {
222 warn!("Failed to track IP diversity for {}: {}", ip, e);
223 }
224 } let ant_peer_id = string_to_ant_peer_id(&peer_id);
228 self.cache.add_seed(ant_peer_id, addresses).await;
229
230 Ok(())
231 }
232
233 pub async fn add_peer_trusted(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
237 let ant_peer_id = string_to_ant_peer_id(&peer_id);
238 self.cache.add_seed(ant_peer_id, addresses).await;
239 }
240
241 pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
243 let ant_peer_id = string_to_ant_peer_id(peer_id);
244 self.cache.record_success(&ant_peer_id, rtt_ms).await;
245 }
246
247 pub async fn record_failure(&self, peer_id: &PeerId) {
249 let ant_peer_id = string_to_ant_peer_id(peer_id);
250 self.cache.record_failure(&ant_peer_id).await;
251 }
252
253 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
255 self.cache.select_peers(count).await
256 }
257
258 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
260 self.cache.select_relay_peers(count).await
261 }
262
263 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
265 self.cache.select_coordinators(count).await
266 }
267
268 pub async fn stats(&self) -> BootstrapStats {
270 let ant_stats = self.cache.stats().await;
271 BootstrapStats {
272 total_peers: ant_stats.total_peers,
273 relay_peers: ant_stats.relay_peers,
274 coordinator_peers: ant_stats.coordinator_peers,
275 average_quality: ant_stats.average_quality,
276 untested_peers: ant_stats.untested_peers,
277 }
278 }
279
280 pub async fn peer_count(&self) -> usize {
282 self.cache.peer_count().await
283 }
284
285 pub async fn save(&self) -> Result<()> {
287 self.cache.save().await.map_err(|e| {
288 P2PError::Bootstrap(BootstrapError::CacheError(
289 format!("Failed to save cache: {e}").into(),
290 ))
291 })
292 }
293
294 pub async fn update_capabilities(&self, peer_id: &PeerId, capabilities: PeerCapabilities) {
296 let ant_peer_id = string_to_ant_peer_id(peer_id);
297 self.cache
298 .update_capabilities(&ant_peer_id, capabilities)
299 .await;
300 }
301
302 pub async fn contains(&self, peer_id: &PeerId) -> bool {
304 let ant_peer_id = string_to_ant_peer_id(peer_id);
305 self.cache.contains(&ant_peer_id).await
306 }
307
308 pub async fn get_peer(&self, peer_id: &PeerId) -> Option<CachedPeer> {
310 let ant_peer_id = string_to_ant_peer_id(peer_id);
311 self.cache.get(&ant_peer_id).await
312 }
313
314 pub async fn add_contact(&self, contact: super::ContactEntry) -> Result<()> {
323 self.add_peer(contact.peer_id, contact.addresses).await
324 }
325
326 pub async fn add_contact_trusted(&self, contact: super::ContactEntry) {
328 self.add_peer_trusted(contact.peer_id, contact.addresses)
329 .await;
330 }
331
332 pub async fn update_contact_metrics(
336 &self,
337 peer_id: &PeerId,
338 metrics: super::QualityMetrics,
339 ) -> Result<()> {
340 if metrics.success_rate >= 0.5 {
344 let rtt_ms = metrics.avg_latency_ms as u32;
345 self.record_success(peer_id, rtt_ms).await;
346 } else {
347 self.record_failure(peer_id).await;
348 }
349 Ok(())
350 }
351
352 pub async fn get_stats(&self) -> Result<super::CacheStats> {
356 let stats = self.stats().await;
357 Ok(super::CacheStats {
358 total_contacts: stats.total_peers,
359 high_quality_contacts: stats.relay_peers + stats.coordinator_peers,
360 verified_contacts: stats.total_peers - stats.untested_peers,
361 average_quality_score: stats.average_quality,
362 cache_hit_rate: 0.0, last_cleanup: chrono::Utc::now(),
364 last_merge: chrono::Utc::now(),
365 iroh_contacts: stats.total_peers, nat_traversal_contacts: stats.coordinator_peers,
368 avg_iroh_setup_time_ms: 0.0,
369 preferred_iroh_connection_type: None,
370 })
371 }
372
373 pub async fn start_background_tasks(&mut self) -> Result<()> {
377 self.start_maintenance()
378 }
379
380 pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<super::ContactEntry>> {
384 let peers = self.select_peers(count).await;
385 let contacts: Vec<super::ContactEntry> = peers
386 .into_iter()
387 .map(|cached| {
388 let peer_id_str = hex::encode(&cached.peer_id.0[..8]);
391 super::ContactEntry::new(peer_id_str, cached.addresses.clone())
392 })
393 .collect();
394 Ok(contacts)
395 }
396
397 pub async fn get_quic_bootstrap_peers(&self, count: usize) -> Result<Vec<super::ContactEntry>> {
399 self.get_bootstrap_peers(count).await
401 }
402}
403
404impl BootstrapManager {
405 pub fn diversity_config(&self) -> &IPDiversityConfig {
407 &self.diversity_config
408 }
409}
410
411#[derive(Debug, Clone, Default)]
413pub struct BootstrapStats {
414 pub total_peers: usize,
416 pub relay_peers: usize,
418 pub coordinator_peers: usize,
420 pub average_quality: f64,
422 pub untested_peers: usize,
424}
425
426fn string_to_ant_peer_id(peer_id: &str) -> ant_quic::nat_traversal_api::PeerId {
428 use sha2::{Digest, Sha256};
429 let mut hasher = Sha256::new();
430 hasher.update(peer_id.as_bytes());
431 let result = hasher.finalize();
432 let mut bytes = [0u8; 32];
433 bytes.copy_from_slice(&result);
434 ant_quic::nat_traversal_api::PeerId(bytes)
435}
436
437fn ip_to_ipv6(ip: &IpAddr) -> Ipv6Addr {
439 match ip {
440 IpAddr::V4(v4) => v4.to_ipv6_mapped(),
441 IpAddr::V6(v6) => *v6,
442 }
443}
444
445fn default_cache_dir() -> PathBuf {
447 if let Some(cache_dir) = dirs::cache_dir() {
448 cache_dir.join("saorsa").join("bootstrap")
449 } else if let Some(home) = dirs::home_dir() {
450 home.join(".cache").join("saorsa").join("bootstrap")
451 } else {
452 PathBuf::from(".saorsa-bootstrap-cache")
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459 use tempfile::TempDir;
460
461 fn test_config(temp_dir: &TempDir) -> BootstrapConfig {
463 BootstrapConfig {
464 cache_dir: temp_dir.path().to_path_buf(),
465 max_peers: 100,
466 epsilon: 0.0, rate_limit: JoinRateLimiterConfig::default(),
468 diversity: IPDiversityConfig::default(),
469 }
470 }
471
472 #[tokio::test]
473 async fn test_manager_creation() {
474 let temp_dir = TempDir::new().unwrap();
475 let config = test_config(&temp_dir);
476
477 let manager = BootstrapManager::with_config(config).await;
478 assert!(manager.is_ok());
479
480 let manager = manager.unwrap();
481 assert_eq!(manager.peer_count().await, 0);
482 }
483
484 #[tokio::test]
485 async fn test_add_and_get_peer() {
486 let temp_dir = TempDir::new().unwrap();
487 let config = test_config(&temp_dir);
488 let manager = BootstrapManager::with_config(config).await.unwrap();
489
490 let peer_id = "test-peer-1".to_string();
491 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
492
493 let result = manager.add_peer(peer_id.clone(), vec![addr]).await;
495 assert!(result.is_ok());
496
497 assert_eq!(manager.peer_count().await, 1);
499 assert!(manager.contains(&peer_id).await);
500 }
501
502 #[tokio::test]
503 async fn test_add_peer_no_addresses_fails() {
504 let temp_dir = TempDir::new().unwrap();
505 let config = test_config(&temp_dir);
506 let manager = BootstrapManager::with_config(config).await.unwrap();
507
508 let peer_id = "test-peer-1".to_string();
509 let result = manager.add_peer(peer_id, vec![]).await;
510
511 assert!(result.is_err());
512 assert!(matches!(
513 result.unwrap_err(),
514 P2PError::Bootstrap(BootstrapError::InvalidData(_))
515 ));
516 }
517
518 #[tokio::test]
519 async fn test_add_trusted_peer_bypasses_checks() {
520 let temp_dir = TempDir::new().unwrap();
521 let config = test_config(&temp_dir);
522 let manager = BootstrapManager::with_config(config).await.unwrap();
523
524 let peer_id = "trusted-peer".to_string();
525 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
526
527 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
529
530 assert_eq!(manager.peer_count().await, 1);
531 assert!(manager.contains(&peer_id).await);
532 }
533
534 #[tokio::test]
535 async fn test_record_success_updates_quality() {
536 let temp_dir = TempDir::new().unwrap();
537 let config = test_config(&temp_dir);
538 let manager = BootstrapManager::with_config(config).await.unwrap();
539
540 let peer_id = "test-peer".to_string();
541 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
542 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
543
544 let initial_peer = manager.get_peer(&peer_id).await.unwrap();
546 let initial_quality = initial_peer.quality_score;
547
548 for _ in 0..5 {
550 manager.record_success(&peer_id, 50).await;
551 }
552
553 let updated_peer = manager.get_peer(&peer_id).await.unwrap();
555 assert!(
556 updated_peer.quality_score >= initial_quality,
557 "Quality should improve after successes"
558 );
559 }
560
561 #[tokio::test]
562 async fn test_record_failure_decreases_quality() {
563 let temp_dir = TempDir::new().unwrap();
564 let config = test_config(&temp_dir);
565 let manager = BootstrapManager::with_config(config).await.unwrap();
566
567 let peer_id = "test-peer".to_string();
568 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
569 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
570
571 for _ in 0..3 {
573 manager.record_success(&peer_id, 50).await;
574 }
575 let good_peer = manager.get_peer(&peer_id).await.unwrap();
576 let good_quality = good_peer.quality_score;
577
578 for _ in 0..5 {
580 manager.record_failure(&peer_id).await;
581 }
582
583 let bad_peer = manager.get_peer(&peer_id).await.unwrap();
585 assert!(
586 bad_peer.quality_score < good_quality,
587 "Quality should decrease after failures"
588 );
589 }
590
591 #[tokio::test]
592 async fn test_select_peers_returns_best() {
593 let temp_dir = TempDir::new().unwrap();
594 let config = test_config(&temp_dir);
595 let manager = BootstrapManager::with_config(config).await.unwrap();
596
597 for i in 0..10 {
599 let peer_id = format!("peer-{}", i);
600 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
601 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
602
603 for _ in 0..i {
605 manager.record_success(&peer_id, 50).await;
606 }
607 }
608
609 let selected = manager.select_peers(5).await;
611 assert_eq!(selected.len(), 5);
612
613 for i in 0..4 {
615 assert!(
616 selected[i].quality_score >= selected[i + 1].quality_score,
617 "Peers should be sorted by quality"
618 );
619 }
620 }
621
622 #[tokio::test]
623 async fn test_stats() {
624 let temp_dir = TempDir::new().unwrap();
625 let config = test_config(&temp_dir);
626 let manager = BootstrapManager::with_config(config).await.unwrap();
627
628 for i in 0..5 {
630 let peer_id = format!("peer-{}", i);
631 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
632 manager.add_peer_trusted(peer_id, vec![addr]).await;
633 }
634
635 let stats = manager.stats().await;
636 assert_eq!(stats.total_peers, 5);
637 assert_eq!(stats.untested_peers, 5); }
639
640 #[tokio::test]
641 async fn test_persistence() {
642 let temp_dir = TempDir::new().unwrap();
643 let cache_path = temp_dir.path().to_path_buf();
644
645 {
647 let config = BootstrapConfig {
648 cache_dir: cache_path.clone(),
649 max_peers: 100,
650 epsilon: 0.0,
651 rate_limit: JoinRateLimiterConfig::default(),
652 diversity: IPDiversityConfig::default(),
653 };
654 let manager = BootstrapManager::with_config(config).await.unwrap();
655 let peer_id = "persistent-peer".to_string();
656 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
657 manager.add_peer_trusted(peer_id, vec![addr]).await;
658
659 let count_before = manager.peer_count().await;
661 assert_eq!(count_before, 1, "Peer should be in cache before save");
662
663 manager.save().await.unwrap();
665
666 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
668 }
669
670 {
672 let config = BootstrapConfig {
673 cache_dir: cache_path,
674 max_peers: 100,
675 epsilon: 0.0,
676 rate_limit: JoinRateLimiterConfig::default(),
677 diversity: IPDiversityConfig::default(),
678 };
679 let manager = BootstrapManager::with_config(config).await.unwrap();
680 let count = manager.peer_count().await;
681
682 if count == 0 {
685 eprintln!("Note: ant-quic BootstrapCache may have different persistence behavior");
688 }
689 }
692 }
693
694 #[tokio::test]
695 async fn test_rate_limiting() {
696 let temp_dir = TempDir::new().unwrap();
697
698 let diversity_config = IPDiversityConfig {
701 max_nodes_per_64: 100,
702 max_nodes_per_48: 100,
703 max_nodes_per_32: 100,
704 max_nodes_per_ipv4_32: 100, max_nodes_per_ipv4_24: 100,
706 max_nodes_per_ipv4_16: 100,
707 max_per_ip_cap: 100,
708 max_network_fraction: 1.0,
709 max_nodes_per_asn: 1000,
710 enable_geolocation_check: false,
711 min_geographic_diversity: 0,
712 };
713
714 let config = BootstrapConfig {
715 cache_dir: temp_dir.path().to_path_buf(),
716 max_peers: 100,
717 epsilon: 0.0,
718 rate_limit: JoinRateLimiterConfig {
719 max_joins_per_64_per_hour: 100, max_joins_per_48_per_hour: 100, max_joins_per_24_per_hour: 2, max_global_joins_per_minute: 100,
723 global_burst_size: 10,
724 },
725 diversity: diversity_config,
726 };
727
728 let manager = BootstrapManager::with_config(config).await.unwrap();
729
730 for i in 0..2 {
732 let peer_id = format!("peer-{}", i);
733 let addr: SocketAddr = format!("192.168.1.{}:{}", 10 + i, 9000 + i)
734 .parse()
735 .unwrap();
736 let result = manager.add_peer(peer_id, vec![addr]).await;
737 assert!(
738 result.is_ok(),
739 "First 2 peers should be allowed: {:?}",
740 result
741 );
742 }
743
744 let peer_id = "peer-blocked".to_string();
746 let addr: SocketAddr = "192.168.1.100:9100".parse().unwrap();
747 let result = manager.add_peer(peer_id, vec![addr]).await;
748 assert!(result.is_err(), "Third peer should be rate limited");
749 assert!(matches!(
750 result.unwrap_err(),
751 P2PError::Bootstrap(BootstrapError::RateLimited(_))
752 ));
753 }
754
755 #[tokio::test]
756 async fn test_peer_id_hashing_deterministic() {
757 let peer_id = "test-peer-123";
759 let ant_id_1 = string_to_ant_peer_id(peer_id);
760 let ant_id_2 = string_to_ant_peer_id(peer_id);
761 assert_eq!(ant_id_1.0, ant_id_2.0);
762
763 let other_id = "other-peer-456";
765 let ant_id_3 = string_to_ant_peer_id(other_id);
766 assert_ne!(ant_id_1.0, ant_id_3.0);
767 }
768}