node_launchpad/
node_stats.rs1use ant_service_management::{NodeServiceData, ServiceStatus};
10use color_eyre::Result;
11use futures::StreamExt;
12use serde::{Deserialize, Serialize};
13use std::{path::PathBuf, time::Instant};
14use tokio::sync::mpsc::UnboundedSender;
15
16use super::components::status::NODE_STAT_UPDATE_INTERVAL;
17
18use crate::action::{Action, StatusActions};
19
20#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct IndividualNodeStats {
22 pub service_name: String,
23 pub forwarded_rewards: usize,
24 pub rewards_wallet_balance: usize,
25 pub memory_usage_mb: usize,
26 pub bandwidth_inbound: usize,
27 pub bandwidth_outbound: usize,
28 pub bandwidth_inbound_rate: usize,
29 pub bandwidth_outbound_rate: usize,
30 pub max_records: usize,
31 pub peers: usize,
32 pub connections: usize,
33}
34
35#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
36pub struct NodeStats {
37 pub total_forwarded_rewards: usize,
38 pub total_rewards_wallet_balance: usize,
39 pub total_memory_usage_mb: usize,
40 pub individual_stats: Vec<IndividualNodeStats>,
41}
42
43impl NodeStats {
44 fn merge(&mut self, other: &IndividualNodeStats) {
45 self.total_forwarded_rewards += other.forwarded_rewards;
46 self.total_rewards_wallet_balance += other.rewards_wallet_balance;
47 self.total_memory_usage_mb += other.memory_usage_mb;
48 self.individual_stats.push(other.clone()); }
50
51 pub fn fetch_all_node_stats(nodes: &[NodeServiceData], action_sender: UnboundedSender<Action>) {
70 let node_details = nodes
71 .iter()
72 .filter_map(|node| {
73 if node.status == ServiceStatus::Running {
74 if let Some(metrics_port) = node.metrics_port {
75 Some((
76 node.service_name.clone(),
77 metrics_port,
78 node.data_dir_path.clone(),
79 ))
80 } else {
81 error!(
82 "No metrics port found for {:?}. Skipping stat fetch.",
83 node.service_name
84 );
85 None
86 }
87 } else {
88 None
89 }
90 })
91 .collect::<Vec<_>>();
92 if !node_details.is_empty() {
93 debug!("Fetching stats from {} nodes", node_details.len());
94 tokio::spawn(async move {
95 Self::fetch_all_node_stats_inner(node_details, action_sender).await;
96 });
97 } else {
98 debug!("No running nodes to fetch stats from.");
99 }
100 }
101
102 async fn fetch_all_node_stats_inner(
121 node_details: Vec<(String, u16, PathBuf)>,
122 action_sender: UnboundedSender<Action>,
123 ) {
124 let mut stream = futures::stream::iter(node_details)
125 .map(|(service_name, metrics_port, data_dir)| async move {
126 (
127 Self::fetch_stat_per_node(metrics_port, data_dir).await,
128 service_name,
129 )
130 })
131 .buffer_unordered(5);
132
133 let mut all_node_stats = NodeStats::default();
134
135 while let Some((result, service_name)) = stream.next().await {
136 match result {
137 Ok(stats) => {
138 let individual_stats = IndividualNodeStats {
139 service_name: service_name.clone(),
140 forwarded_rewards: stats.forwarded_rewards,
141 rewards_wallet_balance: stats.rewards_wallet_balance,
142 memory_usage_mb: stats.memory_usage_mb,
143 bandwidth_inbound: stats.bandwidth_inbound,
144 bandwidth_outbound: stats.bandwidth_outbound,
145 max_records: stats.max_records,
146 peers: stats.peers,
147 connections: stats.connections,
148 bandwidth_inbound_rate: stats.bandwidth_inbound_rate,
149 bandwidth_outbound_rate: stats.bandwidth_outbound_rate,
150 };
151 all_node_stats.merge(&individual_stats);
152 }
153 Err(err) => {
154 error!("Error while fetching stats from {service_name:?}: {err:?}");
155 }
156 }
157 }
158
159 if let Err(err) = action_sender.send(Action::StatusActions(
160 StatusActions::NodesStatsObtained(all_node_stats),
161 )) {
162 error!("Error while sending action: {err:?}");
163 }
164 }
165
166 async fn fetch_stat_per_node(
167 metrics_port: u16,
168 _data_dir: PathBuf,
169 ) -> Result<IndividualNodeStats> {
170 let now = Instant::now();
171
172 let body = reqwest::get(&format!("http://localhost:{metrics_port}/metrics"))
173 .await?
174 .text()
175 .await?;
176 let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect();
177 let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?;
178
179 let mut stats = IndividualNodeStats::default();
180
181 for sample in all_metrics.samples.iter() {
182 if sample.metric == "ant_node_total_forwarded_rewards" {
183 match sample.value {
185 prometheus_parse::Value::Counter(val)
186 | prometheus_parse::Value::Gauge(val)
187 | prometheus_parse::Value::Untyped(val) => {
188 stats.forwarded_rewards = val as usize;
189 }
190 _ => {}
191 }
192 } else if sample.metric == "ant_node_current_reward_wallet_balance" {
193 match sample.value {
195 prometheus_parse::Value::Counter(val)
196 | prometheus_parse::Value::Gauge(val)
197 | prometheus_parse::Value::Untyped(val) => {
198 stats.rewards_wallet_balance = val as usize;
199 }
200 _ => {}
201 }
202 } else if sample.metric == "ant_networking_process_memory_used_mb" {
203 match sample.value {
205 prometheus_parse::Value::Counter(val)
206 | prometheus_parse::Value::Gauge(val)
207 | prometheus_parse::Value::Untyped(val) => {
208 stats.memory_usage_mb = val as usize;
209 }
210 _ => {}
211 }
212 } else if sample.metric == "libp2p_bandwidth_bytes_total" {
213 match sample.value {
215 prometheus_parse::Value::Counter(val)
216 | prometheus_parse::Value::Gauge(val)
217 | prometheus_parse::Value::Untyped(val) => {
218 if let Some(direction) = sample.labels.get("direction") {
219 if direction == "Inbound" {
220 let current_inbound = val as usize;
221 let rate = (current_inbound as f64
222 - stats.bandwidth_inbound as f64)
223 / NODE_STAT_UPDATE_INTERVAL.as_secs_f64();
224 stats.bandwidth_inbound_rate = rate as usize;
225 stats.bandwidth_inbound = current_inbound;
226 } else if direction == "Outbound" {
227 let current_outbound = val as usize;
228 let rate = (current_outbound as f64
229 - stats.bandwidth_outbound as f64)
230 / NODE_STAT_UPDATE_INTERVAL.as_secs_f64();
231 stats.bandwidth_outbound_rate = rate as usize;
232 stats.bandwidth_outbound = current_outbound;
233 }
234 }
235 }
236 _ => {}
237 }
238 } else if sample.metric == "ant_networking_records_stored" {
239 match sample.value {
241 prometheus_parse::Value::Counter(val)
242 | prometheus_parse::Value::Gauge(val)
243 | prometheus_parse::Value::Untyped(val) => {
244 stats.max_records = val as usize;
245 }
246 _ => {}
247 }
248 } else if sample.metric == "ant_networking_peers_in_routing_table" {
249 match sample.value {
251 prometheus_parse::Value::Counter(val)
252 | prometheus_parse::Value::Gauge(val)
253 | prometheus_parse::Value::Untyped(val) => {
254 stats.peers = val as usize;
255 }
256 _ => {}
257 }
258 } else if sample.metric == "ant_networking_open_connections" {
259 match sample.value {
261 prometheus_parse::Value::Counter(val)
262 | prometheus_parse::Value::Gauge(val)
263 | prometheus_parse::Value::Untyped(val) => {
264 stats.connections = val as usize;
265 }
266 _ => {}
267 }
268 }
269 }
270 trace!(
271 "Fetched stats from metrics_port {metrics_port:?} in {:?}",
272 now.elapsed()
273 );
274 Ok(stats)
275 }
276}