1use camel_component_api::{Body, CamelError, Exchange};
7
8#[derive(Debug, Clone, PartialEq)]
10pub enum ParamSlot {
11 Positional(usize),
13 Named(String),
15 InClause(String),
17 Expression(String),
19}
20
21#[derive(Debug, Clone)]
23pub struct QueryTemplate {
24 pub fragments: Vec<String>,
26 pub params: Vec<ParamSlot>,
28}
29
30#[derive(Debug, Clone)]
32pub struct PreparedQuery {
33 pub sql: String,
35 pub bindings: Vec<serde_json::Value>,
37}
38
39pub fn parse_query_template(
54 template: &str,
55 placeholder: char,
56) -> Result<QueryTemplate, CamelError> {
57 let mut fragments = Vec::new();
58 let mut params = Vec::new();
59 let mut positional_index = 0usize;
60
61 let chars: Vec<char> = template.chars().collect();
62 let mut i = 0usize;
63 let mut last_param_end = 0usize;
64 let mut in_literal = false;
65
66 while i < chars.len() {
67 if chars[i] == '\'' {
69 if in_literal && i + 1 < chars.len() && chars[i + 1] == '\'' {
71 i += 2;
72 continue;
73 }
74 in_literal = !in_literal;
75 i += 1;
76 continue;
77 }
78
79 if !in_literal && chars[i] == placeholder {
81 if i > 0 && chars[i - 1] == ':' {
83 let is_in_clause = check_in_prefix(&chars, i + 1);
89 let is_expression =
94 i + 2 < chars.len() && chars[i + 1] == '$' && chars[i + 2] == '{';
95
96 if is_in_clause {
97 let name_start = i + 4;
100 let (name, name_end) = extract_param_name(&chars, name_start);
101
102 if name.is_empty() {
103 return Err(CamelError::ProcessorError(format!(
104 "Empty IN clause parameter name at position {}",
105 i
106 )));
107 }
108
109 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
111
112 params.push(ParamSlot::InClause(name));
113 last_param_end = name_end;
114 i = name_end;
115 } else if is_expression {
116 let brace_start = i + 2;
119 if let Some(brace_end) = find_matching_brace(&chars, brace_start) {
120 let expr_content: String =
122 chars[(brace_start + 1)..brace_end].iter().collect();
123
124 if expr_content.is_empty() {
125 return Err(CamelError::ProcessorError(format!(
126 "Empty expression at position {}",
127 i
128 )));
129 }
130
131 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
133
134 params.push(ParamSlot::Expression(expr_content));
135 last_param_end = brace_end + 1;
136 i = brace_end + 1;
137 } else {
138 return Err(CamelError::ProcessorError(format!(
139 "Unclosed expression at position {}",
140 i
141 )));
142 }
143 } else {
144 let name_start = i + 1;
146 let (name, name_end) = extract_param_name(&chars, name_start);
147
148 if name.is_empty() {
149 return Err(CamelError::ProcessorError(format!(
150 "Empty named parameter name at position {}",
151 i
152 )));
153 }
154
155 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
157
158 params.push(ParamSlot::Named(name));
159 last_param_end = name_end;
160 i = name_end;
161 }
162 } else {
163 fragments.push(chars[last_param_end..i].iter().collect());
166
167 params.push(ParamSlot::Positional(positional_index));
168 positional_index += 1;
169 last_param_end = i + 1;
170 i += 1;
171 }
172 } else {
173 i += 1;
174 }
175 }
176
177 fragments.push(chars[last_param_end..].iter().collect());
179
180 Ok(QueryTemplate { fragments, params })
181}
182
183fn check_in_prefix(chars: &[char], start: usize) -> bool {
185 let in_prefix: Vec<char> = "in:".chars().collect();
186 if start + in_prefix.len() > chars.len() {
187 return false;
188 }
189 chars[start..start + in_prefix.len()] == in_prefix[..]
190}
191
192fn extract_param_name(chars: &[char], start: usize) -> (String, usize) {
196 let mut name = String::new();
197 let mut i = start;
198
199 while i < chars.len() {
200 let c = chars[i];
201 if c.is_alphanumeric() || c == '_' {
202 name.push(c);
203 i += 1;
204 } else {
205 break;
206 }
207 }
208
209 (name, i)
210}
211
212fn find_matching_brace(chars: &[char], start: usize) -> Option<usize> {
215 chars[start..]
217 .iter()
218 .position(|&c| c == '}')
219 .map(|p| start + p)
220}
221
222pub fn resolve_params(
241 tpl: &QueryTemplate,
242 exchange: &Exchange,
243 in_separator: &str,
244) -> Result<PreparedQuery, CamelError> {
245 let mut sql_parts = Vec::new();
246 let mut bindings = Vec::new();
247 let mut placeholder_num = 1usize;
248
249 let body_json = match &exchange.input.body {
251 Body::Json(val) => Some(val),
252 _ => None,
253 };
254
255 let body_array = body_json.as_ref().and_then(|v| v.as_array());
257
258 for (i, param) in tpl.params.iter().enumerate() {
259 sql_parts.push(tpl.fragments[i].clone());
261
262 match param {
263 ParamSlot::Positional(idx) => {
264 let arr = body_array.ok_or_else(|| {
265 CamelError::ProcessorError(
266 "Positional parameter requires body to be a JSON array".to_string(),
267 )
268 })?;
269
270 let value = arr.get(*idx).ok_or_else(|| {
271 CamelError::ProcessorError(format!(
272 "Positional parameter index {} out of bounds (array length {})",
273 idx,
274 arr.len()
275 ))
276 })?;
277
278 sql_parts.push(format!("${}", placeholder_num));
279 placeholder_num += 1;
280 bindings.push(value.clone());
281 }
282 ParamSlot::Named(name) => {
283 let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
284 sql_parts.push(format!("${}", placeholder_num));
285 placeholder_num += 1;
286 bindings.push(value);
287 }
288 ParamSlot::InClause(name) => {
289 let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
290
291 let arr = value.as_array().ok_or_else(|| {
292 CamelError::ProcessorError(format!(
293 "IN clause parameter '{}' must be an array, got type {}",
294 name,
295 match &value {
296 serde_json::Value::Null => "null",
297 serde_json::Value::Bool(_) => "bool",
298 serde_json::Value::Number(_) => "number",
299 serde_json::Value::String(_) => "string",
300 serde_json::Value::Array(_) => "array",
301 serde_json::Value::Object(_) => "object",
302 }
303 ))
304 })?;
305
306 if arr.is_empty() {
307 sql_parts.push("NULL".to_string());
310 } else {
311 let placeholders: Vec<String> = arr
312 .iter()
313 .map(|_| {
314 let p = format!("${}", placeholder_num);
315 placeholder_num += 1;
316 p
317 })
318 .collect();
319
320 sql_parts.push(placeholders.join(in_separator));
322 bindings.extend(arr.iter().cloned());
323 }
324 }
325 ParamSlot::Expression(expr) => {
326 let value = resolve_expression_param(expr, body_json, &exchange.input, exchange)?;
327 sql_parts.push(format!("${}", placeholder_num));
328 placeholder_num += 1;
329 bindings.push(value);
330 }
331 }
332 }
333
334 sql_parts.push(tpl.fragments.last().cloned().unwrap_or_default());
336
337 Ok(PreparedQuery {
338 sql: sql_parts.join(""),
339 bindings,
340 })
341}
342
343fn resolve_named_param(
345 name: &str,
346 body_json: Option<&serde_json::Value>,
347 message: &camel_component_api::Message,
348 exchange: &Exchange,
349) -> Result<serde_json::Value, CamelError> {
350 if let Some(json) = body_json
352 && let Some(obj) = json.as_object()
353 && let Some(value) = obj.get(name)
354 {
355 return Ok(value.clone());
356 }
357
358 if let Some(value) = message.header(name) {
360 return Ok(value.clone());
361 }
362
363 if let Some(value) = exchange.property(name) {
365 return Ok(value.clone());
366 }
367
368 Err(CamelError::ProcessorError(format!(
369 "Named parameter '{}' not found in body, headers, or properties",
370 name
371 )))
372}
373
374fn resolve_expression_param(
377 expr: &str,
378 body_json: Option<&serde_json::Value>,
379 message: &camel_component_api::Message,
380 exchange: &Exchange,
381) -> Result<serde_json::Value, CamelError> {
382 let parts: Vec<&str> = expr.splitn(2, '.').collect();
383 match parts.as_slice() {
384 ["body", field] => {
385 body_json
387 .and_then(|v| v.as_object())
388 .and_then(|obj| obj.get(*field))
389 .cloned()
390 .ok_or_else(|| {
391 CamelError::ProcessorError(format!(
392 "Expression '{}': field '{}' not found in body (note: nested field access is not supported; use a flat body structure or pass the value via header/property)",
393 expr, field
394 ))
395 })
396 }
397 ["header", name] => message.header(name).cloned().ok_or_else(|| {
398 CamelError::ProcessorError(format!(
399 "Expression '{}': header '{}' not found",
400 expr, name
401 ))
402 }),
403 ["property", key] => exchange.property(key).cloned().ok_or_else(|| {
404 CamelError::ProcessorError(format!(
405 "Expression '{}': property '{}' not found",
406 expr, key
407 ))
408 }),
409 _ => Err(CamelError::ProcessorError(format!(
410 "Unknown expression syntax: '{}'. Use body.<field>, header.<name>, or property.<key>",
411 expr
412 ))),
413 }
414}
415
416pub fn is_select_query(sql: &str) -> bool {
422 let upper = sql.trim().to_uppercase();
423 upper.starts_with("SELECT")
424 || upper.starts_with("TABLE")
425 || upper.starts_with("SHOW")
426 || upper.starts_with("EXPLAIN")
427}
428
429const DEFAULT_IN_SEPARATOR: &str = ", ";
430
431pub fn resolve_params_default(
432 tpl: &QueryTemplate,
433 exchange: &Exchange,
434) -> Result<PreparedQuery, CamelError> {
435 resolve_params(tpl, exchange, DEFAULT_IN_SEPARATOR)
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441 use camel_component_api::{Body, Exchange, Message};
442
443 #[test]
444 fn parse_no_params() {
445 let tpl = parse_query_template("select * from users", '#').unwrap();
446 assert_eq!(tpl.fragments.len(), 1);
447 assert!(tpl.params.is_empty());
448 }
449
450 #[test]
451 fn parse_positional_params() {
452 let tpl = parse_query_template("insert into t values (#, #)", '#').unwrap();
453 assert_eq!(tpl.params.len(), 2);
454 assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
455 assert!(matches!(tpl.params[1], ParamSlot::Positional(1)));
456 }
457
458 #[test]
459 fn parse_named_params() {
460 let tpl =
461 parse_query_template("select * from t where id = :#id and name = :#name", '#').unwrap();
462 assert_eq!(tpl.params.len(), 2);
463 assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
464 assert!(matches!(&tpl.params[1], ParamSlot::Named(n) if n == "name"));
465 }
466
467 #[test]
468 fn parse_mixed_params() {
469 let tpl =
470 parse_query_template("select * from t where id = :#id and status = #", '#').unwrap();
471 assert_eq!(tpl.params.len(), 2);
472 assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
473 assert!(matches!(tpl.params[1], ParamSlot::Positional(0)));
474 }
475
476 #[test]
477 fn parse_in_clause() {
478 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
479 assert_eq!(tpl.params.len(), 1);
480 assert!(matches!(&tpl.params[0], ParamSlot::InClause(n) if n == "ids"));
481 }
482
483 #[test]
484 fn resolve_named_from_headers() {
485 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
486 let mut msg = Message::default();
487 msg.set_header("id", serde_json::json!(42));
488 let ex = Exchange::new(msg);
489
490 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
491 assert_eq!(prepared.sql, "select * from t where id = $1");
492 assert_eq!(prepared.bindings.len(), 1);
493 assert_eq!(prepared.bindings[0], serde_json::json!(42));
494 }
495
496 #[test]
497 fn resolve_named_from_body_map() {
498 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
499 let msg = Message::new(Body::Json(serde_json::json!({"id": 99})));
500 let ex = Exchange::new(msg);
501
502 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
503 assert_eq!(prepared.bindings[0], serde_json::json!(99));
504 }
505
506 #[test]
507 fn resolve_positional_from_body_array() {
508 let tpl = parse_query_template("insert into t values (#, #)", '#').unwrap();
509 let msg = Message::new(Body::Json(serde_json::json!(["foo", 42])));
510 let ex = Exchange::new(msg);
511
512 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
513 assert_eq!(prepared.sql, "insert into t values ($1, $2)");
514 assert_eq!(prepared.bindings[0], serde_json::json!("foo"));
515 assert_eq!(prepared.bindings[1], serde_json::json!(42));
516 }
517
518 #[test]
519 fn resolve_named_from_properties() {
520 let tpl = parse_query_template("select * from t where id = :#myProp", '#').unwrap();
521 let mut ex = Exchange::new(Message::default());
522 ex.set_property("myProp", serde_json::json!(7));
523
524 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
525 assert_eq!(prepared.bindings[0], serde_json::json!(7));
526 }
527
528 #[test]
529 fn resolve_named_not_found() {
530 let tpl = parse_query_template("select * from t where id = :#missing", '#').unwrap();
531 let ex = Exchange::new(Message::default());
532
533 let result = resolve_params(&tpl, &ex, ", ");
534 assert!(result.is_err());
535 }
536
537 #[test]
538 fn resolve_in_clause_expansion() {
539 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
540 let mut msg = Message::default();
541 msg.set_header("ids", serde_json::json!([1, 2, 3]));
542 let ex = Exchange::new(msg);
543
544 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
545 assert_eq!(prepared.sql, "select * from t where id in ($1, $2, $3)");
546 assert_eq!(
547 prepared.bindings,
548 vec![
549 serde_json::json!(1),
550 serde_json::json!(2),
551 serde_json::json!(3)
552 ]
553 );
554 }
555
556 #[test]
557 fn build_sql_correct_placeholders() {
558 let tpl = parse_query_template(
559 "select * from t where a = :#x and b = # and c in (:#in:ids)",
560 '#',
561 )
562 .unwrap();
563 let mut msg = Message::new(Body::Json(serde_json::json!(["pos_val"])));
564 msg.set_header("x", serde_json::json!("hello"));
565 msg.set_header("ids", serde_json::json!([10, 20]));
566 let ex = Exchange::new(msg);
567
568 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
569 assert_eq!(
570 prepared.sql,
571 "select * from t where a = $1 and b = $2 and c in ($3, $4)"
572 );
573 assert_eq!(prepared.bindings.len(), 4);
574 }
575
576 #[test]
577 fn is_select() {
578 assert!(is_select_query("SELECT * FROM t"));
579 assert!(is_select_query(" select * from t"));
580 assert!(!is_select_query("WITH cte AS (SELECT 1) SELECT * FROM cte"));
582 assert!(!is_select_query(
583 "WITH cte AS (UPDATE t SET x = 1 RETURNING *) SELECT * FROM cte"
584 ));
585 assert!(is_select_query("TABLE users"));
586 assert!(is_select_query("SHOW TABLES"));
587 assert!(is_select_query("EXPLAIN SELECT * FROM t"));
588 assert!(!is_select_query("INSERT INTO t VALUES (1)"));
589 assert!(!is_select_query("UPDATE t SET x = 1"));
590 assert!(!is_select_query("DELETE FROM t"));
591 }
592
593 #[test]
594 fn parse_trailing_param() {
595 let tpl = parse_query_template("select * from t where id = #", '#').unwrap();
596 assert_eq!(tpl.params.len(), 1);
597 assert_eq!(tpl.fragments.len(), 2);
598 assert_eq!(tpl.fragments[0], "select * from t where id = ");
599 assert_eq!(tpl.fragments[1], "");
600 }
601
602 #[test]
603 fn parse_leading_param() {
604 let tpl = parse_query_template("# = id", '#').unwrap();
605 assert_eq!(tpl.params.len(), 1);
606 assert_eq!(tpl.fragments.len(), 2);
607 assert_eq!(tpl.fragments[0], "");
608 assert_eq!(tpl.fragments[1], " = id");
609 }
610
611 #[test]
612 fn parse_consecutive_params() {
613 let tpl = parse_query_template("# # #", '#').unwrap();
614 assert_eq!(tpl.params.len(), 3);
615 assert_eq!(tpl.fragments.len(), 4);
616 assert_eq!(tpl.fragments[0], "");
617 assert_eq!(tpl.fragments[1], " ");
618 assert_eq!(tpl.fragments[2], " ");
619 assert_eq!(tpl.fragments[3], "");
620 }
621
622 #[test]
623 fn resolution_priority_body_over_headers() {
624 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
626 let mut msg = Message::new(Body::Json(serde_json::json!({"id": 1})));
627 msg.set_header("id", serde_json::json!(2)); let ex = Exchange::new(msg);
629
630 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
631 assert_eq!(prepared.bindings[0], serde_json::json!(1)); }
633
634 #[test]
635 fn resolution_priority_headers_over_properties() {
636 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
638 let mut msg = Message::default();
639 msg.set_header("id", serde_json::json!(10));
640 let mut ex = Exchange::new(msg);
641 ex.set_property("id", serde_json::json!(20)); let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
644 assert_eq!(prepared.bindings[0], serde_json::json!(10)); }
646
647 #[test]
648 fn custom_placeholder_char() {
649 let tpl = parse_query_template("select * from t where id = :@id", '@').unwrap();
650 assert_eq!(tpl.params.len(), 1);
651 assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
652 }
653
654 #[test]
655 fn parse_expression_param() {
656 let tpl = parse_query_template("select * from t where id = :#${body.id}", '#').unwrap();
657 assert_eq!(tpl.params.len(), 1);
658 assert!(matches!(&tpl.params[0], ParamSlot::Expression(e) if e == "body.id"));
659 }
660
661 #[test]
662 fn resolve_expression_from_body() {
663 let tpl = parse_query_template("select * from t where id = :#${body.id}", '#').unwrap();
664 let msg = Message::new(Body::Json(serde_json::json!({"id": 42})));
665 let ex = Exchange::new(msg);
666 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
667 assert_eq!(prepared.sql, "select * from t where id = $1");
668 assert_eq!(prepared.bindings[0], serde_json::json!(42));
669 }
670
671 #[test]
672 fn resolve_expression_from_header() {
673 let tpl =
674 parse_query_template("select * from t where name = :#${header.name}", '#').unwrap();
675 let mut msg = Message::default();
676 msg.set_header("name", serde_json::json!("alice"));
677 let ex = Exchange::new(msg);
678 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
679 assert_eq!(prepared.bindings[0], serde_json::json!("alice"));
680 }
681
682 #[test]
683 fn resolve_expression_from_property() {
684 let tpl =
685 parse_query_template("select * from t where k = :#${property.myKey}", '#').unwrap();
686 let mut ex = Exchange::new(Message::default());
687 ex.set_property("myKey", serde_json::json!(99));
688 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
689 assert_eq!(prepared.bindings[0], serde_json::json!(99));
690 }
691
692 #[test]
693 fn parse_hash_in_string_literal() {
694 let tpl =
696 parse_query_template("select * from t where x = '#literal' and id = #", '#').unwrap();
697 assert_eq!(tpl.params.len(), 1);
698 assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
699 }
700
701 #[test]
702 fn parse_escaped_quote_in_literal() {
703 let tpl =
705 parse_query_template("select * from t where x = 'it''s' and id = #", '#').unwrap();
706 assert_eq!(tpl.params.len(), 1);
707 assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
708 }
709
710 #[test]
711 fn empty_in_clause_produces_null() {
712 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
713 let mut msg = Message::default();
714 msg.set_header("ids", serde_json::json!([]));
715 let ex = Exchange::new(msg);
716 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
717 assert_eq!(prepared.sql, "select * from t where id in (NULL)");
718 assert!(prepared.bindings.is_empty());
719 }
720
721 #[test]
722 fn in_clause_custom_separator() {
723 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
724 let mut msg = Message::default();
725 msg.set_header("ids", serde_json::json!([1, 2, 3]));
726 let ex = Exchange::new(msg);
727
728 let prepared = resolve_params(&tpl, &ex, ";").unwrap();
729 assert_eq!(prepared.sql, "select * from t where id in ($1;$2;$3)");
730 }
731}