camel_component_container/
health.rs1use async_trait::async_trait;
2use camel_api::{AsyncHealthCheck, CheckResult};
3use camel_component_api::CamelError;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::ContainerConfig;
10
11type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;
12
13trait ContainerHealthProbe: Send + Sync {
15 fn probe(&self) -> ProbeFuture;
16}
17
18struct DockerPingProbe {
20 config: ContainerConfig,
21}
22
23impl DockerPingProbe {
24 fn new(config: &ContainerConfig) -> Self {
25 Self {
26 config: config.clone(),
27 }
28 }
29}
30
31impl ContainerHealthProbe for DockerPingProbe {
32 fn probe(&self) -> ProbeFuture {
33 let config = self.config.clone();
34 Box::pin(async move {
35 let docker = config.connect_docker_client()?;
36 docker
37 .ping()
38 .await
39 .map(|_| ())
40 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))
41 })
42 }
43}
44
45pub struct ContainerHealthCheck {
47 probe: Arc<dyn ContainerHealthProbe>,
48 timeout: Duration,
49}
50
51impl ContainerHealthCheck {
52 pub fn new(config: &ContainerConfig) -> Self {
55 Self {
56 probe: Arc::new(DockerPingProbe::new(config)),
57 timeout: Duration::from_secs(2),
58 }
59 }
60
61 #[cfg(test)]
62 fn with_probe_for_tests(probe: Arc<dyn ContainerHealthProbe>, timeout: Duration) -> Self {
63 Self { probe, timeout }
64 }
65}
66
67#[async_trait]
68impl AsyncHealthCheck for ContainerHealthCheck {
69 fn name(&self) -> &str {
70 "container"
71 }
72
73 async fn check(&self) -> CheckResult {
74 match tokio::time::timeout(self.timeout, self.probe.probe()).await {
75 Ok(Ok(())) => CheckResult::healthy(self.name()),
76 Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
77 Err(_) => CheckResult::unhealthy(self.name(), "Docker ping timed out"),
78 }
79 }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;

    /// Test double whose probe outcome comes from a caller-supplied closure.
    struct MockProbe {
        responder: Box<dyn Fn() -> ProbeFuture + Send + Sync>,
    }

    impl MockProbe {
        /// Wraps `f`; each `probe()` call invokes it to obtain a fresh future.
        fn new<F>(f: F) -> Self
        where
            F: Fn() -> ProbeFuture + Send + Sync + 'static,
        {
            Self {
                responder: Box::new(f),
            }
        }
    }

    impl ContainerHealthProbe for MockProbe {
        fn probe(&self) -> ProbeFuture {
            (self.responder)()
        }
    }

    /// Builds a health check around a `MockProbe` driven by `respond`, runs it
    /// once with the given time limit, and returns the result.
    async fn run_check<F>(respond: F, timeout: Duration) -> CheckResult
    where
        F: Fn() -> ProbeFuture + Send + Sync + 'static,
    {
        let probe: Arc<dyn ContainerHealthProbe> = Arc::new(MockProbe::new(respond));
        ContainerHealthCheck::with_probe_for_tests(probe, timeout)
            .check()
            .await
    }

    #[tokio::test]
    async fn container_health_check_healthy_on_ping_ok() {
        let outcome = run_check(|| Box::pin(async { Ok(()) }), Duration::from_millis(50)).await;

        assert_eq!(outcome.name, "container");
        assert_eq!(outcome.status, HealthStatus::Healthy);
        assert!(outcome.message.is_none());
    }

    #[tokio::test]
    async fn container_health_check_unhealthy_on_error() {
        let outcome = run_check(
            || {
                Box::pin(async {
                    Err(CamelError::ProcessorError(
                        "simulated docker error".to_string(),
                    ))
                })
            },
            Duration::from_millis(50),
        )
        .await;

        assert_eq!(outcome.name, "container");
        assert_eq!(outcome.status, HealthStatus::Unhealthy);
        // A missing message becomes "", which cannot contain the expected text.
        let message = outcome.message.as_deref().unwrap_or_default();
        assert!(message.contains("simulated docker error"));
    }

    #[tokio::test]
    async fn container_health_check_unhealthy_on_timeout() {
        // The probe sleeps for longer than the time limit, so the deadline wins.
        let outcome = run_check(
            || {
                Box::pin(async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok(())
                })
            },
            Duration::from_millis(5),
        )
        .await;

        assert_eq!(outcome.name, "container");
        assert_eq!(outcome.status, HealthStatus::Unhealthy);
        assert_eq!(outcome.message.as_deref(), Some("Docker ping timed out"));
    }
}