Skip to main content

sc_rpc_server/middleware/
metrics.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! RPC middleware to collect prometheus metrics on RPC calls.
20
21use std::{sync::LazyLock, time::Instant};
22
23use jsonrpsee::{types::Request, MethodResponse};
24use prometheus::core::{GenericCounter, GenericGauge};
25use prometheus_endpoint::{
26	register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
27	U64,
28};
29
30/// Total number of RPC threads created.
31pub(crate) static RPC_THREADS_TOTAL: LazyLock<GenericCounter<U64>> = LazyLock::new(|| {
32	GenericCounter::new("substrate_rpc_threads_total", "Total number of RPC threads created")
33		.expect("Creating of statics doesn't fail. qed")
34});
35
36/// Number of RPC threads currently alive.
37pub(crate) static RPC_THREADS_ALIVE: LazyLock<GenericGauge<U64>> = LazyLock::new(|| {
38	GenericGauge::new("substrate_rpc_threads_alive", "Number of RPC threads currently alive")
39		.expect("Creating of statics doesn't fail. qed")
40});
41
/// Upper bounds of the histogram buckets used by the RPC timing metrics.
///
/// For `substrate_rpc_calls_time` the observed values are microseconds, which is the reading
/// used by the per-line annotations below. `substrate_rpc_sessions_time` reuses the same
/// numeric bounds, but its help text declares the unit as seconds.
const HISTOGRAM_BUCKETS: [f64; 11] = [
	5.0,          // 5 µs
	25.0,         // 25 µs
	100.0,        // 100 µs
	500.0,        // 500 µs
	1_000.0,      // 1 ms
	2_500.0,      // 2.5 ms
	10_000.0,     // 10 ms
	25_000.0,     // 25 ms
	100_000.0,    // 100 ms
	1_000_000.0,  // 1 s
	10_000_000.0, // 10 s
];
56
57/// Metrics for RPC middleware storing information about the number of requests started/completed,
58/// calls started/completed and their timings.
59#[derive(Debug, Clone)]
60pub struct RpcMetrics {
61	/// Histogram over RPC execution times.
62	calls_time: HistogramVec,
63	/// Number of calls started.
64	calls_started: CounterVec<U64>,
65	/// Number of calls completed.
66	calls_finished: CounterVec<U64>,
67	/// Number of calls rejected due to rate limiting.
68	calls_rejected: CounterVec<U64>,
69	/// Number of rate limit retries.
70	calls_retried: CounterVec<U64>,
71	/// Number of Websocket sessions opened.
72	ws_sessions_opened: Option<Counter<U64>>,
73	/// Number of Websocket sessions closed.
74	ws_sessions_closed: Option<Counter<U64>>,
75	/// Histogram over RPC websocket sessions.
76	ws_sessions_time: HistogramVec,
77}
78
79impl RpcMetrics {
80	/// Create an instance of metrics
81	pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
82		if let Some(metrics_registry) = metrics_registry {
83			// Register RPC thread metrics
84			metrics_registry.register(Box::new(RPC_THREADS_TOTAL.clone()))?;
85			metrics_registry.register(Box::new(RPC_THREADS_ALIVE.clone()))?;
86
87			Ok(Some(Self {
88				calls_time: register(
89					HistogramVec::new(
90						HistogramOpts::new(
91							"substrate_rpc_calls_time",
92							"Total time [μs] of processed RPC calls",
93						)
94						.buckets(HISTOGRAM_BUCKETS.to_vec()),
95						&["protocol", "method", "is_rate_limited"],
96					)?,
97					metrics_registry,
98				)?,
99				calls_started: register(
100					CounterVec::new(
101						Opts::new(
102							"substrate_rpc_calls_started",
103							"Number of received RPC calls (unique un-batched requests)",
104						),
105						&["protocol", "method"],
106					)?,
107					metrics_registry,
108				)?,
109				calls_finished: register(
110					CounterVec::new(
111						Opts::new(
112							"substrate_rpc_calls_finished",
113							"Number of processed RPC calls (unique un-batched requests)",
114						),
115						&["protocol", "method", "is_error", "is_rate_limited"],
116					)?,
117					metrics_registry,
118				)?,
119				calls_rejected: register(
120					CounterVec::new(
121						Opts::new(
122							"substrate_rpc_calls_rejected",
123							"Number of RPC calls rejected due to rate limiting",
124						),
125						&["protocol", "method"],
126					)?,
127					metrics_registry,
128				)?,
129				calls_retried: register(
130					CounterVec::new(
131						Opts::new(
132							"substrate_rpc_calls_retried",
133							"Number of rate limit retries for RPC calls",
134						),
135						&["protocol", "method"],
136					)?,
137					metrics_registry,
138				)?,
139				ws_sessions_opened: register(
140					Counter::new(
141						"substrate_rpc_sessions_opened",
142						"Number of persistent RPC sessions opened",
143					)?,
144					metrics_registry,
145				)?
146				.into(),
147				ws_sessions_closed: register(
148					Counter::new(
149						"substrate_rpc_sessions_closed",
150						"Number of persistent RPC sessions closed",
151					)?,
152					metrics_registry,
153				)?
154				.into(),
155				ws_sessions_time: register(
156					HistogramVec::new(
157						HistogramOpts::new(
158							"substrate_rpc_sessions_time",
159							"Total time [s] for each websocket session",
160						)
161						.buckets(HISTOGRAM_BUCKETS.to_vec()),
162						&["protocol"],
163					)?,
164					metrics_registry,
165				)?,
166			}))
167		} else {
168			Ok(None)
169		}
170	}
171
172	pub(crate) fn ws_connect(&self) {
173		self.ws_sessions_opened.as_ref().map(|counter| counter.inc());
174	}
175
176	pub(crate) fn ws_disconnect(&self, now: Instant) {
177		let micros = now.elapsed().as_secs();
178
179		self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
180		self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _);
181	}
182
183	pub(crate) fn on_call(&self, req: &Request, transport_label: &'static str) {
184		log::trace!(
185			target: "rpc_metrics",
186			"[{transport_label}] on_call name={} params={:?}",
187			req.method_name(),
188			req.params(),
189		);
190
191		self.calls_started
192			.with_label_values(&[transport_label, req.method_name()])
193			.inc();
194	}
195
196	pub(crate) fn on_rejected(&self, req: &Request, transport_label: &'static str) {
197		log::trace!(
198			target: "rpc_metrics",
199			"[{transport_label}] {} call rejected due to rate limiting",
200			req.method_name(),
201		);
202		self.calls_rejected
203			.with_label_values(&[transport_label, req.method_name()])
204			.inc();
205	}
206
207	pub(crate) fn on_retry(&self, req: &Request, transport_label: &'static str) {
208		log::trace!(
209			target: "rpc_metrics",
210			"[{transport_label}] {} call retrying due to rate limiting",
211			req.method_name(),
212		);
213		self.calls_retried
214			.with_label_values(&[transport_label, req.method_name()])
215			.inc();
216	}
217
218	pub(crate) fn on_response(
219		&self,
220		req: &Request,
221		rp: &MethodResponse,
222		is_rate_limited: bool,
223		transport_label: &'static str,
224		now: Instant,
225	) {
226		log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
227		log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={}", rp.as_result());
228
229		let micros = now.elapsed().as_micros();
230		log::debug!(
231			target: "rpc_metrics",
232			"[{transport_label}] {} call took {} μs",
233			req.method_name(),
234			micros,
235		);
236		self.calls_time
237			.with_label_values(&[
238				transport_label,
239				req.method_name(),
240				if is_rate_limited { "true" } else { "false" },
241			])
242			.observe(micros as _);
243		self.calls_finished
244			.with_label_values(&[
245				transport_label,
246				req.method_name(),
247				// the label "is_error", so `success` should be regarded as false
248				// and vice-versa to be registered correctly.
249				if rp.is_success() { "false" } else { "true" },
250				if is_rate_limited { "true" } else { "false" },
251			])
252			.inc();
253	}
254}
255
256/// Metrics with transport label.
257#[derive(Clone, Debug)]
258pub struct Metrics {
259	pub(crate) inner: RpcMetrics,
260	pub(crate) transport_label: &'static str,
261}
262
263impl Metrics {
264	/// Create a new [`Metrics`].
265	pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
266		Self { inner: metrics, transport_label }
267	}
268
269	pub(crate) fn ws_connect(&self) {
270		self.inner.ws_connect();
271	}
272
273	pub(crate) fn ws_disconnect(&self, now: Instant) {
274		self.inner.ws_disconnect(now)
275	}
276
277	pub(crate) fn on_call(&self, req: &Request) {
278		self.inner.on_call(req, self.transport_label)
279	}
280
281	pub(crate) fn on_rejected(&self, req: &Request) {
282		self.inner.on_rejected(req, self.transport_label)
283	}
284
285	pub(crate) fn on_retry(&self, req: &Request) {
286		self.inner.on_retry(req, self.transport_label)
287	}
288
289	pub(crate) fn on_response(
290		&self,
291		req: &Request,
292		rp: &MethodResponse,
293		is_rate_limited: bool,
294		now: Instant,
295	) {
296		self.inner.on_response(req, rp, is_rate_limited, self.transport_label, now)
297	}
298}