ipfrs_network/
bootstrap.rs

1//! Bootstrap peer management with retry logic
2//!
3//! This module handles:
4//! - Bootstrap peer dialing with exponential backoff
5//! - Connection retry logic
6//! - Circuit breaker pattern for failing peers
7
8use libp2p::{Multiaddr, PeerId};
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tracing::{debug, info, warn};
14
15/// Default bootstrap peers for IPFS network
16pub const DEFAULT_IPFS_BOOTSTRAP_PEERS: &[&str] = &[
17    "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
18    "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
19    "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
20    "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
21];
22
23/// Bootstrap configuration
24#[derive(Debug, Clone)]
25pub struct BootstrapConfig {
26    /// Maximum retry attempts per peer
27    pub max_retries: u32,
28    /// Initial backoff duration
29    pub initial_backoff: Duration,
30    /// Maximum backoff duration
31    pub max_backoff: Duration,
32    /// Backoff multiplier
33    pub backoff_multiplier: f64,
34    /// Circuit breaker failure threshold
35    pub circuit_breaker_threshold: u32,
36    /// Circuit breaker reset timeout
37    pub circuit_breaker_timeout: Duration,
38    /// Periodic re-bootstrap interval
39    pub re_bootstrap_interval: Duration,
40}
41
42impl Default for BootstrapConfig {
43    fn default() -> Self {
44        Self {
45            max_retries: 5,
46            initial_backoff: Duration::from_secs(1),
47            max_backoff: Duration::from_secs(60),
48            backoff_multiplier: 2.0,
49            circuit_breaker_threshold: 3,
50            circuit_breaker_timeout: Duration::from_secs(300),
51            re_bootstrap_interval: Duration::from_secs(300),
52        }
53    }
54}
55
56/// State of a bootstrap peer
57#[derive(Debug, Clone)]
58struct BootstrapPeerState {
59    /// Multiaddress of the peer
60    addr: Multiaddr,
61    /// Number of connection attempts
62    attempts: u32,
63    /// Number of consecutive failures
64    consecutive_failures: u32,
65    /// Last attempt time
66    last_attempt: Option<Instant>,
67    /// Last successful connection
68    last_success: Option<Instant>,
69    /// Circuit breaker state
70    circuit_open: bool,
71    /// Circuit breaker opened at
72    circuit_opened_at: Option<Instant>,
73    /// Current backoff duration
74    current_backoff: Duration,
75}
76
77impl BootstrapPeerState {
78    fn new(addr: Multiaddr, initial_backoff: Duration) -> Self {
79        Self {
80            addr,
81            attempts: 0,
82            consecutive_failures: 0,
83            last_attempt: None,
84            last_success: None,
85            circuit_open: false,
86            circuit_opened_at: None,
87            current_backoff: initial_backoff,
88        }
89    }
90
91    fn should_retry(&self, config: &BootstrapConfig) -> bool {
92        // Check circuit breaker
93        if self.circuit_open {
94            if let Some(opened_at) = self.circuit_opened_at {
95                if opened_at.elapsed() < config.circuit_breaker_timeout {
96                    return false;
97                }
98            }
99        }
100
101        // Check if we've exceeded max retries
102        if self.consecutive_failures >= config.max_retries {
103            return false;
104        }
105
106        // Check backoff
107        if let Some(last) = self.last_attempt {
108            if last.elapsed() < self.current_backoff {
109                return false;
110            }
111        }
112
113        true
114    }
115
116    fn record_attempt(&mut self) {
117        self.attempts += 1;
118        self.last_attempt = Some(Instant::now());
119    }
120
121    fn record_success(&mut self, initial_backoff: Duration) {
122        self.consecutive_failures = 0;
123        self.last_success = Some(Instant::now());
124        self.circuit_open = false;
125        self.circuit_opened_at = None;
126        self.current_backoff = initial_backoff;
127    }
128
129    fn record_failure(&mut self, config: &BootstrapConfig) {
130        self.consecutive_failures += 1;
131
132        // Increase backoff with exponential growth
133        let new_backoff =
134            Duration::from_secs_f64(self.current_backoff.as_secs_f64() * config.backoff_multiplier);
135        self.current_backoff = new_backoff.min(config.max_backoff);
136
137        // Check if we should open circuit breaker
138        if self.consecutive_failures >= config.circuit_breaker_threshold {
139            self.circuit_open = true;
140            self.circuit_opened_at = Some(Instant::now());
141            warn!(
142                "Circuit breaker opened for peer {} after {} failures",
143                self.addr, self.consecutive_failures
144            );
145        }
146    }
147}
148
149/// Bootstrap peer manager
150pub struct BootstrapManager {
151    /// Configuration
152    config: BootstrapConfig,
153    /// Bootstrap peer states
154    peers: Arc<RwLock<HashMap<String, BootstrapPeerState>>>,
155    /// Successfully connected peers
156    connected: Arc<RwLock<Vec<PeerId>>>,
157}
158
159impl BootstrapManager {
160    /// Create a new bootstrap manager
161    pub fn new(config: BootstrapConfig) -> Self {
162        Self {
163            config,
164            peers: Arc::new(RwLock::new(HashMap::new())),
165            connected: Arc::new(RwLock::new(Vec::new())),
166        }
167    }
168
169    /// Add a bootstrap peer
170    pub fn add_peer(&self, addr: Multiaddr) {
171        let key = addr.to_string();
172        let mut peers = self.peers.write();
173        peers
174            .entry(key)
175            .or_insert_with(|| BootstrapPeerState::new(addr, self.config.initial_backoff));
176    }
177
178    /// Add multiple bootstrap peers from strings
179    pub fn add_peers_from_strings(&self, addrs: &[String]) {
180        for addr_str in addrs {
181            if let Ok(addr) = addr_str.parse::<Multiaddr>() {
182                self.add_peer(addr);
183            } else {
184                warn!("Invalid bootstrap peer address: {}", addr_str);
185            }
186        }
187    }
188
189    /// Add default IPFS bootstrap peers
190    pub fn add_default_peers(&self) {
191        for addr_str in DEFAULT_IPFS_BOOTSTRAP_PEERS {
192            if let Ok(addr) = addr_str.parse::<Multiaddr>() {
193                self.add_peer(addr);
194            }
195        }
196        info!(
197            "Added {} default IPFS bootstrap peers",
198            DEFAULT_IPFS_BOOTSTRAP_PEERS.len()
199        );
200    }
201
202    /// Get peers that should be dialed
203    pub fn get_peers_to_dial(&self) -> Vec<Multiaddr> {
204        let peers = self.peers.read();
205        peers
206            .values()
207            .filter(|state| state.should_retry(&self.config))
208            .map(|state| state.addr.clone())
209            .collect()
210    }
211
212    /// Record a dial attempt
213    pub fn record_dial_attempt(&self, addr: &Multiaddr) {
214        let key = addr.to_string();
215        let mut peers = self.peers.write();
216        if let Some(state) = peers.get_mut(&key) {
217            state.record_attempt();
218            debug!("Recorded dial attempt for {}", addr);
219        }
220    }
221
222    /// Record a successful connection
223    pub fn record_connection_success(&self, addr: &Multiaddr, peer_id: PeerId) {
224        let key = addr.to_string();
225        let mut peers = self.peers.write();
226        if let Some(state) = peers.get_mut(&key) {
227            state.record_success(self.config.initial_backoff);
228            info!(
229                "Successfully connected to bootstrap peer {} ({})",
230                addr, peer_id
231            );
232        }
233
234        // Track connected peer
235        let mut connected = self.connected.write();
236        if !connected.contains(&peer_id) {
237            connected.push(peer_id);
238        }
239    }
240
241    /// Record a connection failure
242    pub fn record_connection_failure(&self, addr: &Multiaddr) {
243        let key = addr.to_string();
244        let mut peers = self.peers.write();
245        if let Some(state) = peers.get_mut(&key) {
246            state.record_failure(&self.config);
247            warn!(
248                "Failed to connect to bootstrap peer {}, backoff: {:?}",
249                addr, state.current_backoff
250            );
251        }
252    }
253
254    /// Record peer disconnection
255    pub fn record_disconnection(&self, peer_id: &PeerId) {
256        let mut connected = self.connected.write();
257        connected.retain(|p| p != peer_id);
258    }
259
260    /// Check if we have enough bootstrap connections
261    pub fn has_sufficient_connections(&self, min_peers: usize) -> bool {
262        self.connected.read().len() >= min_peers
263    }
264
265    /// Get number of connected bootstrap peers
266    pub fn connected_count(&self) -> usize {
267        self.connected.read().len()
268    }
269
270    /// Get bootstrap statistics
271    pub fn stats(&self) -> BootstrapStats {
272        let peers = self.peers.read();
273        let connected = self.connected.read();
274
275        let total_attempts: u32 = peers.values().map(|s| s.attempts).sum();
276        let total_failures: u32 = peers.values().map(|s| s.consecutive_failures).sum();
277        let open_circuits = peers.values().filter(|s| s.circuit_open).count();
278
279        BootstrapStats {
280            total_peers: peers.len(),
281            connected_peers: connected.len(),
282            total_attempts,
283            total_failures,
284            open_circuits,
285        }
286    }
287
288    /// Reset a peer's circuit breaker (for manual intervention)
289    pub fn reset_circuit_breaker(&self, addr: &Multiaddr) {
290        let key = addr.to_string();
291        let mut peers = self.peers.write();
292        if let Some(state) = peers.get_mut(&key) {
293            state.circuit_open = false;
294            state.circuit_opened_at = None;
295            state.consecutive_failures = 0;
296            state.current_backoff = self.config.initial_backoff;
297            info!("Reset circuit breaker for {}", addr);
298        }
299    }
300
301    /// Reset all circuit breakers
302    pub fn reset_all_circuit_breakers(&self) {
303        let mut peers = self.peers.write();
304        for state in peers.values_mut() {
305            state.circuit_open = false;
306            state.circuit_opened_at = None;
307            state.consecutive_failures = 0;
308            state.current_backoff = self.config.initial_backoff;
309        }
310        info!("Reset all circuit breakers");
311    }
312}
313
314impl Default for BootstrapManager {
315    fn default() -> Self {
316        Self::new(BootstrapConfig::default())
317    }
318}
319
320/// Bootstrap statistics
321#[derive(Debug, Clone, serde::Serialize)]
322pub struct BootstrapStats {
323    /// Total number of bootstrap peers
324    pub total_peers: usize,
325    /// Number of connected bootstrap peers
326    pub connected_peers: usize,
327    /// Total connection attempts
328    pub total_attempts: u32,
329    /// Total failures
330    pub total_failures: u32,
331    /// Number of open circuit breakers
332    pub open_circuits: usize,
333}
334
335#[cfg(test)]
336mod tests {
337    use super::*;
338
339    #[test]
340    fn test_bootstrap_config_default() {
341        let config = BootstrapConfig::default();
342        assert_eq!(config.max_retries, 5);
343        assert_eq!(config.initial_backoff, Duration::from_secs(1));
344    }
345
346    #[test]
347    fn test_bootstrap_manager_add_peer() {
348        let manager = BootstrapManager::default();
349        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
350
351        manager.add_peer(addr.clone());
352        let peers = manager.get_peers_to_dial();
353        assert_eq!(peers.len(), 1);
354        assert_eq!(peers[0], addr);
355    }
356
357    #[test]
358    fn test_bootstrap_manager_backoff() {
359        let config = BootstrapConfig {
360            initial_backoff: Duration::from_millis(10),
361            max_backoff: Duration::from_secs(1),
362            backoff_multiplier: 2.0,
363            ..Default::default()
364        };
365        let manager = BootstrapManager::new(config);
366        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
367
368        manager.add_peer(addr.clone());
369        manager.record_dial_attempt(&addr);
370        manager.record_connection_failure(&addr);
371
372        // Should not retry immediately due to backoff
373        let peers = manager.get_peers_to_dial();
374        assert!(peers.is_empty());
375
376        // Wait for backoff
377        std::thread::sleep(Duration::from_millis(25));
378        let peers = manager.get_peers_to_dial();
379        assert_eq!(peers.len(), 1);
380    }
381
382    #[test]
383    fn test_bootstrap_manager_circuit_breaker() {
384        let config = BootstrapConfig {
385            initial_backoff: Duration::from_millis(1),
386            circuit_breaker_threshold: 2,
387            circuit_breaker_timeout: Duration::from_secs(1),
388            ..Default::default()
389        };
390        let manager = BootstrapManager::new(config);
391        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
392
393        manager.add_peer(addr.clone());
394
395        // First failure
396        manager.record_dial_attempt(&addr);
397        manager.record_connection_failure(&addr);
398        std::thread::sleep(Duration::from_millis(5));
399
400        // Second failure - circuit should open
401        manager.record_dial_attempt(&addr);
402        manager.record_connection_failure(&addr);
403
404        // Should not retry with open circuit
405        let peers = manager.get_peers_to_dial();
406        assert!(peers.is_empty());
407
408        // Check stats
409        let stats = manager.stats();
410        assert_eq!(stats.open_circuits, 1);
411    }
412
413    #[test]
414    fn test_bootstrap_manager_success_resets() {
415        let config = BootstrapConfig {
416            initial_backoff: Duration::from_millis(1),
417            ..Default::default()
418        };
419        let manager = BootstrapManager::new(config);
420        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
421        let peer_id = PeerId::random();
422
423        manager.add_peer(addr.clone());
424        manager.record_dial_attempt(&addr);
425        manager.record_connection_failure(&addr);
426
427        // Success should reset
428        manager.record_connection_success(&addr, peer_id);
429
430        assert!(manager.has_sufficient_connections(1));
431        assert_eq!(manager.connected_count(), 1);
432    }
433}