mecha10_core/health.rs
//! Centralized Health Monitoring
//!
//! Provides centralized health monitoring and aggregation for all nodes in a Mecha10 system.
//!
//! # Features
//!
//! - Automatic health reporting from nodes
//! - Centralized health aggregation in Redis
//! - System-wide health status
//! - Health history storage (24 hours, using Redis sorted sets)
//! - Health history queries by time range
//! - Alerting on health degradation
//! - Dashboard/CLI integration
//!
//! # Architecture
//!
//! ```text
//! ┌─────────┐     Health      ┌───────────────┐
//! │ Node A  │────Status──────▶│               │
//! └─────────┘                 │               │
//!                             │    Health     │     ┌──────────┐
//! ┌─────────┐     Health      │    Monitor    │────▶│Dashboard │
//! │ Node B  │────Status──────▶│    (Redis)    │     └──────────┘
//! └─────────┘                 │               │
//!                             │               │     ┌──────────┐
//! ┌─────────┐     Health      │               │────▶│Alerting  │
//! │ Node C  │────Status──────▶│               │     └──────────┘
//! └─────────┘                 └───────────────┘
//! ```
//!
//! # Example
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::health::{HealthMonitor, HealthReporter};
//!
//! # async fn example() -> Result<()> {
//! // In your node
//! let ctx = Context::new("my-node").await?;
//! let reporter = HealthReporter::new(&ctx).await?;
//!
//! // Report health periodically (in a real node this loop runs forever)
//! let mut interval = interval_at_hz(1.0); // 1 Hz
//! loop {
//!     interval.tick().await;
//!
//!     let status = HealthStatus::healthy()
//!         .with_diagnostic("cpu_usage", "25%")
//!         .with_diagnostic("memory_mb", "128");
//!
//!     reporter.report(status).await?;
//! #   break;
//! }
//!
//! // Monitor system health
//! let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
//!
//! // Get overall system health
//! let system_health = monitor.system_health().await?;
//! println!("System: {:?}", system_health.level);
//!
//! // Get health by node
//! let node_health = monitor.node_health("my-node").await?;
//!
//! // List unhealthy nodes
//! let unhealthy = monitor.unhealthy_nodes().await?;
//! # Ok(())
//! # }
//! ```

use crate::context::Context;
use crate::error::{Mecha10Error, Result};
use crate::messages::{HealthLevel, HealthStatus, Message};
use crate::node::HealthPriority;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// ============================================================================
// Health Reporter (Node Side)
// ============================================================================

/// Health reporter for nodes to publish their health status
///
/// Each node should create a HealthReporter and use it to periodically
/// report health status to the centralized monitoring system.
///
/// # Example
///
/// ```rust
/// use mecha10::prelude::*;
/// use mecha10::health::HealthReporter;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let reporter = HealthReporter::new(ctx).await?;
///
/// // Report healthy status
/// reporter.report(HealthStatus::healthy()).await?;
///
/// // Report degraded status
/// reporter.report(
///     HealthStatus::degraded("High CPU usage")
///         .with_diagnostic("cpu", "95%")
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct HealthReporter {
    node_id: String,
    redis_client: redis::Client,
    ctx: Arc<Context>,
    /// Health priority level
    priority: HealthPriority,
    /// TTL for Redis health key in seconds
    ttl_seconds: u64,
    /// Whether to store health history
    store_history: bool,
}

impl HealthReporter {
    /// Create a new health reporter with default settings
    /// (Normal priority, 60-second TTL, history enabled)
    pub async fn new(ctx: &Context) -> Result<Self> {
        Self::with_config(ctx, HealthPriority::Normal, 60, true).await
    }

    /// Create a new health reporter with custom configuration
    ///
    /// # Arguments
    ///
    /// * `ctx` - Node context
    /// * `priority` - Health priority level (affects timeout calculations)
    /// * `ttl_seconds` - TTL for the Redis health key
    /// * `store_history` - Whether to store health history in sorted sets
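    ///
    /// # Example
    ///
    /// A minimal sketch with a shorter TTL and history disabled. It assumes
    /// `HealthPriority` is publicly reachable at `mecha10::node::HealthPriority`;
    /// adjust the path and priority to match your deployment.
    ///
    /// ```rust,no_run
    /// # use mecha10::prelude::*;
    /// # use mecha10::health::HealthReporter;
    /// # use mecha10::node::HealthPriority;
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// let reporter = HealthReporter::with_config(ctx, HealthPriority::Normal, 30, false).await?;
    /// reporter.report_healthy().await?;
    /// # Ok(())
    /// # }
    /// ```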
    pub async fn with_config(
        ctx: &Context,
        priority: HealthPriority,
        ttl_seconds: u64,
        store_history: bool,
    ) -> Result<Self> {
        let redis_url = Self::get_redis_url()?;
        let redis_client = redis::Client::open(redis_url.as_str())
            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis for health reporting: {}", e)))?;

        Ok(Self {
            node_id: ctx.node_id().to_string(),
            redis_client,
            ctx: Arc::new(ctx.clone()),
            priority,
            ttl_seconds,
            store_history,
        })
    }

    /// Report health status
    ///
    /// Stores the health status in Redis with a TTL. If the node stops reporting,
    /// the health status will expire and the node will be marked as unavailable.
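    ///
    /// The current status is written to `mecha10:health:{node_id}`; when
    /// history is enabled, each report is also appended to the sorted set
    /// `mecha10:health:history:{node_id}`, scored by its timestamp.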
    pub async fn report(&self, status: HealthStatus) -> Result<()> {
        use redis::AsyncCommands;

        let mut conn = self
            .redis_client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;

        // Create health report with priority
        let report = HealthReport {
            node_id: self.node_id.clone(),
            status,
            priority: self.priority,
            reported_at: crate::prelude::now_micros(),
        };

        // Store in Redis with configured TTL
        let key = format!("mecha10:health:{}", self.node_id);
        let value = serde_json::to_string(&report)
            .map_err(|e| Mecha10Error::Other(format!("Failed to serialize health report: {}", e)))?;

        conn.set_ex::<_, _, ()>(&key, value.clone(), self.ttl_seconds)
            .await
            .map_err(|e| Mecha10Error::Other(format!("Failed to store health status: {}", e)))?;

        // Store in health history if enabled
        if self.store_history {
            let history_key = format!("mecha10:health:history:{}", self.node_id);
            let score = report.reported_at as f64; // Use timestamp as score for sorting
            conn.zadd::<_, _, _, ()>(&history_key, &value, score)
                .await
                .map_err(|e| Mecha10Error::Other(format!("Failed to store health history: {}", e)))?;

            // Keep only the last 24 hours of history (86,400 seconds), using a
            // saturating subtraction so the cutoff cannot underflow
            let cutoff_time = report.reported_at.saturating_sub(86_400_000_000) as f64;
            conn.zrembyscore::<_, _, _, ()>(&history_key, f64::MIN, cutoff_time)
                .await
                .map_err(|e| Mecha10Error::Other(format!("Failed to clean old health history: {}", e)))?;
        }

        // Publish to health topic for real-time monitoring (includes priority)
        self.ctx.publish_raw("/system/health", &report).await?;

        Ok(())
    }

    /// Report healthy status
    pub async fn report_healthy(&self) -> Result<()> {
        self.report(HealthStatus::healthy()).await
    }

    /// Report degraded status
    pub async fn report_degraded(&self, message: impl Into<String>) -> Result<()> {
        self.report(HealthStatus::degraded(message)).await
    }

    /// Report unhealthy status
    pub async fn report_unhealthy(&self, message: impl Into<String>) -> Result<()> {
        self.report(HealthStatus::unhealthy(message)).await
    }

    fn get_redis_url() -> Result<String> {
        if let Ok(url) = std::env::var("REDIS_URL") {
            return Ok(url);
        }

        // Load from infrastructure config
        let environment = std::env::var("MECHA10_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());

        let config_path = format!("config/infrastructure.{}.yaml", environment);

        use crate::config::load_config;
        use crate::context::InfrastructureConfig;

        let config: InfrastructureConfig = load_config(&config_path)
            .map_err(|e| Mecha10Error::Configuration(format!("Failed to load infrastructure config: {}", e)))?;

        Ok(config.redis.url)
    }
}

// ============================================================================
// Health Monitor (Centralized)
// ============================================================================

/// Health report stored in Redis
///
/// Contains the node's health status along with priority information
/// for dashboard display and monitoring timeout calculations.
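///
/// # Example
///
/// A sketch of building and serializing a report by hand, the same way
/// [`HealthReporter::report`] does internally. It assumes `HealthPriority`
/// is reachable at `mecha10::node::HealthPriority` and that `now_micros`
/// comes from the prelude.
///
/// ```rust,no_run
/// # use mecha10::prelude::*;
/// # use mecha10::health::HealthReport;
/// # use mecha10::node::HealthPriority;
/// let report = HealthReport {
///     node_id: "camera_node".to_string(),
///     status: HealthStatus::healthy(),
///     priority: HealthPriority::Normal,
///     reported_at: now_micros(),
/// };
/// let json = serde_json::to_string(&report).expect("report serializes");
/// ```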
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Node identifier
    pub node_id: String,
    /// Health status at the time of report
    pub status: HealthStatus,
    /// Node priority level (affects monitoring timeout)
    #[serde(default)]
    pub priority: HealthPriority,
    /// Timestamp when the report was created (microseconds since epoch)
    pub reported_at: u64,
}

// Implement Message trait so HealthReport can be published via pub/sub
impl Message for HealthReport {}

/// System-wide health summary
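///
/// # Example
///
/// A small sketch of consuming the summary, e.g. to compute a health
/// percentage for a dashboard (all field names are from this struct):
///
/// ```rust,no_run
/// # use mecha10::health::SystemHealth;
/// # fn show(health: &SystemHealth) {
/// let pct = 100.0 * health.healthy_nodes as f64 / health.total_nodes.max(1) as f64;
/// println!("{:?}: {:.0}% of {} nodes healthy", health.level, pct, health.total_nodes);
/// # }
/// ```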
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Overall system health level
    pub level: HealthLevel,

    /// Total number of nodes
    pub total_nodes: usize,

    /// Number of healthy nodes
    pub healthy_nodes: usize,

    /// Number of degraded nodes
    pub degraded_nodes: usize,

    /// Number of unhealthy nodes
    pub unhealthy_nodes: usize,

    /// Number of unresponsive nodes (haven't reported recently)
    pub unresponsive_nodes: usize,

    /// Timestamp when this summary was generated (microseconds since epoch)
    pub timestamp: u64,

    /// Details by node
    pub nodes: HashMap<String, NodeHealthInfo>,
}

/// Health information for a single node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealthInfo {
    /// Node ID
    pub node_id: String,

    /// Current health status
    pub status: HealthStatus,

    /// Node priority level
    #[serde(default)]
    pub priority: HealthPriority,

    /// When the status was last reported (microseconds since epoch)
    pub reported_at: u64,

    /// How long ago the status was reported (microseconds)
    pub age_micros: u64,

    /// Whether the node is responsive (reported recently)
    pub responsive: bool,
}

/// Centralized health monitor
///
/// Aggregates health status from all nodes and provides system-wide health information.
///
/// # Example
///
/// ```rust
/// use mecha10::health::HealthMonitor;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
///
/// // Get system-wide health
/// let health = monitor.system_health().await?;
/// println!("System health: {:?}", health.level);
/// println!("Healthy: {}/{}", health.healthy_nodes, health.total_nodes);
///
/// // List unhealthy nodes
/// for node in monitor.unhealthy_nodes().await? {
///     println!("Unhealthy: {} - {}", node.node_id, node.status.message);
/// }
/// # Ok(())
/// # }
/// ```
pub struct HealthMonitor {
    redis_client: redis::Client,
    cache: Arc<RwLock<Option<(SystemHealth, std::time::Instant)>>>,
    cache_ttl: Duration,
}

impl HealthMonitor {
    /// Connect to Redis for health monitoring
    pub async fn connect(redis_url: &str) -> Result<Self> {
        let redis_client = redis::Client::open(redis_url)
            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;

        Ok(Self {
            redis_client,
            cache: Arc::new(RwLock::new(None)),
            cache_ttl: Duration::from_secs(5), // Cache for 5 seconds
        })
    }

    /// Get system-wide health summary
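    ///
    /// Results are cached briefly (5 seconds) to avoid hammering Redis on
    /// every query; call [`clear_cache`](Self::clear_cache) to force a
    /// fresh read.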
    pub async fn system_health(&self) -> Result<SystemHealth> {
        // Check cache first
        {
            let cache = self.cache.read().await;
            if let Some((health, cached_at)) = cache.as_ref() {
                if cached_at.elapsed() < self.cache_ttl {
                    return Ok(health.clone());
                }
            }
        }

        // Fetch fresh data
        let health = self.fetch_system_health().await?;

        // Update cache
        {
            let mut cache = self.cache.write().await;
            *cache = Some((health.clone(), std::time::Instant::now()));
        }

        Ok(health)
    }

    async fn fetch_system_health(&self) -> Result<SystemHealth> {
        use redis::AsyncCommands;

        let mut conn = self
            .redis_client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;

        // Get all health keys (history keys are filtered out below)
        let pattern = "mecha10:health:*";
        let keys: Vec<String> = conn
            .keys(pattern)
            .await
            .map_err(|e| Mecha10Error::Other(format!("Failed to list health keys: {}", e)))?;
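
        // NOTE: KEYS is O(N) over the whole keyspace and blocks the Redis
        // server while it scans; for large deployments an incremental SCAN
        // would likely be preferable.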

        // Filter out history keys
        let keys: Vec<_> = keys.into_iter().filter(|k| !k.contains(":history:")).collect();

        let mut nodes = HashMap::new();
        let mut healthy_count = 0;
        let mut degraded_count = 0;
        let mut unhealthy_count = 0;
        let mut unresponsive_count = 0;

        let now = crate::prelude::now_micros();

        for key in keys {
            let value: Option<String> = conn
                .get(&key)
                .await
                .map_err(|e| Mecha10Error::Other(format!("Failed to get health value: {}", e)))?;

            if let Some(value) = value {
                if let Ok(report) = serde_json::from_str::<HealthReport>(&value) {
                    let age_micros = now.saturating_sub(report.reported_at);
                    // Use priority-based timeout threshold
                    let timeout_micros = report.priority.timeout_micros();
                    let responsive = age_micros < timeout_micros;

                    if !responsive {
                        unresponsive_count += 1;
                    } else {
                        match report.status.level {
                            HealthLevel::Ok => healthy_count += 1,
                            HealthLevel::Degraded => degraded_count += 1,
                            HealthLevel::Error => unhealthy_count += 1,
                            HealthLevel::Unknown => unresponsive_count += 1,
                        }
                    }

                    nodes.insert(
                        report.node_id.clone(),
                        NodeHealthInfo {
                            node_id: report.node_id,
                            status: report.status,
                            priority: report.priority,
                            reported_at: report.reported_at,
                            age_micros,
                            responsive,
                        },
                    );
                }
            }
        }

        // Determine overall system health
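        // (Error if any node is unhealthy or more than half of all nodes are
        // unresponsive; Degraded if any node is degraded or unresponsive; Ok
        // if at least one node is healthy; Unknown when no reports exist.)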
        let level = if unhealthy_count > 0 || unresponsive_count > nodes.len() / 2 {
            HealthLevel::Error
        } else if degraded_count > 0 || unresponsive_count > 0 {
            HealthLevel::Degraded
        } else if healthy_count > 0 {
            HealthLevel::Ok
        } else {
            HealthLevel::Unknown
        };

        Ok(SystemHealth {
            level,
            total_nodes: nodes.len(),
            healthy_nodes: healthy_count,
            degraded_nodes: degraded_count,
            unhealthy_nodes: unhealthy_count,
            unresponsive_nodes: unresponsive_count,
            timestamp: now,
            nodes,
        })
    }

    /// Get health status for a specific node
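    ///
    /// # Example
    ///
    /// A minimal sketch; returns `None` when the node has never reported
    /// (or its key has expired):
    ///
    /// ```rust,no_run
    /// # use mecha10::health::HealthMonitor;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
    /// if let Some(info) = monitor.node_health("camera_node").await? {
    ///     println!("{}: responsive = {}", info.node_id, info.responsive);
    /// }
    /// # Ok(())
    /// # }
    /// ```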
    pub async fn node_health(&self, node_id: &str) -> Result<Option<NodeHealthInfo>> {
        let system_health = self.system_health().await?;
        Ok(system_health.nodes.get(node_id).cloned())
    }

    /// Get list of all healthy nodes
    pub async fn healthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
        let system_health = self.system_health().await?;
        Ok(system_health
            .nodes
            .values()
            .filter(|n| n.responsive && n.status.level == HealthLevel::Ok)
            .cloned()
            .collect())
    }

    /// Get list of degraded nodes
    pub async fn degraded_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
        let system_health = self.system_health().await?;
        Ok(system_health
            .nodes
            .values()
            .filter(|n| n.responsive && n.status.level == HealthLevel::Degraded)
            .cloned()
            .collect())
    }

    /// Get list of unhealthy nodes
    pub async fn unhealthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
        let system_health = self.system_health().await?;
        Ok(system_health
            .nodes
            .values()
            .filter(|n| n.responsive && n.status.level == HealthLevel::Error)
            .cloned()
            .collect())
    }

    /// Get list of unresponsive nodes (haven't reported recently)
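    ///
    /// # Example
    ///
    /// A minimal sketch (field names are from [`NodeHealthInfo`]):
    ///
    /// ```rust,no_run
    /// # use mecha10::health::HealthMonitor;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
    /// for node in monitor.unresponsive_nodes().await? {
    ///     println!("No report from {} for {} µs", node.node_id, node.age_micros);
    /// }
    /// # Ok(())
    /// # }
    /// ```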
    pub async fn unresponsive_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
        let system_health = self.system_health().await?;
        Ok(system_health
            .nodes
            .values()
            .filter(|n| !n.responsive)
            .cloned()
            .collect())
    }

    /// Clear the cache (force fresh data on next query)
    pub async fn clear_cache(&self) {
        let mut cache = self.cache.write().await;
        *cache = None;
    }

    /// Get health history for a node
    ///
    /// Returns health reports for the specified node over the given duration.
    /// History is stored for up to 24 hours.
    ///
    /// # Arguments
    ///
    /// * `node_id` - The node to query history for
    /// * `duration` - How far back to retrieve history (e.g., `Duration::from_secs(3600)` for 1 hour)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use mecha10::prelude::*;
    /// # use mecha10::health::HealthMonitor;
    /// # use std::time::Duration;
    /// # async fn example() -> Result<()> {
    /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
    ///
    /// // Get last hour of health history
    /// let history = monitor.node_health_history("camera_node", Duration::from_secs(3600)).await?;
    /// println!("Found {} health reports in the last hour", history.len());
    /// # Ok(())
    /// # }
    /// ```
    pub async fn node_health_history(&self, node_id: &str, duration: Duration) -> Result<Vec<HealthReport>> {
        use redis::AsyncCommands;

        let mut conn = self
            .redis_client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;

        let history_key = format!("mecha10:health:history:{}", node_id);

        // Calculate time range (in microseconds)
        let now = crate::prelude::now_micros();
        let duration_micros = duration.as_micros() as u64;
        let start_time = now.saturating_sub(duration_micros);

        // Query sorted set by score (timestamp)
        let results: Vec<String> = conn
            .zrangebyscore(&history_key, start_time as f64, now as f64)
            .await
            .map_err(|e| Mecha10Error::Other(format!("Failed to retrieve health history: {}", e)))?;

        // Deserialize results, skipping any entries that fail to parse
        let mut reports = Vec::new();
        for json_str in results {
            match serde_json::from_str::<HealthReport>(&json_str) {
                Ok(report) => reports.push(report),
                Err(e) => {
                    tracing::warn!("Failed to deserialize health report: {}", e);
                    continue;
                }
            }
        }

        Ok(reports)
    }
}

// ============================================================================
// Auto-Reporting Extension
// ============================================================================

/// Extension trait for Context to enable automatic health reporting
#[async_trait::async_trait]
pub trait HealthReportingExt {
    /// Start automatic health reporting
    ///
    /// Spawns a background task that periodically reports health status.
    /// The `health_check_fn` is called to generate the current health status.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    /// # use std::time::Duration;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report healthy status every second
    /// ctx.start_health_reporting(
    ///     Duration::from_secs(1),
    ///     || async { HealthStatus::healthy() }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;
}

#[async_trait::async_trait]
impl HealthReportingExt for Context {
    async fn start_health_reporting<F, Fut>(&self, interval: Duration, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send,
    {
        let reporter = HealthReporter::new(self).await?;

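        // The reporting task is detached: it runs for the lifetime of the
        // runtime, and its JoinHandle is dropped, so it cannot be awaited
        // or cancelled from here.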
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);

            loop {
                ticker.tick().await;

                let status = health_check_fn().await;

                if let Err(e) = reporter.report(status).await {
                    tracing::error!("Failed to report health: {}", e);
                }
            }
        });

        Ok(())
    }
}