camel_component_sql/
lib.rs1pub mod bundle;
2pub mod config;
3pub mod consumer;
4pub mod endpoint;
5pub mod headers;
6pub mod health;
7pub mod producer;
8pub mod query;
9pub(crate) mod utils;
10
11use std::sync::Arc;
12
13use camel_component_api::CamelError;
14use camel_component_api::UriConfig;
15use camel_component_api::{Component, Endpoint};
16use sqlx::AnyPool;
17use tokio::sync::OnceCell;
18
19pub use bundle::SqlBundle;
20pub use config::{
21 PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlGlobalConfig, SqlOutputType,
22 TransactionMode,
23};
24pub use health::SqlHealthCheck;
25
26pub struct SqlComponent {
27 config: Option<SqlGlobalConfig>,
28}
29
30impl SqlComponent {
31 pub fn new() -> Self {
32 Self { config: None }
33 }
34
35 pub fn with_config(config: SqlGlobalConfig) -> Self {
36 Self {
37 config: Some(config),
38 }
39 }
40
41 pub fn with_optional_config(config: Option<SqlGlobalConfig>) -> Self {
42 Self { config }
43 }
44}
45
46impl Default for SqlComponent {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl Component for SqlComponent {
53 fn scheme(&self) -> &str {
54 "sql"
55 }
56
57 fn create_endpoint(
58 &self,
59 uri: &str,
60 ctx: &dyn camel_component_api::ComponentContext,
61 ) -> Result<Box<dyn Endpoint>, CamelError> {
62 let mut config = SqlEndpointConfig::from_uri(uri)?;
63 if let Some(ref global_config) = self.config {
64 config.apply_defaults(global_config);
65 }
66 config.resolve_defaults();
67 let pool = Arc::new(OnceCell::<AnyPool>::new());
68 let health_check = SqlHealthCheck::new(Arc::clone(&pool));
69 ctx.register_current_route_health_check(Arc::new(health_check));
70
71 Ok(Box::new(endpoint::SqlEndpoint::new_with_pool(
72 uri.to_string(),
73 config,
74 pool,
75 )))
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use super::*;
82 use camel_component_api::Component;
83 use camel_component_api::NoOpComponentContext;
84
85 #[test]
86 fn test_component_scheme() {
87 let c = SqlComponent::new();
88 assert_eq!(c.scheme(), "sql");
89 }
90
91 #[test]
92 fn test_component_creates_endpoint() {
93 let c = SqlComponent::new();
94 let ctx = NoOpComponentContext;
95 let ep = c.create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx);
96 assert!(ep.is_ok());
97 }
98
99 #[test]
100 fn test_component_rejects_wrong_scheme() {
101 let c = SqlComponent::new();
102 let ctx = NoOpComponentContext;
103 let ep = c.create_endpoint("redis://localhost", &ctx);
104 assert!(ep.is_err());
105 }
106
107 #[test]
108 fn test_endpoint_uri() {
109 let c = SqlComponent::new();
110 let ctx = NoOpComponentContext;
111 let ep = c
112 .create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx)
113 .unwrap();
114 assert_eq!(ep.uri(), "sql:select 1?db_url=postgres://localhost/test");
115 }
116
117 #[test]
118 fn test_component_with_global_config() {
119 let global = SqlGlobalConfig::default().with_max_connections(20);
120 let c = SqlComponent::with_config(global);
121 let ctx = NoOpComponentContext;
122 assert_eq!(c.scheme(), "sql");
124 let ep = c.create_endpoint("sql:select 1?db_url=postgres://localhost/test", &ctx);
125 assert!(ep.is_ok());
126 }
127
128 #[test]
129 fn test_global_config_applied_to_endpoint() {
130 let global = SqlGlobalConfig::default()
133 .with_max_connections(20)
134 .with_min_connections(3)
135 .with_idle_timeout_secs(600)
136 .with_max_lifetime_secs(3600);
137 let mut cfg =
138 config::SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
139 .unwrap();
140 cfg.apply_defaults(&global);
141 cfg.resolve_defaults();
142 assert_eq!(cfg.max_connections, Some(20));
143 assert_eq!(cfg.min_connections, Some(3));
144 assert_eq!(cfg.idle_timeout_secs, Some(600));
145 assert_eq!(cfg.max_lifetime_secs, Some(3600));
146 }
147
148 #[test]
149 fn test_uri_param_wins_over_global_config() {
150 let global = SqlGlobalConfig::default()
152 .with_max_connections(20)
153 .with_min_connections(3);
154 let mut cfg = config::SqlEndpointConfig::from_uri(
155 "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=7",
156 )
157 .unwrap();
158 cfg.apply_defaults(&global);
159 cfg.resolve_defaults();
160 assert_eq!(cfg.max_connections, Some(99)); assert_eq!(cfg.min_connections, Some(7)); }
163}