use axum::extract::State;
use axum::Json;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::state::{SearchAppState, WarmBootSummary};
/// JSON body returned by `health_handler`.
///
/// Fields annotated with `skip_serializing_if = "Option::is_none"` are left
/// out of the serialized output entirely when they hold `None`.
#[derive(Serialize)]
pub(super) struct HealthResponse {
/// Set to the literal `"ok"` by `health_handler`.
pub(super) status: &'static str,
/// Crate version baked in at compile time (`CARGO_PKG_VERSION`).
pub(super) version: &'static str,
/// Number of entries returned by `state.registry.list()`.
pub(super) indexes: usize,
/// Whole seconds elapsed since `state.started_at`.
pub(super) uptime_secs: u64,
/// Embedder status label; `health_handler` emits one of `"ready"`,
/// `"stalled"`, `"error"` or `"initializing"`.
pub(super) embedder: &'static str,
/// Value of `state.current_embedder_error()`, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) embedder_error: Option<String>,
/// Value of `embedder_stall_tracker.last_ok_secs_ago()`, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) embedder_last_ok_secs_ago: Option<u64>,
/// Value of `embedder_stall_tracker.recent_timeout_count()`; any non-zero
/// value turns a ready embedder's status into `"stalled"`.
pub(super) embedder_recent_timeout_count: u32,
/// RSS reading from the metrics sampler, or the last cached reading when
/// the sampler lock could not be taken without blocking.
pub(super) rss_mb: u64,
/// Result of `memguard::memory_limit_mb()`, or `0` when it returns `None`.
pub(super) rss_limit_mb: u64,
/// Current value of the `state.disk_bytes` counter.
pub(super) disk_bytes: u64,
/// CPU reading from the metrics sampler, or the last cached reading when
/// the sampler lock could not be taken without blocking.
pub(super) cpu_pct: f32,
/// Details of the current embedder, present only when
/// `state.try_current_embedder()` yields one.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) embedder_info: Option<EmbedderInfo>,
/// RSS looked up via `memguard::current_rss_mb_for_pid` for the pid from
/// `state.current_embedderd_pid()`; absent when either step yields `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) embedderd_rss_mb: Option<u64>,
/// Value of `service::reindex::background_reindex_queue_depth()`.
pub(super) background_reindex_queue_depth: usize,
/// Clone of the `state.update_available` slot (presumably the newer
/// version string — confirm against the code that fills the slot).
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) update_available: Option<String>,
/// Clone of `state.warmboot_summary` with `indexes_lazy` and
/// `indexes_failed` overwritten from `state.cold_store` at request time.
pub(super) warmboot_summary: WarmBootSummary,
}
/// Description of the active embedder as reported inside `HealthResponse`.
#[derive(Serialize)]
pub(super) struct EmbedderInfo {
/// Value of the embedder's `dimension()`.
dimension: usize,
/// String form of the embedder's `provider()`.
provider: String,
/// `health_handler` sets this to `true` exactly when `dimension` equals
/// `trusty_common::embedder::EMBED_DIM`.
/// NOTE(review): the link between that dimension and quantization is not
/// visible in this file — confirm.
quantized: bool,
}
pub(super) async fn health_handler(
State(state): State<Arc<SearchAppState>>,
) -> Json<HealthResponse> {
let embedder_error = state.current_embedder_error();
let embedder_last_ok_secs_ago = state.embedder_stall_tracker.last_ok_secs_ago();
let embedder_recent_timeout_count = state.embedder_stall_tracker.recent_timeout_count();
let embedder_status = if state.is_embedder_ready() {
let stalled = embedder_recent_timeout_count > 0;
if stalled {
"stalled"
} else {
"ready"
}
} else if state.embedder.is_some()
|| state
.embedder_slot
.try_read()
.map(|g| g.is_some())
.unwrap_or(false)
{
"ready"
} else if embedder_error.is_some() {
"error"
} else {
"initializing"
};
let (rss_mb, cpu_pct) = if let Ok(mut metrics) = state.sys_metrics.try_lock() {
let (rss, cpu) = metrics.sample();
state
.last_rss_mb
.store(rss, std::sync::atomic::Ordering::Relaxed);
state
.last_cpu_pct_bits
.store(cpu.to_bits(), std::sync::atomic::Ordering::Relaxed);
(rss, cpu)
} else {
let rss = state.last_rss_mb.load(std::sync::atomic::Ordering::Relaxed);
let cpu = f32::from_bits(
state
.last_cpu_pct_bits
.load(std::sync::atomic::Ordering::Relaxed),
);
(rss, cpu)
};
let rss_limit_mb = crate::core::memguard::memory_limit_mb().unwrap_or(0);
let disk_bytes = state.disk_bytes.load(std::sync::atomic::Ordering::Relaxed);
let embedder_info = state.try_current_embedder().map(|e| {
let dimension = e.dimension();
EmbedderInfo {
dimension,
provider: e.provider().as_str().to_string(),
quantized: dimension == trusty_common::embedder::EMBED_DIM,
}
});
let embedderd_rss_mb = state
.current_embedderd_pid()
.and_then(crate::core::memguard::current_rss_mb_for_pid);
let update_available = state.update_available.lock().ok().and_then(|g| g.clone());
let mut warmboot_summary = state
.warmboot_summary
.lock()
.map(|g| g.clone())
.unwrap_or_default();
warmboot_summary.indexes_lazy = state.cold_store.len();
warmboot_summary.indexes_failed = state.cold_store.failed_len();
Json(HealthResponse {
status: "ok",
version: env!("CARGO_PKG_VERSION"),
indexes: state.registry.list().len(),
uptime_secs: state.started_at.elapsed().as_secs(),
embedder: embedder_status,
embedder_error,
embedder_last_ok_secs_ago,
embedder_recent_timeout_count,
rss_mb,
rss_limit_mb,
disk_bytes,
cpu_pct,
embedder_info,
embedderd_rss_mb,
background_reindex_queue_depth: crate::service::reindex::background_reindex_queue_depth(),
update_available,
warmboot_summary,
})
}
/// Request body accepted by `upgrade_handler`.
///
/// `upgrade_handler` only proceeds to install when `check` is `false` and
/// `confirm` is `true`; every other combination yields a check-only reply.
#[derive(Deserialize)]
pub(super) struct UpgradeRequest {
/// Defaults to `true` (via `bool_true`) when absent from the body.
#[serde(default = "bool_true")]
check: bool,
/// Defaults to `false` when absent from the body.
#[serde(default)]
confirm: bool,
}
/// Reports whether a newer release of this crate exists and, when explicitly
/// requested, installs it from a background task.
///
/// Behaviour by request body (`check` defaults to `true`, `confirm` to
/// `false`):
/// - `check == true` or `confirm == false`: replies with
///   `"status": "checked"` plus the current/latest versions; nothing is
///   installed.
/// - `check == false` and `confirm == true`, no update found: replies with
///   `"status": "up_to_date"`.
/// - `check == false` and `confirm == true`, update found: replies with
///   `"status": "installing"` and spawns a task that calls
///   `trusty_common::update::upgrade_and_restart`.
///
/// A `None` result from `check_crates_io` is treated as "no update".
pub(super) async fn upgrade_handler(
    State(state): State<Arc<SearchAppState>>,
    Json(body): Json<UpgradeRequest>,
) -> Json<serde_json::Value> {
    let crate_name = env!("CARGO_PKG_NAME");
    let current = env!("CARGO_PKG_VERSION");
    let info = trusty_common::update::check_crates_io(crate_name, current).await;
    let (latest, is_update) = match &info {
        Some(u) => (u.latest.as_str(), true),
        None => (current, false),
    };

    // Check-only path. Because `check` defaults to true, a body carrying only
    // `confirm=true` still lands here, so the hint names both flags.
    if body.check || !body.confirm {
        let msg = if is_update {
            format!(
                "Update available: {crate_name} {latest} (you have {current}). \
                 POST with check=false and confirm=true to install."
            )
        } else {
            format!("{crate_name} {current} is already up to date.")
        };
        return Json(serde_json::json!({
            "status": "checked",
            "current": current,
            "latest": latest,
            "update_available": is_update,
            "message": msg
        }));
    }

    if !is_update {
        return Json(serde_json::json!({
            "status": "up_to_date",
            "current": current,
            "message": format!("{crate_name} {current} is already up to date.")
        }));
    }

    let latest_owned = latest.to_string();
    let crate_name_owned = crate_name.to_string();
    let update_slot = state.update_available.clone();
    let response = serde_json::json!({
        "status": "installing",
        "current": current,
        "latest": latest_owned,
        "message": format!(
            "Installing {crate_name} {latest_owned} — daemon will restart \
             under launchd (or print a restart hint if not supervised)."
        )
    });

    tokio::spawn(async move {
        // Presumably delays the upgrade so this response can reach the client
        // before a restart — confirm.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        match trusty_common::update::upgrade_and_restart(&crate_name_owned, &crate_name_owned).await
        {
            Ok(Some(hint)) => {
                tracing::info!("{hint}");
                eprintln!("{hint}");
            }
            Ok(None) => {}
            Err(e) => {
                tracing::error!("upgrade_and_restart failed: {e:#}");
                eprintln!("[trusty-search] upgrade failed: {e:#}");
                // NOTE(review): a failed upgrade clears the cached update
                // notice even though the update is still available — confirm
                // this is intended.
                if let Ok(mut g) = update_slot.lock() {
                    *g = None;
                }
            }
        }
    });

    Json(response)
}
/// Serde default provider for `UpgradeRequest::check`; always returns `true`.
fn bool_true() -> bool {
true
}