Skip to main content

awa_testing/
lib.rs

1//! Test utilities for Awa job queue.
2//!
3//! Provides `TestClient` for integration testing of job handlers.
4
5use awa_model::{AwaError, JobArgs, JobRow};
6use awa_worker::{JobContext, JobError, JobResult, Worker};
7use sqlx::PgPool;
8use std::any::Any;
9use std::collections::HashMap;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12
13/// Test client for working with jobs in tests.
14///
15/// Provides helper methods for inserting jobs and executing them synchronously.
16pub struct TestClient {
17    pool: PgPool,
18}
19
20impl TestClient {
21    /// Create a test client from an existing pool.
22    pub async fn from_pool(pool: PgPool) -> Self {
23        Self { pool }
24    }
25
26    /// Get the underlying pool.
27    pub fn pool(&self) -> &PgPool {
28        &self.pool
29    }
30
31    /// Run migrations (call this in test setup).
32    pub async fn migrate(&self) -> Result<(), AwaError> {
33        awa_model::migrations::run(&self.pool).await
34    }
35
36    /// Clean the awa schema (for test isolation).
37    pub async fn clean(&self) -> Result<(), AwaError> {
38        sqlx::query("DELETE FROM awa.jobs")
39            .execute(&self.pool)
40            .await?;
41        sqlx::query("DELETE FROM awa.queue_meta")
42            .execute(&self.pool)
43            .await?;
44        Ok(())
45    }
46
47    /// Insert a job.
48    pub async fn insert(&self, args: &impl JobArgs) -> Result<JobRow, AwaError> {
49        awa_model::insert(&self.pool, args).await
50    }
51
52    /// Claim and execute a single job of type T using the given worker.
53    ///
54    /// This overload does NOT filter by queue, so it may pick up jobs from any
55    /// queue. Prefer `work_one_in_queue` for test isolation.
56    pub async fn work_one<W: Worker>(&self, worker: &W) -> Result<WorkResult, AwaError> {
57        self.work_one_in_queue(worker, None).await
58    }
59
60    /// Claim and execute a single job, optionally filtered by queue.
61    pub async fn work_one_in_queue<W: Worker>(
62        &self,
63        worker: &W,
64        queue: Option<&str>,
65    ) -> Result<WorkResult, AwaError> {
66        // Claim one job
67        let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>(
68            r#"
69            WITH claimed AS (
70                SELECT id FROM awa.jobs
71                WHERE state = 'available' AND kind = $1
72                  AND ($2::text IS NULL OR queue = $2)
73                ORDER BY run_at ASC, id ASC
74                LIMIT 1
75                FOR UPDATE SKIP LOCKED
76            )
77            UPDATE awa.jobs
78            SET state = 'running',
79                attempt = attempt + 1,
80                attempted_at = now(),
81                heartbeat_at = now(),
82                deadline_at = now() + interval '5 minutes'
83            FROM claimed
84            WHERE awa.jobs.id = claimed.id
85            RETURNING awa.jobs.*
86            "#,
87        )
88        .bind(worker.kind())
89        .bind(queue)
90        .fetch_all(&self.pool)
91        .await?;
92
93        let job = match jobs.into_iter().next() {
94            Some(job) => job,
95            None => return Ok(WorkResult::NoJob),
96        };
97
98        let cancel = Arc::new(AtomicBool::new(false));
99        let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
100            Arc::new(HashMap::new());
101        let ctx = JobContext::new(job.clone(), cancel, state);
102
103        let result = worker.perform(&job, &ctx).await;
104
105        // Update job state based on result
106        match &result {
107            Ok(JobResult::Completed) => {
108                sqlx::query(
109                    "UPDATE awa.jobs SET state = 'completed', finalized_at = now() WHERE id = $1",
110                )
111                .bind(job.id)
112                .execute(&self.pool)
113                .await?;
114                Ok(WorkResult::Completed(job))
115            }
116            Ok(JobResult::Cancel(reason)) => {
117                sqlx::query(
118                    "UPDATE awa.jobs SET state = 'cancelled', finalized_at = now() WHERE id = $1",
119                )
120                .bind(job.id)
121                .execute(&self.pool)
122                .await?;
123                Ok(WorkResult::Cancelled(job, reason.clone()))
124            }
125            Ok(JobResult::RetryAfter(_)) | Err(JobError::Retryable(_)) => {
126                sqlx::query(
127                    "UPDATE awa.jobs SET state = 'retryable', finalized_at = now() WHERE id = $1",
128                )
129                .bind(job.id)
130                .execute(&self.pool)
131                .await?;
132                Ok(WorkResult::Retryable(job))
133            }
134            Ok(JobResult::Snooze(_)) => {
135                sqlx::query(
136                    "UPDATE awa.jobs SET state = 'available', attempt = attempt - 1 WHERE id = $1",
137                )
138                .bind(job.id)
139                .execute(&self.pool)
140                .await?;
141                Ok(WorkResult::Snoozed(job))
142            }
143            Err(JobError::Terminal(msg)) => {
144                sqlx::query(
145                    "UPDATE awa.jobs SET state = 'failed', finalized_at = now() WHERE id = $1",
146                )
147                .bind(job.id)
148                .execute(&self.pool)
149                .await?;
150                Ok(WorkResult::Failed(job, msg.clone()))
151            }
152        }
153    }
154
155    /// Get a job by ID.
156    pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
157        awa_model::admin::get_job(&self.pool, job_id).await
158    }
159}
160
161/// Result of `work_one`.
162#[derive(Debug)]
163pub enum WorkResult {
164    /// No job was available.
165    NoJob,
166    /// Job completed successfully.
167    Completed(JobRow),
168    /// Job was retried.
169    Retryable(JobRow),
170    /// Job was snoozed.
171    Snoozed(JobRow),
172    /// Job was cancelled.
173    Cancelled(JobRow, String),
174    /// Job failed terminally.
175    Failed(JobRow, String),
176}
177
178impl WorkResult {
179    pub fn is_completed(&self) -> bool {
180        matches!(self, WorkResult::Completed(_))
181    }
182
183    pub fn is_failed(&self) -> bool {
184        matches!(self, WorkResult::Failed(_, _))
185    }
186
187    pub fn is_no_job(&self) -> bool {
188        matches!(self, WorkResult::NoJob)
189    }
190}