1use crate::error::BootstrapError;
24use crate::rate_limit::{JoinRateLimiter, JoinRateLimiterConfig};
25use crate::security::{IPDiversityConfig, IPDiversityEnforcer};
26use crate::{P2PError, Result};
27use parking_lot::Mutex;
28use saorsa_transport::bootstrap_cache::{
29 BootstrapCache as AntBootstrapCache, BootstrapCacheConfig, CachedPeer, PeerCapabilities,
30};
31use std::net::SocketAddr;
32use std::path::PathBuf;
33use std::sync::Arc;
34use tokio::task::JoinHandle;
35use tracing::{info, warn};
36
37#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
39pub struct BootstrapConfig {
40 pub cache_dir: PathBuf,
42 pub max_peers: usize,
44 pub epsilon: f64,
46 pub rate_limit: JoinRateLimiterConfig,
48 pub diversity: IPDiversityConfig,
50}
51
52impl Default for BootstrapConfig {
53 fn default() -> Self {
54 Self {
55 cache_dir: default_cache_dir(),
56 max_peers: 20_000,
57 epsilon: 0.1,
58 rate_limit: JoinRateLimiterConfig::default(),
59 diversity: IPDiversityConfig::default(),
60 }
61 }
62}
63
64pub struct BootstrapManager {
69 cache: Arc<AntBootstrapCache>,
70 rate_limiter: JoinRateLimiter,
71 diversity_enforcer: Mutex<IPDiversityEnforcer>,
72 diversity_config: IPDiversityConfig,
73 maintenance_handle: Option<JoinHandle<()>>,
74}
75
76impl BootstrapManager {
77 async fn with_config_and_loopback(
78 config: BootstrapConfig,
79 allow_loopback: bool,
80 ) -> Result<Self> {
81 let ant_config = BootstrapCacheConfig::builder()
82 .cache_dir(&config.cache_dir)
83 .max_peers(config.max_peers)
84 .epsilon(config.epsilon)
85 .build();
86
87 let cache = AntBootstrapCache::open(ant_config).await.map_err(|e| {
88 P2PError::Bootstrap(BootstrapError::CacheError(
89 format!("Failed to open bootstrap cache: {e}").into(),
90 ))
91 })?;
92
93 Ok(Self {
94 cache: Arc::new(cache),
95 rate_limiter: JoinRateLimiter::new(config.rate_limit),
96 diversity_enforcer: Mutex::new(IPDiversityEnforcer::with_loopback(
97 config.diversity.clone(),
98 allow_loopback,
99 )),
100 diversity_config: config.diversity,
101 maintenance_handle: None,
102 })
103 }
104
105 pub async fn new() -> Result<Self> {
107 Self::with_config(BootstrapConfig::default()).await
108 }
109
110 pub async fn with_config(config: BootstrapConfig) -> Result<Self> {
112 Self::with_config_and_loopback(config, false).await
113 }
114
115 pub async fn with_node_config(
121 mut config: BootstrapConfig,
122 node_config: &crate::network::NodeConfig,
123 ) -> Result<Self> {
124 if let Some(ref diversity) = node_config.diversity_config {
125 config.diversity = diversity.clone();
126 }
127 Self::with_config_and_loopback(config, node_config.allow_loopback).await
128 }
129
130 pub fn start_maintenance(&mut self) -> Result<()> {
132 if self.maintenance_handle.is_some() {
133 return Ok(()); }
135
136 let handle = self.cache.clone().start_maintenance();
137 self.maintenance_handle = Some(handle);
138 info!("Started bootstrap cache maintenance tasks");
139 Ok(())
140 }
141
142 pub async fn add_peer(&self, addr: &SocketAddr, addresses: Vec<SocketAddr>) -> Result<()> {
148 if addresses.is_empty() {
149 return Err(P2PError::Bootstrap(BootstrapError::InvalidData(
150 "No addresses provided".to_string().into(),
151 )));
152 }
153
154 let ip = addr.ip();
155
156 self.rate_limiter.check_join_allowed(&ip).map_err(|e| {
158 warn!("Rate limit exceeded for {}: {}", ip, e);
159 P2PError::Bootstrap(BootstrapError::RateLimited(e.to_string().into()))
160 })?;
161
162 let ipv6 = super::ip_to_ipv6(&ip);
164 {
165 let mut diversity = self.diversity_enforcer.lock();
166 let analysis = diversity.analyze_ip(ipv6).map_err(|e| {
167 warn!("IP analysis failed for {}: {}", ip, e);
168 P2PError::Bootstrap(BootstrapError::InvalidData(
169 format!("IP analysis failed: {e}").into(),
170 ))
171 })?;
172
173 if !diversity.can_accept_node(&analysis) {
174 warn!("IP diversity limit exceeded for {}", ip);
175 return Err(P2PError::Bootstrap(BootstrapError::RateLimited(
176 "IP diversity limits exceeded".to_string().into(),
177 )));
178 }
179
180 if let Err(e) = diversity.add_node(&analysis) {
182 warn!("Failed to track IP diversity for {}: {}", ip, e);
183 }
184 } self.cache.add_seed(*addr, addresses).await;
188
189 Ok(())
190 }
191
192 pub async fn add_peer_trusted(&self, addr: &SocketAddr, addresses: Vec<SocketAddr>) {
196 self.cache.add_seed(*addr, addresses).await;
197 }
198
199 pub async fn record_success(&self, addr: &SocketAddr, rtt_ms: u32) {
201 self.cache.record_success(addr, rtt_ms).await;
202 }
203
204 pub async fn record_failure(&self, addr: &SocketAddr) {
206 self.cache.record_failure(addr).await;
207 }
208
209 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
211 self.cache.select_peers(count).await
212 }
213
214 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
216 self.cache.select_relay_peers(count).await
217 }
218
219 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
221 self.cache.select_coordinators(count).await
222 }
223
224 pub async fn stats(&self) -> BootstrapStats {
226 let ant_stats = self.cache.stats().await;
227 BootstrapStats {
228 total_peers: ant_stats.total_peers,
229 relay_peers: ant_stats.relay_peers,
230 coordinator_peers: ant_stats.coordinator_peers,
231 average_quality: ant_stats.average_quality,
232 untested_peers: ant_stats.untested_peers,
233 }
234 }
235
236 pub async fn peer_count(&self) -> usize {
238 self.cache.peer_count().await
239 }
240
241 pub async fn save(&self) -> Result<()> {
243 self.cache.save().await.map_err(|e| {
244 P2PError::Bootstrap(BootstrapError::CacheError(
245 format!("Failed to save cache: {e}").into(),
246 ))
247 })
248 }
249
250 pub async fn update_capabilities(&self, addr: &SocketAddr, capabilities: PeerCapabilities) {
252 self.cache.update_capabilities(addr, capabilities).await;
253 }
254
255 pub async fn contains(&self, addr: &SocketAddr) -> bool {
257 self.cache.contains(addr).await
258 }
259
260 pub async fn get_peer(&self, addr: &SocketAddr) -> Option<CachedPeer> {
262 self.cache.get(addr).await
263 }
264
265 pub fn diversity_config(&self) -> &IPDiversityConfig {
267 &self.diversity_config
268 }
269}
270
271#[derive(Debug, Clone, Default)]
273pub struct BootstrapStats {
274 pub total_peers: usize,
276 pub relay_peers: usize,
278 pub coordinator_peers: usize,
280 pub average_quality: f64,
282 pub untested_peers: usize,
284}
285
286fn default_cache_dir() -> PathBuf {
288 if let Some(cache_dir) = dirs::cache_dir() {
289 cache_dir.join("saorsa").join("bootstrap")
290 } else if let Some(home) = dirs::home_dir() {
291 home.join(".cache").join("saorsa").join("bootstrap")
292 } else {
293 PathBuf::from(".saorsa-bootstrap-cache")
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use tempfile::TempDir;
301
302 fn test_config(temp_dir: &TempDir) -> BootstrapConfig {
304 BootstrapConfig {
305 cache_dir: temp_dir.path().to_path_buf(),
306 max_peers: 100,
307 epsilon: 0.0, rate_limit: JoinRateLimiterConfig::default(),
309 diversity: IPDiversityConfig::default(),
310 }
311 }
312
313 #[tokio::test]
314 async fn test_manager_creation() {
315 let temp_dir = TempDir::new().unwrap();
316 let config = test_config(&temp_dir);
317
318 let manager = BootstrapManager::with_config(config).await;
319 assert!(manager.is_ok());
320
321 let manager = manager.unwrap();
322 assert_eq!(manager.peer_count().await, 0);
323 }
324
325 #[tokio::test]
326 async fn test_add_and_get_peer() {
327 let temp_dir = TempDir::new().unwrap();
328 let config = test_config(&temp_dir);
329 let manager = BootstrapManager::with_config(config).await.unwrap();
330
331 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
332
333 let result = manager.add_peer(&addr, vec![addr]).await;
335 assert!(result.is_ok());
336
337 assert_eq!(manager.peer_count().await, 1);
339 assert!(manager.contains(&addr).await);
340 }
341
342 #[tokio::test]
343 async fn test_add_peer_no_addresses_fails() {
344 let temp_dir = TempDir::new().unwrap();
345 let config = test_config(&temp_dir);
346 let manager = BootstrapManager::with_config(config).await.unwrap();
347
348 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
349 let result = manager.add_peer(&addr, vec![]).await;
350
351 assert!(result.is_err());
352 assert!(matches!(
353 result.unwrap_err(),
354 P2PError::Bootstrap(BootstrapError::InvalidData(_))
355 ));
356 }
357
358 #[tokio::test]
359 async fn test_add_trusted_peer_bypasses_checks() {
360 let temp_dir = TempDir::new().unwrap();
361 let config = test_config(&temp_dir);
362 let manager = BootstrapManager::with_config(config).await.unwrap();
363
364 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
365
366 manager.add_peer_trusted(&addr, vec![addr]).await;
368
369 assert_eq!(manager.peer_count().await, 1);
370 assert!(manager.contains(&addr).await);
371 }
372
373 #[tokio::test]
374 async fn test_record_success_updates_quality() {
375 let temp_dir = TempDir::new().unwrap();
376 let config = test_config(&temp_dir);
377 let manager = BootstrapManager::with_config(config).await.unwrap();
378
379 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
380 manager.add_peer_trusted(&addr, vec![addr]).await;
381
382 let initial_peer = manager.get_peer(&addr).await.unwrap();
384 let initial_quality = initial_peer.quality_score;
385
386 for _ in 0..5 {
388 manager.record_success(&addr, 50).await;
389 }
390
391 let updated_peer = manager.get_peer(&addr).await.unwrap();
393 assert!(
394 updated_peer.quality_score >= initial_quality,
395 "Quality should improve after successes"
396 );
397 }
398
399 #[tokio::test]
400 async fn test_record_failure_decreases_quality() {
401 let temp_dir = TempDir::new().unwrap();
402 let config = test_config(&temp_dir);
403 let manager = BootstrapManager::with_config(config).await.unwrap();
404
405 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
406 manager.add_peer_trusted(&addr, vec![addr]).await;
407
408 for _ in 0..3 {
410 manager.record_success(&addr, 50).await;
411 }
412 let good_peer = manager.get_peer(&addr).await.unwrap();
413 let good_quality = good_peer.quality_score;
414
415 for _ in 0..5 {
417 manager.record_failure(&addr).await;
418 }
419
420 let bad_peer = manager.get_peer(&addr).await.unwrap();
422 assert!(
423 bad_peer.quality_score < good_quality,
424 "Quality should decrease after failures"
425 );
426 }
427
428 #[tokio::test]
429 async fn test_select_peers_returns_best() {
430 let temp_dir = TempDir::new().unwrap();
431 let config = test_config(&temp_dir);
432 let manager = BootstrapManager::with_config(config).await.unwrap();
433
434 for i in 0..10 {
436 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
437 manager.add_peer_trusted(&addr, vec![addr]).await;
438
439 for _ in 0..i {
441 manager.record_success(&addr, 50).await;
442 }
443 }
444
445 let selected = manager.select_peers(5).await;
447 assert_eq!(selected.len(), 5);
448
449 for i in 0..4 {
451 assert!(
452 selected[i].quality_score >= selected[i + 1].quality_score,
453 "Peers should be sorted by quality"
454 );
455 }
456 }
457
458 #[tokio::test]
459 async fn test_stats() {
460 let temp_dir = TempDir::new().unwrap();
461 let config = test_config(&temp_dir);
462 let manager = BootstrapManager::with_config(config).await.unwrap();
463
464 for i in 0..5 {
466 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
467 manager.add_peer_trusted(&addr, vec![addr]).await;
468 }
469
470 let stats = manager.stats().await;
471 assert_eq!(stats.total_peers, 5);
472 assert_eq!(stats.untested_peers, 5); }
474
475 #[tokio::test]
476 async fn test_persistence() {
477 let temp_dir = TempDir::new().unwrap();
478 let cache_path = temp_dir.path().to_path_buf();
479
480 {
482 let config = BootstrapConfig {
483 cache_dir: cache_path.clone(),
484 max_peers: 100,
485 epsilon: 0.0,
486 rate_limit: JoinRateLimiterConfig::default(),
487 diversity: IPDiversityConfig::default(),
488 };
489 let manager = BootstrapManager::with_config(config).await.unwrap();
490 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
491 manager.add_peer_trusted(&addr, vec![addr]).await;
492
493 let count_before = manager.peer_count().await;
495 assert_eq!(count_before, 1, "Peer should be in cache before save");
496
497 manager.save().await.unwrap();
499
500 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
502 }
503
504 {
506 let config = BootstrapConfig {
507 cache_dir: cache_path,
508 max_peers: 100,
509 epsilon: 0.0,
510 rate_limit: JoinRateLimiterConfig::default(),
511 diversity: IPDiversityConfig::default(),
512 };
513 let manager = BootstrapManager::with_config(config).await.unwrap();
514 let count = manager.peer_count().await;
515
516 if count == 0 {
519 eprintln!(
522 "Note: saorsa-transport BootstrapCache may have different persistence behavior"
523 );
524 }
525 }
528 }
529
530 #[tokio::test]
531 async fn test_rate_limiting() {
532 let temp_dir = TempDir::new().unwrap();
533
534 let diversity_config = IPDiversityConfig {
537 max_nodes_per_ipv6_64: 100,
538 max_nodes_per_ipv6_48: 100,
539 max_nodes_per_ipv6_32: 100,
540 max_nodes_per_ipv4_32: None, max_nodes_per_ipv4_24: None,
542 max_nodes_per_ipv4_16: None,
543 ipv4_limit_floor: None,
544 ipv4_limit_ceiling: None,
545 ipv6_limit_floor: None,
546 ipv6_limit_ceiling: None,
547 max_per_ip_cap: 100,
548 max_network_fraction: 1.0,
549 max_nodes_per_asn: 1000,
550 enable_geolocation_check: false,
551 min_geographic_diversity: 0,
552 };
553
554 let config = BootstrapConfig {
555 cache_dir: temp_dir.path().to_path_buf(),
556 max_peers: 100,
557 epsilon: 0.0,
558 rate_limit: JoinRateLimiterConfig {
559 max_joins_per_64_per_hour: 100, max_joins_per_48_per_hour: 100, max_joins_per_24_per_hour: 2, max_global_joins_per_minute: 100,
563 global_burst_size: 10,
564 },
565 diversity: diversity_config,
566 };
567
568 let manager = BootstrapManager::with_config(config).await.unwrap();
569
570 for i in 0..2 {
572 let addr: SocketAddr = format!("192.168.1.{}:{}", 10 + i, 9000 + i)
573 .parse()
574 .unwrap();
575 let result = manager.add_peer(&addr, vec![addr]).await;
576 assert!(
577 result.is_ok(),
578 "First 2 peers should be allowed: {:?}",
579 result
580 );
581 }
582
583 let addr: SocketAddr = "192.168.1.100:9100".parse().unwrap();
585 let result = manager.add_peer(&addr, vec![addr]).await;
586 assert!(result.is_err(), "Third peer should be rate limited");
587 assert!(matches!(
588 result.unwrap_err(),
589 P2PError::Bootstrap(BootstrapError::RateLimited(_))
590 ));
591 }
592}