// discv5_cli/server/services/stats.rs
use discv5::{ConnectionDirection, ConnectionState, Discv5, Event};
2use log::info;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7pub fn run(discv5: Arc<Discv5>, break_time: Option<Duration>, stats: u64) {
9 let break_time = break_time.unwrap_or_else(|| Duration::from_secs(10));
10 tokio::spawn(async move {
11 let mut event_stream = discv5.event_stream().await.unwrap();
12 let mut stats_interval = tokio::time::interval(break_time);
13 let mut ipv6_connections = 0;
14 let mut ipv4_connections = 0;
15 loop {
16 tokio::select! {
17 _ = stats_interval.tick() => {
18 print_global_stats(Arc::clone(&discv5), ipv6_connections, ipv4_connections);
19 print_bucket_stats(Arc::clone(&discv5), stats);
20 }
21 Some(event) = event_stream.recv() => {
22 match event {
23 Event::SessionEstablished(_enr,addr) => {
24 if addr.is_ipv6() {
25 ipv6_connections += 1;
26 } else if addr.is_ipv4() {
27 ipv4_connections +=1;
28 }
29 }
30 _ => {}
31 }
32 }
33 }
34 }
35 });
36}
37
/// Peer counters for a single routing-table bucket.
///
/// Field order is significant: the derived `PartialOrd`/`Ord` compare fields
/// lexicographically in declaration order, starting with `bucket`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct BucketStatistic {
    /// Bucket number this statistic describes.
    pub bucket: u64,
    /// Number of peers in the bucket that are currently connected.
    pub connected_peers: u64,
    /// Number of peers in the bucket that are currently disconnected.
    pub disconnected_peers: u64,
    /// Connected peers whose connection direction is incoming.
    pub incoming_peers: u64,
    /// Connected peers whose connection direction is outgoing.
    pub outgoing_peers: u64,
    /// Connected peers that advertise an IPv6 UDP socket.
    pub ipv6_peers: u64,
}
54
55fn print_global_stats(discv5: Arc<Discv5>, ipv6_connections: u64, ipv4_connections: u64) {
57 info!("Peers in routing table: {}", discv5.connected_peers());
58 info!(
59 "Sessions historically established, ipv4: {}, ipv6: {}",
60 ipv4_connections, ipv6_connections
61 );
62}
63
64pub fn print_bucket_stats(discv5: Arc<Discv5>, stats: u64) {
66 let table_entries = discv5.table_entries();
67 let self_id: discv5::Key<_> = discv5.local_enr().node_id().into();
68
69 let mut bucket_values = HashMap::new();
70
71 for (node_id, enr, status) in table_entries {
73 let key: discv5::Key<_> = node_id.into();
74 let bucket_no = key.log2_distance(&self_id);
75 if let Some(bucket_no) = bucket_no {
76 bucket_values
77 .entry(bucket_no)
78 .or_insert_with(Vec::new)
79 .push((enr, status));
80 }
81 }
82
83 let mut bucket_stats = Vec::<BucketStatistic>::new();
85 for (bucket, entries) in bucket_values {
86 let mut connected_peers = 0;
87 let mut connected_incoming_peers = 0;
88 let mut connected_outgoing_peers = 0;
89 let mut disconnected_peers = 0;
90 let mut ipv6_peers = 0;
91
92 for (enr, status) in entries {
93 match (status.state, status.direction) {
94 (ConnectionState::Connected, ConnectionDirection::Incoming) => {
95 connected_peers += 1;
96 connected_incoming_peers += 1;
97 }
98 (ConnectionState::Connected, ConnectionDirection::Outgoing) => {
99 connected_peers += 1;
100 connected_outgoing_peers += 1;
101 }
102 (ConnectionState::Disconnected, _) => {
103 disconnected_peers += 1;
104 }
105 }
106 if matches!(status.state, ConnectionState::Connected) && enr.udp6_socket().is_some() {
107 ipv6_peers += 1;
108 }
109 }
110
111 bucket_stats.push(BucketStatistic {
112 bucket,
113 connected_peers,
114 disconnected_peers,
115 incoming_peers: connected_incoming_peers,
116 outgoing_peers: connected_outgoing_peers,
117 ipv6_peers,
118 });
119 }
120
121 bucket_stats.sort_by_key(|stat| stat.connected_peers);
123
124 for bucket_stat in bucket_stats.iter().take(stats as usize) {
126 let BucketStatistic {
127 bucket,
128 connected_peers,
129 disconnected_peers,
130 incoming_peers: connected_incoming_peers,
131 outgoing_peers: connected_outgoing_peers,
132 ipv6_peers,
133 } = bucket_stat;
134 log::info!(
135 "Bucket {} statistics: Connected peers: {} (Incoming: {}, Outgoing: {}, ipv6: {}), Disconnected Peers: {}",
136 bucket,
137 connected_peers,
138 connected_incoming_peers,
139 connected_outgoing_peers,
140 ipv6_peers,
141 disconnected_peers
142 );
143 }
144}