1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use sqlx::PgExecutor;
4
5pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
7where
8 E: PgExecutor<'e>,
9{
10 sqlx::query_as::<_, JobRow>(
11 r#"
12 UPDATE awa.jobs
13 SET state = 'available', attempt = 0, run_at = now(),
14 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
15 WHERE id = $1 AND state IN ('failed', 'cancelled')
16 RETURNING *
17 "#,
18 )
19 .bind(job_id)
20 .fetch_optional(executor)
21 .await?
22 .ok_or(AwaError::JobNotFound { id: job_id })
23 .map(Some)
24}
25
26pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
28where
29 E: PgExecutor<'e>,
30{
31 sqlx::query_as::<_, JobRow>(
32 r#"
33 UPDATE awa.jobs
34 SET state = 'cancelled', finalized_at = now()
35 WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
36 RETURNING *
37 "#,
38 )
39 .bind(job_id)
40 .fetch_optional(executor)
41 .await?
42 .ok_or(AwaError::JobNotFound { id: job_id })
43 .map(Some)
44}
45
46pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
48where
49 E: PgExecutor<'e>,
50{
51 let rows = sqlx::query_as::<_, JobRow>(
52 r#"
53 UPDATE awa.jobs
54 SET state = 'available', attempt = 0, run_at = now(),
55 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
56 WHERE kind = $1 AND state = 'failed'
57 RETURNING *
58 "#,
59 )
60 .bind(kind)
61 .fetch_all(executor)
62 .await?;
63
64 Ok(rows)
65}
66
67pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
69where
70 E: PgExecutor<'e>,
71{
72 let rows = sqlx::query_as::<_, JobRow>(
73 r#"
74 UPDATE awa.jobs
75 SET state = 'available', attempt = 0, run_at = now(),
76 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
77 WHERE queue = $1 AND state = 'failed'
78 RETURNING *
79 "#,
80 )
81 .bind(queue)
82 .fetch_all(executor)
83 .await?;
84
85 Ok(rows)
86}
87
88pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
90where
91 E: PgExecutor<'e>,
92{
93 let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
94 .bind(kind)
95 .execute(executor)
96 .await?;
97
98 Ok(result.rows_affected())
99}
100
101pub async fn pause_queue<'e, E>(
103 executor: E,
104 queue: &str,
105 paused_by: Option<&str>,
106) -> Result<(), AwaError>
107where
108 E: PgExecutor<'e>,
109{
110 sqlx::query(
111 r#"
112 INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
113 VALUES ($1, TRUE, now(), $2)
114 ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
115 "#,
116 )
117 .bind(queue)
118 .bind(paused_by)
119 .execute(executor)
120 .await?;
121
122 Ok(())
123}
124
125pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
127where
128 E: PgExecutor<'e>,
129{
130 sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
131 .bind(queue)
132 .execute(executor)
133 .await?;
134
135 Ok(())
136}
137
138pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
140where
141 E: PgExecutor<'e>,
142{
143 let result = sqlx::query(
144 r#"
145 UPDATE awa.jobs SET state = 'cancelled', finalized_at = now()
146 WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable')
147 "#,
148 )
149 .bind(queue)
150 .execute(executor)
151 .await?;
152
153 Ok(result.rows_affected())
154}
155
/// Per-queue job counters and scheduling lag, as reported by `queue_stats`.
///
/// Derives `PartialEq` so that two snapshots can be compared directly (for example with
/// `assert_eq!`). `Eq` is not derived because `lag_seconds` is a float.
#[derive(Debug, Clone, PartialEq)]
pub struct QueueStats {
    /// Name of the queue these numbers describe.
    pub queue: String,
    /// Number of jobs currently in the `available` state.
    pub available: i64,
    /// Number of jobs currently in the `running` state.
    pub running: i64,
    /// Number of jobs currently in the `failed` state.
    pub failed: i64,
    /// Number of `completed` jobs whose `finalized_at` falls within the last hour.
    pub completed_last_hour: i64,
    /// Seconds between now and the earliest `run_at` among `available` jobs;
    /// `None` when the queue has no `available` jobs.
    pub lag_seconds: Option<f64>,
}
166
167pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
169where
170 E: PgExecutor<'e>,
171{
172 let rows = sqlx::query_as::<_, (String, i64, i64, i64, i64, Option<f64>)>(
173 r#"
174 SELECT
175 queue,
176 count(*) FILTER (WHERE state = 'available') AS available,
177 count(*) FILTER (WHERE state = 'running') AS running,
178 count(*) FILTER (WHERE state = 'failed') AS failed,
179 count(*) FILTER (WHERE state = 'completed'
180 AND finalized_at > now() - interval '1 hour') AS completed_last_hour,
181 EXTRACT(EPOCH FROM (now() - min(run_at) FILTER (WHERE state = 'available')))::float8 AS lag_seconds
182 FROM awa.jobs
183 GROUP BY queue
184 "#,
185 )
186 .fetch_all(executor)
187 .await?;
188
189 Ok(rows
190 .into_iter()
191 .map(
192 |(queue, available, running, failed, completed_last_hour, lag_seconds)| QueueStats {
193 queue,
194 available,
195 running,
196 failed,
197 completed_last_hour,
198 lag_seconds,
199 },
200 )
201 .collect())
202}
203
/// Optional criteria for `list_jobs`.
///
/// Every field defaults to `None`; a `None` field places no restriction on the result.
#[derive(Debug, Clone, Default)]
pub struct ListJobsFilter {
    /// Only return jobs in this state.
    pub state: Option<JobState>,
    /// Only return jobs whose `kind` column equals this value.
    pub kind: Option<String>,
    /// Only return jobs whose `queue` column equals this value.
    pub queue: Option<String>,
    /// Maximum number of rows to return; `list_jobs` falls back to 100 when `None`.
    pub limit: Option<i64>,
}
212
213pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
215where
216 E: PgExecutor<'e>,
217{
218 let limit = filter.limit.unwrap_or(100);
219
220 let rows = sqlx::query_as::<_, JobRow>(
221 r#"
222 SELECT * FROM awa.jobs
223 WHERE ($1::awa.job_state IS NULL OR state = $1)
224 AND ($2::text IS NULL OR kind = $2)
225 AND ($3::text IS NULL OR queue = $3)
226 ORDER BY id DESC
227 LIMIT $4
228 "#,
229 )
230 .bind(filter.state)
231 .bind(&filter.kind)
232 .bind(&filter.queue)
233 .bind(limit)
234 .fetch_all(executor)
235 .await?;
236
237 Ok(rows)
238}
239
240pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
242where
243 E: PgExecutor<'e>,
244{
245 let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
246 .bind(job_id)
247 .fetch_optional(executor)
248 .await?;
249
250 row.ok_or(AwaError::JobNotFound { id: job_id })
251}