Skip to main content

soil_rpc/server/middleware/
metrics.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! RPC middleware to collect prometheus metrics on RPC calls.
8
9use std::time::Instant;
10
11use jsonrpsee::{types::Request, MethodResponse};
12use soil_prometheus::{
13	register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
14	U64,
15};
16
/// Upper bounds of the buckets used by the histograms in this file.
///
/// The values carry no unit of their own: the RPC call histogram observes microseconds against
/// them, while the WebSocket session histogram observes seconds.
#[rustfmt::skip]
const HISTOGRAM_BUCKETS: [f64; 11] = [
	5.0, 25.0,
	100.0, 500.0,
	1_000.0, 2_500.0,
	10_000.0, 25_000.0,
	100_000.0,
	1_000_000.0,
	10_000_000.0,
];
31
32/// Metrics for RPC middleware storing information about the number of requests started/completed,
33/// calls started/completed and their timings.
34#[derive(Debug, Clone)]
35pub struct RpcMetrics {
36	/// Histogram over RPC execution times.
37	calls_time: HistogramVec,
38	/// Number of calls started.
39	calls_started: CounterVec<U64>,
40	/// Number of calls completed.
41	calls_finished: CounterVec<U64>,
42	/// Number of Websocket sessions opened.
43	ws_sessions_opened: Option<Counter<U64>>,
44	/// Number of Websocket sessions closed.
45	ws_sessions_closed: Option<Counter<U64>>,
46	/// Histogram over RPC websocket sessions.
47	ws_sessions_time: HistogramVec,
48}
49
50impl RpcMetrics {
51	/// Create an instance of metrics
52	pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
53		if let Some(metrics_registry) = metrics_registry {
54			Ok(Some(Self {
55				calls_time: register(
56					HistogramVec::new(
57						HistogramOpts::new(
58							"substrate_rpc_calls_time",
59							"Total time [μs] of processed RPC calls",
60						)
61						.buckets(HISTOGRAM_BUCKETS.to_vec()),
62						&["protocol", "method", "is_rate_limited"],
63					)?,
64					metrics_registry,
65				)?,
66				calls_started: register(
67					CounterVec::new(
68						Opts::new(
69							"substrate_rpc_calls_started",
70							"Number of received RPC calls (unique un-batched requests)",
71						),
72						&["protocol", "method"],
73					)?,
74					metrics_registry,
75				)?,
76				calls_finished: register(
77					CounterVec::new(
78						Opts::new(
79							"substrate_rpc_calls_finished",
80							"Number of processed RPC calls (unique un-batched requests)",
81						),
82						&["protocol", "method", "is_error", "is_rate_limited"],
83					)?,
84					metrics_registry,
85				)?,
86				ws_sessions_opened: register(
87					Counter::new(
88						"substrate_rpc_sessions_opened",
89						"Number of persistent RPC sessions opened",
90					)?,
91					metrics_registry,
92				)?
93				.into(),
94				ws_sessions_closed: register(
95					Counter::new(
96						"substrate_rpc_sessions_closed",
97						"Number of persistent RPC sessions closed",
98					)?,
99					metrics_registry,
100				)?
101				.into(),
102				ws_sessions_time: register(
103					HistogramVec::new(
104						HistogramOpts::new(
105							"substrate_rpc_sessions_time",
106							"Total time [s] for each websocket session",
107						)
108						.buckets(HISTOGRAM_BUCKETS.to_vec()),
109						&["protocol"],
110					)?,
111					metrics_registry,
112				)?,
113			}))
114		} else {
115			Ok(None)
116		}
117	}
118
119	pub(crate) fn ws_connect(&self) {
120		self.ws_sessions_opened.as_ref().map(|counter| counter.inc());
121	}
122
123	pub(crate) fn ws_disconnect(&self, now: Instant) {
124		let micros = now.elapsed().as_secs();
125
126		self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
127		self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _);
128	}
129
130	pub(crate) fn on_call(&self, req: &Request, transport_label: &'static str) {
131		log::trace!(
132			target: "rpc_metrics",
133			"[{transport_label}] on_call name={} params={:?}",
134			req.method_name(),
135			req.params(),
136		);
137
138		self.calls_started
139			.with_label_values(&[transport_label, req.method_name()])
140			.inc();
141	}
142
143	pub(crate) fn on_response(
144		&self,
145		req: &Request,
146		rp: &MethodResponse,
147		is_rate_limited: bool,
148		transport_label: &'static str,
149		now: Instant,
150	) {
151		log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
152		log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={}", rp.as_result());
153
154		let micros = now.elapsed().as_micros();
155		log::debug!(
156			target: "rpc_metrics",
157			"[{transport_label}] {} call took {} μs",
158			req.method_name(),
159			micros,
160		);
161		self.calls_time
162			.with_label_values(&[
163				transport_label,
164				req.method_name(),
165				if is_rate_limited { "true" } else { "false" },
166			])
167			.observe(micros as _);
168		self.calls_finished
169			.with_label_values(&[
170				transport_label,
171				req.method_name(),
172				// the label "is_error", so `success` should be regarded as false
173				// and vice-versa to be registered correctly.
174				if rp.is_success() { "false" } else { "true" },
175				if is_rate_limited { "true" } else { "false" },
176			])
177			.inc();
178	}
179}
180
181/// Metrics with transport label.
182#[derive(Clone, Debug)]
183pub struct Metrics {
184	pub(crate) inner: RpcMetrics,
185	pub(crate) transport_label: &'static str,
186}
187
188impl Metrics {
189	/// Create a new [`Metrics`].
190	pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
191		Self { inner: metrics, transport_label }
192	}
193
194	pub(crate) fn ws_connect(&self) {
195		self.inner.ws_connect();
196	}
197
198	pub(crate) fn ws_disconnect(&self, now: Instant) {
199		self.inner.ws_disconnect(now)
200	}
201
202	pub(crate) fn on_call(&self, req: &Request) {
203		self.inner.on_call(req, self.transport_label)
204	}
205
206	pub(crate) fn on_response(
207		&self,
208		req: &Request,
209		rp: &MethodResponse,
210		is_rate_limited: bool,
211		now: Instant,
212	) {
213		self.inner.on_response(req, rp, is_rate_limited, self.transport_label, now)
214	}
215}