mecha10_core/
health.rs

1//! Centralized Health Monitoring
2//!
3//! Provides centralized health monitoring and aggregation for all nodes in a Mecha10 system.
4//!
5//! # Features
6//!
7//! - Automatic health reporting from nodes
8//! - Centralized health aggregation in Redis
9//! - System-wide health status
10//! - Health history storage (24 hours, using Redis sorted sets)
11//! - Health history queries by time range
12//! - Alerting on health degradation
13//! - Dashboard/CLI integration
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────┐     Health      ┌───────────────┐
19//! │  Node A │────Status──────▶│               │
20//! └─────────┘                 │               │
21//!                             │  Health       │     ┌──────────┐
22//! ┌─────────┐     Health      │  Monitor      │────▶│Dashboard │
23//! │  Node B │────Status──────▶│  (Redis)      │     └──────────┘
24//! └─────────┘                 │               │
25//!                             │               │     ┌──────────┐
26//! ┌─────────┐     Health      │               │────▶│Alerting  │
27//! │  Node C │────Status──────▶│               │     └──────────┘
28//! └─────────┘                 └───────────────┘
29//! ```
30//!
31//! # Example
32//!
33//! ```rust
34//! use mecha10::prelude::*;
35//! use mecha10::health::{HealthMonitor, HealthReporter};
36//!
37//! # async fn example() -> Result<()> {
38//! // In your node
39//! let ctx = Context::new("my-node").await?;
40//! let reporter = HealthReporter::new(&ctx).await?;
41//!
42//! // Report health periodically
43//! let mut interval = interval_at_hz(1.0); // 1 Hz
44//! loop {
45//!     interval.tick().await;
46//!
47//!     let status = HealthStatus::healthy()
48//!         .with_diagnostic("cpu_usage", "25%")
49//!         .with_diagnostic("memory_mb", "128");
50//!
51//!     reporter.report(status).await?;
52//! }
53//!
54//! // Monitor system health
55//! let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
56//!
57//! // Get overall system health
58//! let system_health = monitor.system_health().await?;
59//! println!("System: {:?}", system_health.level);
60//!
61//! // Get health by node
62//! let node_health = monitor.node_health("my-node").await?;
63//!
64//! // List unhealthy nodes
65//! let unhealthy = monitor.unhealthy_nodes().await?;
66//! # Ok(())
67//! # }
68//! ```
69
70use crate::context::Context;
71use crate::error::{Mecha10Error, Result};
72use crate::messages::{HealthLevel, HealthStatus, Message};
73use crate::node::HealthPriority;
74use serde::{Deserialize, Serialize};
75use std::collections::HashMap;
76use std::sync::Arc;
77use std::time::Duration;
78use tokio::sync::RwLock;
79
80// ============================================================================
81// Health Reporter (Node Side)
82// ============================================================================
83
/// Health reporter for nodes to publish their health status
///
/// Each node should create a HealthReporter and use it to periodically
/// report health status to the centralized monitoring system.
///
/// # Example
///
/// ```rust
/// use mecha10::prelude::*;
/// use mecha10::health::HealthReporter;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let reporter = HealthReporter::new(ctx).await?;
///
/// // Report healthy status
/// reporter.report(HealthStatus::healthy()).await?;
///
/// // Report degraded status
/// reporter.report(
///     HealthStatus::degraded("High CPU usage")
///         .with_diagnostic("cpu", "95%")
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct HealthReporter {
    /// Identifier of the node this reporter publishes for (taken from the Context)
    node_id: String,
    /// Redis client used to store health status and history
    redis_client: redis::Client,
    /// Cloned node context, used to publish reports on the health topic
    ctx: Arc<Context>,
    /// Health priority level
    priority: HealthPriority,
    /// TTL for Redis health key in seconds
    ttl_seconds: u64,
    /// Whether to store health history
    store_history: bool,
}
120
121impl HealthReporter {
122    /// Create a new health reporter with default settings (Normal priority)
123    pub async fn new(ctx: &Context) -> Result<Self> {
124        Self::with_config(ctx, HealthPriority::Normal, 60, true).await
125    }
126
127    /// Create a new health reporter with custom configuration
128    ///
129    /// # Arguments
130    ///
131    /// * `ctx` - Node context
132    /// * `priority` - Health priority level (affects timeout calculations)
133    /// * `ttl_seconds` - TTL for the Redis health key
134    /// * `store_history` - Whether to store health history in sorted sets
135    pub async fn with_config(
136        ctx: &Context,
137        priority: HealthPriority,
138        ttl_seconds: u64,
139        store_history: bool,
140    ) -> Result<Self> {
141        let redis_url = Self::get_redis_url()?;
142        let redis_client = redis::Client::open(redis_url.as_str())
143            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis for health reporting: {}", e)))?;
144
145        Ok(Self {
146            node_id: ctx.node_id().to_string(),
147            redis_client,
148            ctx: Arc::new(ctx.clone()),
149            priority,
150            ttl_seconds,
151            store_history,
152        })
153    }
154
155    /// Report health status
156    ///
157    /// Stores the health status in Redis with a TTL. If the node stops reporting,
158    /// the health status will expire and the node will be marked as unavailable.
159    pub async fn report(&self, status: HealthStatus) -> Result<()> {
160        use redis::AsyncCommands;
161
162        let mut conn = self
163            .redis_client
164            .get_multiplexed_async_connection()
165            .await
166            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
167
168        // Create health report with priority
169        let report = HealthReport {
170            node_id: self.node_id.clone(),
171            status,
172            priority: self.priority,
173            reported_at: crate::prelude::now_micros(),
174        };
175
176        // Store in Redis with configured TTL
177        let key = format!("mecha10:health:{}", self.node_id);
178        let value = serde_json::to_string(&report)
179            .map_err(|e| Mecha10Error::Other(format!("Failed to serialize health report: {}", e)))?;
180
181        conn.set_ex::<_, _, ()>(&key, value.clone(), self.ttl_seconds)
182            .await
183            .map_err(|e| Mecha10Error::Other(format!("Failed to store health status: {}", e)))?;
184
185        // Store in health history if enabled
186        if self.store_history {
187            let history_key = format!("mecha10:health:history:{}", self.node_id);
188            let score = report.reported_at as f64; // Use timestamp as score for sorting
189            conn.zadd::<_, _, _, ()>(&history_key, &value, score)
190                .await
191                .map_err(|e| Mecha10Error::Other(format!("Failed to store health history: {}", e)))?;
192
193            // Keep only last 24 hours of history (86400 seconds)
194            let cutoff_time = (report.reported_at - 86_400_000_000) as f64; // 24 hours ago in microseconds
195            conn.zrembyscore::<_, _, _, ()>(&history_key, f64::MIN, cutoff_time)
196                .await
197                .map_err(|e| Mecha10Error::Other(format!("Failed to clean old health history: {}", e)))?;
198        }
199
200        // Publish to health topic for real-time monitoring (includes priority)
201        self.ctx.publish_raw("/system/health", &report).await?;
202
203        Ok(())
204    }
205
206    /// Report healthy status
207    pub async fn report_healthy(&self) -> Result<()> {
208        self.report(HealthStatus::healthy()).await
209    }
210
211    /// Report degraded status
212    pub async fn report_degraded(&self, message: impl Into<String>) -> Result<()> {
213        self.report(HealthStatus::degraded(message)).await
214    }
215
216    /// Report unhealthy status
217    pub async fn report_unhealthy(&self, message: impl Into<String>) -> Result<()> {
218        self.report(HealthStatus::unhealthy(message)).await
219    }
220
221    fn get_redis_url() -> Result<String> {
222        // Use Context's get_redis_url() for consistency - it checks env vars and config properly
223        let url = Context::get_redis_url()?;
224        tracing::debug!("HealthReporter using Redis URL: {}", url);
225        Ok(url)
226    }
227}
228
229// ============================================================================
230// Health Monitor (Centralized)
231// ============================================================================
232
/// Health report stored in Redis
///
/// Contains the node's health status along with priority information
/// for dashboard display and monitoring timeout calculations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Node identifier
    pub node_id: String,
    /// Health status at the time of report
    pub status: HealthStatus,
    /// Node priority level (affects monitoring timeout).
    /// `#[serde(default)]` keeps deserialization working for reports that
    /// were stored before this field existed.
    #[serde(default)]
    pub priority: HealthPriority,
    /// Timestamp when the report was created (microseconds since epoch)
    pub reported_at: u64,
}
249
// Implement Message trait (marker, no methods to override) so HealthReport
// can be published via the Context pub/sub API (see `publish_raw` above).
impl Message for HealthReport {}
252
/// System-wide health summary
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Overall system health level
    pub level: HealthLevel,

    /// Total number of nodes
    pub total_nodes: usize,

    /// Number of healthy nodes
    pub healthy_nodes: usize,

    /// Number of degraded nodes
    pub degraded_nodes: usize,

    /// Number of unhealthy nodes
    pub unhealthy_nodes: usize,

    /// Number of unresponsive nodes (haven't reported recently)
    pub unresponsive_nodes: usize,

    /// Timestamp when this summary was generated (microseconds since epoch)
    pub timestamp: u64,

    /// Details by node, keyed by node ID
    pub nodes: HashMap<String, NodeHealthInfo>,
}
280
/// Health information for a single node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealthInfo {
    /// Node ID
    pub node_id: String,

    /// Current health status
    pub status: HealthStatus,

    /// Node priority level.
    /// `#[serde(default)]` tolerates reports stored without this field.
    #[serde(default)]
    pub priority: HealthPriority,

    /// When the status was last reported (microseconds since epoch)
    pub reported_at: u64,

    /// How long ago the status was reported (microseconds)
    pub age_micros: u64,

    /// Whether the node is responsive (reported within its priority timeout)
    pub responsive: bool,
}
303
/// Centralized health monitor
///
/// Aggregates health status from all nodes and provides system-wide health information.
///
/// # Example
///
/// ```rust
/// use mecha10::health::HealthMonitor;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
///
/// // Get system-wide health
/// let health = monitor.system_health().await?;
/// println!("System health: {:?}", health.level);
/// println!("Healthy: {}/{}", health.healthy_nodes, health.total_nodes);
///
/// // List unhealthy nodes
/// for node in monitor.unhealthy_nodes().await? {
///     println!("Unhealthy: {} - {}", node.node_id, node.status.message);
/// }
/// # Ok(())
/// # }
/// ```
pub struct HealthMonitor {
    /// Redis client used to read health keys and history
    redis_client: redis::Client,
    /// Cached system health summary plus the instant it was fetched;
    /// avoids hammering Redis when queried repeatedly
    cache: Arc<RwLock<Option<(SystemHealth, std::time::Instant)>>>,
    /// How long a cached summary remains valid
    cache_ttl: Duration,
}
333
334impl HealthMonitor {
335    /// Connect to Redis for health monitoring
336    pub async fn connect(redis_url: &str) -> Result<Self> {
337        let redis_client = redis::Client::open(redis_url)
338            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
339
340        Ok(Self {
341            redis_client,
342            cache: Arc::new(RwLock::new(None)),
343            cache_ttl: Duration::from_secs(5), // Cache for 5 seconds
344        })
345    }
346
347    /// Get system-wide health summary
348    pub async fn system_health(&self) -> Result<SystemHealth> {
349        // Check cache first
350        {
351            let cache = self.cache.read().await;
352            if let Some((health, cached_at)) = cache.as_ref() {
353                if cached_at.elapsed() < self.cache_ttl {
354                    return Ok(health.clone());
355                }
356            }
357        }
358
359        // Fetch fresh data
360        let health = self.fetch_system_health().await?;
361
362        // Update cache
363        {
364            let mut cache = self.cache.write().await;
365            *cache = Some((health.clone(), std::time::Instant::now()));
366        }
367
368        Ok(health)
369    }
370
371    async fn fetch_system_health(&self) -> Result<SystemHealth> {
372        use redis::AsyncCommands;
373
374        let mut conn = self
375            .redis_client
376            .get_multiplexed_async_connection()
377            .await
378            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
379
380        // Get all health keys (filter out history keys)
381        let pattern = "mecha10:health:*";
382        let keys: Vec<String> = conn
383            .keys(pattern)
384            .await
385            .map_err(|e| Mecha10Error::Other(format!("Failed to list health keys: {}", e)))?;
386
387        // Filter out history keys
388        let keys: Vec<_> = keys.into_iter().filter(|k| !k.contains(":history:")).collect();
389
390        let mut nodes = HashMap::new();
391        let mut healthy_count = 0;
392        let mut degraded_count = 0;
393        let mut unhealthy_count = 0;
394        let mut unresponsive_count = 0;
395
396        let now = crate::prelude::now_micros();
397
398        for key in keys {
399            let value: Option<String> = conn
400                .get(&key)
401                .await
402                .map_err(|e| Mecha10Error::Other(format!("Failed to get health value: {}", e)))?;
403
404            if let Some(value) = value {
405                if let Ok(report) = serde_json::from_str::<HealthReport>(&value) {
406                    let age_micros = now.saturating_sub(report.reported_at);
407                    // Use priority-based timeout threshold
408                    let timeout_micros = report.priority.timeout_micros();
409                    let responsive = age_micros < timeout_micros;
410
411                    if !responsive {
412                        unresponsive_count += 1;
413                    } else {
414                        match report.status.level {
415                            HealthLevel::Ok => healthy_count += 1,
416                            HealthLevel::Degraded => degraded_count += 1,
417                            HealthLevel::Error => unhealthy_count += 1,
418                            HealthLevel::Unknown => unresponsive_count += 1,
419                        }
420                    }
421
422                    nodes.insert(
423                        report.node_id.clone(),
424                        NodeHealthInfo {
425                            node_id: report.node_id,
426                            status: report.status,
427                            priority: report.priority,
428                            reported_at: report.reported_at,
429                            age_micros,
430                            responsive,
431                        },
432                    );
433                }
434            }
435        }
436
437        // Determine overall system health
438        let level = if unhealthy_count > 0 || unresponsive_count > nodes.len() / 2 {
439            HealthLevel::Error
440        } else if degraded_count > 0 || unresponsive_count > 0 {
441            HealthLevel::Degraded
442        } else if healthy_count > 0 {
443            HealthLevel::Ok
444        } else {
445            HealthLevel::Unknown
446        };
447
448        Ok(SystemHealth {
449            level,
450            total_nodes: nodes.len(),
451            healthy_nodes: healthy_count,
452            degraded_nodes: degraded_count,
453            unhealthy_nodes: unhealthy_count,
454            unresponsive_nodes: unresponsive_count,
455            timestamp: now,
456            nodes,
457        })
458    }
459
460    /// Get health status for a specific node
461    pub async fn node_health(&self, node_id: &str) -> Result<Option<NodeHealthInfo>> {
462        let system_health = self.system_health().await?;
463        Ok(system_health.nodes.get(node_id).cloned())
464    }
465
466    /// Get list of all healthy nodes
467    pub async fn healthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
468        let system_health = self.system_health().await?;
469        Ok(system_health
470            .nodes
471            .values()
472            .filter(|n| n.responsive && n.status.level == HealthLevel::Ok)
473            .cloned()
474            .collect())
475    }
476
477    /// Get list of degraded nodes
478    pub async fn degraded_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
479        let system_health = self.system_health().await?;
480        Ok(system_health
481            .nodes
482            .values()
483            .filter(|n| n.responsive && n.status.level == HealthLevel::Degraded)
484            .cloned()
485            .collect())
486    }
487
488    /// Get list of unhealthy nodes
489    pub async fn unhealthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
490        let system_health = self.system_health().await?;
491        Ok(system_health
492            .nodes
493            .values()
494            .filter(|n| n.responsive && n.status.level == HealthLevel::Error)
495            .cloned()
496            .collect())
497    }
498
499    /// Get list of unresponsive nodes (haven't reported recently)
500    pub async fn unresponsive_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
501        let system_health = self.system_health().await?;
502        Ok(system_health
503            .nodes
504            .values()
505            .filter(|n| !n.responsive)
506            .cloned()
507            .collect())
508    }
509
510    /// Clear the cache (force fresh data on next query)
511    pub async fn clear_cache(&self) {
512        let mut cache = self.cache.write().await;
513        *cache = None;
514    }
515
516    /// Get health history for a node
517    ///
518    /// Returns health reports for the specified node over the given duration.
519    /// History is stored for up to 24 hours.
520    ///
521    /// # Arguments
522    ///
523    /// * `node_id` - The node to query history for
524    /// * `duration` - How far back to retrieve history (e.g., Duration::from_secs(3600) for 1 hour)
525    ///
526    /// # Example
527    ///
528    /// ```rust,no_run
529    /// # use mecha10::prelude::*;
530    /// # use mecha10::health::HealthMonitor;
531    /// # use std::time::Duration;
532    /// # async fn example() -> Result<()> {
533    /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
534    ///
535    /// // Get last hour of health history
536    /// let history = monitor.node_health_history("camera_node", Duration::from_secs(3600)).await?;
537    /// println!("Found {} health reports in the last hour", history.len());
538    /// # Ok(())
539    /// # }
540    /// ```
541    pub async fn node_health_history(&self, node_id: &str, duration: Duration) -> Result<Vec<HealthReport>> {
542        use redis::AsyncCommands;
543
544        let mut conn = self
545            .redis_client
546            .get_multiplexed_async_connection()
547            .await
548            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
549
550        let history_key = format!("mecha10:health:history:{}", node_id);
551
552        // Calculate time range (in microseconds)
553        let now = crate::prelude::now_micros();
554        let duration_micros = duration.as_micros() as u64;
555        let start_time = now.saturating_sub(duration_micros);
556
557        // Query sorted set by score (timestamp)
558        let results: Vec<String> = conn
559            .zrangebyscore(&history_key, start_time as f64, now as f64)
560            .await
561            .map_err(|e| Mecha10Error::Other(format!("Failed to retrieve health history: {}", e)))?;
562
563        // Deserialize results
564        let mut reports = Vec::new();
565        for json_str in results {
566            match serde_json::from_str::<HealthReport>(&json_str) {
567                Ok(report) => reports.push(report),
568                Err(e) => {
569                    eprintln!("Failed to deserialize health report: {}", e);
570                    continue;
571                }
572            }
573        }
574
575        Ok(reports)
576    }
577}
578
579// ============================================================================
580// Auto-Reporting Extension
581// ============================================================================
582
/// Extension trait for Context to enable automatic health reporting
#[async_trait::async_trait]
pub trait HealthReportingExt {
    /// Start automatic health reporting with default settings (Normal priority, 5s interval)
    ///
    /// Spawns a background task that periodically reports health status.
    /// The health_check_fn is called to generate the current health status.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report healthy status every 5 seconds (default)
    /// ctx.start_health_reporting(|| async { HealthStatus::healthy() }).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting<F, Fut>(&self, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;

    /// Start automatic health reporting with custom configuration
    ///
    /// # Arguments mirror [`HealthReporter::with_config`]: `priority` affects
    /// monitoring timeouts, `interval` is how often `health_check_fn` is
    /// polled and its result reported.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    /// use mecha10::node::HealthPriority;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report every second with Critical priority
    /// ctx.start_health_reporting_with_config(
    ///     HealthPriority::Critical,
    ///     Duration::from_secs(1),
    ///     || async { HealthStatus::healthy() }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting_with_config<F, Fut>(
        &self,
        priority: HealthPriority,
        interval: Duration,
        health_check_fn: F,
    ) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;
}
637
638#[async_trait::async_trait]
639impl HealthReportingExt for Context {
640    async fn start_health_reporting<F, Fut>(&self, health_check_fn: F) -> Result<()>
641    where
642        F: Fn() -> Fut + Send + Sync + 'static,
643        Fut: std::future::Future<Output = HealthStatus> + Send,
644    {
645        self.start_health_reporting_with_config(HealthPriority::Normal, Duration::from_secs(5), health_check_fn)
646            .await
647    }
648
649    async fn start_health_reporting_with_config<F, Fut>(
650        &self,
651        priority: HealthPriority,
652        interval: Duration,
653        health_check_fn: F,
654    ) -> Result<()>
655    where
656        F: Fn() -> Fut + Send + Sync + 'static,
657        Fut: std::future::Future<Output = HealthStatus> + Send,
658    {
659        let reporter = HealthReporter::with_config(self, priority, 60, true).await?;
660
661        tracing::info!(
662            "Health reporting enabled: priority={:?}, interval={:?}",
663            priority,
664            interval
665        );
666
667        tokio::spawn(async move {
668            let mut ticker = tokio::time::interval(interval);
669
670            loop {
671                ticker.tick().await;
672
673                let status = health_check_fn().await;
674
675                if let Err(e) = reporter.report(status).await {
676                    tracing::debug!("Failed to report health: {}", e);
677                }
678            }
679        });
680
681        Ok(())
682    }
683}