use parking_lot::RwLock;
use prometheus::{Encoder, Opts, Registry, TextEncoder};
use serde::Serialize;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
17
18pub struct NetworkMetrics {
20 connections: ConnectionMetrics,
22 bandwidth: BandwidthMetrics,
24 dht: DhtMetrics,
26 protocols: ProtocolMetrics,
28 start_time: Instant,
30}
31
32impl NetworkMetrics {
33 pub fn new() -> Self {
35 Self {
36 connections: ConnectionMetrics::new(),
37 bandwidth: BandwidthMetrics::new(),
38 dht: DhtMetrics::new(),
39 protocols: ProtocolMetrics::new(),
40 start_time: Instant::now(),
41 }
42 }
43
44 pub fn connections(&self) -> &ConnectionMetrics {
46 &self.connections
47 }
48
49 pub fn bandwidth(&self) -> &BandwidthMetrics {
51 &self.bandwidth
52 }
53
54 pub fn dht(&self) -> &DhtMetrics {
56 &self.dht
57 }
58
59 pub fn protocols(&self) -> &ProtocolMetrics {
61 &self.protocols
62 }
63
64 pub fn uptime(&self) -> Duration {
66 self.start_time.elapsed()
67 }
68
69 pub fn snapshot(&self) -> MetricsSnapshot {
71 MetricsSnapshot {
72 uptime_secs: self.start_time.elapsed().as_secs(),
73 connections: self.connections.snapshot(),
74 bandwidth: self.bandwidth.snapshot(),
75 dht: self.dht.snapshot(),
76 }
77 }
78
79 pub fn create_prometheus_registry(&self) -> Result<Registry, prometheus::Error> {
81 let registry = Registry::new();
82
83 let connections_established = prometheus::IntCounterVec::new(
85 Opts::new(
86 "ipfrs_connections_established_total",
87 "Total number of connections established",
88 ),
89 &["direction"],
90 )?;
91 connections_established
92 .with_label_values(&["inbound"])
93 .inc_by(self.connections.total_inbound());
94 connections_established
95 .with_label_values(&["outbound"])
96 .inc_by(self.connections.total_outbound());
97 registry.register(Box::new(connections_established))?;
98
99 let connections_failed = prometheus::IntCounter::new(
100 "ipfrs_connections_failed_total",
101 "Total number of failed connection attempts",
102 )?;
103 connections_failed.inc_by(self.connections.total_failed());
104 registry.register(Box::new(connections_failed))?;
105
106 let connections_active = prometheus::IntGauge::new(
107 "ipfrs_connections_active",
108 "Number of currently active connections",
109 )?;
110 connections_active.set(self.connections.active() as i64);
111 registry.register(Box::new(connections_active))?;
112
113 let bytes_sent = prometheus::IntCounter::new(
115 "ipfrs_bytes_sent_total",
116 "Total bytes sent over the network",
117 )?;
118 bytes_sent.inc_by(self.bandwidth.total_sent());
119 registry.register(Box::new(bytes_sent))?;
120
121 let bytes_received = prometheus::IntCounter::new(
122 "ipfrs_bytes_received_total",
123 "Total bytes received from the network",
124 )?;
125 bytes_received.inc_by(self.bandwidth.total_received());
126 registry.register(Box::new(bytes_received))?;
127
128 let dht_queries = prometheus::IntCounterVec::new(
130 Opts::new("ipfrs_dht_queries_total", "Total DHT queries by status"),
131 &["status"],
132 )?;
133 let dht_snapshot = self.dht.snapshot();
134 dht_queries
135 .with_label_values(&["success"])
136 .inc_by(dht_snapshot.queries_successful);
137 dht_queries
138 .with_label_values(&["failed"])
139 .inc_by(dht_snapshot.queries_failed);
140 registry.register(Box::new(dht_queries))?;
141
142 let providers_published = prometheus::IntCounter::new(
143 "ipfrs_dht_providers_published_total",
144 "Total provider records published to DHT",
145 )?;
146 providers_published.inc_by(dht_snapshot.providers_published);
147 registry.register(Box::new(providers_published))?;
148
149 let providers_found = prometheus::IntCounter::new(
150 "ipfrs_dht_providers_found_total",
151 "Total providers found via DHT queries",
152 )?;
153 providers_found.inc_by(dht_snapshot.providers_found);
154 registry.register(Box::new(providers_found))?;
155
156 let routing_table_size = prometheus::IntGauge::new(
157 "ipfrs_dht_routing_table_size",
158 "Current DHT routing table size",
159 )?;
160 routing_table_size.set(dht_snapshot.routing_table_size as i64);
161 registry.register(Box::new(routing_table_size))?;
162
163 let uptime = prometheus::IntGauge::new("ipfrs_uptime_seconds", "Node uptime in seconds")?;
165 uptime.set(self.uptime().as_secs() as i64);
166 registry.register(Box::new(uptime))?;
167
168 Ok(registry)
169 }
170
171 pub fn export_prometheus(&self) -> Result<String, prometheus::Error> {
173 let registry = self.create_prometheus_registry()?;
174 let encoder = TextEncoder::new();
175 let metric_families = registry.gather();
176
177 let mut buffer = Vec::new();
178 encoder
179 .encode(&metric_families, &mut buffer)
180 .map_err(|e| prometheus::Error::Msg(e.to_string()))?;
181
182 String::from_utf8(buffer).map_err(|e| prometheus::Error::Msg(e.to_string()))
183 }
184}
185
186impl Default for NetworkMetrics {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
192pub struct ConnectionMetrics {
194 connections_established: AtomicU64,
196 connections_failed: AtomicU64,
198 active_connections: AtomicU64,
200 inbound_connections: AtomicU64,
202 outbound_connections: AtomicU64,
204 connection_durations: RwLock<Vec<Duration>>,
206}
207
208impl ConnectionMetrics {
209 fn new() -> Self {
210 Self {
211 connections_established: AtomicU64::new(0),
212 connections_failed: AtomicU64::new(0),
213 active_connections: AtomicU64::new(0),
214 inbound_connections: AtomicU64::new(0),
215 outbound_connections: AtomicU64::new(0),
216 connection_durations: RwLock::new(Vec::new()),
217 }
218 }
219
220 pub fn connection_established(&self, inbound: bool) {
222 self.connections_established.fetch_add(1, Ordering::Relaxed);
223 self.active_connections.fetch_add(1, Ordering::Relaxed);
224 if inbound {
225 self.inbound_connections.fetch_add(1, Ordering::Relaxed);
226 } else {
227 self.outbound_connections.fetch_add(1, Ordering::Relaxed);
228 }
229 }
230
231 pub fn connection_closed(&self, duration: Duration) {
233 self.active_connections.fetch_sub(1, Ordering::Relaxed);
234
235 let mut durations = self.connection_durations.write();
236 if durations.len() >= 1000 {
238 durations.remove(0);
239 }
240 durations.push(duration);
241 }
242
243 pub fn connection_failed(&self) {
245 self.connections_failed.fetch_add(1, Ordering::Relaxed);
246 }
247
248 pub fn total_established(&self) -> u64 {
250 self.connections_established.load(Ordering::Relaxed)
251 }
252
253 pub fn total_failed(&self) -> u64 {
255 self.connections_failed.load(Ordering::Relaxed)
256 }
257
258 pub fn active(&self) -> u64 {
260 self.active_connections.load(Ordering::Relaxed)
261 }
262
263 pub fn total_inbound(&self) -> u64 {
265 self.inbound_connections.load(Ordering::Relaxed)
266 }
267
268 pub fn total_outbound(&self) -> u64 {
270 self.outbound_connections.load(Ordering::Relaxed)
271 }
272
273 pub fn avg_duration(&self) -> Option<Duration> {
275 let durations = self.connection_durations.read();
276 if durations.is_empty() {
277 None
278 } else {
279 let total: Duration = durations.iter().sum();
280 Some(total / durations.len() as u32)
281 }
282 }
283
284 pub fn snapshot(&self) -> ConnectionMetricsSnapshot {
286 ConnectionMetricsSnapshot {
287 total_established: self.total_established(),
288 total_failed: self.total_failed(),
289 active: self.active(),
290 total_inbound: self.total_inbound(),
291 total_outbound: self.total_outbound(),
292 avg_duration_ms: self.avg_duration().map(|d| d.as_millis() as u64),
293 }
294 }
295}
296
297pub struct BandwidthMetrics {
299 bytes_sent: AtomicU64,
301 bytes_received: AtomicU64,
303 protocol_bandwidth: RwLock<HashMap<String, (u64, u64)>>,
305}
306
307impl BandwidthMetrics {
308 fn new() -> Self {
309 Self {
310 bytes_sent: AtomicU64::new(0),
311 bytes_received: AtomicU64::new(0),
312 protocol_bandwidth: RwLock::new(HashMap::new()),
313 }
314 }
315
316 pub fn record_sent(&self, bytes: u64) {
318 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
319 }
320
321 pub fn record_received(&self, bytes: u64) {
323 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
324 }
325
326 pub fn record_protocol_traffic(&self, protocol: &str, sent: u64, received: u64) {
328 let mut bandwidth = self.protocol_bandwidth.write();
329 let entry = bandwidth.entry(protocol.to_string()).or_insert((0, 0));
330 entry.0 += sent;
331 entry.1 += received;
332 }
333
334 pub fn total_sent(&self) -> u64 {
336 self.bytes_sent.load(Ordering::Relaxed)
337 }
338
339 pub fn total_received(&self) -> u64 {
341 self.bytes_received.load(Ordering::Relaxed)
342 }
343
344 pub fn snapshot(&self) -> BandwidthMetricsSnapshot {
346 BandwidthMetricsSnapshot {
347 total_sent: self.total_sent(),
348 total_received: self.total_received(),
349 }
350 }
351}
352
353pub struct DhtMetrics {
355 queries_made: AtomicU64,
357 queries_successful: AtomicU64,
359 queries_failed: AtomicU64,
361 providers_published: AtomicU64,
363 provider_queries: AtomicU64,
365 providers_found: AtomicU64,
367 routing_table_size: AtomicU64,
369}
370
371impl DhtMetrics {
372 fn new() -> Self {
373 Self {
374 queries_made: AtomicU64::new(0),
375 queries_successful: AtomicU64::new(0),
376 queries_failed: AtomicU64::new(0),
377 providers_published: AtomicU64::new(0),
378 provider_queries: AtomicU64::new(0),
379 providers_found: AtomicU64::new(0),
380 routing_table_size: AtomicU64::new(0),
381 }
382 }
383
384 pub fn query_made(&self) {
386 self.queries_made.fetch_add(1, Ordering::Relaxed);
387 }
388
389 pub fn query_successful(&self) {
391 self.queries_successful.fetch_add(1, Ordering::Relaxed);
392 }
393
394 pub fn query_failed(&self) {
396 self.queries_failed.fetch_add(1, Ordering::Relaxed);
397 }
398
399 pub fn provider_published(&self) {
401 self.providers_published.fetch_add(1, Ordering::Relaxed);
402 }
403
404 pub fn provider_query(&self) {
406 self.provider_queries.fetch_add(1, Ordering::Relaxed);
407 }
408
409 pub fn providers_found(&self, count: u64) {
411 self.providers_found.fetch_add(count, Ordering::Relaxed);
412 }
413
414 pub fn set_routing_table_size(&self, size: u64) {
416 self.routing_table_size.store(size, Ordering::Relaxed);
417 }
418
419 pub fn snapshot(&self) -> DhtMetricsSnapshot {
421 DhtMetricsSnapshot {
422 queries_made: self.queries_made.load(Ordering::Relaxed),
423 queries_successful: self.queries_successful.load(Ordering::Relaxed),
424 queries_failed: self.queries_failed.load(Ordering::Relaxed),
425 providers_published: self.providers_published.load(Ordering::Relaxed),
426 provider_queries: self.provider_queries.load(Ordering::Relaxed),
427 providers_found: self.providers_found.load(Ordering::Relaxed),
428 routing_table_size: self.routing_table_size.load(Ordering::Relaxed),
429 }
430 }
431}
432
433pub struct ProtocolMetrics {
435 messages: RwLock<HashMap<String, ProtocolStats>>,
437}
438
439#[derive(Default, Clone)]
440struct ProtocolStats {
441 messages_sent: u64,
442 messages_received: u64,
443 bytes_sent: u64,
444 bytes_received: u64,
445}
446
447impl ProtocolMetrics {
448 fn new() -> Self {
449 Self {
450 messages: RwLock::new(HashMap::new()),
451 }
452 }
453
454 pub fn message_sent(&self, protocol: &str, bytes: u64) {
456 let mut messages = self.messages.write();
457 let stats = messages.entry(protocol.to_string()).or_default();
458 stats.messages_sent += 1;
459 stats.bytes_sent += bytes;
460 }
461
462 pub fn message_received(&self, protocol: &str, bytes: u64) {
464 let mut messages = self.messages.write();
465 let stats = messages.entry(protocol.to_string()).or_default();
466 stats.messages_received += 1;
467 stats.bytes_received += bytes;
468 }
469
470 pub fn get_stats(&self, protocol: &str) -> Option<(u64, u64, u64, u64)> {
472 let messages = self.messages.read();
473 messages.get(protocol).map(|s| {
474 (
475 s.messages_sent,
476 s.messages_received,
477 s.bytes_sent,
478 s.bytes_received,
479 )
480 })
481 }
482}
483
484#[derive(Debug, Clone, Serialize)]
486pub struct MetricsSnapshot {
487 pub uptime_secs: u64,
489 pub connections: ConnectionMetricsSnapshot,
491 pub bandwidth: BandwidthMetricsSnapshot,
493 pub dht: DhtMetricsSnapshot,
495}
496
497#[derive(Debug, Clone, Serialize)]
499pub struct ConnectionMetricsSnapshot {
500 pub total_established: u64,
502 pub total_failed: u64,
504 pub active: u64,
506 pub total_inbound: u64,
508 pub total_outbound: u64,
510 pub avg_duration_ms: Option<u64>,
512}
513
514#[derive(Debug, Clone, Serialize)]
516pub struct BandwidthMetricsSnapshot {
517 pub total_sent: u64,
519 pub total_received: u64,
521}
522
523#[derive(Debug, Clone, Serialize)]
525pub struct DhtMetricsSnapshot {
526 pub queries_made: u64,
528 pub queries_successful: u64,
530 pub queries_failed: u64,
532 pub providers_published: u64,
534 pub provider_queries: u64,
536 pub providers_found: u64,
538 pub routing_table_size: u64,
540}
541
542pub type SharedMetrics = Arc<NetworkMetrics>;
544
545pub fn new_shared_metrics() -> SharedMetrics {
547 Arc::new(NetworkMetrics::new())
548}
549
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connection_metrics() {
        let metrics = NetworkMetrics::new();
        let connections = metrics.connections();

        connections.connection_established(true);
        connections.connection_established(false);
        assert_eq!(connections.active(), 2);
        assert_eq!(connections.total_established(), 2);
        assert_eq!(connections.total_inbound(), 1);
        assert_eq!(connections.total_outbound(), 1);

        connections.connection_closed(Duration::from_secs(10));
        assert_eq!(connections.active(), 1);

        connections.connection_failed();
        assert_eq!(connections.total_failed(), 1);
    }

    #[test]
    fn test_connection_avg_duration() {
        let metrics = NetworkMetrics::new();
        let connections = metrics.connections();
        assert_eq!(connections.avg_duration(), None);

        connections.connection_established(true);
        connections.connection_established(true);
        connections.connection_closed(Duration::from_secs(10));
        connections.connection_closed(Duration::from_secs(20));

        assert_eq!(connections.avg_duration(), Some(Duration::from_secs(15)));
        assert_eq!(connections.snapshot().avg_duration_ms, Some(15_000));
    }

    #[test]
    fn test_bandwidth_metrics() {
        let metrics = NetworkMetrics::new();

        metrics.bandwidth().record_sent(1000);
        metrics.bandwidth().record_received(2000);

        assert_eq!(metrics.bandwidth().total_sent(), 1000);
        assert_eq!(metrics.bandwidth().total_received(), 2000);
    }

    #[test]
    fn test_dht_metrics() {
        let metrics = NetworkMetrics::new();
        let dht = metrics.dht();

        dht.query_made();
        dht.query_successful();
        dht.query_made();
        dht.query_failed();
        dht.providers_found(5);

        let snapshot = dht.snapshot();
        assert_eq!(snapshot.queries_made, 2);
        assert_eq!(snapshot.queries_successful, 1);
        assert_eq!(snapshot.queries_failed, 1);
        assert_eq!(snapshot.providers_found, 5);
    }

    #[test]
    fn test_protocol_metrics() {
        let metrics = NetworkMetrics::new();

        metrics.protocols().message_sent("/ipfs/kad/1.0.0", 100);
        metrics.protocols().message_received("/ipfs/kad/1.0.0", 200);

        assert_eq!(
            metrics.protocols().get_stats("/ipfs/kad/1.0.0"),
            Some((1, 1, 100, 200))
        );
        assert_eq!(metrics.protocols().get_stats("/unknown"), None);
    }

    #[test]
    fn test_metrics_snapshot() {
        let metrics = NetworkMetrics::new();

        metrics.connections().connection_established(true);
        metrics.bandwidth().record_sent(100);
        metrics.dht().query_made();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.connections.active, 1);
        assert_eq!(snapshot.bandwidth.total_sent, 100);
        assert_eq!(snapshot.dht.queries_made, 1);
    }

    #[test]
    fn test_export_prometheus() {
        let metrics = NetworkMetrics::default();
        metrics.connections().connection_established(true);
        metrics.bandwidth().record_sent(100);
        metrics.dht().query_successful();

        let text = metrics
            .export_prometheus()
            .expect("exporting freshly built metrics should succeed");
        assert!(text.contains("ipfrs_connections_established_total{direction=\"inbound\"} 1"));
        assert!(text.contains("ipfrs_connections_active 1"));
        assert!(text.contains("ipfrs_bytes_sent_total 100"));
        assert!(text.contains("ipfrs_dht_queries_total{status=\"success\"} 1"));
        assert!(text.contains("ipfrs_uptime_seconds"));
    }

    #[test]
    fn test_shared_metrics() {
        let shared = new_shared_metrics();
        let handle = Arc::clone(&shared);
        handle.bandwidth().record_received(42);
        assert_eq!(shared.bandwidth().total_received(), 42);
    }
}