Skip to main content

camel_component_sql/
pool_factory.rs

1use std::any::Any;
2use std::sync::Arc;
3use std::time::Duration;
4
5use camel_api::datasource::{CheckFuture, CreatePoolFuture};
6use camel_api::datasource::{DatasourceConfig, DatasourceHandle, PoolFactory};
7use camel_api::error::CamelError;
8use camel_api::lifecycle::HealthStatus;
9use sqlx::AnyPool;
10use sqlx::any::AnyPoolOptions;
11
12use crate::config::{enrich_db_url_with_ssl_params, redact_db_url};
13
14pub struct SqlPoolFactory;
15
16impl PoolFactory for SqlPoolFactory {
17    fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
18        Box::pin(async move {
19            // Install all compiled-in sqlx drivers so AnyPool can resolve them.
20            // This is idempotent; safe to call multiple times.
21            sqlx::any::install_default_drivers();
22
23            let max_conn = config.max_connections.unwrap_or(5);
24            let min_conn = config.min_connections.unwrap_or(1);
25            let idle_timeout = Duration::from_secs(config.idle_timeout_secs.unwrap_or(300));
26            let max_lifetime = Duration::from_secs(config.max_lifetime_secs.unwrap_or(1800));
27
28            let db_url = enrich_db_url_with_ssl_params(
29                &config.db_url,
30                config.ssl_mode.as_deref(),
31                config.ssl_root_cert.as_deref(),
32                config.ssl_cert.as_deref(),
33                config.ssl_key.as_deref(),
34            )?;
35
36            let pool = AnyPoolOptions::new()
37                .max_connections(max_conn)
38                .min_connections(min_conn)
39                .idle_timeout(idle_timeout)
40                .max_lifetime(max_lifetime)
41                .connect(&db_url)
42                .await
43                .map_err(|e| {
44                    CamelError::ProcessorError(format!(
45                        "failed to create datasource pool ({}): {}",
46                        redact_db_url(&config.db_url),
47                        e
48                    ))
49                })?;
50
51            tracing::info!("datasource pool created: max_connections={}", max_conn);
52            Ok(Arc::new(pool) as Arc<dyn Any + Send + Sync>)
53        })
54    }
55
56    fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a> {
57        Box::pin(async move {
58            match handle.downcast::<AnyPool>() {
59                Ok(pool) => match sqlx::query("SELECT 1").execute(&*pool).await {
60                    Ok(_) => HealthStatus::Healthy,
61                    Err(e) => {
62                        // log-policy: outside-contract
63                        tracing::warn!("datasource '{}' health check failed: {}", handle.name, e);
64                        HealthStatus::Unhealthy
65                    }
66                },
67                Err(_) => HealthStatus::Unhealthy,
68            }
69        })
70    }
71
72    fn supported_schemes(&self) -> &[&str] {
73        &["postgres", "postgresql", "mysql", "sqlite"]
74    }
75
76    fn name(&self) -> &'static str {
77        "sqlx"
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use super::*;
84
85    #[test]
86    fn sql_pool_factory_name() {
87        let f = SqlPoolFactory;
88        assert_eq!(f.name(), "sqlx");
89    }
90
91    #[test]
92    fn sql_pool_factory_supported_schemes() {
93        let f = SqlPoolFactory;
94        assert!(f.supported_schemes().contains(&"postgres"));
95        assert!(f.supported_schemes().contains(&"mysql"));
96        assert!(f.supported_schemes().contains(&"sqlite"));
97    }
98
99    #[test]
100    fn sql_pool_factory_matches_postgres() {
101        let f = SqlPoolFactory;
102        let cfg = DatasourceConfig {
103            db_url: "postgres://localhost/test".into(),
104            provider: None,
105            max_connections: None,
106            min_connections: None,
107            idle_timeout_secs: None,
108            max_lifetime_secs: None,
109            ssl_mode: None,
110            ssl_root_cert: None,
111            ssl_cert: None,
112            ssl_key: None,
113        };
114        assert!(f.matches(&cfg));
115    }
116}