ipfrs_network/
metrics.rs

//! Network metrics collection and reporting.
//!
//! This module tracks the following network metrics:
//! - Connection metrics (established, failed, active, inbound/outbound, duration)
//! - Bandwidth metrics (bytes sent/received)
//! - DHT metrics (queries, provider records, routing table size)
//! - Protocol metrics (messages and bytes sent/received, per protocol)
//! - Prometheus text-format export
9
use parking_lot::RwLock;
use prometheus::{Encoder, Opts, Registry, TextEncoder};
use serde::Serialize;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
17
18/// Network metrics collector
19pub struct NetworkMetrics {
20    /// Connection metrics
21    connections: ConnectionMetrics,
22    /// Bandwidth metrics
23    bandwidth: BandwidthMetrics,
24    /// DHT metrics
25    dht: DhtMetrics,
26    /// Protocol metrics
27    protocols: ProtocolMetrics,
28    /// Start time for uptime calculation
29    start_time: Instant,
30}
31
32impl NetworkMetrics {
33    /// Create a new metrics collector
34    pub fn new() -> Self {
35        Self {
36            connections: ConnectionMetrics::new(),
37            bandwidth: BandwidthMetrics::new(),
38            dht: DhtMetrics::new(),
39            protocols: ProtocolMetrics::new(),
40            start_time: Instant::now(),
41        }
42    }
43
44    /// Get connection metrics
45    pub fn connections(&self) -> &ConnectionMetrics {
46        &self.connections
47    }
48
49    /// Get bandwidth metrics
50    pub fn bandwidth(&self) -> &BandwidthMetrics {
51        &self.bandwidth
52    }
53
54    /// Get DHT metrics
55    pub fn dht(&self) -> &DhtMetrics {
56        &self.dht
57    }
58
59    /// Get protocol metrics
60    pub fn protocols(&self) -> &ProtocolMetrics {
61        &self.protocols
62    }
63
64    /// Get uptime duration
65    pub fn uptime(&self) -> Duration {
66        self.start_time.elapsed()
67    }
68
69    /// Get a complete metrics snapshot
70    pub fn snapshot(&self) -> MetricsSnapshot {
71        MetricsSnapshot {
72            uptime_secs: self.start_time.elapsed().as_secs(),
73            connections: self.connections.snapshot(),
74            bandwidth: self.bandwidth.snapshot(),
75            dht: self.dht.snapshot(),
76        }
77    }
78
79    /// Create and populate a Prometheus registry with all metrics
80    pub fn create_prometheus_registry(&self) -> Result<Registry, prometheus::Error> {
81        let registry = Registry::new();
82
83        // Connection metrics
84        let connections_established = prometheus::IntCounterVec::new(
85            Opts::new(
86                "ipfrs_connections_established_total",
87                "Total number of connections established",
88            ),
89            &["direction"],
90        )?;
91        connections_established
92            .with_label_values(&["inbound"])
93            .inc_by(self.connections.total_inbound());
94        connections_established
95            .with_label_values(&["outbound"])
96            .inc_by(self.connections.total_outbound());
97        registry.register(Box::new(connections_established))?;
98
99        let connections_failed = prometheus::IntCounter::new(
100            "ipfrs_connections_failed_total",
101            "Total number of failed connection attempts",
102        )?;
103        connections_failed.inc_by(self.connections.total_failed());
104        registry.register(Box::new(connections_failed))?;
105
106        let connections_active = prometheus::IntGauge::new(
107            "ipfrs_connections_active",
108            "Number of currently active connections",
109        )?;
110        connections_active.set(self.connections.active() as i64);
111        registry.register(Box::new(connections_active))?;
112
113        // Bandwidth metrics
114        let bytes_sent = prometheus::IntCounter::new(
115            "ipfrs_bytes_sent_total",
116            "Total bytes sent over the network",
117        )?;
118        bytes_sent.inc_by(self.bandwidth.total_sent());
119        registry.register(Box::new(bytes_sent))?;
120
121        let bytes_received = prometheus::IntCounter::new(
122            "ipfrs_bytes_received_total",
123            "Total bytes received from the network",
124        )?;
125        bytes_received.inc_by(self.bandwidth.total_received());
126        registry.register(Box::new(bytes_received))?;
127
128        // DHT metrics
129        let dht_queries = prometheus::IntCounterVec::new(
130            Opts::new("ipfrs_dht_queries_total", "Total DHT queries by status"),
131            &["status"],
132        )?;
133        let dht_snapshot = self.dht.snapshot();
134        dht_queries
135            .with_label_values(&["success"])
136            .inc_by(dht_snapshot.queries_successful);
137        dht_queries
138            .with_label_values(&["failed"])
139            .inc_by(dht_snapshot.queries_failed);
140        registry.register(Box::new(dht_queries))?;
141
142        let providers_published = prometheus::IntCounter::new(
143            "ipfrs_dht_providers_published_total",
144            "Total provider records published to DHT",
145        )?;
146        providers_published.inc_by(dht_snapshot.providers_published);
147        registry.register(Box::new(providers_published))?;
148
149        let providers_found = prometheus::IntCounter::new(
150            "ipfrs_dht_providers_found_total",
151            "Total providers found via DHT queries",
152        )?;
153        providers_found.inc_by(dht_snapshot.providers_found);
154        registry.register(Box::new(providers_found))?;
155
156        let routing_table_size = prometheus::IntGauge::new(
157            "ipfrs_dht_routing_table_size",
158            "Current DHT routing table size",
159        )?;
160        routing_table_size.set(dht_snapshot.routing_table_size as i64);
161        registry.register(Box::new(routing_table_size))?;
162
163        // Uptime
164        let uptime = prometheus::IntGauge::new("ipfrs_uptime_seconds", "Node uptime in seconds")?;
165        uptime.set(self.uptime().as_secs() as i64);
166        registry.register(Box::new(uptime))?;
167
168        Ok(registry)
169    }
170
171    /// Export metrics in Prometheus text format
172    pub fn export_prometheus(&self) -> Result<String, prometheus::Error> {
173        let registry = self.create_prometheus_registry()?;
174        let encoder = TextEncoder::new();
175        let metric_families = registry.gather();
176
177        let mut buffer = Vec::new();
178        encoder
179            .encode(&metric_families, &mut buffer)
180            .map_err(|e| prometheus::Error::Msg(e.to_string()))?;
181
182        String::from_utf8(buffer).map_err(|e| prometheus::Error::Msg(e.to_string()))
183    }
184}
185
186impl Default for NetworkMetrics {
187    fn default() -> Self {
188        Self::new()
189    }
190}
191
192/// Connection metrics
193pub struct ConnectionMetrics {
194    /// Total connections established
195    connections_established: AtomicU64,
196    /// Total connections failed
197    connections_failed: AtomicU64,
198    /// Currently active connections
199    active_connections: AtomicU64,
200    /// Total inbound connections
201    inbound_connections: AtomicU64,
202    /// Total outbound connections
203    outbound_connections: AtomicU64,
204    /// Connection durations for averaging
205    connection_durations: RwLock<Vec<Duration>>,
206}
207
208impl ConnectionMetrics {
209    fn new() -> Self {
210        Self {
211            connections_established: AtomicU64::new(0),
212            connections_failed: AtomicU64::new(0),
213            active_connections: AtomicU64::new(0),
214            inbound_connections: AtomicU64::new(0),
215            outbound_connections: AtomicU64::new(0),
216            connection_durations: RwLock::new(Vec::new()),
217        }
218    }
219
220    /// Record a connection established
221    pub fn connection_established(&self, inbound: bool) {
222        self.connections_established.fetch_add(1, Ordering::Relaxed);
223        self.active_connections.fetch_add(1, Ordering::Relaxed);
224        if inbound {
225            self.inbound_connections.fetch_add(1, Ordering::Relaxed);
226        } else {
227            self.outbound_connections.fetch_add(1, Ordering::Relaxed);
228        }
229    }
230
231    /// Record a connection closed
232    pub fn connection_closed(&self, duration: Duration) {
233        self.active_connections.fetch_sub(1, Ordering::Relaxed);
234
235        let mut durations = self.connection_durations.write();
236        // Keep last 1000 durations for averaging
237        if durations.len() >= 1000 {
238            durations.remove(0);
239        }
240        durations.push(duration);
241    }
242
243    /// Record a connection failure
244    pub fn connection_failed(&self) {
245        self.connections_failed.fetch_add(1, Ordering::Relaxed);
246    }
247
248    /// Get total connections established
249    pub fn total_established(&self) -> u64 {
250        self.connections_established.load(Ordering::Relaxed)
251    }
252
253    /// Get total connections failed
254    pub fn total_failed(&self) -> u64 {
255        self.connections_failed.load(Ordering::Relaxed)
256    }
257
258    /// Get active connection count
259    pub fn active(&self) -> u64 {
260        self.active_connections.load(Ordering::Relaxed)
261    }
262
263    /// Get inbound connection count
264    pub fn total_inbound(&self) -> u64 {
265        self.inbound_connections.load(Ordering::Relaxed)
266    }
267
268    /// Get outbound connection count
269    pub fn total_outbound(&self) -> u64 {
270        self.outbound_connections.load(Ordering::Relaxed)
271    }
272
273    /// Get average connection duration
274    pub fn avg_duration(&self) -> Option<Duration> {
275        let durations = self.connection_durations.read();
276        if durations.is_empty() {
277            None
278        } else {
279            let total: Duration = durations.iter().sum();
280            Some(total / durations.len() as u32)
281        }
282    }
283
284    /// Get snapshot
285    pub fn snapshot(&self) -> ConnectionMetricsSnapshot {
286        ConnectionMetricsSnapshot {
287            total_established: self.total_established(),
288            total_failed: self.total_failed(),
289            active: self.active(),
290            total_inbound: self.total_inbound(),
291            total_outbound: self.total_outbound(),
292            avg_duration_ms: self.avg_duration().map(|d| d.as_millis() as u64),
293        }
294    }
295}
296
297/// Bandwidth metrics
298pub struct BandwidthMetrics {
299    /// Total bytes sent
300    bytes_sent: AtomicU64,
301    /// Total bytes received
302    bytes_received: AtomicU64,
303    /// Per-protocol bandwidth
304    protocol_bandwidth: RwLock<HashMap<String, (u64, u64)>>,
305}
306
307impl BandwidthMetrics {
308    fn new() -> Self {
309        Self {
310            bytes_sent: AtomicU64::new(0),
311            bytes_received: AtomicU64::new(0),
312            protocol_bandwidth: RwLock::new(HashMap::new()),
313        }
314    }
315
316    /// Record bytes sent
317    pub fn record_sent(&self, bytes: u64) {
318        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
319    }
320
321    /// Record bytes received
322    pub fn record_received(&self, bytes: u64) {
323        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
324    }
325
326    /// Record protocol-specific bandwidth
327    pub fn record_protocol_traffic(&self, protocol: &str, sent: u64, received: u64) {
328        let mut bandwidth = self.protocol_bandwidth.write();
329        let entry = bandwidth.entry(protocol.to_string()).or_insert((0, 0));
330        entry.0 += sent;
331        entry.1 += received;
332    }
333
334    /// Get total bytes sent
335    pub fn total_sent(&self) -> u64 {
336        self.bytes_sent.load(Ordering::Relaxed)
337    }
338
339    /// Get total bytes received
340    pub fn total_received(&self) -> u64 {
341        self.bytes_received.load(Ordering::Relaxed)
342    }
343
344    /// Get snapshot
345    pub fn snapshot(&self) -> BandwidthMetricsSnapshot {
346        BandwidthMetricsSnapshot {
347            total_sent: self.total_sent(),
348            total_received: self.total_received(),
349        }
350    }
351}
352
353/// DHT metrics
354pub struct DhtMetrics {
355    /// Total DHT queries made
356    queries_made: AtomicU64,
357    /// Successful DHT queries
358    queries_successful: AtomicU64,
359    /// Failed DHT queries
360    queries_failed: AtomicU64,
361    /// Provider records published
362    providers_published: AtomicU64,
363    /// Provider queries made
364    provider_queries: AtomicU64,
365    /// Providers found
366    providers_found: AtomicU64,
367    /// Routing table size
368    routing_table_size: AtomicU64,
369}
370
371impl DhtMetrics {
372    fn new() -> Self {
373        Self {
374            queries_made: AtomicU64::new(0),
375            queries_successful: AtomicU64::new(0),
376            queries_failed: AtomicU64::new(0),
377            providers_published: AtomicU64::new(0),
378            provider_queries: AtomicU64::new(0),
379            providers_found: AtomicU64::new(0),
380            routing_table_size: AtomicU64::new(0),
381        }
382    }
383
384    /// Record a DHT query
385    pub fn query_made(&self) {
386        self.queries_made.fetch_add(1, Ordering::Relaxed);
387    }
388
389    /// Record a successful query
390    pub fn query_successful(&self) {
391        self.queries_successful.fetch_add(1, Ordering::Relaxed);
392    }
393
394    /// Record a failed query
395    pub fn query_failed(&self) {
396        self.queries_failed.fetch_add(1, Ordering::Relaxed);
397    }
398
399    /// Record provider published
400    pub fn provider_published(&self) {
401        self.providers_published.fetch_add(1, Ordering::Relaxed);
402    }
403
404    /// Record provider query
405    pub fn provider_query(&self) {
406        self.provider_queries.fetch_add(1, Ordering::Relaxed);
407    }
408
409    /// Record providers found
410    pub fn providers_found(&self, count: u64) {
411        self.providers_found.fetch_add(count, Ordering::Relaxed);
412    }
413
414    /// Update routing table size
415    pub fn set_routing_table_size(&self, size: u64) {
416        self.routing_table_size.store(size, Ordering::Relaxed);
417    }
418
419    /// Get snapshot
420    pub fn snapshot(&self) -> DhtMetricsSnapshot {
421        DhtMetricsSnapshot {
422            queries_made: self.queries_made.load(Ordering::Relaxed),
423            queries_successful: self.queries_successful.load(Ordering::Relaxed),
424            queries_failed: self.queries_failed.load(Ordering::Relaxed),
425            providers_published: self.providers_published.load(Ordering::Relaxed),
426            provider_queries: self.provider_queries.load(Ordering::Relaxed),
427            providers_found: self.providers_found.load(Ordering::Relaxed),
428            routing_table_size: self.routing_table_size.load(Ordering::Relaxed),
429        }
430    }
431}
432
433/// Protocol metrics
434pub struct ProtocolMetrics {
435    /// Messages per protocol
436    messages: RwLock<HashMap<String, ProtocolStats>>,
437}
438
439#[derive(Default, Clone)]
440struct ProtocolStats {
441    messages_sent: u64,
442    messages_received: u64,
443    bytes_sent: u64,
444    bytes_received: u64,
445}
446
447impl ProtocolMetrics {
448    fn new() -> Self {
449        Self {
450            messages: RwLock::new(HashMap::new()),
451        }
452    }
453
454    /// Record message sent
455    pub fn message_sent(&self, protocol: &str, bytes: u64) {
456        let mut messages = self.messages.write();
457        let stats = messages.entry(protocol.to_string()).or_default();
458        stats.messages_sent += 1;
459        stats.bytes_sent += bytes;
460    }
461
462    /// Record message received
463    pub fn message_received(&self, protocol: &str, bytes: u64) {
464        let mut messages = self.messages.write();
465        let stats = messages.entry(protocol.to_string()).or_default();
466        stats.messages_received += 1;
467        stats.bytes_received += bytes;
468    }
469
470    /// Get protocol stats
471    pub fn get_stats(&self, protocol: &str) -> Option<(u64, u64, u64, u64)> {
472        let messages = self.messages.read();
473        messages.get(protocol).map(|s| {
474            (
475                s.messages_sent,
476                s.messages_received,
477                s.bytes_sent,
478                s.bytes_received,
479            )
480        })
481    }
482}
483
484/// Complete metrics snapshot for serialization
485#[derive(Debug, Clone, Serialize)]
486pub struct MetricsSnapshot {
487    /// Uptime in seconds
488    pub uptime_secs: u64,
489    /// Connection metrics
490    pub connections: ConnectionMetricsSnapshot,
491    /// Bandwidth metrics
492    pub bandwidth: BandwidthMetricsSnapshot,
493    /// DHT metrics
494    pub dht: DhtMetricsSnapshot,
495}
496
497/// Connection metrics snapshot
498#[derive(Debug, Clone, Serialize)]
499pub struct ConnectionMetricsSnapshot {
500    /// Total connections established
501    pub total_established: u64,
502    /// Total connections failed
503    pub total_failed: u64,
504    /// Currently active connections
505    pub active: u64,
506    /// Total inbound connections
507    pub total_inbound: u64,
508    /// Total outbound connections
509    pub total_outbound: u64,
510    /// Average connection duration in milliseconds
511    pub avg_duration_ms: Option<u64>,
512}
513
514/// Bandwidth metrics snapshot
515#[derive(Debug, Clone, Serialize)]
516pub struct BandwidthMetricsSnapshot {
517    /// Total bytes sent
518    pub total_sent: u64,
519    /// Total bytes received
520    pub total_received: u64,
521}
522
523/// DHT metrics snapshot
524#[derive(Debug, Clone, Serialize)]
525pub struct DhtMetricsSnapshot {
526    /// Total queries made
527    pub queries_made: u64,
528    /// Successful queries
529    pub queries_successful: u64,
530    /// Failed queries
531    pub queries_failed: u64,
532    /// Providers published
533    pub providers_published: u64,
534    /// Provider queries made
535    pub provider_queries: u64,
536    /// Total providers found
537    pub providers_found: u64,
538    /// Current routing table size
539    pub routing_table_size: u64,
540}
541
542/// Thread-safe metrics handle
543pub type SharedMetrics = Arc<NetworkMetrics>;
544
545/// Create a new shared metrics instance
546pub fn new_shared_metrics() -> SharedMetrics {
547    Arc::new(NetworkMetrics::new())
548}
549
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connection_metrics() {
        let metrics = NetworkMetrics::new();
        let conns = metrics.connections();

        conns.connection_established(true);
        conns.connection_established(false);
        assert_eq!(conns.active(), 2);
        assert_eq!(conns.total_established(), 2);
        assert_eq!(conns.total_inbound(), 1);
        assert_eq!(conns.total_outbound(), 1);

        conns.connection_closed(Duration::from_secs(10));
        assert_eq!(conns.active(), 1);

        conns.connection_failed();
        assert_eq!(conns.total_failed(), 1);
    }

    #[test]
    fn test_connection_avg_duration() {
        let metrics = NetworkMetrics::new();
        let conns = metrics.connections();
        assert_eq!(conns.avg_duration(), None);

        conns.connection_established(true);
        conns.connection_established(true);
        conns.connection_closed(Duration::from_secs(10));
        conns.connection_closed(Duration::from_secs(20));

        assert_eq!(conns.avg_duration(), Some(Duration::from_secs(15)));
        assert_eq!(conns.snapshot().avg_duration_ms, Some(15_000));
    }

    #[test]
    fn test_bandwidth_metrics() {
        let metrics = NetworkMetrics::new();

        metrics.bandwidth().record_sent(1000);
        metrics.bandwidth().record_received(2000);

        assert_eq!(metrics.bandwidth().total_sent(), 1000);
        assert_eq!(metrics.bandwidth().total_received(), 2000);
    }

    #[test]
    fn test_dht_metrics() {
        let metrics = NetworkMetrics::new();
        let dht = metrics.dht();

        dht.query_made();
        dht.query_successful();
        dht.query_made();
        dht.query_failed();
        dht.providers_found(5);

        let snapshot = dht.snapshot();
        assert_eq!(snapshot.queries_made, 2);
        assert_eq!(snapshot.queries_successful, 1);
        assert_eq!(snapshot.queries_failed, 1);
        assert_eq!(snapshot.providers_found, 5);
    }

    #[test]
    fn test_protocol_metrics() {
        let metrics = NetworkMetrics::new();
        let protocols = metrics.protocols();

        protocols.message_sent("/ipfs/kad/1.0.0", 100);
        protocols.message_received("/ipfs/kad/1.0.0", 200);

        let (sent, received, bytes_sent, bytes_received) = protocols
            .get_stats("/ipfs/kad/1.0.0")
            .expect("stats should exist for a protocol that has seen traffic");
        assert_eq!(sent, 1);
        assert_eq!(received, 1);
        assert_eq!(bytes_sent, 100);
        assert_eq!(bytes_received, 200);

        assert_eq!(protocols.get_stats("/unknown/1.0.0"), None);
    }

    #[test]
    fn test_metrics_snapshot() {
        let metrics = NetworkMetrics::new();

        metrics.connections().connection_established(true);
        metrics.bandwidth().record_sent(100);
        metrics.dht().query_made();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.connections.active, 1);
        assert_eq!(snapshot.bandwidth.total_sent, 100);
        assert_eq!(snapshot.dht.queries_made, 1);
    }

    #[test]
    fn test_export_prometheus() {
        let metrics = NetworkMetrics::new();

        metrics.connections().connection_established(true);
        metrics.connections().connection_failed();
        metrics.bandwidth().record_sent(1234);
        metrics.dht().set_routing_table_size(7);

        let text = metrics
            .export_prometheus()
            .expect("exporting a fresh registry should succeed");
        assert!(text.contains("ipfrs_connections_active 1"));
        assert!(text.contains("ipfrs_connections_failed_total 1"));
        assert!(text.contains("ipfrs_connections_established_total{direction=\"inbound\"} 1"));
        assert!(text.contains("ipfrs_bytes_sent_total 1234"));
        assert!(text.contains("ipfrs_dht_routing_table_size 7"));
    }
}