1use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9};
10use std::time::{Duration, Instant};
11
12use arc_swap::ArcSwap;
13use axum::response::IntoResponse;
14
/// Health snapshot for a single subsystem.
///
/// This is plain data: every field is public and the type enforces no
/// invariants of its own. It derives `Debug`, `Clone`, `PartialEq` and `Eq`
/// so snapshots can be logged, copied out of shared state and compared in
/// tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubsystemStatus {
    /// `true` when the most recent report for the subsystem was a success.
    pub healthy: bool,
    /// Short, static explanation of why the subsystem is not healthy, if any.
    pub reason: Option<&'static str>,
    /// When a success was last recorded, or `None` if none has been recorded.
    pub last_success: Option<Instant>,
    /// When a failure was last recorded, or `None` if none has been recorded.
    pub last_failure: Option<Instant>,
}
31
32impl SubsystemStatus {
33 fn initial() -> Self {
34 Self {
35 healthy: false,
36 reason: Some("not yet checked"),
37 last_success: None,
38 last_failure: None,
39 }
40 }
41}
42
43#[derive(Clone)]
51pub struct SubsystemReporter {
52 state: Arc<ArcSwap<SubsystemStatus>>,
53}
54
55impl SubsystemReporter {
56 pub fn new() -> Self {
58 Self {
59 state: Arc::new(ArcSwap::new(Arc::new(SubsystemStatus::initial()))),
60 }
61 }
62
63 pub fn report_ok(&self) {
65 self.state.rcu(|current| {
66 Arc::new(SubsystemStatus {
67 healthy: true,
68 reason: None,
69 last_success: Some(Instant::now()),
70 last_failure: current.last_failure,
71 })
72 });
73 }
74
75 pub fn report_err(&self, reason: &'static str) {
77 self.state.rcu(|current| {
78 Arc::new(SubsystemStatus {
79 healthy: false,
80 reason: Some(reason),
81 last_success: current.last_success,
82 last_failure: Some(Instant::now()),
83 })
84 });
85 }
86
87 pub fn load(&self) -> arc_swap::Guard<Arc<SubsystemStatus>> {
89 self.state.load()
90 }
91}
92
93impl Default for SubsystemReporter {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99pub struct HealthRegistry {
107 pub git: SubsystemReporter,
109 pub embedding: SubsystemReporter,
111 pub vector_index: SubsystemReporter,
113 pub sync: SubsystemReporter,
115 pub require_sync: bool,
117 pub stale_threshold: Option<Duration>,
120 was_ready: AtomicBool,
121}
122
123impl HealthRegistry {
124 pub fn new() -> Self {
129 Self::with_config(false, None)
130 }
131
132 pub fn with_config(require_sync: bool, stale_threshold: Option<Duration>) -> Self {
134 Self {
135 git: SubsystemReporter::new(),
136 embedding: SubsystemReporter::new(),
137 vector_index: SubsystemReporter::new(),
138 sync: SubsystemReporter::new(),
139 require_sync,
140 stale_threshold,
141 was_ready: AtomicBool::new(false),
142 }
143 }
144}
145
146impl Default for HealthRegistry {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152#[derive(serde::Serialize)]
158pub struct ReadyzResponse {
159 pub status: &'static str,
161 pub checks: ReadyzChecks,
163}
164
165#[derive(serde::Serialize)]
167pub struct ReadyzChecks {
168 pub git_repo: CheckResult,
170 pub embedding: CheckResult,
172 pub vector_index: CheckResult,
174 #[serde(skip_serializing_if = "Option::is_none")]
176 pub sync: Option<CheckResult>,
177}
178
179#[derive(serde::Serialize)]
181pub struct CheckResult {
182 pub status: &'static str,
184 #[serde(skip_serializing_if = "Option::is_none")]
186 pub reason: Option<&'static str>,
187}
188
189impl From<&SubsystemStatus> for CheckResult {
190 fn from(s: &SubsystemStatus) -> Self {
191 if s.healthy {
192 Self {
193 status: "up",
194 reason: None,
195 }
196 } else {
197 Self {
198 status: "down",
199 reason: s.reason,
200 }
201 }
202 }
203}
204
205fn check_with_staleness(status: &SubsystemStatus, threshold: Option<Duration>) -> CheckResult {
216 if !status.healthy {
217 return CheckResult::from(status);
218 }
219 if let Some(threshold) = threshold {
220 if let Some(last_success) = status.last_success {
221 if last_success.elapsed() > threshold {
222 return CheckResult {
223 status: "down",
224 reason: Some("stale"),
225 };
226 }
227 }
228 }
230 CheckResult::from(status)
231}
232
233pub async fn readyz_handler(
242 axum::extract::State(state): axum::extract::State<Arc<crate::types::AppState>>,
243) -> axum::response::Response {
244 let threshold = state.health.stale_threshold;
245
246 let git = state.health.git.load();
247 let embedding = state.health.embedding.load();
248 let vector_index = state.health.vector_index.load();
249
250 let git_check = check_with_staleness(&git, threshold);
251 let embedding_check = check_with_staleness(&embedding, threshold);
252 let vector_index_check = check_with_staleness(&vector_index, threshold);
253
254 let sync_check = if state.health.require_sync {
255 let sync = state.health.sync.load();
256 Some(check_with_staleness(&sync, threshold))
257 } else {
258 None
259 };
260
261 let all_up = git_check.status == "up"
262 && embedding_check.status == "up"
263 && vector_index_check.status == "up"
264 && sync_check.as_ref().is_none_or(|s| s.status == "up");
265
266 let response = ReadyzResponse {
267 status: if all_up { "ready" } else { "not_ready" },
268 checks: ReadyzChecks {
269 git_repo: git_check,
270 embedding: embedding_check,
271 vector_index: vector_index_check,
272 sync: sync_check,
273 },
274 };
275
276 let status_code = if all_up {
278 if state
279 .health
280 .was_ready
281 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
282 .is_ok()
283 {
284 tracing::info!("readyz: all subsystems up");
285 }
286 axum::http::StatusCode::OK
287 } else {
288 if state
289 .health
290 .was_ready
291 .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
292 .is_ok()
293 {
294 tracing::warn!(
295 git_repo = response.checks.git_repo.status,
296 embedding = response.checks.embedding.status,
297 vector_index = response.checks.vector_index.status,
298 sync = response.checks.sync.as_ref().map_or("n/a", |s| s.status),
299 "readyz degraded: subsystem(s) down",
300 );
301 } else {
302 tracing::debug!("readyz check: not ready");
303 }
304 axum::http::StatusCode::SERVICE_UNAVAILABLE
305 };
306
307 (status_code, axum::Json(response)).into_response()
308}
309
310pub async fn healthz_handler() -> impl IntoResponse {
312 axum::Json(serde_json::json!({"status": "ok"}))
313}
314
315pub async fn version_handler() -> impl IntoResponse {
317 axum::Json(serde_json::json!({"version": env!("CARGO_PKG_VERSION")}))
318}