amareleo_node_metrics/
lib.rs1mod names;
17
18pub use names::*;
20
21pub use snarkvm::metrics::*;
23
24#[cfg(not(feature = "serial"))]
25use rayon::prelude::*;
26
27use parking_lot::Mutex;
28use snarkvm::{
29 ledger::narwhal::TransmissionID,
30 prelude::{Block, Network, cfg_iter},
31};
32use std::{
33 collections::HashMap,
34 net::SocketAddr,
35 sync::{
36 Arc,
37 atomic::{AtomicUsize, Ordering},
38 },
39};
40use time::OffsetDateTime;
41
42pub fn initialize_metrics(ip: Option<SocketAddr>) {
44 let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
46 if let Some(ip) = ip { builder.with_http_listener(ip) } else { builder }
47 .install()
48 .expect("can't build the prometheus exporter");
49
50 snarkvm::metrics::register_metrics();
52
53 for name in crate::names::GAUGE_NAMES {
55 register_gauge(name);
56 }
57 for name in crate::names::COUNTER_NAMES {
58 register_counter(name);
59 }
60 for name in crate::names::HISTOGRAM_NAMES {
61 register_histogram(name);
62 }
63}
64
65pub fn update_block_metrics<N: Network>(block: &Block<N>) {
66 use snarkvm::ledger::ConfirmedTransaction;
67
68 let accepted_deploy = AtomicUsize::new(0);
69 let accepted_execute = AtomicUsize::new(0);
70 let rejected_deploy = AtomicUsize::new(0);
71 let rejected_execute = AtomicUsize::new(0);
72
73 cfg_iter!(block.transactions()).for_each(|tx| match tx {
75 ConfirmedTransaction::AcceptedDeploy(_, _, _) => {
76 accepted_deploy.fetch_add(1, Ordering::Relaxed);
77 }
78 ConfirmedTransaction::AcceptedExecute(_, _, _) => {
79 accepted_execute.fetch_add(1, Ordering::Relaxed);
80 }
81 ConfirmedTransaction::RejectedDeploy(_, _, _, _) => {
82 rejected_deploy.fetch_add(1, Ordering::Relaxed);
83 }
84 ConfirmedTransaction::RejectedExecute(_, _, _, _) => {
85 rejected_execute.fetch_add(1, Ordering::Relaxed);
86 }
87 });
88
89 increment_gauge(blocks::ACCEPTED_DEPLOY, accepted_deploy.load(Ordering::Relaxed) as f64);
90 increment_gauge(blocks::ACCEPTED_EXECUTE, accepted_execute.load(Ordering::Relaxed) as f64);
91 increment_gauge(blocks::REJECTED_DEPLOY, rejected_deploy.load(Ordering::Relaxed) as f64);
92 increment_gauge(blocks::REJECTED_EXECUTE, rejected_execute.load(Ordering::Relaxed) as f64);
93
94 increment_gauge(blocks::ABORTED_TRANSACTIONS, block.aborted_transaction_ids().len() as f64);
96 increment_gauge(blocks::ABORTED_SOLUTIONS, block.aborted_solution_ids().len() as f64);
97}
98
99pub fn add_transmission_latency_metric<N: Network>(
100 transmissions_queue_timestamps: &Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
101 block: &Block<N>,
102) {
103 const AGE_THRESHOLD_SECONDS: i32 = 30 * 60; let solution_ids: std::collections::HashSet<_> =
107 block.solutions().solution_ids().chain(block.aborted_solution_ids()).collect();
108
109 let transaction_ids: std::collections::HashSet<_> =
111 block.transaction_ids().chain(block.aborted_transaction_ids()).collect();
112
113 let mut transmission_queue_timestamps = transmissions_queue_timestamps.lock();
114 let ts_now = OffsetDateTime::now_utc().unix_timestamp();
115
116 let keys_to_remove = cfg_iter!(transmission_queue_timestamps)
118 .flat_map(|(key, timestamp)| {
119 let elapsed_time = std::time::Duration::from_secs((ts_now - *timestamp) as u64);
120
121 if elapsed_time.as_secs() > AGE_THRESHOLD_SECONDS as u64 {
122 increment_counter(consensus::STALE_UNCONFIRMED_TRANSMISSIONS);
124 Some(*key)
125 } else {
126 let transmission_type = match key {
127 TransmissionID::Solution(solution_id, _) if solution_ids.contains(solution_id) => Some("solution"),
128 TransmissionID::Transaction(transaction_id, _) if transaction_ids.contains(transaction_id) => {
129 Some("transaction")
130 }
131 _ => None,
132 };
133
134 if let Some(transmission_type_string) = transmission_type {
135 histogram_label(
136 consensus::TRANSMISSION_LATENCY,
137 "transmission_type",
138 transmission_type_string.to_owned(),
139 elapsed_time.as_secs_f64(),
140 );
141 Some(*key)
142 } else {
143 None
144 }
145 }
146 })
147 .collect::<Vec<_>>();
148
149 for key in keys_to_remove {
151 transmission_queue_timestamps.remove(&key);
152 }
153}