1use std::str::FromStr;
2
3use camel_api::CamelError;
4use camel_endpoint::parse_uri;
5
6#[derive(Debug, Clone, PartialEq)]
8pub enum SqlOutputType {
9 SelectList,
11 SelectOne,
13 StreamList,
15}
16
17impl FromStr for SqlOutputType {
18 type Err = CamelError;
19
20 fn from_str(s: &str) -> Result<Self, Self::Err> {
21 match s {
22 "SelectList" => Ok(SqlOutputType::SelectList),
23 "SelectOne" => Ok(SqlOutputType::SelectOne),
24 "StreamList" => Ok(SqlOutputType::StreamList),
25 _ => Err(CamelError::InvalidUri(format!(
26 "Unknown output type: {}",
27 s
28 ))),
29 }
30 }
31}
32
33#[derive(Debug, Clone)]
35pub struct SqlConfig {
36 pub db_url: String,
38 pub max_connections: u32,
39 pub min_connections: u32,
40 pub idle_timeout_secs: u64,
41 pub max_lifetime_secs: u64,
42
43 pub query: String,
45 pub source_path: Option<String>,
47 pub output_type: SqlOutputType,
48 pub placeholder: char,
49 pub noop: bool,
50
51 pub delay_ms: u64,
53 pub initial_delay_ms: u64,
54 pub max_messages_per_poll: Option<i32>,
55 pub on_consume: Option<String>,
56 pub on_consume_failed: Option<String>,
57 pub on_consume_batch_complete: Option<String>,
58 pub route_empty_result_set: bool,
59 pub use_iterator: bool,
60 pub expected_update_count: Option<i64>,
61 pub break_batch_on_consume_fail: bool,
62
63 pub batch: bool,
65 pub use_message_body_for_sql: bool,
66}
67
68impl SqlConfig {
69 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
78 let components = parse_uri(uri)?;
79
80 if components.scheme != "sql" {
82 return Err(CamelError::InvalidUri(format!(
83 "Expected scheme 'sql', got '{}'",
84 components.scheme
85 )));
86 }
87
88 let (query, source_path) = if components.path.starts_with("file:") {
90 let file_path = components.path.trim_start_matches("file:").to_string();
91 let contents = std::fs::read_to_string(&file_path).map_err(|e| {
92 CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
93 })?;
94 (contents.trim().to_string(), Some(file_path))
95 } else {
96 (components.path.clone(), None)
97 };
98
99 let db_url = components
101 .params
102 .get("db_url")
103 .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
104 .clone();
105
106 let get_param = |name: &str| -> Option<&String> { components.params.get(name) };
108
109 let get_u32 = |name: &str, default: u32| -> u32 {
110 get_param(name)
111 .and_then(|v| v.parse().ok())
112 .unwrap_or(default)
113 };
114
115 let get_u64 = |name: &str, default: u64| -> u64 {
116 get_param(name)
117 .and_then(|v| v.parse().ok())
118 .unwrap_or(default)
119 };
120
121 let get_i32 = |name: &str| -> Option<i32> { get_param(name).and_then(|v| v.parse().ok()) };
122
123 let get_i64 = |name: &str| -> Option<i64> { get_param(name).and_then(|v| v.parse().ok()) };
124
125 let get_bool = |name: &str, default: bool| -> bool {
126 get_param(name)
127 .map(|v| v.eq_ignore_ascii_case("true"))
128 .unwrap_or(default)
129 };
130
131 let get_char = |name: &str, default: char| -> char {
132 get_param(name)
133 .filter(|v| !v.is_empty())
134 .map(|v| v.chars().next().unwrap())
135 .unwrap_or(default)
136 };
137
138 let get_string = |name: &str| -> Option<String> { get_param(name).cloned() };
139
140 Ok(SqlConfig {
142 db_url,
144 max_connections: get_u32("maxConnections", 5),
145 min_connections: get_u32("minConnections", 1),
146 idle_timeout_secs: get_u64("idleTimeoutSecs", 300),
147 max_lifetime_secs: get_u64("maxLifetimeSecs", 1800),
148
149 query,
151 source_path,
152 output_type: get_param("outputType")
153 .map(|s| s.parse())
154 .transpose()?
155 .unwrap_or(SqlOutputType::SelectList),
156 placeholder: get_char("placeholder", '#'),
157 noop: get_bool("noop", false),
158
159 delay_ms: get_u64("delay", 500),
161 initial_delay_ms: get_u64("initialDelay", 1000),
162 max_messages_per_poll: get_i32("maxMessagesPerPoll"),
163 on_consume: get_string("onConsume"),
164 on_consume_failed: get_string("onConsumeFailed"),
165 on_consume_batch_complete: get_string("onConsumeBatchComplete"),
166 route_empty_result_set: get_bool("routeEmptyResultSet", false),
167 use_iterator: get_bool("useIterator", true),
168 expected_update_count: get_i64("expectedUpdateCount"),
169 break_batch_on_consume_fail: get_bool("breakBatchOnConsumeFail", false),
170
171 batch: get_bool("batch", false),
173 use_message_body_for_sql: get_bool("useMessageBodyForSql", false),
174 })
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181
182 #[test]
183 fn test_config_defaults() {
184 let c = SqlConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
185 assert_eq!(c.query, "select 1");
186 assert_eq!(c.db_url, "postgres://localhost/test");
187 assert_eq!(c.max_connections, 5);
188 assert_eq!(c.min_connections, 1);
189 assert_eq!(c.idle_timeout_secs, 300);
190 assert_eq!(c.max_lifetime_secs, 1800);
191 assert_eq!(c.output_type, SqlOutputType::SelectList);
192 assert_eq!(c.placeholder, '#');
193 assert!(!c.noop);
194 assert_eq!(c.delay_ms, 500);
195 assert_eq!(c.initial_delay_ms, 1000);
196 assert!(c.max_messages_per_poll.is_none());
197 assert!(c.on_consume.is_none());
198 assert!(c.on_consume_failed.is_none());
199 assert!(c.on_consume_batch_complete.is_none());
200 assert!(!c.route_empty_result_set);
201 assert!(c.use_iterator);
202 assert!(c.expected_update_count.is_none());
203 assert!(!c.break_batch_on_consume_fail);
204 assert!(!c.batch);
205 assert!(!c.use_message_body_for_sql);
206 }
207
208 #[test]
209 fn test_config_wrong_scheme() {
210 assert!(SqlConfig::from_uri("redis://localhost:6379").is_err());
211 }
212
213 #[test]
214 fn test_config_missing_db_url() {
215 assert!(SqlConfig::from_uri("sql:select 1").is_err());
216 }
217
218 #[test]
219 fn test_config_output_type_select_one() {
220 let c = SqlConfig::from_uri(
221 "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
222 )
223 .unwrap();
224 assert_eq!(c.output_type, SqlOutputType::SelectOne);
225 }
226
227 #[test]
228 fn test_config_output_type_stream_list() {
229 let c = SqlConfig::from_uri(
230 "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
231 )
232 .unwrap();
233 assert_eq!(c.output_type, SqlOutputType::StreamList);
234 }
235
236 #[test]
237 fn test_config_consumer_options() {
238 let c = SqlConfig::from_uri(
239 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
240 ).unwrap();
241 assert_eq!(c.delay_ms, 2000);
242 assert_eq!(c.initial_delay_ms, 500);
243 assert_eq!(c.max_messages_per_poll, Some(10));
244 assert_eq!(
245 c.on_consume,
246 Some("update t set done=true where id=:#id".to_string())
247 );
248 assert_eq!(
249 c.on_consume_failed,
250 Some("update t set failed=true where id=:#id".to_string())
251 );
252 assert_eq!(
253 c.on_consume_batch_complete,
254 Some("delete from t where done=true".to_string())
255 );
256 assert!(c.route_empty_result_set);
257 assert!(!c.use_iterator);
258 assert_eq!(c.expected_update_count, Some(1));
259 assert!(c.break_batch_on_consume_fail);
260 }
261
262 #[test]
263 fn test_config_producer_options() {
264 let c = SqlConfig::from_uri(
265 "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
266 ).unwrap();
267 assert!(c.batch);
268 assert!(c.use_message_body_for_sql);
269 assert!(c.noop);
270 }
271
272 #[test]
273 fn test_config_pool_options() {
274 let c = SqlConfig::from_uri(
275 "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
276 ).unwrap();
277 assert_eq!(c.max_connections, 20);
278 assert_eq!(c.min_connections, 3);
279 assert_eq!(c.idle_timeout_secs, 600);
280 assert_eq!(c.max_lifetime_secs, 3600);
281 }
282
283 #[test]
284 fn test_config_query_with_special_chars() {
285 let c = SqlConfig::from_uri(
286 "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
287 )
288 .unwrap();
289 assert_eq!(
290 c.query,
291 "select * from users where name = :#name and age > #"
292 );
293 }
294
295 #[test]
296 fn test_output_type_from_str() {
297 assert_eq!(
298 "SelectList".parse::<SqlOutputType>().unwrap(),
299 SqlOutputType::SelectList
300 );
301 assert_eq!(
302 "SelectOne".parse::<SqlOutputType>().unwrap(),
303 SqlOutputType::SelectOne
304 );
305 assert_eq!(
306 "StreamList".parse::<SqlOutputType>().unwrap(),
307 SqlOutputType::StreamList
308 );
309 assert!("Invalid".parse::<SqlOutputType>().is_err());
310 }
311
312 #[test]
313 fn test_config_file_not_found() {
314 let result = SqlConfig::from_uri(
315 "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
316 );
317 assert!(result.is_err());
318 let err = result.unwrap_err();
319 let msg = format!("{:?}", err);
320 assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
321 }
322
323 #[test]
324 fn test_config_file_query() {
325 use std::io::Write;
326 let unique_name = format!(
327 "test_sql_query_{}.sql",
328 std::time::SystemTime::now()
329 .duration_since(std::time::UNIX_EPOCH)
330 .unwrap_or_default()
331 .as_nanos()
332 );
333 let mut tmp = std::env::temp_dir();
334 tmp.push(unique_name);
335 {
336 let mut f = std::fs::File::create(&tmp).unwrap();
337 writeln!(f, "SELECT * FROM users").unwrap();
338 }
339 let uri = format!(
340 "sql:file:{}?db_url=postgres://localhost/test",
341 tmp.display()
342 );
343 let c = SqlConfig::from_uri(&uri).unwrap();
344 assert_eq!(c.query, "SELECT * FROM users");
345 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
346 std::fs::remove_file(&tmp).ok();
347 }
348}