use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tako::queue::{Job, Queue, QueueError, RetryPolicy};
#[tokio::test]
async fn push_and_process_job() {
let counter = Arc::new(AtomicU32::new(0));
let c = counter.clone();
let queue = Queue::new();
queue.register("inc", move |_job: Job| {
let c = c.clone();
async move {
c.fetch_add(1, Ordering::SeqCst);
Ok(())
}
});
queue.start();
queue.push("inc", &()).await.unwrap();
queue.push("inc", &()).await.unwrap();
queue.push("inc", &()).await.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(counter.load(Ordering::SeqCst), 3);
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn job_deserializes_payload() {
let received = Arc::new(tokio::sync::Mutex::new(String::new()));
let r = received.clone();
let queue = Queue::new();
queue.register("greet", move |job: Job| {
let r = r.clone();
async move {
let name: String = job.deserialize()?;
*r.lock().await = name;
Ok(())
}
});
queue.start();
queue.push("greet", &"hello world").await.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(*received.lock().await, "hello world");
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn job_returns_unique_id() {
let queue = Queue::new();
queue.register("noop", |_: Job| async { Ok(()) });
queue.start();
let id1 = queue.push("noop", &1).await.unwrap();
let id2 = queue.push("noop", &2).await.unwrap();
let id3 = queue.push("noop", &3).await.unwrap();
assert_ne!(id1, id2);
assert_ne!(id2, id3);
assert!(id2 > id1);
assert!(id3 > id2);
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn unknown_job_goes_to_dlq() {
let queue = Queue::new();
queue.start();
queue.push("nonexistent", &42).await.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
let dlq = queue.dead_letters();
assert_eq!(dlq.len(), 1);
assert_eq!(dlq[0].name, "nonexistent");
assert_eq!(dlq[0].error, "no handler registered");
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn failed_job_no_retry_goes_to_dlq() {
let queue = Queue::new();
queue.register("fail", |_: Job| async {
Err(QueueError::HandlerError("boom".into()))
});
queue.start();
queue.push("fail", &"data").await.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
let dlq = queue.dead_letters();
assert_eq!(dlq.len(), 1);
assert_eq!(dlq[0].name, "fail");
assert!(dlq[0].error.contains("boom"));
assert_eq!(dlq[0].attempts, 1);
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn retry_policy_fixed() {
let attempts = Arc::new(AtomicU32::new(0));
let a = attempts.clone();
let queue = Queue::builder()
.workers(1)
.retry(RetryPolicy::fixed(2, Duration::from_millis(50)))
.build();
queue.register("flaky", move |_: Job| {
let a = a.clone();
async move {
let n = a.fetch_add(1, Ordering::SeqCst);
if n < 2 {
Err(QueueError::HandlerError("not yet".into()))
} else {
Ok(()) }
}
});
queue.start();
queue.push("flaky", &()).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(attempts.load(Ordering::SeqCst), 3);
assert!(queue.dead_letters().is_empty());
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn retry_policy_exhausted_goes_to_dlq() {
let attempts = Arc::new(AtomicU32::new(0));
let a = attempts.clone();
let queue = Queue::builder()
.workers(1)
.retry(RetryPolicy::fixed(2, Duration::from_millis(50)))
.build();
queue.register("always_fail", move |_: Job| {
let a = a.clone();
async move {
a.fetch_add(1, Ordering::SeqCst);
Err(QueueError::HandlerError("permanent failure".into()))
}
});
queue.start();
queue.push("always_fail", &()).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(attempts.load(Ordering::SeqCst), 3);
let dlq = queue.dead_letters();
assert_eq!(dlq.len(), 1);
assert_eq!(dlq[0].attempts, 3);
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn delayed_job() {
let executed_at = Arc::new(tokio::sync::Mutex::new(None));
let e = executed_at.clone();
let pushed_at = std::time::Instant::now();
let queue = Queue::builder().workers(1).build();
queue.register("delayed", move |_: Job| {
let e = e.clone();
async move {
*e.lock().await = Some(std::time::Instant::now());
Ok(())
}
});
queue.start();
queue
.push_delayed("delayed", &(), Duration::from_millis(200))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(executed_at.lock().await.is_none());
tokio::time::sleep(Duration::from_millis(300)).await;
let at = executed_at.lock().await.unwrap();
assert!(at.duration_since(pushed_at) >= Duration::from_millis(200));
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn pending_and_inflight_count() {
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let b = barrier.clone();
let queue = Queue::builder().workers(1).build();
queue.register("slow", move |_: Job| {
let b = b.clone();
async move {
b.wait().await;
Ok(())
}
});
queue.start();
queue.push("slow", &()).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(queue.inflight_count(), 1);
barrier.wait().await;
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(queue.inflight_count(), 0);
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn clear_dead_letters() {
let queue = Queue::new();
queue.register("fail", |_: Job| async {
Err(QueueError::HandlerError("err".into()))
});
queue.start();
queue.push("fail", &()).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(queue.dead_letters().len(), 1);
queue.clear_dead_letters();
assert!(queue.dead_letters().is_empty());
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn shutdown_rejects_new_jobs() {
let queue = Queue::new();
queue.register("noop", |_: Job| async { Ok(()) });
queue.start();
queue.shutdown(Duration::from_secs(1)).await;
let result = queue.push("noop", &()).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), QueueError::Shutdown));
}
#[tokio::test]
async fn multiple_workers_process_concurrently() {
let counter = Arc::new(AtomicU32::new(0));
let max_concurrent = Arc::new(AtomicU32::new(0));
let c = counter.clone();
let m = max_concurrent.clone();
let queue = Queue::builder().workers(4).build();
queue.register("concurrent", move |_: Job| {
let c = c.clone();
let m = m.clone();
async move {
let current = c.fetch_add(1, Ordering::SeqCst) + 1;
m.fetch_max(current, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(100)).await;
c.fetch_sub(1, Ordering::SeqCst);
Ok(())
}
});
queue.start();
for _ in 0..8 {
queue.push("concurrent", &()).await.unwrap();
}
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(max_concurrent.load(Ordering::SeqCst) > 1);
queue.shutdown(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn exponential_backoff_retry() {
let attempts = Arc::new(AtomicU32::new(0));
let a = attempts.clone();
let queue = Queue::builder()
.workers(1)
.retry(RetryPolicy::exponential(2, Duration::from_millis(50)))
.build();
queue.register("exp_fail", move |_: Job| {
let a = a.clone();
async move {
a.fetch_add(1, Ordering::SeqCst);
Err(QueueError::HandlerError("fail".into()))
}
});
queue.start();
queue.push("exp_fail", &()).await.unwrap();
tokio::time::sleep(Duration::from_millis(600)).await;
assert_eq!(attempts.load(Ordering::SeqCst), 3); assert_eq!(queue.dead_letters().len(), 1);
queue.shutdown(Duration::from_secs(1)).await;
}