1use std::str::FromStr;
2
3use camel_component_api::CamelError;
4use camel_component_api::{UriComponents, UriConfig, parse_uri};
5
6fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
8 if opt.is_some() { Some("***") } else { None }
9}
10
11pub fn redact_db_url(db_url: &str) -> String {
14 match url::Url::parse(db_url) {
15 Ok(mut parsed) => {
16 if parsed.username().is_empty() && parsed.password().is_none() {
17 return db_url.to_string();
18 }
19 let _ = parsed.set_username("***");
20 let _ = parsed.set_password(Some("***"));
21 parsed.to_string()
22 }
23 Err(_) => db_url.to_string(),
24 }
25}
26
27#[derive(Debug, Clone, PartialEq, Default)]
29pub enum SqlOutputType {
30 #[default]
32 SelectList,
33 SelectOne,
35 StreamList,
37}
38
39impl FromStr for SqlOutputType {
40 type Err = CamelError;
41
42 fn from_str(s: &str) -> Result<Self, Self::Err> {
43 match s {
44 "SelectList" => Ok(SqlOutputType::SelectList),
45 "SelectOne" => Ok(SqlOutputType::SelectOne),
46 "StreamList" => Ok(SqlOutputType::StreamList),
47 _ => Err(CamelError::InvalidUri(format!(
48 "Unknown output type: {}",
49 s
50 ))),
51 }
52 }
53}
54
55#[derive(Clone, PartialEq, serde::Deserialize)]
62#[serde(default)]
63pub struct SqlGlobalConfig {
64 pub max_connections: u32,
65 pub min_connections: u32,
66 pub idle_timeout_secs: u64,
67 pub max_lifetime_secs: u64,
68 pub ssl_mode: Option<String>,
70 pub ssl_root_cert: Option<String>,
71 pub ssl_cert: Option<String>,
72 pub ssl_key: Option<String>,
73}
74
75impl std::fmt::Debug for SqlGlobalConfig {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("SqlGlobalConfig")
78 .field("max_connections", &self.max_connections)
79 .field("min_connections", &self.min_connections)
80 .field("idle_timeout_secs", &self.idle_timeout_secs)
81 .field("max_lifetime_secs", &self.max_lifetime_secs)
82 .field("ssl_mode", &self.ssl_mode)
83 .field("ssl_root_cert", &self.ssl_root_cert)
84 .field("ssl_cert", &self.ssl_cert)
85 .field("ssl_key", &redacted_opt(&self.ssl_key))
86 .finish()
87 }
88}
89
90impl Default for SqlGlobalConfig {
91 fn default() -> Self {
92 Self {
93 max_connections: 5,
94 min_connections: 1,
95 idle_timeout_secs: 300,
96 max_lifetime_secs: 1800,
97 ssl_mode: None,
98 ssl_root_cert: None,
99 ssl_cert: None,
100 ssl_key: None,
101 }
102 }
103}
104
105impl SqlGlobalConfig {
106 pub fn new() -> Self {
107 Self::default()
108 }
109
110 pub fn with_max_connections(mut self, value: u32) -> Self {
111 self.max_connections = value;
112 self
113 }
114
115 pub fn with_min_connections(mut self, value: u32) -> Self {
116 self.min_connections = value;
117 self
118 }
119
120 pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
121 self.idle_timeout_secs = value;
122 self
123 }
124
125 pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
126 self.max_lifetime_secs = value;
127 self
128 }
129
130 pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
131 self.ssl_mode = Some(value.into());
132 self
133 }
134
135 pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
136 self.ssl_root_cert = Some(value.into());
137 self
138 }
139
140 pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
141 self.ssl_cert = Some(value.into());
142 self
143 }
144
145 pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
146 self.ssl_key = Some(value.into());
147 self
148 }
149}
150
151#[derive(Clone)]
168pub struct SqlEndpointConfig {
169 pub db_url: String,
172 pub max_connections: Option<u32>,
174 pub min_connections: Option<u32>,
176 pub idle_timeout_secs: Option<u64>,
178 pub max_lifetime_secs: Option<u64>,
180
181 pub query: String,
184 pub source_path: Option<String>,
186 pub output_type: SqlOutputType,
188 pub placeholder: char,
190 pub use_placeholder: bool,
193 pub noop: bool,
195 pub in_separator: String,
197
198 pub delay_ms: u64,
201 pub initial_delay_ms: u64,
203 pub max_messages_per_poll: Option<i32>,
205 pub on_consume: Option<String>,
207 pub on_consume_failed: Option<String>,
209 pub on_consume_batch_complete: Option<String>,
211 pub route_empty_result_set: bool,
213 pub use_iterator: bool,
215 pub expected_update_count: Option<i64>,
217 pub break_batch_on_consume_fail: bool,
219
220 pub batch: bool,
223 pub use_message_body_for_sql: bool,
225
226 pub ssl_mode: Option<String>,
229 pub ssl_root_cert: Option<String>,
231 pub ssl_cert: Option<String>,
233 pub ssl_key: Option<String>,
235}
236
237impl std::fmt::Debug for SqlEndpointConfig {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 f.debug_struct("SqlEndpointConfig")
240 .field("db_url", &redact_db_url(&self.db_url))
241 .field("max_connections", &self.max_connections)
242 .field("min_connections", &self.min_connections)
243 .field("idle_timeout_secs", &self.idle_timeout_secs)
244 .field("max_lifetime_secs", &self.max_lifetime_secs)
245 .field("query", &self.query)
246 .field("source_path", &self.source_path)
247 .field("output_type", &self.output_type)
248 .field("placeholder", &self.placeholder)
249 .field("use_placeholder", &self.use_placeholder)
250 .field("noop", &self.noop)
251 .field("in_separator", &self.in_separator)
252 .field("delay_ms", &self.delay_ms)
253 .field("initial_delay_ms", &self.initial_delay_ms)
254 .field("max_messages_per_poll", &self.max_messages_per_poll)
255 .field("on_consume", &self.on_consume)
256 .field("on_consume_failed", &self.on_consume_failed)
257 .field("on_consume_batch_complete", &self.on_consume_batch_complete)
258 .field("route_empty_result_set", &self.route_empty_result_set)
259 .field("use_iterator", &self.use_iterator)
260 .field("expected_update_count", &self.expected_update_count)
261 .field(
262 "break_batch_on_consume_fail",
263 &self.break_batch_on_consume_fail,
264 )
265 .field("batch", &self.batch)
266 .field("use_message_body_for_sql", &self.use_message_body_for_sql)
267 .field("ssl_mode", &self.ssl_mode)
268 .field("ssl_root_cert", &self.ssl_root_cert)
269 .field("ssl_cert", &self.ssl_cert)
270 .field("ssl_key", &redacted_opt(&self.ssl_key))
271 .finish()
272 }
273}
274
275impl SqlEndpointConfig {
276 pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
278 if self.max_connections.is_none() {
279 self.max_connections = Some(defaults.max_connections);
280 }
281 if self.min_connections.is_none() {
282 self.min_connections = Some(defaults.min_connections);
283 }
284 if self.idle_timeout_secs.is_none() {
285 self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
286 }
287 if self.max_lifetime_secs.is_none() {
288 self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
289 }
290 if self.ssl_mode.is_none() {
291 self.ssl_mode = defaults.ssl_mode.clone();
292 }
293 if self.ssl_root_cert.is_none() {
294 self.ssl_root_cert = defaults.ssl_root_cert.clone();
295 }
296 if self.ssl_cert.is_none() {
297 self.ssl_cert = defaults.ssl_cert.clone();
298 }
299 if self.ssl_key.is_none() {
300 self.ssl_key = defaults.ssl_key.clone();
301 }
302 }
303
304 pub fn resolve_defaults(&mut self) {
306 let defaults = SqlGlobalConfig::default();
307 self.apply_defaults(&defaults);
308 }
309}
310
311struct SslParamMapping {
312 pg_key: &'static str,
313 mysql_key: &'static str,
314}
315
316const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
317 (
318 "sslMode",
319 SslParamMapping {
320 pg_key: "sslmode",
321 mysql_key: "ssl-mode",
322 },
323 ),
324 (
325 "sslRootCert",
326 SslParamMapping {
327 pg_key: "sslrootcert",
328 mysql_key: "ssl-ca",
329 },
330 ),
331 (
332 "sslCert",
333 SslParamMapping {
334 pg_key: "sslcert",
335 mysql_key: "ssl-cert",
336 },
337 ),
338 (
339 "sslKey",
340 SslParamMapping {
341 pg_key: "sslkey",
342 mysql_key: "ssl-key",
343 },
344 ),
345];
346
347pub fn enrich_db_url_with_ssl(
348 db_url: &str,
349 config: &SqlEndpointConfig,
350) -> Result<String, CamelError> {
351 let ssl_params: Vec<(&str, &str)> = [
352 config.ssl_mode.as_deref().map(|v| ("sslMode", v)),
353 config.ssl_root_cert.as_deref().map(|v| ("sslRootCert", v)),
354 config.ssl_cert.as_deref().map(|v| ("sslCert", v)),
355 config.ssl_key.as_deref().map(|v| ("sslKey", v)),
356 ]
357 .into_iter()
358 .flatten()
359 .collect();
360
361 if ssl_params.is_empty() {
362 return Ok(db_url.to_string());
363 }
364
365 let mut parsed = url::Url::parse(db_url).map_err(|e| {
366 CamelError::InvalidUri(format!(
367 "Cannot parse database URL for SSL enrichment: {}",
368 e
369 ))
370 })?;
371
372 let scheme = parsed.scheme();
373 if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
374 return Ok(db_url.to_string());
375 }
376 let is_mysql = scheme == "mysql";
377
378 let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
379 for (camel_name, value) in &ssl_params {
380 if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
381 let driver_key = if is_mysql {
382 mapping.mysql_key
383 } else {
384 mapping.pg_key
385 };
386
387 if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
388 query_pairs[pos].1 = (*value).into();
389 } else {
390 query_pairs.push((driver_key.into(), (*value).into()));
391 }
392 }
393 }
394
395 {
396 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
397 for (k, v) in query_pairs {
398 serializer.append_pair(&k, &v);
399 }
400 parsed.set_query(Some(&serializer.finish()));
401 }
402
403 Ok(parsed.to_string())
404}
405
406impl UriConfig for SqlEndpointConfig {
407 fn scheme() -> &'static str {
408 "sql"
409 }
410
411 fn from_uri(uri: &str) -> Result<Self, CamelError> {
412 let parts = parse_uri(uri)?;
413 Self::from_components(parts)
414 }
415
416 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
417 if parts.scheme != Self::scheme() {
419 return Err(CamelError::InvalidUri(format!(
420 "expected scheme '{}' but got '{}'",
421 Self::scheme(),
422 parts.scheme
423 )));
424 }
425
426 let params = &parts.params;
427
428 let (query, source_path) = if parts.path.starts_with("file:") {
430 let file_path = parts.path.trim_start_matches("file:").to_string();
431 let contents = std::fs::read_to_string(&file_path).map_err(|e| {
432 CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
433 })?;
434 (contents.trim().to_string(), Some(file_path))
435 } else {
436 (parts.path.clone(), None)
437 };
438
439 let db_url = params
441 .get("db_url")
442 .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
443 .clone();
444
445 let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
447 let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
448 let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
449 let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
450
451 let output_type = params
453 .get("outputType")
454 .map(|s| s.parse())
455 .transpose()?
456 .unwrap_or_default();
457 let placeholder = params
458 .get("placeholder")
459 .filter(|v| !v.is_empty())
460 .map(|v| {
461 if v.chars().count() != 1 {
462 return Err(CamelError::InvalidUri(format!(
463 "placeholder must be exactly one character, got '{}'",
464 v
465 )));
466 }
467 Ok(v.chars().next().unwrap()) })
469 .transpose()?
470 .unwrap_or('#');
471 fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
476 if value.eq_ignore_ascii_case("true") {
477 Ok(true)
478 } else if value.eq_ignore_ascii_case("false") {
479 Ok(false)
480 } else {
481 Err(CamelError::InvalidUri(format!(
482 "{} must be 'true' or 'false', got '{}'",
483 name, value
484 )))
485 }
486 }
487
488 let use_placeholder = params
489 .get("usePlaceholder")
490 .map(|v| parse_bool_param("usePlaceholder", v))
491 .transpose()?
492 .unwrap_or(true);
493 let noop = params
494 .get("noop")
495 .map(|v| parse_bool_param("noop", v))
496 .transpose()?
497 .unwrap_or(false);
498 let in_separator = params
499 .get("inSeparator")
500 .map(|v| v.to_string())
501 .unwrap_or_else(|| ", ".to_string());
502 if in_separator.is_empty() {
503 return Err(CamelError::InvalidUri(
504 "inSeparator must not be empty".to_string(),
505 ));
506 }
507
508 let delay_ms = params
510 .get("delay")
511 .and_then(|v| v.parse().ok())
512 .unwrap_or(500);
513 let initial_delay_ms = params
514 .get("initialDelay")
515 .and_then(|v| v.parse().ok())
516 .unwrap_or(1000);
517 let max_messages_per_poll = params
518 .get("maxMessagesPerPoll")
519 .and_then(|v| v.parse().ok());
520 let on_consume = params.get("onConsume").cloned();
521 let on_consume_failed = params.get("onConsumeFailed").cloned();
522 let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
523 let route_empty_result_set = params
524 .get("routeEmptyResultSet")
525 .map(|v| parse_bool_param("routeEmptyResultSet", v))
526 .transpose()?
527 .unwrap_or(false);
528 let use_iterator = params
529 .get("useIterator")
530 .map(|v| parse_bool_param("useIterator", v))
531 .transpose()?
532 .unwrap_or(true);
533 let expected_update_count = params
534 .get("expectedUpdateCount")
535 .and_then(|v| v.parse().ok());
536 let break_batch_on_consume_fail = params
537 .get("breakBatchOnConsumeFail")
538 .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
539 .transpose()?
540 .unwrap_or(false);
541
542 let batch = params
544 .get("batch")
545 .map(|v| parse_bool_param("batch", v))
546 .transpose()?
547 .unwrap_or(false);
548 let use_message_body_for_sql = params
549 .get("useMessageBodyForSql")
550 .map(|v| parse_bool_param("useMessageBodyForSql", v))
551 .transpose()?
552 .unwrap_or(false);
553 let ssl_mode = params.get("sslMode").cloned();
554 let ssl_root_cert = params.get("sslRootCert").cloned();
555 let ssl_cert = params.get("sslCert").cloned();
556 let ssl_key = params.get("sslKey").cloned();
557
558 Ok(Self {
559 db_url,
560 max_connections,
561 min_connections,
562 idle_timeout_secs,
563 max_lifetime_secs,
564 query,
565 source_path,
566 output_type,
567 placeholder,
568 use_placeholder,
569 noop,
570 in_separator,
571 delay_ms,
572 initial_delay_ms,
573 max_messages_per_poll,
574 on_consume,
575 on_consume_failed,
576 on_consume_batch_complete,
577 route_empty_result_set,
578 use_iterator,
579 expected_update_count,
580 break_batch_on_consume_fail,
581 batch,
582 use_message_body_for_sql,
583 ssl_mode,
584 ssl_root_cert,
585 ssl_cert,
586 ssl_key,
587 })
588 }
589}
590
591#[cfg(test)]
592mod tests {
593 use super::*;
594
595 #[test]
596 fn config_defaults() {
597 let mut c =
598 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
599 c.resolve_defaults();
600 assert_eq!(c.query, "select 1");
601 assert_eq!(c.db_url, "postgres://localhost/test");
602 assert_eq!(c.max_connections, Some(5));
603 assert_eq!(c.min_connections, Some(1));
604 assert_eq!(c.idle_timeout_secs, Some(300));
605 assert_eq!(c.max_lifetime_secs, Some(1800));
606 assert_eq!(c.output_type, SqlOutputType::SelectList);
607 assert_eq!(c.placeholder, '#');
608 assert!(!c.noop);
609 assert_eq!(c.in_separator, ", ");
610 assert_eq!(c.delay_ms, 500);
611 assert_eq!(c.initial_delay_ms, 1000);
612 assert!(c.max_messages_per_poll.is_none());
613 assert!(c.on_consume.is_none());
614 assert!(c.on_consume_failed.is_none());
615 assert!(c.on_consume_batch_complete.is_none());
616 assert!(!c.route_empty_result_set);
617 assert!(c.use_iterator);
618 assert!(c.expected_update_count.is_none());
619 assert!(!c.break_batch_on_consume_fail);
620 assert!(!c.batch);
621 assert!(!c.use_message_body_for_sql);
622 assert!(c.ssl_mode.is_none());
623 assert!(c.ssl_root_cert.is_none());
624 assert!(c.ssl_cert.is_none());
625 assert!(c.ssl_key.is_none());
626 }
627
628 #[test]
629 fn ssl_none_by_default() {
630 let c =
631 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
632 assert!(c.ssl_mode.is_none());
633 assert!(c.ssl_root_cert.is_none());
634 assert!(c.ssl_cert.is_none());
635 assert!(c.ssl_key.is_none());
636 }
637
638 #[test]
639 fn ssl_mode_from_uri() {
640 let c = SqlEndpointConfig::from_uri(
641 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
642 )
643 .unwrap();
644 assert_eq!(c.ssl_mode, Some("require".to_string()));
645 assert!(c.ssl_root_cert.is_none());
646 }
647
648 #[test]
649 fn ssl_all_params_from_uri() {
650 let c = SqlEndpointConfig::from_uri(
651 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
652 )
653 .unwrap();
654 assert_eq!(c.ssl_mode, Some("require".to_string()));
655 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
656 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
657 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
658 }
659
660 #[test]
661 fn ssl_global_applied_to_endpoint() {
662 let mut c =
663 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
664 let global = SqlGlobalConfig::default()
665 .with_ssl_mode("require")
666 .with_ssl_root_cert("/etc/ssl/ca.pem");
667 c.apply_defaults(&global);
668 assert_eq!(c.ssl_mode, Some("require".to_string()));
669 assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
670 assert!(c.ssl_cert.is_none());
671 assert!(c.ssl_key.is_none());
672 }
673
674 #[test]
675 fn ssl_uri_overrides_global() {
676 let mut c = SqlEndpointConfig::from_uri(
677 "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
678 )
679 .unwrap();
680 let global = SqlGlobalConfig::default().with_ssl_mode("require");
681 c.apply_defaults(&global);
682 assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
683 }
684
685 #[test]
686 fn config_wrong_scheme() {
687 assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
688 }
689
690 #[test]
691 fn config_missing_db_url() {
692 assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
693 }
694
695 #[test]
696 fn config_output_type_select_one() {
697 let c = SqlEndpointConfig::from_uri(
698 "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
699 )
700 .unwrap();
701 assert_eq!(c.output_type, SqlOutputType::SelectOne);
702 }
703
704 #[test]
705 fn config_output_type_stream_list() {
706 let c = SqlEndpointConfig::from_uri(
707 "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
708 )
709 .unwrap();
710 assert_eq!(c.output_type, SqlOutputType::StreamList);
711 }
712
713 #[test]
714 fn in_separator_default() {
715 let c =
716 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
717 assert_eq!(c.in_separator, ", ");
718 }
719
720 #[test]
721 fn in_separator_from_uri() {
722 let c = SqlEndpointConfig::from_uri(
723 "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
724 )
725 .unwrap();
726 assert_eq!(c.in_separator, ";");
727 }
728
729 #[test]
730 fn in_separator_empty_rejected() {
731 let result = SqlEndpointConfig::from_uri(
732 "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
733 );
734 assert!(result.is_err());
735 let msg = format!("{:?}", result.unwrap_err());
736 assert!(msg.contains("inSeparator") || msg.contains("empty"));
737 }
738
739 #[test]
740 fn config_consumer_options() {
741 let c = SqlEndpointConfig::from_uri(
742 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
743 ).unwrap();
744 assert_eq!(c.delay_ms, 2000);
745 assert_eq!(c.initial_delay_ms, 500);
746 assert_eq!(c.max_messages_per_poll, Some(10));
747 assert_eq!(
748 c.on_consume,
749 Some("update t set done=true where id=:#id".to_string())
750 );
751 assert_eq!(
752 c.on_consume_failed,
753 Some("update t set failed=true where id=:#id".to_string())
754 );
755 assert_eq!(
756 c.on_consume_batch_complete,
757 Some("delete from t where done=true".to_string())
758 );
759 assert!(c.route_empty_result_set);
760 assert!(!c.use_iterator);
761 assert_eq!(c.expected_update_count, Some(1));
762 assert!(c.break_batch_on_consume_fail);
763 }
764
765 #[test]
766 fn config_producer_options() {
767 let c = SqlEndpointConfig::from_uri(
768 "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
769 ).unwrap();
770 assert!(c.batch);
771 assert!(c.use_message_body_for_sql);
772 assert!(c.noop);
773 }
774
775 #[test]
776 fn config_pool_options() {
777 let c = SqlEndpointConfig::from_uri(
778 "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
779 ).unwrap();
780 assert_eq!(c.max_connections, Some(20));
781 assert_eq!(c.min_connections, Some(3));
782 assert_eq!(c.idle_timeout_secs, Some(600));
783 assert_eq!(c.max_lifetime_secs, Some(3600));
784 }
785
786 #[test]
787 fn config_query_with_special_chars() {
788 let c = SqlEndpointConfig::from_uri(
789 "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
790 )
791 .unwrap();
792 assert_eq!(
793 c.query,
794 "select * from users where name = :#name and age > #"
795 );
796 }
797
798 #[test]
799 fn output_type_from_str() {
800 assert_eq!(
801 "SelectList".parse::<SqlOutputType>().unwrap(),
802 SqlOutputType::SelectList
803 );
804 assert_eq!(
805 "SelectOne".parse::<SqlOutputType>().unwrap(),
806 SqlOutputType::SelectOne
807 );
808 assert_eq!(
809 "StreamList".parse::<SqlOutputType>().unwrap(),
810 SqlOutputType::StreamList
811 );
812 assert!("Invalid".parse::<SqlOutputType>().is_err());
813 }
814
815 #[test]
816 fn config_file_not_found() {
817 let result = SqlEndpointConfig::from_uri(
818 "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
819 );
820 assert!(result.is_err());
821 let err = result.unwrap_err();
822 let msg = format!("{:?}", err);
823 assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
824 }
825
826 #[test]
827 fn config_file_query() {
828 use std::io::Write;
829 let unique_name = format!(
830 "test_sql_query_{}.sql",
831 std::time::SystemTime::now()
832 .duration_since(std::time::UNIX_EPOCH)
833 .unwrap_or_default()
834 .as_nanos()
835 );
836 let mut tmp = std::env::temp_dir();
837 tmp.push(unique_name);
838 {
839 let mut f = std::fs::File::create(&tmp).unwrap();
840 writeln!(f, "SELECT * FROM users").unwrap();
841 }
842 let uri = format!(
843 "sql:file:{}?db_url=postgres://localhost/test",
844 tmp.display()
845 );
846 let c = SqlEndpointConfig::from_uri(&uri).unwrap();
847 assert_eq!(c.query, "SELECT * FROM users");
848 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
849 std::fs::remove_file(&tmp).ok();
850 }
851
852 #[test]
854 fn pool_fields_none_when_not_set() {
855 let c =
856 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
857 assert_eq!(c.max_connections, None);
858 assert_eq!(c.min_connections, None);
859 assert_eq!(c.idle_timeout_secs, None);
860 assert_eq!(c.max_lifetime_secs, None);
861 }
862
863 #[test]
864 fn apply_defaults_fills_none() {
865 let mut c =
866 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
867 let global = SqlGlobalConfig {
868 max_connections: 10,
869 min_connections: 2,
870 idle_timeout_secs: 600,
871 max_lifetime_secs: 3600,
872 ssl_mode: None,
873 ssl_root_cert: None,
874 ssl_cert: None,
875 ssl_key: None,
876 };
877 c.apply_defaults(&global);
878 assert_eq!(c.max_connections, Some(10));
879 assert_eq!(c.min_connections, Some(2));
880 assert_eq!(c.idle_timeout_secs, Some(600));
881 assert_eq!(c.max_lifetime_secs, Some(3600));
882 assert!(c.ssl_mode.is_none());
883 assert!(c.ssl_root_cert.is_none());
884 assert!(c.ssl_cert.is_none());
885 assert!(c.ssl_key.is_none());
886 }
887
888 #[test]
889 fn apply_defaults_does_not_override() {
890 let mut c = SqlEndpointConfig::from_uri(
891 "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
892 )
893 .unwrap();
894 let global = SqlGlobalConfig {
895 max_connections: 10,
896 min_connections: 2,
897 idle_timeout_secs: 600,
898 max_lifetime_secs: 3600,
899 ssl_mode: None,
900 ssl_root_cert: None,
901 ssl_cert: None,
902 ssl_key: None,
903 };
904 c.apply_defaults(&global);
905 assert_eq!(c.max_connections, Some(99));
907 assert_eq!(c.min_connections, Some(5));
908 assert_eq!(c.idle_timeout_secs, Some(600));
910 assert_eq!(c.max_lifetime_secs, Some(3600));
911 }
912
913 #[test]
914 fn resolve_defaults_fills_remaining() {
915 let mut c = SqlEndpointConfig::from_uri(
916 "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
917 )
918 .unwrap();
919 c.resolve_defaults();
920 assert_eq!(c.max_connections, Some(7)); assert_eq!(c.min_connections, Some(1)); assert_eq!(c.idle_timeout_secs, Some(300)); assert_eq!(c.max_lifetime_secs, Some(1800)); }
925
926 #[test]
927 fn global_config_builder() {
928 let c = SqlGlobalConfig::default()
929 .with_max_connections(20)
930 .with_min_connections(3)
931 .with_idle_timeout_secs(600)
932 .with_max_lifetime_secs(3600)
933 .with_ssl_mode("require")
934 .with_ssl_root_cert("/ca.pem")
935 .with_ssl_cert("/cert.pem")
936 .with_ssl_key("/key.pem");
937 assert_eq!(c.max_connections, 20);
938 assert_eq!(c.min_connections, 3);
939 assert_eq!(c.idle_timeout_secs, 600);
940 assert_eq!(c.max_lifetime_secs, 3600);
941 assert_eq!(c.ssl_mode, Some("require".to_string()));
942 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
943 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
944 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
945 }
946
947 #[test]
948 fn enrich_postgres_ssl_mode() {
949 let mut c = SqlEndpointConfig::from_uri(
950 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
951 )
952 .unwrap();
953 c.resolve_defaults();
954 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
955 assert!(url.contains("sslmode=require"), "got: {}", url);
956 }
957
958 #[test]
959 fn enrich_postgres_all_ssl() {
960 let mut c = SqlEndpointConfig::from_uri(
961 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
962 )
963 .unwrap();
964 c.resolve_defaults();
965 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
966 assert!(url.contains("sslmode=require"), "got: {}", url);
967 assert!(url.contains("sslrootcert="), "got: {}", url);
968 assert!(url.contains("sslcert="), "got: {}", url);
969 assert!(url.contains("sslkey="), "got: {}", url);
970 }
971
972 #[test]
973 fn enrich_mysql_ssl() {
974 let mut c = SqlEndpointConfig::from_uri(
975 "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
976 )
977 .unwrap();
978 c.resolve_defaults();
979 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
980 assert!(url.contains("ssl-mode=require"), "got: {}", url);
981 }
982
983 #[test]
984 fn enrich_existing_query_params() {
985 let mut c = SqlEndpointConfig::from_uri(
986 "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
987 )
988 .unwrap();
989 c.resolve_defaults();
990 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
991 assert!(url.contains("existing=1"), "got: {}", url);
992 assert!(url.contains("sslmode=require"), "got: {}", url);
993 }
994
995 #[test]
996 fn enrich_override_existing() {
997 let mut c = SqlEndpointConfig::from_uri(
998 "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
999 )
1000 .unwrap();
1001 c.resolve_defaults();
1002 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1003 assert!(url.contains("sslmode=require"), "got: {}", url);
1004 assert!(!url.contains("sslmode=allow"), "got: {}", url);
1005 }
1006
1007 #[test]
1008 fn enrich_no_params() {
1009 let mut c =
1010 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1011 c.resolve_defaults();
1012 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1013 assert_eq!(url, "postgres://localhost/test");
1014 }
1015
1016 #[test]
1017 fn enrich_url_encodes_paths() {
1018 let mut c = SqlEndpointConfig::from_uri(
1019 "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1020 )
1021 .unwrap();
1022 c.resolve_defaults();
1023 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1024 assert!(url.contains("sslrootcert="), "got: {}", url);
1025 }
1026
1027 #[test]
1028 fn enrich_unsupported_scheme_returns_unchanged() {
1029 let mut c = SqlEndpointConfig::from_uri(
1030 "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1031 )
1032 .unwrap();
1033 c.resolve_defaults();
1034 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1035 assert_eq!(url, "sqlite://localhost/test.db");
1036 }
1037
1038 #[test]
1039 fn enrich_invalid_url_returns_error() {
1040 let mut c = SqlEndpointConfig::from_uri(
1041 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1042 )
1043 .unwrap();
1044 c.resolve_defaults();
1045 let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1046 assert!(result.is_err());
1047 }
1048
1049 #[test]
1053 fn debug_redacts_db_url_with_password() {
1054 let c = SqlEndpointConfig::from_uri(
1055 "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1056 )
1057 .unwrap();
1058 let debug_output = format!("{:?}", c);
1059 assert!(
1060 !debug_output.contains("secret123"),
1061 "Debug output must not contain password: {}",
1062 debug_output
1063 );
1064 assert!(
1065 debug_output.contains("***"),
1066 "Debug output must contain redacted marker: {}",
1067 debug_output
1068 );
1069 }
1070
1071 #[test]
1072 fn debug_redacts_ssl_key() {
1073 let c = SqlEndpointConfig::from_uri(
1074 "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1075 )
1076 .unwrap();
1077 let debug_output = format!("{:?}", c);
1078 assert!(
1079 !debug_output.contains("/secret/key.pem"),
1080 "Debug output must not contain ssl_key path: {}",
1081 debug_output
1082 );
1083 }
1084
1085 #[test]
1086 fn debug_global_config_redacts_ssl_key() {
1087 let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1088 let debug_output = format!("{:?}", c);
1089 assert!(
1090 !debug_output.contains("/secret/key.pem"),
1091 "Debug output must not contain ssl_key path: {}",
1092 debug_output
1093 );
1094 assert!(
1095 debug_output.contains("***"),
1096 "Debug output must contain redacted marker: {}",
1097 debug_output
1098 );
1099 }
1100
1101 #[test]
1102 fn redact_db_url_with_credentials() {
1103 assert_eq!(
1104 redact_db_url("postgres://user:pass@host/db"),
1105 "postgres://***:***@host/db"
1106 );
1107 }
1108
1109 #[test]
1110 fn redact_db_url_without_credentials() {
1111 assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1112 }
1113
1114 #[test]
1115 fn redact_db_url_invalid_returns_original() {
1116 assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1117 }
1118
1119 #[test]
1121 fn use_placeholder_defaults_to_true() {
1122 let c =
1123 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1124 assert!(c.use_placeholder);
1125 }
1126
1127 #[test]
1128 fn use_placeholder_false_from_uri() {
1129 let c = SqlEndpointConfig::from_uri(
1130 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1131 )
1132 .unwrap();
1133 assert!(!c.use_placeholder);
1134 }
1135
1136 #[test]
1137 fn use_placeholder_true_from_uri() {
1138 let c = SqlEndpointConfig::from_uri(
1139 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1140 )
1141 .unwrap();
1142 assert!(c.use_placeholder);
1143 }
1144
1145 #[test]
1147 fn use_placeholder_rejects_invalid_value() {
1148 let result = SqlEndpointConfig::from_uri(
1149 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1150 );
1151 assert!(result.is_err());
1152 let msg = format!("{:?}", result.unwrap_err());
1153 assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1154 }
1155
1156 #[test]
1157 fn use_placeholder_rejects_typo_tru() {
1158 let result = SqlEndpointConfig::from_uri(
1159 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1160 );
1161 assert!(result.is_err());
1162 }
1163
1164 #[test]
1165 fn use_placeholder_rejects_yes() {
1166 let result = SqlEndpointConfig::from_uri(
1167 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1168 );
1169 assert!(result.is_err());
1170 }
1171
1172 #[test]
1173 fn noop_rejects_invalid_value() {
1174 let result =
1175 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1176 assert!(result.is_err());
1177 let msg = format!("{:?}", result.unwrap_err());
1178 assert!(msg.contains("noop"));
1179 }
1180
1181 #[test]
1182 fn batch_rejects_invalid_value() {
1183 let result =
1184 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1185 assert!(result.is_err());
1186 let msg = format!("{:?}", result.unwrap_err());
1187 assert!(msg.contains("batch"));
1188 }
1189
1190 #[test]
1191 fn route_empty_result_set_rejects_invalid_value() {
1192 let result = SqlEndpointConfig::from_uri(
1193 "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1194 );
1195 assert!(result.is_err());
1196 }
1197
1198 #[test]
1199 fn use_iterator_rejects_invalid_value() {
1200 let result = SqlEndpointConfig::from_uri(
1201 "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1202 );
1203 assert!(result.is_err());
1204 }
1205
1206 #[test]
1207 fn break_batch_on_consume_fail_rejects_invalid_value() {
1208 let result = SqlEndpointConfig::from_uri(
1209 "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1210 );
1211 assert!(result.is_err());
1212 }
1213
1214 #[test]
1215 fn use_message_body_for_sql_rejects_invalid_value() {
1216 let result = SqlEndpointConfig::from_uri(
1217 "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1218 );
1219 assert!(result.is_err());
1220 }
1221
1222 #[test]
1224 fn boolean_params_case_insensitive() {
1225 let c = SqlEndpointConfig::from_uri(
1226 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False",
1227 )
1228 .unwrap();
1229 assert!(c.use_placeholder);
1230 assert!(!c.noop);
1231 assert!(c.batch);
1232 assert!(!c.use_iterator);
1233 }
1234
1235 #[test]
1237 fn multi_char_placeholder_rejected() {
1238 let result = SqlEndpointConfig::from_uri(
1239 "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1240 );
1241 assert!(result.is_err());
1242 let msg = format!("{:?}", result.unwrap_err());
1243 assert!(msg.contains("placeholder") && msg.contains("one character"));
1244 }
1245
1246 #[test]
1247 fn single_char_placeholder_accepted() {
1248 let c = SqlEndpointConfig::from_uri(
1249 "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1250 )
1251 .unwrap();
1252 assert_eq!(c.placeholder, '$');
1253 }
1254
1255 #[test]
1256 fn empty_placeholder_falls_back_to_default() {
1257 let c = SqlEndpointConfig::from_uri(
1259 "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1260 )
1261 .unwrap();
1262 assert_eq!(c.placeholder, '#');
1263 }
1264
1265 #[test]
1267 fn file_query_cached_in_config() {
1268 use std::io::Write;
1269 let unique_name = format!(
1270 "test_sql_cached_{}.sql",
1271 std::time::SystemTime::now()
1272 .duration_since(std::time::UNIX_EPOCH)
1273 .unwrap_or_default()
1274 .as_nanos()
1275 );
1276 let mut tmp = std::env::temp_dir();
1277 tmp.push(unique_name);
1278 {
1279 let mut f = std::fs::File::create(&tmp).unwrap();
1280 writeln!(f, "SELECT * FROM cached_test").unwrap();
1281 }
1282 let uri = format!(
1283 "sql:file:{}?db_url=postgres://localhost/test",
1284 tmp.display()
1285 );
1286 let c = SqlEndpointConfig::from_uri(&uri).unwrap();
1287 assert_eq!(c.query, "SELECT * FROM cached_test");
1289 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1290 std::fs::remove_file(&tmp).ok();
1292 assert_eq!(c.query, "SELECT * FROM cached_test");
1293 }
1294}