Skip to main content

camel_component_redis/
consumer.rs

1use async_trait::async_trait;
2use camel_api::{Body, CamelError, Exchange, Message};
3use camel_component::{ConcurrencyModel, Consumer, ConsumerContext};
4use futures_util::StreamExt;
5use redis::Msg;
6use std::time::Duration;
7use tokio::task::JoinHandle;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10
11use crate::config::{RedisCommand, RedisConfig};
12
13/// Mode of operation for the Redis consumer.
14#[derive(Debug, Clone)]
15pub enum RedisConsumerMode {
16    /// Pub/Sub mode for real-time message streams.
17    PubSub {
18        /// Channels to subscribe to (SUBSCRIBE)
19        channels: Vec<String>,
20        /// Patterns to subscribe to (PSUBSCRIBE)
21        patterns: Vec<String>,
22    },
23    /// Queue mode for blocking list operations.
24    Queue {
25        /// Key to watch for items
26        key: String,
27        /// Timeout in seconds for BLPOP
28        timeout: u64,
29    },
30}
31
32/// Redis consumer implementation supporting both Pub/Sub and Queue modes.
33pub struct RedisConsumer {
34    config: RedisConfig,
35    mode: RedisConsumerMode,
36    /// Cancellation token for graceful shutdown
37    cancel_token: Option<CancellationToken>,
38    /// Handle to the spawned consumer task
39    task_handle: Option<JoinHandle<Result<(), CamelError>>>,
40}
41
42impl RedisConsumer {
43    /// Creates a new RedisConsumer with the given configuration.
44    ///
45    /// The mode is automatically determined from the command type in the config:
46    /// - SUBSCRIBE → PubSub with channels
47    /// - PSUBSCRIBE → PubSub with patterns
48    /// - BLPOP/BRPOP → Queue mode
49    pub fn new(config: RedisConfig) -> Self {
50        let mode = match &config.command {
51            RedisCommand::Subscribe => RedisConsumerMode::PubSub {
52                channels: config.channels.clone(),
53                patterns: vec![],
54            },
55            RedisCommand::Psubscribe => RedisConsumerMode::PubSub {
56                channels: vec![],
57                patterns: config.channels.clone(),
58            },
59            RedisCommand::Blpop | RedisCommand::Brpop => {
60                let key = config.key.clone().unwrap_or_else(|| "queue".to_string());
61                RedisConsumerMode::Queue {
62                    key,
63                    timeout: config.timeout,
64                }
65            }
66            _ => {
67                warn!(
68                    "Invalid consumer command: {:?}, defaulting to BLPOP",
69                    config.command
70                );
71                RedisConsumerMode::Queue {
72                    key: config.key.clone().unwrap_or_else(|| "queue".to_string()),
73                    timeout: config.timeout,
74                }
75            }
76        };
77
78        Self {
79            config,
80            mode,
81            cancel_token: None,
82            task_handle: None,
83        }
84    }
85}
86
87#[async_trait]
88impl Consumer for RedisConsumer {
89    async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
90        // Create cancellation token for this consumer
91        let cancel_token = CancellationToken::new();
92        self.cancel_token = Some(cancel_token.clone());
93
94        // Clone config and mode for the spawned task
95        let config = self.config.clone();
96        let mode = self.mode.clone();
97
98        info!("Starting Redis consumer in {:?} mode", mode);
99
100        // Spawn the appropriate consumer loop based on mode
101        let handle =
102            match mode {
103                RedisConsumerMode::PubSub { channels, patterns } => tokio::spawn(
104                    run_pubsub_consumer(config, channels, patterns, ctx, cancel_token),
105                ),
106                RedisConsumerMode::Queue { key, timeout } => {
107                    tokio::spawn(run_queue_consumer(config, key, timeout, ctx, cancel_token))
108                }
109            };
110
111        self.task_handle = Some(handle);
112        Ok(())
113    }
114
115    async fn stop(&mut self) -> Result<(), CamelError> {
116        info!("Stopping Redis consumer");
117
118        // Cancel the token to signal shutdown
119        if let Some(token) = &self.cancel_token {
120            token.cancel();
121        }
122
123        // Wait for the task to complete
124        if let Some(handle) = self.task_handle.take() {
125            match handle.await {
126                Ok(result) => {
127                    if let Err(e) = result {
128                        error!("Consumer task exited with error: {}", e);
129                    }
130                }
131                Err(e) => {
132                    error!("Failed to join consumer task: {}", e);
133                }
134            }
135        }
136
137        self.cancel_token = None;
138        info!("Redis consumer stopped");
139        Ok(())
140    }
141
142    /// Redis consumers are sequential by default to maintain message order.
143    ///
144    /// This default is chosen for the following reasons:
145    /// - **Pub/Sub**: Messages often need ordering (e.g., event streams, notifications)
146    /// - **Queue (BLPOP)**: Queue items should be processed in order
147    /// - **Backpressure**: Sequential processing naturally applies backpressure
148    ///   when the consumer is slower than the producer
149    ///
150    /// Users can override this with `.concurrent(n)` in the route DSL if they
151    /// want parallel processing and ordering is not a concern.
152    fn concurrency_model(&self) -> ConcurrencyModel {
153        ConcurrencyModel::Sequential
154    }
155}
156
157/// Runs a Pub/Sub consumer loop.
158///
159/// Creates a dedicated PubSub connection and subscribes to the specified
160/// channels and/or patterns. Messages are converted to Exchanges and sent
161/// through the consumer context.
162async fn run_pubsub_consumer(
163    config: RedisConfig,
164    channels: Vec<String>,
165    patterns: Vec<String>,
166    ctx: ConsumerContext,
167    cancel_token: CancellationToken,
168) -> Result<(), CamelError> {
169    info!("PubSub consumer connecting to {}", config.redis_url());
170
171    // Create dedicated PubSub connection
172    let client = redis::Client::open(config.redis_url())
173        .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
174
175    let mut pubsub = client.get_async_pubsub().await.map_err(|e| {
176        CamelError::ProcessorError(format!("Failed to create PubSub connection: {}", e))
177    })?;
178
179    // Subscribe to channels
180    for channel in &channels {
181        info!("Subscribing to channel: {}", channel);
182        pubsub.subscribe(channel).await.map_err(|e| {
183            CamelError::ProcessorError(format!("Failed to subscribe to channel {}: {}", channel, e))
184        })?;
185    }
186
187    // Subscribe to patterns
188    for pattern in &patterns {
189        info!("Subscribing to pattern: {}", pattern);
190        pubsub.psubscribe(pattern).await.map_err(|e| {
191            CamelError::ProcessorError(format!("Failed to subscribe to pattern {}: {}", pattern, e))
192        })?;
193    }
194
195    info!("PubSub consumer started, waiting for messages");
196
197    // Message loop
198    let mut stream = pubsub.on_message();
199    loop {
200        tokio::select! {
201            _ = cancel_token.cancelled() => {
202                info!("PubSub consumer received shutdown signal");
203                break;
204            }
205            msg = stream.next() => {
206                if let Some(msg) = msg {
207                    let exchange = build_exchange_from_pubsub(msg);
208                    if let Err(e) = ctx.send(exchange).await {
209                        error!("Failed to send exchange to pipeline: {}", e);
210                        // Don't break - continue processing messages
211                    }
212                } else {
213                    // Stream ended
214                    warn!("PubSub stream ended");
215                    break;
216                }
217            }
218        }
219    }
220
221    Ok(())
222}
223
224/// Runs a Queue consumer loop using BLPOP.
225///
226/// Creates a dedicated connection and performs blocking list pop operations.
227/// Items are converted to Exchanges and sent through the consumer context.
228async fn run_queue_consumer(
229    config: RedisConfig,
230    key: String,
231    timeout: u64,
232    ctx: ConsumerContext,
233    cancel_token: CancellationToken,
234) -> Result<(), CamelError> {
235    info!(
236        "Queue consumer connecting to {} for key '{}' with timeout {}s",
237        config.redis_url(),
238        key,
239        timeout
240    );
241
242    // Create dedicated multiplexed connection
243    let client = redis::Client::open(config.redis_url())
244        .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
245
246    let mut conn = client
247        .get_multiplexed_async_connection()
248        .await
249        .map_err(|e| CamelError::ProcessorError(format!("Failed to create connection: {}", e)))?;
250
251    info!("Queue consumer started, waiting for items");
252
253    // BLPOP loop
254    loop {
255        tokio::select! {
256            _ = cancel_token.cancelled() => {
257                info!("Queue consumer received shutdown signal");
258                break;
259            }
260            result = async {
261                let cmd = redis::cmd("BLPOP")
262                    .arg(&key)
263                    .arg(timeout as f64)
264                    .to_owned();
265                cmd.query_async::<Option<(String, String)>>(&mut conn).await
266            } =>
267            {
268                match result {
269                    Ok(Some((key, value))) => {
270                        let exchange = build_exchange_from_blpop(key, value);
271                        if let Err(e) = ctx.send(exchange).await {
272                            error!("Failed to send exchange to pipeline: {}", e);
273                            // Don't break - continue processing items
274                        }
275                    }
276                    Ok(None) => {
277                        // Timeout - continue loop
278                        // This is normal for BLPOP with timeout
279                    }
280                    Err(e) => {
281                        error!("BLPOP error: {}", e);
282                        // Brief pause before retrying to avoid tight error loop
283                        tokio::time::sleep(Duration::from_millis(100)).await;
284                    }
285                }
286            }
287        }
288    }
289
290    Ok(())
291}
292
293/// Builds an Exchange from a Pub/Sub message.
294///
295/// Sets the following headers:
296/// - `CamelRedis.Channel`: The channel the message was published to
297/// - `CamelRedis.Pattern`: The pattern matched (if applicable, for PSUBSCRIBE)
298fn build_exchange_from_pubsub(msg: Msg) -> Exchange {
299    let payload: String = msg
300        .get_payload()
301        .unwrap_or_else(|_| "<error decoding payload>".to_string());
302
303    let mut exchange = Exchange::new(Message::new(Body::Text(payload)));
304
305    // Set channel header
306    exchange.input.set_header(
307        "CamelRedis.Channel",
308        serde_json::Value::String(msg.get_channel_name().to_string()),
309    );
310
311    // Set pattern header if this is from a pattern subscription
312    if msg.from_pattern()
313        && let Ok(pattern) = msg.get_pattern::<String>()
314    {
315        exchange
316            .input
317            .set_header("CamelRedis.Pattern", serde_json::Value::String(pattern));
318    }
319
320    exchange
321}
322
323/// Builds an Exchange from a BLPOP result.
324///
325/// Sets the following headers:
326/// - `CamelRedis.Key`: The list key the item was popped from
327fn build_exchange_from_blpop(key: String, value: String) -> Exchange {
328    let mut exchange = Exchange::new(Message::new(Body::Text(value)));
329
330    // Set key header
331    exchange
332        .input
333        .set_header("CamelRedis.Key", serde_json::Value::String(key));
334
335    exchange
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341    use tokio::sync::mpsc;
342
343    fn create_test_config(command: RedisCommand) -> RedisConfig {
344        RedisConfig {
345            host: "localhost".to_string(),
346            port: 6379,
347            command,
348            channels: vec!["test".to_string()],
349            key: Some("test-queue".to_string()),
350            timeout: 1,
351            password: None,
352            db: 0,
353        }
354    }
355
356    #[test]
357    fn test_consumer_new_subscribe() {
358        let config = create_test_config(RedisCommand::Subscribe);
359        let consumer = RedisConsumer::new(config);
360
361        match consumer.mode {
362            RedisConsumerMode::PubSub { channels, patterns } => {
363                assert_eq!(channels, vec!["test".to_string()]);
364                assert!(patterns.is_empty());
365            }
366            _ => panic!("Expected PubSub mode"),
367        }
368    }
369
370    #[test]
371    fn test_consumer_new_psubscribe() {
372        let config = create_test_config(RedisCommand::Psubscribe);
373        let consumer = RedisConsumer::new(config);
374
375        match consumer.mode {
376            RedisConsumerMode::PubSub { channels, patterns } => {
377                assert!(channels.is_empty());
378                assert_eq!(patterns, vec!["test".to_string()]);
379            }
380            _ => panic!("Expected PubSub mode"),
381        }
382    }
383
384    #[test]
385    fn test_consumer_new_blpop() {
386        let config = create_test_config(RedisCommand::Blpop);
387        let consumer = RedisConsumer::new(config);
388
389        match consumer.mode {
390            RedisConsumerMode::Queue { key, timeout } => {
391                assert_eq!(key, "test-queue");
392                assert_eq!(timeout, 1);
393            }
394            _ => panic!("Expected Queue mode"),
395        }
396    }
397
398    #[test]
399    fn test_consumer_new_blpop_default_key() {
400        let mut config = create_test_config(RedisCommand::Blpop);
401        config.key = None;
402        let consumer = RedisConsumer::new(config);
403
404        match consumer.mode {
405            RedisConsumerMode::Queue { key, .. } => {
406                assert_eq!(key, "queue");
407            }
408            _ => panic!("Expected Queue mode"),
409        }
410    }
411
412    #[test]
413    fn test_build_exchange_from_blpop() {
414        let exchange = build_exchange_from_blpop("mykey".to_string(), "myvalue".to_string());
415
416        assert_eq!(exchange.input.body.as_text(), Some("myvalue"));
417
418        let header = exchange.input.header("CamelRedis.Key");
419        assert_eq!(
420            header,
421            Some(&serde_json::Value::String("mykey".to_string()))
422        );
423    }
424
425    #[tokio::test]
426    async fn test_consumer_stops_gracefully() {
427        let config = create_test_config(RedisCommand::Blpop);
428        let mut consumer = RedisConsumer::new(config);
429
430        // Create a mock context (won't actually be used in this test)
431        let (tx, _rx) = mpsc::channel(16);
432        let cancel_token = CancellationToken::new();
433        let ctx = ConsumerContext::new(tx, cancel_token.clone());
434
435        // Start should succeed
436        let start_result = consumer.start(ctx).await;
437        assert!(start_result.is_ok());
438
439        // Give task a moment to start
440        tokio::time::sleep(Duration::from_millis(10)).await;
441
442        // Stop should succeed
443        let stop_result = consumer.stop().await;
444        assert!(stop_result.is_ok());
445    }
446
447    // Integration tests (require running Redis, marked with #[ignore])
448
449    #[tokio::test]
450    #[ignore] // Requires running Redis
451    async fn test_pubsub_consumer_receives_messages() {
452        // Setup: create consumer with SUBSCRIBE
453        let config = create_test_config(RedisCommand::Subscribe);
454        let mut consumer = RedisConsumer::new(config);
455
456        let (tx, _rx) = mpsc::channel(16);
457        let cancel_token = CancellationToken::new();
458        let ctx = ConsumerContext::new(tx, cancel_token.clone());
459
460        // Start consumer
461        consumer.start(ctx).await.unwrap();
462
463        // Give consumer time to subscribe
464        tokio::time::sleep(Duration::from_millis(100)).await;
465
466        // Publish test message (would need a separate Redis client)
467        // let publish_client = redis::Client::open("redis://localhost:6379").unwrap();
468        // let mut pub_conn = publish_client.get_connection().unwrap();
469        // redis::cmd("PUBLISH").arg("test").arg("hello").query(&mut pub_conn).unwrap();
470
471        // Verify exchange received (would check rx)
472        // let envelope = tokio::time::timeout(Duration::from_secs(1), rx.recv())
473        //     .await
474        //     .expect("Should receive message")
475        //     .expect("Message should exist");
476        // assert_eq!(envelope.exchange.input.body.as_text(), Some("hello"));
477
478        // Cleanup
479        consumer.stop().await.unwrap();
480    }
481
482    #[tokio::test]
483    #[ignore] // Requires running Redis
484    async fn test_queue_consumer_processes_items() {
485        // Setup: create consumer with BLPOP
486        let config = create_test_config(RedisCommand::Blpop);
487        let mut consumer = RedisConsumer::new(config);
488
489        let (tx, _rx) = mpsc::channel(16);
490        let cancel_token = CancellationToken::new();
491        let ctx = ConsumerContext::new(tx, cancel_token.clone());
492
493        // Start consumer
494        consumer.start(ctx).await.unwrap();
495
496        // Give consumer time to start
497        tokio::time::sleep(Duration::from_millis(100)).await;
498
499        // LPUSH test item (would need a separate Redis client)
500        // let push_client = redis::Client::open("redis://localhost:6379").unwrap();
501        // let mut push_conn = push_client.get_connection().unwrap();
502        // redis::cmd("LPUSH").arg("test-queue").arg("item1").query(&mut push_conn).unwrap();
503
504        // Verify exchange received (would check rx)
505        // let envelope = tokio::time::timeout(Duration::from_secs(1), rx.recv())
506        //     .await
507        //     .expect("Should receive message")
508        //     .expect("Message should exist");
509        // assert_eq!(envelope.exchange.input.body.as_text(), Some("item1"));
510
511        // Cleanup
512        consumer.stop().await.unwrap();
513    }
514}