Skip to main content

camel_component_sql/
config.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use camel_component_api::CamelError;
5use camel_component_api::NetworkRetryPolicy;
6use camel_component_api::{UriComponents, UriConfig, parse_uri};
7use tracing::warn;
8
/// Produces a display-safe stand-in for an optional secret.
///
/// Yields `Some("***")` whenever a value is present (its content is never
/// inspected) and `None` when the option is empty.
fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
    opt.as_ref().map(|_| "***")
}
13
14/// Redacts the user:password portion of a database URL for safe display.
15/// Returns `"scheme://***@host/db"` for URLs with userinfo, or the original URL otherwise.
16pub fn redact_db_url(db_url: &str) -> String {
17    match url::Url::parse(db_url) {
18        Ok(mut parsed) => {
19            if parsed.username().is_empty() && parsed.password().is_none() {
20                return db_url.to_string();
21            }
22            let _ = parsed.set_username("***");
23            let _ = parsed.set_password(Some("***"));
24            parsed.to_string()
25        }
26        Err(_) => db_url.to_string(),
27    }
28}
29
/// How the rows produced by a SQL query are handed back.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum SqlOutputType {
    /// All rows, returned as a list (the default).
    #[default]
    SelectList,
    /// A single row: the first result.
    SelectOne,
    /// Results streamed as an async iterator.
    StreamList,
}
41
42impl FromStr for SqlOutputType {
43    type Err = CamelError;
44
45    fn from_str(s: &str) -> Result<Self, Self::Err> {
46        match s {
47            "SelectList" => Ok(SqlOutputType::SelectList),
48            "SelectOne" => Ok(SqlOutputType::SelectOne),
49            "StreamList" => Ok(SqlOutputType::StreamList),
50            _ => Err(CamelError::InvalidUri(format!(
51                "Unknown output type: {}",
52                s
53            ))),
54        }
55    }
56}
57
/// Transaction mode for SQL operations.
///
/// - `Auto`: every statement auto-commits (default, current behavior).
/// - `Managed`: explicit transaction boundaries (future; currently logs a
///   warning and falls back to `Auto`).
// TODO(SQL-002): managed transaction mode — implement explicit transaction boundaries
#[derive(Clone, Debug, Default, PartialEq)]
pub enum TransactionMode {
    /// Each statement is auto-committed (default).
    #[default]
    Auto,
    /// Managed transactions — not yet implemented.
    Managed,
}
73
74impl FromStr for TransactionMode {
75    type Err = CamelError;
76
77    fn from_str(s: &str) -> Result<Self, Self::Err> {
78        match s {
79            "Auto" => Ok(TransactionMode::Auto),
80            "Managed" => Ok(TransactionMode::Managed),
81            _ => Err(CamelError::InvalidUri(format!(
82                "Unknown transaction mode: {}. Expected 'Auto' or 'Managed'",
83                s
84            ))),
85        }
86    }
87}
88
89impl std::fmt::Display for TransactionMode {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            TransactionMode::Auto => write!(f, "Auto"),
93            TransactionMode::Managed => write!(f, "Managed"),
94        }
95    }
96}
97
/// How a SQL consumer processes the rows it polls.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum ProcessingStrategy {
    /// Rows are processed directly in the polling task (default).
    #[default]
    Direct,
    /// Processing is scheduled via a separate task (deferred execution).
    Scheduled,
}
107
108impl FromStr for ProcessingStrategy {
109    type Err = CamelError;
110
111    fn from_str(s: &str) -> Result<Self, Self::Err> {
112        match s {
113            "Direct" => Ok(ProcessingStrategy::Direct),
114            "Scheduled" => Ok(ProcessingStrategy::Scheduled),
115            _ => Err(CamelError::InvalidUri(format!(
116                "Unknown processing strategy: {}. Expected 'Direct' or 'Scheduled'",
117                s
118            ))),
119        }
120    }
121}
122
123impl std::fmt::Display for ProcessingStrategy {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        match self {
126            ProcessingStrategy::Direct => write!(f, "Direct"),
127            ProcessingStrategy::Scheduled => write!(f, "Scheduled"),
128        }
129    }
130}
131
/// How a SQL consumer paces its polls.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum PollStrategy {
    /// Polls run one after another with a delay in between (default).
    #[default]
    Sequential,
    /// Polls run in bursts — multiple queries in rapid succession.
    Burst,
}
141
142impl FromStr for PollStrategy {
143    type Err = CamelError;
144
145    fn from_str(s: &str) -> Result<Self, Self::Err> {
146        match s {
147            "Sequential" => Ok(PollStrategy::Sequential),
148            "Burst" => Ok(PollStrategy::Burst),
149            _ => Err(CamelError::InvalidUri(format!(
150                "Unknown poll strategy: {}. Expected 'Sequential' or 'Burst'",
151                s
152            ))),
153        }
154    }
155}
156
157impl std::fmt::Display for PollStrategy {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        match self {
160            PollStrategy::Sequential => write!(f, "Sequential"),
161            PollStrategy::Burst => write!(f, "Burst"),
162        }
163    }
164}
165
166/// Global configuration for SQL component.
167///
168/// This struct supports serde deserialization with defaults and builder methods.
169/// It holds pool configuration that can be applied as defaults to endpoints.
170///
171/// **Security note:** `Debug` implementation redacts sensitive fields (SSL key paths).
172#[derive(Clone, PartialEq, serde::Deserialize)]
173#[serde(default)]
174pub struct SqlGlobalConfig {
175    pub max_connections: u32,
176    pub min_connections: u32,
177    pub idle_timeout_secs: u64,
178    pub max_lifetime_secs: u64,
179    // SSL/TLS
180    pub ssl_mode: Option<String>,
181    pub ssl_root_cert: Option<String>,
182    pub ssl_cert: Option<String>,
183    pub ssl_key: Option<String>,
184    /// Retry policy for transient database connection failures.
185    #[serde(default)]
186    pub retry: NetworkRetryPolicy,
187}
188
189impl std::fmt::Debug for SqlGlobalConfig {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        f.debug_struct("SqlGlobalConfig")
192            .field("max_connections", &self.max_connections)
193            .field("min_connections", &self.min_connections)
194            .field("idle_timeout_secs", &self.idle_timeout_secs)
195            .field("max_lifetime_secs", &self.max_lifetime_secs)
196            .field("ssl_mode", &self.ssl_mode)
197            .field("ssl_root_cert", &self.ssl_root_cert)
198            .field("ssl_cert", &self.ssl_cert)
199            .field("ssl_key", &redacted_opt(&self.ssl_key))
200            .field("retry", &self.retry)
201            .finish()
202    }
203}
204
205impl Default for SqlGlobalConfig {
206    fn default() -> Self {
207        Self {
208            max_connections: 5,
209            min_connections: 1,
210            idle_timeout_secs: 300,
211            max_lifetime_secs: 1800,
212            ssl_mode: None,
213            ssl_root_cert: None,
214            ssl_cert: None,
215            ssl_key: None,
216            retry: NetworkRetryPolicy::default(),
217        }
218    }
219}
220
221impl SqlGlobalConfig {
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    pub fn with_max_connections(mut self, value: u32) -> Self {
227        self.max_connections = value;
228        self
229    }
230
231    pub fn with_min_connections(mut self, value: u32) -> Self {
232        self.min_connections = value;
233        self
234    }
235
236    pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
237        self.idle_timeout_secs = value;
238        self
239    }
240
241    pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
242        self.max_lifetime_secs = value;
243        self
244    }
245
246    pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
247        self.ssl_mode = Some(value.into());
248        self
249    }
250
251    pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
252        self.ssl_root_cert = Some(value.into());
253        self
254    }
255
256    pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
257        self.ssl_cert = Some(value.into());
258        self
259    }
260
261    pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
262        self.ssl_key = Some(value.into());
263        self
264    }
265
266    pub fn with_retry(mut self, value: NetworkRetryPolicy) -> Self {
267        self.retry = value;
268        self
269    }
270}
271
272/// Configuration for SQL component endpoints.
273///
274/// URI format: `sql:<query>?db_url=<url>&param1=val1&param2=val2`
275///
276/// The query can be inline SQL or a file reference with `file:` prefix:
277/// - `sql:SELECT * FROM users?db_url=...` - inline SQL
278/// - `sql:file:/path/to/query.sql?db_url=...` - read SQL from file
279///
280/// **Note on file-based queries (SQL-014):** When the query path starts with `file:`,
281/// the file is NOT read synchronously during `from_uri()`. Instead, the file path is
282/// stored in `source_path` and the query is resolved asynchronously via `resolve_file_query()`
283/// during async initialization (producer pool init or consumer start). This avoids
284/// blocking I/O in the synchronous URI parsing path.
285///
286/// **Security note:** `Debug` implementation redacts the `db_url` (which may contain credentials)
287/// and `ssl_key` path. Use `redact_db_url()` for safe logging of database URLs.
288#[derive(Clone)]
289pub struct SqlEndpointConfig {
290    // Connection
291    /// Database connection URL (optional when datasource_name is set).
292    pub db_url: String,
293    /// Named datasource reference (from CamelConfig.datasources).
294    pub datasource_name: Option<String>,
295    /// Maximum connections in the pool. None = use global default.
296    pub max_connections: Option<u32>,
297    /// Minimum connections in the pool. None = use global default.
298    pub min_connections: Option<u32>,
299    /// Idle timeout in seconds. None = use global default.
300    pub idle_timeout_secs: Option<u64>,
301    /// Maximum connection lifetime in seconds. None = use global default.
302    pub max_lifetime_secs: Option<u64>,
303
304    // Query
305    /// The SQL query (from URI path or file).
306    pub query: String,
307    /// Path to the file containing the SQL query (when using `file:` prefix).
308    pub source_path: Option<String>,
309    /// Output type for query results. Default: SelectList.
310    pub output_type: SqlOutputType,
311    /// Placeholder character for parameters. Default: '#'.
312    pub placeholder: char,
313    /// If true, process parameter placeholders in queries. Default: true.
314    pub use_placeholder: bool,
315    /// If true, don't execute the query (dry run). Default: false.
316    pub noop: bool,
317    /// Separator for IN clause expansion. Default: ", ".
318    pub in_separator: String,
319
320    // SQL-005: always populate statement even if body is null/empty
321    /// If true, always bind parameters even if the exchange body is null/empty
322    /// (uses empty defaults). Default: false.
323    pub always_populate_statement: bool,
324
325    // SQL-011: allow named parameters
326    /// If true, recognize `:name` style placeholders and map them from exchange
327    /// headers or body fields. Default: true.
328    pub allow_named_parameters: bool,
329
330    // SQL-016: fetch size hint
331    /// Fetch size hint for query results. None = driver default.
332    pub fetch_size: Option<u32>,
333
334    // SQL-002: transaction mode
335    /// Transaction mode for SQL operations. Default: Auto.
336    pub transaction_mode: TransactionMode,
337
338    // Consumer (polling)
339    /// Delay between polls in milliseconds. Default: 500.
340    pub delay_ms: u64,
341    /// Initial delay before first poll in milliseconds. Default: 1000.
342    pub initial_delay_ms: u64,
343    /// Maximum messages per poll.
344    pub max_messages_per_poll: Option<i32>,
345    /// SQL to execute after consuming each message.
346    pub on_consume: Option<String>,
347    /// SQL to execute if consumption fails.
348    pub on_consume_failed: Option<String>,
349    /// SQL to execute after consuming a batch.
350    pub on_consume_batch_complete: Option<String>,
351    /// Route empty result sets. Default: false.
352    pub route_empty_result_set: bool,
353    /// Use iterator for results. Default: true.
354    pub use_iterator: bool,
355    /// Expected number of rows affected.
356    pub expected_update_count: Option<i64>,
357    /// Break batch on consume failure. Default: false.
358    pub break_batch_on_consume_fail: bool,
359    /// Bridge poll errors into route error handling. Default: false.
360    pub bridge_error_handler: bool,
361
362    // SQL-015: repeat count for consumer polling
363    /// When set, the consumer only polls up to `repeat_count` times before stopping.
364    /// None = poll indefinitely (default).
365    pub repeat_count: Option<u32>,
366
367    // SQL-017: processing strategy
368    /// Processing strategy for consumer. Default: Direct.
369    pub processing_strategy: ProcessingStrategy,
370
371    // SQL-018: poll strategy
372    /// Poll strategy for consumer. Default: Sequential.
373    pub poll_strategy: PollStrategy,
374
375    // Producer
376    /// Enable batch mode. Default: false.
377    pub batch: bool,
378    /// Use message body for SQL. Default: false.
379    pub use_message_body_for_sql: bool,
380
381    // SSL/TLS
382    /// SSL mode for the connection. None = use global default.
383    pub ssl_mode: Option<String>,
384    /// Path to SSL root certificate. None = use global default.
385    pub ssl_root_cert: Option<String>,
386    /// Path to SSL client certificate. None = use global default.
387    pub ssl_cert: Option<String>,
388    /// Path to SSL client key. None = use global default.
389    pub ssl_key: Option<String>,
390
391    /// Retry policy for transient database connection failures.
392    pub retry: NetworkRetryPolicy,
393
394    /// Whether `retry` was explicitly set via URI params. Used by
395    /// [`apply_defaults`] to decide whether URI values win over
396    /// the global config. Internal tracking flag, not serialized.
397    retry_set_from_uri: bool,
398}
399
400impl std::fmt::Debug for SqlEndpointConfig {
401    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402        f.debug_struct("SqlEndpointConfig")
403            .field("db_url", &redact_db_url(&self.db_url))
404            .field("datasource_name", &self.datasource_name)
405            .field("max_connections", &self.max_connections)
406            .field("min_connections", &self.min_connections)
407            .field("idle_timeout_secs", &self.idle_timeout_secs)
408            .field("max_lifetime_secs", &self.max_lifetime_secs)
409            .field("query", &self.query)
410            .field("source_path", &self.source_path)
411            .field("output_type", &self.output_type)
412            .field("placeholder", &self.placeholder)
413            .field("use_placeholder", &self.use_placeholder)
414            .field("noop", &self.noop)
415            .field("in_separator", &self.in_separator)
416            .field("always_populate_statement", &self.always_populate_statement)
417            .field("allow_named_parameters", &self.allow_named_parameters)
418            .field("fetch_size", &self.fetch_size)
419            .field("transaction_mode", &self.transaction_mode)
420            .field("delay_ms", &self.delay_ms)
421            .field("initial_delay_ms", &self.initial_delay_ms)
422            .field("max_messages_per_poll", &self.max_messages_per_poll)
423            .field("on_consume", &self.on_consume)
424            .field("on_consume_failed", &self.on_consume_failed)
425            .field("on_consume_batch_complete", &self.on_consume_batch_complete)
426            .field("route_empty_result_set", &self.route_empty_result_set)
427            .field("use_iterator", &self.use_iterator)
428            .field("expected_update_count", &self.expected_update_count)
429            .field(
430                "break_batch_on_consume_fail",
431                &self.break_batch_on_consume_fail,
432            )
433            .field("bridge_error_handler", &self.bridge_error_handler)
434            .field("repeat_count", &self.repeat_count)
435            .field("processing_strategy", &self.processing_strategy)
436            .field("poll_strategy", &self.poll_strategy)
437            .field("batch", &self.batch)
438            .field("use_message_body_for_sql", &self.use_message_body_for_sql)
439            .field("ssl_mode", &self.ssl_mode)
440            .field("ssl_root_cert", &self.ssl_root_cert)
441            .field("ssl_cert", &self.ssl_cert)
442            .field("ssl_key", &redacted_opt(&self.ssl_key))
443            .field("retry", &self.retry)
444            .finish()
445    }
446}
447
448impl SqlEndpointConfig {
449    /// Apply defaults from global config, filling None fields without overriding.
450    pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
451        if self.max_connections.is_none() {
452            self.max_connections = Some(defaults.max_connections);
453        }
454        if self.min_connections.is_none() {
455            self.min_connections = Some(defaults.min_connections);
456        }
457        if self.idle_timeout_secs.is_none() {
458            self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
459        }
460        if self.max_lifetime_secs.is_none() {
461            self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
462        }
463        if self.ssl_mode.is_none() {
464            self.ssl_mode = defaults.ssl_mode.clone();
465        }
466        if self.ssl_root_cert.is_none() {
467            self.ssl_root_cert = defaults.ssl_root_cert.clone();
468        }
469        if self.ssl_cert.is_none() {
470            self.ssl_cert = defaults.ssl_cert.clone();
471        }
472        if self.ssl_key.is_none() {
473            self.ssl_key = defaults.ssl_key.clone();
474        }
475        // retry: URI wins when set_from_uri, else global fills the gap
476        if !self.retry_set_from_uri {
477            self.retry = defaults.retry.clone();
478        }
479    }
480
481    /// Resolve any remaining None fields with built-in defaults.
482    pub fn resolve_defaults(&mut self) {
483        let defaults = SqlGlobalConfig::default();
484        self.apply_defaults(&defaults);
485    }
486
487    /// Asynchronously read the SQL query from the file referenced by `source_path`.
488    ///
489    /// This is the async replacement for the blocking `std::fs::read_to_string` that
490    /// was previously called in `from_uri()`. Must be invoked during async init
491    /// (producer pool init or consumer start) — never in a synchronous context.
492    ///
493    /// After this call, `self.query` contains the file content (trimmed) and
494    /// `self.source_path` is cleared to prevent re-reading.
495    pub async fn resolve_file_query(&mut self) -> Result<(), CamelError> {
496        if let Some(file_path) = self.source_path.take() {
497            let contents = tokio::fs::read_to_string(&file_path).await.map_err(|e| {
498                CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
499            })?;
500            self.query = contents.trim().to_string();
501            // Keep source_path as Some so tests can still verify the original path
502            self.source_path = Some(file_path);
503        }
504        Ok(())
505    }
506}
507
/// Driver-specific query-parameter names for one SSL option.
struct SslParamMapping {
    /// Key used when the database URL scheme is PostgreSQL.
    pg_key: &'static str,
    /// Key used when the database URL scheme is MySQL.
    mysql_key: &'static str,
}
512
513const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
514    (
515        "sslMode",
516        SslParamMapping {
517            pg_key: "sslmode",
518            mysql_key: "ssl-mode",
519        },
520    ),
521    (
522        "sslRootCert",
523        SslParamMapping {
524            pg_key: "sslrootcert",
525            mysql_key: "ssl-ca",
526        },
527    ),
528    (
529        "sslCert",
530        SslParamMapping {
531            pg_key: "sslcert",
532            mysql_key: "ssl-cert",
533        },
534    ),
535    (
536        "sslKey",
537        SslParamMapping {
538            pg_key: "sslkey",
539            mysql_key: "ssl-key",
540        },
541    ),
542];
543
544pub fn enrich_db_url_with_ssl(
545    db_url: &str,
546    config: &SqlEndpointConfig,
547) -> Result<String, CamelError> {
548    enrich_db_url_with_ssl_params(
549        db_url,
550        config.ssl_mode.as_deref(),
551        config.ssl_root_cert.as_deref(),
552        config.ssl_cert.as_deref(),
553        config.ssl_key.as_deref(),
554    )
555}
556
557pub(crate) fn enrich_db_url_with_ssl_params(
558    db_url: &str,
559    ssl_mode: Option<&str>,
560    ssl_root_cert: Option<&str>,
561    ssl_cert: Option<&str>,
562    ssl_key: Option<&str>,
563) -> Result<String, CamelError> {
564    let ssl_params: Vec<(&str, &str)> = [
565        ssl_mode.map(|v| ("sslMode", v)),
566        ssl_root_cert.map(|v| ("sslRootCert", v)),
567        ssl_cert.map(|v| ("sslCert", v)),
568        ssl_key.map(|v| ("sslKey", v)),
569    ]
570    .into_iter()
571    .flatten()
572    .collect();
573
574    if ssl_params.is_empty() {
575        return Ok(db_url.to_string());
576    }
577
578    let mut parsed = url::Url::parse(db_url).map_err(|e| {
579        CamelError::InvalidUri(format!(
580            "Cannot parse database URL for SSL enrichment: {}",
581            e
582        ))
583    })?;
584
585    let scheme = parsed.scheme();
586    if scheme.starts_with("sqlite") {
587        warn!(
588            "SSL options configured for SQLite database URL, but SQLite does not support SSL/TLS; ignoring sslMode/sslRootCert/sslCert/sslKey"
589        );
590        return Ok(db_url.to_string());
591    }
592
593    if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
594        return Ok(db_url.to_string());
595    }
596    let is_mysql = scheme == "mysql";
597
598    let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
599    for (camel_name, value) in &ssl_params {
600        if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
601            let driver_key = if is_mysql {
602                mapping.mysql_key
603            } else {
604                mapping.pg_key
605            };
606
607            if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
608                query_pairs[pos].1 = (*value).into();
609            } else {
610                query_pairs.push((driver_key.into(), (*value).into()));
611            }
612        }
613    }
614
615    {
616        let mut serializer = url::form_urlencoded::Serializer::new(String::new());
617        for (k, v) in query_pairs {
618            serializer.append_pair(&k, &v);
619        }
620        parsed.set_query(Some(&serializer.finish()));
621    }
622
623    Ok(parsed.to_string())
624}
625
626impl UriConfig for SqlEndpointConfig {
627    fn scheme() -> &'static str {
628        "sql"
629    }
630
631    fn from_uri(uri: &str) -> Result<Self, CamelError> {
632        let parts = parse_uri(uri)?;
633        Self::from_components(parts)
634    }
635
636    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
637        // Validate scheme
638        if parts.scheme != Self::scheme() {
639            return Err(CamelError::InvalidUri(format!(
640                "expected scheme '{}' but got '{}'",
641                Self::scheme(),
642                parts.scheme
643            )));
644        }
645
646        let params = &parts.params;
647
648        // Handle file: prefix for query
649        // SQL-014: defer file reading to async init path to avoid blocking I/O
650        // in the synchronous URI parsing path. Store the path; resolve_file_query()
651        // must be called during async initialization (producer pool init or consumer start).
652        let (query, source_path) = if parts.path.starts_with("file:") {
653            let file_path = parts.path.trim_start_matches("file:").to_string();
654            (String::new(), Some(file_path))
655        } else {
656            (parts.path.clone(), None)
657        };
658
659        // Optional parameter: db_url (required when datasource is not set)
660        let db_url = params.get("db_url").cloned().unwrap_or_default();
661
662        // Named datasource reference (from CamelConfig.datasources)
663        let datasource_name = params.get("datasource").cloned();
664
665        // Connection parameters - None when not set by URI param
666        let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
667        let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
668        let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
669        let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
670
671        // Query parameters
672        let output_type = params
673            .get("outputType")
674            .map(|s| s.parse())
675            .transpose()?
676            .unwrap_or_default();
677        let placeholder = params
678            .get("placeholder")
679            .filter(|v| !v.is_empty())
680            .map(|v| {
681                if v.chars().count() != 1 {
682                    return Err(CamelError::InvalidUri(format!(
683                        "placeholder must be exactly one character, got '{}'",
684                        v
685                    )));
686                }
687                if !v.is_ascii() {
688                    return Err(CamelError::InvalidUri(
689                        "placeholder must be a single ASCII character".to_string(),
690                    ));
691                }
692                Ok(v.chars().next().unwrap()) // allow-unwrap
693            })
694            .transpose()?
695            .unwrap_or('#');
696        /// Parse a boolean URI parameter strictly.
697        ///
698        /// Accepts only `"true"` or `"false"` (case-insensitive). Any other value
699        /// returns `CamelError::InvalidUri` to prevent silent misconfiguration.
700        fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
701            if value.eq_ignore_ascii_case("true") {
702                Ok(true)
703            } else if value.eq_ignore_ascii_case("false") {
704                Ok(false)
705            } else {
706                Err(CamelError::InvalidUri(format!(
707                    "{} must be 'true' or 'false', got '{}'",
708                    name, value
709                )))
710            }
711        }
712
713        let use_placeholder = params
714            .get("usePlaceholder")
715            .map(|v| parse_bool_param("usePlaceholder", v))
716            .transpose()?
717            .unwrap_or(true);
718        let noop = params
719            .get("noop")
720            .map(|v| parse_bool_param("noop", v))
721            .transpose()?
722            .unwrap_or(false);
723        let in_separator = params
724            .get("inSeparator")
725            .map(|v| v.to_string())
726            .unwrap_or_else(|| ", ".to_string());
727        if in_separator.is_empty() {
728            return Err(CamelError::InvalidUri(
729                "inSeparator must not be empty".to_string(),
730            ));
731        }
732
733        // SQL-005: alwaysPopulateStatement
734        let always_populate_statement = params
735            .get("alwaysPopulateStatement")
736            .map(|v| parse_bool_param("alwaysPopulateStatement", v))
737            .transpose()?
738            .unwrap_or(false);
739
740        // SQL-011: allowNamedParameters
741        let allow_named_parameters = params
742            .get("allowNamedParameters")
743            .map(|v| parse_bool_param("allowNamedParameters", v))
744            .transpose()?
745            .unwrap_or(true);
746
747        // SQL-016: fetchSize
748        let fetch_size = params.get("fetchSize").and_then(|v| v.parse().ok());
749
750        // SQL-002: transactionMode
751        let transaction_mode = params
752            .get("transactionMode")
753            .map(|s| s.parse())
754            .transpose()?
755            .unwrap_or_default();
756
757        // Consumer parameters
758        let delay_ms = params
759            .get("delay")
760            .and_then(|v| v.parse().ok())
761            .unwrap_or(500);
762        let initial_delay_ms = params
763            .get("initialDelay")
764            .and_then(|v| v.parse().ok())
765            .unwrap_or(1000);
766        let max_messages_per_poll = params
767            .get("maxMessagesPerPoll")
768            .and_then(|v| v.parse().ok());
769        let on_consume = params.get("onConsume").cloned();
770        let on_consume_failed = params.get("onConsumeFailed").cloned();
771        let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
772        let route_empty_result_set = params
773            .get("routeEmptyResultSet")
774            .map(|v| parse_bool_param("routeEmptyResultSet", v))
775            .transpose()?
776            .unwrap_or(false);
777        let use_iterator = params
778            .get("useIterator")
779            .map(|v| parse_bool_param("useIterator", v))
780            .transpose()?
781            .unwrap_or(true);
782        let expected_update_count = params
783            .get("expectedUpdateCount")
784            .and_then(|v| v.parse().ok());
785        let break_batch_on_consume_fail = params
786            .get("breakBatchOnConsumeFail")
787            .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
788            .transpose()?
789            .unwrap_or(false);
790        let bridge_error_handler = params
791            .get("bridgeErrorHandler")
792            .map(|v| parse_bool_param("bridgeErrorHandler", v))
793            .transpose()?
794            .unwrap_or(false);
795
796        // SQL-015: repeatCount
797        let repeat_count = params.get("repeatCount").and_then(|v| v.parse().ok());
798
799        // SQL-017: processingStrategy
800        let processing_strategy = params
801            .get("processingStrategy")
802            .map(|s| s.parse())
803            .transpose()?
804            .unwrap_or_default();
805
806        // SQL-018: pollStrategy
807        let poll_strategy = params
808            .get("pollStrategy")
809            .map(|s| s.parse())
810            .transpose()?
811            .unwrap_or_default();
812
813        // Producer parameters
814        let batch = params
815            .get("batch")
816            .map(|v| parse_bool_param("batch", v))
817            .transpose()?
818            .unwrap_or(false);
819        let use_message_body_for_sql = params
820            .get("useMessageBodyForSql")
821            .map(|v| parse_bool_param("useMessageBodyForSql", v))
822            .transpose()?
823            .unwrap_or(false);
824        let ssl_mode = params.get("sslMode").cloned();
825        let ssl_root_cert = params.get("sslRootCert").cloned();
826        let ssl_cert = params.get("sslCert").cloned();
827        let ssl_key = params.get("sslKey").cloned();
828
829        // Parse retry policy from URI params
830        let mut retry = NetworkRetryPolicy::default();
831        let mut retry_set_from_uri = false;
832        if let Some(raw) = params.get("retryEnabled") {
833            retry.enabled = raw.parse::<bool>().map_err(|_| {
834                CamelError::InvalidUri(format!("retryEnabled must be a boolean, got '{raw}'"))
835            })?;
836            retry_set_from_uri = true;
837        }
838        if let Some(raw) = params.get("retryMaxAttempts") {
839            retry.max_attempts = raw.parse::<u32>().map_err(|_| {
840                CamelError::InvalidUri(format!("retryMaxAttempts must be a u32, got '{raw}'"))
841            })?;
842            retry_set_from_uri = true;
843        }
844        if let Some(raw) = params.get("retryInitialDelayMs") {
845            retry.initial_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
846                CamelError::InvalidUri(format!("retryInitialDelayMs must be a u64, got '{raw}'"))
847            })?);
848            retry_set_from_uri = true;
849        }
850        if let Some(raw) = params.get("retryMultiplier") {
851            retry.multiplier = raw.parse::<f64>().map_err(|_| {
852                CamelError::InvalidUri(format!("retryMultiplier must be a f64, got '{raw}'"))
853            })?;
854            retry_set_from_uri = true;
855        }
856        if let Some(raw) = params.get("retryMaxDelayMs") {
857            retry.max_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
858                CamelError::InvalidUri(format!("retryMaxDelayMs must be a u64, got '{raw}'"))
859            })?);
860            retry_set_from_uri = true;
861        }
862        if let Some(raw) = params.get("retryJitter") {
863            retry.jitter_factor = raw.parse::<f64>().map_err(|_| {
864                CamelError::InvalidUri(format!("retryJitter must be a f64, got '{raw}'"))
865            })?;
866            retry_set_from_uri = true;
867        }
868
869        if datasource_name.is_none() && db_url.is_empty() {
870            return Err(CamelError::Config(
871                "either 'datasource' or 'db_url' parameter is required".to_string(),
872            ));
873        }
874
875        if datasource_name.is_some() && !db_url.is_empty() {
876            return Err(CamelError::InvalidUri(
877                "'db_url' not allowed with named datasource — use 'datasource' alone".to_string(),
878            ));
879        }
880
881        if datasource_name.is_some() {
882            let overrides: Vec<&str> = {
883                let mut v = Vec::new();
884                if max_connections.is_some() {
885                    v.push("maxConnections");
886                }
887                if min_connections.is_some() {
888                    v.push("minConnections");
889                }
890                if idle_timeout_secs.is_some() {
891                    v.push("idleTimeoutSecs");
892                }
893                if max_lifetime_secs.is_some() {
894                    v.push("maxLifetimeSecs");
895                }
896                if ssl_mode.is_some() {
897                    v.push("sslMode");
898                }
899                if ssl_root_cert.is_some() {
900                    v.push("sslRootCert");
901                }
902                if ssl_cert.is_some() {
903                    v.push("sslCert");
904                }
905                if ssl_key.is_some() {
906                    v.push("sslKey");
907                }
908                v
909            };
910            if !overrides.is_empty() {
911                return Err(CamelError::InvalidUri(format!(
912                    "pool-affecting params not allowed with named datasource: {}",
913                    overrides.join(", ")
914                )));
915            }
916        }
917
918        Ok(Self {
919            db_url,
920            datasource_name,
921            max_connections,
922            min_connections,
923            idle_timeout_secs,
924            max_lifetime_secs,
925            query,
926            source_path,
927            output_type,
928            placeholder,
929            use_placeholder,
930            noop,
931            in_separator,
932            always_populate_statement,
933            allow_named_parameters,
934            fetch_size,
935            transaction_mode,
936            delay_ms,
937            initial_delay_ms,
938            max_messages_per_poll,
939            on_consume,
940            on_consume_failed,
941            on_consume_batch_complete,
942            route_empty_result_set,
943            use_iterator,
944            expected_update_count,
945            break_batch_on_consume_fail,
946            bridge_error_handler,
947            repeat_count,
948            processing_strategy,
949            poll_strategy,
950            batch,
951            use_message_body_for_sql,
952            ssl_mode,
953            ssl_root_cert,
954            ssl_cert,
955            ssl_key,
956            retry,
957            retry_set_from_uri,
958        })
959    }
960}
961
962#[cfg(test)]
963mod tests {
964    use super::*;
965    use camel_component_api::NetworkRetryPolicy;
966
967    #[test]
968    fn config_defaults() {
969        let mut c =
970            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
971        c.resolve_defaults();
972        assert_eq!(c.query, "select 1");
973        assert_eq!(c.db_url, "postgres://localhost/test");
974        assert_eq!(c.max_connections, Some(5));
975        assert_eq!(c.min_connections, Some(1));
976        assert_eq!(c.idle_timeout_secs, Some(300));
977        assert_eq!(c.max_lifetime_secs, Some(1800));
978        assert_eq!(c.output_type, SqlOutputType::SelectList);
979        assert_eq!(c.placeholder, '#');
980        assert!(!c.noop);
981        assert_eq!(c.in_separator, ", ");
982        assert_eq!(c.delay_ms, 500);
983        assert_eq!(c.initial_delay_ms, 1000);
984        assert!(c.max_messages_per_poll.is_none());
985        assert!(c.on_consume.is_none());
986        assert!(c.on_consume_failed.is_none());
987        assert!(c.on_consume_batch_complete.is_none());
988        assert!(!c.route_empty_result_set);
989        assert!(c.use_iterator);
990        assert!(c.expected_update_count.is_none());
991        assert!(!c.break_batch_on_consume_fail);
992        assert!(!c.batch);
993        assert!(!c.use_message_body_for_sql);
994        assert!(c.ssl_mode.is_none());
995        assert!(c.ssl_root_cert.is_none());
996        assert!(c.ssl_cert.is_none());
997        assert!(c.ssl_key.is_none());
998        // SQL-005/SQL-011/SQL-016/SQL-002/SQL-015/SQL-017/SQL-018 defaults
999        assert!(!c.always_populate_statement);
1000        assert!(c.allow_named_parameters);
1001        assert!(c.fetch_size.is_none());
1002        assert_eq!(c.transaction_mode, TransactionMode::Auto);
1003        assert!(c.repeat_count.is_none());
1004        assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1005        assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1006    }
1007
1008    #[test]
1009    fn ssl_none_by_default() {
1010        let c =
1011            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1012        assert!(c.ssl_mode.is_none());
1013        assert!(c.ssl_root_cert.is_none());
1014        assert!(c.ssl_cert.is_none());
1015        assert!(c.ssl_key.is_none());
1016    }
1017
1018    #[test]
1019    fn ssl_mode_from_uri() {
1020        let c = SqlEndpointConfig::from_uri(
1021            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1022        )
1023        .unwrap();
1024        assert_eq!(c.ssl_mode, Some("require".to_string()));
1025        assert!(c.ssl_root_cert.is_none());
1026    }
1027
1028    #[test]
1029    fn ssl_all_params_from_uri() {
1030        let c = SqlEndpointConfig::from_uri(
1031            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1032        )
1033        .unwrap();
1034        assert_eq!(c.ssl_mode, Some("require".to_string()));
1035        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1036        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1037        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1038    }
1039
1040    #[test]
1041    fn ssl_global_applied_to_endpoint() {
1042        let mut c =
1043            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1044        let global = SqlGlobalConfig::default()
1045            .with_ssl_mode("require")
1046            .with_ssl_root_cert("/etc/ssl/ca.pem");
1047        c.apply_defaults(&global);
1048        assert_eq!(c.ssl_mode, Some("require".to_string()));
1049        assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
1050        assert!(c.ssl_cert.is_none());
1051        assert!(c.ssl_key.is_none());
1052    }
1053
1054    #[test]
1055    fn ssl_uri_overrides_global() {
1056        let mut c = SqlEndpointConfig::from_uri(
1057            "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
1058        )
1059        .unwrap();
1060        let global = SqlGlobalConfig::default().with_ssl_mode("require");
1061        c.apply_defaults(&global);
1062        assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
1063    }
1064
1065    #[test]
1066    fn config_wrong_scheme() {
1067        assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
1068    }
1069
1070    #[test]
1071    fn config_missing_db_url() {
1072        assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
1073    }
1074
1075    #[test]
1076    fn config_output_type_select_one() {
1077        let c = SqlEndpointConfig::from_uri(
1078            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
1079        )
1080        .unwrap();
1081        assert_eq!(c.output_type, SqlOutputType::SelectOne);
1082    }
1083
1084    #[test]
1085    fn config_output_type_stream_list() {
1086        let c = SqlEndpointConfig::from_uri(
1087            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
1088        )
1089        .unwrap();
1090        assert_eq!(c.output_type, SqlOutputType::StreamList);
1091    }
1092
1093    #[test]
1094    fn in_separator_default() {
1095        let c =
1096            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1097        assert_eq!(c.in_separator, ", ");
1098    }
1099
1100    #[test]
1101    fn in_separator_from_uri() {
1102        let c = SqlEndpointConfig::from_uri(
1103            "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
1104        )
1105        .unwrap();
1106        assert_eq!(c.in_separator, ";");
1107    }
1108
1109    #[test]
1110    fn in_separator_empty_rejected() {
1111        let result = SqlEndpointConfig::from_uri(
1112            "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
1113        );
1114        assert!(result.is_err());
1115        let msg = format!("{:?}", result.unwrap_err());
1116        assert!(msg.contains("inSeparator") || msg.contains("empty"));
1117    }
1118
1119    #[test]
1120    fn config_consumer_options() {
1121        let c = SqlEndpointConfig::from_uri(
1122            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
1123        ).unwrap();
1124        assert_eq!(c.delay_ms, 2000);
1125        assert_eq!(c.initial_delay_ms, 500);
1126        assert_eq!(c.max_messages_per_poll, Some(10));
1127        assert_eq!(
1128            c.on_consume,
1129            Some("update t set done=true where id=:#id".to_string())
1130        );
1131        assert_eq!(
1132            c.on_consume_failed,
1133            Some("update t set failed=true where id=:#id".to_string())
1134        );
1135        assert_eq!(
1136            c.on_consume_batch_complete,
1137            Some("delete from t where done=true".to_string())
1138        );
1139        assert!(c.route_empty_result_set);
1140        assert!(!c.use_iterator);
1141        assert_eq!(c.expected_update_count, Some(1));
1142        assert!(c.break_batch_on_consume_fail);
1143        assert!(!c.bridge_error_handler);
1144    }
1145
1146    #[test]
1147    fn config_producer_options() {
1148        let c = SqlEndpointConfig::from_uri(
1149            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
1150        ).unwrap();
1151        assert!(c.batch);
1152        assert!(c.use_message_body_for_sql);
1153        assert!(c.noop);
1154    }
1155
1156    #[test]
1157    fn config_pool_options() {
1158        let c = SqlEndpointConfig::from_uri(
1159            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
1160        ).unwrap();
1161        assert_eq!(c.max_connections, Some(20));
1162        assert_eq!(c.min_connections, Some(3));
1163        assert_eq!(c.idle_timeout_secs, Some(600));
1164        assert_eq!(c.max_lifetime_secs, Some(3600));
1165    }
1166
1167    #[test]
1168    fn config_query_with_special_chars() {
1169        let c = SqlEndpointConfig::from_uri(
1170            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
1171        )
1172        .unwrap();
1173        assert_eq!(
1174            c.query,
1175            "select * from users where name = :#name and age > #"
1176        );
1177    }
1178
1179    #[test]
1180    fn output_type_from_str() {
1181        assert_eq!(
1182            "SelectList".parse::<SqlOutputType>().unwrap(),
1183            SqlOutputType::SelectList
1184        );
1185        assert_eq!(
1186            "SelectOne".parse::<SqlOutputType>().unwrap(),
1187            SqlOutputType::SelectOne
1188        );
1189        assert_eq!(
1190            "StreamList".parse::<SqlOutputType>().unwrap(),
1191            SqlOutputType::StreamList
1192        );
1193        assert!("Invalid".parse::<SqlOutputType>().is_err());
1194    }
1195
1196    // SQL-014: file-not-found is now detected during async resolve_file_query(), not from_uri
1197    #[tokio::test]
1198    async fn config_file_not_found() {
1199        let mut config = SqlEndpointConfig::from_uri(
1200            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
1201        )
1202        .expect("from_uri should defer file reading");
1203        // from_uri no longer reads the file — source_path is set, query is empty
1204        assert_eq!(
1205            config.source_path,
1206            Some("/nonexistent/path/query.sql".to_string())
1207        );
1208        assert!(config.query.is_empty());
1209
1210        // Error occurs during async resolution
1211        let result = config.resolve_file_query().await;
1212        assert!(result.is_err());
1213        let msg = format!("{:?}", result.unwrap_err());
1214        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
1215    }
1216
1217    // SQL-014: file query is now resolved asynchronously
1218    #[tokio::test]
1219    async fn config_file_query() {
1220        use std::io::Write;
1221        let unique_name = format!(
1222            "test_sql_query_{}.sql",
1223            std::time::SystemTime::now()
1224                .duration_since(std::time::UNIX_EPOCH)
1225                .unwrap_or_default()
1226                .as_nanos()
1227        );
1228        let mut tmp = std::env::temp_dir();
1229        tmp.push(unique_name);
1230        {
1231            let mut f = std::fs::File::create(&tmp).unwrap();
1232            writeln!(f, "SELECT * FROM users").unwrap();
1233        }
1234        let uri = format!(
1235            "sql:file:{}?db_url=postgres://localhost/test",
1236            tmp.display()
1237        );
1238        let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1239        // query is empty until async resolution
1240        assert!(c.query.is_empty());
1241        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1242
1243        // Resolve asynchronously
1244        c.resolve_file_query()
1245            .await
1246            .expect("file query should resolve");
1247        assert_eq!(c.query, "SELECT * FROM users");
1248        std::fs::remove_file(&tmp).ok();
1249    }
1250
1251    // New tests for config contract
1252    #[test]
1253    fn pool_fields_none_when_not_set() {
1254        let c =
1255            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1256        assert_eq!(c.max_connections, None);
1257        assert_eq!(c.min_connections, None);
1258        assert_eq!(c.idle_timeout_secs, None);
1259        assert_eq!(c.max_lifetime_secs, None);
1260    }
1261
1262    #[test]
1263    fn apply_defaults_fills_none() {
1264        let mut c =
1265            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1266        let global = SqlGlobalConfig {
1267            max_connections: 10,
1268            min_connections: 2,
1269            idle_timeout_secs: 600,
1270            max_lifetime_secs: 3600,
1271            ssl_mode: None,
1272            ssl_root_cert: None,
1273            ssl_cert: None,
1274            ssl_key: None,
1275            retry: NetworkRetryPolicy::default(),
1276        };
1277        c.apply_defaults(&global);
1278        assert_eq!(c.max_connections, Some(10));
1279        assert_eq!(c.min_connections, Some(2));
1280        assert_eq!(c.idle_timeout_secs, Some(600));
1281        assert_eq!(c.max_lifetime_secs, Some(3600));
1282        assert!(c.ssl_mode.is_none());
1283        assert!(c.ssl_root_cert.is_none());
1284        assert!(c.ssl_cert.is_none());
1285        assert!(c.ssl_key.is_none());
1286    }
1287
1288    #[test]
1289    fn apply_defaults_does_not_override() {
1290        let mut c = SqlEndpointConfig::from_uri(
1291            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
1292        )
1293        .unwrap();
1294        let global = SqlGlobalConfig {
1295            max_connections: 10,
1296            min_connections: 2,
1297            idle_timeout_secs: 600,
1298            max_lifetime_secs: 3600,
1299            ssl_mode: None,
1300            ssl_root_cert: None,
1301            ssl_cert: None,
1302            ssl_key: None,
1303            retry: NetworkRetryPolicy::default(),
1304        };
1305        c.apply_defaults(&global);
1306        // URI-set values should NOT be overridden
1307        assert_eq!(c.max_connections, Some(99));
1308        assert_eq!(c.min_connections, Some(5));
1309        // None fields should be filled from global
1310        assert_eq!(c.idle_timeout_secs, Some(600));
1311        assert_eq!(c.max_lifetime_secs, Some(3600));
1312    }
1313
1314    #[test]
1315    fn resolve_defaults_fills_remaining() {
1316        let mut c = SqlEndpointConfig::from_uri(
1317            "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
1318        )
1319        .unwrap();
1320        c.resolve_defaults();
1321        assert_eq!(c.max_connections, Some(7)); // from URI
1322        assert_eq!(c.min_connections, Some(1)); // from defaults
1323        assert_eq!(c.idle_timeout_secs, Some(300)); // from defaults
1324        assert_eq!(c.max_lifetime_secs, Some(1800)); // from defaults
1325    }
1326
1327    #[test]
1328    fn global_config_builder() {
1329        let c = SqlGlobalConfig::default()
1330            .with_max_connections(20)
1331            .with_min_connections(3)
1332            .with_idle_timeout_secs(600)
1333            .with_max_lifetime_secs(3600)
1334            .with_ssl_mode("require")
1335            .with_ssl_root_cert("/ca.pem")
1336            .with_ssl_cert("/cert.pem")
1337            .with_ssl_key("/key.pem");
1338        assert_eq!(c.max_connections, 20);
1339        assert_eq!(c.min_connections, 3);
1340        assert_eq!(c.idle_timeout_secs, 600);
1341        assert_eq!(c.max_lifetime_secs, 3600);
1342        assert_eq!(c.ssl_mode, Some("require".to_string()));
1343        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1344        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1345        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1346    }
1347
1348    #[test]
1349    fn enrich_postgres_ssl_mode() {
1350        let mut c = SqlEndpointConfig::from_uri(
1351            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1352        )
1353        .unwrap();
1354        c.resolve_defaults();
1355        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1356        assert!(url.contains("sslmode=require"), "got: {}", url);
1357    }
1358
1359    #[test]
1360    fn enrich_postgres_all_ssl() {
1361        let mut c = SqlEndpointConfig::from_uri(
1362            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1363        )
1364        .unwrap();
1365        c.resolve_defaults();
1366        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1367        assert!(url.contains("sslmode=require"), "got: {}", url);
1368        assert!(url.contains("sslrootcert="), "got: {}", url);
1369        assert!(url.contains("sslcert="), "got: {}", url);
1370        assert!(url.contains("sslkey="), "got: {}", url);
1371    }
1372
1373    #[test]
1374    fn enrich_mysql_ssl() {
1375        let mut c = SqlEndpointConfig::from_uri(
1376            "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
1377        )
1378        .unwrap();
1379        c.resolve_defaults();
1380        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1381        assert!(url.contains("ssl-mode=require"), "got: {}", url);
1382    }
1383
1384    #[test]
1385    fn enrich_existing_query_params() {
1386        let mut c = SqlEndpointConfig::from_uri(
1387            "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
1388        )
1389        .unwrap();
1390        c.resolve_defaults();
1391        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1392        assert!(url.contains("existing=1"), "got: {}", url);
1393        assert!(url.contains("sslmode=require"), "got: {}", url);
1394    }
1395
1396    #[test]
1397    fn enrich_override_existing() {
1398        let mut c = SqlEndpointConfig::from_uri(
1399            "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
1400        )
1401        .unwrap();
1402        c.resolve_defaults();
1403        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1404        assert!(url.contains("sslmode=require"), "got: {}", url);
1405        assert!(!url.contains("sslmode=allow"), "got: {}", url);
1406    }
1407
1408    #[test]
1409    fn enrich_no_params() {
1410        let mut c =
1411            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1412        c.resolve_defaults();
1413        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1414        assert_eq!(url, "postgres://localhost/test");
1415    }
1416
1417    #[test]
1418    fn enrich_url_encodes_paths() {
1419        let mut c = SqlEndpointConfig::from_uri(
1420            "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1421        )
1422        .unwrap();
1423        c.resolve_defaults();
1424        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1425        assert!(url.contains("sslrootcert="), "got: {}", url);
1426    }
1427
1428    #[test]
1429    fn enrich_unsupported_scheme_returns_unchanged() {
1430        let mut c = SqlEndpointConfig::from_uri(
1431            "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1432        )
1433        .unwrap();
1434        c.resolve_defaults();
1435        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1436        assert_eq!(url, "sqlite://localhost/test.db");
1437    }
1438
1439    #[test]
1440    fn enrich_invalid_url_returns_error() {
1441        let mut c = SqlEndpointConfig::from_uri(
1442            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1443        )
1444        .unwrap();
1445        c.resolve_defaults();
1446        let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1447        assert!(result.is_err());
1448    }
1449
1450    // --- Phase B hardening tests ---
1451
1452    // SQL-010: Debug output redacts credentials
1453    #[test]
1454    fn debug_redacts_db_url_with_password() {
1455        let c = SqlEndpointConfig::from_uri(
1456            "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1457        )
1458        .unwrap();
1459        let debug_output = format!("{:?}", c);
1460        assert!(
1461            !debug_output.contains("secret123"),
1462            "Debug output must not contain password: {}",
1463            debug_output
1464        );
1465        assert!(
1466            debug_output.contains("***"),
1467            "Debug output must contain redacted marker: {}",
1468            debug_output
1469        );
1470    }
1471
1472    #[test]
1473    fn debug_redacts_ssl_key() {
1474        let c = SqlEndpointConfig::from_uri(
1475            "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1476        )
1477        .unwrap();
1478        let debug_output = format!("{:?}", c);
1479        assert!(
1480            !debug_output.contains("/secret/key.pem"),
1481            "Debug output must not contain ssl_key path: {}",
1482            debug_output
1483        );
1484    }
1485
1486    #[test]
1487    fn debug_global_config_redacts_ssl_key() {
1488        let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1489        let debug_output = format!("{:?}", c);
1490        assert!(
1491            !debug_output.contains("/secret/key.pem"),
1492            "Debug output must not contain ssl_key path: {}",
1493            debug_output
1494        );
1495        assert!(
1496            debug_output.contains("***"),
1497            "Debug output must contain redacted marker: {}",
1498            debug_output
1499        );
1500    }
1501
1502    #[test]
1503    fn redact_db_url_with_credentials() {
1504        assert_eq!(
1505            redact_db_url("postgres://user:pass@host/db"),
1506            "postgres://***:***@host/db"
1507        );
1508    }
1509
1510    #[test]
1511    fn redact_db_url_without_credentials() {
1512        assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1513    }
1514
1515    #[test]
1516    fn redact_db_url_invalid_returns_original() {
1517        assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1518    }
1519
1520    // SQL-004: usePlaceholder parsing
1521    #[test]
1522    fn use_placeholder_defaults_to_true() {
1523        let c =
1524            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1525        assert!(c.use_placeholder);
1526    }
1527
1528    #[test]
1529    fn use_placeholder_false_from_uri() {
1530        let c = SqlEndpointConfig::from_uri(
1531            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1532        )
1533        .unwrap();
1534        assert!(!c.use_placeholder);
1535    }
1536
1537    #[test]
1538    fn use_placeholder_true_from_uri() {
1539        let c = SqlEndpointConfig::from_uri(
1540            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1541        )
1542        .unwrap();
1543        assert!(c.use_placeholder);
1544    }
1545
1546    // SQL-004: strict boolean parsing — invalid values rejected
1547    #[test]
1548    fn use_placeholder_rejects_invalid_value() {
1549        let result = SqlEndpointConfig::from_uri(
1550            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1551        );
1552        assert!(result.is_err());
1553        let msg = format!("{:?}", result.unwrap_err());
1554        assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1555    }
1556
1557    #[test]
1558    fn use_placeholder_rejects_typo_tru() {
1559        let result = SqlEndpointConfig::from_uri(
1560            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1561        );
1562        assert!(result.is_err());
1563    }
1564
1565    #[test]
1566    fn use_placeholder_rejects_yes() {
1567        let result = SqlEndpointConfig::from_uri(
1568            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1569        );
1570        assert!(result.is_err());
1571    }
1572
1573    #[test]
1574    fn noop_rejects_invalid_value() {
1575        let result =
1576            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1577        assert!(result.is_err());
1578        let msg = format!("{:?}", result.unwrap_err());
1579        assert!(msg.contains("noop"));
1580    }
1581
1582    #[test]
1583    fn batch_rejects_invalid_value() {
1584        let result =
1585            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1586        assert!(result.is_err());
1587        let msg = format!("{:?}", result.unwrap_err());
1588        assert!(msg.contains("batch"));
1589    }
1590
1591    #[test]
1592    fn route_empty_result_set_rejects_invalid_value() {
1593        let result = SqlEndpointConfig::from_uri(
1594            "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1595        );
1596        assert!(result.is_err());
1597    }
1598
1599    #[test]
1600    fn use_iterator_rejects_invalid_value() {
1601        let result = SqlEndpointConfig::from_uri(
1602            "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1603        );
1604        assert!(result.is_err());
1605    }
1606
1607    #[test]
1608    fn break_batch_on_consume_fail_rejects_invalid_value() {
1609        let result = SqlEndpointConfig::from_uri(
1610            "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1611        );
1612        assert!(result.is_err());
1613    }
1614
1615    #[test]
1616    fn use_message_body_for_sql_rejects_invalid_value() {
1617        let result = SqlEndpointConfig::from_uri(
1618            "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1619        );
1620        assert!(result.is_err());
1621    }
1622
1623    // Case-insensitive true/false still works
1624    #[test]
1625    fn boolean_params_case_insensitive() {
1626        let c = SqlEndpointConfig::from_uri(
1627            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False&bridgeErrorHandler=TRUE",
1628        )
1629        .unwrap();
1630        assert!(c.use_placeholder);
1631        assert!(!c.noop);
1632        assert!(c.batch);
1633        assert!(!c.use_iterator);
1634        assert!(c.bridge_error_handler);
1635    }
1636
1637    // SQL-022: multi-char placeholder rejected
1638    #[test]
1639    fn multi_char_placeholder_rejected() {
1640        let result = SqlEndpointConfig::from_uri(
1641            "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1642        );
1643        assert!(result.is_err());
1644        let msg = format!("{:?}", result.unwrap_err());
1645        assert!(msg.contains("placeholder") && msg.contains("one character"));
1646    }
1647
1648    #[test]
1649    fn non_ascii_placeholder_rejected() {
1650        let result = SqlEndpointConfig::from_uri(
1651            "sql:select 1?db_url=postgres://localhost/test&placeholder=%C2%A2",
1652        );
1653        assert!(result.is_err());
1654    }
1655
1656    #[test]
1657    fn single_char_placeholder_accepted() {
1658        let c = SqlEndpointConfig::from_uri(
1659            "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1660        )
1661        .unwrap();
1662        assert_eq!(c.placeholder, '$');
1663    }
1664
1665    #[test]
1666    fn empty_placeholder_falls_back_to_default() {
1667        // Empty string is filtered out by the original logic — falls back to '#'
1668        let c = SqlEndpointConfig::from_uri(
1669            "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1670        )
1671        .unwrap();
1672        assert_eq!(c.placeholder, '#');
1673    }
1674
1675    // SQL-014: file-based SQL config test (verifies async resolution and caching)
1676    #[tokio::test]
1677    async fn file_query_cached_in_config() {
1678        use std::io::Write;
1679        let unique_name = format!(
1680            "test_sql_cached_{}.sql",
1681            std::time::SystemTime::now()
1682                .duration_since(std::time::UNIX_EPOCH)
1683                .unwrap_or_default()
1684                .as_nanos()
1685        );
1686        let mut tmp = std::env::temp_dir();
1687        tmp.push(unique_name);
1688        {
1689            let mut f = std::fs::File::create(&tmp).unwrap();
1690            writeln!(f, "SELECT * FROM cached_test").unwrap();
1691        }
1692        let uri = format!(
1693            "sql:file:{}?db_url=postgres://localhost/test",
1694            tmp.display()
1695        );
1696        let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1697        // Query is empty before async resolution
1698        assert!(c.query.is_empty());
1699        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1700
1701        // Resolve asynchronously — query is cached in config
1702        c.resolve_file_query()
1703            .await
1704            .expect("resolve should succeed");
1705        assert_eq!(c.query, "SELECT * FROM cached_test");
1706
1707        // Delete the file — config still has the query
1708        std::fs::remove_file(&tmp).ok();
1709        assert_eq!(c.query, "SELECT * FROM cached_test");
1710    }
1711
1712    // --- H-03 audit sweep tests ---
1713
1714    // SQL-005: alwaysPopulateStatement
1715    #[test]
1716    fn always_populate_statement_defaults_to_false() {
1717        let c =
1718            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1719        assert!(!c.always_populate_statement);
1720    }
1721
1722    #[test]
1723    fn always_populate_statement_from_uri() {
1724        let c = SqlEndpointConfig::from_uri(
1725            "sql:select 1?db_url=postgres://localhost/test&alwaysPopulateStatement=true",
1726        )
1727        .unwrap();
1728        assert!(c.always_populate_statement);
1729    }
1730
1731    // SQL-011: allowNamedParameters
1732    #[test]
1733    fn allow_named_parameters_defaults_to_true() {
1734        let c =
1735            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1736        assert!(c.allow_named_parameters);
1737    }
1738
1739    #[test]
1740    fn allow_named_parameters_false_from_uri() {
1741        let c = SqlEndpointConfig::from_uri(
1742            "sql:select 1?db_url=postgres://localhost/test&allowNamedParameters=false",
1743        )
1744        .unwrap();
1745        assert!(!c.allow_named_parameters);
1746    }
1747
1748    // SQL-016: fetchSize
1749    #[test]
1750    fn fetch_size_defaults_to_none() {
1751        let c =
1752            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1753        assert!(c.fetch_size.is_none());
1754    }
1755
1756    #[test]
1757    fn fetch_size_from_uri() {
1758        let c = SqlEndpointConfig::from_uri(
1759            "sql:select 1?db_url=postgres://localhost/test&fetchSize=1000",
1760        )
1761        .unwrap();
1762        assert_eq!(c.fetch_size, Some(1000));
1763    }
1764
1765    // SQL-002: transactionMode
1766    #[test]
1767    fn transaction_mode_defaults_to_auto() {
1768        let c =
1769            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1770        assert_eq!(c.transaction_mode, TransactionMode::Auto);
1771    }
1772
1773    #[test]
1774    fn transaction_mode_managed_from_uri() {
1775        let c = SqlEndpointConfig::from_uri(
1776            "sql:select 1?db_url=postgres://localhost/test&transactionMode=Managed",
1777        )
1778        .unwrap();
1779        assert_eq!(c.transaction_mode, TransactionMode::Managed);
1780    }
1781
1782    #[test]
1783    fn transaction_mode_invalid_rejected() {
1784        let result = SqlEndpointConfig::from_uri(
1785            "sql:select 1?db_url=postgres://localhost/test&transactionMode=Invalid",
1786        );
1787        assert!(result.is_err());
1788    }
1789
1790    // SQL-015: repeatCount
1791    #[test]
1792    fn repeat_count_defaults_to_none() {
1793        let c =
1794            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1795        assert!(c.repeat_count.is_none());
1796    }
1797
1798    #[test]
1799    fn repeat_count_from_uri() {
1800        let c = SqlEndpointConfig::from_uri(
1801            "sql:select 1?db_url=postgres://localhost/test&repeatCount=10",
1802        )
1803        .unwrap();
1804        assert_eq!(c.repeat_count, Some(10));
1805    }
1806
1807    // SQL-017: processingStrategy
1808    #[test]
1809    fn processing_strategy_defaults_to_direct() {
1810        let c =
1811            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1812        assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1813    }
1814
1815    #[test]
1816    fn processing_strategy_scheduled_from_uri() {
1817        let c = SqlEndpointConfig::from_uri(
1818            "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Scheduled",
1819        )
1820        .unwrap();
1821        assert_eq!(c.processing_strategy, ProcessingStrategy::Scheduled);
1822    }
1823
1824    #[test]
1825    fn processing_strategy_invalid_rejected() {
1826        let result = SqlEndpointConfig::from_uri(
1827            "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Invalid",
1828        );
1829        assert!(result.is_err());
1830    }
1831
1832    // SQL-018: pollStrategy
1833    #[test]
1834    fn poll_strategy_defaults_to_sequential() {
1835        let c =
1836            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1837        assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1838    }
1839
1840    #[test]
1841    fn poll_strategy_burst_from_uri() {
1842        let c = SqlEndpointConfig::from_uri(
1843            "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Burst",
1844        )
1845        .unwrap();
1846        assert_eq!(c.poll_strategy, PollStrategy::Burst);
1847    }
1848
1849    #[test]
1850    fn poll_strategy_invalid_rejected() {
1851        let result = SqlEndpointConfig::from_uri(
1852            "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Invalid",
1853        );
1854        assert!(result.is_err());
1855    }
1856
1857    // ── RetryPolicy (rc-ddl) ──────────────────────────────────────────────
1858
1859    #[test]
1860    fn sql_endpoint_config_has_retry_policy() {
1861        let cfg = SqlEndpointConfig::from_uri(
1862            "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=3&retryInitialDelayMs=500",
1863        )
1864        .expect("parse");
1865        assert_eq!(cfg.retry.max_attempts, 3);
1866        assert_eq!(
1867            cfg.retry.initial_delay,
1868            std::time::Duration::from_millis(500)
1869        );
1870        assert!(cfg.retry.enabled);
1871    }
1872
1873    #[test]
1874    fn sql_endpoint_config_retry_defaults_when_unspecified() {
1875        let cfg =
1876            SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1877        // When URI has no retry params, retry defaults to NetworkRetryPolicy::default()
1878        assert!(cfg.retry.enabled);
1879        assert_eq!(cfg.retry.max_attempts, 10); // default
1880    }
1881
1882    #[test]
1883    fn sql_global_config_has_retry_default() {
1884        let cfg = SqlGlobalConfig::default();
1885        assert!(cfg.retry.enabled);
1886    }
1887
1888    #[test]
1889    fn retry_policy_parse_full_uri_params() {
1890        let cfg = SqlEndpointConfig::from_uri(
1891            "sql:select 1?db_url=sqlite::memory:&retryEnabled=false&retryMaxAttempts=7&retryInitialDelayMs=1000&retryMultiplier=3.0&retryMaxDelayMs=60000&retryJitter=0.5",
1892        )
1893        .expect("parse");
1894        assert!(!cfg.retry.enabled);
1895        assert_eq!(cfg.retry.max_attempts, 7);
1896        assert_eq!(
1897            cfg.retry.initial_delay,
1898            std::time::Duration::from_millis(1000)
1899        );
1900        assert!((cfg.retry.multiplier - 3.0).abs() < f64::EPSILON);
1901        assert_eq!(cfg.retry.max_delay, std::time::Duration::from_millis(60000));
1902        assert!((cfg.retry.jitter_factor - 0.5).abs() < f64::EPSILON);
1903    }
1904
1905    #[test]
1906    fn retry_policy_from_uri_survives_apply_defaults_with_global() {
1907        let mut ep = SqlEndpointConfig::from_uri(
1908            "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=10&retryInitialDelayMs=500",
1909        )
1910        .expect("parse");
1911        let global = SqlGlobalConfig::default(); // global has default retry (max_attempts=10)
1912        ep.apply_defaults(&global);
1913        // URI values survive when retry_set_from_uri is true
1914        assert_eq!(ep.retry.max_attempts, 10);
1915        assert_eq!(
1916            ep.retry.initial_delay,
1917            std::time::Duration::from_millis(500)
1918        );
1919    }
1920
1921    #[test]
1922    fn retry_policy_falls_back_to_global_when_uri_has_no_retry_params() {
1923        let mut ep =
1924            SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1925        let mut global = SqlGlobalConfig::default();
1926        global.retry.max_attempts = 7;
1927        ep.apply_defaults(&global);
1928        // When URI has no retry params, global fills the gap
1929        assert_eq!(ep.retry.max_attempts, 7);
1930    }
1931
1932    #[test]
1933    fn from_uri_with_datasource_name() {
1934        let cfg = SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders").unwrap();
1935        assert_eq!(cfg.datasource_name.as_deref(), Some("orders"));
1936        assert!(cfg.db_url.is_empty());
1937    }
1938
1939    #[test]
1940    fn from_uri_with_datasource_and_behavior_override() {
1941        let cfg =
1942            SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&outputType=SelectOne")
1943                .unwrap();
1944        assert_eq!(cfg.datasource_name.as_deref(), Some("orders"));
1945    }
1946
1947    #[test]
1948    fn from_uri_datasource_rejects_pool_override() {
1949        let result =
1950            SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&maxConnections=50");
1951        assert!(result.is_err());
1952        let msg = result.unwrap_err().to_string();
1953        assert!(msg.contains("pool-affecting"));
1954    }
1955
1956    #[test]
1957    fn from_uri_neither_datasource_nor_db_url_is_error() {
1958        let result = SqlEndpointConfig::from_uri("sql:SELECT 1");
1959        assert!(result.is_err());
1960    }
1961
1962    #[test]
1963    fn from_uri_db_url_inline_still_works() {
1964        let cfg =
1965            SqlEndpointConfig::from_uri("sql:SELECT 1?db_url=postgres://localhost/test").unwrap();
1966        assert!(cfg.datasource_name.is_none());
1967        assert_eq!(cfg.db_url, "postgres://localhost/test");
1968    }
1969
1970    #[test]
1971    fn from_uri_datasource_rejects_ssl_mode() {
1972        let result = SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&sslMode=require");
1973        assert!(result.is_err());
1974        let msg = result.unwrap_err().to_string();
1975        assert!(msg.contains("pool-affecting"));
1976    }
1977
1978    #[test]
1979    fn from_uri_datasource_rejects_ssl_root_cert() {
1980        let result =
1981            SqlEndpointConfig::from_uri("sql:SELECT 1?datasource=orders&sslRootCert=/ca.pem");
1982        assert!(result.is_err());
1983    }
1984
1985    #[test]
1986    fn from_uri_datasource_rejects_db_url() {
1987        let result = SqlEndpointConfig::from_uri(
1988            "sql:SELECT 1?datasource=orders&db_url=postgres://evil:5432/pwned",
1989        );
1990        assert!(result.is_err());
1991        let msg = result.unwrap_err().to_string();
1992        assert!(msg.contains("db_url") && msg.contains("datasource"));
1993    }
1994}