1pub mod setup;
6
7use awa_model::{AwaError, JobArgs, JobRow};
8use awa_worker::context::ProgressState;
9use awa_worker::{JobContext, JobError, JobResult, Worker};
10use sqlx::PgPool;
11use std::any::Any;
12use std::collections::HashMap;
13use std::sync::atomic::AtomicBool;
14use std::sync::Arc;
15
16pub struct TestClient {
20 pool: PgPool,
21}
22
23impl TestClient {
24 pub async fn from_pool(pool: PgPool) -> Self {
26 Self { pool }
27 }
28
29 pub fn pool(&self) -> &PgPool {
31 &self.pool
32 }
33
34 pub async fn migrate(&self) -> Result<(), AwaError> {
36 awa_model::migrations::run(&self.pool).await
37 }
38
39 pub async fn clean(&self) -> Result<(), AwaError> {
41 sqlx::query("DELETE FROM awa.jobs")
42 .execute(&self.pool)
43 .await?;
44 sqlx::query("DELETE FROM awa.queue_meta")
45 .execute(&self.pool)
46 .await?;
47 Ok(())
48 }
49
50 pub async fn insert(&self, args: &impl JobArgs) -> Result<JobRow, AwaError> {
52 awa_model::insert(&self.pool, args).await
53 }
54
55 pub async fn work_one<W: Worker>(&self, worker: &W) -> Result<WorkResult, AwaError> {
60 self.work_one_in_queue(worker, None).await
61 }
62
63 pub async fn work_one_in_queue<W: Worker>(
65 &self,
66 worker: &W,
67 queue: Option<&str>,
68 ) -> Result<WorkResult, AwaError> {
69 let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>(
71 r#"
72 WITH claimed AS (
73 SELECT id FROM awa.jobs
74 WHERE state = 'available' AND kind = $1
75 AND ($2::text IS NULL OR queue = $2)
76 ORDER BY run_at ASC, id ASC
77 LIMIT 1
78 FOR UPDATE SKIP LOCKED
79 )
80 UPDATE awa.jobs
81 SET state = 'running',
82 attempt = attempt + 1,
83 run_lease = run_lease + 1,
84 attempted_at = now(),
85 heartbeat_at = now(),
86 deadline_at = now() + interval '5 minutes'
87 FROM claimed
88 WHERE awa.jobs.id = claimed.id
89 RETURNING awa.jobs.*
90 "#,
91 )
92 .bind(worker.kind())
93 .bind(queue)
94 .fetch_all(&self.pool)
95 .await?;
96
97 let job = match jobs.into_iter().next() {
98 Some(job) => job,
99 None => return Ok(WorkResult::NoJob),
100 };
101
102 let cancel = Arc::new(AtomicBool::new(false));
103 let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
104 Arc::new(HashMap::new());
105 let progress = Arc::new(std::sync::Mutex::new(ProgressState::new(
106 job.progress.clone(),
107 )));
108 let ctx = JobContext::new(
109 job.clone(),
110 cancel,
111 state,
112 self.pool.clone(),
113 progress.clone(),
114 );
115
116 let result = worker.perform(&ctx).await;
117
118 let progress_snapshot: Option<serde_json::Value> = {
120 let guard = progress.lock().expect("progress lock poisoned");
121 guard.clone_latest()
122 };
123
124 match &result {
126 Ok(JobResult::Completed) => {
127 sqlx::query(
128 "UPDATE awa.jobs SET state = 'completed', finalized_at = now(), progress = NULL WHERE id = $1",
129 )
130 .bind(job.id)
131 .execute(&self.pool)
132 .await?;
133 Ok(WorkResult::Completed(job))
134 }
135 Ok(JobResult::Cancel(reason)) => {
136 sqlx::query(
137 "UPDATE awa.jobs SET state = 'cancelled', finalized_at = now(), progress = $2 WHERE id = $1",
138 )
139 .bind(job.id)
140 .bind(&progress_snapshot)
141 .execute(&self.pool)
142 .await?;
143 Ok(WorkResult::Cancelled(job, reason.clone()))
144 }
145 Ok(JobResult::RetryAfter(_)) | Err(JobError::Retryable(_)) => {
146 sqlx::query(
147 "UPDATE awa.jobs SET state = 'retryable', finalized_at = now(), progress = $2 WHERE id = $1",
148 )
149 .bind(job.id)
150 .bind(&progress_snapshot)
151 .execute(&self.pool)
152 .await?;
153 Ok(WorkResult::Retryable(job))
154 }
155 Ok(JobResult::Snooze(_)) => {
156 sqlx::query(
157 "UPDATE awa.jobs SET state = 'available', attempt = attempt - 1, progress = $2 WHERE id = $1",
158 )
159 .bind(job.id)
160 .bind(&progress_snapshot)
161 .execute(&self.pool)
162 .await?;
163 Ok(WorkResult::Snoozed(job))
164 }
165 Ok(JobResult::WaitForCallback(_)) => {
166 let has_callback: Option<(Option<uuid::Uuid>,)> =
168 sqlx::query_as("SELECT callback_id FROM awa.jobs WHERE id = $1")
169 .bind(job.id)
170 .fetch_optional(&self.pool)
171 .await?;
172 match has_callback {
173 Some((Some(_),)) => {
174 sqlx::query(
175 "UPDATE awa.jobs SET state = 'waiting_external', heartbeat_at = NULL, deadline_at = NULL, progress = $2 WHERE id = $1",
176 )
177 .bind(job.id)
178 .bind(&progress_snapshot)
179 .execute(&self.pool)
180 .await?;
181 let updated = self.get_job(job.id).await?;
182 Ok(WorkResult::WaitingExternal(updated))
183 }
184 _ => {
185 sqlx::query(
186 "UPDATE awa.jobs SET state = 'failed', finalized_at = now() WHERE id = $1",
187 )
188 .bind(job.id)
189 .execute(&self.pool)
190 .await?;
191 Ok(WorkResult::Failed(
192 job,
193 "WaitForCallback returned without calling register_callback"
194 .to_string(),
195 ))
196 }
197 }
198 }
199 Err(JobError::Terminal(msg)) => {
200 sqlx::query(
201 "UPDATE awa.jobs SET state = 'failed', finalized_at = now(), progress = $2 WHERE id = $1",
202 )
203 .bind(job.id)
204 .bind(&progress_snapshot)
205 .execute(&self.pool)
206 .await?;
207 Ok(WorkResult::Failed(job, msg.clone()))
208 }
209 }
210 }
211
212 pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
214 awa_model::admin::get_job(&self.pool, job_id).await
215 }
216}
217
218#[derive(Debug)]
220pub enum WorkResult {
221 NoJob,
223 Completed(JobRow),
225 Retryable(JobRow),
227 Snoozed(JobRow),
229 Cancelled(JobRow, String),
231 Failed(JobRow, String),
233 WaitingExternal(JobRow),
235}
236
237impl WorkResult {
238 pub fn is_completed(&self) -> bool {
239 matches!(self, WorkResult::Completed(_))
240 }
241
242 pub fn is_failed(&self) -> bool {
243 matches!(self, WorkResult::Failed(_, _))
244 }
245
246 pub fn is_no_job(&self) -> bool {
247 matches!(self, WorkResult::NoJob)
248 }
249
250 pub fn is_waiting_external(&self) -> bool {
251 matches!(self, WorkResult::WaitingExternal(_))
252 }
253}