// mecha10_core/health.rs

1//! Centralized Health Monitoring
2//!
3//! Provides centralized health monitoring and aggregation for all nodes in a Mecha10 system.
4//!
5//! # Features
6//!
7//! - Automatic health reporting from nodes
8//! - Centralized health aggregation in Redis
9//! - System-wide health status
10//! - Health history storage (24 hours, using Redis sorted sets)
11//! - Health history queries by time range
12//! - Alerting on health degradation
13//! - Dashboard/CLI integration
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────┐     Health      ┌───────────────┐
19//! │  Node A │────Status──────▶│               │
20//! └─────────┘                 │               │
21//!                             │  Health       │     ┌──────────┐
22//! ┌─────────┐     Health      │  Monitor      │────▶│Dashboard │
23//! │  Node B │────Status──────▶│  (Redis)      │     └──────────┘
24//! └─────────┘                 │               │
25//!                             │               │     ┌──────────┐
26//! ┌─────────┐     Health      │               │────▶│Alerting  │
27//! │  Node C │────Status──────▶│               │     └──────────┘
28//! └─────────┘                 └───────────────┘
29//! ```
30//!
31//! # Example
32//!
33//! ```rust
34//! use mecha10::prelude::*;
35//! use mecha10::health::{HealthMonitor, HealthReporter};
36//!
37//! # async fn example() -> Result<()> {
38//! // In your node
39//! let ctx = Context::new("my-node").await?;
40//! let reporter = HealthReporter::new(&ctx).await?;
41//!
42//! // Report health periodically
43//! let mut interval = interval_at_hz(1.0); // 1 Hz
44//! loop {
45//!     interval.tick().await;
46//!
47//!     let status = HealthStatus::healthy()
48//!         .with_diagnostic("cpu_usage", "25%")
49//!         .with_diagnostic("memory_mb", "128");
50//!
51//!     reporter.report(status).await?;
52//! }
53//!
54//! // Monitor system health
55//! let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
56//!
57//! // Get overall system health
58//! let system_health = monitor.system_health().await?;
59//! println!("System: {:?}", system_health.level);
60//!
61//! // Get health by node
62//! let node_health = monitor.node_health("my-node").await?;
63//!
64//! // List unhealthy nodes
65//! let unhealthy = monitor.unhealthy_nodes().await?;
66//! # Ok(())
67//! # }
68//! ```
69
70use crate::context::Context;
71use crate::error::{Mecha10Error, Result};
72use crate::messages::{HealthLevel, HealthStatus, Message};
73use crate::node::HealthPriority;
74use serde::{Deserialize, Serialize};
75use std::collections::HashMap;
76use std::sync::Arc;
77use std::time::Duration;
78use tokio::sync::RwLock;
79
80// ============================================================================
81// Health Reporter (Node Side)
82// ============================================================================
83
84/// Health reporter for nodes to publish their health status
85///
86/// Each node should create a HealthReporter and use it to periodically
87/// report health status to the centralized monitoring system.
88///
89/// # Example
90///
91/// ```rust
92/// use mecha10::prelude::*;
93/// use mecha10::health::HealthReporter;
94///
95/// # async fn example(ctx: &Context) -> Result<()> {
96/// let reporter = HealthReporter::new(ctx).await?;
97///
98/// // Report healthy status
99/// reporter.report(HealthStatus::healthy()).await?;
100///
101/// // Report degraded status
102/// reporter.report(
103///     HealthStatus::degraded("High CPU usage")
104///         .with_diagnostic("cpu", "95%")
105/// ).await?;
106/// # Ok(())
107/// # }
108/// ```
pub struct HealthReporter {
    /// ID of the node whose health is being reported
    node_id: String,
    /// Redis client used to write the latest-health key and history sorted set
    redis_client: redis::Client,
    /// Node context, used to publish each report on the "/system/health" topic
    ctx: Arc<Context>,
    /// Health priority level
    priority: HealthPriority,
    /// TTL for Redis health key in seconds
    ttl_seconds: u64,
    /// Whether to store health history
    store_history: bool,
}
120
121impl HealthReporter {
122    /// Create a new health reporter with default settings (Normal priority)
123    pub async fn new(ctx: &Context) -> Result<Self> {
124        Self::with_config(ctx, HealthPriority::Normal, 60, true).await
125    }
126
127    /// Create a new health reporter with custom configuration
128    ///
129    /// # Arguments
130    ///
131    /// * `ctx` - Node context
132    /// * `priority` - Health priority level (affects timeout calculations)
133    /// * `ttl_seconds` - TTL for the Redis health key
134    /// * `store_history` - Whether to store health history in sorted sets
135    pub async fn with_config(
136        ctx: &Context,
137        priority: HealthPriority,
138        ttl_seconds: u64,
139        store_history: bool,
140    ) -> Result<Self> {
141        let redis_url = Self::get_redis_url()?;
142        let redis_client = redis::Client::open(redis_url.as_str())
143            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis for health reporting: {}", e)))?;
144
145        Ok(Self {
146            node_id: ctx.node_id().to_string(),
147            redis_client,
148            ctx: Arc::new(ctx.clone()),
149            priority,
150            ttl_seconds,
151            store_history,
152        })
153    }
154
155    /// Report health status
156    ///
157    /// Stores the health status in Redis with a TTL. If the node stops reporting,
158    /// the health status will expire and the node will be marked as unavailable.
159    pub async fn report(&self, status: HealthStatus) -> Result<()> {
160        use redis::AsyncCommands;
161
162        let mut conn = self
163            .redis_client
164            .get_multiplexed_async_connection()
165            .await
166            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
167
168        // Create health report with priority
169        let report = HealthReport {
170            node_id: self.node_id.clone(),
171            status,
172            priority: self.priority,
173            reported_at: crate::prelude::now_micros(),
174        };
175
176        // Store in Redis with configured TTL
177        let key = format!("mecha10:health:{}", self.node_id);
178        let value = serde_json::to_string(&report)
179            .map_err(|e| Mecha10Error::Other(format!("Failed to serialize health report: {}", e)))?;
180
181        conn.set_ex::<_, _, ()>(&key, value.clone(), self.ttl_seconds)
182            .await
183            .map_err(|e| Mecha10Error::Other(format!("Failed to store health status: {}", e)))?;
184
185        // Store in health history if enabled
186        if self.store_history {
187            let history_key = format!("mecha10:health:history:{}", self.node_id);
188            let score = report.reported_at as f64; // Use timestamp as score for sorting
189            conn.zadd::<_, _, _, ()>(&history_key, &value, score)
190                .await
191                .map_err(|e| Mecha10Error::Other(format!("Failed to store health history: {}", e)))?;
192
193            // Keep only last 24 hours of history (86400 seconds)
194            let cutoff_time = (report.reported_at - 86_400_000_000) as f64; // 24 hours ago in microseconds
195            conn.zrembyscore::<_, _, _, ()>(&history_key, f64::MIN, cutoff_time)
196                .await
197                .map_err(|e| Mecha10Error::Other(format!("Failed to clean old health history: {}", e)))?;
198        }
199
200        // Publish to health topic for real-time monitoring (includes priority)
201        self.ctx.publish_raw("/system/health", &report).await?;
202
203        Ok(())
204    }
205
206    /// Report healthy status
207    pub async fn report_healthy(&self) -> Result<()> {
208        self.report(HealthStatus::healthy()).await
209    }
210
211    /// Report degraded status
212    pub async fn report_degraded(&self, message: impl Into<String>) -> Result<()> {
213        self.report(HealthStatus::degraded(message)).await
214    }
215
216    /// Report unhealthy status
217    pub async fn report_unhealthy(&self, message: impl Into<String>) -> Result<()> {
218        self.report(HealthStatus::unhealthy(message)).await
219    }
220
221    fn get_redis_url() -> Result<String> {
222        if let Ok(url) = std::env::var("REDIS_URL") {
223            return Ok(url);
224        }
225
226        // Load from infrastructure config
227        let environment = std::env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
228
229        let config_path = format!("config/infrastructure.{}.yaml", environment);
230
231        use crate::config::load_config;
232        use crate::context::InfrastructureConfig;
233
234        let config: InfrastructureConfig = load_config(&config_path)
235            .map_err(|e| Mecha10Error::Configuration(format!("Failed to load infrastructure config: {}", e)))?;
236
237        Ok(config.redis.url)
238    }
239}
240
241// ============================================================================
242// Health Monitor (Centralized)
243// ============================================================================
244
/// Health report stored in Redis
///
/// Contains the node's health status along with priority information
/// for dashboard display and monitoring timeout calculations.
///
/// Serialized as JSON into `mecha10:health:{node_id}` (latest report, with a
/// TTL) and into the `mecha10:health:history:{node_id}` sorted set (scored by
/// `reported_at`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Node identifier
    pub node_id: String,
    /// Health status at the time of report
    pub status: HealthStatus,
    /// Node priority level (affects monitoring timeout)
    ///
    /// `#[serde(default)]` allows deserializing records that omit this field.
    #[serde(default)]
    pub priority: HealthPriority,
    /// Timestamp when the report was created (microseconds since epoch)
    pub reported_at: u64,
}

// Implement Message trait so HealthReport can be published via pub/sub
// (used by `HealthReporter::report` when publishing on "/system/health").
impl Message for HealthReport {}
264
/// System-wide health summary
///
/// Snapshot aggregated from each node's latest health report in Redis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Overall system health level
    pub level: HealthLevel,

    /// Total number of nodes (those with a parseable health report)
    pub total_nodes: usize,

    /// Number of healthy nodes
    pub healthy_nodes: usize,

    /// Number of degraded nodes
    pub degraded_nodes: usize,

    /// Number of unhealthy nodes
    pub unhealthy_nodes: usize,

    /// Number of unresponsive nodes (haven't reported recently)
    pub unresponsive_nodes: usize,

    /// Timestamp when this summary was generated (microseconds since epoch)
    pub timestamp: u64,

    /// Details by node
    pub nodes: HashMap<String, NodeHealthInfo>,
}
292
/// Health information for a single node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealthInfo {
    /// Node ID
    pub node_id: String,

    /// Current health status
    pub status: HealthStatus,

    /// Node priority level
    ///
    /// `#[serde(default)]` allows deserializing records that omit this field.
    #[serde(default)]
    pub priority: HealthPriority,

    /// When the status was last reported (microseconds since epoch)
    pub reported_at: u64,

    /// How long ago the status was reported (microseconds)
    pub age_micros: u64,

    /// Whether the node is responsive (reported recently).
    /// True when `age_micros` is below the timeout for the node's priority.
    pub responsive: bool,
}
315
316/// Centralized health monitor
317///
318/// Aggregates health status from all nodes and provides system-wide health information.
319///
320/// # Example
321///
322/// ```rust
323/// use mecha10::health::HealthMonitor;
324///
325/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
326/// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
327///
328/// // Get system-wide health
329/// let health = monitor.system_health().await?;
330/// println!("System health: {:?}", health.level);
331/// println!("Healthy: {}/{}", health.healthy_nodes, health.total_nodes);
332///
333/// // List unhealthy nodes
334/// for node in monitor.unhealthy_nodes().await? {
335///     println!("Unhealthy: {} - {}", node.node_id, node.status.message);
336/// }
337/// # Ok(())
338/// # }
339/// ```
pub struct HealthMonitor {
    /// Client used to read per-node health keys and history from Redis
    redis_client: redis::Client,
    /// Most recent snapshot plus the instant it was taken;
    /// `None` until the first query or after `clear_cache`
    cache: Arc<RwLock<Option<(SystemHealth, std::time::Instant)>>>,
    /// How long a cached snapshot stays valid
    cache_ttl: Duration,
}
345
346impl HealthMonitor {
347    /// Connect to Redis for health monitoring
348    pub async fn connect(redis_url: &str) -> Result<Self> {
349        let redis_client = redis::Client::open(redis_url)
350            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
351
352        Ok(Self {
353            redis_client,
354            cache: Arc::new(RwLock::new(None)),
355            cache_ttl: Duration::from_secs(5), // Cache for 5 seconds
356        })
357    }
358
359    /// Get system-wide health summary
360    pub async fn system_health(&self) -> Result<SystemHealth> {
361        // Check cache first
362        {
363            let cache = self.cache.read().await;
364            if let Some((health, cached_at)) = cache.as_ref() {
365                if cached_at.elapsed() < self.cache_ttl {
366                    return Ok(health.clone());
367                }
368            }
369        }
370
371        // Fetch fresh data
372        let health = self.fetch_system_health().await?;
373
374        // Update cache
375        {
376            let mut cache = self.cache.write().await;
377            *cache = Some((health.clone(), std::time::Instant::now()));
378        }
379
380        Ok(health)
381    }
382
383    async fn fetch_system_health(&self) -> Result<SystemHealth> {
384        use redis::AsyncCommands;
385
386        let mut conn = self
387            .redis_client
388            .get_multiplexed_async_connection()
389            .await
390            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
391
392        // Get all health keys (filter out history keys)
393        let pattern = "mecha10:health:*";
394        let keys: Vec<String> = conn
395            .keys(pattern)
396            .await
397            .map_err(|e| Mecha10Error::Other(format!("Failed to list health keys: {}", e)))?;
398
399        // Filter out history keys
400        let keys: Vec<_> = keys.into_iter().filter(|k| !k.contains(":history:")).collect();
401
402        let mut nodes = HashMap::new();
403        let mut healthy_count = 0;
404        let mut degraded_count = 0;
405        let mut unhealthy_count = 0;
406        let mut unresponsive_count = 0;
407
408        let now = crate::prelude::now_micros();
409
410        for key in keys {
411            let value: Option<String> = conn
412                .get(&key)
413                .await
414                .map_err(|e| Mecha10Error::Other(format!("Failed to get health value: {}", e)))?;
415
416            if let Some(value) = value {
417                if let Ok(report) = serde_json::from_str::<HealthReport>(&value) {
418                    let age_micros = now.saturating_sub(report.reported_at);
419                    // Use priority-based timeout threshold
420                    let timeout_micros = report.priority.timeout_micros();
421                    let responsive = age_micros < timeout_micros;
422
423                    if !responsive {
424                        unresponsive_count += 1;
425                    } else {
426                        match report.status.level {
427                            HealthLevel::Ok => healthy_count += 1,
428                            HealthLevel::Degraded => degraded_count += 1,
429                            HealthLevel::Error => unhealthy_count += 1,
430                            HealthLevel::Unknown => unresponsive_count += 1,
431                        }
432                    }
433
434                    nodes.insert(
435                        report.node_id.clone(),
436                        NodeHealthInfo {
437                            node_id: report.node_id,
438                            status: report.status,
439                            priority: report.priority,
440                            reported_at: report.reported_at,
441                            age_micros,
442                            responsive,
443                        },
444                    );
445                }
446            }
447        }
448
449        // Determine overall system health
450        let level = if unhealthy_count > 0 || unresponsive_count > nodes.len() / 2 {
451            HealthLevel::Error
452        } else if degraded_count > 0 || unresponsive_count > 0 {
453            HealthLevel::Degraded
454        } else if healthy_count > 0 {
455            HealthLevel::Ok
456        } else {
457            HealthLevel::Unknown
458        };
459
460        Ok(SystemHealth {
461            level,
462            total_nodes: nodes.len(),
463            healthy_nodes: healthy_count,
464            degraded_nodes: degraded_count,
465            unhealthy_nodes: unhealthy_count,
466            unresponsive_nodes: unresponsive_count,
467            timestamp: now,
468            nodes,
469        })
470    }
471
472    /// Get health status for a specific node
473    pub async fn node_health(&self, node_id: &str) -> Result<Option<NodeHealthInfo>> {
474        let system_health = self.system_health().await?;
475        Ok(system_health.nodes.get(node_id).cloned())
476    }
477
478    /// Get list of all healthy nodes
479    pub async fn healthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
480        let system_health = self.system_health().await?;
481        Ok(system_health
482            .nodes
483            .values()
484            .filter(|n| n.responsive && n.status.level == HealthLevel::Ok)
485            .cloned()
486            .collect())
487    }
488
489    /// Get list of degraded nodes
490    pub async fn degraded_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
491        let system_health = self.system_health().await?;
492        Ok(system_health
493            .nodes
494            .values()
495            .filter(|n| n.responsive && n.status.level == HealthLevel::Degraded)
496            .cloned()
497            .collect())
498    }
499
500    /// Get list of unhealthy nodes
501    pub async fn unhealthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
502        let system_health = self.system_health().await?;
503        Ok(system_health
504            .nodes
505            .values()
506            .filter(|n| n.responsive && n.status.level == HealthLevel::Error)
507            .cloned()
508            .collect())
509    }
510
511    /// Get list of unresponsive nodes (haven't reported recently)
512    pub async fn unresponsive_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
513        let system_health = self.system_health().await?;
514        Ok(system_health
515            .nodes
516            .values()
517            .filter(|n| !n.responsive)
518            .cloned()
519            .collect())
520    }
521
522    /// Clear the cache (force fresh data on next query)
523    pub async fn clear_cache(&self) {
524        let mut cache = self.cache.write().await;
525        *cache = None;
526    }
527
528    /// Get health history for a node
529    ///
530    /// Returns health reports for the specified node over the given duration.
531    /// History is stored for up to 24 hours.
532    ///
533    /// # Arguments
534    ///
535    /// * `node_id` - The node to query history for
536    /// * `duration` - How far back to retrieve history (e.g., Duration::from_secs(3600) for 1 hour)
537    ///
538    /// # Example
539    ///
540    /// ```rust,no_run
541    /// # use mecha10::prelude::*;
542    /// # use mecha10::health::HealthMonitor;
543    /// # use std::time::Duration;
544    /// # async fn example() -> Result<()> {
545    /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
546    ///
547    /// // Get last hour of health history
548    /// let history = monitor.node_health_history("camera_node", Duration::from_secs(3600)).await?;
549    /// println!("Found {} health reports in the last hour", history.len());
550    /// # Ok(())
551    /// # }
552    /// ```
553    pub async fn node_health_history(&self, node_id: &str, duration: Duration) -> Result<Vec<HealthReport>> {
554        use redis::AsyncCommands;
555
556        let mut conn = self
557            .redis_client
558            .get_multiplexed_async_connection()
559            .await
560            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
561
562        let history_key = format!("mecha10:health:history:{}", node_id);
563
564        // Calculate time range (in microseconds)
565        let now = crate::prelude::now_micros();
566        let duration_micros = duration.as_micros() as u64;
567        let start_time = now.saturating_sub(duration_micros);
568
569        // Query sorted set by score (timestamp)
570        let results: Vec<String> = conn
571            .zrangebyscore(&history_key, start_time as f64, now as f64)
572            .await
573            .map_err(|e| Mecha10Error::Other(format!("Failed to retrieve health history: {}", e)))?;
574
575        // Deserialize results
576        let mut reports = Vec::new();
577        for json_str in results {
578            match serde_json::from_str::<HealthReport>(&json_str) {
579                Ok(report) => reports.push(report),
580                Err(e) => {
581                    eprintln!("Failed to deserialize health report: {}", e);
582                    continue;
583                }
584            }
585        }
586
587        Ok(reports)
588    }
589}
590
591// ============================================================================
592// Auto-Reporting Extension
593// ============================================================================
594
/// Extension trait for Context to enable automatic health reporting
#[async_trait::async_trait]
pub trait HealthReportingExt {
    /// Start automatic health reporting
    ///
    /// Spawns a background task that periodically reports health status.
    /// The health_check_fn is called to generate the current health status.
    ///
    /// The spawned task is detached: no handle is returned, so it runs for
    /// the remaining lifetime of the process and cannot currently be
    /// cancelled.
    ///
    /// # Errors
    ///
    /// Fails if the underlying [`HealthReporter`] cannot be created (e.g.
    /// Redis connection or infrastructure-config problems). Errors from
    /// individual reports inside the task are logged, not returned.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report healthy status every second
    /// ctx.start_health_reporting(
    ///     Duration::from_secs(1),
    ///     || async { HealthStatus::healthy() }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;
}
623
624#[async_trait::async_trait]
625impl HealthReportingExt for Context {
626    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
627    where
628        F: Fn() -> Fut + Send + Sync + 'static,
629        Fut: std::future::Future<Output = HealthStatus> + Send,
630    {
631        let reporter = HealthReporter::new(self).await?;
632
633        tokio::spawn(async move {
634            let mut ticker = tokio::time::interval(interval);
635
636            loop {
637                ticker.tick().await;
638
639                let status = health_check_fn().await;
640
641                if let Err(e) = reporter.report(status).await {
642                    tracing::error!("Failed to report health: {}", e);
643                }
644            }
645        });
646
647        Ok(())
648    }
649}