1use libp2p::{Multiaddr, PeerId};
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tracing::{debug, info, warn};
14
/// Multiaddr strings of the default IPFS bootstrap peers.
///
/// Every entry is a `/dnsaddr/bootstrap.libp2p.io/p2p/<peer id>` address and
/// is kept as text here; it still has to be parsed into a `Multiaddr` before
/// it can be registered or dialed.
pub const DEFAULT_IPFS_BOOTSTRAP_PEERS: &'static [&'static str] = &[
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
];
22
/// Tunable parameters for bootstrap dialing: retry limit, exponential
/// backoff and per-peer circuit breaker.
#[derive(Clone, Debug)]
pub struct BootstrapConfig {
    /// Once a peer's consecutive failure count reaches this value it is no
    /// longer considered retryable.
    pub max_retries: u32,
    /// Backoff a peer starts with, and returns to after a success or reset.
    pub initial_backoff: Duration,
    /// Upper bound for a peer's backoff.
    pub max_backoff: Duration,
    /// Factor the current backoff is multiplied by after each failure.
    pub backoff_multiplier: f64,
    /// Consecutive failure count at which a peer's circuit breaker opens.
    pub circuit_breaker_threshold: u32,
    /// How long an open circuit breaker keeps a peer from being retried.
    pub circuit_breaker_timeout: Duration,
    /// Not read anywhere in this file; presumably the period between
    /// re-bootstrap rounds driven by the caller (TODO confirm).
    pub re_bootstrap_interval: Duration,
}

impl Default for BootstrapConfig {
    /// Defaults: 5 retries, backoff from 1 s doubling up to 60 s, circuit
    /// breaker opening after 3 consecutive failures for 5 minutes, and a
    /// 5 minute re-bootstrap interval.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(5 * 60);

        BootstrapConfig {
            max_retries: 5,
            initial_backoff: Duration::from_secs(1),
            max_backoff: Duration::from_secs(60),
            backoff_multiplier: 2.0,
            circuit_breaker_threshold: 3,
            circuit_breaker_timeout: five_minutes,
            re_bootstrap_interval: five_minutes,
        }
    }
}
55
56#[derive(Debug, Clone)]
58struct BootstrapPeerState {
59 addr: Multiaddr,
61 attempts: u32,
63 consecutive_failures: u32,
65 last_attempt: Option<Instant>,
67 last_success: Option<Instant>,
69 circuit_open: bool,
71 circuit_opened_at: Option<Instant>,
73 current_backoff: Duration,
75}
76
77impl BootstrapPeerState {
78 fn new(addr: Multiaddr, initial_backoff: Duration) -> Self {
79 Self {
80 addr,
81 attempts: 0,
82 consecutive_failures: 0,
83 last_attempt: None,
84 last_success: None,
85 circuit_open: false,
86 circuit_opened_at: None,
87 current_backoff: initial_backoff,
88 }
89 }
90
91 fn should_retry(&self, config: &BootstrapConfig) -> bool {
92 if self.circuit_open {
94 if let Some(opened_at) = self.circuit_opened_at {
95 if opened_at.elapsed() < config.circuit_breaker_timeout {
96 return false;
97 }
98 }
99 }
100
101 if self.consecutive_failures >= config.max_retries {
103 return false;
104 }
105
106 if let Some(last) = self.last_attempt {
108 if last.elapsed() < self.current_backoff {
109 return false;
110 }
111 }
112
113 true
114 }
115
116 fn record_attempt(&mut self) {
117 self.attempts += 1;
118 self.last_attempt = Some(Instant::now());
119 }
120
121 fn record_success(&mut self, initial_backoff: Duration) {
122 self.consecutive_failures = 0;
123 self.last_success = Some(Instant::now());
124 self.circuit_open = false;
125 self.circuit_opened_at = None;
126 self.current_backoff = initial_backoff;
127 }
128
129 fn record_failure(&mut self, config: &BootstrapConfig) {
130 self.consecutive_failures += 1;
131
132 let new_backoff =
134 Duration::from_secs_f64(self.current_backoff.as_secs_f64() * config.backoff_multiplier);
135 self.current_backoff = new_backoff.min(config.max_backoff);
136
137 if self.consecutive_failures >= config.circuit_breaker_threshold {
139 self.circuit_open = true;
140 self.circuit_opened_at = Some(Instant::now());
141 warn!(
142 "Circuit breaker opened for peer {} after {} failures",
143 self.addr, self.consecutive_failures
144 );
145 }
146 }
147}
148
149pub struct BootstrapManager {
151 config: BootstrapConfig,
153 peers: Arc<RwLock<HashMap<String, BootstrapPeerState>>>,
155 connected: Arc<RwLock<Vec<PeerId>>>,
157}
158
159impl BootstrapManager {
160 pub fn new(config: BootstrapConfig) -> Self {
162 Self {
163 config,
164 peers: Arc::new(RwLock::new(HashMap::new())),
165 connected: Arc::new(RwLock::new(Vec::new())),
166 }
167 }
168
169 pub fn add_peer(&self, addr: Multiaddr) {
171 let key = addr.to_string();
172 let mut peers = self.peers.write();
173 peers
174 .entry(key)
175 .or_insert_with(|| BootstrapPeerState::new(addr, self.config.initial_backoff));
176 }
177
178 pub fn add_peers_from_strings(&self, addrs: &[String]) {
180 for addr_str in addrs {
181 if let Ok(addr) = addr_str.parse::<Multiaddr>() {
182 self.add_peer(addr);
183 } else {
184 warn!("Invalid bootstrap peer address: {}", addr_str);
185 }
186 }
187 }
188
189 pub fn add_default_peers(&self) {
191 for addr_str in DEFAULT_IPFS_BOOTSTRAP_PEERS {
192 if let Ok(addr) = addr_str.parse::<Multiaddr>() {
193 self.add_peer(addr);
194 }
195 }
196 info!(
197 "Added {} default IPFS bootstrap peers",
198 DEFAULT_IPFS_BOOTSTRAP_PEERS.len()
199 );
200 }
201
202 pub fn get_peers_to_dial(&self) -> Vec<Multiaddr> {
204 let peers = self.peers.read();
205 peers
206 .values()
207 .filter(|state| state.should_retry(&self.config))
208 .map(|state| state.addr.clone())
209 .collect()
210 }
211
212 pub fn record_dial_attempt(&self, addr: &Multiaddr) {
214 let key = addr.to_string();
215 let mut peers = self.peers.write();
216 if let Some(state) = peers.get_mut(&key) {
217 state.record_attempt();
218 debug!("Recorded dial attempt for {}", addr);
219 }
220 }
221
222 pub fn record_connection_success(&self, addr: &Multiaddr, peer_id: PeerId) {
224 let key = addr.to_string();
225 let mut peers = self.peers.write();
226 if let Some(state) = peers.get_mut(&key) {
227 state.record_success(self.config.initial_backoff);
228 info!(
229 "Successfully connected to bootstrap peer {} ({})",
230 addr, peer_id
231 );
232 }
233
234 let mut connected = self.connected.write();
236 if !connected.contains(&peer_id) {
237 connected.push(peer_id);
238 }
239 }
240
241 pub fn record_connection_failure(&self, addr: &Multiaddr) {
243 let key = addr.to_string();
244 let mut peers = self.peers.write();
245 if let Some(state) = peers.get_mut(&key) {
246 state.record_failure(&self.config);
247 warn!(
248 "Failed to connect to bootstrap peer {}, backoff: {:?}",
249 addr, state.current_backoff
250 );
251 }
252 }
253
254 pub fn record_disconnection(&self, peer_id: &PeerId) {
256 let mut connected = self.connected.write();
257 connected.retain(|p| p != peer_id);
258 }
259
260 pub fn has_sufficient_connections(&self, min_peers: usize) -> bool {
262 self.connected.read().len() >= min_peers
263 }
264
265 pub fn connected_count(&self) -> usize {
267 self.connected.read().len()
268 }
269
270 pub fn stats(&self) -> BootstrapStats {
272 let peers = self.peers.read();
273 let connected = self.connected.read();
274
275 let total_attempts: u32 = peers.values().map(|s| s.attempts).sum();
276 let total_failures: u32 = peers.values().map(|s| s.consecutive_failures).sum();
277 let open_circuits = peers.values().filter(|s| s.circuit_open).count();
278
279 BootstrapStats {
280 total_peers: peers.len(),
281 connected_peers: connected.len(),
282 total_attempts,
283 total_failures,
284 open_circuits,
285 }
286 }
287
288 pub fn reset_circuit_breaker(&self, addr: &Multiaddr) {
290 let key = addr.to_string();
291 let mut peers = self.peers.write();
292 if let Some(state) = peers.get_mut(&key) {
293 state.circuit_open = false;
294 state.circuit_opened_at = None;
295 state.consecutive_failures = 0;
296 state.current_backoff = self.config.initial_backoff;
297 info!("Reset circuit breaker for {}", addr);
298 }
299 }
300
301 pub fn reset_all_circuit_breakers(&self) {
303 let mut peers = self.peers.write();
304 for state in peers.values_mut() {
305 state.circuit_open = false;
306 state.circuit_opened_at = None;
307 state.consecutive_failures = 0;
308 state.current_backoff = self.config.initial_backoff;
309 }
310 info!("Reset all circuit breakers");
311 }
312}
313
314impl Default for BootstrapManager {
315 fn default() -> Self {
316 Self::new(BootstrapConfig::default())
317 }
318}
319
320#[derive(Debug, Clone, serde::Serialize)]
322pub struct BootstrapStats {
323 pub total_peers: usize,
325 pub connected_peers: usize,
327 pub total_attempts: u32,
329 pub total_failures: u32,
331 pub open_circuits: usize,
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the documented retry settings.
    #[test]
    fn test_bootstrap_config_default() {
        let config = BootstrapConfig::default();
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.initial_backoff, Duration::from_secs(1));
    }

    /// A freshly added peer is immediately offered for dialing.
    #[test]
    fn test_bootstrap_manager_add_peer() {
        let manager = BootstrapManager::default();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();

        manager.add_peer(addr.clone());
        let peers = manager.get_peers_to_dial();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0], addr);
    }

    /// After a failure the peer is withheld until the backoff has elapsed.
    #[test]
    fn test_bootstrap_manager_backoff() {
        let config = BootstrapConfig {
            initial_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_secs(1),
            backoff_multiplier: 2.0,
            ..Default::default()
        };
        let manager = BootstrapManager::new(config);
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();

        manager.add_peer(addr.clone());
        manager.record_dial_attempt(&addr);
        manager.record_connection_failure(&addr);

        // One failure doubles the 10 ms backoff to 20 ms.
        let peers = manager.get_peers_to_dial();
        assert!(peers.is_empty());

        std::thread::sleep(Duration::from_millis(25));
        let peers = manager.get_peers_to_dial();
        assert_eq!(peers.len(), 1);
    }

    /// Reaching the failure threshold opens the circuit and blocks dialing.
    #[test]
    fn test_bootstrap_manager_circuit_breaker() {
        let config = BootstrapConfig {
            initial_backoff: Duration::from_millis(1),
            circuit_breaker_threshold: 2,
            circuit_breaker_timeout: Duration::from_secs(1),
            ..Default::default()
        };
        let manager = BootstrapManager::new(config);
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();

        manager.add_peer(addr.clone());

        manager.record_dial_attempt(&addr);
        manager.record_connection_failure(&addr);
        std::thread::sleep(Duration::from_millis(5));

        manager.record_dial_attempt(&addr);
        manager.record_connection_failure(&addr);

        let peers = manager.get_peers_to_dial();
        assert!(peers.is_empty());

        let stats = manager.stats();
        assert_eq!(stats.open_circuits, 1);
    }

    /// A success closes an open circuit, clears the failure count and
    /// registers the connection.
    #[test]
    fn test_bootstrap_manager_success_resets() {
        let config = BootstrapConfig {
            initial_backoff: Duration::from_millis(1),
            // A single failure opens the circuit, so the reset is observable.
            circuit_breaker_threshold: 1,
            ..Default::default()
        };
        let manager = BootstrapManager::new(config);
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
        let peer_id = PeerId::random();

        manager.add_peer(addr.clone());
        manager.record_dial_attempt(&addr);
        manager.record_connection_failure(&addr);

        let before = manager.stats();
        assert_eq!(before.open_circuits, 1);
        assert_eq!(before.total_failures, 1);

        manager.record_connection_success(&addr, peer_id);

        let after = manager.stats();
        assert_eq!(after.open_circuits, 0);
        assert_eq!(after.total_failures, 0);
        assert_eq!(after.total_attempts, 1);
        assert_eq!(after.connected_peers, 1);

        assert!(manager.has_sufficient_connections(1));
        assert_eq!(manager.connected_count(), 1);
    }
}