// mecha10_core/health.rs
1//! Centralized Health Monitoring
2//!
3//! Provides centralized health monitoring and aggregation for all nodes in a Mecha10 system.
4//!
5//! # Features
6//!
7//! - Automatic health reporting from nodes
8//! - Centralized health aggregation in Redis
9//! - System-wide health status
10//! - Health history storage (24 hours, using Redis sorted sets)
11//! - Health history queries by time range
12//! - Alerting on health degradation
13//! - Dashboard/CLI integration
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────┐ Health ┌───────────────┐
19//! │ Node A │────Status──────▶│ │
20//! └─────────┘ │ │
21//! │ Health │ ┌──────────┐
22//! ┌─────────┐ Health │ Monitor │────▶│Dashboard │
23//! │ Node B │────Status──────▶│ (Redis) │ └──────────┘
24//! └─────────┘ │ │
25//! │ │ ┌──────────┐
26//! ┌─────────┐ Health │ │────▶│Alerting │
27//! │ Node C │────Status──────▶│ │ └──────────┘
28//! └─────────┘ └───────────────┘
29//! ```
30//!
31//! # Example
32//!
33//! ```rust
34//! use mecha10::prelude::*;
35//! use mecha10::health::{HealthMonitor, HealthReporter};
36//!
37//! # async fn example() -> Result<()> {
38//! // In your node
39//! let ctx = Context::new("my-node").await?;
40//! let reporter = HealthReporter::new(&ctx).await?;
41//!
42//! // Report health periodically
43//! let mut interval = interval_at_hz(1.0); // 1 Hz
44//! loop {
45//! interval.tick().await;
46//!
47//! let status = HealthStatus::healthy()
48//! .with_diagnostic("cpu_usage", "25%")
49//! .with_diagnostic("memory_mb", "128");
50//!
51//! reporter.report(status).await?;
52//! }
53//!
54//! // Monitor system health
55//! let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
56//!
57//! // Get overall system health
58//! let system_health = monitor.system_health().await?;
59//! println!("System: {:?}", system_health.level);
60//!
61//! // Get health by node
62//! let node_health = monitor.node_health("my-node").await?;
63//!
64//! // List unhealthy nodes
65//! let unhealthy = monitor.unhealthy_nodes().await?;
66//! # Ok(())
67//! # }
68//! ```
69
70use crate::context::Context;
71use crate::error::{Mecha10Error, Result};
72use crate::messages::{HealthLevel, HealthStatus};
73use serde::{Deserialize, Serialize};
74use std::collections::HashMap;
75use std::sync::Arc;
76use std::time::Duration;
77use tokio::sync::RwLock;
78
79// ============================================================================
80// Health Reporter (Node Side)
81// ============================================================================
82
/// Health reporter for nodes to publish their health status
///
/// Each node should create a HealthReporter and use it to periodically
/// report health status to the centralized monitoring system.
///
/// # Example
///
/// ```rust
/// use mecha10::prelude::*;
/// use mecha10::health::HealthReporter;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let reporter = HealthReporter::new(ctx).await?;
///
/// // Report healthy status
/// reporter.report(HealthStatus::healthy()).await?;
///
/// // Report degraded status
/// reporter.report(
///     HealthStatus::degraded("High CPU usage")
///         .with_diagnostic("cpu", "95%")
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct HealthReporter {
    // Identifier of the node whose health this reporter publishes.
    node_id: String,
    // Redis client used to store the current status key and the history sorted set.
    redis_client: redis::Client,
    // Context handle, used to publish each report on the "/system/health" topic.
    ctx: Arc<Context>,
}
113
114impl HealthReporter {
115 /// Create a new health reporter
116 pub async fn new(ctx: &Context) -> Result<Self> {
117 let redis_url = Self::get_redis_url()?;
118 let redis_client = redis::Client::open(redis_url.as_str())
119 .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis for health reporting: {}", e)))?;
120
121 Ok(Self {
122 node_id: ctx.node_id().to_string(),
123 redis_client,
124 ctx: Arc::new(ctx.clone()),
125 })
126 }
127
128 /// Report health status
129 ///
130 /// Stores the health status in Redis with a TTL. If the node stops reporting,
131 /// the health status will expire and the node will be marked as unavailable.
132 pub async fn report(&self, status: HealthStatus) -> Result<()> {
133 use redis::AsyncCommands;
134
135 let mut conn = self
136 .redis_client
137 .get_multiplexed_async_connection()
138 .await
139 .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
140
141 // Create health report
142 let report = HealthReport {
143 node_id: self.node_id.clone(),
144 status,
145 reported_at: crate::prelude::now_micros(),
146 };
147
148 // Store in Redis with TTL
149 let key = format!("mecha10:health:{}", self.node_id);
150 let value = serde_json::to_string(&report)
151 .map_err(|e| Mecha10Error::Other(format!("Failed to serialize health report: {}", e)))?;
152
153 // Set with 60 second TTL (node should report more frequently)
154 conn.set_ex::<_, _, ()>(&key, value.clone(), 60)
155 .await
156 .map_err(|e| Mecha10Error::Other(format!("Failed to store health status: {}", e)))?;
157
158 // Store in health history (sorted set by timestamp)
159 let history_key = format!("mecha10:health:history:{}", self.node_id);
160 let score = report.reported_at as f64; // Use timestamp as score for sorting
161 conn.zadd::<_, _, _, ()>(&history_key, &value, score)
162 .await
163 .map_err(|e| Mecha10Error::Other(format!("Failed to store health history: {}", e)))?;
164
165 // Keep only last 24 hours of history (86400 seconds)
166 let cutoff_time = (report.reported_at - 86_400_000_000) as f64; // 24 hours ago in microseconds
167 conn.zrembyscore::<_, _, _, ()>(&history_key, f64::MIN, cutoff_time)
168 .await
169 .map_err(|e| Mecha10Error::Other(format!("Failed to clean old health history: {}", e)))?;
170
171 // Also publish to health topic for real-time monitoring
172 self.ctx.publish_raw("/system/health", &report.status).await?;
173
174 Ok(())
175 }
176
177 /// Report healthy status
178 pub async fn report_healthy(&self) -> Result<()> {
179 self.report(HealthStatus::healthy()).await
180 }
181
182 /// Report degraded status
183 pub async fn report_degraded(&self, message: impl Into<String>) -> Result<()> {
184 self.report(HealthStatus::degraded(message)).await
185 }
186
187 /// Report unhealthy status
188 pub async fn report_unhealthy(&self, message: impl Into<String>) -> Result<()> {
189 self.report(HealthStatus::unhealthy(message)).await
190 }
191
192 fn get_redis_url() -> Result<String> {
193 if let Ok(url) = std::env::var("REDIS_URL") {
194 return Ok(url);
195 }
196
197 // Load from infrastructure config
198 let environment = std::env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
199
200 let config_path = format!("config/infrastructure.{}.yaml", environment);
201
202 use crate::config::load_config;
203 use crate::context::InfrastructureConfig;
204
205 let config: InfrastructureConfig = load_config(&config_path)
206 .map_err(|e| Mecha10Error::Configuration(format!("Failed to load infrastructure config: {}", e)))?;
207
208 Ok(config.redis.url)
209 }
210}
211
212// ============================================================================
213// Health Monitor (Centralized)
214// ============================================================================
215
/// Health report stored in Redis
///
/// Serialized as JSON for both the current-status key
/// (`mecha10:health:<node>`) and the per-node history sorted set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Node identifier
    pub node_id: String,
    /// Health status at the time of report
    pub status: HealthStatus,
    /// Timestamp when the report was created (microseconds since epoch)
    pub reported_at: u64,
}
226
/// System-wide health summary
///
/// Aggregated snapshot of every node's most recent health report, as
/// produced by `HealthMonitor::system_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Overall system health level (worst-case rollup of the counts below)
    pub level: HealthLevel,

    /// Total number of nodes
    pub total_nodes: usize,

    /// Number of healthy nodes
    pub healthy_nodes: usize,

    /// Number of degraded nodes
    pub degraded_nodes: usize,

    /// Number of unhealthy nodes
    pub unhealthy_nodes: usize,

    /// Number of unresponsive nodes (haven't reported recently)
    pub unresponsive_nodes: usize,

    /// Timestamp when this summary was generated (microseconds since epoch)
    pub timestamp: u64,

    /// Details by node
    pub nodes: HashMap<String, NodeHealthInfo>,
}
254
/// Health information for a single node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealthInfo {
    /// Node ID
    pub node_id: String,

    /// Current health status
    pub status: HealthStatus,

    /// When the status was last reported (microseconds since epoch)
    pub reported_at: u64,

    /// How long ago the status was reported (microseconds)
    pub age_micros: u64,

    /// Whether the node is responsive (reported within the monitor's
    /// 90-second responsiveness window)
    pub responsive: bool,
}
273
/// Centralized health monitor
///
/// Aggregates health status from all nodes and provides system-wide health information.
///
/// # Example
///
/// ```rust
/// use mecha10::health::HealthMonitor;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
///
/// // Get system-wide health
/// let health = monitor.system_health().await?;
/// println!("System health: {:?}", health.level);
/// println!("Healthy: {}/{}", health.healthy_nodes, health.total_nodes);
///
/// // List unhealthy nodes
/// for node in monitor.unhealthy_nodes().await? {
///     println!("Unhealthy: {} - {}", node.node_id, node.status.message);
/// }
/// # Ok(())
/// # }
/// ```
pub struct HealthMonitor {
    // Redis client used to read per-node health keys and history.
    redis_client: redis::Client,
    // Most recent aggregated snapshot plus the instant it was computed.
    cache: Arc<RwLock<Option<(SystemHealth, std::time::Instant)>>>,
    // How long a cached snapshot stays valid before a fresh Redis read.
    cache_ttl: Duration,
}
303
304impl HealthMonitor {
305 /// Connect to Redis for health monitoring
306 pub async fn connect(redis_url: &str) -> Result<Self> {
307 let redis_client = redis::Client::open(redis_url)
308 .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
309
310 Ok(Self {
311 redis_client,
312 cache: Arc::new(RwLock::new(None)),
313 cache_ttl: Duration::from_secs(5), // Cache for 5 seconds
314 })
315 }
316
317 /// Get system-wide health summary
318 pub async fn system_health(&self) -> Result<SystemHealth> {
319 // Check cache first
320 {
321 let cache = self.cache.read().await;
322 if let Some((health, cached_at)) = cache.as_ref() {
323 if cached_at.elapsed() < self.cache_ttl {
324 return Ok(health.clone());
325 }
326 }
327 }
328
329 // Fetch fresh data
330 let health = self.fetch_system_health().await?;
331
332 // Update cache
333 {
334 let mut cache = self.cache.write().await;
335 *cache = Some((health.clone(), std::time::Instant::now()));
336 }
337
338 Ok(health)
339 }
340
341 async fn fetch_system_health(&self) -> Result<SystemHealth> {
342 use redis::AsyncCommands;
343
344 let mut conn = self
345 .redis_client
346 .get_multiplexed_async_connection()
347 .await
348 .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
349
350 // Get all health keys
351 let pattern = "mecha10:health:*";
352 let keys: Vec<String> = conn
353 .keys(pattern)
354 .await
355 .map_err(|e| Mecha10Error::Other(format!("Failed to list health keys: {}", e)))?;
356
357 let mut nodes = HashMap::new();
358 let mut healthy_count = 0;
359 let mut degraded_count = 0;
360 let mut unhealthy_count = 0;
361 let mut unresponsive_count = 0;
362
363 let now = crate::prelude::now_micros();
364 let responsive_threshold_micros = 90_000_000; // 90 seconds
365
366 for key in keys {
367 let value: Option<String> = conn
368 .get(&key)
369 .await
370 .map_err(|e| Mecha10Error::Other(format!("Failed to get health value: {}", e)))?;
371
372 if let Some(value) = value {
373 if let Ok(report) = serde_json::from_str::<HealthReport>(&value) {
374 let age_micros = now.saturating_sub(report.reported_at);
375 let responsive = age_micros < responsive_threshold_micros;
376
377 if !responsive {
378 unresponsive_count += 1;
379 } else {
380 match report.status.level {
381 HealthLevel::Ok => healthy_count += 1,
382 HealthLevel::Degraded => degraded_count += 1,
383 HealthLevel::Error => unhealthy_count += 1,
384 HealthLevel::Unknown => unresponsive_count += 1,
385 }
386 }
387
388 nodes.insert(
389 report.node_id.clone(),
390 NodeHealthInfo {
391 node_id: report.node_id,
392 status: report.status,
393 reported_at: report.reported_at,
394 age_micros,
395 responsive,
396 },
397 );
398 }
399 }
400 }
401
402 // Determine overall system health
403 let level = if unhealthy_count > 0 || unresponsive_count > nodes.len() / 2 {
404 HealthLevel::Error
405 } else if degraded_count > 0 || unresponsive_count > 0 {
406 HealthLevel::Degraded
407 } else if healthy_count > 0 {
408 HealthLevel::Ok
409 } else {
410 HealthLevel::Unknown
411 };
412
413 Ok(SystemHealth {
414 level,
415 total_nodes: nodes.len(),
416 healthy_nodes: healthy_count,
417 degraded_nodes: degraded_count,
418 unhealthy_nodes: unhealthy_count,
419 unresponsive_nodes: unresponsive_count,
420 timestamp: now,
421 nodes,
422 })
423 }
424
425 /// Get health status for a specific node
426 pub async fn node_health(&self, node_id: &str) -> Result<Option<NodeHealthInfo>> {
427 let system_health = self.system_health().await?;
428 Ok(system_health.nodes.get(node_id).cloned())
429 }
430
431 /// Get list of all healthy nodes
432 pub async fn healthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
433 let system_health = self.system_health().await?;
434 Ok(system_health
435 .nodes
436 .values()
437 .filter(|n| n.responsive && n.status.level == HealthLevel::Ok)
438 .cloned()
439 .collect())
440 }
441
442 /// Get list of degraded nodes
443 pub async fn degraded_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
444 let system_health = self.system_health().await?;
445 Ok(system_health
446 .nodes
447 .values()
448 .filter(|n| n.responsive && n.status.level == HealthLevel::Degraded)
449 .cloned()
450 .collect())
451 }
452
453 /// Get list of unhealthy nodes
454 pub async fn unhealthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
455 let system_health = self.system_health().await?;
456 Ok(system_health
457 .nodes
458 .values()
459 .filter(|n| n.responsive && n.status.level == HealthLevel::Error)
460 .cloned()
461 .collect())
462 }
463
464 /// Get list of unresponsive nodes (haven't reported recently)
465 pub async fn unresponsive_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
466 let system_health = self.system_health().await?;
467 Ok(system_health
468 .nodes
469 .values()
470 .filter(|n| !n.responsive)
471 .cloned()
472 .collect())
473 }
474
475 /// Clear the cache (force fresh data on next query)
476 pub async fn clear_cache(&self) {
477 let mut cache = self.cache.write().await;
478 *cache = None;
479 }
480
481 /// Get health history for a node
482 ///
483 /// Returns health reports for the specified node over the given duration.
484 /// History is stored for up to 24 hours.
485 ///
486 /// # Arguments
487 ///
488 /// * `node_id` - The node to query history for
489 /// * `duration` - How far back to retrieve history (e.g., Duration::from_secs(3600) for 1 hour)
490 ///
491 /// # Example
492 ///
493 /// ```rust,no_run
494 /// # use mecha10::prelude::*;
495 /// # use mecha10::health::HealthMonitor;
496 /// # use std::time::Duration;
497 /// # async fn example() -> Result<()> {
498 /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
499 ///
500 /// // Get last hour of health history
501 /// let history = monitor.node_health_history("camera_node", Duration::from_secs(3600)).await?;
502 /// println!("Found {} health reports in the last hour", history.len());
503 /// # Ok(())
504 /// # }
505 /// ```
506 pub async fn node_health_history(&self, node_id: &str, duration: Duration) -> Result<Vec<HealthReport>> {
507 use redis::AsyncCommands;
508
509 let mut conn = self
510 .redis_client
511 .get_multiplexed_async_connection()
512 .await
513 .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
514
515 let history_key = format!("mecha10:health:history:{}", node_id);
516
517 // Calculate time range (in microseconds)
518 let now = crate::prelude::now_micros();
519 let duration_micros = duration.as_micros() as u64;
520 let start_time = now.saturating_sub(duration_micros);
521
522 // Query sorted set by score (timestamp)
523 let results: Vec<String> = conn
524 .zrangebyscore(&history_key, start_time as f64, now as f64)
525 .await
526 .map_err(|e| Mecha10Error::Other(format!("Failed to retrieve health history: {}", e)))?;
527
528 // Deserialize results
529 let mut reports = Vec::new();
530 for json_str in results {
531 match serde_json::from_str::<HealthReport>(&json_str) {
532 Ok(report) => reports.push(report),
533 Err(e) => {
534 eprintln!("Failed to deserialize health report: {}", e);
535 continue;
536 }
537 }
538 }
539
540 Ok(reports)
541 }
542}
543
544// ============================================================================
545// Auto-Reporting Extension
546// ============================================================================
547
/// Extension trait for Context to enable automatic health reporting
#[async_trait::async_trait]
pub trait HealthReportingExt {
    /// Start automatic health reporting
    ///
    /// Spawns a background task that periodically reports health status.
    /// The health_check_fn is called to generate the current health status.
    ///
    /// The spawned task runs for the lifetime of the tokio runtime; reporting
    /// errors are logged rather than propagated to the caller.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report healthy status every second
    /// ctx.start_health_reporting(
    ///     Duration::from_secs(1),
    ///     || async { HealthStatus::healthy() }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;
}
576
577#[async_trait::async_trait]
578impl HealthReportingExt for Context {
579 async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
580 where
581 F: Fn() -> Fut + Send + Sync + 'static,
582 Fut: std::future::Future<Output = HealthStatus> + Send,
583 {
584 let reporter = HealthReporter::new(self).await?;
585
586 tokio::spawn(async move {
587 let mut ticker = tokio::time::interval(interval);
588
589 loop {
590 ticker.tick().await;
591
592 let status = health_check_fn().await;
593
594 if let Err(e) = reporter.report(status).await {
595 tracing::error!("Failed to report health: {}", e);
596 }
597 }
598 });
599
600 Ok(())
601 }
602}