use crate::jobs::{Job, JobError, JobResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Clone)]
pub struct TestJobQueue {
jobs: Arc<Mutex<VecDeque<Box<dyn JobWrapper>>>>,
completed: Arc<Mutex<Vec<String>>>,
failed: Arc<Mutex<Vec<(String, String)>>>,
}
impl Default for TestJobQueue {
fn default() -> Self {
Self::new()
}
}
impl TestJobQueue {
#[must_use]
pub fn new() -> Self {
Self {
jobs: Arc::new(Mutex::new(VecDeque::new())),
completed: Arc::new(Mutex::new(Vec::new())),
failed: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn enqueue<J: Job + Clone + Send + Sync + 'static>(&self, job: J) {
let wrapper = Box::new(TypedJobWrapper { job });
self.jobs.lock().unwrap().push_back(wrapper);
}
pub async fn execute_next(&self) -> Option<Result<(), JobError>> {
let job = self.jobs.lock().unwrap().pop_front()?;
let job_name = job.name();
match job.execute_boxed().await {
Ok(()) => {
self.completed.lock().unwrap().push(job_name);
Some(Ok(()))
}
Err(e) => {
self.failed
.lock()
.unwrap()
.push((job_name, e.to_string()));
Some(Err(e))
}
}
}
pub async fn execute_all(&self) -> Result<(), JobError> {
while let Some(result) = self.execute_next().await {
result?;
}
Ok(())
}
#[must_use]
pub fn len(&self) -> usize {
self.jobs.lock().unwrap().len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.jobs.lock().unwrap().is_empty()
}
#[must_use]
pub fn completed_count(&self) -> usize {
self.completed.lock().unwrap().len()
}
#[must_use]
pub fn failed_count(&self) -> usize {
self.failed.lock().unwrap().len()
}
#[must_use]
pub fn completed_jobs(&self) -> Vec<String> {
self.completed.lock().unwrap().clone()
}
#[must_use]
pub fn failed_jobs(&self) -> Vec<(String, String)> {
self.failed.lock().unwrap().clone()
}
pub fn clear_history(&self) {
self.completed.lock().unwrap().clear();
self.failed.lock().unwrap().clear();
}
}
trait JobWrapper: Send {
fn execute_boxed(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = JobResult<()>> + Send + '_>>;
fn name(&self) -> String;
}
struct TypedJobWrapper<J: Job> {
job: J,
}
impl<J: Job + Send + Sync> JobWrapper for TypedJobWrapper<J> {
fn execute_boxed(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = JobResult<()>> + Send + '_>> {
Box::pin(async move {
let ctx = crate::jobs::JobContext::new();
self.job.execute(&ctx).await?;
Ok(())
})
}
fn name(&self) -> String {
std::any::type_name::<J>().to_string()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestJob {
pub id: String,
pub should_succeed: bool,
#[serde(default)]
pub delay_ms: Option<u64>,
}
impl TestJob {
#[must_use]
pub const fn new(id: String, should_succeed: bool) -> Self {
Self {
id,
should_succeed,
delay_ms: None,
}
}
#[must_use]
pub const fn with_delay(id: String, should_succeed: bool, delay_ms: u64) -> Self {
Self {
id,
should_succeed,
delay_ms: Some(delay_ms),
}
}
}
#[async_trait]
impl Job for TestJob {
type Result = String;
async fn execute(&self, _ctx: &crate::jobs::JobContext) -> JobResult<Self::Result> {
if let Some(delay) = self.delay_ms {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
if self.should_succeed {
Ok(format!("Success: {}", self.id))
} else {
Err(JobError::ExecutionFailed(format!(
"Intentional failure: {}",
self.id
)))
}
}
fn max_retries(&self) -> u32 {
3
}
fn timeout(&self) -> Duration {
Duration::from_secs(30)
}
fn priority(&self) -> i32 {
128
}
}
pub async fn assert_job_succeeds<J: Job>(job: J)
where
J::Result: std::fmt::Debug,
{
let ctx = crate::jobs::JobContext::new();
let result = job.execute(&ctx).await;
assert!(
result.is_ok(),
"Job should succeed but failed with: {:?}",
result.unwrap_err()
);
}
pub async fn assert_job_fails<J: Job>(job: J) {
let ctx = crate::jobs::JobContext::new();
let result = job.execute(&ctx).await;
assert!(result.is_err(), "Job should fail but succeeded");
}
pub async fn assert_job_completes_within<J: Job>(job: J, timeout: Duration) {
let ctx = crate::jobs::JobContext::new();
let result = tokio::time::timeout(timeout, job.execute(&ctx)).await;
assert!(
result.is_ok(),
"Job should complete within {timeout:?} but timed out"
);
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_queue_enqueue_and_execute() {
let queue = TestJobQueue::new();
let job = TestJob::new("test1".to_string(), true);
queue.enqueue(job);
assert_eq!(queue.len(), 1);
let result = queue.execute_next().await;
assert!(result.is_some());
assert!(result.unwrap().is_ok());
assert_eq!(queue.len(), 0);
assert_eq!(queue.completed_count(), 1);
}
#[tokio::test]
async fn test_queue_failed_job() {
let queue = TestJobQueue::new();
let job = TestJob::new("test1".to_string(), false);
queue.enqueue(job);
let result = queue.execute_next().await;
assert!(result.is_some());
assert!(result.unwrap().is_err());
assert_eq!(queue.failed_count(), 1);
}
#[tokio::test]
async fn test_queue_execute_all() {
let queue = TestJobQueue::new();
queue.enqueue(TestJob::new("test1".to_string(), true));
queue.enqueue(TestJob::new("test2".to_string(), true));
queue.enqueue(TestJob::new("test3".to_string(), true));
let result = queue.execute_all().await;
assert!(result.is_ok());
assert_eq!(queue.completed_count(), 3);
}
#[tokio::test]
async fn test_successful_job() {
let job = TestJob::new("test".to_string(), true);
assert_job_succeeds(job).await;
}
#[tokio::test]
async fn test_failing_job() {
let job = TestJob::new("test".to_string(), false);
assert_job_fails(job).await;
}
#[tokio::test]
async fn test_job_with_delay() {
let job = TestJob::with_delay("test".to_string(), true, 50);
assert_job_completes_within(job, Duration::from_millis(100)).await;
}
}