1use crate::error::BootstrapError;
24use crate::rate_limit::{JoinRateLimiter, JoinRateLimiterConfig};
25use crate::security::{IPDiversityConfig, IPDiversityEnforcer};
26use crate::{P2PError, Result};
27use parking_lot::Mutex;
28use saorsa_transport::bootstrap_cache::{
29 BootstrapCache as AntBootstrapCache, BootstrapCacheConfig, CachedPeer, PeerCapabilities,
30};
31use std::net::SocketAddr;
32use std::path::PathBuf;
33use std::sync::Arc;
34use tokio::task::JoinHandle;
35
36#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
38pub struct BootstrapConfig {
39 pub cache_dir: PathBuf,
41 pub max_peers: usize,
43 pub epsilon: f64,
45 pub rate_limit: JoinRateLimiterConfig,
47 pub diversity: IPDiversityConfig,
49}
50
51impl Default for BootstrapConfig {
52 fn default() -> Self {
53 Self {
54 cache_dir: default_cache_dir(),
55 max_peers: 20_000,
56 epsilon: 0.1,
57 rate_limit: JoinRateLimiterConfig::default(),
58 diversity: IPDiversityConfig::default(),
59 }
60 }
61}
62
63pub struct BootstrapManager {
68 cache: Arc<AntBootstrapCache>,
69 rate_limiter: JoinRateLimiter,
70 diversity_enforcer: Mutex<IPDiversityEnforcer>,
71 diversity_config: IPDiversityConfig,
72 maintenance_handle: Option<JoinHandle<()>>,
73}
74
75impl BootstrapManager {
76 async fn with_config_and_loopback(
77 config: BootstrapConfig,
78 allow_loopback: bool,
79 ) -> Result<Self> {
80 let ant_config = BootstrapCacheConfig::builder()
81 .cache_dir(&config.cache_dir)
82 .max_peers(config.max_peers)
83 .epsilon(config.epsilon)
84 .build();
85
86 let cache = AntBootstrapCache::open(ant_config).await.map_err(|e| {
87 P2PError::Bootstrap(BootstrapError::CacheError(
88 format!("Failed to open bootstrap cache: {e}").into(),
89 ))
90 })?;
91
92 Ok(Self {
93 cache: Arc::new(cache),
94 rate_limiter: JoinRateLimiter::new(config.rate_limit),
95 diversity_enforcer: Mutex::new(IPDiversityEnforcer::with_loopback(
96 config.diversity.clone(),
97 allow_loopback,
98 )),
99 diversity_config: config.diversity,
100 maintenance_handle: None,
101 })
102 }
103
104 pub async fn new() -> Result<Self> {
106 Self::with_config(BootstrapConfig::default()).await
107 }
108
109 pub async fn with_config(config: BootstrapConfig) -> Result<Self> {
111 Self::with_config_and_loopback(config, false).await
112 }
113
114 pub async fn with_node_config(
120 mut config: BootstrapConfig,
121 node_config: &crate::network::NodeConfig,
122 ) -> Result<Self> {
123 if let Some(ref diversity) = node_config.diversity_config {
124 config.diversity = diversity.clone();
125 }
126 Self::with_config_and_loopback(config, node_config.allow_loopback).await
127 }
128
129 pub fn start_maintenance(&mut self) -> Result<()> {
131 if self.maintenance_handle.is_some() {
132 return Ok(()); }
134
135 let handle = self.cache.clone().start_maintenance();
136 self.maintenance_handle = Some(handle);
137 info!("Started bootstrap cache maintenance tasks");
138 Ok(())
139 }
140
141 pub async fn add_peer(&self, addr: &SocketAddr, addresses: Vec<SocketAddr>) -> Result<()> {
147 if addresses.is_empty() {
148 return Err(P2PError::Bootstrap(BootstrapError::InvalidData(
149 "No addresses provided".to_string().into(),
150 )));
151 }
152
153 let ip = addr.ip();
154
155 self.rate_limiter.check_join_allowed(&ip).map_err(|e| {
157 warn!("Rate limit exceeded for {}: {}", ip, e);
158 P2PError::Bootstrap(BootstrapError::RateLimited(e.to_string().into()))
159 })?;
160
161 let ipv6 = super::ip_to_ipv6(&ip);
163 {
164 let mut diversity = self.diversity_enforcer.lock();
165 let analysis = diversity.analyze_ip(ipv6).map_err(|e| {
166 warn!("IP analysis failed for {}: {}", ip, e);
167 P2PError::Bootstrap(BootstrapError::InvalidData(
168 format!("IP analysis failed: {e}").into(),
169 ))
170 })?;
171
172 if !diversity.can_accept_node(&analysis) {
173 warn!("IP diversity limit exceeded for {}", ip);
174 return Err(P2PError::Bootstrap(BootstrapError::RateLimited(
175 "IP diversity limits exceeded".to_string().into(),
176 )));
177 }
178
179 if let Err(_e) = diversity.add_node(&analysis) {
181 warn!("Failed to track IP diversity for {}: {}", ip, _e);
182 }
183 } self.cache.add_seed(*addr, addresses).await;
187
188 Ok(())
189 }
190
191 pub async fn add_peer_trusted(&self, addr: &SocketAddr, addresses: Vec<SocketAddr>) {
195 self.cache.add_seed(*addr, addresses).await;
196 }
197
198 pub async fn record_success(&self, addr: &SocketAddr, rtt_ms: u32) {
200 self.cache.record_success(addr, rtt_ms).await;
201 }
202
203 pub async fn record_failure(&self, addr: &SocketAddr) {
205 self.cache.record_failure(addr).await;
206 }
207
208 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
210 self.cache.select_peers(count).await
211 }
212
213 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
215 self.cache.select_relay_peers(count).await
216 }
217
218 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
220 self.cache.select_coordinators(count).await
221 }
222
223 pub async fn stats(&self) -> BootstrapStats {
225 let ant_stats = self.cache.stats().await;
226 BootstrapStats {
227 total_peers: ant_stats.total_peers,
228 relay_peers: ant_stats.relay_peers,
229 coordinator_peers: ant_stats.coordinator_peers,
230 average_quality: ant_stats.average_quality,
231 untested_peers: ant_stats.untested_peers,
232 }
233 }
234
235 pub async fn peer_count(&self) -> usize {
237 self.cache.peer_count().await
238 }
239
240 pub async fn save(&self) -> Result<()> {
242 self.cache.save().await.map_err(|e| {
243 P2PError::Bootstrap(BootstrapError::CacheError(
244 format!("Failed to save cache: {e}").into(),
245 ))
246 })
247 }
248
249 pub async fn update_capabilities(&self, addr: &SocketAddr, capabilities: PeerCapabilities) {
251 self.cache.update_capabilities(addr, capabilities).await;
252 }
253
254 pub async fn contains(&self, addr: &SocketAddr) -> bool {
256 self.cache.contains(addr).await
257 }
258
259 pub async fn get_peer(&self, addr: &SocketAddr) -> Option<CachedPeer> {
261 self.cache.get(addr).await
262 }
263
264 pub fn diversity_config(&self) -> &IPDiversityConfig {
266 &self.diversity_config
267 }
268}
269
270#[derive(Debug, Clone, Default)]
272pub struct BootstrapStats {
273 pub total_peers: usize,
275 pub relay_peers: usize,
277 pub coordinator_peers: usize,
279 pub average_quality: f64,
281 pub untested_peers: usize,
283}
284
285fn default_cache_dir() -> PathBuf {
287 if let Some(cache_dir) = dirs::cache_dir() {
288 cache_dir.join("saorsa").join("bootstrap")
289 } else if let Some(home) = dirs::home_dir() {
290 home.join(".cache").join("saorsa").join("bootstrap")
291 } else {
292 PathBuf::from(".saorsa-bootstrap-cache")
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use tempfile::TempDir;
300
301 fn test_config(temp_dir: &TempDir) -> BootstrapConfig {
303 BootstrapConfig {
304 cache_dir: temp_dir.path().to_path_buf(),
305 max_peers: 100,
306 epsilon: 0.0, rate_limit: JoinRateLimiterConfig::default(),
308 diversity: IPDiversityConfig::default(),
309 }
310 }
311
312 #[tokio::test]
313 async fn test_manager_creation() {
314 let temp_dir = TempDir::new().unwrap();
315 let config = test_config(&temp_dir);
316
317 let manager = BootstrapManager::with_config(config).await;
318 assert!(manager.is_ok());
319
320 let manager = manager.unwrap();
321 assert_eq!(manager.peer_count().await, 0);
322 }
323
324 #[tokio::test]
325 async fn test_add_and_get_peer() {
326 let temp_dir = TempDir::new().unwrap();
327 let config = test_config(&temp_dir);
328 let manager = BootstrapManager::with_config(config).await.unwrap();
329
330 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
331
332 let result = manager.add_peer(&addr, vec![addr]).await;
334 assert!(result.is_ok());
335
336 assert_eq!(manager.peer_count().await, 1);
338 assert!(manager.contains(&addr).await);
339 }
340
341 #[tokio::test]
342 async fn test_add_peer_no_addresses_fails() {
343 let temp_dir = TempDir::new().unwrap();
344 let config = test_config(&temp_dir);
345 let manager = BootstrapManager::with_config(config).await.unwrap();
346
347 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
348 let result = manager.add_peer(&addr, vec![]).await;
349
350 assert!(result.is_err());
351 assert!(matches!(
352 result.unwrap_err(),
353 P2PError::Bootstrap(BootstrapError::InvalidData(_))
354 ));
355 }
356
357 #[tokio::test]
358 async fn test_add_trusted_peer_bypasses_checks() {
359 let temp_dir = TempDir::new().unwrap();
360 let config = test_config(&temp_dir);
361 let manager = BootstrapManager::with_config(config).await.unwrap();
362
363 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
364
365 manager.add_peer_trusted(&addr, vec![addr]).await;
367
368 assert_eq!(manager.peer_count().await, 1);
369 assert!(manager.contains(&addr).await);
370 }
371
372 #[tokio::test]
373 async fn test_record_success_updates_quality() {
374 let temp_dir = TempDir::new().unwrap();
375 let config = test_config(&temp_dir);
376 let manager = BootstrapManager::with_config(config).await.unwrap();
377
378 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
379 manager.add_peer_trusted(&addr, vec![addr]).await;
380
381 let initial_peer = manager.get_peer(&addr).await.unwrap();
383 let initial_quality = initial_peer.quality_score;
384
385 for _ in 0..5 {
387 manager.record_success(&addr, 50).await;
388 }
389
390 let updated_peer = manager.get_peer(&addr).await.unwrap();
392 assert!(
393 updated_peer.quality_score >= initial_quality,
394 "Quality should improve after successes"
395 );
396 }
397
398 #[tokio::test]
399 async fn test_record_failure_decreases_quality() {
400 let temp_dir = TempDir::new().unwrap();
401 let config = test_config(&temp_dir);
402 let manager = BootstrapManager::with_config(config).await.unwrap();
403
404 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
405 manager.add_peer_trusted(&addr, vec![addr]).await;
406
407 for _ in 0..3 {
409 manager.record_success(&addr, 50).await;
410 }
411 let good_peer = manager.get_peer(&addr).await.unwrap();
412 let good_quality = good_peer.quality_score;
413
414 for _ in 0..5 {
416 manager.record_failure(&addr).await;
417 }
418
419 let bad_peer = manager.get_peer(&addr).await.unwrap();
421 assert!(
422 bad_peer.quality_score < good_quality,
423 "Quality should decrease after failures"
424 );
425 }
426
427 #[tokio::test]
428 async fn test_select_peers_returns_best() {
429 let temp_dir = TempDir::new().unwrap();
430 let config = test_config(&temp_dir);
431 let manager = BootstrapManager::with_config(config).await.unwrap();
432
433 for i in 0..10 {
435 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
436 manager.add_peer_trusted(&addr, vec![addr]).await;
437
438 for _ in 0..i {
440 manager.record_success(&addr, 50).await;
441 }
442 }
443
444 let selected = manager.select_peers(5).await;
446 assert_eq!(selected.len(), 5);
447
448 for i in 0..4 {
450 assert!(
451 selected[i].quality_score >= selected[i + 1].quality_score,
452 "Peers should be sorted by quality"
453 );
454 }
455 }
456
457 #[tokio::test]
458 async fn test_stats() {
459 let temp_dir = TempDir::new().unwrap();
460 let config = test_config(&temp_dir);
461 let manager = BootstrapManager::with_config(config).await.unwrap();
462
463 for i in 0..5 {
465 let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
466 manager.add_peer_trusted(&addr, vec![addr]).await;
467 }
468
469 let stats = manager.stats().await;
470 assert_eq!(stats.total_peers, 5);
471 assert_eq!(stats.untested_peers, 5); }
473
474 #[tokio::test]
475 async fn test_persistence() {
476 let temp_dir = TempDir::new().unwrap();
477 let cache_path = temp_dir.path().to_path_buf();
478
479 {
481 let config = BootstrapConfig {
482 cache_dir: cache_path.clone(),
483 max_peers: 100,
484 epsilon: 0.0,
485 rate_limit: JoinRateLimiterConfig::default(),
486 diversity: IPDiversityConfig::default(),
487 };
488 let manager = BootstrapManager::with_config(config).await.unwrap();
489 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
490 manager.add_peer_trusted(&addr, vec![addr]).await;
491
492 let count_before = manager.peer_count().await;
494 assert_eq!(count_before, 1, "Peer should be in cache before save");
495
496 manager.save().await.unwrap();
498
499 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
501 }
502
503 {
505 let config = BootstrapConfig {
506 cache_dir: cache_path,
507 max_peers: 100,
508 epsilon: 0.0,
509 rate_limit: JoinRateLimiterConfig::default(),
510 diversity: IPDiversityConfig::default(),
511 };
512 let manager = BootstrapManager::with_config(config).await.unwrap();
513 let count = manager.peer_count().await;
514
515 if count == 0 {
518 eprintln!(
521 "Note: saorsa-transport BootstrapCache may have different persistence behavior"
522 );
523 }
524 }
527 }
528
529 #[tokio::test]
530 async fn test_rate_limiting() {
531 let temp_dir = TempDir::new().unwrap();
532
533 let diversity_config = IPDiversityConfig {
536 max_nodes_per_ipv6_64: 100,
537 max_nodes_per_ipv6_48: 100,
538 max_nodes_per_ipv6_32: 100,
539 max_nodes_per_ipv4_32: None, max_nodes_per_ipv4_24: None,
541 max_nodes_per_ipv4_16: None,
542 ipv4_limit_floor: None,
543 ipv4_limit_ceiling: None,
544 ipv6_limit_floor: None,
545 ipv6_limit_ceiling: None,
546 max_per_ip_cap: 100,
547 max_network_fraction: 1.0,
548 max_nodes_per_asn: 1000,
549 enable_geolocation_check: false,
550 min_geographic_diversity: 0,
551 };
552
553 let config = BootstrapConfig {
554 cache_dir: temp_dir.path().to_path_buf(),
555 max_peers: 100,
556 epsilon: 0.0,
557 rate_limit: JoinRateLimiterConfig {
558 max_joins_per_64_per_hour: 100, max_joins_per_48_per_hour: 100, max_joins_per_24_per_hour: 2, max_global_joins_per_minute: 100,
562 global_burst_size: 10,
563 },
564 diversity: diversity_config,
565 };
566
567 let manager = BootstrapManager::with_config(config).await.unwrap();
568
569 for i in 0..2 {
571 let addr: SocketAddr = format!("192.168.1.{}:{}", 10 + i, 9000 + i)
572 .parse()
573 .unwrap();
574 let result = manager.add_peer(&addr, vec![addr]).await;
575 assert!(
576 result.is_ok(),
577 "First 2 peers should be allowed: {:?}",
578 result
579 );
580 }
581
582 let addr: SocketAddr = "192.168.1.100:9100".parse().unwrap();
584 let result = manager.add_peer(&addr, vec![addr]).await;
585 assert!(result.is_err(), "Third peer should be rate limited");
586 assert!(matches!(
587 result.unwrap_err(),
588 P2PError::Bootstrap(BootstrapError::RateLimited(_))
589 ));
590 }
591}