Skip to main content

camel_component_sql/
config.rs

1use std::str::FromStr;
2
3use camel_api::CamelError;
4use camel_endpoint::parse_uri;
5
6/// Output type for SQL query results.
7#[derive(Debug, Clone, PartialEq)]
8pub enum SqlOutputType {
9    /// Return all rows as a list.
10    SelectList,
11    /// Return a single row (first result).
12    SelectOne,
13    /// Stream results as an async iterator.
14    StreamList,
15}
16
17impl FromStr for SqlOutputType {
18    type Err = CamelError;
19
20    fn from_str(s: &str) -> Result<Self, Self::Err> {
21        match s {
22            "SelectList" => Ok(SqlOutputType::SelectList),
23            "SelectOne" => Ok(SqlOutputType::SelectOne),
24            "StreamList" => Ok(SqlOutputType::StreamList),
25            _ => Err(CamelError::InvalidUri(format!(
26                "Unknown output type: {}",
27                s
28            ))),
29        }
30    }
31}
32
33/// Configuration for SQL component endpoints.
34#[derive(Debug, Clone)]
35pub struct SqlConfig {
36    // Connection
37    pub db_url: String,
38    pub max_connections: u32,
39    pub min_connections: u32,
40    pub idle_timeout_secs: u64,
41    pub max_lifetime_secs: u64,
42
43    // Query
44    pub query: String,
45    /// Path to a file containing the SQL query (populated when URI starts with `sql:file:...`)
46    pub source_path: Option<String>,
47    pub output_type: SqlOutputType,
48    pub placeholder: char,
49    pub noop: bool,
50
51    // Consumer (polling)
52    pub delay_ms: u64,
53    pub initial_delay_ms: u64,
54    pub max_messages_per_poll: Option<i32>,
55    pub on_consume: Option<String>,
56    pub on_consume_failed: Option<String>,
57    pub on_consume_batch_complete: Option<String>,
58    pub route_empty_result_set: bool,
59    pub use_iterator: bool,
60    pub expected_update_count: Option<i64>,
61    pub break_batch_on_consume_fail: bool,
62
63    // Producer
64    pub batch: bool,
65    pub use_message_body_for_sql: bool,
66}
67
68impl SqlConfig {
69    /// Parse configuration from a Camel-style URI.
70    ///
71    /// URI format: `sql:<query>?db_url=<url>&param1=val1&param2=val2`
72    ///
73    /// # Errors
74    ///
75    /// Returns `CamelError::InvalidUri` if the scheme is not "sql".
76    /// Returns `CamelError::Config` if `db_url` parameter is missing.
77    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
78        let components = parse_uri(uri)?;
79
80        // Validate scheme
81        if components.scheme != "sql" {
82            return Err(CamelError::InvalidUri(format!(
83                "Expected scheme 'sql', got '{}'",
84                components.scheme
85            )));
86        }
87
88        // Detect file-based query
89        let (query, source_path) = if components.path.starts_with("file:") {
90            let file_path = components.path.trim_start_matches("file:").to_string();
91            let contents = std::fs::read_to_string(&file_path).map_err(|e| {
92                CamelError::Config(format!("Failed to read SQL file '{}': {}", file_path, e))
93            })?;
94            (contents.trim().to_string(), Some(file_path))
95        } else {
96            (components.path.clone(), None)
97        };
98
99        // Extract required db_url
100        let db_url = components
101            .params
102            .get("db_url")
103            .ok_or_else(|| CamelError::Config("db_url parameter is required".to_string()))?
104            .clone();
105
106        // Helper functions for parameter extraction
107        let get_param = |name: &str| -> Option<&String> { components.params.get(name) };
108
109        let get_u32 = |name: &str, default: u32| -> u32 {
110            get_param(name)
111                .and_then(|v| v.parse().ok())
112                .unwrap_or(default)
113        };
114
115        let get_u64 = |name: &str, default: u64| -> u64 {
116            get_param(name)
117                .and_then(|v| v.parse().ok())
118                .unwrap_or(default)
119        };
120
121        let get_i32 = |name: &str| -> Option<i32> { get_param(name).and_then(|v| v.parse().ok()) };
122
123        let get_i64 = |name: &str| -> Option<i64> { get_param(name).and_then(|v| v.parse().ok()) };
124
125        let get_bool = |name: &str, default: bool| -> bool {
126            get_param(name)
127                .map(|v| v.eq_ignore_ascii_case("true"))
128                .unwrap_or(default)
129        };
130
131        let get_char = |name: &str, default: char| -> char {
132            get_param(name)
133                .filter(|v| !v.is_empty())
134                .map(|v| v.chars().next().unwrap())
135                .unwrap_or(default)
136        };
137
138        let get_string = |name: &str| -> Option<String> { get_param(name).cloned() };
139
140        // Build config with defaults
141        Ok(SqlConfig {
142            // Connection
143            db_url,
144            max_connections: get_u32("maxConnections", 5),
145            min_connections: get_u32("minConnections", 1),
146            idle_timeout_secs: get_u64("idleTimeoutSecs", 300),
147            max_lifetime_secs: get_u64("maxLifetimeSecs", 1800),
148
149            // Query
150            query,
151            source_path,
152            output_type: get_param("outputType")
153                .map(|s| s.parse())
154                .transpose()?
155                .unwrap_or(SqlOutputType::SelectList),
156            placeholder: get_char("placeholder", '#'),
157            noop: get_bool("noop", false),
158
159            // Consumer
160            delay_ms: get_u64("delay", 500),
161            initial_delay_ms: get_u64("initialDelay", 1000),
162            max_messages_per_poll: get_i32("maxMessagesPerPoll"),
163            on_consume: get_string("onConsume"),
164            on_consume_failed: get_string("onConsumeFailed"),
165            on_consume_batch_complete: get_string("onConsumeBatchComplete"),
166            route_empty_result_set: get_bool("routeEmptyResultSet", false),
167            use_iterator: get_bool("useIterator", true),
168            expected_update_count: get_i64("expectedUpdateCount"),
169            break_batch_on_consume_fail: get_bool("breakBatchOnConsumeFail", false),
170
171            // Producer
172            batch: get_bool("batch", false),
173            use_message_body_for_sql: get_bool("useMessageBodyForSql", false),
174        })
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181
182    #[test]
183    fn test_config_defaults() {
184        let c = SqlConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
185        assert_eq!(c.query, "select 1");
186        assert_eq!(c.db_url, "postgres://localhost/test");
187        assert_eq!(c.max_connections, 5);
188        assert_eq!(c.min_connections, 1);
189        assert_eq!(c.idle_timeout_secs, 300);
190        assert_eq!(c.max_lifetime_secs, 1800);
191        assert_eq!(c.output_type, SqlOutputType::SelectList);
192        assert_eq!(c.placeholder, '#');
193        assert!(!c.noop);
194        assert_eq!(c.delay_ms, 500);
195        assert_eq!(c.initial_delay_ms, 1000);
196        assert!(c.max_messages_per_poll.is_none());
197        assert!(c.on_consume.is_none());
198        assert!(c.on_consume_failed.is_none());
199        assert!(c.on_consume_batch_complete.is_none());
200        assert!(!c.route_empty_result_set);
201        assert!(c.use_iterator);
202        assert!(c.expected_update_count.is_none());
203        assert!(!c.break_batch_on_consume_fail);
204        assert!(!c.batch);
205        assert!(!c.use_message_body_for_sql);
206    }
207
208    #[test]
209    fn test_config_wrong_scheme() {
210        assert!(SqlConfig::from_uri("redis://localhost:6379").is_err());
211    }
212
213    #[test]
214    fn test_config_missing_db_url() {
215        assert!(SqlConfig::from_uri("sql:select 1").is_err());
216    }
217
218    #[test]
219    fn test_config_output_type_select_one() {
220        let c = SqlConfig::from_uri(
221            "sql:select 1?db_url=postgres://localhost/test&outputType=SelectOne",
222        )
223        .unwrap();
224        assert_eq!(c.output_type, SqlOutputType::SelectOne);
225    }
226
227    #[test]
228    fn test_config_output_type_stream_list() {
229        let c = SqlConfig::from_uri(
230            "sql:select 1?db_url=postgres://localhost/test&outputType=StreamList",
231        )
232        .unwrap();
233        assert_eq!(c.output_type, SqlOutputType::StreamList);
234    }
235
236    #[test]
237    fn test_config_consumer_options() {
238        let c = SqlConfig::from_uri(
239            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&initialDelay=500&maxMessagesPerPoll=10&onConsume=update t set done=true where id=:#id&onConsumeFailed=update t set failed=true where id=:#id&onConsumeBatchComplete=delete from t where done=true&routeEmptyResultSet=true&useIterator=false&expectedUpdateCount=1&breakBatchOnConsumeFail=true"
240        ).unwrap();
241        assert_eq!(c.delay_ms, 2000);
242        assert_eq!(c.initial_delay_ms, 500);
243        assert_eq!(c.max_messages_per_poll, Some(10));
244        assert_eq!(
245            c.on_consume,
246            Some("update t set done=true where id=:#id".to_string())
247        );
248        assert_eq!(
249            c.on_consume_failed,
250            Some("update t set failed=true where id=:#id".to_string())
251        );
252        assert_eq!(
253            c.on_consume_batch_complete,
254            Some("delete from t where done=true".to_string())
255        );
256        assert!(c.route_empty_result_set);
257        assert!(!c.use_iterator);
258        assert_eq!(c.expected_update_count, Some(1));
259        assert!(c.break_batch_on_consume_fail);
260    }
261
262    #[test]
263    fn test_config_producer_options() {
264        let c = SqlConfig::from_uri(
265            "sql:insert into t values (#)?db_url=postgres://localhost/test&batch=true&useMessageBodyForSql=true&noop=true"
266        ).unwrap();
267        assert!(c.batch);
268        assert!(c.use_message_body_for_sql);
269        assert!(c.noop);
270    }
271
272    #[test]
273    fn test_config_pool_options() {
274        let c = SqlConfig::from_uri(
275            "sql:select 1?db_url=postgres://localhost/test&maxConnections=20&minConnections=3&idleTimeoutSecs=600&maxLifetimeSecs=3600"
276        ).unwrap();
277        assert_eq!(c.max_connections, 20);
278        assert_eq!(c.min_connections, 3);
279        assert_eq!(c.idle_timeout_secs, 600);
280        assert_eq!(c.max_lifetime_secs, 3600);
281    }
282
283    #[test]
284    fn test_config_query_with_special_chars() {
285        let c = SqlConfig::from_uri(
286            "sql:select * from users where name = :#name and age > #?db_url=postgres://localhost/test",
287        )
288        .unwrap();
289        assert_eq!(
290            c.query,
291            "select * from users where name = :#name and age > #"
292        );
293    }
294
295    #[test]
296    fn test_output_type_from_str() {
297        assert_eq!(
298            "SelectList".parse::<SqlOutputType>().unwrap(),
299            SqlOutputType::SelectList
300        );
301        assert_eq!(
302            "SelectOne".parse::<SqlOutputType>().unwrap(),
303            SqlOutputType::SelectOne
304        );
305        assert_eq!(
306            "StreamList".parse::<SqlOutputType>().unwrap(),
307            SqlOutputType::StreamList
308        );
309        assert!("Invalid".parse::<SqlOutputType>().is_err());
310    }
311
312    #[test]
313    fn test_config_file_not_found() {
314        let result = SqlConfig::from_uri(
315            "sql:file:/nonexistent/path/query.sql?db_url=postgres://localhost/test",
316        );
317        assert!(result.is_err());
318        let err = result.unwrap_err();
319        let msg = format!("{:?}", err);
320        assert!(msg.contains("Failed to read SQL file") || msg.contains("nonexistent"));
321    }
322
323    #[test]
324    fn test_config_file_query() {
325        use std::io::Write;
326        let unique_name = format!(
327            "test_sql_query_{}.sql",
328            std::time::SystemTime::now()
329                .duration_since(std::time::UNIX_EPOCH)
330                .unwrap_or_default()
331                .as_nanos()
332        );
333        let mut tmp = std::env::temp_dir();
334        tmp.push(unique_name);
335        {
336            let mut f = std::fs::File::create(&tmp).unwrap();
337            writeln!(f, "SELECT * FROM users").unwrap();
338        }
339        let uri = format!(
340            "sql:file:{}?db_url=postgres://localhost/test",
341            tmp.display()
342        );
343        let c = SqlConfig::from_uri(&uri).unwrap();
344        assert_eq!(c.query, "SELECT * FROM users");
345        assert_eq!(c.source_path, Some(tmp.to_string_lossy().into_owned()));
346        std::fs::remove_file(&tmp).ok();
347    }
348}