Skip to main content

camel_component_http/
health.rs

1use async_trait::async_trait;
2use camel_api::{AsyncHealthCheck, CheckResult};
3use camel_component_api::CamelError;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
/// Boxed, pinned, `Send` future returned by a probe: it resolves to `Ok(())`
/// or to a [`CamelError`] describing why the probe failed.
type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;
10
/// Abstraction over one asynchronous health probe attempt.
///
/// `HttpHealthCheck` holds its probe as `Arc<dyn HttpHealthProbe>`, so the
/// trait requires `Send + Sync` and returns a boxed future rather than being
/// an `async fn`.
trait HttpHealthProbe: Send + Sync {
    /// Returns a future that performs one probe attempt.
    fn probe(&self) -> ProbeFuture;
}
14
/// Connection target used to probe an HTTP listener over TCP.
struct HttpListenerProbe {
    /// Host to connect to, stored so that `"{host}:{port}"` is a well-formed
    /// socket address (IPv6 literals are kept in bracketed form).
    host: String,
    /// TCP port of the listener.
    port: u16,
}

impl HttpListenerProbe {
    /// Builds a probe for a listener bound to `host` and `port`.
    ///
    /// Wildcard bind addresses cannot be used as connect targets, so they are
    /// replaced by the matching loopback address: `0.0.0.0` becomes
    /// `127.0.0.1`, and `::` / `[::]` become `[::1]`.
    ///
    /// A bare IPv6 literal (for example `::1`) is wrapped in brackets.
    /// Without them, joining host and port with `:` produces text such as
    /// `::1:8080`, which is not a valid socket address literal. Hosts that are
    /// already bracketed, IPv4 literals, and host names are stored unchanged.
    fn new(host: String, port: u16) -> Self {
        let probe_host = if host == "0.0.0.0" {
            "127.0.0.1".to_string()
        } else if host == "::" || host == "[::]" {
            "[::1]".to_string()
        } else if host.parse::<std::net::Ipv6Addr>().is_ok() {
            // Bare IPv6 literal: bracket it so `host:port` stays unambiguous.
            format!("[{host}]")
        } else {
            host
        };

        Self {
            host: probe_host,
            port,
        }
    }
}
33
34impl HttpHealthProbe for HttpListenerProbe {
35    fn probe(&self) -> ProbeFuture {
36        let addr = format!("{}:{}", self.host, self.port);
37        Box::pin(async move {
38            tokio::net::TcpStream::connect(&addr)
39                .await
40                .map(|_| ())
41                .map_err(|e| {
42                    CamelError::ProcessorError(format!(
43                        "HTTP listener health check failed for '{}': {}",
44                        addr, e
45                    ))
46                })
47        })
48    }
49}
50
/// Health check, reported under the name `"http"`, that runs a probe under a
/// timeout.
pub struct HttpHealthCheck {
    /// Probe invoked each time the check runs.
    probe: Arc<dyn HttpHealthProbe>,
    /// Upper bound applied to each probe attempt via `tokio::time::timeout`.
    timeout: Duration,
}
55
56impl HttpHealthCheck {
57    pub fn new(host: String, port: u16) -> Self {
58        Self {
59            probe: Arc::new(HttpListenerProbe::new(host, port)),
60            timeout: Duration::from_secs(3),
61        }
62    }
63
64    #[cfg(test)]
65    fn with_probe_for_tests(probe: Arc<dyn HttpHealthProbe>, timeout: Duration) -> Self {
66        Self { probe, timeout }
67    }
68}
69
70#[async_trait]
71impl AsyncHealthCheck for HttpHealthCheck {
72    fn name(&self) -> &str {
73        "http"
74    }
75
76    async fn check(&self) -> CheckResult {
77        match tokio::time::timeout(self.timeout, self.probe.probe()).await {
78            Ok(Ok(())) => CheckResult::healthy(self.name()),
79            Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
80            Err(_) => CheckResult::unhealthy(self.name(), "HTTP listener probe timed out"),
81        }
82    }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;

    /// Probe double that delegates every call to an injected closure.
    struct MockProbe {
        responder: Arc<dyn Fn() -> ProbeFuture + Send + Sync>,
    }

    impl MockProbe {
        /// Wraps `f` so each `probe()` call returns whatever future `f` builds.
        fn new<F>(f: F) -> Self
        where
            F: Fn() -> ProbeFuture + Send + Sync + 'static,
        {
            let responder: Arc<dyn Fn() -> ProbeFuture + Send + Sync> = Arc::new(f);
            MockProbe { responder }
        }
    }

    impl HttpHealthProbe for MockProbe {
        fn probe(&self) -> ProbeFuture {
            let respond = &*self.responder;
            respond()
        }
    }

    /// Runs a single health check against `probe` with the given timeout.
    async fn run_check(probe: MockProbe, timeout: Duration) -> CheckResult {
        let check = HttpHealthCheck::with_probe_for_tests(Arc::new(probe), timeout);
        check.check().await
    }

    #[tokio::test]
    async fn http_health_check_healthy_when_probe_succeeds() {
        let always_ok = MockProbe::new(|| Box::pin(async { Ok(()) }));

        let result = run_check(always_ok, Duration::from_millis(50)).await;

        assert_eq!(result.name, "http");
        assert_eq!(result.status, HealthStatus::Healthy);
        assert!(result.message.is_none());
    }

    #[tokio::test]
    async fn http_health_check_unhealthy_when_probe_fails() {
        let always_failing = MockProbe::new(|| {
            Box::pin(async { Err(CamelError::ProcessorError("listener not bound".to_string())) })
        });

        let result = run_check(always_failing, Duration::from_millis(50)).await;

        assert_eq!(result.name, "http");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        // The probe's own error text must be carried into the result message.
        assert!(matches!(
            result.message.as_deref(),
            Some(text) if text.contains("listener not bound")
        ));
    }

    #[tokio::test]
    async fn http_health_check_unhealthy_when_probe_times_out() {
        // The probe sleeps for longer than the timeout given to the check.
        let too_slow = MockProbe::new(|| {
            Box::pin(async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                Ok(())
            })
        });

        let result = run_check(too_slow, Duration::from_millis(5)).await;

        assert_eq!(result.name, "http");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert_eq!(
            result.message.as_deref(),
            Some("HTTP listener probe timed out")
        );
    }
}