Skip to main content

camel_component_redis/
consumer.rs

1use async_trait::async_trait;
2use camel_component_api::{Body, CamelError, Exchange, Message};
3use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
4use futures_util::StreamExt;
5use redis::Msg;
6use std::time::Duration;
7use tokio::task::JoinHandle;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10
11use crate::config::{RedisCommand, RedisEndpointConfig};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum QueuePopCommand {
15    Blpop,
16    Brpop,
17}
18
19fn queue_command_name(pop_command: QueuePopCommand) -> &'static str {
20    match pop_command {
21        QueuePopCommand::Blpop => "BLPOP",
22        QueuePopCommand::Brpop => "BRPOP",
23    }
24}
25
26/// Mode of operation for the Redis consumer.
27#[derive(Debug, Clone)]
28pub enum RedisConsumerMode {
29    /// Pub/Sub mode for real-time message streams.
30    PubSub {
31        /// Channels to subscribe to (SUBSCRIBE)
32        channels: Vec<String>,
33        /// Patterns to subscribe to (PSUBSCRIBE)
34        patterns: Vec<String>,
35    },
36    /// Queue mode for blocking list operations.
37    Queue {
38        /// Key to watch for items
39        key: String,
40        /// Timeout in seconds for blocking pop
41        timeout: u64,
42        /// Blocking pop command to use (left or right)
43        pop_command: QueuePopCommand,
44    },
45}
46
47/// Redis consumer implementation supporting both Pub/Sub and Queue modes.
48pub struct RedisConsumer {
49    config: RedisEndpointConfig,
50    mode: RedisConsumerMode,
51    /// Cancellation token for graceful shutdown
52    cancel_token: Option<CancellationToken>,
53    /// Handle to the spawned consumer task
54    task_handle: Option<JoinHandle<Result<(), CamelError>>>,
55}
56
57impl RedisConsumer {
58    /// Creates a new RedisConsumer with the given configuration.
59    ///
60    /// The mode is automatically determined from the command type in the config:
61    /// - SUBSCRIBE → PubSub with channels
62    /// - PSUBSCRIBE → PubSub with patterns
63    /// - BLPOP/BRPOP → Queue mode
64    pub fn new(config: RedisEndpointConfig) -> Self {
65        let mode = match &config.command {
66            RedisCommand::Subscribe => RedisConsumerMode::PubSub {
67                channels: config.channels.clone(),
68                patterns: vec![],
69            },
70            RedisCommand::Psubscribe => RedisConsumerMode::PubSub {
71                channels: vec![],
72                patterns: config.channels.clone(),
73            },
74            RedisCommand::Blpop | RedisCommand::Brpop => {
75                let key = config.key.clone().unwrap_or_else(|| "queue".to_string());
76                let pop_command = if config.command == RedisCommand::Brpop {
77                    QueuePopCommand::Brpop
78                } else {
79                    QueuePopCommand::Blpop
80                };
81                RedisConsumerMode::Queue {
82                    key,
83                    timeout: config.timeout,
84                    pop_command,
85                }
86            }
87            _ => {
88                warn!(
89                    "Invalid consumer command: {:?}, defaulting to BLPOP",
90                    config.command
91                );
92                RedisConsumerMode::Queue {
93                    key: config.key.clone().unwrap_or_else(|| "queue".to_string()),
94                    timeout: config.timeout,
95                    pop_command: QueuePopCommand::Blpop,
96                }
97            }
98        };
99
100        Self {
101            config,
102            mode,
103            cancel_token: None,
104            task_handle: None,
105        }
106    }
107}
108
109#[async_trait]
110impl Consumer for RedisConsumer {
111    async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
112        // Create cancellation token for this consumer
113        let cancel_token = CancellationToken::new();
114        self.cancel_token = Some(cancel_token.clone());
115
116        // Clone config and mode for the spawned task
117        let config = self.config.clone();
118        let mode = self.mode.clone();
119
120        info!("Starting Redis consumer in {:?} mode", mode);
121
122        // Spawn the appropriate consumer loop based on mode
123        let handle =
124            match mode {
125                RedisConsumerMode::PubSub { channels, patterns } => tokio::spawn(
126                    run_pubsub_consumer(config, channels, patterns, ctx, cancel_token),
127                ),
128                RedisConsumerMode::Queue {
129                    key,
130                    timeout,
131                    pop_command,
132                } => tokio::spawn(run_queue_consumer(
133                    config,
134                    key,
135                    timeout,
136                    pop_command,
137                    ctx,
138                    cancel_token,
139                )),
140            };
141
142        self.task_handle = Some(handle);
143        Ok(())
144    }
145
146    async fn stop(&mut self) -> Result<(), CamelError> {
147        info!("Stopping Redis consumer");
148
149        // Cancel the token to signal shutdown
150        if let Some(token) = &self.cancel_token {
151            token.cancel();
152        }
153
154        // Wait for the task to complete
155        if let Some(handle) = self.task_handle.take() {
156            match handle.await {
157                Ok(result) => {
158                    if let Err(e) = result {
159                        error!("Consumer task exited with error: {}", e);
160                    }
161                }
162                Err(e) => {
163                    error!("Failed to join consumer task: {}", e);
164                }
165            }
166        }
167
168        self.cancel_token = None;
169        info!("Redis consumer stopped");
170        Ok(())
171    }
172
173    /// Redis consumers are sequential by default to maintain message order.
174    ///
175    /// This default is chosen for the following reasons:
176    /// - **Pub/Sub**: Messages often need ordering (e.g., event streams, notifications)
177    /// - **Queue (BLPOP)**: Queue items should be processed in order
178    /// - **Backpressure**: Sequential processing naturally applies backpressure
179    ///   when the consumer is slower than the producer
180    ///
181    /// Users can override this with `.concurrent(n)` in the route DSL if they
182    /// want parallel processing and ordering is not a concern.
183    fn concurrency_model(&self) -> ConcurrencyModel {
184        ConcurrencyModel::Sequential
185    }
186}
187
188/// Runs a Pub/Sub consumer loop.
189///
190/// Creates a dedicated PubSub connection and subscribes to the specified
191/// channels and/or patterns. Messages are converted to Exchanges and sent
192/// through the consumer context.
193async fn run_pubsub_consumer(
194    config: RedisEndpointConfig,
195    channels: Vec<String>,
196    patterns: Vec<String>,
197    ctx: ConsumerContext,
198    cancel_token: CancellationToken,
199) -> Result<(), CamelError> {
200    info!("PubSub consumer connecting to {}", config.redis_url());
201
202    // Create dedicated PubSub connection
203    let client = redis::Client::open(config.redis_url())
204        .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
205
206    let mut pubsub = client.get_async_pubsub().await.map_err(|e| {
207        CamelError::ProcessorError(format!("Failed to create PubSub connection: {}", e))
208    })?;
209
210    // Subscribe to channels
211    for channel in &channels {
212        info!("Subscribing to channel: {}", channel);
213        pubsub.subscribe(channel).await.map_err(|e| {
214            CamelError::ProcessorError(format!("Failed to subscribe to channel {}: {}", channel, e))
215        })?;
216    }
217
218    // Subscribe to patterns
219    for pattern in &patterns {
220        info!("Subscribing to pattern: {}", pattern);
221        pubsub.psubscribe(pattern).await.map_err(|e| {
222            CamelError::ProcessorError(format!("Failed to subscribe to pattern {}: {}", pattern, e))
223        })?;
224    }
225
226    info!("PubSub consumer started, waiting for messages");
227
228    // Message loop
229    let mut stream = pubsub.on_message();
230    loop {
231        tokio::select! {
232            _ = cancel_token.cancelled() => {
233                info!("PubSub consumer received shutdown signal");
234                break;
235            }
236            msg = stream.next() => {
237                if let Some(msg) = msg {
238                    let exchange = build_exchange_from_pubsub(msg);
239                    if let Err(e) = ctx.send(exchange).await {
240                        error!("Failed to send exchange to pipeline: {}", e);
241                        // Don't break - continue processing messages
242                    }
243                } else {
244                    // Stream ended
245                    warn!("PubSub stream ended");
246                    break;
247                }
248            }
249        }
250    }
251
252    Ok(())
253}
254
255/// Runs a Queue consumer loop using BLPOP or BRPOP.
256///
257/// Creates a dedicated connection and performs blocking list pop operations.
258/// Items are converted to Exchanges and sent through the consumer context.
259async fn run_queue_consumer(
260    config: RedisEndpointConfig,
261    key: String,
262    timeout: u64,
263    pop_command: QueuePopCommand,
264    ctx: ConsumerContext,
265    cancel_token: CancellationToken,
266) -> Result<(), CamelError> {
267    info!(
268        "Queue consumer connecting to {} for key '{}' with {} timeout {}s",
269        config.redis_url(),
270        key,
271        queue_command_name(pop_command),
272        timeout
273    );
274
275    // Create dedicated multiplexed connection
276    let client = redis::Client::open(config.redis_url())
277        .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
278
279    let mut conn = client
280        .get_multiplexed_async_connection()
281        .await
282        .map_err(|e| CamelError::ProcessorError(format!("Failed to create connection: {}", e)))?;
283
284    info!("Queue consumer started, waiting for items");
285
286    // Blocking pop loop (BLPOP/BRPOP)
287    let queue_cmd = queue_command_name(pop_command);
288    loop {
289        tokio::select! {
290            _ = cancel_token.cancelled() => {
291                info!("Queue consumer received shutdown signal");
292                break;
293            }
294            result = async {
295                let cmd = redis::cmd(queue_cmd)
296                    .arg(&key)
297                    .arg(timeout)
298                    .to_owned();
299                cmd.query_async::<Option<(String, String)>>(&mut conn).await
300            } =>
301            {
302                match result {
303                    Ok(Some((key, value))) => {
304                        let exchange = build_exchange_from_blpop(key, value);
305                        if let Err(e) = ctx.send(exchange).await {
306                            error!("Failed to send exchange to pipeline: {}", e);
307                            // Don't break - continue processing items
308                        }
309                    }
310                    Ok(None) => {
311                        // Timeout - continue loop
312                        // This is normal for blocking POP with timeout
313                    }
314                    Err(e) => {
315                        if e.is_timeout() {
316                            // Timeout - continue loop silently
317                        } else {
318                            error!("{} error: {}", queue_cmd, e);
319                            tokio::time::sleep(Duration::from_millis(100)).await;
320                        }
321                    }
322                }
323            }
324        }
325    }
326
327    Ok(())
328}
329
330fn build_pubsub_exchange(payload: String, channel: String, pattern: Option<String>) -> Exchange {
331    let mut exchange = Exchange::new(Message::new(Body::Text(payload)));
332    exchange
333        .input
334        .set_header("CamelRedis.Channel", serde_json::Value::String(channel));
335
336    if let Some(pattern) = pattern {
337        exchange
338            .input
339            .set_header("CamelRedis.Pattern", serde_json::Value::String(pattern));
340    }
341
342    exchange
343}
344
345/// Builds an Exchange from a Pub/Sub message.
346///
347/// Sets the following headers:
348/// - `CamelRedis.Channel`: The channel the message was published to
349/// - `CamelRedis.Pattern`: The pattern matched (if applicable, for PSUBSCRIBE)
350fn build_exchange_from_pubsub(msg: Msg) -> Exchange {
351    let payload: String = msg
352        .get_payload()
353        .unwrap_or_else(|_| "<error decoding payload>".to_string());
354    let channel = msg.get_channel_name().to_string();
355    let pattern = if msg.from_pattern() {
356        msg.get_pattern::<String>().ok()
357    } else {
358        None
359    };
360
361    build_pubsub_exchange(payload, channel, pattern)
362}
363
364/// Builds an Exchange from a BLPOP result.
365///
366/// Sets the following headers:
367/// - `CamelRedis.Key`: The list key the item was popped from
368fn build_exchange_from_blpop(key: String, value: String) -> Exchange {
369    let mut exchange = Exchange::new(Message::new(Body::Text(value)));
370
371    // Set key header
372    exchange
373        .input
374        .set_header("CamelRedis.Key", serde_json::Value::String(key));
375
376    exchange
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use tokio::sync::mpsc;
383
384    fn create_test_config(command: RedisCommand) -> RedisEndpointConfig {
385        RedisEndpointConfig {
386            host: Some("localhost".to_string()),
387            port: Some(6379),
388            command,
389            channels: vec!["test".to_string()],
390            key: Some("test-queue".to_string()),
391            timeout: 1,
392            password: None,
393            db: 0,
394        }
395    }
396
397    #[test]
398    fn test_consumer_new_subscribe() {
399        let config = create_test_config(RedisCommand::Subscribe);
400        let consumer = RedisConsumer::new(config);
401
402        match consumer.mode {
403            RedisConsumerMode::PubSub { channels, patterns } => {
404                assert_eq!(channels, vec!["test".to_string()]);
405                assert!(patterns.is_empty());
406            }
407            _ => panic!("Expected PubSub mode"),
408        }
409    }
410
411    #[test]
412    fn test_consumer_new_psubscribe() {
413        let config = create_test_config(RedisCommand::Psubscribe);
414        let consumer = RedisConsumer::new(config);
415
416        match consumer.mode {
417            RedisConsumerMode::PubSub { channels, patterns } => {
418                assert!(channels.is_empty());
419                assert_eq!(patterns, vec!["test".to_string()]);
420            }
421            _ => panic!("Expected PubSub mode"),
422        }
423    }
424
425    #[test]
426    fn test_consumer_new_blpop() {
427        let config = create_test_config(RedisCommand::Blpop);
428        let consumer = RedisConsumer::new(config);
429
430        match consumer.mode {
431            RedisConsumerMode::Queue {
432                key,
433                timeout,
434                pop_command,
435            } => {
436                assert_eq!(key, "test-queue");
437                assert_eq!(timeout, 1);
438                assert_eq!(pop_command, QueuePopCommand::Blpop);
439            }
440            _ => panic!("Expected Queue mode"),
441        }
442    }
443
444    #[test]
445    fn test_consumer_new_brpop_uses_right_pop_command() {
446        let config = create_test_config(RedisCommand::Brpop);
447        let consumer = RedisConsumer::new(config);
448
449        match consumer.mode {
450            RedisConsumerMode::Queue { pop_command, .. } => {
451                assert_eq!(pop_command, QueuePopCommand::Brpop);
452            }
453            _ => panic!("Expected Queue mode"),
454        }
455    }
456
457    #[test]
458    fn test_consumer_new_blpop_default_key() {
459        let mut config = create_test_config(RedisCommand::Blpop);
460        config.key = None;
461        let consumer = RedisConsumer::new(config);
462
463        match consumer.mode {
464            RedisConsumerMode::Queue {
465                key, pop_command, ..
466            } => {
467                assert_eq!(key, "queue");
468                assert_eq!(pop_command, QueuePopCommand::Blpop);
469            }
470            _ => panic!("Expected Queue mode"),
471        }
472    }
473
474    #[test]
475    fn test_consumer_new_non_consumer_command_defaults_to_queue_mode() {
476        let config = create_test_config(RedisCommand::Set);
477        let consumer = RedisConsumer::new(config);
478
479        match consumer.mode {
480            RedisConsumerMode::Queue {
481                key,
482                timeout,
483                pop_command,
484            } => {
485                assert_eq!(key, "test-queue");
486                assert_eq!(timeout, 1);
487                assert_eq!(pop_command, QueuePopCommand::Blpop);
488            }
489            _ => panic!("Expected Queue mode"),
490        }
491    }
492
493    #[test]
494    fn test_queue_command_name_matches_pop_side() {
495        assert_eq!(queue_command_name(QueuePopCommand::Blpop), "BLPOP");
496        assert_eq!(queue_command_name(QueuePopCommand::Brpop), "BRPOP");
497    }
498
499    #[test]
500    fn test_consumer_concurrency_model_is_sequential() {
501        let config = create_test_config(RedisCommand::Subscribe);
502        let consumer = RedisConsumer::new(config);
503        assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
504    }
505
506    #[test]
507    fn test_build_exchange_from_blpop() {
508        let exchange = build_exchange_from_blpop("mykey".to_string(), "myvalue".to_string());
509
510        assert_eq!(exchange.input.body.as_text(), Some("myvalue"));
511
512        let header = exchange.input.header("CamelRedis.Key");
513        assert_eq!(
514            header,
515            Some(&serde_json::Value::String("mykey".to_string()))
516        );
517    }
518
519    #[test]
520    fn test_build_pubsub_exchange_without_pattern() {
521        let exchange = build_pubsub_exchange("hello".to_string(), "news".to_string(), None);
522
523        assert_eq!(exchange.input.body.as_text(), Some("hello"));
524        assert_eq!(
525            exchange.input.header("CamelRedis.Channel"),
526            Some(&serde_json::json!("news"))
527        );
528        assert!(exchange.input.header("CamelRedis.Pattern").is_none());
529    }
530
531    #[test]
532    fn test_build_pubsub_exchange_with_pattern() {
533        let exchange = build_pubsub_exchange(
534            "hello".to_string(),
535            "news.eu".to_string(),
536            Some("news.*".to_string()),
537        );
538
539        assert_eq!(
540            exchange.input.header("CamelRedis.Pattern"),
541            Some(&serde_json::json!("news.*"))
542        );
543    }
544
545    #[tokio::test]
546    async fn test_consumer_stops_gracefully() {
547        let config = create_test_config(RedisCommand::Blpop);
548        let mut consumer = RedisConsumer::new(config);
549
550        // Create a mock context (won't actually be used in this test)
551        let (tx, _rx) = mpsc::channel(16);
552        let cancel_token = CancellationToken::new();
553        let ctx = ConsumerContext::new(tx, cancel_token.clone());
554
555        // Start should succeed
556        let start_result = consumer.start(ctx).await;
557        assert!(start_result.is_ok());
558
559        // Give task a moment to start
560        tokio::time::sleep(Duration::from_millis(10)).await;
561
562        // Stop should succeed
563        let stop_result = consumer.stop().await;
564        assert!(stop_result.is_ok());
565    }
566}