use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use vex_queue::backend::QueueBackend;
use vex_queue::job::{BackoffStrategy, Job, JobResult};
use vex_queue::memory::MemoryQueue;
use vex_queue::worker::WorkerConfig;
/// Test job that bumps a shared atomic counter every time it is executed.
///
/// With `should_fail` set, executions that observe a counter value below
/// `fail_times` ask for a retry; all other executions succeed.
#[derive(Debug)]
// Not referenced by the tests visible in this file, hence the lint suppression.
#[allow(dead_code)]
struct CounterJob {
    /// Shared counter incremented once per `execute` call.
    counter: Arc<AtomicU32>,
    /// Whether the job may report a retry at all.
    should_fail: bool,
    /// Counter threshold below which a failing job reports a retry.
    fail_times: u32,
}

// The constructors are not called by the tests visible in this file.
#[allow(dead_code)]
impl CounterJob {
    /// Creates a job that never asks for a retry.
    fn new(counter: Arc<AtomicU32>) -> Self {
        CounterJob {
            fail_times: 0,
            should_fail: false,
            counter,
        }
    }

    /// Creates a job with failure mode enabled and the given `fail_times`
    /// threshold.
    fn failing(counter: Arc<AtomicU32>, fail_times: u32) -> Self {
        // Start from the non-failing job and override only the failure knobs.
        Self {
            should_fail: true,
            fail_times,
            ..Self::new(counter)
        }
    }
}
#[async_trait::async_trait]
impl Job for CounterJob {
    /// Returns the fixed job name `"counter_job"`.
    fn name(&self) -> &str {
        "counter_job"
    }

    /// Increments the shared counter and picks the outcome from the value
    /// observed *before* the increment: a retry while failure mode is on and
    /// that value is below `fail_times`, success otherwise.
    async fn execute(&mut self) -> JobResult {
        let seen = self.counter.fetch_add(1, Ordering::SeqCst);
        let keep_failing = self.should_fail && seen < self.fail_times;
        if !keep_failing {
            return JobResult::Success(None);
        }
        // `seen` is zero-based, so the reported attempt number is one higher.
        JobResult::Retry(format!("Failing on attempt {}", seen + 1))
    }

    /// Returns a retry limit of 5.
    fn max_retries(&self) -> u32 {
        5
    }

    /// Returns a constant backoff of zero seconds.
    fn backoff_strategy(&self) -> BackoffStrategy {
        BackoffStrategy::Constant { secs: 0 }
    }
}
/// Stateless test job whose `execute` always returns `JobResult::Fatal`.
#[derive(Debug)]
// Not referenced by the tests visible in this file, hence the lint suppression.
#[allow(dead_code)]
struct FatalJob;
#[async_trait::async_trait]
impl Job for FatalJob {
    /// Returns the fixed job name `"fatal_job"`.
    fn name(&self) -> &str {
        "fatal_job"
    }

    /// Always reports a fatal result carrying a fixed message.
    async fn execute(&mut self) -> JobResult {
        let reason = String::from("This job always fails");
        JobResult::Fatal(reason)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
/// The default worker configuration must have a positive `max_concurrency`
/// and a `poll_interval` of at least one whole millisecond.
#[tokio::test]
async fn test_worker_config_defaults() {
    let defaults = WorkerConfig::default();

    assert!(defaults.max_concurrency > 0);

    let poll_ms = defaults.poll_interval.as_millis();
    assert!(poll_ms > 0);
}
/// A job enqueued without a delay can be dequeued right away, and the
/// dequeued entry carries the id returned by `enqueue` together with the
/// tenant, job type and payload that were submitted.
#[tokio::test]
async fn test_memory_queue_enqueue_dequeue() {
    let queue = MemoryQueue::new();
    let tenant_id = "test-tenant";

    let id = queue
        .enqueue(tenant_id, "test_job", json!({"key": "value"}), None)
        .await
        .expect("enqueue should succeed");

    // `expect` replaces the former `assert!(is_some())` + `unwrap()` pair so a
    // failure names the violated expectation instead of a bare unwrap panic.
    let job = queue
        .dequeue()
        .await
        .expect("dequeue should succeed")
        .expect("enqueued job should be available for dequeue");

    assert_eq!(job.id, id);
    assert_eq!(job.tenant_id, tenant_id);
    assert_eq!(job.job_type, "test_job");
    assert_eq!(job.payload["key"], "value");
}
/// Two jobs enqueued back to back for the same tenant are dequeued in the
/// order in which they were enqueued.
#[tokio::test]
async fn test_memory_queue_fifo_ordering() {
    let queue = MemoryQueue::new();
    let tenant = "tenant-1";

    let first_id = queue
        .enqueue(tenant, "job1", json!({}), None)
        .await
        .unwrap();
    let second_id = queue
        .enqueue(tenant, "job2", json!({}), None)
        .await
        .unwrap();

    let first_out = queue.dequeue().await.unwrap().unwrap();
    let second_out = queue.dequeue().await.unwrap().unwrap();

    assert_eq!(first_out.id, first_id);
    assert_eq!(second_out.id, second_id);
}
/// A constant strategy reports the same 5-second delay for attempts 0, 3
/// and 10.
#[tokio::test]
async fn test_backoff_strategy_constant() {
    let strategy = BackoffStrategy::Constant { secs: 5 };

    for attempt in [0, 3, 10] {
        assert_eq!(strategy.delay(attempt).as_secs(), 5);
    }
}
/// With an initial delay of 1 second and a multiplier of 2.0, attempts 0
/// through 3 yield delays of 1, 2, 4 and 8 seconds.
#[tokio::test]
async fn test_backoff_strategy_exponential() {
    let strategy = BackoffStrategy::Exponential {
        initial_secs: 1,
        multiplier: 2.0,
    };

    // (attempt, expected delay in whole seconds)
    let expectations = [(0, 1), (1, 2), (2, 4), (3, 8)];
    for (attempt, expected_secs) in expectations {
        assert_eq!(strategy.delay(attempt).as_secs(), expected_secs);
    }
}
#[tokio::test]
async fn test_job_result_variants() {
let success = JobResult::Success(None);
let retry = JobResult::Retry("error".to_string());
let fatal = JobResult::Fatal("error".to_string());
assert!(matches!(success, JobResult::Success(_)));
assert!(matches!(retry, JobResult::Retry(_)));
assert!(matches!(fatal, JobResult::Fatal(_)));
}
/// A job enqueued with a delay argument of `Some(10)` is not handed out by
/// a `dequeue` issued immediately afterwards.
#[tokio::test]
async fn test_delayed_job_not_immediately_available() {
    let queue = MemoryQueue::new();
    let tenant = "tenant-1";

    let _job_id = queue
        .enqueue(tenant, "delayed", json!({}), Some(10))
        .await
        .unwrap();

    let polled = queue.dequeue().await.unwrap();
    assert!(
        polled.is_none(),
        "Delayed job should not be immediately available"
    );
}
/// Dequeuing from a freshly created queue succeeds and yields no job.
#[tokio::test]
async fn test_empty_queue_returns_none() {
    let queue = MemoryQueue::new();
    assert!(queue.dequeue().await.unwrap().is_none());
}
}