use camel_api::{Body, CamelError, Exchange};
/// One parameter occurrence found in a query template.
///
/// The examples below use `#` as the placeholder character; the actual
/// character is whatever is passed to `parse_query_template`.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamSlot {
/// A bare placeholder (`#`). The payload is the zero-based ordinal of this
/// positional parameter, used as an index into the JSON array body.
Positional(usize),
/// A named parameter (`:#name`), looked up by name when the query is resolved.
Named(String),
/// An IN-clause parameter (`:#in:name`). The named value must be a JSON array
/// and is expanded into one SQL placeholder per element.
InClause(String),
/// An expression parameter (`:#${...}`). The payload is the text between the
/// braces, e.g. `body.id`, `header.name` or `property.key`.
Expression(String),
}
/// A parsed query template: literal SQL text interleaved with parameter slots.
///
/// As produced by `parse_query_template`, `fragments` holds exactly one more
/// entry than `params`: `fragments[i]` is the text preceding `params[i]`, and
/// the final fragment is the text after the last parameter (possibly empty).
#[derive(Debug, Clone)]
pub struct QueryTemplate {
/// Literal SQL text between (and around) the parameters.
pub fragments: Vec<String>,
/// Parameter slots in the order they appear in the template.
pub params: Vec<ParamSlot>,
}
/// The result of resolving a `QueryTemplate` against an exchange.
#[derive(Debug, Clone)]
pub struct PreparedQuery {
/// SQL text in which every parameter has been replaced by a numbered
/// placeholder (`$1`, `$2`, ...), or by `NULL` for an empty IN-clause array.
pub sql: String,
/// Values to bind, in the same order as the numbered placeholders in `sql`.
pub bindings: Vec<serde_json::Value>,
}
/// Parses a SQL template into literal fragments and parameter slots.
///
/// With `placeholder` set to `#`, the recognised forms are:
///
/// * `#` – positional parameter, numbered from 0 in order of appearance;
/// * `:#name` – named parameter;
/// * `:#in:name` – IN-clause parameter;
/// * `:#${expr}` – expression parameter (`expr` is the text up to the first `}`).
///
/// Placeholders inside single-quoted SQL literals are ignored; a doubled quote
/// (`''`) inside a literal is treated as an escaped quote. Only single-quoted
/// literals are recognised: SQL comments and double-quoted identifiers get no
/// special treatment.
///
/// The returned template always has exactly one more fragment than it has
/// parameters.
///
/// # Errors
///
/// Returns `CamelError::ProcessorError` when a named or IN-clause parameter has
/// an empty name, or when an expression is empty or has no closing `}`. The
/// position in the message is the character index of the placeholder.
pub fn parse_query_template(
    template: &str,
    placeholder: char,
) -> Result<QueryTemplate, CamelError> {
    let chars: Vec<char> = template.chars().collect();
    let mut fragments: Vec<String> = Vec::new();
    let mut params = Vec::new();
    let mut positional_index = 0usize;
    // Index of the first character not yet emitted into a fragment.
    let mut last_param_end = 0usize;
    let mut in_literal = false;
    let mut i = 0usize;

    while i < chars.len() {
        let current = chars[i];

        if current == '\'' {
            // A doubled quote inside a literal is an escaped quote and does
            // not terminate the literal.
            if in_literal && chars.get(i + 1) == Some(&'\'') {
                i += 2;
                continue;
            }
            in_literal = !in_literal;
            i += 1;
            continue;
        }

        if in_literal || current != placeholder {
            i += 1;
            continue;
        }

        // The preceding ':' only counts as a prefix if it has not already been
        // consumed by the previous parameter. Without the `i > last_param_end`
        // check, a ':' placeholder followed by another one (e.g. "::a") made
        // the fragment slice below start after its end and panic.
        let has_colon_prefix = i > last_param_end && chars[i - 1] == ':';

        if !has_colon_prefix {
            fragments.push(chars[last_param_end..i].iter().collect());
            params.push(ParamSlot::Positional(positional_index));
            positional_index += 1;
            last_param_end = i + 1;
            i += 1;
            continue;
        }

        // Determine which ':'-prefixed form this is, and where it ends.
        let (slot, end) = if check_in_prefix(&chars, i + 1) {
            // Skip the placeholder plus the three characters of "in:".
            let (name, name_end) = extract_param_name(&chars, i + 4);
            if name.is_empty() {
                return Err(CamelError::ProcessorError(format!(
                    "Empty IN clause parameter name at position {}",
                    i
                )));
            }
            (ParamSlot::InClause(name), name_end)
        } else if chars.get(i + 1) == Some(&'$') && chars.get(i + 2) == Some(&'{') {
            let brace_start = i + 2;
            let Some(brace_end) = find_matching_brace(&chars, brace_start) else {
                return Err(CamelError::ProcessorError(format!(
                    "Unclosed expression at position {}",
                    i
                )));
            };
            let expr_content: String = chars[(brace_start + 1)..brace_end].iter().collect();
            if expr_content.is_empty() {
                return Err(CamelError::ProcessorError(format!(
                    "Empty expression at position {}",
                    i
                )));
            }
            (ParamSlot::Expression(expr_content), brace_end + 1)
        } else {
            let (name, name_end) = extract_param_name(&chars, i + 1);
            if name.is_empty() {
                return Err(CamelError::ProcessorError(format!(
                    "Empty named parameter name at position {}",
                    i
                )));
            }
            (ParamSlot::Named(name), name_end)
        };

        // The fragment stops before the ':' prefix; `has_colon_prefix`
        // guarantees `i - 1 >= last_param_end`.
        fragments.push(chars[last_param_end..(i - 1)].iter().collect());
        params.push(slot);
        last_param_end = end;
        i = end;
    }

    // Trailing text after the last parameter (empty if the template ends with one).
    fragments.push(chars[last_param_end..].iter().collect());
    Ok(QueryTemplate { fragments, params })
}
/// Returns `true` when the characters beginning at index `start` are exactly
/// `i`, `n`, `:` — the marker that follows the placeholder in an IN-clause
/// parameter. Returns `false` when fewer than three characters remain.
fn check_in_prefix(chars: &[char], start: usize) -> bool {
    const IN_MARKER: [char; 3] = ['i', 'n', ':'];
    match chars.get(start..) {
        Some(rest) => rest.starts_with(&IN_MARKER),
        // `start` lies beyond the end of the input.
        None => false,
    }
}
/// Reads a parameter name starting at index `start`.
///
/// A name is the longest run of alphanumeric characters and underscores.
/// Returns the name together with the index of the first character after it;
/// when no name character is found the name is empty and the index is `start`.
fn extract_param_name(chars: &[char], start: usize) -> (String, usize) {
    let name: String = chars
        .iter()
        .skip(start)
        .take_while(|ch| ch.is_alphanumeric() || **ch == '_')
        .collect();
    // One `char` was consumed per character of the collected name.
    let end = start + name.chars().count();
    (name, end)
}
/// Returns the index of the first `}` at or after `start`, or `None` if there
/// is none.
///
/// Despite the name, nesting is not tracked: the first closing brace wins.
/// Panics if `start` is greater than `chars.len()`.
fn find_matching_brace(chars: &[char], start: usize) -> Option<usize> {
    let tail = &chars[start..];
    for (offset, ch) in tail.iter().enumerate() {
        if *ch == '}' {
            return Some(start + offset);
        }
    }
    None
}
/// Resolves every parameter of `tpl` against `exchange` and builds the final
/// SQL text with numbered placeholders (`$1`, `$2`, ...).
///
/// * Positional parameters are read from the message body, which must be a
///   JSON array; the slot's index selects the element.
/// * Named and IN-clause parameters are looked up via `resolve_named_param`.
///   An IN-clause value must be a JSON array: each element gets its own
///   placeholder, and an empty array is rendered as the literal `NULL`.
/// * Expression parameters are evaluated via `resolve_expression_param`.
///
/// # Errors
///
/// Returns `CamelError::ProcessorError` when the template does not hold exactly
/// one more fragment than parameters (possible only for hand-built templates,
/// since the fields are public), when a positional parameter is used without a
/// JSON array body or its index is out of bounds, when an IN-clause value is
/// not an array, or when a parameter cannot be resolved.
pub fn resolve_params(
    tpl: &QueryTemplate,
    exchange: &Exchange,
) -> Result<PreparedQuery, CamelError> {
    // Appends the next `$n` placeholder to `sql` and advances the counter.
    fn push_placeholder(sql: &mut String, next_index: &mut usize) {
        sql.push('$');
        sql.push_str(&next_index.to_string());
        *next_index += 1;
    }

    // Human-readable JSON type name used in error messages.
    fn json_type_name(value: &serde_json::Value) -> &'static str {
        match value {
            serde_json::Value::Null => "null",
            serde_json::Value::Bool(_) => "bool",
            serde_json::Value::Number(_) => "number",
            serde_json::Value::String(_) => "string",
            serde_json::Value::Array(_) => "array",
            serde_json::Value::Object(_) => "object",
        }
    }

    // Reject malformed templates up front instead of panicking on an
    // out-of-range fragment index or emitting the last fragment twice.
    if tpl.fragments.len() != tpl.params.len() + 1 {
        return Err(CamelError::ProcessorError(format!(
            "Malformed query template: expected {} fragments for {} parameters, found {}",
            tpl.params.len() + 1,
            tpl.params.len(),
            tpl.fragments.len()
        )));
    }

    let body_json = match &exchange.input.body {
        Body::Json(val) => Some(val),
        _ => None,
    };
    let body_array = body_json.and_then(|v| v.as_array());

    let mut sql = String::new();
    let mut bindings = Vec::with_capacity(tpl.params.len());
    let mut next_index = 1usize;

    // `zip` stops at the shorter side, i.e. after the last parameter; the
    // trailing fragment is appended after the loop.
    for (fragment, param) in tpl.fragments.iter().zip(&tpl.params) {
        sql.push_str(fragment);
        match param {
            ParamSlot::Positional(idx) => {
                let arr = body_array.ok_or_else(|| {
                    CamelError::ProcessorError(
                        "Positional parameter requires body to be a JSON array".to_string(),
                    )
                })?;
                let value = arr.get(*idx).ok_or_else(|| {
                    CamelError::ProcessorError(format!(
                        "Positional parameter index {} out of bounds (array length {})",
                        idx,
                        arr.len()
                    ))
                })?;
                push_placeholder(&mut sql, &mut next_index);
                bindings.push(value.clone());
            }
            ParamSlot::Named(name) => {
                let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
                push_placeholder(&mut sql, &mut next_index);
                bindings.push(value);
            }
            ParamSlot::InClause(name) => {
                let value = resolve_named_param(name, body_json, &exchange.input, exchange)?;
                let Some(items) = value.as_array() else {
                    return Err(CamelError::ProcessorError(format!(
                        "IN clause parameter '{}' must be an array, got type {}",
                        name,
                        json_type_name(&value)
                    )));
                };
                if items.is_empty() {
                    // `IN ()` is not valid SQL; `IN (NULL)` matches no rows.
                    sql.push_str("NULL");
                } else {
                    for (position, item) in items.iter().enumerate() {
                        if position > 0 {
                            sql.push_str(", ");
                        }
                        push_placeholder(&mut sql, &mut next_index);
                        bindings.push(item.clone());
                    }
                }
            }
            ParamSlot::Expression(expr) => {
                let value =
                    resolve_expression_param(expr, body_json, &exchange.input, exchange)?;
                push_placeholder(&mut sql, &mut next_index);
                bindings.push(value);
            }
        }
    }

    // The length check above guarantees a trailing fragment exists.
    if let Some(tail) = tpl.fragments.last() {
        sql.push_str(tail);
    }

    Ok(PreparedQuery { sql, bindings })
}
/// Looks up the value of a named parameter.
///
/// Sources are consulted in this order, and the first hit wins:
/// 1. the field `name` of the body, when the body is a JSON object;
/// 2. the message header `name`;
/// 3. the exchange property `name`.
///
/// Returns a clone of the value found, or a `CamelError::ProcessorError` when
/// none of the sources contains `name`.
fn resolve_named_param(
    name: &str,
    body_json: Option<&serde_json::Value>,
    message: &camel_api::Message,
    exchange: &Exchange,
) -> Result<serde_json::Value, CamelError> {
    let from_body = body_json
        .and_then(|json| json.as_object())
        .and_then(|fields| fields.get(name));

    // `or_else` keeps the fallbacks lazy: headers are only consulted when the
    // body has no such field, properties only when the headers do not either.
    from_body
        .or_else(|| message.header(name))
        .or_else(|| exchange.property(name))
        .cloned()
        .ok_or_else(|| {
            CamelError::ProcessorError(format!(
                "Named parameter '{}' not found in body, headers, or properties",
                name
            ))
        })
}
/// Evaluates an expression parameter of the form `<source>.<key>`.
///
/// The expression is split at its first `.`; everything after it is the key,
/// so a dotted key is looked up verbatim rather than as a nested path.
/// Supported sources:
///
/// * `body` – field of the JSON object body;
/// * `header` – message header;
/// * `property` – exchange property.
///
/// Returns a clone of the value found. Any other shape, or a missing value,
/// yields a `CamelError::ProcessorError`.
fn resolve_expression_param(
    expr: &str,
    body_json: Option<&serde_json::Value>,
    message: &camel_api::Message,
    exchange: &Exchange,
) -> Result<serde_json::Value, CamelError> {
    match expr.split_once('.') {
        Some(("body", field)) => {
            let found = body_json
                .and_then(|json| json.as_object())
                .and_then(|fields| fields.get(field));
            match found {
                Some(value) => Ok(value.clone()),
                None => Err(CamelError::ProcessorError(format!(
                    "Expression '{}': field '{}' not found in body (note: nested field access is not supported; use a flat body structure or pass the value via header/property)",
                    expr, field
                ))),
            }
        }
        Some(("header", name)) => match message.header(name) {
            Some(value) => Ok(value.clone()),
            None => Err(CamelError::ProcessorError(format!(
                "Expression '{}': header '{}' not found",
                expr, name
            ))),
        },
        Some(("property", key)) => match exchange.property(key) {
            Some(value) => Ok(value.clone()),
            None => Err(CamelError::ProcessorError(format!(
                "Expression '{}': property '{}' not found",
                expr, key
            ))),
        },
        // No '.' at all, or an unrecognised source before it.
        _ => Err(CamelError::ProcessorError(format!(
            "Unknown expression syntax: '{}'. Use body.<field>, header.<name>, or property.<key>",
            expr
        ))),
    }
}
/// Returns `true` when `sql` begins with a keyword of a row-returning
/// statement: `SELECT`, `TABLE`, `SHOW` or `EXPLAIN` (ASCII case-insensitive,
/// leading whitespace ignored).
///
/// Only the first word is inspected, so statements that begin differently
/// (for example `WITH ... SELECT`, or a statement preceded by a comment or an
/// opening parenthesis) are reported as `false`.
///
/// The keyword must be a whole word: `SELECT*`, `select(1)` and `SELECT x`
/// match, while `selections` or `SHOWCASE` do not. The comparison is
/// ASCII-only, so non-ASCII letters whose Unicode uppercase form happens to
/// be an ASCII letter (such as U+017F) never produce a match.
pub fn is_select_query(sql: &str) -> bool {
    const READ_KEYWORDS: [&str; 4] = ["SELECT", "TABLE", "SHOW", "EXPLAIN"];

    let statement = sql.trim_start();
    // The leading word ends at the first character that cannot be part of an
    // identifier; `find` returns a byte offset on a char boundary.
    let word_len = statement
        .find(|c: char| !(c.is_alphanumeric() || c == '_'))
        .unwrap_or(statement.len());
    let leading_word = &statement[..word_len];

    READ_KEYWORDS
        .iter()
        .any(|keyword| leading_word.eq_ignore_ascii_case(keyword))
}
// Unit tests for template parsing, parameter resolution and query classification.
#[cfg(test)]
mod tests {
use super::*;
use camel_api::{Body, Exchange, Message};
// A template without placeholders is a single fragment with no parameters.
#[test]
fn parse_no_params() {
let tpl = parse_query_template("select * from users", '#').unwrap();
assert_eq!(tpl.fragments.len(), 1);
assert!(tpl.params.is_empty());
}
// Bare placeholders are numbered from 0 in order of appearance.
#[test]
fn parse_positional_params() {
let tpl = parse_query_template("insert into t values (#, #)", '#').unwrap();
assert_eq!(tpl.params.len(), 2);
assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
assert!(matches!(tpl.params[1], ParamSlot::Positional(1)));
}
// `:#name` yields a named parameter carrying the name.
#[test]
fn parse_named_params() {
let tpl =
parse_query_template("select * from t where id = :#id and name = :#name", '#').unwrap();
assert_eq!(tpl.params.len(), 2);
assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
assert!(matches!(&tpl.params[1], ParamSlot::Named(n) if n == "name"));
}
// Named parameters do not consume a positional index.
#[test]
fn parse_mixed_params() {
let tpl =
parse_query_template("select * from t where id = :#id and status = #", '#').unwrap();
assert_eq!(tpl.params.len(), 2);
assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
assert!(matches!(tpl.params[1], ParamSlot::Positional(0)));
}
// `:#in:name` yields an IN-clause parameter.
#[test]
fn parse_in_clause() {
let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
assert_eq!(tpl.params.len(), 1);
assert!(matches!(&tpl.params[0], ParamSlot::InClause(n) if n == "ids"));
}
// A named parameter is resolved from a message header.
#[test]
fn resolve_named_from_headers() {
let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
let mut msg = Message::default();
msg.set_header("id", serde_json::json!(42));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.sql, "select * from t where id = $1");
assert_eq!(prepared.bindings.len(), 1);
assert_eq!(prepared.bindings[0], serde_json::json!(42));
}
// A named parameter is resolved from a field of a JSON object body.
#[test]
fn resolve_named_from_body_map() {
let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
let msg = Message::new(Body::Json(serde_json::json!({"id": 99})));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.bindings[0], serde_json::json!(99));
}
// Positional parameters are taken from a JSON array body, in order.
#[test]
fn resolve_positional_from_body_array() {
let tpl = parse_query_template("insert into t values (#, #)", '#').unwrap();
let msg = Message::new(Body::Json(serde_json::json!(["foo", 42])));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.sql, "insert into t values ($1, $2)");
assert_eq!(prepared.bindings[0], serde_json::json!("foo"));
assert_eq!(prepared.bindings[1], serde_json::json!(42));
}
// A named parameter is resolved from an exchange property.
#[test]
fn resolve_named_from_properties() {
let tpl = parse_query_template("select * from t where id = :#myProp", '#').unwrap();
let mut ex = Exchange::new(Message::default());
ex.set_property("myProp", serde_json::json!(7));
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.bindings[0], serde_json::json!(7));
}
// A name that is absent from body, headers and properties is an error.
#[test]
fn resolve_named_not_found() {
let tpl = parse_query_template("select * from t where id = :#missing", '#').unwrap();
let ex = Exchange::new(Message::default());
let result = resolve_params(&tpl, &ex);
assert!(result.is_err());
}
// An IN-clause array expands to one numbered placeholder per element.
#[test]
fn resolve_in_clause_expansion() {
let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
let mut msg = Message::default();
msg.set_header("ids", serde_json::json!([1, 2, 3]));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.sql, "select * from t where id in ($1, $2, $3)");
assert_eq!(
prepared.bindings,
vec![
serde_json::json!(1),
serde_json::json!(2),
serde_json::json!(3)
]
);
}
// Placeholder numbering is continuous across named, positional and IN-clause parameters.
#[test]
fn build_sql_correct_placeholders() {
let tpl = parse_query_template(
"select * from t where a = :#x and b = # and c in (:#in:ids)",
'#',
)
.unwrap();
let mut msg = Message::new(Body::Json(serde_json::json!(["pos_val"])));
msg.set_header("x", serde_json::json!("hello"));
msg.set_header("ids", serde_json::json!([10, 20]));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(
prepared.sql,
"select * from t where a = $1 and b = $2 and c in ($3, $4)"
);
assert_eq!(prepared.bindings.len(), 4);
}
// Classification looks only at the leading keyword; `WITH ...` is not treated as a select.
#[test]
fn is_select() {
assert!(is_select_query("SELECT * FROM t"));
assert!(is_select_query(" select * from t"));
assert!(!is_select_query("WITH cte AS (SELECT 1) SELECT * FROM cte"));
assert!(!is_select_query(
"WITH cte AS (UPDATE t SET x = 1 RETURNING *) SELECT * FROM cte"
));
assert!(is_select_query("TABLE users"));
assert!(is_select_query("SHOW TABLES"));
assert!(is_select_query("EXPLAIN SELECT * FROM t"));
assert!(!is_select_query("INSERT INTO t VALUES (1)"));
assert!(!is_select_query("UPDATE t SET x = 1"));
assert!(!is_select_query("DELETE FROM t"));
}
// A parameter at the very end leaves an empty trailing fragment.
#[test]
fn parse_trailing_param() {
let tpl = parse_query_template("select * from t where id = #", '#').unwrap();
assert_eq!(tpl.params.len(), 1);
assert_eq!(tpl.fragments.len(), 2);
assert_eq!(tpl.fragments[0], "select * from t where id = ");
assert_eq!(tpl.fragments[1], "");
}
// A parameter at the very start leaves an empty leading fragment.
#[test]
fn parse_leading_param() {
let tpl = parse_query_template("# = id", '#').unwrap();
assert_eq!(tpl.params.len(), 1);
assert_eq!(tpl.fragments.len(), 2);
assert_eq!(tpl.fragments[0], "");
assert_eq!(tpl.fragments[1], " = id");
}
// There is always exactly one more fragment than parameters.
#[test]
fn parse_consecutive_params() {
let tpl = parse_query_template("# # #", '#').unwrap();
assert_eq!(tpl.params.len(), 3);
assert_eq!(tpl.fragments.len(), 4);
assert_eq!(tpl.fragments[0], "");
assert_eq!(tpl.fragments[1], " ");
assert_eq!(tpl.fragments[2], " ");
assert_eq!(tpl.fragments[3], "");
}
// When body and header both define the name, the body value (1) wins over the header value (2).
#[test]
fn resolution_priority_body_over_headers() {
let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
let mut msg = Message::new(Body::Json(serde_json::json!({"id": 1})));
msg.set_header("id", serde_json::json!(2)); let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.bindings[0], serde_json::json!(1)); }
// When header and property both define the name, the header value (10) wins over the property value (20).
#[test]
fn resolution_priority_headers_over_properties() {
let tpl = parse_query_template("select * from t where id = :#id", '#').unwrap();
let mut msg = Message::default();
msg.set_header("id", serde_json::json!(10));
let mut ex = Exchange::new(msg);
ex.set_property("id", serde_json::json!(20));
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.bindings[0], serde_json::json!(10)); }
// The placeholder character is configurable.
#[test]
fn custom_placeholder_char() {
let tpl = parse_query_template("select * from t where id = :@id", '@').unwrap();
assert_eq!(tpl.params.len(), 1);
assert!(matches!(&tpl.params[0], ParamSlot::Named(n) if n == "id"));
}
// `:#${...}` yields an expression parameter holding the text between the braces.
#[test]
fn parse_expression_param() {
let tpl = parse_query_template("select * from t where id = :#${body.id}", '#').unwrap();
assert_eq!(tpl.params.len(), 1);
assert!(matches!(&tpl.params[0], ParamSlot::Expression(e) if e == "body.id"));
}
// `body.<field>` reads a field of the JSON object body.
#[test]
fn resolve_expression_from_body() {
let tpl = parse_query_template("select * from t where id = :#${body.id}", '#').unwrap();
let msg = Message::new(Body::Json(serde_json::json!({"id": 42})));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.sql, "select * from t where id = $1");
assert_eq!(prepared.bindings[0], serde_json::json!(42));
}
// `header.<name>` reads a message header.
#[test]
fn resolve_expression_from_header() {
let tpl =
parse_query_template("select * from t where name = :#${header.name}", '#').unwrap();
let mut msg = Message::default();
msg.set_header("name", serde_json::json!("alice"));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.bindings[0], serde_json::json!("alice"));
}
// `property.<key>` reads an exchange property.
#[test]
fn resolve_expression_from_property() {
let tpl =
parse_query_template("select * from t where k = :#${property.myKey}", '#').unwrap();
let mut ex = Exchange::new(Message::default());
ex.set_property("myKey", serde_json::json!(99));
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.bindings[0], serde_json::json!(99));
}
// A placeholder character inside a single-quoted literal is not a parameter.
#[test]
fn parse_hash_in_string_literal() {
let tpl =
parse_query_template("select * from t where x = '#literal' and id = #", '#').unwrap();
assert_eq!(tpl.params.len(), 1);
assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
}
// A doubled quote inside a literal does not end the literal.
#[test]
fn parse_escaped_quote_in_literal() {
let tpl =
parse_query_template("select * from t where x = 'it''s' and id = #", '#').unwrap();
assert_eq!(tpl.params.len(), 1);
assert!(matches!(tpl.params[0], ParamSlot::Positional(0)));
}
// An empty IN-clause array renders as `NULL` and adds no bindings.
#[test]
fn empty_in_clause_produces_null() {
let tpl = parse_query_template("select * from t where id in (:#in:ids)", '#').unwrap();
let mut msg = Message::default();
msg.set_header("ids", serde_json::json!([]));
let ex = Exchange::new(msg);
let prepared = resolve_params(&tpl, &ex).unwrap();
assert_eq!(prepared.sql, "select * from t where id in (NULL)");
assert!(prepared.bindings.is_empty());
}
}