#[cfg(all(feature = "redis", feature = "test-fairness"))]
use broccoli_queue::queue::PublishOptions;
#[cfg(all(feature = "redis", feature = "test-fairness"))]
use redis::AsyncCommands;
#[cfg(all(feature = "redis", feature = "test-fairness"))]
use serde::{Deserialize, Serialize};
#[cfg(all(feature = "redis", feature = "test-fairness"))]
use time::Duration;
mod common;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[cfg(all(feature = "redis", feature = "test-fairness"))]
struct TestMessage {
id: String,
content: String,
}
#[tokio::test]
#[cfg(all(feature = "redis", feature = "test-fairness"))]
async fn test_fairness_round_robin() {
let queue = common::setup_queue().await;
#[cfg(feature = "redis")]
let mut redis = common::get_redis_client().await;
let test_topic = "test_fairness_topic";
let messages = vec![
("job-1", "message 1 from job 1"),
("job-1", "message 2 from job 1"),
("job-2", "message 1 from job 2"),
("job-2", "message 2 from job 2"),
("job-3", "message 1 from job 3"),
("job-3", "message 2 from job 3"),
];
for (job_id, content) in messages {
let message = TestMessage {
id: job_id.to_string(),
content: content.to_string(),
};
queue
.publish(test_topic, Some(String::from(job_id)), &message, None)
.await
.expect("Failed to publish message");
}
#[cfg(feature = "redis")]
{
let set_exists: bool = redis
.exists(format!("{}_fairness_set", test_topic))
.await
.unwrap();
assert!(set_exists, "Fairness set should exist");
let round_robin_exists: bool = redis
.exists(format!("{}_fairness_round_robin", test_topic))
.await
.unwrap();
assert!(round_robin_exists, "Round robin list should exist");
for job_id in ["job-1", "job-2", "job-3"] {
let queue_exists: bool = redis
.exists(format!("{}_{}_queue", test_topic, job_id))
.await
.unwrap();
assert!(queue_exists, "Job queue for {} should exist", job_id);
}
}
let mut consumed_messages = Vec::new();
for _ in 0..6 {
#[cfg(not(feature = "test-fairness"))]
let consume_options = ConsumeOptions::default();
#[cfg(feature = "test-fairness")]
let consume_options = ConsumeOptionsBuilder::new().fairness(true).build();
let msg = queue
.consume::<TestMessage>(test_topic, Some(consume_options))
.await
.expect("Failed to consume message");
consumed_messages.push((msg.payload.id.clone(), msg.payload.content.clone()));
queue
.acknowledge(test_topic, msg)
.await
.expect("Failed to acknowledge message");
}
assert_eq!(consumed_messages[0].0, "job-1");
assert_eq!(consumed_messages[1].0, "job-2");
assert_eq!(consumed_messages[2].0, "job-3");
assert_eq!(consumed_messages[3].0, "job-1");
assert_eq!(consumed_messages[4].0, "job-2");
assert_eq!(consumed_messages[5].0, "job-3");
}
#[tokio::test]
#[cfg(all(feature = "redis", feature = "test-fairness"))]
async fn test_fairness_with_priorities() {
let queue = common::setup_queue().await;
let test_topic = "test_fairness_priority_topic";
#[cfg(feature = "redis")]
let mut redis = common::get_redis_client().await;
let messages = vec![
("job-1", 5, "low priority from job 1"),
("job-2", 1, "high priority from job 2"),
("job-1", 3, "medium priority from job 1"),
("job-2", 3, "medium priority from job 2"),
];
for (job_id, priority, content) in messages {
let message = TestMessage {
id: job_id.to_string(),
content: content.to_string(),
};
let options = PublishOptions::builder().priority(priority).build();
let published = queue
.publish(
test_topic,
Some(String::from(job_id)),
&message,
Some(options),
)
.await
.expect("Failed to publish message");
#[cfg(feature = "redis")]
{
let priority_stored: String = redis
.hget(published.task_id.to_string(), "priority")
.await
.unwrap();
assert_eq!(
priority_stored,
priority.to_string(),
"Priority should be stored correctly"
);
}
}
let mut consumed_messages = Vec::new();
for _ in 0..4 {
#[cfg(not(feature = "test-fairness"))]
let consume_options = ConsumeOptions::default();
#[cfg(feature = "test-fairness")]
let consume_options = ConsumeOptionsBuilder::new().fairness(true).build();
let msg = queue
.consume::<TestMessage>(test_topic, Some(consume_options))
.await
.expect("Failed to consume message");
consumed_messages.push((msg.payload.id.clone(), msg.payload.content.clone()));
queue
.acknowledge(test_topic, msg)
.await
.expect("Failed to acknowledge message");
}
#[cfg(feature = "redis")]
{
for job_id in ["job-1", "job-2"] {
let queue_name = format!("{}_{}_queue", test_topic, job_id);
let queue_len: usize = redis.zcard(&queue_name).await.unwrap();
assert_eq!(queue_len, 0, "Queue should be empty after consumption");
let processing_queue = format!("{}_{}_processing", test_topic, job_id);
let proc_len: usize = redis.llen(&processing_queue).await.unwrap();
assert_eq!(proc_len, 0, "Processing queue should be empty");
}
}
assert!(
consumed_messages[0].1.contains("high priority from job 2")
|| consumed_messages[1].1.contains("high priority from job 2")
);
assert!(
consumed_messages[2].1.contains("medium priority")
&& consumed_messages[3].1.contains("low priority")
|| consumed_messages[2].1.contains("low priority")
&& consumed_messages[3].1.contains("medium priority")
);
}
#[tokio::test]
#[cfg(all(feature = "redis", feature = "test-fairness"))]
async fn test_fairness_with_delayed_messages() {
let queue = common::setup_queue().await;
let test_topic = "test_fairness_delay_topic";
#[cfg(feature = "redis")]
let mut redis = common::get_redis_client().await;
let immediate_msg = TestMessage {
id: "job-1".to_string(),
content: "immediate from job 1".to_string(),
};
queue
.publish(
test_topic,
Some(String::from("job-1")),
&immediate_msg,
None,
)
.await
.expect("Failed to publish immediate message");
let delayed_msg = TestMessage {
id: "job-2".to_string(),
content: "delayed from job 2".to_string(),
};
let options = PublishOptions::builder()
.delay(Duration::seconds(2))
.build();
queue
.publish(
test_topic,
Some(String::from("job-2")),
&delayed_msg,
Some(options),
)
.await
.expect("Failed to publish delayed message");
#[cfg(feature = "redis")]
{
let queue_name = format!("{}_job-1_queue", test_topic);
let queue_len: usize = redis.zcard(&queue_name).await.unwrap();
assert_eq!(queue_len, 1, "Queue should have one message");
let queue_name_2 = format!("{}_job-2_queue", test_topic);
let queue_len_2: usize = redis.zcard(&queue_name_2).await.unwrap();
assert_eq!(queue_len_2, 1, "Queue should have one message");
let scores_1: Vec<(String, f64)> = redis
.zrangebyscore_withscores(&queue_name, "-inf", "+inf")
.await
.unwrap();
let scores_2: Vec<(String, f64)> = redis
.zrangebyscore_withscores(&queue_name_2, "-inf", "+inf")
.await
.unwrap();
assert!(
scores_2[0].1 > scores_1[0].1,
"Delayed message should have higher score"
);
}
#[cfg(not(feature = "test-fairness"))]
let consume_options = ConsumeOptions::default();
#[cfg(feature = "test-fairness")]
let consume_options = ConsumeOptionsBuilder::new().fairness(true).build();
let first = queue
.consume::<TestMessage>(test_topic, Some(consume_options.clone()))
.await
.expect("Failed to consume first message");
assert_eq!(first.payload.content, "immediate from job 1");
queue
.acknowledge(test_topic, first)
.await
.expect("Failed to acknowledge first message");
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let second = queue
.consume::<TestMessage>(test_topic, Some(consume_options.clone()))
.await
.expect("Failed to consume second message");
assert_eq!(second.payload.content, "delayed from job 2");
queue
.acknowledge(test_topic, second)
.await
.expect("Failed to acknowledge second message");
#[cfg(feature = "redis")]
{
for job_id in ["job-1", "job-2"] {
let queue_name = format!("{}_{}_queue", test_topic, job_id);
let queue_len: usize = redis.zcard(&queue_name).await.unwrap();
assert_eq!(queue_len, 0, "Queue should be empty after consumption");
let processing_queue = format!("{}_{}_processing", test_topic, job_id);
let proc_len: usize = redis.llen(&processing_queue).await.unwrap();
assert_eq!(proc_len, 0, "Processing queue should be empty");
}
}
}
#[tokio::test]
#[cfg(all(feature = "redis", feature = "test-fairness"))]
async fn test_fairness_with_retries() {
let queue = common::setup_queue().await;
let test_topic = "test_fairness_retry_topic";
#[cfg(feature = "redis")]
let mut redis = common::get_redis_client().await;
let messages = vec![
("job-1", "message from job 1"),
("job-2", "message from job 2"),
];
for (job_id, content) in messages {
let message = TestMessage {
id: job_id.to_string(),
content: content.to_string(),
};
queue
.publish(test_topic, Some(String::from(job_id)), &message, None)
.await
.expect("Failed to publish message");
}
#[cfg(not(feature = "test-fairness"))]
let consume_options = ConsumeOptions::default();
#[cfg(feature = "test-fairness")]
let consume_options = ConsumeOptionsBuilder::new().fairness(true).build();
for _ in 0..2 {
for _ in 0..3 {
let msg = queue
.consume::<TestMessage>(test_topic, Some(consume_options.clone()))
.await
.expect("Failed to consume message");
queue
.reject(test_topic, msg)
.await
.expect("Failed to reject message");
}
}
let result = queue
.try_consume::<TestMessage>(test_topic, Some(consume_options.clone()))
.await
.expect("Failed to try consume");
assert!(result.is_none());
#[cfg(feature = "redis")]
{
for job_id in ["job-1", "job-2"] {
let failed_queue = format!("{}_{}_failed", test_topic, job_id);
let failed_len: usize = redis.llen(&failed_queue).await.unwrap();
assert_eq!(
failed_len, 1,
"Failed queue for {} should have one message",
job_id
);
let queue_name = format!("{}_{}_queue", test_topic, job_id);
let queue_len: usize = redis.zcard(&queue_name).await.unwrap();
assert_eq!(queue_len, 0, "Queue should be empty after retries");
let processing_queue = format!("{}_{}_processing", test_topic, job_id);
let proc_len: usize = redis.llen(&processing_queue).await.unwrap();
assert_eq!(proc_len, 0, "Processing queue should be empty");
}
}
}