1use std::str::FromStr;
2
3use camel_component_api::CamelError;
4use camel_component_api::{UriComponents, UriConfig, parse_uri};
5use tracing::warn;
6
/// Replaces a present value with the fixed marker `"***"` so the real
/// contents never reach formatted output; an absent value stays `None`.
fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
    opt.as_ref().map(|_| "***")
}
11
12pub fn redact_db_url(db_url: &str) -> String {
15 match url::Url::parse(db_url) {
16 Ok(mut parsed) => {
17 if parsed.username().is_empty() && parsed.password().is_none() {
18 return db_url.to_string();
19 }
20 let _ = parsed.set_username("***");
21 let _ = parsed.set_password(Some("***"));
22 parsed.to_string()
23 }
24 Err(_) => db_url.to_string(),
25 }
26}
27
/// Output type selected through the `outputType` URI parameter.
///
/// Values are parsed from the exact variant name via [`FromStr`];
/// [`SqlOutputType::SelectList`] is used when the parameter is absent.
///
/// `Eq` is derived alongside `PartialEq` because equality on a fieldless enum
/// is total, which also lets the type be used where `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SqlOutputType {
    #[default]
    SelectList,
    SelectOne,
    StreamList,
}
39
40impl FromStr for SqlOutputType {
41 type Err = CamelError;
42
43 fn from_str(s: &str) -> Result<Self, Self::Err> {
44 match s {
45 "SelectList" => Ok(SqlOutputType::SelectList),
46 "SelectOne" => Ok(SqlOutputType::SelectOne),
47 "StreamList" => Ok(SqlOutputType::StreamList),
48 _ => Err(CamelError::InvalidUri(format!(
49 "Unknown output type: {}",
50 s
51 ))),
52 }
53 }
54}
55
/// Transaction mode selected through the `transactionMode` URI parameter.
///
/// Values are parsed from the exact variant name via [`FromStr`];
/// [`TransactionMode::Auto`] is used when the parameter is absent.
///
/// `Eq` is derived alongside `PartialEq` because equality on a fieldless enum
/// is total.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TransactionMode {
    #[default]
    Auto,
    Managed,
}
71
72impl FromStr for TransactionMode {
73 type Err = CamelError;
74
75 fn from_str(s: &str) -> Result<Self, Self::Err> {
76 match s {
77 "Auto" => Ok(TransactionMode::Auto),
78 "Managed" => Ok(TransactionMode::Managed),
79 _ => Err(CamelError::InvalidUri(format!(
80 "Unknown transaction mode: {}. Expected 'Auto' or 'Managed'",
81 s
82 ))),
83 }
84 }
85}
86
87impl std::fmt::Display for TransactionMode {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 TransactionMode::Auto => write!(f, "Auto"),
91 TransactionMode::Managed => write!(f, "Managed"),
92 }
93 }
94}
95
/// Processing strategy selected through the `processingStrategy` URI
/// parameter.
///
/// Values are parsed from the exact variant name via [`FromStr`];
/// [`ProcessingStrategy::Direct`] is used when the parameter is absent.
///
/// `Eq` is derived alongside `PartialEq` because equality on a fieldless enum
/// is total.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ProcessingStrategy {
    #[default]
    Direct,
    Scheduled,
}
105
106impl FromStr for ProcessingStrategy {
107 type Err = CamelError;
108
109 fn from_str(s: &str) -> Result<Self, Self::Err> {
110 match s {
111 "Direct" => Ok(ProcessingStrategy::Direct),
112 "Scheduled" => Ok(ProcessingStrategy::Scheduled),
113 _ => Err(CamelError::InvalidUri(format!(
114 "Unknown processing strategy: {}. Expected 'Direct' or 'Scheduled'",
115 s
116 ))),
117 }
118 }
119}
120
121impl std::fmt::Display for ProcessingStrategy {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 match self {
124 ProcessingStrategy::Direct => write!(f, "Direct"),
125 ProcessingStrategy::Scheduled => write!(f, "Scheduled"),
126 }
127 }
128}
129
/// Poll strategy selected through the `pollStrategy` URI parameter.
///
/// Values are parsed from the exact variant name via [`FromStr`];
/// [`PollStrategy::Sequential`] is used when the parameter is absent.
///
/// `Eq` is derived alongside `PartialEq` because equality on a fieldless enum
/// is total.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PollStrategy {
    #[default]
    Sequential,
    Burst,
}
139
140impl FromStr for PollStrategy {
141 type Err = CamelError;
142
143 fn from_str(s: &str) -> Result<Self, Self::Err> {
144 match s {
145 "Sequential" => Ok(PollStrategy::Sequential),
146 "Burst" => Ok(PollStrategy::Burst),
147 _ => Err(CamelError::InvalidUri(format!(
148 "Unknown poll strategy: {}. Expected 'Sequential' or 'Burst'",
149 s
150 ))),
151 }
152 }
153}
154
155impl std::fmt::Display for PollStrategy {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 match self {
158 PollStrategy::Sequential => write!(f, "Sequential"),
159 PollStrategy::Burst => write!(f, "Burst"),
160 }
161 }
162}
163
/// Component-wide fallback values for SQL endpoints.
///
/// [`SqlEndpointConfig::apply_defaults`] copies each value into the matching
/// endpoint field when the endpoint left that field unset. Because of
/// `#[serde(default)]`, fields missing from deserialized input take the
/// values produced by the [`Default`] impl.
///
/// `Debug` is implemented by hand so that `ssl_key` is masked.
#[derive(Clone, PartialEq, serde::Deserialize)]
#[serde(default)]
pub struct SqlGlobalConfig {
    /// Fallback for the endpoint's `max_connections` (defaults to 5).
    pub max_connections: u32,
    /// Fallback for the endpoint's `min_connections` (defaults to 1).
    pub min_connections: u32,
    /// Fallback for the endpoint's `idle_timeout_secs` (defaults to 300).
    pub idle_timeout_secs: u64,
    /// Fallback for the endpoint's `max_lifetime_secs` (defaults to 1800).
    pub max_lifetime_secs: u64,
    /// Fallback for the endpoint's `ssl_mode`; `None` by default.
    pub ssl_mode: Option<String>,
    /// Fallback for the endpoint's `ssl_root_cert`; `None` by default.
    pub ssl_root_cert: Option<String>,
    /// Fallback for the endpoint's `ssl_cert`; `None` by default.
    pub ssl_cert: Option<String>,
    /// Fallback for the endpoint's `ssl_key`; `None` by default. Shown as
    /// `***` in `Debug` output when set.
    pub ssl_key: Option<String>,
}
183
184impl std::fmt::Debug for SqlGlobalConfig {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct("SqlGlobalConfig")
187 .field("max_connections", &self.max_connections)
188 .field("min_connections", &self.min_connections)
189 .field("idle_timeout_secs", &self.idle_timeout_secs)
190 .field("max_lifetime_secs", &self.max_lifetime_secs)
191 .field("ssl_mode", &self.ssl_mode)
192 .field("ssl_root_cert", &self.ssl_root_cert)
193 .field("ssl_cert", &self.ssl_cert)
194 .field("ssl_key", &redacted_opt(&self.ssl_key))
195 .finish()
196 }
197}
198
199impl Default for SqlGlobalConfig {
200 fn default() -> Self {
201 Self {
202 max_connections: 5,
203 min_connections: 1,
204 idle_timeout_secs: 300,
205 max_lifetime_secs: 1800,
206 ssl_mode: None,
207 ssl_root_cert: None,
208 ssl_cert: None,
209 ssl_key: None,
210 }
211 }
212}
213
214impl SqlGlobalConfig {
215 pub fn new() -> Self {
216 Self::default()
217 }
218
219 pub fn with_max_connections(mut self, value: u32) -> Self {
220 self.max_connections = value;
221 self
222 }
223
224 pub fn with_min_connections(mut self, value: u32) -> Self {
225 self.min_connections = value;
226 self
227 }
228
229 pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
230 self.idle_timeout_secs = value;
231 self
232 }
233
234 pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
235 self.max_lifetime_secs = value;
236 self
237 }
238
239 pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
240 self.ssl_mode = Some(value.into());
241 self
242 }
243
244 pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
245 self.ssl_root_cert = Some(value.into());
246 self
247 }
248
249 pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
250 self.ssl_cert = Some(value.into());
251 self
252 }
253
254 pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
255 self.ssl_key = Some(value.into());
256 self
257 }
258}
259
/// Configuration of a single `sql:` endpoint, built from its URI.
///
/// The URI path supplies the query (or a `file:` reference to it) and the
/// URI parameters supply everything else; see the `UriConfig` impl for the
/// parameter names and defaults quoted below. Pool and SSL fields stay
/// `None` until [`SqlEndpointConfig::apply_defaults`] fills them.
///
/// `Debug` is implemented by hand so that credentials in `db_url` and the
/// value of `ssl_key` are masked.
#[derive(Clone)]
pub struct SqlEndpointConfig {
    /// Database URL from the required `db_url` parameter.
    pub db_url: String,
    /// `maxConnections`; `None` when absent or not parseable as `u32`.
    pub max_connections: Option<u32>,
    /// `minConnections`; `None` when absent or not parseable as `u32`.
    pub min_connections: Option<u32>,
    /// `idleTimeoutSecs`; `None` when absent or not parseable as `u64`.
    pub idle_timeout_secs: Option<u64>,
    /// `maxLifetimeSecs`; `None` when absent or not parseable as `u64`.
    pub max_lifetime_secs: Option<u64>,

    /// SQL text taken from the URI path. Empty when the path starts with
    /// `file:`, until `resolve_file_query` loads the file contents.
    pub query: String,
    /// Remainder of the URI path after a leading `file:`, if any.
    pub source_path: Option<String>,
    /// `outputType`; defaults to `SqlOutputType::SelectList`.
    pub output_type: SqlOutputType,
    /// `placeholder`; a single ASCII character, `#` when absent or empty.
    pub placeholder: char,
    /// `usePlaceholder`; defaults to `true`.
    pub use_placeholder: bool,
    /// `noop`; defaults to `false`.
    pub noop: bool,
    /// `inSeparator`; defaults to `", "` and must not be empty.
    pub in_separator: String,

    /// `alwaysPopulateStatement`; defaults to `false`.
    pub always_populate_statement: bool,

    /// `allowNamedParameters`; defaults to `true`.
    pub allow_named_parameters: bool,

    /// `fetchSize`; `None` when absent or not parseable as `u32`.
    pub fetch_size: Option<u32>,

    /// `transactionMode`; defaults to `TransactionMode::Auto`.
    pub transaction_mode: TransactionMode,

    /// `delay`; defaults to 500 when absent or not parseable.
    pub delay_ms: u64,
    /// `initialDelay`; defaults to 1000 when absent or not parseable.
    pub initial_delay_ms: u64,
    /// `maxMessagesPerPoll`; `None` when absent or not parseable as `i32`.
    pub max_messages_per_poll: Option<i32>,
    /// `onConsume`, stored verbatim.
    pub on_consume: Option<String>,
    /// `onConsumeFailed`, stored verbatim.
    pub on_consume_failed: Option<String>,
    /// `onConsumeBatchComplete`, stored verbatim.
    pub on_consume_batch_complete: Option<String>,
    /// `routeEmptyResultSet`; defaults to `false`.
    pub route_empty_result_set: bool,
    /// `useIterator`; defaults to `true`.
    pub use_iterator: bool,
    /// `expectedUpdateCount`; `None` when absent or not parseable as `i64`.
    pub expected_update_count: Option<i64>,
    /// `breakBatchOnConsumeFail`; defaults to `false`.
    pub break_batch_on_consume_fail: bool,
    /// `bridgeErrorHandler`; defaults to `false`.
    pub bridge_error_handler: bool,

    /// `repeatCount`; `None` when absent or not parseable as `u32`.
    pub repeat_count: Option<u32>,

    /// `processingStrategy`; defaults to `ProcessingStrategy::Direct`.
    pub processing_strategy: ProcessingStrategy,

    /// `pollStrategy`; defaults to `PollStrategy::Sequential`.
    pub poll_strategy: PollStrategy,

    /// `batch`; defaults to `false`.
    pub batch: bool,
    /// `useMessageBodyForSql`; defaults to `false`.
    pub use_message_body_for_sql: bool,

    /// `sslMode`; falls back to the global config in `apply_defaults`.
    pub ssl_mode: Option<String>,
    /// `sslRootCert`; falls back to the global config in `apply_defaults`.
    pub ssl_root_cert: Option<String>,
    /// `sslCert`; falls back to the global config in `apply_defaults`.
    pub ssl_cert: Option<String>,
    /// `sslKey`; falls back to the global config in `apply_defaults`.
    /// Shown as `***` in `Debug` output when set.
    pub ssl_key: Option<String>,
}
378
379impl std::fmt::Debug for SqlEndpointConfig {
380 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381 f.debug_struct("SqlEndpointConfig")
382 .field("db_url", &redact_db_url(&self.db_url))
383 .field("max_connections", &self.max_connections)
384 .field("min_connections", &self.min_connections)
385 .field("idle_timeout_secs", &self.idle_timeout_secs)
386 .field("max_lifetime_secs", &self.max_lifetime_secs)
387 .field("query", &self.query)
388 .field("source_path", &self.source_path)
389 .field("output_type", &self.output_type)
390 .field("placeholder", &self.placeholder)
391 .field("use_placeholder", &self.use_placeholder)
392 .field("noop", &self.noop)
393 .field("in_separator", &self.in_separator)
394 .field("always_populate_statement", &self.always_populate_statement)
395 .field("allow_named_parameters", &self.allow_named_parameters)
396 .field("fetch_size", &self.fetch_size)
397 .field("transaction_mode", &self.transaction_mode)
398 .field("delay_ms", &self.delay_ms)
399 .field("initial_delay_ms", &self.initial_delay_ms)
400 .field("max_messages_per_poll", &self.max_messages_per_poll)
401 .field("on_consume", &self.on_consume)
402 .field("on_consume_failed", &self.on_consume_failed)
403 .field("on_consume_batch_complete", &self.on_consume_batch_complete)
404 .field("route_empty_result_set", &self.route_empty_result_set)
405 .field("use_iterator", &self.use_iterator)
406 .field("expected_update_count", &self.expected_update_count)
407 .field(
408 "break_batch_on_consume_fail",
409 &self.break_batch_on_consume_fail,
410 )
411 .field("bridge_error_handler", &self.bridge_error_handler)
412 .field("repeat_count", &self.repeat_count)
413 .field("processing_strategy", &self.processing_strategy)
414 .field("poll_strategy", &self.poll_strategy)
415 .field("batch", &self.batch)
416 .field("use_message_body_for_sql", &self.use_message_body_for_sql)
417 .field("ssl_mode", &self.ssl_mode)
418 .field("ssl_root_cert", &self.ssl_root_cert)
419 .field("ssl_cert", &self.ssl_cert)
420 .field("ssl_key", &redacted_opt(&self.ssl_key))
421 .finish()
422 }
423}
424
425impl SqlEndpointConfig {
426 pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
428 if self.max_connections.is_none() {
429 self.max_connections = Some(defaults.max_connections);
430 }
431 if self.min_connections.is_none() {
432 self.min_connections = Some(defaults.min_connections);
433 }
434 if self.idle_timeout_secs.is_none() {
435 self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
436 }
437 if self.max_lifetime_secs.is_none() {
438 self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
439 }
440 if self.ssl_mode.is_none() {
441 self.ssl_mode = defaults.ssl_mode.clone();
442 }
443 if self.ssl_root_cert.is_none() {
444 self.ssl_root_cert = defaults.ssl_root_cert.clone();
445 }
446 if self.ssl_cert.is_none() {
447 self.ssl_cert = defaults.ssl_cert.clone();
448 }
449 if self.ssl_key.is_none() {
450 self.ssl_key = defaults.ssl_key.clone();
451 }
452 }
453
454 pub fn resolve_defaults(&mut self) {
456 let defaults = SqlGlobalConfig::default();
457 self.apply_defaults(&defaults);
458 }
459
460 pub async fn resolve_file_query(&mut self) -> Result<(), CamelError> {
469 if let Some(file_path) = self.source_path.take() {
470 let contents = tokio::fs::read_to_string(&file_path).await.map_err(|e| {
471 CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
472 })?;
473 self.query = contents.trim().to_string();
474 self.source_path = Some(file_path);
476 }
477 Ok(())
478 }
479}
480
/// Query-parameter names under which one SSL option is written into a
/// database URL, one name per supported URL scheme family.
struct SslParamMapping {
    /// Key used when the URL scheme is `postgres` or `postgresql`.
    pg_key: &'static str,
    /// Key used when the URL scheme is `mysql`.
    mysql_key: &'static str,
}
485
/// Lookup table from the endpoint's SSL URI parameter name (first tuple
/// element) to the query keys written into the database URL.
///
/// `enrich_db_url_with_ssl` searches this table by parameter name; an entry
/// that is missing here is silently skipped there.
const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
    (
        "sslMode",
        SslParamMapping {
            pg_key: "sslmode",
            mysql_key: "ssl-mode",
        },
    ),
    (
        "sslRootCert",
        SslParamMapping {
            pg_key: "sslrootcert",
            mysql_key: "ssl-ca",
        },
    ),
    (
        "sslCert",
        SslParamMapping {
            pg_key: "sslcert",
            mysql_key: "ssl-cert",
        },
    ),
    (
        "sslKey",
        SslParamMapping {
            pg_key: "sslkey",
            mysql_key: "ssl-key",
        },
    ),
];
516
517pub fn enrich_db_url_with_ssl(
518 db_url: &str,
519 config: &SqlEndpointConfig,
520) -> Result<String, CamelError> {
521 let ssl_params: Vec<(&str, &str)> = [
522 config.ssl_mode.as_deref().map(|v| ("sslMode", v)),
523 config.ssl_root_cert.as_deref().map(|v| ("sslRootCert", v)),
524 config.ssl_cert.as_deref().map(|v| ("sslCert", v)),
525 config.ssl_key.as_deref().map(|v| ("sslKey", v)),
526 ]
527 .into_iter()
528 .flatten()
529 .collect();
530
531 if ssl_params.is_empty() {
532 return Ok(db_url.to_string());
533 }
534
535 let mut parsed = url::Url::parse(db_url).map_err(|e| {
536 CamelError::InvalidUri(format!(
537 "Cannot parse database URL for SSL enrichment: {}",
538 e
539 ))
540 })?;
541
542 let scheme = parsed.scheme();
543 if scheme.starts_with("sqlite") {
544 warn!(
545 "SSL options configured for SQLite database URL, but SQLite does not support SSL/TLS; ignoring sslMode/sslRootCert/sslCert/sslKey"
546 );
547 return Ok(db_url.to_string());
548 }
549
550 if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
551 return Ok(db_url.to_string());
552 }
553 let is_mysql = scheme == "mysql";
554
555 let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
556 for (camel_name, value) in &ssl_params {
557 if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
558 let driver_key = if is_mysql {
559 mapping.mysql_key
560 } else {
561 mapping.pg_key
562 };
563
564 if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
565 query_pairs[pos].1 = (*value).into();
566 } else {
567 query_pairs.push((driver_key.into(), (*value).into()));
568 }
569 }
570 }
571
572 {
573 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
574 for (k, v) in query_pairs {
575 serializer.append_pair(&k, &v);
576 }
577 parsed.set_query(Some(&serializer.finish()));
578 }
579
580 Ok(parsed.to_string())
581}
582
583impl UriConfig for SqlEndpointConfig {
584 fn scheme() -> &'static str {
585 "sql"
586 }
587
588 fn from_uri(uri: &str) -> Result<Self, CamelError> {
589 let parts = parse_uri(uri)?;
590 Self::from_components(parts)
591 }
592
593 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
594 if parts.scheme != Self::scheme() {
596 return Err(CamelError::InvalidUri(format!(
597 "expected scheme '{}' but got '{}'",
598 Self::scheme(),
599 parts.scheme
600 )));
601 }
602
603 let params = &parts.params;
604
605 let (query, source_path) = if parts.path.starts_with("file:") {
610 let file_path = parts.path.trim_start_matches("file:").to_string();
611 (String::new(), Some(file_path))
612 } else {
613 (parts.path.clone(), None)
614 };
615
616 let db_url = params
618 .get("db_url")
619 .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
620 .clone();
621
622 let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
624 let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
625 let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
626 let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
627
628 let output_type = params
630 .get("outputType")
631 .map(|s| s.parse())
632 .transpose()?
633 .unwrap_or_default();
634 let placeholder = params
635 .get("placeholder")
636 .filter(|v| !v.is_empty())
637 .map(|v| {
638 if v.chars().count() != 1 {
639 return Err(CamelError::InvalidUri(format!(
640 "placeholder must be exactly one character, got '{}'",
641 v
642 )));
643 }
644 if !v.is_ascii() {
645 return Err(CamelError::InvalidUri(
646 "placeholder must be a single ASCII character".to_string(),
647 ));
648 }
649 Ok(v.chars().next().unwrap()) })
651 .transpose()?
652 .unwrap_or('#');
653 fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
658 if value.eq_ignore_ascii_case("true") {
659 Ok(true)
660 } else if value.eq_ignore_ascii_case("false") {
661 Ok(false)
662 } else {
663 Err(CamelError::InvalidUri(format!(
664 "{} must be 'true' or 'false', got '{}'",
665 name, value
666 )))
667 }
668 }
669
670 let use_placeholder = params
671 .get("usePlaceholder")
672 .map(|v| parse_bool_param("usePlaceholder", v))
673 .transpose()?
674 .unwrap_or(true);
675 let noop = params
676 .get("noop")
677 .map(|v| parse_bool_param("noop", v))
678 .transpose()?
679 .unwrap_or(false);
680 let in_separator = params
681 .get("inSeparator")
682 .map(|v| v.to_string())
683 .unwrap_or_else(|| ", ".to_string());
684 if in_separator.is_empty() {
685 return Err(CamelError::InvalidUri(
686 "inSeparator must not be empty".to_string(),
687 ));
688 }
689
690 let always_populate_statement = params
692 .get("alwaysPopulateStatement")
693 .map(|v| parse_bool_param("alwaysPopulateStatement", v))
694 .transpose()?
695 .unwrap_or(false);
696
697 let allow_named_parameters = params
699 .get("allowNamedParameters")
700 .map(|v| parse_bool_param("allowNamedParameters", v))
701 .transpose()?
702 .unwrap_or(true);
703
704 let fetch_size = params.get("fetchSize").and_then(|v| v.parse().ok());
706
707 let transaction_mode = params
709 .get("transactionMode")
710 .map(|s| s.parse())
711 .transpose()?
712 .unwrap_or_default();
713
714 let delay_ms = params
716 .get("delay")
717 .and_then(|v| v.parse().ok())
718 .unwrap_or(500);
719 let initial_delay_ms = params
720 .get("initialDelay")
721 .and_then(|v| v.parse().ok())
722 .unwrap_or(1000);
723 let max_messages_per_poll = params
724 .get("maxMessagesPerPoll")
725 .and_then(|v| v.parse().ok());
726 let on_consume = params.get("onConsume").cloned();
727 let on_consume_failed = params.get("onConsumeFailed").cloned();
728 let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
729 let route_empty_result_set = params
730 .get("routeEmptyResultSet")
731 .map(|v| parse_bool_param("routeEmptyResultSet", v))
732 .transpose()?
733 .unwrap_or(false);
734 let use_iterator = params
735 .get("useIterator")
736 .map(|v| parse_bool_param("useIterator", v))
737 .transpose()?
738 .unwrap_or(true);
739 let expected_update_count = params
740 .get("expectedUpdateCount")
741 .and_then(|v| v.parse().ok());
742 let break_batch_on_consume_fail = params
743 .get("breakBatchOnConsumeFail")
744 .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
745 .transpose()?
746 .unwrap_or(false);
747 let bridge_error_handler = params
748 .get("bridgeErrorHandler")
749 .map(|v| parse_bool_param("bridgeErrorHandler", v))
750 .transpose()?
751 .unwrap_or(false);
752
753 let repeat_count = params.get("repeatCount").and_then(|v| v.parse().ok());
755
756 let processing_strategy = params
758 .get("processingStrategy")
759 .map(|s| s.parse())
760 .transpose()?
761 .unwrap_or_default();
762
763 let poll_strategy = params
765 .get("pollStrategy")
766 .map(|s| s.parse())
767 .transpose()?
768 .unwrap_or_default();
769
770 let batch = params
772 .get("batch")
773 .map(|v| parse_bool_param("batch", v))
774 .transpose()?
775 .unwrap_or(false);
776 let use_message_body_for_sql = params
777 .get("useMessageBodyForSql")
778 .map(|v| parse_bool_param("useMessageBodyForSql", v))
779 .transpose()?
780 .unwrap_or(false);
781 let ssl_mode = params.get("sslMode").cloned();
782 let ssl_root_cert = params.get("sslRootCert").cloned();
783 let ssl_cert = params.get("sslCert").cloned();
784 let ssl_key = params.get("sslKey").cloned();
785
786 Ok(Self {
787 db_url,
788 max_connections,
789 min_connections,
790 idle_timeout_secs,
791 max_lifetime_secs,
792 query,
793 source_path,
794 output_type,
795 placeholder,
796 use_placeholder,
797 noop,
798 in_separator,
799 always_populate_statement,
800 allow_named_parameters,
801 fetch_size,
802 transaction_mode,
803 delay_ms,
804 initial_delay_ms,
805 max_messages_per_poll,
806 on_consume,
807 on_consume_failed,
808 on_consume_batch_complete,
809 route_empty_result_set,
810 use_iterator,
811 expected_update_count,
812 break_batch_on_consume_fail,
813 bridge_error_handler,
814 repeat_count,
815 processing_strategy,
816 poll_strategy,
817 batch,
818 use_message_body_for_sql,
819 ssl_mode,
820 ssl_root_cert,
821 ssl_cert,
822 ssl_key,
823 })
824 }
825}
826
827#[cfg(test)]
828mod tests {
829 use super::*;
830
831 #[test]
832 fn config_defaults() {
833 let mut c =
834 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
835 c.resolve_defaults();
836 assert_eq!(c.query, "select 1");
837 assert_eq!(c.db_url, "postgres://localhost/test");
838 assert_eq!(c.max_connections, Some(5));
839 assert_eq!(c.min_connections, Some(1));
840 assert_eq!(c.idle_timeout_secs, Some(300));
841 assert_eq!(c.max_lifetime_secs, Some(1800));
842 assert_eq!(c.output_type, SqlOutputType::SelectList);
843 assert_eq!(c.placeholder, '#');
844 assert!(!c.noop);
845 assert_eq!(c.in_separator, ", ");
846 assert_eq!(c.delay_ms, 500);
847 assert_eq!(c.initial_delay_ms, 1000);
848 assert!(c.max_messages_per_poll.is_none());
849 assert!(c.on_consume.is_none());
850 assert!(c.on_consume_failed.is_none());
851 assert!(c.on_consume_batch_complete.is_none());
852 assert!(!c.route_empty_result_set);
853 assert!(c.use_iterator);
854 assert!(c.expected_update_count.is_none());
855 assert!(!c.break_batch_on_consume_fail);
856 assert!(!c.batch);
857 assert!(!c.use_message_body_for_sql);
858 assert!(c.ssl_mode.is_none());
859 assert!(c.ssl_root_cert.is_none());
860 assert!(c.ssl_cert.is_none());
861 assert!(c.ssl_key.is_none());
862 assert!(!c.always_populate_statement);
864 assert!(c.allow_named_parameters);
865 assert!(c.fetch_size.is_none());
866 assert_eq!(c.transaction_mode, TransactionMode::Auto);
867 assert!(c.repeat_count.is_none());
868 assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
869 assert_eq!(c.poll_strategy, PollStrategy::Sequential);
870 }
871
872 #[test]
873 fn ssl_none_by_default() {
874 let c =
875 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
876 assert!(c.ssl_mode.is_none());
877 assert!(c.ssl_root_cert.is_none());
878 assert!(c.ssl_cert.is_none());
879 assert!(c.ssl_key.is_none());
880 }
881
882 #[test]
883 fn ssl_mode_from_uri() {
884 let c = SqlEndpointConfig::from_uri(
885 "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
886 )
887 .unwrap();
888 assert_eq!(c.ssl_mode, Some("require".to_string()));
889 assert!(c.ssl_root_cert.is_none());
890 }
891
892 #[test]
893 fn ssl_all_params_from_uri() {
894 let c = SqlEndpointConfig::from_uri(
895 "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
896 )
897 .unwrap();
898 assert_eq!(c.ssl_mode, Some("require".to_string()));
899 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
900 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
901 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
902 }
903
904 #[test]
905 fn ssl_global_applied_to_endpoint() {
906 let mut c =
907 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
908 let global = SqlGlobalConfig::default()
909 .with_ssl_mode("require")
910 .with_ssl_root_cert("/etc/ssl/ca.pem");
911 c.apply_defaults(&global);
912 assert_eq!(c.ssl_mode, Some("require".to_string()));
913 assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
914 assert!(c.ssl_cert.is_none());
915 assert!(c.ssl_key.is_none());
916 }
917
918 #[test]
919 fn ssl_uri_overrides_global() {
920 let mut c = SqlEndpointConfig::from_uri(
921 "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
922 )
923 .unwrap();
924 let global = SqlGlobalConfig::default().with_ssl_mode("require");
925 c.apply_defaults(&global);
926 assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
927 }
928
929 #[test]
930 fn config_wrong_scheme() {
931 assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
932 }
933
934 #[test]
935 fn config_missing_db_url() {
936 assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
937 }
938
939 #[test]
940 fn config_output_type_select_one() {
941 let c = SqlEndpointConfig::from_uri(
942 "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
943 )
944 .unwrap();
945 assert_eq!(c.output_type, SqlOutputType::SelectOne);
946 }
947
948 #[test]
949 fn config_output_type_stream_list() {
950 let c = SqlEndpointConfig::from_uri(
951 "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
952 )
953 .unwrap();
954 assert_eq!(c.output_type, SqlOutputType::StreamList);
955 }
956
957 #[test]
958 fn in_separator_default() {
959 let c =
960 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
961 assert_eq!(c.in_separator, ", ");
962 }
963
964 #[test]
965 fn in_separator_from_uri() {
966 let c = SqlEndpointConfig::from_uri(
967 "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
968 )
969 .unwrap();
970 assert_eq!(c.in_separator, ";");
971 }
972
973 #[test]
974 fn in_separator_empty_rejected() {
975 let result = SqlEndpointConfig::from_uri(
976 "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
977 );
978 assert!(result.is_err());
979 let msg = format!("{:?}", result.unwrap_err());
980 assert!(msg.contains("inSeparator") || msg.contains("empty"));
981 }
982
983 #[test]
984 fn config_consumer_options() {
985 let c = SqlEndpointConfig::from_uri(
986 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
987 ).unwrap();
988 assert_eq!(c.delay_ms, 2000);
989 assert_eq!(c.initial_delay_ms, 500);
990 assert_eq!(c.max_messages_per_poll, Some(10));
991 assert_eq!(
992 c.on_consume,
993 Some("update t set done=true where id=:#id".to_string())
994 );
995 assert_eq!(
996 c.on_consume_failed,
997 Some("update t set failed=true where id=:#id".to_string())
998 );
999 assert_eq!(
1000 c.on_consume_batch_complete,
1001 Some("delete from t where done=true".to_string())
1002 );
1003 assert!(c.route_empty_result_set);
1004 assert!(!c.use_iterator);
1005 assert_eq!(c.expected_update_count, Some(1));
1006 assert!(c.break_batch_on_consume_fail);
1007 assert!(!c.bridge_error_handler);
1008 }
1009
1010 #[test]
1011 fn config_producer_options() {
1012 let c = SqlEndpointConfig::from_uri(
1013 "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
1014 ).unwrap();
1015 assert!(c.batch);
1016 assert!(c.use_message_body_for_sql);
1017 assert!(c.noop);
1018 }
1019
1020 #[test]
1021 fn config_pool_options() {
1022 let c = SqlEndpointConfig::from_uri(
1023 "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
1024 ).unwrap();
1025 assert_eq!(c.max_connections, Some(20));
1026 assert_eq!(c.min_connections, Some(3));
1027 assert_eq!(c.idle_timeout_secs, Some(600));
1028 assert_eq!(c.max_lifetime_secs, Some(3600));
1029 }
1030
1031 #[test]
1032 fn config_query_with_special_chars() {
1033 let c = SqlEndpointConfig::from_uri(
1034 "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
1035 )
1036 .unwrap();
1037 assert_eq!(
1038 c.query,
1039 "select * from users where name = :#name and age > #"
1040 );
1041 }
1042
1043 #[test]
1044 fn output_type_from_str() {
1045 assert_eq!(
1046 "SelectList".parse::<SqlOutputType>().unwrap(),
1047 SqlOutputType::SelectList
1048 );
1049 assert_eq!(
1050 "SelectOne".parse::<SqlOutputType>().unwrap(),
1051 SqlOutputType::SelectOne
1052 );
1053 assert_eq!(
1054 "StreamList".parse::<SqlOutputType>().unwrap(),
1055 SqlOutputType::StreamList
1056 );
1057 assert!("Invalid".parse::<SqlOutputType>().is_err());
1058 }
1059
1060 #[tokio::test]
1062 async fn config_file_not_found() {
1063 let mut config = SqlEndpointConfig::from_uri(
1064 "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
1065 )
1066 .expect("from_uri should defer file reading");
1067 assert_eq!(
1069 config.source_path,
1070 Some("/nonexistent/path/query.sql".to_string())
1071 );
1072 assert!(config.query.is_empty());
1073
1074 let result = config.resolve_file_query().await;
1076 assert!(result.is_err());
1077 let msg = format!("{:?}", result.unwrap_err());
1078 assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
1079 }
1080
1081 #[tokio::test]
1083 async fn config_file_query() {
1084 use std::io::Write;
1085 let unique_name = format!(
1086 "test_sql_query_{}.sql",
1087 std::time::SystemTime::now()
1088 .duration_since(std::time::UNIX_EPOCH)
1089 .unwrap_or_default()
1090 .as_nanos()
1091 );
1092 let mut tmp = std::env::temp_dir();
1093 tmp.push(unique_name);
1094 {
1095 let mut f = std::fs::File::create(&tmp).unwrap();
1096 writeln!(f, "SELECT * FROM users").unwrap();
1097 }
1098 let uri = format!(
1099 "sql:file:{}?db_url=postgres://localhost/test",
1100 tmp.display()
1101 );
1102 let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1103 assert!(c.query.is_empty());
1105 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1106
1107 c.resolve_file_query()
1109 .await
1110 .expect("file query should resolve");
1111 assert_eq!(c.query, "SELECT * FROM users");
1112 std::fs::remove_file(&tmp).ok();
1113 }
1114
1115 #[test]
1117 fn pool_fields_none_when_not_set() {
1118 let c =
1119 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1120 assert_eq!(c.max_connections, None);
1121 assert_eq!(c.min_connections, None);
1122 assert_eq!(c.idle_timeout_secs, None);
1123 assert_eq!(c.max_lifetime_secs, None);
1124 }
1125
1126 #[test]
1127 fn apply_defaults_fills_none() {
1128 let mut c =
1129 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1130 let global = SqlGlobalConfig {
1131 max_connections: 10,
1132 min_connections: 2,
1133 idle_timeout_secs: 600,
1134 max_lifetime_secs: 3600,
1135 ssl_mode: None,
1136 ssl_root_cert: None,
1137 ssl_cert: None,
1138 ssl_key: None,
1139 };
1140 c.apply_defaults(&global);
1141 assert_eq!(c.max_connections, Some(10));
1142 assert_eq!(c.min_connections, Some(2));
1143 assert_eq!(c.idle_timeout_secs, Some(600));
1144 assert_eq!(c.max_lifetime_secs, Some(3600));
1145 assert!(c.ssl_mode.is_none());
1146 assert!(c.ssl_root_cert.is_none());
1147 assert!(c.ssl_cert.is_none());
1148 assert!(c.ssl_key.is_none());
1149 }
1150
1151 #[test]
1152 fn apply_defaults_does_not_override() {
1153 let mut c = SqlEndpointConfig::from_uri(
1154 "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
1155 )
1156 .unwrap();
1157 let global = SqlGlobalConfig {
1158 max_connections: 10,
1159 min_connections: 2,
1160 idle_timeout_secs: 600,
1161 max_lifetime_secs: 3600,
1162 ssl_mode: None,
1163 ssl_root_cert: None,
1164 ssl_cert: None,
1165 ssl_key: None,
1166 };
1167 c.apply_defaults(&global);
1168 assert_eq!(c.max_connections, Some(99));
1170 assert_eq!(c.min_connections, Some(5));
1171 assert_eq!(c.idle_timeout_secs, Some(600));
1173 assert_eq!(c.max_lifetime_secs, Some(3600));
1174 }
1175
1176 #[test]
1177 fn resolve_defaults_fills_remaining() {
1178 let mut c = SqlEndpointConfig::from_uri(
1179 "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
1180 )
1181 .unwrap();
1182 c.resolve_defaults();
1183 assert_eq!(c.max_connections, Some(7)); assert_eq!(c.min_connections, Some(1)); assert_eq!(c.idle_timeout_secs, Some(300)); assert_eq!(c.max_lifetime_secs, Some(1800)); }
1188
1189 #[test]
1190 fn global_config_builder() {
1191 let c = SqlGlobalConfig::default()
1192 .with_max_connections(20)
1193 .with_min_connections(3)
1194 .with_idle_timeout_secs(600)
1195 .with_max_lifetime_secs(3600)
1196 .with_ssl_mode("require")
1197 .with_ssl_root_cert("/ca.pem")
1198 .with_ssl_cert("/cert.pem")
1199 .with_ssl_key("/key.pem");
1200 assert_eq!(c.max_connections, 20);
1201 assert_eq!(c.min_connections, 3);
1202 assert_eq!(c.idle_timeout_secs, 600);
1203 assert_eq!(c.max_lifetime_secs, 3600);
1204 assert_eq!(c.ssl_mode, Some("require".to_string()));
1205 assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1206 assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1207 assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1208 }
1209
#[test]
// For a postgres URL, `sslMode=require` must show up as `sslmode=require` in
// the enriched connection string.
fn enrich_postgres_ssl_mode() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&sslMode=require";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert!(enriched.contains("sslmode=require"), "got: {}", enriched);
}
1220
#[test]
// With all four SSL options set, the enriched postgres URL must carry the
// mode plus a parameter for each of the three certificate/key paths.
fn enrich_postgres_all_ssl() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert!(enriched.contains("sslmode=require"), "got: {}", enriched);
    assert!(enriched.contains("sslrootcert="), "got: {}", enriched);
    assert!(enriched.contains("sslcert="), "got: {}", enriched);
    assert!(enriched.contains("sslkey="), "got: {}", enriched);
}
1234
#[test]
// For a mysql URL the SSL mode is emitted under the `ssl-mode` key rather
// than the postgres-style `sslmode`.
fn enrich_mysql_ssl() {
    let uri = "sql:select 1?db_url=mysql://localhost/test&sslMode=require";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert!(enriched.contains("ssl-mode=require"), "got: {}", enriched);
}
1245
#[test]
// Query parameters already present on the database URL must be kept when the
// SSL parameters are added.
fn enrich_existing_query_params() {
    let uri = "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert!(enriched.contains("existing=1"), "got: {}", enriched);
    assert!(enriched.contains("sslmode=require"), "got: {}", enriched);
}
1257
#[test]
// An `sslmode` already present on the database URL must be replaced by the
// endpoint's `sslMode`, not duplicated next to it.
fn enrich_override_existing() {
    let uri = "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert!(enriched.contains("sslmode=require"), "got: {}", enriched);
    assert!(!enriched.contains("sslmode=allow"), "got: {}", enriched);
}
1269
#[test]
// Without any SSL option configured, enrichment must hand back the database
// URL exactly as it was.
fn enrich_no_params() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert_eq!(enriched, "postgres://localhost/test");
}
1278
#[test]
// A certificate path containing a space (`%20` in the endpoint URI) must end
// up in the enriched URL as an `sslrootcert` parameter without any raw,
// unencoded space. Checking only for the key would pass even if the value
// were emitted unescaped, which is what this test is named to rule out.
fn enrich_url_encodes_paths() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert!(enriched.contains("sslrootcert="), "got: {}", enriched);
    // Whatever escaping is used (`%20` or `+`), a literal space is never valid
    // inside a URL.
    assert!(
        !enriched.contains(' '),
        "space in cert path must be encoded, got: {}",
        enriched
    );
}
1289
#[test]
// For a sqlite URL the SSL options are ignored and the URL comes back
// untouched.
fn enrich_unsupported_scheme_returns_unchanged() {
    let uri = "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let enriched = enrich_db_url_with_ssl(&config.db_url, &config).unwrap();

    assert_eq!(enriched, "sqlite://localhost/test.db");
}
1300
#[test]
// Passing a string that is not a valid URL must produce an error, even when
// the endpoint configuration itself is valid.
fn enrich_invalid_url_returns_error() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&sslMode=require";
    let mut config = SqlEndpointConfig::from_uri(uri).unwrap();
    config.resolve_defaults();

    let outcome = enrich_db_url_with_ssl("://not-a-valid-url", &config);

    assert!(outcome.is_err());
}
1311
#[test]
// The `Debug` rendering of an endpoint config must hide the password embedded
// in `db_url` and show the `***` marker instead.
fn debug_redacts_db_url_with_password() {
    let uri = "sql:select 1?db_url=postgres://user:secret123@localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    let rendered = format!("{:?}", config);

    let leaks_password = rendered.contains("secret123");
    assert!(
        !leaks_password,
        "Debug output must not contain password: {}",
        rendered
    );

    let has_marker = rendered.contains("***");
    assert!(
        has_marker,
        "Debug output must contain redacted marker: {}",
        rendered
    );
}
1333
#[test]
// The `Debug` rendering of an endpoint config must not reveal the configured
// SSL key path.
fn debug_redacts_ssl_key() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    let rendered = format!("{:?}", config);

    let leaks_key_path = rendered.contains("/secret/key.pem");
    assert!(
        !leaks_key_path,
        "Debug output must not contain ssl_key path: {}",
        rendered
    );
}
1347
#[test]
// The `Debug` rendering of the global config must replace the SSL key path
// with the `***` marker.
fn debug_global_config_redacts_ssl_key() {
    let global = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");

    let rendered = format!("{:?}", global);

    let leaks_key_path = rendered.contains("/secret/key.pem");
    assert!(
        !leaks_key_path,
        "Debug output must not contain ssl_key path: {}",
        rendered
    );

    let has_marker = rendered.contains("***");
    assert!(
        has_marker,
        "Debug output must contain redacted marker: {}",
        rendered
    );
}
1363
#[test]
// Both the user name and the password of a database URL are replaced by
// `***`; host and path are left alone.
fn redact_db_url_with_credentials() {
    let redacted = redact_db_url("postgres://user:pass@host/db");

    assert_eq!(redacted, "postgres://***:***@host/db");
}
1371
#[test]
// A URL that carries no credentials is returned unchanged.
fn redact_db_url_without_credentials() {
    let redacted = redact_db_url("sqlite::memory:");

    assert_eq!(redacted, "sqlite::memory:");
}
1376
#[test]
// Input that cannot be parsed as a URL is passed through as-is.
fn redact_db_url_invalid_returns_original() {
    let redacted = redact_db_url("not-a-url");

    assert_eq!(redacted, "not-a-url");
}
1381
#[test]
// When `usePlaceholder` is absent from the URI, the flag is on.
fn use_placeholder_defaults_to_true() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(config.use_placeholder);
}
1389
#[test]
// `usePlaceholder=false` in the URI switches the flag off.
fn use_placeholder_false_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(!config.use_placeholder);
}
1398
#[test]
// `usePlaceholder=true` in the URI leaves the flag on.
fn use_placeholder_true_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(config.use_placeholder);
}
1407
#[test]
// `usePlaceholder=1` is not an accepted boolean; the error must name the
// parameter and mention both `true` and `false`.
fn use_placeholder_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1";
    let outcome = SqlEndpointConfig::from_uri(uri);
    assert!(outcome.is_err());

    let error = outcome.unwrap_err();
    let msg = format!("{:?}", error);
    assert!(msg.contains("usePlaceholder"));
    assert!(msg.contains("true"));
    assert!(msg.contains("false"));
}
1418
#[test]
// A near-miss spelling such as `tru` must be rejected rather than guessed at.
fn use_placeholder_rejects_typo_tru() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1426
#[test]
// `yes` is not accepted as a boolean value for `usePlaceholder`.
fn use_placeholder_rejects_yes() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1434
#[test]
// `noop=1` is rejected, and the error names the offending parameter.
fn noop_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&noop=1";
    let outcome = SqlEndpointConfig::from_uri(uri);
    assert!(outcome.is_err());

    let error = outcome.unwrap_err();
    let msg = format!("{:?}", error);
    assert!(msg.contains("noop"));
}
1443
#[test]
// `batch=yes` is rejected, and the error names the offending parameter.
fn batch_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&batch=yes";
    let outcome = SqlEndpointConfig::from_uri(uri);
    assert!(outcome.is_err());

    let error = outcome.unwrap_err();
    let msg = format!("{:?}", error);
    assert!(msg.contains("batch"));
}
1452
#[test]
// `routeEmptyResultSet=on` is not an accepted boolean value.
fn route_empty_result_set_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1460
#[test]
// `useIterator=1` is not an accepted boolean value.
fn use_iterator_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&useIterator=1";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1468
#[test]
// `breakBatchOnConsumeFail=yes` is not an accepted boolean value.
fn break_batch_on_consume_fail_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1476
#[test]
// `useMessageBodyForSql=1` is not an accepted boolean value.
fn use_message_body_for_sql_rejects_invalid_value() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1484
#[test]
// Boolean URI values are accepted regardless of letter case (`TRUE`, `True`,
// `FALSE`, `False`).
fn boolean_params_case_insensitive() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False&bridgeErrorHandler=TRUE";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    // Flags given as some casing of "true".
    assert!(config.use_placeholder);
    assert!(config.batch);
    assert!(config.bridge_error_handler);

    // Flags given as some casing of "false".
    assert!(!config.noop);
    assert!(!config.use_iterator);
}
1498
#[test]
// A two-character placeholder is rejected with an error that names the
// parameter and states the one-character requirement.
fn multi_char_placeholder_rejected() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&placeholder=##";
    let outcome = SqlEndpointConfig::from_uri(uri);
    assert!(outcome.is_err());

    let error = outcome.unwrap_err();
    let msg = format!("{:?}", error);
    assert!(msg.contains("placeholder"));
    assert!(msg.contains("one character"));
}
1509
#[test]
// A placeholder supplied as the percent-encoded bytes `%C2%A2` is rejected.
fn non_ascii_placeholder_rejected() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&placeholder=%C2%A2";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1517
#[test]
// A single-character placeholder from the URI is stored verbatim.
fn single_char_placeholder_accepted() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&placeholder=$";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.placeholder, '$');
}
1526
#[test]
// An empty `placeholder=` value is not an error; the config ends up with `#`.
fn empty_placeholder_falls_back_to_default() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&placeholder=";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.placeholder, '#');
}
1536
1537 #[tokio::test]
1539 async fn file_query_cached_in_config() {
1540 use std::io::Write;
1541 let unique_name = format!(
1542 "test_sql_cached_{}.sql",
1543 std::time::SystemTime::now()
1544 .duration_since(std::time::UNIX_EPOCH)
1545 .unwrap_or_default()
1546 .as_nanos()
1547 );
1548 let mut tmp = std::env::temp_dir();
1549 tmp.push(unique_name);
1550 {
1551 let mut f = std::fs::File::create(&tmp).unwrap();
1552 writeln!(f, "SELECT * FROM cached_test").unwrap();
1553 }
1554 let uri = format!(
1555 "sql:file:{}?db_url=postgres://localhost/test",
1556 tmp.display()
1557 );
1558 let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1559 assert!(c.query.is_empty());
1561 assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1562
1563 c.resolve_file_query()
1565 .await
1566 .expect("resolve should succeed");
1567 assert_eq!(c.query, "SELECT * FROM cached_test");
1568
1569 std::fs::remove_file(&tmp).ok();
1571 assert_eq!(c.query, "SELECT * FROM cached_test");
1572 }
1573
#[test]
// When `alwaysPopulateStatement` is absent from the URI, the flag is off.
fn always_populate_statement_defaults_to_false() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(!config.always_populate_statement);
}
1583
#[test]
// `alwaysPopulateStatement=true` in the URI switches the flag on.
fn always_populate_statement_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&alwaysPopulateStatement=true";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(config.always_populate_statement);
}
1592
#[test]
// When `allowNamedParameters` is absent from the URI, the flag is on.
fn allow_named_parameters_defaults_to_true() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(config.allow_named_parameters);
}
1600
#[test]
// `allowNamedParameters=false` in the URI switches the flag off.
fn allow_named_parameters_false_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&allowNamedParameters=false";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(!config.allow_named_parameters);
}
1609
#[test]
// When `fetchSize` is absent from the URI, no fetch size is set.
fn fetch_size_defaults_to_none() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(config.fetch_size.is_none());
}
1617
#[test]
// `fetchSize=1000` in the URI is parsed into `Some(1000)`.
fn fetch_size_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&fetchSize=1000";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.fetch_size, Some(1000));
}
1626
#[test]
// When `transactionMode` is absent from the URI, the mode is `Auto`.
fn transaction_mode_defaults_to_auto() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.transaction_mode, TransactionMode::Auto);
}
1634
#[test]
// `transactionMode=Managed` in the URI selects `TransactionMode::Managed`.
fn transaction_mode_managed_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&transactionMode=Managed";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.transaction_mode, TransactionMode::Managed);
}
1643
#[test]
// An unknown `transactionMode` value makes `from_uri` fail.
fn transaction_mode_invalid_rejected() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&transactionMode=Invalid";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1651
#[test]
// When `repeatCount` is absent from the URI, no repeat count is set.
fn repeat_count_defaults_to_none() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert!(config.repeat_count.is_none());
}
1659
#[test]
// `repeatCount=10` in the URI is parsed into `Some(10)`.
fn repeat_count_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&repeatCount=10";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.repeat_count, Some(10));
}
1668
#[test]
// When `processingStrategy` is absent from the URI, the strategy is `Direct`.
fn processing_strategy_defaults_to_direct() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.processing_strategy, ProcessingStrategy::Direct);
}
1676
#[test]
// `processingStrategy=Scheduled` in the URI selects
// `ProcessingStrategy::Scheduled`.
fn processing_strategy_scheduled_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Scheduled";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.processing_strategy, ProcessingStrategy::Scheduled);
}
1685
#[test]
// An unknown `processingStrategy` value makes `from_uri` fail.
fn processing_strategy_invalid_rejected() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Invalid";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1693
#[test]
// When `pollStrategy` is absent from the URI, the strategy is `Sequential`.
fn poll_strategy_defaults_to_sequential() {
    let uri = "sql:select 1?db_url=postgres://localhost/test";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.poll_strategy, PollStrategy::Sequential);
}
1701
#[test]
// `pollStrategy=Burst` in the URI selects `PollStrategy::Burst`.
fn poll_strategy_burst_from_uri() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Burst";
    let config = SqlEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(config.poll_strategy, PollStrategy::Burst);
}
1710
#[test]
// An unknown `pollStrategy` value makes `from_uri` fail.
fn poll_strategy_invalid_rejected() {
    let uri = "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Invalid";

    assert!(SqlEndpointConfig::from_uri(uri).is_err());
}
1718}