1use camel_component_api::{Body, CamelError, Exchange};
13
14#[derive(Debug, Clone, PartialEq)]
16pub enum ParamSlot {
17 Positional(usize),
19 Named(String),
21 InClause(String),
23 Expression(String),
25}
26
27#[derive(Debug, Clone)]
29pub struct QueryTemplate {
30 pub fragments: Vec<String>,
32 pub params: Vec<ParamSlot>,
34}
35
36#[derive(Debug, Clone)]
38pub struct PreparedQuery {
39 pub sql: String,
41 pub bindings: Vec<serde_json::Value>,
43}
44
45pub fn parse_query_template(
60 template: &str,
61 placeholder: char,
62) -> Result<QueryTemplate, CamelError> {
63 let mut fragments = Vec::new();
64 let mut params = Vec::new();
65 let mut positional_index = 0usize;
66
67 let chars: Vec<char> = template.chars().collect();
68 let mut i = 0usize;
69 let mut last_param_end = 0usize;
70 let mut in_literal = false;
71
72 while i < chars.len() {
73 if chars[i] == '\'' {
75 if in_literal && i + 1 < chars.len() && chars[i + 1] == '\'' {
77 i += 2;
78 continue;
79 }
80 in_literal = !in_literal;
81 i += 1;
82 continue;
83 }
84
85 if !in_literal && chars[i] == placeholder {
87 if i > 0 && chars[i - 1] == ':' {
89 let is_in_clause = check_in_prefix(&chars, i + 1);
95 let is_expression =
100 i + 2 < chars.len() && chars[i + 1] == '$' && chars[i + 2] == '{';
101
102 if is_in_clause {
103 let name_start = i + 4;
106 let (name, name_end) = extract_param_name(&chars, name_start);
107
108 if name.is_empty() {
109 return Err(CamelError::ProcessorError(format!(
110 "Empty IN clause parameter name at position {}",
111 i
112 )));
113 }
114
115 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
117
118 params.push(ParamSlot::InClause(name));
119 last_param_end = name_end;
120 i = name_end;
121 } else if is_expression {
122 let brace_start = i + 2;
125 if let Some(brace_end) = find_matching_brace(&chars, brace_start) {
126 let expr_content: String =
128 chars[(brace_start + 1)..brace_end].iter().collect();
129
130 if expr_content.is_empty() {
131 return Err(CamelError::ProcessorError(format!(
132 "Empty expression at position {}",
133 i
134 )));
135 }
136
137 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
139
140 params.push(ParamSlot::Expression(expr_content));
141 last_param_end = brace_end + 1;
142 i = brace_end + 1;
143 } else {
144 return Err(CamelError::ProcessorError(format!(
145 "Unclosed expression at position {}",
146 i
147 )));
148 }
149 } else {
150 let name_start = i + 1;
152 let (name, name_end) = extract_param_name(&chars, name_start);
153
154 if name.is_empty() {
155 return Err(CamelError::ProcessorError(format!(
156 "Empty named parameter name at position {}",
157 i
158 )));
159 }
160
161 fragments.push(chars[last_param_end..(i - 1)].iter().collect());
163
164 params.push(ParamSlot::Named(name));
165 last_param_end = name_end;
166 i = name_end;
167 }
168 } else {
169 fragments.push(chars[last_param_end..i].iter().collect());
172
173 params.push(ParamSlot::Positional(positional_index));
174 positional_index += 1;
175 last_param_end = i + 1;
176 i += 1;
177 }
178 } else {
179 i += 1;
180 }
181 }
182
183 fragments.push(chars[last_param_end..].iter().collect());
185
186 Ok(QueryTemplate { fragments, params })
187}
188
189fn check_in_prefix(chars: &[char], start: usize) -> bool {
191 let in_prefix: Vec<char> = "in:".chars().collect();
192 if start + in_prefix.len() > chars.len() {
193 return false;
194 }
195 chars[start..start + in_prefix.len()] == in_prefix[..]
196}
197
198fn extract_param_name(chars: &[char], start: usize) -> (String, usize) {
202 let mut name = String::new();
203 let mut i = start;
204
205 while i < chars.len() {
206 let c = chars[i];
207 if c.is_alphanumeric() || c == '_' {
208 name.push(c);
209 i += 1;
210 } else {
211 break;
212 }
213 }
214
215 (name, i)
216}
217
218fn find_matching_brace(chars: &[char], start: usize) -> Option<usize> {
221 chars[start..]
223 .iter()
224 .position(|&c| c == '}')
225 .map(|p| start + p)
226}
227
228pub fn resolve_params(
247 tpl: &QueryTemplate,
248 exchange: &Exchange,
249 in_separator: &str,
250) -> Result<PreparedQuery, CamelError> {
251 let mut sql_parts = Vec::new();
252 let mut bindings = Vec::new();
253 let mut placeholder_num = 1usize;
254
255 let body_json = match &exchange.input.body {
257 Body::Json(val) => Some(val),
258 _ => None,
259 };
260
261 let body_array = body_json.as_ref().and_then(|v| v.as_array());
263
264 for (i, param) in tpl.params.iter().enumerate() {
265 sql_parts.push(tpl.fragments[i].clone());
267
268 match param {
269 ParamSlot::Positional(idx) => {
270 let arr = body_array.ok_or_else(|| {
271 CamelError::ProcessorError(
272 "Positional parameter requires body to be a JSON array".to_string(),
273 )
274 })?;
275
276 let value = arr.get(*idx).ok_or_else(|| {
277 CamelError::ProcessorError(format!(
278 "Positional parameter index {} out of bounds (array length {})",
279 idx,
280 arr.len()
281 ))
282 })?;
283
284 sql_parts.push(format!("${}", placeholder_num));
285 placeholder_num += 1;
286 bindings.push(value.clone());
287 }
288 ParamSlot::Named(name) => {
289 let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
290 sql_parts.push(format!("${}", placeholder_num));
291 placeholder_num += 1;
292 bindings.push(value);
293 }
294 ParamSlot::InClause(name) => {
295 let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
296
297 let arr = value.as_array().ok_or_else(|| {
298 CamelError::ProcessorError(format!(
299 "IN clause parameter '{}' must be an array, got type {}",
300 name,
301 match &value {
302 serde_json::Value::Null => "null",
303 serde_json::Value::Bool(_) => "bool",
304 serde_json::Value::Number(_) => "number",
305 serde_json::Value::String(_) => "string",
306 serde_json::Value::Array(_) => "array",
307 serde_json::Value::Object(_) => "object",
308 }
309 ))
310 })?;
311
312 if arr.is_empty() {
313 sql_parts.push("NULL".to_string());
316 } else {
317 let placeholders: Vec<String> = arr
318 .iter()
319 .map(|_| {
320 let p = format!("${}", placeholder_num);
321 placeholder_num += 1;
322 p
323 })
324 .collect();
325
326 sql_parts.push(placeholders.join(in_separator));
328 bindings.extend(arr.iter().cloned());
329 }
330 }
331 ParamSlot::Expression(expr) => {
332 let value = resolve_expression_param(expr, body_json, &exchange.input, exchange)?;
333 sql_parts.push(format!("${}", placeholder_num));
334 placeholder_num += 1;
335 bindings.push(value);
336 }
337 }
338 }
339
340 sql_parts.push(tpl.fragments.last().cloned().unwrap_or_default());
342
343 Ok(PreparedQuery {
344 sql: sql_parts.join(""),
345 bindings,
346 })
347}
348
349fn resolve_named_param(
351 name: &str,
352 body_json: Option<&serde_json::Value>,
353 message: &camel_component_api::Message,
354 exchange: &Exchange,
355) -> Result<serde_json::Value, CamelError> {
356 if let Some(json) = body_json
358 && let Some(obj) = json.as_object()
359 && let Some(value) = obj.get(name)
360 {
361 return Ok(value.clone());
362 }
363
364 if let Some(value) = message.header(name) {
366 return Ok(value.clone());
367 }
368
369 if let Some(value) = exchange.property(name) {
371 return Ok(value.clone());
372 }
373
374 Err(CamelError::ProcessorError(format!(
375 "Named parameter '{}' not found in body, headers, or properties",
376 name
377 )))
378}
379
380fn resolve_expression_param(
383 expr: &str,
384 body_json: Option<&serde_json::Value>,
385 message: &camel_component_api::Message,
386 exchange: &Exchange,
387) -> Result<serde_json::Value, CamelError> {
388 let parts: Vec<&str> = expr.splitn(2, '.').collect();
389 match parts.as_slice() {
390 ["body", field] => {
391 body_json
393 .and_then(|v| v.as_object())
394 .and_then(|obj| obj.get(*field))
395 .cloned()
396 .ok_or_else(|| {
397 CamelError::ProcessorError(format!(
398 "Expression '{}': field '{}' not found in body (note: nested field access is not supported; use a flat body structure or pass the value via header/property)",
399 expr, field
400 ))
401 })
402 }
403 ["header", name] => message.header(name).cloned().ok_or_else(|| {
404 CamelError::ProcessorError(format!(
405 "Expression '{}': header '{}' not found",
406 expr, name
407 ))
408 }),
409 ["property", key] => exchange.property(key).cloned().ok_or_else(|| {
410 CamelError::ProcessorError(format!(
411 "Expression '{}': property '{}' not found",
412 expr, key
413 ))
414 }),
415 _ => Err(CamelError::ProcessorError(format!(
416 "Unknown expression syntax: '{}'. Use body.<field>, header.<name>, or property.<key>",
417 expr
418 ))),
419 }
420}
421
422pub fn is_select_query(sql: &str) -> bool {
424 let trimmed = sql.trim_start().to_uppercase();
425 if trimmed.starts_with("WITH") {
426 return trimmed.contains("SELECT")
427 && !trimmed.contains("INSERT INTO")
428 && !trimmed.contains("UPDATE ")
429 && !trimmed.contains("DELETE FROM");
430 }
431
432 trimmed.starts_with("SELECT")
433 || trimmed.starts_with("VALUES")
434 || trimmed.starts_with("TABLE")
435 || trimmed.starts_with("SHOW")
436 || trimmed.starts_with("EXPLAIN")
437}
438
439const DEFAULT_IN_SEPARATOR: &str = ", ";
440
441pub fn resolve_params_default(
442 tpl: &QueryTemplate,
443 exchange: &Exchange,
444) -> Result<PreparedQuery, CamelError> {
445 resolve_params(tpl, exchange, DEFAULT_IN_SEPARATOR)
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451 use camel_component_api::{Body, Exchange, Message};
452
453 #[test]
454 fn parse_no_params() {
455 let tpl = parse_query_template("select * from users", '#').unwrap();
456 assert_eq!(tpl.fragments.len(), 1);
457 assert!(tpl.params.is_empty());
458 }
459
460 #[test]
461 fn parse_positional_params() {
462 let tpl = parse_query_template("insert into t values (#, #)", '#').unwrap();
463 assert_eq!(tpl.params.len(), 2);
464 assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
465 assert!(matches!(tpl.params[1], ParamSlot::Positional(1)));
466 }
467
468 #[test]
469 fn parse_named_params() {
470 let tpl =
471 parse_query_template("select * from t where id = :#id and name = :#name", '#').unwrap();
472 assert_eq!(tpl.params.len(), 2);
473 assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
474 assert!(matches!(&tpl.params[1], ParamSlot::Named(n) if n == "name"));
475 }
476
477 #[test]
478 fn parse_mixed_params() {
479 let tpl =
480 parse_query_template("select * from t where id = :#id and status = #", '#').unwrap();
481 assert_eq!(tpl.params.len(), 2);
482 assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
483 assert!(matches!(tpl.params[1], ParamSlot::Positional(0)));
484 }
485
486 #[test]
487 fn parse_in_clause() {
488 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
489 assert_eq!(tpl.params.len(), 1);
490 assert!(matches!(&tpl.params[0], ParamSlot::InClause(n) if n == "ids"));
491 }
492
493 #[test]
494 fn resolve_named_from_headers() {
495 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
496 let mut msg = Message::default();
497 msg.set_header("id", serde_json::json!(42));
498 let ex = Exchange::new(msg);
499
500 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
501 assert_eq!(prepared.sql, "select * from t where id = $1");
502 assert_eq!(prepared.bindings.len(), 1);
503 assert_eq!(prepared.bindings[0], serde_json::json!(42));
504 }
505
506 #[test]
507 fn resolve_named_from_body_map() {
508 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
509 let msg = Message::new(Body::Json(serde_json::json!({"id": 99})));
510 let ex = Exchange::new(msg);
511
512 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
513 assert_eq!(prepared.bindings[0], serde_json::json!(99));
514 }
515
516 #[test]
517 fn resolve_positional_from_body_array() {
518 let tpl = parse_query_template("insert into t values (#, #)", '#').unwrap();
519 let msg = Message::new(Body::Json(serde_json::json!(["foo", 42])));
520 let ex = Exchange::new(msg);
521
522 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
523 assert_eq!(prepared.sql, "insert into t values ($1, $2)");
524 assert_eq!(prepared.bindings[0], serde_json::json!("foo"));
525 assert_eq!(prepared.bindings[1], serde_json::json!(42));
526 }
527
528 #[test]
529 fn resolve_named_from_properties() {
530 let tpl = parse_query_template("select * from t where id = :#myProp", '#').unwrap();
531 let mut ex = Exchange::new(Message::default());
532 ex.set_property("myProp", serde_json::json!(7));
533
534 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
535 assert_eq!(prepared.bindings[0], serde_json::json!(7));
536 }
537
538 #[test]
539 fn resolve_named_not_found() {
540 let tpl = parse_query_template("select * from t where id = :#missing", '#').unwrap();
541 let ex = Exchange::new(Message::default());
542
543 let result = resolve_params(&tpl, &ex, ", ");
544 assert!(result.is_err());
545 }
546
547 #[test]
548 fn resolve_in_clause_expansion() {
549 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
550 let mut msg = Message::default();
551 msg.set_header("ids", serde_json::json!([1, 2, 3]));
552 let ex = Exchange::new(msg);
553
554 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
555 assert_eq!(prepared.sql, "select * from t where id in ($1, $2, $3)");
556 assert_eq!(
557 prepared.bindings,
558 vec![
559 serde_json::json!(1),
560 serde_json::json!(2),
561 serde_json::json!(3)
562 ]
563 );
564 }
565
566 #[test]
567 fn build_sql_correct_placeholders() {
568 let tpl = parse_query_template(
569 "select * from t where a = :#x and b = # and c in (:#in:ids)",
570 '#',
571 )
572 .unwrap();
573 let mut msg = Message::new(Body::Json(serde_json::json!(["pos_val"])));
574 msg.set_header("x", serde_json::json!("hello"));
575 msg.set_header("ids", serde_json::json!([10, 20]));
576 let ex = Exchange::new(msg);
577
578 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
579 assert_eq!(
580 prepared.sql,
581 "select * from t where a = $1 and b = $2 and c in ($3, $4)"
582 );
583 assert_eq!(prepared.bindings.len(), 4);
584 }
585
586 #[test]
587 fn is_select() {
588 assert!(is_select_query("SELECT * FROM t"));
589 assert!(is_select_query(" select * from t"));
590 assert!(is_select_query("WITH cte AS (SELECT 1) SELECT * FROM cte"));
591 assert!(is_select_query(
592 "with results as (select 1) select * from results"
593 ));
594 assert!(!is_select_query(
595 "WITH cte AS (SELECT id FROM t) INSERT INTO other SELECT * FROM cte"
596 ));
597 assert!(is_select_query("TABLE users"));
598 assert!(is_select_query("SHOW TABLES"));
599 assert!(is_select_query("EXPLAIN SELECT * FROM t"));
600 assert!(!is_select_query("INSERT INTO t VALUES (1)"));
601 assert!(!is_select_query("UPDATE t SET x = 1"));
602 assert!(!is_select_query("DELETE FROM t"));
603 }
604
605 #[test]
606 fn parse_trailing_param() {
607 let tpl = parse_query_template("select * from t where id = #", '#').unwrap();
608 assert_eq!(tpl.params.len(), 1);
609 assert_eq!(tpl.fragments.len(), 2);
610 assert_eq!(tpl.fragments[0], "select * from t where id = ");
611 assert_eq!(tpl.fragments[1], "");
612 }
613
614 #[test]
615 fn parse_leading_param() {
616 let tpl = parse_query_template("# = id", '#').unwrap();
617 assert_eq!(tpl.params.len(), 1);
618 assert_eq!(tpl.fragments.len(), 2);
619 assert_eq!(tpl.fragments[0], "");
620 assert_eq!(tpl.fragments[1], " = id");
621 }
622
623 #[test]
624 fn parse_consecutive_params() {
625 let tpl = parse_query_template("# # #", '#').unwrap();
626 assert_eq!(tpl.params.len(), 3);
627 assert_eq!(tpl.fragments.len(), 4);
628 assert_eq!(tpl.fragments[0], "");
629 assert_eq!(tpl.fragments[1], " ");
630 assert_eq!(tpl.fragments[2], " ");
631 assert_eq!(tpl.fragments[3], "");
632 }
633
634 #[test]
635 fn resolution_priority_body_over_headers() {
636 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
638 let mut msg = Message::new(Body::Json(serde_json::json!({"id": 1})));
639 msg.set_header("id", serde_json::json!(2)); let ex = Exchange::new(msg);
641
642 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
643 assert_eq!(prepared.bindings[0], serde_json::json!(1)); }
645
646 #[test]
647 fn resolution_priority_headers_over_properties() {
648 let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
650 let mut msg = Message::default();
651 msg.set_header("id", serde_json::json!(10));
652 let mut ex = Exchange::new(msg);
653 ex.set_property("id", serde_json::json!(20)); let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
656 assert_eq!(prepared.bindings[0], serde_json::json!(10)); }
658
659 #[test]
660 fn custom_placeholder_char() {
661 let tpl = parse_query_template("select * from t where id = :@id", '@').unwrap();
662 assert_eq!(tpl.params.len(), 1);
663 assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
664 }
665
666 #[test]
667 fn parse_expression_param() {
668 let tpl = parse_query_template("select * from t where id = :#${body.id}", '#').unwrap();
669 assert_eq!(tpl.params.len(), 1);
670 assert!(matches!(&tpl.params[0], ParamSlot::Expression(e) if e == "body.id"));
671 }
672
673 #[test]
674 fn resolve_expression_from_body() {
675 let tpl = parse_query_template("select * from t where id = :#${body.id}", '#').unwrap();
676 let msg = Message::new(Body::Json(serde_json::json!({"id": 42})));
677 let ex = Exchange::new(msg);
678 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
679 assert_eq!(prepared.sql, "select * from t where id = $1");
680 assert_eq!(prepared.bindings[0], serde_json::json!(42));
681 }
682
683 #[test]
684 fn resolve_expression_from_header() {
685 let tpl =
686 parse_query_template("select * from t where name = :#${header.name}", '#').unwrap();
687 let mut msg = Message::default();
688 msg.set_header("name", serde_json::json!("alice"));
689 let ex = Exchange::new(msg);
690 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
691 assert_eq!(prepared.bindings[0], serde_json::json!("alice"));
692 }
693
694 #[test]
695 fn resolve_expression_from_property() {
696 let tpl =
697 parse_query_template("select * from t where k = :#${property.myKey}", '#').unwrap();
698 let mut ex = Exchange::new(Message::default());
699 ex.set_property("myKey", serde_json::json!(99));
700 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
701 assert_eq!(prepared.bindings[0], serde_json::json!(99));
702 }
703
704 #[test]
705 fn parse_hash_in_string_literal() {
706 let tpl =
708 parse_query_template("select * from t where x = '#literal' and id = #", '#').unwrap();
709 assert_eq!(tpl.params.len(), 1);
710 assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
711 }
712
713 #[test]
714 fn parse_escaped_quote_in_literal() {
715 let tpl =
717 parse_query_template("select * from t where x = 'it''s' and id = #", '#').unwrap();
718 assert_eq!(tpl.params.len(), 1);
719 assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
720 }
721
722 #[test]
723 fn empty_in_clause_produces_null() {
724 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
725 let mut msg = Message::default();
726 msg.set_header("ids", serde_json::json!([]));
727 let ex = Exchange::new(msg);
728 let prepared = resolve_params(&tpl, &ex, ", ").unwrap();
729 assert_eq!(prepared.sql, "select * from t where id in (NULL)");
730 assert!(prepared.bindings.is_empty());
731 }
732
733 #[test]
734 fn in_clause_custom_separator() {
735 let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
736 let mut msg = Message::default();
737 msg.set_header("ids", serde_json::json!([1, 2, 3]));
738 let ex = Exchange::new(msg);
739
740 let prepared = resolve_params(&tpl, &ex, ";").unwrap();
741 assert_eq!(prepared.sql, "select * from t where id in ($1;$2;$3)");
742 }
743}