1use crate::error::BootstrapError;
24use crate::rate_limit::{JoinRateLimiter, JoinRateLimiterConfig};
25use crate::security::{IPDiversityConfig, IPDiversityEnforcer};
26use crate::{P2PError, PeerId, Result};
27use ant_quic::bootstrap_cache::{
28 BootstrapCache as AntBootstrapCache, BootstrapCacheConfig, CachedPeer, PeerCapabilities,
29};
30use parking_lot::Mutex;
31use serde::{Deserialize, Serialize};
32use std::net::{IpAddr, Ipv6Addr, SocketAddr};
33use std::path::PathBuf;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::task::JoinHandle;
37use tracing::{info, warn};
38
39pub const DEFAULT_MAX_CONTACTS: usize = 30_000;
41pub const DEFAULT_MERGE_INTERVAL: Duration = Duration::from_secs(60);
42pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
43pub const DEFAULT_QUALITY_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct CacheConfig {
50 pub cache_dir: PathBuf,
52 pub max_contacts: usize,
54 pub merge_interval: Duration,
56 pub cleanup_interval: Duration,
58 pub quality_update_interval: Duration,
60 pub stale_threshold: Duration,
62 pub connectivity_check_interval: Duration,
64 pub connectivity_check_count: usize,
66}
67
68impl Default for CacheConfig {
69 fn default() -> Self {
70 Self {
71 cache_dir: PathBuf::from(".cache/saorsa"),
72 max_contacts: DEFAULT_MAX_CONTACTS,
73 merge_interval: DEFAULT_MERGE_INTERVAL,
74 cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
75 quality_update_interval: DEFAULT_QUALITY_UPDATE_INTERVAL,
76 stale_threshold: Duration::from_secs(86400 * 7), connectivity_check_interval: Duration::from_secs(900), connectivity_check_count: 100,
79 }
80 }
81}
82
83#[derive(Debug, Clone)]
85pub struct BootstrapConfig {
86 pub cache_dir: PathBuf,
88 pub max_peers: usize,
90 pub epsilon: f64,
92 pub rate_limit: JoinRateLimiterConfig,
94 pub diversity: IPDiversityConfig,
96}
97
98impl Default for BootstrapConfig {
99 fn default() -> Self {
100 Self {
101 cache_dir: default_cache_dir(),
102 max_peers: 20_000,
103 epsilon: 0.1,
104 rate_limit: JoinRateLimiterConfig::default(),
105 diversity: IPDiversityConfig::default(),
106 }
107 }
108}
109
110pub struct BootstrapManager {
115 cache: Arc<AntBootstrapCache>,
116 rate_limiter: JoinRateLimiter,
117 diversity_enforcer: Mutex<IPDiversityEnforcer>,
118 diversity_config: IPDiversityConfig,
119 maintenance_handle: Option<JoinHandle<()>>,
120}
121
122impl BootstrapManager {
123 pub async fn new() -> Result<Self> {
125 Self::with_config(BootstrapConfig::default()).await
126 }
127
128 pub async fn with_config(config: BootstrapConfig) -> Result<Self> {
130 let ant_config = BootstrapCacheConfig::builder()
131 .cache_dir(&config.cache_dir)
132 .max_peers(config.max_peers)
133 .epsilon(config.epsilon)
134 .build();
135
136 let cache = AntBootstrapCache::open(ant_config).await.map_err(|e| {
137 P2PError::Bootstrap(BootstrapError::CacheError(
138 format!("Failed to open bootstrap cache: {e}").into(),
139 ))
140 })?;
141
142 Ok(Self {
143 cache: Arc::new(cache),
144 rate_limiter: JoinRateLimiter::new(config.rate_limit),
145 diversity_enforcer: Mutex::new(IPDiversityEnforcer::new(config.diversity.clone())),
146 diversity_config: config.diversity,
147 maintenance_handle: None,
148 })
149 }
150
151 pub async fn with_full_config(
156 cache_config: CacheConfig,
157 rate_limit_config: JoinRateLimiterConfig,
158 diversity_config: IPDiversityConfig,
159 ) -> Result<Self> {
160 let config = BootstrapConfig {
161 cache_dir: cache_config.cache_dir,
162 max_peers: cache_config.max_contacts,
163 epsilon: 0.1, rate_limit: rate_limit_config,
165 diversity: diversity_config,
166 };
167 Self::with_config(config).await
168 }
169
170 pub fn start_maintenance(&mut self) -> Result<()> {
172 if self.maintenance_handle.is_some() {
173 return Ok(()); }
175
176 let handle = self.cache.clone().start_maintenance();
177 self.maintenance_handle = Some(handle);
178 info!("Started bootstrap cache maintenance tasks");
179 Ok(())
180 }
181
182 pub async fn add_peer(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) -> Result<()> {
188 if addresses.is_empty() {
189 return Err(P2PError::Bootstrap(BootstrapError::InvalidData(
190 "No addresses provided".to_string().into(),
191 )));
192 }
193
194 let ip = addresses
195 .first()
196 .ok_or_else(|| {
197 P2PError::Bootstrap(BootstrapError::InvalidData(
198 "No addresses provided".to_string().into(),
199 ))
200 })?
201 .ip();
202
203 self.rate_limiter.check_join_allowed(&ip).map_err(|e| {
205 warn!("Rate limit exceeded for {}: {}", ip, e);
206 P2PError::Bootstrap(BootstrapError::RateLimited(e.to_string().into()))
207 })?;
208
209 let ipv6 = ip_to_ipv6(&ip);
211 {
212 let mut diversity = self.diversity_enforcer.lock();
213 let analysis = diversity.analyze_ip(ipv6).map_err(|e| {
214 warn!("IP analysis failed for {}: {}", ip, e);
215 P2PError::Bootstrap(BootstrapError::InvalidData(
216 format!("IP analysis failed: {e}").into(),
217 ))
218 })?;
219
220 if !diversity.can_accept_node(&analysis) {
221 warn!("IP diversity limit exceeded for {}", ip);
222 return Err(P2PError::Bootstrap(BootstrapError::RateLimited(
223 "IP diversity limits exceeded".to_string().into(),
224 )));
225 }
226
227 if let Err(e) = diversity.add_node(&analysis) {
229 warn!("Failed to track IP diversity for {}: {}", ip, e);
230 }
231 } let ant_peer_id = string_to_ant_peer_id(&peer_id);
235 self.cache.add_seed(ant_peer_id, addresses).await;
236
237 Ok(())
238 }
239
240 pub async fn add_peer_trusted(&self, peer_id: PeerId, addresses: Vec<SocketAddr>) {
244 let ant_peer_id = string_to_ant_peer_id(&peer_id);
245 self.cache.add_seed(ant_peer_id, addresses).await;
246 }
247
248 pub async fn record_success(&self, peer_id: &PeerId, rtt_ms: u32) {
250 let ant_peer_id = string_to_ant_peer_id(peer_id);
251 self.cache.record_success(&ant_peer_id, rtt_ms).await;
252 }
253
254 pub async fn record_failure(&self, peer_id: &PeerId) {
256 let ant_peer_id = string_to_ant_peer_id(peer_id);
257 self.cache.record_failure(&ant_peer_id).await;
258 }
259
260 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
262 self.cache.select_peers(count).await
263 }
264
265 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
267 self.cache.select_relay_peers(count).await
268 }
269
270 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
272 self.cache.select_coordinators(count).await
273 }
274
275 pub async fn stats(&self) -> BootstrapStats {
277 let ant_stats = self.cache.stats().await;
278 BootstrapStats {
279 total_peers: ant_stats.total_peers,
280 relay_peers: ant_stats.relay_peers,
281 coordinator_peers: ant_stats.coordinator_peers,
282 average_quality: ant_stats.average_quality,
283 untested_peers: ant_stats.untested_peers,
284 }
285 }
286
287 pub async fn peer_count(&self) -> usize {
289 self.cache.peer_count().await
290 }
291
292 pub async fn save(&self) -> Result<()> {
294 self.cache.save().await.map_err(|e| {
295 P2PError::Bootstrap(BootstrapError::CacheError(
296 format!("Failed to save cache: {e}").into(),
297 ))
298 })
299 }
300
301 pub async fn update_capabilities(&self, peer_id: &PeerId, capabilities: PeerCapabilities) {
303 let ant_peer_id = string_to_ant_peer_id(peer_id);
304 self.cache
305 .update_capabilities(&ant_peer_id, capabilities)
306 .await;
307 }
308
309 pub async fn contains(&self, peer_id: &PeerId) -> bool {
311 let ant_peer_id = string_to_ant_peer_id(peer_id);
312 self.cache.contains(&ant_peer_id).await
313 }
314
315 pub async fn get_peer(&self, peer_id: &PeerId) -> Option<CachedPeer> {
317 let ant_peer_id = string_to_ant_peer_id(peer_id);
318 self.cache.get(&ant_peer_id).await
319 }
320
321 pub async fn add_contact(&self, contact: super::ContactEntry) -> Result<()> {
330 self.add_peer(contact.peer_id, contact.addresses).await
331 }
332
333 pub async fn add_contact_trusted(&self, contact: super::ContactEntry) {
335 self.add_peer_trusted(contact.peer_id, contact.addresses)
336 .await;
337 }
338
339 pub async fn update_contact_metrics(
343 &self,
344 peer_id: &PeerId,
345 metrics: super::QualityMetrics,
346 ) -> Result<()> {
347 if metrics.success_rate >= 0.5 {
351 let rtt_ms = metrics.avg_latency_ms as u32;
352 self.record_success(peer_id, rtt_ms).await;
353 } else {
354 self.record_failure(peer_id).await;
355 }
356 Ok(())
357 }
358
359 pub async fn get_stats(&self) -> Result<super::CacheStats> {
363 let stats = self.stats().await;
364 Ok(super::CacheStats {
365 total_contacts: stats.total_peers,
366 high_quality_contacts: stats.relay_peers + stats.coordinator_peers,
367 verified_contacts: stats.total_peers - stats.untested_peers,
368 average_quality_score: stats.average_quality,
369 cache_hit_rate: 0.0, last_cleanup: chrono::Utc::now(),
371 last_merge: chrono::Utc::now(),
372 iroh_contacts: stats.total_peers, nat_traversal_contacts: stats.coordinator_peers,
375 avg_iroh_setup_time_ms: 0.0,
376 preferred_iroh_connection_type: None,
377 })
378 }
379
380 pub async fn start_background_tasks(&mut self) -> Result<()> {
384 self.start_maintenance()
385 }
386
387 pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<super::ContactEntry>> {
391 let peers = self.select_peers(count).await;
392 let contacts: Vec<super::ContactEntry> = peers
393 .into_iter()
394 .map(|cached| {
395 let peer_id_str = hex::encode(&cached.peer_id.0[..8]);
398 super::ContactEntry::new(peer_id_str, cached.addresses.clone())
399 })
400 .collect();
401 Ok(contacts)
402 }
403
404 pub async fn get_quic_bootstrap_peers(&self, count: usize) -> Result<Vec<super::ContactEntry>> {
406 self.get_bootstrap_peers(count).await
408 }
409}
410
411impl BootstrapManager {
412 pub fn diversity_config(&self) -> &IPDiversityConfig {
414 &self.diversity_config
415 }
416}
417
418#[derive(Debug, Clone, Default)]
420pub struct BootstrapStats {
421 pub total_peers: usize,
423 pub relay_peers: usize,
425 pub coordinator_peers: usize,
427 pub average_quality: f64,
429 pub untested_peers: usize,
431}
432
433fn string_to_ant_peer_id(peer_id: &str) -> ant_quic::nat_traversal_api::PeerId {
435 use sha2::{Digest, Sha256};
436 let mut hasher = Sha256::new();
437 hasher.update(peer_id.as_bytes());
438 let result = hasher.finalize();
439 let mut bytes = [0u8; 32];
440 bytes.copy_from_slice(&result);
441 ant_quic::nat_traversal_api::PeerId(bytes)
442}
443
444fn ip_to_ipv6(ip: &IpAddr) -> Ipv6Addr {
446 match ip {
447 IpAddr::V4(v4) => v4.to_ipv6_mapped(),
448 IpAddr::V6(v6) => *v6,
449 }
450}
451
452fn default_cache_dir() -> PathBuf {
454 if let Some(cache_dir) = dirs::cache_dir() {
455 cache_dir.join("saorsa").join("bootstrap")
456 } else if let Some(home) = dirs::home_dir() {
457 home.join(".cache").join("saorsa").join("bootstrap")
458 } else {
459 PathBuf::from(".saorsa-bootstrap-cache")
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use super::*;
466 use tempfile::TempDir;
467
468 fn test_config(temp_dir: &TempDir) -> BootstrapConfig {
470 BootstrapConfig {
471 cache_dir: temp_dir.path().to_path_buf(),
472 max_peers: 100,
473 epsilon: 0.0, rate_limit: JoinRateLimiterConfig::default(),
475 diversity: IPDiversityConfig::default(),
476 }
477 }
478
479 #[tokio::test]
480 async fn test_manager_creation() {
481 let temp_dir = TempDir::new().unwrap();
482 let config = test_config(&temp_dir);
483
484 let manager = BootstrapManager::with_config(config).await;
485 assert!(manager.is_ok());
486
487 let manager = manager.unwrap();
488 assert_eq!(manager.peer_count().await, 0);
489 }
490
491 #[tokio::test]
492 async fn test_add_and_get_peer() {
493 let temp_dir = TempDir::new().unwrap();
494 let config = test_config(&temp_dir);
495 let manager = BootstrapManager::with_config(config).await.unwrap();
496
497 let peer_id = "test-peer-1".to_string();
498 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
499
500 let result = manager.add_peer(peer_id.clone(), vec![addr]).await;
502 assert!(result.is_ok());
503
504 assert_eq!(manager.peer_count().await, 1);
506 assert!(manager.contains(&peer_id).await);
507 }
508
509 #[tokio::test]
510 async fn test_add_peer_no_addresses_fails() {
511 let temp_dir = TempDir::new().unwrap();
512 let config = test_config(&temp_dir);
513 let manager = BootstrapManager::with_config(config).await.unwrap();
514
515 let peer_id = "test-peer-1".to_string();
516 let result = manager.add_peer(peer_id, vec![]).await;
517
518 assert!(result.is_err());
519 assert!(matches!(
520 result.unwrap_err(),
521 P2PError::Bootstrap(BootstrapError::InvalidData(_))
522 ));
523 }
524
525 #[tokio::test]
526 async fn test_add_trusted_peer_bypasses_checks() {
527 let temp_dir = TempDir::new().unwrap();
528 let config = test_config(&temp_dir);
529 let manager = BootstrapManager::with_config(config).await.unwrap();
530
531 let peer_id = "trusted-peer".to_string();
532 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
533
534 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
536
537 assert_eq!(manager.peer_count().await, 1);
538 assert!(manager.contains(&peer_id).await);
539 }
540
541 #[tokio::test]
542 async fn test_record_success_updates_quality() {
543 let temp_dir = TempDir::new().unwrap();
544 let config = test_config(&temp_dir);
545 let manager = BootstrapManager::with_config(config).await.unwrap();
546
547 let peer_id = "test-peer".to_string();
548 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
549 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
550
551 let initial_peer = manager.get_peer(&peer_id).await.unwrap();
553 let initial_quality = initial_peer.quality_score;
554
555 for _ in 0..5 {
557 manager.record_success(&peer_id, 50).await;
558 }
559
560 let updated_peer = manager.get_peer(&peer_id).await.unwrap();
562 assert!(
563 updated_peer.quality_score >= initial_quality,
564 "Quality should improve after successes"
565 );
566 }
567
568 #[tokio::test]
569 async fn test_record_failure_decreases_quality() {
570 let temp_dir = TempDir::new().unwrap();
571 let config = test_config(&temp_dir);
572 let manager = BootstrapManager::with_config(config).await.unwrap();
573
574 let peer_id = "test-peer".to_string();
575 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
576 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
577
578 for _ in 0..3 {
580 manager.record_success(&peer_id, 50).await;
581 }
582 let good_peer = manager.get_peer(&peer_id).await.unwrap();
583 let good_quality = good_peer.quality_score;
584
585 for _ in 0..5 {
587 manager.record_failure(&peer_id).await;
588 }
589
590 let bad_peer = manager.get_peer(&peer_id).await.unwrap();
592 assert!(
593 bad_peer.quality_score < good_quality,
594 "Quality should decrease after failures"
595 );
596 }
597
598 #[tokio::test]
599 async fn test_select_peers_returns_best() {
600 let temp_dir = TempDir::new().unwrap();
601 let config = test_config(&temp_dir);
602 let manager = BootstrapManager::with_config(config).await.unwrap();
603
604 for i in 0..10 {
606 let peer_id = format!("peer-{}", i);
607 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
608 manager.add_peer_trusted(peer_id.clone(), vec![addr]).await;
609
610 for _ in 0..i {
612 manager.record_success(&peer_id, 50).await;
613 }
614 }
615
616 let selected = manager.select_peers(5).await;
618 assert_eq!(selected.len(), 5);
619
620 for i in 0..4 {
622 assert!(
623 selected[i].quality_score >= selected[i + 1].quality_score,
624 "Peers should be sorted by quality"
625 );
626 }
627 }
628
629 #[tokio::test]
630 async fn test_stats() {
631 let temp_dir = TempDir::new().unwrap();
632 let config = test_config(&temp_dir);
633 let manager = BootstrapManager::with_config(config).await.unwrap();
634
635 for i in 0..5 {
637 let peer_id = format!("peer-{}", i);
638 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
639 manager.add_peer_trusted(peer_id, vec![addr]).await;
640 }
641
642 let stats = manager.stats().await;
643 assert_eq!(stats.total_peers, 5);
644 assert_eq!(stats.untested_peers, 5); }
646
647 #[tokio::test]
648 async fn test_persistence() {
649 let temp_dir = TempDir::new().unwrap();
650 let cache_path = temp_dir.path().to_path_buf();
651
652 {
654 let config = BootstrapConfig {
655 cache_dir: cache_path.clone(),
656 max_peers: 100,
657 epsilon: 0.0,
658 rate_limit: JoinRateLimiterConfig::default(),
659 diversity: IPDiversityConfig::default(),
660 };
661 let manager = BootstrapManager::with_config(config).await.unwrap();
662 let peer_id = "persistent-peer".to_string();
663 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
664 manager.add_peer_trusted(peer_id, vec![addr]).await;
665
666 let count_before = manager.peer_count().await;
668 assert_eq!(count_before, 1, "Peer should be in cache before save");
669
670 manager.save().await.unwrap();
672
673 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
675 }
676
677 {
679 let config = BootstrapConfig {
680 cache_dir: cache_path,
681 max_peers: 100,
682 epsilon: 0.0,
683 rate_limit: JoinRateLimiterConfig::default(),
684 diversity: IPDiversityConfig::default(),
685 };
686 let manager = BootstrapManager::with_config(config).await.unwrap();
687 let count = manager.peer_count().await;
688
689 if count == 0 {
692 eprintln!("Note: ant-quic BootstrapCache may have different persistence behavior");
695 }
696 }
699 }
700
701 #[tokio::test]
702 async fn test_rate_limiting() {
703 let temp_dir = TempDir::new().unwrap();
704
705 let diversity_config = IPDiversityConfig {
708 max_nodes_per_64: 100,
709 max_nodes_per_48: 100,
710 max_nodes_per_32: 100,
711 max_nodes_per_ipv4_32: 100, max_nodes_per_ipv4_24: 100,
713 max_nodes_per_ipv4_16: 100,
714 max_per_ip_cap: 100,
715 max_network_fraction: 1.0,
716 max_nodes_per_asn: 1000,
717 enable_geolocation_check: false,
718 min_geographic_diversity: 0,
719 };
720
721 let config = BootstrapConfig {
722 cache_dir: temp_dir.path().to_path_buf(),
723 max_peers: 100,
724 epsilon: 0.0,
725 rate_limit: JoinRateLimiterConfig {
726 max_joins_per_64_per_hour: 100, max_joins_per_48_per_hour: 100, max_joins_per_24_per_hour: 2, max_global_joins_per_minute: 100,
730 global_burst_size: 10,
731 },
732 diversity: diversity_config,
733 };
734
735 let manager = BootstrapManager::with_config(config).await.unwrap();
736
737 for i in 0..2 {
739 let peer_id = format!("peer-{}", i);
740 let addr: SocketAddr = format!("192.168.1.{}:{}", 10 + i, 9000 + i)
741 .parse()
742 .unwrap();
743 let result = manager.add_peer(peer_id, vec![addr]).await;
744 assert!(
745 result.is_ok(),
746 "First 2 peers should be allowed: {:?}",
747 result
748 );
749 }
750
751 let peer_id = "peer-blocked".to_string();
753 let addr: SocketAddr = "192.168.1.100:9100".parse().unwrap();
754 let result = manager.add_peer(peer_id, vec![addr]).await;
755 assert!(result.is_err(), "Third peer should be rate limited");
756 assert!(matches!(
757 result.unwrap_err(),
758 P2PError::Bootstrap(BootstrapError::RateLimited(_))
759 ));
760 }
761
762 #[tokio::test]
763 async fn test_peer_id_hashing_deterministic() {
764 let peer_id = "test-peer-123";
766 let ant_id_1 = string_to_ant_peer_id(peer_id);
767 let ant_id_2 = string_to_ant_peer_id(peer_id);
768 assert_eq!(ant_id_1.0, ant_id_2.0);
769
770 let other_id = "other-peer-456";
772 let ant_id_3 = string_to_ant_peer_id(other_id);
773 assert_ne!(ant_id_1.0, ant_id_3.0);
774 }
775}