1use async_trait::async_trait;
2use camel_api::{Body, CamelError, Exchange, Message};
3use camel_component::{ConcurrencyModel, Consumer, ConsumerContext};
4use futures_util::StreamExt;
5use redis::Msg;
6use std::time::Duration;
7use tokio::task::JoinHandle;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10
11use crate::config::{RedisCommand, RedisEndpointConfig};
12
13#[derive(Debug, Clone)]
15pub enum RedisConsumerMode {
16 PubSub {
18 channels: Vec<String>,
20 patterns: Vec<String>,
22 },
23 Queue {
25 key: String,
27 timeout: u64,
29 },
30}
31
32pub struct RedisConsumer {
34 config: RedisEndpointConfig,
35 mode: RedisConsumerMode,
36 cancel_token: Option<CancellationToken>,
38 task_handle: Option<JoinHandle<Result<(), CamelError>>>,
40}
41
42impl RedisConsumer {
43 pub fn new(config: RedisEndpointConfig) -> Self {
50 let mode = match &config.command {
51 RedisCommand::Subscribe => RedisConsumerMode::PubSub {
52 channels: config.channels.clone(),
53 patterns: vec![],
54 },
55 RedisCommand::Psubscribe => RedisConsumerMode::PubSub {
56 channels: vec![],
57 patterns: config.channels.clone(),
58 },
59 RedisCommand::Blpop | RedisCommand::Brpop => {
60 let key = config.key.clone().unwrap_or_else(|| "queue".to_string());
61 RedisConsumerMode::Queue {
62 key,
63 timeout: config.timeout,
64 }
65 }
66 _ => {
67 warn!(
68 "Invalid consumer command: {:?}, defaulting to BLPOP",
69 config.command
70 );
71 RedisConsumerMode::Queue {
72 key: config.key.clone().unwrap_or_else(|| "queue".to_string()),
73 timeout: config.timeout,
74 }
75 }
76 };
77
78 Self {
79 config,
80 mode,
81 cancel_token: None,
82 task_handle: None,
83 }
84 }
85}
86
87#[async_trait]
88impl Consumer for RedisConsumer {
89 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
90 let cancel_token = CancellationToken::new();
92 self.cancel_token = Some(cancel_token.clone());
93
94 let config = self.config.clone();
96 let mode = self.mode.clone();
97
98 info!("Starting Redis consumer in {:?} mode", mode);
99
100 let handle =
102 match mode {
103 RedisConsumerMode::PubSub { channels, patterns } => tokio::spawn(
104 run_pubsub_consumer(config, channels, patterns, ctx, cancel_token),
105 ),
106 RedisConsumerMode::Queue { key, timeout } => {
107 tokio::spawn(run_queue_consumer(config, key, timeout, ctx, cancel_token))
108 }
109 };
110
111 self.task_handle = Some(handle);
112 Ok(())
113 }
114
115 async fn stop(&mut self) -> Result<(), CamelError> {
116 info!("Stopping Redis consumer");
117
118 if let Some(token) = &self.cancel_token {
120 token.cancel();
121 }
122
123 if let Some(handle) = self.task_handle.take() {
125 match handle.await {
126 Ok(result) => {
127 if let Err(e) = result {
128 error!("Consumer task exited with error: {}", e);
129 }
130 }
131 Err(e) => {
132 error!("Failed to join consumer task: {}", e);
133 }
134 }
135 }
136
137 self.cancel_token = None;
138 info!("Redis consumer stopped");
139 Ok(())
140 }
141
142 fn concurrency_model(&self) -> ConcurrencyModel {
153 ConcurrencyModel::Sequential
154 }
155}
156
157async fn run_pubsub_consumer(
163 config: RedisEndpointConfig,
164 channels: Vec<String>,
165 patterns: Vec<String>,
166 ctx: ConsumerContext,
167 cancel_token: CancellationToken,
168) -> Result<(), CamelError> {
169 info!("PubSub consumer connecting to {}", config.redis_url());
170
171 let client = redis::Client::open(config.redis_url())
173 .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
174
175 let mut pubsub = client.get_async_pubsub().await.map_err(|e| {
176 CamelError::ProcessorError(format!("Failed to create PubSub connection: {}", e))
177 })?;
178
179 for channel in &channels {
181 info!("Subscribing to channel: {}", channel);
182 pubsub.subscribe(channel).await.map_err(|e| {
183 CamelError::ProcessorError(format!("Failed to subscribe to channel {}: {}", channel, e))
184 })?;
185 }
186
187 for pattern in &patterns {
189 info!("Subscribing to pattern: {}", pattern);
190 pubsub.psubscribe(pattern).await.map_err(|e| {
191 CamelError::ProcessorError(format!("Failed to subscribe to pattern {}: {}", pattern, e))
192 })?;
193 }
194
195 info!("PubSub consumer started, waiting for messages");
196
197 let mut stream = pubsub.on_message();
199 loop {
200 tokio::select! {
201 _ = cancel_token.cancelled() => {
202 info!("PubSub consumer received shutdown signal");
203 break;
204 }
205 msg = stream.next() => {
206 if let Some(msg) = msg {
207 let exchange = build_exchange_from_pubsub(msg);
208 if let Err(e) = ctx.send(exchange).await {
209 error!("Failed to send exchange to pipeline: {}", e);
210 }
212 } else {
213 warn!("PubSub stream ended");
215 break;
216 }
217 }
218 }
219 }
220
221 Ok(())
222}
223
224async fn run_queue_consumer(
229 config: RedisEndpointConfig,
230 key: String,
231 timeout: u64,
232 ctx: ConsumerContext,
233 cancel_token: CancellationToken,
234) -> Result<(), CamelError> {
235 info!(
236 "Queue consumer connecting to {} for key '{}' with timeout {}s",
237 config.redis_url(),
238 key,
239 timeout
240 );
241
242 let client = redis::Client::open(config.redis_url())
244 .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
245
246 let mut conn = client
247 .get_multiplexed_async_connection()
248 .await
249 .map_err(|e| CamelError::ProcessorError(format!("Failed to create connection: {}", e)))?;
250
251 info!("Queue consumer started, waiting for items");
252
253 loop {
255 tokio::select! {
256 _ = cancel_token.cancelled() => {
257 info!("Queue consumer received shutdown signal");
258 break;
259 }
260 result = async {
261 let cmd = redis::cmd("BLPOP")
262 .arg(&key)
263 .arg(timeout)
264 .to_owned();
265 cmd.query_async::<Option<(String, String)>>(&mut conn).await
266 } =>
267 {
268 match result {
269 Ok(Some((key, value))) => {
270 let exchange = build_exchange_from_blpop(key, value);
271 if let Err(e) = ctx.send(exchange).await {
272 error!("Failed to send exchange to pipeline: {}", e);
273 }
275 }
276 Ok(None) => {
277 }
280 Err(e) => {
281 error!("BLPOP error: {}", e);
282 tokio::time::sleep(Duration::from_millis(100)).await;
284 }
285 }
286 }
287 }
288 }
289
290 Ok(())
291}
292
293fn build_exchange_from_pubsub(msg: Msg) -> Exchange {
299 let payload: String = msg
300 .get_payload()
301 .unwrap_or_else(|_| "<error decoding payload>".to_string());
302
303 let mut exchange = Exchange::new(Message::new(Body::Text(payload)));
304
305 exchange.input.set_header(
307 "CamelRedis.Channel",
308 serde_json::Value::String(msg.get_channel_name().to_string()),
309 );
310
311 if msg.from_pattern()
313 && let Ok(pattern) = msg.get_pattern::<String>()
314 {
315 exchange
316 .input
317 .set_header("CamelRedis.Pattern", serde_json::Value::String(pattern));
318 }
319
320 exchange
321}
322
323fn build_exchange_from_blpop(key: String, value: String) -> Exchange {
328 let mut exchange = Exchange::new(Message::new(Body::Text(value)));
329
330 exchange
332 .input
333 .set_header("CamelRedis.Key", serde_json::Value::String(key));
334
335 exchange
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341 use tokio::sync::mpsc;
342
343 fn create_test_config(command: RedisCommand) -> RedisEndpointConfig {
344 RedisEndpointConfig {
345 host: Some("localhost".to_string()),
346 port: Some(6379),
347 command,
348 channels: vec!["test".to_string()],
349 key: Some("test-queue".to_string()),
350 timeout: 1,
351 password: None,
352 db: 0,
353 }
354 }
355
356 #[test]
357 fn test_consumer_new_subscribe() {
358 let config = create_test_config(RedisCommand::Subscribe);
359 let consumer = RedisConsumer::new(config);
360
361 match consumer.mode {
362 RedisConsumerMode::PubSub { channels, patterns } => {
363 assert_eq!(channels, vec!["test".to_string()]);
364 assert!(patterns.is_empty());
365 }
366 _ => panic!("Expected PubSub mode"),
367 }
368 }
369
370 #[test]
371 fn test_consumer_new_psubscribe() {
372 let config = create_test_config(RedisCommand::Psubscribe);
373 let consumer = RedisConsumer::new(config);
374
375 match consumer.mode {
376 RedisConsumerMode::PubSub { channels, patterns } => {
377 assert!(channels.is_empty());
378 assert_eq!(patterns, vec!["test".to_string()]);
379 }
380 _ => panic!("Expected PubSub mode"),
381 }
382 }
383
384 #[test]
385 fn test_consumer_new_blpop() {
386 let config = create_test_config(RedisCommand::Blpop);
387 let consumer = RedisConsumer::new(config);
388
389 match consumer.mode {
390 RedisConsumerMode::Queue { key, timeout } => {
391 assert_eq!(key, "test-queue");
392 assert_eq!(timeout, 1);
393 }
394 _ => panic!("Expected Queue mode"),
395 }
396 }
397
398 #[test]
399 fn test_consumer_new_blpop_default_key() {
400 let mut config = create_test_config(RedisCommand::Blpop);
401 config.key = None;
402 let consumer = RedisConsumer::new(config);
403
404 match consumer.mode {
405 RedisConsumerMode::Queue { key, .. } => {
406 assert_eq!(key, "queue");
407 }
408 _ => panic!("Expected Queue mode"),
409 }
410 }
411
412 #[test]
413 fn test_build_exchange_from_blpop() {
414 let exchange = build_exchange_from_blpop("mykey".to_string(), "myvalue".to_string());
415
416 assert_eq!(exchange.input.body.as_text(), Some("myvalue"));
417
418 let header = exchange.input.header("CamelRedis.Key");
419 assert_eq!(
420 header,
421 Some(&serde_json::Value::String("mykey".to_string()))
422 );
423 }
424
425 #[tokio::test]
426 async fn test_consumer_stops_gracefully() {
427 let config = create_test_config(RedisCommand::Blpop);
428 let mut consumer = RedisConsumer::new(config);
429
430 let (tx, _rx) = mpsc::channel(16);
432 let cancel_token = CancellationToken::new();
433 let ctx = ConsumerContext::new(tx, cancel_token.clone());
434
435 let start_result = consumer.start(ctx).await;
437 assert!(start_result.is_ok());
438
439 tokio::time::sleep(Duration::from_millis(10)).await;
441
442 let stop_result = consumer.stop().await;
444 assert!(stop_result.is_ok());
445 }
446
447 #[tokio::test]
450 #[ignore] async fn test_pubsub_consumer_receives_messages() {
452 let config = create_test_config(RedisCommand::Subscribe);
454 let mut consumer = RedisConsumer::new(config);
455
456 let (tx, _rx) = mpsc::channel(16);
457 let cancel_token = CancellationToken::new();
458 let ctx = ConsumerContext::new(tx, cancel_token.clone());
459
460 consumer.start(ctx).await.unwrap();
462
463 tokio::time::sleep(Duration::from_millis(100)).await;
465
466 consumer.stop().await.unwrap();
480 }
481
482 #[tokio::test]
483 #[ignore] async fn test_queue_consumer_processes_items() {
485 let config = create_test_config(RedisCommand::Blpop);
487 let mut consumer = RedisConsumer::new(config);
488
489 let (tx, _rx) = mpsc::channel(16);
490 let cancel_token = CancellationToken::new();
491 let ctx = ConsumerContext::new(tx, cancel_token.clone());
492
493 consumer.start(ctx).await.unwrap();
495
496 tokio::time::sleep(Duration::from_millis(100)).await;
498
499 consumer.stop().await.unwrap();
513 }
514}