Skip to main content

memory_mcp/
health.rs

1//! Health reporting infrastructure and HTTP handlers.
2//!
3//! Subsystems report their own operational state via [`SubsystemReporter`].
4//! The `/readyz` handler reads the latest state — no active probing.
5
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    Arc,
9};
10use std::time::{Duration, Instant};
11
12use arc_swap::ArcSwap;
13use axum::response::IntoResponse;
14
15// ---------------------------------------------------------------------------
16// SubsystemStatus
17// ---------------------------------------------------------------------------
18
/// Per-subsystem health snapshot. Immutable once created.
///
/// All fields are `Copy`, so the snapshot itself is `Copy`; the common traits
/// are derived so snapshots can be logged (`Debug`) and compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubsystemStatus {
    /// Whether the subsystem is currently healthy.
    pub healthy: bool,
    /// Human-readable reason for an unhealthy state. `None` when healthy.
    pub reason: Option<&'static str>,
    /// Timestamp of the most recent successful operation.
    /// Used for staleness detection — if healthy but last success is old, may be stale.
    pub last_success: Option<Instant>,
    /// Timestamp of the most recent failed operation.
    pub last_failure: Option<Instant>,
}

impl SubsystemStatus {
    /// Snapshot used before any report has arrived: unhealthy, with reason
    /// `"not yet checked"` and no success/failure timestamps.
    fn initial() -> Self {
        Self {
            healthy: false,
            reason: Some("not yet checked"),
            last_success: None,
            last_failure: None,
        }
    }
}
42
43// ---------------------------------------------------------------------------
44// SubsystemReporter
45// ---------------------------------------------------------------------------
46
47/// Lightweight handle for a subsystem to report its health.
48///
49/// Clone is cheap (Arc clone). Each clone shares the same underlying state.
50#[derive(Clone)]
51pub struct SubsystemReporter {
52    state: Arc<ArcSwap<SubsystemStatus>>,
53}
54
55impl SubsystemReporter {
56    /// Create a new reporter. Initial state is "not yet checked" (unhealthy until first success).
57    pub fn new() -> Self {
58        Self {
59            state: Arc::new(ArcSwap::new(Arc::new(SubsystemStatus::initial()))),
60        }
61    }
62
63    /// Report a successful operation.
64    pub fn report_ok(&self) {
65        self.state.rcu(|current| {
66            Arc::new(SubsystemStatus {
67                healthy: true,
68                reason: None,
69                last_success: Some(Instant::now()),
70                last_failure: current.last_failure,
71            })
72        });
73    }
74
75    /// Report a failed operation with a static reason string.
76    pub fn report_err(&self, reason: &'static str) {
77        self.state.rcu(|current| {
78            Arc::new(SubsystemStatus {
79                healthy: false,
80                reason: Some(reason),
81                last_success: current.last_success,
82                last_failure: Some(Instant::now()),
83            })
84        });
85    }
86
87    /// Load the current status snapshot.
88    pub fn load(&self) -> arc_swap::Guard<Arc<SubsystemStatus>> {
89        self.state.load()
90    }
91}
92
93impl Default for SubsystemReporter {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99// ---------------------------------------------------------------------------
100// HealthRegistry
101// ---------------------------------------------------------------------------
102
103/// Central registry holding reporters for all subsystems.
104///
105/// Lives on `AppState`. The `/readyz` handler reads from here — no active probing.
106pub struct HealthRegistry {
107    /// Reporter for the git-backed memory repository.
108    pub git: SubsystemReporter,
109    /// Reporter for the embedding engine.
110    pub embedding: SubsystemReporter,
111    /// Reporter for the vector index.
112    pub vector_index: SubsystemReporter,
113    /// Reporter for remote sync (push/pull) operations.
114    pub sync: SubsystemReporter,
115    /// Whether sync failures affect readiness (`/readyz` returns 503 on sync failure).
116    pub require_sync: bool,
117    /// Duration after which a healthy subsystem with no recent successes is
118    /// considered stale. `None` disables staleness detection.
119    pub stale_threshold: Option<Duration>,
120    was_ready: AtomicBool,
121}
122
123impl HealthRegistry {
124    /// Create a new registry with all subsystems in "not yet checked" state.
125    ///
126    /// `require_sync` controls whether sync failures affect readiness.
127    /// `stale_threshold` sets the staleness window (`None` to disable).
128    pub fn new() -> Self {
129        Self::with_config(false, None)
130    }
131
132    /// Create a registry with explicit sync-gating and staleness configuration.
133    pub fn with_config(require_sync: bool, stale_threshold: Option<Duration>) -> Self {
134        Self {
135            git: SubsystemReporter::new(),
136            embedding: SubsystemReporter::new(),
137            vector_index: SubsystemReporter::new(),
138            sync: SubsystemReporter::new(),
139            require_sync,
140            stale_threshold,
141            was_ready: AtomicBool::new(false),
142        }
143    }
144}
145
146impl Default for HealthRegistry {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152// ---------------------------------------------------------------------------
153// Response types for JSON serialisation
154// ---------------------------------------------------------------------------
155
156/// Top-level readiness response returned by `/readyz`.
157#[derive(serde::Serialize)]
158pub struct ReadyzResponse {
159    /// `"ready"` or `"not_ready"`.
160    pub status: &'static str,
161    /// Per-subsystem check results.
162    pub checks: ReadyzChecks,
163}
164
165/// Individual subsystem check results.
166#[derive(serde::Serialize)]
167pub struct ReadyzChecks {
168    /// Git repository lock status.
169    pub git_repo: CheckResult,
170    /// Embedding model status.
171    pub embedding: CheckResult,
172    /// Vector index status.
173    pub vector_index: CheckResult,
174    /// Remote sync status. Present only when `--require-remote-sync` is set.
175    #[serde(skip_serializing_if = "Option::is_none")]
176    pub sync: Option<CheckResult>,
177}
178
179/// Result of a single subsystem health check.
180#[derive(serde::Serialize)]
181pub struct CheckResult {
182    /// `"up"` or `"down"`.
183    pub status: &'static str,
184    /// Present only when `status` is `"down"` — fixed vocabulary, no dynamic content.
185    #[serde(skip_serializing_if = "Option::is_none")]
186    pub reason: Option<&'static str>,
187}
188
189impl From<&SubsystemStatus> for CheckResult {
190    fn from(s: &SubsystemStatus) -> Self {
191        if s.healthy {
192            Self {
193                status: "up",
194                reason: None,
195            }
196        } else {
197            Self {
198                status: "down",
199                reason: s.reason,
200            }
201        }
202    }
203}
204
205// ---------------------------------------------------------------------------
206// Staleness helpers
207// ---------------------------------------------------------------------------
208
209/// Derive a `CheckResult` from a status snapshot, applying staleness detection
210/// when `threshold` is `Some`.
211///
212/// A subsystem that reports `healthy = true` but whose `last_success` is older
213/// than `threshold` is considered stale and returned as `"down"` with reason
214/// `"stale"`.
215fn check_with_staleness(status: &SubsystemStatus, threshold: Option<Duration>) -> CheckResult {
216    if !status.healthy {
217        return CheckResult::from(status);
218    }
219    if let Some(threshold) = threshold {
220        if let Some(last_success) = status.last_success {
221            if last_success.elapsed() > threshold {
222                return CheckResult {
223                    status: "down",
224                    reason: Some("stale"),
225                };
226            }
227        }
228        // No last_success but healthy (e.g. freshly reported via startup) — not stale yet.
229    }
230    CheckResult::from(status)
231}
232
233// ---------------------------------------------------------------------------
234// HTTP handlers
235// ---------------------------------------------------------------------------
236
237/// Handler for `GET /readyz`.
238///
239/// Reads the latest reported state from each subsystem reporter — no active
240/// probing. Returns `200 OK` when all subsystems are up, `503` otherwise.
241pub async fn readyz_handler(
242    axum::extract::State(state): axum::extract::State<Arc<crate::types::AppState>>,
243) -> axum::response::Response {
244    let threshold = state.health.stale_threshold;
245
246    let git = state.health.git.load();
247    let embedding = state.health.embedding.load();
248    let vector_index = state.health.vector_index.load();
249
250    let git_check = check_with_staleness(&git, threshold);
251    let embedding_check = check_with_staleness(&embedding, threshold);
252    let vector_index_check = check_with_staleness(&vector_index, threshold);
253
254    let sync_check = if state.health.require_sync {
255        let sync = state.health.sync.load();
256        Some(check_with_staleness(&sync, threshold))
257    } else {
258        None
259    };
260
261    let all_up = git_check.status == "up"
262        && embedding_check.status == "up"
263        && vector_index_check.status == "up"
264        && sync_check.as_ref().is_none_or(|s| s.status == "up");
265
266    let response = ReadyzResponse {
267        status: if all_up { "ready" } else { "not_ready" },
268        checks: ReadyzChecks {
269            git_repo: git_check,
270            embedding: embedding_check,
271            vector_index: vector_index_check,
272            sync: sync_check,
273        },
274    };
275
276    // Transition-based logging via compare_exchange.
277    let status_code = if all_up {
278        if state
279            .health
280            .was_ready
281            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
282            .is_ok()
283        {
284            tracing::info!("readyz: all subsystems up");
285        }
286        axum::http::StatusCode::OK
287    } else {
288        if state
289            .health
290            .was_ready
291            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
292            .is_ok()
293        {
294            tracing::warn!(
295                git_repo = response.checks.git_repo.status,
296                embedding = response.checks.embedding.status,
297                vector_index = response.checks.vector_index.status,
298                sync = response.checks.sync.as_ref().map_or("n/a", |s| s.status),
299                "readyz degraded: subsystem(s) down",
300            );
301        } else {
302            tracing::debug!("readyz check: not ready");
303        }
304        axum::http::StatusCode::SERVICE_UNAVAILABLE
305    };
306
307    (status_code, axum::Json(response)).into_response()
308}
309
310/// Handler for `GET /healthz` (liveness probe — always 200 OK).
311pub async fn healthz_handler() -> impl IntoResponse {
312    axum::Json(serde_json::json!({"status": "ok"}))
313}
314
315/// Handler for `GET /version`.
316pub async fn version_handler() -> impl IntoResponse {
317    axum::Json(serde_json::json!({"version": env!("CARGO_PKG_VERSION")}))
318}