1use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16use serde::Serialize;
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
20#[serde(rename_all = "lowercase")]
21pub enum HealthStatus {
22 Up,
23 Degraded,
24 Down,
25}
26
27impl HealthStatus {
28 pub const fn worst(self, other: Self) -> Self {
30 match (self, other) {
31 (Self::Down, _) | (_, Self::Down) => Self::Down,
32 (Self::Degraded, _) | (_, Self::Degraded) => Self::Degraded,
33 _ => Self::Up,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum ComponentType {
42 Connector,
43 Engine,
44 Actor,
45 Worker,
46}
47
48#[derive(Debug, Clone, Serialize)]
50pub struct ComponentHealth {
51 pub name: String,
52 pub component_type: ComponentType,
53 pub status: HealthStatus,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub reason: Option<String>,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub details: Option<serde_json::Value>,
58}
59
60pub trait HealthReporter: Send + Sync {
62 fn health(&self) -> ComponentHealth;
63}
64
65#[derive(Debug, Clone, Serialize)]
67pub struct SystemHealth {
68 pub status: HealthStatus,
70 pub components: Vec<ComponentHealth>,
72 pub timestamp: String,
74}
75
76pub struct HealthRegistry {
78 reporters: Vec<Arc<dyn HealthReporter>>,
79 started: bool,
80 ready: Arc<AtomicBool>,
81}
82
83impl std::fmt::Debug for HealthRegistry {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("HealthRegistry")
86 .field("reporters_count", &self.reporters.len())
87 .field("started", &self.started)
88 .field("ready", &self.ready)
89 .finish_non_exhaustive()
90 }
91}
92
93impl HealthRegistry {
94 pub fn new() -> Self {
95 Self {
96 reporters: Vec::new(),
97 started: false,
98 ready: Arc::new(AtomicBool::new(false)),
99 }
100 }
101
102 pub fn register(&mut self, reporter: Arc<dyn HealthReporter>) {
104 self.reporters.push(reporter);
105 }
106
107 pub const fn mark_started(&mut self) {
109 self.started = true;
110 }
111
112 pub fn mark_ready(&self) {
114 self.ready.store(true, Ordering::SeqCst);
115 }
116
117 pub fn system_health(&self) -> SystemHealth {
119 let components: Vec<ComponentHealth> = self.reporters.iter().map(|r| r.health()).collect();
120
121 let status = components
122 .iter()
123 .fold(HealthStatus::Up, |acc, c| acc.worst(c.status));
124
125 SystemHealth {
126 status,
127 components,
128 timestamp: chrono::Utc::now().to_rfc3339(),
129 }
130 }
131
132 pub const fn is_live(&self) -> bool {
134 true
135 }
136
137 pub fn is_ready(&self) -> bool {
139 self.ready.load(Ordering::SeqCst)
140 }
141
142 pub const fn is_started(&self) -> bool {
144 self.started
145 }
146}
147
148impl Default for HealthRegistry {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
/// Adapter that turns a closure returning a
/// `crate::connector::ConnectorHealthReport` into a [`HealthReporter`].
///
/// Only compiled when the `async-runtime` feature is enabled.
#[cfg(feature = "async-runtime")]
pub struct ConnectorHealthAdapter {
    /// Name copied into every `ComponentHealth` this adapter produces.
    name: String,
    /// Callback invoked on each `health()` call to obtain a fresh report.
    health_fn: Box<dyn Fn() -> crate::connector::ConnectorHealthReport + Send + Sync>,
}
161
/// Hand-written `Debug`: the boxed callback has no `Debug` implementation,
/// so only `name` is printed, followed by `..` for the omitted field.
#[cfg(feature = "async-runtime")]
impl std::fmt::Debug for ConnectorHealthAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ConnectorHealthAdapter");
        out.field("name", &self.name);
        out.finish_non_exhaustive()
    }
}
170
#[cfg(feature = "async-runtime")]
impl ConnectorHealthAdapter {
    /// Builds an adapter reporting under `name`.
    ///
    /// `health_fn` is boxed and called once per `health()` invocation; it
    /// must be `Send + Sync + 'static` so the adapter can satisfy the
    /// `HealthReporter` supertraits.
    pub fn new<F>(name: String, health_fn: F) -> Self
    where
        F: Fn() -> crate::connector::ConnectorHealthReport + Send + Sync + 'static,
    {
        let health_fn: Box<dyn Fn() -> crate::connector::ConnectorHealthReport + Send + Sync> =
            Box::new(health_fn);
        Self { name, health_fn }
    }
}
183
#[cfg(feature = "async-runtime")]
impl HealthReporter for ConnectorHealthAdapter {
    /// Calls the wrapped callback and maps its report onto a `ComponentHealth`.
    ///
    /// Status mapping:
    /// * connected and breaker `"closed"` -> `Up`
    /// * otherwise, breaker `"half_open"` -> `Degraded`
    /// * anything else -> `Down`
    ///
    /// `reason` carries the report's `last_error` verbatim, whatever the
    /// resulting status is. `details` always contains the message counters
    /// and circuit-breaker fields.
    ///
    /// NOTE(review): a report with `connected == false` and a `"half_open"`
    /// breaker is classified as `Degraded`, not `Down` — confirm this is
    /// intended.
    fn health(&self) -> ComponentHealth {
        let report = (self.health_fn)();

        let breaker_closed = report.circuit_breaker_state == "closed";
        let breaker_half_open = report.circuit_breaker_state == "half_open";
        let status = match (report.connected && breaker_closed, breaker_half_open) {
            (true, _) => HealthStatus::Up,
            (false, true) => HealthStatus::Degraded,
            (false, false) => HealthStatus::Down,
        };

        // `json!` only borrows the fields it reads, so `last_error` can
        // still be moved out of the report afterwards.
        let details = serde_json::json!({
            "messages_received": report.messages_received,
            "seconds_since_last_message": report.seconds_since_last_message,
            "circuit_breaker_state": report.circuit_breaker_state,
            "circuit_breaker_failures": report.circuit_breaker_failures,
        });

        ComponentHealth {
            name: self.name.clone(),
            component_type: ComponentType::Connector,
            status,
            reason: report.last_error,
            details: Some(details),
        }
    }
}
210
/// Health reporter for an engine, driven by a single "streams loaded" flag.
#[derive(Debug)]
pub struct EngineHealthReporter {
    /// Name copied into every `ComponentHealth` this reporter produces.
    name: String,
    /// Shared flag; `false` until `set_streams_loaded` is called or a holder
    /// of the shared handle stores `true`.
    streams_loaded: Arc<AtomicBool>,
}
217
218impl EngineHealthReporter {
219 pub fn new(name: String) -> Self {
220 Self {
221 name,
222 streams_loaded: Arc::new(AtomicBool::new(false)),
223 }
224 }
225
226 pub fn set_streams_loaded(&self) {
228 self.streams_loaded.store(true, Ordering::SeqCst);
229 }
230
231 pub fn streams_loaded_flag(&self) -> Arc<AtomicBool> {
233 self.streams_loaded.clone()
234 }
235}
236
237impl HealthReporter for EngineHealthReporter {
238 fn health(&self) -> ComponentHealth {
239 let loaded = self.streams_loaded.load(Ordering::SeqCst);
240 ComponentHealth {
241 name: self.name.clone(),
242 component_type: ComponentType::Engine,
243 status: if loaded {
244 HealthStatus::Up
245 } else {
246 HealthStatus::Down
247 },
248 reason: if loaded {
249 None
250 } else {
251 Some("streams not loaded".into())
252 },
253 details: None,
254 }
255 }
256}
257
/// Unit tests for status aggregation, the registry lifecycle flags and the
/// built-in reporters.
#[cfg(test)]
mod tests {
    use super::*;

    /// Reporter that always reports a healthy engine.
    struct AlwaysUp;
    impl HealthReporter for AlwaysUp {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "test".into(),
                component_type: ComponentType::Engine,
                status: HealthStatus::Up,
                reason: None,
                details: None,
            }
        }
    }

    /// Reporter that always reports a failed connector.
    struct AlwaysDown;
    impl HealthReporter for AlwaysDown {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "broken".into(),
                component_type: ComponentType::Connector,
                status: HealthStatus::Down,
                reason: Some("connection lost".into()),
                details: None,
            }
        }
    }

    #[test]
    fn test_health_status_worst() {
        assert_eq!(HealthStatus::Up.worst(HealthStatus::Up), HealthStatus::Up);
        assert_eq!(
            HealthStatus::Up.worst(HealthStatus::Degraded),
            HealthStatus::Degraded
        );
        assert_eq!(
            HealthStatus::Degraded.worst(HealthStatus::Down),
            HealthStatus::Down
        );
        assert_eq!(
            HealthStatus::Down.worst(HealthStatus::Up),
            HealthStatus::Down
        );
    }

    #[test]
    fn test_registry_empty() {
        let registry = HealthRegistry::new();
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert!(health.components.is_empty());
    }

    #[test]
    fn test_registry_all_up() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert_eq!(health.components.len(), 2);
    }

    #[test]
    fn test_registry_worst_wins() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysDown));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Down);
    }

    #[test]
    fn test_liveness_always_true() {
        let registry = HealthRegistry::new();
        assert!(registry.is_live());
    }

    #[test]
    fn test_readiness_and_startup() {
        let mut registry = HealthRegistry::new();
        assert!(!registry.is_ready());
        assert!(!registry.is_started());

        registry.mark_started();
        assert!(registry.is_started());
        assert!(!registry.is_ready());

        registry.mark_ready();
        assert!(registry.is_ready());
    }

    #[test]
    fn test_engine_health_reporter() {
        let reporter = EngineHealthReporter::new("engine-0".into());
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Down);
        assert!(h.reason.is_some());

        reporter.set_streams_loaded();
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert!(h.reason.is_none());
    }

    // `ConnectorHealthAdapter` only exists when the `async-runtime` feature
    // is enabled, so this test must be gated on the same feature; otherwise
    // the whole test module fails to compile with the feature turned off.
    #[cfg(feature = "async-runtime")]
    #[test]
    fn test_connector_health_adapter() {
        let adapter = ConnectorHealthAdapter::new("mqtt-0".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "closed".to_string(),
                messages_received: 42,
                ..Default::default()
            }
        });
        let h = adapter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert_eq!(h.name, "mqtt-0");

        let degraded = ConnectorHealthAdapter::new("mqtt-1".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "half_open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(degraded.health().status, HealthStatus::Degraded);

        let down = ConnectorHealthAdapter::new("mqtt-2".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: false,
                circuit_breaker_state: "open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(down.health().status, HealthStatus::Down);
    }

    #[test]
    fn test_system_health_serializes() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        let json = serde_json::to_string(&health).unwrap();
        assert!(json.contains("\"status\":\"up\""));
        assert!(json.contains("\"timestamp\""));
    }
}