amareleo_node_metrics/
lib.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod names;
17
18// Expose the names at the crate level for easy access.
19pub use names::*;
20
21// Re-export the snarkVM metrics.
22pub use snarkvm::metrics::*;
23
24#[cfg(not(feature = "serial"))]
25use rayon::prelude::*;
26
27use parking_lot::Mutex;
28use snarkvm::{
29    ledger::narwhal::TransmissionID,
30    prelude::{Block, Network, cfg_iter},
31};
32use std::{
33    collections::HashMap,
34    net::SocketAddr,
35    sync::{
36        Arc,
37        atomic::{AtomicUsize, Ordering},
38    },
39};
40use time::OffsetDateTime;
41
42/// Initializes the metrics and returns a handle to the task running the metrics exporter.
43pub fn initialize_metrics(ip: Option<SocketAddr>) {
44    // Build the Prometheus exporter.
45    let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
46    if let Some(ip) = ip { builder.with_http_listener(ip) } else { builder }
47        .install()
48        .expect("can't build the prometheus exporter");
49
50    // Register the snarkVM metrics.
51    snarkvm::metrics::register_metrics();
52
53    // Register the metrics so they exist on init.
54    for name in crate::names::GAUGE_NAMES {
55        register_gauge(name);
56    }
57    for name in crate::names::COUNTER_NAMES {
58        register_counter(name);
59    }
60    for name in crate::names::HISTOGRAM_NAMES {
61        register_histogram(name);
62    }
63}
64
65pub fn update_block_metrics<N: Network>(block: &Block<N>) {
66    use snarkvm::ledger::ConfirmedTransaction;
67
68    let accepted_deploy = AtomicUsize::new(0);
69    let accepted_execute = AtomicUsize::new(0);
70    let rejected_deploy = AtomicUsize::new(0);
71    let rejected_execute = AtomicUsize::new(0);
72
73    // Add transaction to atomic counter based on enum type match.
74    cfg_iter!(block.transactions()).for_each(|tx| match tx {
75        ConfirmedTransaction::AcceptedDeploy(_, _, _) => {
76            accepted_deploy.fetch_add(1, Ordering::Relaxed);
77        }
78        ConfirmedTransaction::AcceptedExecute(_, _, _) => {
79            accepted_execute.fetch_add(1, Ordering::Relaxed);
80        }
81        ConfirmedTransaction::RejectedDeploy(_, _, _, _) => {
82            rejected_deploy.fetch_add(1, Ordering::Relaxed);
83        }
84        ConfirmedTransaction::RejectedExecute(_, _, _, _) => {
85            rejected_execute.fetch_add(1, Ordering::Relaxed);
86        }
87    });
88
89    increment_gauge(blocks::ACCEPTED_DEPLOY, accepted_deploy.load(Ordering::Relaxed) as f64);
90    increment_gauge(blocks::ACCEPTED_EXECUTE, accepted_execute.load(Ordering::Relaxed) as f64);
91    increment_gauge(blocks::REJECTED_DEPLOY, rejected_deploy.load(Ordering::Relaxed) as f64);
92    increment_gauge(blocks::REJECTED_EXECUTE, rejected_execute.load(Ordering::Relaxed) as f64);
93
94    // Update aborted transactions and solutions.
95    increment_gauge(blocks::ABORTED_TRANSACTIONS, block.aborted_transaction_ids().len() as f64);
96    increment_gauge(blocks::ABORTED_SOLUTIONS, block.aborted_solution_ids().len() as f64);
97}
98
99pub fn add_transmission_latency_metric<N: Network>(
100    transmissions_queue_timestamps: &Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
101    block: &Block<N>,
102) {
103    const AGE_THRESHOLD_SECONDS: i32 = 30 * 60; // 30 minutes set as stale transmission threshold
104
105    // Retrieve the solution IDs.
106    let solution_ids: std::collections::HashSet<_> =
107        block.solutions().solution_ids().chain(block.aborted_solution_ids()).collect();
108
109    // Retrieve the transaction IDs.
110    let transaction_ids: std::collections::HashSet<_> =
111        block.transaction_ids().chain(block.aborted_transaction_ids()).collect();
112
113    let mut transmission_queue_timestamps = transmissions_queue_timestamps.lock();
114    let ts_now = OffsetDateTime::now_utc().unix_timestamp();
115
116    // Determine which keys to remove.
117    let keys_to_remove = cfg_iter!(transmission_queue_timestamps)
118        .flat_map(|(key, timestamp)| {
119            let elapsed_time = std::time::Duration::from_secs((ts_now - *timestamp) as u64);
120
121            if elapsed_time.as_secs() > AGE_THRESHOLD_SECONDS as u64 {
122                // This entry is stale-- remove it from transmission queue and record it as a stale transmission.
123                increment_counter(consensus::STALE_UNCONFIRMED_TRANSMISSIONS);
124                Some(*key)
125            } else {
126                let transmission_type = match key {
127                    TransmissionID::Solution(solution_id, _) if solution_ids.contains(solution_id) => Some("solution"),
128                    TransmissionID::Transaction(transaction_id, _) if transaction_ids.contains(transaction_id) => {
129                        Some("transaction")
130                    }
131                    _ => None,
132                };
133
134                if let Some(transmission_type_string) = transmission_type {
135                    histogram_label(
136                        consensus::TRANSMISSION_LATENCY,
137                        "transmission_type",
138                        transmission_type_string.to_owned(),
139                        elapsed_time.as_secs_f64(),
140                    );
141                    Some(*key)
142                } else {
143                    None
144                }
145            }
146        })
147        .collect::<Vec<_>>();
148
149    // Remove keys of stale or seen transmissions.
150    for key in keys_to_remove {
151        transmission_queue_timestamps.remove(&key);
152    }
153}