Skip to main content

awa_testing/
lib.rs

1//! Test utilities for Awa job queue.
2//!
3//! Provides `TestClient` for integration testing of job handlers.
4
5pub mod setup;
6
7use awa_model::{AwaError, JobArgs, JobRow};
8use awa_worker::context::ProgressState;
9use awa_worker::{JobContext, JobError, JobResult, Worker};
10use sqlx::PgPool;
11use std::any::Any;
12use std::collections::HashMap;
13use std::sync::atomic::AtomicBool;
14use std::sync::Arc;
15
16/// Test client for working with jobs in tests.
17///
18/// Provides helper methods for inserting jobs and executing them synchronously.
19pub struct TestClient {
20    pool: PgPool,
21}
22
23impl TestClient {
24    /// Create a test client from an existing pool.
25    pub async fn from_pool(pool: PgPool) -> Self {
26        Self { pool }
27    }
28
29    /// Get the underlying pool.
30    pub fn pool(&self) -> &PgPool {
31        &self.pool
32    }
33
34    /// Run migrations (call this in test setup).
35    pub async fn migrate(&self) -> Result<(), AwaError> {
36        awa_model::migrations::run(&self.pool).await
37    }
38
39    /// Clean the awa schema (for test isolation).
40    pub async fn clean(&self) -> Result<(), AwaError> {
41        sqlx::query("DELETE FROM awa.jobs")
42            .execute(&self.pool)
43            .await?;
44        sqlx::query("DELETE FROM awa.queue_meta")
45            .execute(&self.pool)
46            .await?;
47        Ok(())
48    }
49
50    /// Insert a job.
51    pub async fn insert(&self, args: &impl JobArgs) -> Result<JobRow, AwaError> {
52        awa_model::insert(&self.pool, args).await
53    }
54
55    /// Claim and execute a single job of type T using the given worker.
56    ///
57    /// This overload does NOT filter by queue, so it may pick up jobs from any
58    /// queue. Prefer `work_one_in_queue` for test isolation.
59    pub async fn work_one<W: Worker>(&self, worker: &W) -> Result<WorkResult, AwaError> {
60        self.work_one_in_queue(worker, None).await
61    }
62
63    /// Claim and execute a single job, optionally filtered by queue.
64    pub async fn work_one_in_queue<W: Worker>(
65        &self,
66        worker: &W,
67        queue: Option<&str>,
68    ) -> Result<WorkResult, AwaError> {
69        // Claim one job
70        let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>(
71            r#"
72            WITH claimed AS (
73                SELECT id FROM awa.jobs
74                WHERE state = 'available' AND kind = $1
75                  AND ($2::text IS NULL OR queue = $2)
76                ORDER BY run_at ASC, id ASC
77                LIMIT 1
78                FOR UPDATE SKIP LOCKED
79            )
80            UPDATE awa.jobs
81            SET state = 'running',
82                attempt = attempt + 1,
83                run_lease = run_lease + 1,
84                attempted_at = now(),
85                heartbeat_at = now(),
86                deadline_at = now() + interval '5 minutes'
87            FROM claimed
88            WHERE awa.jobs.id = claimed.id
89            RETURNING awa.jobs.*
90            "#,
91        )
92        .bind(worker.kind())
93        .bind(queue)
94        .fetch_all(&self.pool)
95        .await?;
96
97        let job = match jobs.into_iter().next() {
98            Some(job) => job,
99            None => return Ok(WorkResult::NoJob),
100        };
101
102        let cancel = Arc::new(AtomicBool::new(false));
103        let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
104            Arc::new(HashMap::new());
105        let progress = Arc::new(std::sync::Mutex::new(ProgressState::new(
106            job.progress.clone(),
107        )));
108        let ctx = JobContext::new(
109            job.clone(),
110            cancel,
111            state,
112            self.pool.clone(),
113            progress.clone(),
114        );
115
116        let result = worker.perform(&ctx).await;
117
118        // Snapshot progress from the buffer after handler execution
119        let progress_snapshot: Option<serde_json::Value> = {
120            let guard = progress.lock().expect("progress lock poisoned");
121            guard.clone_latest()
122        };
123
124        // Update job state based on result
125        match &result {
126            Ok(JobResult::Completed) => {
127                sqlx::query(
128                    "UPDATE awa.jobs SET state = 'completed', finalized_at = now(), progress = NULL WHERE id = $1",
129                )
130                .bind(job.id)
131                .execute(&self.pool)
132                .await?;
133                Ok(WorkResult::Completed(job))
134            }
135            Ok(JobResult::Cancel(reason)) => {
136                sqlx::query(
137                    "UPDATE awa.jobs SET state = 'cancelled', finalized_at = now(), progress = $2 WHERE id = $1",
138                )
139                .bind(job.id)
140                .bind(&progress_snapshot)
141                .execute(&self.pool)
142                .await?;
143                Ok(WorkResult::Cancelled(job, reason.clone()))
144            }
145            Ok(JobResult::RetryAfter(_)) | Err(JobError::Retryable(_)) => {
146                sqlx::query(
147                    "UPDATE awa.jobs SET state = 'retryable', finalized_at = now(), progress = $2 WHERE id = $1",
148                )
149                .bind(job.id)
150                .bind(&progress_snapshot)
151                .execute(&self.pool)
152                .await?;
153                Ok(WorkResult::Retryable(job))
154            }
155            Ok(JobResult::Snooze(_)) => {
156                sqlx::query(
157                    "UPDATE awa.jobs SET state = 'available', attempt = attempt - 1, progress = $2 WHERE id = $1",
158                )
159                .bind(job.id)
160                .bind(&progress_snapshot)
161                .execute(&self.pool)
162                .await?;
163                Ok(WorkResult::Snoozed(job))
164            }
165            Ok(JobResult::WaitForCallback(_)) => {
166                // Check if callback_id was registered
167                let has_callback: Option<(Option<uuid::Uuid>,)> =
168                    sqlx::query_as("SELECT callback_id FROM awa.jobs WHERE id = $1")
169                        .bind(job.id)
170                        .fetch_optional(&self.pool)
171                        .await?;
172                match has_callback {
173                    Some((Some(_),)) => {
174                        sqlx::query(
175                            "UPDATE awa.jobs SET state = 'waiting_external', heartbeat_at = NULL, deadline_at = NULL, progress = $2 WHERE id = $1",
176                        )
177                        .bind(job.id)
178                        .bind(&progress_snapshot)
179                        .execute(&self.pool)
180                        .await?;
181                        let updated = self.get_job(job.id).await?;
182                        Ok(WorkResult::WaitingExternal(updated))
183                    }
184                    _ => {
185                        sqlx::query(
186                            "UPDATE awa.jobs SET state = 'failed', finalized_at = now() WHERE id = $1",
187                        )
188                        .bind(job.id)
189                        .execute(&self.pool)
190                        .await?;
191                        Ok(WorkResult::Failed(
192                            job,
193                            "WaitForCallback returned without calling register_callback"
194                                .to_string(),
195                        ))
196                    }
197                }
198            }
199            Err(JobError::Terminal(msg)) => {
200                sqlx::query(
201                    "UPDATE awa.jobs SET state = 'failed', finalized_at = now(), progress = $2 WHERE id = $1",
202                )
203                .bind(job.id)
204                .bind(&progress_snapshot)
205                .execute(&self.pool)
206                .await?;
207                Ok(WorkResult::Failed(job, msg.clone()))
208            }
209        }
210    }
211
212    /// Get a job by ID.
213    pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
214        awa_model::admin::get_job(&self.pool, job_id).await
215    }
216}
217
218/// Result of `work_one`.
219#[derive(Debug)]
220pub enum WorkResult {
221    /// No job was available.
222    NoJob,
223    /// Job completed successfully.
224    Completed(JobRow),
225    /// Job was retried.
226    Retryable(JobRow),
227    /// Job was snoozed.
228    Snoozed(JobRow),
229    /// Job was cancelled.
230    Cancelled(JobRow, String),
231    /// Job failed terminally.
232    Failed(JobRow, String),
233    /// Job is waiting for an external callback.
234    WaitingExternal(JobRow),
235}
236
237impl WorkResult {
238    pub fn is_completed(&self) -> bool {
239        matches!(self, WorkResult::Completed(_))
240    }
241
242    pub fn is_failed(&self) -> bool {
243        matches!(self, WorkResult::Failed(_, _))
244    }
245
246    pub fn is_no_job(&self) -> bool {
247        matches!(self, WorkResult::NoJob)
248    }
249
250    pub fn is_waiting_external(&self) -> bool {
251        matches!(self, WorkResult::WaitingExternal(_))
252    }
253}