1use std::ops::Deref;
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4
5use dashmap::DashMap;
6use malachitebft_metrics::prometheus::metrics::counter::Counter;
7use malachitebft_metrics::prometheus::metrics::gauge::Gauge;
8use malachitebft_metrics::prometheus::metrics::histogram::{exponential_buckets, Histogram};
9use malachitebft_metrics::SharedRegistry;
10
11#[derive(Clone, Debug)]
12pub struct Metrics(Arc<Inner>);
13
14impl Deref for Metrics {
15 type Target = Inner;
16
17 fn deref(&self) -> &Self::Target {
18 &self.0
19 }
20}
21
22#[derive(Debug)]
23pub struct Inner {
24 value_requests_sent: Counter,
25 value_requests_received: Counter,
26 value_responses_sent: Counter,
27 value_responses_received: Counter,
28 value_client_latency: Histogram,
29 value_server_latency: Histogram,
30 value_request_timeouts: Counter,
31 status_interarrival: Histogram,
32 status_interarrival_normalized: Histogram, status_total: Counter,
34
35 instant_request_sent: Arc<DashMap<u64, Instant>>,
36 instant_request_received: Arc<DashMap<u64, Instant>>,
37 instant_last_status_received: Arc<Mutex<Option<Instant>>>,
38 status_update_interval: Duration,
39
40 pub scoring: crate::scoring::metrics::Metrics,
41
42 pub sync_queue_heights: Gauge,
44
45 pub sync_queue_size: Gauge,
47}
48
49impl Inner {
50 pub fn new(status_update_interval: Duration) -> Self {
51 let t = status_update_interval.as_secs_f64();
52 Self {
53 value_requests_sent: Counter::default(),
54 value_requests_received: Counter::default(),
55 value_responses_sent: Counter::default(),
56 value_responses_received: Counter::default(),
57 value_client_latency: Histogram::new(exponential_buckets(0.1, 2.0, 20)),
58 value_server_latency: Histogram::new(exponential_buckets(0.1, 2.0, 20)),
59 value_request_timeouts: Counter::default(),
60 status_interarrival: Histogram::new(exponential_buckets(0.05 * t.max(1e-6), 1.15, 40)),
61 status_interarrival_normalized: Histogram::new(exponential_buckets(0.05, 1.15, 40)),
62 status_total: Counter::default(),
63 instant_request_sent: Arc::new(DashMap::new()),
64 instant_request_received: Arc::new(DashMap::new()),
65 instant_last_status_received: Arc::new(Mutex::new(None)),
66 status_update_interval,
67 scoring: crate::scoring::metrics::Metrics::new(),
68 sync_queue_heights: Gauge::default(),
69 sync_queue_size: Gauge::default(),
70 }
71 }
72}
73
74impl Metrics {
75 pub fn new(status_update_interval: Duration) -> Self {
76 Self(Arc::new(Inner::new(status_update_interval)))
77 }
78
79 pub fn register(registry: &SharedRegistry, status_update_interval: Duration) -> Self {
80 let metrics = Self::new(status_update_interval);
81
82 registry.with_prefix("malachitebft_sync", |registry| {
83 registry.register(
85 "value_requests_sent",
86 "Number of ValueSync requests sent",
87 metrics.value_requests_sent.clone(),
88 );
89
90 registry.register(
91 "value_requests_received",
92 "Number of ValueSync requests received",
93 metrics.value_requests_received.clone(),
94 );
95
96 registry.register(
97 "value_responses_sent",
98 "Number of ValueSync responses sent",
99 metrics.value_responses_sent.clone(),
100 );
101
102 registry.register(
103 "value_responses_received",
104 "Number of ValueSync responses received",
105 metrics.value_responses_received.clone(),
106 );
107
108 registry.register(
109 "value_client_latency",
110 "Interval of time between when request was sent and response was received",
111 metrics.value_client_latency.clone(),
112 );
113
114 registry.register(
115 "value_server_latency",
116 "Interval of time between when request was received and response was sent",
117 metrics.value_server_latency.clone(),
118 );
119
120 registry.register(
121 "value_request_timeouts",
122 "Number of ValueSync request timeouts",
123 metrics.value_request_timeouts.clone(),
124 );
125
126 metrics.scoring.register(registry);
127
128 registry.register(
129 "sync_queue_heights",
130 "Number of heights in the sync input queue",
131 metrics.sync_queue_heights.clone(),
132 );
133
134 registry.register(
135 "sync_queue_size",
136 "Number of inputs in the sync input queue across all heights",
137 metrics.sync_queue_size.clone(),
138 );
139
140 registry.register(
141 "status_interarrival",
142 "Status updates interarrival histogram (any peer)",
143 metrics.status_interarrival.clone(),
144 );
145
146 registry.register(
147 "status_interarrival_normalized",
148 "Status updates interarrival histogram (any peer) normalized to have a mean of 1",
149 metrics.status_interarrival_normalized.clone(),
150 );
151 registry.register(
152 "status_total",
153 "Total number of status updates received",
154 metrics.status_total.clone(),
155 );
156 });
157
158 metrics
159 }
160
161 pub fn value_request_sent(&self, height: u64) {
162 self.value_requests_sent.inc();
163 self.instant_request_sent.insert(height, Instant::now());
164 }
165
166 pub fn value_request_received(&self, height: u64) {
167 self.value_requests_received.inc();
168 self.instant_request_received.insert(height, Instant::now());
169 }
170
171 pub fn value_response_sent(&self, height: u64) {
172 self.value_responses_sent.inc();
173
174 if let Some((_, instant)) = self.instant_request_received.remove(&height) {
175 self.value_server_latency
176 .observe(instant.elapsed().as_secs_f64());
177 }
178 }
179
180 pub fn value_response_received(&self, height: u64) -> Option<Duration> {
181 self.value_responses_received.inc();
182
183 if let Some((_, instant_request_sent)) = self.instant_request_sent.remove(&height) {
184 let latency = instant_request_sent.elapsed();
185 self.value_client_latency.observe(latency.as_secs_f64());
186 Some(latency)
187 } else {
188 None
189 }
190 }
191
192 pub fn value_request_timed_out(&self, height: u64) {
193 self.value_request_timeouts.inc();
194 self.instant_request_sent.remove(&height);
195 }
196
197 pub fn status_received(&self, n_peers: u64) {
198 self.status_total.inc();
199 let now = Instant::now();
200
201 let mut last_recv_guard = self.instant_last_status_received.lock().unwrap();
202 if let Some(prev) = *last_recv_guard {
203 let delta = now.duration_since(prev).as_secs_f64();
204 self.status_interarrival.observe(delta);
205
206 if n_peers > 0 {
207 let mu = self.status_update_interval.as_secs_f64() / (n_peers as f64);
209 if mu > 0.0 {
210 let ratio = delta / mu;
211 self.status_interarrival_normalized.observe(ratio);
212 }
213 }
214 }
215 *last_recv_guard = Some(now);
216 }
217
218 pub fn sync_queue_updated(&self, heights: usize, size: usize) {
219 self.sync_queue_heights.set(heights as _);
220 self.sync_queue_size.set(size as _);
221 }
222}
223
224impl Default for Metrics {
225 fn default() -> Self {
226 Self::new(Duration::from_secs(1))
228 }
229}