1use std::str::FromStr;
2use std::time::Duration;
3
4use camel_component_api::CamelError;
5use camel_component_api::NetworkRetryPolicy;
6use camel_component_api::{UriComponents, UriConfig, parse_uri};
7use tracing::warn;
8
/// Produces a masked stand-in for an optional secret.
///
/// Returns `Some("***")` when `opt` holds a value (whatever that value is,
/// including the empty string) and `None` when it is absent, so `Debug`
/// output can show whether a secret is configured without revealing it.
fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
    opt.as_ref().map(|_| "***")
}
13
14pub fn redact_db_url(db_url: &str) -> String {
17 match url::Url::parse(db_url) {
18 Ok(mut parsed) => {
19 if parsed.username().is_empty() && parsed.password().is_none() {
20 return db_url.to_string();
21 }
22 let _ = parsed.set_username("***");
23 let _ = parsed.set_password(Some("***"));
24 parsed.to_string()
25 }
26 Err(_) => db_url.to_string(),
27 }
28}
29
/// Output type of an SQL endpoint, selected with the `outputType` URI option.
///
/// The textual spellings accepted by the `FromStr` implementation are the
/// variant names themselves. `SelectList` is used when the option is absent.
///
/// NOTE(review): the exact result shape each variant produces is decided by
/// the code consuming this enum, which is not part of this file section.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SqlOutputType {
    /// Default variant (`outputType=SelectList`).
    #[default]
    SelectList,
    /// Selected with `outputType=SelectOne`.
    SelectOne,
    /// Selected with `outputType=StreamList`.
    StreamList,
}
41
42impl FromStr for SqlOutputType {
43 type Err = CamelError;
44
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 match s {
47 "SelectList" => Ok(SqlOutputType::SelectList),
48 "SelectOne" => Ok(SqlOutputType::SelectOne),
49 "StreamList" => Ok(SqlOutputType::StreamList),
50 _ => Err(CamelError::InvalidUri(format!(
51 "Unknown output type: {}",
52 s
53 ))),
54 }
55 }
56}
57
/// Transaction handling mode, selected with the `transactionMode` URI option.
///
/// `Auto` is used when the option is absent.
///
/// NOTE(review): what each mode means at execution time is decided by the
/// code consuming this enum, which is not part of this file section.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// Default variant (`transactionMode=Auto`).
    #[default]
    Auto,
    /// Selected with `transactionMode=Managed`.
    Managed,
}
73
74impl FromStr for TransactionMode {
75 type Err = CamelError;
76
77 fn from_str(s: &str) -> Result<Self, Self::Err> {
78 match s {
79 "Auto" => Ok(TransactionMode::Auto),
80 "Managed" => Ok(TransactionMode::Managed),
81 _ => Err(CamelError::InvalidUri(format!(
82 "Unknown transaction mode: {}. Expected 'Auto' or 'Managed'",
83 s
84 ))),
85 }
86 }
87}
88
89impl std::fmt::Display for TransactionMode {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 TransactionMode::Auto => write!(f, "Auto"),
93 TransactionMode::Managed => write!(f, "Managed"),
94 }
95 }
96}
97
/// Processing strategy, selected with the `processingStrategy` URI option.
///
/// `Direct` is used when the option is absent.
///
/// NOTE(review): the runtime behaviour behind each variant is decided by the
/// code consuming this enum, which is not part of this file section.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ProcessingStrategy {
    /// Default variant (`processingStrategy=Direct`).
    #[default]
    Direct,
    /// Selected with `processingStrategy=Scheduled`.
    Scheduled,
}
107
108impl FromStr for ProcessingStrategy {
109 type Err = CamelError;
110
111 fn from_str(s: &str) -> Result<Self, Self::Err> {
112 match s {
113 "Direct" => Ok(ProcessingStrategy::Direct),
114 "Scheduled" => Ok(ProcessingStrategy::Scheduled),
115 _ => Err(CamelError::InvalidUri(format!(
116 "Unknown processing strategy: {}. Expected 'Direct' or 'Scheduled'",
117 s
118 ))),
119 }
120 }
121}
122
123impl std::fmt::Display for ProcessingStrategy {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 match self {
126 ProcessingStrategy::Direct => write!(f, "Direct"),
127 ProcessingStrategy::Scheduled => write!(f, "Scheduled"),
128 }
129 }
130}
131
/// Polling strategy, selected with the `pollStrategy` URI option.
///
/// `Sequential` is used when the option is absent.
///
/// NOTE(review): the runtime behaviour behind each variant is decided by the
/// code consuming this enum, which is not part of this file section.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PollStrategy {
    /// Default variant (`pollStrategy=Sequential`).
    #[default]
    Sequential,
    /// Selected with `pollStrategy=Burst`.
    Burst,
}
141
142impl FromStr for PollStrategy {
143 type Err = CamelError;
144
145 fn from_str(s: &str) -> Result<Self, Self::Err> {
146 match s {
147 "Sequential" => Ok(PollStrategy::Sequential),
148 "Burst" => Ok(PollStrategy::Burst),
149 _ => Err(CamelError::InvalidUri(format!(
150 "Unknown poll strategy: {}. Expected 'Sequential' or 'Burst'",
151 s
152 ))),
153 }
154 }
155}
156
157impl std::fmt::Display for PollStrategy {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 match self {
160 PollStrategy::Sequential => write!(f, "Sequential"),
161 PollStrategy::Burst => write!(f, "Burst"),
162 }
163 }
164}
165
166#[derive(Clone, PartialEq, serde::Deserialize)]
173#[serde(default)]
174pub struct SqlGlobalConfig {
175 pub max_connections: u32,
176 pub min_connections: u32,
177 pub idle_timeout_secs: u64,
178 pub max_lifetime_secs: u64,
179 pub ssl_mode: Option<String>,
181 pub ssl_root_cert: Option<String>,
182 pub ssl_cert: Option<String>,
183 pub ssl_key: Option<String>,
184 #[serde(default)]
186 pub retry: NetworkRetryPolicy,
187}
188
189impl std::fmt::Debug for SqlGlobalConfig {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 f.debug_struct("SqlGlobalConfig")
192 .field("max_connections", &self.max_connections)
193 .field("min_connections", &self.min_connections)
194 .field("idle_timeout_secs", &self.idle_timeout_secs)
195 .field("max_lifetime_secs", &self.max_lifetime_secs)
196 .field("ssl_mode", &self.ssl_mode)
197 .field("ssl_root_cert", &self.ssl_root_cert)
198 .field("ssl_cert", &self.ssl_cert)
199 .field("ssl_key", &redacted_opt(&self.ssl_key))
200 .field("retry", &self.retry)
201 .finish()
202 }
203}
204
205impl Default for SqlGlobalConfig {
206 fn default() -> Self {
207 Self {
208 max_connections: 5,
209 min_connections: 1,
210 idle_timeout_secs: 300,
211 max_lifetime_secs: 1800,
212 ssl_mode: None,
213 ssl_root_cert: None,
214 ssl_cert: None,
215 ssl_key: None,
216 retry: NetworkRetryPolicy::default(),
217 }
218 }
219}
220
221impl SqlGlobalConfig {
222 pub fn new() -> Self {
223 Self::default()
224 }
225
226 pub fn with_max_connections(mut self, value: u32) -> Self {
227 self.max_connections = value;
228 self
229 }
230
231 pub fn with_min_connections(mut self, value: u32) -> Self {
232 self.min_connections = value;
233 self
234 }
235
236 pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
237 self.idle_timeout_secs = value;
238 self
239 }
240
241 pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
242 self.max_lifetime_secs = value;
243 self
244 }
245
246 pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
247 self.ssl_mode = Some(value.into());
248 self
249 }
250
251 pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
252 self.ssl_root_cert = Some(value.into());
253 self
254 }
255
256 pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
257 self.ssl_cert = Some(value.into());
258 self
259 }
260
261 pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
262 self.ssl_key = Some(value.into());
263 self
264 }
265
266 pub fn with_retry(mut self, value: NetworkRetryPolicy) -> Self {
267 self.retry = value;
268 self
269 }
270}
271
272#[derive(Clone)]
289pub struct SqlEndpointConfig {
290 pub db_url: String,
293 pub max_connections: Option<u32>,
295 pub min_connections: Option<u32>,
297 pub idle_timeout_secs: Option<u64>,
299 pub max_lifetime_secs: Option<u64>,
301
302 pub query: String,
305 pub source_path: Option<String>,
307 pub output_type: SqlOutputType,
309 pub placeholder: char,
311 pub use_placeholder: bool,
313 pub noop: bool,
315 pub in_separator: String,
317
318 pub always_populate_statement: bool,
322
323 pub allow_named_parameters: bool,
327
328 pub fetch_size: Option<u32>,
331
332 pub transaction_mode: TransactionMode,
335
336 pub delay_ms: u64,
339 pub initial_delay_ms: u64,
341 pub max_messages_per_poll: Option<i32>,
343 pub on_consume: Option<String>,
345 pub on_consume_failed: Option<String>,
347 pub on_consume_batch_complete: Option<String>,
349 pub route_empty_result_set: bool,
351 pub use_iterator: bool,
353 pub expected_update_count: Option<i64>,
355 pub break_batch_on_consume_fail: bool,
357 pub bridge_error_handler: bool,
359
360 pub repeat_count: Option<u32>,
364
365 pub processing_strategy: ProcessingStrategy,
368
369 pub poll_strategy: PollStrategy,
372
373 pub batch: bool,
376 pub use_message_body_for_sql: bool,
378
379 pub ssl_mode: Option<String>,
382 pub ssl_root_cert: Option<String>,
384 pub ssl_cert: Option<String>,
386 pub ssl_key: Option<String>,
388
389 pub retry: NetworkRetryPolicy,
391
392 retry_set_from_uri: bool,
396}
397
398impl std::fmt::Debug for SqlEndpointConfig {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("SqlEndpointConfig")
401 .field("db_url", &redact_db_url(&self.db_url))
402 .field("max_connections", &self.max_connections)
403 .field("min_connections", &self.min_connections)
404 .field("idle_timeout_secs", &self.idle_timeout_secs)
405 .field("max_lifetime_secs", &self.max_lifetime_secs)
406 .field("query", &self.query)
407 .field("source_path", &self.source_path)
408 .field("output_type", &self.output_type)
409 .field("placeholder", &self.placeholder)
410 .field("use_placeholder", &self.use_placeholder)
411 .field("noop", &self.noop)
412 .field("in_separator", &self.in_separator)
413 .field("always_populate_statement", &self.always_populate_statement)
414 .field("allow_named_parameters", &self.allow_named_parameters)
415 .field("fetch_size", &self.fetch_size)
416 .field("transaction_mode", &self.transaction_mode)
417 .field("delay_ms", &self.delay_ms)
418 .field("initial_delay_ms", &self.initial_delay_ms)
419 .field("max_messages_per_poll", &self.max_messages_per_poll)
420 .field("on_consume", &self.on_consume)
421 .field("on_consume_failed", &self.on_consume_failed)
422 .field("on_consume_batch_complete", &self.on_consume_batch_complete)
423 .field("route_empty_result_set", &self.route_empty_result_set)
424 .field("use_iterator", &self.use_iterator)
425 .field("expected_update_count", &self.expected_update_count)
426 .field(
427 "break_batch_on_consume_fail",
428 &self.break_batch_on_consume_fail,
429 )
430 .field("bridge_error_handler", &self.bridge_error_handler)
431 .field("repeat_count", &self.repeat_count)
432 .field("processing_strategy", &self.processing_strategy)
433 .field("poll_strategy", &self.poll_strategy)
434 .field("batch", &self.batch)
435 .field("use_message_body_for_sql", &self.use_message_body_for_sql)
436 .field("ssl_mode", &self.ssl_mode)
437 .field("ssl_root_cert", &self.ssl_root_cert)
438 .field("ssl_cert", &self.ssl_cert)
439 .field("ssl_key", &redacted_opt(&self.ssl_key))
440 .field("retry", &self.retry)
441 .finish()
442 }
443}
444
445impl SqlEndpointConfig {
446 pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
448 if self.max_connections.is_none() {
449 self.max_connections = Some(defaults.max_connections);
450 }
451 if self.min_connections.is_none() {
452 self.min_connections = Some(defaults.min_connections);
453 }
454 if self.idle_timeout_secs.is_none() {
455 self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
456 }
457 if self.max_lifetime_secs.is_none() {
458 self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
459 }
460 if self.ssl_mode.is_none() {
461 self.ssl_mode = defaults.ssl_mode.clone();
462 }
463 if self.ssl_root_cert.is_none() {
464 self.ssl_root_cert = defaults.ssl_root_cert.clone();
465 }
466 if self.ssl_cert.is_none() {
467 self.ssl_cert = defaults.ssl_cert.clone();
468 }
469 if self.ssl_key.is_none() {
470 self.ssl_key = defaults.ssl_key.clone();
471 }
472 if !self.retry_set_from_uri {
474 self.retry = defaults.retry.clone();
475 }
476 }
477
478 pub fn resolve_defaults(&mut self) {
480 let defaults = SqlGlobalConfig::default();
481 self.apply_defaults(&defaults);
482 }
483
484 pub async fn resolve_file_query(&mut self) -> Result<(), CamelError> {
493 if let Some(file_path) = self.source_path.take() {
494 let contents = tokio::fs::read_to_string(&file_path).await.map_err(|e| {
495 CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
496 })?;
497 self.query = contents.trim().to_string();
498 self.source_path = Some(file_path);
500 }
501 Ok(())
502 }
503}
504
/// Driver-specific query-parameter names for a single SSL option.
struct SslParamMapping {
    /// Parameter name used in `postgres` / `postgresql` URLs.
    pg_key: &'static str,
    /// Parameter name used in `mysql` URLs.
    mysql_key: &'static str,
}
509
510const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
511 (
512 "sslMode",
513 SslParamMapping {
514 pg_key: "sslmode",
515 mysql_key: "ssl-mode",
516 },
517 ),
518 (
519 "sslRootCert",
520 SslParamMapping {
521 pg_key: "sslrootcert",
522 mysql_key: "ssl-ca",
523 },
524 ),
525 (
526 "sslCert",
527 SslParamMapping {
528 pg_key: "sslcert",
529 mysql_key: "ssl-cert",
530 },
531 ),
532 (
533 "sslKey",
534 SslParamMapping {
535 pg_key: "sslkey",
536 mysql_key: "ssl-key",
537 },
538 ),
539];
540
541pub fn enrich_db_url_with_ssl(
542 db_url: &str,
543 config: &SqlEndpointConfig,
544) -> Result<String, CamelError> {
545 let ssl_params: Vec<(&str, &str)> = [
546 config.ssl_mode.as_deref().map(|v| ("sslMode", v)),
547 config.ssl_root_cert.as_deref().map(|v| ("sslRootCert", v)),
548 config.ssl_cert.as_deref().map(|v| ("sslCert", v)),
549 config.ssl_key.as_deref().map(|v| ("sslKey", v)),
550 ]
551 .into_iter()
552 .flatten()
553 .collect();
554
555 if ssl_params.is_empty() {
556 return Ok(db_url.to_string());
557 }
558
559 let mut parsed = url::Url::parse(db_url).map_err(|e| {
560 CamelError::InvalidUri(format!(
561 "Cannot parse database URL for SSL enrichment: {}",
562 e
563 ))
564 })?;
565
566 let scheme = parsed.scheme();
567 if scheme.starts_with("sqlite") {
568 warn!(
569 "SSL options configured for SQLite database URL, but SQLite does not support SSL/TLS; ignoring sslMode/sslRootCert/sslCert/sslKey"
570 );
571 return Ok(db_url.to_string());
572 }
573
574 if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
575 return Ok(db_url.to_string());
576 }
577 let is_mysql = scheme == "mysql";
578
579 let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
580 for (camel_name, value) in &ssl_params {
581 if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
582 let driver_key = if is_mysql {
583 mapping.mysql_key
584 } else {
585 mapping.pg_key
586 };
587
588 if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
589 query_pairs[pos].1 = (*value).into();
590 } else {
591 query_pairs.push((driver_key.into(), (*value).into()));
592 }
593 }
594 }
595
596 {
597 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
598 for (k, v) in query_pairs {
599 serializer.append_pair(&k, &v);
600 }
601 parsed.set_query(Some(&serializer.finish()));
602 }
603
604 Ok(parsed.to_string())
605}
606
607impl UriConfig for SqlEndpointConfig {
608 fn scheme() -> &'static str {
609 "sql"
610 }
611
612 fn from_uri(uri: &str) -> Result<Self, CamelError> {
613 let parts = parse_uri(uri)?;
614 Self::from_components(parts)
615 }
616
617 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
618 if parts.scheme != Self::scheme() {
620 return Err(CamelError::InvalidUri(format!(
621 "expected scheme '{}' but got '{}'",
622 Self::scheme(),
623 parts.scheme
624 )));
625 }
626
627 let params = &parts.params;
628
629 let (query, source_path) = if parts.path.starts_with("file:") {
634 let file_path = parts.path.trim_start_matches("file:").to_string();
635 (String::new(), Some(file_path))
636 } else {
637 (parts.path.clone(), None)
638 };
639
640 let db_url = params
642 .get("db_url")
643 .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
644 .clone();
645
646 let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
648 let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
649 let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
650 let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
651
652 let output_type = params
654 .get("outputType")
655 .map(|s| s.parse())
656 .transpose()?
657 .unwrap_or_default();
658 let placeholder = params
659 .get("placeholder")
660 .filter(|v| !v.is_empty())
661 .map(|v| {
662 if v.chars().count() != 1 {
663 return Err(CamelError::InvalidUri(format!(
664 "placeholder must be exactly one character, got '{}'",
665 v
666 )));
667 }
668 if !v.is_ascii() {
669 return Err(CamelError::InvalidUri(
670 "placeholder must be a single ASCII character".to_string(),
671 ));
672 }
673 Ok(v.chars().next().unwrap()) })
675 .transpose()?
676 .unwrap_or('#');
677 fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
682 if value.eq_ignore_ascii_case("true") {
683 Ok(true)
684 } else if value.eq_ignore_ascii_case("false") {
685 Ok(false)
686 } else {
687 Err(CamelError::InvalidUri(format!(
688 "{} must be 'true' or 'false', got '{}'",
689 name, value
690 )))
691 }
692 }
693
694 let use_placeholder = params
695 .get("usePlaceholder")
696 .map(|v| parse_bool_param("usePlaceholder", v))
697 .transpose()?
698 .unwrap_or(true);
699 let noop = params
700 .get("noop")
701 .map(|v| parse_bool_param("noop", v))
702 .transpose()?
703 .unwrap_or(false);
704 let in_separator = params
705 .get("inSeparator")
706 .map(|v| v.to_string())
707 .unwrap_or_else(|| ", ".to_string());
708 if in_separator.is_empty() {
709 return Err(CamelError::InvalidUri(
710 "inSeparator must not be empty".to_string(),
711 ));
712 }
713
714 let always_populate_statement = params
716 .get("alwaysPopulateStatement")
717 .map(|v| parse_bool_param("alwaysPopulateStatement", v))
718 .transpose()?
719 .unwrap_or(false);
720
721 let allow_named_parameters = params
723 .get("allowNamedParameters")
724 .map(|v| parse_bool_param("allowNamedParameters", v))
725 .transpose()?
726 .unwrap_or(true);
727
728 let fetch_size = params.get("fetchSize").and_then(|v| v.parse().ok());
730
731 let transaction_mode = params
733 .get("transactionMode")
734 .map(|s| s.parse())
735 .transpose()?
736 .unwrap_or_default();
737
738 let delay_ms = params
740 .get("delay")
741 .and_then(|v| v.parse().ok())
742 .unwrap_or(500);
743 let initial_delay_ms = params
744 .get("initialDelay")
745 .and_then(|v| v.parse().ok())
746 .unwrap_or(1000);
747 let max_messages_per_poll = params
748 .get("maxMessagesPerPoll")
749 .and_then(|v| v.parse().ok());
750 let on_consume = params.get("onConsume").cloned();
751 let on_consume_failed = params.get("onConsumeFailed").cloned();
752 let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
753 let route_empty_result_set = params
754 .get("routeEmptyResultSet")
755 .map(|v| parse_bool_param("routeEmptyResultSet", v))
756 .transpose()?
757 .unwrap_or(false);
758 let use_iterator = params
759 .get("useIterator")
760 .map(|v| parse_bool_param("useIterator", v))
761 .transpose()?
762 .unwrap_or(true);
763 let expected_update_count = params
764 .get("expectedUpdateCount")
765 .and_then(|v| v.parse().ok());
766 let break_batch_on_consume_fail = params
767 .get("breakBatchOnConsumeFail")
768 .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
769 .transpose()?
770 .unwrap_or(false);
771 let bridge_error_handler = params
772 .get("bridgeErrorHandler")
773 .map(|v| parse_bool_param("bridgeErrorHandler", v))
774 .transpose()?
775 .unwrap_or(false);
776
777 let repeat_count = params.get("repeatCount").and_then(|v| v.parse().ok());
779
780 let processing_strategy = params
782 .get("processingStrategy")
783 .map(|s| s.parse())
784 .transpose()?
785 .unwrap_or_default();
786
787 let poll_strategy = params
789 .get("pollStrategy")
790 .map(|s| s.parse())
791 .transpose()?
792 .unwrap_or_default();
793
794 let batch = params
796 .get("batch")
797 .map(|v| parse_bool_param("batch", v))
798 .transpose()?
799 .unwrap_or(false);
800 let use_message_body_for_sql = params
801 .get("useMessageBodyForSql")
802 .map(|v| parse_bool_param("useMessageBodyForSql", v))
803 .transpose()?
804 .unwrap_or(false);
805 let ssl_mode = params.get("sslMode").cloned();
806 let ssl_root_cert = params.get("sslRootCert").cloned();
807 let ssl_cert = params.get("sslCert").cloned();
808 let ssl_key = params.get("sslKey").cloned();
809
810 let mut retry = NetworkRetryPolicy::default();
812 let mut retry_set_from_uri = false;
813 if let Some(raw) = params.get("retryEnabled") {
814 retry.enabled = raw.parse::<bool>().map_err(|_| {
815 CamelError::InvalidUri(format!("retryEnabled must be a boolean, got '{raw}'"))
816 })?;
817 retry_set_from_uri = true;
818 }
819 if let Some(raw) = params.get("retryMaxAttempts") {
820 retry.max_attempts = raw.parse::<u32>().map_err(|_| {
821 CamelError::InvalidUri(format!("retryMaxAttempts must be a u32, got '{raw}'"))
822 })?;
823 retry_set_from_uri = true;
824 }
825 if let Some(raw) = params.get("retryInitialDelayMs") {
826 retry.initial_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
827 CamelError::InvalidUri(format!("retryInitialDelayMs must be a u64, got '{raw}'"))
828 })?);
829 retry_set_from_uri = true;
830 }
831 if let Some(raw) = params.get("retryMultiplier") {
832 retry.multiplier = raw.parse::<f64>().map_err(|_| {
833 CamelError::InvalidUri(format!("retryMultiplier must be a f64, got '{raw}'"))
834 })?;
835 retry_set_from_uri = true;
836 }
837 if let Some(raw) = params.get("retryMaxDelayMs") {
838 retry.max_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
839 CamelError::InvalidUri(format!("retryMaxDelayMs must be a u64, got '{raw}'"))
840 })?);
841 retry_set_from_uri = true;
842 }
843 if let Some(raw) = params.get("retryJitter") {
844 retry.jitter_factor = raw.parse::<f64>().map_err(|_| {
845 CamelError::InvalidUri(format!("retryJitter must be a f64, got '{raw}'"))
846 })?;
847 retry_set_from_uri = true;
848 }
849
850 Ok(Self {
851 db_url,
852 max_connections,
853 min_connections,
854 idle_timeout_secs,
855 max_lifetime_secs,
856 query,
857 source_path,
858 output_type,
859 placeholder,
860 use_placeholder,
861 noop,
862 in_separator,
863 always_populate_statement,
864 allow_named_parameters,
865 fetch_size,
866 transaction_mode,
867 delay_ms,
868 initial_delay_ms,
869 max_messages_per_poll,
870 on_consume,
871 on_consume_failed,
872 on_consume_batch_complete,
873 route_empty_result_set,
874 use_iterator,
875 expected_update_count,
876 break_batch_on_consume_fail,
877 bridge_error_handler,
878 repeat_count,
879 processing_strategy,
880 poll_strategy,
881 batch,
882 use_message_body_for_sql,
883 ssl_mode,
884 ssl_root_cert,
885 ssl_cert,
886 ssl_key,
887 retry,
888 retry_set_from_uri,
889 })
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use super::*;
896 use camel_component_api::NetworkRetryPolicy;
897
898 #[test]
899 fn config_defaults() {
900 let mut c =
901 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
902 c.resolve_defaults();
903 assert_eq!(c.query, "select 1");
904 assert_eq!(c.db_url, "postgres://localhost/test");
905 assert_eq!(c.max_connections, Some(5));
906 assert_eq!(c.min_connections, Some(1));
907 assert_eq!(c.idle_timeout_secs, Some(300));
908 assert_eq!(c.max_lifetime_secs, Some(1800));
909 assert_eq!(c.output_type, SqlOutputType::SelectList);
910 assert_eq!(c.placeholder, '#');
911 assert!(!c.noop);
912 assert_eq!(c.in_separator, ", ");
913 assert_eq!(c.delay_ms, 500);
914 assert_eq!(c.initial_delay_ms, 1000);
915 assert!(c.max_messages_per_poll.is_none());
916 assert!(c.on_consume.is_none());
917 assert!(c.on_consume_failed.is_none());
918 assert!(c.on_consume_batch_complete.is_none());
919 assert!(!c.route_empty_result_set);
920 assert!(c.use_iterator);
921 assert!(c.expected_update_count.is_none());
922 assert!(!c.break_batch_on_consume_fail);
923 assert!(!c.batch);
924 assert!(!c.use_message_body_for_sql);
925 assert!(c.ssl_mode.is_none());
926 assert!(c.ssl_root_cert.is_none());
927 assert!(c.ssl_cert.is_none());
928 assert!(c.ssl_key.is_none());
929 assert!(!c.always_populate_statement);
931 assert!(c.allow_named_parameters);
932 assert!(c.fetch_size.is_none());
933 assert_eq!(c.transaction_mode, TransactionMode::Auto);
934 assert!(c.repeat_count.is_none());
935 assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
936 assert_eq!(c.poll_strategy, PollStrategy::Sequential);
937 }
938
939 #[test]
940 fn ssl_none_by_default() {
941 let c =
942 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
943 assert!(c.ssl_mode.is_none());
944 assert!(c.ssl_root_cert.is_none());
945 assert!(c.ssl_cert.is_none());
946 assert!(c.ssl_key.is_none());
947 }
948
949 #[test]
950 fn ssl_mode_from_uri() {
951 let c = SqlEndpointConfig::from_uri(
952 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
953 )
954 .unwrap();
955 assert_eq!(c.ssl_mode, Some("require".to_string()));
956 assert!(c.ssl_root_cert.is_none());
957 }
958
959 #[test]
960 fn ssl_all_params_from_uri() {
961 let c = SqlEndpointConfig::from_uri(
962 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
963 )
964 .unwrap();
965 assert_eq!(c.ssl_mode, Some("require".to_string()));
966 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
967 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
968 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
969 }
970
971 #[test]
972 fn ssl_global_applied_to_endpoint() {
973 let mut c =
974 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
975 let global = SqlGlobalConfig::default()
976 .with_ssl_mode("require")
977 .with_ssl_root_cert("/etc/ssl/ca.pem");
978 c.apply_defaults(&global);
979 assert_eq!(c.ssl_mode, Some("require".to_string()));
980 assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
981 assert!(c.ssl_cert.is_none());
982 assert!(c.ssl_key.is_none());
983 }
984
985 #[test]
986 fn ssl_uri_overrides_global() {
987 let mut c = SqlEndpointConfig::from_uri(
988 "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
989 )
990 .unwrap();
991 let global = SqlGlobalConfig::default().with_ssl_mode("require");
992 c.apply_defaults(&global);
993 assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
994 }
995
996 #[test]
997 fn config_wrong_scheme() {
998 assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
999 }
1000
1001 #[test]
1002 fn config_missing_db_url() {
1003 assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
1004 }
1005
1006 #[test]
1007 fn config_output_type_select_one() {
1008 let c = SqlEndpointConfig::from_uri(
1009 "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
1010 )
1011 .unwrap();
1012 assert_eq!(c.output_type, SqlOutputType::SelectOne);
1013 }
1014
1015 #[test]
1016 fn config_output_type_stream_list() {
1017 let c = SqlEndpointConfig::from_uri(
1018 "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
1019 )
1020 .unwrap();
1021 assert_eq!(c.output_type, SqlOutputType::StreamList);
1022 }
1023
1024 #[test]
1025 fn in_separator_default() {
1026 let c =
1027 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1028 assert_eq!(c.in_separator, ", ");
1029 }
1030
1031 #[test]
1032 fn in_separator_from_uri() {
1033 let c = SqlEndpointConfig::from_uri(
1034 "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
1035 )
1036 .unwrap();
1037 assert_eq!(c.in_separator, ";");
1038 }
1039
1040 #[test]
1041 fn in_separator_empty_rejected() {
1042 let result = SqlEndpointConfig::from_uri(
1043 "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
1044 );
1045 assert!(result.is_err());
1046 let msg = format!("{:?}", result.unwrap_err());
1047 assert!(msg.contains("inSeparator") || msg.contains("empty"));
1048 }
1049
1050 #[test]
1051 fn config_consumer_options() {
1052 let c = SqlEndpointConfig::from_uri(
1053 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
1054 ).unwrap();
1055 assert_eq!(c.delay_ms, 2000);
1056 assert_eq!(c.initial_delay_ms, 500);
1057 assert_eq!(c.max_messages_per_poll, Some(10));
1058 assert_eq!(
1059 c.on_consume,
1060 Some("update t set done=true where id=:#id".to_string())
1061 );
1062 assert_eq!(
1063 c.on_consume_failed,
1064 Some("update t set failed=true where id=:#id".to_string())
1065 );
1066 assert_eq!(
1067 c.on_consume_batch_complete,
1068 Some("delete from t where done=true".to_string())
1069 );
1070 assert!(c.route_empty_result_set);
1071 assert!(!c.use_iterator);
1072 assert_eq!(c.expected_update_count, Some(1));
1073 assert!(c.break_batch_on_consume_fail);
1074 assert!(!c.bridge_error_handler);
1075 }
1076
1077 #[test]
1078 fn config_producer_options() {
1079 let c = SqlEndpointConfig::from_uri(
1080 "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
1081 ).unwrap();
1082 assert!(c.batch);
1083 assert!(c.use_message_body_for_sql);
1084 assert!(c.noop);
1085 }
1086
1087 #[test]
1088 fn config_pool_options() {
1089 let c = SqlEndpointConfig::from_uri(
1090 "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
1091 ).unwrap();
1092 assert_eq!(c.max_connections, Some(20));
1093 assert_eq!(c.min_connections, Some(3));
1094 assert_eq!(c.idle_timeout_secs, Some(600));
1095 assert_eq!(c.max_lifetime_secs, Some(3600));
1096 }
1097
1098 #[test]
1099 fn config_query_with_special_chars() {
1100 let c = SqlEndpointConfig::from_uri(
1101 "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
1102 )
1103 .unwrap();
1104 assert_eq!(
1105 c.query,
1106 "select * from users where name = :#name and age > #"
1107 );
1108 }
1109
1110 #[test]
1111 fn output_type_from_str() {
1112 assert_eq!(
1113 "SelectList".parse::<SqlOutputType>().unwrap(),
1114 SqlOutputType::SelectList
1115 );
1116 assert_eq!(
1117 "SelectOne".parse::<SqlOutputType>().unwrap(),
1118 SqlOutputType::SelectOne
1119 );
1120 assert_eq!(
1121 "StreamList".parse::<SqlOutputType>().unwrap(),
1122 SqlOutputType::StreamList
1123 );
1124 assert!("Invalid".parse::<SqlOutputType>().is_err());
1125 }
1126
1127 #[tokio::test]
1129 async fn config_file_not_found() {
1130 let mut config = SqlEndpointConfig::from_uri(
1131 "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
1132 )
1133 .expect("from_uri should defer file reading");
1134 assert_eq!(
1136 config.source_path,
1137 Some("/nonexistent/path/query.sql".to_string())
1138 );
1139 assert!(config.query.is_empty());
1140
1141 let result = config.resolve_file_query().await;
1143 assert!(result.is_err());
1144 let msg = format!("{:?}", result.unwrap_err());
1145 assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
1146 }
1147
1148 #[tokio::test]
1150 async fn config_file_query() {
1151 use std::io::Write;
1152 let unique_name = format!(
1153 "test_sql_query_{}.sql",
1154 std::time::SystemTime::now()
1155 .duration_since(std::time::UNIX_EPOCH)
1156 .unwrap_or_default()
1157 .as_nanos()
1158 );
1159 let mut tmp = std::env::temp_dir();
1160 tmp.push(unique_name);
1161 {
1162 let mut f = std::fs::File::create(&tmp).unwrap();
1163 writeln!(f, "SELECT * FROM users").unwrap();
1164 }
1165 let uri = format!(
1166 "sql:file:{}?db_url=postgres://localhost/test",
1167 tmp.display()
1168 );
1169 let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1170 assert!(c.query.is_empty());
1172 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1173
1174 c.resolve_file_query()
1176 .await
1177 .expect("file query should resolve");
1178 assert_eq!(c.query, "SELECT * FROM users");
1179 std::fs::remove_file(&tmp).ok();
1180 }
1181
1182 #[test]
1184 fn pool_fields_none_when_not_set() {
1185 let c =
1186 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1187 assert_eq!(c.max_connections, None);
1188 assert_eq!(c.min_connections, None);
1189 assert_eq!(c.idle_timeout_secs, None);
1190 assert_eq!(c.max_lifetime_secs, None);
1191 }
1192
1193 #[test]
1194 fn apply_defaults_fills_none() {
1195 let mut c =
1196 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1197 let global = SqlGlobalConfig {
1198 max_connections: 10,
1199 min_connections: 2,
1200 idle_timeout_secs: 600,
1201 max_lifetime_secs: 3600,
1202 ssl_mode: None,
1203 ssl_root_cert: None,
1204 ssl_cert: None,
1205 ssl_key: None,
1206 retry: NetworkRetryPolicy::default(),
1207 };
1208 c.apply_defaults(&global);
1209 assert_eq!(c.max_connections, Some(10));
1210 assert_eq!(c.min_connections, Some(2));
1211 assert_eq!(c.idle_timeout_secs, Some(600));
1212 assert_eq!(c.max_lifetime_secs, Some(3600));
1213 assert!(c.ssl_mode.is_none());
1214 assert!(c.ssl_root_cert.is_none());
1215 assert!(c.ssl_cert.is_none());
1216 assert!(c.ssl_key.is_none());
1217 }
1218
1219 #[test]
1220 fn apply_defaults_does_not_override() {
1221 let mut c = SqlEndpointConfig::from_uri(
1222 "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
1223 )
1224 .unwrap();
1225 let global = SqlGlobalConfig {
1226 max_connections: 10,
1227 min_connections: 2,
1228 idle_timeout_secs: 600,
1229 max_lifetime_secs: 3600,
1230 ssl_mode: None,
1231 ssl_root_cert: None,
1232 ssl_cert: None,
1233 ssl_key: None,
1234 retry: NetworkRetryPolicy::default(),
1235 };
1236 c.apply_defaults(&global);
1237 assert_eq!(c.max_connections, Some(99));
1239 assert_eq!(c.min_connections, Some(5));
1240 assert_eq!(c.idle_timeout_secs, Some(600));
1242 assert_eq!(c.max_lifetime_secs, Some(3600));
1243 }
1244
1245 #[test]
1246 fn resolve_defaults_fills_remaining() {
1247 let mut c = SqlEndpointConfig::from_uri(
1248 "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
1249 )
1250 .unwrap();
1251 c.resolve_defaults();
1252 assert_eq!(c.max_connections, Some(7)); assert_eq!(c.min_connections, Some(1)); assert_eq!(c.idle_timeout_secs, Some(300)); assert_eq!(c.max_lifetime_secs, Some(1800)); }
1257
1258 #[test]
1259 fn global_config_builder() {
1260 let c = SqlGlobalConfig::default()
1261 .with_max_connections(20)
1262 .with_min_connections(3)
1263 .with_idle_timeout_secs(600)
1264 .with_max_lifetime_secs(3600)
1265 .with_ssl_mode("require")
1266 .with_ssl_root_cert("/ca.pem")
1267 .with_ssl_cert("/cert.pem")
1268 .with_ssl_key("/key.pem");
1269 assert_eq!(c.max_connections, 20);
1270 assert_eq!(c.min_connections, 3);
1271 assert_eq!(c.idle_timeout_secs, 600);
1272 assert_eq!(c.max_lifetime_secs, 3600);
1273 assert_eq!(c.ssl_mode, Some("require".to_string()));
1274 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1275 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1276 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1277 }
1278
1279 #[test]
1280 fn enrich_postgres_ssl_mode() {
1281 let mut c = SqlEndpointConfig::from_uri(
1282 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1283 )
1284 .unwrap();
1285 c.resolve_defaults();
1286 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1287 assert!(url.contains("sslmode=require"), "got: {}", url);
1288 }
1289
1290 #[test]
1291 fn enrich_postgres_all_ssl() {
1292 let mut c = SqlEndpointConfig::from_uri(
1293 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1294 )
1295 .unwrap();
1296 c.resolve_defaults();
1297 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1298 assert!(url.contains("sslmode=require"), "got: {}", url);
1299 assert!(url.contains("sslrootcert="), "got: {}", url);
1300 assert!(url.contains("sslcert="), "got: {}", url);
1301 assert!(url.contains("sslkey="), "got: {}", url);
1302 }
1303
1304 #[test]
1305 fn enrich_mysql_ssl() {
1306 let mut c = SqlEndpointConfig::from_uri(
1307 "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
1308 )
1309 .unwrap();
1310 c.resolve_defaults();
1311 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1312 assert!(url.contains("ssl-mode=require"), "got: {}", url);
1313 }
1314
1315 #[test]
1316 fn enrich_existing_query_params() {
1317 let mut c = SqlEndpointConfig::from_uri(
1318 "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
1319 )
1320 .unwrap();
1321 c.resolve_defaults();
1322 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1323 assert!(url.contains("existing=1"), "got: {}", url);
1324 assert!(url.contains("sslmode=require"), "got: {}", url);
1325 }
1326
1327 #[test]
1328 fn enrich_override_existing() {
1329 let mut c = SqlEndpointConfig::from_uri(
1330 "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
1331 )
1332 .unwrap();
1333 c.resolve_defaults();
1334 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1335 assert!(url.contains("sslmode=require"), "got: {}", url);
1336 assert!(!url.contains("sslmode=allow"), "got: {}", url);
1337 }
1338
1339 #[test]
1340 fn enrich_no_params() {
1341 let mut c =
1342 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1343 c.resolve_defaults();
1344 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1345 assert_eq!(url, "postgres://localhost/test");
1346 }
1347
1348 #[test]
1349 fn enrich_url_encodes_paths() {
1350 let mut c = SqlEndpointConfig::from_uri(
1351 "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1352 )
1353 .unwrap();
1354 c.resolve_defaults();
1355 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1356 assert!(url.contains("sslrootcert="), "got: {}", url);
1357 }
1358
1359 #[test]
1360 fn enrich_unsupported_scheme_returns_unchanged() {
1361 let mut c = SqlEndpointConfig::from_uri(
1362 "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1363 )
1364 .unwrap();
1365 c.resolve_defaults();
1366 let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1367 assert_eq!(url, "sqlite://localhost/test.db");
1368 }
1369
1370 #[test]
1371 fn enrich_invalid_url_returns_error() {
1372 let mut c = SqlEndpointConfig::from_uri(
1373 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1374 )
1375 .unwrap();
1376 c.resolve_defaults();
1377 let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1378 assert!(result.is_err());
1379 }
1380
1381 #[test]
1385 fn debug_redacts_db_url_with_password() {
1386 let c = SqlEndpointConfig::from_uri(
1387 "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1388 )
1389 .unwrap();
1390 let debug_output = format!("{:?}", c);
1391 assert!(
1392 !debug_output.contains("secret123"),
1393 "Debug output must not contain password: {}",
1394 debug_output
1395 );
1396 assert!(
1397 debug_output.contains("***"),
1398 "Debug output must contain redacted marker: {}",
1399 debug_output
1400 );
1401 }
1402
1403 #[test]
1404 fn debug_redacts_ssl_key() {
1405 let c = SqlEndpointConfig::from_uri(
1406 "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1407 )
1408 .unwrap();
1409 let debug_output = format!("{:?}", c);
1410 assert!(
1411 !debug_output.contains("/secret/key.pem"),
1412 "Debug output must not contain ssl_key path: {}",
1413 debug_output
1414 );
1415 }
1416
1417 #[test]
1418 fn debug_global_config_redacts_ssl_key() {
1419 let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1420 let debug_output = format!("{:?}", c);
1421 assert!(
1422 !debug_output.contains("/secret/key.pem"),
1423 "Debug output must not contain ssl_key path: {}",
1424 debug_output
1425 );
1426 assert!(
1427 debug_output.contains("***"),
1428 "Debug output must contain redacted marker: {}",
1429 debug_output
1430 );
1431 }
1432
1433 #[test]
1434 fn redact_db_url_with_credentials() {
1435 assert_eq!(
1436 redact_db_url("postgres://user:pass@host/db"),
1437 "postgres://***:***@host/db"
1438 );
1439 }
1440
1441 #[test]
1442 fn redact_db_url_without_credentials() {
1443 assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1444 }
1445
1446 #[test]
1447 fn redact_db_url_invalid_returns_original() {
1448 assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1449 }
1450
1451 #[test]
1453 fn use_placeholder_defaults_to_true() {
1454 let c =
1455 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1456 assert!(c.use_placeholder);
1457 }
1458
1459 #[test]
1460 fn use_placeholder_false_from_uri() {
1461 let c = SqlEndpointConfig::from_uri(
1462 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1463 )
1464 .unwrap();
1465 assert!(!c.use_placeholder);
1466 }
1467
1468 #[test]
1469 fn use_placeholder_true_from_uri() {
1470 let c = SqlEndpointConfig::from_uri(
1471 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1472 )
1473 .unwrap();
1474 assert!(c.use_placeholder);
1475 }
1476
1477 #[test]
1479 fn use_placeholder_rejects_invalid_value() {
1480 let result = SqlEndpointConfig::from_uri(
1481 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1482 );
1483 assert!(result.is_err());
1484 let msg = format!("{:?}", result.unwrap_err());
1485 assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1486 }
1487
1488 #[test]
1489 fn use_placeholder_rejects_typo_tru() {
1490 let result = SqlEndpointConfig::from_uri(
1491 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1492 );
1493 assert!(result.is_err());
1494 }
1495
1496 #[test]
1497 fn use_placeholder_rejects_yes() {
1498 let result = SqlEndpointConfig::from_uri(
1499 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1500 );
1501 assert!(result.is_err());
1502 }
1503
1504 #[test]
1505 fn noop_rejects_invalid_value() {
1506 let result =
1507 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1508 assert!(result.is_err());
1509 let msg = format!("{:?}", result.unwrap_err());
1510 assert!(msg.contains("noop"));
1511 }
1512
1513 #[test]
1514 fn batch_rejects_invalid_value() {
1515 let result =
1516 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1517 assert!(result.is_err());
1518 let msg = format!("{:?}", result.unwrap_err());
1519 assert!(msg.contains("batch"));
1520 }
1521
1522 #[test]
1523 fn route_empty_result_set_rejects_invalid_value() {
1524 let result = SqlEndpointConfig::from_uri(
1525 "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1526 );
1527 assert!(result.is_err());
1528 }
1529
1530 #[test]
1531 fn use_iterator_rejects_invalid_value() {
1532 let result = SqlEndpointConfig::from_uri(
1533 "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1534 );
1535 assert!(result.is_err());
1536 }
1537
1538 #[test]
1539 fn break_batch_on_consume_fail_rejects_invalid_value() {
1540 let result = SqlEndpointConfig::from_uri(
1541 "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1542 );
1543 assert!(result.is_err());
1544 }
1545
1546 #[test]
1547 fn use_message_body_for_sql_rejects_invalid_value() {
1548 let result = SqlEndpointConfig::from_uri(
1549 "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1550 );
1551 assert!(result.is_err());
1552 }
1553
1554 #[test]
1556 fn boolean_params_case_insensitive() {
1557 let c = SqlEndpointConfig::from_uri(
1558 "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False&bridgeErrorHandler=TRUE",
1559 )
1560 .unwrap();
1561 assert!(c.use_placeholder);
1562 assert!(!c.noop);
1563 assert!(c.batch);
1564 assert!(!c.use_iterator);
1565 assert!(c.bridge_error_handler);
1566 }
1567
1568 #[test]
1570 fn multi_char_placeholder_rejected() {
1571 let result = SqlEndpointConfig::from_uri(
1572 "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1573 );
1574 assert!(result.is_err());
1575 let msg = format!("{:?}", result.unwrap_err());
1576 assert!(msg.contains("placeholder") && msg.contains("one character"));
1577 }
1578
1579 #[test]
1580 fn non_ascii_placeholder_rejected() {
1581 let result = SqlEndpointConfig::from_uri(
1582 "sql:select 1?db_url=postgres://localhost/test&placeholder=%C2%A2",
1583 );
1584 assert!(result.is_err());
1585 }
1586
1587 #[test]
1588 fn single_char_placeholder_accepted() {
1589 let c = SqlEndpointConfig::from_uri(
1590 "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1591 )
1592 .unwrap();
1593 assert_eq!(c.placeholder, '$');
1594 }
1595
1596 #[test]
1597 fn empty_placeholder_falls_back_to_default() {
1598 let c = SqlEndpointConfig::from_uri(
1600 "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1601 )
1602 .unwrap();
1603 assert_eq!(c.placeholder, '#');
1604 }
1605
1606 #[tokio::test]
1608 async fn file_query_cached_in_config() {
1609 use std::io::Write;
1610 let unique_name = format!(
1611 "test_sql_cached_{}.sql",
1612 std::time::SystemTime::now()
1613 .duration_since(std::time::UNIX_EPOCH)
1614 .unwrap_or_default()
1615 .as_nanos()
1616 );
1617 let mut tmp = std::env::temp_dir();
1618 tmp.push(unique_name);
1619 {
1620 let mut f = std::fs::File::create(&tmp).unwrap();
1621 writeln!(f, "SELECT * FROM cached_test").unwrap();
1622 }
1623 let uri = format!(
1624 "sql:file:{}?db_url=postgres://localhost/test",
1625 tmp.display()
1626 );
1627 let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1628 assert!(c.query.is_empty());
1630 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1631
1632 c.resolve_file_query()
1634 .await
1635 .expect("resolve should succeed");
1636 assert_eq!(c.query, "SELECT * FROM cached_test");
1637
1638 std::fs::remove_file(&tmp).ok();
1640 assert_eq!(c.query, "SELECT * FROM cached_test");
1641 }
1642
1643 #[test]
1647 fn always_populate_statement_defaults_to_false() {
1648 let c =
1649 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1650 assert!(!c.always_populate_statement);
1651 }
1652
1653 #[test]
1654 fn always_populate_statement_from_uri() {
1655 let c = SqlEndpointConfig::from_uri(
1656 "sql:select 1?db_url=postgres://localhost/test&alwaysPopulateStatement=true",
1657 )
1658 .unwrap();
1659 assert!(c.always_populate_statement);
1660 }
1661
1662 #[test]
1664 fn allow_named_parameters_defaults_to_true() {
1665 let c =
1666 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1667 assert!(c.allow_named_parameters);
1668 }
1669
1670 #[test]
1671 fn allow_named_parameters_false_from_uri() {
1672 let c = SqlEndpointConfig::from_uri(
1673 "sql:select 1?db_url=postgres://localhost/test&allowNamedParameters=false",
1674 )
1675 .unwrap();
1676 assert!(!c.allow_named_parameters);
1677 }
1678
1679 #[test]
1681 fn fetch_size_defaults_to_none() {
1682 let c =
1683 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1684 assert!(c.fetch_size.is_none());
1685 }
1686
1687 #[test]
1688 fn fetch_size_from_uri() {
1689 let c = SqlEndpointConfig::from_uri(
1690 "sql:select 1?db_url=postgres://localhost/test&fetchSize=1000",
1691 )
1692 .unwrap();
1693 assert_eq!(c.fetch_size, Some(1000));
1694 }
1695
1696 #[test]
1698 fn transaction_mode_defaults_to_auto() {
1699 let c =
1700 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1701 assert_eq!(c.transaction_mode, TransactionMode::Auto);
1702 }
1703
1704 #[test]
1705 fn transaction_mode_managed_from_uri() {
1706 let c = SqlEndpointConfig::from_uri(
1707 "sql:select 1?db_url=postgres://localhost/test&transactionMode=Managed",
1708 )
1709 .unwrap();
1710 assert_eq!(c.transaction_mode, TransactionMode::Managed);
1711 }
1712
1713 #[test]
1714 fn transaction_mode_invalid_rejected() {
1715 let result = SqlEndpointConfig::from_uri(
1716 "sql:select 1?db_url=postgres://localhost/test&transactionMode=Invalid",
1717 );
1718 assert!(result.is_err());
1719 }
1720
1721 #[test]
1723 fn repeat_count_defaults_to_none() {
1724 let c =
1725 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1726 assert!(c.repeat_count.is_none());
1727 }
1728
1729 #[test]
1730 fn repeat_count_from_uri() {
1731 let c = SqlEndpointConfig::from_uri(
1732 "sql:select 1?db_url=postgres://localhost/test&repeatCount=10",
1733 )
1734 .unwrap();
1735 assert_eq!(c.repeat_count, Some(10));
1736 }
1737
1738 #[test]
1740 fn processing_strategy_defaults_to_direct() {
1741 let c =
1742 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1743 assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1744 }
1745
1746 #[test]
1747 fn processing_strategy_scheduled_from_uri() {
1748 let c = SqlEndpointConfig::from_uri(
1749 "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Scheduled",
1750 )
1751 .unwrap();
1752 assert_eq!(c.processing_strategy, ProcessingStrategy::Scheduled);
1753 }
1754
1755 #[test]
1756 fn processing_strategy_invalid_rejected() {
1757 let result = SqlEndpointConfig::from_uri(
1758 "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Invalid",
1759 );
1760 assert!(result.is_err());
1761 }
1762
1763 #[test]
1765 fn poll_strategy_defaults_to_sequential() {
1766 let c =
1767 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1768 assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1769 }
1770
1771 #[test]
1772 fn poll_strategy_burst_from_uri() {
1773 let c = SqlEndpointConfig::from_uri(
1774 "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Burst",
1775 )
1776 .unwrap();
1777 assert_eq!(c.poll_strategy, PollStrategy::Burst);
1778 }
1779
1780 #[test]
1781 fn poll_strategy_invalid_rejected() {
1782 let result = SqlEndpointConfig::from_uri(
1783 "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Invalid",
1784 );
1785 assert!(result.is_err());
1786 }
1787
1788 #[test]
1791 fn sql_endpoint_config_has_retry_policy() {
1792 let cfg = SqlEndpointConfig::from_uri(
1793 "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=3&retryInitialDelayMs=500",
1794 )
1795 .expect("parse");
1796 assert_eq!(cfg.retry.max_attempts, 3);
1797 assert_eq!(
1798 cfg.retry.initial_delay,
1799 std::time::Duration::from_millis(500)
1800 );
1801 assert!(cfg.retry.enabled);
1802 }
1803
1804 #[test]
1805 fn sql_endpoint_config_retry_defaults_when_unspecified() {
1806 let cfg =
1807 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1808 assert!(cfg.retry.enabled);
1810 assert_eq!(cfg.retry.max_attempts, 10); }
1812
1813 #[test]
1814 fn sql_global_config_has_retry_default() {
1815 let cfg = SqlGlobalConfig::default();
1816 assert!(cfg.retry.enabled);
1817 }
1818
1819 #[test]
1820 fn retry_policy_parse_full_uri_params() {
1821 let cfg = SqlEndpointConfig::from_uri(
1822 "sql:select 1?db_url=sqlite::memory:&retryEnabled=false&retryMaxAttempts=7&retryInitialDelayMs=1000&retryMultiplier=3.0&retryMaxDelayMs=60000&retryJitter=0.5",
1823 )
1824 .expect("parse");
1825 assert!(!cfg.retry.enabled);
1826 assert_eq!(cfg.retry.max_attempts, 7);
1827 assert_eq!(
1828 cfg.retry.initial_delay,
1829 std::time::Duration::from_millis(1000)
1830 );
1831 assert!((cfg.retry.multiplier - 3.0).abs() < f64::EPSILON);
1832 assert_eq!(cfg.retry.max_delay, std::time::Duration::from_millis(60000));
1833 assert!((cfg.retry.jitter_factor - 0.5).abs() < f64::EPSILON);
1834 }
1835
1836 #[test]
1837 fn retry_policy_from_uri_survives_apply_defaults_with_global() {
1838 let mut ep = SqlEndpointConfig::from_uri(
1839 "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=10&retryInitialDelayMs=500",
1840 )
1841 .expect("parse");
1842 let global = SqlGlobalConfig::default(); ep.apply_defaults(&global);
1844 assert_eq!(ep.retry.max_attempts, 10);
1846 assert_eq!(
1847 ep.retry.initial_delay,
1848 std::time::Duration::from_millis(500)
1849 );
1850 }
1851
1852 #[test]
1853 fn retry_policy_falls_back_to_global_when_uri_has_no_retry_params() {
1854 let mut ep =
1855 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1856 let mut global = SqlGlobalConfig::default();
1857 global.retry.max_attempts = 7;
1858 ep.apply_defaults(&global);
1859 assert_eq!(ep.retry.max_attempts, 7);
1861 }
1862}