1use std::str::FromStr;
2use std::time::Duration;
3
4use camel_component_api::CamelError;
5use camel_component_api::NetworkRetryPolicy;
6use camel_component_api::{UriComponents, UriConfig, parse_uri};
7use tracing::warn;
8
/// Replaces a present secret with the fixed placeholder `"***"` so the real value
/// never reaches `Debug` output; an absent value stays `None`.
fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
    opt.as_ref().map(|_| "***")
}
13
14pub fn redact_db_url(db_url: &str) -> String {
17 match url::Url::parse(db_url) {
18 Ok(mut parsed) => {
19 if parsed.username().is_empty() && parsed.password().is_none() {
20 return db_url.to_string();
21 }
22 let _ = parsed.set_username("***");
23 let _ = parsed.set_password(Some("***"));
24 parsed.to_string()
25 }
26 Err(_) => db_url.to_string(),
27 }
28}
29
/// Output type of an SQL endpoint, selected with the `outputType` URI parameter.
///
/// When the parameter is absent the default variant, [`SqlOutputType::SelectList`],
/// is used.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum SqlOutputType {
    /// Default variant; spelled `SelectList` in the URI.
    #[default]
    SelectList,
    /// Spelled `SelectOne` in the URI.
    SelectOne,
    /// Spelled `StreamList` in the URI.
    StreamList,
}
41
42impl FromStr for SqlOutputType {
43 type Err = CamelError;
44
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 match s {
47 "SelectList" => Ok(SqlOutputType::SelectList),
48 "SelectOne" => Ok(SqlOutputType::SelectOne),
49 "StreamList" => Ok(SqlOutputType::StreamList),
50 _ => Err(CamelError::InvalidUri(format!(
51 "Unknown output type: {}",
52 s
53 ))),
54 }
55 }
56}
57
/// Transaction mode of an SQL endpoint, selected with the `transactionMode` URI
/// parameter.
///
/// When the parameter is absent the default variant, [`TransactionMode::Auto`],
/// is used.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum TransactionMode {
    /// Default variant; spelled `Auto` in the URI.
    #[default]
    Auto,
    /// Spelled `Managed` in the URI.
    Managed,
}
73
74impl FromStr for TransactionMode {
75 type Err = CamelError;
76
77 fn from_str(s: &str) -> Result<Self, Self::Err> {
78 match s {
79 "Auto" => Ok(TransactionMode::Auto),
80 "Managed" => Ok(TransactionMode::Managed),
81 _ => Err(CamelError::InvalidUri(format!(
82 "Unknown transaction mode: {}. Expected 'Auto' or 'Managed'",
83 s
84 ))),
85 }
86 }
87}
88
89impl std::fmt::Display for TransactionMode {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 TransactionMode::Auto => write!(f, "Auto"),
93 TransactionMode::Managed => write!(f, "Managed"),
94 }
95 }
96}
97
/// Processing strategy of an SQL endpoint, selected with the `processingStrategy`
/// URI parameter.
///
/// When the parameter is absent the default variant,
/// [`ProcessingStrategy::Direct`], is used.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ProcessingStrategy {
    /// Default variant; spelled `Direct` in the URI.
    #[default]
    Direct,
    /// Spelled `Scheduled` in the URI.
    Scheduled,
}
107
108impl FromStr for ProcessingStrategy {
109 type Err = CamelError;
110
111 fn from_str(s: &str) -> Result<Self, Self::Err> {
112 match s {
113 "Direct" => Ok(ProcessingStrategy::Direct),
114 "Scheduled" => Ok(ProcessingStrategy::Scheduled),
115 _ => Err(CamelError::InvalidUri(format!(
116 "Unknown processing strategy: {}. Expected 'Direct' or 'Scheduled'",
117 s
118 ))),
119 }
120 }
121}
122
123impl std::fmt::Display for ProcessingStrategy {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 match self {
126 ProcessingStrategy::Direct => write!(f, "Direct"),
127 ProcessingStrategy::Scheduled => write!(f, "Scheduled"),
128 }
129 }
130}
131
/// Poll strategy of an SQL endpoint, selected with the `pollStrategy` URI
/// parameter.
///
/// When the parameter is absent the default variant,
/// [`PollStrategy::Sequential`], is used.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum PollStrategy {
    /// Default variant; spelled `Sequential` in the URI.
    #[default]
    Sequential,
    /// Spelled `Burst` in the URI.
    Burst,
}
141
142impl FromStr for PollStrategy {
143 type Err = CamelError;
144
145 fn from_str(s: &str) -> Result<Self, Self::Err> {
146 match s {
147 "Sequential" => Ok(PollStrategy::Sequential),
148 "Burst" => Ok(PollStrategy::Burst),
149 _ => Err(CamelError::InvalidUri(format!(
150 "Unknown poll strategy: {}. Expected 'Sequential' or 'Burst'",
151 s
152 ))),
153 }
154 }
155}
156
157impl std::fmt::Display for PollStrategy {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 match self {
160 PollStrategy::Sequential => write!(f, "Sequential"),
161 PollStrategy::Burst => write!(f, "Burst"),
162 }
163 }
164}
165
/// Component-wide defaults that `SqlEndpointConfig::apply_defaults` copies into
/// every endpoint option the endpoint URI left unset.
///
/// Deserializable with serde; because of the container-level `#[serde(default)]`,
/// fields missing from the input take the values of the `Default` impl.
/// `Debug` is implemented by hand so that `ssl_key` is masked.
#[derive(Clone, PartialEq, serde::Deserialize)]
#[serde(default)]
pub struct SqlGlobalConfig {
    /// Fallback for the endpoint's `maxConnections` (default `5`).
    pub max_connections: u32,
    /// Fallback for the endpoint's `minConnections` (default `1`).
    pub min_connections: u32,
    /// Fallback for the endpoint's `idleTimeoutSecs` (default `300`).
    pub idle_timeout_secs: u64,
    /// Fallback for the endpoint's `maxLifetimeSecs` (default `1800`).
    pub max_lifetime_secs: u64,
    /// Fallback for the endpoint's `sslMode` (default `None`).
    pub ssl_mode: Option<String>,
    /// Fallback for the endpoint's `sslRootCert` (default `None`).
    pub ssl_root_cert: Option<String>,
    /// Fallback for the endpoint's `sslCert` (default `None`).
    pub ssl_cert: Option<String>,
    /// Fallback for the endpoint's `sslKey` (default `None`); masked in `Debug` output.
    pub ssl_key: Option<String>,
    /// Retry policy used by endpoints whose URI set no `retry*` parameter.
    #[serde(default)]
    pub retry: NetworkRetryPolicy,
}
188
189impl std::fmt::Debug for SqlGlobalConfig {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 f.debug_struct("SqlGlobalConfig")
192 .field("max_connections", &self.max_connections)
193 .field("min_connections", &self.min_connections)
194 .field("idle_timeout_secs", &self.idle_timeout_secs)
195 .field("max_lifetime_secs", &self.max_lifetime_secs)
196 .field("ssl_mode", &self.ssl_mode)
197 .field("ssl_root_cert", &self.ssl_root_cert)
198 .field("ssl_cert", &self.ssl_cert)
199 .field("ssl_key", &redacted_opt(&self.ssl_key))
200 .field("retry", &self.retry)
201 .finish()
202 }
203}
204
/// Built-in defaults: 5 max / 1 min connections, 300 s idle timeout, 1800 s max
/// lifetime, no SSL settings, and the default `NetworkRetryPolicy`.
impl Default for SqlGlobalConfig {
    fn default() -> Self {
        Self {
            max_connections: 5,
            min_connections: 1,
            // Both timeouts are expressed in seconds (see the `_secs` suffix).
            idle_timeout_secs: 300,
            max_lifetime_secs: 1800,
            // SSL is left entirely unconfigured unless set explicitly.
            ssl_mode: None,
            ssl_root_cert: None,
            ssl_cert: None,
            ssl_key: None,
            retry: NetworkRetryPolicy::default(),
        }
    }
}
220
221impl SqlGlobalConfig {
222 pub fn new() -> Self {
223 Self::default()
224 }
225
226 pub fn with_max_connections(mut self, value: u32) -> Self {
227 self.max_connections = value;
228 self
229 }
230
231 pub fn with_min_connections(mut self, value: u32) -> Self {
232 self.min_connections = value;
233 self
234 }
235
236 pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
237 self.idle_timeout_secs = value;
238 self
239 }
240
241 pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
242 self.max_lifetime_secs = value;
243 self
244 }
245
246 pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
247 self.ssl_mode = Some(value.into());
248 self
249 }
250
251 pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
252 self.ssl_root_cert = Some(value.into());
253 self
254 }
255
256 pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
257 self.ssl_cert = Some(value.into());
258 self
259 }
260
261 pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
262 self.ssl_key = Some(value.into());
263 self
264 }
265
266 pub fn with_retry(mut self, value: NetworkRetryPolicy) -> Self {
267 self.retry = value;
268 self
269 }
270}
271
/// Configuration of a single `sql:` endpoint, built from its URI by the
/// `UriConfig` implementation.
///
/// The URI path is the SQL text (or `file:<path>` to load it later through
/// `resolve_file_query`); everything else comes from URI parameters. Options left
/// as `None` can be filled from a `SqlGlobalConfig` with `apply_defaults`.
/// `Debug` is implemented by hand so credentials in `db_url` and `ssl_key` are masked.
#[derive(Clone)]
pub struct SqlEndpointConfig {
    /// `db_url` parameter; empty when absent. Mutually exclusive with `datasource`.
    pub db_url: String,
    /// `datasource` parameter. When set, `db_url` and the pool/SSL parameters are rejected.
    pub datasource_name: Option<String>,
    /// `maxConnections` parameter; `None` when absent or not parseable as `u32`.
    pub max_connections: Option<u32>,
    /// `minConnections` parameter; `None` when absent or not parseable as `u32`.
    pub min_connections: Option<u32>,
    /// `idleTimeoutSecs` parameter; `None` when absent or not parseable as `u64`.
    pub idle_timeout_secs: Option<u64>,
    /// `maxLifetimeSecs` parameter; `None` when absent or not parseable as `u64`.
    pub max_lifetime_secs: Option<u64>,

    /// SQL text taken from the URI path; empty for `file:` paths until
    /// `resolve_file_query` has loaded the file.
    pub query: String,
    /// File path given after the `file:` prefix of the URI path, if any.
    pub source_path: Option<String>,
    /// `outputType` parameter (default `SelectList`).
    pub output_type: SqlOutputType,
    /// `placeholder` parameter: a single ASCII character (default `#`).
    pub placeholder: char,
    /// `usePlaceholder` parameter (default `true`).
    pub use_placeholder: bool,
    /// `noop` parameter (default `false`).
    pub noop: bool,
    /// `inSeparator` parameter (default `", "`); an empty value is rejected.
    pub in_separator: String,

    /// `alwaysPopulateStatement` parameter (default `false`).
    pub always_populate_statement: bool,

    /// `allowNamedParameters` parameter (default `true`).
    pub allow_named_parameters: bool,

    /// `fetchSize` parameter; `None` when absent or not parseable as `u32`.
    pub fetch_size: Option<u32>,

    /// `transactionMode` parameter (default `Auto`).
    pub transaction_mode: TransactionMode,

    /// `delay` parameter in milliseconds (default `500`).
    pub delay_ms: u64,
    /// `initialDelay` parameter in milliseconds (default `1000`).
    pub initial_delay_ms: u64,
    /// `maxMessagesPerPoll` parameter; `None` when absent or not parseable as `i32`.
    pub max_messages_per_poll: Option<i32>,
    /// `onConsume` parameter, stored verbatim.
    pub on_consume: Option<String>,
    /// `onConsumeFailed` parameter, stored verbatim.
    pub on_consume_failed: Option<String>,
    /// `onConsumeBatchComplete` parameter, stored verbatim.
    pub on_consume_batch_complete: Option<String>,
    /// `routeEmptyResultSet` parameter (default `false`).
    pub route_empty_result_set: bool,
    /// `useIterator` parameter (default `true`).
    pub use_iterator: bool,
    /// `expectedUpdateCount` parameter; `None` when absent or not parseable as `i64`.
    pub expected_update_count: Option<i64>,
    /// `breakBatchOnConsumeFail` parameter (default `false`).
    pub break_batch_on_consume_fail: bool,
    /// `bridgeErrorHandler` parameter (default `false`).
    pub bridge_error_handler: bool,

    /// `repeatCount` parameter; `None` when absent or not parseable as `u32`.
    pub repeat_count: Option<u32>,

    /// `processingStrategy` parameter (default `Direct`).
    pub processing_strategy: ProcessingStrategy,

    /// `pollStrategy` parameter (default `Sequential`).
    pub poll_strategy: PollStrategy,

    /// `batch` parameter (default `false`).
    pub batch: bool,
    /// `useMessageBodyForSql` parameter (default `false`).
    pub use_message_body_for_sql: bool,

    /// `sslMode` parameter; falls back to the global config in `apply_defaults`.
    pub ssl_mode: Option<String>,
    /// `sslRootCert` parameter; falls back to the global config in `apply_defaults`.
    pub ssl_root_cert: Option<String>,
    /// `sslCert` parameter; falls back to the global config in `apply_defaults`.
    pub ssl_cert: Option<String>,
    /// `sslKey` parameter; falls back to the global config in `apply_defaults`.
    /// Masked in `Debug` output.
    pub ssl_key: Option<String>,

    /// Retry policy assembled from the `retry*` parameters on top of the policy's defaults.
    pub retry: NetworkRetryPolicy,

    /// `true` when at least one `retry*` parameter was present in the URI; in that
    /// case `apply_defaults` keeps `retry` instead of copying the global policy.
    retry_set_from_uri: bool,
}
399
400impl std::fmt::Debug for SqlEndpointConfig {
401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402 f.debug_struct("SqlEndpointConfig")
403 .field("db_url", &redact_db_url(&self.db_url))
404 .field("datasource_name", &self.datasource_name)
405 .field("max_connections", &self.max_connections)
406 .field("min_connections", &self.min_connections)
407 .field("idle_timeout_secs", &self.idle_timeout_secs)
408 .field("max_lifetime_secs", &self.max_lifetime_secs)
409 .field("query", &self.query)
410 .field("source_path", &self.source_path)
411 .field("output_type", &self.output_type)
412 .field("placeholder", &self.placeholder)
413 .field("use_placeholder", &self.use_placeholder)
414 .field("noop", &self.noop)
415 .field("in_separator", &self.in_separator)
416 .field("always_populate_statement", &self.always_populate_statement)
417 .field("allow_named_parameters", &self.allow_named_parameters)
418 .field("fetch_size", &self.fetch_size)
419 .field("transaction_mode", &self.transaction_mode)
420 .field("delay_ms", &self.delay_ms)
421 .field("initial_delay_ms", &self.initial_delay_ms)
422 .field("max_messages_per_poll", &self.max_messages_per_poll)
423 .field("on_consume", &self.on_consume)
424 .field("on_consume_failed", &self.on_consume_failed)
425 .field("on_consume_batch_complete", &self.on_consume_batch_complete)
426 .field("route_empty_result_set", &self.route_empty_result_set)
427 .field("use_iterator", &self.use_iterator)
428 .field("expected_update_count", &self.expected_update_count)
429 .field(
430 "break_batch_on_consume_fail",
431 &self.break_batch_on_consume_fail,
432 )
433 .field("bridge_error_handler", &self.bridge_error_handler)
434 .field("repeat_count", &self.repeat_count)
435 .field("processing_strategy", &self.processing_strategy)
436 .field("poll_strategy", &self.poll_strategy)
437 .field("batch", &self.batch)
438 .field("use_message_body_for_sql", &self.use_message_body_for_sql)
439 .field("ssl_mode", &self.ssl_mode)
440 .field("ssl_root_cert", &self.ssl_root_cert)
441 .field("ssl_cert", &self.ssl_cert)
442 .field("ssl_key", &redacted_opt(&self.ssl_key))
443 .field("retry", &self.retry)
444 .finish()
445 }
446}
447
448impl SqlEndpointConfig {
449 pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
451 if self.max_connections.is_none() {
452 self.max_connections = Some(defaults.max_connections);
453 }
454 if self.min_connections.is_none() {
455 self.min_connections = Some(defaults.min_connections);
456 }
457 if self.idle_timeout_secs.is_none() {
458 self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
459 }
460 if self.max_lifetime_secs.is_none() {
461 self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
462 }
463 if self.ssl_mode.is_none() {
464 self.ssl_mode = defaults.ssl_mode.clone();
465 }
466 if self.ssl_root_cert.is_none() {
467 self.ssl_root_cert = defaults.ssl_root_cert.clone();
468 }
469 if self.ssl_cert.is_none() {
470 self.ssl_cert = defaults.ssl_cert.clone();
471 }
472 if self.ssl_key.is_none() {
473 self.ssl_key = defaults.ssl_key.clone();
474 }
475 if !self.retry_set_from_uri {
477 self.retry = defaults.retry.clone();
478 }
479 }
480
481 pub fn resolve_defaults(&mut self) {
483 let defaults = SqlGlobalConfig::default();
484 self.apply_defaults(&defaults);
485 }
486
487 pub async fn resolve_file_query(&mut self) -> Result<(), CamelError> {
496 if let Some(file_path) = self.source_path.take() {
497 let contents = tokio::fs::read_to_string(&file_path).await.map_err(|e| {
498 CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
499 })?;
500 self.query = contents.trim().to_string();
501 self.source_path = Some(file_path);
503 }
504 Ok(())
505 }
506}
507
/// Driver-specific query-parameter names for one SSL option.
struct SslParamMapping {
    /// Key used when the database URL scheme is `postgres` / `postgresql`.
    pg_key: &'static str,
    /// Key used when the database URL scheme is `mysql`.
    mysql_key: &'static str,
}
512
/// Lookup table from the endpoint's camelCase SSL option name (`sslMode`,
/// `sslRootCert`, `sslCert`, `sslKey`) to the query-parameter key written into
/// the database URL for each supported driver.
const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
    (
        "sslMode",
        SslParamMapping {
            pg_key: "sslmode",
            mysql_key: "ssl-mode",
        },
    ),
    (
        "sslRootCert",
        SslParamMapping {
            pg_key: "sslrootcert",
            mysql_key: "ssl-ca",
        },
    ),
    (
        "sslCert",
        SslParamMapping {
            pg_key: "sslcert",
            mysql_key: "ssl-cert",
        },
    ),
    (
        "sslKey",
        SslParamMapping {
            pg_key: "sslkey",
            mysql_key: "ssl-key",
        },
    ),
];
543
544pub fn enrich_db_url_with_ssl(
545 db_url: &str,
546 config: &SqlEndpointConfig,
547) -> Result<String, CamelError> {
548 enrich_db_url_with_ssl_params(
549 db_url,
550 config.ssl_mode.as_deref(),
551 config.ssl_root_cert.as_deref(),
552 config.ssl_cert.as_deref(),
553 config.ssl_key.as_deref(),
554 )
555}
556
557pub(crate) fn enrich_db_url_with_ssl_params(
558 db_url: &str,
559 ssl_mode: Option<&str>,
560 ssl_root_cert: Option<&str>,
561 ssl_cert: Option<&str>,
562 ssl_key: Option<&str>,
563) -> Result<String, CamelError> {
564 let ssl_params: Vec<(&str, &str)> = [
565 ssl_mode.map(|v| ("sslMode", v)),
566 ssl_root_cert.map(|v| ("sslRootCert", v)),
567 ssl_cert.map(|v| ("sslCert", v)),
568 ssl_key.map(|v| ("sslKey", v)),
569 ]
570 .into_iter()
571 .flatten()
572 .collect();
573
574 if ssl_params.is_empty() {
575 return Ok(db_url.to_string());
576 }
577
578 let mut parsed = url::Url::parse(db_url).map_err(|e| {
579 CamelError::InvalidUri(format!(
580 "Cannot parse database URL for SSL enrichment: {}",
581 e
582 ))
583 })?;
584
585 let scheme = parsed.scheme();
586 if scheme.starts_with("sqlite") {
587 warn!(
588 "SSL options configured for SQLite database URL, but SQLite does not support SSL/TLS; ignoring sslMode/sslRootCert/sslCert/sslKey"
589 );
590 return Ok(db_url.to_string());
591 }
592
593 if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
594 return Ok(db_url.to_string());
595 }
596 let is_mysql = scheme == "mysql";
597
598 let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
599 for (camel_name, value) in &ssl_params {
600 if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
601 let driver_key = if is_mysql {
602 mapping.mysql_key
603 } else {
604 mapping.pg_key
605 };
606
607 if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
608 query_pairs[pos].1 = (*value).into();
609 } else {
610 query_pairs.push((driver_key.into(), (*value).into()));
611 }
612 }
613 }
614
615 {
616 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
617 for (k, v) in query_pairs {
618 serializer.append_pair(&k, &v);
619 }
620 parsed.set_query(Some(&serializer.finish()));
621 }
622
623 Ok(parsed.to_string())
624}
625
626impl UriConfig for SqlEndpointConfig {
627 fn scheme() -> &'static str {
628 "sql"
629 }
630
631 fn from_uri(uri: &str) -> Result<Self, CamelError> {
632 let parts = parse_uri(uri)?;
633 Self::from_components(parts)
634 }
635
636 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
637 if parts.scheme != Self::scheme() {
639 return Err(CamelError::InvalidUri(format!(
640 "expected scheme '{}' but got '{}'",
641 Self::scheme(),
642 parts.scheme
643 )));
644 }
645
646 let params = &parts.params;
647
648 let (query, source_path) = if parts.path.starts_with("file:") {
653 let file_path = parts.path.trim_start_matches("file:").to_string();
654 (String::new(), Some(file_path))
655 } else {
656 (parts.path.clone(), None)
657 };
658
659 let db_url = params.get("db_url").cloned().unwrap_or_default();
661
662 let datasource_name = params.get("datasource").cloned();
664
665 let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
667 let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
668 let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
669 let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
670
671 let output_type = params
673 .get("outputType")
674 .map(|s| s.parse())
675 .transpose()?
676 .unwrap_or_default();
677 let placeholder = params
678 .get("placeholder")
679 .filter(|v| !v.is_empty())
680 .map(|v| {
681 if v.chars().count() != 1 {
682 return Err(CamelError::InvalidUri(format!(
683 "placeholder must be exactly one character, got '{}'",
684 v
685 )));
686 }
687 if !v.is_ascii() {
688 return Err(CamelError::InvalidUri(
689 "placeholder must be a single ASCII character".to_string(),
690 ));
691 }
692 Ok(v.chars().next().unwrap()) })
694 .transpose()?
695 .unwrap_or('#');
696 fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
701 if value.eq_ignore_ascii_case("true") {
702 Ok(true)
703 } else if value.eq_ignore_ascii_case("false") {
704 Ok(false)
705 } else {
706 Err(CamelError::InvalidUri(format!(
707 "{} must be 'true' or 'false', got '{}'",
708 name, value
709 )))
710 }
711 }
712
713 let use_placeholder = params
714 .get("usePlaceholder")
715 .map(|v| parse_bool_param("usePlaceholder", v))
716 .transpose()?
717 .unwrap_or(true);
718 let noop = params
719 .get("noop")
720 .map(|v| parse_bool_param("noop", v))
721 .transpose()?
722 .unwrap_or(false);
723 let in_separator = params
724 .get("inSeparator")
725 .map(|v| v.to_string())
726 .unwrap_or_else(|| ", ".to_string());
727 if in_separator.is_empty() {
728 return Err(CamelError::InvalidUri(
729 "inSeparator must not be empty".to_string(),
730 ));
731 }
732
733 let always_populate_statement = params
735 .get("alwaysPopulateStatement")
736 .map(|v| parse_bool_param("alwaysPopulateStatement", v))
737 .transpose()?
738 .unwrap_or(false);
739
740 let allow_named_parameters = params
742 .get("allowNamedParameters")
743 .map(|v| parse_bool_param("allowNamedParameters", v))
744 .transpose()?
745 .unwrap_or(true);
746
747 let fetch_size = params.get("fetchSize").and_then(|v| v.parse().ok());
749
750 let transaction_mode = params
752 .get("transactionMode")
753 .map(|s| s.parse())
754 .transpose()?
755 .unwrap_or_default();
756
757 let delay_ms = params
759 .get("delay")
760 .and_then(|v| v.parse().ok())
761 .unwrap_or(500);
762 let initial_delay_ms = params
763 .get("initialDelay")
764 .and_then(|v| v.parse().ok())
765 .unwrap_or(1000);
766 let max_messages_per_poll = params
767 .get("maxMessagesPerPoll")
768 .and_then(|v| v.parse().ok());
769 let on_consume = params.get("onConsume").cloned();
770 let on_consume_failed = params.get("onConsumeFailed").cloned();
771 let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
772 let route_empty_result_set = params
773 .get("routeEmptyResultSet")
774 .map(|v| parse_bool_param("routeEmptyResultSet", v))
775 .transpose()?
776 .unwrap_or(false);
777 let use_iterator = params
778 .get("useIterator")
779 .map(|v| parse_bool_param("useIterator", v))
780 .transpose()?
781 .unwrap_or(true);
782 let expected_update_count = params
783 .get("expectedUpdateCount")
784 .and_then(|v| v.parse().ok());
785 let break_batch_on_consume_fail = params
786 .get("breakBatchOnConsumeFail")
787 .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
788 .transpose()?
789 .unwrap_or(false);
790 let bridge_error_handler = params
791 .get("bridgeErrorHandler")
792 .map(|v| parse_bool_param("bridgeErrorHandler", v))
793 .transpose()?
794 .unwrap_or(false);
795
796 let repeat_count = params.get("repeatCount").and_then(|v| v.parse().ok());
798
799 let processing_strategy = params
801 .get("processingStrategy")
802 .map(|s| s.parse())
803 .transpose()?
804 .unwrap_or_default();
805
806 let poll_strategy = params
808 .get("pollStrategy")
809 .map(|s| s.parse())
810 .transpose()?
811 .unwrap_or_default();
812
813 let batch = params
815 .get("batch")
816 .map(|v| parse_bool_param("batch", v))
817 .transpose()?
818 .unwrap_or(false);
819 let use_message_body_for_sql = params
820 .get("useMessageBodyForSql")
821 .map(|v| parse_bool_param("useMessageBodyForSql", v))
822 .transpose()?
823 .unwrap_or(false);
824 let ssl_mode = params.get("sslMode").cloned();
825 let ssl_root_cert = params.get("sslRootCert").cloned();
826 let ssl_cert = params.get("sslCert").cloned();
827 let ssl_key = params.get("sslKey").cloned();
828
829 let mut retry = NetworkRetryPolicy::default();
831 let mut retry_set_from_uri = false;
832 if let Some(raw) = params.get("retryEnabled") {
833 retry.enabled = raw.parse::<bool>().map_err(|_| {
834 CamelError::InvalidUri(format!("retryEnabled must be a boolean, got '{raw}'"))
835 })?;
836 retry_set_from_uri = true;
837 }
838 if let Some(raw) = params.get("retryMaxAttempts") {
839 retry.max_attempts = raw.parse::<u32>().map_err(|_| {
840 CamelError::InvalidUri(format!("retryMaxAttempts must be a u32, got '{raw}'"))
841 })?;
842 retry_set_from_uri = true;
843 }
844 if let Some(raw) = params.get("retryInitialDelayMs") {
845 retry.initial_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
846 CamelError::InvalidUri(format!("retryInitialDelayMs must be a u64, got '{raw}'"))
847 })?);
848 retry_set_from_uri = true;
849 }
850 if let Some(raw) = params.get("retryMultiplier") {
851 retry.multiplier = raw.parse::<f64>().map_err(|_| {
852 CamelError::InvalidUri(format!("retryMultiplier must be a f64, got '{raw}'"))
853 })?;
854 retry_set_from_uri = true;
855 }
856 if let Some(raw) = params.get("retryMaxDelayMs") {
857 retry.max_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
858 CamelError::InvalidUri(format!("retryMaxDelayMs must be a u64, got '{raw}'"))
859 })?);
860 retry_set_from_uri = true;
861 }
862 if let Some(raw) = params.get("retryJitter") {
863 retry.jitter_factor = raw.parse::<f64>().map_err(|_| {
864 CamelError::InvalidUri(format!("retryJitter must be a f64, got '{raw}'"))
865 })?;
866 retry_set_from_uri = true;
867 }
868
869 if datasource_name.is_none() && db_url.is_empty() {
870 return Err(CamelError::Config(
871 "either 'datasource' or 'db_url' parameter is required".to_string(),
872 ));
873 }
874
875 if datasource_name.is_some() && !db_url.is_empty() {
876 return Err(CamelError::InvalidUri(
877 "'db_url' not allowed with named datasource — use 'datasource' alone".to_string(),
878 ));
879 }
880
881 if datasource_name.is_some() {
882 let overrides: Vec<&str> = {
883 let mut v = Vec::new();
884 if max_connections.is_some() {
885 v.push("maxConnections");
886 }
887 if min_connections.is_some() {
888 v.push("minConnections");
889 }
890 if idle_timeout_secs.is_some() {
891 v.push("idleTimeoutSecs");
892 }
893 if max_lifetime_secs.is_some() {
894 v.push("maxLifetimeSecs");
895 }
896 if ssl_mode.is_some() {
897 v.push("sslMode");
898 }
899 if ssl_root_cert.is_some() {
900 v.push("sslRootCert");
901 }
902 if ssl_cert.is_some() {
903 v.push("sslCert");
904 }
905 if ssl_key.is_some() {
906 v.push("sslKey");
907 }
908 v
909 };
910 if !overrides.is_empty() {
911 return Err(CamelError::InvalidUri(format!(
912 "pool-affecting params not allowed with named datasource: {}",
913 overrides.join(", ")
914 )));
915 }
916 }
917
918 Ok(Self {
919 db_url,
920 datasource_name,
921 max_connections,
922 min_connections,
923 idle_timeout_secs,
924 max_lifetime_secs,
925 query,
926 source_path,
927 output_type,
928 placeholder,
929 use_placeholder,
930 noop,
931 in_separator,
932 always_populate_statement,
933 allow_named_parameters,
934 fetch_size,
935 transaction_mode,
936 delay_ms,
937 initial_delay_ms,
938 max_messages_per_poll,
939 on_consume,
940 on_consume_failed,
941 on_consume_batch_complete,
942 route_empty_result_set,
943 use_iterator,
944 expected_update_count,
945 break_batch_on_consume_fail,
946 bridge_error_handler,
947 repeat_count,
948 processing_strategy,
949 poll_strategy,
950 batch,
951 use_message_body_for_sql,
952 ssl_mode,
953 ssl_root_cert,
954 ssl_cert,
955 ssl_key,
956 retry,
957 retry_set_from_uri,
958 })
959 }
960}
961
962#[cfg(test)]
963mod tests {
964 use super::*;
965 use camel_component_api::NetworkRetryPolicy;
966
967 #[test]
968 fn config_defaults() {
969 let mut c =
970 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
971 c.resolve_defaults();
972 assert_eq!(c.query, "select 1");
973 assert_eq!(c.db_url, "postgres://localhost/test");
974 assert_eq!(c.max_connections, Some(5));
975 assert_eq!(c.min_connections, Some(1));
976 assert_eq!(c.idle_timeout_secs, Some(300));
977 assert_eq!(c.max_lifetime_secs, Some(1800));
978 assert_eq!(c.output_type, SqlOutputType::SelectList);
979 assert_eq!(c.placeholder, '#');
980 assert!(!c.noop);
981 assert_eq!(c.in_separator, ", ");
982 assert_eq!(c.delay_ms, 500);
983 assert_eq!(c.initial_delay_ms, 1000);
984 assert!(c.max_messages_per_poll.is_none());
985 assert!(c.on_consume.is_none());
986 assert!(c.on_consume_failed.is_none());
987 assert!(c.on_consume_batch_complete.is_none());
988 assert!(!c.route_empty_result_set);
989 assert!(c.use_iterator);
990 assert!(c.expected_update_count.is_none());
991 assert!(!c.break_batch_on_consume_fail);
992 assert!(!c.batch);
993 assert!(!c.use_message_body_for_sql);
994 assert!(c.ssl_mode.is_none());
995 assert!(c.ssl_root_cert.is_none());
996 assert!(c.ssl_cert.is_none());
997 assert!(c.ssl_key.is_none());
998 assert!(!c.always_populate_statement);
1000 assert!(c.allow_named_parameters);
1001 assert!(c.fetch_size.is_none());
1002 assert_eq!(c.transaction_mode, TransactionMode::Auto);
1003 assert!(c.repeat_count.is_none());
1004 assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1005 assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1006 }
1007
1008 #[test]
1009 fn ssl_none_by_default() {
1010 let c =
1011 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1012 assert!(c.ssl_mode.is_none());
1013 assert!(c.ssl_root_cert.is_none());
1014 assert!(c.ssl_cert.is_none());
1015 assert!(c.ssl_key.is_none());
1016 }
1017
1018 #[test]
1019 fn ssl_mode_from_uri() {
1020 let c = SqlEndpointConfig::from_uri(
1021 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1022 )
1023 .unwrap();
1024 assert_eq!(c.ssl_mode, Some("require".to_string()));
1025 assert!(c.ssl_root_cert.is_none());
1026 }
1027
1028 #[test]
1029 fn ssl_all_params_from_uri() {
1030 let c = SqlEndpointConfig::from_uri(
1031 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1032 )
1033 .unwrap();
1034 assert_eq!(c.ssl_mode, Some("require".to_string()));
1035 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1036 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1037 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1038 }
1039
1040 #[test]
1041 fn ssl_global_applied_to_endpoint() {
1042 let mut c =
1043 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1044 let global = SqlGlobalConfig::default()
1045 .with_ssl_mode("require")
1046 .with_ssl_root_cert("/etc/ssl/ca.pem");
1047 c.apply_defaults(&global);
1048 assert_eq!(c.ssl_mode, Some("require".to_string()));
1049 assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
1050 assert!(c.ssl_cert.is_none());
1051 assert!(c.ssl_key.is_none());
1052 }
1053
1054 #[test]
1055 fn ssl_uri_overrides_global() {
1056 let mut c = SqlEndpointConfig::from_uri(
1057 "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
1058 )
1059 .unwrap();
1060 let global = SqlGlobalConfig::default().with_ssl_mode("require");
1061 c.apply_defaults(&global);
1062 assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
1063 }
1064
1065 #[test]
1066 fn config_wrong_scheme() {
1067 assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
1068 }
1069
1070 #[test]
1071 fn config_missing_db_url() {
1072 assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
1073 }
1074
1075 #[test]
1076 fn config_output_type_select_one() {
1077 let c = SqlEndpointConfig::from_uri(
1078 "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
1079 )
1080 .unwrap();
1081 assert_eq!(c.output_type, SqlOutputType::SelectOne);
1082 }
1083
1084 #[test]
1085 fn config_output_type_stream_list() {
1086 let c = SqlEndpointConfig::from_uri(
1087 "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
1088 )
1089 .unwrap();
1090 assert_eq!(c.output_type, SqlOutputType::StreamList);
1091 }
1092
1093 #[test]
1094 fn in_separator_default() {
1095 let c =
1096 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1097 assert_eq!(c.in_separator, ", ");
1098 }
1099
1100 #[test]
1101 fn in_separator_from_uri() {
1102 let c = SqlEndpointConfig::from_uri(
1103 "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
1104 )
1105 .unwrap();
1106 assert_eq!(c.in_separator, ";");
1107 }
1108
1109 #[test]
1110 fn in_separator_empty_rejected() {
1111 let result = SqlEndpointConfig::from_uri(
1112 "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
1113 );
1114 assert!(result.is_err());
1115 let msg = format!("{:?}", result.unwrap_err());
1116 assert!(msg.contains("inSeparator") || msg.contains("empty"));
1117 }
1118
1119 #[test]
1120 fn config_consumer_options() {
1121 let c = SqlEndpointConfig::from_uri(
1122 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
1123 ).unwrap();
1124 assert_eq!(c.delay_ms, 2000);
1125 assert_eq!(c.initial_delay_ms, 500);
1126 assert_eq!(c.max_messages_per_poll, Some(10));
1127 assert_eq!(
1128 c.on_consume,
1129 Some("update t set done=true where id=:#id".to_string())
1130 );
1131 assert_eq!(
1132 c.on_consume_failed,
1133 Some("update t set failed=true where id=:#id".to_string())
1134 );
1135 assert_eq!(
1136 c.on_consume_batch_complete,
1137 Some("delete from t where done=true".to_string())
1138 );
1139 assert!(c.route_empty_result_set);
1140 assert!(!c.use_iterator);
1141 assert_eq!(c.expected_update_count, Some(1));
1142 assert!(c.break_batch_on_consume_fail);
1143 assert!(!c.bridge_error_handler);
1144 }
1145
1146 #[test]
1147 fn config_producer_options() {
1148 let c = SqlEndpointConfig::from_uri(
1149 "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
1150 ).unwrap();
1151 assert!(c.batch);
1152 assert!(c.use_message_body_for_sql);
1153 assert!(c.noop);
1154 }
1155
1156 #[test]
1157 fn config_pool_options() {
1158 let c = SqlEndpointConfig::from_uri(
1159 "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
1160 ).unwrap();
1161 assert_eq!(c.max_connections, Some(20));
1162 assert_eq!(c.min_connections, Some(3));
1163 assert_eq!(c.idle_timeout_secs, Some(600));
1164 assert_eq!(c.max_lifetime_secs, Some(3600));
1165 }
1166
1167 #[test]
1168 fn config_query_with_special_chars() {
1169 let c = SqlEndpointConfig::from_uri(
1170 "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
1171 )
1172 .unwrap();
1173 assert_eq!(
1174 c.query,
1175 "select * from users where name = :#name and age > #"
1176 );
1177 }
1178
1179 #[test]
1180 fn output_type_from_str() {
1181 assert_eq!(
1182 "SelectList".parse::<SqlOutputType>().unwrap(),
1183 SqlOutputType::SelectList
1184 );
1185 assert_eq!(
1186 "SelectOne".parse::<SqlOutputType>().unwrap(),
1187 SqlOutputType::SelectOne
1188 );
1189 assert_eq!(
1190 "StreamList".parse::<SqlOutputType>().unwrap(),
1191 SqlOutputType::StreamList
1192 );
1193 assert!("Invalid".parse::<SqlOutputType>().is_err());
1194 }
1195
1196 #[tokio::test]
1198 async fn config_file_not_found() {
1199 let mut config = SqlEndpointConfig::from_uri(
1200 "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
1201 )
1202 .expect("from_uri should defer file reading");
1203 assert_eq!(
1205 config.source_path,
1206 Some("/nonexistent/path/query.sql".to_string())
1207 );
1208 assert!(config.query.is_empty());
1209
1210 let result = config.resolve_file_query().await;
1212 assert!(result.is_err());
1213 let msg = format!("{:?}", result.unwrap_err());
1214 assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
1215 }
1216
1217 #[tokio::test]
1219 async fn config_file_query() {
1220 use std::io::Write;
1221 let unique_name = format!(
1222 "test_sql_query_{}.sql",
1223 std::time::SystemTime::now()
1224 .duration_since(std::time::UNIX_EPOCH)
1225 .unwrap_or_default()
1226 .as_nanos()
1227 );
1228 let mut tmp = std::env::temp_dir();
1229 tmp.push(unique_name);
1230 {
1231 let mut f = std::fs::File::create(&tmp).unwrap();
1232 writeln!(f, "SELECT * FROM users").unwrap();
1233 }
1234 let uri = format!(
1235 "sql:file:{}?db_url=postgres://localhost/test",
1236 tmp.display()
1237 );
1238 let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1239 assert!(c.query.is_empty());
1241 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1242
1243 c.resolve_file_query()
1245 .await
1246 .expect("file query should resolve");
1247 assert_eq!(c.query, "SELECT * FROM users");
1248 std::fs::remove_file(&tmp).ok();
1249 }
1250
1251 #[test]
1253 fn pool_fields_none_when_not_set() {
1254 let c =
1255 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1256 assert_eq!(c.max_connections, None);
1257 assert_eq!(c.min_connections, None);
1258 assert_eq!(c.idle_timeout_secs, None);
1259 assert_eq!(c.max_lifetime_secs, None);
1260 }
1261
1262 #[test]
1263 fn apply_defaults_fills_none() {
1264 let mut c =
1265 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1266 let global = SqlGlobalConfig {
1267 max_connections: 10,
1268 min_connections: 2,
1269 idle_timeout_secs: 600,
1270 max_lifetime_secs: 3600,
1271 ssl_mode: None,
1272 ssl_root_cert: None,
1273 ssl_cert: None,
1274 ssl_key: None,
1275 retry: NetworkRetryPolicy::default(),
1276 };
1277 c.apply_defaults(&global);
1278 assert_eq!(c.max_connections, Some(10));
1279 assert_eq!(c.min_connections, Some(2));
1280 assert_eq!(c.idle_timeout_secs, Some(600));
1281 assert_eq!(c.max_lifetime_secs, Some(3600));
1282 assert!(c.ssl_mode.is_none());
1283 assert!(c.ssl_root_cert.is_none());
1284 assert!(c.ssl_cert.is_none());
1285 assert!(c.ssl_key.is_none());
1286 }
1287
1288 #[test]
1289 fn apply_defaults_does_not_override() {
1290 let mut c = SqlEndpointConfig::from_uri(
1291 "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
1292 )
1293 .unwrap();
1294 let global = SqlGlobalConfig {
1295 max_connections: 10,
1296 min_connections: 2,
1297 idle_timeout_secs: 600,
1298 max_lifetime_secs: 3600,
1299 ssl_mode: None,
1300 ssl_root_cert: None,
1301 ssl_cert: None,
1302 ssl_key: None,
1303 retry: NetworkRetryPolicy::default(),
1304 };
1305 c.apply_defaults(&global);
1306 assert_eq!(c.max_connections, Some(99));
1308 assert_eq!(c.min_connections, Some(5));
1309 assert_eq!(c.idle_timeout_secs, Some(600));
1311 assert_eq!(c.max_lifetime_secs, Some(3600));
1312 }
1313
1314 #[test]
1315 fn resolve_defaults_fills_remaining() {
1316 let mut c = SqlEndpointConfig::from_uri(
1317 "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
1318 )
1319 .unwrap();
1320 c.resolve_defaults();
1321 assert_eq!(c.max_connections, Some(7)); assert_eq!(c.min_connections, Some(1)); assert_eq!(c.idle_timeout_secs, Some(300)); assert_eq!(c.max_lifetime_secs, Some(1800)); }
1326
1327 #[test]
1328 fn global_config_builder() {
1329 let c = SqlGlobalConfig::default()
1330 .with_max_connections(20)
1331 .with_min_connections(3)
1332 .with_idle_timeout_secs(600)
1333 .with_max_lifetime_secs(3600)
1334 .with_ssl_mode("require")
1335 .with_ssl_root_cert("/ca.pem")
1336 .with_ssl_cert("/cert.pem")
1337 .with_ssl_key("/key.pem");
1338 assert_eq!(c.max_connections, 20);
1339 assert_eq!(c.min_connections, 3);
1340 assert_eq!(c.idle_timeout_secs, 600);
1341 assert_eq!(c.max_lifetime_secs, 3600);
1342 assert_eq!(c.ssl_mode, Some("require".to_string()));
1343 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1344 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1345 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1346 }
1347
1348 #[test]
1349 fn enrich_postgres_ssl_mode() {
1350 let mut c = SqlEndpointConfig::from_uri(
1351 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1352 )
1353 .unwrap();
1354 c.resolve_defaults();
1355 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1356 assert!(url.contains("sslmode=require"), "got: {}", url);
1357 }
1358
1359 #[test]
1360 fn enrich_postgres_all_ssl() {
1361 let mut c = SqlEndpointConfig::from_uri(
1362 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1363 )
1364 .unwrap();
1365 c.resolve_defaults();
1366 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1367 assert!(url.contains("sslmode=require"), "got: {}", url);
1368 assert!(url.contains("sslrootcert="), "got: {}", url);
1369 assert!(url.contains("sslcert="), "got: {}", url);
1370 assert!(url.contains("sslkey="), "got: {}", url);
1371 }
1372
1373 #[test]
1374 fn enrich_mysql_ssl() {
1375 let mut c = SqlEndpointConfig::from_uri(
1376 "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
1377 )
1378 .unwrap();
1379 c.resolve_defaults();
1380 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1381 assert!(url.contains("ssl-mode=require"), "got: {}", url);
1382 }
1383
1384 #[test]
1385 fn enrich_existing_query_params() {
1386 let mut c = SqlEndpointConfig::from_uri(
1387 "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
1388 )
1389 .unwrap();
1390 c.resolve_defaults();
1391 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1392 assert!(url.contains("existing=1"), "got: {}", url);
1393 assert!(url.contains("sslmode=require"), "got: {}", url);
1394 }
1395
1396 #[test]
1397 fn enrich_override_existing() {
1398 let mut c = SqlEndpointConfig::from_uri(
1399 "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
1400 )
1401 .unwrap();
1402 c.resolve_defaults();
1403 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1404 assert!(url.contains("sslmode=require"), "got: {}", url);
1405 assert!(!url.contains("sslmode=allow"), "got: {}", url);
1406 }
1407
1408 #[test]
1409 fn enrich_no_params() {
1410 let mut c =
1411 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1412 c.resolve_defaults();
1413 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1414 assert_eq!(url, "postgres://localhost/test");
1415 }
1416
1417 #[test]
1418 fn enrich_url_encodes_paths() {
1419 let mut c = SqlEndpointConfig::from_uri(
1420 "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1421 )
1422 .unwrap();
1423 c.resolve_defaults();
1424 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1425 assert!(url.contains("sslrootcert="), "got: {}", url);
1426 }
1427
1428 #[test]
1429 fn enrich_unsupported_scheme_returns_unchanged() {
1430 let mut c = SqlEndpointConfig::from_uri(
1431 "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1432 )
1433 .unwrap();
1434 c.resolve_defaults();
1435 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1436 assert_eq!(url, "sqlite://localhost/test.db");
1437 }
1438
1439 #[test]
1440 fn enrich_invalid_url_returns_error() {
1441 let mut c = SqlEndpointConfig::from_uri(
1442 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1443 )
1444 .unwrap();
1445 c.resolve_defaults();
1446 let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1447 assert!(result.is_err());
1448 }
1449
1450 #[test]
1454 fn debug_redacts_db_url_with_password() {
1455 let c = SqlEndpointConfig::from_uri(
1456 "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1457 )
1458 .unwrap();
1459 let debug_output = format!("{:?}", c);
1460 assert!(
1461 !debug_output.contains("secret123"),
1462 "Debug output must not contain password: {}",
1463 debug_output
1464 );
1465 assert!(
1466 debug_output.contains("***"),
1467 "Debug output must contain redacted marker: {}",
1468 debug_output
1469 );
1470 }
1471
1472 #[test]
1473 fn debug_redacts_ssl_key() {
1474 let c = SqlEndpointConfig::from_uri(
1475 "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1476 )
1477 .unwrap();
1478 let debug_output = format!("{:?}", c);
1479 assert!(
1480 !debug_output.contains("/secret/key.pem"),
1481 "Debug output must not contain ssl_key path: {}",
1482 debug_output
1483 );
1484 }
1485
1486 #[test]
1487 fn debug_global_config_redacts_ssl_key() {
1488 let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1489 let debug_output = format!("{:?}", c);
1490 assert!(
1491 !debug_output.contains("/secret/key.pem"),
1492 "Debug output must not contain ssl_key path: {}",
1493 debug_output
1494 );
1495 assert!(
1496 debug_output.contains("***"),
1497 "Debug output must contain redacted marker: {}",
1498 debug_output
1499 );
1500 }
1501
1502 #[test]
1503 fn redact_db_url_with_credentials() {
1504 assert_eq!(
1505 redact_db_url("postgres://user:pass@host/db"),
1506 "postgres://***:***@host/db"
1507 );
1508 }
1509
1510 #[test]
1511 fn redact_db_url_without_credentials() {
1512 assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1513 }
1514
1515 #[test]
1516 fn redact_db_url_invalid_returns_original() {
1517 assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1518 }
1519
1520 #[test]
1522 fn use_placeholder_defaults_to_true() {
1523 let c =
1524 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1525 assert!(c.use_placeholder);
1526 }
1527
1528 #[test]
1529 fn use_placeholder_false_from_uri() {
1530 let c = SqlEndpointConfig::from_uri(
1531 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1532 )
1533 .unwrap();
1534 assert!(!c.use_placeholder);
1535 }
1536
1537 #[test]
1538 fn use_placeholder_true_from_uri() {
1539 let c = SqlEndpointConfig::from_uri(
1540 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1541 )
1542 .unwrap();
1543 assert!(c.use_placeholder);
1544 }
1545
1546 #[test]
1548 fn use_placeholder_rejects_invalid_value() {
1549 let result = SqlEndpointConfig::from_uri(
1550 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1551 );
1552 assert!(result.is_err());
1553 let msg = format!("{:?}", result.unwrap_err());
1554 assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1555 }
1556
1557 #[test]
1558 fn use_placeholder_rejects_typo_tru() {
1559 let result = SqlEndpointConfig::from_uri(
1560 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1561 );
1562 assert!(result.is_err());
1563 }
1564
1565 #[test]
1566 fn use_placeholder_rejects_yes() {
1567 let result = SqlEndpointConfig::from_uri(
1568 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1569 );
1570 assert!(result.is_err());
1571 }
1572
1573 #[test]
1574 fn noop_rejects_invalid_value() {
1575 let result =
1576 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1577 assert!(result.is_err());
1578 let msg = format!("{:?}", result.unwrap_err());
1579 assert!(msg.contains("noop"));
1580 }
1581
1582 #[test]
1583 fn batch_rejects_invalid_value() {
1584 let result =
1585 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1586 assert!(result.is_err());
1587 let msg = format!("{:?}", result.unwrap_err());
1588 assert!(msg.contains("batch"));
1589 }
1590
1591 #[test]
1592 fn route_empty_result_set_rejects_invalid_value() {
1593 let result = SqlEndpointConfig::from_uri(
1594 "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1595 );
1596 assert!(result.is_err());
1597 }
1598
1599 #[test]
1600 fn use_iterator_rejects_invalid_value() {
1601 let result = SqlEndpointConfig::from_uri(
1602 "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1603 );
1604 assert!(result.is_err());
1605 }
1606
1607 #[test]
1608 fn break_batch_on_consume_fail_rejects_invalid_value() {
1609 let result = SqlEndpointConfig::from_uri(
1610 "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1611 );
1612 assert!(result.is_err());
1613 }
1614
1615 #[test]
1616 fn use_message_body_for_sql_rejects_invalid_value() {
1617 let result = SqlEndpointConfig::from_uri(
1618 "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1619 );
1620 assert!(result.is_err());
1621 }
1622
1623 #[test]
1625 fn boolean_params_case_insensitive() {
1626 let c = SqlEndpointConfig::from_uri(
1627 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False&bridgeErrorHandler=TRUE",
1628 )
1629 .unwrap();
1630 assert!(c.use_placeholder);
1631 assert!(!c.noop);
1632 assert!(c.batch);
1633 assert!(!c.use_iterator);
1634 assert!(c.bridge_error_handler);
1635 }
1636
1637 #[test]
1639 fn multi_char_placeholder_rejected() {
1640 let result = SqlEndpointConfig::from_uri(
1641 "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1642 );
1643 assert!(result.is_err());
1644 let msg = format!("{:?}", result.unwrap_err());
1645 assert!(msg.contains("placeholder") && msg.contains("one character"));
1646 }
1647
1648 #[test]
1649 fn non_ascii_placeholder_rejected() {
1650 let result = SqlEndpointConfig::from_uri(
1651 "sql:select 1?db_url=postgres://localhost/test&placeholder=%C2%A2",
1652 );
1653 assert!(result.is_err());
1654 }
1655
1656 #[test]
1657 fn single_char_placeholder_accepted() {
1658 let c = SqlEndpointConfig::from_uri(
1659 "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1660 )
1661 .unwrap();
1662 assert_eq!(c.placeholder, '$');
1663 }
1664
1665 #[test]
1666 fn empty_placeholder_falls_back_to_default() {
1667 let c = SqlEndpointConfig::from_uri(
1669 "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1670 )
1671 .unwrap();
1672 assert_eq!(c.placeholder, '#');
1673 }
1674
1675 #[tokio::test]
1677 async fn file_query_cached_in_config() {
1678 use std::io::Write;
1679 let unique_name = format!(
1680 "test_sql_cached_{}.sql",
1681 std::time::SystemTime::now()
1682 .duration_since(std::time::UNIX_EPOCH)
1683 .unwrap_or_default()
1684 .as_nanos()
1685 );
1686 let mut tmp = std::env::temp_dir();
1687 tmp.push(unique_name);
1688 {
1689 let mut f = std::fs::File::create(&tmp).unwrap();
1690 writeln!(f, "SELECT * FROM cached_test").unwrap();
1691 }
1692 let uri = format!(
1693 "sql:file:{}?db_url=postgres://localhost/test",
1694 tmp.display()
1695 );
1696 let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1697 assert!(c.query.is_empty());
1699 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1700
1701 c.resolve_file_query()
1703 .await
1704 .expect("resolve should succeed");
1705 assert_eq!(c.query, "SELECT * FROM cached_test");
1706
1707 std::fs::remove_file(&tmp).ok();
1709 assert_eq!(c.query, "SELECT * FROM cached_test");
1710 }
1711
1712 #[test]
1716 fn always_populate_statement_defaults_to_false() {
1717 let c =
1718 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1719 assert!(!c.always_populate_statement);
1720 }
1721
1722 #[test]
1723 fn always_populate_statement_from_uri() {
1724 let c = SqlEndpointConfig::from_uri(
1725 "sql:select 1?db_url=postgres://localhost/test&alwaysPopulateStatement=true",
1726 )
1727 .unwrap();
1728 assert!(c.always_populate_statement);
1729 }
1730
1731 #[test]
1733 fn allow_named_parameters_defaults_to_true() {
1734 let c =
1735 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1736 assert!(c.allow_named_parameters);
1737 }
1738
1739 #[test]
1740 fn allow_named_parameters_false_from_uri() {
1741 let c = SqlEndpointConfig::from_uri(
1742 "sql:select 1?db_url=postgres://localhost/test&allowNamedParameters=false",
1743 )
1744 .unwrap();
1745 assert!(!c.allow_named_parameters);
1746 }
1747
1748 #[test]
1750 fn fetch_size_defaults_to_none() {
1751 let c =
1752 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1753 assert!(c.fetch_size.is_none());
1754 }
1755
1756 #[test]
1757 fn fetch_size_from_uri() {
1758 let c = SqlEndpointConfig::from_uri(
1759 "sql:select 1?db_url=postgres://localhost/test&fetchSize=1000",
1760 )
1761 .unwrap();
1762 assert_eq!(c.fetch_size, Some(1000));
1763 }
1764
1765 #[test]
1767 fn transaction_mode_defaults_to_auto() {
1768 let c =
1769 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1770 assert_eq!(c.transaction_mode, TransactionMode::Auto);
1771 }
1772
1773 #[test]
1774 fn transaction_mode_managed_from_uri() {
1775 let c = SqlEndpointConfig::from_uri(
1776 "sql:select 1?db_url=postgres://localhost/test&transactionMode=Managed",
1777 )
1778 .unwrap();
1779 assert_eq!(c.transaction_mode, TransactionMode::Managed);
1780 }
1781
1782 #[test]
1783 fn transaction_mode_invalid_rejected() {
1784 let result = SqlEndpointConfig::from_uri(
1785 "sql:select 1?db_url=postgres://localhost/test&transactionMode=Invalid",
1786 );
1787 assert!(result.is_err());
1788 }
1789
1790 #[test]
1792 fn repeat_count_defaults_to_none() {
1793 let c =
1794 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1795 assert!(c.repeat_count.is_none());
1796 }
1797
1798 #[test]
1799 fn repeat_count_from_uri() {
1800 let c = SqlEndpointConfig::from_uri(
1801 "sql:select 1?db_url=postgres://localhost/test&repeatCount=10",
1802 )
1803 .unwrap();
1804 assert_eq!(c.repeat_count, Some(10));
1805 }
1806
1807 #[test]
1809 fn processing_strategy_defaults_to_direct() {
1810 let c =
1811 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1812 assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1813 }
1814
1815 #[test]
1816 fn processing_strategy_scheduled_from_uri() {
1817 let c = SqlEndpointConfig::from_uri(
1818 "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Scheduled",
1819 )
1820 .unwrap();
1821 assert_eq!(c.processing_strategy, ProcessingStrategy::Scheduled);
1822 }
1823
1824 #[test]
1825 fn processing_strategy_invalid_rejected() {
1826 let result = SqlEndpointConfig::from_uri(
1827 "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Invalid",
1828 );
1829 assert!(result.is_err());
1830 }
1831
1832 #[test]
1834 fn poll_strategy_defaults_to_sequential() {
1835 let c =
1836 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1837 assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1838 }
1839
1840 #[test]
1841 fn poll_strategy_burst_from_uri() {
1842 let c = SqlEndpointConfig::from_uri(
1843 "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Burst",
1844 )
1845 .unwrap();
1846 assert_eq!(c.poll_strategy, PollStrategy::Burst);
1847 }
1848
1849 #[test]
1850 fn poll_strategy_invalid_rejected() {
1851 let result = SqlEndpointConfig::from_uri(
1852 "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Invalid",
1853 );
1854 assert!(result.is_err());
1855 }
1856
1857 #[test]
1860 fn sql_endpoint_config_has_retry_policy() {
1861 let cfg = SqlEndpointConfig::from_uri(
1862 "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=3&retryInitialDelayMs=500",
1863 )
1864 .expect("parse");
1865 assert_eq!(cfg.retry.max_attempts, 3);
1866 assert_eq!(
1867 cfg.retry.initial_delay,
1868 std::time::Duration::from_millis(500)
1869 );
1870 assert!(cfg.retry.enabled);
1871 }
1872
1873 #[test]
1874 fn sql_endpoint_config_retry_defaults_when_unspecified() {
1875 let cfg =
1876 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1877 assert!(cfg.retry.enabled);
1879 assert_eq!(cfg.retry.max_attempts, 10); }
1881
1882 #[test]
1883 fn sql_global_config_has_retry_default() {
1884 let cfg = SqlGlobalConfig::default();
1885 assert!(cfg.retry.enabled);
1886 }
1887
1888 #[test]
1889 fn retry_policy_parse_full_uri_params() {
1890 let cfg = SqlEndpointConfig::from_uri(
1891 "sql:select 1?db_url=sqlite::memory:&retryEnabled=false&retryMaxAttempts=7&retryInitialDelayMs=1000&retryMultiplier=3.0&retryMaxDelayMs=60000&retryJitter=0.5",
1892 )
1893 .expect("parse");
1894 assert!(!cfg.retry.enabled);
1895 assert_eq!(cfg.retry.max_attempts, 7);
1896 assert_eq!(
1897 cfg.retry.initial_delay,
1898 std::time::Duration::from_millis(1000)
1899 );
1900 assert!((cfg.retry.multiplier - 3.0).abs() < f64::EPSILON);
1901 assert_eq!(cfg.retry.max_delay, std::time::Duration::from_millis(60000));
1902 assert!((cfg.retry.jitter_factor - 0.5).abs() < f64::EPSILON);
1903 }
1904
1905 #[test]
1906 fn retry_policy_from_uri_survives_apply_defaults_with_global() {
1907 let mut ep = SqlEndpointConfig::from_uri(
1908 "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=10&retryInitialDelayMs=500",
1909 )
1910 .expect("parse");
1911 let global = SqlGlobalConfig::default(); ep.apply_defaults(&global);
1913 assert_eq!(ep.retry.max_attempts, 10);
1915 assert_eq!(
1916 ep.retry.initial_delay,
1917 std::time::Duration::from_millis(500)
1918 );
1919 }
1920
1921 #[test]
1922 fn retry_policy_falls_back_to_global_when_uri_has_no_retry_params() {
1923 let mut ep =
1924 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1925 let mut global = SqlGlobalConfig::default();
1926 global.retry.max_attempts = 7;
1927 ep.apply_defaults(&global);
1928 assert_eq!(ep.retry.max_attempts, 7);
1930 }
1931
1932 #[test]
1933 fn from_uri_with_datasource_name() {
1934 let cfg = SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders").unwrap();
1935 assert_eq!(cfg.datasource_name.as_deref(), Some("orders"));
1936 assert!(cfg.db_url.is_empty());
1937 }
1938
1939 #[test]
1940 fn from_uri_with_datasource_and_behavior_override() {
1941 let cfg =
1942 SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&outputType=SelectOne")
1943 .unwrap();
1944 assert_eq!(cfg.datasource_name.as_deref(), Some("orders"));
1945 }
1946
1947 #[test]
1948 fn from_uri_datasource_rejects_pool_override() {
1949 let result =
1950 SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&maxConnections=50");
1951 assert!(result.is_err());
1952 let msg = result.unwrap_err().to_string();
1953 assert!(msg.contains("pool-affecting"));
1954 }
1955
1956 #[test]
1957 fn from_uri_neither_datasource_nor_db_url_is_error() {
1958 let result = SqlEndpointConfig::from_uri("sql:SELECT 1");
1959 assert!(result.is_err());
1960 }
1961
1962 #[test]
1963 fn from_uri_db_url_inline_still_works() {
1964 let cfg =
1965 SqlEndpointConfig::from_uri("sql:SELECT 1?db_url=postgres://localhost/test").unwrap();
1966 assert!(cfg.datasource_name.is_none());
1967 assert_eq!(cfg.db_url, "postgres://localhost/test");
1968 }
1969
1970 #[test]
1971 fn from_uri_datasource_rejects_ssl_mode() {
1972 let result = SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&sslMode=require");
1973 assert!(result.is_err());
1974 let msg = result.unwrap_err().to_string();
1975 assert!(msg.contains("pool-affecting"));
1976 }
1977
1978 #[test]
1979 fn from_uri_datasource_rejects_ssl_root_cert() {
1980 let result =
1981 SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&sslRootCert=/ca.pem");
1982 assert!(result.is_err());
1983 }
1984
1985 #[test]
1986 fn from_uri_datasource_rejects_db_url() {
1987 let result = SqlEndpointConfig::from_uri(
1988 "sql:SELECT 1?datasource=orders&db_url=postgres://evil:5432/pwned",
1989 );
1990 assert!(result.is_err());
1991 let msg = result.unwrap_err().to_string();
1992 assert!(msg.contains("db_url") && msg.contains("datasource"));
1993 }
1994}