use anyhow::{Context as _, Result};
use ethexe_common::db::{BlockMetaStorageRO, GlobalsStorageRO, MbStorageRO, OnChainStorageRO};
use ethexe_db::Database;
use futures::{FutureExt, Stream, stream::FusedStream};
use hyper::{
Body, Request, Response, Server,
http::StatusCode,
server::conn::AddrIncoming,
service::{make_service_fn, service_fn},
};
use metrics::Gauge;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use prometheus::{
self, Opts,
core::{AtomicU64, GenericCounterVec, GenericGaugeVec},
};
use std::{
net::SocketAddr,
pin::Pin,
sync::LazyLock,
task::{Context, Poll},
};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
task,
task::JoinHandle,
};
/// Lazily created counter vector named `ethexe_unbounded_channel_len`.
///
/// Each series is addressed by two labels: `entity` and `action`. According to
/// the help text the actions are items sent, received or dropped on an
/// `mpsc::unbounded` instance.
pub static UNBOUNDED_CHANNELS_COUNTER: LazyLock<GenericCounterVec<AtomicU64>> =
    LazyLock::new(|| {
        let opts = Opts::new(
            "ethexe_unbounded_channel_len",
            "Items sent/received/dropped on each mpsc::unbounded instance",
        );
        let label_names = ["entity", "action"];

        GenericCounterVec::new(opts, &label_names)
            .expect("Creating of statics doesn't fail. qed")
    });
/// Lazily created gauge vector named `ethexe_unbounded_channel_size`.
///
/// Each series is addressed by a single `entity` label. According to the help
/// text the value is the number of messages still waiting to be processed in
/// an `mpsc::unbounded` instance.
pub static UNBOUNDED_CHANNELS_SIZE: LazyLock<GenericGaugeVec<AtomicU64>> = LazyLock::new(|| {
    let opts = Opts::new(
        "ethexe_unbounded_channel_size",
        "Size (number of messages to be processed) of each mpsc::unbounded instance",
    );
    let label_names = ["entity"];

    GenericGaugeVec::new(opts, &label_names).expect("Creating of statics doesn't fail. qed")
});
// Gauges describing how fresh the latest committed block is.
// They are written by `update_liveness_metrics`, which runs when `/metrics` is requested.
// NOTE(review): plain `//` comments are used here on purpose; the derive macro may
// interpret `///` doc comments as metric descriptions - confirm before converting them.
#[derive(Clone, metrics_derive::Metrics)]
#[metrics(scope = "ethexe_liveness")]
pub struct LivenessMetrics {
// `height` of the block header resolved as the latest committed block.
pub latest_committed_block_number: Gauge,
// `timestamp` of that same block header.
pub latest_committed_block_timestamp: Gauge,
// Latest synced block timestamp minus the committed block timestamp (saturating at zero).
// Presumably expressed in seconds, as the field name suggests - confirm the timestamp unit.
pub time_since_latest_committed_secs: Gauge,
}
/// Settings consumed by [`PrometheusService::new`].
#[derive(Debug, Clone)]
pub struct PrometheusConfig {
/// Value attached to every metric as the global `node` label.
pub name: String,
/// Socket address the HTTP exporter binds its TCP listener to.
pub addr: SocketAddr,
}
/// Events yielded by the [`PrometheusService`] stream.
#[derive(Debug)]
pub enum PrometheusEvent {
/// Emitted by the HTTP handler while serving `/metrics`.
///
/// The handler waits on the paired receiver and appends whatever string is
/// sent through `libp2p_metrics` to the rendered output. If the sender is
/// dropped without sending, nothing is appended.
CollectMetrics {
libp2p_metrics: oneshot::Sender<String>,
},
/// The spawned server task has finished; carries the result of joining it.
ServerClosed(Result<(), task::JoinError>),
}
/// Handle to the spawned Prometheus HTTP exporter, consumed as a stream of
/// [`PrometheusEvent`]s.
pub struct PrometheusService {
/// Join handle of the task running the HTTP server.
server: JoinHandle<()>,
/// Receiving half of the channel the HTTP handlers push events into.
server_receiver: mpsc::Receiver<PrometheusEvent>,
/// Set once `ServerClosed` has been yielded; backs `FusedStream::is_terminated`.
server_closed_returned: bool,
}
/// Stream of events produced by the exporter.
///
/// Yields [`PrometheusEvent::ServerClosed`] exactly once, when the server task
/// finishes, and ends (`None`) on every poll after that. Until then it forwards
/// the events pushed by the HTTP handlers.
impl Stream for PrometheusService {
    type Item = PrometheusEvent;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The join handle has already produced its output. Polling a completed
        // tokio `JoinHandle` again panics, so report the end of the stream
        // instead; this also keeps plain `StreamExt::next` loops safe.
        if self.server_closed_returned {
            return Poll::Ready(None);
        }

        // Server completion takes priority over queued handler events.
        if let Poll::Ready(res) = self.server.poll_unpin(cx) {
            self.server_closed_returned = true;
            return Poll::Ready(Some(PrometheusEvent::ServerClosed(res)));
        }

        // A closed channel (`Ready(None)`) is not surfaced here: the stream
        // only ends through `ServerClosed`, for which the waker is registered
        // by the join handle poll above.
        if let Poll::Ready(Some(event)) = self.server_receiver.poll_recv(cx) {
            return Poll::Ready(Some(event));
        }

        Poll::Pending
    }
}
/// Termination tracking for the event stream.
impl FusedStream for PrometheusService {
/// Returns `true` once `poll_next` has yielded [`PrometheusEvent::ServerClosed`].
fn is_terminated(&self) -> bool {
self.server_closed_returned
}
}
impl PrometheusService {
pub fn new(config: PrometheusConfig, db: Database) -> Result<Self> {
let handle = PrometheusBuilder::new()
.add_global_label("node", config.name)
.install_recorder()
.context("Failed to install prometheus recorder")?;
let metrics = LivenessMetrics::default();
let (server_sender, server_receiver) = mpsc::channel(64);
let server = tokio::spawn(
start_prometheus_server(config.addr, server_sender, handle.clone(), metrics, db)
.map(drop),
);
Ok(Self {
server,
server_receiver,
server_closed_returned: false,
})
}
}
async fn start_prometheus_server(
prometheus_addr: SocketAddr,
sender: mpsc::Sender<PrometheusEvent>,
handle: PrometheusHandle,
metrics: LivenessMetrics,
db: Database,
) -> Result<()> {
let listener = TcpListener::bind(&prometheus_addr).await?;
let listener = AddrIncoming::from_listener(listener)?;
let (signal, on_exit) = oneshot::channel::<()>();
let service = make_service_fn(move |_| {
let sender = sender.clone();
let handle = handle.clone();
let metrics = metrics.clone();
let db = db.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
request_metrics(
req,
sender.clone(),
handle.clone(),
metrics.clone(),
db.clone(),
)
}))
}
});
let server = Server::builder(listener)
.serve(service)
.with_graceful_shutdown(async {
let _ = on_exit.await;
});
log::info!("〽️ Prometheus exporter started at {prometheus_addr}");
let result = server.await.map_err(Into::into);
let _ = signal.send(());
result
}
async fn request_metrics(
req: Request<Body>,
sender: mpsc::Sender<PrometheusEvent>,
handle: PrometheusHandle,
metrics: LivenessMetrics,
db: Database,
) -> Result<Response<Body>> {
if req.uri().path() == "/metrics" {
update_liveness_metrics(db, metrics);
let mut metrics = handle.render();
debug_assert!(metrics.ends_with('\n'));
debug_assert!(!metrics.ends_with("# EOF\n"));
let (tx, rx) = oneshot::channel();
sender
.send(PrometheusEvent::CollectMetrics { libp2p_metrics: tx })
.await
.expect("channel must never be closed");
if let Ok(libp2p_metrics) = rx.await {
metrics += &libp2p_metrics;
}
Response::builder()
.status(StatusCode::OK)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static("text/plain"),
)
.body(Body::from(metrics))
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found."))
}
.context("Failed to request metrics")
}
/// Refreshes the liveness gauges from the current database state.
///
/// The committed block header is resolved in steps: the globals'
/// `latest_prepared_eb_hash` -> that block meta's `last_committed_mb` -> that
/// mb meta's `last_advanced_eb` -> its block header. If there is no committed
/// mb, or the header is not stored, the gauges are left untouched.
fn update_liveness_metrics(db: Database, metrics: LivenessMetrics) {
    let prepared_eb_hash = db.globals().latest_prepared_eb_hash;
    let Some(committed_mb_hash) = db.block_meta(prepared_eb_hash).last_committed_mb else {
        return;
    };

    let advanced_eb_hash = db.mb_meta(committed_mb_hash).last_advanced_eb;
    let Some(committed_header) = db.block_header(advanced_eb_hash) else {
        return;
    };

    // Gap between the latest synced block and the committed one, clamped at
    // zero in case the committed header is the newer of the two.
    let synced_timestamp = db.globals().latest_synced_eb.header.timestamp;
    let since_committed = synced_timestamp.saturating_sub(committed_header.timestamp);

    metrics
        .latest_committed_block_number
        .set(committed_header.height as f64);
    metrics
        .latest_committed_block_timestamp
        .set(committed_header.timestamp as f64);
    metrics
        .time_since_latest_committed_secs
        .set(since_committed as f64);
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::{net::Ipv4Addr, time::Duration};
    use tokio::{task, time};

    /// The stream must report termination only after `ServerClosed` has
    /// actually been yielded, not merely once the server task has finished.
    #[tokio::test]
    async fn fused_stream_works() {
        let config = PrometheusConfig {
            name: "".to_string(),
            addr: (Ipv4Addr::LOCALHOST, 0).into(),
        };
        let mut service = PrometheusService::new(config, Database::memory()).unwrap();
        assert!(!service.is_terminated());

        // Abort the server task and wait (bounded) until it is really done.
        let wait_for_abort = async {
            service.server.abort();
            loop {
                if service.server.is_finished() {
                    break;
                }
                task::yield_now().await;
            }
        };
        time::timeout(Duration::from_secs(5), wait_for_abort)
            .await
            .unwrap();

        // The task is finished, but the event has not been consumed yet.
        assert!(!service.is_terminated());

        match service.select_next_some().await {
            PrometheusEvent::ServerClosed(res) => assert!(res.unwrap_err().is_cancelled()),
            event => unreachable!("unexpected event: {event:?}"),
        }

        assert!(service.is_terminated());
    }
}