mod config;
#[cfg(test)]
mod tests;
pub use config::Config;
use derive_new::new;
use std::time::Instant;
use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
use bytes::Bytes;
use chrono::Utc;
use http_body_util::Full;
use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE};
use hyper::server::conn::http1;
use hyper::{
body::Incoming, http::response::Builder as ResponseBuilder, Method, Request, Response,
StatusCode,
};
use hyper_util::rt::TokioIo;
use tokio::{
sync::watch,
task::JoinHandle,
time::{self, MissedTickBehavior},
};
use tracing::{debug, info, warn};
use zebra_chain::{chain_sync_status::ChainSyncStatus, parameters::Network};
use zebra_network::AddressBookPeers;
const PEER_METRICS_REFRESH_INTERVAL: Duration = Duration::from_secs(5);
const METHOD_NOT_ALLOWED_MSG: &str = "method not allowed";
const NOT_FOUND_MSG: &str = "not found";
const MAX_RECENT_REQUESTS: usize = 10_000;
const RECENT_REQUEST_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Clone)]
struct HealthCtx<SyncStatus>
where
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
config: Config,
network: Network,
chain_tip_metrics_receiver: watch::Receiver<ChainTipMetrics>,
sync_status: SyncStatus,
num_live_peer_receiver: watch::Receiver<usize>,
}
/// Chain tip progress metrics published by the syncer and read by the
/// `/ready` handler.
#[derive(Debug, Clone, PartialEq, Eq, new)]
pub struct ChainTipMetrics {
    /// The last time the local chain tip grew.
    pub last_chain_tip_grow_time: Instant,
    /// Estimated number of blocks remaining to sync, or `None` when there is
    /// no tip yet (empty state).
    pub remaining_sync_blocks: Option<i64>,
}
impl ChainTipMetrics {
    /// Creates a watch channel seeded with default metrics: the tip is
    /// treated as having just grown, with no known remaining sync blocks.
    pub fn channel() -> (watch::Sender<Self>, watch::Receiver<Self>) {
        let initial = Self::new(Instant::now(), None);
        watch::channel(initial)
    }
}
/// Initializes the health endpoint server.
///
/// If `config.listen_addr` is `None`, the endpoint is disabled and the
/// returned task pends forever. Otherwise, binds a TCP listener, spawns a
/// background peer-metrics refresh task and the HTTP server, and returns a
/// supervising task handle plus the bound local address.
///
/// # Panics
///
/// Panics if the listener cannot be bound, e.g. when the address is already
/// in use.
pub async fn init<AddressBook, SyncStatus>(
    config: Config,
    network: Network,
    chain_tip_metrics_receiver: watch::Receiver<ChainTipMetrics>,
    sync_status: SyncStatus,
    address_book: AddressBook,
) -> (JoinHandle<()>, Option<SocketAddr>)
where
    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    // Health endpoint disabled: return a task that never completes.
    let Some(listen_addr) = config.listen_addr else {
        return (tokio::spawn(std::future::pending()), None);
    };
    info!("opening health endpoint at {listen_addr}...");
    let listener = tokio::net::TcpListener::bind(listen_addr)
        .await
        .unwrap_or_else(|e| panic!("Opening health endpoint listener {listen_addr:?} failed: {e:?}. Hint: Check if another zebrad is running, or change the health listen_addr in the config."));
    // Prefer the actual bound address (relevant when binding to port 0).
    let local = listener.local_addr().unwrap_or_else(|err| {
        tracing::warn!(?err, "failed to read local addr from TcpListener");
        listen_addr
    });
    info!("opened health endpoint at {}", local);
    let (num_live_peer_sender, num_live_peer_receiver) = watch::channel(0);
    // Seed the channel so early requests don't see a stale zero peer count.
    if let Some(metrics) = num_live_peers(&address_book).await {
        let _ = num_live_peer_sender.send(metrics);
    }
    let mut metrics_task = tokio::spawn(peer_metrics_refresh_task(
        address_book.clone(),
        num_live_peer_sender,
    ));
    let shared = Arc::new(HealthCtx {
        config,
        network,
        chain_tip_metrics_receiver,
        sync_status,
        num_live_peer_receiver,
    });
    let mut server_task = tokio::spawn(run_health_server(listener, shared));
    // Supervise both tasks: if either exits, abort the other so we don't
    // leak a detached background task that can no longer do useful work.
    // (Polling through `&mut` keeps both handles available after select.)
    let task = tokio::spawn(async move {
        tokio::select! {
            _ = &mut metrics_task => server_task.abort(),
            _ = &mut server_task => metrics_task.abort(),
        }
    });
    (task, Some(local))
}
/// Routes an incoming HTTP request to the matching health handler.
///
/// Only `GET` is accepted; any other method gets `405`, and unknown paths
/// get `404`. This handler never fails.
async fn handle_request<SyncStatus>(
    req: Request<Incoming>,
    ctx: Arc<HealthCtx<SyncStatus>>,
) -> Result<Response<Full<Bytes>>, Infallible>
where
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    let response = if req.method() != Method::GET {
        simple_response(StatusCode::METHOD_NOT_ALLOWED, METHOD_NOT_ALLOWED_MSG)
    } else {
        match req.uri().path() {
            "/healthy" => healthy(&ctx).await,
            "/ready" => ready(&ctx).await,
            _ => simple_response(StatusCode::NOT_FOUND, NOT_FOUND_MSG),
        }
    };
    Ok(response)
}
/// Handles `GET /healthy`: `200` when the node has at least the configured
/// minimum number of recently live peers, `503` otherwise.
async fn healthy<SyncStatus>(ctx: &HealthCtx<SyncStatus>) -> Response<Full<Bytes>>
where
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    let live_peers = *ctx.num_live_peer_receiver.borrow();
    if live_peers < ctx.config.min_connected_peers {
        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "insufficient peers");
    }
    simple_response(StatusCode::OK, "ok")
}
/// Handles `GET /ready`: `200` only when the node looks fully synced.
///
/// Checks run in order: test-network bypass, live peer count, syncer
/// proximity to tip, presence of a tip, tip age, and remaining block lag.
/// The first failing check determines the `503` body.
async fn ready<SyncStatus>(ctx: &HealthCtx<SyncStatus>) -> Response<Full<Bytes>>
where
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    // Test networks are always "ready" unless the config opts in to checks.
    if ctx.network.is_a_test_network() && !ctx.config.enforce_on_test_networks {
        return simple_response(StatusCode::OK, "ok");
    }
    let live_peers = *ctx.num_live_peer_receiver.borrow();
    if live_peers < ctx.config.min_connected_peers {
        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "insufficient peers");
    }
    if !ctx.sync_status.is_close_to_tip() {
        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "syncing");
    }
    // Clone out of the watch borrow so it isn't held across the checks below.
    let metrics = ctx.chain_tip_metrics_receiver.borrow().clone();
    let Some(remaining_sync_blocks) = metrics.remaining_sync_blocks else {
        tracing::warn!("syncer is getting block hashes from peers, but state is empty");
        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "no tip");
    };
    let tip_age = metrics.last_chain_tip_grow_time.elapsed();
    if tip_age > ctx.config.ready_max_tip_age {
        let body = format!("tip_age={}s", tip_age.as_secs());
        return simple_response(StatusCode::SERVICE_UNAVAILABLE, &body);
    }
    if remaining_sync_blocks > ctx.config.ready_max_blocks_behind {
        let body = format!("lag={remaining_sync_blocks} blocks");
        return simple_response(StatusCode::SERVICE_UNAVAILABLE, &body);
    }
    simple_response(StatusCode::OK, "ok")
}
/// Counts the recently live peers in the address book.
///
/// The lookup runs on a blocking thread so it doesn't stall the async
/// runtime. Returns `None` (after logging a warning) if the blocking task
/// fails to complete.
async fn num_live_peers<A>(address_book: &A) -> Option<usize>
where
    A: AddressBookPeers + Clone + Send + Sync + 'static,
{
    let book = address_book.clone();
    let count = tokio::task::spawn_blocking(move || book.recently_live_peers(Utc::now()).len());
    match count.await {
        Ok(n) => Some(n),
        Err(err) => {
            warn!(?err, "failed to refresh peer metrics");
            None
        }
    }
}
/// Periodically refreshes the live peer count and publishes it on the watch
/// channel, once per [`PEER_METRICS_REFRESH_INTERVAL`].
///
/// Exits when the receiving side of the channel has been dropped.
async fn peer_metrics_refresh_task<A>(address_book: A, num_live_peer_sender: watch::Sender<usize>)
where
    A: AddressBookPeers + Clone + Send + Sync + 'static,
{
    let mut interval = time::interval(PEER_METRICS_REFRESH_INTERVAL);
    // If a refresh overruns the interval, wait a full interval afterwards
    // instead of bursting to catch up.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        // Tick at the top of the loop: the first tick of a tokio interval
        // completes immediately, so ticking at the bottom caused two
        // back-to-back refreshes at startup.
        interval.tick().await;
        if let Some(metrics) = num_live_peers(&address_book).await {
            if let Err(err) = num_live_peer_sender.send(metrics) {
                tracing::warn!(?err, "failed to send to peer metrics channel");
                break;
            }
        }
    }
}
/// Accepts connections and serves health requests over HTTP/1.
///
/// Applies a coarse rate limit: at most [`MAX_RECENT_REQUESTS`] accepted
/// connections per [`RECENT_REQUEST_INTERVAL`]; connections over budget are
/// dropped without a response.
async fn run_health_server<SyncStatus>(
    listener: tokio::net::TcpListener,
    shared: Arc<HealthCtx<SyncStatus>>,
) where
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    let mut num_recent_requests: usize = 0;
    let mut last_request_count_reset_time = Instant::now();
    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                // Reset the counting window whenever it has elapsed.
                // Previously the reset only happened after the limit was hit,
                // which let the window start go stale (sparse traffic slowly
                // accumulated across many intervals until one connection was
                // wrongly throttled) and skipped counting the connection that
                // triggered the reset.
                if last_request_count_reset_time.elapsed() > RECENT_REQUEST_INTERVAL {
                    num_recent_requests = 0;
                    last_request_count_reset_time = Instant::now();
                }
                if num_recent_requests >= MAX_RECENT_REQUESTS {
                    // Over budget for this window: drop the connection.
                    continue;
                }
                num_recent_requests += 1;
                let io = TokioIo::new(stream);
                let svc_ctx = shared.clone();
                let service =
                    hyper::service::service_fn(move |req| handle_request(req, svc_ctx.clone()));
                // Serve each connection on its own task so accept() isn't blocked.
                tokio::spawn(async move {
                    if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
                        debug!(?err, "health server connection closed with error");
                    }
                });
            }
            Err(err) => {
                warn!(?err, "health server accept failed");
            }
        }
    }
}
/// Builds a plain-text HTTP response with the given status code and body,
/// setting `Content-Type` and `Content-Length` explicitly.
fn simple_response(status: StatusCode, body: &str) -> Response<Full<Bytes>> {
    let payload = Bytes::from(body.to_owned());
    let response = ResponseBuilder::new()
        .status(status)
        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
        .header(CONTENT_LENGTH, payload.len().to_string())
        .body(Full::new(payload));
    // Static headers and a status code always form a valid response.
    response.expect("valid response")
}