1use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16use serde::Serialize;
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
20#[serde(rename_all = "lowercase")]
21pub enum HealthStatus {
22 Up,
23 Degraded,
24 Down,
25}
26
27impl HealthStatus {
28 pub const fn worst(self, other: Self) -> Self {
30 match (self, other) {
31 (Self::Down, _) | (_, Self::Down) => Self::Down,
32 (Self::Degraded, _) | (_, Self::Degraded) => Self::Degraded,
33 _ => Self::Up,
34 }
35 }
36}
37
/// Category of a component reporting health.
///
/// Serialized in lowercase: `"connector"`, `"engine"`, `"actor"`, `"worker"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ComponentType {
    Connector,
    Engine,
    Actor,
    Worker,
}

/// Health snapshot of one component, as produced by a [`HealthReporter`].
#[derive(Debug, Clone, Serialize)]
pub struct ComponentHealth {
    /// Component name as given by its reporter.
    pub name: String,
    /// Which kind of component this is.
    pub component_type: ComponentType,
    /// Current health state.
    pub status: HealthStatus,
    /// Optional human-readable explanation; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional free-form diagnostic payload; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<serde_json::Value>,
}

/// Something that can report its current health on demand.
///
/// `Send + Sync` is required because reporters are stored as
/// `Arc<dyn HealthReporter>` inside [`HealthRegistry`].
pub trait HealthReporter: Send + Sync {
    /// Produces a fresh health snapshot for this component.
    fn health(&self) -> ComponentHealth;
}

/// Aggregated health of all registered components, as returned by
/// [`HealthRegistry::system_health`].
#[derive(Debug, Clone, Serialize)]
pub struct SystemHealth {
    /// Worst status among `components` (`Up` when there are none).
    pub status: HealthStatus,
    /// Per-component snapshots, in reporter registration order.
    pub components: Vec<ComponentHealth>,
    /// Time the snapshot was taken, formatted as RFC 3339 (UTC).
    pub timestamp: String,
}
75
76pub struct HealthRegistry {
78 reporters: Vec<Arc<dyn HealthReporter>>,
79 started: bool,
80 ready: Arc<AtomicBool>,
81}
82
83impl std::fmt::Debug for HealthRegistry {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("HealthRegistry")
86 .field("reporters_count", &self.reporters.len())
87 .field("started", &self.started)
88 .field("ready", &self.ready)
89 .finish_non_exhaustive()
90 }
91}
92
93impl HealthRegistry {
94 pub fn new() -> Self {
95 Self {
96 reporters: Vec::new(),
97 started: false,
98 ready: Arc::new(AtomicBool::new(false)),
99 }
100 }
101
102 pub fn register(&mut self, reporter: Arc<dyn HealthReporter>) {
104 self.reporters.push(reporter);
105 }
106
107 pub const fn mark_started(&mut self) {
109 self.started = true;
110 }
111
112 pub fn mark_ready(&self) {
114 self.ready.store(true, Ordering::SeqCst);
115 }
116
117 pub fn system_health(&self) -> SystemHealth {
119 let components: Vec<ComponentHealth> = self.reporters.iter().map(|r| r.health()).collect();
120
121 let status = components
122 .iter()
123 .fold(HealthStatus::Up, |acc, c| acc.worst(c.status));
124
125 SystemHealth {
126 status,
127 components,
128 timestamp: chrono::Utc::now().to_rfc3339(),
129 }
130 }
131
132 pub const fn is_live(&self) -> bool {
134 true
135 }
136
137 pub fn is_ready(&self) -> bool {
139 self.ready.load(Ordering::SeqCst)
140 }
141
142 pub const fn is_started(&self) -> bool {
144 self.started
145 }
146}
147
148impl Default for HealthRegistry {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154pub struct ConnectorHealthAdapter {
157 name: String,
158 health_fn: Box<dyn Fn() -> crate::connector::ConnectorHealthReport + Send + Sync>,
159}
160
161impl std::fmt::Debug for ConnectorHealthAdapter {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 f.debug_struct("ConnectorHealthAdapter")
164 .field("name", &self.name)
165 .finish_non_exhaustive()
166 }
167}
168
169impl ConnectorHealthAdapter {
170 pub fn new<F>(name: String, health_fn: F) -> Self
171 where
172 F: Fn() -> crate::connector::ConnectorHealthReport + Send + Sync + 'static,
173 {
174 Self {
175 name,
176 health_fn: Box::new(health_fn),
177 }
178 }
179}
180
181impl HealthReporter for ConnectorHealthAdapter {
182 fn health(&self) -> ComponentHealth {
183 let report = (self.health_fn)();
184 let status = if report.connected && report.circuit_breaker_state == "closed" {
185 HealthStatus::Up
186 } else if report.circuit_breaker_state == "half_open" {
187 HealthStatus::Degraded
188 } else {
189 HealthStatus::Down
190 };
191
192 ComponentHealth {
193 name: self.name.clone(),
194 component_type: ComponentType::Connector,
195 status,
196 reason: report.last_error,
197 details: Some(serde_json::json!({
198 "messages_received": report.messages_received,
199 "seconds_since_last_message": report.seconds_since_last_message,
200 "circuit_breaker_state": report.circuit_breaker_state,
201 "circuit_breaker_failures": report.circuit_breaker_failures,
202 })),
203 }
204 }
205}
206
207#[derive(Debug)]
209pub struct EngineHealthReporter {
210 name: String,
211 streams_loaded: Arc<AtomicBool>,
212}
213
214impl EngineHealthReporter {
215 pub fn new(name: String) -> Self {
216 Self {
217 name,
218 streams_loaded: Arc::new(AtomicBool::new(false)),
219 }
220 }
221
222 pub fn set_streams_loaded(&self) {
224 self.streams_loaded.store(true, Ordering::SeqCst);
225 }
226
227 pub fn streams_loaded_flag(&self) -> Arc<AtomicBool> {
229 self.streams_loaded.clone()
230 }
231}
232
233impl HealthReporter for EngineHealthReporter {
234 fn health(&self) -> ComponentHealth {
235 let loaded = self.streams_loaded.load(Ordering::SeqCst);
236 ComponentHealth {
237 name: self.name.clone(),
238 component_type: ComponentType::Engine,
239 status: if loaded {
240 HealthStatus::Up
241 } else {
242 HealthStatus::Down
243 },
244 reason: if loaded {
245 None
246 } else {
247 Some("streams not loaded".into())
248 },
249 details: None,
250 }
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture reporter: always `Up`, typed as an engine named "test".
    struct AlwaysUp;
    impl HealthReporter for AlwaysUp {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "test".into(),
                component_type: ComponentType::Engine,
                status: HealthStatus::Up,
                reason: None,
                details: None,
            }
        }
    }

    /// Fixture reporter: always `Down`, typed as a connector named "broken",
    /// with a reason attached.
    struct AlwaysDown;
    impl HealthReporter for AlwaysDown {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "broken".into(),
                component_type: ComponentType::Connector,
                status: HealthStatus::Down,
                reason: Some("connection lost".into()),
                details: None,
            }
        }
    }

    /// `worst` returns the more severe status, in either argument order.
    #[test]
    fn test_health_status_worst() {
        assert_eq!(HealthStatus::Up.worst(HealthStatus::Up), HealthStatus::Up);
        assert_eq!(
            HealthStatus::Up.worst(HealthStatus::Degraded),
            HealthStatus::Degraded
        );
        assert_eq!(
            HealthStatus::Degraded.worst(HealthStatus::Down),
            HealthStatus::Down
        );
        assert_eq!(
            HealthStatus::Down.worst(HealthStatus::Up),
            HealthStatus::Down
        );
    }

    /// With no reporters the aggregate is `Up` (the fold's starting value).
    #[test]
    fn test_registry_empty() {
        let registry = HealthRegistry::new();
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert!(health.components.is_empty());
    }

    /// Every registered reporter contributes one component snapshot.
    #[test]
    fn test_registry_all_up() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert_eq!(health.components.len(), 2);
    }

    /// A single `Down` component makes the whole system `Down`.
    #[test]
    fn test_registry_worst_wins() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysDown));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Down);
    }

    #[test]
    fn test_liveness_always_true() {
        let registry = HealthRegistry::new();
        assert!(registry.is_live());
    }

    /// Startup and readiness are independent flags, both initially false.
    #[test]
    fn test_readiness_and_startup() {
        let mut registry = HealthRegistry::new();
        assert!(!registry.is_ready());
        assert!(!registry.is_started());

        // Marking started must not imply ready.
        registry.mark_started();
        assert!(registry.is_started());
        assert!(!registry.is_ready());

        registry.mark_ready();
        assert!(registry.is_ready());
    }

    /// Engine reporter goes from Down (with a reason) to Up (no reason)
    /// once streams are loaded.
    #[test]
    fn test_engine_health_reporter() {
        let reporter = EngineHealthReporter::new("engine-0".into());
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Down);
        assert!(h.reason.is_some());

        reporter.set_streams_loaded();
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert!(h.reason.is_none());
    }

    /// Covers the three connector mappings: closed+connected -> Up,
    /// half_open -> Degraded, open+disconnected -> Down.
    #[test]
    fn test_connector_health_adapter() {
        let adapter = ConnectorHealthAdapter::new("mqtt-0".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "closed".to_string(),
                messages_received: 42,
                ..Default::default()
            }
        });
        let h = adapter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert_eq!(h.name, "mqtt-0");

        let degraded = ConnectorHealthAdapter::new("mqtt-1".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "half_open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(degraded.health().status, HealthStatus::Degraded);

        let down = ConnectorHealthAdapter::new("mqtt-2".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: false,
                circuit_breaker_state: "open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(down.health().status, HealthStatus::Down);
    }

    /// The JSON form uses lowercase status strings and includes a timestamp.
    #[test]
    fn test_system_health_serializes() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        let json = serde_json::to_string(&health).unwrap();
        assert!(json.contains("\"status\":\"up\""));
        assert!(json.contains("\"timestamp\""));
    }
}