sc_rpc_server/middleware/
metrics.rs1use std::{sync::LazyLock, time::Instant};
22
23use jsonrpsee::{types::Request, MethodResponse};
24use prometheus::core::{GenericCounter, GenericGauge};
25use prometheus_endpoint::{
26 register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
27 U64,
28};
29
30pub(crate) static RPC_THREADS_TOTAL: LazyLock<GenericCounter<U64>> = LazyLock::new(|| {
32 GenericCounter::new("substrate_rpc_threads_total", "Total number of RPC threads created")
33 .expect("Creating of statics doesn't fail. qed")
34});
35
36pub(crate) static RPC_THREADS_ALIVE: LazyLock<GenericGauge<U64>> = LazyLock::new(|| {
38 GenericGauge::new("substrate_rpc_threads_alive", "Number of RPC threads currently alive")
39 .expect("Creating of statics doesn't fail. qed")
40});
41
/// Upper bounds of the histogram buckets shared by the RPC timing histograms.
///
/// `substrate_rpc_calls_time` observes microseconds, so there these read as
/// 5 μs .. 10 s. `substrate_rpc_sessions_time` reuses the same bounds but observes
/// seconds.
/// NOTE(review): for session durations the bounds span 5 s .. ~115 days — confirm
/// that this is intended.
const HISTOGRAM_BUCKETS: [f64; 11] = [
    // 5 μs .. 500 μs
    5.0, 25.0, 100.0, 500.0,
    // 1 ms .. 100 ms
    1_000.0, 2_500.0, 10_000.0, 25_000.0, 100_000.0,
    // 1 s, 10 s
    1_000_000.0, 10_000_000.0,
];
56
57#[derive(Debug, Clone)]
60pub struct RpcMetrics {
61 calls_time: HistogramVec,
63 calls_started: CounterVec<U64>,
65 calls_finished: CounterVec<U64>,
67 calls_rejected: CounterVec<U64>,
69 calls_retried: CounterVec<U64>,
71 ws_sessions_opened: Option<Counter<U64>>,
73 ws_sessions_closed: Option<Counter<U64>>,
75 ws_sessions_time: HistogramVec,
77}
78
79impl RpcMetrics {
80 pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
82 if let Some(metrics_registry) = metrics_registry {
83 metrics_registry.register(Box::new(RPC_THREADS_TOTAL.clone()))?;
85 metrics_registry.register(Box::new(RPC_THREADS_ALIVE.clone()))?;
86
87 Ok(Some(Self {
88 calls_time: register(
89 HistogramVec::new(
90 HistogramOpts::new(
91 "substrate_rpc_calls_time",
92 "Total time [μs] of processed RPC calls",
93 )
94 .buckets(HISTOGRAM_BUCKETS.to_vec()),
95 &["protocol", "method", "is_rate_limited"],
96 )?,
97 metrics_registry,
98 )?,
99 calls_started: register(
100 CounterVec::new(
101 Opts::new(
102 "substrate_rpc_calls_started",
103 "Number of received RPC calls (unique un-batched requests)",
104 ),
105 &["protocol", "method"],
106 )?,
107 metrics_registry,
108 )?,
109 calls_finished: register(
110 CounterVec::new(
111 Opts::new(
112 "substrate_rpc_calls_finished",
113 "Number of processed RPC calls (unique un-batched requests)",
114 ),
115 &["protocol", "method", "is_error", "is_rate_limited"],
116 )?,
117 metrics_registry,
118 )?,
119 calls_rejected: register(
120 CounterVec::new(
121 Opts::new(
122 "substrate_rpc_calls_rejected",
123 "Number of RPC calls rejected due to rate limiting",
124 ),
125 &["protocol", "method"],
126 )?,
127 metrics_registry,
128 )?,
129 calls_retried: register(
130 CounterVec::new(
131 Opts::new(
132 "substrate_rpc_calls_retried",
133 "Number of rate limit retries for RPC calls",
134 ),
135 &["protocol", "method"],
136 )?,
137 metrics_registry,
138 )?,
139 ws_sessions_opened: register(
140 Counter::new(
141 "substrate_rpc_sessions_opened",
142 "Number of persistent RPC sessions opened",
143 )?,
144 metrics_registry,
145 )?
146 .into(),
147 ws_sessions_closed: register(
148 Counter::new(
149 "substrate_rpc_sessions_closed",
150 "Number of persistent RPC sessions closed",
151 )?,
152 metrics_registry,
153 )?
154 .into(),
155 ws_sessions_time: register(
156 HistogramVec::new(
157 HistogramOpts::new(
158 "substrate_rpc_sessions_time",
159 "Total time [s] for each websocket session",
160 )
161 .buckets(HISTOGRAM_BUCKETS.to_vec()),
162 &["protocol"],
163 )?,
164 metrics_registry,
165 )?,
166 }))
167 } else {
168 Ok(None)
169 }
170 }
171
172 pub(crate) fn ws_connect(&self) {
173 self.ws_sessions_opened.as_ref().map(|counter| counter.inc());
174 }
175
176 pub(crate) fn ws_disconnect(&self, now: Instant) {
177 let micros = now.elapsed().as_secs();
178
179 self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
180 self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _);
181 }
182
183 pub(crate) fn on_call(&self, req: &Request, transport_label: &'static str) {
184 log::trace!(
185 target: "rpc_metrics",
186 "[{transport_label}] on_call name={} params={:?}",
187 req.method_name(),
188 req.params(),
189 );
190
191 self.calls_started
192 .with_label_values(&[transport_label, req.method_name()])
193 .inc();
194 }
195
196 pub(crate) fn on_rejected(&self, req: &Request, transport_label: &'static str) {
197 log::trace!(
198 target: "rpc_metrics",
199 "[{transport_label}] {} call rejected due to rate limiting",
200 req.method_name(),
201 );
202 self.calls_rejected
203 .with_label_values(&[transport_label, req.method_name()])
204 .inc();
205 }
206
207 pub(crate) fn on_retry(&self, req: &Request, transport_label: &'static str) {
208 log::trace!(
209 target: "rpc_metrics",
210 "[{transport_label}] {} call retrying due to rate limiting",
211 req.method_name(),
212 );
213 self.calls_retried
214 .with_label_values(&[transport_label, req.method_name()])
215 .inc();
216 }
217
218 pub(crate) fn on_response(
219 &self,
220 req: &Request,
221 rp: &MethodResponse,
222 is_rate_limited: bool,
223 transport_label: &'static str,
224 now: Instant,
225 ) {
226 log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
227 log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={}", rp.as_result());
228
229 let micros = now.elapsed().as_micros();
230 log::debug!(
231 target: "rpc_metrics",
232 "[{transport_label}] {} call took {} μs",
233 req.method_name(),
234 micros,
235 );
236 self.calls_time
237 .with_label_values(&[
238 transport_label,
239 req.method_name(),
240 if is_rate_limited { "true" } else { "false" },
241 ])
242 .observe(micros as _);
243 self.calls_finished
244 .with_label_values(&[
245 transport_label,
246 req.method_name(),
247 if rp.is_success() { "false" } else { "true" },
250 if is_rate_limited { "true" } else { "false" },
251 ])
252 .inc();
253 }
254}
255
256#[derive(Clone, Debug)]
258pub struct Metrics {
259 pub(crate) inner: RpcMetrics,
260 pub(crate) transport_label: &'static str,
261}
262
263impl Metrics {
264 pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
266 Self { inner: metrics, transport_label }
267 }
268
269 pub(crate) fn ws_connect(&self) {
270 self.inner.ws_connect();
271 }
272
273 pub(crate) fn ws_disconnect(&self, now: Instant) {
274 self.inner.ws_disconnect(now)
275 }
276
277 pub(crate) fn on_call(&self, req: &Request) {
278 self.inner.on_call(req, self.transport_label)
279 }
280
281 pub(crate) fn on_rejected(&self, req: &Request) {
282 self.inner.on_rejected(req, self.transport_label)
283 }
284
285 pub(crate) fn on_retry(&self, req: &Request) {
286 self.inner.on_retry(req, self.transport_label)
287 }
288
289 pub(crate) fn on_response(
290 &self,
291 req: &Request,
292 rp: &MethodResponse,
293 is_rate_limited: bool,
294 now: Instant,
295 ) {
296 self.inner.on_response(req, rp, is_rate_limited, self.transport_label, now)
297 }
298}