Skip to main content

varpulis_runtime/
health.rs

1//! Health monitoring for Varpulis components
2//!
3//! Provides per-component health reporting and system-level health aggregation.
4//! Components implement [`HealthReporter`] to participate in health checks.
5//!
6//! ## Health Probes
7//!
8//! - **Liveness** (`/health/live`): always 200 — process is alive
9//! - **Readiness** (`/health/ready`): 200 when all components are healthy
10//! - **Startup** (`/health/started`): 200 once the engine has started
11//! - **Detailed** (`/health`): full JSON with per-component status
12
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16use serde::Serialize;
17
18/// Health status of a component.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
20#[serde(rename_all = "lowercase")]
21pub enum HealthStatus {
22    Up,
23    Degraded,
24    Down,
25}
26
27impl HealthStatus {
28    /// Returns the worst (most severe) of two statuses.
29    pub const fn worst(self, other: Self) -> Self {
30        match (self, other) {
31            (Self::Down, _) | (_, Self::Down) => Self::Down,
32            (Self::Degraded, _) | (_, Self::Degraded) => Self::Degraded,
33            _ => Self::Up,
34        }
35    }
36}
37
38/// Type of component being monitored.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum ComponentType {
42    Connector,
43    Engine,
44    Actor,
45    Worker,
46}
47
48/// Health information for a single component.
49#[derive(Debug, Clone, Serialize)]
50pub struct ComponentHealth {
51    pub name: String,
52    pub component_type: ComponentType,
53    pub status: HealthStatus,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub reason: Option<String>,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub details: Option<serde_json::Value>,
58}
59
60/// Trait for components that can report their health.
61pub trait HealthReporter: Send + Sync {
62    fn health(&self) -> ComponentHealth;
63}
64
65/// System-wide health summary.
66#[derive(Debug, Clone, Serialize)]
67pub struct SystemHealth {
68    /// Aggregate status (worst-component-wins).
69    pub status: HealthStatus,
70    /// Per-component health details.
71    pub components: Vec<ComponentHealth>,
72    /// ISO 8601 timestamp of this health check.
73    pub timestamp: String,
74}
75
76/// Central registry of health reporters.
77pub struct HealthRegistry {
78    reporters: Vec<Arc<dyn HealthReporter>>,
79    started: bool,
80    ready: Arc<AtomicBool>,
81}
82
83impl std::fmt::Debug for HealthRegistry {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("HealthRegistry")
86            .field("reporters_count", &self.reporters.len())
87            .field("started", &self.started)
88            .field("ready", &self.ready)
89            .finish_non_exhaustive()
90    }
91}
92
93impl HealthRegistry {
94    pub fn new() -> Self {
95        Self {
96            reporters: Vec::new(),
97            started: false,
98            ready: Arc::new(AtomicBool::new(false)),
99        }
100    }
101
102    /// Register a component for health monitoring.
103    pub fn register(&mut self, reporter: Arc<dyn HealthReporter>) {
104        self.reporters.push(reporter);
105    }
106
107    /// Mark the system as started.
108    pub const fn mark_started(&mut self) {
109        self.started = true;
110    }
111
112    /// Mark the system as ready to accept traffic.
113    pub fn mark_ready(&self) {
114        self.ready.store(true, Ordering::SeqCst);
115    }
116
117    /// Check all components and produce a system health summary.
118    pub fn system_health(&self) -> SystemHealth {
119        let components: Vec<ComponentHealth> = self.reporters.iter().map(|r| r.health()).collect();
120
121        let status = components
122            .iter()
123            .fold(HealthStatus::Up, |acc, c| acc.worst(c.status));
124
125        SystemHealth {
126            status,
127            components,
128            timestamp: chrono::Utc::now().to_rfc3339(),
129        }
130    }
131
132    /// Liveness check — always true if the process is running.
133    pub const fn is_live(&self) -> bool {
134        true
135    }
136
137    /// Readiness check — true when all components are healthy.
138    pub fn is_ready(&self) -> bool {
139        self.ready.load(Ordering::SeqCst)
140    }
141
142    /// Startup check — true once `mark_started` has been called.
143    pub const fn is_started(&self) -> bool {
144        self.started
145    }
146}
147
148impl Default for HealthRegistry {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
/// Adapter that exposes a connector's
/// [`ConnectorHealthReport`](crate::connector::ConnectorHealthReport) through the
/// [`HealthReporter`] trait (async-runtime only).
///
/// Status mapping:
/// - connected and circuit breaker `"closed"` → [`HealthStatus::Up`]
/// - circuit breaker `"half_open"` (whether connected or not) → [`HealthStatus::Degraded`]
/// - anything else → [`HealthStatus::Down`]
#[cfg(feature = "async-runtime")]
pub struct ConnectorHealthAdapter {
    name: String,
    /// Called on every `health()` invocation to obtain a fresh report.
    health_fn: Box<dyn Fn() -> crate::connector::ConnectorHealthReport + Send + Sync>,
}

#[cfg(feature = "async-runtime")]
impl std::fmt::Debug for ConnectorHealthAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed closure cannot be printed, so only the name is shown.
        f.debug_struct("ConnectorHealthAdapter")
            .field("name", &self.name)
            .finish_non_exhaustive()
    }
}

#[cfg(feature = "async-runtime")]
impl ConnectorHealthAdapter {
    /// Creates an adapter called `name` that invokes `health_fn` for each health check.
    pub fn new<F>(name: String, health_fn: F) -> Self
    where
        F: Fn() -> crate::connector::ConnectorHealthReport + Send + Sync + 'static,
    {
        let boxed = Box::new(health_fn);
        Self {
            name,
            health_fn: boxed,
        }
    }
}

#[cfg(feature = "async-runtime")]
impl HealthReporter for ConnectorHealthAdapter {
    fn health(&self) -> ComponentHealth {
        let report = (self.health_fn)();

        let status = match (report.connected, report.circuit_breaker_state.as_str()) {
            (true, "closed") => HealthStatus::Up,
            (_, "half_open") => HealthStatus::Degraded,
            _ => HealthStatus::Down,
        };

        let details = serde_json::json!({
            "messages_received": report.messages_received,
            "seconds_since_last_message": report.seconds_since_last_message,
            "circuit_breaker_state": report.circuit_breaker_state,
            "circuit_breaker_failures": report.circuit_breaker_failures,
        });

        ComponentHealth {
            name: self.name.clone(),
            component_type: ComponentType::Connector,
            status,
            // The connector's last error (if any) is surfaced verbatim as the reason.
            reason: report.last_error,
            details: Some(details),
        }
    }
}
210
211/// Simple health reporter for the Engine component.
212#[derive(Debug)]
213pub struct EngineHealthReporter {
214    name: String,
215    streams_loaded: Arc<AtomicBool>,
216}
217
218impl EngineHealthReporter {
219    pub fn new(name: String) -> Self {
220        Self {
221            name,
222            streams_loaded: Arc::new(AtomicBool::new(false)),
223        }
224    }
225
226    /// Mark that streams have been loaded.
227    pub fn set_streams_loaded(&self) {
228        self.streams_loaded.store(true, Ordering::SeqCst);
229    }
230
231    /// Get a shareable reference to the streams_loaded flag.
232    pub fn streams_loaded_flag(&self) -> Arc<AtomicBool> {
233        self.streams_loaded.clone()
234    }
235}
236
237impl HealthReporter for EngineHealthReporter {
238    fn health(&self) -> ComponentHealth {
239        let loaded = self.streams_loaded.load(Ordering::SeqCst);
240        ComponentHealth {
241            name: self.name.clone(),
242            component_type: ComponentType::Engine,
243            status: if loaded {
244                HealthStatus::Up
245            } else {
246                HealthStatus::Down
247            },
248            reason: if loaded {
249                None
250            } else {
251                Some("streams not loaded".into())
252            },
253            details: None,
254        }
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Reporter that is always `Up`.
    struct AlwaysUp;
    impl HealthReporter for AlwaysUp {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "test".into(),
                component_type: ComponentType::Engine,
                status: HealthStatus::Up,
                reason: None,
                details: None,
            }
        }
    }

    /// Reporter that is always `Degraded`.
    struct AlwaysDegraded;
    impl HealthReporter for AlwaysDegraded {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "slow".into(),
                component_type: ComponentType::Worker,
                status: HealthStatus::Degraded,
                reason: Some("lagging".into()),
                details: None,
            }
        }
    }

    /// Reporter that is always `Down`.
    struct AlwaysDown;
    impl HealthReporter for AlwaysDown {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "broken".into(),
                component_type: ComponentType::Connector,
                status: HealthStatus::Down,
                reason: Some("connection lost".into()),
                details: None,
            }
        }
    }

    #[test]
    fn test_health_status_worst() {
        assert_eq!(HealthStatus::Up.worst(HealthStatus::Up), HealthStatus::Up);
        assert_eq!(
            HealthStatus::Up.worst(HealthStatus::Degraded),
            HealthStatus::Degraded
        );
        assert_eq!(
            HealthStatus::Degraded.worst(HealthStatus::Up),
            HealthStatus::Degraded
        );
        assert_eq!(
            HealthStatus::Degraded.worst(HealthStatus::Down),
            HealthStatus::Down
        );
        assert_eq!(
            HealthStatus::Down.worst(HealthStatus::Degraded),
            HealthStatus::Down
        );
        assert_eq!(
            HealthStatus::Down.worst(HealthStatus::Up),
            HealthStatus::Down
        );
    }

    #[test]
    fn test_registry_empty() {
        let registry = HealthRegistry::new();
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert!(health.components.is_empty());
    }

    #[test]
    fn test_registry_all_up() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert_eq!(health.components.len(), 2);
    }

    #[test]
    fn test_registry_degraded_without_down() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysDegraded));
        assert_eq!(registry.system_health().status, HealthStatus::Degraded);
    }

    #[test]
    fn test_registry_worst_wins() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysDegraded));
        registry.register(Arc::new(AlwaysDown));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Down);
    }

    #[test]
    fn test_liveness_always_true() {
        let registry = HealthRegistry::new();
        assert!(registry.is_live());
    }

    #[test]
    fn test_readiness_and_startup() {
        let mut registry = HealthRegistry::new();
        assert!(!registry.is_ready());
        assert!(!registry.is_started());

        registry.mark_started();
        assert!(registry.is_started());
        assert!(!registry.is_ready());

        registry.mark_ready();
        assert!(registry.is_ready());
    }

    #[test]
    fn test_readiness_is_flag_only() {
        // `is_ready` reflects `mark_ready` only; it does not poll components.
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysDown));
        registry.mark_ready();
        assert!(registry.is_ready());
        assert_eq!(registry.system_health().status, HealthStatus::Down);
    }

    #[test]
    fn test_engine_health_reporter() {
        let reporter = EngineHealthReporter::new("engine-0".into());
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Down);
        assert!(h.reason.is_some());

        reporter.set_streams_loaded();
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert!(h.reason.is_none());
    }

    #[test]
    fn test_engine_streams_loaded_flag_is_shared() {
        let reporter = EngineHealthReporter::new("engine-1".into());
        let flag = reporter.streams_loaded_flag();
        flag.store(true, Ordering::SeqCst);
        assert_eq!(reporter.health().status, HealthStatus::Up);
    }

    // `ConnectorHealthAdapter` only exists with the `async-runtime` feature, so this
    // test must be gated too or the test module fails to compile without it.
    #[cfg(feature = "async-runtime")]
    #[test]
    fn test_connector_health_adapter() {
        let adapter = ConnectorHealthAdapter::new("mqtt-0".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "closed".to_string(),
                messages_received: 42,
                ..Default::default()
            }
        });
        let h = adapter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert_eq!(h.name, "mqtt-0");

        let degraded = ConnectorHealthAdapter::new("mqtt-1".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "half_open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(degraded.health().status, HealthStatus::Degraded);

        // A half-open breaker reports Degraded even when disconnected.
        let degraded_disconnected = ConnectorHealthAdapter::new("mqtt-3".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: false,
                circuit_breaker_state: "half_open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(
            degraded_disconnected.health().status,
            HealthStatus::Degraded
        );

        let down = ConnectorHealthAdapter::new("mqtt-2".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: false,
                circuit_breaker_state: "open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(down.health().status, HealthStatus::Down);
    }

    #[test]
    fn test_system_health_serializes() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        let json = serde_json::to_string(&health).unwrap();
        assert!(json.contains("\"status\":\"up\""));
        assert!(json.contains("\"timestamp\""));
    }
}