use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::SystemTime;
use crate::app::AppState;
/// Liveness report serialized as the JSON body of `health_handler`.
///
/// Built through [`HealthStatus::healthy`] or [`HealthStatus::unhealthy`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
/// `"healthy"`, or `"unhealthy: <reason>"` when built via `unhealthy`.
pub status: String,
/// Crate version, taken from `CARGO_PKG_VERSION` at compile time by the constructors.
pub version: String,
/// Whole seconds since the Unix epoch at the moment the report was built.
pub timestamp: u64,
/// Optional uptime in seconds; left out of the serialized output when `None`.
/// The constructors in this file always set it to `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub uptime_seconds: Option<u64>,
}
impl HealthStatus {
pub fn healthy() -> Self {
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
status: "healthy".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
timestamp,
uptime_seconds: None,
}
}
pub fn unhealthy(reason: String) -> Self {
Self {
status: format!("unhealthy: {}", reason),
version: env!("CARGO_PKG_VERSION").to_string(),
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
uptime_seconds: None,
}
}
}
/// Readiness report serialized as the JSON body of `readiness_handler`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadinessStatus {
/// `"ready"` when every component reports `"ready"`, otherwise `"not_ready"`.
pub status: String,
/// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
pub version: String,
/// Whole seconds since the Unix epoch at the moment the check started.
pub timestamp: u64,
/// Per-component results that the overall `status` is derived from.
pub components: ReadinessComponents,
}
/// Individual component results that make up a [`ReadinessStatus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadinessComponents {
/// Result of the catalog probe (a `list_namespaces` call in `ReadinessStatus::check`).
pub catalog: ComponentStatus,
/// Result of the storage probe (a `storage_health_check` call in `ReadinessStatus::check`).
pub storage: ComponentStatus,
/// Authentication component; `ReadinessStatus::check` currently always reports it ready.
pub authentication: ComponentStatus,
/// Authorization component; `ReadinessStatus::check` currently always reports it ready.
pub authorization: ComponentStatus,
/// Credentials component; `ReadinessStatus::check` currently always reports it ready.
pub credentials: ComponentStatus,
}
/// Status of a single dependency inside a readiness report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentStatus {
/// State label; the constructors produce `"ready"`, `"degraded"` or `"unavailable"`.
pub status: String,
/// Optional human-readable detail; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
impl ComponentStatus {
pub fn ready() -> Self {
Self {
status: "ready".to_string(),
message: None,
}
}
pub fn degraded(message: impl Into<String>) -> Self {
Self {
status: "degraded".to_string(),
message: Some(message.into()),
}
}
pub fn unavailable(message: impl Into<String>) -> Self {
Self {
status: "unavailable".to_string(),
message: Some(message.into()),
}
}
}
impl ReadinessStatus {
pub async fn check(state: &AppState) -> Self {
use tokio::time::{timeout, Duration};
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let check_timeout = Duration::from_secs(5);
let catalog = match timeout(check_timeout, state.catalog.list_namespaces(None)).await {
Ok(Ok(_)) => ComponentStatus::ready(),
Ok(Err(e)) => {
tracing::warn!(error = %e, "Catalog health check failed");
ComponentStatus::degraded(format!("Catalog error: {}", e))
}
Err(_) => {
tracing::warn!("Catalog health check timed out");
ComponentStatus::degraded("Catalog response timeout")
}
};
let storage = match timeout(check_timeout, state.catalog.storage_health_check()).await {
Ok(Ok(status)) if status.healthy => {
let mut comp = ComponentStatus::ready();
comp.message = Some(format!("{}:{}ms", status.backend_type, status.latency_ms));
comp
}
Ok(Ok(status)) => ComponentStatus::degraded(
status
.message
.unwrap_or_else(|| format!("{} unhealthy", status.backend_type)),
),
Ok(Err(e)) => {
tracing::warn!(error = %e, "Storage health check failed");
ComponentStatus::degraded(format!("Storage error: {}", e))
}
Err(_) => {
tracing::warn!("Storage health check timed out");
ComponentStatus::degraded("Storage response timeout")
}
};
let authentication = ComponentStatus::ready();
let authorization = ComponentStatus::ready();
let credentials = ComponentStatus::ready();
let components = ReadinessComponents {
catalog,
storage,
authentication,
authorization,
credentials,
};
let all_ready = [
&components.catalog,
&components.storage,
&components.authentication,
&components.authorization,
&components.credentials,
]
.iter()
.all(|c| c.status == "ready");
Self {
status: if all_ready {
"ready".to_string()
} else {
"not_ready".to_string()
},
version: env!("CARGO_PKG_VERSION").to_string(),
timestamp,
components,
}
}
}
/// Liveness endpoint: always answers `200 OK` with a freshly built
/// "healthy" report as the JSON body.
pub async fn health_handler() -> impl IntoResponse {
    (StatusCode::OK, Json(HealthStatus::healthy()))
}
/// Readiness endpoint: runs the dependency checks and returns the report as JSON.
///
/// Responds with `200 OK` when the overall status is `"ready"` and with
/// `503 Service Unavailable` for any other status.
pub async fn readiness_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let report = ReadinessStatus::check(&state).await;

    let code = match report.status.as_str() {
        "ready" => StatusCode::OK,
        _ => StatusCode::SERVICE_UNAVAILABLE,
    };

    (code, Json(report))
}
/// Builds the router exposing the probe endpoints: `GET /health` for liveness
/// and `GET /ready` for readiness, with the shared application state attached.
pub fn create_routes(state: Arc<AppState>) -> Router {
    let probes = Router::new()
        .route("/health", get(health_handler))
        .route("/ready", get(readiness_handler));

    probes.with_state(state)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AppState` on top of an in-memory Iceberg catalog, using the
    /// allow-all authenticator/authorizer and the no-op credential provider.
    async fn in_memory_state() -> AppState {
        use crate::auth::{
            AllowAllAuthenticator, AllowAllAuthorizer, RateLimitConfig, RateLimiter,
        };
        use crate::catalog::{ExtendedCatalog, IdempotencyCache, ViewStorage};
        use crate::credentials::NoopCredentialProvider;
        use iceberg::memory::MemoryCatalogBuilder;
        use iceberg::CatalogBuilder;
        use std::collections::HashMap;
        use std::time::Duration;

        let mut props = HashMap::new();
        props.insert("warehouse".to_string(), "memory://test".to_string());
        let catalog: iceberg::MemoryCatalog = MemoryCatalogBuilder::default()
            .load("memory", props)
            .await
            .unwrap();

        AppState {
            authenticator: Arc::new(AllowAllAuthenticator),
            authorizer: Arc::new(AllowAllAuthorizer),
            catalog: Arc::new(ExtendedCatalog::new(catalog)),
            credential_provider: Arc::new(NoopCredentialProvider),
            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig::default())),
            idempotency_cache: Arc::new(IdempotencyCache::new(Duration::from_secs(3600))),
            view_storage: Arc::new(ViewStorage::new()),
            metrics: Arc::new(crate::observability::MetricsRegistry::new()),
            warehouse_location: "memory://test".to_string(),
            default_tenant_id: "default".to_string(),
        }
    }

    /// `healthy()` reports the fixed label, the crate version and a non-zero timestamp.
    #[test]
    fn test_health_status_healthy() {
        let status = HealthStatus::healthy();
        assert_eq!(status.status, "healthy");
        assert_eq!(status.version, env!("CARGO_PKG_VERSION"));
        assert!(status.timestamp > 0);
    }

    /// `unhealthy()` embeds both the label and the caller's reason in `status`.
    #[test]
    fn test_health_status_unhealthy() {
        let status = HealthStatus::unhealthy("test error".to_string());
        assert!(status.status.contains("unhealthy"));
        assert!(status.status.contains("test error"));
    }

    /// Each `ComponentStatus` constructor sets its label and carries the message through.
    #[test]
    fn test_component_status() {
        let ready = ComponentStatus::ready();
        assert_eq!(ready.status, "ready");
        assert!(ready.message.is_none());

        let degraded = ComponentStatus::degraded("slow connection");
        assert_eq!(degraded.status, "degraded");
        assert_eq!(degraded.message.as_deref(), Some("slow connection"));

        let unavailable = ComponentStatus::unavailable("connection lost");
        assert_eq!(unavailable.status, "unavailable");
        assert_eq!(unavailable.message.as_deref(), Some("connection lost"));
    }

    /// With an in-memory catalog the overall status and every component,
    /// storage included, must be ready.
    #[tokio::test]
    async fn test_readiness_all_ready() {
        let state = in_memory_state().await;

        let status = ReadinessStatus::check(&state).await;

        assert_eq!(status.status, "ready");
        assert_eq!(status.components.catalog.status, "ready");
        // Storage was the one component the assertions previously skipped.
        assert_eq!(status.components.storage.status, "ready");
        assert_eq!(status.components.authentication.status, "ready");
        assert_eq!(status.components.authorization.status, "ready");
        assert_eq!(status.components.credentials.status, "ready");
    }
}