Skip to main content

camel_component_sql/
lib.rs

1pub mod bundle;
2pub mod config;
3pub mod consumer;
4pub mod endpoint;
5pub mod headers;
6pub mod health;
7pub mod producer;
8pub mod query;
9pub(crate) mod utils;
10
11use std::sync::Arc;
12
13use camel_component_api::CamelError;
14use camel_component_api::UriConfig;
15use camel_component_api::{Component, Endpoint};
16use sqlx::AnyPool;
17use tokio::sync::OnceCell;
18
19pub use bundle::SqlBundle;
20pub use config::{
21    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlGlobalConfig, SqlOutputType,
22    TransactionMode,
23};
24pub use health::SqlHealthCheck;
25
26pub struct SqlComponent {
27    config: Option<SqlGlobalConfig>,
28}
29
30impl SqlComponent {
31    pub fn new() -> Self {
32        Self { config: None }
33    }
34
35    pub fn with_config(config: SqlGlobalConfig) -> Self {
36        Self {
37            config: Some(config),
38        }
39    }
40
41    pub fn with_optional_config(config: Option<SqlGlobalConfig>) -> Self {
42        Self { config }
43    }
44}
45
46impl Default for SqlComponent {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl Component for SqlComponent {
53    fn scheme(&self) -> &str {
54        "sql"
55    }
56
57    fn create_endpoint(
58        &self,
59        uri: &str,
60        ctx: &dyn camel_component_api::ComponentContext,
61    ) -> Result<Box<dyn Endpoint>, CamelError> {
62        let mut config = SqlEndpointConfig::from_uri(uri)?;
63        if let Some(ref global_config) = self.config {
64            config.apply_defaults(global_config);
65        }
66        config.resolve_defaults();
67        let pool = Arc::new(OnceCell::<AnyPool>::new());
68        let health_check = SqlHealthCheck::new(Arc::clone(&pool));
69        ctx.register_current_route_health_check(Arc::new(health_check));
70
71        Ok(Box::new(endpoint::SqlEndpoint::new_with_pool(
72            uri.to_string(),
73            config,
74            pool,
75        )))
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Component;
    use camel_component_api::NoOpComponentContext;

    /// Endpoint URI shared by the tests that do not set pool parameters.
    const BASE_URI: &str = "sql:select 1?db_url=postgres://localhost/test";

    /// Asks `component` to create an endpoint for `uri` using a no-op context.
    fn build(component: &SqlComponent, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
        component.create_endpoint(uri, &NoOpComponentContext)
    }

    #[test]
    fn test_component_scheme() {
        assert_eq!(SqlComponent::new().scheme(), "sql");
    }

    #[test]
    fn test_component_creates_endpoint() {
        let created = build(&SqlComponent::new(), BASE_URI);
        assert!(created.is_ok());
    }

    #[test]
    fn test_component_rejects_wrong_scheme() {
        let created = build(&SqlComponent::new(), "redis://localhost");
        assert!(created.is_err());
    }

    #[test]
    fn test_endpoint_uri() {
        let created = build(&SqlComponent::new(), BASE_URI).unwrap();
        assert_eq!(created.uri(), "sql:select 1?db_url=postgres://localhost/test");
    }

    #[test]
    fn test_component_with_global_config() {
        // A component carrying global config must still report its scheme and
        // be able to create endpoints.
        let component =
            SqlComponent::with_config(SqlGlobalConfig::default().with_max_connections(20));
        assert_eq!(component.scheme(), "sql");
        assert!(build(&component, BASE_URI).is_ok());
    }

    #[test]
    fn test_global_config_applied_to_endpoint() {
        // Pool parameters absent from the URI are filled in from the global config.
        // Mirrors the create_endpoint sequence: from_uri, apply_defaults, resolve_defaults.
        let defaults = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3)
            .with_idle_timeout_secs(600)
            .with_max_lifetime_secs(3600);

        let mut resolved = SqlEndpointConfig::from_uri(BASE_URI).unwrap();
        resolved.apply_defaults(&defaults);
        resolved.resolve_defaults();

        assert_eq!(resolved.max_connections, Some(20));
        assert_eq!(resolved.min_connections, Some(3));
        assert_eq!(resolved.idle_timeout_secs, Some(600));
        assert_eq!(resolved.max_lifetime_secs, Some(3600));
    }

    #[test]
    fn test_uri_param_wins_over_global_config() {
        // Pool parameters given in the URI must survive apply_defaults untouched.
        let defaults = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3);

        let mut resolved = SqlEndpointConfig::from_uri(
            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=7",
        )
        .unwrap();
        resolved.apply_defaults(&defaults);
        resolved.resolve_defaults();

        assert_eq!(resolved.max_connections, Some(99)); // URI value kept
        assert_eq!(resolved.min_connections, Some(7)); // URI value kept
    }
}