Skip to main content

camel_component_sql/
lib.rs

1pub mod bundle;
2pub mod config;
3pub mod consumer;
4pub mod endpoint;
5pub mod headers;
6pub mod health;
7pub mod pool_factory;
8pub mod producer;
9pub mod query;
10pub(crate) mod utils;
11
12use std::sync::Arc;
13
14use camel_api::datasource::DatasourceCatalog;
15use camel_component_api::CamelError;
16use camel_component_api::UriConfig;
17use camel_component_api::{Component, Endpoint};
18use sqlx::AnyPool;
19use tokio::sync::OnceCell;
20
21pub use bundle::SqlBundle;
22pub use config::{
23    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlGlobalConfig, SqlOutputType,
24    TransactionMode,
25};
26pub use health::SqlHealthCheck;
27
28type SharedPool = Arc<OnceCell<Arc<AnyPool>>>;
29
30pub struct SqlComponent {
31    config: Option<SqlGlobalConfig>,
32    catalog: Option<Arc<dyn DatasourceCatalog>>,
33}
34
35impl SqlComponent {
36    pub fn new() -> Self {
37        Self {
38            config: None,
39            catalog: None,
40        }
41    }
42
43    pub fn with_config(config: SqlGlobalConfig) -> Self {
44        Self {
45            config: Some(config),
46            catalog: None,
47        }
48    }
49
50    pub fn with_optional_config(config: Option<SqlGlobalConfig>) -> Self {
51        Self {
52            config,
53            catalog: None,
54        }
55    }
56
57    pub fn with_config_and_catalog(
58        config: SqlGlobalConfig,
59        catalog: Arc<dyn DatasourceCatalog>,
60    ) -> Self {
61        Self {
62            config: Some(config),
63            catalog: Some(catalog),
64        }
65    }
66
67    pub fn catalog(&self) -> Option<&Arc<dyn DatasourceCatalog>> {
68        self.catalog.as_ref()
69    }
70}
71
72impl Default for SqlComponent {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl Component for SqlComponent {
79    fn scheme(&self) -> &str {
80        "sql"
81    }
82
83    fn create_endpoint(
84        &self,
85        uri: &str,
86        ctx: &dyn camel_component_api::ComponentContext,
87    ) -> Result<Box<dyn Endpoint>, CamelError> {
88        let mut config = SqlEndpointConfig::from_uri(uri)?;
89
90        if config.datasource_name.is_some() && self.catalog.is_none() {
91            return Err(CamelError::Config(
92                "datasource parameter requires catalog — no datasource catalog configured".into(),
93            ));
94        }
95
96        if let Some(ref ds_name) = config.datasource_name
97            && let Some(ref catalog) = self.catalog
98        {
99            let ds_config = catalog.get_config(ds_name).ok_or_else(|| {
100                CamelError::Config(format!("datasource '{}' not found in catalog", ds_name))
101            })?;
102            config.db_url = ds_config.db_url;
103        }
104
105        if let Some(ref global_config) = self.config {
106            config.apply_defaults(global_config);
107        }
108        config.resolve_defaults();
109        let pool: SharedPool = Arc::new(OnceCell::new());
110        let health_check = SqlHealthCheck::new(Arc::clone(&pool));
111        ctx.register_current_route_health_check(Arc::new(health_check));
112
113        if config.datasource_name.is_some()
114            && let Some(ref catalog) = self.catalog
115        {
116            Ok(Box::new(endpoint::SqlEndpoint::new_with_pool_and_catalog(
117                uri.to_string(),
118                config,
119                pool,
120                Arc::clone(catalog),
121            )))
122        } else {
123            Ok(Box::new(endpoint::SqlEndpoint::new_with_pool(
124                uri.to_string(),
125                config,
126                pool,
127            )))
128        }
129    }
130}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135    use camel_component_api::Component;
136    use camel_component_api::NoOpComponentContext;
137
138    #[test]
139    fn test_component_scheme() {
140        let c = SqlComponent::new();
141        assert_eq!(c.scheme(), "sql");
142    }
143
144    #[test]
145    fn test_component_creates_endpoint() {
146        let c = SqlComponent::new();
147        let ctx = NoOpComponentContext;
148        let ep = c.create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx);
149        assert!(ep.is_ok());
150    }
151
152    #[test]
153    fn test_component_rejects_wrong_scheme() {
154        let c = SqlComponent::new();
155        let ctx = NoOpComponentContext;
156        let ep = c.create_endpoint("redis://localhost", &ctx);
157        assert!(ep.is_err());
158    }
159
160    #[test]
161    fn test_endpoint_uri() {
162        let c = SqlComponent::new();
163        let ctx = NoOpComponentContext;
164        let ep = c
165            .create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx)
166            .unwrap();
167        assert_eq!(ep.uri(), "sql:select 1?db_url=postgres://localhost/test");
168    }
169
170    #[test]
171    fn test_component_with_global_config() {
172        let global = SqlGlobalConfig::default().with_max_connections(20);
173        let c = SqlComponent::with_config(global);
174        let ctx = NoOpComponentContext;
175        // Verify the component can create endpoints with global config applied
176        assert_eq!(c.scheme(), "sql");
177        let ep = c.create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx);
178        assert!(ep.is_ok());
179    }
180
181    #[test]
182    fn test_global_config_applied_to_endpoint() {
183        // Verify that when URI does NOT set pool params, global config fills them in.
184        // Tests the same logic as create_endpoint: from_uri + apply_defaults + resolve_defaults.
185        let global = SqlGlobalConfig::default()
186            .with_max_connections(20)
187            .with_min_connections(3)
188            .with_idle_timeout_secs(600)
189            .with_max_lifetime_secs(3600);
190        let mut cfg =
191            config::SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
192                .unwrap();
193        cfg.apply_defaults(&global);
194        cfg.resolve_defaults();
195        assert_eq!(cfg.max_connections, Some(20));
196        assert_eq!(cfg.min_connections, Some(3));
197        assert_eq!(cfg.idle_timeout_secs, Some(600));
198        assert_eq!(cfg.max_lifetime_secs, Some(3600));
199    }
200
201    #[test]
202    fn test_uri_param_wins_over_global_config() {
203        // Verify that URI-set pool params are NOT overridden by global config.
204        let global = SqlGlobalConfig::default()
205            .with_max_connections(20)
206            .with_min_connections(3);
207        let mut cfg = config::SqlEndpointConfig::from_uri(
208            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=7",
209        )
210        .unwrap();
211        cfg.apply_defaults(&global);
212        cfg.resolve_defaults();
213        assert_eq!(cfg.max_connections, Some(99)); // URI wins
214        assert_eq!(cfg.min_connections, Some(7)); // URI wins
215    }
216}