Skip to main content

camel_api/
health.rs

//! Health monitoring types for rust-camel.
//!
//! This module provides types for tracking and reporting the health status
//! of services in a rust-camel application.
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9
10use crate::lifecycle::{HealthStatus, ServiceStatus};
11
12/// System-wide health report containing aggregated status of all services.
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct HealthReport {
15    pub status: HealthStatus,
16    pub services: Vec<ServiceHealth>,
17    pub timestamp: DateTime<Utc>,
18}
19
20impl Default for HealthReport {
21    fn default() -> Self {
22        Self {
23            status: HealthStatus::Healthy,
24            services: Vec::new(),
25            timestamp: Utc::now(),
26        }
27    }
28}
29
30/// Health status of an individual service.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ServiceHealth {
33    pub name: String,
34    pub status: ServiceStatus,
35    #[serde(default)]
36    pub message: Option<String>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct CheckResult {
41    pub name: String,
42    pub status: HealthStatus,
43    pub message: Option<String>,
44}
45
46impl CheckResult {
47    pub fn healthy(name: &str) -> Self {
48        Self {
49            name: name.to_string(),
50            status: HealthStatus::Healthy,
51            message: None,
52        }
53    }
54
55    pub fn unhealthy(name: &str, reason: &str) -> Self {
56        Self {
57            name: name.to_string(),
58            status: HealthStatus::Unhealthy,
59            message: Some(reason.to_string()),
60        }
61    }
62
63    pub fn degraded(name: &str, reason: &str) -> Self {
64        Self {
65            name: name.to_string(),
66            status: HealthStatus::Degraded,
67            message: Some(reason.to_string()),
68        }
69    }
70}
71
72#[async_trait]
73pub trait AsyncHealthCheck: Send + Sync {
74    fn name(&self) -> &str;
75    async fn check(&self) -> CheckResult;
76}
77
78/// Programmatic health state readable by platform adapters.
79/// `camel-health` implements this; `camel-platform-kubernetes` consumes it via this trait.
80/// Neither crate depends on the other.
81#[async_trait]
82pub trait HealthSource: Send + Sync {
83    async fn liveness(&self) -> HealthStatus;
84    async fn readiness(&self) -> HealthStatus;
85
86    async fn health_report(&self) -> HealthReport {
87        HealthReport {
88            status: self.readiness().await,
89            services: vec![],
90            timestamp: chrono::Utc::now(),
91        }
92    }
93
94    /// Default: `Healthy` — non-K8s implementors need not override.
95    async fn startup(&self) -> HealthStatus {
96        HealthStatus::Healthy
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// A source implementing only the required methods gets `Healthy` from
    /// the default `startup`.
    #[tokio::test]
    async fn test_health_source_default_startup() {
        struct MinimalSource;

        #[async_trait]
        impl HealthSource for MinimalSource {
            async fn liveness(&self) -> HealthStatus {
                HealthStatus::Healthy
            }

            async fn readiness(&self) -> HealthStatus {
                HealthStatus::Healthy
            }
        }

        let startup = MinimalSource.startup().await;
        assert_eq!(startup, HealthStatus::Healthy);
    }

    /// An overridden `startup` replaces the default value.
    #[tokio::test]
    async fn test_health_source_custom_startup() {
        struct BootingSource;

        #[async_trait]
        impl HealthSource for BootingSource {
            async fn liveness(&self) -> HealthStatus {
                HealthStatus::Healthy
            }

            async fn readiness(&self) -> HealthStatus {
                HealthStatus::Healthy
            }

            async fn startup(&self) -> HealthStatus {
                HealthStatus::Unhealthy
            }
        }

        let startup = BootingSource.startup().await;
        assert_eq!(startup, HealthStatus::Unhealthy);
    }

    /// The serialized report mentions its status, service name, service
    /// status and the timestamp field.
    #[test]
    fn test_health_report_serialization() {
        let prometheus = ServiceHealth {
            name: String::from("prometheus"),
            status: ServiceStatus::Started,
            message: None,
        };
        let report = HealthReport {
            status: HealthStatus::Healthy,
            services: vec![prometheus],
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&report).unwrap();
        for needle in ["Healthy", "prometheus", "Started", "timestamp"] {
            assert!(json.contains(needle));
        }
    }

    /// The default report is healthy, empty, and not timestamped in the future.
    #[test]
    fn test_health_report_default() {
        let HealthReport {
            status,
            services,
            timestamp,
        } = HealthReport::default();

        assert_eq!(status, HealthStatus::Healthy);
        assert!(services.is_empty());
        assert!(timestamp <= Utc::now());
    }

    /// A `ServiceHealth` survives a JSON round trip unchanged.
    #[test]
    fn test_service_health_serialization_round_trip() {
        let original = ServiceHealth {
            name: "kafka".into(),
            status: ServiceStatus::Stopped,
            message: None,
        };

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("kafka"));
        assert!(json.contains("Stopped"));

        let decoded = serde_json::from_str::<ServiceHealth>(&json).unwrap();
        assert_eq!(decoded.name, "kafka");
        assert_eq!(decoded.status, ServiceStatus::Stopped);
        assert!(decoded.message.is_none());
    }

    /// Liveness and readiness are reported independently of each other.
    #[tokio::test]
    async fn test_health_source_liveness_and_readiness() {
        struct MixedSource;

        #[async_trait]
        impl HealthSource for MixedSource {
            async fn liveness(&self) -> HealthStatus {
                HealthStatus::Healthy
            }

            async fn readiness(&self) -> HealthStatus {
                HealthStatus::Unhealthy
            }
        }

        let source = MixedSource;
        let live = source.liveness().await;
        assert_eq!(live, HealthStatus::Healthy);
        let ready = source.readiness().await;
        assert_eq!(ready, HealthStatus::Unhealthy);
    }

    /// A report with several services survives a JSON round trip.
    #[test]
    fn test_health_report_serialization_round_trip() {
        let entry = |name: &str, status: ServiceStatus| ServiceHealth {
            name: name.to_owned(),
            status,
            message: None,
        };
        let report = HealthReport {
            status: HealthStatus::Unhealthy,
            services: vec![
                entry("db", ServiceStatus::Started),
                entry("queue", ServiceStatus::Stopped),
            ],
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&report).unwrap();
        let back = serde_json::from_str::<HealthReport>(&json).unwrap();

        assert_eq!(back.status, HealthStatus::Unhealthy);
        assert_eq!(back.services.len(), 2);
        assert_eq!(back.services[0].name, "db");
        assert_eq!(back.services[1].status, ServiceStatus::Stopped);
    }

    /// Overriding `startup` leaves `liveness` and `readiness` untouched.
    #[tokio::test]
    async fn test_health_source_startup_override_independent_from_readiness() {
        struct StartupOnly;

        #[async_trait]
        impl HealthSource for StartupOnly {
            async fn liveness(&self) -> HealthStatus {
                HealthStatus::Healthy
            }

            async fn readiness(&self) -> HealthStatus {
                HealthStatus::Unhealthy
            }

            async fn startup(&self) -> HealthStatus {
                HealthStatus::Healthy
            }
        }

        let probe = StartupOnly;
        let live = probe.liveness().await;
        assert_eq!(live, HealthStatus::Healthy);
        let ready = probe.readiness().await;
        assert_eq!(ready, HealthStatus::Unhealthy);
        let started = probe.startup().await;
        assert_eq!(started, HealthStatus::Healthy);
    }

    /// `healthy` sets the name and status and attaches no message.
    #[test]
    fn test_check_result_healthy() {
        let CheckResult {
            name,
            status,
            message,
        } = CheckResult::healthy("kafka");

        assert_eq!(name, "kafka");
        assert_eq!(status, HealthStatus::Healthy);
        assert!(message.is_none());
    }

    /// `unhealthy` stores the reason as the message.
    #[test]
    fn test_check_result_unhealthy() {
        let CheckResult {
            name,
            status,
            message,
        } = CheckResult::unhealthy("redis", "connection refused");

        assert_eq!(name, "redis");
        assert_eq!(status, HealthStatus::Unhealthy);
        assert_eq!(message.as_deref(), Some("connection refused"));
    }

    /// `degraded` stores the reason as the message.
    #[test]
    fn test_check_result_degraded() {
        let CheckResult {
            name,
            status,
            message,
        } = CheckResult::degraded("sql", "pool exhausted");

        assert_eq!(name, "sql");
        assert_eq!(status, HealthStatus::Degraded);
        assert_eq!(message.as_deref(), Some("pool exhausted"));
    }

    /// A `CheckResult` survives a JSON round trip unchanged.
    #[test]
    fn test_check_result_serialization_round_trip() {
        let original = CheckResult::unhealthy("opensearch", "timeout");

        let json = serde_json::to_string(&original).unwrap();
        let back = serde_json::from_str::<CheckResult>(&json).unwrap();

        assert_eq!(back.name, "opensearch");
        assert_eq!(back.status, HealthStatus::Unhealthy);
        assert_eq!(back.message.as_deref(), Some("timeout"));
    }
}