Skip to main content

camel_component_sql/
lib.rs

1pub mod bundle;
2pub mod config;
3pub mod consumer;
4pub mod endpoint;
5pub mod headers;
6pub mod producer;
7pub mod query;
8pub(crate) mod utils;
9
10use camel_component_api::CamelError;
11use camel_component_api::UriConfig;
12use camel_component_api::{Component, Endpoint};
13
14pub use bundle::SqlBundle;
15pub use config::{
16    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlGlobalConfig, SqlOutputType,
17    TransactionMode,
18};
19
20pub struct SqlComponent {
21    config: Option<SqlGlobalConfig>,
22}
23
24impl SqlComponent {
25    pub fn new() -> Self {
26        Self { config: None }
27    }
28
29    pub fn with_config(config: SqlGlobalConfig) -> Self {
30        Self {
31            config: Some(config),
32        }
33    }
34
35    pub fn with_optional_config(config: Option<SqlGlobalConfig>) -> Self {
36        Self { config }
37    }
38}
39
40impl Default for SqlComponent {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl Component for SqlComponent {
47    fn scheme(&self) -> &str {
48        "sql"
49    }
50
51    fn create_endpoint(
52        &self,
53        uri: &str,
54        _ctx: &dyn camel_component_api::ComponentContext,
55    ) -> Result<Box<dyn Endpoint>, CamelError> {
56        let mut config = SqlEndpointConfig::from_uri(uri)?;
57        if let Some(ref global_config) = self.config {
58            config.apply_defaults(global_config);
59        }
60        config.resolve_defaults();
61        Ok(Box::new(endpoint::SqlEndpoint::new(
62            uri.to_string(),
63            config,
64        )))
65    }
66}
67
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Component;
    use camel_component_api::NoOpComponentContext;

    /// Endpoint URI used by every test that sets no pool parameters in the URI.
    const BASE_URI: &str = "sql:select 1?db_url=postgres://localhost/test";

    /// Runs the same three steps as `create_endpoint` — parse the URI, apply the
    /// global defaults, resolve the remaining defaults — and returns the result.
    fn effective_config(uri: &str, global: &SqlGlobalConfig) -> SqlEndpointConfig {
        let mut parsed = SqlEndpointConfig::from_uri(uri).unwrap();
        parsed.apply_defaults(global);
        parsed.resolve_defaults();
        parsed
    }

    #[test]
    fn test_component_scheme() {
        let component = SqlComponent::new();
        assert_eq!(component.scheme(), "sql");
    }

    #[test]
    fn test_component_creates_endpoint() {
        let component = SqlComponent::new();
        let ctx = NoOpComponentContext;
        let created = component.create_endpoint(BASE_URI, &ctx);
        assert!(created.is_ok());
    }

    #[test]
    fn test_component_rejects_wrong_scheme() {
        let component = SqlComponent::new();
        let ctx = NoOpComponentContext;
        let created = component.create_endpoint("redis://localhost", &ctx);
        assert!(created.is_err());
    }

    #[test]
    fn test_endpoint_uri() {
        let component = SqlComponent::new();
        let ctx = NoOpComponentContext;
        let endpoint = component.create_endpoint(BASE_URI, &ctx).unwrap();
        // The endpoint reports the exact URI it was created from.
        assert_eq!(endpoint.uri(), BASE_URI);
    }

    #[test]
    fn test_component_with_global_config() {
        // A component carrying global config must still identify as "sql"
        // and be able to create endpoints.
        let component =
            SqlComponent::with_config(SqlGlobalConfig::default().with_max_connections(20));
        let ctx = NoOpComponentContext;
        assert_eq!(component.scheme(), "sql");
        let created = component.create_endpoint(BASE_URI, &ctx);
        assert!(created.is_ok());
    }

    #[test]
    fn test_global_config_applied_to_endpoint() {
        // The URI sets no pool parameters, so every one of them must come
        // from the global config.
        let global = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3)
            .with_idle_timeout_secs(600)
            .with_max_lifetime_secs(3600);

        let resolved = effective_config(BASE_URI, &global);

        assert_eq!(resolved.max_connections, Some(20));
        assert_eq!(resolved.min_connections, Some(3));
        assert_eq!(resolved.idle_timeout_secs, Some(600));
        assert_eq!(resolved.max_lifetime_secs, Some(3600));
    }

    #[test]
    fn test_uri_param_wins_over_global_config() {
        // Pool parameters given in the URI must not be overridden by the
        // global config.
        let global = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3);

        let resolved = effective_config(
            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=7",
            &global,
        );

        assert_eq!(resolved.max_connections, Some(99)); // URI wins
        assert_eq!(resolved.min_connections, Some(7)); // URI wins
    }
}