Skip to main content

firecloud_net/
relay_manager.rs

//! Relay connection management for NAT traversal
//!
//! This module handles:
//! - Connecting to bootstrap relay servers
//! - Listening on p2p-circuit addresses through relays
//! - Maintaining a pool of active relay connections
//! - Automatic reconnection to relays, with exponential backoff after failures
8
9use libp2p::{Multiaddr, PeerId};
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
13/// Configuration for relay connection management
14#[derive(Debug, Clone)]
15pub struct RelayManagerConfig {
16    /// Bootstrap relay addresses to connect to
17    pub bootstrap_relays: Vec<Multiaddr>,
18    /// Minimum number of active relay connections to maintain
19    pub min_active_relays: usize,
20    /// Maximum number of relay connections
21    pub max_relay_connections: usize,
22    /// How often to check and reconnect to relays (in seconds)
23    pub reconnect_interval: Duration,
24    /// Timeout for relay connection attempts
25    pub connection_timeout: Duration,
26}
27
28impl Default for RelayManagerConfig {
29    fn default() -> Self {
30        Self {
31            bootstrap_relays: Vec::new(),
32            min_active_relays: 2,
33            max_relay_connections: 5,
34            reconnect_interval: Duration::from_secs(30),
35            connection_timeout: Duration::from_secs(10),
36        }
37    }
38}
39
/// Lifecycle state of a single relay connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelayStatus {
    /// No connection exists and none is being attempted.
    Disconnected,
    /// A connection attempt is currently in flight.
    Connecting,
    /// The connection to the relay has been established.
    Connected,
    /// The most recent attempt failed; `reason` carries the description
    /// supplied by the caller.
    Failed { reason: String },
    /// A p2p-circuit listen address is active on this relay.
    Listening,
}
54
55/// Information about a relay connection
56#[derive(Debug, Clone)]
57pub struct RelayInfo {
58    /// Multiaddr of the relay server
59    pub address: Multiaddr,
60    /// PeerId of the relay server (if known)
61    pub peer_id: Option<PeerId>,
62    /// Current connection status
63    pub status: RelayStatus,
64    /// Last connection attempt time
65    pub last_attempt: Option<Instant>,
66    /// Number of failed connection attempts
67    pub failed_attempts: u32,
68    /// p2p-circuit address (if listening)
69    pub circuit_address: Option<Multiaddr>,
70}
71
72impl RelayInfo {
73    pub fn new(address: Multiaddr) -> Self {
74        Self {
75            address,
76            peer_id: None,
77            status: RelayStatus::Disconnected,
78            last_attempt: None,
79            failed_attempts: 0,
80            circuit_address: None,
81        }
82    }
83
84    /// Mark connection attempt started
85    pub fn mark_connecting(&mut self) {
86        self.status = RelayStatus::Connecting;
87        self.last_attempt = Some(Instant::now());
88    }
89
90    /// Mark connection as successful
91    pub fn mark_connected(&mut self, peer_id: PeerId) {
92        self.status = RelayStatus::Connected;
93        self.peer_id = Some(peer_id);
94        self.failed_attempts = 0;
95    }
96
97    /// Mark connection as failed
98    pub fn mark_failed(&mut self, reason: String) {
99        self.status = RelayStatus::Failed { reason };
100        self.failed_attempts += 1;
101    }
102
103    /// Mark as listening on relay
104    pub fn mark_listening(&mut self, circuit_address: Multiaddr) {
105        self.status = RelayStatus::Listening;
106        self.circuit_address = Some(circuit_address);
107    }
108
109    /// Check if relay is active (connected or listening)
110    pub fn is_active(&self) -> bool {
111        matches!(
112            self.status,
113            RelayStatus::Connected | RelayStatus::Listening
114        )
115    }
116
117    /// Check if we should retry connection
118    pub fn should_retry(&self, reconnect_interval: Duration) -> bool {
119        match &self.status {
120            RelayStatus::Disconnected => true,
121            RelayStatus::Failed { .. } => {
122                // Exponential backoff: wait longer after more failures
123                let backoff = reconnect_interval
124                    .mul_f32(2_f32.powi(self.failed_attempts.min(5) as i32));
125                
126                self.last_attempt
127                    .map(|t| t.elapsed() >= backoff)
128                    .unwrap_or(true)
129            }
130            _ => false,
131        }
132    }
133}
134
135/// Manages relay connections for NAT traversal
136pub struct RelayManager {
137    config: RelayManagerConfig,
138    relays: HashMap<Multiaddr, RelayInfo>,
139    connected_peers: HashSet<PeerId>,
140}
141
142impl RelayManager {
143    /// Create a new relay manager with the given configuration
144    pub fn new(config: RelayManagerConfig) -> Self {
145        let mut relays = HashMap::new();
146        
147        // Initialize relay info for bootstrap relays
148        for addr in &config.bootstrap_relays {
149            relays.insert(addr.clone(), RelayInfo::new(addr.clone()));
150        }
151
152        Self {
153            config,
154            relays,
155            connected_peers: HashSet::new(),
156        }
157    }
158
159    /// Get the configuration
160    pub fn config(&self) -> &RelayManagerConfig {
161        &self.config
162    }
163
164    /// Add a new relay address to connect to
165    pub fn add_relay(&mut self, address: Multiaddr) {
166        if !self.relays.contains_key(&address) && 
167           self.relays.len() < self.config.max_relay_connections {
168            self.relays.insert(address.clone(), RelayInfo::new(address));
169        }
170    }
171
172    /// Remove a relay address
173    pub fn remove_relay(&mut self, address: &Multiaddr) -> Option<RelayInfo> {
174        if let Some(info) = self.relays.remove(address) {
175            if let Some(peer_id) = info.peer_id {
176                self.connected_peers.remove(&peer_id);
177            }
178            Some(info)
179        } else {
180            None
181        }
182    }
183
184    /// Get relay info by address
185    pub fn get_relay(&self, address: &Multiaddr) -> Option<&RelayInfo> {
186        self.relays.get(address)
187    }
188
189    /// Get mutable relay info by address
190    pub fn get_relay_mut(&mut self, address: &Multiaddr) -> Option<&mut RelayInfo> {
191        self.relays.get_mut(address)
192    }
193
194    /// Get relay info by peer ID
195    pub fn get_relay_by_peer(&self, peer_id: &PeerId) -> Option<&RelayInfo> {
196        self.relays
197            .values()
198            .find(|r| r.peer_id.as_ref() == Some(peer_id))
199    }
200
201    /// Get mutable relay info by peer ID
202    pub fn get_relay_by_peer_mut(&mut self, peer_id: &PeerId) -> Option<&mut RelayInfo> {
203        self.relays
204            .values_mut()
205            .find(|r| r.peer_id.as_ref() == Some(peer_id))
206    }
207
208    /// Get all relay addresses
209    pub fn relay_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
210        self.relays.keys()
211    }
212
213    /// Get all active relay addresses
214    pub fn active_relay_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
215        self.relays
216            .iter()
217            .filter(|(_, info)| info.is_active())
218            .map(|(addr, _)| addr)
219    }
220
221    /// Get number of active relays
222    pub fn active_relay_count(&self) -> usize {
223        self.relays.values().filter(|r| r.is_active()).count()
224    }
225
226    /// Check if we need more relay connections
227    pub fn needs_more_relays(&self) -> bool {
228        self.active_relay_count() < self.config.min_active_relays
229    }
230
231    /// Get relays that should be (re)connected
232    pub fn relays_to_connect(&self) -> Vec<Multiaddr> {
233        self.relays
234            .values()
235            .filter(|info| info.should_retry(self.config.reconnect_interval))
236            .map(|info| info.address.clone())
237            .collect()
238    }
239
240    /// Mark a relay as connecting
241    pub fn mark_connecting(&mut self, address: &Multiaddr) {
242        if let Some(relay) = self.relays.get_mut(address) {
243            relay.mark_connecting();
244        }
245    }
246
247    /// Mark a relay as connected
248    pub fn mark_connected(&mut self, address: &Multiaddr, peer_id: PeerId) {
249        if let Some(relay) = self.relays.get_mut(address) {
250            relay.mark_connected(peer_id);
251            self.connected_peers.insert(peer_id);
252        }
253    }
254
255    /// Mark a relay as failed
256    pub fn mark_failed(&mut self, address: &Multiaddr, reason: String) {
257        if let Some(relay) = self.relays.get_mut(address) {
258            relay.mark_failed(reason);
259        }
260    }
261
262    /// Mark a relay as listening (p2p-circuit address active)
263    pub fn mark_listening(&mut self, peer_id: &PeerId, circuit_address: Multiaddr) {
264        if let Some(relay) = self.get_relay_by_peer_mut(peer_id) {
265            relay.mark_listening(circuit_address);
266        }
267    }
268
269    /// Get all p2p-circuit addresses we're listening on
270    pub fn circuit_addresses(&self) -> Vec<Multiaddr> {
271        self.relays
272            .values()
273            .filter_map(|r| r.circuit_address.clone())
274            .collect()
275    }
276
277    /// Get statistics about relay connections
278    pub fn stats(&self) -> RelayStats {
279        let total = self.relays.len();
280        let connected = self.relays.values().filter(|r| {
281            matches!(r.status, RelayStatus::Connected | RelayStatus::Listening)
282        }).count();
283        let listening = self.relays.values().filter(|r| {
284            matches!(r.status, RelayStatus::Listening)
285        }).count();
286        let failed = self.relays.values().filter(|r| {
287            matches!(r.status, RelayStatus::Failed { .. })
288        }).count();
289
290        RelayStats {
291            total_relays: total,
292            connected_relays: connected,
293            listening_relays: listening,
294            failed_relays: failed,
295            circuit_addresses: self.circuit_addresses().len(),
296        }
297    }
298}
299
/// Point-in-time summary of the relay pool.
#[derive(Debug, Clone)]
pub struct RelayStats {
    /// Number of relays being tracked, in any state.
    pub total_relays: usize,
    /// Relays that are connected, including those that are listening.
    pub connected_relays: usize,
    /// Relays with an active p2p-circuit listener.
    pub listening_relays: usize,
    /// Relays whose latest connection attempt failed.
    pub failed_relays: usize,
    /// Number of p2p-circuit addresses currently reported.
    pub circuit_addresses: usize,
}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a loopback relay address for the given TCP port. The address is
    /// only used as a map key; nothing is ever dialled.
    fn relay_addr(port: u16) -> Multiaddr {
        format!("/ip4/127.0.0.1/tcp/{}", port)
            .parse()
            .expect("static test multiaddr is valid")
    }

    /// Walks a single `RelayInfo` through
    /// disconnected -> connecting -> connected -> listening.
    #[test]
    fn test_relay_info_lifecycle() {
        let mut info = RelayInfo::new(relay_addr(4001));

        // Initially disconnected
        assert_eq!(info.status, RelayStatus::Disconnected);
        assert!(!info.is_active());
        assert!(info.last_attempt.is_none());

        // Mark as connecting
        info.mark_connecting();
        assert_eq!(info.status, RelayStatus::Connecting);
        assert!(!info.is_active());
        assert!(info.last_attempt.is_some());

        // Mark as connected
        let peer_id = PeerId::random();
        info.mark_connected(peer_id);
        assert_eq!(info.status, RelayStatus::Connected);
        assert!(info.is_active());
        assert_eq!(info.peer_id, Some(peer_id));
        assert_eq!(info.failed_attempts, 0);

        // Mark as listening
        let circuit = relay_addr(4002);
        info.mark_listening(circuit.clone());
        assert_eq!(info.status, RelayStatus::Listening);
        assert!(info.is_active());
        assert_eq!(info.circuit_address, Some(circuit));
    }

    /// A freshly built manager tracks every bootstrap relay but has none
    /// active.
    #[test]
    fn test_relay_manager_basic() {
        let config = RelayManagerConfig {
            bootstrap_relays: vec![relay_addr(4001), relay_addr(4002)],
            min_active_relays: 2,
            ..Default::default()
        };

        let manager = RelayManager::new(config);

        assert_eq!(manager.relay_addresses().count(), 2);
        assert_eq!(manager.active_relay_count(), 0);
        assert!(manager.needs_more_relays());
    }

    /// Connecting the only required relay satisfies `min_active_relays`.
    #[test]
    fn test_relay_manager_connect() {
        let addr = relay_addr(4001);
        let config = RelayManagerConfig {
            bootstrap_relays: vec![addr.clone()],
            min_active_relays: 1,
            ..Default::default()
        };

        let mut manager = RelayManager::new(config);
        let peer_id = PeerId::random();

        // Initially needs more relays
        assert!(manager.needs_more_relays());

        // Connect to relay
        manager.mark_connected(&addr, peer_id);
        assert_eq!(manager.active_relay_count(), 1);
        assert!(!manager.needs_more_relays());

        // Check relay info
        let info = manager.get_relay(&addr).unwrap();
        assert_eq!(info.status, RelayStatus::Connected);
        assert_eq!(info.peer_id, Some(peer_id));
    }

    /// Retry eligibility: gated while connecting, gated by backoff after a
    /// failure, and the failure counter tracks and resets as expected.
    #[test]
    fn test_relay_retry_backoff() {
        let mut info = RelayInfo::new(relay_addr(4001));
        let interval = Duration::from_secs(10);

        // Should retry initially
        assert!(info.should_retry(interval));

        // An attempt in flight is never retried (this also sets last_attempt)
        info.mark_connecting();
        assert!(!info.should_retry(interval));

        // Fail immediately afterwards
        info.mark_failed("test error".to_string());
        assert_eq!(info.failed_attempts, 1);

        // Should not retry immediately (backoff)
        // Backoff for 1 failure: 10s * 2^1 = 20s
        assert!(!info.should_retry(interval));

        // With a zero interval the backoff is zero, so a retry is allowed
        assert!(info.should_retry(Duration::from_secs(0)));

        // Every further failure is counted
        info.mark_failed("test error".to_string());
        assert_eq!(info.failed_attempts, 2);
        assert!(!info.should_retry(interval));

        // A successful connection resets the counter and stops retries
        info.mark_connected(PeerId::random());
        assert_eq!(info.failed_attempts, 0);
        assert!(!info.should_retry(interval));
    }

    /// A listening relay exposes its circuit address through the manager and
    /// is reflected in the statistics.
    #[test]
    fn test_circuit_addresses() {
        let addr = relay_addr(4001);
        let config = RelayManagerConfig {
            bootstrap_relays: vec![addr.clone()],
            ..Default::default()
        };

        let mut manager = RelayManager::new(config);
        let peer_id = PeerId::random();
        let circuit_addr: Multiaddr = format!(
            "/ip4/127.0.0.1/tcp/4001/p2p/{}/p2p-circuit",
            peer_id
        ).parse().unwrap();

        manager.mark_connected(&addr, peer_id);
        manager.mark_listening(&peer_id, circuit_addr.clone());

        let circuits = manager.circuit_addresses();
        assert_eq!(circuits.len(), 1);
        assert_eq!(circuits[0], circuit_addr);

        let stats = manager.stats();
        assert_eq!(stats.total_relays, 1);
        assert_eq!(stats.connected_relays, 1);
        assert_eq!(stats.listening_relays, 1);
        assert_eq!(stats.failed_relays, 0);
        assert_eq!(stats.circuit_addresses, 1);
    }
}