use std::sync::Arc;
use axum::{
Router,
response::{IntoResponse, Response},
routing::get,
};
use crate::{Config, chain_sync::SyncStatus, libp2p::PeerManager, networks::ChainConfig};
mod endpoints;
/// Default TCP port (`2346`) for the healthcheck HTTP server.
///
/// NOTE(review): presumably combined with a host elsewhere to build the default
/// `Client::healthcheck_address` — confirm at the use site.
pub const DEFAULT_HEALTHCHECK_PORT: u16 = 2346;
/// Node state handed to the healthcheck router.
///
/// `init_healthcheck_server` converts it with `.into()` and installs it via `with_state`.
/// Presumably the handlers in the `endpoints` module read it to answer
/// `/healthz`, `/readyz` and `/livez` — confirm there.
pub(crate) struct ForestState {
    /// Node configuration. It includes the client's `healthcheck_address` (the address the
    /// tests bind the server to) and `rpc_address`.
    pub config: Config,
    /// Chain/network configuration, shared via `Arc`.
    pub chain_config: Arc<ChainConfig>,
    /// Genesis timestamp. NOTE(review): presumably Unix seconds and used to judge whether
    /// the head epoch is up to date — confirm in `endpoints`.
    pub genesis_timestamp: u64,
    /// Shared sync status. The tests mutate it through `write()` while the server is
    /// running, so the handlers observe live updates.
    pub sync_status: SyncStatus,
    /// Peer manager. The tests add and remove peers (`touch_peer` / `remove_peer`) to
    /// drive the peer-connectivity checks.
    pub peer_manager: Arc<PeerManager>,
}
pub(crate) async fn init_healthcheck_server(
forest_state: ForestState,
tcp_listener: tokio::net::TcpListener,
) -> anyhow::Result<()> {
let healthcheck_service = Router::new()
.route("/healthz", get(endpoints::healthz))
.route("/readyz", get(endpoints::readyz))
.route("/livez", get(endpoints::livez))
.with_state(forest_state.into());
axum::serve(tcp_listener, healthcheck_service).await?;
Ok(())
}
/// Error returned by healthcheck handlers.
///
/// Always rendered as `503 Service Unavailable`, with the wrapped error's `Display`
/// text as the plain-text body.
struct AppError(anyhow::Error);

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let Self(inner) = self;
        let status = http::StatusCode::SERVICE_UNAVAILABLE;
        let body = inner.to_string();
        (status, body).into_response()
    }
}
#[cfg(test)]
mod test {
    // End-to-end tests for the healthcheck server. Each test binds a real loopback
    // listener, spawns the server in the background and drives it over HTTP while
    // mutating the shared node state.

    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;
    use crate::Client;
    use crate::chain_sync::{NodeSyncStatus, SyncStatusReport};
    use crate::cli_shared::cli::ChainIndexerConfig;
    use parking_lot::RwLock;
    use reqwest::StatusCode;

    /// IPv4 loopback address with an OS-assigned port, so parallel tests never collide.
    fn ephemeral_loopback_address() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)
    }

    /// Binds `forest_state.config.client.healthcheck_address` and runs the healthcheck
    /// server on it in a background task.
    ///
    /// Returns the address actually bound (with the OS-assigned port filled in).
    async fn spawn_healthcheck_server(forest_state: ForestState) -> SocketAddr {
        let listener =
            tokio::net::TcpListener::bind(forest_state.config.client.healthcheck_address)
                .await
                .unwrap();
        let bound_address = listener.local_addr().unwrap();
        tokio::spawn(async move {
            init_healthcheck_server(forest_state, listener)
                .await
                .unwrap();
        });
        bound_address
    }

    /// Sends `GET {path}` (plus `?verbose` when requested) to the server at `address`.
    ///
    /// The request targets the exact bound socket address rather than `localhost`.
    /// The server listens on IPv4 loopback only, and `localhost` may resolve to `::1`
    /// on some hosts.
    async fn call_healthcheck(address: SocketAddr, path: &str, verbose: bool) -> reqwest::Response {
        let query = if verbose { "?verbose" } else { "" };
        reqwest::get(format!("http://{address}{path}{query}"))
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_check_readyz() {
        let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let sync_status = Arc::new(RwLock::new(SyncStatusReport::init()));
        let forest_state = ForestState {
            config: Config {
                chain_indexer: ChainIndexerConfig {
                    enable_indexer: true,
                    gc_retention_epochs: None,
                },
                client: Client {
                    healthcheck_address: ephemeral_loopback_address(),
                    rpc_address: rpc_listener.local_addr().unwrap(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::new(ChainConfig::default()),
            genesis_timestamp: 0,
            sync_status: sync_status.clone(),
            peer_manager: Arc::new(PeerManager::default()),
        };
        let address = spawn_healthcheck_server(forest_state).await;

        // Healthy: synced, head far in the future, RPC listener alive.
        sync_status.write().status = NodeSyncStatus::Synced;
        sync_status.write().current_head_epoch = i64::MAX;
        assert_eq!(
            call_healthcheck(address, "/readyz", false).await.status(),
            StatusCode::OK
        );
        let response = call_healthcheck(address, "/readyz", true).await;
        assert_eq!(response.status(), StatusCode::OK);
        let text = response.text().await.unwrap();
        assert!(text.contains("[+] sync complete"));
        assert!(text.contains("[+] epoch up to date"));
        assert!(text.contains("[+] rpc server running"));

        // Unhealthy: RPC listener closed, sync errored, head at genesis.
        drop(rpc_listener);
        sync_status.write().status = NodeSyncStatus::Error;
        sync_status.write().current_head_epoch = 0;
        assert_eq!(
            call_healthcheck(address, "/readyz", false).await.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );
        let response = call_healthcheck(address, "/readyz", true).await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let text = response.text().await.unwrap();
        assert!(text.contains("[!] sync incomplete"));
        assert!(text.contains("[!] epoch outdated"));
        assert!(text.contains("[!] rpc server not running"));
    }

    #[tokio::test]
    async fn test_check_livez() {
        let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let sync_status = Arc::new(RwLock::new(SyncStatusReport::default()));
        let peer_manager = Arc::new(PeerManager::default());
        let forest_state = ForestState {
            config: Config {
                client: Client {
                    healthcheck_address: ephemeral_loopback_address(),
                    rpc_address: rpc_listener.local_addr().unwrap(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::new(ChainConfig::default()),
            genesis_timestamp: 0,
            sync_status: sync_status.clone(),
            peer_manager: peer_manager.clone(),
        };
        let address = spawn_healthcheck_server(forest_state).await;

        // Alive: syncing with at least one known peer.
        sync_status.write().status = NodeSyncStatus::Syncing;
        let peer = libp2p::PeerId::random();
        peer_manager.touch_peer(&peer);
        assert_eq!(
            call_healthcheck(address, "/livez", false).await.status(),
            StatusCode::OK
        );
        let response = call_healthcheck(address, "/livez", true).await;
        assert_eq!(response.status(), StatusCode::OK);
        let text = response.text().await.unwrap();
        assert!(text.contains("[+] sync ok"));
        assert!(text.contains("[+] peers connected"));

        // Not alive: sync errored and the only peer removed.
        sync_status.write().status = NodeSyncStatus::Error;
        peer_manager.remove_peer(&peer);
        assert_eq!(
            call_healthcheck(address, "/livez", false).await.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );
        let response = call_healthcheck(address, "/livez", true).await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let text = response.text().await.unwrap();
        assert!(text.contains("[!] sync error"));
        assert!(text.contains("[!] no peers connected"));
    }

    #[tokio::test]
    async fn test_check_healthz() {
        let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let peer_manager = Arc::new(PeerManager::default());
        let sync_status = Arc::new(RwLock::new(SyncStatusReport::default()));
        let forest_state = ForestState {
            config: Config {
                client: Client {
                    healthcheck_address: ephemeral_loopback_address(),
                    rpc_address: rpc_listener.local_addr().unwrap(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::new(ChainConfig::default()),
            genesis_timestamp: 0,
            sync_status: sync_status.clone(),
            peer_manager: peer_manager.clone(),
        };
        let address = spawn_healthcheck_server(forest_state).await;

        // Healthy: every individual check passes.
        sync_status.write().current_head_epoch = i64::MAX;
        sync_status.write().status = NodeSyncStatus::Syncing;
        let peer = libp2p::PeerId::random();
        peer_manager.touch_peer(&peer);
        assert_eq!(
            call_healthcheck(address, "/healthz", false).await.status(),
            StatusCode::OK
        );
        let response = call_healthcheck(address, "/healthz", true).await;
        assert_eq!(response.status(), StatusCode::OK);
        let text = response.text().await.unwrap();
        assert!(text.contains("[+] sync ok"));
        assert!(text.contains("[+] epoch up to date"));
        assert!(text.contains("[+] rpc server running"));
        assert!(text.contains("[+] peers connected"));

        // Unhealthy: every individual check fails.
        drop(rpc_listener);
        sync_status.write().status = NodeSyncStatus::Error;
        sync_status.write().current_head_epoch = 0;
        peer_manager.remove_peer(&peer);
        assert_eq!(
            call_healthcheck(address, "/healthz", false).await.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );
        let response = call_healthcheck(address, "/healthz", true).await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let text = response.text().await.unwrap();
        assert!(text.contains("[!] sync error"));
        assert!(text.contains("[!] epoch outdated"));
        assert!(text.contains("[!] rpc server not running"));
        assert!(text.contains("[!] no peers connected"));
    }

    #[tokio::test]
    async fn test_check_unknown_healthcheck_endpoint() {
        let forest_state = ForestState {
            config: Config {
                client: Client {
                    healthcheck_address: ephemeral_loopback_address(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::default(),
            genesis_timestamp: 0,
            sync_status: Arc::new(RwLock::new(SyncStatusReport::default())),
            peer_manager: Arc::default(),
        };
        let address = spawn_healthcheck_server(forest_state).await;

        let response = call_healthcheck(
            address,
            "/phngluimglwnafhcthulhurlyehwgahnaglfhtagn",
            false,
        )
        .await;
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }
}