1use async_trait::async_trait;
5use serde_json::json;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9use crate::config::Config;
10use crate::context::Context;
11use crate::errors::{ErrorCode, ModuleError};
12use crate::module::Module;
13use crate::observability::error_history::ErrorHistory;
14use crate::observability::metrics::MetricsCollector;
15use crate::registry::registry::Registry;
16
/// Map a module's observed error rate onto a coarse health label.
///
/// Returns "unknown" when no calls were recorded, "healthy" while the
/// rate stays below `threshold`, "degraded" from `threshold` up to 10%,
/// and "error" at 10% or above.
fn classify_health(error_rate: f64, total_calls: u64, threshold: f64) -> &'static str {
    match total_calls {
        0 => "unknown",
        _ if error_rate < threshold => "healthy",
        _ if error_rate < 0.10 => "degraded",
        _ => "error",
    }
}
29
/// Module that reports an aggregated health overview of every registered module.
pub struct HealthSummaryModule {
    /// Shared module registry, locked to enumerate registered module ids.
    registry: Arc<Mutex<Registry>>,
    /// Optional metrics source; when absent, call counts read as zero and
    /// every module classifies as "unknown".
    metrics: Option<MetricsCollector>,
    /// Recent per-module error records, used to surface a top error per module.
    error_history: ErrorHistory,
    /// Shared configuration, read for `project.name`.
    config: Arc<Mutex<Config>>,
}
37
impl HealthSummaryModule {
    /// Creates the module from shared handles to the registry, an optional
    /// metrics collector, the error history, and the configuration store.
    pub fn new(
        registry: Arc<Mutex<Registry>>,
        metrics: Option<MetricsCollector>,
        error_history: ErrorHistory,
        config: Arc<Mutex<Config>>,
    ) -> Self {
        Self {
            registry,
            metrics,
            error_history,
            config,
        }
    }
}
53
54#[async_trait]
55impl Module for HealthSummaryModule {
56 fn description(&self) -> &str {
57 "Aggregated health overview of all registered modules"
58 }
59
60 fn input_schema(&self) -> serde_json::Value {
61 json!({
62 "type": "object",
63 "properties": {
64 "error_rate_threshold": {"type": "number", "default": 0.01},
65 "include_healthy": {"type": "boolean", "default": true}
66 }
67 })
68 }
69
70 fn output_schema(&self) -> serde_json::Value {
71 json!({ "type": "object" })
72 }
73
74 async fn execute(
75 &self,
76 inputs: serde_json::Value,
77 _ctx: &Context<serde_json::Value>,
78 ) -> Result<serde_json::Value, ModuleError> {
79 let threshold = inputs
80 .get("error_rate_threshold")
81 .and_then(|v| v.as_f64())
82 .unwrap_or(0.01);
83 let include_healthy = inputs
84 .get("include_healthy")
85 .and_then(|v| v.as_bool())
86 .unwrap_or(true);
87
88 let reg = self.registry.lock().await;
89 let module_ids = reg.list(None, None);
90
91 let project_name = {
92 let cfg = self.config.lock().await;
93 cfg.get("project.name")
94 .and_then(|v| v.as_str().map(|s| s.to_string()))
95 .unwrap_or_else(|| "apcore".to_string())
96 };
97
98 let snapshot = self.metrics.as_ref().map(|m| m.snapshot());
99
100 let mut modules = Vec::new();
101 let (mut healthy, mut degraded, mut error_count, mut unknown) = (0u32, 0u32, 0u32, 0u32);
102
103 for mid in &module_ids {
104 let (total_calls, errors) = snapshot
105 .as_ref()
106 .map(|s| extract_call_counts(s, mid))
107 .unwrap_or((0, 0));
108 let error_rate = if total_calls > 0 {
109 errors as f64 / total_calls as f64
110 } else {
111 0.0
112 };
113 let status = classify_health(error_rate, total_calls, threshold);
114
115 match status {
116 "healthy" => healthy += 1,
117 "degraded" => degraded += 1,
118 "error" => error_count += 1,
119 _ => unknown += 1,
120 }
121
122 if !include_healthy && status == "healthy" {
123 continue;
124 }
125
126 let top_error = self.error_history.get(mid, Some(1)).first().map(|e| {
127 json!({
128 "code": e.error_code,
129 "message": e.message,
130 "ai_guidance": e.ai_guidance,
131 "count": e.count,
132 })
133 });
134
135 modules.push(json!({
136 "module_id": mid,
137 "status": status,
138 "error_rate": error_rate,
139 "top_error": top_error,
140 }));
141 }
142
143 Ok(json!({
144 "project": { "name": project_name },
145 "summary": {
146 "total_modules": module_ids.len(),
147 "healthy": healthy,
148 "degraded": degraded,
149 "error": error_count,
150 "unknown": unknown,
151 },
152 "modules": modules,
153 }))
154 }
155}
156
/// Module that reports detailed health information for a single registered module.
pub struct HealthModuleModule {
    /// Shared module registry, used to verify the requested module id exists.
    registry: Arc<Mutex<Registry>>,
    /// Optional metrics source for call counts and latency statistics.
    metrics: Option<MetricsCollector>,
    /// Recent per-module error records returned in the report.
    error_history: ErrorHistory,
}
163
impl HealthModuleModule {
    /// Creates the module from shared handles to the registry, an optional
    /// metrics collector, and the error history.
    pub fn new(
        registry: Arc<Mutex<Registry>>,
        metrics: Option<MetricsCollector>,
        error_history: ErrorHistory,
    ) -> Self {
        Self {
            registry,
            metrics,
            error_history,
        }
    }
}
177
178#[async_trait]
179impl Module for HealthModuleModule {
180 fn description(&self) -> &str {
181 "Detailed health information for a single module"
182 }
183
184 fn input_schema(&self) -> serde_json::Value {
185 json!({
186 "type": "object",
187 "required": ["module_id"],
188 "properties": {
189 "module_id": {"type": "string"},
190 "error_limit": {"type": "integer", "default": 10}
191 }
192 })
193 }
194
195 fn output_schema(&self) -> serde_json::Value {
196 json!({ "type": "object" })
197 }
198
199 async fn execute(
200 &self,
201 inputs: serde_json::Value,
202 _ctx: &Context<serde_json::Value>,
203 ) -> Result<serde_json::Value, ModuleError> {
204 let module_id = inputs
205 .get("module_id")
206 .and_then(|v| v.as_str())
207 .ok_or_else(|| {
208 ModuleError::new(ErrorCode::GeneralInvalidInput, "'module_id' is required")
209 })?;
210 let error_limit = inputs
211 .get("error_limit")
212 .and_then(|v| v.as_u64())
213 .unwrap_or(10) as usize;
214
215 {
216 let reg = self.registry.lock().await;
217 if !reg.has(module_id) {
218 return Err(ModuleError::new(
219 ErrorCode::ModuleNotFound,
220 format!("Module '{}' not found", module_id),
221 ));
222 }
223 }
224
225 let snapshot = self.metrics.as_ref().map(|m| m.snapshot());
226 let (total_calls, errors) = snapshot
227 .as_ref()
228 .map(|s| extract_call_counts(s, module_id))
229 .unwrap_or((0, 0));
230 let error_rate = if total_calls > 0 {
231 errors as f64 / total_calls as f64
232 } else {
233 0.0
234 };
235 let status = classify_health(error_rate, total_calls, 0.01);
236
237 let recent_errors: Vec<serde_json::Value> = self
238 .error_history
239 .get(module_id, Some(error_limit))
240 .into_iter()
241 .map(|e| {
242 json!({
243 "code": e.error_code,
244 "message": e.message,
245 "ai_guidance": e.ai_guidance,
246 "count": e.count,
247 "first_occurred": e.first_occurred.to_rfc3339(),
248 "last_occurred": e.last_occurred.to_rfc3339(),
249 })
250 })
251 .collect();
252
253 let (avg_latency_ms, p99_latency_ms) = snapshot
254 .as_ref()
255 .map(|s| extract_latency_stats(s, module_id))
256 .unwrap_or((0.0, 0.0));
257
258 Ok(json!({
259 "module_id": module_id,
260 "status": status,
261 "total_calls": total_calls,
262 "error_count": errors,
263 "error_rate": error_rate,
264 "avg_latency_ms": avg_latency_ms,
265 "p99_latency_ms": p99_latency_ms,
266 "recent_errors": recent_errors,
267 }))
268 }
269}
270
271fn extract_call_counts(snapshot: &serde_json::Value, module_id: &str) -> (u64, u64) {
273 let counters = match snapshot.get("counters").and_then(|c| c.as_object()) {
274 Some(c) => c,
275 None => return (0, 0),
276 };
277 let mut total: u64 = 0;
278 let mut errors: u64 = 0;
279 let success_key = format!("apcore_module_calls_total|module_id={module_id},status=success");
280 let error_key = format!("apcore_module_calls_total|module_id={module_id},status=error");
281 if let Some(v) = counters.get(&success_key).and_then(|v| v.as_u64()) {
282 total += v;
283 }
284 if let Some(v) = counters.get(&error_key).and_then(|v| v.as_u64()) {
285 total += v;
286 errors = v;
287 }
288 (total, errors)
289}
290
291fn extract_latency_stats(snapshot: &serde_json::Value, module_id: &str) -> (f64, f64) {
296 let histograms = match snapshot.get("histograms").and_then(|h| h.as_object()) {
297 Some(h) => h,
298 None => return (0.0, 0.0),
299 };
300 let hist_key = format!("apcore_module_duration_seconds|module_id={module_id}");
301 let data = match histograms.get(&hist_key) {
302 Some(d) => d,
303 None => return (0.0, 0.0),
304 };
305 let sum = data.get("sum").and_then(|v| v.as_f64()).unwrap_or(0.0);
306 let count = data.get("count").and_then(|v| v.as_u64()).unwrap_or(0);
307 let avg_ms = if count > 0 {
308 (sum / count as f64) * 1000.0
309 } else {
310 0.0
311 };
312
313 let p99_ms = if let Some(buckets) = data.get("buckets").and_then(|b| b.as_array()) {
315 let target = (count as f64 * 0.99).ceil() as u64;
316 let mut p99 = 0.0_f64;
317 for bucket in buckets {
318 let le = bucket
319 .get("le")
320 .and_then(|v| v.as_f64())
321 .unwrap_or(f64::INFINITY);
322 let cnt = bucket.get("count").and_then(|v| v.as_u64()).unwrap_or(0);
323 if cnt >= target {
324 p99 = le * 1000.0; break;
326 }
327 }
328 p99
329 } else {
330 0.0
331 };
332
333 (avg_ms, p99_ms)
334}