// mecha10_core/health.rs
//! Centralized Health Monitoring
//!
//! Provides centralized health monitoring and aggregation for all nodes in a Mecha10 system.
//!
//! # Features
//!
//! - Automatic health reporting from nodes
//! - Centralized health aggregation in Redis
//! - System-wide health status
//! - Health history storage (24 hours, using Redis sorted sets)
//! - Health history queries by time range
//! - Alerting on health degradation
//! - Dashboard/CLI integration
//!
//! # Architecture
//!
//! ```text
//! ┌─────────┐   Health    ┌───────────────┐
//! │ Node A  │───Status───▶│               │
//! └─────────┘             │               │
//!                         │    Health     │     ┌──────────┐
//! ┌─────────┐   Health    │    Monitor    │────▶│Dashboard │
//! │ Node B  │───Status───▶│    (Redis)    │     └──────────┘
//! └─────────┘             │               │
//!                         │               │     ┌──────────┐
//! ┌─────────┐   Health    │               │────▶│Alerting  │
//! │ Node C  │───Status───▶│               │     └──────────┘
//! └─────────┘             └───────────────┘
//! ```
//!
//! # Example
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::health::{HealthMonitor, HealthReporter};
//!
//! # async fn example() -> Result<()> {
//! // In your node
//! let ctx = Context::new("my-node").await?;
//! let reporter = HealthReporter::new(&ctx).await?;
//!
//! // Report health periodically
//! let mut interval = interval_at_hz(1.0); // 1 Hz
//! loop {
//!     interval.tick().await;
//!
//!     let status = HealthStatus::healthy()
//!         .with_diagnostic("cpu_usage", "25%")
//!         .with_diagnostic("memory_mb", "128");
//!
//!     reporter.report(status).await?;
//! }
//!
//! // Monitor system health
//! let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
//!
//! // Get overall system health
//! let system_health = monitor.system_health().await?;
//! println!("System: {:?}", system_health.level);
//!
//! // Get health by node
//! let node_health = monitor.node_health("my-node").await?;
//!
//! // List unhealthy nodes
//! let unhealthy = monitor.unhealthy_nodes().await?;
//! # Ok(())
//! # }
//! ```

70use crate::context::Context;
71use crate::error::{Mecha10Error, Result};
72use crate::messages::{HealthLevel, HealthStatus, Message};
73use crate::node::HealthPriority;
74use serde::{Deserialize, Serialize};
75use std::collections::HashMap;
76use std::sync::Arc;
77use std::time::Duration;
78use tokio::sync::RwLock;

// ============================================================================
// Health Reporter (Node Side)
// ============================================================================

/// Health reporter for nodes to publish their health status
///
/// Each node should create a HealthReporter and use it to periodically
/// report health status to the centralized monitoring system.
///
/// # Example
///
/// ```rust
/// use mecha10::prelude::*;
/// use mecha10::health::HealthReporter;
///
/// # async fn example(ctx: &Context) -> Result<()> {
/// let reporter = HealthReporter::new(ctx).await?;
///
/// // Report healthy status
/// reporter.report(HealthStatus::healthy()).await?;
///
/// // Report degraded status
/// reporter.report(
///     HealthStatus::degraded("High CPU usage")
///         .with_diagnostic("cpu", "95%")
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct HealthReporter {
    // Identifier of the node this reporter publishes for.
    node_id: String,
    // Redis client used to store current status and history entries.
    redis_client: redis::Client,
    // Node context, used to publish real-time health messages on /system/health.
    ctx: Arc<Context>,
    /// Health priority level
    priority: HealthPriority,
    /// TTL for Redis health key in seconds
    ttl_seconds: u64,
    /// Whether to store health history
    store_history: bool,
}

121impl HealthReporter {
122 /// Create a new health reporter with default settings (Normal priority)
123 pub async fn new(ctx: &Context) -> Result<Self> {
124 Self::with_config(ctx, HealthPriority::Normal, 60, true).await
125 }
126
127 /// Create a new health reporter with custom configuration
128 ///
129 /// # Arguments
130 ///
131 /// * `ctx` - Node context
132 /// * `priority` - Health priority level (affects timeout calculations)
133 /// * `ttl_seconds` - TTL for the Redis health key
134 /// * `store_history` - Whether to store health history in sorted sets
135 pub async fn with_config(
136 ctx: &Context,
137 priority: HealthPriority,
138 ttl_seconds: u64,
139 store_history: bool,
140 ) -> Result<Self> {
141 let redis_url = Self::get_redis_url()?;
142 let redis_client = redis::Client::open(redis_url.as_str())
143 .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis for health reporting: {}", e)))?;
144
145 Ok(Self {
146 node_id: ctx.node_id().to_string(),
147 redis_client,
148 ctx: Arc::new(ctx.clone()),
149 priority,
150 ttl_seconds,
151 store_history,
152 })
153 }
154
155 /// Report health status
156 ///
157 /// Stores the health status in Redis with a TTL. If the node stops reporting,
158 /// the health status will expire and the node will be marked as unavailable.
159 pub async fn report(&self, status: HealthStatus) -> Result<()> {
160 use redis::AsyncCommands;
161
162 let mut conn = self
163 .redis_client
164 .get_multiplexed_async_connection()
165 .await
166 .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
167
168 // Create health report with priority
169 let report = HealthReport {
170 node_id: self.node_id.clone(),
171 status,
172 priority: self.priority,
173 reported_at: crate::prelude::now_micros(),
174 };
175
176 // Store in Redis with configured TTL
177 let key = format!("mecha10:health:{}", self.node_id);
178 let value = serde_json::to_string(&report)
179 .map_err(|e| Mecha10Error::Other(format!("Failed to serialize health report: {}", e)))?;
180
181 conn.set_ex::<_, _, ()>(&key, value.clone(), self.ttl_seconds)
182 .await
183 .map_err(|e| Mecha10Error::Other(format!("Failed to store health status: {}", e)))?;
184
185 // Store in health history if enabled
186 if self.store_history {
187 let history_key = format!("mecha10:health:history:{}", self.node_id);
188 let score = report.reported_at as f64; // Use timestamp as score for sorting
189 conn.zadd::<_, _, _, ()>(&history_key, &value, score)
190 .await
191 .map_err(|e| Mecha10Error::Other(format!("Failed to store health history: {}", e)))?;
192
193 // Keep only last 24 hours of history (86400 seconds)
194 let cutoff_time = (report.reported_at - 86_400_000_000) as f64; // 24 hours ago in microseconds
195 conn.zrembyscore::<_, _, _, ()>(&history_key, f64::MIN, cutoff_time)
196 .await
197 .map_err(|e| Mecha10Error::Other(format!("Failed to clean old health history: {}", e)))?;
198 }
199
200 // Publish to health topic for real-time monitoring (includes priority)
201 self.ctx.publish_raw("/system/health", &report).await?;
202
203 Ok(())
204 }
205
206 /// Report healthy status
207 pub async fn report_healthy(&self) -> Result<()> {
208 self.report(HealthStatus::healthy()).await
209 }
210
211 /// Report degraded status
212 pub async fn report_degraded(&self, message: impl Into<String>) -> Result<()> {
213 self.report(HealthStatus::degraded(message)).await
214 }
215
216 /// Report unhealthy status
217 pub async fn report_unhealthy(&self, message: impl Into<String>) -> Result<()> {
218 self.report(HealthStatus::unhealthy(message)).await
219 }
220
221 fn get_redis_url() -> Result<String> {
222 // Use Context's get_redis_url() for consistency - it checks env vars and config properly
223 let url = Context::get_redis_url()?;
224 tracing::debug!("HealthReporter using Redis URL: {}", url);
225 Ok(url)
226 }
227}
228
// ============================================================================
// Health Monitor (Centralized)
// ============================================================================

/// Health report stored in Redis
///
/// Contains the node's health status along with priority information
/// for dashboard display and monitoring timeout calculations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Node identifier
    pub node_id: String,
    /// Health status at the time of report
    pub status: HealthStatus,
    /// Node priority level (affects monitoring timeout)
    /// (`#[serde(default)]` lets older reports without this field deserialize)
    #[serde(default)]
    pub priority: HealthPriority,
    /// Timestamp when the report was created (microseconds since epoch)
    pub reported_at: u64,
}

// Implement Message trait so HealthReport can be published via pub/sub
// (used by HealthReporter::report when publishing on /system/health).
impl Message for HealthReport {}

/// System-wide health summary
///
/// Aggregated snapshot of every reporting node, as produced by
/// `HealthMonitor::system_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Overall system health level (any unhealthy node — or a majority of
    /// unresponsive nodes — yields Error; any degraded/unresponsive node
    /// yields Degraded)
    pub level: HealthLevel,

    /// Total number of nodes
    pub total_nodes: usize,

    /// Number of healthy nodes
    pub healthy_nodes: usize,

    /// Number of degraded nodes
    pub degraded_nodes: usize,

    /// Number of unhealthy nodes
    pub unhealthy_nodes: usize,

    /// Number of unresponsive nodes (haven't reported recently)
    pub unresponsive_nodes: usize,

    /// Timestamp when this summary was generated
    pub timestamp: u64,

    /// Details by node
    pub nodes: HashMap<String, NodeHealthInfo>,
}

/// Health information for a single node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealthInfo {
    /// Node ID
    pub node_id: String,

    /// Current health status
    pub status: HealthStatus,

    /// Node priority level
    /// (`#[serde(default)]` lets older reports without this field deserialize)
    #[serde(default)]
    pub priority: HealthPriority,

    /// When the status was last reported
    pub reported_at: u64,

    /// How long ago the status was reported (microseconds)
    pub age_micros: u64,

    /// Whether the node is responsive (reported within its
    /// priority-dependent timeout window)
    pub responsive: bool,
}

/// Centralized health monitor
///
/// Aggregates health status from all nodes and provides system-wide health information.
///
/// # Example
///
/// ```rust
/// use mecha10::health::HealthMonitor;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
///
/// // Get system-wide health
/// let health = monitor.system_health().await?;
/// println!("System health: {:?}", health.level);
/// println!("Healthy: {}/{}", health.healthy_nodes, health.total_nodes);
///
/// // List unhealthy nodes
/// for node in monitor.unhealthy_nodes().await? {
///     println!("Unhealthy: {} - {}", node.node_id, node.status.message);
/// }
/// # Ok(())
/// # }
/// ```
pub struct HealthMonitor {
    // Redis client used to read per-node health keys and history.
    redis_client: redis::Client,
    // Most recent summary plus the Instant it was computed, guarded by an
    // async RwLock so concurrent readers don't block each other.
    cache: Arc<RwLock<Option<(SystemHealth, std::time::Instant)>>>,
    // How long a cached summary stays valid before a fresh fetch.
    cache_ttl: Duration,
}

334impl HealthMonitor {
335 /// Connect to Redis for health monitoring
336 pub async fn connect(redis_url: &str) -> Result<Self> {
337 let redis_client = redis::Client::open(redis_url)
338 .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
339
340 Ok(Self {
341 redis_client,
342 cache: Arc::new(RwLock::new(None)),
343 cache_ttl: Duration::from_secs(5), // Cache for 5 seconds
344 })
345 }
346
347 /// Get system-wide health summary
348 pub async fn system_health(&self) -> Result<SystemHealth> {
349 // Check cache first
350 {
351 let cache = self.cache.read().await;
352 if let Some((health, cached_at)) = cache.as_ref() {
353 if cached_at.elapsed() < self.cache_ttl {
354 return Ok(health.clone());
355 }
356 }
357 }
358
359 // Fetch fresh data
360 let health = self.fetch_system_health().await?;
361
362 // Update cache
363 {
364 let mut cache = self.cache.write().await;
365 *cache = Some((health.clone(), std::time::Instant::now()));
366 }
367
368 Ok(health)
369 }
370
371 async fn fetch_system_health(&self) -> Result<SystemHealth> {
372 use redis::AsyncCommands;
373
374 let mut conn = self
375 .redis_client
376 .get_multiplexed_async_connection()
377 .await
378 .map_err(|e| Mecha10Error::Other(format!("Failed to get Redis connection: {}", e)))?;
379
380 // Get all health keys (filter out history keys)
381 let pattern = "mecha10:health:*";
382 let keys: Vec<String> = conn
383 .keys(pattern)
384 .await
385 .map_err(|e| Mecha10Error::Other(format!("Failed to list health keys: {}", e)))?;
386
387 // Filter out history keys
388 let keys: Vec<_> = keys.into_iter().filter(|k| !k.contains(":history:")).collect();
389
390 let mut nodes = HashMap::new();
391 let mut healthy_count = 0;
392 let mut degraded_count = 0;
393 let mut unhealthy_count = 0;
394 let mut unresponsive_count = 0;
395
396 let now = crate::prelude::now_micros();
397
398 for key in keys {
399 let value: Option<String> = conn
400 .get(&key)
401 .await
402 .map_err(|e| Mecha10Error::Other(format!("Failed to get health value: {}", e)))?;
403
404 if let Some(value) = value {
405 if let Ok(report) = serde_json::from_str::<HealthReport>(&value) {
406 let age_micros = now.saturating_sub(report.reported_at);
407 // Use priority-based timeout threshold
408 let timeout_micros = report.priority.timeout_micros();
409 let responsive = age_micros < timeout_micros;
410
411 if !responsive {
412 unresponsive_count += 1;
413 } else {
414 match report.status.level {
415 HealthLevel::Ok => healthy_count += 1,
416 HealthLevel::Degraded => degraded_count += 1,
417 HealthLevel::Error => unhealthy_count += 1,
418 HealthLevel::Unknown => unresponsive_count += 1,
419 }
420 }
421
422 nodes.insert(
423 report.node_id.clone(),
424 NodeHealthInfo {
425 node_id: report.node_id,
426 status: report.status,
427 priority: report.priority,
428 reported_at: report.reported_at,
429 age_micros,
430 responsive,
431 },
432 );
433 }
434 }
435 }
436
437 // Determine overall system health
438 let level = if unhealthy_count > 0 || unresponsive_count > nodes.len() / 2 {
439 HealthLevel::Error
440 } else if degraded_count > 0 || unresponsive_count > 0 {
441 HealthLevel::Degraded
442 } else if healthy_count > 0 {
443 HealthLevel::Ok
444 } else {
445 HealthLevel::Unknown
446 };
447
448 Ok(SystemHealth {
449 level,
450 total_nodes: nodes.len(),
451 healthy_nodes: healthy_count,
452 degraded_nodes: degraded_count,
453 unhealthy_nodes: unhealthy_count,
454 unresponsive_nodes: unresponsive_count,
455 timestamp: now,
456 nodes,
457 })
458 }
459
460 /// Get health status for a specific node
461 pub async fn node_health(&self, node_id: &str) -> Result<Option<NodeHealthInfo>> {
462 let system_health = self.system_health().await?;
463 Ok(system_health.nodes.get(node_id).cloned())
464 }
465
466 /// Get list of all healthy nodes
467 pub async fn healthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
468 let system_health = self.system_health().await?;
469 Ok(system_health
470 .nodes
471 .values()
472 .filter(|n| n.responsive && n.status.level == HealthLevel::Ok)
473 .cloned()
474 .collect())
475 }
476
477 /// Get list of degraded nodes
478 pub async fn degraded_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
479 let system_health = self.system_health().await?;
480 Ok(system_health
481 .nodes
482 .values()
483 .filter(|n| n.responsive && n.status.level == HealthLevel::Degraded)
484 .cloned()
485 .collect())
486 }
487
488 /// Get list of unhealthy nodes
489 pub async fn unhealthy_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
490 let system_health = self.system_health().await?;
491 Ok(system_health
492 .nodes
493 .values()
494 .filter(|n| n.responsive && n.status.level == HealthLevel::Error)
495 .cloned()
496 .collect())
497 }
498
499 /// Get list of unresponsive nodes (haven't reported recently)
500 pub async fn unresponsive_nodes(&self) -> Result<Vec<NodeHealthInfo>> {
501 let system_health = self.system_health().await?;
502 Ok(system_health
503 .nodes
504 .values()
505 .filter(|n| !n.responsive)
506 .cloned()
507 .collect())
508 }
509
510 /// Clear the cache (force fresh data on next query)
511 pub async fn clear_cache(&self) {
512 let mut cache = self.cache.write().await;
513 *cache = None;
514 }
515
516 /// Get health history for a node
517 ///
518 /// Returns health reports for the specified node over the given duration.
519 /// History is stored for up to 24 hours.
520 ///
521 /// # Arguments
522 ///
523 /// * `node_id` - The node to query history for
524 /// * `duration` - How far back to retrieve history (e.g., Duration::from_secs(3600) for 1 hour)
525 ///
526 /// # Example
527 ///
528 /// ```rust,no_run
529 /// # use mecha10::prelude::*;
530 /// # use mecha10::health::HealthMonitor;
531 /// # use std::time::Duration;
532 /// # async fn example() -> Result<()> {
533 /// let monitor = HealthMonitor::connect("redis://localhost:6379").await?;
534 ///
535 /// // Get last hour of health history
536 /// let history = monitor.node_health_history("camera_node", Duration::from_secs(3600)).await?;
537 /// println!("Found {} health reports in the last hour", history.len());
538 /// # Ok(())
539 /// # }
540 /// ```
541 pub async fn node_health_history(&self, node_id: &str, duration: Duration) -> Result<Vec<HealthReport>> {
542 use redis::AsyncCommands;
543
544 let mut conn = self
545 .redis_client
546 .get_multiplexed_async_connection()
547 .await
548 .map_err(|e| Mecha10Error::Other(format!("Failed to connect to Redis: {}", e)))?;
549
550 let history_key = format!("mecha10:health:history:{}", node_id);
551
552 // Calculate time range (in microseconds)
553 let now = crate::prelude::now_micros();
554 let duration_micros = duration.as_micros() as u64;
555 let start_time = now.saturating_sub(duration_micros);
556
557 // Query sorted set by score (timestamp)
558 let results: Vec<String> = conn
559 .zrangebyscore(&history_key, start_time as f64, now as f64)
560 .await
561 .map_err(|e| Mecha10Error::Other(format!("Failed to retrieve health history: {}", e)))?;
562
563 // Deserialize results
564 let mut reports = Vec::new();
565 for json_str in results {
566 match serde_json::from_str::<HealthReport>(&json_str) {
567 Ok(report) => reports.push(report),
568 Err(e) => {
569 eprintln!("Failed to deserialize health report: {}", e);
570 continue;
571 }
572 }
573 }
574
575 Ok(reports)
576 }
577}
578

// ============================================================================
// Auto-Reporting Extension
// ============================================================================
/// Extension trait for Context to enable automatic health reporting
#[async_trait::async_trait]
pub trait HealthReportingExt {
    /// Start automatic health reporting with default settings (Normal priority, 5s interval)
    ///
    /// Spawns a background task that periodically reports health status.
    /// The health_check_fn is called to generate the current health status.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report healthy status every 5 seconds (default)
    /// ctx.start_health_reporting(|| async { HealthStatus::healthy() }).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting<F, Fut>(&self, health_check_fn: F) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;

    /// Start automatic health reporting with custom configuration
    ///
    /// # Example
    ///
    /// ```rust
    /// use mecha10::prelude::*;
    /// use mecha10::health::HealthReportingExt;
    /// use mecha10::node::HealthPriority;
    ///
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Report every second with Critical priority
    /// ctx.start_health_reporting_with_config(
    ///     HealthPriority::Critical,
    ///     Duration::from_secs(1),
    ///     || async { HealthStatus::healthy() }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn start_health_reporting_with_config<F, Fut>(
        &self,
        priority: HealthPriority,
        interval: Duration,
        health_check_fn: F,
    ) -> Result<()>
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = HealthStatus> + Send;
}

638#[async_trait::async_trait]
639impl HealthReportingExt for Context {
640 async fn start_health_reporting<F, Fut>(&self, health_check_fn: F) -> Result<()>
641 where
642 F: Fn() -> Fut + Send + Sync + 'static,
643 Fut: std::future::Future<Output = HealthStatus> + Send,
644 {
645 self.start_health_reporting_with_config(HealthPriority::Normal, Duration::from_secs(5), health_check_fn)
646 .await
647 }
648
649 async fn start_health_reporting_with_config<F, Fut>(
650 &self,
651 priority: HealthPriority,
652 interval: Duration,
653 health_check_fn: F,
654 ) -> Result<()>
655 where
656 F: Fn() -> Fut + Send + Sync + 'static,
657 Fut: std::future::Future<Output = HealthStatus> + Send,
658 {
659 let reporter = HealthReporter::with_config(self, priority, 60, true).await?;
660
661 tracing::info!(
662 "Health reporting enabled: priority={:?}, interval={:?}",
663 priority,
664 interval
665 );
666
667 tokio::spawn(async move {
668 let mut ticker = tokio::time::interval(interval);
669
670 loop {
671 ticker.tick().await;
672
673 let status = health_check_fn().await;
674
675 if let Err(e) = reporter.report(status).await {
676 tracing::debug!("Failed to report health: {}", e);
677 }
678 }
679 });
680
681 Ok(())
682 }
683}