1use crate::error::BootstrapError;
24use crate::network::DHTConfig;
25use crate::rate_limit::{JoinRateLimiter, JoinRateLimiterConfig};
26use crate::security::{BootstrapIpLimiter, IPDiversityConfig};
27use crate::{P2PError, Result};
28use parking_lot::Mutex;
29use saorsa_transport::bootstrap_cache::{
30 BootstrapCache as AntBootstrapCache, BootstrapCacheConfig, CachedPeer, PeerCapabilities,
31};
32use std::net::SocketAddr;
33use std::path::PathBuf;
34use std::sync::Arc;
35use tokio::task::JoinHandle;
36use tracing::{info, warn};
37
38#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
40pub struct BootstrapConfig {
41 pub cache_dir: PathBuf,
43 pub max_peers: usize,
45 pub epsilon: f64,
47 pub rate_limit: JoinRateLimiterConfig,
49 pub diversity: IPDiversityConfig,
51}
52
53impl Default for BootstrapConfig {
54 fn default() -> Self {
55 Self {
56 cache_dir: default_cache_dir(),
57 max_peers: 20_000,
58 epsilon: 0.1,
59 rate_limit: JoinRateLimiterConfig::default(),
60 diversity: IPDiversityConfig::default(),
61 }
62 }
63}
64
65pub struct BootstrapManager {
70 cache: Arc<AntBootstrapCache>,
71 rate_limiter: JoinRateLimiter,
72 ip_limiter: Mutex<BootstrapIpLimiter>,
73 diversity_config: IPDiversityConfig,
74 maintenance_handle: Option<JoinHandle<()>>,
75}
76
77impl BootstrapManager {
78 async fn with_config_loopback_and_k(
79 config: BootstrapConfig,
80 allow_loopback: bool,
81 k_value: usize,
82 ) -> Result<Self> {
83 let ant_config = BootstrapCacheConfig::builder()
84 .cache_dir(&config.cache_dir)
85 .max_peers(config.max_peers)
86 .epsilon(config.epsilon)
87 .build();
88
89 let cache = AntBootstrapCache::open(ant_config).await.map_err(|e| {
90 P2PError::Bootstrap(BootstrapError::CacheError(
91 format!("Failed to open bootstrap cache: {e}").into(),
92 ))
93 })?;
94
95 Ok(Self {
96 cache: Arc::new(cache),
97 rate_limiter: JoinRateLimiter::new(config.rate_limit),
98 ip_limiter: Mutex::new(BootstrapIpLimiter::with_loopback_and_k(
99 config.diversity.clone(),
100 allow_loopback,
101 k_value,
102 )),
103 diversity_config: config.diversity,
104 maintenance_handle: None,
105 })
106 }
107
108 pub async fn new() -> Result<Self> {
110 Self::with_config(BootstrapConfig::default()).await
111 }
112
113 pub async fn with_config(config: BootstrapConfig) -> Result<Self> {
115 Self::with_config_loopback_and_k(config, false, DHTConfig::DEFAULT_K_VALUE).await
116 }
117
118 pub async fn with_node_config(
125 mut config: BootstrapConfig,
126 node_config: &crate::network::NodeConfig,
127 ) -> Result<Self> {
128 if let Some(ref diversity) = node_config.diversity_config {
129 config.diversity = diversity.clone();
130 }
131 Self::with_config_loopback_and_k(
132 config,
133 node_config.allow_loopback,
134 node_config.dht_config.k_value,
135 )
136 .await
137 }
138
139 pub fn start_maintenance(&mut self) -> Result<()> {
141 if self.maintenance_handle.is_some() {
142 return Ok(()); }
144
145 let handle = self.cache.clone().start_maintenance();
146 self.maintenance_handle = Some(handle);
147 info!("Started bootstrap cache maintenance tasks");
148 Ok(())
149 }
150
151 pub async fn add_peer(&self, addr: &SocketAddr, addresses: Vec<SocketAddr>) -> Result<()> {
157 if addresses.is_empty() {
158 return Err(P2PError::Bootstrap(BootstrapError::InvalidData(
159 "No addresses provided".to_string().into(),
160 )));
161 }
162
163 let ip = addr.ip();
164
165 self.rate_limiter.check_join_allowed(&ip).map_err(|e| {
167 warn!("Rate limit exceeded for {}: {}", ip, e);
168 P2PError::Bootstrap(BootstrapError::RateLimited(e.to_string().into()))
169 })?;
170
171 {
173 let mut diversity = self.ip_limiter.lock();
174 if !diversity.can_accept(ip) {
175 warn!("IP diversity limit exceeded for {}", ip);
176 return Err(P2PError::Bootstrap(BootstrapError::RateLimited(
177 "IP diversity limits exceeded".to_string().into(),
178 )));
179 }
180
181 if let Err(e) = diversity.track(ip) {
183 warn!("Failed to track IP diversity for {}: {}", ip, e);
184 }
185 } self.cache.add_seed(*addr, addresses).await;
189
190 Ok(())
191 }
192
193 pub async fn add_peer_trusted(&self, addr: &SocketAddr, addresses: Vec<SocketAddr>) {
197 self.cache.add_seed(*addr, addresses).await;
198 }
199
200 pub async fn record_success(&self, addr: &SocketAddr, rtt_ms: u32) {
202 self.cache.record_success(addr, rtt_ms).await;
203 }
204
205 pub async fn record_failure(&self, addr: &SocketAddr) {
207 self.cache.record_failure(addr).await;
208 }
209
210 pub async fn select_peers(&self, count: usize) -> Vec<CachedPeer> {
212 self.cache.select_peers(count).await
213 }
214
215 pub async fn select_relay_peers(&self, count: usize) -> Vec<CachedPeer> {
217 self.cache.select_relay_peers(count).await
218 }
219
220 pub async fn select_coordinators(&self, count: usize) -> Vec<CachedPeer> {
222 self.cache.select_coordinators(count).await
223 }
224
225 pub async fn stats(&self) -> BootstrapStats {
227 let ant_stats = self.cache.stats().await;
228 BootstrapStats {
229 total_peers: ant_stats.total_peers,
230 relay_peers: ant_stats.relay_peers,
231 coordinator_peers: ant_stats.coordinator_peers,
232 average_quality: ant_stats.average_quality,
233 untested_peers: ant_stats.untested_peers,
234 }
235 }
236
237 pub async fn peer_count(&self) -> usize {
239 self.cache.peer_count().await
240 }
241
242 pub async fn save(&self) -> Result<()> {
244 self.cache.save().await.map_err(|e| {
245 P2PError::Bootstrap(BootstrapError::CacheError(
246 format!("Failed to save cache: {e}").into(),
247 ))
248 })
249 }
250
251 pub async fn update_capabilities(&self, addr: &SocketAddr, capabilities: PeerCapabilities) {
253 self.cache.update_capabilities(addr, capabilities).await;
254 }
255
256 pub async fn contains(&self, addr: &SocketAddr) -> bool {
258 self.cache.contains(addr).await
259 }
260
261 pub async fn get_peer(&self, addr: &SocketAddr) -> Option<CachedPeer> {
263 self.cache.get(addr).await
264 }
265
266 pub fn diversity_config(&self) -> &IPDiversityConfig {
268 &self.diversity_config
269 }
270}
271
/// Snapshot of cache statistics; each field is copied from the same-named
/// field of the underlying cache's statistics.
#[derive(Clone, Debug, Default)]
pub struct BootstrapStats {
    /// Total number of peers.
    pub total_peers: usize,
    /// Number of relay peers.
    pub relay_peers: usize,
    /// Number of coordinator peers.
    pub coordinator_peers: usize,
    /// Average quality reported by the cache.
    pub average_quality: f64,
    /// Number of untested peers.
    pub untested_peers: usize,
}
286
287fn default_cache_dir() -> PathBuf {
289 if let Some(cache_dir) = dirs::cache_dir() {
290 cache_dir.join("saorsa").join("bootstrap")
291 } else if let Some(home) = dirs::home_dir() {
292 home.join(".cache").join("saorsa").join("bootstrap")
293 } else {
294 PathBuf::from(".saorsa-bootstrap-cache")
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Small configuration rooted in `temp_dir`, with epsilon set to zero and
    /// default limiter settings.
    fn test_config(temp_dir: &TempDir) -> BootstrapConfig {
        let cache_dir = temp_dir.path().to_path_buf();
        BootstrapConfig {
            cache_dir,
            max_peers: 100,
            epsilon: 0.0,
            rate_limit: JoinRateLimiterConfig::default(),
            diversity: IPDiversityConfig::default(),
        }
    }

    /// Opens a manager on `temp_dir` using [`test_config`].
    async fn manager_in(temp_dir: &TempDir) -> BootstrapManager {
        BootstrapManager::with_config(test_config(temp_dir))
            .await
            .unwrap()
    }

    /// Loopback socket address on `port`.
    fn loopback(port: u16) -> SocketAddr {
        format!("127.0.0.1:{}", port).parse().unwrap()
    }

    /// A freshly opened manager starts with an empty cache.
    #[tokio::test]
    async fn test_manager_creation() {
        let temp_dir = TempDir::new().unwrap();

        let outcome = BootstrapManager::with_config(test_config(&temp_dir)).await;
        assert!(outcome.is_ok());

        let manager = outcome.unwrap();
        assert_eq!(manager.peer_count().await, 0);
    }

    /// A peer admitted through `add_peer` is visible in the cache.
    #[tokio::test]
    async fn test_add_and_get_peer() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        let addr: SocketAddr = "10.0.0.1:9000".parse().unwrap();

        let result = manager.add_peer(&addr, vec![addr]).await;
        assert!(result.is_ok());

        assert_eq!(manager.peer_count().await, 1);
        assert!(manager.contains(&addr).await);
    }

    /// `add_peer` rejects an empty address list with `InvalidData`.
    #[tokio::test]
    async fn test_add_peer_no_addresses_fails() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        let addr: SocketAddr = "10.0.0.1:9000".parse().unwrap();
        let result = manager.add_peer(&addr, Vec::new()).await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            P2PError::Bootstrap(BootstrapError::InvalidData(_))
        ));
    }

    /// `add_peer_trusted` stores a loopback peer without any checks.
    #[tokio::test]
    async fn test_add_trusted_peer_bypasses_checks() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        let addr = loopback(9000);
        manager.add_peer_trusted(&addr, vec![addr]).await;

        assert_eq!(manager.peer_count().await, 1);
        assert!(manager.contains(&addr).await);
    }

    /// Recording successes never lowers a peer's quality score.
    #[tokio::test]
    async fn test_record_success_updates_quality() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        let addr = loopback(9000);
        manager.add_peer_trusted(&addr, vec![addr]).await;

        let initial_quality = manager.get_peer(&addr).await.unwrap().quality_score;

        for _ in 0..5 {
            manager.record_success(&addr, 50).await;
        }

        let updated_quality = manager.get_peer(&addr).await.unwrap().quality_score;
        assert!(
            updated_quality >= initial_quality,
            "Quality should improve after successes"
        );
    }

    /// Recording failures after successes lowers a peer's quality score.
    #[tokio::test]
    async fn test_record_failure_decreases_quality() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        let addr = loopback(9000);
        manager.add_peer_trusted(&addr, vec![addr]).await;

        for _ in 0..3 {
            manager.record_success(&addr, 50).await;
        }
        let good_quality = manager.get_peer(&addr).await.unwrap().quality_score;

        for _ in 0..5 {
            manager.record_failure(&addr).await;
        }

        let bad_quality = manager.get_peer(&addr).await.unwrap().quality_score;
        assert!(
            bad_quality < good_quality,
            "Quality should decrease after failures"
        );
    }

    /// `select_peers` returns the requested number of peers in descending
    /// quality order.
    #[tokio::test]
    async fn test_select_peers_returns_best() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        // Peer `n` receives `n` recorded successes.
        for successes in 0..10u16 {
            let addr = loopback(9000 + successes);
            manager.add_peer_trusted(&addr, vec![addr]).await;

            for _ in 0..successes {
                manager.record_success(&addr, 50).await;
            }
        }

        let selected = manager.select_peers(5).await;
        assert_eq!(selected.len(), 5);

        for pair in selected.windows(2) {
            assert!(
                pair[0].quality_score >= pair[1].quality_score,
                "Peers should be sorted by quality"
            );
        }
    }

    /// Freshly added peers are counted as both total and untested.
    #[tokio::test]
    async fn test_stats() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir).await;

        for offset in 0..5u16 {
            let addr = loopback(9000 + offset);
            manager.add_peer_trusted(&addr, vec![addr]).await;
        }

        let stats = manager.stats().await;
        assert_eq!(stats.total_peers, 5);
        assert_eq!(stats.untested_peers, 5);
    }

    /// Saves a cache with one peer, then reopens the same directory.
    ///
    /// The reopened peer count is only reported, not asserted.
    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // First session: add a peer and save.
        {
            let manager = manager_in(&temp_dir).await;
            let addr = loopback(9000);
            manager.add_peer_trusted(&addr, vec![addr]).await;

            let count_before = manager.peer_count().await;
            assert_eq!(count_before, 1, "Peer should be in cache before save");

            manager.save().await.unwrap();

            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        // Second session: reopen the same directory.
        {
            let manager = manager_in(&temp_dir).await;
            let reopened_count = manager.peer_count().await;

            if reopened_count == 0 {
                eprintln!(
                    "Note: saorsa-transport BootstrapCache may have different persistence behavior"
                );
            }
        }
    }

    /// With a /24 limit of two joins per hour, a third join from the same
    /// /24 is rejected as rate limited.
    #[tokio::test]
    async fn test_rate_limiting() {
        let temp_dir = TempDir::new().unwrap();

        // Diversity limits are set to the maximum so that only the join rate
        // limiter can reject a peer here.
        let diversity = IPDiversityConfig {
            max_per_ip: Some(usize::MAX),
            max_per_subnet: Some(usize::MAX),
        };
        let rate_limit = JoinRateLimiterConfig {
            max_joins_per_64_per_hour: 100,
            max_joins_per_48_per_hour: 100,
            max_joins_per_24_per_hour: 2,
            max_global_joins_per_minute: 100,
            global_burst_size: 10,
        };
        let config = BootstrapConfig {
            cache_dir: temp_dir.path().to_path_buf(),
            max_peers: 100,
            epsilon: 0.0,
            rate_limit,
            diversity,
        };

        let manager = BootstrapManager::with_config(config).await.unwrap();

        for i in 0..2 {
            let addr: SocketAddr = format!("192.168.1.{}:{}", 10 + i, 9000 + i)
                .parse()
                .unwrap();
            let result = manager.add_peer(&addr, vec![addr]).await;
            assert!(
                result.is_ok(),
                "First 2 peers should be allowed: {:?}",
                result
            );
        }

        let third: SocketAddr = "192.168.1.100:9100".parse().unwrap();
        let result = manager.add_peer(&third, vec![third]).await;
        assert!(result.is_err(), "Third peer should be rate limited");
        assert!(matches!(
            result.unwrap_err(),
            P2PError::Bootstrap(BootstrapError::RateLimited(_))
        ));
    }
}