1use async_trait::async_trait;
2use camel_component_api::{Body, CamelError, Exchange, Message};
3use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
4use futures_util::StreamExt;
5use redis::Msg;
6use std::time::Duration;
7use tokio::task::JoinHandle;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10
11use crate::config::{RedisCommand, RedisEndpointConfig};
12
/// Blocking list-pop command issued by the queue consumer.
///
/// The variant is translated to the Redis command name by
/// `queue_command_name`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueuePopCommand {
    /// Issue `BLPOP`.
    Blpop,
    /// Issue `BRPOP`.
    Brpop,
}
18
19fn queue_command_name(pop_command: QueuePopCommand) -> &'static str {
20 match pop_command {
21 QueuePopCommand::Blpop => "BLPOP",
22 QueuePopCommand::Brpop => "BRPOP",
23 }
24}
25
26#[derive(Debug, Clone)]
28pub enum RedisConsumerMode {
29 PubSub {
31 channels: Vec<String>,
33 patterns: Vec<String>,
35 },
36 Queue {
38 key: String,
40 timeout: u64,
42 pop_command: QueuePopCommand,
44 },
45}
46
47pub struct RedisConsumer {
49 config: RedisEndpointConfig,
50 mode: RedisConsumerMode,
51 cancel_token: Option<CancellationToken>,
53 task_handle: Option<JoinHandle<Result<(), CamelError>>>,
55}
56
57impl RedisConsumer {
58 pub fn new(config: RedisEndpointConfig) -> Self {
65 let mode = match &config.command {
66 RedisCommand::Subscribe => RedisConsumerMode::PubSub {
67 channels: config.channels.clone(),
68 patterns: vec![],
69 },
70 RedisCommand::Psubscribe => RedisConsumerMode::PubSub {
71 channels: vec![],
72 patterns: config.channels.clone(),
73 },
74 RedisCommand::Blpop | RedisCommand::Brpop => {
75 let key = config.key.clone().unwrap_or_else(|| "queue".to_string());
76 let pop_command = if config.command == RedisCommand::Brpop {
77 QueuePopCommand::Brpop
78 } else {
79 QueuePopCommand::Blpop
80 };
81 RedisConsumerMode::Queue {
82 key,
83 timeout: config.timeout,
84 pop_command,
85 }
86 }
87 _ => {
88 warn!(
89 "Invalid consumer command: {:?}, defaulting to BLPOP",
90 config.command
91 );
92 RedisConsumerMode::Queue {
93 key: config.key.clone().unwrap_or_else(|| "queue".to_string()),
94 timeout: config.timeout,
95 pop_command: QueuePopCommand::Blpop,
96 }
97 }
98 };
99
100 Self {
101 config,
102 mode,
103 cancel_token: None,
104 task_handle: None,
105 }
106 }
107}
108
109#[async_trait]
110impl Consumer for RedisConsumer {
111 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
112 let cancel_token = CancellationToken::new();
114 self.cancel_token = Some(cancel_token.clone());
115
116 let config = self.config.clone();
118 let mode = self.mode.clone();
119
120 info!("Starting Redis consumer in {:?} mode", mode);
121
122 let handle =
124 match mode {
125 RedisConsumerMode::PubSub { channels, patterns } => tokio::spawn(
126 run_pubsub_consumer(config, channels, patterns, ctx, cancel_token),
127 ),
128 RedisConsumerMode::Queue {
129 key,
130 timeout,
131 pop_command,
132 } => tokio::spawn(run_queue_consumer(
133 config,
134 key,
135 timeout,
136 pop_command,
137 ctx,
138 cancel_token,
139 )),
140 };
141
142 self.task_handle = Some(handle);
143 Ok(())
144 }
145
146 async fn stop(&mut self) -> Result<(), CamelError> {
147 info!("Stopping Redis consumer");
148
149 if let Some(token) = &self.cancel_token {
151 token.cancel();
152 }
153
154 if let Some(handle) = self.task_handle.take() {
156 match handle.await {
157 Ok(result) => {
158 if let Err(e) = result {
159 error!("Consumer task exited with error: {}", e);
160 }
161 }
162 Err(e) => {
163 error!("Failed to join consumer task: {}", e);
164 }
165 }
166 }
167
168 self.cancel_token = None;
169 info!("Redis consumer stopped");
170 Ok(())
171 }
172
173 fn concurrency_model(&self) -> ConcurrencyModel {
184 ConcurrencyModel::Sequential
185 }
186}
187
188async fn run_pubsub_consumer(
194 config: RedisEndpointConfig,
195 channels: Vec<String>,
196 patterns: Vec<String>,
197 ctx: ConsumerContext,
198 cancel_token: CancellationToken,
199) -> Result<(), CamelError> {
200 info!("PubSub consumer connecting to {}", config.redis_url());
201
202 let client = redis::Client::open(config.redis_url())
204 .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
205
206 let mut pubsub = client.get_async_pubsub().await.map_err(|e| {
207 CamelError::ProcessorError(format!("Failed to create PubSub connection: {}", e))
208 })?;
209
210 for channel in &channels {
212 info!("Subscribing to channel: {}", channel);
213 pubsub.subscribe(channel).await.map_err(|e| {
214 CamelError::ProcessorError(format!("Failed to subscribe to channel {}: {}", channel, e))
215 })?;
216 }
217
218 for pattern in &patterns {
220 info!("Subscribing to pattern: {}", pattern);
221 pubsub.psubscribe(pattern).await.map_err(|e| {
222 CamelError::ProcessorError(format!("Failed to subscribe to pattern {}: {}", pattern, e))
223 })?;
224 }
225
226 info!("PubSub consumer started, waiting for messages");
227
228 let mut stream = pubsub.on_message();
230 loop {
231 tokio::select! {
232 _ = cancel_token.cancelled() => {
233 info!("PubSub consumer received shutdown signal");
234 break;
235 }
236 msg = stream.next() => {
237 if let Some(msg) = msg {
238 let exchange = build_exchange_from_pubsub(msg);
239 if let Err(e) = ctx.send(exchange).await {
240 error!("Failed to send exchange to pipeline: {}", e);
241 }
243 } else {
244 warn!("PubSub stream ended");
246 break;
247 }
248 }
249 }
250 }
251
252 Ok(())
253}
254
255async fn run_queue_consumer(
260 config: RedisEndpointConfig,
261 key: String,
262 timeout: u64,
263 pop_command: QueuePopCommand,
264 ctx: ConsumerContext,
265 cancel_token: CancellationToken,
266) -> Result<(), CamelError> {
267 info!(
268 "Queue consumer connecting to {} for key '{}' with {} timeout {}s",
269 config.redis_url(),
270 key,
271 queue_command_name(pop_command),
272 timeout
273 );
274
275 let client = redis::Client::open(config.redis_url())
277 .map_err(|e| CamelError::ProcessorError(format!("Failed to create Redis client: {}", e)))?;
278
279 let mut conn = client
280 .get_multiplexed_async_connection()
281 .await
282 .map_err(|e| CamelError::ProcessorError(format!("Failed to create connection: {}", e)))?;
283
284 info!("Queue consumer started, waiting for items");
285
286 let queue_cmd = queue_command_name(pop_command);
288 loop {
289 tokio::select! {
290 _ = cancel_token.cancelled() => {
291 info!("Queue consumer received shutdown signal");
292 break;
293 }
294 result = async {
295 let cmd = redis::cmd(queue_cmd)
296 .arg(&key)
297 .arg(timeout)
298 .to_owned();
299 cmd.query_async::<Option<(String, String)>>(&mut conn).await
300 } =>
301 {
302 match result {
303 Ok(Some((key, value))) => {
304 let exchange = build_exchange_from_blpop(key, value);
305 if let Err(e) = ctx.send(exchange).await {
306 error!("Failed to send exchange to pipeline: {}", e);
307 }
309 }
310 Ok(None) => {
311 }
314 Err(e) => {
315 if e.is_timeout() {
316 } else {
318 error!("{} error: {}", queue_cmd, e);
319 tokio::time::sleep(Duration::from_millis(100)).await;
320 }
321 }
322 }
323 }
324 }
325 }
326
327 Ok(())
328}
329
330fn build_pubsub_exchange(payload: String, channel: String, pattern: Option<String>) -> Exchange {
331 let mut exchange = Exchange::new(Message::new(Body::Text(payload)));
332 exchange
333 .input
334 .set_header("CamelRedis.Channel", serde_json::Value::String(channel));
335
336 if let Some(pattern) = pattern {
337 exchange
338 .input
339 .set_header("CamelRedis.Pattern", serde_json::Value::String(pattern));
340 }
341
342 exchange
343}
344
345fn build_exchange_from_pubsub(msg: Msg) -> Exchange {
351 let payload: String = msg
352 .get_payload()
353 .unwrap_or_else(|_| "<error decoding payload>".to_string());
354 let channel = msg.get_channel_name().to_string();
355 let pattern = if msg.from_pattern() {
356 msg.get_pattern::<String>().ok()
357 } else {
358 None
359 };
360
361 build_pubsub_exchange(payload, channel, pattern)
362}
363
364fn build_exchange_from_blpop(key: String, value: String) -> Exchange {
369 let mut exchange = Exchange::new(Message::new(Body::Text(value)));
370
371 exchange
373 .input
374 .set_header("CamelRedis.Key", serde_json::Value::String(key));
375
376 exchange
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// Endpoint configuration shared by the tests; only `command` varies.
    fn create_test_config(command: RedisCommand) -> RedisEndpointConfig {
        RedisEndpointConfig {
            host: Some(String::from("localhost")),
            port: Some(6379),
            command,
            channels: vec![String::from("test")],
            key: Some(String::from("test-queue")),
            timeout: 1,
            password: None,
            db: 0,
        }
    }

    /// Builds a consumer from the shared test configuration.
    fn consumer_for(command: RedisCommand) -> RedisConsumer {
        RedisConsumer::new(create_test_config(command))
    }

    /// Unpacks pub/sub mode into `(channels, patterns)`, panicking otherwise.
    fn expect_pubsub(mode: RedisConsumerMode) -> (Vec<String>, Vec<String>) {
        match mode {
            RedisConsumerMode::PubSub { channels, patterns } => (channels, patterns),
            _ => panic!("Expected PubSub mode"),
        }
    }

    /// Unpacks queue mode into `(key, timeout, pop_command)`, panicking otherwise.
    fn expect_queue(mode: RedisConsumerMode) -> (String, u64, QueuePopCommand) {
        match mode {
            RedisConsumerMode::Queue {
                key,
                timeout,
                pop_command,
            } => (key, timeout, pop_command),
            _ => panic!("Expected Queue mode"),
        }
    }

    #[test]
    fn test_consumer_new_subscribe() {
        let (channels, patterns) = expect_pubsub(consumer_for(RedisCommand::Subscribe).mode);

        assert_eq!(channels, vec!["test".to_string()]);
        assert!(patterns.is_empty());
    }

    #[test]
    fn test_consumer_new_psubscribe() {
        let (channels, patterns) = expect_pubsub(consumer_for(RedisCommand::Psubscribe).mode);

        assert!(channels.is_empty());
        assert_eq!(patterns, vec!["test".to_string()]);
    }

    #[test]
    fn test_consumer_new_blpop() {
        let (key, timeout, pop_command) = expect_queue(consumer_for(RedisCommand::Blpop).mode);

        assert_eq!(key, "test-queue");
        assert_eq!(timeout, 1);
        assert_eq!(pop_command, QueuePopCommand::Blpop);
    }

    #[test]
    fn test_consumer_new_brpop_uses_right_pop_command() {
        let (_, _, pop_command) = expect_queue(consumer_for(RedisCommand::Brpop).mode);

        assert_eq!(pop_command, QueuePopCommand::Brpop);
    }

    #[test]
    fn test_consumer_new_blpop_default_key() {
        let mut config = create_test_config(RedisCommand::Blpop);
        config.key = None;

        let (key, _, pop_command) = expect_queue(RedisConsumer::new(config).mode);

        assert_eq!(key, "queue");
        assert_eq!(pop_command, QueuePopCommand::Blpop);
    }

    #[test]
    fn test_consumer_new_non_consumer_command_defaults_to_queue_mode() {
        let (key, timeout, pop_command) = expect_queue(consumer_for(RedisCommand::Set).mode);

        assert_eq!(key, "test-queue");
        assert_eq!(timeout, 1);
        assert_eq!(pop_command, QueuePopCommand::Blpop);
    }

    #[test]
    fn test_queue_command_name_matches_pop_side() {
        let expected = [
            (QueuePopCommand::Blpop, "BLPOP"),
            (QueuePopCommand::Brpop, "BRPOP"),
        ];
        for (pop_command, name) in expected {
            assert_eq!(queue_command_name(pop_command), name);
        }
    }

    #[test]
    fn test_consumer_concurrency_model_is_sequential() {
        let consumer = consumer_for(RedisCommand::Subscribe);

        assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
    }

    #[test]
    fn test_build_exchange_from_blpop() {
        let exchange = build_exchange_from_blpop(String::from("mykey"), String::from("myvalue"));

        assert_eq!(exchange.input.body.as_text(), Some("myvalue"));
        assert_eq!(
            exchange.input.header("CamelRedis.Key"),
            Some(&serde_json::Value::String("mykey".to_string()))
        );
    }

    #[test]
    fn test_build_pubsub_exchange_without_pattern() {
        let exchange = build_pubsub_exchange(String::from("hello"), String::from("news"), None);

        assert_eq!(exchange.input.body.as_text(), Some("hello"));
        assert_eq!(
            exchange.input.header("CamelRedis.Channel"),
            Some(&serde_json::json!("news"))
        );
        assert!(exchange.input.header("CamelRedis.Pattern").is_none());
    }

    #[test]
    fn test_build_pubsub_exchange_with_pattern() {
        let pattern = Some(String::from("news.*"));
        let exchange =
            build_pubsub_exchange(String::from("hello"), String::from("news.eu"), pattern);

        assert_eq!(
            exchange.input.header("CamelRedis.Pattern"),
            Some(&serde_json::json!("news.*"))
        );
    }

    #[tokio::test]
    async fn test_consumer_stops_gracefully() {
        let mut consumer = consumer_for(RedisCommand::Blpop);

        // The receiver is kept alive for the duration of the test.
        let (tx, _rx) = mpsc::channel(16);
        let cancel_token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel_token.clone());

        assert!(consumer.start(ctx).await.is_ok());

        // Give the spawned task a moment to begin running before stopping it.
        tokio::time::sleep(Duration::from_millis(10)).await;

        assert!(consumer.stop().await.is_ok());
    }
}