Skip to main content

camel_component_sql/
config.rs

1use std::str::FromStr;
2
3use camel_component_api::CamelError;
4use camel_component_api::{UriComponents, UriConfig, parse_uri};
5
/// Masks an optional secret for display.
///
/// Yields `Some("***")` when `opt` holds a value (whatever that value is) and
/// `None` when it is empty, so formatted output only reveals whether the
/// secret is set.
fn redacted_opt(opt: &Option<String>) -> Option<&'static str> {
    opt.as_ref().map(|_| "***")
}
10
11/// Redacts the user:password portion of a database URL for safe display.
12/// Returns `"scheme://***@host/db"` for URLs with userinfo, or the original URL otherwise.
13pub fn redact_db_url(db_url: &str) -> String {
14    match url::Url::parse(db_url) {
15        Ok(mut parsed) => {
16            if parsed.username().is_empty() && parsed.password().is_none() {
17                return db_url.to_string();
18            }
19            let _ = parsed.set_username("***");
20            let _ = parsed.set_password(Some("***"));
21            parsed.to_string()
22        }
23        Err(_) => db_url.to_string(),
24    }
25}
26
/// How the rows produced by a SQL query are handed back.
///
/// The default is [`SqlOutputType::SelectList`].
#[derive(Debug, Clone, PartialEq, Default)]
pub enum SqlOutputType {
    /// All rows, returned together as a list (the default).
    #[default]
    SelectList,
    /// Only a single row: the first result.
    SelectOne,
    /// Results streamed as an async iterator.
    StreamList,
}
38
39impl FromStr for SqlOutputType {
40    type Err = CamelError;
41
42    fn from_str(s: &str) -> Result<Self, Self::Err> {
43        match s {
44            "SelectList" => Ok(SqlOutputType::SelectList),
45            "SelectOne" => Ok(SqlOutputType::SelectOne),
46            "StreamList" => Ok(SqlOutputType::StreamList),
47            _ => Err(CamelError::InvalidUri(format!(
48                "Unknown output type: {}",
49                s
50            ))),
51        }
52    }
53}
54
55/// Global configuration for SQL component.
56///
57/// This struct supports serde deserialization with defaults and builder methods.
58/// It holds pool configuration that can be applied as defaults to endpoints.
59///
60/// **Security note:** `Debug` implementation redacts sensitive fields (SSL key paths).
61#[derive(Clone, PartialEq, serde::Deserialize)]
62#[serde(default)]
63pub struct SqlGlobalConfig {
64    pub max_connections: u32,
65    pub min_connections: u32,
66    pub idle_timeout_secs: u64,
67    pub max_lifetime_secs: u64,
68    // SSL/TLS
69    pub ssl_mode: Option<String>,
70    pub ssl_root_cert: Option<String>,
71    pub ssl_cert: Option<String>,
72    pub ssl_key: Option<String>,
73}
74
75impl std::fmt::Debug for SqlGlobalConfig {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("SqlGlobalConfig")
78            .field("max_connections", &self.max_connections)
79            .field("min_connections", &self.min_connections)
80            .field("idle_timeout_secs", &self.idle_timeout_secs)
81            .field("max_lifetime_secs", &self.max_lifetime_secs)
82            .field("ssl_mode", &self.ssl_mode)
83            .field("ssl_root_cert", &self.ssl_root_cert)
84            .field("ssl_cert", &self.ssl_cert)
85            .field("ssl_key", &redacted_opt(&self.ssl_key))
86            .finish()
87    }
88}
89
90impl Default for SqlGlobalConfig {
91    fn default() -> Self {
92        Self {
93            max_connections: 5,
94            min_connections: 1,
95            idle_timeout_secs: 300,
96            max_lifetime_secs: 1800,
97            ssl_mode: None,
98            ssl_root_cert: None,
99            ssl_cert: None,
100            ssl_key: None,
101        }
102    }
103}
104
105impl SqlGlobalConfig {
106    pub fn new() -> Self {
107        Self::default()
108    }
109
110    pub fn with_max_connections(mut self, value: u32) -> Self {
111        self.max_connections = value;
112        self
113    }
114
115    pub fn with_min_connections(mut self, value: u32) -> Self {
116        self.min_connections = value;
117        self
118    }
119
120    pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
121        self.idle_timeout_secs = value;
122        self
123    }
124
125    pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
126        self.max_lifetime_secs = value;
127        self
128    }
129
130    pub fn with_ssl_mode(mut self, value: impl Into<String>) -> Self {
131        self.ssl_mode = Some(value.into());
132        self
133    }
134
135    pub fn with_ssl_root_cert(mut self, value: impl Into<String>) -> Self {
136        self.ssl_root_cert = Some(value.into());
137        self
138    }
139
140    pub fn with_ssl_cert(mut self, value: impl Into<String>) -> Self {
141        self.ssl_cert = Some(value.into());
142        self
143    }
144
145    pub fn with_ssl_key(mut self, value: impl Into<String>) -> Self {
146        self.ssl_key = Some(value.into());
147        self
148    }
149}
150
151/// Configuration for SQL component endpoints.
152///
153/// URI format: `sql:<query>?db_url=<url>&param1=val1&param2=val2`
154///
155/// The query can be inline SQL or a file reference with `file:` prefix:
156/// - `sql:SELECT * FROM users?db_url=...` - inline SQL
157/// - `sql:file:/path/to/query.sql?db_url=...` - read SQL from file
158///
159/// **Note on file-based queries (SQL-014):** When the query path starts with `file:`,
160/// the SQL is read synchronously via `std::fs::read_to_string` during endpoint creation.
161/// This is a blocking I/O call, but it occurs in the synchronous `from_uri` / `create_endpoint`
162/// path — not in the async hot path (producer `call()` or consumer `poll_database()`).
163/// The file content is cached in `config.query` after parsing, so no runtime file reads occur.
164///
165/// **Security note:** `Debug` implementation redacts the `db_url` (which may contain credentials)
166/// and `ssl_key` path. Use `redact_db_url()` for safe logging of database URLs.
167#[derive(Clone)]
168pub struct SqlEndpointConfig {
169    // Connection
170    /// Database connection URL (required).
171    pub db_url: String,
172    /// Maximum connections in the pool. None = use global default.
173    pub max_connections: Option<u32>,
174    /// Minimum connections in the pool. None = use global default.
175    pub min_connections: Option<u32>,
176    /// Idle timeout in seconds. None = use global default.
177    pub idle_timeout_secs: Option<u64>,
178    /// Maximum connection lifetime in seconds. None = use global default.
179    pub max_lifetime_secs: Option<u64>,
180
181    // Query
182    /// The SQL query (from URI path or file).
183    pub query: String,
184    /// Path to the file containing the SQL query (when using `file:` prefix).
185    pub source_path: Option<String>,
186    /// Output type for query results. Default: SelectList.
187    pub output_type: SqlOutputType,
188    /// Placeholder character for parameters. Default: '#'.
189    pub placeholder: char,
190    /// If true, process parameter placeholders in queries. Default: true.
191    /// When false, queries are executed as-is without template parsing.
192    pub use_placeholder: bool,
193    /// If true, don't execute the query (dry run). Default: false.
194    pub noop: bool,
195    /// Separator for IN clause expansion. Default: ", ".
196    pub in_separator: String,
197
198    // Consumer (polling)
199    /// Delay between polls in milliseconds. Default: 500.
200    pub delay_ms: u64,
201    /// Initial delay before first poll in milliseconds. Default: 1000.
202    pub initial_delay_ms: u64,
203    /// Maximum messages per poll.
204    pub max_messages_per_poll: Option<i32>,
205    /// SQL to execute after consuming each message.
206    pub on_consume: Option<String>,
207    /// SQL to execute if consumption fails.
208    pub on_consume_failed: Option<String>,
209    /// SQL to execute after consuming a batch.
210    pub on_consume_batch_complete: Option<String>,
211    /// Route empty result sets. Default: false.
212    pub route_empty_result_set: bool,
213    /// Use iterator for results. Default: true.
214    pub use_iterator: bool,
215    /// Expected number of rows affected.
216    pub expected_update_count: Option<i64>,
217    /// Break batch on consume failure. Default: false.
218    pub break_batch_on_consume_fail: bool,
219
220    // Producer
221    /// Enable batch mode. Default: false.
222    pub batch: bool,
223    /// Use message body for SQL. Default: false.
224    pub use_message_body_for_sql: bool,
225
226    // SSL/TLS
227    /// SSL mode for the connection. None = use global default.
228    pub ssl_mode: Option<String>,
229    /// Path to SSL root certificate. None = use global default.
230    pub ssl_root_cert: Option<String>,
231    /// Path to SSL client certificate. None = use global default.
232    pub ssl_cert: Option<String>,
233    /// Path to SSL client key. None = use global default.
234    pub ssl_key: Option<String>,
235}
236
237impl std::fmt::Debug for SqlEndpointConfig {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("SqlEndpointConfig")
240            .field("db_url", &redact_db_url(&self.db_url))
241            .field("max_connections", &self.max_connections)
242            .field("min_connections", &self.min_connections)
243            .field("idle_timeout_secs", &self.idle_timeout_secs)
244            .field("max_lifetime_secs", &self.max_lifetime_secs)
245            .field("query", &self.query)
246            .field("source_path", &self.source_path)
247            .field("output_type", &self.output_type)
248            .field("placeholder", &self.placeholder)
249            .field("use_placeholder", &self.use_placeholder)
250            .field("noop", &self.noop)
251            .field("in_separator", &self.in_separator)
252            .field("delay_ms", &self.delay_ms)
253            .field("initial_delay_ms", &self.initial_delay_ms)
254            .field("max_messages_per_poll", &self.max_messages_per_poll)
255            .field("on_consume", &self.on_consume)
256            .field("on_consume_failed", &self.on_consume_failed)
257            .field("on_consume_batch_complete", &self.on_consume_batch_complete)
258            .field("route_empty_result_set", &self.route_empty_result_set)
259            .field("use_iterator", &self.use_iterator)
260            .field("expected_update_count", &self.expected_update_count)
261            .field(
262                "break_batch_on_consume_fail",
263                &self.break_batch_on_consume_fail,
264            )
265            .field("batch", &self.batch)
266            .field("use_message_body_for_sql", &self.use_message_body_for_sql)
267            .field("ssl_mode", &self.ssl_mode)
268            .field("ssl_root_cert", &self.ssl_root_cert)
269            .field("ssl_cert", &self.ssl_cert)
270            .field("ssl_key", &redacted_opt(&self.ssl_key))
271            .finish()
272    }
273}
274
275impl SqlEndpointConfig {
276    /// Apply defaults from global config, filling None fields without overriding.
277    pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
278        if self.max_connections.is_none() {
279            self.max_connections = Some(defaults.max_connections);
280        }
281        if self.min_connections.is_none() {
282            self.min_connections = Some(defaults.min_connections);
283        }
284        if self.idle_timeout_secs.is_none() {
285            self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
286        }
287        if self.max_lifetime_secs.is_none() {
288            self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
289        }
290        if self.ssl_mode.is_none() {
291            self.ssl_mode = defaults.ssl_mode.clone();
292        }
293        if self.ssl_root_cert.is_none() {
294            self.ssl_root_cert = defaults.ssl_root_cert.clone();
295        }
296        if self.ssl_cert.is_none() {
297            self.ssl_cert = defaults.ssl_cert.clone();
298        }
299        if self.ssl_key.is_none() {
300            self.ssl_key = defaults.ssl_key.clone();
301        }
302    }
303
304    /// Resolve any remaining None fields with built-in defaults.
305    pub fn resolve_defaults(&mut self) {
306        let defaults = SqlGlobalConfig::default();
307        self.apply_defaults(&defaults);
308    }
309}
310
/// Driver-specific query-parameter names for one SSL/TLS option.
struct SslParamMapping {
    /// Parameter name used in `postgres://` / `postgresql://` URLs.
    pg_key: &'static str,
    /// Parameter name used in `mysql://` URLs.
    mysql_key: &'static str,
}
315
316const SSL_MAPPINGS: &[(&str, SslParamMapping)] = &[
317    (
318        "sslMode",
319        SslParamMapping {
320            pg_key: "sslmode",
321            mysql_key: "ssl-mode",
322        },
323    ),
324    (
325        "sslRootCert",
326        SslParamMapping {
327            pg_key: "sslrootcert",
328            mysql_key: "ssl-ca",
329        },
330    ),
331    (
332        "sslCert",
333        SslParamMapping {
334            pg_key: "sslcert",
335            mysql_key: "ssl-cert",
336        },
337    ),
338    (
339        "sslKey",
340        SslParamMapping {
341            pg_key: "sslkey",
342            mysql_key: "ssl-key",
343        },
344    ),
345];
346
347pub fn enrich_db_url_with_ssl(
348    db_url: &str,
349    config: &SqlEndpointConfig,
350) -> Result<String, CamelError> {
351    let ssl_params: Vec<(&str, &str)> = [
352        config.ssl_mode.as_deref().map(|v| ("sslMode", v)),
353        config.ssl_root_cert.as_deref().map(|v| ("sslRootCert", v)),
354        config.ssl_cert.as_deref().map(|v| ("sslCert", v)),
355        config.ssl_key.as_deref().map(|v| ("sslKey", v)),
356    ]
357    .into_iter()
358    .flatten()
359    .collect();
360
361    if ssl_params.is_empty() {
362        return Ok(db_url.to_string());
363    }
364
365    let mut parsed = url::Url::parse(db_url).map_err(|e| {
366        CamelError::InvalidUri(format!(
367            "Cannot parse database URL for SSL enrichment: {}",
368            e
369        ))
370    })?;
371
372    let scheme = parsed.scheme();
373    if scheme != "postgres" && scheme != "postgresql" && scheme != "mysql" {
374        return Ok(db_url.to_string());
375    }
376    let is_mysql = scheme == "mysql";
377
378    let mut query_pairs = parsed.query_pairs().collect::<Vec<_>>();
379    for (camel_name, value) in &ssl_params {
380        if let Some((_, mapping)) = SSL_MAPPINGS.iter().find(|(name, _)| *name == *camel_name) {
381            let driver_key = if is_mysql {
382                mapping.mysql_key
383            } else {
384                mapping.pg_key
385            };
386
387            if let Some(pos) = query_pairs.iter().position(|(k, _)| k == driver_key) {
388                query_pairs[pos].1 = (*value).into();
389            } else {
390                query_pairs.push((driver_key.into(), (*value).into()));
391            }
392        }
393    }
394
395    {
396        let mut serializer = url::form_urlencoded::Serializer::new(String::new());
397        for (k, v) in query_pairs {
398            serializer.append_pair(&k, &v);
399        }
400        parsed.set_query(Some(&serializer.finish()));
401    }
402
403    Ok(parsed.to_string())
404}
405
406impl UriConfig for SqlEndpointConfig {
407    fn scheme() -> &'static str {
408        "sql"
409    }
410
411    fn from_uri(uri: &str) -> Result<Self, CamelError> {
412        let parts = parse_uri(uri)?;
413        Self::from_components(parts)
414    }
415
416    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
417        // Validate scheme
418        if parts.scheme != Self::scheme() {
419            return Err(CamelError::InvalidUri(format!(
420                "expected scheme '{}' but got '{}'",
421                Self::scheme(),
422                parts.scheme
423            )));
424        }
425
426        let params = &parts.params;
427
428        // Handle file: prefix for query
429        let (query, source_path) = if parts.path.starts_with("file:") {
430            let file_path = parts.path.trim_start_matches("file:").to_string();
431            let contents = std::fs::read_to_string(&file_path).map_err(|e| {
432                CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
433            })?;
434            (contents.trim().to_string(), Some(file_path))
435        } else {
436            (parts.path.clone(), None)
437        };
438
439        // Required parameter: db_url
440        let db_url = params
441            .get("db_url")
442            .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
443            .clone();
444
445        // Connection parameters - None when not set by URI param
446        let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
447        let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
448        let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
449        let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
450
451        // Query parameters
452        let output_type = params
453            .get("outputType")
454            .map(|s| s.parse())
455            .transpose()?
456            .unwrap_or_default();
457        let placeholder = params
458            .get("placeholder")
459            .filter(|v| !v.is_empty())
460            .map(|v| {
461                if v.chars().count() != 1 {
462                    return Err(CamelError::InvalidUri(format!(
463                        "placeholder must be exactly one character, got '{}'",
464                        v
465                    )));
466                }
467                Ok(v.chars().next().unwrap()) // allow-unwrap
468            })
469            .transpose()?
470            .unwrap_or('#');
471        /// Parse a boolean URI parameter strictly.
472        ///
473        /// Accepts only `"true"` or `"false"` (case-insensitive). Any other value
474        /// returns `CamelError::InvalidUri` to prevent silent misconfiguration.
475        fn parse_bool_param(name: &str, value: &str) -> Result<bool, CamelError> {
476            if value.eq_ignore_ascii_case("true") {
477                Ok(true)
478            } else if value.eq_ignore_ascii_case("false") {
479                Ok(false)
480            } else {
481                Err(CamelError::InvalidUri(format!(
482                    "{} must be 'true' or 'false', got '{}'",
483                    name, value
484                )))
485            }
486        }
487
488        let use_placeholder = params
489            .get("usePlaceholder")
490            .map(|v| parse_bool_param("usePlaceholder", v))
491            .transpose()?
492            .unwrap_or(true);
493        let noop = params
494            .get("noop")
495            .map(|v| parse_bool_param("noop", v))
496            .transpose()?
497            .unwrap_or(false);
498        let in_separator = params
499            .get("inSeparator")
500            .map(|v| v.to_string())
501            .unwrap_or_else(|| ", ".to_string());
502        if in_separator.is_empty() {
503            return Err(CamelError::InvalidUri(
504                "inSeparator must not be empty".to_string(),
505            ));
506        }
507
508        // Consumer parameters
509        let delay_ms = params
510            .get("delay")
511            .and_then(|v| v.parse().ok())
512            .unwrap_or(500);
513        let initial_delay_ms = params
514            .get("initialDelay")
515            .and_then(|v| v.parse().ok())
516            .unwrap_or(1000);
517        let max_messages_per_poll = params
518            .get("maxMessagesPerPoll")
519            .and_then(|v| v.parse().ok());
520        let on_consume = params.get("onConsume").cloned();
521        let on_consume_failed = params.get("onConsumeFailed").cloned();
522        let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
523        let route_empty_result_set = params
524            .get("routeEmptyResultSet")
525            .map(|v| parse_bool_param("routeEmptyResultSet", v))
526            .transpose()?
527            .unwrap_or(false);
528        let use_iterator = params
529            .get("useIterator")
530            .map(|v| parse_bool_param("useIterator", v))
531            .transpose()?
532            .unwrap_or(true);
533        let expected_update_count = params
534            .get("expectedUpdateCount")
535            .and_then(|v| v.parse().ok());
536        let break_batch_on_consume_fail = params
537            .get("breakBatchOnConsumeFail")
538            .map(|v| parse_bool_param("breakBatchOnConsumeFail", v))
539            .transpose()?
540            .unwrap_or(false);
541
542        // Producer parameters
543        let batch = params
544            .get("batch")
545            .map(|v| parse_bool_param("batch", v))
546            .transpose()?
547            .unwrap_or(false);
548        let use_message_body_for_sql = params
549            .get("useMessageBodyForSql")
550            .map(|v| parse_bool_param("useMessageBodyForSql", v))
551            .transpose()?
552            .unwrap_or(false);
553        let ssl_mode = params.get("sslMode").cloned();
554        let ssl_root_cert = params.get("sslRootCert").cloned();
555        let ssl_cert = params.get("sslCert").cloned();
556        let ssl_key = params.get("sslKey").cloned();
557
558        Ok(Self {
559            db_url,
560            max_connections,
561            min_connections,
562            idle_timeout_secs,
563            max_lifetime_secs,
564            query,
565            source_path,
566            output_type,
567            placeholder,
568            use_placeholder,
569            noop,
570            in_separator,
571            delay_ms,
572            initial_delay_ms,
573            max_messages_per_poll,
574            on_consume,
575            on_consume_failed,
576            on_consume_batch_complete,
577            route_empty_result_set,
578            use_iterator,
579            expected_update_count,
580            break_batch_on_consume_fail,
581            batch,
582            use_message_body_for_sql,
583            ssl_mode,
584            ssl_root_cert,
585            ssl_cert,
586            ssl_key,
587        })
588    }
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594
595    #[test]
596    fn config_defaults() {
597        let mut c =
598            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
599        c.resolve_defaults();
600        assert_eq!(c.query, "select 1");
601        assert_eq!(c.db_url, "postgres://localhost/test");
602        assert_eq!(c.max_connections, Some(5));
603        assert_eq!(c.min_connections, Some(1));
604        assert_eq!(c.idle_timeout_secs, Some(300));
605        assert_eq!(c.max_lifetime_secs, Some(1800));
606        assert_eq!(c.output_type, SqlOutputType::SelectList);
607        assert_eq!(c.placeholder, '#');
608        assert!(!c.noop);
609        assert_eq!(c.in_separator, ", ");
610        assert_eq!(c.delay_ms, 500);
611        assert_eq!(c.initial_delay_ms, 1000);
612        assert!(c.max_messages_per_poll.is_none());
613        assert!(c.on_consume.is_none());
614        assert!(c.on_consume_failed.is_none());
615        assert!(c.on_consume_batch_complete.is_none());
616        assert!(!c.route_empty_result_set);
617        assert!(c.use_iterator);
618        assert!(c.expected_update_count.is_none());
619        assert!(!c.break_batch_on_consume_fail);
620        assert!(!c.batch);
621        assert!(!c.use_message_body_for_sql);
622        assert!(c.ssl_mode.is_none());
623        assert!(c.ssl_root_cert.is_none());
624        assert!(c.ssl_cert.is_none());
625        assert!(c.ssl_key.is_none());
626    }
627
628    #[test]
629    fn ssl_none_by_default() {
630        let c =
631            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
632        assert!(c.ssl_mode.is_none());
633        assert!(c.ssl_root_cert.is_none());
634        assert!(c.ssl_cert.is_none());
635        assert!(c.ssl_key.is_none());
636    }
637
638    #[test]
639    fn ssl_mode_from_uri() {
640        let c = SqlEndpointConfig::from_uri(
641            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
642        )
643        .unwrap();
644        assert_eq!(c.ssl_mode, Some("require".to_string()));
645        assert!(c.ssl_root_cert.is_none());
646    }
647
648    #[test]
649    fn ssl_all_params_from_uri() {
650        let c = SqlEndpointConfig::from_uri(
651            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
652        )
653        .unwrap();
654        assert_eq!(c.ssl_mode, Some("require".to_string()));
655        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
656        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
657        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
658    }
659
660    #[test]
661    fn ssl_global_applied_to_endpoint() {
662        let mut c =
663            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
664        let global = SqlGlobalConfig::default()
665            .with_ssl_mode("require")
666            .with_ssl_root_cert("/etc/ssl/ca.pem");
667        c.apply_defaults(&global);
668        assert_eq!(c.ssl_mode, Some("require".to_string()));
669        assert_eq!(c.ssl_root_cert, Some("/etc/ssl/ca.pem".to_string()));
670        assert!(c.ssl_cert.is_none());
671        assert!(c.ssl_key.is_none());
672    }
673
674    #[test]
675    fn ssl_uri_overrides_global() {
676        let mut c = SqlEndpointConfig::from_uri(
677            "sql:select 1?db_url=postgres://localhost/test&sslMode=verify-full",
678        )
679        .unwrap();
680        let global = SqlGlobalConfig::default().with_ssl_mode("require");
681        c.apply_defaults(&global);
682        assert_eq!(c.ssl_mode, Some("verify-full".to_string()));
683    }
684
685    #[test]
686    fn config_wrong_scheme() {
687        assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
688    }
689
690    #[test]
691    fn config_missing_db_url() {
692        assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
693    }
694
695    #[test]
696    fn config_output_type_select_one() {
697        let c = SqlEndpointConfig::from_uri(
698            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
699        )
700        .unwrap();
701        assert_eq!(c.output_type, SqlOutputType::SelectOne);
702    }
703
704    #[test]
705    fn config_output_type_stream_list() {
706        let c = SqlEndpointConfig::from_uri(
707            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
708        )
709        .unwrap();
710        assert_eq!(c.output_type, SqlOutputType::StreamList);
711    }
712
713    #[test]
714    fn in_separator_default() {
715        let c =
716            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
717        assert_eq!(c.in_separator, ", ");
718    }
719
720    #[test]
721    fn in_separator_from_uri() {
722        let c = SqlEndpointConfig::from_uri(
723            "sql:select 1?db_url=postgres://localhost/test&inSeparator=;",
724        )
725        .unwrap();
726        assert_eq!(c.in_separator, ";");
727    }
728
729    #[test]
730    fn in_separator_empty_rejected() {
731        let result = SqlEndpointConfig::from_uri(
732            "sql:select 1?db_url=postgres://localhost/test&inSeparator=",
733        );
734        assert!(result.is_err());
735        let msg = format!("{:?}", result.unwrap_err());
736        assert!(msg.contains("inSeparator") || msg.contains("empty"));
737    }
738
739    #[test]
740    fn config_consumer_options() {
741        let c = SqlEndpointConfig::from_uri(
742            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
743        ).unwrap();
744        assert_eq!(c.delay_ms, 2000);
745        assert_eq!(c.initial_delay_ms, 500);
746        assert_eq!(c.max_messages_per_poll, Some(10));
747        assert_eq!(
748            c.on_consume,
749            Some("update t set done=true where id=:#id".to_string())
750        );
751        assert_eq!(
752            c.on_consume_failed,
753            Some("update t set failed=true where id=:#id".to_string())
754        );
755        assert_eq!(
756            c.on_consume_batch_complete,
757            Some("delete from t where done=true".to_string())
758        );
759        assert!(c.route_empty_result_set);
760        assert!(!c.use_iterator);
761        assert_eq!(c.expected_update_count, Some(1));
762        assert!(c.break_batch_on_consume_fail);
763    }
764
765    #[test]
766    fn config_producer_options() {
767        let c = SqlEndpointConfig::from_uri(
768            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
769        ).unwrap();
770        assert!(c.batch);
771        assert!(c.use_message_body_for_sql);
772        assert!(c.noop);
773    }
774
775    #[test]
776    fn config_pool_options() {
777        let c = SqlEndpointConfig::from_uri(
778            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
779        ).unwrap();
780        assert_eq!(c.max_connections, Some(20));
781        assert_eq!(c.min_connections, Some(3));
782        assert_eq!(c.idle_timeout_secs, Some(600));
783        assert_eq!(c.max_lifetime_secs, Some(3600));
784    }
785
786    #[test]
787    fn config_query_with_special_chars() {
788        let c = SqlEndpointConfig::from_uri(
789            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
790        )
791        .unwrap();
792        assert_eq!(
793            c.query,
794            "select * from users where name = :#name and age > #"
795        );
796    }
797
798    #[test]
799    fn output_type_from_str() {
800        assert_eq!(
801            "SelectList".parse::<SqlOutputType>().unwrap(),
802            SqlOutputType::SelectList
803        );
804        assert_eq!(
805            "SelectOne".parse::<SqlOutputType>().unwrap(),
806            SqlOutputType::SelectOne
807        );
808        assert_eq!(
809            "StreamList".parse::<SqlOutputType>().unwrap(),
810            SqlOutputType::StreamList
811        );
812        assert!("Invalid".parse::<SqlOutputType>().is_err());
813    }
814
815    #[test]
816    fn config_file_not_found() {
817        let result = SqlEndpointConfig::from_uri(
818            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
819        );
820        assert!(result.is_err());
821        let err = result.unwrap_err();
822        let msg = format!("{:?}", err);
823        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
824    }
825
826    #[test]
827    fn config_file_query() {
828        use std::io::Write;
829        let unique_name = format!(
830            "test_sql_query_{}.sql",
831            std::time::SystemTime::now()
832                .duration_since(std::time::UNIX_EPOCH)
833                .unwrap_or_default()
834                .as_nanos()
835        );
836        let mut tmp = std::env::temp_dir();
837        tmp.push(unique_name);
838        {
839            let mut f = std::fs::File::create(&tmp).unwrap();
840            writeln!(f, "SELECT * FROM users").unwrap();
841        }
842        let uri = format!(
843            "sql:file:{}?db_url=postgres://localhost/test",
844            tmp.display()
845        );
846        let c = SqlEndpointConfig::from_uri(&uri).unwrap();
847        assert_eq!(c.query, "SELECT * FROM users");
848        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
849        std::fs::remove_file(&tmp).ok();
850    }
851
852    // New tests for config contract
853    #[test]
854    fn pool_fields_none_when_not_set() {
855        let c =
856            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
857        assert_eq!(c.max_connections, None);
858        assert_eq!(c.min_connections, None);
859        assert_eq!(c.idle_timeout_secs, None);
860        assert_eq!(c.max_lifetime_secs, None);
861    }
862
863    #[test]
864    fn apply_defaults_fills_none() {
865        let mut c =
866            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
867        let global = SqlGlobalConfig {
868            max_connections: 10,
869            min_connections: 2,
870            idle_timeout_secs: 600,
871            max_lifetime_secs: 3600,
872            ssl_mode: None,
873            ssl_root_cert: None,
874            ssl_cert: None,
875            ssl_key: None,
876        };
877        c.apply_defaults(&global);
878        assert_eq!(c.max_connections, Some(10));
879        assert_eq!(c.min_connections, Some(2));
880        assert_eq!(c.idle_timeout_secs, Some(600));
881        assert_eq!(c.max_lifetime_secs, Some(3600));
882        assert!(c.ssl_mode.is_none());
883        assert!(c.ssl_root_cert.is_none());
884        assert!(c.ssl_cert.is_none());
885        assert!(c.ssl_key.is_none());
886    }
887
888    #[test]
889    fn apply_defaults_does_not_override() {
890        let mut c = SqlEndpointConfig::from_uri(
891            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
892        )
893        .unwrap();
894        let global = SqlGlobalConfig {
895            max_connections: 10,
896            min_connections: 2,
897            idle_timeout_secs: 600,
898            max_lifetime_secs: 3600,
899            ssl_mode: None,
900            ssl_root_cert: None,
901            ssl_cert: None,
902            ssl_key: None,
903        };
904        c.apply_defaults(&global);
905        // URI-set values should NOT be overridden
906        assert_eq!(c.max_connections, Some(99));
907        assert_eq!(c.min_connections, Some(5));
908        // None fields should be filled from global
909        assert_eq!(c.idle_timeout_secs, Some(600));
910        assert_eq!(c.max_lifetime_secs, Some(3600));
911    }
912
913    #[test]
914    fn resolve_defaults_fills_remaining() {
915        let mut c = SqlEndpointConfig::from_uri(
916            "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
917        )
918        .unwrap();
919        c.resolve_defaults();
920        assert_eq!(c.max_connections, Some(7)); // from URI
921        assert_eq!(c.min_connections, Some(1)); // from defaults
922        assert_eq!(c.idle_timeout_secs, Some(300)); // from defaults
923        assert_eq!(c.max_lifetime_secs, Some(1800)); // from defaults
924    }
925
926    #[test]
927    fn global_config_builder() {
928        let c = SqlGlobalConfig::default()
929            .with_max_connections(20)
930            .with_min_connections(3)
931            .with_idle_timeout_secs(600)
932            .with_max_lifetime_secs(3600)
933            .with_ssl_mode("require")
934            .with_ssl_root_cert("/ca.pem")
935            .with_ssl_cert("/cert.pem")
936            .with_ssl_key("/key.pem");
937        assert_eq!(c.max_connections, 20);
938        assert_eq!(c.min_connections, 3);
939        assert_eq!(c.idle_timeout_secs, 600);
940        assert_eq!(c.max_lifetime_secs, 3600);
941        assert_eq!(c.ssl_mode, Some("require".to_string()));
942        assert_eq!(c.ssl_root_cert, Some("/ca.pem".to_string()));
943        assert_eq!(c.ssl_cert, Some("/cert.pem".to_string()));
944        assert_eq!(c.ssl_key, Some("/key.pem".to_string()));
945    }
946
947    #[test]
948    fn enrich_postgres_ssl_mode() {
949        let mut c = SqlEndpointConfig::from_uri(
950            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
951        )
952        .unwrap();
953        c.resolve_defaults();
954        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
955        assert!(url.contains("sslmode=require"), "got: {}", url);
956    }
957
958    #[test]
959    fn enrich_postgres_all_ssl() {
960        let mut c = SqlEndpointConfig::from_uri(
961            "sql:select 1?db_url=postgres://localhost/test&sslMode=require&sslRootCert=/ca.pem&sslCert=/cert.pem&sslKey=/key.pem",
962        )
963        .unwrap();
964        c.resolve_defaults();
965        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
966        assert!(url.contains("sslmode=require"), "got: {}", url);
967        assert!(url.contains("sslrootcert="), "got: {}", url);
968        assert!(url.contains("sslcert="), "got: {}", url);
969        assert!(url.contains("sslkey="), "got: {}", url);
970    }
971
972    #[test]
973    fn enrich_mysql_ssl() {
974        let mut c = SqlEndpointConfig::from_uri(
975            "sql:select 1?db_url=mysql://localhost/test&sslMode=require",
976        )
977        .unwrap();
978        c.resolve_defaults();
979        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
980        assert!(url.contains("ssl-mode=require"), "got: {}", url);
981    }
982
983    #[test]
984    fn enrich_existing_query_params() {
985        let mut c = SqlEndpointConfig::from_uri(
986            "sql:select 1?db_url=postgres://localhost/test?existing=1&sslMode=require",
987        )
988        .unwrap();
989        c.resolve_defaults();
990        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
991        assert!(url.contains("existing=1"), "got: {}", url);
992        assert!(url.contains("sslmode=require"), "got: {}", url);
993    }
994
995    #[test]
996    fn enrich_override_existing() {
997        let mut c = SqlEndpointConfig::from_uri(
998            "sql:select 1?db_url=postgres://localhost/test?sslmode=allow&sslMode=require",
999        )
1000        .unwrap();
1001        c.resolve_defaults();
1002        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1003        assert!(url.contains("sslmode=require"), "got: {}", url);
1004        assert!(!url.contains("sslmode=allow"), "got: {}", url);
1005    }
1006
1007    #[test]
1008    fn enrich_no_params() {
1009        let mut c =
1010            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1011        c.resolve_defaults();
1012        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1013        assert_eq!(url, "postgres://localhost/test");
1014    }
1015
1016    #[test]
1017    fn enrich_url_encodes_paths() {
1018        let mut c = SqlEndpointConfig::from_uri(
1019            "sql:select 1?db_url=postgres://localhost/test&sslRootCert=/path/to/my%20cert.pem",
1020        )
1021        .unwrap();
1022        c.resolve_defaults();
1023        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1024        assert!(url.contains("sslrootcert="), "got: {}", url);
1025    }
1026
1027    #[test]
1028    fn enrich_unsupported_scheme_returns_unchanged() {
1029        let mut c = SqlEndpointConfig::from_uri(
1030            "sql:select 1?db_url=sqlite://localhost/test.db&sslMode=require",
1031        )
1032        .unwrap();
1033        c.resolve_defaults();
1034        let url = enrich_db_url_with_ssl(&c.db_url, &c).unwrap();
1035        assert_eq!(url, "sqlite://localhost/test.db");
1036    }
1037
1038    #[test]
1039    fn enrich_invalid_url_returns_error() {
1040        let mut c = SqlEndpointConfig::from_uri(
1041            "sql:select 1?db_url=postgres://localhost/test&sslMode=require",
1042        )
1043        .unwrap();
1044        c.resolve_defaults();
1045        let result = enrich_db_url_with_ssl("://not-a-valid-url", &c);
1046        assert!(result.is_err());
1047    }
1048
1049    // --- Phase B hardening tests ---
1050
1051    // SQL-010: Debug output redacts credentials
1052    #[test]
1053    fn debug_redacts_db_url_with_password() {
1054        let c = SqlEndpointConfig::from_uri(
1055            "sql:select 1?db_url=postgres://user:secret123@localhost/test",
1056        )
1057        .unwrap();
1058        let debug_output = format!("{:?}", c);
1059        assert!(
1060            !debug_output.contains("secret123"),
1061            "Debug output must not contain password: {}",
1062            debug_output
1063        );
1064        assert!(
1065            debug_output.contains("***"),
1066            "Debug output must contain redacted marker: {}",
1067            debug_output
1068        );
1069    }
1070
1071    #[test]
1072    fn debug_redacts_ssl_key() {
1073        let c = SqlEndpointConfig::from_uri(
1074            "sql:select 1?db_url=postgres://localhost/test&sslKey=/secret/key.pem",
1075        )
1076        .unwrap();
1077        let debug_output = format!("{:?}", c);
1078        assert!(
1079            !debug_output.contains("/secret/key.pem"),
1080            "Debug output must not contain ssl_key path: {}",
1081            debug_output
1082        );
1083    }
1084
1085    #[test]
1086    fn debug_global_config_redacts_ssl_key() {
1087        let c = SqlGlobalConfig::default().with_ssl_key("/secret/key.pem");
1088        let debug_output = format!("{:?}", c);
1089        assert!(
1090            !debug_output.contains("/secret/key.pem"),
1091            "Debug output must not contain ssl_key path: {}",
1092            debug_output
1093        );
1094        assert!(
1095            debug_output.contains("***"),
1096            "Debug output must contain redacted marker: {}",
1097            debug_output
1098        );
1099    }
1100
1101    #[test]
1102    fn redact_db_url_with_credentials() {
1103        assert_eq!(
1104            redact_db_url("postgres://user:pass@host/db"),
1105            "postgres://***:***@host/db"
1106        );
1107    }
1108
1109    #[test]
1110    fn redact_db_url_without_credentials() {
1111        assert_eq!(redact_db_url("sqlite::memory:"), "sqlite::memory:");
1112    }
1113
1114    #[test]
1115    fn redact_db_url_invalid_returns_original() {
1116        assert_eq!(redact_db_url("not-a-url"), "not-a-url");
1117    }
1118
1119    // SQL-004: usePlaceholder parsing
1120    #[test]
1121    fn use_placeholder_defaults_to_true() {
1122        let c =
1123            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
1124        assert!(c.use_placeholder);
1125    }
1126
1127    #[test]
1128    fn use_placeholder_false_from_uri() {
1129        let c = SqlEndpointConfig::from_uri(
1130            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=false",
1131        )
1132        .unwrap();
1133        assert!(!c.use_placeholder);
1134    }
1135
1136    #[test]
1137    fn use_placeholder_true_from_uri() {
1138        let c = SqlEndpointConfig::from_uri(
1139            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=true",
1140        )
1141        .unwrap();
1142        assert!(c.use_placeholder);
1143    }
1144
1145    // SQL-004: strict boolean parsing — invalid values rejected
1146    #[test]
1147    fn use_placeholder_rejects_invalid_value() {
1148        let result = SqlEndpointConfig::from_uri(
1149            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=1",
1150        );
1151        assert!(result.is_err());
1152        let msg = format!("{:?}", result.unwrap_err());
1153        assert!(msg.contains("usePlaceholder") && msg.contains("true") && msg.contains("false"));
1154    }
1155
1156    #[test]
1157    fn use_placeholder_rejects_typo_tru() {
1158        let result = SqlEndpointConfig::from_uri(
1159            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=tru",
1160        );
1161        assert!(result.is_err());
1162    }
1163
1164    #[test]
1165    fn use_placeholder_rejects_yes() {
1166        let result = SqlEndpointConfig::from_uri(
1167            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=yes",
1168        );
1169        assert!(result.is_err());
1170    }
1171
1172    #[test]
1173    fn noop_rejects_invalid_value() {
1174        let result =
1175            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&noop=1");
1176        assert!(result.is_err());
1177        let msg = format!("{:?}", result.unwrap_err());
1178        assert!(msg.contains("noop"));
1179    }
1180
1181    #[test]
1182    fn batch_rejects_invalid_value() {
1183        let result =
1184            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test&batch=yes");
1185        assert!(result.is_err());
1186        let msg = format!("{:?}", result.unwrap_err());
1187        assert!(msg.contains("batch"));
1188    }
1189
1190    #[test]
1191    fn route_empty_result_set_rejects_invalid_value() {
1192        let result = SqlEndpointConfig::from_uri(
1193            "sql:select 1?db_url=postgres://localhost/test&routeEmptyResultSet=on",
1194        );
1195        assert!(result.is_err());
1196    }
1197
1198    #[test]
1199    fn use_iterator_rejects_invalid_value() {
1200        let result = SqlEndpointConfig::from_uri(
1201            "sql:select 1?db_url=postgres://localhost/test&useIterator=1",
1202        );
1203        assert!(result.is_err());
1204    }
1205
1206    #[test]
1207    fn break_batch_on_consume_fail_rejects_invalid_value() {
1208        let result = SqlEndpointConfig::from_uri(
1209            "sql:select 1?db_url=postgres://localhost/test&breakBatchOnConsumeFail=yes",
1210        );
1211        assert!(result.is_err());
1212    }
1213
1214    #[test]
1215    fn use_message_body_for_sql_rejects_invalid_value() {
1216        let result = SqlEndpointConfig::from_uri(
1217            "sql:select 1?db_url=postgres://localhost/test&useMessageBodyForSql=1",
1218        );
1219        assert!(result.is_err());
1220    }
1221
1222    // Case-insensitive true/false still works
1223    #[test]
1224    fn boolean_params_case_insensitive() {
1225        let c = SqlEndpointConfig::from_uri(
1226            "sql:select 1?db_url=postgres://localhost/test&usePlaceholder=TRUE&noop=FALSE&batch=True&useIterator=False",
1227        )
1228        .unwrap();
1229        assert!(c.use_placeholder);
1230        assert!(!c.noop);
1231        assert!(c.batch);
1232        assert!(!c.use_iterator);
1233    }
1234
1235    // SQL-022: multi-char placeholder rejected
1236    #[test]
1237    fn multi_char_placeholder_rejected() {
1238        let result = SqlEndpointConfig::from_uri(
1239            "sql:select 1?db_url=postgres://localhost/test&placeholder=##",
1240        );
1241        assert!(result.is_err());
1242        let msg = format!("{:?}", result.unwrap_err());
1243        assert!(msg.contains("placeholder") && msg.contains("one character"));
1244    }
1245
1246    #[test]
1247    fn single_char_placeholder_accepted() {
1248        let c = SqlEndpointConfig::from_uri(
1249            "sql:select 1?db_url=postgres://localhost/test&placeholder=$",
1250        )
1251        .unwrap();
1252        assert_eq!(c.placeholder, '$');
1253    }
1254
1255    #[test]
1256    fn empty_placeholder_falls_back_to_default() {
1257        // Empty string is filtered out by the original logic — falls back to '#'
1258        let c = SqlEndpointConfig::from_uri(
1259            "sql:select 1?db_url=postgres://localhost/test&placeholder=",
1260        )
1261        .unwrap();
1262        assert_eq!(c.placeholder, '#');
1263    }
1264
1265    // SQL-014: file-based SQL config test (verifies file content is cached, no async read)
1266    #[test]
1267    fn file_query_cached_in_config() {
1268        use std::io::Write;
1269        let unique_name = format!(
1270            "test_sql_cached_{}.sql",
1271            std::time::SystemTime::now()
1272                .duration_since(std::time::UNIX_EPOCH)
1273                .unwrap_or_default()
1274                .as_nanos()
1275        );
1276        let mut tmp = std::env::temp_dir();
1277        tmp.push(unique_name);
1278        {
1279            let mut f = std::fs::File::create(&tmp).unwrap();
1280            writeln!(f, "SELECT * FROM cached_test").unwrap();
1281        }
1282        let uri = format!(
1283            "sql:file:{}?db_url=postgres://localhost/test",
1284            tmp.display()
1285        );
1286        let c = SqlEndpointConfig::from_uri(&uri).unwrap();
1287        // Query content is cached in config — no runtime file read needed
1288        assert_eq!(c.query, "SELECT * FROM cached_test");
1289        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
1290        // Delete the file — config still has the query
1291        std::fs::remove_file(&tmp).ok();
1292        assert_eq!(c.query, "SELECT * FROM cached_test");
1293    }
1294}