node_launchpad/
node_stats.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use ant_service_management::{NodeServiceData, ServiceStatus};
use color_eyre::Result;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::{path::PathBuf, time::Instant};
use tokio::sync::mpsc::UnboundedSender;

use super::components::status::NODE_STAT_UPDATE_INTERVAL;

use crate::action::{Action, StatusActions};

/// Statistics gathered for a single node from one scrape of its metrics endpoint.
///
/// The numeric fields are filled in by `NodeStats::fetch_stat_per_node` starting from
/// `Default::default()`, so a field stays at zero when the corresponding metric is not present
/// in the scrape.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IndividualNodeStats {
    /// Name of the node service these statistics belong to.
    pub service_name: String,
    /// Value of the `ant_node_total_forwarded_rewards` metric.
    /// NOTE(review): the scraper labels this as nanos — confirm the unit.
    pub forwarded_rewards: usize,
    /// Value of the `ant_node_current_reward_wallet_balance` metric.
    /// NOTE(review): the scraper labels this as attos — confirm the unit.
    pub rewards_wallet_balance: usize,
    /// Value of the `ant_networking_process_memory_used_mb` metric.
    pub memory_usage_mb: usize,
    /// Byte counter taken from `libp2p_bandwidth_bytes_total` samples labelled
    /// `direction="Inbound"`.
    pub bandwidth_inbound: usize,
    /// Byte counter taken from `libp2p_bandwidth_bytes_total` samples labelled
    /// `direction="Outbound"`.
    pub bandwidth_outbound: usize,
    /// Inbound figure derived from the inbound byte counter and `NODE_STAT_UPDATE_INTERVAL`.
    pub bandwidth_inbound_rate: usize,
    /// Outbound figure derived from the outbound byte counter and `NODE_STAT_UPDATE_INTERVAL`.
    pub bandwidth_outbound_rate: usize,
    /// Value of the `ant_networking_records_stored` metric.
    /// NOTE(review): despite the field name this is the stored-record count, not a maximum.
    pub max_records: usize,
    /// Value of the `ant_networking_peers_in_routing_table` metric.
    pub peers: usize,
    /// Value of the `ant_networking_open_connections` metric.
    pub connections: usize,
}

/// Statistics aggregated over every node that was successfully scraped.
///
/// The totals are running sums maintained by `NodeStats::merge`; `individual_stats` keeps a copy
/// of each per-node record that was merged.
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeStats {
    /// Sum of `forwarded_rewards` over all merged nodes.
    pub total_forwarded_rewards: usize,
    /// Sum of `rewards_wallet_balance` over all merged nodes.
    pub total_rewards_wallet_balance: usize,
    /// Sum of `memory_usage_mb` over all merged nodes.
    pub total_memory_usage_mb: usize,
    /// Per-node records, in the order they were merged.
    pub individual_stats: Vec<IndividualNodeStats>,
}

impl NodeStats {
    fn merge(&mut self, other: &IndividualNodeStats) {
        self.total_forwarded_rewards += other.forwarded_rewards;
        self.total_rewards_wallet_balance += other.rewards_wallet_balance;
        self.total_memory_usage_mb += other.memory_usage_mb;
        self.individual_stats.push(other.clone()); // Store individual stats
    }

    /// Fetches statistics from all running nodes and sends the aggregated stats via the action sender.
    ///
    /// This method iterates over the provided list of `NodeServiceData` instances, filters out nodes that are not running,
    /// and for each running node, it checks if a metrics port is available. If a metrics port is found, the node's details
    /// (service name, metrics port, and data directory path) are collected. If no metrics port is found, a debug message
    /// is logged indicating that the node's stats will not be fetched.
    ///
    /// If there are any nodes with available metrics ports, this method spawns a local task to asynchronously fetch
    /// statistics from these nodes using `fetch_all_node_stats_inner`. The aggregated statistics are then sent via the
    /// provided `action_sender`.
    ///
    /// If no running nodes with metrics ports are found, a debug message is logged indicating that there are no running nodes
    /// to fetch stats from.
    ///
    /// # Parameters
    ///
    /// * `nodes`: A slice of `NodeServiceData` instances representing the nodes to fetch statistics from.
    /// * `action_sender`: An unbounded sender of `Action` instances used to send the aggregated node statistics.
    pub fn fetch_all_node_stats(nodes: &[NodeServiceData], action_sender: UnboundedSender<Action>) {
        let node_details = nodes
            .iter()
            .filter_map(|node| {
                if node.status == ServiceStatus::Running {
                    if let Some(metrics_port) = node.metrics_port {
                        Some((
                            node.service_name.clone(),
                            metrics_port,
                            node.data_dir_path.clone(),
                        ))
                    } else {
                        error!(
                            "No metrics port found for {:?}. Skipping stat fetch.",
                            node.service_name
                        );
                        None
                    }
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        if !node_details.is_empty() {
            debug!("Fetching stats from {} nodes", node_details.len());
            tokio::spawn(async move {
                Self::fetch_all_node_stats_inner(node_details, action_sender).await;
            });
        } else {
            debug!("No running nodes to fetch stats from.");
        }
    }

    /// This method is an inner function used to fetch statistics from all nodes.
    /// It takes a vector of node details (service name, metrics port, and data directory path) and an unbounded sender of `Action` instances.
    /// The method iterates over the provided list of `NodeServiceData` instances, filters out nodes that are not running,
    /// and for each running node, it checks if a metrics port is available. If a metrics port is found, the node's details
    /// (service name, metrics port, and data directory path) are collected. If no metrics port is found, a debug message
    /// is logged indicating that the node's stats will not be fetched.
    ///
    /// If there are any nodes with available metrics ports, this method spawns a local task to asynchronously fetch
    /// statistics from these nodes using `fetch_all_node_stats_inner`. The aggregated statistics are then sent via the
    /// provided `action_sender`.
    ///
    /// If no running nodes with metrics ports are found, a debug message is logged indicating that there are no running nodes
    /// to fetch stats from.
    ///
    /// # Parameters
    ///
    /// * `node_details`: A vector of tuples, each containing the service name, metrics port, and data directory path of a node.
    /// * `action_sender`: An unbounded sender of `Action` instances used to send the aggregated node statistics.
    async fn fetch_all_node_stats_inner(
        node_details: Vec<(String, u16, PathBuf)>,
        action_sender: UnboundedSender<Action>,
    ) {
        let mut stream = futures::stream::iter(node_details)
            .map(|(service_name, metrics_port, data_dir)| async move {
                (
                    Self::fetch_stat_per_node(metrics_port, data_dir).await,
                    service_name,
                )
            })
            .buffer_unordered(5);

        let mut all_node_stats = NodeStats::default();

        while let Some((result, service_name)) = stream.next().await {
            match result {
                Ok(stats) => {
                    let individual_stats = IndividualNodeStats {
                        service_name: service_name.clone(),
                        forwarded_rewards: stats.forwarded_rewards,
                        rewards_wallet_balance: stats.rewards_wallet_balance,
                        memory_usage_mb: stats.memory_usage_mb,
                        bandwidth_inbound: stats.bandwidth_inbound,
                        bandwidth_outbound: stats.bandwidth_outbound,
                        max_records: stats.max_records,
                        peers: stats.peers,
                        connections: stats.connections,
                        bandwidth_inbound_rate: stats.bandwidth_inbound_rate,
                        bandwidth_outbound_rate: stats.bandwidth_outbound_rate,
                    };
                    all_node_stats.merge(&individual_stats);
                }
                Err(err) => {
                    error!("Error while fetching stats from {service_name:?}: {err:?}");
                }
            }
        }

        if let Err(err) = action_sender.send(Action::StatusActions(
            StatusActions::NodesStatsObtained(all_node_stats),
        )) {
            error!("Error while sending action: {err:?}");
        }
    }

    async fn fetch_stat_per_node(
        metrics_port: u16,
        _data_dir: PathBuf,
    ) -> Result<IndividualNodeStats> {
        let now = Instant::now();

        let body = reqwest::get(&format!("http://localhost:{metrics_port}/metrics"))
            .await?
            .text()
            .await?;
        let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect();
        let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?;

        let mut stats = IndividualNodeStats::default();

        for sample in all_metrics.samples.iter() {
            if sample.metric == "ant_node_total_forwarded_rewards" {
                // Nanos
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        stats.forwarded_rewards = val as usize;
                    }
                    _ => {}
                }
            } else if sample.metric == "ant_node_current_reward_wallet_balance" {
                // Attos
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        stats.rewards_wallet_balance = val as usize;
                    }
                    _ => {}
                }
            } else if sample.metric == "ant_networking_process_memory_used_mb" {
                // Memory
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        stats.memory_usage_mb = val as usize;
                    }
                    _ => {}
                }
            } else if sample.metric == "libp2p_bandwidth_bytes_total" {
                // Mbps
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        if let Some(direction) = sample.labels.get("direction") {
                            if direction == "Inbound" {
                                let current_inbound = val as usize;
                                let rate = (current_inbound as f64
                                    - stats.bandwidth_inbound as f64)
                                    / NODE_STAT_UPDATE_INTERVAL.as_secs_f64();
                                stats.bandwidth_inbound_rate = rate as usize;
                                stats.bandwidth_inbound = current_inbound;
                            } else if direction == "Outbound" {
                                let current_outbound = val as usize;
                                let rate = (current_outbound as f64
                                    - stats.bandwidth_outbound as f64)
                                    / NODE_STAT_UPDATE_INTERVAL.as_secs_f64();
                                stats.bandwidth_outbound_rate = rate as usize;
                                stats.bandwidth_outbound = current_outbound;
                            }
                        }
                    }
                    _ => {}
                }
            } else if sample.metric == "ant_networking_records_stored" {
                // Records
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        stats.max_records = val as usize;
                    }
                    _ => {}
                }
            } else if sample.metric == "ant_networking_peers_in_routing_table" {
                // Peers
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        stats.peers = val as usize;
                    }
                    _ => {}
                }
            } else if sample.metric == "ant_networking_open_connections" {
                // Connections
                match sample.value {
                    prometheus_parse::Value::Counter(val)
                    | prometheus_parse::Value::Gauge(val)
                    | prometheus_parse::Value::Untyped(val) => {
                        stats.connections = val as usize;
                    }
                    _ => {}
                }
            }
        }
        trace!(
            "Fetched stats from metrics_port {metrics_port:?} in {:?}",
            now.elapsed()
        );
        Ok(stats)
    }
}