1use camel_api::{Body, CamelError, Exchange};
7
/// One parameter placeholder found in a query template.
///
/// The variant records which template syntax produced the slot and therefore
/// how its value is looked up when the query is resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParamSlot {
    /// Bare placeholder (e.g. `#`); the payload is the zero-based ordinal of
    /// this placeholder among the positional placeholders of the template.
    Positional(usize),
    /// `:#name` — a value looked up by name.
    Named(String),
    /// `:#in:name` — a named array value expanded into a list of placeholders.
    InClause(String),
    /// `:#${expr}` — the payload is the text between the braces.
    Expression(String),
}
20
21#[derive(Debug, Clone)]
23pub struct QueryTemplate {
24 pub fragments: Vec<String>,
26 pub params: Vec<ParamSlot>,
28}
29
30#[derive(Debug, Clone)]
32pub struct PreparedQuery {
33 pub sql: String,
35 pub bindings: Vec<serde_json::Value>,
37}
38
39pub fn parse_query_template(
54 template: &str,
55 placeholder: char,
56) -> Result<QueryTemplate, CamelError> {
57 let mut fragments = Vec::new();
58 let mut params = Vec::new();
59 let mut positional_index = 0usize;
60
61 let chars: Vec<char> = template.chars().collect();
62 let mut i = 0usize;
63 let mut last_param_end = 0usize;
64 let mut in_literal = false;
65
66 while i < chars.len() {
67 if chars[i] == '\'' {
69 if in_literal && i + 1 < chars.len() && chars[i + 1] == '\'' {
71 i += 2;
72 continue;
73 }
74 in_literal = !in_literal;
75 i += 1;
76 continue;
77 }
78
79 if !in_literal && chars[i] == placeholder {
81 if i > 0 && chars[i - 1] == ':' {
83 let is_in_clause = check_in_prefix(&chars, i + 1);
89 let is_expression =
94 i + 2 < chars.len() && chars[i + 1] == '$' && chars[i + 2] == '{';
95
96 if is_in_clause {
97 let name_start = i + 4;
100 let (name, name_end) = extract_param_name(&chars, name_start);
101
102 if name.is_empty() {
103 return Err(CamelError::ProcessorError(format!(
104 "Empty IN clause parameter name at position {}",
105 i
106 )));
107 }
108
109 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
111
112 params.push(ParamSlot::InClause(name));
113 last_param_end = name_end;
114 i = name_end;
115 } else if is_expression {
116 let brace_start = i + 2;
119 if let Some(brace_end) = find_matching_brace(&chars, brace_start) {
120 let expr_content: String =
122 chars[(brace_start + 1)..brace_end].iter().collect();
123
124 if expr_content.is_empty() {
125 return Err(CamelError::ProcessorError(format!(
126 "Empty expression at position {}",
127 i
128 )));
129 }
130
131 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
133
134 params.push(ParamSlot::Expression(expr_content));
135 last_param_end = brace_end + 1;
136 i = brace_end + 1;
137 } else {
138 return Err(CamelError::ProcessorError(format!(
139 "Unclosed expression at position {}",
140 i
141 )));
142 }
143 } else {
144 let name_start = i + 1;
146 let (name, name_end) = extract_param_name(&chars, name_start);
147
148 if name.is_empty() {
149 return Err(CamelError::ProcessorError(format!(
150 "Empty named parameter name at position {}",
151 i
152 )));
153 }
154
155 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
157
158 params.push(ParamSlot::Named(name));
159 last_param_end = name_end;
160 i = name_end;
161 }
162 } else {
163 fragments.push(chars[last_param_end..i].iter().collect());
166
167 params.push(ParamSlot::Positional(positional_index));
168 positional_index += 1;
169 last_param_end = i + 1;
170 i += 1;
171 }
172 } else {
173 i += 1;
174 }
175 }
176
177 fragments.push(chars[last_param_end..].iter().collect());
179
180 Ok(QueryTemplate { fragments, params })
181}
182
/// Returns `true` when the characters starting at index `start` begin with
/// the literal, case-sensitive text `in:`.
///
/// Any `start` at or beyond the end of `chars` yields `false`; no index
/// arithmetic is performed, so arbitrarily large values are safe.
fn check_in_prefix(chars: &[char], start: usize) -> bool {
    const IN_PREFIX: [char; 3] = ['i', 'n', ':'];

    chars
        .get(start..)
        .is_some_and(|rest| rest.starts_with(&IN_PREFIX))
}
191
/// Reads a parameter name beginning at index `start`.
///
/// A name is the longest run of alphanumeric characters and underscores.
/// Returns the name together with the index of the first character after it;
/// when no name character is found (including when `start` is past the end of
/// `chars`) the name is empty and the returned index equals `start`.
fn extract_param_name(chars: &[char], start: usize) -> (String, usize) {
    let tail = chars.get(start..).unwrap_or_default();

    let name_len = tail
        .iter()
        .take_while(|c| c.is_alphanumeric() || **c == '_')
        .count();

    let name: String = tail[..name_len].iter().collect();
    (name, start + name_len)
}
211
/// Returns the index of the first `}` at or after `start`, or `None` if there
/// is none.
///
/// Despite the name, nesting is not tracked: the first closing brace wins.
///
/// # Panics
///
/// Panics if `start` is greater than `chars.len()`.
fn find_matching_brace(chars: &[char], start: usize) -> Option<usize> {
    let tail = &chars[start..];

    for (offset, ch) in tail.iter().enumerate() {
        if *ch == '}' {
            return Some(start + offset);
        }
    }

    None
}
221
222pub fn resolve_params(
240 tpl: &QueryTemplate,
241 exchange: &Exchange,
242) -> Result<PreparedQuery, CamelError> {
243 let mut sql_parts = Vec::new();
244 let mut bindings = Vec::new();
245 let mut placeholder_num = 1usize;
246
247 let body_json = match &exchange.input.body {
249 Body::Json(val) => Some(val),
250 _ => None,
251 };
252
253 let body_array = body_json.as_ref().and_then(|v| v.as_array());
255
256 for (i, param) in tpl.params.iter().enumerate() {
257 sql_parts.push(tpl.fragments[i].clone());
259
260 match param {
261 ParamSlot::Positional(idx) => {
262 let arr = body_array.ok_or_else(|| {
263 CamelError::ProcessorError(
264 "Positional parameter requires body to be a JSON array".to_string(),
265 )
266 })?;
267
268 let value = arr.get(*idx).ok_or_else(|| {
269 CamelError::ProcessorError(format!(
270 "Positional parameter index {} out of bounds (array length {})",
271 idx,
272 arr.len()
273 ))
274 })?;
275
276 sql_parts.push(format!("${}", placeholder_num));
277 placeholder_num += 1;
278 bindings.push(value.clone());
279 }
280 ParamSlot::Named(name) => {
281 let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
282 sql_parts.push(format!("${}", placeholder_num));
283 placeholder_num += 1;
284 bindings.push(value);
285 }
286 ParamSlot::InClause(name) => {
287 let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
288
289 let arr = value.as_array().ok_or_else(|| {
290 CamelError::ProcessorError(format!(
291 "IN clause parameter '{}' must be an array, got type {}",
292 name,
293 match &value {
294 serde_json::Value::Null => "null",
295 serde_json::Value::Bool(_) => "bool",
296 serde_json::Value::Number(_) => "number",
297 serde_json::Value::String(_) => "string",
298 serde_json::Value::Array(_) => "array",
299 serde_json::Value::Object(_) => "object",
300 }
301 ))
302 })?;
303
304 if arr.is_empty() {
305 sql_parts.push("NULL".to_string());
308 } else {
309 let placeholders: Vec<String> = arr
310 .iter()
311 .map(|_| {
312 let p = format!("${}", placeholder_num);
313 placeholder_num += 1;
314 p
315 })
316 .collect();
317
318 sql_parts.push(placeholders.join(", "));
320 bindings.extend(arr.iter().cloned());
321 }
322 }
323 ParamSlot::Expression(expr) => {
324 let value = resolve_expression_param(expr, body_json, &exchange.input, exchange)?;
325 sql_parts.push(format!("${}", placeholder_num));
326 placeholder_num += 1;
327 bindings.push(value);
328 }
329 }
330 }
331
332 sql_parts.push(tpl.fragments.last().cloned().unwrap_or_default());
334
335 Ok(PreparedQuery {
336 sql: sql_parts.join(""),
337 bindings,
338 })
339}
340
341fn resolve_named_param(
343 name: &str,
344 body_json: Option<&serde_json::Value>,
345 message: &camel_api::Message,
346 exchange: &Exchange,
347) -> Result<serde_json::Value, CamelError> {
348 if let Some(json) = body_json
350 && let Some(obj) = json.as_object()
351 && let Some(value) = obj.get(name)
352 {
353 return Ok(value.clone());
354 }
355
356 if let Some(value) = message.header(name) {
358 return Ok(value.clone());
359 }
360
361 if let Some(value) = exchange.property(name) {
363 return Ok(value.clone());
364 }
365
366 Err(CamelError::ProcessorError(format!(
367 "Named parameter '{}' not found in body, headers, or properties",
368 name
369 )))
370}
371
372fn resolve_expression_param(
375 expr: &str,
376 body_json: Option<&serde_json::Value>,
377 message: &camel_api::Message,
378 exchange: &Exchange,
379) -> Result<serde_json::Value, CamelError> {
380 let parts: Vec<&str> = expr.splitn(2, '.').collect();
381 match parts.as_slice() {
382 ["body", field] => {
383 body_json
385 .and_then(|v| v.as_object())
386 .and_then(|obj| obj.get(*field))
387 .cloned()
388 .ok_or_else(|| {
389 CamelError::ProcessorError(format!(
390 "Expression '{}': field '{}' not found in body (note: nested field access is not supported; use a flat body structure or pass the value via header/property)",
391 expr, field
392 ))
393 })
394 }
395 ["header", name] => message.header(name).cloned().ok_or_else(|| {
396 CamelError::ProcessorError(format!(
397 "Expression '{}': header '{}' not found",
398 expr, name
399 ))
400 }),
401 ["property", key] => exchange.property(key).cloned().ok_or_else(|| {
402 CamelError::ProcessorError(format!(
403 "Expression '{}': property '{}' not found",
404 expr, key
405 ))
406 }),
407 _ => Err(CamelError::ProcessorError(format!(
408 "Unknown expression syntax: '{}'. Use body.<field>, header.<name>, or property.<key>",
409 expr
410 ))),
411 }
412}
413
/// Returns `true` when `sql`, after skipping leading whitespace, starts with
/// one of the keywords `SELECT`, `TABLE`, `SHOW` or `EXPLAIN`.
///
/// The comparison is ASCII case-insensitive and inspects only the leading
/// bytes, so the statement is never copied. Non-ASCII characters whose Unicode
/// uppercase form happens to be an ASCII letter (for example `ſ`, U+017F) are
/// not accepted as part of a keyword.
///
/// Statements that start with any other keyword, including `WITH`, yield `false`.
pub fn is_select_query(sql: &str) -> bool {
    const ROW_RETURNING_KEYWORDS: [&str; 4] = ["SELECT", "TABLE", "SHOW", "EXPLAIN"];

    // Compare bytes so that slicing can never land inside a multi-byte character.
    let head = sql.trim_start().as_bytes();

    ROW_RETURNING_KEYWORDS.iter().any(|keyword| {
        head.get(..keyword.len())
            .is_some_and(|prefix| prefix.eq_ignore_ascii_case(keyword.as_bytes()))
    })
}
426
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, Exchange, Message};
    use serde_json::json;

    /// Parses `sql` with the `#` placeholder, panicking on a parse error.
    fn parse(sql: &str) -> QueryTemplate {
        parse_query_template(sql, '#').unwrap()
    }

    /// Builds an exchange whose input message carries `body` as its JSON body.
    fn exchange_with_json_body(body: serde_json::Value) -> Exchange {
        Exchange::new(Message::new(Body::Json(body)))
    }

    // ---- parsing ----

    #[test]
    fn test_parse_no_params() {
        let tpl = parse("select * from users");
        assert_eq!(tpl.fragments.len(), 1);
        assert!(tpl.params.is_empty());
    }

    #[test]
    fn test_parse_positional_params() {
        let tpl = parse("insert into t values (#, #)");
        assert_eq!(
            tpl.params,
            vec![ParamSlot::Positional(0), ParamSlot::Positional(1)]
        );
    }

    #[test]
    fn test_parse_named_params() {
        let tpl = parse("select * from t where id = :#id and name = :#name");
        assert_eq!(
            tpl.params,
            vec![
                ParamSlot::Named("id".to_string()),
                ParamSlot::Named("name".to_string())
            ]
        );
    }

    #[test]
    fn test_parse_mixed_params() {
        let tpl = parse("select * from t where id = :#id and status = #");
        assert_eq!(
            tpl.params,
            vec![ParamSlot::Named("id".to_string()), ParamSlot::Positional(0)]
        );
    }

    #[test]
    fn test_parse_in_clause() {
        let tpl = parse("select * from t where id in (:#in:ids)");
        assert_eq!(tpl.params, vec![ParamSlot::InClause("ids".to_string())]);
    }

    // ---- resolution ----

    #[test]
    fn test_resolve_named_from_headers() {
        let tpl = parse("select * from t where id = :#id");
        let mut msg = Message::default();
        msg.set_header("id", json!(42));
        let ex = Exchange::new(msg);

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.sql, "select * from t where id = $1");
        assert_eq!(prepared.bindings.len(), 1);
        assert_eq!(prepared.bindings[0], json!(42));
    }

    #[test]
    fn test_resolve_named_from_body_map() {
        let tpl = parse("select * from t where id = :#id");
        let ex = exchange_with_json_body(json!({"id": 99}));

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.bindings[0], json!(99));
    }

    #[test]
    fn test_resolve_positional_from_body_array() {
        let tpl = parse("insert into t values (#, #)");
        let ex = exchange_with_json_body(json!(["foo", 42]));

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.sql, "insert into t values ($1, $2)");
        assert_eq!(prepared.bindings[0], json!("foo"));
        assert_eq!(prepared.bindings[1], json!(42));
    }

    #[test]
    fn test_resolve_named_from_properties() {
        let tpl = parse("select * from t where id = :#myProp");
        let mut ex = Exchange::new(Message::default());
        ex.set_property("myProp", json!(7));

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.bindings[0], json!(7));
    }

    #[test]
    fn test_resolve_named_not_found() {
        let tpl = parse("select * from t where id = :#missing");
        let ex = Exchange::new(Message::default());

        assert!(resolve_params(&tpl, &ex).is_err());
    }

    #[test]
    fn test_resolve_in_clause_expansion() {
        let tpl = parse("select * from t where id in (:#in:ids)");
        let mut msg = Message::default();
        msg.set_header("ids", json!([1, 2, 3]));
        let ex = Exchange::new(msg);

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.sql, "select * from t where id in ($1, $2, $3)");
        assert_eq!(prepared.bindings, vec![json!(1), json!(2), json!(3)]);
    }

    #[test]
    fn test_build_sql_correct_placeholders() {
        let tpl = parse("select * from t where a = :#x and b = # and c in (:#in:ids)");
        let mut msg = Message::new(Body::Json(json!(["pos_val"])));
        msg.set_header("x", json!("hello"));
        msg.set_header("ids", json!([10, 20]));
        let ex = Exchange::new(msg);

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(
            prepared.sql,
            "select * from t where a = $1 and b = $2 and c in ($3, $4)"
        );
        assert_eq!(prepared.bindings.len(), 4);
    }

    // ---- statement classification ----

    #[test]
    fn test_is_select() {
        let row_returning = [
            "SELECT * FROM t",
            "  select * from t",
            "TABLE users",
            "SHOW TABLES",
            "EXPLAIN SELECT * FROM t",
        ];
        for sql in row_returning {
            assert!(is_select_query(sql));
        }

        let not_row_returning = [
            "WITH cte AS (SELECT 1) SELECT * FROM cte",
            "WITH cte AS (UPDATE t SET x = 1 RETURNING *) SELECT * FROM cte",
            "INSERT INTO t VALUES (1)",
            "UPDATE t SET x = 1",
            "DELETE FROM t",
        ];
        for sql in not_row_returning {
            assert!(!is_select_query(sql));
        }
    }

    // ---- fragment boundaries ----

    #[test]
    fn test_parse_trailing_param() {
        let tpl = parse("select * from t where id = #");
        assert_eq!(tpl.params.len(), 1);
        assert_eq!(tpl.fragments, vec!["select * from t where id = ", ""]);
    }

    #[test]
    fn test_parse_leading_param() {
        let tpl = parse("# = id");
        assert_eq!(tpl.params.len(), 1);
        assert_eq!(tpl.fragments, vec!["", " = id"]);
    }

    #[test]
    fn test_parse_consecutive_params() {
        let tpl = parse("# # #");
        assert_eq!(tpl.params.len(), 3);
        assert_eq!(tpl.fragments, vec!["", " ", " ", ""]);
    }

    // ---- lookup priority ----

    #[test]
    fn test_resolution_priority_body_over_headers() {
        let tpl = parse("select * from t where id = :#id");
        let mut msg = Message::new(Body::Json(json!({"id": 1})));
        msg.set_header("id", json!(2));
        let ex = Exchange::new(msg);

        let prepared = resolve_params(&tpl, &ex).unwrap();
        // The body value wins over the header of the same name.
        assert_eq!(prepared.bindings[0], json!(1));
    }

    #[test]
    fn test_resolution_priority_headers_over_properties() {
        let tpl = parse("select * from t where id = :#id");
        let mut msg = Message::default();
        msg.set_header("id", json!(10));
        let mut ex = Exchange::new(msg);
        ex.set_property("id", json!(20));

        let prepared = resolve_params(&tpl, &ex).unwrap();
        // The header value wins over the property of the same name.
        assert_eq!(prepared.bindings[0], json!(10));
    }

    #[test]
    fn test_custom_placeholder_char() {
        let tpl = parse_query_template("select * from t where id = :@id", '@').unwrap();
        assert_eq!(tpl.params, vec![ParamSlot::Named("id".to_string())]);
    }

    // ---- expression parameters ----

    #[test]
    fn test_parse_expression_param() {
        let tpl = parse("select * from t where id = :#${body.id}");
        assert_eq!(
            tpl.params,
            vec![ParamSlot::Expression("body.id".to_string())]
        );
    }

    #[test]
    fn test_resolve_expression_from_body() {
        let tpl = parse("select * from t where id = :#${body.id}");
        let ex = exchange_with_json_body(json!({"id": 42}));

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.sql, "select * from t where id = $1");
        assert_eq!(prepared.bindings[0], json!(42));
    }

    #[test]
    fn test_resolve_expression_from_header() {
        let tpl = parse("select * from t where name = :#${header.name}");
        let mut msg = Message::default();
        msg.set_header("name", json!("alice"));
        let ex = Exchange::new(msg);

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.bindings[0], json!("alice"));
    }

    #[test]
    fn test_resolve_expression_from_property() {
        let tpl = parse("select * from t where k = :#${property.myKey}");
        let mut ex = Exchange::new(Message::default());
        ex.set_property("myKey", json!(99));

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.bindings[0], json!(99));
    }

    // ---- string literals ----

    #[test]
    fn test_parse_hash_in_string_literal() {
        let tpl = parse("select * from t where x = '#literal' and id = #");
        assert_eq!(tpl.params, vec![ParamSlot::Positional(0)]);
    }

    #[test]
    fn test_parse_escaped_quote_in_literal() {
        let tpl = parse("select * from t where x = 'it''s' and id = #");
        assert_eq!(tpl.params, vec![ParamSlot::Positional(0)]);
    }

    #[test]
    fn test_empty_in_clause_produces_null() {
        let tpl = parse("select * from t where id in (:#in:ids)");
        let mut msg = Message::default();
        msg.set_header("ids", json!([]));
        let ex = Exchange::new(msg);

        let prepared = resolve_params(&tpl, &ex).unwrap();
        assert_eq!(prepared.sql, "select * from t where id in (NULL)");
        assert!(prepared.bindings.is_empty());
    }
}