Skip to main content

camel_component_sql/
config.rs

1use std::str::FromStr;
2
3use camel_component_api::CamelError;
4use camel_component_api::{UriComponents, UriConfig, parse_uri};
5use tracing::warn;
6
/// Masks an optional secret for display purposes.
///
/// Yields `Some("***")` whenever a value is present and `None` when it is absent,
/// so the returned value never reveals the wrapped string.
fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
    opt.as_ref().map(|_| "***")
}
11
12/// Redacts the user:password portion of a database URL for safe display.
13/// Returns `"scheme://***@host/db"` for URLs with userinfo, or the original URL otherwise.
14pub fn redact_db_url(db_url: &str) -> String {
15    match url::Url::parse(db_url) {
16        Ok(mut parsed) => {
17            if parsed.username().is_empty() && parsed.password().is_none() {
18                return db_url.to_string();
19            }
20            let _ = parsed.set_username("***");
21            let _ = parsed.set_password(Some("***"));
22            parsed.to_string()
23        }
24        Err(_) => db_url.to_string(),
25    }
26}
27
/// Output type for SQL query results.
///
/// Defaults to [`SqlOutputType::SelectList`]. The type is a plain fieldless enum, so it
/// also implements `Eq` and `Hash` and can be used as a map key or in sets.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub enum SqlOutputType {
    /// Return all rows as a list.
    #[default]
    SelectList,
    /// Return a single row (first result).
    SelectOne,
    /// Stream results as an async iterator.
    StreamList,
}
39
40impl FromStr for SqlOutputType {
41    type Err = CamelError;
42
43    fn from_str(s: &str) -> Result<Self, Self::Err> {
44        match s {
45            "SelectList" => Ok(SqlOutputType::SelectList),
46            "SelectOne" => Ok(SqlOutputType::SelectOne),
47            "StreamList" => Ok(SqlOutputType::StreamList),
48            _ => Err(CamelError::InvalidUri(format!(
49                "Unknown output type: {}",
50                s
51            ))),
52        }
53    }
54}
55
/// Transaction mode for SQL operations.
///
/// - `Auto`: Each statement auto-commits (default, current behavior).
/// - `Managed`: Explicit transaction boundaries (future; currently logs a warning
///   and falls back to Auto).
///
/// The type is a plain fieldless enum, so it also implements `Eq` and `Hash`.
///
// TODO(SQL-002): managed transaction mode — implement explicit transaction boundaries
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub enum TransactionMode {
    /// Auto-commit each statement (default).
    #[default]
    Auto,
    /// Managed transactions — not yet implemented.
    Managed,
}
71
72impl FromStr for TransactionMode {
73    type Err = CamelError;
74
75    fn from_str(s: &str) -> Result<Self, Self::Err> {
76        match s {
77            "Auto" => Ok(TransactionMode::Auto),
78            "Managed" => Ok(TransactionMode::Managed),
79            _ => Err(CamelError::InvalidUri(format!(
80                "Unknown transaction mode: {}. Expected 'Auto' or 'Managed'",
81                s
82            ))),
83        }
84    }
85}
86
87impl std::fmt::Display for TransactionMode {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            TransactionMode::Auto => write!(f, "Auto"),
91            TransactionMode::Managed => write!(f, "Managed"),
92        }
93    }
94}
95
/// Processing strategy for SQL consumers.
///
/// Defaults to [`ProcessingStrategy::Direct`]. The type is a plain fieldless enum, so it
/// also implements `Eq` and `Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub enum ProcessingStrategy {
    /// Process rows directly in the polling task (default).
    #[default]
    Direct,
    /// Schedule processing via a separate task (deferred execution).
    Scheduled,
}
105
106impl FromStr for ProcessingStrategy {
107    type Err = CamelError;
108
109    fn from_str(s: &str) -> Result<Self, Self::Err> {
110        match s {
111            "Direct" => Ok(ProcessingStrategy::Direct),
112            "Scheduled" => Ok(ProcessingStrategy::Scheduled),
113            _ => Err(CamelError::InvalidUri(format!(
114                "Unknown processing strategy: {}. Expected 'Direct' or 'Scheduled'",
115                s
116            ))),
117        }
118    }
119}
120
121impl std::fmt::Display for ProcessingStrategy {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        match self {
124            ProcessingStrategy::Direct => write!(f, "Direct"),
125            ProcessingStrategy::Scheduled => write!(f, "Scheduled"),
126        }
127    }
128}
129
/// Poll strategy for SQL consumers.
///
/// Defaults to [`PollStrategy::Sequential`]. The type is a plain fieldless enum, so it
/// also implements `Eq` and `Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub enum PollStrategy {
    /// Poll sequentially with delay between polls (default).
    #[default]
    Sequential,
    /// Poll in bursts — execute multiple queries in rapid succession.
    Burst,
}
139
140impl FromStr for PollStrategy {
141    type Err = CamelError;
142
143    fn from_str(s: &str) -> Result<Self, Self::Err> {
144        match s {
145            "Sequential" => Ok(PollStrategy::Sequential),
146            "Burst" => Ok(PollStrategy::Burst),
147            _ => Err(CamelError::InvalidUri(format!(
148                "Unknown poll strategy: {}. Expected 'Sequential' or 'Burst'",
149                s
150            ))),
151        }
152    }
153}
154
155impl std::fmt::Display for PollStrategy {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        match self {
158            PollStrategy::Sequential => write!(f, "Sequential"),
159            PollStrategy::Burst => write!(f, "Burst"),
160        }
161    }
162}
163
164/// Global configuration for SQL component.
165///
166/// This struct supports serde deserialization with defaults and builder methods.
167/// It holds pool configuration that can be applied as defaults to endpoints.
168///
169/// **Security note:** `Debug` implementation redacts sensitive fields (SSL key paths).
170#[derive(Clone, PartialEq, serde::Deserialize)]
171#[serde(default)]
172pub struct SqlGlobalConfig {
173    pub max_connections: u32,
174    pub min_connections: u32,
175    pub idle_timeout_secs: u64,
176    pub max_lifetime_secs: u64,
177    // SSL/TLS
178    pub ssl_mode: Option<String>,
179    pub ssl_root_cert: Option<String>,
180    pub ssl_cert: Option<String>,
181    pub ssl_key: Option<String>,
182}
183
184impl std::fmt::Debug for SqlGlobalConfig {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("SqlGlobalConfig")
187            .field("max_connections", &self.max_connections)
188            .field("min_connections", &self.min_connections)
189            .field("idle_timeout_secs", &self.idle_timeout_secs)
190            .field("max_lifetime_secs", &self.max_lifetime_secs)
191            .field("ssl_mode", &self.ssl_mode)
192            .field("ssl_root_cert", &self.ssl_root_cert)
193            .field("ssl_cert", &self.ssl_cert)
194            .field("ssl_key", &redacted_opt(&self.ssl_key))
195            .finish()
196    }
197}
198
199impl Default for SqlGlobalConfig {
200    fn default() -> Self {
201        Self {
202            max_connections: 5,
203            min_connections: 1,
204            idle_timeout_secs: 300,
205            max_lifetime_secs: 1800,
206            ssl_mode: None,
207            ssl_root_cert: None,
208            ssl_cert: None,
209            ssl_key: None,
210        }
211    }
212}
213
214impl SqlGlobalConfig {
215    pub fn new() -> Self {
216        Self::default()
217    }
218
219    pub fn with_max_connections(mut self, value: u32) -> Self {
220        self.max_connections = value;
221        self
222    }
223
224    pub fn with_min_connections(mut self, value: u32) -> Self {
225        self.min_connections = value;
226        self
227    }
228
229    pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
230        self.idle_timeout_secs = value;
231        self
232    }
233
234    pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
235        self.max_lifetime_secs = value;
236        self
237    }
238
239    pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
240        self.ssl_mode = Some(value.into());
241        self
242    }
243
244    pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
245        self.ssl_root_cert = Some(value.into());
246        self
247    }
248
249    pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
250        self.ssl_cert = Some(value.into());
251        self
252    }
253
254    pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
255        self.ssl_key = Some(value.into());
256        self
257    }
258}
259
260/// Configuration for SQL component endpoints.
261///
262/// URI format: `sql:<query>?db_url=<url>&param1=val1&param2=val2`
263///
264/// The query can be inline SQL or a file reference with `file:` prefix:
265/// - `sql:SELECT * FROM users?db_url=...` - inline SQL
266/// - `sql:file:/path/to/query.sql?db_url=...` - read SQL from file
267///
268/// **Note on file-based queries (SQL-014):** When the query path starts with `file:`,
269/// the file is NOT read synchronously during `from_uri()`. Instead, the file path is
270/// stored in `source_path` and the query is resolved asynchronously via `resolve_file_query()`
271/// during async initialization (producer pool init or consumer start). This avoids
272/// blocking I/O in the synchronous URI parsing path.
273///
274/// **Security note:** `Debug` implementation redacts the `db_url` (which may contain credentials)
275/// and `ssl_key` path. Use `redact_db_url()` for safe logging of database URLs.
276#[derive(Clone)]
277pub struct SqlEndpointConfig {
278    // Connection
279    /// Database connection URL (required).
280    pub db_url: String,
281    /// Maximum connections in the pool. None = use global default.
282    pub max_connections: Option<u32>,
283    /// Minimum connections in the pool. None = use global default.
284    pub min_connections: Option<u32>,
285    /// Idle timeout in seconds. None = use global default.
286    pub idle_timeout_secs: Option<u64>,
287    /// Maximum connection lifetime in seconds. None = use global default.
288    pub max_lifetime_secs: Option<u64>,
289
290    // Query
291    /// The SQL query (from URI path or file).
292    pub query: String,
293    /// Path to the file containing the SQL query (when using `file:` prefix).
294    pub source_path: Option<String>,
295    /// Output type for query results. Default: SelectList.
296    pub output_type: SqlOutputType,
297    /// Placeholder character for parameters. Default: '#'.
298    pub placeholder: char,
299    /// If true, process parameter placeholders in queries. Default: true.
300    /// When false, queries are executed as-is without template parsing.
301    pub use_placeholder: bool,
302    /// If true, don't execute the query (dry run). Default: false.
303    pub noop: bool,
304    /// Separator for IN clause expansion. Default: ", ".
305    pub in_separator: String,
306
307    // SQL-005: always populate statement even if body is null/empty
308    /// If true, always bind parameters even if the exchange body is null/empty
309    /// (uses empty defaults). Default: false.
310    pub always_populate_statement: bool,
311
312    // SQL-011: allow named parameters
313    /// If true, recognize `:name` style placeholders and map them from exchange
314    /// headers or body fields. Default: true.
315    pub allow_named_parameters: bool,
316
317    // SQL-016: fetch size hint
318    /// Fetch size hint for query results. None = driver default.
319    pub fetch_size: Option<u32>,
320
321    // SQL-002: transaction mode
322    /// Transaction mode for SQL operations. Default: Auto.
323    pub transaction_mode: TransactionMode,
324
325    // Consumer (polling)
326    /// Delay between polls in milliseconds. Default: 500.
327    pub delay_ms: u64,
328    /// Initial delay before first poll in milliseconds. Default: 1000.
329    pub initial_delay_ms: u64,
330    /// Maximum messages per poll.
331    pub max_messages_per_poll: Option<i32>,
332    /// SQL to execute after consuming each message.
333    pub on_consume: Option<String>,
334    /// SQL to execute if consumption fails.
335    pub on_consume_failed: Option<String>,
336    /// SQL to execute after consuming a batch.
337    pub on_consume_batch_complete: Option<String>,
338    /// Route empty result sets. Default: false.
339    pub route_empty_result_set: bool,
340    /// Use iterator for results. Default: true.
341    pub use_iterator: bool,
342    /// Expected number of rows affected.
343    pub expected_update_count: Option<i64>,
344    /// Break batch on consume failure. Default: false.
345    pub break_batch_on_consume_fail: bool,
346    /// Bridge poll errors into route error handling. Default: false.
347    pub bridge_error_handler: bool,
348
349    // SQL-015: repeat count for consumer polling
350    /// When set, the consumer only polls up to `repeat_count` times before stopping.
351    /// None = poll indefinitely (default).
352    pub repeat_count: Option<u32>,
353
354    // SQL-017: processing strategy
355    /// Processing strategy for consumer. Default: Direct.
356    pub processing_strategy: ProcessingStrategy,
357
358    // SQL-018: poll strategy
359    /// Poll strategy for consumer. Default: Sequential.
360    pub poll_strategy: PollStrategy,
361
362    // Producer
363    /// Enable batch mode. Default: false.
364    pub batch: bool,
365    /// Use message body for SQL. Default: false.
366    pub use_message_body_for_sql: bool,
367
368    // SSL/TLS
369    /// SSL mode for the connection. None = use global default.
370    pub ssl_mode: Option<String>,
371    /// Path to SSL root certificate. None = use global default.
372    pub ssl_root_cert: Option<String>,
373    /// Path to SSL client certificate. None = use global default.
374    pub ssl_cert: Option<String>,
375    /// Path to SSL client key. None = use global default.
376    pub ssl_key: Option<String>,
377}
378
379impl std::fmt::Debug for SqlEndpointConfig {
380    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381        f.debug_struct("SqlEndpointConfig")
382            .field("db_url", &redact_db_url(&self.db_url))
383            .field("max_connections", &self.max_connections)
384            .field("min_connections", &self.min_connections)
385            .field("idle_timeout_secs", &self.idle_timeout_secs)
386            .field("max_lifetime_secs", &self.max_lifetime_secs)
387            .field("query", &self.query)
388            .field("source_path", &self.source_path)
389            .field("output_type", &self.output_type)
390            .field("placeholder", &self.placeholder)
391            .field("use_placeholder", &self.use_placeholder)
392            .field("noop", &self.noop)
393            .field("in_separator", &self.in_separator)
394            .field("always_populate_statement", &self.always_populate_statement)
395            .field("allow_named_parameters", &self.allow_named_parameters)
396            .field("fetch_size", &self.fetch_size)
397            .field("transaction_mode", &self.transaction_mode)
398            .field("delay_ms", &self.delay_ms)
399            .field("initial_delay_ms", &self.initial_delay_ms)
400            .field("max_messages_per_poll", &self.max_messages_per_poll)
401            .field("on_consume", &self.on_consume)
402            .field("on_consume_failed", &self.on_consume_failed)
403            .field("on_consume_batch_complete", &self.on_consume_batch_complete)
404            .field("route_empty_result_set", &self.route_empty_result_set)
405            .field("use_iterator", &self.use_iterator)
406            .field("expected_update_count", &self.expected_update_count)
407            .field(
408                "break_batch_on_consume_fail",
409                &self.break_batch_on_consume_fail,
410            )
411            .field("bridge_error_handler", &self.bridge_error_handler)
412            .field("repeat_count", &self.repeat_count)
413            .field("processing_strategy", &self.processing_strategy)
414            .field("poll_strategy", &self.poll_strategy)
415            .field("batch", &self.batch)
416            .field("use_message_body_for_sql", &self.use_message_body_for_sql)
417            .field("ssl_mode", &self.ssl_mode)
418            .field("ssl_root_cert", &self.ssl_root_cert)
419            .field("ssl_cert", &self.ssl_cert)
420            .field("ssl_key", &redacted_opt(&self.ssl_key))
421            .finish()
422    }
423}
424
425impl SqlEndpointConfig {
426    /// Apply defaults from global config, filling None fields without overriding.
427    pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
428        if self.max_connections.is_none() {
429            self.max_connections = Some(defaults.max_connections);
430        }
431        if self.min_connections.is_none() {
432            self.min_connections = Some(defaults.min_connections);
433        }
434        if self.idle_timeout_secs.is_none() {
435            self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
436        }
437        if self.max_lifetime_secs.is_none() {
438            self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
439        }
440        if self.ssl_mode.is_none() {
441            self.ssl_mode = defaults.ssl_mode.clone();
442        }
443        if self.ssl_root_cert.is_none() {
444            self.ssl_root_cert = defaults.ssl_root_cert.clone();
445        }
446        if self.ssl_cert.is_none() {
447            self.ssl_cert = defaults.ssl_cert.clone();
448        }
449        if self.ssl_key.is_none() {
450            self.ssl_key = defaults.ssl_key.clone();
451        }
452    }
453
454    /// Resolve any remaining None fields with built-in defaults.
455    pub fn resolve_defaults(&mut self) {
456        let defaults = SqlGlobalConfig::default();
457        self.apply_defaults(&defaults);
458    }
459
460    /// Asynchronously read the SQL query from the file referenced by `source_path`.
461    ///
462    /// This is the async replacement for the blocking `std::fs::read_to_string` that
463    /// was previously called in `from_uri()`. Must be invoked during async init
464    /// (producer pool init or consumer start) — never in a synchronous context.
465    ///
466    /// After this call, `self.query` contains the file content (trimmed) and
467    /// `self.source_path` is cleared to prevent re-reading.
468    pub async fn resolve_file_query(&mut self) -> Result<(), CamelError> {
469        if let Some(file_path) = self.source_path.take() {
470            let contents = tokio::fs::read_to_string(&file_path).await.map_err(|e| {
471                CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
472            })?;
473            self.query = contents.trim().to_string();
474            // Keep source_path as Some so tests can still verify the original path
475            self.source_path = Some(file_path);
476        }
477        Ok(())
478    }
479}
480
/// Driver-specific query-parameter names for one SSL option.
struct SslParamMapping {
    /// Key used for `postgres` / `postgresql` database URLs.
    pg_key: &'static str,
    /// Key used for `mysql` database URLs.
    mysql_key: &'static str,
}
485
486const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
487    (
488        "sslMode",
489        SslParamMapping {
490            pg_key: "sslmode",
491            mysql_key: "ssl-mode",
492        },
493    ),
494    (
495        "sslRootCert",
496        SslParamMapping {
497            pg_key: "sslrootcert",
498            mysql_key: "ssl-ca",
499        },
500    ),
501    (
502        "sslCert",
503        SslParamMapping {
504            pg_key: "sslcert",
505            mysql_key: "ssl-cert",
506        },
507    ),
508    (
509        "sslKey",
510        SslParamMapping {
511            pg_key: "sslkey",
512            mysql_key: "ssl-key",
513        },
514    ),
515];
516
517pub fn enrich_db_url_with_ssl(
518    db_url: &str,
519    config: &SqlEndpointConfig,
520) -> Result<String, CamelError> {
521    let ssl_params: Vec<(&str, &str)> = [
522        config.ssl_mode.as_deref().map(|v| ("sslMode", v)),
523        config.ssl_root_cert.as_deref().map(|v| ("sslRootCert", v)),
524        config.ssl_cert.as_deref().map(|v| ("sslCert", v)),
525        config.ssl_key.as_deref().map(|v| ("sslKey", v)),
526    ]
527    .into_iter()
528    .flatten()
529    .collect();
530
531    if ssl_params.is_empty() {
532        return Ok(db_url.to_string());
533    }
534
535    let mut parsed = url::Url::parse(db_url).map_err(|e| {
536        CamelError::InvalidUri(format!(
537            "Cannot parse database URL for SSL enrichment: {}",
538            e
539        ))
540    })?;
541
542    let scheme = parsed.scheme();
543    if scheme.starts_with("sqlite") {
544        warn!(
545            "SSL options configured for SQLite database URL, but SQLite does not support SSL/TLS; ignoring sslMode/sslRootCert/sslCert/sslKey"
546        );
547        return Ok(db_url.to_string());
548    }
549
550    if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
551        return Ok(db_url.to_string());
552    }
553    let is_mysql = scheme == "mysql";
554
555    let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
556    for (camel_name, value) in &ssl_params {
557        if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
558            let driver_key = if is_mysql {
559                mapping.mysql_key
560            } else {
561                mapping.pg_key
562            };
563
564            if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
565                query_pairs[pos].1 = (*value).into();
566            } else {
567                query_pairs.push((driver_key.into(), (*value).into()));
568            }
569        }
570    }
571
572    {
573        let mut serializer = url::form_urlencoded::Serializer::new(String::new());
574        for (k, v) in query_pairs {
575            serializer.append_pair(&k, &v);
576        }
577        parsed.set_query(Some(&serializer.finish()));
578    }
579
580    Ok(parsed.to_string())
581}
582
583impl UriConfig for SqlEndpointConfig {
584    fn scheme() -> &'static str {
585        "sql"
586    }
587
588    fn from_uri(uri: &str) -> Result<Self, CamelError> {
589        let parts = parse_uri(uri)?;
590        Self::from_components(parts)
591    }
592
593    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
594        // Validate scheme
595        if parts.scheme != Self::scheme() {
596            return Err(CamelError::InvalidUri(format!(
597                "expected scheme '{}' but got '{}'",
598                Self::scheme(),
599                parts.scheme
600            )));
601        }
602
603        let params = &parts.params;
604
605        // Handle file: prefix for query
606        // SQL-014: defer file reading to async init path to avoid blocking I/O
607        // in the synchronous URI parsing path. Store the path; resolve_file_query()
608        // must be called during async initialization (producer pool init or consumer start).
609        let (query, source_path) = if parts.path.starts_with("file:") {
610            let file_path = parts.path.trim_start_matches("file:").to_string();
611            (String::new(), Some(file_path))
612        } else {
613            (parts.path.clone(), None)
614        };
615
616        // Required parameter: db_url
617        let db_url = params
618            .get("db_url")
619            .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
620            .clone();
621
622        // Connection parameters - None when not set by URI param
623        let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
624        let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
625        let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
626        let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
627
628        // Query parameters
629        let output_type = params
630            .get("outputType")
631            .map(|s| s.parse())
632            .transpose()?
633            .unwrap_or_default();
634        let placeholder = params
635            .get("placeholder")
636            .filter(|v| !v.is_empty())
637            .map(|v| {
638                if v.chars().count() != 1 {
639                    return Err(CamelError::InvalidUri(format!(
640                        "placeholder must be exactly one character, got '{}'",
641                        v
642                    )));
643                }
644                if !v.is_ascii() {
645                    return Err(CamelError::InvalidUri(
646                        "placeholder must be a single ASCII character".to_string(),
647                    ));
648                }
649                Ok(v.chars().next().unwrap()) // allow-unwrap
650            })
651            .transpose()?
652            .unwrap_or('#');
653        /// Parse a boolean URI parameter strictly.
654        ///
655        /// Accepts only `"true"` or `"false"` (case-insensitive). Any other value
656        /// returns `CamelError::InvalidUri` to prevent silent misconfiguration.
657        fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
658            if value.eq_ignore_ascii_case("true") {
659                Ok(true)
660            } else if value.eq_ignore_ascii_case("false") {
661                Ok(false)
662            } else {
663                Err(CamelError::InvalidUri(format!(
664                    "{} must be 'true' or 'false', got '{}'",
665                    name, value
666                )))
667            }
668        }
669
670        let use_placeholder = params
671            .get("usePlaceholder")
672            .map(|v| parse_bool_param("usePlaceholder", v))
673            .transpose()?
674            .unwrap_or(true);
675        let noop = params
676            .get("noop")
677            .map(|v| parse_bool_param("noop", v))
678            .transpose()?
679            .unwrap_or(false);
680        let in_separator = params
681            .get("inSeparator")
682            .map(|v| v.to_string())
683            .unwrap_or_else(|| ", ".to_string());
684        if in_separator.is_empty() {
685            return Err(CamelError::InvalidUri(
686                "inSeparator must not be empty".to_string(),
687            ));
688        }
689
690        // SQL-005: alwaysPopulateStatement
691        let always_populate_statement = params
692            .get("alwaysPopulateStatement")
693            .map(|v| parse_bool_param("alwaysPopulateStatement", v))
694            .transpose()?
695            .unwrap_or(false);
696
697        // SQL-011: allowNamedParameters
698        let allow_named_parameters = params
699            .get("allowNamedParameters")
700            .map(|v| parse_bool_param("allowNamedParameters", v))
701            .transpose()?
702            .unwrap_or(true);
703
704        // SQL-016: fetchSize
705        let fetch_size = params.get("fetchSize").and_then(|v| v.parse().ok());
706
707        // SQL-002: transactionMode
708        let transaction_mode = params
709            .get("transactionMode")
710            .map(|s| s.parse())
711            .transpose()?
712            .unwrap_or_default();
713
714        // Consumer parameters
715        let delay_ms = params
716            .get("delay")
717            .and_then(|v| v.parse().ok())
718            .unwrap_or(500);
719        let initial_delay_ms = params
720            .get("initialDelay")
721            .and_then(|v| v.parse().ok())
722            .unwrap_or(1000);
723        let max_messages_per_poll = params
724            .get("maxMessagesPerPoll")
725            .and_then(|v| v.parse().ok());
726        let on_consume = params.get("onConsume").cloned();
727        let on_consume_failed = params.get("onConsumeFailed").cloned();
728        let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
729        let route_empty_result_set = params
730            .get("routeEmptyResultSet")
731            .map(|v| parse_bool_param("routeEmptyResultSet", v))
732            .transpose()?
733            .unwrap_or(false);
734        let use_iterator = params
735            .get("useIterator")
736            .map(|v| parse_bool_param("useIterator", v))
737            .transpose()?
738            .unwrap_or(true);
739        let expected_update_count = params
740            .get("expectedUpdateCount")
741            .and_then(|v| v.parse().ok());
742        let break_batch_on_consume_fail = params
743            .get("breakBatchOnConsumeFail")
744            .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
745            .transpose()?
746            .unwrap_or(false);
747        let bridge_error_handler = params
748            .get("bridgeErrorHandler")
749            .map(|v| parse_bool_param("bridgeErrorHandler", v))
750            .transpose()?
751            .unwrap_or(false);
752
753        // SQL-015: repeatCount
754        let repeat_count = params.get("repeatCount").and_then(|v| v.parse().ok());
755
756        // SQL-017: processingStrategy
757        let processing_strategy = params
758            .get("processingStrategy")
759            .map(|s| s.parse())
760            .transpose()?
761            .unwrap_or_default();
762
763        // SQL-018: pollStrategy
764        let poll_strategy = params
765            .get("pollStrategy")
766            .map(|s| s.parse())
767            .transpose()?
768            .unwrap_or_default();
769
770        // Producer parameters
771        let batch = params
772            .get("batch")
773            .map(|v| parse_bool_param("batch", v))
774            .transpose()?
775            .unwrap_or(false);
776        let use_message_body_for_sql = params
777            .get("useMessageBodyForSql")
778            .map(|v| parse_bool_param("useMessageBodyForSql", v))
779            .transpose()?
780            .unwrap_or(false);
781        let ssl_mode = params.get("sslMode").cloned();
782        let ssl_root_cert = params.get("sslRootCert").cloned();
783        let ssl_cert = params.get("sslCert").cloned();
784        let ssl_key = params.get("sslKey").cloned();
785
786        Ok(Self {
787            db_url,
788            max_connections,
789            min_connections,
790            idle_timeout_secs,
791            max_lifetime_secs,
792            query,
793            source_path,
794            output_type,
795            placeholder,
796            use_placeholder,
797            noop,
798            in_separator,
799            always_populate_statement,
800            allow_named_parameters,
801            fetch_size,
802            transaction_mode,
803            delay_ms,
804            initial_delay_ms,
805            max_messages_per_poll,
806            on_consume,
807            on_consume_failed,
808            on_consume_batch_complete,
809            route_empty_result_set,
810            use_iterator,
811            expected_update_count,
812            break_batch_on_consume_fail,
813            bridge_error_handler,
814            repeat_count,
815            processing_strategy,
816            poll_strategy,
817            batch,
818            use_message_body_for_sql,
819            ssl_mode,
820            ssl_root_cert,
821            ssl_cert,
822            ssl_key,
823        })
824    }
825}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830
831    #[test]
832    fn config_defaults() {
833        let mut c =
834            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
835        c.resolve_defaults();
836        assert_eq!(c.query, "select 1");
837        assert_eq!(c.db_url, "postgres://localhost/test");
838        assert_eq!(c.max_connections, Some(5));
839        assert_eq!(c.min_connections, Some(1));
840        assert_eq!(c.idle_timeout_secs, Some(300));
841        assert_eq!(c.max_lifetime_secs, Some(1800));
842        assert_eq!(c.output_type, SqlOutputType::SelectList);
843        assert_eq!(c.placeholder, '#');
844        assert!(!c.noop);
845        assert_eq!(c.in_separator, ", ");
846        assert_eq!(c.delay_ms, 500);
847        assert_eq!(c.initial_delay_ms, 1000);
848        assert!(c.max_messages_per_poll.is_none());
849        assert!(c.on_consume.is_none());
850        assert!(c.on_consume_failed.is_none());
851        assert!(c.on_consume_batch_complete.is_none());
852        assert!(!c.route_empty_result_set);
853        assert!(c.use_iterator);
854        assert!(c.expected_update_count.is_none());
855        assert!(!c.break_batch_on_consume_fail);
856        assert!(!c.batch);
857        assert!(!c.use_message_body_for_sql);
858        assert!(c.ssl_mode.is_none());
859        assert!(c.ssl_root_cert.is_none());
860        assert!(c.ssl_cert.is_none());
861        assert!(c.ssl_key.is_none());
862        // SQL-005/SQL-011/SQL-016/SQL-002/SQL-015/SQL-017/SQL-018 defaults
863        assert!(!c.always_populate_statement);
864        assert!(c.allow_named_parameters);
865        assert!(c.fetch_size.is_none());
866        assert_eq!(c.transaction_mode, TransactionMode::Auto);
867        assert!(c.repeat_count.is_none());
868        assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
869        assert_eq!(c.poll_strategy, PollStrategy::Sequential);
870    }
871
872    #[test]
873    fn ssl_none_by_default() {
874        let c =
875            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
876        assert!(c.ssl_mode.is_none());
877        assert!(c.ssl_root_cert.is_none());
878        assert!(c.ssl_cert.is_none());
879        assert!(c.ssl_key.is_none());
880    }
881
882    #[test]
883    fn ssl_mode_from_uri() {
884        let c = SqlEndpointConfig::from_uri(
885            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
886        )
887        .unwrap();
888        assert_eq!(c.ssl_mode, Some("require".to_string()));
889        assert!(c.ssl_root_cert.is_none());
890    }
891
892    #[test]
893    fn ssl_all_params_from_uri() {
894        let c = SqlEndpointConfig::from_uri(
895            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
896        )
897        .unwrap();
898        assert_eq!(c.ssl_mode, Some("require".to_string()));
899        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
900        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
901        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
902    }
903
904    #[test]
905    fn ssl_global_applied_to_endpoint() {
906        let mut c =
907            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
908        let global = SqlGlobalConfig::default()
909            .with_ssl_mode("require")
910            .with_ssl_root_cert("/etc/ssl/ca.pem");
911        c.apply_defaults(&global);
912        assert_eq!(c.ssl_mode, Some("require".to_string()));
913        assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
914        assert!(c.ssl_cert.is_none());
915        assert!(c.ssl_key.is_none());
916    }
917
918    #[test]
919    fn ssl_uri_overrides_global() {
920        let mut c = SqlEndpointConfig::from_uri(
921            "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
922        )
923        .unwrap();
924        let global = SqlGlobalConfig::default().with_ssl_mode("require");
925        c.apply_defaults(&global);
926        assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
927    }
928
929    #[test]
930    fn config_wrong_scheme() {
931        assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
932    }
933
934    #[test]
935    fn config_missing_db_url() {
936        assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
937    }
938
939    #[test]
940    fn config_output_type_select_one() {
941        let c = SqlEndpointConfig::from_uri(
942            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
943        )
944        .unwrap();
945        assert_eq!(c.output_type, SqlOutputType::SelectOne);
946    }
947
948    #[test]
949    fn config_output_type_stream_list() {
950        let c = SqlEndpointConfig::from_uri(
951            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
952        )
953        .unwrap();
954        assert_eq!(c.output_type, SqlOutputType::StreamList);
955    }
956
957    #[test]
958    fn in_separator_default() {
959        let c =
960            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
961        assert_eq!(c.in_separator, ", ");
962    }
963
964    #[test]
965    fn in_separator_from_uri() {
966        let c = SqlEndpointConfig::from_uri(
967            "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
968        )
969        .unwrap();
970        assert_eq!(c.in_separator, ";");
971    }
972
973    #[test]
974    fn in_separator_empty_rejected() {
975        let result = SqlEndpointConfig::from_uri(
976            "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
977        );
978        assert!(result.is_err());
979        let msg = format!("{:?}", result.unwrap_err());
980        assert!(msg.contains("inSeparator") || msg.contains("empty"));
981    }
982
983    #[test]
984    fn config_consumer_options() {
985        let c = SqlEndpointConfig::from_uri(
986            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
987        ).unwrap();
988        assert_eq!(c.delay_ms, 2000);
989        assert_eq!(c.initial_delay_ms, 500);
990        assert_eq!(c.max_messages_per_poll, Some(10));
991        assert_eq!(
992            c.on_consume,
993            Some("update t set done=true where id=:#id".to_string())
994        );
995        assert_eq!(
996            c.on_consume_failed,
997            Some("update t set failed=true where id=:#id".to_string())
998        );
999        assert_eq!(
1000            c.on_consume_batch_complete,
1001            Some("delete from t where done=true".to_string())
1002        );
1003        assert!(c.route_empty_result_set);
1004        assert!(!c.use_iterator);
1005        assert_eq!(c.expected_update_count, Some(1));
1006        assert!(c.break_batch_on_consume_fail);
1007        assert!(!c.bridge_error_handler);
1008    }
1009
1010    #[test]
1011    fn config_producer_options() {
1012        let c = SqlEndpointConfig::from_uri(
1013            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
1014        ).unwrap();
1015        assert!(c.batch);
1016        assert!(c.use_message_body_for_sql);
1017        assert!(c.noop);
1018    }
1019
1020    #[test]
1021    fn config_pool_options() {
1022        let c = SqlEndpointConfig::from_uri(
1023            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
1024        ).unwrap();
1025        assert_eq!(c.max_connections, Some(20));
1026        assert_eq!(c.min_connections, Some(3));
1027        assert_eq!(c.idle_timeout_secs, Some(600));
1028        assert_eq!(c.max_lifetime_secs, Some(3600));
1029    }
1030
1031    #[test]
1032    fn config_query_with_special_chars() {
1033        let c = SqlEndpointConfig::from_uri(
1034            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
1035        )
1036        .unwrap();
1037        assert_eq!(
1038            c.query,
1039            "select * from users where name = :#name and age > #"
1040        );
1041    }
1042
1043    #[test]
1044    fn output_type_from_str() {
1045        assert_eq!(
1046            "SelectList".parse::<SqlOutputType>().unwrap(),
1047            SqlOutputType::SelectList
1048        );
1049        assert_eq!(
1050            "SelectOne".parse::<SqlOutputType>().unwrap(),
1051            SqlOutputType::SelectOne
1052        );
1053        assert_eq!(
1054            "StreamList".parse::<SqlOutputType>().unwrap(),
1055            SqlOutputType::StreamList
1056        );
1057        assert!("Invalid".parse::<SqlOutputType>().is_err());
1058    }
1059
1060    // SQL-014: file-not-found is now detected during async resolve_file_query(), not from_uri
1061    #[tokio::test]
1062    async fn config_file_not_found() {
1063        let mut config = SqlEndpointConfig::from_uri(
1064            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
1065        )
1066        .expect("from_uri should defer file reading");
1067        // from_uri no longer reads the file — source_path is set, query is empty
1068        assert_eq!(
1069            config.source_path,
1070            Some("/nonexistent/path/query.sql".to_string())
1071        );
1072        assert!(config.query.is_empty());
1073
1074        // Error occurs during async resolution
1075        let result = config.resolve_file_query().await;
1076        assert!(result.is_err());
1077        let msg = format!("{:?}", result.unwrap_err());
1078        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
1079    }
1080
1081    // SQL-014: file query is now resolved asynchronously
1082    #[tokio::test]
1083    async fn config_file_query() {
1084        use std::io::Write;
1085        let unique_name = format!(
1086            "test_sql_query_{}.sql",
1087            std::time::SystemTime::now()
1088                .duration_since(std::time::UNIX_EPOCH)
1089                .unwrap_or_default()
1090                .as_nanos()
1091        );
1092        let mut tmp = std::env::temp_dir();
1093        tmp.push(unique_name);
1094        {
1095            let mut f = std::fs::File::create(&tmp).unwrap();
1096            writeln!(f, "SELECT * FROM users").unwrap();
1097        }
1098        let uri = format!(
1099            "sql:file:{}?db_url=postgres://localhost/test",
1100            tmp.display()
1101        );
1102        let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1103        // query is empty until async resolution
1104        assert!(c.query.is_empty());
1105        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1106
1107        // Resolve asynchronously
1108        c.resolve_file_query()
1109            .await
1110            .expect("file query should resolve");
1111        assert_eq!(c.query, "SELECT * FROM users");
1112        std::fs::remove_file(&tmp).ok();
1113    }
1114
1115    // New tests for config contract
1116    #[test]
1117    fn pool_fields_none_when_not_set() {
1118        let c =
1119            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1120        assert_eq!(c.max_connections, None);
1121        assert_eq!(c.min_connections, None);
1122        assert_eq!(c.idle_timeout_secs, None);
1123        assert_eq!(c.max_lifetime_secs, None);
1124    }
1125
1126    #[test]
1127    fn apply_defaults_fills_none() {
1128        let mut c =
1129            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1130        let global = SqlGlobalConfig {
1131            max_connections: 10,
1132            min_connections: 2,
1133            idle_timeout_secs: 600,
1134            max_lifetime_secs: 3600,
1135            ssl_mode: None,
1136            ssl_root_cert: None,
1137            ssl_cert: None,
1138            ssl_key: None,
1139        };
1140        c.apply_defaults(&global);
1141        assert_eq!(c.max_connections, Some(10));
1142        assert_eq!(c.min_connections, Some(2));
1143        assert_eq!(c.idle_timeout_secs, Some(600));
1144        assert_eq!(c.max_lifetime_secs, Some(3600));
1145        assert!(c.ssl_mode.is_none());
1146        assert!(c.ssl_root_cert.is_none());
1147        assert!(c.ssl_cert.is_none());
1148        assert!(c.ssl_key.is_none());
1149    }
1150
1151    #[test]
1152    fn apply_defaults_does_not_override() {
1153        let mut c = SqlEndpointConfig::from_uri(
1154            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
1155        )
1156        .unwrap();
1157        let global = SqlGlobalConfig {
1158            max_connections: 10,
1159            min_connections: 2,
1160            idle_timeout_secs: 600,
1161            max_lifetime_secs: 3600,
1162            ssl_mode: None,
1163            ssl_root_cert: None,
1164            ssl_cert: None,
1165            ssl_key: None,
1166        };
1167        c.apply_defaults(&global);
1168        // URI-set values should NOT be overridden
1169        assert_eq!(c.max_connections, Some(99));
1170        assert_eq!(c.min_connections, Some(5));
1171        // None fields should be filled from global
1172        assert_eq!(c.idle_timeout_secs, Some(600));
1173        assert_eq!(c.max_lifetime_secs, Some(3600));
1174    }
1175
1176    #[test]
1177    fn resolve_defaults_fills_remaining() {
1178        let mut c = SqlEndpointConfig::from_uri(
1179            "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
1180        )
1181        .unwrap();
1182        c.resolve_defaults();
1183        assert_eq!(c.max_connections, Some(7)); // from URI
1184        assert_eq!(c.min_connections, Some(1)); // from defaults
1185        assert_eq!(c.idle_timeout_secs, Some(300)); // from defaults
1186        assert_eq!(c.max_lifetime_secs, Some(1800)); // from defaults
1187    }
1188
1189    #[test]
1190    fn global_config_builder() {
1191        let c = SqlGlobalConfig::default()
1192            .with_max_connections(20)
1193            .with_min_connections(3)
1194            .with_idle_timeout_secs(600)
1195            .with_max_lifetime_secs(3600)
1196            .with_ssl_mode("require")
1197            .with_ssl_root_cert("/ca.pem")
1198            .with_ssl_cert("/cert.pem")
1199            .with_ssl_key("/key.pem");
1200        assert_eq!(c.max_connections, 20);
1201        assert_eq!(c.min_connections, 3);
1202        assert_eq!(c.idle_timeout_secs, 600);
1203        assert_eq!(c.max_lifetime_secs, 3600);
1204        assert_eq!(c.ssl_mode, Some("require".to_string()));
1205        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
1206        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
1207        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
1208    }
1209
1210    #[test]
1211    fn enrich_postgres_ssl_mode() {
1212        let mut c = SqlEndpointConfig::from_uri(
1213            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1214        )
1215        .unwrap();
1216        c.resolve_defaults();
1217        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1218        assert!(url.contains("sslmode=require"), "got: {}", url);
1219    }
1220
1221    #[test]
1222    fn enrich_postgres_all_ssl() {
1223        let mut c = SqlEndpointConfig::from_uri(
1224            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
1225        )
1226        .unwrap();
1227        c.resolve_defaults();
1228        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1229        assert!(url.contains("sslmode=require"), "got: {}", url);
1230        assert!(url.contains("sslrootcert="), "got: {}", url);
1231        assert!(url.contains("sslcert="), "got: {}", url);
1232        assert!(url.contains("sslkey="), "got: {}", url);
1233    }
1234
1235    #[test]
1236    fn enrich_mysql_ssl() {
1237        let mut c = SqlEndpointConfig::from_uri(
1238            "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
1239        )
1240        .unwrap();
1241        c.resolve_defaults();
1242        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1243        assert!(url.contains("ssl-mode=require"), "got: {}", url);
1244    }
1245
1246    #[test]
1247    fn enrich_existing_query_params() {
1248        let mut c = SqlEndpointConfig::from_uri(
1249            "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
1250        )
1251        .unwrap();
1252        c.resolve_defaults();
1253        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1254        assert!(url.contains("existing=1"), "got: {}", url);
1255        assert!(url.contains("sslmode=require"), "got: {}", url);
1256    }
1257
1258    #[test]
1259    fn enrich_override_existing() {
1260        let mut c = SqlEndpointConfig::from_uri(
1261            "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
1262        )
1263        .unwrap();
1264        c.resolve_defaults();
1265        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1266        assert!(url.contains("sslmode=require"), "got: {}", url);
1267        assert!(!url.contains("sslmode=allow"), "got: {}", url);
1268    }
1269
1270    #[test]
1271    fn enrich_no_params() {
1272        let mut c =
1273            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1274        c.resolve_defaults();
1275        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1276        assert_eq!(url, "postgres://localhost/test");
1277    }
1278
1279    #[test]
1280    fn enrich_url_encodes_paths() {
1281        let mut c = SqlEndpointConfig::from_uri(
1282            "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1283        )
1284        .unwrap();
1285        c.resolve_defaults();
1286        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1287        assert!(url.contains("sslrootcert="), "got: {}", url);
1288    }
1289
1290    #[test]
1291    fn enrich_unsupported_scheme_returns_unchanged() {
1292        let mut c = SqlEndpointConfig::from_uri(
1293            "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1294        )
1295        .unwrap();
1296        c.resolve_defaults();
1297        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1298        assert_eq!(url, "sqlite://localhost/test.db");
1299    }
1300
1301    #[test]
1302    fn enrich_invalid_url_returns_error() {
1303        let mut c = SqlEndpointConfig::from_uri(
1304            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1305        )
1306        .unwrap();
1307        c.resolve_defaults();
1308        let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1309        assert!(result.is_err());
1310    }
1311
1312    // --- Phase B hardening tests ---
1313
1314    // SQL-010: Debug output redacts credentials
1315    #[test]
1316    fn debug_redacts_db_url_with_password() {
1317        let c = SqlEndpointConfig::from_uri(
1318            "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1319        )
1320        .unwrap();
1321        let debug_output = format!("{:?}", c);
1322        assert!(
1323            !debug_output.contains("secret123"),
1324            "Debug output must not contain password: {}",
1325            debug_output
1326        );
1327        assert!(
1328            debug_output.contains("***"),
1329            "Debug output must contain redacted marker: {}",
1330            debug_output
1331        );
1332    }
1333
1334    #[test]
1335    fn debug_redacts_ssl_key() {
1336        let c = SqlEndpointConfig::from_uri(
1337            "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1338        )
1339        .unwrap();
1340        let debug_output = format!("{:?}", c);
1341        assert!(
1342            !debug_output.contains("/secret/key.pem"),
1343            "Debug output must not contain ssl_key path: {}",
1344            debug_output
1345        );
1346    }
1347
1348    #[test]
1349    fn debug_global_config_redacts_ssl_key() {
1350        let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1351        let debug_output = format!("{:?}", c);
1352        assert!(
1353            !debug_output.contains("/secret/key.pem"),
1354            "Debug output must not contain ssl_key path: {}",
1355            debug_output
1356        );
1357        assert!(
1358            debug_output.contains("***"),
1359            "Debug output must contain redacted marker: {}",
1360            debug_output
1361        );
1362    }
1363
1364    #[test]
1365    fn redact_db_url_with_credentials() {
1366        assert_eq!(
1367            redact_db_url("postgres://user:pass@host/db"),
1368            "postgres://***:***@host/db"
1369        );
1370    }
1371
1372    #[test]
1373    fn redact_db_url_without_credentials() {
1374        assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1375    }
1376
1377    #[test]
1378    fn redact_db_url_invalid_returns_original() {
1379        assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1380    }
1381
1382    // SQL-004: usePlaceholder parsing
1383    #[test]
1384    fn use_placeholder_defaults_to_true() {
1385        let c =
1386            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1387        assert!(c.use_placeholder);
1388    }
1389
1390    #[test]
1391    fn use_placeholder_false_from_uri() {
1392        let c = SqlEndpointConfig::from_uri(
1393            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1394        )
1395        .unwrap();
1396        assert!(!c.use_placeholder);
1397    }
1398
1399    #[test]
1400    fn use_placeholder_true_from_uri() {
1401        let c = SqlEndpointConfig::from_uri(
1402            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1403        )
1404        .unwrap();
1405        assert!(c.use_placeholder);
1406    }
1407
1408    // SQL-004: strict boolean parsing — invalid values rejected
1409    #[test]
1410    fn use_placeholder_rejects_invalid_value() {
1411        let result = SqlEndpointConfig::from_uri(
1412            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1413        );
1414        assert!(result.is_err());
1415        let msg = format!("{:?}", result.unwrap_err());
1416        assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1417    }
1418
1419    #[test]
1420    fn use_placeholder_rejects_typo_tru() {
1421        let result = SqlEndpointConfig::from_uri(
1422            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1423        );
1424        assert!(result.is_err());
1425    }
1426
1427    #[test]
1428    fn use_placeholder_rejects_yes() {
1429        let result = SqlEndpointConfig::from_uri(
1430            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1431        );
1432        assert!(result.is_err());
1433    }
1434
1435    #[test]
1436    fn noop_rejects_invalid_value() {
1437        let result =
1438            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1439        assert!(result.is_err());
1440        let msg = format!("{:?}", result.unwrap_err());
1441        assert!(msg.contains("noop"));
1442    }
1443
1444    #[test]
1445    fn batch_rejects_invalid_value() {
1446        let result =
1447            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1448        assert!(result.is_err());
1449        let msg = format!("{:?}", result.unwrap_err());
1450        assert!(msg.contains("batch"));
1451    }
1452
1453    #[test]
1454    fn route_empty_result_set_rejects_invalid_value() {
1455        let result = SqlEndpointConfig::from_uri(
1456            "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1457        );
1458        assert!(result.is_err());
1459    }
1460
1461    #[test]
1462    fn use_iterator_rejects_invalid_value() {
1463        let result = SqlEndpointConfig::from_uri(
1464            "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1465        );
1466        assert!(result.is_err());
1467    }
1468
1469    #[test]
1470    fn break_batch_on_consume_fail_rejects_invalid_value() {
1471        let result = SqlEndpointConfig::from_uri(
1472            "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1473        );
1474        assert!(result.is_err());
1475    }
1476
1477    #[test]
1478    fn use_message_body_for_sql_rejects_invalid_value() {
1479        let result = SqlEndpointConfig::from_uri(
1480            "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1481        );
1482        assert!(result.is_err());
1483    }
1484
1485    // Case-insensitive true/false still works
1486    #[test]
1487    fn boolean_params_case_insensitive() {
1488        let c = SqlEndpointConfig::from_uri(
1489            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False&bridgeErrorHandler=TRUE",
1490        )
1491        .unwrap();
1492        assert!(c.use_placeholder);
1493        assert!(!c.noop);
1494        assert!(c.batch);
1495        assert!(!c.use_iterator);
1496        assert!(c.bridge_error_handler);
1497    }
1498
1499    // SQL-022: multi-char placeholder rejected
1500    #[test]
1501    fn multi_char_placeholder_rejected() {
1502        let result = SqlEndpointConfig::from_uri(
1503            "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1504        );
1505        assert!(result.is_err());
1506        let msg = format!("{:?}", result.unwrap_err());
1507        assert!(msg.contains("placeholder") && msg.contains("one character"));
1508    }
1509
1510    #[test]
1511    fn non_ascii_placeholder_rejected() {
1512        let result = SqlEndpointConfig::from_uri(
1513            "sql:select 1?db_url=postgres://localhost/test&placeholder=%C2%A2",
1514        );
1515        assert!(result.is_err());
1516    }
1517
1518    #[test]
1519    fn single_char_placeholder_accepted() {
1520        let c = SqlEndpointConfig::from_uri(
1521            "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1522        )
1523        .unwrap();
1524        assert_eq!(c.placeholder, '$');
1525    }
1526
1527    #[test]
1528    fn empty_placeholder_falls_back_to_default() {
1529        // Empty string is filtered out by the original logic — falls back to '#'
1530        let c = SqlEndpointConfig::from_uri(
1531            "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1532        )
1533        .unwrap();
1534        assert_eq!(c.placeholder, '#');
1535    }
1536
1537    // SQL-014: file-based SQL config test (verifies async resolution and caching)
1538    #[tokio::test]
1539    async fn file_query_cached_in_config() {
1540        use std::io::Write;
1541        let unique_name = format!(
1542            "test_sql_cached_{}.sql",
1543            std::time::SystemTime::now()
1544                .duration_since(std::time::UNIX_EPOCH)
1545                .unwrap_or_default()
1546                .as_nanos()
1547        );
1548        let mut tmp = std::env::temp_dir();
1549        tmp.push(unique_name);
1550        {
1551            let mut f = std::fs::File::create(&tmp).unwrap();
1552            writeln!(f, "SELECT * FROM cached_test").unwrap();
1553        }
1554        let uri = format!(
1555            "sql:file:{}?db_url=postgres://localhost/test",
1556            tmp.display()
1557        );
1558        let mut c = SqlEndpointConfig::from_uri(&uri).unwrap();
1559        // Query is empty before async resolution
1560        assert!(c.query.is_empty());
1561        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1562
1563        // Resolve asynchronously — query is cached in config
1564        c.resolve_file_query()
1565            .await
1566            .expect("resolve should succeed");
1567        assert_eq!(c.query, "SELECT * FROM cached_test");
1568
1569        // Delete the file — config still has the query
1570        std::fs::remove_file(&tmp).ok();
1571        assert_eq!(c.query, "SELECT * FROM cached_test");
1572    }
1573
1574    // --- H-03 audit sweep tests ---
1575
1576    // SQL-005: alwaysPopulateStatement
1577    #[test]
1578    fn always_populate_statement_defaults_to_false() {
1579        let c =
1580            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1581        assert!(!c.always_populate_statement);
1582    }
1583
1584    #[test]
1585    fn always_populate_statement_from_uri() {
1586        let c = SqlEndpointConfig::from_uri(
1587            "sql:select 1?db_url=postgres://localhost/test&alwaysPopulateStatement=true",
1588        )
1589        .unwrap();
1590        assert!(c.always_populate_statement);
1591    }
1592
1593    // SQL-011: allowNamedParameters
1594    #[test]
1595    fn allow_named_parameters_defaults_to_true() {
1596        let c =
1597            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1598        assert!(c.allow_named_parameters);
1599    }
1600
1601    #[test]
1602    fn allow_named_parameters_false_from_uri() {
1603        let c = SqlEndpointConfig::from_uri(
1604            "sql:select 1?db_url=postgres://localhost/test&allowNamedParameters=false",
1605        )
1606        .unwrap();
1607        assert!(!c.allow_named_parameters);
1608    }
1609
1610    // SQL-016: fetchSize
1611    #[test]
1612    fn fetch_size_defaults_to_none() {
1613        let c =
1614            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1615        assert!(c.fetch_size.is_none());
1616    }
1617
1618    #[test]
1619    fn fetch_size_from_uri() {
1620        let c = SqlEndpointConfig::from_uri(
1621            "sql:select 1?db_url=postgres://localhost/test&fetchSize=1000",
1622        )
1623        .unwrap();
1624        assert_eq!(c.fetch_size, Some(1000));
1625    }
1626
1627    // SQL-002: transactionMode
1628    #[test]
1629    fn transaction_mode_defaults_to_auto() {
1630        let c =
1631            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1632        assert_eq!(c.transaction_mode, TransactionMode::Auto);
1633    }
1634
1635    #[test]
1636    fn transaction_mode_managed_from_uri() {
1637        let c = SqlEndpointConfig::from_uri(
1638            "sql:select 1?db_url=postgres://localhost/test&transactionMode=Managed",
1639        )
1640        .unwrap();
1641        assert_eq!(c.transaction_mode, TransactionMode::Managed);
1642    }
1643
1644    #[test]
1645    fn transaction_mode_invalid_rejected() {
1646        let result = SqlEndpointConfig::from_uri(
1647            "sql:select 1?db_url=postgres://localhost/test&transactionMode=Invalid",
1648        );
1649        assert!(result.is_err());
1650    }
1651
1652    // SQL-015: repeatCount
1653    #[test]
1654    fn repeat_count_defaults_to_none() {
1655        let c =
1656            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1657        assert!(c.repeat_count.is_none());
1658    }
1659
1660    #[test]
1661    fn repeat_count_from_uri() {
1662        let c = SqlEndpointConfig::from_uri(
1663            "sql:select 1?db_url=postgres://localhost/test&repeatCount=10",
1664        )
1665        .unwrap();
1666        assert_eq!(c.repeat_count, Some(10));
1667    }
1668
1669    // SQL-017: processingStrategy
1670    #[test]
1671    fn processing_strategy_defaults_to_direct() {
1672        let c =
1673            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1674        assert_eq!(c.processing_strategy, ProcessingStrategy::Direct);
1675    }
1676
1677    #[test]
1678    fn processing_strategy_scheduled_from_uri() {
1679        let c = SqlEndpointConfig::from_uri(
1680            "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Scheduled",
1681        )
1682        .unwrap();
1683        assert_eq!(c.processing_strategy, ProcessingStrategy::Scheduled);
1684    }
1685
1686    #[test]
1687    fn processing_strategy_invalid_rejected() {
1688        let result = SqlEndpointConfig::from_uri(
1689            "sql:select 1?db_url=postgres://localhost/test&processingStrategy=Invalid",
1690        );
1691        assert!(result.is_err());
1692    }
1693
1694    // SQL-018: pollStrategy
1695    #[test]
1696    fn poll_strategy_defaults_to_sequential() {
1697        let c =
1698            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1699        assert_eq!(c.poll_strategy, PollStrategy::Sequential);
1700    }
1701
1702    #[test]
1703    fn poll_strategy_burst_from_uri() {
1704        let c = SqlEndpointConfig::from_uri(
1705            "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Burst",
1706        )
1707        .unwrap();
1708        assert_eq!(c.poll_strategy, PollStrategy::Burst);
1709    }
1710
1711    #[test]
1712    fn poll_strategy_invalid_rejected() {
1713        let result = SqlEndpointConfig::from_uri(
1714            "sql:select 1?db_url=postgres://localhost/test&pollStrategy=Invalid",
1715        );
1716        assert!(result.is_err());
1717    }
1718}