1use std::str::FromStr;
2
3use camel_api::CamelError;
4use camel_endpoint::{UriComponents, UriConfig, parse_uri};
5
/// Output mode of an SQL endpoint, selected with the `outputType` URI parameter.
///
/// The variant names are exactly the (case-sensitive) strings accepted by this
/// type's `FromStr` implementation.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality
/// is total.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SqlOutputType {
    /// Default mode, used when no `outputType` parameter is supplied.
    // NOTE(review): presumably returns every row as one list — confirm against the producer.
    #[default]
    SelectList,
    /// Selected by `outputType=SelectOne`.
    // NOTE(review): presumably returns a single row — confirm against the producer.
    SelectOne,
    /// Selected by `outputType=StreamList`.
    // NOTE(review): presumably streams rows instead of collecting them — confirm.
    StreamList,
}
17
18impl FromStr for SqlOutputType {
19 type Err = CamelError;
20
21 fn from_str(s: &str) -> Result<Self, Self::Err> {
22 match s {
23 "SelectList" => Ok(SqlOutputType::SelectList),
24 "SelectOne" => Ok(SqlOutputType::SelectOne),
25 "StreamList" => Ok(SqlOutputType::StreamList),
26 _ => Err(CamelError::InvalidUri(format!(
27 "Unknown output type: {}",
28 s
29 ))),
30 }
31 }
32}
33
/// Fallback pool settings applied to endpoint configurations that leave the
/// corresponding optional field unset (see `SqlEndpointConfig::apply_defaults`).
///
/// `Eq` is derived alongside `PartialEq` because every field is an integer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqlGlobalConfig {
    /// Fallback for the endpoint's `max_connections`; 5 by default.
    pub max_connections: u32,
    /// Fallback for the endpoint's `min_connections`; 1 by default.
    pub min_connections: u32,
    /// Fallback for the endpoint's `idle_timeout_secs`; 300 by default.
    pub idle_timeout_secs: u64,
    /// Fallback for the endpoint's `max_lifetime_secs`; 1800 by default.
    pub max_lifetime_secs: u64,
}

impl Default for SqlGlobalConfig {
    /// Built-in defaults: 5 max / 1 min connections, 300 idle-timeout and
    /// 1800 max-lifetime (both `_secs` fields).
    fn default() -> Self {
        Self {
            max_connections: 5,
            min_connections: 1,
            idle_timeout_secs: 300,
            max_lifetime_secs: 1800,
        }
    }
}

impl SqlGlobalConfig {
    /// Creates a configuration holding the built-in defaults; same as
    /// `SqlGlobalConfig::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with `max_connections` replaced by `value`.
    ///
    /// The builder consumes `self`, so the returned value must be used.
    #[must_use]
    pub fn with_max_connections(mut self, value: u32) -> Self {
        self.max_connections = value;
        self
    }

    /// Returns the configuration with `min_connections` replaced by `value`.
    #[must_use]
    pub fn with_min_connections(mut self, value: u32) -> Self {
        self.min_connections = value;
        self
    }

    /// Returns the configuration with `idle_timeout_secs` replaced by `value`.
    #[must_use]
    pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
        self.idle_timeout_secs = value;
        self
    }

    /// Returns the configuration with `max_lifetime_secs` replaced by `value`.
    #[must_use]
    pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
        self.max_lifetime_secs = value;
        self
    }
}
82
/// Configuration of a single `sql:` endpoint, built from an endpoint URI by the
/// `UriConfig` implementation (`from_uri` / `from_components`).
///
/// The URI path supplies the query; everything else comes from URI parameters.
/// The four pool fields stay `None` when their parameter is not supplied and
/// can be filled afterwards with `apply_defaults` or `resolve_defaults`.
#[derive(Debug, Clone)]
pub struct SqlEndpointConfig {
    /// Value of the required `db_url` URI parameter.
    pub db_url: String,
    /// `maxConnections` URI parameter; `None` when not supplied.
    pub max_connections: Option<u32>,
    /// `minConnections` URI parameter; `None` when not supplied.
    pub min_connections: Option<u32>,
    /// `idleTimeoutSecs` URI parameter; `None` when not supplied.
    pub idle_timeout_secs: Option<u64>,
    /// `maxLifetimeSecs` URI parameter; `None` when not supplied.
    pub max_lifetime_secs: Option<u64>,

    /// SQL text: the URI path itself, or the trimmed contents of the file when
    /// the path starts with `file:`.
    pub query: String,
    /// Path the query was read from when the URI path used the `file:` prefix;
    /// `None` for inline queries.
    pub source_path: Option<String>,
    /// `outputType` URI parameter; the enum's default variant when not supplied.
    pub output_type: SqlOutputType,
    /// First character of the `placeholder` URI parameter; `#` when the
    /// parameter is missing or empty.
    pub placeholder: char,
    /// `noop` URI parameter; `true` only for a case-insensitive `true`, `false`
    /// when not supplied.
    pub noop: bool,

    /// `delay` URI parameter; 500 when not supplied.
    // NOTE(review): the `_ms` suffix suggests milliseconds between polls — confirm in the consumer.
    pub delay_ms: u64,
    /// `initialDelay` URI parameter; 1000 when not supplied.
    pub initial_delay_ms: u64,
    /// `maxMessagesPerPoll` URI parameter; `None` when not supplied.
    pub max_messages_per_poll: Option<i32>,
    /// Raw text of the `onConsume` URI parameter, if any.
    pub on_consume: Option<String>,
    /// Raw text of the `onConsumeFailed` URI parameter, if any.
    pub on_consume_failed: Option<String>,
    /// Raw text of the `onConsumeBatchComplete` URI parameter, if any.
    pub on_consume_batch_complete: Option<String>,
    /// `routeEmptyResultSet` URI parameter; `false` when not supplied.
    pub route_empty_result_set: bool,
    /// `useIterator` URI parameter; `true` when not supplied.
    pub use_iterator: bool,
    /// `expectedUpdateCount` URI parameter; `None` when not supplied.
    pub expected_update_count: Option<i64>,
    /// `breakBatchOnConsumeFail` URI parameter; `false` when not supplied.
    pub break_batch_on_consume_fail: bool,

    /// `batch` URI parameter; `false` when not supplied.
    pub batch: bool,
    /// `useMessageBodyForSql` URI parameter; `false` when not supplied.
    pub use_message_body_for_sql: bool,
}
144
145impl SqlEndpointConfig {
146 pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
148 if self.max_connections.is_none() {
149 self.max_connections = Some(defaults.max_connections);
150 }
151 if self.min_connections.is_none() {
152 self.min_connections = Some(defaults.min_connections);
153 }
154 if self.idle_timeout_secs.is_none() {
155 self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
156 }
157 if self.max_lifetime_secs.is_none() {
158 self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
159 }
160 }
161
162 pub fn resolve_defaults(&mut self) {
164 let defaults = SqlGlobalConfig::default();
165 self.apply_defaults(&defaults);
166 }
167}
168
169impl UriConfig for SqlEndpointConfig {
170 fn scheme() -> &'static str {
171 "sql"
172 }
173
174 fn from_uri(uri: &str) -> Result<Self, CamelError> {
175 let parts = parse_uri(uri)?;
176 Self::from_components(parts)
177 }
178
179 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
180 if parts.scheme != Self::scheme() {
182 return Err(CamelError::InvalidUri(format!(
183 "expected scheme '{}' but got '{}'",
184 Self::scheme(),
185 parts.scheme
186 )));
187 }
188
189 let params = &parts.params;
190
191 let (query, source_path) = if parts.path.starts_with("file:") {
193 let file_path = parts.path.trim_start_matches("file:").to_string();
194 let contents = std::fs::read_to_string(&file_path).map_err(|e| {
195 CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
196 })?;
197 (contents.trim().to_string(), Some(file_path))
198 } else {
199 (parts.path.clone(), None)
200 };
201
202 let db_url = params
204 .get("db_url")
205 .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
206 .clone();
207
208 let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
210 let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
211 let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
212 let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
213
214 let output_type = params
216 .get("outputType")
217 .map(|s| s.parse())
218 .transpose()?
219 .unwrap_or_default();
220 let placeholder = params
221 .get("placeholder")
222 .filter(|v| !v.is_empty())
223 .map(|v| v.chars().next().unwrap())
224 .unwrap_or('#');
225 let noop = params
226 .get("noop")
227 .map(|v| v.eq_ignore_ascii_case("true"))
228 .unwrap_or(false);
229
230 let delay_ms = params
232 .get("delay")
233 .and_then(|v| v.parse().ok())
234 .unwrap_or(500);
235 let initial_delay_ms = params
236 .get("initialDelay")
237 .and_then(|v| v.parse().ok())
238 .unwrap_or(1000);
239 let max_messages_per_poll = params
240 .get("maxMessagesPerPoll")
241 .and_then(|v| v.parse().ok());
242 let on_consume = params.get("onConsume").cloned();
243 let on_consume_failed = params.get("onConsumeFailed").cloned();
244 let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
245 let route_empty_result_set = params
246 .get("routeEmptyResultSet")
247 .map(|v| v.eq_ignore_ascii_case("true"))
248 .unwrap_or(false);
249 let use_iterator = params
250 .get("useIterator")
251 .map(|v| v.eq_ignore_ascii_case("true"))
252 .unwrap_or(true);
253 let expected_update_count = params
254 .get("expectedUpdateCount")
255 .and_then(|v| v.parse().ok());
256 let break_batch_on_consume_fail = params
257 .get("breakBatchOnConsumeFail")
258 .map(|v| v.eq_ignore_ascii_case("true"))
259 .unwrap_or(false);
260
261 let batch = params
263 .get("batch")
264 .map(|v| v.eq_ignore_ascii_case("true"))
265 .unwrap_or(false);
266 let use_message_body_for_sql = params
267 .get("useMessageBodyForSql")
268 .map(|v| v.eq_ignore_ascii_case("true"))
269 .unwrap_or(false);
270
271 Ok(Self {
272 db_url,
273 max_connections,
274 min_connections,
275 idle_timeout_secs,
276 max_lifetime_secs,
277 query,
278 source_path,
279 output_type,
280 placeholder,
281 noop,
282 delay_ms,
283 initial_delay_ms,
284 max_messages_per_poll,
285 on_consume,
286 on_consume_failed,
287 on_consume_batch_complete,
288 route_empty_result_set,
289 use_iterator,
290 expected_update_count,
291 break_batch_on_consume_fail,
292 batch,
293 use_message_body_for_sql,
294 })
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal URI yields the documented defaults once pool defaults are resolved.
    #[test]
    fn test_config_defaults() {
        let mut c =
            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
        c.resolve_defaults();
        assert_eq!(c.query, "select 1");
        assert_eq!(c.db_url, "postgres://localhost/test");
        assert_eq!(c.max_connections, Some(5));
        assert_eq!(c.min_connections, Some(1));
        assert_eq!(c.idle_timeout_secs, Some(300));
        assert_eq!(c.max_lifetime_secs, Some(1800));
        assert_eq!(c.output_type, SqlOutputType::SelectList);
        assert_eq!(c.placeholder, '#');
        assert!(!c.noop);
        assert_eq!(c.delay_ms, 500);
        assert_eq!(c.initial_delay_ms, 1000);
        assert!(c.max_messages_per_poll.is_none());
        assert!(c.on_consume.is_none());
        assert!(c.on_consume_failed.is_none());
        assert!(c.on_consume_batch_complete.is_none());
        assert!(!c.route_empty_result_set);
        assert!(c.use_iterator);
        assert!(c.expected_update_count.is_none());
        assert!(!c.break_batch_on_consume_fail);
        assert!(!c.batch);
        assert!(!c.use_message_body_for_sql);
    }

    /// A URI with a different scheme is rejected.
    #[test]
    fn test_config_wrong_scheme() {
        assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
    }

    /// `db_url` is mandatory.
    #[test]
    fn test_config_missing_db_url() {
        assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
    }

    #[test]
    fn test_config_output_type_select_one() {
        let c = SqlEndpointConfig::from_uri(
            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
        )
        .unwrap();
        assert_eq!(c.output_type, SqlOutputType::SelectOne);
    }

    #[test]
    fn test_config_output_type_stream_list() {
        let c = SqlEndpointConfig::from_uri(
            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
        )
        .unwrap();
        assert_eq!(c.output_type, SqlOutputType::StreamList);
    }

    /// Every consumer-side parameter is picked up from the URI.
    #[test]
    fn test_config_consumer_options() {
        let c = SqlEndpointConfig::from_uri(
            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
        ).unwrap();
        assert_eq!(c.delay_ms, 2000);
        assert_eq!(c.initial_delay_ms, 500);
        assert_eq!(c.max_messages_per_poll, Some(10));
        assert_eq!(
            c.on_consume,
            Some("update t set done=true where id=:#id".to_string())
        );
        assert_eq!(
            c.on_consume_failed,
            Some("update t set failed=true where id=:#id".to_string())
        );
        assert_eq!(
            c.on_consume_batch_complete,
            Some("delete from t where done=true".to_string())
        );
        assert!(c.route_empty_result_set);
        assert!(!c.use_iterator);
        assert_eq!(c.expected_update_count, Some(1));
        assert!(c.break_batch_on_consume_fail);
    }

    /// Every producer-side flag is picked up from the URI.
    #[test]
    fn test_config_producer_options() {
        let c = SqlEndpointConfig::from_uri(
            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
        ).unwrap();
        assert!(c.batch);
        assert!(c.use_message_body_for_sql);
        assert!(c.noop);
    }

    /// Pool parameters given in the URI are stored as `Some(..)`.
    #[test]
    fn test_config_pool_options() {
        let c = SqlEndpointConfig::from_uri(
            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
        ).unwrap();
        assert_eq!(c.max_connections, Some(20));
        assert_eq!(c.min_connections, Some(3));
        assert_eq!(c.idle_timeout_secs, Some(600));
        assert_eq!(c.max_lifetime_secs, Some(3600));
    }

    /// Placeholder characters inside the query survive URI parsing.
    #[test]
    fn test_config_query_with_special_chars() {
        let c = SqlEndpointConfig::from_uri(
            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
        )
        .unwrap();
        assert_eq!(
            c.query,
            "select * from users where name = :#name and age > #"
        );
    }

    #[test]
    fn test_output_type_from_str() {
        assert_eq!(
            "SelectList".parse::<SqlOutputType>().unwrap(),
            SqlOutputType::SelectList
        );
        assert_eq!(
            "SelectOne".parse::<SqlOutputType>().unwrap(),
            SqlOutputType::SelectOne
        );
        assert_eq!(
            "StreamList".parse::<SqlOutputType>().unwrap(),
            SqlOutputType::StreamList
        );
        assert!("Invalid".parse::<SqlOutputType>().is_err());
    }

    /// A `file:` query pointing at a missing file reports a read failure.
    #[test]
    fn test_config_file_not_found() {
        let result = SqlEndpointConfig::from_uri(
            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
        );
        assert!(result.is_err());
        let err = result.unwrap_err();
        let msg = format!("{:?}", err);
        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
    }

    /// A `file:` query is loaded from disk, trimmed, and its path recorded.
    #[test]
    fn test_config_file_query() {
        use std::io::Write;
        // Process id plus timestamp keeps concurrent test runs from colliding.
        let unique_name = format!(
            "test_sql_query_{}_{}.sql",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        );
        let mut tmp = std::env::temp_dir();
        tmp.push(unique_name);
        {
            let mut f = std::fs::File::create(&tmp).unwrap();
            writeln!(f, "SELECT * FROM users").unwrap();
        }
        let uri = format!(
            "sql:file:{}?db_url=postgres://localhost/test",
            tmp.display()
        );
        let parsed = SqlEndpointConfig::from_uri(&uri);
        // Remove the fixture before asserting so a failing assertion cannot
        // leave the temporary file behind.
        std::fs::remove_file(&tmp).ok();
        let c = parsed.unwrap();
        assert_eq!(c.query, "SELECT * FROM users");
        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
    }

    /// Pool fields stay `None` until defaults are applied.
    #[test]
    fn test_pool_fields_none_when_not_set() {
        let c =
            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
        assert_eq!(c.max_connections, None);
        assert_eq!(c.min_connections, None);
        assert_eq!(c.idle_timeout_secs, None);
        assert_eq!(c.max_lifetime_secs, None);
    }

    #[test]
    fn test_apply_defaults_fills_none() {
        let mut c =
            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
        let global = SqlGlobalConfig {
            max_connections: 10,
            min_connections: 2,
            idle_timeout_secs: 600,
            max_lifetime_secs: 3600,
        };
        c.apply_defaults(&global);
        assert_eq!(c.max_connections, Some(10));
        assert_eq!(c.min_connections, Some(2));
        assert_eq!(c.idle_timeout_secs, Some(600));
        assert_eq!(c.max_lifetime_secs, Some(3600));
    }

    #[test]
    fn test_apply_defaults_does_not_override() {
        let mut c = SqlEndpointConfig::from_uri(
            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
        )
        .unwrap();
        let global = SqlGlobalConfig {
            max_connections: 10,
            min_connections: 2,
            idle_timeout_secs: 600,
            max_lifetime_secs: 3600,
        };
        c.apply_defaults(&global);
        // Values given in the URI win over the global configuration.
        assert_eq!(c.max_connections, Some(99));
        assert_eq!(c.min_connections, Some(5));
        // Fields absent from the URI take the global values.
        assert_eq!(c.idle_timeout_secs, Some(600));
        assert_eq!(c.max_lifetime_secs, Some(3600));
    }

    #[test]
    fn test_resolve_defaults_fills_remaining() {
        let mut c = SqlEndpointConfig::from_uri(
            "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
        )
        .unwrap();
        c.resolve_defaults();
        // The URI value is kept; the rest come from `SqlGlobalConfig::default()`.
        assert_eq!(c.max_connections, Some(7));
        assert_eq!(c.min_connections, Some(1));
        assert_eq!(c.idle_timeout_secs, Some(300));
        assert_eq!(c.max_lifetime_secs, Some(1800));
    }

    #[test]
    fn test_global_config_builder() {
        let c = SqlGlobalConfig::default()
            .with_max_connections(20)
            .with_min_connections(3)
            .with_idle_timeout_secs(600)
            .with_max_lifetime_secs(3600);
        assert_eq!(c.max_connections, 20);
        assert_eq!(c.min_connections, 3);
        assert_eq!(c.idle_timeout_secs, 600);
        assert_eq!(c.max_lifetime_secs, 3600);
    }
}