use crate::commands;
use crate::config::{RedisCommand, RedisEndpointConfig};
use camel_component_api::{CamelError, Exchange};
use redis::aio::MultiplexedConnection;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::Mutex;
use tower::Service;
/// Producer that runs a Redis command for each exchange it is called with.
///
/// The connection slot lives behind an `Arc`, so cloning a producer yields a
/// handle that shares the same slot rather than a fresh one.
#[derive(Clone)]
pub struct RedisProducer {
/// Endpoint configuration: source of the Redis URL and of the default
/// command, key and channels applied when an exchange does not override them.
config: RedisEndpointConfig,
/// Shared connection slot; starts as `None` and is filled with a connection
/// the first time the producer is called.
conn: Arc<Mutex<Option<MultiplexedConnection>>>,
}
impl RedisProducer {
    /// Creates a producer for `config` with an empty connection slot.
    ///
    /// No connection is opened here; the slot is populated when the producer
    /// is first called.
    pub fn new(config: RedisEndpointConfig) -> Self {
        let conn = Arc::new(Mutex::new(None));
        Self { config, conn }
    }

    /// Routes `cmd` to the command-family module that handles it.
    ///
    /// Every family exposes a `dispatch` function with the same arguments;
    /// its result is returned unchanged. The match is exhaustive on purpose,
    /// so adding a `RedisCommand` variant forces a routing decision here.
    async fn dispatch_command(
        cmd: &RedisCommand,
        conn: &mut MultiplexedConnection,
        exchange: &mut Exchange,
    ) -> Result<(), CamelError> {
        match cmd {
            // String commands.
            RedisCommand::Set
            | RedisCommand::Get
            | RedisCommand::Getset
            | RedisCommand::Setnx
            | RedisCommand::Setex
            | RedisCommand::Mget
            | RedisCommand::Mset
            | RedisCommand::Incr
            | RedisCommand::Incrby
            | RedisCommand::Decr
            | RedisCommand::Decrby
            | RedisCommand::Append
            | RedisCommand::Strlen => {
                commands::string::dispatch(cmd, conn, exchange).await
            }

            // Generic key commands.
            RedisCommand::Exists
            | RedisCommand::Del
            | RedisCommand::Expire
            | RedisCommand::Expireat
            | RedisCommand::Pexpire
            | RedisCommand::Pexpireat
            | RedisCommand::Ttl
            | RedisCommand::Keys
            | RedisCommand::Rename
            | RedisCommand::Renamenx
            | RedisCommand::Type
            | RedisCommand::Persist
            | RedisCommand::Move
            | RedisCommand::Sort => {
                commands::key::dispatch(cmd, conn, exchange).await
            }

            // List commands.
            RedisCommand::Lpush
            | RedisCommand::Rpush
            | RedisCommand::Lpushx
            | RedisCommand::Rpushx
            | RedisCommand::Lpop
            | RedisCommand::Rpop
            | RedisCommand::Blpop
            | RedisCommand::Brpop
            | RedisCommand::Llen
            | RedisCommand::Lrange
            | RedisCommand::Lindex
            | RedisCommand::Linsert
            | RedisCommand::Lset
            | RedisCommand::Lrem
            | RedisCommand::Ltrim
            | RedisCommand::Rpoplpush => {
                commands::list::dispatch(cmd, conn, exchange).await
            }

            // Hash commands.
            RedisCommand::Hset
            | RedisCommand::Hget
            | RedisCommand::Hsetnx
            | RedisCommand::Hmset
            | RedisCommand::Hmget
            | RedisCommand::Hdel
            | RedisCommand::Hexists
            | RedisCommand::Hlen
            | RedisCommand::Hkeys
            | RedisCommand::Hvals
            | RedisCommand::Hgetall
            | RedisCommand::Hincrby => {
                commands::hash::dispatch(cmd, conn, exchange).await
            }

            // Set commands.
            RedisCommand::Sadd
            | RedisCommand::Srem
            | RedisCommand::Smembers
            | RedisCommand::Scard
            | RedisCommand::Sismember
            | RedisCommand::Spop
            | RedisCommand::Smove
            | RedisCommand::Sinter
            | RedisCommand::Sunion
            | RedisCommand::Sdiff
            | RedisCommand::Sinterstore
            | RedisCommand::Sunionstore
            | RedisCommand::Sdiffstore
            | RedisCommand::Srandmember => {
                commands::set::dispatch(cmd, conn, exchange).await
            }

            // Sorted-set commands.
            RedisCommand::Zadd
            | RedisCommand::Zrem
            | RedisCommand::Zrange
            | RedisCommand::Zrevrange
            | RedisCommand::Zrank
            | RedisCommand::Zrevrank
            | RedisCommand::Zscore
            | RedisCommand::Zcard
            | RedisCommand::Zincrby
            | RedisCommand::Zcount
            | RedisCommand::Zrangebyscore
            | RedisCommand::Zrevrangebyscore
            | RedisCommand::Zremrangebyrank
            | RedisCommand::Zremrangebyscore
            | RedisCommand::Zunionstore
            | RedisCommand::Zinterstore => {
                commands::zset::dispatch(cmd, conn, exchange).await
            }

            // Pub/sub commands.
            RedisCommand::Publish | RedisCommand::Subscribe | RedisCommand::Psubscribe => {
                commands::pubsub::dispatch(cmd, conn, exchange).await
            }

            // Connection-level commands.
            RedisCommand::Ping | RedisCommand::Echo => {
                commands::other::dispatch(cmd, conn, exchange).await
            }
        }
    }

    /// Picks the command to run for `exchange`.
    ///
    /// A `CamelRedis.Command` header wins when it is a string that parses as
    /// a `RedisCommand`. A missing, non-string or unparseable header falls
    /// back to the command configured on the endpoint.
    fn resolve_command(exchange: &Exchange, config: &RedisEndpointConfig) -> RedisCommand {
        let from_header = exchange
            .input
            .header("CamelRedis.Command")
            .and_then(|value| value.as_str())
            .and_then(|name| name.parse::<RedisCommand>().ok());

        match from_header {
            Some(command) => command,
            None => config.command.clone(),
        }
    }

    /// Copies the endpoint's configured key into the `CamelRedis.Key` header
    /// unless the exchange already carries that header.
    fn apply_default_key(exchange: &mut Exchange, config: &RedisEndpointConfig) {
        // A header set by the caller always takes precedence.
        if exchange.input.header("CamelRedis.Key").is_some() {
            return;
        }

        if let Some(default_key) = config.key.as_ref() {
            let value = serde_json::Value::String(default_key.clone());
            exchange.input.set_header("CamelRedis.Key", value);
        }
    }

    /// Copies the endpoint's configured channels into the
    /// `CamelRedis.Channels` header as a JSON array of strings.
    ///
    /// Nothing is written when the header is already present or when no
    /// channels are configured.
    fn apply_default_channels(exchange: &mut Exchange, config: &RedisEndpointConfig) {
        let header_present = exchange.input.header("CamelRedis.Channels").is_some();
        if header_present || config.channels.is_empty() {
            return;
        }

        let channels = config
            .channels
            .iter()
            .map(|channel| serde_json::Value::String(channel.clone()))
            .collect();
        exchange
            .input
            .set_header("CamelRedis.Channels", serde_json::Value::Array(channels));
    }
}
impl Service<Exchange> for RedisProducer {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, mut exchange: Exchange) -> Self::Future {
let config = self.config.clone();
let conn = self.conn.clone();
Box::pin(async move {
let mut connection = {
let guard = conn.lock().await;
if let Some(c) = guard.as_ref() {
c.clone()
} else {
drop(guard);
let mut guard = conn.lock().await;
if let Some(c) = guard.as_ref() {
c.clone()
} else {
let redis_url = config.redis_url();
tracing::debug!("Creating Redis client with URL: {}", redis_url);
let client = redis::Client::open(redis_url.as_str()).map_err(|e| {
CamelError::ProcessorError(format!(
"Failed to create Redis client for URL '{}': {}",
redis_url, e
))
})?;
let new_conn =
client
.get_multiplexed_async_connection()
.await
.map_err(|e| {
CamelError::ProcessorError(format!(
"Failed to connect to Redis: {}",
e
))
})?;
*guard = Some(new_conn.clone());
new_conn
}
}
};
let cmd = Self::resolve_command(&exchange, &config);
Self::apply_default_key(&mut exchange, &config);
Self::apply_default_channels(&mut exchange, &config);
Self::dispatch_command(&cmd, &mut connection, &mut exchange).await?;
Ok(exchange)
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Message;

    /// Parses `uri` into an endpoint configuration, panicking if it is rejected.
    fn endpoint(uri: &str) -> RedisEndpointConfig {
        RedisEndpointConfig::from_uri(uri).unwrap()
    }

    /// Builds an exchange whose input carries `value` in the
    /// `CamelRedis.Command` header.
    fn exchange_with_command(value: serde_json::Value) -> Exchange {
        let mut message = Message::default();
        message.set_header("CamelRedis.Command", value);
        Exchange::new(message)
    }

    /// Builds an exchange with a default message and no headers set by the test.
    fn bare_exchange() -> Exchange {
        Exchange::new(Message::default())
    }

    #[test]
    fn test_producer_new() {
        let producer = RedisProducer::new(endpoint("redis://localhost:6379"));
        assert_eq!(Arc::strong_count(&producer.conn), 1);
    }

    #[test]
    fn test_producer_clone_shares_connection() {
        let original = RedisProducer::new(endpoint("redis://localhost:6379"));
        let copy = original.clone();
        assert!(Arc::ptr_eq(&original.conn, &copy.conn));
    }

    #[test]
    fn test_resolve_command_from_config() {
        let config = endpoint("redis://localhost:6379?command=GET");
        let exchange = bare_exchange();
        assert_eq!(
            RedisProducer::resolve_command(&exchange, &config),
            RedisCommand::Get
        );
    }

    #[test]
    fn test_resolve_command_from_header() {
        let config = endpoint("redis://localhost:6379?command=SET");
        let exchange = exchange_with_command(serde_json::json!("GET"));
        assert_eq!(
            RedisProducer::resolve_command(&exchange, &config),
            RedisCommand::Get
        );
    }

    #[test]
    fn test_resolve_command_header_overrides_config() {
        let config = endpoint("redis://localhost:6379?command=SET");
        let exchange = exchange_with_command(serde_json::json!("INCR"));
        assert_eq!(
            RedisProducer::resolve_command(&exchange, &config),
            RedisCommand::Incr
        );
    }

    #[test]
    fn test_resolve_command_invalid_header_falls_back_to_config() {
        let config = endpoint("redis://localhost:6379?command=DECR");
        let exchange = exchange_with_command(serde_json::json!("NOT_A_COMMAND"));
        assert_eq!(
            RedisProducer::resolve_command(&exchange, &config),
            RedisCommand::Decr
        );
    }

    #[test]
    fn test_resolve_command_non_string_header_falls_back_to_config() {
        let config = endpoint("redis://localhost:6379?command=EXISTS");
        let exchange = exchange_with_command(serde_json::json!(123));
        assert_eq!(
            RedisProducer::resolve_command(&exchange, &config),
            RedisCommand::Exists
        );
    }

    #[test]
    fn test_apply_default_key_sets_when_missing() {
        let config = endpoint("redis://localhost:6379?key=cfg-key");
        let mut exchange = bare_exchange();
        RedisProducer::apply_default_key(&mut exchange, &config);
        let expected = serde_json::json!("cfg-key");
        assert_eq!(exchange.input.header("CamelRedis.Key"), Some(&expected));
    }

    #[test]
    fn test_apply_default_key_preserves_existing_header() {
        let config = endpoint("redis://localhost:6379?key=cfg-key");
        let mut message = Message::default();
        message.set_header("CamelRedis.Key", serde_json::json!("header-key"));
        let mut exchange = Exchange::new(message);
        RedisProducer::apply_default_key(&mut exchange, &config);
        let expected = serde_json::json!("header-key");
        assert_eq!(exchange.input.header("CamelRedis.Key"), Some(&expected));
    }

    #[test]
    fn test_apply_default_channels_sets_when_missing() {
        let config = endpoint("redis://localhost:6379?command=SUBSCRIBE&channels=a,b");
        let mut exchange = bare_exchange();
        RedisProducer::apply_default_channels(&mut exchange, &config);
        let expected = serde_json::json!(["a", "b"]);
        assert_eq!(exchange.input.header("CamelRedis.Channels"), Some(&expected));
    }

    #[test]
    fn test_apply_default_channels_skips_when_empty() {
        let config = endpoint("redis://localhost:6379");
        let mut exchange = bare_exchange();
        RedisProducer::apply_default_channels(&mut exchange, &config);
        assert!(exchange.input.header("CamelRedis.Channels").is_none());
    }

    #[tokio::test]
    async fn test_producer_creates_connection_on_first_call() {
        let producer = RedisProducer::new(endpoint("redis://localhost:6379"));
        // Only the state before any call is checked: the slot must start
        // empty. The producer is never called, so no Redis server is needed.
        assert!(producer.conn.lock().await.is_none());
    }
}