Skip to main content

varpulis_runtime/
health.rs

1//! Health monitoring for Varpulis components
2//!
3//! Provides per-component health reporting and system-level health aggregation.
4//! Components implement [`HealthReporter`] to participate in health checks.
5//!
6//! ## Health Probes
7//!
8//! - **Liveness** (`/health/live`): always 200 — process is alive
9//! - **Readiness** (`/health/ready`): 200 when all components are healthy
10//! - **Startup** (`/health/started`): 200 once the engine has started
11//! - **Detailed** (`/health`): full JSON with per-component status
12
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15
16use serde::Serialize;
17
18/// Health status of a component.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
20#[serde(rename_all = "lowercase")]
21pub enum HealthStatus {
22    Up,
23    Degraded,
24    Down,
25}
26
27impl HealthStatus {
28    /// Returns the worst (most severe) of two statuses.
29    pub const fn worst(self, other: Self) -> Self {
30        match (self, other) {
31            (Self::Down, _) | (_, Self::Down) => Self::Down,
32            (Self::Degraded, _) | (_, Self::Degraded) => Self::Degraded,
33            _ => Self::Up,
34        }
35    }
36}
37
38/// Type of component being monitored.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum ComponentType {
42    Connector,
43    Engine,
44    Actor,
45    Worker,
46}
47
48/// Health information for a single component.
49#[derive(Debug, Clone, Serialize)]
50pub struct ComponentHealth {
51    pub name: String,
52    pub component_type: ComponentType,
53    pub status: HealthStatus,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub reason: Option<String>,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub details: Option<serde_json::Value>,
58}
59
60/// Trait for components that can report their health.
61pub trait HealthReporter: Send + Sync {
62    fn health(&self) -> ComponentHealth;
63}
64
65/// System-wide health summary.
66#[derive(Debug, Clone, Serialize)]
67pub struct SystemHealth {
68    /// Aggregate status (worst-component-wins).
69    pub status: HealthStatus,
70    /// Per-component health details.
71    pub components: Vec<ComponentHealth>,
72    /// ISO 8601 timestamp of this health check.
73    pub timestamp: String,
74}
75
76/// Central registry of health reporters.
77pub struct HealthRegistry {
78    reporters: Vec<Arc<dyn HealthReporter>>,
79    started: bool,
80    ready: Arc<AtomicBool>,
81}
82
83impl std::fmt::Debug for HealthRegistry {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("HealthRegistry")
86            .field("reporters_count", &self.reporters.len())
87            .field("started", &self.started)
88            .field("ready", &self.ready)
89            .finish_non_exhaustive()
90    }
91}
92
93impl HealthRegistry {
94    pub fn new() -> Self {
95        Self {
96            reporters: Vec::new(),
97            started: false,
98            ready: Arc::new(AtomicBool::new(false)),
99        }
100    }
101
102    /// Register a component for health monitoring.
103    pub fn register(&mut self, reporter: Arc<dyn HealthReporter>) {
104        self.reporters.push(reporter);
105    }
106
107    /// Mark the system as started.
108    pub const fn mark_started(&mut self) {
109        self.started = true;
110    }
111
112    /// Mark the system as ready to accept traffic.
113    pub fn mark_ready(&self) {
114        self.ready.store(true, Ordering::SeqCst);
115    }
116
117    /// Check all components and produce a system health summary.
118    pub fn system_health(&self) -> SystemHealth {
119        let components: Vec<ComponentHealth> = self.reporters.iter().map(|r| r.health()).collect();
120
121        let status = components
122            .iter()
123            .fold(HealthStatus::Up, |acc, c| acc.worst(c.status));
124
125        SystemHealth {
126            status,
127            components,
128            timestamp: chrono::Utc::now().to_rfc3339(),
129        }
130    }
131
132    /// Liveness check — always true if the process is running.
133    pub const fn is_live(&self) -> bool {
134        true
135    }
136
137    /// Readiness check — true when all components are healthy.
138    pub fn is_ready(&self) -> bool {
139        self.ready.load(Ordering::SeqCst)
140    }
141
142    /// Startup check — true once `mark_started` has been called.
143    pub const fn is_started(&self) -> bool {
144        self.started
145    }
146}
147
148impl Default for HealthRegistry {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154/// Adapter that converts a [`ConnectorHealthReport`](crate::connector::ConnectorHealthReport)
155/// into the health monitoring system.
156pub struct ConnectorHealthAdapter {
157    name: String,
158    health_fn: Box<dyn Fn() -> crate::connector::ConnectorHealthReport + Send + Sync>,
159}
160
161impl std::fmt::Debug for ConnectorHealthAdapter {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("ConnectorHealthAdapter")
164            .field("name", &self.name)
165            .finish_non_exhaustive()
166    }
167}
168
169impl ConnectorHealthAdapter {
170    pub fn new<F>(name: String, health_fn: F) -> Self
171    where
172        F: Fn() -> crate::connector::ConnectorHealthReport + Send + Sync + 'static,
173    {
174        Self {
175            name,
176            health_fn: Box::new(health_fn),
177        }
178    }
179}
180
181impl HealthReporter for ConnectorHealthAdapter {
182    fn health(&self) -> ComponentHealth {
183        let report = (self.health_fn)();
184        let status = if report.connected && report.circuit_breaker_state == "closed" {
185            HealthStatus::Up
186        } else if report.circuit_breaker_state == "half_open" {
187            HealthStatus::Degraded
188        } else {
189            HealthStatus::Down
190        };
191
192        ComponentHealth {
193            name: self.name.clone(),
194            component_type: ComponentType::Connector,
195            status,
196            reason: report.last_error,
197            details: Some(serde_json::json!({
198                "messages_received": report.messages_received,
199                "seconds_since_last_message": report.seconds_since_last_message,
200                "circuit_breaker_state": report.circuit_breaker_state,
201                "circuit_breaker_failures": report.circuit_breaker_failures,
202            })),
203        }
204    }
205}
206
/// Minimal health reporter for the engine, driven by a single
/// "streams loaded" flag.
#[derive(Debug)]
pub struct EngineHealthReporter {
    /// Component name used in the health output.
    name: String,
    /// Shared flag that is `true` once streams have been loaded.
    streams_loaded: Arc<AtomicBool>,
}
213
214impl EngineHealthReporter {
215    pub fn new(name: String) -> Self {
216        Self {
217            name,
218            streams_loaded: Arc::new(AtomicBool::new(false)),
219        }
220    }
221
222    /// Mark that streams have been loaded.
223    pub fn set_streams_loaded(&self) {
224        self.streams_loaded.store(true, Ordering::SeqCst);
225    }
226
227    /// Get a shareable reference to the streams_loaded flag.
228    pub fn streams_loaded_flag(&self) -> Arc<AtomicBool> {
229        self.streams_loaded.clone()
230    }
231}
232
233impl HealthReporter for EngineHealthReporter {
234    fn health(&self) -> ComponentHealth {
235        let loaded = self.streams_loaded.load(Ordering::SeqCst);
236        ComponentHealth {
237            name: self.name.clone(),
238            component_type: ComponentType::Engine,
239            status: if loaded {
240                HealthStatus::Up
241            } else {
242                HealthStatus::Down
243            },
244            reason: if loaded {
245                None
246            } else {
247                Some("streams not loaded".into())
248            },
249            details: None,
250        }
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status, from least to most severe.
    const ALL_STATUSES: [HealthStatus; 3] = [
        HealthStatus::Up,
        HealthStatus::Degraded,
        HealthStatus::Down,
    ];

    /// Reporter stub that is always `Up`, with no reason and no details.
    struct AlwaysUp;
    impl HealthReporter for AlwaysUp {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "test".into(),
                component_type: ComponentType::Engine,
                status: HealthStatus::Up,
                reason: None,
                details: None,
            }
        }
    }

    /// Reporter stub that is always `Degraded`.
    struct AlwaysDegraded;
    impl HealthReporter for AlwaysDegraded {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "slow".into(),
                component_type: ComponentType::Worker,
                status: HealthStatus::Degraded,
                reason: Some("backlog growing".into()),
                details: None,
            }
        }
    }

    /// Reporter stub that is always `Down`, with a reason.
    struct AlwaysDown;
    impl HealthReporter for AlwaysDown {
        fn health(&self) -> ComponentHealth {
            ComponentHealth {
                name: "broken".into(),
                component_type: ComponentType::Connector,
                status: HealthStatus::Down,
                reason: Some("connection lost".into()),
                details: None,
            }
        }
    }

    /// Status reported by a connector adapter whose report carries the given
    /// connection flag and circuit breaker state (other fields defaulted).
    fn connector_status(connected: bool, breaker_state: &'static str) -> HealthStatus {
        let adapter = ConnectorHealthAdapter::new("probe".into(), move || {
            crate::connector::ConnectorHealthReport {
                connected,
                circuit_breaker_state: breaker_state.to_string(),
                ..Default::default()
            }
        });
        adapter.health().status
    }

    #[test]
    fn test_health_status_worst() {
        assert_eq!(HealthStatus::Up.worst(HealthStatus::Up), HealthStatus::Up);
        assert_eq!(
            HealthStatus::Up.worst(HealthStatus::Degraded),
            HealthStatus::Degraded
        );
        assert_eq!(
            HealthStatus::Degraded.worst(HealthStatus::Down),
            HealthStatus::Down
        );
        assert_eq!(
            HealthStatus::Down.worst(HealthStatus::Up),
            HealthStatus::Down
        );
    }

    #[test]
    fn test_health_status_worst_algebra() {
        for a in ALL_STATUSES {
            // Idempotent, `Up` is the identity, `Down` absorbs everything.
            assert_eq!(a.worst(a), a);
            assert_eq!(HealthStatus::Up.worst(a), a);
            assert_eq!(HealthStatus::Down.worst(a), HealthStatus::Down);

            // Argument order must not matter.
            for b in ALL_STATUSES {
                assert_eq!(a.worst(b), b.worst(a));
            }
        }
    }

    #[test]
    fn test_registry_empty() {
        let registry = HealthRegistry::new();
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert!(health.components.is_empty());
    }

    #[test]
    fn test_registry_all_up() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Up);
        assert_eq!(health.components.len(), 2);
    }

    #[test]
    fn test_registry_worst_wins() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysDown));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Down);
    }

    #[test]
    fn test_registry_degraded_without_down() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        registry.register(Arc::new(AlwaysDegraded));
        let health = registry.system_health();
        assert_eq!(health.status, HealthStatus::Degraded);

        // Components are reported in registration order.
        let names: Vec<&str> = health.components.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(names, ["test", "slow"]);
    }

    #[test]
    fn test_registry_down_beats_degraded() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysDown));
        registry.register(Arc::new(AlwaysDegraded));
        registry.register(Arc::new(AlwaysUp));
        assert_eq!(registry.system_health().status, HealthStatus::Down);
    }

    #[test]
    fn test_liveness_always_true() {
        let registry = HealthRegistry::new();
        assert!(registry.is_live());
    }

    #[test]
    fn test_readiness_and_startup() {
        let mut registry = HealthRegistry::new();
        assert!(!registry.is_ready());
        assert!(!registry.is_started());

        registry.mark_started();
        assert!(registry.is_started());
        assert!(!registry.is_ready());

        registry.mark_ready();
        assert!(registry.is_ready());
    }

    #[test]
    fn test_default_registry_matches_new() {
        let registry = HealthRegistry::default();
        assert!(registry.is_live());
        assert!(!registry.is_started());
        assert!(!registry.is_ready());
        assert!(registry.system_health().components.is_empty());
    }

    #[test]
    fn test_engine_health_reporter() {
        let reporter = EngineHealthReporter::new("engine-0".into());
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Down);
        assert!(h.reason.is_some());

        reporter.set_streams_loaded();
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert!(h.reason.is_none());
    }

    #[test]
    fn test_engine_health_reporter_shared_flag() {
        let reporter = EngineHealthReporter::new("engine-1".into());
        let flag = reporter.streams_loaded_flag();
        assert_eq!(reporter.health().status, HealthStatus::Down);

        // Setting the shared handle must be visible through the reporter.
        flag.store(true, Ordering::SeqCst);
        let h = reporter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert_eq!(h.name, "engine-1");
        assert_eq!(h.component_type, ComponentType::Engine);
    }

    #[test]
    fn test_connector_health_adapter() {
        let adapter = ConnectorHealthAdapter::new("mqtt-0".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "closed".to_string(),
                messages_received: 42,
                ..Default::default()
            }
        });
        let h = adapter.health();
        assert_eq!(h.status, HealthStatus::Up);
        assert_eq!(h.name, "mqtt-0");
        assert_eq!(h.component_type, ComponentType::Connector);
        assert!(h.details.is_some());

        let degraded = ConnectorHealthAdapter::new("mqtt-1".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: true,
                circuit_breaker_state: "half_open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(degraded.health().status, HealthStatus::Degraded);

        let down = ConnectorHealthAdapter::new("mqtt-2".into(), || {
            crate::connector::ConnectorHealthReport {
                connected: false,
                circuit_breaker_state: "open".to_string(),
                ..Default::default()
            }
        });
        assert_eq!(down.health().status, HealthStatus::Down);
    }

    #[test]
    fn test_connector_health_adapter_mixed_states() {
        // A closed breaker is not enough on its own: the connector must also
        // be connected to count as `Up`.
        assert_eq!(connector_status(false, "closed"), HealthStatus::Down);
        // Being connected does not help while the breaker is open.
        assert_eq!(connector_status(true, "open"), HealthStatus::Down);
        // Sanity check of the helper against the happy path.
        assert_eq!(connector_status(true, "closed"), HealthStatus::Up);
    }

    #[test]
    fn test_system_health_serializes() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysUp));
        let health = registry.system_health();
        let json = serde_json::to_string(&health).unwrap();
        assert!(json.contains("\"status\":\"up\""));
        assert!(json.contains("\"timestamp\""));
        assert!(json.contains("\"component_type\":\"engine\""));
        // `None` reason/details are skipped rather than emitted as `null`.
        assert!(!json.contains("\"reason\""));
        assert!(!json.contains("\"details\""));
    }

    #[test]
    fn test_system_health_serializes_reason_when_present() {
        let mut registry = HealthRegistry::new();
        registry.register(Arc::new(AlwaysDown));
        let json = serde_json::to_string(&registry.system_health()).unwrap();
        assert!(json.contains("\"status\":\"down\""));
        assert!(json.contains("\"reason\":\"connection lost\""));
        assert!(json.contains("\"component_type\":\"connector\""));
    }
}