Skip to main content

camel_component_container/
health.rs

1use async_trait::async_trait;
2use camel_api::{AsyncHealthCheck, CheckResult};
3use camel_component_api::CamelError;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::ContainerConfig;
10
11type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;
12
13/// Trait for probing Docker container health.
14trait ContainerHealthProbe: Send + Sync {
15    fn probe(&self) -> ProbeFuture;
16}
17
18/// Real probe that creates a Docker client and calls `Docker::ping()`.
19struct DockerPingProbe {
20    config: ContainerConfig,
21}
22
23impl DockerPingProbe {
24    fn new(config: &ContainerConfig) -> Self {
25        Self {
26            config: config.clone(),
27        }
28    }
29}
30
31impl ContainerHealthProbe for DockerPingProbe {
32    fn probe(&self) -> ProbeFuture {
33        let config = self.config.clone();
34        Box::pin(async move {
35            let docker = config.connect_docker_client()?;
36            docker
37                .ping()
38                .await
39                .map(|_| ())
40                .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))
41        })
42    }
43}
44
45/// Async health check that probes the Docker daemon via `Docker::ping()`.
46pub struct ContainerHealthCheck {
47    probe: Arc<dyn ContainerHealthProbe>,
48    timeout: Duration,
49}
50
51impl ContainerHealthCheck {
52    /// Creates a new health check using the given endpoint config.
53    /// The Docker client is created lazily during the probe.
54    pub fn new(config: &ContainerConfig) -> Self {
55        Self {
56            probe: Arc::new(DockerPingProbe::new(config)),
57            timeout: Duration::from_secs(2),
58        }
59    }
60
61    #[cfg(test)]
62    fn with_probe_for_tests(probe: Arc<dyn ContainerHealthProbe>, timeout: Duration) -> Self {
63        Self { probe, timeout }
64    }
65}
66
67#[async_trait]
68impl AsyncHealthCheck for ContainerHealthCheck {
69    fn name(&self) -> &str {
70        "container"
71    }
72
73    async fn check(&self) -> CheckResult {
74        match tokio::time::timeout(self.timeout, self.probe.probe()).await {
75            Ok(Ok(())) => CheckResult::healthy(self.name()),
76            Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
77            Err(_) => CheckResult::unhealthy(self.name(), "Docker ping timed out"),
78        }
79    }
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85    use camel_api::HealthStatus;
86
87    struct MockProbe {
88        responder: Arc<dyn Fn() -> ProbeFuture + Send + Sync>,
89    }
90
91    impl MockProbe {
92        fn new<F>(f: F) -> Self
93        where
94            F: Fn() -> ProbeFuture + Send + Sync + 'static,
95        {
96            Self {
97                responder: Arc::new(f),
98            }
99        }
100    }
101
102    impl ContainerHealthProbe for MockProbe {
103        fn probe(&self) -> ProbeFuture {
104            (self.responder)()
105        }
106    }
107
108    #[tokio::test]
109    async fn container_health_check_healthy_on_ping_ok() {
110        let probe = Arc::new(MockProbe::new(|| Box::pin(async { Ok(()) })));
111        let check = ContainerHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));
112
113        let result = check.check().await;
114
115        assert_eq!(result.name, "container");
116        assert_eq!(result.status, HealthStatus::Healthy);
117        assert!(result.message.is_none());
118    }
119
120    #[tokio::test]
121    async fn container_health_check_unhealthy_on_error() {
122        let probe = Arc::new(MockProbe::new(|| {
123            Box::pin(async {
124                Err(CamelError::ProcessorError(
125                    "simulated docker error".to_string(),
126                ))
127            })
128        }));
129        let check = ContainerHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));
130
131        let result = check.check().await;
132
133        assert_eq!(result.name, "container");
134        assert_eq!(result.status, HealthStatus::Unhealthy);
135        assert!(
136            result
137                .message
138                .as_deref()
139                .is_some_and(|m| m.contains("simulated docker error"))
140        );
141    }
142
143    #[tokio::test]
144    async fn container_health_check_unhealthy_on_timeout() {
145        let probe = Arc::new(MockProbe::new(|| {
146            Box::pin(async {
147                tokio::time::sleep(Duration::from_millis(50)).await;
148                Ok(())
149            })
150        }));
151        let check = ContainerHealthCheck::with_probe_for_tests(probe, Duration::from_millis(5));
152
153        let result = check.check().await;
154
155        assert_eq!(result.name, "container");
156        assert_eq!(result.status, HealthStatus::Unhealthy);
157        assert_eq!(result.message.as_deref(), Some("Docker ping timed out"));
158    }
159}