Skip to main content

firecloud_net/
health.rs

//! Peer health monitoring system
//!
//! Tracks the health and availability of peers in the network using:
//! - Ping round-trip time (RTT) measurements
//! - Success/failure tracking
//! - Health score calculation
//! - Automatic peer status updates
8
9use libp2p::PeerId;
10use std::collections::HashMap;
11use std::time::{Duration, Instant};
12use tracing::{debug, warn};
13
/// Coarse health classification assigned to a tracked peer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerHealth {
    /// Responsive peer: low latency and few failures.
    Healthy,
    /// Reachable but impaired: elevated latency and/or some failed pings.
    Degraded,
    /// Mostly failing: many failed pings or timeouts.
    Unhealthy,
    /// No ping results recorded yet (e.g. a newly discovered peer).
    Unknown,
}
26
27impl PeerHealth {
28    /// Convert health to a numeric score (0-100)
29    pub fn score(&self) -> u8 {
30        match self {
31            PeerHealth::Healthy => 100,
32            PeerHealth::Degraded => 50,
33            PeerHealth::Unhealthy => 10,
34            PeerHealth::Unknown => 0,
35        }
36    }
37}
38
39/// Statistics for a single peer
40#[derive(Debug, Clone)]
41pub struct PeerStats {
42    /// Unique peer identifier
43    pub peer_id: PeerId,
44    /// Last time we successfully communicated with this peer
45    pub last_seen: Instant,
46    /// Number of successful ping responses
47    pub success_count: u32,
48    /// Number of failed pings or timeouts
49    pub fail_count: u32,
50    /// Average round-trip time in milliseconds
51    pub avg_rtt_ms: Option<u64>,
52    /// Last measured RTT
53    pub last_rtt_ms: Option<u64>,
54    /// Current health status
55    pub health: PeerHealth,
56    /// Whether this peer is on the local network
57    pub is_local: bool,
58    /// Total number of bytes sent to this peer
59    pub bytes_sent: u64,
60    /// Total number of bytes received from this peer
61    pub bytes_received: u64,
62}
63
64impl PeerStats {
65    /// Create new peer stats for a discovered peer
66    pub fn new(peer_id: PeerId, is_local: bool) -> Self {
67        Self {
68            peer_id,
69            last_seen: Instant::now(),
70            success_count: 0,
71            fail_count: 0,
72            avg_rtt_ms: None,
73            last_rtt_ms: None,
74            health: PeerHealth::Unknown,
75            is_local,
76            bytes_sent: 0,
77            bytes_received: 0,
78        }
79    }
80
81    /// Record a successful ping response
82    pub fn record_success(&mut self, rtt: Duration) {
83        self.last_seen = Instant::now();
84        self.success_count += 1;
85        
86        let rtt_ms = rtt.as_millis() as u64;
87        self.last_rtt_ms = Some(rtt_ms);
88        
89        // Update rolling average RTT
90        if let Some(avg) = self.avg_rtt_ms {
91            // Exponential moving average: new_avg = 0.7 * old_avg + 0.3 * new_value
92            self.avg_rtt_ms = Some((avg * 7 + rtt_ms * 3) / 10);
93        } else {
94            self.avg_rtt_ms = Some(rtt_ms);
95        }
96        
97        self.update_health();
98    }
99
100    /// Record a failed ping or timeout
101    pub fn record_failure(&mut self) {
102        self.fail_count += 1;
103        self.update_health();
104    }
105
106    /// Update health status based on current statistics
107    fn update_health(&mut self) {
108        let total = self.success_count + self.fail_count;
109        if total == 0 {
110            self.health = PeerHealth::Unknown;
111            return;
112        }
113
114        let fail_rate = self.fail_count as f64 / total as f64;
115        let avg_rtt = self.avg_rtt_ms.unwrap_or(0);
116
117        // Calculate health score (0-100)
118        // Penalize: high failure rate and high RTT
119        let health_score = 100.0 - (fail_rate * 50.0) - (avg_rtt as f64 / 10.0).min(50.0);
120
121        self.health = if health_score >= 70.0 {
122            PeerHealth::Healthy
123        } else if health_score >= 40.0 {
124            PeerHealth::Degraded
125        } else {
126            PeerHealth::Unhealthy
127        };
128
129        debug!(
130            "Peer {:?} health updated: {:?} (score: {:.1}, fail_rate: {:.2}, avg_rtt: {}ms)",
131            self.peer_id, self.health, health_score, fail_rate, avg_rtt
132        );
133    }
134
135    /// Check if peer should be considered offline
136    pub fn is_offline(&self, timeout: Duration) -> bool {
137        self.last_seen.elapsed() > timeout
138    }
139
140    /// Record data transfer statistics
141    pub fn record_transfer(&mut self, sent: u64, received: u64) {
142        self.bytes_sent += sent;
143        self.bytes_received += received;
144        self.last_seen = Instant::now();
145    }
146
147    /// Calculate the overall health score (0-100)
148    pub fn health_score(&self) -> u8 {
149        let total = self.success_count + self.fail_count;
150        if total == 0 {
151            return 0;
152        }
153
154        let fail_rate = self.fail_count as f64 / total as f64;
155        let avg_rtt = self.avg_rtt_ms.unwrap_or(1000);
156        
157        let score = 100.0 - (fail_rate * 50.0) - (avg_rtt as f64 / 10.0).min(50.0);
158        score.max(0.0).min(100.0) as u8
159    }
160}
161
162/// Health monitor for tracking all peers in the network
163pub struct HealthMonitor {
164    /// Map of peer_id to their statistics
165    peers: HashMap<PeerId, PeerStats>,
166    /// How long before considering a peer offline
167    offline_timeout: Duration,
168    /// Minimum health score to consider a peer usable
169    min_health_score: u8,
170}
171
172impl HealthMonitor {
173    /// Create a new health monitor
174    pub fn new() -> Self {
175        Self {
176            peers: HashMap::new(),
177            offline_timeout: Duration::from_secs(120), // 2 minutes
178            min_health_score: 30, // Minimum score of 30/100
179        }
180    }
181
182    /// Add or update a peer
183    pub fn add_peer(&mut self, peer_id: PeerId, is_local: bool) {
184        self.peers
185            .entry(peer_id)
186            .or_insert_with(|| PeerStats::new(peer_id, is_local));
187    }
188
189    /// Remove a peer from tracking
190    pub fn remove_peer(&mut self, peer_id: &PeerId) {
191        self.peers.remove(peer_id);
192    }
193
194    /// Record a successful ping to a peer
195    pub fn record_ping_success(&mut self, peer_id: &PeerId, rtt: Duration) {
196        if let Some(stats) = self.peers.get_mut(peer_id) {
197            stats.record_success(rtt);
198        }
199    }
200
201    /// Record a failed ping to a peer
202    pub fn record_ping_failure(&mut self, peer_id: &PeerId) {
203        if let Some(stats) = self.peers.get_mut(peer_id) {
204            stats.record_failure();
205        }
206    }
207
208    /// Record data transfer with a peer
209    pub fn record_transfer(&mut self, peer_id: &PeerId, sent: u64, received: u64) {
210        if let Some(stats) = self.peers.get_mut(peer_id) {
211            stats.record_transfer(sent, received);
212        }
213    }
214
215    /// Get statistics for a specific peer
216    pub fn get_stats(&self, peer_id: &PeerId) -> Option<&PeerStats> {
217        self.peers.get(peer_id)
218    }
219
220    /// Get all healthy peers, sorted by health score
221    pub fn get_healthy_peers(&self) -> Vec<PeerId> {
222        let mut peers: Vec<_> = self
223            .peers
224            .values()
225            .filter(|stats| {
226                !stats.is_offline(self.offline_timeout)
227                    && stats.health_score() >= self.min_health_score
228            })
229            .map(|stats| (stats.peer_id, stats.health_score()))
230            .collect();
231
232        // Sort by health score (descending)
233        peers.sort_by(|a, b| b.1.cmp(&a.1));
234
235        peers.into_iter().map(|(peer_id, _)| peer_id).collect()
236    }
237
238    /// Get peers on local network, sorted by health
239    pub fn get_local_peers(&self) -> Vec<PeerId> {
240        let mut peers: Vec<_> = self
241            .peers
242            .values()
243            .filter(|stats| {
244                stats.is_local
245                    && !stats.is_offline(self.offline_timeout)
246                    && stats.health_score() >= self.min_health_score
247            })
248            .map(|stats| (stats.peer_id, stats.health_score()))
249            .collect();
250
251        peers.sort_by(|a, b| b.1.cmp(&a.1));
252        peers.into_iter().map(|(peer_id, _)| peer_id).collect()
253    }
254
255    /// Get best peer for data transfer (prioritizes local + healthy)
256    pub fn get_best_peer(&self) -> Option<PeerId> {
257        // First try local peers
258        if let Some(peer) = self.get_local_peers().first() {
259            return Some(*peer);
260        }
261
262        // Fall back to any healthy peer
263        self.get_healthy_peers().first().copied()
264    }
265
266    /// Get all peers with their statistics
267    pub fn get_all_stats(&self) -> Vec<&PeerStats> {
268        self.peers.values().collect()
269    }
270
271    /// Remove offline peers
272    pub fn cleanup_offline_peers(&mut self) {
273        let offline_peers: Vec<_> = self
274            .peers
275            .iter()
276            .filter(|(_, stats)| stats.is_offline(self.offline_timeout))
277            .map(|(peer_id, _)| *peer_id)
278            .collect();
279
280        for peer_id in offline_peers {
281            warn!("Removing offline peer: {:?}", peer_id);
282            self.peers.remove(&peer_id);
283        }
284    }
285
286    /// Get network health summary
287    pub fn get_network_summary(&self) -> NetworkHealthSummary {
288        let total_peers = self.peers.len();
289        let healthy = self
290            .peers
291            .values()
292            .filter(|s| s.health == PeerHealth::Healthy)
293            .count();
294        let degraded = self
295            .peers
296            .values()
297            .filter(|s| s.health == PeerHealth::Degraded)
298            .count();
299        let unhealthy = self
300            .peers
301            .values()
302            .filter(|s| s.health == PeerHealth::Unhealthy)
303            .count();
304        let local_peers = self.peers.values().filter(|s| s.is_local).count();
305
306        NetworkHealthSummary {
307            total_peers,
308            healthy_peers: healthy,
309            degraded_peers: degraded,
310            unhealthy_peers: unhealthy,
311            local_peers,
312            remote_peers: total_peers - local_peers,
313        }
314    }
315}
316
317impl Default for HealthMonitor {
318    fn default() -> Self {
319        Self::new()
320    }
321}
322
/// Snapshot of network health, as produced by `HealthMonitor::get_network_summary`.
///
/// `healthy_peers + degraded_peers + unhealthy_peers` may be less than
/// `total_peers`: peers whose health is still `PeerHealth::Unknown` are in
/// none of those buckets. `local_peers + remote_peers == total_peers`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkHealthSummary {
    /// Number of tracked peers.
    pub total_peers: usize,
    /// Peers classified `Healthy`.
    pub healthy_peers: usize,
    /// Peers classified `Degraded`.
    pub degraded_peers: usize,
    /// Peers classified `Unhealthy`.
    pub unhealthy_peers: usize,
    /// Peers on the local network.
    pub local_peers: usize,
    /// Peers not on the local network.
    pub remote_peers: usize,
}
333
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_peer_is_unknown() {
        let stats = PeerStats::new(PeerId::random(), false);
        assert_eq!(stats.health, PeerHealth::Unknown);
        assert_eq!(stats.health_score(), 0);
        assert_eq!(stats.health.score(), 0);
        assert_eq!(stats.avg_rtt_ms, None);
        assert_eq!(stats.last_rtt_ms, None);
    }

    #[test]
    fn test_peer_stats_success() {
        let peer_id = PeerId::random();
        let mut stats = PeerStats::new(peer_id, true);

        stats.record_success(Duration::from_millis(50));
        assert_eq!(stats.success_count, 1);
        assert_eq!(stats.last_rtt_ms, Some(50));
        assert_eq!(stats.avg_rtt_ms, Some(50));

        stats.record_success(Duration::from_millis(60));
        assert_eq!(stats.success_count, 2);
        assert_eq!(stats.last_rtt_ms, Some(60));
        // Moving average: (7 * 50 + 3 * 60) / 10 = 53.
        assert_eq!(stats.avg_rtt_ms, Some(53));

        // Low RTT and no failures => healthy.
        assert_eq!(stats.health, PeerHealth::Healthy);
    }

    #[test]
    fn test_peer_stats_failure() {
        let mut stats = PeerStats::new(PeerId::random(), false);

        stats.record_failure();
        stats.record_failure();
        stats.record_success(Duration::from_millis(100));

        assert_eq!(stats.fail_count, 2);
        // score = 100 - 50 * (2/3) - 100/10 ~= 56.7 => degraded (>= 40, < 70).
        assert_eq!(stats.health, PeerHealth::Degraded);
        assert_eq!(stats.health_score(), 56);
    }

    #[test]
    fn test_record_transfer() {
        let mut monitor = HealthMonitor::new();
        let peer = PeerId::random();
        monitor.add_peer(peer, false);

        monitor.record_transfer(&peer, 10, 20);
        monitor.record_transfer(&peer, 10, 20);

        let stats = monitor.get_stats(&peer).expect("peer is tracked");
        assert_eq!(stats.bytes_sent, 20);
        assert_eq!(stats.bytes_received, 40);
    }

    #[test]
    fn test_health_monitor() {
        let mut monitor = HealthMonitor::new();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        monitor.add_peer(peer1, true);
        monitor.add_peer(peer2, false);

        // Make peer1 healthy
        monitor.record_ping_success(&peer1, Duration::from_millis(30));
        monitor.record_ping_success(&peer1, Duration::from_millis(40));

        // Make peer2 degraded
        monitor.record_ping_success(&peer2, Duration::from_millis(200));
        monitor.record_ping_failure(&peer2);

        let healthy = monitor.get_healthy_peers();
        assert!(healthy.contains(&peer1));

        let local = monitor.get_local_peers();
        assert_eq!(local.len(), 1);
        assert_eq!(local[0], peer1);
    }

    #[test]
    fn test_healthy_peers_sorted_by_score() {
        let mut monitor = HealthMonitor::new();
        let fast = PeerId::random();
        let medium = PeerId::random();
        let slow = PeerId::random();

        // Scores: 250ms -> 75, 10ms -> 99, 100ms -> 90.
        for &(peer, rtt_ms) in &[(slow, 250), (fast, 10), (medium, 100)] {
            monitor.add_peer(peer, false);
            monitor.record_ping_success(&peer, Duration::from_millis(rtt_ms));
        }

        assert_eq!(monitor.get_healthy_peers(), vec![fast, medium, slow]);
    }

    #[test]
    fn test_best_peer_prefers_local() {
        let mut monitor = HealthMonitor::new();
        let local = PeerId::random();
        let remote = PeerId::random();

        monitor.add_peer(local, true);
        monitor.add_peer(remote, false);
        // The remote peer scores higher (99 vs 85), but local wins.
        monitor.record_ping_success(&local, Duration::from_millis(150));
        monitor.record_ping_success(&remote, Duration::from_millis(10));

        assert_eq!(monitor.get_best_peer(), Some(local));
    }

    #[test]
    fn test_best_peer_falls_back_to_remote() {
        let mut monitor = HealthMonitor::new();
        let remote = PeerId::random();

        monitor.add_peer(remote, false);
        monitor.record_ping_success(&remote, Duration::from_millis(20));

        assert!(monitor.get_local_peers().is_empty());
        assert_eq!(monitor.get_best_peer(), Some(remote));
    }

    #[test]
    fn test_best_peer_none_without_measurements() {
        let mut monitor = HealthMonitor::new();
        monitor.add_peer(PeerId::random(), true);

        // No ping results => score 0, below the usable threshold.
        assert!(monitor.get_healthy_peers().is_empty());
        assert_eq!(monitor.get_best_peer(), None);
    }

    #[test]
    fn test_untracked_peer_is_ignored() {
        let mut monitor = HealthMonitor::new();
        let stranger = PeerId::random();

        monitor.record_ping_success(&stranger, Duration::from_millis(10));
        monitor.record_ping_failure(&stranger);
        monitor.record_transfer(&stranger, 1, 1);

        assert!(monitor.get_stats(&stranger).is_none());
        assert!(monitor.get_all_stats().is_empty());
    }

    #[test]
    fn test_remove_peer() {
        let mut monitor = HealthMonitor::new();
        let peer = PeerId::random();

        monitor.add_peer(peer, false);
        assert!(monitor.get_stats(&peer).is_some());

        monitor.remove_peer(&peer);
        assert!(monitor.get_stats(&peer).is_none());
    }

    #[test]
    fn test_network_summary() {
        let mut monitor = HealthMonitor::new();
        let healthy_local = PeerId::random();
        let degraded_remote = PeerId::random();
        let unknown_remote = PeerId::random();

        monitor.add_peer(healthy_local, true);
        monitor.add_peer(degraded_remote, false);
        monitor.add_peer(unknown_remote, false);

        monitor.record_ping_success(&healthy_local, Duration::from_millis(30));
        monitor.record_ping_success(&degraded_remote, Duration::from_millis(200));
        monitor.record_ping_failure(&degraded_remote);

        let summary = monitor.get_network_summary();
        assert_eq!(summary.total_peers, 3);
        assert_eq!(summary.healthy_peers, 1);
        assert_eq!(summary.degraded_peers, 1);
        assert_eq!(summary.unhealthy_peers, 0);
        assert_eq!(summary.local_peers, 1);
        assert_eq!(summary.remote_peers, 2);
    }

    #[test]
    fn test_cleanup_offline_peers() {
        let mut monitor = HealthMonitor::new();
        let stale = PeerId::random();
        let fresh = PeerId::random();
        monitor.add_peer(stale, false);
        monitor.add_peer(fresh, false);

        // Backdate `stale` past the default 2-minute timeout. Skip if the
        // monotonic clock cannot represent an instant that far back.
        if let Some(past) = Instant::now().checked_sub(Duration::from_secs(600)) {
            monitor
                .peers
                .get_mut(&stale)
                .expect("stale peer is tracked")
                .last_seen = past;

            monitor.cleanup_offline_peers();

            assert!(monitor.get_stats(&stale).is_none());
            assert!(monitor.get_stats(&fresh).is_some());
        }
    }
}