1use awa_model::{AwaError, JobArgs, JobRow};
6use awa_worker::{JobContext, JobError, JobResult, Worker};
7use sqlx::PgPool;
8use std::any::Any;
9use std::collections::HashMap;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12
13pub struct TestClient {
17 pool: PgPool,
18}
19
20impl TestClient {
21 pub async fn from_pool(pool: PgPool) -> Self {
23 Self { pool }
24 }
25
26 pub fn pool(&self) -> &PgPool {
28 &self.pool
29 }
30
31 pub async fn migrate(&self) -> Result<(), AwaError> {
33 awa_model::migrations::run(&self.pool).await
34 }
35
36 pub async fn clean(&self) -> Result<(), AwaError> {
38 sqlx::query("DELETE FROM awa.jobs")
39 .execute(&self.pool)
40 .await?;
41 sqlx::query("DELETE FROM awa.queue_meta")
42 .execute(&self.pool)
43 .await?;
44 Ok(())
45 }
46
47 pub async fn insert(&self, args: &impl JobArgs) -> Result<JobRow, AwaError> {
49 awa_model::insert(&self.pool, args).await
50 }
51
52 pub async fn work_one<W: Worker>(&self, worker: &W) -> Result<WorkResult, AwaError> {
57 self.work_one_in_queue(worker, None).await
58 }
59
60 pub async fn work_one_in_queue<W: Worker>(
62 &self,
63 worker: &W,
64 queue: Option<&str>,
65 ) -> Result<WorkResult, AwaError> {
66 let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>(
68 r#"
69 WITH claimed AS (
70 SELECT id FROM awa.jobs
71 WHERE state = 'available' AND kind = $1
72 AND ($2::text IS NULL OR queue = $2)
73 ORDER BY run_at ASC, id ASC
74 LIMIT 1
75 FOR UPDATE SKIP LOCKED
76 )
77 UPDATE awa.jobs
78 SET state = 'running',
79 attempt = attempt + 1,
80 attempted_at = now(),
81 heartbeat_at = now(),
82 deadline_at = now() + interval '5 minutes'
83 FROM claimed
84 WHERE awa.jobs.id = claimed.id
85 RETURNING awa.jobs.*
86 "#,
87 )
88 .bind(worker.kind())
89 .bind(queue)
90 .fetch_all(&self.pool)
91 .await?;
92
93 let job = match jobs.into_iter().next() {
94 Some(job) => job,
95 None => return Ok(WorkResult::NoJob),
96 };
97
98 let cancel = Arc::new(AtomicBool::new(false));
99 let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
100 Arc::new(HashMap::new());
101 let ctx = JobContext::new(job.clone(), cancel, state);
102
103 let result = worker.perform(&job, &ctx).await;
104
105 match &result {
107 Ok(JobResult::Completed) => {
108 sqlx::query(
109 "UPDATE awa.jobs SET state = 'completed', finalized_at = now() WHERE id = $1",
110 )
111 .bind(job.id)
112 .execute(&self.pool)
113 .await?;
114 Ok(WorkResult::Completed(job))
115 }
116 Ok(JobResult::Cancel(reason)) => {
117 sqlx::query(
118 "UPDATE awa.jobs SET state = 'cancelled', finalized_at = now() WHERE id = $1",
119 )
120 .bind(job.id)
121 .execute(&self.pool)
122 .await?;
123 Ok(WorkResult::Cancelled(job, reason.clone()))
124 }
125 Ok(JobResult::RetryAfter(_)) | Err(JobError::Retryable(_)) => {
126 sqlx::query(
127 "UPDATE awa.jobs SET state = 'retryable', finalized_at = now() WHERE id = $1",
128 )
129 .bind(job.id)
130 .execute(&self.pool)
131 .await?;
132 Ok(WorkResult::Retryable(job))
133 }
134 Ok(JobResult::Snooze(_)) => {
135 sqlx::query(
136 "UPDATE awa.jobs SET state = 'available', attempt = attempt - 1 WHERE id = $1",
137 )
138 .bind(job.id)
139 .execute(&self.pool)
140 .await?;
141 Ok(WorkResult::Snoozed(job))
142 }
143 Err(JobError::Terminal(msg)) => {
144 sqlx::query(
145 "UPDATE awa.jobs SET state = 'failed', finalized_at = now() WHERE id = $1",
146 )
147 .bind(job.id)
148 .execute(&self.pool)
149 .await?;
150 Ok(WorkResult::Failed(job, msg.clone()))
151 }
152 }
153 }
154
155 pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
157 awa_model::admin::get_job(&self.pool, job_id).await
158 }
159}
160
161#[derive(Debug)]
163pub enum WorkResult {
164 NoJob,
166 Completed(JobRow),
168 Retryable(JobRow),
170 Snoozed(JobRow),
172 Cancelled(JobRow, String),
174 Failed(JobRow, String),
176}
177
178impl WorkResult {
179 pub fn is_completed(&self) -> bool {
180 matches!(self, WorkResult::Completed(_))
181 }
182
183 pub fn is_failed(&self) -> bool {
184 matches!(self, WorkResult::Failed(_, _))
185 }
186
187 pub fn is_no_job(&self) -> bool {
188 matches!(self, WorkResult::NoJob)
189 }
190}