awa-testing 0.5.0-alpha.0

Test utilities for the Awa job queue
Documentation
//! Test utilities for Awa job queue.
//!
//! Provides `TestClient` for integration testing of job handlers.

pub mod setup;

use awa_model::{AwaError, JobArgs, JobRow};
use awa_worker::context::ProgressState;
use awa_worker::{JobContext, JobError, JobResult, Worker};
use sqlx::PgPool;
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

/// Test client for working with jobs in tests.
///
/// Provides helper methods for inserting jobs and executing them synchronously.
pub struct TestClient {
    pool: PgPool,
}

impl TestClient {
    /// Create a test client from an existing pool.
    pub async fn from_pool(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Get the underlying pool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Run migrations (call this in test setup).
    pub async fn migrate(&self) -> Result<(), AwaError> {
        awa_model::migrations::run(&self.pool).await
    }

    /// Clean the awa schema (for test isolation).
    pub async fn clean(&self) -> Result<(), AwaError> {
        sqlx::query("DELETE FROM awa.jobs")
            .execute(&self.pool)
            .await?;
        sqlx::query("DELETE FROM awa.queue_meta")
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Insert a job.
    pub async fn insert(&self, args: &impl JobArgs) -> Result<JobRow, AwaError> {
        awa_model::insert(&self.pool, args).await
    }

    /// Claim and execute a single job of type T using the given worker.
    ///
    /// This overload does NOT filter by queue, so it may pick up jobs from any
    /// queue. Prefer `work_one_in_queue` for test isolation.
    pub async fn work_one<W: Worker>(&self, worker: &W) -> Result<WorkResult, AwaError> {
        self.work_one_in_queue(worker, None).await
    }

    /// Claim and execute a single job, optionally filtered by queue.
    pub async fn work_one_in_queue<W: Worker>(
        &self,
        worker: &W,
        queue: Option<&str>,
    ) -> Result<WorkResult, AwaError> {
        // Claim one job
        let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>(
            r#"
            WITH claimed AS (
                SELECT id FROM awa.jobs
                WHERE state = 'available' AND kind = $1
                  AND ($2::text IS NULL OR queue = $2)
                ORDER BY run_at ASC, id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE awa.jobs
            SET state = 'running',
                attempt = attempt + 1,
                run_lease = run_lease + 1,
                attempted_at = now(),
                heartbeat_at = now(),
                deadline_at = now() + interval '5 minutes'
            FROM claimed
            WHERE awa.jobs.id = claimed.id
            RETURNING awa.jobs.*
            "#,
        )
        .bind(worker.kind())
        .bind(queue)
        .fetch_all(&self.pool)
        .await?;

        let job = match jobs.into_iter().next() {
            Some(job) => job,
            None => return Ok(WorkResult::NoJob),
        };

        let cancel = Arc::new(AtomicBool::new(false));
        let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
            Arc::new(HashMap::new());
        let progress = Arc::new(std::sync::Mutex::new(ProgressState::new(
            job.progress.clone(),
        )));
        let ctx = JobContext::new(
            job.clone(),
            cancel,
            state,
            self.pool.clone(),
            progress.clone(),
        );

        let result = worker.perform(&ctx).await;

        // Snapshot progress from the buffer after handler execution
        let progress_snapshot: Option<serde_json::Value> = {
            let guard = progress.lock().expect("progress lock poisoned");
            guard.clone_latest()
        };

        // Update job state based on result
        match &result {
            Ok(JobResult::Completed) => {
                sqlx::query(
                    "UPDATE awa.jobs SET state = 'completed', finalized_at = now(), progress = NULL WHERE id = $1",
                )
                .bind(job.id)
                .execute(&self.pool)
                .await?;
                Ok(WorkResult::Completed(job))
            }
            Ok(JobResult::Cancel(reason)) => {
                sqlx::query(
                    "UPDATE awa.jobs SET state = 'cancelled', finalized_at = now(), progress = $2 WHERE id = $1",
                )
                .bind(job.id)
                .bind(&progress_snapshot)
                .execute(&self.pool)
                .await?;
                Ok(WorkResult::Cancelled(job, reason.clone()))
            }
            Ok(JobResult::RetryAfter(_)) | Err(JobError::Retryable(_)) => {
                sqlx::query(
                    "UPDATE awa.jobs SET state = 'retryable', finalized_at = now(), progress = $2 WHERE id = $1",
                )
                .bind(job.id)
                .bind(&progress_snapshot)
                .execute(&self.pool)
                .await?;
                Ok(WorkResult::Retryable(job))
            }
            Ok(JobResult::Snooze(_)) => {
                sqlx::query(
                    "UPDATE awa.jobs SET state = 'available', attempt = attempt - 1, progress = $2 WHERE id = $1",
                )
                .bind(job.id)
                .bind(&progress_snapshot)
                .execute(&self.pool)
                .await?;
                Ok(WorkResult::Snoozed(job))
            }
            Ok(JobResult::WaitForCallback(_)) => {
                // Check if callback_id was registered
                let has_callback: Option<(Option<uuid::Uuid>,)> =
                    sqlx::query_as("SELECT callback_id FROM awa.jobs WHERE id = $1")
                        .bind(job.id)
                        .fetch_optional(&self.pool)
                        .await?;
                match has_callback {
                    Some((Some(_),)) => {
                        sqlx::query(
                            "UPDATE awa.jobs SET state = 'waiting_external', heartbeat_at = NULL, deadline_at = NULL, progress = $2 WHERE id = $1",
                        )
                        .bind(job.id)
                        .bind(&progress_snapshot)
                        .execute(&self.pool)
                        .await?;
                        let updated = self.get_job(job.id).await?;
                        Ok(WorkResult::WaitingExternal(updated))
                    }
                    _ => {
                        sqlx::query(
                            "UPDATE awa.jobs SET state = 'failed', finalized_at = now() WHERE id = $1",
                        )
                        .bind(job.id)
                        .execute(&self.pool)
                        .await?;
                        Ok(WorkResult::Failed(
                            job,
                            "WaitForCallback returned without calling register_callback"
                                .to_string(),
                        ))
                    }
                }
            }
            Err(JobError::Terminal(msg)) => {
                sqlx::query(
                    "UPDATE awa.jobs SET state = 'failed', finalized_at = now(), progress = $2 WHERE id = $1",
                )
                .bind(job.id)
                .bind(&progress_snapshot)
                .execute(&self.pool)
                .await?;
                Ok(WorkResult::Failed(job, msg.clone()))
            }
        }
    }

    /// Get a job by ID.
    pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
        awa_model::admin::get_job(&self.pool, job_id).await
    }
}

/// Result of `work_one`.
#[derive(Debug)]
pub enum WorkResult {
    /// No job was available.
    NoJob,
    /// Job completed successfully.
    Completed(JobRow),
    /// Job was retried.
    Retryable(JobRow),
    /// Job was snoozed.
    Snoozed(JobRow),
    /// Job was cancelled.
    Cancelled(JobRow, String),
    /// Job failed terminally.
    Failed(JobRow, String),
    /// Job is waiting for an external callback.
    WaitingExternal(JobRow),
}

impl WorkResult {
    pub fn is_completed(&self) -> bool {
        matches!(self, WorkResult::Completed(_))
    }

    pub fn is_failed(&self) -> bool {
        matches!(self, WorkResult::Failed(_, _))
    }

    pub fn is_no_job(&self) -> bool {
        matches!(self, WorkResult::NoJob)
    }

    pub fn is_waiting_external(&self) -> bool {
        matches!(self, WorkResult::WaitingExternal(_))
    }
}