Skip to main content

camel_component_sql/
health.rs

1use async_trait::async_trait;
2use camel_api::{AsyncHealthCheck, CheckResult};
3use camel_component_api::CamelError;
4use sqlx::AnyPool;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::OnceCell;
10
11type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;
12
13trait SqlHealthProbe: Send + Sync {
14    fn probe(&self) -> ProbeFuture;
15}
16
17struct SqlPoolProbe {
18    pool: Arc<OnceCell<AnyPool>>,
19}
20
21impl SqlPoolProbe {
22    fn new(pool: Arc<OnceCell<AnyPool>>) -> Self {
23        Self { pool }
24    }
25}
26
27impl SqlHealthProbe for SqlPoolProbe {
28    fn probe(&self) -> ProbeFuture {
29        let pool = Arc::clone(&self.pool);
30        Box::pin(async move {
31            let pool = pool.get().ok_or_else(|| {
32                CamelError::ProcessorError("SQL connection pool not initialized".to_string())
33            })?;
34
35            sqlx::query("SELECT 1").execute(pool).await.map_err(|e| {
36                CamelError::ProcessorError(format!("SQL health check failed: {}", e))
37            })?;
38
39            Ok(())
40        })
41    }
42}
43
44pub struct SqlHealthCheck {
45    probe: Arc<dyn SqlHealthProbe>,
46    timeout: Duration,
47}
48
49impl SqlHealthCheck {
50    pub fn new(pool: Arc<OnceCell<AnyPool>>) -> Self {
51        Self {
52            probe: Arc::new(SqlPoolProbe::new(pool)),
53            timeout: Duration::from_secs(2),
54        }
55    }
56
57    #[cfg(test)]
58    fn with_probe_for_tests(probe: Arc<dyn SqlHealthProbe>, timeout: Duration) -> Self {
59        Self { probe, timeout }
60    }
61}
62
63#[async_trait]
64impl AsyncHealthCheck for SqlHealthCheck {
65    fn name(&self) -> &str {
66        "sql"
67    }
68
69    async fn check(&self) -> CheckResult {
70        match tokio::time::timeout(self.timeout, self.probe.probe()).await {
71            Ok(Ok(())) => CheckResult::healthy(self.name()),
72            Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
73            Err(_) => CheckResult::unhealthy(self.name(), "SELECT 1 timed out"),
74        }
75    }
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;

    /// Probe stand-in whose behaviour is supplied by a closure, so each test
    /// can script the outcome without touching a database.
    struct MockProbe {
        responder: Arc<dyn Fn() -> ProbeFuture + Send + Sync>,
    }

    impl MockProbe {
        /// Wraps `f`; every call to `probe` invokes it to obtain a fresh future.
        fn new<F>(f: F) -> Self
        where
            F: Fn() -> ProbeFuture + Send + Sync + 'static,
        {
            Self {
                responder: Arc::new(f),
            }
        }
    }

    impl SqlHealthProbe for MockProbe {
        fn probe(&self) -> ProbeFuture {
            (self.responder)()
        }
    }

    /// A probe that resolves to `Ok(())` yields a healthy result with no message.
    #[tokio::test]
    async fn sql_health_check_healthy_when_probe_succeeds() {
        let probe = Arc::new(MockProbe::new(|| Box::pin(async { Ok(()) })));
        let check = SqlHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));

        let result = check.check().await;

        assert_eq!(result.name, "sql");
        assert_eq!(result.status, HealthStatus::Healthy);
        assert!(result.message.is_none());
    }

    /// A probe error is surfaced as an unhealthy result carrying the error text.
    #[tokio::test]
    async fn sql_health_check_unhealthy_when_probe_fails() {
        let probe = Arc::new(MockProbe::new(|| {
            Box::pin(async {
                Err(CamelError::ProcessorError(
                    "simulated sql error".to_string(),
                ))
            })
        }));
        let check = SqlHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));

        let result = check.check().await;

        assert_eq!(result.name, "sql");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert!(
            result
                .message
                .as_deref()
                .is_some_and(|m| m.contains("simulated sql error"))
        );
    }

    /// A probe that does not finish before the deadline yields the timeout message.
    #[tokio::test]
    async fn sql_health_check_unhealthy_when_probe_times_out() {
        // Use a future that can never complete rather than a finite sleep:
        // `tokio::time::timeout` polls the inner future before the deadline,
        // so a sleep that has already elapsed (e.g. on a stalled CI runner)
        // would win the race and make this test report Healthy.
        let probe = Arc::new(MockProbe::new(|| {
            Box::pin(std::future::pending::<Result<(), CamelError>>())
        }));
        let check = SqlHealthCheck::with_probe_for_tests(probe, Duration::from_millis(5));

        let result = check.check().await;

        assert_eq!(result.name, "sql");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert_eq!(result.message.as_deref(), Some("SELECT 1 timed out"));
    }
}
155}