Skip to main content

camel_component_sql/
lib.rs

1pub mod bundle;
2pub mod config;
3pub mod consumer;
4pub mod endpoint;
5pub mod headers;
6pub mod producer;
7pub mod query;
8pub(crate) mod utils;
9
10use camel_component_api::CamelError;
11use camel_component_api::UriConfig;
12use camel_component_api::{Component, Endpoint};
13
14pub use bundle::SqlBundle;
15pub use config::{SqlEndpointConfig, SqlGlobalConfig, SqlOutputType};
16
17pub struct SqlComponent {
18    config: Option<SqlGlobalConfig>,
19}
20
21impl SqlComponent {
22    pub fn new() -> Self {
23        Self { config: None }
24    }
25
26    pub fn with_config(config: SqlGlobalConfig) -> Self {
27        Self {
28            config: Some(config),
29        }
30    }
31
32    pub fn with_optional_config(config: Option<SqlGlobalConfig>) -> Self {
33        Self { config }
34    }
35}
36
37impl Default for SqlComponent {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl Component for SqlComponent {
44    fn scheme(&self) -> &str {
45        "sql"
46    }
47
48    fn create_endpoint(
49        &self,
50        uri: &str,
51        _ctx: &dyn camel_component_api::ComponentContext,
52    ) -> Result<Box<dyn Endpoint>, CamelError> {
53        let mut config = SqlEndpointConfig::from_uri(uri)?;
54        if let Some(ref global_config) = self.config {
55            config.apply_defaults(global_config);
56        }
57        config.resolve_defaults();
58        Ok(Box::new(endpoint::SqlEndpoint::new(
59            uri.to_string(),
60            config,
61        )))
62    }
63}
64
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Component;
    use camel_component_api::NoOpComponentContext;

    /// Well-formed `sql:` URI shared by the tests below.
    const SELECT_ONE_URI: &str = "sql:select 1?db_url=postgres://localhost/test";

    /// Asks `component` to create an endpoint for `uri` using a no-op context.
    fn create(component: &SqlComponent, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
        component.create_endpoint(uri, &NoOpComponentContext)
    }

    /// Runs the same steps as `create_endpoint`:
    /// `from_uri`, then `apply_defaults`, then `resolve_defaults`.
    fn resolved_config(uri: &str, global: &SqlGlobalConfig) -> SqlEndpointConfig {
        let mut endpoint_config = SqlEndpointConfig::from_uri(uri).unwrap();
        endpoint_config.apply_defaults(global);
        endpoint_config.resolve_defaults();
        endpoint_config
    }

    #[test]
    fn test_component_scheme() {
        assert_eq!(SqlComponent::new().scheme(), "sql");
    }

    #[test]
    fn test_component_creates_endpoint() {
        let component = SqlComponent::new();
        assert!(create(&component, SELECT_ONE_URI).is_ok());
    }

    #[test]
    fn test_component_rejects_wrong_scheme() {
        let component = SqlComponent::new();
        assert!(create(&component, "redis://localhost").is_err());
    }

    #[test]
    fn test_endpoint_uri() {
        let component = SqlComponent::new();
        let endpoint = create(&component, SELECT_ONE_URI).unwrap();
        assert_eq!(endpoint.uri(), "sql:select 1?db_url=postgres://localhost/test");
    }

    #[test]
    fn test_component_with_global_config() {
        // A component built with a global config must still report its scheme
        // and create endpoints successfully.
        let component =
            SqlComponent::with_config(SqlGlobalConfig::default().with_max_connections(20));
        assert_eq!(component.scheme(), "sql");
        assert!(create(&component, SELECT_ONE_URI).is_ok());
    }

    #[test]
    fn test_global_config_applied_to_endpoint() {
        // The URI sets no pool parameters, so every value must come from the
        // global config.
        let global = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3)
            .with_idle_timeout_secs(600)
            .with_max_lifetime_secs(3600);

        let resolved = resolved_config(SELECT_ONE_URI, &global);

        assert_eq!(resolved.max_connections, Some(20));
        assert_eq!(resolved.min_connections, Some(3));
        assert_eq!(resolved.idle_timeout_secs, Some(600));
        assert_eq!(resolved.max_lifetime_secs, Some(3600));
    }

    #[test]
    fn test_uri_param_wins_over_global_config() {
        // Pool parameters given in the URI must not be replaced by the
        // global config.
        let global = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3);

        let resolved = resolved_config(
            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=7",
            &global,
        );

        assert_eq!(resolved.max_connections, Some(99)); // URI wins
        assert_eq!(resolved.min_connections, Some(7)); // URI wins
    }
}