use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use warp::Filter;
/// Process-wide health registry. It is set once by `init`; until then the
/// free functions in this file that read it (`update_health`,
/// `register_health_check`, `shutdown`) do nothing.
static HEALTH_REGISTRY: OnceCell<Arc<HealthRegistry>> = OnceCell::new();
/// Health state of a single check, or of the service as a whole.
///
/// `HealthRegistry::overall_status` aggregates with the precedence
/// `Unhealthy` > `Degraded` > `Healthy`.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum HealthStatus {
/// No problem reported.
Healthy,
/// Impaired; the readiness endpoint still answers `200 OK` for this state.
Degraded,
/// Failing; the readiness endpoint answers `503 Service Unavailable`.
Unhealthy,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheck {
pub name: String,
pub status: HealthStatus,
pub message: Option<String>,
pub last_check: Instant,
pub metadata: HashMap<String, String>,
}
/// Store of health checks behind an `RwLock`, together with the sender used
/// to request shutdown of the health HTTP server.
pub struct HealthRegistry {
// Checks keyed by the name passed to `register`.
checks: RwLock<HashMap<String, HealthCheck>>,
// `shutdown()` sends on this channel; `init` uses the receiving half as the
// server's graceful-shutdown signal.
shutdown_tx: mpsc::Sender<()>,
}
impl HealthRegistry {
fn new(shutdown_tx: mpsc::Sender<()>) -> Self {
Self {
checks: RwLock::new(HashMap::new()),
shutdown_tx,
}
}
pub fn register(&self, name: String, check: HealthCheck) {
let mut checks = self.checks.write().unwrap_or_else(|e| e.into_inner());
checks.insert(name, check);
}
pub fn update(&self, name: &str, status: HealthStatus, message: Option<String>) {
let mut checks = self.checks.write().unwrap_or_else(|e| e.into_inner());
if let Some(check) = checks.get_mut(name) {
check.status = status;
check.message = message;
check.last_check = Instant::now();
}
}
pub fn overall_status(&self) -> HealthStatus {
let checks = self.checks.read().unwrap_or_else(|e| e.into_inner());
if checks.is_empty() {
return HealthStatus::Healthy;
}
let has_unhealthy = checks.values().any(|c| c.status == HealthStatus::Unhealthy);
let has_degraded = checks.values().any(|c| c.status == HealthStatus::Degraded);
if has_unhealthy {
HealthStatus::Unhealthy
} else if has_degraded {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
}
}
pub fn get_all(&self) -> Vec<HealthCheck> {
let checks = self.checks.read().unwrap_or_else(|e| e.into_inner());
checks.values().cloned().collect()
}
}
/// JSON body returned by the `GET /health` route built in `init`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthResponse {
/// Aggregated status from `HealthRegistry::overall_status`.
pub status: HealthStatus,
/// Seconds since the Unix epoch at the time the response was built.
pub timestamp: u64,
/// Crate version taken from `CARGO_PKG_VERSION` at compile time.
pub version: String,
/// Snapshot of every registered check.
pub checks: Vec<HealthCheck>,
}
pub fn init(port: u16) -> Result<(), Box<dyn std::error::Error>> {
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
let registry = Arc::new(HealthRegistry::new(shutdown_tx));
HEALTH_REGISTRY.set(registry.clone()).map_err(|_| "Health checks already initialized")?;
register_default_checks();
let health_route = warp::path("health")
.and(warp::get())
.map(move || {
let (status, checks) = HEALTH_REGISTRY
.get()
.map(|registry| (registry.overall_status(), registry.get_all()))
.unwrap_or((HealthStatus::Unhealthy, Vec::new()));
let response = HealthResponse {
status,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
version: env!("CARGO_PKG_VERSION").to_string(),
checks,
};
warp::reply::json(&response)
});
let liveness_route = warp::path("health")
.and(warp::path("liveness"))
.and(warp::get())
.map(|| {
warp::reply::json(&serde_json::json!({
"status": "alive",
"timestamp": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
}))
});
let readiness_route = warp::path("health")
.and(warp::path("readiness"))
.and(warp::get())
.map(move || {
let status =
HEALTH_REGISTRY.get().map(|registry| registry.overall_status()).unwrap_or(
HealthStatus::Unhealthy,
);
let status_code = match status {
HealthStatus::Healthy => warp::http::StatusCode::OK,
HealthStatus::Degraded => warp::http::StatusCode::OK,
HealthStatus::Unhealthy => warp::http::StatusCode::SERVICE_UNAVAILABLE,
};
warp::reply::with_status(
warp::reply::json(&serde_json::json!({
"ready": status != HealthStatus::Unhealthy,
"status": status,
"timestamp": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
})),
status_code,
)
});
let routes = health_route.or(liveness_route).or(readiness_route);
let addr = ([0, 0, 0, 0], port);
tokio::spawn(async move {
let (_, server) = warp::serve(routes)
.bind_with_graceful_shutdown(addr, async move {
shutdown_rx.recv().await;
});
server.await;
});
start_health_checker();
Ok(())
}
/// Seeds the global registry with the built-in checks, all initially
/// `Healthy`. Does nothing when the registry has not been initialized.
fn register_default_checks() {
    // (registry key, display name, initial message)
    const DEFAULTS: [(&str, &str, &str); 4] = [
        ("rpc_connection", "RPC Connection", "RPC connection initialized"),
        ("database", "Database", "Database connection healthy"),
        ("memory", "Memory Usage", "Memory usage within limits"),
        ("blockchain_sync", "Blockchain Sync", "Blockchain fully synced"),
    ];

    let registry = match HEALTH_REGISTRY.get() {
        Some(registry) => registry,
        None => return,
    };

    for &(key, display_name, message) in DEFAULTS.iter() {
        let check = HealthCheck {
            name: display_name.to_string(),
            status: HealthStatus::Healthy,
            message: Some(message.to_string()),
            last_check: Instant::now(),
            metadata: HashMap::new(),
        };
        registry.register(key.to_string(), check);
    }
}
/// Spawns a detached Tokio task that re-runs the RPC, memory and blockchain
/// checks on a 30-second interval. The loop has no exit condition.
fn start_health_checker() {
    let period = Duration::from_secs(30);
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            // Skip this round if the registry is not available.
            let registry = match HEALTH_REGISTRY.get() {
                Some(registry) => registry,
                None => continue,
            };
            check_rpc_health(registry);
            check_memory_health(registry);
            check_blockchain_health(registry);
        }
    });
}
/// Records the RPC connection state under the `rpc_connection` check.
///
/// The probe result is currently hard-coded to `true`, so this always
/// records `Healthy`.
fn check_rpc_health(registry: &Arc<HealthRegistry>) {
    let healthy = true;
    let (status, text) = if healthy {
        (HealthStatus::Healthy, "RPC connection healthy")
    } else {
        (HealthStatus::Unhealthy, "RPC connection failed")
    };
    registry.update("rpc_connection", status, Some(text.to_string()));
}
/// Records memory pressure under the `memory` check.
///
/// Below 70% is `Healthy`, below 85% is `Degraded`, anything else is
/// `Unhealthy`. The usage figure is currently hard-coded to 50.
fn check_memory_health(registry: &Arc<HealthRegistry>) {
    let memory_usage_percent = 50;
    let status = match memory_usage_percent {
        pct if pct < 70 => HealthStatus::Healthy,
        pct if pct < 85 => HealthStatus::Degraded,
        _ => HealthStatus::Unhealthy,
    };
    let message = match status {
        HealthStatus::Healthy => format!("Memory usage at {}%", memory_usage_percent),
        HealthStatus::Degraded => {
            format!("Memory usage elevated at {}%", memory_usage_percent)
        }
        HealthStatus::Unhealthy => {
            format!("Memory usage critical at {}%", memory_usage_percent)
        }
    };
    registry.update("memory", status, Some(message));
}
/// Records the sync state under the `blockchain_sync` check.
///
/// An unsynced chain is reported as `Degraded`, not `Unhealthy`. The sync
/// flag is currently hard-coded to `true`.
fn check_blockchain_health(registry: &Arc<HealthRegistry>) {
    let synced = true;
    let (status, text) = if synced {
        (HealthStatus::Healthy, "Blockchain fully synced")
    } else {
        (HealthStatus::Degraded, "Blockchain syncing in progress")
    };
    registry.update("blockchain_sync", status, Some(text.to_string()));
}
/// Updates the status and message of the check registered under `name` in
/// the global registry.
///
/// Does nothing when the registry has not been initialized; the registry
/// itself ignores names that were never registered.
pub fn update_health(name: &str, status: HealthStatus, message: Option<String>) {
    let registry = match HEALTH_REGISTRY.get() {
        Some(registry) => registry,
        None => return,
    };
    registry.update(name, status, message);
}
/// Registers a new check in the global registry, using `name` both as the
/// registry key and as the check's display name.
///
/// The check starts with `initial_status`, no message and empty metadata.
/// Does nothing when the registry has not been initialized.
pub fn register_health_check(name: String, initial_status: HealthStatus) {
    let registry = match HEALTH_REGISTRY.get() {
        Some(registry) => registry,
        None => return,
    };
    let key = name.clone();
    let check = HealthCheck {
        name,
        status: initial_status,
        message: None,
        last_check: Instant::now(),
        metadata: HashMap::new(),
    };
    registry.register(key, check);
}
/// Signals the health HTTP server to shut down gracefully.
///
/// Best effort: does nothing when the registry has not been initialized,
/// and a failed send (channel full or closed) is ignored.
pub fn shutdown() {
    let registry = match HEALTH_REGISTRY.get() {
        Some(registry) => registry,
        None => return,
    };
    let _ = registry.shutdown_tx.try_send(());
}