camel_component_sql/
health.rs1use async_trait::async_trait;
2use camel_api::{AsyncHealthCheck, CheckResult};
3use camel_component_api::CamelError;
4use sqlx::AnyPool;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::OnceCell;
10
11type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;
12
13trait SqlHealthProbe: Send + Sync {
14 fn probe(&self) -> ProbeFuture;
15}
16
17struct SqlPoolProbe {
18 pool: Arc<OnceCell<AnyPool>>,
19}
20
21impl SqlPoolProbe {
22 fn new(pool: Arc<OnceCell<AnyPool>>) -> Self {
23 Self { pool }
24 }
25}
26
27impl SqlHealthProbe for SqlPoolProbe {
28 fn probe(&self) -> ProbeFuture {
29 let pool = Arc::clone(&self.pool);
30 Box::pin(async move {
31 let pool = pool.get().ok_or_else(|| {
32 CamelError::ProcessorError("SQL connection pool not initialized".to_string())
33 })?;
34
35 sqlx::query("SELECT 1").execute(pool).await.map_err(|e| {
36 CamelError::ProcessorError(format!("SQL health check failed: {}", e))
37 })?;
38
39 Ok(())
40 })
41 }
42}
43
44pub struct SqlHealthCheck {
45 probe: Arc<dyn SqlHealthProbe>,
46 timeout: Duration,
47}
48
49impl SqlHealthCheck {
50 pub fn new(pool: Arc<OnceCell<AnyPool>>) -> Self {
51 Self {
52 probe: Arc::new(SqlPoolProbe::new(pool)),
53 timeout: Duration::from_secs(2),
54 }
55 }
56
57 #[cfg(test)]
58 fn with_probe_for_tests(probe: Arc<dyn SqlHealthProbe>, timeout: Duration) -> Self {
59 Self { probe, timeout }
60 }
61}
62
63#[async_trait]
64impl AsyncHealthCheck for SqlHealthCheck {
65 fn name(&self) -> &str {
66 "sql"
67 }
68
69 async fn check(&self) -> CheckResult {
70 match tokio::time::timeout(self.timeout, self.probe.probe()).await {
71 Ok(Ok(())) => CheckResult::healthy(self.name()),
72 Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
73 Err(_) => CheckResult::unhealthy(self.name(), "SELECT 1 timed out"),
74 }
75 }
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;

    /// Test double for [`SqlHealthProbe`] that delegates each probe to a closure.
    struct MockProbe {
        responder: Arc<dyn Fn() -> ProbeFuture + Send + Sync>,
    }

    impl MockProbe {
        /// Wraps `f`, which is called once per `probe()` to build the future.
        fn new<F>(f: F) -> Self
        where
            F: Fn() -> ProbeFuture + Send + Sync + 'static,
        {
            Self {
                responder: Arc::new(f),
            }
        }
    }

    impl SqlHealthProbe for MockProbe {
        fn probe(&self) -> ProbeFuture {
            (self.responder)()
        }
    }

    /// A probe that resolves to `Ok(())` yields a healthy result without a message.
    #[tokio::test]
    async fn sql_health_check_healthy_when_probe_succeeds() {
        let probe = Arc::new(MockProbe::new(|| Box::pin(async { Ok(()) })));
        let check = SqlHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));

        let result = check.check().await;

        assert_eq!(result.name, "sql");
        assert_eq!(result.status, HealthStatus::Healthy);
        assert!(result.message.is_none());
    }

    /// A probe error is surfaced as unhealthy and its text ends up in the message.
    #[tokio::test]
    async fn sql_health_check_unhealthy_when_probe_fails() {
        let probe = Arc::new(MockProbe::new(|| {
            Box::pin(async {
                Err(CamelError::ProcessorError(
                    "simulated sql error".to_string(),
                ))
            })
        }));
        let check = SqlHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));

        let result = check.check().await;

        assert_eq!(result.name, "sql");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert!(
            result
                .message
                .as_deref()
                .is_some_and(|m| m.contains("simulated sql error"))
        );
    }

    /// A probe that does not finish within the limit is reported as timed out.
    #[tokio::test]
    async fn sql_health_check_unhealthy_when_probe_times_out() {
        // Use a future that never resolves instead of a longer sleep: the
        // timeout wrapper polls the probe before its own deadline, so a sleep
        // that has also elapsed after a scheduler stall would win the race and
        // make this test flaky.
        let probe = Arc::new(MockProbe::new(|| {
            Box::pin(std::future::pending::<Result<(), CamelError>>())
        }));
        let check = SqlHealthCheck::with_probe_for_tests(probe, Duration::from_millis(5));

        let result = check.check().await;

        assert_eq!(result.name, "sql");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert_eq!(result.message.as_deref(), Some("SELECT 1 timed out"));
    }
}