camel_component_http/
health.rs1use async_trait::async_trait;
2use camel_api::{AsyncHealthCheck, CheckResult};
3use camel_component_api::CamelError;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;
10
11trait HttpHealthProbe: Send + Sync {
12 fn probe(&self) -> ProbeFuture;
13}
14
15struct HttpListenerProbe {
16 host: String,
17 port: u16,
18}
19
20impl HttpListenerProbe {
21 fn new(host: String, port: u16) -> Self {
22 let probe_host = match host.as_str() {
23 "0.0.0.0" => "127.0.0.1".to_string(),
24 "::" | "[::]" => "::1".to_string(),
25 _ => host,
26 };
27 Self {
28 host: probe_host,
29 port,
30 }
31 }
32}
33
34impl HttpHealthProbe for HttpListenerProbe {
35 fn probe(&self) -> ProbeFuture {
36 let addr = format!("{}:{}", self.host, self.port);
37 Box::pin(async move {
38 tokio::net::TcpStream::connect(&addr)
39 .await
40 .map(|_| ())
41 .map_err(|e| {
42 CamelError::ProcessorError(format!(
43 "HTTP listener health check failed for '{}': {}",
44 addr, e
45 ))
46 })
47 })
48 }
49}
50
51pub struct HttpHealthCheck {
52 probe: Arc<dyn HttpHealthProbe>,
53 timeout: Duration,
54}
55
56impl HttpHealthCheck {
57 pub fn new(host: String, port: u16) -> Self {
58 Self {
59 probe: Arc::new(HttpListenerProbe::new(host, port)),
60 timeout: Duration::from_secs(3),
61 }
62 }
63
64 #[cfg(test)]
65 fn with_probe_for_tests(probe: Arc<dyn HttpHealthProbe>, timeout: Duration) -> Self {
66 Self { probe, timeout }
67 }
68}
69
70#[async_trait]
71impl AsyncHealthCheck for HttpHealthCheck {
72 fn name(&self) -> &str {
73 "http"
74 }
75
76 async fn check(&self) -> CheckResult {
77 match tokio::time::timeout(self.timeout, self.probe.probe()).await {
78 Ok(Ok(())) => CheckResult::healthy(self.name()),
79 Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
80 Err(_) => CheckResult::unhealthy(self.name(), "HTTP listener probe timed out"),
81 }
82 }
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use camel_api::HealthStatus;
89
90 struct MockProbe {
91 responder: Arc<dyn Fn() -> ProbeFuture + Send + Sync>,
92 }
93
94 impl MockProbe {
95 fn new<F>(f: F) -> Self
96 where
97 F: Fn() -> ProbeFuture + Send + Sync + 'static,
98 {
99 Self {
100 responder: Arc::new(f),
101 }
102 }
103 }
104
105 impl HttpHealthProbe for MockProbe {
106 fn probe(&self) -> ProbeFuture {
107 (self.responder)()
108 }
109 }
110
111 #[tokio::test]
112 async fn http_health_check_healthy_when_probe_succeeds() {
113 let probe = Arc::new(MockProbe::new(|| Box::pin(async { Ok(()) })));
114 let check = HttpHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));
115
116 let result = check.check().await;
117
118 assert_eq!(result.name, "http");
119 assert_eq!(result.status, HealthStatus::Healthy);
120 assert!(result.message.is_none());
121 }
122
123 #[tokio::test]
124 async fn http_health_check_unhealthy_when_probe_fails() {
125 let probe = Arc::new(MockProbe::new(|| {
126 Box::pin(async { Err(CamelError::ProcessorError("listener not bound".to_string())) })
127 }));
128 let check = HttpHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));
129
130 let result = check.check().await;
131
132 assert_eq!(result.name, "http");
133 assert_eq!(result.status, HealthStatus::Unhealthy);
134 assert!(
135 result
136 .message
137 .as_deref()
138 .is_some_and(|m| m.contains("listener not bound"))
139 );
140 }
141
142 #[tokio::test]
143 async fn http_health_check_unhealthy_when_probe_times_out() {
144 let probe = Arc::new(MockProbe::new(|| {
145 Box::pin(async {
146 tokio::time::sleep(Duration::from_millis(50)).await;
147 Ok(())
148 })
149 }));
150 let check = HttpHealthCheck::with_probe_for_tests(probe, Duration::from_millis(5));
151
152 let result = check.check().await;
153
154 assert_eq!(result.name, "http");
155 assert_eq!(result.status, HealthStatus::Unhealthy);
156 assert_eq!(
157 result.message.as_deref(),
158 Some("HTTP listener probe timed out")
159 );
160 }
161}