use async_trait::async_trait;
use queue_workers::{
error::QueueWorkerError,
job::Job,
queue::{Queue, QueueType},
redis_queue::RedisQueue,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::time::sleep;
fn get_redis_url() -> String {
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TestJob {
id: String,
payload: String,
}
#[async_trait]
impl Job for TestJob {
type Output = String;
type Error = String;
async fn execute(&self) -> Result<Self::Output, Self::Error> {
Ok(format!(
"Processed job {} with payload {}",
self.id, self.payload
))
}
}
#[tokio::test]
async fn test_redis_queue_push_pop() {
let redis_url = get_redis_url();
let queue =
RedisQueue::<TestJob>::new(&redis_url, &format!("test_queue-{}", uuid::Uuid::new_v4()))
.expect("Failed to create Redis queue");
let job = TestJob {
id: "test-1".to_string(),
payload: "test payload".to_string(),
};
queue.push(job).await.expect("Failed to push job");
let retrieved_job = queue.pop().await.expect("Failed to pop job");
assert_eq!(retrieved_job.id, "test-1");
assert_eq!(retrieved_job.payload, "test payload");
let empty_result = queue.pop().await;
assert!(matches!(
empty_result,
Err(QueueWorkerError::JobNotFound(_))
));
}
#[tokio::test]
async fn test_queue_types() {
let redis_url = get_redis_url();
let fifo_queue = RedisQueue::<TestJob>::new(&redis_url, "test_fifo_queue")
.expect("Failed to create FIFO queue");
let lifo_queue =
RedisQueue::<TestJob>::with_type(&redis_url, "test_lifo_queue", QueueType::LIFO)
.expect("Failed to create LIFO queue");
let jobs = vec![
TestJob {
id: "1".to_string(),
payload: "first".to_string(),
},
TestJob {
id: "2".to_string(),
payload: "second".to_string(),
},
TestJob {
id: "3".to_string(),
payload: "third".to_string(),
},
];
for job in jobs {
fifo_queue
.push(job.clone())
.await
.expect("Failed to push to FIFO queue");
lifo_queue
.push(job)
.await
.expect("Failed to push to LIFO queue");
}
let first = fifo_queue
.pop()
.await
.expect("Failed to pop from FIFO queue");
assert_eq!(first.id, "1");
let second = fifo_queue
.pop()
.await
.expect("Failed to pop from FIFO queue");
assert_eq!(second.id, "2");
let third = fifo_queue
.pop()
.await
.expect("Failed to pop from FIFO queue");
assert_eq!(third.id, "3");
let first = lifo_queue
.pop()
.await
.expect("Failed to pop from LIFO queue");
assert_eq!(first.id, "3");
let second = lifo_queue
.pop()
.await
.expect("Failed to pop from LIFO queue");
assert_eq!(second.id, "2");
let third = lifo_queue
.pop()
.await
.expect("Failed to pop from LIFO queue");
assert_eq!(third.id, "1");
}
#[tokio::test]
async fn test_invalid_redis_url() {
let result = RedisQueue::<TestJob>::new("invalid-url", "test_queue");
assert!(matches!(result, Err(QueueWorkerError::ConnectionError(_))));
}
#[tokio::test]
async fn test_empty_queue_name() {
let redis_url = get_redis_url();
let result = RedisQueue::<TestJob>::new(&redis_url, "");
assert!(matches!(result, Err(QueueWorkerError::InvalidJobData(_))));
}
#[tokio::test]
async fn test_concurrent_queue_access() {
let redis_url = get_redis_url();
let queue = RedisQueue::<TestJob>::new(&redis_url, "test_concurrent")
.expect("Failed to create Redis queue");
let mut handles = vec![];
let job_count = 100;
for i in 0..job_count {
let queue_clone = queue.clone();
let handle = tokio::spawn(async move {
let job = TestJob {
id: format!("concurrent-{}", i),
payload: "test".to_string(),
};
queue_clone.push(job).await
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("Task failed").expect("Push failed");
}
let mut received_count = 0;
while let Ok(_) = queue.pop().await {
received_count += 1;
}
assert_eq!(received_count, job_count);
}
#[tokio::test]
async fn test_queue_persistence() {
let redis_url = get_redis_url();
let queue_name = "test_persistence";
let queue1 =
RedisQueue::<TestJob>::new(&redis_url, queue_name).expect("Failed to create first queue");
let job = TestJob {
id: "persist-1".to_string(),
payload: "test payload".to_string(),
};
queue1.push(job.clone()).await.expect("Failed to push job");
let queue2 =
RedisQueue::<TestJob>::new(&redis_url, queue_name).expect("Failed to create second queue");
let received_job = queue2.pop().await.expect("Failed to pop job");
assert_eq!(received_job.id, job.id);
assert_eq!(received_job.payload, job.payload);
}
#[tokio::test]
async fn test_queue_behavior_under_load() {
let redis_url = get_redis_url();
let queue =
RedisQueue::<TestJob>::new(&redis_url, "test_load").expect("Failed to create Redis queue");
for i in 0..1000 {
let job = TestJob {
id: format!("load-{}", i),
payload: "test".to_string(),
};
queue.push(job).await.expect("Failed to push job");
}
let mut count = 0;
while let Ok(_) = queue.pop().await {
count += 1;
sleep(Duration::from_micros(10)).await;
}
assert_eq!(count, 1000);
}
#[tokio::test]
async fn test_queue_empty_behavior() {
let redis_url = get_redis_url();
let queue =
RedisQueue::<TestJob>::new(&redis_url, "test_empty").expect("Failed to create Redis queue");
let result = queue.pop().await;
assert!(matches!(result, Err(QueueWorkerError::JobNotFound(_))));
}
#[tokio::test]
async fn test_queue_type_switching() {
let redis_url = get_redis_url();
let queue_name = "test_switching";
let fifo_queue = RedisQueue::<TestJob>::with_type(&redis_url, queue_name, QueueType::FIFO)
.expect("Failed to create FIFO queue");
for i in 0..3 {
let job = TestJob {
id: format!("switch-{}", i),
payload: "test".to_string(),
};
fifo_queue.push(job).await.expect("Failed to push job");
}
let lifo_queue = RedisQueue::<TestJob>::with_type(&redis_url, queue_name, QueueType::LIFO)
.expect("Failed to create LIFO queue");
let job1 = lifo_queue.pop().await.expect("Failed to pop job 1");
assert_eq!(job1.id, "switch-2");
}