Skip to main content

apcore/sys_modules/
health.rs

// APCore Protocol — System health modules
// Spec reference: system.health.summary, system.health.module
3
4use async_trait::async_trait;
5use serde_json::json;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9use crate::config::Config;
10use crate::context::Context;
11use crate::errors::{ErrorCode, ModuleError};
12use crate::module::Module;
13use crate::observability::error_history::ErrorHistory;
14use crate::observability::metrics::MetricsCollector;
15use crate::registry::registry::Registry;
16
/// Maps a module's observed error rate to a health label.
///
/// * `"unknown"`  — the module has no recorded calls (`total_calls == 0`).
/// * `"healthy"`  — `error_rate` is strictly below `threshold`.
/// * `"degraded"` — otherwise, when `error_rate` is strictly below `0.10`.
/// * `"error"`    — everything else (including a NaN `error_rate`, since every
///   comparison against NaN is false).
fn classify_health(error_rate: f64, total_calls: u64, threshold: f64) -> &'static str {
    match total_calls {
        0 => "unknown",
        _ if error_rate < threshold => "healthy",
        _ if error_rate < 0.10 => "degraded",
        _ => "error",
    }
}
29
30/// system.health.summary — Aggregated health overview of all registered modules.
31pub struct HealthSummaryModule {
32    registry: Arc<Mutex<Registry>>,
33    metrics: Option<MetricsCollector>,
34    error_history: ErrorHistory,
35    config: Arc<Mutex<Config>>,
36}
37
38impl HealthSummaryModule {
39    pub fn new(
40        registry: Arc<Mutex<Registry>>,
41        metrics: Option<MetricsCollector>,
42        error_history: ErrorHistory,
43        config: Arc<Mutex<Config>>,
44    ) -> Self {
45        Self {
46            registry,
47            metrics,
48            error_history,
49            config,
50        }
51    }
52}
53
54#[async_trait]
55impl Module for HealthSummaryModule {
56    fn description(&self) -> &str {
57        "Aggregated health overview of all registered modules"
58    }
59
60    fn input_schema(&self) -> serde_json::Value {
61        json!({
62            "type": "object",
63            "properties": {
64                "error_rate_threshold": {"type": "number", "default": 0.01},
65                "include_healthy": {"type": "boolean", "default": true}
66            }
67        })
68    }
69
70    fn output_schema(&self) -> serde_json::Value {
71        json!({ "type": "object" })
72    }
73
74    async fn execute(
75        &self,
76        inputs: serde_json::Value,
77        _ctx: &Context<serde_json::Value>,
78    ) -> Result<serde_json::Value, ModuleError> {
79        let threshold = inputs
80            .get("error_rate_threshold")
81            .and_then(|v| v.as_f64())
82            .unwrap_or(0.01);
83        let include_healthy = inputs
84            .get("include_healthy")
85            .and_then(|v| v.as_bool())
86            .unwrap_or(true);
87
88        let reg = self.registry.lock().await;
89        let module_ids = reg.list(None, None);
90
91        let project_name = {
92            let cfg = self.config.lock().await;
93            cfg.get("project.name")
94                .and_then(|v| v.as_str().map(|s| s.to_string()))
95                .unwrap_or_else(|| "apcore".to_string())
96        };
97
98        let snapshot = self.metrics.as_ref().map(|m| m.snapshot());
99
100        let mut modules = Vec::new();
101        let (mut healthy, mut degraded, mut error_count, mut unknown) = (0u32, 0u32, 0u32, 0u32);
102
103        for mid in &module_ids {
104            let (total_calls, errors) = snapshot
105                .as_ref()
106                .map(|s| extract_call_counts(s, mid))
107                .unwrap_or((0, 0));
108            let error_rate = if total_calls > 0 {
109                errors as f64 / total_calls as f64
110            } else {
111                0.0
112            };
113            let status = classify_health(error_rate, total_calls, threshold);
114
115            match status {
116                "healthy" => healthy += 1,
117                "degraded" => degraded += 1,
118                "error" => error_count += 1,
119                _ => unknown += 1,
120            }
121
122            if !include_healthy && status == "healthy" {
123                continue;
124            }
125
126            let top_error = self.error_history.get(mid, Some(1)).first().map(|e| {
127                json!({
128                    "code": e.error_code,
129                    "message": e.message,
130                    "ai_guidance": e.ai_guidance,
131                    "count": e.count,
132                })
133            });
134
135            modules.push(json!({
136                "module_id": mid,
137                "status": status,
138                "error_rate": error_rate,
139                "top_error": top_error,
140            }));
141        }
142
143        Ok(json!({
144            "project": { "name": project_name },
145            "summary": {
146                "total_modules": module_ids.len(),
147                "healthy": healthy,
148                "degraded": degraded,
149                "error": error_count,
150                "unknown": unknown,
151            },
152            "modules": modules,
153        }))
154    }
155}
156
157/// system.health.module — Detailed health for a single module.
158pub struct HealthModuleModule {
159    registry: Arc<Mutex<Registry>>,
160    metrics: Option<MetricsCollector>,
161    error_history: ErrorHistory,
162}
163
164impl HealthModuleModule {
165    pub fn new(
166        registry: Arc<Mutex<Registry>>,
167        metrics: Option<MetricsCollector>,
168        error_history: ErrorHistory,
169    ) -> Self {
170        Self {
171            registry,
172            metrics,
173            error_history,
174        }
175    }
176}
177
178#[async_trait]
179impl Module for HealthModuleModule {
180    fn description(&self) -> &str {
181        "Detailed health information for a single module"
182    }
183
184    fn input_schema(&self) -> serde_json::Value {
185        json!({
186            "type": "object",
187            "required": ["module_id"],
188            "properties": {
189                "module_id": {"type": "string"},
190                "error_limit": {"type": "integer", "default": 10}
191            }
192        })
193    }
194
195    fn output_schema(&self) -> serde_json::Value {
196        json!({ "type": "object" })
197    }
198
199    async fn execute(
200        &self,
201        inputs: serde_json::Value,
202        _ctx: &Context<serde_json::Value>,
203    ) -> Result<serde_json::Value, ModuleError> {
204        let module_id = inputs
205            .get("module_id")
206            .and_then(|v| v.as_str())
207            .ok_or_else(|| {
208                ModuleError::new(ErrorCode::GeneralInvalidInput, "'module_id' is required")
209            })?;
210        let error_limit = inputs
211            .get("error_limit")
212            .and_then(|v| v.as_u64())
213            .unwrap_or(10) as usize;
214
215        {
216            let reg = self.registry.lock().await;
217            if !reg.has(module_id) {
218                return Err(ModuleError::new(
219                    ErrorCode::ModuleNotFound,
220                    format!("Module '{}' not found", module_id),
221                ));
222            }
223        }
224
225        let snapshot = self.metrics.as_ref().map(|m| m.snapshot());
226        let (total_calls, errors) = snapshot
227            .as_ref()
228            .map(|s| extract_call_counts(s, module_id))
229            .unwrap_or((0, 0));
230        let error_rate = if total_calls > 0 {
231            errors as f64 / total_calls as f64
232        } else {
233            0.0
234        };
235        let status = classify_health(error_rate, total_calls, 0.01);
236
237        let recent_errors: Vec<serde_json::Value> = self
238            .error_history
239            .get(module_id, Some(error_limit))
240            .into_iter()
241            .map(|e| {
242                json!({
243                    "code": e.error_code,
244                    "message": e.message,
245                    "ai_guidance": e.ai_guidance,
246                    "count": e.count,
247                    "first_occurred": e.first_occurred.to_rfc3339(),
248                    "last_occurred": e.last_occurred.to_rfc3339(),
249                })
250            })
251            .collect();
252
253        let (avg_latency_ms, p99_latency_ms) = snapshot
254            .as_ref()
255            .map(|s| extract_latency_stats(s, module_id))
256            .unwrap_or((0.0, 0.0));
257
258        Ok(json!({
259            "module_id": module_id,
260            "status": status,
261            "total_calls": total_calls,
262            "error_count": errors,
263            "error_rate": error_rate,
264            "avg_latency_ms": avg_latency_ms,
265            "p99_latency_ms": p99_latency_ms,
266            "recent_errors": recent_errors,
267        }))
268    }
269}
270
271/// Extract call counts from a MetricsCollector snapshot.
272fn extract_call_counts(snapshot: &serde_json::Value, module_id: &str) -> (u64, u64) {
273    let counters = match snapshot.get("counters").and_then(|c| c.as_object()) {
274        Some(c) => c,
275        None => return (0, 0),
276    };
277    let mut total: u64 = 0;
278    let mut errors: u64 = 0;
279    let success_key = format!("apcore_module_calls_total|module_id={module_id},status=success");
280    let error_key = format!("apcore_module_calls_total|module_id={module_id},status=error");
281    if let Some(v) = counters.get(&success_key).and_then(|v| v.as_u64()) {
282        total += v;
283    }
284    if let Some(v) = counters.get(&error_key).and_then(|v| v.as_u64()) {
285        total += v;
286        errors = v;
287    }
288    (total, errors)
289}
290
291/// Extract latency statistics (avg_ms, p99_ms) from a MetricsCollector snapshot.
292///
293/// Reads the histogram key `apcore_module_duration_seconds|module_id=<id>`.
294/// Returns (avg_latency_ms, p99_latency_ms).
295fn extract_latency_stats(snapshot: &serde_json::Value, module_id: &str) -> (f64, f64) {
296    let histograms = match snapshot.get("histograms").and_then(|h| h.as_object()) {
297        Some(h) => h,
298        None => return (0.0, 0.0),
299    };
300    let hist_key = format!("apcore_module_duration_seconds|module_id={module_id}");
301    let data = match histograms.get(&hist_key) {
302        Some(d) => d,
303        None => return (0.0, 0.0),
304    };
305    let sum = data.get("sum").and_then(|v| v.as_f64()).unwrap_or(0.0);
306    let count = data.get("count").and_then(|v| v.as_u64()).unwrap_or(0);
307    let avg_ms = if count > 0 {
308        (sum / count as f64) * 1000.0
309    } else {
310        0.0
311    };
312
313    // Estimate p99 from histogram buckets.
314    let p99_ms = if let Some(buckets) = data.get("buckets").and_then(|b| b.as_array()) {
315        let target = (count as f64 * 0.99).ceil() as u64;
316        let mut p99 = 0.0_f64;
317        for bucket in buckets {
318            let le = bucket
319                .get("le")
320                .and_then(|v| v.as_f64())
321                .unwrap_or(f64::INFINITY);
322            let cnt = bucket.get("count").and_then(|v| v.as_u64()).unwrap_or(0);
323            if cnt >= target {
324                p99 = le * 1000.0; // seconds → ms
325                break;
326            }
327        }
328        p99
329    } else {
330        0.0
331    };
332
333    (avg_ms, p99_ms)
334}