Skip to main content

arc_malachitebft_sync/
metrics.rs

1use std::ops::Deref;
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4
5use dashmap::DashMap;
6use malachitebft_metrics::prometheus::metrics::counter::Counter;
7use malachitebft_metrics::prometheus::metrics::gauge::Gauge;
8use malachitebft_metrics::prometheus::metrics::histogram::{exponential_buckets, Histogram};
9use malachitebft_metrics::SharedRegistry;
10
11#[derive(Clone, Debug)]
12pub struct Metrics(Arc<Inner>);
13
14impl Deref for Metrics {
15    type Target = Inner;
16
17    fn deref(&self) -> &Self::Target {
18        &self.0
19    }
20}
21
22#[derive(Debug)]
23pub struct Inner {
24    value_requests_sent: Counter,
25    value_requests_received: Counter,
26    value_responses_sent: Counter,
27    value_responses_received: Counter,
28    value_client_latency: Histogram,
29    value_server_latency: Histogram,
30    value_request_timeouts: Counter,
31    status_interarrival: Histogram,
32    status_interarrival_normalized: Histogram, // Independent of number of peers and status update interval
33    status_total: Counter,
34
35    instant_request_sent: Arc<DashMap<u64, Instant>>,
36    instant_request_received: Arc<DashMap<u64, Instant>>,
37    instant_last_status_received: Arc<Mutex<Option<Instant>>>,
38    status_update_interval: Duration,
39
40    pub scoring: crate::scoring::metrics::Metrics,
41
42    /// Number of heights in the sync input queue
43    pub sync_queue_heights: Gauge,
44
45    /// Number of inputs in the sync input queue across all heights
46    pub sync_queue_size: Gauge,
47}
48
49impl Inner {
50    pub fn new(status_update_interval: Duration) -> Self {
51        let t = status_update_interval.as_secs_f64();
52        Self {
53            value_requests_sent: Counter::default(),
54            value_requests_received: Counter::default(),
55            value_responses_sent: Counter::default(),
56            value_responses_received: Counter::default(),
57            value_client_latency: Histogram::new(exponential_buckets(0.1, 2.0, 20)),
58            value_server_latency: Histogram::new(exponential_buckets(0.1, 2.0, 20)),
59            value_request_timeouts: Counter::default(),
60            status_interarrival: Histogram::new(exponential_buckets(0.05 * t.max(1e-6), 1.15, 40)),
61            status_interarrival_normalized: Histogram::new(exponential_buckets(0.05, 1.15, 40)),
62            status_total: Counter::default(),
63            instant_request_sent: Arc::new(DashMap::new()),
64            instant_request_received: Arc::new(DashMap::new()),
65            instant_last_status_received: Arc::new(Mutex::new(None)),
66            status_update_interval,
67            scoring: crate::scoring::metrics::Metrics::new(),
68            sync_queue_heights: Gauge::default(),
69            sync_queue_size: Gauge::default(),
70        }
71    }
72}
73
74impl Metrics {
75    pub fn new(status_update_interval: Duration) -> Self {
76        Self(Arc::new(Inner::new(status_update_interval)))
77    }
78
79    pub fn register(registry: &SharedRegistry, status_update_interval: Duration) -> Self {
80        let metrics = Self::new(status_update_interval);
81
82        registry.with_prefix("malachitebft_sync", |registry| {
83            // Value sync related metrics
84            registry.register(
85                "value_requests_sent",
86                "Number of ValueSync requests sent",
87                metrics.value_requests_sent.clone(),
88            );
89
90            registry.register(
91                "value_requests_received",
92                "Number of ValueSync requests received",
93                metrics.value_requests_received.clone(),
94            );
95
96            registry.register(
97                "value_responses_sent",
98                "Number of ValueSync responses sent",
99                metrics.value_responses_sent.clone(),
100            );
101
102            registry.register(
103                "value_responses_received",
104                "Number of ValueSync responses received",
105                metrics.value_responses_received.clone(),
106            );
107
108            registry.register(
109                "value_client_latency",
110                "Interval of time between when request was sent and response was received",
111                metrics.value_client_latency.clone(),
112            );
113
114            registry.register(
115                "value_server_latency",
116                "Interval of time between when request was received and response was sent",
117                metrics.value_server_latency.clone(),
118            );
119
120            registry.register(
121                "value_request_timeouts",
122                "Number of ValueSync request timeouts",
123                metrics.value_request_timeouts.clone(),
124            );
125
126            metrics.scoring.register(registry);
127
128            registry.register(
129                "sync_queue_heights",
130                "Number of heights in the sync input queue",
131                metrics.sync_queue_heights.clone(),
132            );
133
134            registry.register(
135                "sync_queue_size",
136                "Number of inputs in the sync input queue across all heights",
137                metrics.sync_queue_size.clone(),
138            );
139
140            registry.register(
141                "status_interarrival",
142                "Status updates interarrival histogram (any peer)",
143                metrics.status_interarrival.clone(),
144            );
145
146            registry.register(
147                "status_interarrival_normalized",
148                "Status updates interarrival histogram (any peer) normalized to have a mean of 1",
149                metrics.status_interarrival_normalized.clone(),
150            );
151            registry.register(
152                "status_total",
153                "Total number of status updates received",
154                metrics.status_total.clone(),
155            );
156        });
157
158        metrics
159    }
160
161    pub fn value_request_sent(&self, height: u64) {
162        self.value_requests_sent.inc();
163        self.instant_request_sent.insert(height, Instant::now());
164    }
165
166    pub fn value_request_received(&self, height: u64) {
167        self.value_requests_received.inc();
168        self.instant_request_received.insert(height, Instant::now());
169    }
170
171    pub fn value_response_sent(&self, height: u64) {
172        self.value_responses_sent.inc();
173
174        if let Some((_, instant)) = self.instant_request_received.remove(&height) {
175            self.value_server_latency
176                .observe(instant.elapsed().as_secs_f64());
177        }
178    }
179
180    pub fn value_response_received(&self, height: u64) -> Option<Duration> {
181        self.value_responses_received.inc();
182
183        if let Some((_, instant_request_sent)) = self.instant_request_sent.remove(&height) {
184            let latency = instant_request_sent.elapsed();
185            self.value_client_latency.observe(latency.as_secs_f64());
186            Some(latency)
187        } else {
188            None
189        }
190    }
191
192    pub fn value_request_timed_out(&self, height: u64) {
193        self.value_request_timeouts.inc();
194        self.instant_request_sent.remove(&height);
195    }
196
197    pub fn status_received(&self, n_peers: u64) {
198        self.status_total.inc();
199        let now = Instant::now();
200
201        let mut last_recv_guard = self.instant_last_status_received.lock().unwrap();
202        if let Some(prev) = *last_recv_guard {
203            let delta = now.duration_since(prev).as_secs_f64();
204            self.status_interarrival.observe(delta);
205
206            if n_peers > 0 {
207                // Observe normalized metric (delta / (T/N))
208                let mu = self.status_update_interval.as_secs_f64() / (n_peers as f64);
209                if mu > 0.0 {
210                    let ratio = delta / mu;
211                    self.status_interarrival_normalized.observe(ratio);
212                }
213            }
214        }
215        *last_recv_guard = Some(now);
216    }
217
218    pub fn sync_queue_updated(&self, heights: usize, size: usize) {
219        self.sync_queue_heights.set(heights as _);
220        self.sync_queue_size.set(size as _);
221    }
222}
223
224impl Default for Metrics {
225    fn default() -> Self {
226        // Default interval of 1s.
227        Self::new(Duration::from_secs(1))
228    }
229}