#![expect(
clippy::cast_precision_loss,
reason = "prometheus uses f64 and we're well within f64 bounds for all u64's we're sending"
)]
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use anyhow::{Context as _, Result, anyhow};
use futures_util::FutureExt;
use http::{Method, StatusCode, header};
use jsonrpsee::core::middleware::{Batch, Notification, RpcServiceT};
use jsonrpsee::core::server::MethodResponse;
use jsonrpsee::server::{HttpBody, HttpRequest, HttpResponse};
use jsonrpsee::types::Request;
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
use tower::{Layer, Service};
use crate::rpc::SubKind;
// Prometheus series names. Each constant is the key handed to the
// `gauge!`/`counter!`/`histogram!` macros below and to the matching
// `describe_*!` call in `describe_metrics`.
// Process / build metadata.
const BUILD_INFO: &str = "neve_build_info";
const PROCESS_START_TIME: &str = "neve_process_start_time_seconds";
// Block ingest progress.
const INGEST_HEAD_HEIGHT: &str = "neve_ingest_head_height";
const INGEST_CONTIGUOUS_HEIGHT: &str = "neve_ingest_contiguous_height";
const INGEST_BEHIND_BLOCKS: &str = "neve_ingest_behind_blocks";
const INGEST_BLOCKS_TOTAL: &str = "neve_ingest_blocks_total";
const INGEST_LAST_BLOCK_TIMESTAMP: &str = "neve_ingest_last_block_timestamp_seconds";
// Served JSON-RPC traffic.
const RPC_REQUESTS_TOTAL: &str = "neve_rpc_requests_total";
const RPC_REQUEST_DURATION_SECONDS: &str = "neve_rpc_request_duration_seconds";
const RPC_OPEN_CONNECTIONS: &str = "neve_rpc_open_connections";
const RPC_MISDIRECTED_TOTAL: &str = "neve_rpc_misdirected_total";
// Upstream requests and the upstream WebSocket session.
const UPSTREAM_REQUESTS_TOTAL: &str = "neve_upstream_requests_total";
const UPSTREAM_REQUEST_DURATION_SECONDS: &str = "neve_upstream_request_duration_seconds";
const UPSTREAM_RETRY_AFTER_SECONDS: &str = "neve_upstream_retry_after_seconds";
const UPSTREAM_CONNECTED_SINCE: &str = "neve_upstream_connected_since_seconds";
const UPSTREAM_WS_RECONNECTS_TOTAL: &str = "neve_upstream_ws_reconnects_total";
const UPSTREAM_WS_IDLE_TIMEOUTS_TOTAL: &str = "neve_upstream_ws_idle_timeouts_total";
// Subscriptions; all three series carry a `kind` label.
const SUB_OPEN: &str = "neve_sub_open";
const SUB_LAGGED_TOTAL: &str = "neve_sub_lagged_total";
const SUB_SENT_BYTES_TOTAL: &str = "neve_sub_sent_bytes_total";
/// Origin of a persisted block; rendered as the `source` label on the
/// ingest block counter.
#[derive(Debug)]
pub enum BlockSource {
    /// Rendered as `source="live"`.
    Live,
    /// Rendered as `source="backfill"`.
    Backfill,
}

impl BlockSource {
    /// Returns the static label value for this source.
    const fn as_str(&self) -> &'static str {
        match *self {
            Self::Backfill => "backfill",
            Self::Live => "live",
        }
    }
}
/// Classification of one upstream request; rendered as the `outcome` label
/// on the upstream request counter.
#[derive(Debug)]
pub enum UpstreamOutcome {
    /// Rendered as `outcome="ok"`.
    Ok,
    /// Rendered as `outcome="empty"`.
    Empty,
    /// Rendered as `outcome="429"`.
    TooManyRequests,
    /// Rendered as `outcome="503"`.
    ServiceUnavailable,
    /// Rendered as `outcome="error"`.
    Error,
}

impl UpstreamOutcome {
    /// Returns the static label value for this outcome.
    const fn as_str(&self) -> &'static str {
        match *self {
            Self::Error => "error",
            Self::ServiceUnavailable => "503",
            Self::TooManyRequests => "429",
            Self::Empty => "empty",
            Self::Ok => "ok",
        }
    }
}
/// Maps an HTTP status onto an outcome: 429 and 503 get dedicated variants,
/// every other status becomes [`UpstreamOutcome::Error`].
impl From<StatusCode> for UpstreamOutcome {
    fn from(status: StatusCode) -> Self {
        // NOTE(review): success statuses also land in `Error` here; presumably
        // callers only convert non-success responses — confirm at call sites.
        match status {
            StatusCode::TOO_MANY_REQUESTS => Self::TooManyRequests,
            StatusCode::SERVICE_UNAVAILABLE => Self::ServiceUnavailable,
            _ => Self::Error,
        }
    }
}
/// Publishes the build-info gauge (constant `1`, with `version` and `commit`
/// labels taken from compile-time environment variables) and the process
/// start-time gauge (current wall-clock time as unix epoch seconds).
pub fn process_metadata() {
    let build_info = gauge!(
        BUILD_INFO,
        "version" => env!("CARGO_PKG_VERSION"),
        "commit" => env!("NEVE_GIT_COMMIT"),
    );
    build_info.set(1.0);

    // `duration_since` fails when the clock reads earlier than the epoch;
    // report 0 in that case instead of propagating an error.
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    let started_at = since_epoch.map_or(0.0, |elapsed| elapsed.as_secs_f64());
    gauge!(PROCESS_START_TIME).set(started_at);
}
/// Builds a default `metrics_process::Collector`, calls `describe()` on it
/// once, and hands it back to the caller.
pub fn process_collector() -> metrics_process::Collector {
    let process = metrics_process::Collector::default();
    // Presumably registers descriptions for the process-level series —
    // confirm against the `metrics_process` documentation.
    process.describe();
    process
}
/// Sets the three ingest progress gauges in one call.
///
/// * `head` — value for the head-height gauge.
/// * `contiguous` — value for the contiguous-height gauge.
/// * `behind` — value for the behind-blocks gauge.
pub fn ingest_heights(head: u64, contiguous: u64, behind: u64) {
    let head_gauge = gauge!(INGEST_HEAD_HEIGHT);
    head_gauge.set(head as f64);

    let contiguous_gauge = gauge!(INGEST_CONTIGUOUS_HEIGHT);
    contiguous_gauge.set(contiguous as f64);

    let behind_gauge = gauge!(INGEST_BEHIND_BLOCKS);
    behind_gauge.set(behind as f64);
}
/// Counts one persisted block, labelled with where it came from.
pub fn block_persisted(source: BlockSource) {
    let origin = source.as_str();
    let persisted = counter!(INGEST_BLOCKS_TOTAL, "source" => origin);
    persisted.increment(1);
}
/// Sets the last-block-timestamp gauge to `secs` (converted to `f64`).
pub fn last_block_timestamp(secs: u64) {
    let timestamp = secs as f64;
    gauge!(INGEST_LAST_BLOCK_TIMESTAMP).set(timestamp);
}
/// Records one served RPC call.
///
/// Increments the request counter labelled with `method` and `status`, and
/// records `secs` in the duration histogram labelled with `method` only.
pub fn rpc_call(method: &'static str, status: &'static str, secs: f64) {
    let requests = counter!(RPC_REQUESTS_TOTAL, "method" => method, "status" => status);
    requests.increment(1);

    let latency = histogram!(RPC_REQUEST_DURATION_SECONDS, "method" => method);
    latency.record(secs);
}
/// Bumps the misdirected-response counter by one.
pub fn rpc_misdirected() {
    let misdirected = counter!(RPC_MISDIRECTED_TOTAL);
    misdirected.increment(1);
}
/// Records one upstream request attempt.
///
/// `outcome` is converted into an [`UpstreamOutcome`] and used as the
/// `outcome` label on the request counter; `secs` goes into the (unlabelled)
/// upstream duration histogram regardless of outcome.
pub fn upstream_request(outcome: impl Into<UpstreamOutcome>, secs: f64) {
    let outcome: UpstreamOutcome = outcome.into();
    let requests = counter!(UPSTREAM_REQUESTS_TOTAL, "outcome" => outcome.as_str());
    requests.increment(1);

    let latency = histogram!(UPSTREAM_REQUEST_DURATION_SECONDS);
    latency.record(secs);
}
/// Records a retry-after delay of `secs` in the retry-after histogram.
pub fn upstream_retry_after(secs: u64) {
    let delay = secs as f64;
    histogram!(UPSTREAM_RETRY_AFTER_SECONDS).record(delay);
}
/// Stamps the connected-since gauge with the current wall-clock time in unix
/// epoch seconds.
pub fn upstream_connected() {
    // A clock reading earlier than the epoch yields an error; report 0 then.
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    let connected_at = since_epoch.map_or(0.0, |elapsed| elapsed.as_secs_f64());
    gauge!(UPSTREAM_CONNECTED_SINCE).set(connected_at);
}
/// Bumps the upstream WebSocket reconnect counter by one.
pub fn ws_reconnect() {
    let reconnects = counter!(UPSTREAM_WS_RECONNECTS_TOTAL);
    reconnects.increment(1);
}
/// Bumps the upstream WebSocket idle-timeout counter by one.
pub fn ws_idle_timeout() {
    let idle_timeouts = counter!(UPSTREAM_WS_IDLE_TIMEOUTS_TOTAL);
    idle_timeouts.increment(1);
}
/// Scope guard tracking one open subscription of a given [`SubKind`].
///
/// Creating the guard increments the open-subscription gauge for that kind;
/// dropping it decrements the same gauge. While alive, the guard is also the
/// handle through which per-kind lag and byte counters are bumped.
///
/// The type is `#[must_use]` because a guard that is never bound to a
/// variable is dropped at the end of the statement, which would increment and
/// immediately decrement the gauge.
#[derive(Debug)]
#[must_use = "dropping the guard immediately decrements the open-subscription gauge"]
pub struct SubMetricsGuard {
    kind: SubKind,
}

impl SubMetricsGuard {
    /// Registers a newly opened subscription of `kind` and returns the guard
    /// that will unregister it on drop.
    pub fn new(kind: SubKind) -> Self {
        gauge!(SUB_OPEN, "kind" => kind.as_str()).increment(1.0);
        Self { kind }
    }

    /// Adds `n` to the lagged counter for this guard's kind.
    pub fn lagged(&self, n: u64) {
        counter!(SUB_LAGGED_TOTAL, "kind" => self.kind.as_str()).increment(n);
    }

    /// Adds `bytes` to the sent-bytes counter for this guard's kind.
    pub fn sent_bytes(&self, bytes: u64) {
        counter!(SUB_SENT_BYTES_TOTAL, "kind" => self.kind.as_str()).increment(bytes);
    }
}

impl Drop for SubMetricsGuard {
    /// Undoes the increment performed in [`SubMetricsGuard::new`].
    fn drop(&mut self) {
        gauge!(SUB_OPEN, "kind" => self.kind.as_str()).decrement(1.0);
    }
}
// Histogram bucket boundaries, wired to their metrics by name in `install`.
// All three target `*_seconds` histograms, so values are in seconds.
// Retry-after delays: 0.5 s up to 600 s.
const RETRY_AFTER_BUCKETS: &[f64] = &[0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0];
// Upstream request latency: 10 ms up to 2 s.
const UPSTREAM_DURATION_BUCKETS: &[f64] = &[
0.01, 0.015, 0.02, 0.03, 0.05, 0.07, 0.1, 0.15, 0.2, 0.3, 0.5, 0.7, 1.0, 1.5, 2.0,
];
// Served RPC latency: 25 µs up to 500 ms.
const RPC_DURATION_BUCKETS: &[f64] = &[
0.000_025, 0.000_05, 0.000_1, 0.000_25, 0.000_5, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1,
0.25, 0.5,
];
/// Builds the Prometheus recorder, installs it as the global `metrics`
/// recorder, and returns a handle for rendering the exposition text.
///
/// After installation the metric descriptions are registered and the
/// build-info / start-time gauges are published.
///
/// # Errors
///
/// Fails if any histogram bucket configuration is rejected by the builder,
/// or if the global recorder cannot be set.
pub fn install() -> Result<PrometheusHandle> {
    // (metric name, bucket boundaries, error context) — applied in order.
    let bucket_overrides = [
        (
            UPSTREAM_RETRY_AFTER_SECONDS,
            RETRY_AFTER_BUCKETS,
            "configuring retry-after histogram buckets",
        ),
        (
            RPC_REQUEST_DURATION_SECONDS,
            RPC_DURATION_BUCKETS,
            "configuring rpc-duration histogram buckets",
        ),
        (
            UPSTREAM_REQUEST_DURATION_SECONDS,
            UPSTREAM_DURATION_BUCKETS,
            "configuring upstream-duration histogram buckets",
        ),
    ];

    let mut builder = PrometheusBuilder::new();
    for (metric, buckets, what) in bucket_overrides {
        builder = builder
            .set_buckets_for_metric(Matcher::Full(metric.to_owned()), buckets)
            .context(what)?;
    }

    let recorder = builder.build_recorder();
    // Grab the handle first: installing the recorder consumes it.
    let handle = recorder.handle();
    if let Err(e) = metrics::set_global_recorder(recorder) {
        return Err(anyhow!("install global metrics recorder: {e}"));
    }

    describe_metrics();
    process_metadata();
    Ok(handle)
}
/// Registers a description (and, for the `*_seconds` series, a
/// `metrics::Unit::Seconds` unit) for every series name declared in this
/// module. Called from `install` after the global recorder is set.
fn describe_metrics() {
// Process / build metadata.
describe_gauge!(
BUILD_INFO,
"Build metadata as a constant 1; version and short git commit carried in labels."
);
describe_gauge!(
PROCESS_START_TIME,
metrics::Unit::Seconds,
"Process start time (unix epoch seconds). Uptime = time() - this."
);
// Block ingest progress.
describe_gauge!(
INGEST_HEAD_HEIGHT,
"Highest stored block height (the blockstore high-water mark)."
);
describe_gauge!(
INGEST_CONTIGUOUS_HEIGHT,
"Highest gap-free stored block height."
);
describe_gauge!(
INGEST_BEHIND_BLOCKS,
"Blocks between the contiguous frontier and the upstream tip (0 = caught up). Primary freshness alerting signal."
);
describe_counter!(
INGEST_BLOCKS_TOTAL,
"Blocks persisted. Label source={live|backfill}."
);
describe_gauge!(
INGEST_LAST_BLOCK_TIMESTAMP,
metrics::Unit::Seconds,
"Block-header timestamp of the latest live block (unix epoch seconds). Staleness = time() - this."
);
// Served JSON-RPC traffic.
describe_counter!(
RPC_REQUESTS_TOTAL,
"Served JSON-RPC method calls. Labels method (registered eth_* set, else \"other\") and status={ok|error}."
);
describe_histogram!(
RPC_REQUEST_DURATION_SECONDS,
metrics::Unit::Seconds,
"Served JSON-RPC method-call latency. Label method (clamped to the registered set)."
);
describe_gauge!(
RPC_OPEN_CONNECTIONS,
"Open JSON-RPC transport connections currently being served."
);
describe_counter!(
RPC_MISDIRECTED_TOTAL,
"Responses rewritten 200->421: a requested block/hash is outside this mirror's stored tail."
);
// Upstream requests and the upstream WebSocket session.
describe_counter!(
UPSTREAM_REQUESTS_TOTAL,
"Upstream block fetches (backfill HTTPS + live on-socket getBlockByNumber). Label outcome={ok|empty|429|503|error}."
);
describe_histogram!(
UPSTREAM_REQUEST_DURATION_SECONDS,
metrics::Unit::Seconds,
"Per-attempt upstream HTTPS request latency (round-trip incl. body decode, excl. retry backoff)."
);
describe_histogram!(
UPSTREAM_RETRY_AFTER_SECONDS,
metrics::Unit::Seconds,
"Retry-After delays requested by the upstream on 429/503."
);
describe_gauge!(
UPSTREAM_CONNECTED_SINCE,
metrics::Unit::Seconds,
"Unix epoch seconds of the last successful upstream live subscribe. Session age = time() - this."
);
describe_counter!(
UPSTREAM_WS_RECONNECTS_TOTAL,
"WebSocket reconnects to the upstream."
);
describe_counter!(
UPSTREAM_WS_IDLE_TIMEOUTS_TOTAL,
"Idle-watchdog timeouts that forced a WebSocket reconnect."
);
// Subscriptions.
describe_gauge!(
SUB_OPEN,
"Active eth_subscribe subscriptions. Label kind={newHeads|newBlocks|oldBlocks}."
);
describe_counter!(
SUB_LAGGED_TOTAL,
"Blocks dropped for subscribers that fell behind the broadcast ring. Label kind={newHeads|newBlocks} (live kinds only)."
);
describe_counter!(
SUB_SENT_BYTES_TOTAL,
"Serialized bytes pushed to subscribers. Label kind={newHeads|newBlocks|oldBlocks}."
);
}
/// Tower layer that wraps a service in [`MetricsService`], giving it access
/// to the Prometheus render handle.
#[derive(Clone, Debug)]
pub struct MetricsLayer {
    handle: PrometheusHandle,
}

impl MetricsLayer {
    /// Creates a layer that will hand a clone of `handle` to every service
    /// it wraps.
    pub const fn new(handle: PrometheusHandle) -> Self {
        Self { handle }
    }
}

impl<S> Layer<S> for MetricsLayer {
    type Service = MetricsService<S>;

    /// Wraps `inner`, pairing it with a clone of the render handle.
    fn layer(&self, inner: S) -> Self::Service {
        let handle = self.handle.clone();
        MetricsService { inner, handle }
    }
}
/// HTTP service wrapper that answers `GET /metrics` itself and forwards
/// every other request to the wrapped service.
#[derive(Clone, Debug)]
pub struct MetricsService<S> {
    inner: S,
    handle: PrometheusHandle,
}

impl<S> Service<HttpRequest<HttpBody>> for MetricsService<S>
where
    S: Service<HttpRequest<HttpBody>, Response = HttpResponse<HttpBody>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Send + 'static,
{
    type Response = HttpResponse<HttpBody>;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated entirely to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Serves the rendered Prometheus text for `GET /metrics`; anything else
    /// is passed through to the wrapped service untouched.
    fn call(&mut self, req: HttpRequest<HttpBody>) -> Self::Future {
        let is_scrape = req.method() == Method::GET && req.uri().path() == "/metrics";
        if !is_scrape {
            return self.inner.call(req).boxed();
        }

        let rendered = self.handle.render();
        let response = HttpResponse::builder()
            .status(StatusCode::OK)
            .header(
                header::CONTENT_TYPE,
                "text/plain; version=0.0.4; charset=utf-8",
            )
            .body(HttpBody::from(rendered))
            .expect("static metrics response is valid");
        std::future::ready(Ok(response)).boxed()
    }
}
/// Scope guard for the open-connections gauge: incremented on creation,
/// decremented on drop.
#[derive(Debug)]
struct ConnGuard;

impl ConnGuard {
    /// Counts one more open connection and returns the guard.
    fn new() -> Self {
        let open = gauge!(RPC_OPEN_CONNECTIONS);
        open.increment(1.0);
        Self
    }
}

impl Drop for ConnGuard {
    /// Counts the connection as closed.
    fn drop(&mut self) {
        let open = gauge!(RPC_OPEN_CONNECTIONS);
        open.decrement(1.0);
    }
}
/// JSON-RPC middleware that counts and times each method call.
///
/// Every instance created through [`RpcMetricsService::new`] owns one
/// `ConnGuard` behind an `Arc`; clones share it, so the open-connections
/// gauge is decremented only when the last clone is dropped.
#[derive(Clone)]
pub struct RpcMetricsService<S> {
    inner: S,
    methods: Arc<[&'static str]>,
    _conn: Arc<ConnGuard>,
}

impl<S> RpcMetricsService<S> {
    /// Wraps `inner`. `methods` is the set of method names allowed to appear
    /// verbatim as a metric label.
    pub fn new(inner: S, methods: Arc<[&'static str]>) -> Self {
        let conn = Arc::new(ConnGuard::new());
        Self {
            inner,
            methods,
            _conn: conn,
        }
    }

    /// Maps a requested method name onto a label: the matching entry from
    /// `methods`, or `"other"` when the name is not in the set.
    fn label(&self, method: &str) -> &'static str {
        for &known in self.methods.iter() {
            if known == method {
                return known;
            }
        }
        "other"
    }
}

impl<S> RpcServiceT for RpcMetricsService<S>
where
    S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
{
    type MethodResponse = S::MethodResponse;
    type NotificationResponse = S::NotificationResponse;
    type BatchResponse = S::BatchResponse;

    /// Forwards the call to the wrapped service and records its label,
    /// success/error status, and elapsed time once the response is ready.
    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
        // Resolve the label up front so the future does not borrow `self`.
        let label = self.label(req.method.as_ref());
        let service = self.inner.clone();
        async move {
            // The clock starts when the future is first polled.
            let started = Instant::now();
            let response = service.call(req).await;
            let status = if response.is_success() {
                "ok"
            } else {
                "error"
            };
            rpc_call(label, status, started.elapsed().as_secs_f64());
            response
        }
    }

    /// Batches are forwarded as-is; nothing is recorded here.
    fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
        self.inner.batch(batch)
    }

    /// Notifications are forwarded as-is; nothing is recorded here.
    fn notification<'a>(
        &self,
        n: Notification<'a>,
    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
        self.inner.notification(n)
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Contract test: drives every recording helper against a thread-local
// recorder (so the global recorder is never touched) and checks the
// rendered Prometheus text for each expected series.
#[test]
#[expect(
clippy::too_many_lines,
reason = "contract test deliberately exercises every helper and asserts every rendered series in one place; splitting would scatter the contract"
)]
fn helpers_render_expected_series() {
// Same bucket configuration as `install`, minus the global installation.
let recorder = PrometheusBuilder::new()
.set_buckets_for_metric(
Matcher::Full(UPSTREAM_RETRY_AFTER_SECONDS.to_owned()),
RETRY_AFTER_BUCKETS,
)
.expect("bucket config")
.set_buckets_for_metric(
Matcher::Full(RPC_REQUEST_DURATION_SECONDS.to_owned()),
RPC_DURATION_BUCKETS,
)
.expect("bucket config")
.set_buckets_for_metric(
Matcher::Full(UPSTREAM_REQUEST_DURATION_SECONDS.to_owned()),
UPSTREAM_DURATION_BUCKETS,
)
.expect("bucket config")
.build_recorder();
let handle = recorder.handle();
metrics::with_local_recorder(&recorder, || {
process_metadata();
rpc_call("eth_chainId", "ok", 0.001);
rpc_misdirected();
ingest_heights(100, 90, 10);
block_persisted(BlockSource::Live);
block_persisted(BlockSource::Backfill);
last_block_timestamp(1_780_000_000);
upstream_request(UpstreamOutcome::Ok, 0.012);
upstream_request(UpstreamOutcome::Empty, 0.012);
upstream_request(UpstreamOutcome::TooManyRequests, 0.012);
upstream_retry_after(7);
upstream_connected();
ws_reconnect();
ws_idle_timeout();
// The guard is dropped at the end of this closure, so the open gauge is
// expected to be back at 0 when rendered below.
let guard = SubMetricsGuard::new(SubKind::NewHeads);
guard.sent_bytes(512);
guard.lagged(3);
});
let out = handle.render();
// Each assertion prints the full rendered output on failure.
assert!(
out.contains(&format!(
r#"neve_build_info{{version="{}",commit="{}"}} 1"#,
env!("CARGO_PKG_VERSION"),
env!("NEVE_GIT_COMMIT"),
)),
"{out}"
);
assert!(out.contains("neve_process_start_time_seconds "), "{out}");
assert!(
out.contains(r#"neve_rpc_requests_total{method="eth_chainId",status="ok"} 1"#),
"{out}"
);
assert!(
out.contains(r#"neve_rpc_request_duration_seconds_bucket{method="eth_chainId""#),
"{out}"
);
assert!(out.contains("neve_rpc_misdirected_total 1"), "{out}");
assert!(out.contains("neve_ingest_head_height 100"), "{out}");
assert!(out.contains("neve_ingest_behind_blocks 10"), "{out}");
assert!(
out.contains(r#"neve_ingest_blocks_total{source="live"} 1"#),
"{out}"
);
assert!(
out.contains(r#"neve_ingest_blocks_total{source="backfill"} 1"#),
"{out}"
);
assert!(
out.contains("neve_ingest_last_block_timestamp_seconds 1780000000"),
"{out}"
);
assert!(
out.contains(r#"neve_upstream_requests_total{outcome="empty"} 1"#),
"{out}"
);
assert!(
out.contains(r#"neve_upstream_requests_total{outcome="429"} 1"#),
"{out}"
);
assert!(
out.contains("neve_upstream_request_duration_seconds_bucket"),
"{out}"
);
// Three `upstream_request` calls above, one histogram sample each.
assert!(
out.contains("neve_upstream_request_duration_seconds_count 3"),
"{out}"
);
assert!(
out.contains("neve_upstream_connected_since_seconds "),
"{out}"
);
assert!(out.contains("neve_upstream_ws_reconnects_total 1"), "{out}");
assert!(
out.contains("neve_upstream_ws_idle_timeouts_total 1"),
"{out}"
);
assert!(
out.contains("neve_upstream_retry_after_seconds_bucket"),
"{out}"
);
assert!(
out.contains("neve_upstream_retry_after_seconds_count 1"),
"{out}"
);
// Incremented by `SubMetricsGuard::new`, decremented by its `Drop`.
assert!(out.contains(r#"neve_sub_open{kind="newHeads"} 0"#), "{out}");
assert!(
out.contains(r#"neve_sub_sent_bytes_total{kind="newHeads"} 512"#),
"{out}"
);
assert!(
out.contains(r#"neve_sub_lagged_total{kind="newHeads"} 3"#),
"{out}"
);
}
}