use super::{get_i64_header, get_str_vec_header, get_u64_header, require_key, require_value};
use crate::config::RedisCommand;
use camel_component_api::{Body, CamelError, Exchange};
use redis::AsyncCommands;
use redis::aio::MultiplexedConnection;
pub(crate) fn is_string_command(cmd: &RedisCommand) -> bool {
matches!(
cmd,
RedisCommand::Set
| RedisCommand::Get
| RedisCommand::Getset
| RedisCommand::Setnx
| RedisCommand::Setex
| RedisCommand::Mget
| RedisCommand::Mset
| RedisCommand::Incr
| RedisCommand::Incrby
| RedisCommand::Decr
| RedisCommand::Decrby
| RedisCommand::Append
| RedisCommand::Strlen
)
}
pub(crate) fn resolve_timeout_seconds(exchange: &Exchange) -> u64 {
get_u64_header(exchange, "CamelRedis.Timeout").unwrap_or(0)
}
pub(crate) fn resolve_increment(exchange: &Exchange) -> i64 {
get_i64_header(exchange, "CamelRedis.Increment").unwrap_or(1)
}
pub(crate) fn resolve_mget_keys(exchange: &Exchange) -> Result<Vec<String>, CamelError> {
get_str_vec_header(exchange, "CamelRedis.Keys")
.ok_or_else(|| CamelError::ProcessorError("Missing CamelRedis.Keys".into()))
}
pub(crate) fn resolve_mset_values(
exchange: &Exchange,
) -> Result<Vec<(String, String)>, CamelError> {
let values = exchange
.input
.header("CamelRedis.Values")
.and_then(|v| v.as_object())
.ok_or_else(|| CamelError::ProcessorError("Missing CamelRedis.Values".into()))?;
Ok(values
.iter()
.map(|(k, v)| (k.clone(), v.to_string()))
.collect::<Vec<_>>())
}
pub(crate) fn json_from_optional_string(value: Option<String>) -> serde_json::Value {
value
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null)
}
pub(crate) fn json_array_from_optional_strings(values: Vec<Option<String>>) -> serde_json::Value {
serde_json::Value::Array(values.into_iter().map(json_from_optional_string).collect())
}
#[allow(dead_code)]
pub(crate) fn build_redis_cmd(
cmd: &RedisCommand,
exchange: &Exchange,
) -> Result<redis::Cmd, CamelError> {
if !is_string_command(cmd) {
return Err(CamelError::ProcessorError("Not a string command".into()));
}
let redis_cmd = match cmd {
RedisCommand::Set => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let mut c = redis::cmd("SET");
c.arg(key).arg(value.to_string());
c
}
RedisCommand::Get => {
let key = require_key(exchange)?;
let mut c = redis::cmd("GET");
c.arg(key);
c
}
RedisCommand::Getset => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let mut c = redis::cmd("GETSET");
c.arg(key).arg(value.to_string());
c
}
RedisCommand::Setnx => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let mut c = redis::cmd("SETNX");
c.arg(key).arg(value.to_string());
c
}
RedisCommand::Setex => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let ttl = resolve_timeout_seconds(exchange);
let mut c = redis::cmd("SETEX");
c.arg(key).arg(ttl).arg(value.to_string());
c
}
RedisCommand::Mget => {
let keys = resolve_mget_keys(exchange)?;
let mut c = redis::cmd("MGET");
for key in keys {
c.arg(key);
}
c
}
RedisCommand::Mset => {
let values = resolve_mset_values(exchange)?;
let mut c = redis::cmd("MSET");
for (k, v) in values {
c.arg(k).arg(v);
}
c
}
RedisCommand::Incr => {
let key = require_key(exchange)?;
let mut c = redis::cmd("INCR");
c.arg(key);
c
}
RedisCommand::Incrby => {
let key = require_key(exchange)?;
let by = resolve_increment(exchange);
let mut c = redis::cmd("INCRBY");
c.arg(key).arg(by);
c
}
RedisCommand::Decr => {
let key = require_key(exchange)?;
let mut c = redis::cmd("DECR");
c.arg(key);
c
}
RedisCommand::Decrby => {
let key = require_key(exchange)?;
let by = resolve_increment(exchange);
let mut c = redis::cmd("DECRBY");
c.arg(key).arg(by);
c
}
RedisCommand::Append => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let mut c = redis::cmd("APPEND");
c.arg(key).arg(value.to_string());
c
}
RedisCommand::Strlen => {
let key = require_key(exchange)?;
let mut c = redis::cmd("STRLEN");
c.arg(key);
c
}
_ => unreachable!("non-string commands rejected above"),
};
Ok(redis_cmd)
}
pub async fn dispatch(
cmd: &RedisCommand,
conn: &mut MultiplexedConnection,
exchange: &mut Exchange,
) -> Result<(), CamelError> {
if !is_string_command(cmd) {
return Err(CamelError::ProcessorError("Not a string command".into()));
}
let result: serde_json::Value = match cmd {
RedisCommand::Set => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
conn.set::<_, _, ()>(&key, value.to_string())
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis SET failed: {e}")))?;
serde_json::Value::Null
}
RedisCommand::Get => {
let key = require_key(exchange)?;
let val: Option<String> = conn
.get(&key)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis GET failed: {e}")))?;
json_from_optional_string(val)
}
RedisCommand::Getset => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let old: Option<String> = conn
.getset(&key, value.to_string())
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis GETSET failed: {e}")))?;
json_from_optional_string(old)
}
RedisCommand::Setnx => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let ok: bool = conn
.set_nx(&key, value.to_string())
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis SETNX failed: {e}")))?;
serde_json::Value::Bool(ok)
}
RedisCommand::Setex => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let ttl = resolve_timeout_seconds(exchange);
conn.set_ex::<_, _, ()>(&key, value.to_string(), ttl)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis SETEX failed: {e}")))?;
serde_json::Value::Null
}
RedisCommand::Mget => {
let keys = resolve_mget_keys(exchange)?;
let vals: Vec<Option<String>> = conn
.mget(&keys)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis MGET failed: {e}")))?;
json_array_from_optional_strings(vals)
}
RedisCommand::Mset => {
let values = resolve_mset_values(exchange)?;
conn.mset::<_, _, ()>(&values)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis MSET failed: {e}")))?;
serde_json::Value::Null
}
RedisCommand::Incr => {
let key = require_key(exchange)?;
let n: i64 = conn
.incr(&key, 1i64)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis INCR failed: {e}")))?;
serde_json::json!(n)
}
RedisCommand::Incrby => {
let key = require_key(exchange)?;
let by = resolve_increment(exchange);
let n: i64 = conn
.incr(&key, by)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis INCRBY failed: {e}")))?;
serde_json::json!(n)
}
RedisCommand::Decr => {
let key = require_key(exchange)?;
let n: i64 = conn
.decr(&key, 1i64)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis DECR failed: {e}")))?;
serde_json::json!(n)
}
RedisCommand::Decrby => {
let key = require_key(exchange)?;
let by = resolve_increment(exchange);
let n: i64 = conn
.decr(&key, by)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis DECRBY failed: {e}")))?;
serde_json::json!(n)
}
RedisCommand::Append => {
let key = require_key(exchange)?;
let value = require_value(exchange)?;
let n: i64 = conn
.append(&key, value.to_string())
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis APPEND failed: {e}")))?;
serde_json::json!(n)
}
RedisCommand::Strlen => {
let key = require_key(exchange)?;
let n: i64 = conn
.strlen(&key)
.await
.map_err(|e| CamelError::ProcessorError(format!("Redis STRLEN failed: {e}")))?;
serde_json::json!(n)
}
_ => unreachable!("non-string commands rejected above"),
};
exchange.input.body = Body::Json(result);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::RedisCommand;
use camel_component_api::{Exchange, Message};
fn ex_with(headers: &[(&str, serde_json::Value)]) -> Exchange {
let mut msg = Message::default();
for (k, v) in headers {
msg.set_header(*k, v.clone());
}
Exchange::new(msg)
}
#[tokio::test]
async fn test_set_missing_key_returns_err() {
let ex = Exchange::new(Message::default());
assert!(crate::commands::require_key(&ex).is_err());
}
#[test]
fn test_set_has_key_and_value() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Value", serde_json::json!("hello")),
]);
assert_eq!(crate::commands::require_key(&ex).unwrap(), "mykey");
assert_eq!(
crate::commands::require_value(&ex).unwrap(),
serde_json::json!("hello")
);
}
#[test]
fn test_string_command_classification() {
assert!(is_string_command(&RedisCommand::Set));
assert!(is_string_command(&RedisCommand::Append));
assert!(!is_string_command(&RedisCommand::Sadd));
}
#[test]
fn test_resolve_timeout_seconds_defaults_to_zero() {
let ex = Exchange::new(Message::default());
assert_eq!(resolve_timeout_seconds(&ex), 0);
}
#[test]
fn test_resolve_timeout_seconds_from_header() {
let ex = ex_with(&[("CamelRedis.Timeout", serde_json::json!(15))]);
assert_eq!(resolve_timeout_seconds(&ex), 15);
}
#[test]
fn test_resolve_mget_keys_requires_header() {
let ex = Exchange::new(Message::default());
let err = resolve_mget_keys(&ex).expect_err("keys should be required");
assert!(err.to_string().contains("CamelRedis.Keys"));
}
#[test]
fn test_resolve_mset_values_extracts_pairs() {
let ex = ex_with(&[("CamelRedis.Values", serde_json::json!({"a": 1, "b": "x"}))]);
let values = resolve_mset_values(&ex).expect("values should be present");
assert_eq!(values.len(), 2);
assert!(values.iter().any(|(k, _)| k == "a"));
assert!(values.iter().any(|(k, _)| k == "b"));
}
#[test]
fn test_resolve_increment_defaults_and_values() {
let ex_default = Exchange::new(Message::default());
assert_eq!(resolve_increment(&ex_default), 1);
let ex = ex_with(&[("CamelRedis.Increment", serde_json::json!(7))]);
assert_eq!(resolve_increment(&ex), 7);
}
#[test]
fn test_resolve_mget_keys_returns_values() {
let ex = ex_with(&[("CamelRedis.Keys", serde_json::json!(["a", "b"]))]);
assert_eq!(resolve_mget_keys(&ex).unwrap(), vec!["a", "b"]);
}
#[test]
fn test_resolve_mset_values_requires_header() {
let ex = Exchange::new(Message::default());
let err = resolve_mset_values(&ex).expect_err("values should be required");
assert!(err.to_string().contains("CamelRedis.Values"));
}
#[test]
fn test_json_from_optional_string_variants() {
assert_eq!(
json_from_optional_string(Some("x".to_string())),
serde_json::json!("x")
);
assert_eq!(json_from_optional_string(None), serde_json::Value::Null);
}
#[test]
fn test_json_array_from_optional_strings_mixed() {
let json = json_array_from_optional_strings(vec![Some("a".to_string()), None]);
assert_eq!(json, serde_json::json!(["a", null]));
}
fn cmd_args(cmd: &redis::Cmd) -> Vec<String> {
cmd.args_iter()
.skip(1)
.filter_map(|a| match a {
redis::Arg::Simple(bytes) => String::from_utf8(bytes.to_vec()).ok(),
redis::Arg::Cursor => Some("CURSOR".to_string()),
_ => None,
})
.collect()
}
fn cmd_name(cmd: &redis::Cmd) -> String {
cmd.args_iter()
.next()
.and_then(|a| match a {
redis::Arg::Simple(bytes) => String::from_utf8(bytes.to_vec()).ok(),
_ => None,
})
.unwrap_or_default()
}
#[test]
fn test_build_redis_cmd_set() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Value", serde_json::json!("hello")),
]);
let cmd = build_redis_cmd(&RedisCommand::Set, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "SET");
assert_eq!(cmd_args(&cmd), vec!["mykey", "\"hello\""]);
}
#[test]
fn test_build_redis_cmd_set_missing_key() {
let ex = ex_with(&[("CamelRedis.Value", serde_json::json!("v"))]);
assert!(build_redis_cmd(&RedisCommand::Set, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_set_missing_value() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("k"))]);
assert!(build_redis_cmd(&RedisCommand::Set, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_get() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("mykey"))]);
let cmd = build_redis_cmd(&RedisCommand::Get, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "GET");
assert_eq!(cmd_args(&cmd), vec!["mykey"]);
}
#[test]
fn test_build_redis_cmd_get_missing_key() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Get, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_getset() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Value", serde_json::json!("newval")),
]);
let cmd = build_redis_cmd(&RedisCommand::Getset, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "GETSET");
assert_eq!(cmd_args(&cmd), vec!["mykey", "\"newval\""]);
}
#[test]
fn test_build_redis_cmd_getset_missing_key() {
let ex = ex_with(&[("CamelRedis.Value", serde_json::json!("v"))]);
assert!(build_redis_cmd(&RedisCommand::Getset, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_getset_missing_value() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("k"))]);
assert!(build_redis_cmd(&RedisCommand::Getset, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_setnx() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Value", serde_json::json!("hello")),
]);
let cmd = build_redis_cmd(&RedisCommand::Setnx, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "SETNX");
assert_eq!(cmd_args(&cmd), vec!["mykey", "\"hello\""]);
}
#[test]
fn test_build_redis_cmd_setnx_missing_key() {
let ex = ex_with(&[("CamelRedis.Value", serde_json::json!("v"))]);
assert!(build_redis_cmd(&RedisCommand::Setnx, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_setnx_missing_value() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("k"))]);
assert!(build_redis_cmd(&RedisCommand::Setnx, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_setex() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Value", serde_json::json!("hello")),
("CamelRedis.Timeout", serde_json::json!(60u64)),
]);
let cmd = build_redis_cmd(&RedisCommand::Setex, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "SETEX");
assert_eq!(cmd_args(&cmd), vec!["mykey", "60", "\"hello\""]);
}
#[test]
fn test_build_redis_cmd_setex_missing_key() {
let ex = ex_with(&[
("CamelRedis.Value", serde_json::json!("v")),
("CamelRedis.Timeout", serde_json::json!(60u64)),
]);
assert!(build_redis_cmd(&RedisCommand::Setex, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_setex_missing_value() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("k")),
("CamelRedis.Timeout", serde_json::json!(60u64)),
]);
assert!(build_redis_cmd(&RedisCommand::Setex, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_mget() {
let ex = ex_with(&[("CamelRedis.Keys", serde_json::json!(["k1", "k2", "k3"]))]);
let cmd = build_redis_cmd(&RedisCommand::Mget, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "MGET");
assert_eq!(cmd_args(&cmd), vec!["k1", "k2", "k3"]);
}
#[test]
fn test_build_redis_cmd_mget_missing_keys() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Mget, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_mset() {
let ex = ex_with(&[(
"CamelRedis.Values",
serde_json::json!({"key1": "val1", "key2": "val2"}),
)]);
let cmd = build_redis_cmd(&RedisCommand::Mset, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "MSET");
let args = cmd_args(&cmd);
assert!(args.contains(&"key1".to_string()));
assert!(args.contains(&"\"val1\"".to_string()));
assert!(args.contains(&"key2".to_string()));
assert!(args.contains(&"\"val2\"".to_string()));
assert_eq!(args.len(), 4);
}
#[test]
fn test_build_redis_cmd_mset_missing_values() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Mset, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_incr() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("mykey"))]);
let cmd = build_redis_cmd(&RedisCommand::Incr, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "INCR");
assert_eq!(cmd_args(&cmd), vec!["mykey"]);
}
#[test]
fn test_build_redis_cmd_incr_missing_key() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Incr, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_incrby() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Increment", serde_json::json!(5i64)),
]);
let cmd = build_redis_cmd(&RedisCommand::Incrby, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "INCRBY");
assert_eq!(cmd_args(&cmd), vec!["mykey", "5"]);
}
#[test]
fn test_build_redis_cmd_incrby_default_increment() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("mykey"))]);
let cmd = build_redis_cmd(&RedisCommand::Incrby, &ex).unwrap();
assert_eq!(cmd_args(&cmd), vec!["mykey", "1"]);
}
#[test]
fn test_build_redis_cmd_incrby_missing_key() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Incrby, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_decr() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("mykey"))]);
let cmd = build_redis_cmd(&RedisCommand::Decr, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "DECR");
assert_eq!(cmd_args(&cmd), vec!["mykey"]);
}
#[test]
fn test_build_redis_cmd_decr_missing_key() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Decr, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_decrby() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Increment", serde_json::json!(3i64)),
]);
let cmd = build_redis_cmd(&RedisCommand::Decrby, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "DECRBY");
assert_eq!(cmd_args(&cmd), vec!["mykey", "3"]);
}
#[test]
fn test_build_redis_cmd_decrby_default_increment() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("mykey"))]);
let cmd = build_redis_cmd(&RedisCommand::Decrby, &ex).unwrap();
assert_eq!(cmd_args(&cmd), vec!["mykey", "1"]);
}
#[test]
fn test_build_redis_cmd_decrby_missing_key() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Decrby, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_append() {
let ex = ex_with(&[
("CamelRedis.Key", serde_json::json!("mykey")),
("CamelRedis.Value", serde_json::json!("suffix")),
]);
let cmd = build_redis_cmd(&RedisCommand::Append, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "APPEND");
assert_eq!(cmd_args(&cmd), vec!["mykey", "\"suffix\""]);
}
#[test]
fn test_build_redis_cmd_append_missing_key() {
let ex = ex_with(&[("CamelRedis.Value", serde_json::json!("v"))]);
assert!(build_redis_cmd(&RedisCommand::Append, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_append_missing_value() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("k"))]);
assert!(build_redis_cmd(&RedisCommand::Append, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_strlen() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("mykey"))]);
let cmd = build_redis_cmd(&RedisCommand::Strlen, &ex).unwrap();
assert_eq!(cmd_name(&cmd), "STRLEN");
assert_eq!(cmd_args(&cmd), vec!["mykey"]);
}
#[test]
fn test_build_redis_cmd_strlen_missing_key() {
let ex = Exchange::new(Message::default());
assert!(build_redis_cmd(&RedisCommand::Strlen, &ex).is_err());
}
#[test]
fn test_build_redis_cmd_rejects_non_string() {
let ex = ex_with(&[("CamelRedis.Key", serde_json::json!("k"))]);
assert!(build_redis_cmd(&RedisCommand::Sadd, &ex).is_err());
}
}