use crate::metrics;
use crate::metrics::GaugeGuardExt as _;
use crate::{for_each_rpc_method, rpc::reflect::RpcMethod as _};
use ahash::HashMap;
use futures::future::Either;
use jsonrpsee::MethodResponse;
use jsonrpsee::core::middleware::{Batch, Notification};
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use prometheus_client::metrics::{counter::Counter, histogram::Histogram};
use std::sync::LazyLock;
use tower::Layer;
struct MethodMetrics {
time: Histogram,
failure: Counter,
}
static METHOD_METRICS: LazyLock<HashMap<&'static str, MethodMetrics>> = LazyLock::new(|| {
fn register(map: &mut HashMap<&'static str, MethodMetrics>, name: &'static str) {
let label = metrics::RpcMethodLabel { method: name };
let time = metrics::RPC_METHOD_TIME.get_or_create(&label).clone();
let failure = metrics::RPC_METHOD_FAILURE.get_or_create(&label).clone();
map.insert(name, MethodMetrics { time, failure });
}
let mut map = HashMap::default();
macro_rules! insert {
($ty:ty) => {
register(&mut map, <$ty>::NAME);
if let Some(alias) = <$ty>::NAME_ALIAS {
register(&mut map, alias);
}
};
}
for_each_rpc_method!(insert);
register(&mut map, crate::rpc::chain::CHAIN_NOTIFY);
register(&mut map, crate::rpc::channel::CANCEL_METHOD_NAME);
register(&mut map, "unknown");
map
});
fn method_metrics(method: &str) -> &'static MethodMetrics {
METHOD_METRICS
.get(method)
.or_else(|| METHOD_METRICS.get("unknown"))
.expect("`unknown` metrics entry is always registered")
}
crate::def_is_env_truthy!(is_rpc_metrics_disabled, "FOREST_RPC_METRICS_DISABLED");
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MetricsMode {
Enabled,
Disabled,
}
impl From<bool> for MetricsMode {
fn from(enabled: bool) -> Self {
if enabled {
Self::Enabled
} else {
Self::Disabled
}
}
}
#[derive(Clone)]
pub(super) struct MetricsLayer {
mode: MetricsMode,
}
impl MetricsLayer {
pub(super) fn new(mode: MetricsMode) -> Self {
let mode = if is_rpc_metrics_disabled() {
MetricsMode::Disabled
} else {
mode
};
Self { mode }
}
}
impl<S> Layer<S> for MetricsLayer {
type Service = RecordMetrics<S>;
fn layer(&self, service: S) -> Self::Service {
RecordMetrics {
service,
mode: self.mode,
}
}
}
#[derive(Clone)]
pub(super) struct RecordMetrics<S> {
service: S,
mode: MetricsMode,
}
impl<S> RecordMetrics<S> {
async fn log<F>(method: &'static MethodMetrics, future: F) -> MethodResponse
where
F: Future<Output = MethodResponse>,
{
let _in_flight = metrics::RPC_IN_FLIGHT.inc_guard();
let start_time = std::time::Instant::now();
let resp = future.await;
method
.time
.observe(start_time.elapsed().as_secs_f64() * 1000.0);
if resp.is_error() {
method.failure.inc();
}
resp
}
}
impl<S> RpcServiceT for RecordMetrics<S>
where
S: RpcServiceT<MethodResponse = MethodResponse, NotificationResponse = MethodResponse>
+ Send
+ Sync
+ Clone
+ 'static,
{
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;
fn call<'a>(
&self,
req: jsonrpsee::types::Request<'a>,
) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
if self.mode == MetricsMode::Enabled {
Either::Right(Self::log(
method_metrics(req.method_name()),
self.service.call(req),
))
} else {
Either::Left(self.service.call(req))
}
}
fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
self.service.batch(batch)
}
fn notification<'a>(
&self,
n: Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
if self.mode == MetricsMode::Enabled {
Either::Right(Self::log(
method_metrics(n.method_name()),
self.service.notification(n),
))
} else {
Either::Left(self.service.notification(n))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn resolves_known_and_unknown_methods() {
let _ = method_metrics("eth_getBlockByNumber");
let _ = method_metrics("Bogus.Nonexistent");
assert!(METHOD_METRICS.contains_key("unknown"));
assert!(METHOD_METRICS.contains_key(crate::rpc::chain::CHAIN_NOTIFY));
}
#[test]
#[serial_test::serial]
fn rpc_metrics_disabled_env_forces_passthrough() {
unsafe { std::env::set_var("FOREST_RPC_METRICS_DISABLED", "1") };
assert_eq!(
MetricsLayer::new(MetricsMode::Enabled).mode,
MetricsMode::Disabled
);
unsafe { std::env::remove_var("FOREST_RPC_METRICS_DISABLED") };
assert_eq!(
MetricsLayer::new(MetricsMode::Enabled).mode,
MetricsMode::Enabled
);
assert_eq!(
MetricsLayer::new(MetricsMode::Disabled).mode,
MetricsMode::Disabled
);
}
}