use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
use axum::response::IntoResponse;
/// Point-in-time health record for a single subsystem.
///
/// Derives `Debug` and `Clone` so a snapshot can be logged or copied out of
/// shared storage; every field is a plain value type.
#[derive(Debug, Clone)]
pub struct SubsystemStatus {
    /// Whether the subsystem is currently considered healthy.
    pub healthy: bool,
    /// Static explanation for an unhealthy state; `None` when no reason applies.
    pub reason: Option<&'static str>,
    /// When a success was last recorded, or `None` if none has been recorded.
    pub last_success: Option<Instant>,
    /// When a failure was last recorded, or `None` if none has been recorded.
    pub last_failure: Option<Instant>,
}
impl SubsystemStatus {
fn initial() -> Self {
Self {
healthy: false,
reason: Some("not yet checked"),
last_success: None,
last_failure: None,
}
}
}
/// Cloneable handle for publishing and reading one subsystem's status.
///
/// The status is held behind an `Arc`, so clones of a reporter share the same
/// underlying state.
#[derive(Clone)]
pub struct SubsystemReporter {
// Current status snapshot; each report swaps in a freshly built snapshot.
state: Arc<ArcSwap<SubsystemStatus>>,
}
impl SubsystemReporter {
    /// Creates a reporter whose status starts as `SubsystemStatus::initial()`.
    pub fn new() -> Self {
        let starting = Arc::new(SubsystemStatus::initial());
        let shared = Arc::new(ArcSwap::new(starting));
        Self { state: shared }
    }

    /// Records a success: the status becomes healthy with no reason,
    /// `last_success` is set to now, and the previous `last_failure` is kept.
    pub fn report_ok(&self) {
        self.state.rcu(|previous| {
            // Build the replacement from the current snapshot so the failure
            // timestamp carries over.
            let updated = SubsystemStatus {
                last_failure: previous.last_failure,
                last_success: Some(Instant::now()),
                reason: None,
                healthy: true,
            };
            Arc::new(updated)
        });
    }

    /// Records a failure: the status becomes unhealthy with `reason`,
    /// `last_failure` is set to now, and the previous `last_success` is kept.
    pub fn report_err(&self, reason: &'static str) {
        self.state.rcu(|previous| {
            // Build the replacement from the current snapshot so the success
            // timestamp carries over.
            let updated = SubsystemStatus {
                last_failure: Some(Instant::now()),
                last_success: previous.last_success,
                reason: Some(reason),
                healthy: false,
            };
            Arc::new(updated)
        });
    }

    /// Returns a guard giving read access to the current status snapshot.
    pub fn load(&self) -> arc_swap::Guard<Arc<SubsystemStatus>> {
        self.state.load()
    }
}
impl Default for SubsystemReporter {
fn default() -> Self {
Self::new()
}
}
/// Health reporters for every tracked subsystem, plus readiness settings.
pub struct HealthRegistry {
/// Reporter for the git subsystem.
pub git: SubsystemReporter,
/// Reporter for the embedding subsystem.
pub embedding: SubsystemReporter,
/// Reporter for the vector index subsystem.
pub vector_index: SubsystemReporter,
/// Reporter for the sync subsystem; `readyz_handler` only consults it when
/// `require_sync` is set.
pub sync: SubsystemReporter,
/// When `true`, the sync check must also be up for the service to be ready.
pub require_sync: bool,
/// Maximum age of the last success before a healthy subsystem is reported
/// as stale; `None` disables the staleness check.
pub stale_threshold: Option<Duration>,
// Readiness verdict remembered between `readyz_handler` calls so that
// info/warn logs are emitted only when the verdict flips.
was_ready: AtomicBool,
}
impl HealthRegistry {
pub fn new() -> Self {
Self::with_config(false, None)
}
pub fn with_config(require_sync: bool, stale_threshold: Option<Duration>) -> Self {
Self {
git: SubsystemReporter::new(),
embedding: SubsystemReporter::new(),
vector_index: SubsystemReporter::new(),
sync: SubsystemReporter::new(),
require_sync,
stale_threshold,
was_ready: AtomicBool::new(false),
}
}
}
impl Default for HealthRegistry {
fn default() -> Self {
Self::new()
}
}
/// JSON body produced by `readyz_handler`.
#[derive(serde::Serialize)]
pub struct ReadyzResponse {
/// Overall verdict; `readyz_handler` sets it to `"ready"` or `"not_ready"`.
pub status: &'static str,
/// Per-subsystem check results.
pub checks: ReadyzChecks,
}
/// Per-subsystem results included in the readiness response.
#[derive(serde::Serialize)]
pub struct ReadyzChecks {
/// Result for the git reporter.
pub git_repo: CheckResult,
/// Result for the embedding reporter.
pub embedding: CheckResult,
/// Result for the vector index reporter.
pub vector_index: CheckResult,
/// Result for the sync reporter; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub sync: Option<CheckResult>,
}
/// Outcome of a single subsystem check.
#[derive(serde::Serialize)]
pub struct CheckResult {
/// Check state; the code in this file sets it to `"up"` or `"down"`.
pub status: &'static str,
/// Why the check is down; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<&'static str>,
}
impl From<&SubsystemStatus> for CheckResult {
    /// Maps a status snapshot to a check result: healthy becomes `"up"` with
    /// no reason, unhealthy becomes `"down"` carrying the snapshot's reason.
    fn from(s: &SubsystemStatus) -> Self {
        let (status, reason) = if s.healthy {
            ("up", None)
        } else {
            ("down", s.reason)
        };
        Self { status, reason }
    }
}
/// Converts a status snapshot into a check result, applying a staleness limit.
///
/// A healthy snapshot is reported as `"down"` with reason `"stale"` when a
/// `threshold` is given, a last success is recorded, and more than `threshold`
/// has elapsed since it. In every other case the result is the plain
/// conversion of `status`.
fn check_with_staleness(status: &SubsystemStatus, threshold: Option<Duration>) -> CheckResult {
    // Staleness only applies to subsystems that currently claim to be healthy.
    let is_stale = status.healthy
        && match (threshold, status.last_success) {
            (Some(limit), Some(succeeded_at)) => succeeded_at.elapsed() > limit,
            _ => false,
        };

    if is_stale {
        CheckResult {
            status: "down",
            reason: Some("stale"),
        }
    } else {
        CheckResult::from(status)
    }
}
pub async fn readyz_handler(
axum::extract::State(state): axum::extract::State<Arc<crate::types::AppState>>,
) -> axum::response::Response {
let threshold = state.health.stale_threshold;
let git = state.health.git.load();
let embedding = state.health.embedding.load();
let vector_index = state.health.vector_index.load();
let git_check = check_with_staleness(&git, threshold);
let embedding_check = check_with_staleness(&embedding, threshold);
let vector_index_check = check_with_staleness(&vector_index, threshold);
let sync_check = if state.health.require_sync {
let sync = state.health.sync.load();
Some(check_with_staleness(&sync, threshold))
} else {
None
};
let all_up = git_check.status == "up"
&& embedding_check.status == "up"
&& vector_index_check.status == "up"
&& sync_check.as_ref().is_none_or(|s| s.status == "up");
let response = ReadyzResponse {
status: if all_up { "ready" } else { "not_ready" },
checks: ReadyzChecks {
git_repo: git_check,
embedding: embedding_check,
vector_index: vector_index_check,
sync: sync_check,
},
};
let status_code = if all_up {
if state
.health
.was_ready
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
tracing::info!("readyz: all subsystems up");
}
axum::http::StatusCode::OK
} else {
if state
.health
.was_ready
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
tracing::warn!(
git_repo = response.checks.git_repo.status,
embedding = response.checks.embedding.status,
vector_index = response.checks.vector_index.status,
sync = response.checks.sync.as_ref().map_or("n/a", |s| s.status),
"readyz degraded: subsystem(s) down",
);
} else {
tracing::debug!("readyz check: not ready");
}
axum::http::StatusCode::SERVICE_UNAVAILABLE
};
(status_code, axum::Json(response)).into_response()
}
/// Health endpoint that unconditionally responds with the JSON body
/// `{"status": "ok"}`.
pub async fn healthz_handler() -> impl IntoResponse {
    let body = serde_json::json!({"status": "ok"});
    axum::Json(body)
}
pub async fn version_handler() -> impl IntoResponse {
axum::Json(serde_json::json!({"version": env!("CARGO_PKG_VERSION")}))
}