Skip to main content

camel_component_sql/
config.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use camel_component_api::CamelError;
5use camel_component_api::NetworkRetryPolicy;
6use camel_component_api::{UriComponents, UriConfig, parse_uri};
7use tracing::warn;
8
9/// Redaction helper: returns `Some("***")` if the option is `Some`, otherwise `None`.
10fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
11    if opt.is_some() { Some("***") } else { None }
12}
13
14/// Redacts the user:password portion of a database URL for safe display.
15/// Returns `"scheme://***@host/db"` for URLs with userinfo, or the original URL otherwise.
16pub fn redact_db_url(db_url: &str) -> String {
17    match url::Url::parse(db_url) {
18        Ok(mut parsed) => {
19            if parsed.username().is_empty() && parsed.password().is_none() {
20                return db_url.to_string();
21            }
22            let _ = parsed.set_username("***");
23            let _ = parsed.set_password(Some("***"));
24            parsed.to_string()
25        }
26        Err(_) => db_url.to_string(),
27    }
28}
29
30/// Output type for SQL query results.
31#[derive(Debug, Clone, PartialEq, Default)]
32pub enum SqlOutputType {
33    /// Return all rows as a list.
34    #[default]
35    SelectList,
36    /// Return a single row (first result).
37    SelectOne,
38    /// Stream results as an async iterator.
39    StreamList,
40}
41
42impl FromStr for SqlOutputType {
43    type Err = CamelError;
44
45    fn from_str(s: &str) -> Result<Self, Self::Err> {
46        match s {
47            "SelectList" => Ok(SqlOutputType::SelectList),
48            "SelectOne" => Ok(SqlOutputType::SelectOne),
49            "StreamList" => Ok(SqlOutputType::StreamList),
50            _ => Err(CamelError::InvalidUri(format!(
51                "Unknown output type: {}",
52                s
53            ))),
54        }
55    }
56}
57
58/// Transaction mode for SQL operations.
59///
60/// - `Auto`: Each statement auto-commits (default, current behavior).
61/// - `Managed`: Explicit transaction boundaries (future; currently logs a warning
62///   and falls back to Auto).
63///
64// TODO(SQL-002): managed transaction mode — implement explicit transaction boundaries
65#[derive(Debug, Clone, PartialEq, Default)]
66pub enum TransactionMode {
67    /// Auto-commit each statement (default).
68    #[default]
69    Auto,
70    /// Managed transactions — not yet implemented.
71    Managed,
72}
73
74impl FromStr for TransactionMode {
75    type Err = CamelError;
76
77    fn from_str(s: &str) -> Result<Self, Self::Err> {
78        match s {
79            "Auto" => Ok(TransactionMode::Auto),
80            "Managed" => Ok(TransactionMode::Managed),
81            _ => Err(CamelError::InvalidUri(format!(
82                "Unknown transaction mode: {}. Expected 'Auto' or 'Managed'",
83                s
84            ))),
85        }
86    }
87}
88
89impl std::fmt::Display for TransactionMode {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            TransactionMode::Auto => write!(f, "Auto"),
93            TransactionMode::Managed => write!(f, "Managed"),
94        }
95    }
96}
97
98/// Processing strategy for SQL consumers.
99#[derive(Debug, Clone, PartialEq, Default)]
100pub enum ProcessingStrategy {
101    /// Process rows directly in the polling task (default).
102    #[default]
103    Direct,
104    /// Schedule processing via a separate task (deferred execution).
105    Scheduled,
106}
107
108impl FromStr for ProcessingStrategy {
109    type Err = CamelError;
110
111    fn from_str(s: &str) -> Result<Self, Self::Err> {
112        match s {
113            "Direct" => Ok(ProcessingStrategy::Direct),
114            "Scheduled" => Ok(ProcessingStrategy::Scheduled),
115            _ => Err(CamelError::InvalidUri(format!(
116                "Unknown processing strategy: {}. Expected 'Direct' or 'Scheduled'",
117                s
118            ))),
119        }
120    }
121}
122
123impl std::fmt::Display for ProcessingStrategy {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        match self {
126            ProcessingStrategy::Direct => write!(f, "Direct"),
127            ProcessingStrategy::Scheduled => write!(f, "Scheduled"),
128        }
129    }
130}
131
132/// Poll strategy for SQL consumers.
133#[derive(Debug, Clone, PartialEq, Default)]
134pub enum PollStrategy {
135    /// Poll sequentially with delay between polls (default).
136    #[default]
137    Sequential,
138    /// Poll in bursts — execute multiple queries in rapid succession.
139    Burst,
140}
141
142impl FromStr for PollStrategy {
143    type Err = CamelError;
144
145    fn from_str(s: &str) -> Result<Self, Self::Err> {
146        match s {
147            "Sequential" => Ok(PollStrategy::Sequential),
148            "Burst" => Ok(PollStrategy::Burst),
149            _ => Err(CamelError::InvalidUri(format!(
150                "Unknown poll strategy: {}. Expected 'Sequential' or 'Burst'",
151                s
152            ))),
153        }
154    }
155}
156
157impl std::fmt::Display for PollStrategy {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        match self {
160            PollStrategy::Sequential => write!(f, "Sequential"),
161            PollStrategy::Burst => write!(f, "Burst"),
162        }
163    }
164}
165
166/// Global configuration for SQL component.
167///
168/// This struct supports serde deserialization with defaults and builder methods.
169/// It holds pool configuration that can be applied as defaults to endpoints.
170///
171/// **Security note:** `Debug` implementation redacts sensitive fields (SSL key paths).
172#[derive(Clone, PartialEq, serde::Deserialize)]
173#[serde(default)]
174pub struct SqlGlobalConfig {
175    pub max_connections: u32,
176    pub min_connections: u32,
177    pub idle_timeout_secs: u64,
178    pub max_lifetime_secs: u64,
179    // SSL/TLS
180    pub ssl_mode: Option<String>,
181    pub ssl_root_cert: Option<String>,
182    pub ssl_cert: Option<String>,
183    pub ssl_key: Option<String>,
184    /// Retry policy for transient database connection failures.
185    #[serde(default)]
186    pub retry: NetworkRetryPolicy,
187}
188
189impl std::fmt::Debug for SqlGlobalConfig {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        f.debug_struct("SqlGlobalConfig")
192            .field("max_connections", &self.max_connections)
193            .field("min_connections", &self.min_connections)
194            .field("idle_timeout_secs", &self.idle_timeout_secs)
195            .field("max_lifetime_secs", &self.max_lifetime_secs)
196            .field("ssl_mode", &self.ssl_mode)
197            .field("ssl_root_cert", &self.ssl_root_cert)
198            .field("ssl_cert", &self.ssl_cert)
199            .field("ssl_key", &redacted_opt(&self.ssl_key))
200            .field("retry", &self.retry)
201            .finish()
202    }
203}
204
205impl Default for SqlGlobalConfig {
206    fn default() -> Self {
207        Self {
208            max_connections: 5,
209            min_connections: 1,
210            idle_timeout_secs: 300,
211            max_lifetime_secs: 1800,
212            ssl_mode: None,
213            ssl_root_cert: None,
214            ssl_cert: None,
215            ssl_key: None,
216            retry: NetworkRetryPolicy::default(),
217        }
218    }
219}
220
221impl SqlGlobalConfig {
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    pub fn with_max_connections(mut self, value: u32) -> Self {
227        self.max_connections = value;
228        self
229    }
230
231    pub fn with_min_connections(mut self, value: u32) -> Self {
232        self.min_connections = value;
233        self
234    }
235
236    pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
237        self.idle_timeout_secs = value;
238        self
239    }
240
241    pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
242        self.max_lifetime_secs = value;
243        self
244    }
245
246    pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
247        self.ssl_mode = Some(value.into());
248        self
249    }
250
251    pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
252        self.ssl_root_cert = Some(value.into());
253        self
254    }
255
256    pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
257        self.ssl_cert = Some(value.into());
258        self
259    }
260
261    pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
262        self.ssl_key = Some(value.into());
263        self
264    }
265
266    pub fn with_retry(mut self, value: NetworkRetryPolicy) -> Self {
267        self.retry = value;
268        self
269    }
270}
271
272/// Configuration for SQL component endpoints.
273///
274/// URI format: `sql:<query>?db_url=<url>&param1=val1&param2=val2`
275///
276/// The query can be inline SQL or a file reference with `file:` prefix:
277/// - `sql:SELECT * FROM users?db_url=...` - inline SQL
278/// - `sql:file:/path/to/query.sql?db_url=...` - read SQL from file
279///
280/// **Note on file-based queries (SQL-014):** When the query path starts with `file:`,
281/// the file is NOT read synchronously during `from_uri()`. Instead, the file path is
282/// stored in `source_path` and the query is resolved asynchronously via `resolve_file_query()`
283/// during async initialization (producer pool init or consumer start). This avoids
284/// blocking I/O in the synchronous URI parsing path.
285///
286/// **Security note:** `Debug` implementation redacts the `db_url` (which may contain credentials)
287/// and `ssl_key` path. Use `redact_db_url()` for safe logging of database URLs.
288#[derive(Clone)]
289pub struct SqlEndpointConfig {
290    // Connection
291    /// Database connection URL (required).
292    pub db_url: String,
293    /// Maximum connections in the pool. None = use global default.
294    pub max_connections: Option<u32>,
295    /// Minimum connections in the pool. None = use global default.
296    pub min_connections: Option<u32>,
297    /// Idle timeout in seconds. None = use global default.
298    pub idle_timeout_secs: Option<u64>,
299    /// Maximum connection lifetime in seconds. None = use global default.
300    pub max_lifetime_secs: Option<u64>,
301
302    // Query
303    /// The SQL query (from URI path or file).
304    pub query: String,
305    /// Path to the file containing the SQL query (when using `file:` prefix).
306    pub source_path: Option<String>,
307    /// Output type for query results. Default: SelectList.
308    pub output_type: SqlOutputType,
309    /// Placeholder character for parameters. Default: '#'.
310    pub placeholder: char,
311    /// If true, process parameter placeholders in queries. Default: true.
312    pub use_placeholder: bool,
313    /// If true, don't execute the query (dry run). Default: false.
314    pub noop: bool,
315    /// Separator for IN clause expansion. Default: ", ".
316    pub in_separator: String,
317
318    // SQL-005: always populate statement even if body is null/empty
319    /// If true, always bind parameters even if the exchange body is null/empty
320    /// (uses empty defaults). Default: false.
321    pub always_populate_statement: bool,
322
323    // SQL-011: allow named parameters
324    /// If true, recognize `:name` style placeholders and map them from exchange
325    /// headers or body fields. Default: true.
326    pub allow_named_parameters: bool,
327
328    // SQL-016: fetch size hint
329    /// Fetch size hint for query results. None = driver default.
330    pub fetch_size: Option<u32>,
331
332    // SQL-002: transaction mode
333    /// Transaction mode for SQL operations. Default: Auto.
334    pub transaction_mode: TransactionMode,
335
336    // Consumer (polling)
337    /// Delay between polls in milliseconds. Default: 500.
338    pub delay_ms: u64,
339    /// Initial delay before first poll in milliseconds. Default: 1000.
340    pub initial_delay_ms: u64,
341    /// Maximum messages per poll.
342    pub max_messages_per_poll: Option<i32>,
343    /// SQL to execute after consuming each message.
344    pub on_consume: Option<String>,
345    /// SQL to execute if consumption fails.
346    pub on_consume_failed: Option<String>,
347    /// SQL to execute after consuming a batch.
348    pub on_consume_batch_complete: Option<String>,
349    /// Route empty result sets. Default: false.
350    pub route_empty_result_set: bool,
351    /// Use iterator for results. Default: true.
352    pub use_iterator: bool,
353    /// Expected number of rows affected.
354    pub expected_update_count: Option<i64>,
355    /// Break batch on consume failure. Default: false.
356    pub break_batch_on_consume_fail: bool,
357    /// Bridge poll errors into route error handling. Default: false.
358    pub bridge_error_handler: bool,
359
360    // SQL-015: repeat count for consumer polling
361    /// When set, the consumer only polls up to `repeat_count` times before stopping.
362    /// None = poll indefinitely (default).
363    pub repeat_count: Option<u32>,
364
365    // SQL-017: processing strategy
366    /// Processing strategy for consumer. Default: Direct.
367    pub processing_strategy: ProcessingStrategy,
368
369    // SQL-018: poll strategy
370    /// Poll strategy for consumer. Default: Sequential.
371    pub poll_strategy: PollStrategy,
372
373    // Producer
374    /// Enable batch mode. Default: false.
375    pub batch: bool,
376    /// Use message body for SQL. Default: false.
377    pub use_message_body_for_sql: bool,
378
379    // SSL/TLS
380    /// SSL mode for the connection. None = use global default.
381    pub ssl_mode: Option<String>,
382    /// Path to SSL root certificate. None = use global default.
383    pub ssl_root_cert: Option<String>,
384    /// Path to SSL client certificate. None = use global default.
385    pub ssl_cert: Option<String>,
386    /// Path to SSL client key. None = use global default.
387    pub ssl_key: Option<String>,
388
389    /// Retry policy for transient database connection failures.
390    pub retry: NetworkRetryPolicy,
391
392    /// Whether `retry` was explicitly set via URI params. Used by
393    /// [`apply_defaults`] to decide whether URI values win over
394    /// the global config. Internal tracking flag, not serialized.
395    retry_set_from_uri: bool,
396}
397
398impl std::fmt::Debug for SqlEndpointConfig {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        f.debug_struct("SqlEndpointConfig")
401            .field("db_url", &redact_db_url(&self.db_url))
402            .field("max_connections", &self.max_connections)
403            .field("min_connections", &self.min_connections)
404            .field("idle_timeout_secs", &self.idle_timeout_secs)
405            .field("max_lifetime_secs", &self.max_lifetime_secs)
406            .field("query", &self.query)
407            .field("source_path", &self.source_path)
408            .field("output_type", &self.output_type)
409            .field("placeholder", &self.placeholder)
410            .field("use_placeholder", &self.use_placeholder)
411            .field("noop", &self.noop)
412            .field("in_separator", &self.in_separator)
413            .field("always_populate_statement", &self.always_populate_statement)
414            .field("allow_named_parameters", &self.allow_named_parameters)
415            .field("fetch_size", &self.fetch_size)
416            .field("transaction_mode", &self.transaction_mode)
417            .field("delay_ms", &self.delay_ms)
418            .field("initial_delay_ms", &self.initial_delay_ms)
419            .field("max_messages_per_poll", &self.max_messages_per_poll)
420            .field("on_consume", &self.on_consume)
421            .field("on_consume_failed", &self.on_consume_failed)
422            .field("on_consume_batch_complete", &self.on_consume_batch_complete)
423            .field("route_empty_result_set", &self.route_empty_result_set)
424            .field("use_iterator", &self.use_iterator)
425            .field("expected_update_count", &self.expected_update_count)
426            .field(
427                "break_batch_on_consume_fail",
428                &self.break_batch_on_consume_fail,
429            )
430            .field("bridge_error_handler", &self.bridge_error_handler)
431            .field("repeat_count", &self.repeat_count)
432            .field("processing_strategy", &self.processing_strategy)
433            .field("poll_strategy", &self.poll_strategy)
434            .field("batch", &self.batch)
435            .field("use_message_body_for_sql", &self.use_message_body_for_sql)
436            .field("ssl_mode", &self.ssl_mode)
437            .field("ssl_root_cert", &self.ssl_root_cert)
438            .field("ssl_cert", &self.ssl_cert)
439            .field("ssl_key", &redacted_opt(&self.ssl_key))
440            .field("retry", &self.retry)
441            .finish()
442    }
443}
444
445impl SqlEndpointConfig {
446    /// Apply defaults from global config, filling None fields without overriding.
447    pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
448        if self.max_connections.is_none() {
449            self.max_connections = Some(defaults.max_connections);
450        }
451        if self.min_connections.is_none() {
452            self.min_connections = Some(defaults.min_connections);
453        }
454        if self.idle_timeout_secs.is_none() {
455            self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
456        }
457        if self.max_lifetime_secs.is_none() {
458            self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
459        }
460        if self.ssl_mode.is_none() {
461            self.ssl_mode = defaults.ssl_mode.clone();
462        }
463        if self.ssl_root_cert.is_none() {
464            self.ssl_root_cert = defaults.ssl_root_cert.clone();
465        }
466        if self.ssl_cert.is_none() {
467            self.ssl_cert = defaults.ssl_cert.clone();
468        }
469        if self.ssl_key.is_none() {
470            self.ssl_key = defaults.ssl_key.clone();
471        }
472        // retry: URI wins when set_from_uri, else global fills the gap
473        if !self.retry_set_from_uri {
474            self.retry = defaults.retry.clone();
475        }
476    }
477
478    /// Resolve any remaining None fields with built-in defaults.
479    pub fn resolve_defaults(&mut self) {
480        let defaults = SqlGlobalConfig::default();
481        self.apply_defaults(&defaults);
482    }
483
484    /// Asynchronously read the SQL query from the file referenced by `source_path`.
485    ///
486    /// This is the async replacement for the blocking `std::fs::read_to_string` that
487    /// was previously called in `from_uri()`. Must be invoked during async init
488    /// (producer pool init or consumer start) — never in a synchronous context.
489    ///
490    /// After this call, `self.query` contains the file content (trimmed) and
491    /// `self.source_path` is cleared to prevent re-reading.
492    pub async fn resolve_file_query(&mut self) -> Result<(), CamelError> {
493        if let Some(file_path) = self.source_path.take() {
494            let contents = tokio::fs::read_to_string(&file_path).await.map_err(|e| {
495                CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
496            })?;
497            self.query = contents.trim().to_string();
498            // Keep source_path as Some so tests can still verify the original path
499            self.source_path = Some(file_path);
500        }
501        Ok(())
502    }
503}
504
505struct SslParamMapping {
506    pg_key: &'static str,
507    mysql_key: &'static str,
508}
509
510const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
511    (
512        "sslMode",
513        SslParamMapping {
514            pg_key: "sslmode",
515            mysql_key: "ssl-mode",
516        },
517    ),
518    (
519        "sslRootCert",
520        SslParamMapping {
521            pg_key: "sslrootcert",
522            mysql_key: "ssl-ca",
523        },
524    ),
525    (
526        "sslCert",
527        SslParamMapping {
528            pg_key: "sslcert",
529            mysql_key: "ssl-cert",
530        },
531    ),
532    (
533        "sslKey",
534        SslParamMapping {
535            pg_key: "sslkey",
536            mysql_key: "ssl-key",
537        },
538    ),
539];
540
541pub fn enrich_db_url_with_ssl(
542    db_url: &str,
543    config: &SqlEndpointConfig,
544) -> Result<String, CamelError> {
545    let ssl_params: Vec<(&str, &str)> = [
546        config.ssl_mode.as_deref().map(|v| ("sslMode", v)),
547        config.ssl_root_cert.as_deref().map(|v| ("sslRootCert", v)),
548        config.ssl_cert.as_deref().map(|v| ("sslCert", v)),
549        config.ssl_key.as_deref().map(|v| ("sslKey", v)),
550    ]
551    .into_iter()
552    .flatten()
553    .collect();
554
555    if ssl_params.is_empty() {
556        return Ok(db_url.to_string());
557    }
558
559    let mut parsed = url::Url::parse(db_url).map_err(|e| {
560        CamelError::InvalidUri(format!(
561            "Cannot parse database URL for SSL enrichment: {}",
562            e
563        ))
564    })?;
565
566    let scheme = parsed.scheme();
567    if scheme.starts_with("sqlite") {
568        warn!(
569            "SSL options configured for SQLite database URL, but SQLite does not support SSL/TLS; ignoring sslMode/sslRootCert/sslCert/sslKey"
570        );
571        return Ok(db_url.to_string());
572    }
573
574    if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
575        return Ok(db_url.to_string());
576    }
577    let is_mysql = scheme == "mysql";
578
579    let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
580    for (camel_name, value) in &ssl_params {
581        if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
582            let driver_key = if is_mysql {
583                mapping.mysql_key
584            } else {
585                mapping.pg_key
586            };
587
588            if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
589                query_pairs[pos].1 = (*value).into();
590            } else {
591                query_pairs.push((driver_key.into(), (*value).into()));
592            }
593        }
594    }
595
596    {
597        let mut serializer = url::form_urlencoded::Serializer::new(String::new());
598        for (k, v) in query_pairs {
599            serializer.append_pair(&k, &v);
600        }
601        parsed.set_query(Some(&serializer.finish()));
602    }
603
604    Ok(parsed.to_string())
605}
606
607impl UriConfig for SqlEndpointConfig {
608    fn scheme() -> &'static str {
609        "sql"
610    }
611
612    fn from_uri(uri: &str) -> Result<Self, CamelError> {
613        let parts = parse_uri(uri)?;
614        Self::from_components(parts)
615    }
616
617    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
618        // Validate scheme
619        if parts.scheme != Self::scheme() {
620            return Err(CamelError::InvalidUri(format!(
621                "expected scheme '{}' but got '{}'",
622                Self::scheme(),
623                parts.scheme
624            )));
625        }
626
627        let params = &parts.params;
628
629        // Handle file: prefix for query
630        // SQL-014: defer file reading to async init path to avoid blocking I/O
631        // in the synchronous URI parsing path. Store the path; resolve_file_query()
632        // must be called during async initialization (producer pool init or consumer start).
633        let (query, source_path) = if parts.path.starts_with("file:") {
634            let file_path = parts.path.trim_start_matches("file:").to_string();
635            (String::new(), Some(file_path))
636        } else {
637            (parts.path.clone(), None)
638        };
639
640        // Required parameter: db_url
641        let db_url = params
642            .get("db_url")
643            .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
644            .clone();
645
646        // Connection parameters - None when not set by URI param
647        let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
648        let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
649        let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
650        let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
651
652        // Query parameters
653        let output_type = params
654            .get("outputType")
655            .map(|s| s.parse())
656            .transpose()?
657            .unwrap_or_default();
658        let placeholder = params
659            .get("placeholder")
660            .filter(|v| !v.is_empty())
661            .map(|v| {
662                if v.chars().count() != 1 {
663                    return Err(CamelError::InvalidUri(format!(
664                        "placeholder must be exactly one character, got '{}'",
665                        v
666                    )));
667                }
668                if !v.is_ascii() {
669                    return Err(CamelError::InvalidUri(
670                        "placeholder must be a single ASCII character".to_string(),
671                    ));
672                }
673                Ok(v.chars().next().unwrap()) // allow-unwrap
674            })
675            .transpose()?
676            .unwrap_or('#');
677        /// Parse a boolean URI parameter strictly.
678        ///
679        /// Accepts only `"true"` or `"false"` (case-insensitive). Any other value
680        /// returns `CamelError::InvalidUri` to prevent silent misconfiguration.
681        fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
682            if value.eq_ignore_ascii_case("true") {
683                Ok(true)
684            } else if value.eq_ignore_ascii_case("false") {
685                Ok(false)
686            } else {
687                Err(CamelError::InvalidUri(format!(
688                    "{} must be 'true' or 'false', got '{}'",
689                    name, value
690                )))
691            }
692        }
693
694        let use_placeholder = params
695            .get("usePlaceholder")
696            .map(|v| parse_bool_param("usePlaceholder", v))
697            .transpose()?
698            .unwrap_or(true);
699        let noop = params
700            .get("noop")
701            .map(|v| parse_bool_param("noop", v))
702            .transpose()?
703            .unwrap_or(false);
704        let in_separator = params
705            .get("inSeparator")
706            .map(|v| v.to_string())
707            .unwrap_or_else(|| ", ".to_string());
708        if in_separator.is_empty() {
709            return Err(CamelError::InvalidUri(
710                "inSeparator must not be empty".to_string(),
711            ));
712        }
713
714        // SQL-005: alwaysPopulateStatement
715        let always_populate_statement = params
716            .get("alwaysPopulateStatement")
717            .map(|v| parse_bool_param("alwaysPopulateStatement", v))
718            .transpose()?
719            .unwrap_or(false);
720
721        // SQL-011: allowNamedParameters
722        let allow_named_parameters = params
723            .get("allowNamedParameters")
724            .map(|v| parse_bool_param("allowNamedParameters", v))
725            .transpose()?
726            .unwrap_or(true);
727
728        // SQL-016: fetchSize
729        let fetch_size = params.get("fetchSize").and_then(|v| v.parse().ok());
730
731        // SQL-002: transactionMode
732        let transaction_mode = params
733            .get("transactionMode")
734            .map(|s| s.parse())
735            .transpose()?
736            .unwrap_or_default();
737
738        // Consumer parameters
739        let delay_ms = params
740            .get("delay")
741            .and_then(|v| v.parse().ok())
742            .unwrap_or(500);
743        let initial_delay_ms = params
744            .get("initialDelay")
745            .and_then(|v| v.parse().ok())
746            .unwrap_or(1000);
747        let max_messages_per_poll = params
748            .get("maxMessagesPerPoll")
749            .and_then(|v| v.parse().ok());
750        let on_consume = params.get("onConsume").cloned();
751        let on_consume_failed = params.get("onConsumeFailed").cloned();
752        let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
753        let route_empty_result_set = params
754            .get("routeEmptyResultSet")
755            .map(|v| parse_bool_param("routeEmptyResultSet", v))
756            .transpose()?
757            .unwrap_or(false);
758        let use_iterator = params
759            .get("useIterator")
760            .map(|v| parse_bool_param("useIterator", v))
761            .transpose()?
762            .unwrap_or(true);
763        let expected_update_count = params
764            .get("expectedUpdateCount")
765            .and_then(|v| v.parse().ok());
766        let break_batch_on_consume_fail = params
767            .get("breakBatchOnConsumeFail")
768            .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
769            .transpose()?
770            .unwrap_or(false);
771        let bridge_error_handler = params
772            .get("bridgeErrorHandler")
773            .map(|v| parse_bool_param("bridgeErrorHandler", v))
774            .transpose()?
775            .unwrap_or(false);
776
777        // SQL-015: repeatCount
778        let repeat_count = params.get("repeatCount").and_then(|v| v.parse().ok());
779
780        // SQL-017: processingStrategy
781        let processing_strategy = params
782            .get("processingStrategy")
783            .map(|s| s.parse())
784            .transpose()?
785            .unwrap_or_default();
786
787        // SQL-018: pollStrategy
788        let poll_strategy = params
789            .get("pollStrategy")
790            .map(|s| s.parse())
791            .transpose()?
792            .unwrap_or_default();
793
794        // Producer parameters
795        let batch = params
796            .get("batch")
797            .map(|v| parse_bool_param("batch", v))
798            .transpose()?
799            .unwrap_or(false);
800        let use_message_body_for_sql = params
801            .get("useMessageBodyForSql")
802            .map(|v| parse_bool_param("useMessageBodyForSql", v))
803            .transpose()?
804            .unwrap_or(false);
805        let ssl_mode = params.get("sslMode").cloned();
806        let ssl_root_cert = params.get("sslRootCert").cloned();
807        let ssl_cert = params.get("sslCert").cloned();
808        let ssl_key = params.get("sslKey").cloned();
809
810        // Parse retry policy from URI params
811        let mut retry = NetworkRetryPolicy::default();
812        let mut retry_set_from_uri = false;
813        if let Some(raw) = params.get("retryEnabled") {
814            retry.enabled = raw.parse::<bool>().map_err(|_| {
815                CamelError::InvalidUri(format!("retryEnabled must be a boolean, got '{raw}'"))
816            })?;
817            retry_set_from_uri = true;
818        }
819        if let Some(raw) = params.get("retryMaxAttempts") {
820            retry.max_attempts = raw.parse::<u32>().map_err(|_| {
821                CamelError::InvalidUri(format!("retryMaxAttempts must be a u32, got '{raw}'"))
822            })?;
823            retry_set_from_uri = true;
824        }
825        if let Some(raw) = params.get("retryInitialDelayMs") {
826            retry.initial_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
827                CamelError::InvalidUri(format!("retryInitialDelayMs must be a u64, got '{raw}'"))
828            })?);
829            retry_set_from_uri = true;
830        }
831        if let Some(raw) = params.get("retryMultiplier") {
832            retry.multiplier = raw.parse::<f64>().map_err(|_| {
833                CamelError::InvalidUri(format!("retryMultiplier must be a f64, got '{raw}'"))
834            })?;
835            retry_set_from_uri = true;
836        }
837        if let Some(raw) = params.get("retryMaxDelayMs") {
838            retry.max_delay = Duration::from_millis(raw.parse::<u64>().map_err(|_| {
839                CamelError::InvalidUri(format!("retryMaxDelayMs must be a u64, got '{raw}'"))
840            })?);
841            retry_set_from_uri = true;
842        }
843        if let Some(raw) = params.get("retryJitter") {
844            retry.jitter_factor = raw.parse::<f64>().map_err(|_| {
845                CamelError::InvalidUri(format!("retryJitter must be a f64, got '{raw}'"))
846            })?;
847            retry_set_from_uri = true;
848        }
849
850        Ok(Self {
851            db_url,
852            max_connections,
853            min_connections,
854            idle_timeout_secs,
855            max_lifetime_secs,
856            query,
857            source_path,
858            output_type,
859            placeholder,
860            use_placeholder,
861            noop,
862            in_separator,
863            always_populate_statement,
864            allow_named_parameters,
865            fetch_size,
866            transaction_mode,
867            delay_ms,
868            initial_delay_ms,
869            max_messages_per_poll,
870            on_consume,
871            on_consume_failed,
872            on_consume_batch_complete,
873            route_empty_result_set,
874            use_iterator,
875            expected_update_count,
876            break_batch_on_consume_fail,
877            bridge_error_handler,
878            repeat_count,
879            processing_strategy,
880            poll_strategy,
881            batch,
882            use_message_body_for_sql,
883            ssl_mode,
884            ssl_root_cert,
885            ssl_cert,
886            ssl_key,
887            retry,
888            retry_set_from_uri,
889        })
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use super::*;
896    use camel_component_api::NetworkRetryPolicy;
897
898    #[test]
899    fn config_defaults() {
900        let mut c =
901            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
902        c.resolve_defaults();
903        assert_eq!(c.query, "select 1");
904        assert_eq!(c.db_url, "postgres://localhost/test");
905        assert_eq!(c.max_connections, Some(5));
906        assert_eq!(c.min_connections, Some(1));
907        assert_eq!(c.idle_timeout_secs, Some(300));
908        assert_eq!(c.max_lifetime_secs, Some(1800));
909        assert_eq!(c.output_type, SqlOutputType::SelectList);
910        assert_eq!(c.placeholder, '#');
911        assert!(!c.noop);
912        assert_eq!(c.in_separator, ", ");
913        assert_eq!(c.delay_ms, 500);
914        assert_eq!(c.initial_delay_ms, 1000);
915        assert!(c.max_messages_per_poll.is_none());
916        assert!(c.on_consume.is_none());
917        assert!(c.on_consume_failed.is_none());
918        assert!(c.on_consume_batch_complete.is_none());
919        assert!(!c.route_empty_result_set);
920        assert!(c.use_iterator);
921        assert!(c.expected_update_count.is_none());
922        assert!(!c.break_batch_on_consume_fail);
923        assert!(!c.batch);
924        assert!(!c.use_message_body_for_sql);
925        assert!(c.ssl_mode.is_none());
926        assert!(c.ssl_root_cert.is_none());
927        assert!(c.ssl_cert.is_none());
928        assert!(c.ssl_key.is_none());
929        // SQL-005/SQL-011/SQL-016/SQL-002/SQL-015/SQL-017/SQL-018 defaults
930        assert!(!c.always_populate_statement);
931        assert!(c.allow_named_parameters);
932        assert!(c.fetch_size.is_none());
933        assert_eq!(c.transaction_mode, TransactionMode::Auto);
934        assert!(c.repeat_count.is_none());
935        assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
936        assert_eq!(c.poll_strategy, PollStrategy::Sequential);
937    }
938
939    #[test]
940    fn ssl_none_by_default() {
941        let c =
942            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
943        assert!(c.ssl_mode.is_none());
944        assert!(c.ssl_root_cert.is_none());
945        assert!(c.ssl_cert.is_none());
946        assert!(c.ssl_key.is_none());
947    }
948
949    #[test]
950    fn ssl_mode_from_uri() {
951        let c = SqlEndpointConfig::from_uri(
952            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
953        )
954        .unwrap();
955        assert_eq!(c.ssl_mode, Some("require".to_string()));
956        assert!(c.ssl_root_cert.is_none());
957    }
958
959    #[test]
960    fn ssl_all_params_from_uri() {
961        let c = SqlEndpointConfig::from_uri(
962            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
963        )
964        .unwrap();
965        assert_eq!(c.ssl_mode, Some("require".to_string()));
966        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
967        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
968        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
969    }
970
971    #[test]
972    fn ssl_global_applied_to_endpoint() {
973        let mut c =
974            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
975        let global = SqlGlobalConfig::default()
976            .with_ssl_mode("require")
977            .with_ssl_root_cert("/etc/ssl/ca.pem");
978        c.apply_defaults(&global);
979        assert_eq!(c.ssl_mode, Some("require".to_string()));
980        assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
981        assert!(c.ssl_cert.is_none());
982        assert!(c.ssl_key.is_none());
983    }
984
985    #[test]
986    fn ssl_uri_overrides_global() {
987        let mut c = SqlEndpointConfig::from_uri(
988            "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
989        )
990        .unwrap();
991        let global = SqlGlobalConfig::default().with_ssl_mode("require");
992        c.apply_defaults(&global);
993        assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
994    }
995
996    #[test]
997    fn config_wrong_scheme() {
998        assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
999    }
1000
1001    #[test]
1002    fn config_missing_db_url() {
1003        assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
1004    }
1005
1006    #[test]
1007    fn config_output_type_select_one() {
1008        let c = SqlEndpointConfig::from_uri(
1009            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
1010        )
1011        .unwrap();
1012        assert_eq!(c.output_type, SqlOutputType::SelectOne);
1013    }
1014
1015    #[test]
1016    fn config_output_type_stream_list() {
1017        let c = SqlEndpointConfig::from_uri(
1018            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
1019        )
1020        .unwrap();
1021        assert_eq!(c.output_type, SqlOutputType::StreamList);
1022    }
1023
1024    #[test]
1025    fn in_separator_default() {
1026        let c =
1027            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1028        assert_eq!(c.in_separator, ", ");
1029    }
1030
1031    #[test]
1032    fn in_separator_from_uri() {
1033        let c = SqlEndpointConfig::from_uri(
1034            "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
1035        )
1036        .unwrap();
1037        assert_eq!(c.in_separator, ";");
1038    }
1039
1040    #[test]
1041    fn in_separator_empty_rejected() {
1042        let result = SqlEndpointConfig::from_uri(
1043            "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
1044        );
1045        assert!(result.is_err());
1046        let msg = format!("{:?}", result.unwrap_err());
1047        assert!(msg.contains("inSeparator") || msg.contains("empty"));
1048    }
1049
1050    #[test]
1051    fn config_consumer_options() {
1052        let c = SqlEndpointConfig::from_uri(
1053            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
1054        ).unwrap();
1055        assert_eq!(c.delay_ms, 2000);
1056        assert_eq!(c.initial_delay_ms, 500);
1057        assert_eq!(c.max_messages_per_poll, Some(10));
1058        assert_eq!(
1059            c.on_consume,
1060            Some("update t set done=true where id=:#id".to_string())
1061        );
1062        assert_eq!(
1063            c.on_consume_failed,
1064            Some("update t set failed=true where id=:#id".to_string())
1065        );
1066        assert_eq!(
1067            c.on_consume_batch_complete,
1068            Some("delete from t where done=true".to_string())
1069        );
1070        assert!(c.route_empty_result_set);
1071        assert!(!c.use_iterator);
1072        assert_eq!(c.expected_update_count, Some(1));
1073        assert!(c.break_batch_on_consume_fail);
1074        assert!(!c.bridge_error_handler);
1075    }
1076
1077    #[test]
1078    fn config_producer_options() {
1079        let c = SqlEndpointConfig::from_uri(
1080            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
1081        ).unwrap();
1082        assert!(c.batch);
1083        assert!(c.use_message_body_for_sql);
1084        assert!(c.noop);
1085    }
1086
1087    #[test]
1088    fn config_pool_options() {
1089        let c = SqlEndpointConfig::from_uri(
1090            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
1091        ).unwrap();
1092        assert_eq!(c.max_connections, Some(20));
1093        assert_eq!(c.min_connections, Some(3));
1094        assert_eq!(c.idle_timeout_secs, Some(600));
1095        assert_eq!(c.max_lifetime_secs, Some(3600));
1096    }
1097
1098    #[test]
1099    fn config_query_with_special_chars() {
1100        let c = SqlEndpointConfig::from_uri(
1101            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
1102        )
1103        .unwrap();
1104        assert_eq!(
1105            c.query,
1106            "select * from users where name = :#name and age > #"
1107        );
1108    }
1109
1110    #[test]
1111    fn output_type_from_str() {
1112        assert_eq!(
1113            "SelectList".parse::<SqlOutputType>().unwrap(),
1114            SqlOutputType::SelectList
1115        );
1116        assert_eq!(
1117            "SelectOne".parse::<SqlOutputType>().unwrap(),
1118            SqlOutputType::SelectOne
1119        );
1120        assert_eq!(
1121            "StreamList".parse::<SqlOutputType>().unwrap(),
1122            SqlOutputType::StreamList
1123        );
1124        assert!("Invalid".parse::<SqlOutputType>().is_err());
1125    }
1126
1127    // SQL-014: file-not-found is now detected during async resolve_file_query(), not from_uri
1128    #[tokio::test]
1129    async fn config_file_not_found() {
1130        let mut config = SqlEndpointConfig::from_uri(
1131            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
1132        )
1133        .expect("from_uri should defer file reading");
1134        // from_uri no longer reads the file — source_path is set, query is empty
1135        assert_eq!(
1136            config.source_path,
1137            Some("/nonexistent/path/query.sql".to_string())
1138        );
1139        assert!(config.query.is_empty());
1140
1141        // Error occurs during async resolution
1142        let result = config.resolve_file_query().await;
1143        assert!(result.is_err());
1144        let msg = format!("{:?}", result.unwrap_err());
1145        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
1146    }
1147
1148    // SQL-014: file query is now resolved asynchronously
1149    #[tokio::test]
1150    async fn config_file_query() {
1151        use std::io::Write;
1152        let unique_name = format!(
1153            "test_sql_query_{}.sql",
1154            std::time::SystemTime::now()
1155                .duration_since(std::time::UNIX_EPOCH)
1156                .unwrap_or_default()
1157                .as_nanos()
1158        );
1159        let mut tmp = std::env::temp_dir();
1160        tmp.push(unique_name);
1161        {
1162            let mut f = std::fs::File::create(&tmp).unwrap();
1163            writeln!(f, "SELECT * FROM users").unwrap();
1164        }
1165        let uri = format!(
1166            "sql:file:{}?db_url=postgres://localhost/test",
1167            tmp.display()
1168        );
1169        let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1170        // query is empty until async resolution
1171        assert!(c.query.is_empty());
1172        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1173
1174        // Resolve asynchronously
1175        c.resolve_file_query()
1176            .await
1177            .expect("file query should resolve");
1178        assert_eq!(c.query, "SELECT * FROM users");
1179        std::fs::remove_file(&tmp).ok();
1180    }
1181
1182    // New tests for config contract
1183    #[test]
1184    fn pool_fields_none_when_not_set() {
1185        let c =
1186            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1187        assert_eq!(c.max_connections, None);
1188        assert_eq!(c.min_connections, None);
1189        assert_eq!(c.idle_timeout_secs, None);
1190        assert_eq!(c.max_lifetime_secs, None);
1191    }
1192
1193    #[test]
1194    fn apply_defaults_fills_none() {
1195        let mut c =
1196            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1197        let global = SqlGlobalConfig {
1198            max_connections: 10,
1199            min_connections: 2,
1200            idle_timeout_secs: 600,
1201            max_lifetime_secs: 3600,
1202            ssl_mode: None,
1203            ssl_root_cert: None,
1204            ssl_cert: None,
1205            ssl_key: None,
1206            retry: NetworkRetryPolicy::default(),
1207        };
1208        c.apply_defaults(&global);
1209        assert_eq!(c.max_connections, Some(10));
1210        assert_eq!(c.min_connections, Some(2));
1211        assert_eq!(c.idle_timeout_secs, Some(600));
1212        assert_eq!(c.max_lifetime_secs, Some(3600));
1213        assert!(c.ssl_mode.is_none());
1214        assert!(c.ssl_root_cert.is_none());
1215        assert!(c.ssl_cert.is_none());
1216        assert!(c.ssl_key.is_none());
1217    }
1218
1219    #[test]
1220    fn apply_defaults_does_not_override() {
1221        let mut c = SqlEndpointConfig::from_uri(
1222            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
1223        )
1224        .unwrap();
1225        let global = SqlGlobalConfig {
1226            max_connections: 10,
1227            min_connections: 2,
1228            idle_timeout_secs: 600,
1229            max_lifetime_secs: 3600,
1230            ssl_mode: None,
1231            ssl_root_cert: None,
1232            ssl_cert: None,
1233            ssl_key: None,
1234            retry: NetworkRetryPolicy::default(),
1235        };
1236        c.apply_defaults(&global);
1237        // URI-set values should NOT be overridden
1238        assert_eq!(c.max_connections, Some(99));
1239        assert_eq!(c.min_connections, Some(5));
1240        // None fields should be filled from global
1241        assert_eq!(c.idle_timeout_secs, Some(600));
1242        assert_eq!(c.max_lifetime_secs, Some(3600));
1243    }
1244
1245    #[test]
1246    fn resolve_defaults_fills_remaining() {
1247        let mut c = SqlEndpointConfig::from_uri(
1248            "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
1249        )
1250        .unwrap();
1251        c.resolve_defaults();
1252        assert_eq!(c.max_connections, Some(7)); // from URI
1253        assert_eq!(c.min_connections, Some(1)); // from defaults
1254        assert_eq!(c.idle_timeout_secs, Some(300)); // from defaults
1255        assert_eq!(c.max_lifetime_secs, Some(1800)); // from defaults
1256    }
1257
1258    #[test]
1259    fn global_config_builder() {
1260        let c = SqlGlobalConfig::default()
1261            .with_max_connections(20)
1262            .with_min_connections(3)
1263            .with_idle_timeout_secs(600)
1264            .with_max_lifetime_secs(3600)
1265            .with_ssl_mode("require")
1266            .with_ssl_root_cert("/ca.pem")
1267            .with_ssl_cert("/cert.pem")
1268            .with_ssl_key("/key.pem");
1269        assert_eq!(c.max_connections, 20);
1270        assert_eq!(c.min_connections, 3);
1271        assert_eq!(c.idle_timeout_secs, 600);
1272        assert_eq!(c.max_lifetime_secs, 3600);
1273        assert_eq!(c.ssl_mode, Some("require".to_string()));
1274        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1275        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1276        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1277    }
1278
1279    #[test]
1280    fn enrich_postgres_ssl_mode() {
1281        let mut c = SqlEndpointConfig::from_uri(
1282            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1283        )
1284        .unwrap();
1285        c.resolve_defaults();
1286        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1287        assert!(url.contains("sslmode=require"), "got: {}", url);
1288    }
1289
1290    #[test]
1291    fn enrich_postgres_all_ssl() {
1292        let mut c = SqlEndpointConfig::from_uri(
1293            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1294        )
1295        .unwrap();
1296        c.resolve_defaults();
1297        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1298        assert!(url.contains("sslmode=require"), "got: {}", url);
1299        assert!(url.contains("sslrootcert="), "got: {}", url);
1300        assert!(url.contains("sslcert="), "got: {}", url);
1301        assert!(url.contains("sslkey="), "got: {}", url);
1302    }
1303
1304    #[test]
1305    fn enrich_mysql_ssl() {
1306        let mut c = SqlEndpointConfig::from_uri(
1307            "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
1308        )
1309        .unwrap();
1310        c.resolve_defaults();
1311        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1312        assert!(url.contains("ssl-mode=require"), "got: {}", url);
1313    }
1314
1315    #[test]
1316    fn enrich_existing_query_params() {
1317        let mut c = SqlEndpointConfig::from_uri(
1318            "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
1319        )
1320        .unwrap();
1321        c.resolve_defaults();
1322        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1323        assert!(url.contains("existing=1"), "got: {}", url);
1324        assert!(url.contains("sslmode=require"), "got: {}", url);
1325    }
1326
1327    #[test]
1328    fn enrich_override_existing() {
1329        let mut c = SqlEndpointConfig::from_uri(
1330            "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
1331        )
1332        .unwrap();
1333        c.resolve_defaults();
1334        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1335        assert!(url.contains("sslmode=require"), "got: {}", url);
1336        assert!(!url.contains("sslmode=allow"), "got: {}", url);
1337    }
1338
1339    #[test]
1340    fn enrich_no_params() {
1341        let mut c =
1342            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1343        c.resolve_defaults();
1344        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1345        assert_eq!(url, "postgres://localhost/test");
1346    }
1347
1348    #[test]
1349    fn enrich_url_encodes_paths() {
1350        let mut c = SqlEndpointConfig::from_uri(
1351            "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1352        )
1353        .unwrap();
1354        c.resolve_defaults();
1355        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1356        assert!(url.contains("sslrootcert="), "got: {}", url);
1357    }
1358
1359    #[test]
1360    fn enrich_unsupported_scheme_returns_unchanged() {
1361        let mut c = SqlEndpointConfig::from_uri(
1362            "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1363        )
1364        .unwrap();
1365        c.resolve_defaults();
1366        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1367        assert_eq!(url, "sqlite://localhost/test.db");
1368    }
1369
1370    #[test]
1371    fn enrich_invalid_url_returns_error() {
1372        let mut c = SqlEndpointConfig::from_uri(
1373            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1374        )
1375        .unwrap();
1376        c.resolve_defaults();
1377        let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1378        assert!(result.is_err());
1379    }
1380
1381    // --- Phase B hardening tests ---
1382
1383    // SQL-010: Debug output redacts credentials
1384    #[test]
1385    fn debug_redacts_db_url_with_password() {
1386        let c = SqlEndpointConfig::from_uri(
1387            "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1388        )
1389        .unwrap();
1390        let debug_output = format!("{:?}", c);
1391        assert!(
1392            !debug_output.contains("secret123"),
1393            "Debug output must not contain password: {}",
1394            debug_output
1395        );
1396        assert!(
1397            debug_output.contains("***"),
1398            "Debug output must contain redacted marker: {}",
1399            debug_output
1400        );
1401    }
1402
1403    #[test]
1404    fn debug_redacts_ssl_key() {
1405        let c = SqlEndpointConfig::from_uri(
1406            "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1407        )
1408        .unwrap();
1409        let debug_output = format!("{:?}", c);
1410        assert!(
1411            !debug_output.contains("/secret/key.pem"),
1412            "Debug output must not contain ssl_key path: {}",
1413            debug_output
1414        );
1415    }
1416
1417    #[test]
1418    fn debug_global_config_redacts_ssl_key() {
1419        let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1420        let debug_output = format!("{:?}", c);
1421        assert!(
1422            !debug_output.contains("/secret/key.pem"),
1423            "Debug output must not contain ssl_key path: {}",
1424            debug_output
1425        );
1426        assert!(
1427            debug_output.contains("***"),
1428            "Debug output must contain redacted marker: {}",
1429            debug_output
1430        );
1431    }
1432
1433    #[test]
1434    fn redact_db_url_with_credentials() {
1435        assert_eq!(
1436            redact_db_url("postgres://user:pass@host/db"),
1437            "postgres://***:***@host/db"
1438        );
1439    }
1440
1441    #[test]
1442    fn redact_db_url_without_credentials() {
1443        assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1444    }
1445
1446    #[test]
1447    fn redact_db_url_invalid_returns_original() {
1448        assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1449    }
1450
1451    // SQL-004: usePlaceholder parsing
1452    #[test]
1453    fn use_placeholder_defaults_to_true() {
1454        let c =
1455            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1456        assert!(c.use_placeholder);
1457    }
1458
1459    #[test]
1460    fn use_placeholder_false_from_uri() {
1461        let c = SqlEndpointConfig::from_uri(
1462            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1463        )
1464        .unwrap();
1465        assert!(!c.use_placeholder);
1466    }
1467
1468    #[test]
1469    fn use_placeholder_true_from_uri() {
1470        let c = SqlEndpointConfig::from_uri(
1471            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1472        )
1473        .unwrap();
1474        assert!(c.use_placeholder);
1475    }
1476
1477    // SQL-004: strict boolean parsing — invalid values rejected
1478    #[test]
1479    fn use_placeholder_rejects_invalid_value() {
1480        let result = SqlEndpointConfig::from_uri(
1481            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1482        );
1483        assert!(result.is_err());
1484        let msg = format!("{:?}", result.unwrap_err());
1485        assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1486    }
1487
1488    #[test]
1489    fn use_placeholder_rejects_typo_tru() {
1490        let result = SqlEndpointConfig::from_uri(
1491            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1492        );
1493        assert!(result.is_err());
1494    }
1495
1496    #[test]
1497    fn use_placeholder_rejects_yes() {
1498        let result = SqlEndpointConfig::from_uri(
1499            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1500        );
1501        assert!(result.is_err());
1502    }
1503
1504    #[test]
1505    fn noop_rejects_invalid_value() {
1506        let result =
1507            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1508        assert!(result.is_err());
1509        let msg = format!("{:?}", result.unwrap_err());
1510        assert!(msg.contains("noop"));
1511    }
1512
1513    #[test]
1514    fn batch_rejects_invalid_value() {
1515        let result =
1516            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1517        assert!(result.is_err());
1518        let msg = format!("{:?}", result.unwrap_err());
1519        assert!(msg.contains("batch"));
1520    }
1521
1522    #[test]
1523    fn route_empty_result_set_rejects_invalid_value() {
1524        let result = SqlEndpointConfig::from_uri(
1525            "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1526        );
1527        assert!(result.is_err());
1528    }
1529
1530    #[test]
1531    fn use_iterator_rejects_invalid_value() {
1532        let result = SqlEndpointConfig::from_uri(
1533            "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1534        );
1535        assert!(result.is_err());
1536    }
1537
1538    #[test]
1539    fn break_batch_on_consume_fail_rejects_invalid_value() {
1540        let result = SqlEndpointConfig::from_uri(
1541            "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1542        );
1543        assert!(result.is_err());
1544    }
1545
1546    #[test]
1547    fn use_message_body_for_sql_rejects_invalid_value() {
1548        let result = SqlEndpointConfig::from_uri(
1549            "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1550        );
1551        assert!(result.is_err());
1552    }
1553
1554    // Case-insensitive true/false still works
1555    #[test]
1556    fn boolean_params_case_insensitive() {
1557        let c = SqlEndpointConfig::from_uri(
1558            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False&bridgeErrorHandler=TRUE",
1559        )
1560        .unwrap();
1561        assert!(c.use_placeholder);
1562        assert!(!c.noop);
1563        assert!(c.batch);
1564        assert!(!c.use_iterator);
1565        assert!(c.bridge_error_handler);
1566    }
1567
1568    // SQL-022: multi-char placeholder rejected
1569    #[test]
1570    fn multi_char_placeholder_rejected() {
1571        let result = SqlEndpointConfig::from_uri(
1572            "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1573        );
1574        assert!(result.is_err());
1575        let msg = format!("{:?}", result.unwrap_err());
1576        assert!(msg.contains("placeholder") && msg.contains("one character"));
1577    }
1578
1579    #[test]
1580    fn non_ascii_placeholder_rejected() {
1581        let result = SqlEndpointConfig::from_uri(
1582            "sql:select 1?db_url=postgres://localhost/test&placeholder=%C2%A2",
1583        );
1584        assert!(result.is_err());
1585    }
1586
1587    #[test]
1588    fn single_char_placeholder_accepted() {
1589        let c = SqlEndpointConfig::from_uri(
1590            "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1591        )
1592        .unwrap();
1593        assert_eq!(c.placeholder, '$');
1594    }
1595
1596    #[test]
1597    fn empty_placeholder_falls_back_to_default() {
1598        // Empty string is filtered out by the original logic — falls back to '#'
1599        let c = SqlEndpointConfig::from_uri(
1600            "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1601        )
1602        .unwrap();
1603        assert_eq!(c.placeholder, '#');
1604    }
1605
1606    // SQL-014: file-based SQL config test (verifies async resolution and caching)
1607    #[tokio::test]
1608    async fn file_query_cached_in_config() {
1609        use std::io::Write;
1610        let unique_name = format!(
1611            "test_sql_cached_{}.sql",
1612            std::time::SystemTime::now()
1613                .duration_since(std::time::UNIX_EPOCH)
1614                .unwrap_or_default()
1615                .as_nanos()
1616        );
1617        let mut tmp = std::env::temp_dir();
1618        tmp.push(unique_name);
1619        {
1620            let mut f = std::fs::File::create(&tmp).unwrap();
1621            writeln!(f, "SELECT * FROM cached_test").unwrap();
1622        }
1623        let uri = format!(
1624            "sql:file:{}?db_url=postgres://localhost/test",
1625            tmp.display()
1626        );
1627        let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1628        // Query is empty before async resolution
1629        assert!(c.query.is_empty());
1630        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1631
1632        // Resolve asynchronously — query is cached in config
1633        c.resolve_file_query()
1634            .await
1635            .expect("resolve should succeed");
1636        assert_eq!(c.query, "SELECT * FROM cached_test");
1637
1638        // Delete the file — config still has the query
1639        std::fs::remove_file(&tmp).ok();
1640        assert_eq!(c.query, "SELECT * FROM cached_test");
1641    }
1642
1643    // --- H-03 audit sweep tests ---
1644
1645    // SQL-005: alwaysPopulateStatement
1646    #[test]
1647    fn always_populate_statement_defaults_to_false() {
1648        let c =
1649            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1650        assert!(!c.always_populate_statement);
1651    }
1652
1653    #[test]
1654    fn always_populate_statement_from_uri() {
1655        let c = SqlEndpointConfig::from_uri(
1656            "sql:select 1?db_url=postgres://localhost/test&alwaysPopulateStatement=true",
1657        )
1658        .unwrap();
1659        assert!(c.always_populate_statement);
1660    }
1661
1662    // SQL-011: allowNamedParameters
1663    #[test]
1664    fn allow_named_parameters_defaults_to_true() {
1665        let c =
1666            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1667        assert!(c.allow_named_parameters);
1668    }
1669
1670    #[test]
1671    fn allow_named_parameters_false_from_uri() {
1672        let c = SqlEndpointConfig::from_uri(
1673            "sql:select 1?db_url=postgres://localhost/test&allowNamedParameters=false",
1674        )
1675        .unwrap();
1676        assert!(!c.allow_named_parameters);
1677    }
1678
1679    // SQL-016: fetchSize
1680    #[test]
1681    fn fetch_size_defaults_to_none() {
1682        let c =
1683            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1684        assert!(c.fetch_size.is_none());
1685    }
1686
1687    #[test]
1688    fn fetch_size_from_uri() {
1689        let c = SqlEndpointConfig::from_uri(
1690            "sql:select 1?db_url=postgres://localhost/test&fetchSize=1000",
1691        )
1692        .unwrap();
1693        assert_eq!(c.fetch_size, Some(1000));
1694    }
1695
1696    // SQL-002: transactionMode
1697    #[test]
1698    fn transaction_mode_defaults_to_auto() {
1699        let c =
1700            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1701        assert_eq!(c.transaction_mode, TransactionMode::Auto);
1702    }
1703
1704    #[test]
1705    fn transaction_mode_managed_from_uri() {
1706        let c = SqlEndpointConfig::from_uri(
1707            "sql:select 1?db_url=postgres://localhost/test&transactionMode=Managed",
1708        )
1709        .unwrap();
1710        assert_eq!(c.transaction_mode, TransactionMode::Managed);
1711    }
1712
1713    #[test]
1714    fn transaction_mode_invalid_rejected() {
1715        let result = SqlEndpointConfig::from_uri(
1716            "sql:select 1?db_url=postgres://localhost/test&transactionMode=Invalid",
1717        );
1718        assert!(result.is_err());
1719    }
1720
1721    // SQL-015: repeatCount
1722    #[test]
1723    fn repeat_count_defaults_to_none() {
1724        let c =
1725            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1726        assert!(c.repeat_count.is_none());
1727    }
1728
1729    #[test]
1730    fn repeat_count_from_uri() {
1731        let c = SqlEndpointConfig::from_uri(
1732            "sql:select 1?db_url=postgres://localhost/test&repeatCount=10",
1733        )
1734        .unwrap();
1735        assert_eq!(c.repeat_count, Some(10));
1736    }
1737
1738    // SQL-017: processingStrategy
1739    #[test]
1740    fn processing_strategy_defaults_to_direct() {
1741        let c =
1742            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1743        assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1744    }
1745
1746    #[test]
1747    fn processing_strategy_scheduled_from_uri() {
1748        let c = SqlEndpointConfig::from_uri(
1749            "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Scheduled",
1750        )
1751        .unwrap();
1752        assert_eq!(c.processing_strategy, ProcessingStrategy::Scheduled);
1753    }
1754
1755    #[test]
1756    fn processing_strategy_invalid_rejected() {
1757        let result = SqlEndpointConfig::from_uri(
1758            "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Invalid",
1759        );
1760        assert!(result.is_err());
1761    }
1762
1763    // SQL-018: pollStrategy
1764    #[test]
1765    fn poll_strategy_defaults_to_sequential() {
1766        let c =
1767            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1768        assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1769    }
1770
1771    #[test]
1772    fn poll_strategy_burst_from_uri() {
1773        let c = SqlEndpointConfig::from_uri(
1774            "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Burst",
1775        )
1776        .unwrap();
1777        assert_eq!(c.poll_strategy, PollStrategy::Burst);
1778    }
1779
1780    #[test]
1781    fn poll_strategy_invalid_rejected() {
1782        let result = SqlEndpointConfig::from_uri(
1783            "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Invalid",
1784        );
1785        assert!(result.is_err());
1786    }
1787
1788    // ── RetryPolicy (rc-ddl) ──────────────────────────────────────────────
1789
1790    #[test]
1791    fn sql_endpoint_config_has_retry_policy() {
1792        let cfg = SqlEndpointConfig::from_uri(
1793            "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=3&retryInitialDelayMs=500",
1794        )
1795        .expect("parse");
1796        assert_eq!(cfg.retry.max_attempts, 3);
1797        assert_eq!(
1798            cfg.retry.initial_delay,
1799            std::time::Duration::from_millis(500)
1800        );
1801        assert!(cfg.retry.enabled);
1802    }
1803
1804    #[test]
1805    fn sql_endpoint_config_retry_defaults_when_unspecified() {
1806        let cfg =
1807            SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1808        // When URI has no retry params, retry defaults to NetworkRetryPolicy::default()
1809        assert!(cfg.retry.enabled);
1810        assert_eq!(cfg.retry.max_attempts, 10); // default
1811    }
1812
1813    #[test]
1814    fn sql_global_config_has_retry_default() {
1815        let cfg = SqlGlobalConfig::default();
1816        assert!(cfg.retry.enabled);
1817    }
1818
1819    #[test]
1820    fn retry_policy_parse_full_uri_params() {
1821        let cfg = SqlEndpointConfig::from_uri(
1822            "sql:select 1?db_url=sqlite::memory:&retryEnabled=false&retryMaxAttempts=7&retryInitialDelayMs=1000&retryMultiplier=3.0&retryMaxDelayMs=60000&retryJitter=0.5",
1823        )
1824        .expect("parse");
1825        assert!(!cfg.retry.enabled);
1826        assert_eq!(cfg.retry.max_attempts, 7);
1827        assert_eq!(
1828            cfg.retry.initial_delay,
1829            std::time::Duration::from_millis(1000)
1830        );
1831        assert!((cfg.retry.multiplier - 3.0).abs() < f64::EPSILON);
1832        assert_eq!(cfg.retry.max_delay, std::time::Duration::from_millis(60000));
1833        assert!((cfg.retry.jitter_factor - 0.5).abs() < f64::EPSILON);
1834    }
1835
1836    #[test]
1837    fn retry_policy_from_uri_survives_apply_defaults_with_global() {
1838        let mut ep = SqlEndpointConfig::from_uri(
1839            "sql:select 1?db_url=sqlite::memory:&retryMaxAttempts=10&retryInitialDelayMs=500",
1840        )
1841        .expect("parse");
1842        let global = SqlGlobalConfig::default(); // global has default retry (max_attempts=10)
1843        ep.apply_defaults(&global);
1844        // URI values survive when retry_set_from_uri is true
1845        assert_eq!(ep.retry.max_attempts, 10);
1846        assert_eq!(
1847            ep.retry.initial_delay,
1848            std::time::Duration::from_millis(500)
1849        );
1850    }
1851
1852    #[test]
1853    fn retry_policy_falls_back_to_global_when_uri_has_no_retry_params() {
1854        let mut ep =
1855            SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").expect("parse");
1856        let mut global = SqlGlobalConfig::default();
1857        global.retry.max_attempts = 7;
1858        ep.apply_defaults(&global);
1859        // When URI has no retry params, global fills the gap
1860        assert_eq!(ep.retry.max_attempts, 7);
1861    }
1862}