camel_component_sql/
lib.rs1pub mod bundle;
2pub mod config;
3pub mod consumer;
4pub mod endpoint;
5pub mod headers;
6pub mod health;
7pub mod pool_factory;
8pub mod producer;
9pub mod query;
10pub(crate) mod utils;
11
12use std::sync::Arc;
13
14use camel_api::datasource::DatasourceCatalog;
15use camel_component_api::CamelError;
16use camel_component_api::UriConfig;
17use camel_component_api::{Component, Endpoint};
18use sqlx::AnyPool;
19use tokio::sync::OnceCell;
20
21pub use bundle::SqlBundle;
22pub use config::{
23 PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlGlobalConfig, SqlOutputType,
24 TransactionMode,
25};
26pub use health::SqlHealthCheck;
27
28type SharedPool = Arc<OnceCell<Arc<AnyPool>>>;
29
30pub struct SqlComponent {
31 config: Option<SqlGlobalConfig>,
32 catalog: Option<Arc<dyn DatasourceCatalog>>,
33}
34
35impl SqlComponent {
36 pub fn new() -> Self {
37 Self {
38 config: None,
39 catalog: None,
40 }
41 }
42
43 pub fn with_config(config: SqlGlobalConfig) -> Self {
44 Self {
45 config: Some(config),
46 catalog: None,
47 }
48 }
49
50 pub fn with_optional_config(config: Option<SqlGlobalConfig>) -> Self {
51 Self {
52 config,
53 catalog: None,
54 }
55 }
56
57 pub fn with_config_and_catalog(
58 config: SqlGlobalConfig,
59 catalog: Arc<dyn DatasourceCatalog>,
60 ) -> Self {
61 Self {
62 config: Some(config),
63 catalog: Some(catalog),
64 }
65 }
66
67 pub fn catalog(&self) -> Option<&Arc<dyn DatasourceCatalog>> {
68 self.catalog.as_ref()
69 }
70}
71
72impl Default for SqlComponent {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl Component for SqlComponent {
79 fn scheme(&self) -> &str {
80 "sql"
81 }
82
83 fn create_endpoint(
84 &self,
85 uri: &str,
86 ctx: &dyn camel_component_api::ComponentContext,
87 ) -> Result<Box<dyn Endpoint>, CamelError> {
88 let mut config = SqlEndpointConfig::from_uri(uri)?;
89
90 if config.datasource_name.is_some() && self.catalog.is_none() {
91 return Err(CamelError::Config(
92 "datasource parameter requires catalog — no datasource catalog configured".into(),
93 ));
94 }
95
96 if let Some(ref ds_name) = config.datasource_name
97 && let Some(ref catalog) = self.catalog
98 {
99 let ds_config = catalog.get_config(ds_name).ok_or_else(|| {
100 CamelError::Config(format!("datasource '{}' not found in catalog", ds_name))
101 })?;
102 config.db_url = ds_config.db_url;
103 }
104
105 if let Some(ref global_config) = self.config {
106 config.apply_defaults(global_config);
107 }
108 config.resolve_defaults();
109 let pool: SharedPool = Arc::new(OnceCell::new());
110 let health_check = SqlHealthCheck::new(Arc::clone(&pool));
111 ctx.register_current_route_health_check(Arc::new(health_check));
112
113 if config.datasource_name.is_some()
114 && let Some(ref catalog) = self.catalog
115 {
116 Ok(Box::new(endpoint::SqlEndpoint::new_with_pool_and_catalog(
117 uri.to_string(),
118 config,
119 pool,
120 Arc::clone(catalog),
121 )))
122 } else {
123 Ok(Box::new(endpoint::SqlEndpoint::new_with_pool(
124 uri.to_string(),
125 config,
126 pool,
127 )))
128 }
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use camel_component_api::Component;
136 use camel_component_api::NoOpComponentContext;
137
138 #[test]
139 fn test_component_scheme() {
140 let c = SqlComponent::new();
141 assert_eq!(c.scheme(), "sql");
142 }
143
144 #[test]
145 fn test_component_creates_endpoint() {
146 let c = SqlComponent::new();
147 let ctx = NoOpComponentContext;
148 let ep = c.create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx);
149 assert!(ep.is_ok());
150 }
151
152 #[test]
153 fn test_component_rejects_wrong_scheme() {
154 let c = SqlComponent::new();
155 let ctx = NoOpComponentContext;
156 let ep = c.create_endpoint("redis://localhost", &ctx);
157 assert!(ep.is_err());
158 }
159
160 #[test]
161 fn test_endpoint_uri() {
162 let c = SqlComponent::new();
163 let ctx = NoOpComponentContext;
164 let ep = c
165 .create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx)
166 .unwrap();
167 assert_eq!(ep.uri(), "sql:select 1?db_url=postgres://localhost/test");
168 }
169
170 #[test]
171 fn test_component_with_global_config() {
172 let global = SqlGlobalConfig::default().with_max_connections(20);
173 let c = SqlComponent::with_config(global);
174 let ctx = NoOpComponentContext;
175 assert_eq!(c.scheme(), "sql");
177 let ep = c.create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx);
178 assert!(ep.is_ok());
179 }
180
181 #[test]
182 fn test_global_config_applied_to_endpoint() {
183 let global = SqlGlobalConfig::default()
186 .with_max_connections(20)
187 .with_min_connections(3)
188 .with_idle_timeout_secs(600)
189 .with_max_lifetime_secs(3600);
190 let mut cfg =
191 config::SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
192 .unwrap();
193 cfg.apply_defaults(&global);
194 cfg.resolve_defaults();
195 assert_eq!(cfg.max_connections, Some(20));
196 assert_eq!(cfg.min_connections, Some(3));
197 assert_eq!(cfg.idle_timeout_secs, Some(600));
198 assert_eq!(cfg.max_lifetime_secs, Some(3600));
199 }
200
201 #[test]
202 fn test_uri_param_wins_over_global_config() {
203 let global = SqlGlobalConfig::default()
205 .with_max_connections(20)
206 .with_min_connections(3);
207 let mut cfg = config::SqlEndpointConfig::from_uri(
208 "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=7",
209 )
210 .unwrap();
211 cfg.apply_defaults(&global);
212 cfg.resolve_defaults();
213 assert_eq!(cfg.max_connections, Some(99)); assert_eq!(cfg.min_connections, Some(7)); }
216}