Skip to main content

camel_component_sql/
config.rs

1use std::str::FromStr;
2
3use camel_api::CamelError;
4use camel_endpoint::{UriComponents, UriConfig, parse_uri};
5
6/// Output type for SQL query results.
7#[derive(Debug, Clone, PartialEq, Default)]
8pub enum SqlOutputType {
9    /// Return all rows as a list.
10    #[default]
11    SelectList,
12    /// Return a single row (first result).
13    SelectOne,
14    /// Stream results as an async iterator.
15    StreamList,
16}
17
18impl FromStr for SqlOutputType {
19    type Err = CamelError;
20
21    fn from_str(s: &str) -> Result<Self, Self::Err> {
22        match s {
23            "SelectList" => Ok(SqlOutputType::SelectList),
24            "SelectOne" => Ok(SqlOutputType::SelectOne),
25            "StreamList" => Ok(SqlOutputType::StreamList),
26            _ => Err(CamelError::InvalidUri(format!(
27                "Unknown output type: {}",
28                s
29            ))),
30        }
31    }
32}
33
34/// Global configuration for SQL component.
35///
36/// This is a plain Rust struct (no serde) with defaults and builder methods.
37/// It holds pool configuration that can be applied as defaults to endpoints.
38#[derive(Debug, Clone, PartialEq)]
39pub struct SqlGlobalConfig {
40    pub max_connections: u32,
41    pub min_connections: u32,
42    pub idle_timeout_secs: u64,
43    pub max_lifetime_secs: u64,
44}
45
46impl Default for SqlGlobalConfig {
47    fn default() -> Self {
48        Self {
49            max_connections: 5,
50            min_connections: 1,
51            idle_timeout_secs: 300,
52            max_lifetime_secs: 1800,
53        }
54    }
55}
56
57impl SqlGlobalConfig {
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    pub fn with_max_connections(mut self, value: u32) -> Self {
63        self.max_connections = value;
64        self
65    }
66
67    pub fn with_min_connections(mut self, value: u32) -> Self {
68        self.min_connections = value;
69        self
70    }
71
72    pub fn with_idle_timeout_secs(mut self, value: u64) -> Self {
73        self.idle_timeout_secs = value;
74        self
75    }
76
77    pub fn with_max_lifetime_secs(mut self, value: u64) -> Self {
78        self.max_lifetime_secs = value;
79        self
80    }
81}
82
83/// Configuration for SQL component endpoints.
84///
85/// URI format: `sql:<query>?db_url=<url>&param1=val1&param2=val2`
86///
87/// The query can be inline SQL or a file reference with `file:` prefix:
88/// - `sql:SELECT * FROM users?db_url=...` - inline SQL
89/// - `sql:file:/path/to/query.sql?db_url=...` - read SQL from file
90#[derive(Debug, Clone)]
91pub struct SqlEndpointConfig {
92    // Connection
93    /// Database connection URL (required).
94    pub db_url: String,
95    /// Maximum connections in the pool. None = use global default.
96    pub max_connections: Option<u32>,
97    /// Minimum connections in the pool. None = use global default.
98    pub min_connections: Option<u32>,
99    /// Idle timeout in seconds. None = use global default.
100    pub idle_timeout_secs: Option<u64>,
101    /// Maximum connection lifetime in seconds. None = use global default.
102    pub max_lifetime_secs: Option<u64>,
103
104    // Query
105    /// The SQL query (from URI path or file).
106    pub query: String,
107    /// Path to the file containing the SQL query (when using `file:` prefix).
108    pub source_path: Option<String>,
109    /// Output type for query results. Default: SelectList.
110    pub output_type: SqlOutputType,
111    /// Placeholder character for parameters. Default: '#'.
112    pub placeholder: char,
113    /// If true, don't execute the query (dry run). Default: false.
114    pub noop: bool,
115
116    // Consumer (polling)
117    /// Delay between polls in milliseconds. Default: 500.
118    pub delay_ms: u64,
119    /// Initial delay before first poll in milliseconds. Default: 1000.
120    pub initial_delay_ms: u64,
121    /// Maximum messages per poll.
122    pub max_messages_per_poll: Option<i32>,
123    /// SQL to execute after consuming each message.
124    pub on_consume: Option<String>,
125    /// SQL to execute if consumption fails.
126    pub on_consume_failed: Option<String>,
127    /// SQL to execute after consuming a batch.
128    pub on_consume_batch_complete: Option<String>,
129    /// Route empty result sets. Default: false.
130    pub route_empty_result_set: bool,
131    /// Use iterator for results. Default: true.
132    pub use_iterator: bool,
133    /// Expected number of rows affected.
134    pub expected_update_count: Option<i64>,
135    /// Break batch on consume failure. Default: false.
136    pub break_batch_on_consume_fail: bool,
137
138    // Producer
139    /// Enable batch mode. Default: false.
140    pub batch: bool,
141    /// Use message body for SQL. Default: false.
142    pub use_message_body_for_sql: bool,
143}
144
145impl SqlEndpointConfig {
146    /// Apply defaults from global config, filling None fields without overriding.
147    pub fn apply_defaults(&mut self, defaults: &SqlGlobalConfig) {
148        if self.max_connections.is_none() {
149            self.max_connections = Some(defaults.max_connections);
150        }
151        if self.min_connections.is_none() {
152            self.min_connections = Some(defaults.min_connections);
153        }
154        if self.idle_timeout_secs.is_none() {
155            self.idle_timeout_secs = Some(defaults.idle_timeout_secs);
156        }
157        if self.max_lifetime_secs.is_none() {
158            self.max_lifetime_secs = Some(defaults.max_lifetime_secs);
159        }
160    }
161
162    /// Resolve any remaining None fields with built-in defaults.
163    pub fn resolve_defaults(&mut self) {
164        let defaults = SqlGlobalConfig::default();
165        self.apply_defaults(&defaults);
166    }
167}
168
169impl UriConfig for SqlEndpointConfig {
170    fn scheme() -> &'static str {
171        "sql"
172    }
173
174    fn from_uri(uri: &str) -> Result<Self, CamelError> {
175        let parts = parse_uri(uri)?;
176        Self::from_components(parts)
177    }
178
179    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
180        // Validate scheme
181        if parts.scheme != Self::scheme() {
182            return Err(CamelError::InvalidUri(format!(
183                "expected scheme '{}' but got '{}'",
184                Self::scheme(),
185                parts.scheme
186            )));
187        }
188
189        let params = &parts.params;
190
191        // Handle file: prefix for query
192        let (query, source_path) = if parts.path.starts_with("file:") {
193            let file_path = parts.path.trim_start_matches("file:").to_string();
194            let contents = std::fs::read_to_string(&file_path).map_err(|e| {
195                CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
196            })?;
197            (contents.trim().to_string(), Some(file_path))
198        } else {
199            (parts.path.clone(), None)
200        };
201
202        // Required parameter: db_url
203        let db_url = params
204            .get("db_url")
205            .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
206            .clone();
207
208        // Connection parameters - None when not set by URI param
209        let max_connections = params.get("maxConnections").and_then(|v| v.parse().ok());
210        let min_connections = params.get("minConnections").and_then(|v| v.parse().ok());
211        let idle_timeout_secs = params.get("idleTimeoutSecs").and_then(|v| v.parse().ok());
212        let max_lifetime_secs = params.get("maxLifetimeSecs").and_then(|v| v.parse().ok());
213
214        // Query parameters
215        let output_type = params
216            .get("outputType")
217            .map(|s| s.parse())
218            .transpose()?
219            .unwrap_or_default();
220        let placeholder = params
221            .get("placeholder")
222            .filter(|v| !v.is_empty())
223            .map(|v| v.chars().next().unwrap())
224            .unwrap_or('#');
225        let noop = params
226            .get("noop")
227            .map(|v| v.eq_ignore_ascii_case("true"))
228            .unwrap_or(false);
229
230        // Consumer parameters
231        let delay_ms = params
232            .get("delay")
233            .and_then(|v| v.parse().ok())
234            .unwrap_or(500);
235        let initial_delay_ms = params
236            .get("initialDelay")
237            .and_then(|v| v.parse().ok())
238            .unwrap_or(1000);
239        let max_messages_per_poll = params
240            .get("maxMessagesPerPoll")
241            .and_then(|v| v.parse().ok());
242        let on_consume = params.get("onConsume").cloned();
243        let on_consume_failed = params.get("onConsumeFailed").cloned();
244        let on_consume_batch_complete = params.get("onConsumeBatchComplete").cloned();
245        let route_empty_result_set = params
246            .get("routeEmptyResultSet")
247            .map(|v| v.eq_ignore_ascii_case("true"))
248            .unwrap_or(false);
249        let use_iterator = params
250            .get("useIterator")
251            .map(|v| v.eq_ignore_ascii_case("true"))
252            .unwrap_or(true);
253        let expected_update_count = params
254            .get("expectedUpdateCount")
255            .and_then(|v| v.parse().ok());
256        let break_batch_on_consume_fail = params
257            .get("breakBatchOnConsumeFail")
258            .map(|v| v.eq_ignore_ascii_case("true"))
259            .unwrap_or(false);
260
261        // Producer parameters
262        let batch = params
263            .get("batch")
264            .map(|v| v.eq_ignore_ascii_case("true"))
265            .unwrap_or(false);
266        let use_message_body_for_sql = params
267            .get("useMessageBodyForSql")
268            .map(|v| v.eq_ignore_ascii_case("true"))
269            .unwrap_or(false);
270
271        Ok(Self {
272            db_url,
273            max_connections,
274            min_connections,
275            idle_timeout_secs,
276            max_lifetime_secs,
277            query,
278            source_path,
279            output_type,
280            placeholder,
281            noop,
282            delay_ms,
283            initial_delay_ms,
284            max_messages_per_poll,
285            on_consume,
286            on_consume_failed,
287            on_consume_batch_complete,
288            route_empty_result_set,
289            use_iterator,
290            expected_update_count,
291            break_batch_on_consume_fail,
292            batch,
293            use_message_body_for_sql,
294        })
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301
302    #[test]
303    fn test_config_defaults() {
304        let mut c =
305            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
306        c.resolve_defaults();
307        assert_eq!(c.query, "select 1");
308        assert_eq!(c.db_url, "postgres://localhost/test");
309        assert_eq!(c.max_connections, Some(5));
310        assert_eq!(c.min_connections, Some(1));
311        assert_eq!(c.idle_timeout_secs, Some(300));
312        assert_eq!(c.max_lifetime_secs, Some(1800));
313        assert_eq!(c.output_type, SqlOutputType::SelectList);
314        assert_eq!(c.placeholder, '#');
315        assert!(!c.noop);
316        assert_eq!(c.delay_ms, 500);
317        assert_eq!(c.initial_delay_ms, 1000);
318        assert!(c.max_messages_per_poll.is_none());
319        assert!(c.on_consume.is_none());
320        assert!(c.on_consume_failed.is_none());
321        assert!(c.on_consume_batch_complete.is_none());
322        assert!(!c.route_empty_result_set);
323        assert!(c.use_iterator);
324        assert!(c.expected_update_count.is_none());
325        assert!(!c.break_batch_on_consume_fail);
326        assert!(!c.batch);
327        assert!(!c.use_message_body_for_sql);
328    }
329
330    #[test]
331    fn test_config_wrong_scheme() {
332        assert!(SqlEndpointConfig::from_uri("redis://localhost:6379").is_err());
333    }
334
335    #[test]
336    fn test_config_missing_db_url() {
337        assert!(SqlEndpointConfig::from_uri("sql:select 1").is_err());
338    }
339
340    #[test]
341    fn test_config_output_type_select_one() {
342        let c = SqlEndpointConfig::from_uri(
343            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
344        )
345        .unwrap();
346        assert_eq!(c.output_type, SqlOutputType::SelectOne);
347    }
348
349    #[test]
350    fn test_config_output_type_stream_list() {
351        let c = SqlEndpointConfig::from_uri(
352            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
353        )
354        .unwrap();
355        assert_eq!(c.output_type, SqlOutputType::StreamList);
356    }
357
358    #[test]
359    fn test_config_consumer_options() {
360        let c = SqlEndpointConfig::from_uri(
361            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
362        ).unwrap();
363        assert_eq!(c.delay_ms, 2000);
364        assert_eq!(c.initial_delay_ms, 500);
365        assert_eq!(c.max_messages_per_poll, Some(10));
366        assert_eq!(
367            c.on_consume,
368            Some("update t set done=true where id=:#id".to_string())
369        );
370        assert_eq!(
371            c.on_consume_failed,
372            Some("update t set failed=true where id=:#id".to_string())
373        );
374        assert_eq!(
375            c.on_consume_batch_complete,
376            Some("delete from t where done=true".to_string())
377        );
378        assert!(c.route_empty_result_set);
379        assert!(!c.use_iterator);
380        assert_eq!(c.expected_update_count, Some(1));
381        assert!(c.break_batch_on_consume_fail);
382    }
383
384    #[test]
385    fn test_config_producer_options() {
386        let c = SqlEndpointConfig::from_uri(
387            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
388        ).unwrap();
389        assert!(c.batch);
390        assert!(c.use_message_body_for_sql);
391        assert!(c.noop);
392    }
393
394    #[test]
395    fn test_config_pool_options() {
396        let c = SqlEndpointConfig::from_uri(
397            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
398        ).unwrap();
399        assert_eq!(c.max_connections, Some(20));
400        assert_eq!(c.min_connections, Some(3));
401        assert_eq!(c.idle_timeout_secs, Some(600));
402        assert_eq!(c.max_lifetime_secs, Some(3600));
403    }
404
405    #[test]
406    fn test_config_query_with_special_chars() {
407        let c = SqlEndpointConfig::from_uri(
408            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
409        )
410        .unwrap();
411        assert_eq!(
412            c.query,
413            "select * from users where name = :#name and age > #"
414        );
415    }
416
417    #[test]
418    fn test_output_type_from_str() {
419        assert_eq!(
420            "SelectList".parse::<SqlOutputType>().unwrap(),
421            SqlOutputType::SelectList
422        );
423        assert_eq!(
424            "SelectOne".parse::<SqlOutputType>().unwrap(),
425            SqlOutputType::SelectOne
426        );
427        assert_eq!(
428            "StreamList".parse::<SqlOutputType>().unwrap(),
429            SqlOutputType::StreamList
430        );
431        assert!("Invalid".parse::<SqlOutputType>().is_err());
432    }
433
434    #[test]
435    fn test_config_file_not_found() {
436        let result = SqlEndpointConfig::from_uri(
437            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
438        );
439        assert!(result.is_err());
440        let err = result.unwrap_err();
441        let msg = format!("{:?}", err);
442        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
443    }
444
445    #[test]
446    fn test_config_file_query() {
447        use std::io::Write;
448        let unique_name = format!(
449            "test_sql_query_{}.sql",
450            std::time::SystemTime::now()
451                .duration_since(std::time::UNIX_EPOCH)
452                .unwrap_or_default()
453                .as_nanos()
454        );
455        let mut tmp = std::env::temp_dir();
456        tmp.push(unique_name);
457        {
458            let mut f = std::fs::File::create(&tmp).unwrap();
459            writeln!(f, "SELECT * FROM users").unwrap();
460        }
461        let uri = format!(
462            "sql:file:{}?db_url=postgres://localhost/test",
463            tmp.display()
464        );
465        let c = SqlEndpointConfig::from_uri(&uri).unwrap();
466        assert_eq!(c.query, "SELECT * FROM users");
467        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
468        std::fs::remove_file(&tmp).ok();
469    }
470
471    // New tests for config contract
472    #[test]
473    fn test_pool_fields_none_when_not_set() {
474        let c =
475            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
476        assert_eq!(c.max_connections, None);
477        assert_eq!(c.min_connections, None);
478        assert_eq!(c.idle_timeout_secs, None);
479        assert_eq!(c.max_lifetime_secs, None);
480    }
481
482    #[test]
483    fn test_apply_defaults_fills_none() {
484        let mut c =
485            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
486        let global = SqlGlobalConfig {
487            max_connections: 10,
488            min_connections: 2,
489            idle_timeout_secs: 600,
490            max_lifetime_secs: 3600,
491        };
492        c.apply_defaults(&global);
493        assert_eq!(c.max_connections, Some(10));
494        assert_eq!(c.min_connections, Some(2));
495        assert_eq!(c.idle_timeout_secs, Some(600));
496        assert_eq!(c.max_lifetime_secs, Some(3600));
497    }
498
499    #[test]
500    fn test_apply_defaults_does_not_override() {
501        let mut c = SqlEndpointConfig::from_uri(
502            "sql:select 1?db_url=postgres://localhost/test&maxConnections=99&minConnections=5",
503        )
504        .unwrap();
505        let global = SqlGlobalConfig {
506            max_connections: 10,
507            min_connections: 2,
508            idle_timeout_secs: 600,
509            max_lifetime_secs: 3600,
510        };
511        c.apply_defaults(&global);
512        // URI-set values should NOT be overridden
513        assert_eq!(c.max_connections, Some(99));
514        assert_eq!(c.min_connections, Some(5));
515        // None fields should be filled from global
516        assert_eq!(c.idle_timeout_secs, Some(600));
517        assert_eq!(c.max_lifetime_secs, Some(3600));
518    }
519
520    #[test]
521    fn test_resolve_defaults_fills_remaining() {
522        let mut c = SqlEndpointConfig::from_uri(
523            "sql:select 1?db_url=postgres://localhost/test&maxConnections=7",
524        )
525        .unwrap();
526        c.resolve_defaults();
527        assert_eq!(c.max_connections, Some(7)); // from URI
528        assert_eq!(c.min_connections, Some(1)); // from defaults
529        assert_eq!(c.idle_timeout_secs, Some(300)); // from defaults
530        assert_eq!(c.max_lifetime_secs, Some(1800)); // from defaults
531    }
532
533    #[test]
534    fn test_global_config_builder() {
535        let c = SqlGlobalConfig::default()
536            .with_max_connections(20)
537            .with_min_connections(3)
538            .with_idle_timeout_secs(600)
539            .with_max_lifetime_secs(3600);
540        assert_eq!(c.max_connections, 20);
541        assert_eq!(c.min_connections, 3);
542        assert_eq!(c.idle_timeout_secs, 600);
543        assert_eq!(c.max_lifetime_secs, 3600);
544    }
545}