// mecha10_core/health.rs

1//! Centralized Health Monitoring
2//!
3//! Provides centralized health monitoring and aggregation for all nodes in a Mecha10 system.
4//!
5//! # Features
6//!
7//! - Automatic health reporting from nodes
8//! - Centralized health aggregation in Redis
9//! - System-wide health status
10//! - Health history storage (24 hours, using Redis sorted sets)
11//! - Health history queries by time range
12//! - Alerting on health degradation
13//! - Dashboard/CLI integration
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────┐     Health      ┌───────────────┐
19//! │  Node A │────Status──────▶│               │
20//! └─────────┘                 │               │
21//!                             │  Health       │     ┌──────────┐
22//! ┌─────────┐     Health      │  Monitor      │────▶│Dashboard │
23//! │  Node B │────Status──────▶│  (Redis)      │     └──────────┘
24//! └─────────┘                 │               │
25//!                             │               │     ┌──────────┐
26//! ┌─────────┐     Health      │               │────▶│Alerting  │
27//! │  Node C │────Status──────▶│               │     └──────────┘
28//! └─────────┘                 └───────────────┘
29//! ```
30//!
31//! # Example
32//!
33//! ```rust
34//! use mecha10::prelude::*;
35//! use mecha10::health::{HealthMonitor, HealthReporter};
36//!
37//! # async fn example() -> Result<()> {
38//! // In your node
39//! let ctx = Context::new("my-node").await?;
40//! let reporter = HealthReporter::new(&ctx).await?;
41//!
42//! // Report health periodically
43//! let mut interval = interval_at_hz(1.0); // 1 Hz
44//! loop {
45//!     interval.tick().await;
46//!
47//!     let status = HealthStatus::healthy()
48//!         .with_diagnostic("cpu_usage", "25%")
49//!         .with_diagnostic("memory_mb", "128");
50//!
51//!     reporter.report(status).await?;
52//! }
53//!
54//! // Monitor system health
55//! let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
56//!
57//! // Get overall system health
58//! let system_health = monitor.system_health().await?;
59//! println!("System: {:?}", system_health.level);
60//!
61//! // Get health by node
62//! let node_health = monitor.node_health("my-node").await?;
63//!
64//! // List unhealthy nodes
65//! let unhealthy = monitor.unhealthy_nodes().await?;
66//! # Ok(())
67//! # }
68//! ```
69
70use crate::context::Context;
71use crate::error::{Mecha10Error, Result};
72use crate::messages::{HealthLevel, HealthStatus};
73use serde::{Deserialize, Serialize};
74use std::collections::HashMap;
75use std::sync::Arc;
76use std::time::Duration;
77use tokio::sync::RwLock;
78
79// ============================================================================
80// Health Reporter (Node Side)
81// ============================================================================
82
/// Health reporter for nodes to publish their health status
///
/// Each node should create a HealthReporter and use it to periodically
/// report health status to the centralized monitoring system.
///
/// # Example
///
/// ```rust
/// use mecha10::prelude::*;
/// use mecha10::health::HealthReporter;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let reporter = HealthReporter::new(ctx).await?;
///
/// // Report healthy status
/// reporter.report(HealthStatus::healthy()).await?;
///
/// // Report degraded status
/// reporter.report(
///     HealthStatus::degraded("High CPU usage")
///         .with_diagnostic("cpu", "95%")
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct HealthReporter {
    // Node identifier; used as the suffix of the Redis keys
    // `mecha10:health:{node_id}` and `mecha10:health:history:{node_id}`.
    node_id: String,
    // Redis client; a multiplexed async connection is obtained per report.
    redis_client: redis::Client,
    // Shared context, used to publish each report on `/system/health`.
    ctx: Arc<Context>,
}
113
114impl HealthReporter {
115    /// Create a new health reporter
116    pub async fn new(ctx: &Context) -> Result<Self> {
117        let redis_url = Self::get_redis_url()?;
118        let redis_client = redis::Client::open(redis_url.as_str())
119            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis for health reporting: {}", e)))?;
120
121        Ok(Self {
122            node_id: ctx.node_id().to_string(),
123            redis_client,
124            ctx: Arc::new(ctx.clone()),
125        })
126    }
127
128    /// Report health status
129    ///
130    /// Stores the health status in Redis with a TTL. If the node stops reporting,
131    /// the health status will expire and the node will be marked as unavailable.
132    pub async fn report(&self, status: HealthStatus) -> Result<()> {
133        use redis::AsyncCommands;
134
135        let mut conn = self
136            .redis_client
137            .get_multiplexed_async_connection()
138            .await
139            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
140
141        // Create health report
142        let report = HealthReport {
143            node_id: self.node_id.clone(),
144            status,
145            reported_at: crate::prelude::now_micros(),
146        };
147
148        // Store in Redis with TTL
149        let key = format!("mecha10:health:{}", self.node_id);
150        let value = serde_json::to_string(&report)
151            .map_err(|e| Mecha10Error::Other(format!("Failed to serialize health report: {}", e)))?;
152
153        // Set with 60 second TTL (node should report more frequently)
154        conn.set_ex::<_, _, ()>(&key, value.clone(), 60)
155            .await
156            .map_err(|e| Mecha10Error::Other(format!("Failed to store health status: {}", e)))?;
157
158        // Store in health history (sorted set by timestamp)
159        let history_key = format!("mecha10:health:history:{}", self.node_id);
160        let score = report.reported_at as f64; // Use timestamp as score for sorting
161        conn.zadd::<_, _, _, ()>(&history_key, &value, score)
162            .await
163            .map_err(|e| Mecha10Error::Other(format!("Failed to store health history: {}", e)))?;
164
165        // Keep only last 24 hours of history (86400 seconds)
166        let cutoff_time = (report.reported_at - 86_400_000_000) as f64; // 24 hours ago in microseconds
167        conn.zrembyscore::<_, _, _, ()>(&history_key, f64::MIN, cutoff_time)
168            .await
169            .map_err(|e| Mecha10Error::Other(format!("Failed to clean old health history: {}", e)))?;
170
171        // Also publish to health topic for real-time monitoring
172        self.ctx.publish_raw("/system/health", &report.status).await?;
173
174        Ok(())
175    }
176
177    /// Report healthy status
178    pub async fn report_healthy(&self) -> Result<()> {
179        self.report(HealthStatus::healthy()).await
180    }
181
182    /// Report degraded status
183    pub async fn report_degraded(&self, message: impl Into<String>) -> Result<()> {
184        self.report(HealthStatus::degraded(message)).await
185    }
186
187    /// Report unhealthy status
188    pub async fn report_unhealthy(&self, message: impl Into<String>) -> Result<()> {
189        self.report(HealthStatus::unhealthy(message)).await
190    }
191
192    fn get_redis_url() -> Result<String> {
193        if let Ok(url) = std::env::var("REDIS_URL") {
194            return Ok(url);
195        }
196
197        // Load from infrastructure config
198        let environment = std::env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
199
200        let config_path = format!("config/infrastructure.{}.yaml", environment);
201
202        use crate::config::load_config;
203        use crate::context::InfrastructureConfig;
204
205        let config: InfrastructureConfig = load_config(&config_path)
206            .map_err(|e| Mecha10Error::Configuration(format!("Failed to load infrastructure config: {}", e)))?;
207
208        Ok(config.redis.url)
209    }
210}
211
212// ============================================================================
213// Health Monitor (Centralized)
214// ============================================================================
215
/// Health report stored in Redis
///
/// Serialized to JSON and written both as the node's current-status key
/// (`mecha10:health:{node_id}`, 60 s TTL) and as a member of the node's
/// history sorted set (`mecha10:health:history:{node_id}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Node identifier
    pub node_id: String,
    /// Health status at the time of report
    pub status: HealthStatus,
    /// Timestamp when the report was created (microseconds since epoch)
    pub reported_at: u64,
}
226
/// System-wide health summary
///
/// Produced by [`HealthMonitor::system_health`]. The overall `level` is
/// derived from the per-node counts: `Error` if any node is unhealthy or a
/// majority is unresponsive; `Degraded` if any node is degraded or
/// unresponsive; `Ok` if at least one healthy node reported; `Unknown` when
/// no nodes are known.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Overall system health level
    pub level: HealthLevel,

    /// Total number of nodes
    pub total_nodes: usize,

    /// Number of healthy nodes
    pub healthy_nodes: usize,

    /// Number of degraded nodes
    pub degraded_nodes: usize,

    /// Number of unhealthy nodes
    pub unhealthy_nodes: usize,

    /// Number of unresponsive nodes (haven't reported recently)
    pub unresponsive_nodes: usize,

    /// Timestamp when this summary was generated
    pub timestamp: u64,

    /// Details by node
    pub nodes: HashMap<String, NodeHealthInfo>,
}
254
/// Health information for a single node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealthInfo {
    /// Node ID
    pub node_id: String,

    /// Current health status
    pub status: HealthStatus,

    /// When the status was last reported
    pub reported_at: u64,

    /// How long ago the status was reported (microseconds)
    pub age_micros: u64,

    /// Whether the node is responsive (reported within the last 90 seconds
    /// at the time the summary was built)
    pub responsive: bool,
}
273
/// Centralized health monitor
///
/// Aggregates health status from all nodes and provides system-wide health information.
///
/// # Example
///
/// ```rust
/// use mecha10::health::HealthMonitor;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
///
/// // Get system-wide health
/// let health = monitor.system_health().await?;
/// println!("System health: {:?}", health.level);
/// println!("Healthy: {}/{}", health.healthy_nodes, health.total_nodes);
///
/// // List unhealthy nodes
/// for node in monitor.unhealthy_nodes().await? {
///     println!("Unhealthy: {} - {}", node.node_id, node.status.message);
/// }
/// # Ok(())
/// # }
/// ```
pub struct HealthMonitor {
    // Redis client; a multiplexed async connection is obtained per query.
    redis_client: redis::Client,
    // Most recent system-health snapshot plus the instant it was computed.
    cache: Arc<RwLock<Option<(SystemHealth, std::time::Instant)>>>,
    // How long a cached snapshot stays valid (5 seconds, set in `connect`).
    cache_ttl: Duration,
}
303
304impl HealthMonitor {
305    /// Connect to Redis for health monitoring
306    pub async fn connect(redis_url: &str) -> Result<Self> {
307        let redis_client = redis::Client::open(redis_url)
308            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
309
310        Ok(Self {
311            redis_client,
312            cache: Arc::new(RwLock::new(None)),
313            cache_ttl: Duration::from_secs(5), // Cache for 5 seconds
314        })
315    }
316
317    /// Get system-wide health summary
318    pub async fn system_health(&self) -> Result<SystemHealth> {
319        // Check cache first
320        {
321            let cache = self.cache.read().await;
322            if let Some((health, cached_at)) = cache.as_ref() {
323                if cached_at.elapsed() < self.cache_ttl {
324                    return Ok(health.clone());
325                }
326            }
327        }
328
329        // Fetch fresh data
330        let health = self.fetch_system_health().await?;
331
332        // Update cache
333        {
334            let mut cache = self.cache.write().await;
335            *cache = Some((health.clone(), std::time::Instant::now()));
336        }
337
338        Ok(health)
339    }
340
341    async fn fetch_system_health(&self) -> Result<SystemHealth> {
342        use redis::AsyncCommands;
343
344        let mut conn = self
345            .redis_client
346            .get_multiplexed_async_connection()
347            .await
348            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
349
350        // Get all health keys
351        let pattern = "mecha10:health:*";
352        let keys: Vec<String> = conn
353            .keys(pattern)
354            .await
355            .map_err(|e| Mecha10Error::Other(format!("Failed to list health keys: {}", e)))?;
356
357        let mut nodes = HashMap::new();
358        let mut healthy_count = 0;
359        let mut degraded_count = 0;
360        let mut unhealthy_count = 0;
361        let mut unresponsive_count = 0;
362
363        let now = crate::prelude::now_micros();
364        let responsive_threshold_micros = 90_000_000; // 90 seconds
365
366        for key in keys {
367            let value: Option<String> = conn
368                .get(&key)
369                .await
370                .map_err(|e| Mecha10Error::Other(format!("Failed to get health value: {}", e)))?;
371
372            if let Some(value) = value {
373                if let Ok(report) = serde_json::from_str::<HealthReport>(&value) {
374                    let age_micros = now.saturating_sub(report.reported_at);
375                    let responsive = age_micros < responsive_threshold_micros;
376
377                    if !responsive {
378                        unresponsive_count += 1;
379                    } else {
380                        match report.status.level {
381                            HealthLevel::Ok => healthy_count += 1,
382                            HealthLevel::Degraded => degraded_count += 1,
383                            HealthLevel::Error => unhealthy_count += 1,
384                            HealthLevel::Unknown => unresponsive_count += 1,
385                        }
386                    }
387
388                    nodes.insert(
389                        report.node_id.clone(),
390                        NodeHealthInfo {
391                            node_id: report.node_id,
392                            status: report.status,
393                            reported_at: report.reported_at,
394                            age_micros,
395                            responsive,
396                        },
397                    );
398                }
399            }
400        }
401
402        // Determine overall system health
403        let level = if unhealthy_count > 0 || unresponsive_count > nodes.len() / 2 {
404            HealthLevel::Error
405        } else if degraded_count > 0 || unresponsive_count > 0 {
406            HealthLevel::Degraded
407        } else if healthy_count > 0 {
408            HealthLevel::Ok
409        } else {
410            HealthLevel::Unknown
411        };
412
413        Ok(SystemHealth {
414            level,
415            total_nodes: nodes.len(),
416            healthy_nodes: healthy_count,
417            degraded_nodes: degraded_count,
418            unhealthy_nodes: unhealthy_count,
419            unresponsive_nodes: unresponsive_count,
420            timestamp: now,
421            nodes,
422        })
423    }
424
425    /// Get health status for a specific node
426    pub async fn node_health(&self, node_id: &str) -> Result<Option<NodeHealthInfo>> {
427        let system_health = self.system_health().await?;
428        Ok(system_health.nodes.get(node_id).cloned())
429    }
430
431    /// Get list of all healthy nodes
432    pub async fn healthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
433        let system_health = self.system_health().await?;
434        Ok(system_health
435            .nodes
436            .values()
437            .filter(|n| n.responsive && n.status.level == HealthLevel::Ok)
438            .cloned()
439            .collect())
440    }
441
442    /// Get list of degraded nodes
443    pub async fn degraded_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
444        let system_health = self.system_health().await?;
445        Ok(system_health
446            .nodes
447            .values()
448            .filter(|n| n.responsive && n.status.level == HealthLevel::Degraded)
449            .cloned()
450            .collect())
451    }
452
453    /// Get list of unhealthy nodes
454    pub async fn unhealthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
455        let system_health = self.system_health().await?;
456        Ok(system_health
457            .nodes
458            .values()
459            .filter(|n| n.responsive && n.status.level == HealthLevel::Error)
460            .cloned()
461            .collect())
462    }
463
464    /// Get list of unresponsive nodes (haven't reported recently)
465    pub async fn unresponsive_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
466        let system_health = self.system_health().await?;
467        Ok(system_health
468            .nodes
469            .values()
470            .filter(|n| !n.responsive)
471            .cloned()
472            .collect())
473    }
474
475    /// Clear the cache (force fresh data on next query)
476    pub async fn clear_cache(&self) {
477        let mut cache = self.cache.write().await;
478        *cache = None;
479    }
480
481    /// Get health history for a node
482    ///
483    /// Returns health reports for the specified node over the given duration.
484    /// History is stored for up to 24 hours.
485    ///
486    /// # Arguments
487    ///
488    /// * `node_id` - The node to query history for
489    /// * `duration` - How far back to retrieve history (e.g., Duration::from_secs(3600) for 1 hour)
490    ///
491    /// # Example
492    ///
493    /// ```rust,no_run
494    /// # use mecha10::prelude::*;
495    /// # use mecha10::health::HealthMonitor;
496    /// # use std::time::Duration;
497    /// # async fn example() -> Result<()> {
498    /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
499    ///
500    /// // Get last hour of health history
501    /// let history = monitor.node_health_history("camera_node", Duration::from_secs(3600)).await?;
502    /// println!("Found {} health reports in the last hour", history.len());
503    /// # Ok(())
504    /// # }
505    /// ```
506    pub async fn node_health_history(&self, node_id: &str, duration: Duration) -> Result<Vec<HealthReport>> {
507        use redis::AsyncCommands;
508
509        let mut conn = self
510            .redis_client
511            .get_multiplexed_async_connection()
512            .await
513            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
514
515        let history_key = format!("mecha10:health:history:{}", node_id);
516
517        // Calculate time range (in microseconds)
518        let now = crate::prelude::now_micros();
519        let duration_micros = duration.as_micros() as u64;
520        let start_time = now.saturating_sub(duration_micros);
521
522        // Query sorted set by score (timestamp)
523        let results: Vec<String> = conn
524            .zrangebyscore(&history_key, start_time as f64, now as f64)
525            .await
526            .map_err(|e| Mecha10Error::Other(format!("Failed to retrieve health history: {}", e)))?;
527
528        // Deserialize results
529        let mut reports = Vec::new();
530        for json_str in results {
531            match serde_json::from_str::<HealthReport>(&json_str) {
532                Ok(report) => reports.push(report),
533                Err(e) => {
534                    eprintln!("Failed to deserialize health report: {}", e);
535                    continue;
536                }
537            }
538        }
539
540        Ok(reports)
541    }
542}
543
544// ============================================================================
545// Auto-Reporting Extension
546// ============================================================================
547
/// Extension trait for Context to enable automatic health reporting
#[async_trait::async_trait]
pub trait HealthReportingExt {
    /// Start automatic health reporting
    ///
    /// Spawns a background task that periodically reports health status.
    /// The health_check_fn is called to generate the current health status.
    ///
    /// The task is detached: no handle is returned, so reporting continues
    /// for the lifetime of the async runtime once started.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report healthy status every second
    /// ctx.start_health_reporting(
    ///     Duration::from_secs(1),
    ///     || async { HealthStatus::healthy() }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;
}
576
577#[async_trait::async_trait]
578impl HealthReportingExt for Context {
579    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
580    where
581        F: Fn() -> Fut + Send + Sync + 'static,
582        Fut: std::future::Future<Output = HealthStatus> + Send,
583    {
584        let reporter = HealthReporter::new(self).await?;
585
586        tokio::spawn(async move {
587            let mut ticker = tokio::time::interval(interval);
588
589            loop {
590                ticker.tick().await;
591
592                let status = health_check_fn().await;
593
594                if let Err(e) = reporter.report(status).await {
595                    tracing::error!("Failed to report health: {}", e);
596                }
597            }
598        });
599
600        Ok(())
601    }
602}