1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use tokio::sync::Mutex;
10
/// Health state of a monitored module, as maintained by `HealthTracker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// Consecutive successful checks have reached `HealthConfig::success_threshold`.
    Healthy,
    /// Set when a tracker that was `Unhealthy` records a success but has not
    /// yet reached the success threshold.
    // NOTE(review): `HealthTracker::is_ready` reports this state as ready,
    // despite the name — confirm that is the intended meaning.
    Unready,
    /// Consecutive failed checks have reached `HealthConfig::failure_threshold`.
    Unhealthy,
    /// Initial state of a new tracker, before enough checks have been
    /// recorded to decide either way.
    Unknown,
}
23
/// Tunables for health checking.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// Time between health checks.
    // NOTE(review): not read by any code in this file; presumably consumed by
    // whatever schedules the checks — confirm.
    pub check_interval: Duration,
    /// Maximum time a single health check may take.
    // NOTE(review): not read by any code in this file — confirm where it is enforced.
    pub timeout: Duration,
    /// Number of consecutive failures after which a tracker becomes unhealthy.
    pub failure_threshold: u32,
    /// Number of consecutive successes after which a tracker becomes healthy.
    pub success_threshold: u32,
}

impl Default for HealthConfig {
    /// Defaults: check every 10 s, 5 s timeout, unhealthy after 3 consecutive
    /// failures, healthy after 2 consecutive successes.
    fn default() -> Self {
        let check_interval = Duration::from_secs(10);
        let timeout = Duration::from_secs(5);

        HealthConfig {
            check_interval,
            timeout,
            failure_threshold: 3,
            success_threshold: 2,
        }
    }
}
47
/// Outcome of a single health check.
// NOTE(review): nothing in this file constructs or consumes this type;
// presumably it is produced by the code that performs the checks — confirm.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// When the check was performed.
    pub timestamp: Instant,
    /// Whether the check succeeded.
    pub success: bool,
    /// Check latency; presumably microseconds, going by the `_us` suffix.
    pub latency_us: u64,
    /// Error description, if any.
    pub error: Option<String>,
}
60
/// Tracks the health of one module from a stream of recorded check outcomes.
#[derive(Debug)]
pub struct HealthTracker {
    /// Name of the module being tracked; reported back through `stats()`.
    module_name: String,
    /// Current status, guarded by an async mutex.
    status: Mutex<HealthStatus>,
    /// Consecutive failures: incremented on each failure, reset to 0 on each success.
    failures: AtomicU32,
    /// Consecutive successes: incremented on each success, reset to 0 on each failure.
    successes: AtomicU32,
    /// Total number of checks recorded (successes and failures).
    total_checks: AtomicU64,
    /// Time of the most recent recorded success (written, but never read in this file).
    last_success: Mutex<Option<Instant>>,
    /// Time of the most recent recorded failure (written, but never read in this file).
    last_failure: Mutex<Option<Instant>>,
    /// Thresholds that drive the status transitions.
    config: HealthConfig,
}
81
82impl HealthTracker {
83 pub fn new(module_name: String, config: HealthConfig) -> Self {
85 Self {
86 module_name,
87 status: Mutex::new(HealthStatus::Unknown),
88 failures: AtomicU32::new(0),
89 successes: AtomicU32::new(0),
90 total_checks: AtomicU64::new(0),
91 last_success: Mutex::new(None),
92 last_failure: Mutex::new(None),
93 config,
94 }
95 }
96
97 pub async fn record_success(&self, _latency_us: u64) {
99 self.total_checks.fetch_add(1, Ordering::Relaxed);
100 self.successes.fetch_add(1, Ordering::Relaxed);
101 self.failures.store(0, Ordering::Relaxed);
102 *self.last_success.lock().await = Some(Instant::now());
103
104 let mut status = self.status.lock().await;
105 let current = *status;
106
107 if current == HealthStatus::Unhealthy || current == HealthStatus::Unknown {
108 let successes = self.successes.load(Ordering::Relaxed);
109 if successes >= self.config.success_threshold {
110 *status = HealthStatus::Healthy;
111 } else if current == HealthStatus::Unhealthy {
112 *status = HealthStatus::Unready;
113 }
114 }
115 }
116
117 pub async fn record_failure(&self, _error: Option<String>) {
119 self.total_checks.fetch_add(1, Ordering::Relaxed);
120 self.failures.fetch_add(1, Ordering::Relaxed);
121 self.successes.store(0, Ordering::Relaxed);
122 *self.last_failure.lock().await = Some(Instant::now());
123
124 let mut status = self.status.lock().await;
125
126 let failures = self.failures.load(Ordering::Relaxed);
127 if failures >= self.config.failure_threshold {
128 *status = HealthStatus::Unhealthy;
129 }
130 }
131
132 pub async fn status(&self) -> HealthStatus {
134 *self.status.lock().await
135 }
136
137 pub async fn is_healthy(&self) -> bool {
139 *self.status.lock().await == HealthStatus::Healthy
140 }
141
142 pub async fn is_ready(&self) -> bool {
144 let status = *self.status.lock().await;
145 status == HealthStatus::Healthy || status == HealthStatus::Unready
146 }
147
148 pub fn stats(&self) -> HealthStats {
150 HealthStats {
151 module_name: self.module_name.clone(),
152 total_checks: self.total_checks.load(Ordering::Relaxed),
153 consecutive_failures: self.failures.load(Ordering::Relaxed),
154 consecutive_successes: self.successes.load(Ordering::Relaxed),
155 }
156 }
157}
158
/// Point-in-time counters reported by `HealthTracker::stats`.
#[derive(Debug, Clone)]
pub struct HealthStats {
    /// Name of the module the counters belong to.
    pub module_name: String,
    /// Total number of checks recorded (successes and failures).
    pub total_checks: u64,
    /// Failures recorded since the last success.
    pub consecutive_failures: u32,
    /// Successes recorded since the last failure.
    pub consecutive_successes: u32,
}
167
/// Registry of per-module health trackers, keyed by module name.
#[derive(Debug)]
pub struct HealthRegistry {
    /// Module name -> shared tracker for that module.
    trackers: DashMap<String, Arc<HealthTracker>>,
    /// Configuration cloned into every tracker created by `get_or_create`.
    default_config: HealthConfig,
}
176
177impl HealthRegistry {
178 pub fn new() -> Self {
180 Self {
181 trackers: DashMap::new(),
182 default_config: HealthConfig::default(),
183 }
184 }
185
186 pub fn get_or_create(&self, module_name: &str) -> Arc<HealthTracker> {
188 self.trackers
189 .entry(module_name.to_string())
190 .or_insert_with(|| {
191 Arc::new(HealthTracker::new(
192 module_name.to_string(),
193 self.default_config.clone(),
194 ))
195 })
196 .clone()
197 }
198
199 pub async fn unhealthy_modules(&self) -> Vec<String> {
201 let mut result = Vec::new();
202 for entry in self.trackers.iter() {
203 if !entry.value().is_healthy().await {
204 result.push(entry.key().clone());
205 }
206 }
207 result
208 }
209
210 pub async fn summary(&self) -> HealthSummary {
212 let mut healthy = 0;
213 let mut unready = 0;
214 let mut unhealthy = 0;
215 let mut unknown = 0;
216
217 for entry in self.trackers.iter() {
218 match entry.value().status().await {
219 HealthStatus::Healthy => healthy += 1,
220 HealthStatus::Unready => unready += 1,
221 HealthStatus::Unhealthy => unhealthy += 1,
222 HealthStatus::Unknown => unknown += 1,
223 }
224 }
225
226 HealthSummary {
227 total: self.trackers.len(),
228 healthy,
229 unready,
230 unhealthy,
231 unknown,
232 }
233 }
234}
235
236impl Default for HealthRegistry {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
/// Per-status module counts produced by `HealthRegistry::summary`.
#[derive(Debug, Clone)]
pub struct HealthSummary {
    /// Number of modules registered.
    pub total: usize,
    /// Modules whose status is `Healthy`.
    pub healthy: usize,
    /// Modules whose status is `Unready`.
    pub unready: usize,
    /// Modules whose status is `Unhealthy`.
    pub unhealthy: usize,
    /// Modules whose status is `Unknown`.
    pub unknown: usize,
}
251
252use dashmap::DashMap;
253
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh tracker is `Unknown` and becomes `Healthy` after two
    /// consecutive successes (the default success threshold).
    #[tokio::test]
    async fn test_health_tracker_transitions() {
        let subject = HealthTracker::new("test".to_string(), HealthConfig::default());

        // Nothing recorded yet.
        assert_eq!(subject.status().await, HealthStatus::Unknown);

        for _ in 0..2 {
            subject.record_success(100).await;
        }

        assert_eq!(subject.status().await, HealthStatus::Healthy);
        assert!(subject.is_healthy().await);
        assert!(subject.is_ready().await);
    }

    /// With a failure threshold of 2, three consecutive failures leave the
    /// tracker `Unhealthy`, which is neither healthy nor ready.
    #[tokio::test]
    async fn test_health_tracker_failure() {
        let strict = HealthConfig {
            failure_threshold: 2,
            ..HealthConfig::default()
        };
        let subject = HealthTracker::new("test".to_string(), strict);

        for _ in 0..3 {
            subject.record_failure(Some("error".to_string())).await;
        }

        assert_eq!(subject.status().await, HealthStatus::Unhealthy);
        assert!(!subject.is_healthy().await);
        assert!(!subject.is_ready().await);
    }
}