Skip to main content

memlink_runtime/
health.rs

1//! Module health checking and probes.
2//!
3//! Provides liveness and readiness probes for module health monitoring.
4
5use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use tokio::sync::Mutex;
10
/// Coarse health state of a single module.
///
/// A tracker starts in [`HealthStatus::Unknown`] and moves between the other
/// states as probe results are recorded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// The module is healthy and ready.
    Healthy,
    /// The module is alive but not ready.
    ///
    /// The tracker enters this state when a previously unhealthy module
    /// reports a success but has not yet reached the success threshold.
    Unready,
    /// The module is unhealthy: it reached the configured failure threshold.
    Unhealthy,
    /// No verdict yet; this is the state a new tracker starts in.
    Unknown,
}
23
/// Tunable parameters for health checking.
///
/// Only the two thresholds are read by the tracker in this file; the interval
/// and timeout are presumably consumed by whatever schedules the probes
/// (TODO confirm against the caller).
#[derive(Clone, Debug)]
pub struct HealthConfig {
    /// How long to wait between two consecutive health checks.
    pub check_interval: Duration,
    /// Upper bound on how long a single health check call may take.
    pub timeout: Duration,
    /// Consecutive failures after which a module is marked unhealthy.
    pub failure_threshold: u32,
    /// Consecutive successes the tracker compares against before marking a
    /// module healthy.
    pub success_threshold: u32,
}

impl Default for HealthConfig {
    /// Returns the stock configuration: check every 10 s, 5 s timeout,
    /// 3 failures to become unhealthy, 2 successes to become healthy.
    fn default() -> Self {
        let check_interval = Duration::from_secs(10);
        let timeout = Duration::from_secs(5);

        HealthConfig {
            check_interval,
            timeout,
            failure_threshold: 3,
            success_threshold: 2,
        }
    }
}
47
/// Outcome of one health check invocation.
///
/// Plain data carrier; nothing else in this file constructs or consumes it.
#[derive(Clone, Debug)]
pub struct HealthCheckResult {
    /// When the check was performed.
    pub timestamp: Instant,
    /// `true` if the check succeeded.
    pub success: bool,
    /// How long the check took to respond, in microseconds.
    pub latency_us: u64,
    /// Error message describing the failure, if the check failed.
    pub error: Option<String>,
}
60
61/// Module health tracker.
62#[derive(Debug)]
63pub struct HealthTracker {
64    /// Module name.
65    module_name: String,
66    /// Current health status.
67    status: Mutex<HealthStatus>,
68    /// Consecutive failures.
69    failures: AtomicU32,
70    /// Consecutive successes.
71    successes: AtomicU32,
72    /// Total health checks.
73    total_checks: AtomicU64,
74    /// Last successful check.
75    last_success: Mutex<Option<Instant>>,
76    /// Last failed check.
77    last_failure: Mutex<Option<Instant>>,
78    /// Configuration.
79    config: HealthConfig,
80}
81
82impl HealthTracker {
83    /// Creates a new health tracker.
84    pub fn new(module_name: String, config: HealthConfig) -> Self {
85        Self {
86            module_name,
87            status: Mutex::new(HealthStatus::Unknown),
88            failures: AtomicU32::new(0),
89            successes: AtomicU32::new(0),
90            total_checks: AtomicU64::new(0),
91            last_success: Mutex::new(None),
92            last_failure: Mutex::new(None),
93            config,
94        }
95    }
96
97    /// Records a successful health check.
98    pub async fn record_success(&self, _latency_us: u64) {
99        self.total_checks.fetch_add(1, Ordering::Relaxed);
100        self.successes.fetch_add(1, Ordering::Relaxed);
101        self.failures.store(0, Ordering::Relaxed);
102        *self.last_success.lock().await = Some(Instant::now());
103
104        let mut status = self.status.lock().await;
105        let current = *status;
106
107        if current == HealthStatus::Unhealthy || current == HealthStatus::Unknown {
108            let successes = self.successes.load(Ordering::Relaxed);
109            if successes >= self.config.success_threshold {
110                *status = HealthStatus::Healthy;
111            } else if current == HealthStatus::Unhealthy {
112                *status = HealthStatus::Unready;
113            }
114        }
115    }
116
117    /// Records a failed health check.
118    pub async fn record_failure(&self, _error: Option<String>) {
119        self.total_checks.fetch_add(1, Ordering::Relaxed);
120        self.failures.fetch_add(1, Ordering::Relaxed);
121        self.successes.store(0, Ordering::Relaxed);
122        *self.last_failure.lock().await = Some(Instant::now());
123
124        let mut status = self.status.lock().await;
125
126        let failures = self.failures.load(Ordering::Relaxed);
127        if failures >= self.config.failure_threshold {
128            *status = HealthStatus::Unhealthy;
129        }
130    }
131
132    /// Returns the current health status.
133    pub async fn status(&self) -> HealthStatus {
134        *self.status.lock().await
135    }
136
137    /// Returns whether the module is healthy.
138    pub async fn is_healthy(&self) -> bool {
139        *self.status.lock().await == HealthStatus::Healthy
140    }
141
142    /// Returns whether the module is ready.
143    pub async fn is_ready(&self) -> bool {
144        let status = *self.status.lock().await;
145        status == HealthStatus::Healthy || status == HealthStatus::Unready
146    }
147
148    /// Returns statistics.
149    pub fn stats(&self) -> HealthStats {
150        HealthStats {
151            module_name: self.module_name.clone(),
152            total_checks: self.total_checks.load(Ordering::Relaxed),
153            consecutive_failures: self.failures.load(Ordering::Relaxed),
154            consecutive_successes: self.successes.load(Ordering::Relaxed),
155        }
156    }
157}
158
/// Point-in-time counters for one module, as returned by
/// `HealthTracker::stats`.
#[derive(Clone, Debug)]
pub struct HealthStats {
    /// Name of the module the counters belong to.
    pub module_name: String,
    /// Total number of health checks recorded, successes and failures alike.
    pub total_checks: u64,
    /// Current run of consecutive failed checks.
    pub consecutive_failures: u32,
    /// Current run of consecutive successful checks.
    pub consecutive_successes: u32,
}
167
168/// Health check registry for all modules.
169#[derive(Debug)]
170pub struct HealthRegistry {
171    /// Health trackers by module name.
172    trackers: DashMap<String, Arc<HealthTracker>>,
173    /// Default configuration.
174    default_config: HealthConfig,
175}
176
177impl HealthRegistry {
178    /// Creates a new health registry.
179    pub fn new() -> Self {
180        Self {
181            trackers: DashMap::new(),
182            default_config: HealthConfig::default(),
183        }
184    }
185
186    /// Gets or creates a health tracker.
187    pub fn get_or_create(&self, module_name: &str) -> Arc<HealthTracker> {
188        self.trackers
189            .entry(module_name.to_string())
190            .or_insert_with(|| {
191                Arc::new(HealthTracker::new(
192                    module_name.to_string(),
193                    self.default_config.clone(),
194                ))
195            })
196            .clone()
197    }
198
199    /// Returns all unhealthy modules.
200    pub async fn unhealthy_modules(&self) -> Vec<String> {
201        let mut result = Vec::new();
202        for entry in self.trackers.iter() {
203            if !entry.value().is_healthy().await {
204                result.push(entry.key().clone());
205            }
206        }
207        result
208    }
209
210    /// Returns overall health summary.
211    pub async fn summary(&self) -> HealthSummary {
212        let mut healthy = 0;
213        let mut unready = 0;
214        let mut unhealthy = 0;
215        let mut unknown = 0;
216
217        for entry in self.trackers.iter() {
218            match entry.value().status().await {
219                HealthStatus::Healthy => healthy += 1,
220                HealthStatus::Unready => unready += 1,
221                HealthStatus::Unhealthy => unhealthy += 1,
222                HealthStatus::Unknown => unknown += 1,
223            }
224        }
225
226        HealthSummary {
227            total: self.trackers.len(),
228            healthy,
229            unready,
230            unhealthy,
231            unknown,
232        }
233    }
234}
235
236impl Default for HealthRegistry {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
/// Aggregate count of modules per health state across a registry.
#[derive(Clone, Debug)]
pub struct HealthSummary {
    /// Number of registered modules.
    pub total: usize,
    /// Modules whose status is `Healthy`.
    pub healthy: usize,
    /// Modules whose status is `Unready`.
    pub unready: usize,
    /// Modules whose status is `Unhealthy`.
    pub unhealthy: usize,
    /// Modules whose status is `Unknown`.
    pub unknown: usize,
}
251
252use dashmap::DashMap;
253
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh tracker is `Unknown` and becomes `Healthy` after two
    /// consecutive successes (the default success threshold).
    #[tokio::test]
    async fn test_health_tracker_transitions() {
        let tracker = HealthTracker::new(String::from("test"), HealthConfig::default());

        // No probe has been recorded yet.
        assert_eq!(tracker.status().await, HealthStatus::Unknown);

        for _ in 0..2 {
            tracker.record_success(100).await;
        }

        assert_eq!(tracker.status().await, HealthStatus::Healthy);
        assert!(tracker.is_healthy().await);
        assert!(tracker.is_ready().await);
    }

    /// With a failure threshold of 2, three consecutive failures leave the
    /// tracker `Unhealthy` and therefore neither healthy nor ready.
    #[tokio::test]
    async fn test_health_tracker_failure() {
        let config = HealthConfig {
            failure_threshold: 2,
            ..HealthConfig::default()
        };
        let tracker = HealthTracker::new(String::from("test"), config);

        for _ in 0..3 {
            tracker.record_failure(Some("error".to_string())).await;
        }

        assert_eq!(tracker.status().await, HealthStatus::Unhealthy);
        assert!(!tracker.is_healthy().await);
        assert!(!tracker.is_ready().await);
    }
}
291}