use async_trait::async_trait;
use camel_component_api::{Body, CamelError, Exchange, Message};
use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
use futures_util::StreamExt;
use redis::Msg;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::config::{RedisCommand, RedisEndpointConfig};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueuePopCommand {
Blpop,
Brpop,
}
fn queue_command_name(pop_command: QueuePopCommand) -> &'static str {
match pop_command {
QueuePopCommand::Blpop => "BLPOP",
QueuePopCommand::Brpop => "BRPOP",
}
}
#[derive(Debug, Clone)]
pub enum RedisConsumerMode {
PubSub {
channels: Vec<String>,
patterns: Vec<String>,
},
Queue {
key: String,
timeout: u64,
pop_command: QueuePopCommand,
},
}
pub struct RedisConsumer {
config: RedisEndpointConfig,
mode: RedisConsumerMode,
cancel_token: Option<CancellationToken>,
task_handle: Option<JoinHandle<Result<(), CamelError>>>,
}
impl RedisConsumer {
pub fn new(config: RedisEndpointConfig) -> Self {
let mode = match &config.command {
RedisCommand::Subscribe => RedisConsumerMode::PubSub {
channels: config.channels.clone(),
patterns: vec![],
},
RedisCommand::Psubscribe => RedisConsumerMode::PubSub {
channels: vec![],
patterns: config.channels.clone(),
},
RedisCommand::Blpop | RedisCommand::Brpop => {
let key = config.key.clone().unwrap_or_else(|| "queue".to_string());
let pop_command = if config.command == RedisCommand::Brpop {
QueuePopCommand::Brpop
} else {
QueuePopCommand::Blpop
};
RedisConsumerMode::Queue {
key,
timeout: config.timeout,
pop_command,
}
}
_ => {
warn!(
"Invalid consumer command: {:?}, defaulting to BLPOP",
config.command
);
RedisConsumerMode::Queue {
key: config.key.clone().unwrap_or_else(|| "queue".to_string()),
timeout: config.timeout,
pop_command: QueuePopCommand::Blpop,
}
}
};
Self {
config,
mode,
cancel_token: None,
task_handle: None,
}
}
}
#[async_trait]
impl Consumer for RedisConsumer {
async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
let cancel_token = CancellationToken::new();
self.cancel_token = Some(cancel_token.clone());
let config = self.config.clone();
let mode = self.mode.clone();
info!("Starting Redis consumer in {:?} mode", mode);
let handle =
match mode {
RedisConsumerMode::PubSub { channels, patterns } => tokio::spawn(
run_pubsub_consumer(config, channels, patterns, ctx, cancel_token),
),
RedisConsumerMode::Queue {
key,
timeout,
pop_command,
} => tokio::spawn(run_queue_consumer(
config,
key,
timeout,
pop_command,
ctx,
cancel_token,
)),
};
self.task_handle = Some(handle);
Ok(())
}
async fn stop(&mut self) -> Result<(), CamelError> {
info!("Stopping Redis consumer");
if let Some(token) = &self.cancel_token {
token.cancel();
}
if let Some(handle) = self.task_handle.take() {
match handle.await {
Ok(result) => {
if let Err(e) = result {
error!("Consumer task exited with error: {}", e);
}
}
Err(e) => {
error!("Failed to join consumer task: {}", e);
}
}
}
self.cancel_token = None;
info!("Redis consumer stopped");
Ok(())
}
fn concurrency_model(&self) -> ConcurrencyModel {
ConcurrencyModel::Sequential
}
}
async fn run_pubsub_consumer(
config: RedisEndpointConfig,
channels: Vec<String>,
patterns: Vec<String>,
ctx: ConsumerContext,
cancel_token: CancellationToken,
) -> Result<(), CamelError> {
info!("PubSub consumer connecting to {}", config.redis_url());
let client = redis::Client::open(config.redis_url())
.map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
let mut pubsub = client.get_async_pubsub().await.map_err(|e| {
CamelError::ProcessorError(format!("Failed to create PubSub connection: {}", e))
})?;
for channel in &channels {
info!("Subscribing to channel: {}", channel);
pubsub.subscribe(channel).await.map_err(|e| {
CamelError::ProcessorError(format!("Failed to subscribe to channel {}: {}", channel, e))
})?;
}
for pattern in &patterns {
info!("Subscribing to pattern: {}", pattern);
pubsub.psubscribe(pattern).await.map_err(|e| {
CamelError::ProcessorError(format!("Failed to subscribe to pattern {}: {}", pattern, e))
})?;
}
info!("PubSub consumer started, waiting for messages");
let mut stream = pubsub.on_message();
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("PubSub consumer received shutdown signal");
break;
}
msg = stream.next() => {
if let Some(msg) = msg {
let exchange = build_exchange_from_pubsub(msg);
if let Err(e) = ctx.send(exchange).await {
error!("Failed to send exchange to pipeline: {}", e);
}
} else {
warn!("PubSub stream ended");
break;
}
}
}
}
Ok(())
}
async fn run_queue_consumer(
config: RedisEndpointConfig,
key: String,
timeout: u64,
pop_command: QueuePopCommand,
ctx: ConsumerContext,
cancel_token: CancellationToken,
) -> Result<(), CamelError> {
info!(
"Queue consumer connecting to {} for key '{}' with {} timeout {}s",
config.redis_url(),
key,
queue_command_name(pop_command),
timeout
);
let client = redis::Client::open(config.redis_url())
.map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
let mut conn = client
.get_multiplexed_async_connection()
.await
.map_err(|e| CamelError::ProcessorError(format!("Failed to create connection: {}", e)))?;
info!("Queue consumer started, waiting for items");
let queue_cmd = queue_command_name(pop_command);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Queue consumer received shutdown signal");
break;
}
result = async {
let cmd = redis::cmd(queue_cmd)
.arg(&key)
.arg(timeout)
.to_owned();
cmd.query_async::<Option<(String, String)>>(&mut conn).await
} =>
{
match result {
Ok(Some((key, value))) => {
let exchange = build_exchange_from_blpop(key, value);
if let Err(e) = ctx.send(exchange).await {
error!("Failed to send exchange to pipeline: {}", e);
}
}
Ok(None) => {
}
Err(e) => {
if e.is_timeout() {
} else {
error!("{} error: {}", queue_cmd, e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
}
}
Ok(())
}
fn build_pubsub_exchange(payload: String, channel: String, pattern: Option<String>) -> Exchange {
let mut exchange = Exchange::new(Message::new(Body::Text(payload)));
exchange
.input
.set_header("CamelRedis.Channel", serde_json::Value::String(channel));
if let Some(pattern) = pattern {
exchange
.input
.set_header("CamelRedis.Pattern", serde_json::Value::String(pattern));
}
exchange
}
fn build_exchange_from_pubsub(msg: Msg) -> Exchange {
let payload: String = msg
.get_payload()
.unwrap_or_else(|_| "<error decoding payload>".to_string());
let channel = msg.get_channel_name().to_string();
let pattern = if msg.from_pattern() {
msg.get_pattern::<String>().ok()
} else {
None
};
build_pubsub_exchange(payload, channel, pattern)
}
fn build_exchange_from_blpop(key: String, value: String) -> Exchange {
let mut exchange = Exchange::new(Message::new(Body::Text(value)));
exchange
.input
.set_header("CamelRedis.Key", serde_json::Value::String(key));
exchange
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::mpsc;
fn create_test_config(command: RedisCommand) -> RedisEndpointConfig {
RedisEndpointConfig {
host: Some("localhost".to_string()),
port: Some(6379),
command,
channels: vec!["test".to_string()],
key: Some("test-queue".to_string()),
timeout: 1,
password: None,
db: 0,
}
}
#[test]
fn test_consumer_new_subscribe() {
let config = create_test_config(RedisCommand::Subscribe);
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::PubSub { channels, patterns } => {
assert_eq!(channels, vec!["test".to_string()]);
assert!(patterns.is_empty());
}
_ => panic!("Expected PubSub mode"),
}
}
#[test]
fn test_consumer_new_psubscribe() {
let config = create_test_config(RedisCommand::Psubscribe);
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::PubSub { channels, patterns } => {
assert!(channels.is_empty());
assert_eq!(patterns, vec!["test".to_string()]);
}
_ => panic!("Expected PubSub mode"),
}
}
#[test]
fn test_consumer_new_blpop() {
let config = create_test_config(RedisCommand::Blpop);
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::Queue {
key,
timeout,
pop_command,
} => {
assert_eq!(key, "test-queue");
assert_eq!(timeout, 1);
assert_eq!(pop_command, QueuePopCommand::Blpop);
}
_ => panic!("Expected Queue mode"),
}
}
#[test]
fn test_consumer_new_brpop_uses_right_pop_command() {
let config = create_test_config(RedisCommand::Brpop);
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::Queue { pop_command, .. } => {
assert_eq!(pop_command, QueuePopCommand::Brpop);
}
_ => panic!("Expected Queue mode"),
}
}
#[test]
fn test_consumer_new_blpop_default_key() {
let mut config = create_test_config(RedisCommand::Blpop);
config.key = None;
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::Queue {
key, pop_command, ..
} => {
assert_eq!(key, "queue");
assert_eq!(pop_command, QueuePopCommand::Blpop);
}
_ => panic!("Expected Queue mode"),
}
}
#[test]
fn test_consumer_new_non_consumer_command_defaults_to_queue_mode() {
let config = create_test_config(RedisCommand::Set);
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::Queue {
key,
timeout,
pop_command,
} => {
assert_eq!(key, "test-queue");
assert_eq!(timeout, 1);
assert_eq!(pop_command, QueuePopCommand::Blpop);
}
_ => panic!("Expected Queue mode"),
}
}
#[test]
fn test_queue_command_name_matches_pop_side() {
assert_eq!(queue_command_name(QueuePopCommand::Blpop), "BLPOP");
assert_eq!(queue_command_name(QueuePopCommand::Brpop), "BRPOP");
}
#[test]
fn test_consumer_concurrency_model_is_sequential() {
let config = create_test_config(RedisCommand::Subscribe);
let consumer = RedisConsumer::new(config);
assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
}
#[test]
fn test_build_exchange_from_blpop() {
let exchange = build_exchange_from_blpop("mykey".to_string(), "myvalue".to_string());
assert_eq!(exchange.input.body.as_text(), Some("myvalue"));
let header = exchange.input.header("CamelRedis.Key");
assert_eq!(
header,
Some(&serde_json::Value::String("mykey".to_string()))
);
}
#[test]
fn test_build_pubsub_exchange_without_pattern() {
let exchange = build_pubsub_exchange("hello".to_string(), "news".to_string(), None);
assert_eq!(exchange.input.body.as_text(), Some("hello"));
assert_eq!(
exchange.input.header("CamelRedis.Channel"),
Some(&serde_json::json!("news"))
);
assert!(exchange.input.header("CamelRedis.Pattern").is_none());
}
#[test]
fn test_build_pubsub_exchange_with_pattern() {
let exchange = build_pubsub_exchange(
"hello".to_string(),
"news.eu".to_string(),
Some("news.*".to_string()),
);
assert_eq!(
exchange.input.header("CamelRedis.Pattern"),
Some(&serde_json::json!("news.*"))
);
}
#[test]
fn test_queue_pop_command_derives() {
let cmd = QueuePopCommand::Blpop;
let _cmd2 = cmd; let _cmd3 = cmd.clone(); assert_eq!(format!("{:?}", cmd), "Blpop"); assert_eq!(QueuePopCommand::Blpop, QueuePopCommand::Blpop); assert_ne!(QueuePopCommand::Blpop, QueuePopCommand::Brpop);
}
#[test]
fn test_build_pubsub_exchange_with_empty_payload() {
let exchange = build_pubsub_exchange("".to_string(), "ch".to_string(), None);
assert_eq!(exchange.input.body.as_text(), Some(""));
assert_eq!(
exchange.input.header("CamelRedis.Channel"),
Some(&serde_json::json!("ch"))
);
}
#[test]
fn test_build_exchange_from_blpop_with_empty_values() {
let exchange = build_exchange_from_blpop("".to_string(), "".to_string());
assert_eq!(exchange.input.body.as_text(), Some(""));
assert_eq!(
exchange.input.header("CamelRedis.Key"),
Some(&serde_json::Value::String("".to_string()))
);
}
#[tokio::test]
async fn test_consumer_stop_without_start() {
let config = create_test_config(RedisCommand::Subscribe);
let mut consumer = RedisConsumer::new(config);
let result = consumer.stop().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_consumer_start_sets_task_handle() {
let config = create_test_config(RedisCommand::Blpop);
let mut consumer = RedisConsumer::new(config);
let (tx, _rx) = mpsc::channel(16);
let cancel_token = CancellationToken::new();
let ctx = ConsumerContext::new(tx, cancel_token.clone());
assert!(consumer.task_handle.is_none());
let result = consumer.start(ctx).await;
assert!(result.is_ok());
assert!(consumer.task_handle.is_some());
consumer.stop().await.ok();
}
#[tokio::test]
async fn test_consumer_start_pubsub_mode() {
let config = create_test_config(RedisCommand::Subscribe);
let mut consumer = RedisConsumer::new(config);
let (tx, _rx) = mpsc::channel(16);
let cancel_token = CancellationToken::new();
let ctx = ConsumerContext::new(tx, cancel_token.clone());
let result = consumer.start(ctx).await;
assert!(result.is_ok());
assert!(consumer.cancel_token.is_some());
assert!(consumer.task_handle.is_some());
consumer.stop().await.ok();
}
#[test]
fn test_consumer_new_blpop_with_default_key_when_none() {
let mut config = create_test_config(RedisCommand::Blpop);
config.key = None;
let consumer = RedisConsumer::new(config);
match &consumer.mode {
RedisConsumerMode::Queue { key, .. } => {
assert_eq!(key, "queue");
}
_ => panic!("Expected Queue mode"),
}
}
#[test]
fn test_consumer_new_brpop_with_default_key_when_none() {
let mut config = create_test_config(RedisCommand::Brpop);
config.key = None;
let consumer = RedisConsumer::new(config);
match &consumer.mode {
RedisConsumerMode::Queue {
key, pop_command, ..
} => {
assert_eq!(key, "queue");
assert_eq!(*pop_command, QueuePopCommand::Brpop);
}
_ => panic!("Expected Queue mode"),
}
}
#[test]
fn test_consumer_new_invalid_command_defaults_to_blpop() {
let mut config = create_test_config(RedisCommand::Get);
config.key = None;
let consumer = RedisConsumer::new(config);
match consumer.mode {
RedisConsumerMode::Queue {
key,
pop_command,
timeout,
} => {
assert_eq!(key, "queue");
assert_eq!(pop_command, QueuePopCommand::Blpop);
assert_eq!(timeout, 1);
}
_ => panic!("Expected Queue mode for invalid consumer command"),
}
}
#[test]
fn test_consumer_mode_debug() {
let pubsub_mode = RedisConsumerMode::PubSub {
channels: vec!["test".to_string()],
patterns: vec!["pattern:*".to_string()],
};
let debug_str = format!("{:?}", pubsub_mode);
assert!(debug_str.contains("PubSub"));
let queue_mode = RedisConsumerMode::Queue {
key: "mykey".to_string(),
timeout: 5,
pop_command: QueuePopCommand::Brpop,
};
let debug_str = format!("{:?}", queue_mode);
assert!(debug_str.contains("Queue"));
}
#[tokio::test]
async fn test_consumer_stops_gracefully() {
let config = create_test_config(RedisCommand::Blpop);
let mut consumer = RedisConsumer::new(config);
let (tx, _rx) = mpsc::channel(16);
let cancel_token = CancellationToken::new();
let ctx = ConsumerContext::new(tx, cancel_token.clone());
let start_result = consumer.start(ctx).await;
assert!(start_result.is_ok());
tokio::time::sleep(Duration::from_millis(10)).await;
let stop_result = consumer.stop().await;
assert!(stop_result.is_ok());
}
}