camel_component_sql/
pool_factory.rs1use std::any::Any;
2use std::sync::Arc;
3use std::time::Duration;
4
5use camel_api::datasource::{CheckFuture, CreatePoolFuture};
6use camel_api::datasource::{DatasourceConfig, DatasourceHandle, PoolFactory};
7use camel_api::error::CamelError;
8use camel_api::lifecycle::HealthStatus;
9use sqlx::AnyPool;
10use sqlx::any::AnyPoolOptions;
11
12use crate::config::{enrich_db_url_with_ssl_params, redact_db_url};
13
14pub struct SqlPoolFactory;
15
16impl PoolFactory for SqlPoolFactory {
17 fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
18 Box::pin(async move {
19 sqlx::any::install_default_drivers();
22
23 let max_conn = config.max_connections.unwrap_or(5);
24 let min_conn = config.min_connections.unwrap_or(1);
25 let idle_timeout = Duration::from_secs(config.idle_timeout_secs.unwrap_or(300));
26 let max_lifetime = Duration::from_secs(config.max_lifetime_secs.unwrap_or(1800));
27
28 let db_url = enrich_db_url_with_ssl_params(
29 &config.db_url,
30 config.ssl_mode.as_deref(),
31 config.ssl_root_cert.as_deref(),
32 config.ssl_cert.as_deref(),
33 config.ssl_key.as_deref(),
34 )?;
35
36 let pool = AnyPoolOptions::new()
37 .max_connections(max_conn)
38 .min_connections(min_conn)
39 .idle_timeout(idle_timeout)
40 .max_lifetime(max_lifetime)
41 .connect(&db_url)
42 .await
43 .map_err(|e| {
44 CamelError::ProcessorError(format!(
45 "failed to create datasource pool ({}): {}",
46 redact_db_url(&config.db_url),
47 e
48 ))
49 })?;
50
51 tracing::info!("datasource pool created: max_connections={}", max_conn);
52 Ok(Arc::new(pool) as Arc<dyn Any + Send + Sync>)
53 })
54 }
55
56 fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a> {
57 Box::pin(async move {
58 match handle.downcast::<AnyPool>() {
59 Ok(pool) => match sqlx::query("SELECT 1").execute(&*pool).await {
60 Ok(_) => HealthStatus::Healthy,
61 Err(e) => {
62 tracing::warn!("datasource '{}' health check failed: {}", handle.name, e);
64 HealthStatus::Unhealthy
65 }
66 },
67 Err(_) => HealthStatus::Unhealthy,
68 }
69 })
70 }
71
72 fn supported_schemes(&self) -> &[&str] {
73 &["postgres", "postgresql", "mysql", "sqlite"]
74 }
75
76 fn name(&self) -> &'static str {
77 "sqlx"
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use super::*;
84
85 #[test]
86 fn sql_pool_factory_name() {
87 let f = SqlPoolFactory;
88 assert_eq!(f.name(), "sqlx");
89 }
90
91 #[test]
92 fn sql_pool_factory_supported_schemes() {
93 let f = SqlPoolFactory;
94 assert!(f.supported_schemes().contains(&"postgres"));
95 assert!(f.supported_schemes().contains(&"mysql"));
96 assert!(f.supported_schemes().contains(&"sqlite"));
97 }
98
99 #[test]
100 fn sql_pool_factory_matches_postgres() {
101 let f = SqlPoolFactory;
102 let cfg = DatasourceConfig {
103 db_url: "postgres://localhost/test".into(),
104 provider: None,
105 max_connections: None,
106 min_connections: None,
107 idle_timeout_secs: None,
108 max_lifetime_secs: None,
109 ssl_mode: None,
110 ssl_root_cert: None,
111 ssl_cert: None,
112 ssl_key: None,
113 };
114 assert!(f.matches(&cfg));
115 }
116}