node_launchpad/
node_stats.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.
8
use ant_service_management::{NodeServiceData, ServiceStatus};
use color_eyre::Result;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::{
    collections::{BTreeMap, BTreeSet},
    path::PathBuf,
    sync::{Mutex, PoisonError},
    time::Instant,
};
use tokio::sync::mpsc::UnboundedSender;

use super::components::status::NODE_STAT_UPDATE_INTERVAL;

use crate::action::{Action, StatusActions};
19
/// Statistics fetched from a single node's Prometheus metrics endpoint.
///
/// Every numeric field is a metric value converted to `usize` by the stats fetcher in this
/// module.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IndividualNodeStats {
    /// Name of the node service these stats were fetched for.
    pub service_name: String,
    /// Value of the `ant_node_total_forwarded_rewards` metric (annotated as nanos by the
    /// fetcher; unit not confirmed here).
    pub forwarded_rewards: usize,
    /// Value of the `ant_node_current_reward_wallet_balance` metric (annotated as attos by the
    /// fetcher; unit not confirmed here).
    pub rewards_wallet_balance: usize,
    /// Value of the `ant_networking_process_memory_used_mb` metric.
    pub memory_usage_mb: usize,
    /// Inbound byte total, taken from the `libp2p_bandwidth_bytes_total` samples labelled
    /// `direction="Inbound"`.
    pub bandwidth_inbound: usize,
    /// Outbound byte total, taken from the `libp2p_bandwidth_bytes_total` samples labelled
    /// `direction="Outbound"`.
    pub bandwidth_outbound: usize,
    /// Per-second inbound rate derived by the stats fetcher from the inbound total.
    pub bandwidth_inbound_rate: usize,
    /// Per-second outbound rate derived by the stats fetcher from the outbound total.
    pub bandwidth_outbound_rate: usize,
    /// Despite its name, this is filled from the `ant_networking_records_stored` metric.
    pub max_records: usize,
    /// Value of the `ant_networking_peers_in_routing_table` metric.
    pub peers: usize,
    /// Value of the `ant_networking_open_connections` metric.
    pub connections: usize,
}
34
/// Statistics aggregated over every node whose metrics were fetched successfully in one round.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeStats {
    /// Sum of `forwarded_rewards` over `individual_stats`.
    pub total_forwarded_rewards: usize,
    /// Sum of `rewards_wallet_balance` over `individual_stats`.
    pub total_rewards_wallet_balance: usize,
    /// Sum of `memory_usage_mb` over `individual_stats`.
    pub total_memory_usage_mb: usize,
    /// One entry per successfully fetched node.
    ///
    /// Entries are in the order the fetches completed, which is not necessarily the order in
    /// which the nodes were listed.
    pub individual_stats: Vec<IndividualNodeStats>,
}
42
43impl NodeStats {
44    fn merge(&mut self, other: &IndividualNodeStats) {
45        self.total_forwarded_rewards += other.forwarded_rewards;
46        self.total_rewards_wallet_balance += other.rewards_wallet_balance;
47        self.total_memory_usage_mb += other.memory_usage_mb;
48        self.individual_stats.push(other.clone()); // Store individual stats
49    }
50
51    /// Fetches statistics from all running nodes and sends the aggregated stats via the action sender.
52    ///
53    /// This method iterates over the provided list of `NodeServiceData` instances, filters out nodes that are not running,
54    /// and for each running node, it checks if a metrics port is available. If a metrics port is found, the node's details
55    /// (service name, metrics port, and data directory path) are collected. If no metrics port is found, a debug message
56    /// is logged indicating that the node's stats will not be fetched.
57    ///
58    /// If there are any nodes with available metrics ports, this method spawns a local task to asynchronously fetch
59    /// statistics from these nodes using `fetch_all_node_stats_inner`. The aggregated statistics are then sent via the
60    /// provided `action_sender`.
61    ///
62    /// If no running nodes with metrics ports are found, a debug message is logged indicating that there are no running nodes
63    /// to fetch stats from.
64    ///
65    /// # Parameters
66    ///
67    /// * `nodes`: A slice of `NodeServiceData` instances representing the nodes to fetch statistics from.
68    /// * `action_sender`: An unbounded sender of `Action` instances used to send the aggregated node statistics.
69    pub fn fetch_all_node_stats(nodes: &[NodeServiceData], action_sender: UnboundedSender<Action>) {
70        let node_details = nodes
71            .iter()
72            .filter_map(|node| {
73                if node.status == ServiceStatus::Running {
74                    if let Some(metrics_port) = node.metrics_port {
75                        Some((
76                            node.service_name.clone(),
77                            metrics_port,
78                            node.data_dir_path.clone(),
79                        ))
80                    } else {
81                        error!(
82                            "No metrics port found for {:?}. Skipping stat fetch.",
83                            node.service_name
84                        );
85                        None
86                    }
87                } else {
88                    None
89                }
90            })
91            .collect::<Vec<_>>();
92        if !node_details.is_empty() {
93            debug!("Fetching stats from {} nodes", node_details.len());
94            tokio::spawn(async move {
95                Self::fetch_all_node_stats_inner(node_details, action_sender).await;
96            });
97        } else {
98            debug!("No running nodes to fetch stats from.");
99        }
100    }
101
102    /// This method is an inner function used to fetch statistics from all nodes.
103    /// It takes a vector of node details (service name, metrics port, and data directory path) and an unbounded sender of `Action` instances.
104    /// The method iterates over the provided list of `NodeServiceData` instances, filters out nodes that are not running,
105    /// and for each running node, it checks if a metrics port is available. If a metrics port is found, the node's details
106    /// (service name, metrics port, and data directory path) are collected. If no metrics port is found, a debug message
107    /// is logged indicating that the node's stats will not be fetched.
108    ///
109    /// If there are any nodes with available metrics ports, this method spawns a local task to asynchronously fetch
110    /// statistics from these nodes using `fetch_all_node_stats_inner`. The aggregated statistics are then sent via the
111    /// provided `action_sender`.
112    ///
113    /// If no running nodes with metrics ports are found, a debug message is logged indicating that there are no running nodes
114    /// to fetch stats from.
115    ///
116    /// # Parameters
117    ///
118    /// * `node_details`: A vector of tuples, each containing the service name, metrics port, and data directory path of a node.
119    /// * `action_sender`: An unbounded sender of `Action` instances used to send the aggregated node statistics.
120    async fn fetch_all_node_stats_inner(
121        node_details: Vec<(String, u16, PathBuf)>,
122        action_sender: UnboundedSender<Action>,
123    ) {
124        let mut stream = futures::stream::iter(node_details)
125            .map(|(service_name, metrics_port, data_dir)| async move {
126                (
127                    Self::fetch_stat_per_node(metrics_port, data_dir).await,
128                    service_name,
129                )
130            })
131            .buffer_unordered(5);
132
133        let mut all_node_stats = NodeStats::default();
134
135        while let Some((result, service_name)) = stream.next().await {
136            match result {
137                Ok(stats) => {
138                    let individual_stats = IndividualNodeStats {
139                        service_name: service_name.clone(),
140                        forwarded_rewards: stats.forwarded_rewards,
141                        rewards_wallet_balance: stats.rewards_wallet_balance,
142                        memory_usage_mb: stats.memory_usage_mb,
143                        bandwidth_inbound: stats.bandwidth_inbound,
144                        bandwidth_outbound: stats.bandwidth_outbound,
145                        max_records: stats.max_records,
146                        peers: stats.peers,
147                        connections: stats.connections,
148                        bandwidth_inbound_rate: stats.bandwidth_inbound_rate,
149                        bandwidth_outbound_rate: stats.bandwidth_outbound_rate,
150                    };
151                    all_node_stats.merge(&individual_stats);
152                }
153                Err(err) => {
154                    error!("Error while fetching stats from {service_name:?}: {err:?}");
155                }
156            }
157        }
158
159        if let Err(err) = action_sender.send(Action::StatusActions(
160            StatusActions::NodesStatsObtained(all_node_stats),
161        )) {
162            error!("Error while sending action: {err:?}");
163        }
164    }
165
166    async fn fetch_stat_per_node(
167        metrics_port: u16,
168        _data_dir: PathBuf,
169    ) -> Result<IndividualNodeStats> {
170        let now = Instant::now();
171
172        let body = reqwest::get(&format!("http://localhost:{metrics_port}/metrics"))
173            .await?
174            .text()
175            .await?;
176        let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect();
177        let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?;
178
179        let mut stats = IndividualNodeStats::default();
180
181        for sample in all_metrics.samples.iter() {
182            if sample.metric == "ant_node_total_forwarded_rewards" {
183                // Nanos
184                match sample.value {
185                    prometheus_parse::Value::Counter(val)
186                    | prometheus_parse::Value::Gauge(val)
187                    | prometheus_parse::Value::Untyped(val) => {
188                        stats.forwarded_rewards = val as usize;
189                    }
190                    _ => {}
191                }
192            } else if sample.metric == "ant_node_current_reward_wallet_balance" {
193                // Attos
194                match sample.value {
195                    prometheus_parse::Value::Counter(val)
196                    | prometheus_parse::Value::Gauge(val)
197                    | prometheus_parse::Value::Untyped(val) => {
198                        stats.rewards_wallet_balance = val as usize;
199                    }
200                    _ => {}
201                }
202            } else if sample.metric == "ant_networking_process_memory_used_mb" {
203                // Memory
204                match sample.value {
205                    prometheus_parse::Value::Counter(val)
206                    | prometheus_parse::Value::Gauge(val)
207                    | prometheus_parse::Value::Untyped(val) => {
208                        stats.memory_usage_mb = val as usize;
209                    }
210                    _ => {}
211                }
212            } else if sample.metric == "libp2p_bandwidth_bytes_total" {
213                // Mbps
214                match sample.value {
215                    prometheus_parse::Value::Counter(val)
216                    | prometheus_parse::Value::Gauge(val)
217                    | prometheus_parse::Value::Untyped(val) => {
218                        if let Some(direction) = sample.labels.get("direction") {
219                            if direction == "Inbound" {
220                                let current_inbound = val as usize;
221                                let rate = (current_inbound as f64
222                                    - stats.bandwidth_inbound as f64)
223                                    / NODE_STAT_UPDATE_INTERVAL.as_secs_f64();
224                                stats.bandwidth_inbound_rate = rate as usize;
225                                stats.bandwidth_inbound = current_inbound;
226                            } else if direction == "Outbound" {
227                                let current_outbound = val as usize;
228                                let rate = (current_outbound as f64
229                                    - stats.bandwidth_outbound as f64)
230                                    / NODE_STAT_UPDATE_INTERVAL.as_secs_f64();
231                                stats.bandwidth_outbound_rate = rate as usize;
232                                stats.bandwidth_outbound = current_outbound;
233                            }
234                        }
235                    }
236                    _ => {}
237                }
238            } else if sample.metric == "ant_networking_records_stored" {
239                // Records
240                match sample.value {
241                    prometheus_parse::Value::Counter(val)
242                    | prometheus_parse::Value::Gauge(val)
243                    | prometheus_parse::Value::Untyped(val) => {
244                        stats.max_records = val as usize;
245                    }
246                    _ => {}
247                }
248            } else if sample.metric == "ant_networking_peers_in_routing_table" {
249                // Peers
250                match sample.value {
251                    prometheus_parse::Value::Counter(val)
252                    | prometheus_parse::Value::Gauge(val)
253                    | prometheus_parse::Value::Untyped(val) => {
254                        stats.peers = val as usize;
255                    }
256                    _ => {}
257                }
258            } else if sample.metric == "ant_networking_open_connections" {
259                // Connections
260                match sample.value {
261                    prometheus_parse::Value::Counter(val)
262                    | prometheus_parse::Value::Gauge(val)
263                    | prometheus_parse::Value::Untyped(val) => {
264                        stats.connections = val as usize;
265                    }
266                    _ => {}
267                }
268            }
269        }
270        trace!(
271            "Fetched stats from metrics_port {metrics_port:?} in {:?}",
272            now.elapsed()
273        );
274        Ok(stats)
275    }
276}