Skip to main content

awa_model/
admin.rs

1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use sqlx::PgExecutor;
4
5/// Retry a single failed or cancelled job.
6pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
7where
8    E: PgExecutor<'e>,
9{
10    sqlx::query_as::<_, JobRow>(
11        r#"
12        UPDATE awa.jobs
13        SET state = 'available', attempt = 0, run_at = now(),
14            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
15        WHERE id = $1 AND state IN ('failed', 'cancelled')
16        RETURNING *
17        "#,
18    )
19    .bind(job_id)
20    .fetch_optional(executor)
21    .await?
22    .ok_or(AwaError::JobNotFound { id: job_id })
23    .map(Some)
24}
25
26/// Cancel a single non-terminal job.
27pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
28where
29    E: PgExecutor<'e>,
30{
31    sqlx::query_as::<_, JobRow>(
32        r#"
33        UPDATE awa.jobs
34        SET state = 'cancelled', finalized_at = now()
35        WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
36        RETURNING *
37        "#,
38    )
39    .bind(job_id)
40    .fetch_optional(executor)
41    .await?
42    .ok_or(AwaError::JobNotFound { id: job_id })
43    .map(Some)
44}
45
46/// Retry all failed jobs of a given kind.
47pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
48where
49    E: PgExecutor<'e>,
50{
51    let rows = sqlx::query_as::<_, JobRow>(
52        r#"
53        UPDATE awa.jobs
54        SET state = 'available', attempt = 0, run_at = now(),
55            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
56        WHERE kind = $1 AND state = 'failed'
57        RETURNING *
58        "#,
59    )
60    .bind(kind)
61    .fetch_all(executor)
62    .await?;
63
64    Ok(rows)
65}
66
67/// Retry all failed jobs in a given queue.
68pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
69where
70    E: PgExecutor<'e>,
71{
72    let rows = sqlx::query_as::<_, JobRow>(
73        r#"
74        UPDATE awa.jobs
75        SET state = 'available', attempt = 0, run_at = now(),
76            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
77        WHERE queue = $1 AND state = 'failed'
78        RETURNING *
79        "#,
80    )
81    .bind(queue)
82    .fetch_all(executor)
83    .await?;
84
85    Ok(rows)
86}
87
88/// Discard (delete) all failed jobs of a given kind.
89pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
90where
91    E: PgExecutor<'e>,
92{
93    let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
94        .bind(kind)
95        .execute(executor)
96        .await?;
97
98    Ok(result.rows_affected())
99}
100
101/// Pause a queue. Affects all workers immediately.
102pub async fn pause_queue<'e, E>(
103    executor: E,
104    queue: &str,
105    paused_by: Option<&str>,
106) -> Result<(), AwaError>
107where
108    E: PgExecutor<'e>,
109{
110    sqlx::query(
111        r#"
112        INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
113        VALUES ($1, TRUE, now(), $2)
114        ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
115        "#,
116    )
117    .bind(queue)
118    .bind(paused_by)
119    .execute(executor)
120    .await?;
121
122    Ok(())
123}
124
125/// Resume a paused queue.
126pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
127where
128    E: PgExecutor<'e>,
129{
130    sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
131        .bind(queue)
132        .execute(executor)
133        .await?;
134
135    Ok(())
136}
137
138/// Drain a queue: cancel all non-running, non-terminal jobs.
139pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
140where
141    E: PgExecutor<'e>,
142{
143    let result = sqlx::query(
144        r#"
145        UPDATE awa.jobs SET state = 'cancelled', finalized_at = now()
146        WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable')
147        "#,
148    )
149    .bind(queue)
150    .execute(executor)
151    .await?;
152
153    Ok(result.rows_affected())
154}
155
156/// Queue statistics.
157#[derive(Debug, Clone)]
158pub struct QueueStats {
159    pub queue: String,
160    pub available: i64,
161    pub running: i64,
162    pub failed: i64,
163    pub completed_last_hour: i64,
164    pub lag_seconds: Option<f64>,
165}
166
167/// Get statistics for all queues.
168pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
169where
170    E: PgExecutor<'e>,
171{
172    let rows = sqlx::query_as::<_, (String, i64, i64, i64, i64, Option<f64>)>(
173        r#"
174        SELECT
175            queue,
176            count(*) FILTER (WHERE state = 'available') AS available,
177            count(*) FILTER (WHERE state = 'running') AS running,
178            count(*) FILTER (WHERE state = 'failed') AS failed,
179            count(*) FILTER (WHERE state = 'completed'
180                AND finalized_at > now() - interval '1 hour') AS completed_last_hour,
181            EXTRACT(EPOCH FROM (now() - min(run_at) FILTER (WHERE state = 'available')))::float8 AS lag_seconds
182        FROM awa.jobs
183        GROUP BY queue
184        "#,
185    )
186    .fetch_all(executor)
187    .await?;
188
189    Ok(rows
190        .into_iter()
191        .map(
192            |(queue, available, running, failed, completed_last_hour, lag_seconds)| QueueStats {
193                queue,
194                available,
195                running,
196                failed,
197                completed_last_hour,
198                lag_seconds,
199            },
200        )
201        .collect())
202}
203
204/// List jobs with optional filters.
205#[derive(Debug, Clone, Default)]
206pub struct ListJobsFilter {
207    pub state: Option<JobState>,
208    pub kind: Option<String>,
209    pub queue: Option<String>,
210    pub limit: Option<i64>,
211}
212
213/// List jobs matching the given filter.
214pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
215where
216    E: PgExecutor<'e>,
217{
218    let limit = filter.limit.unwrap_or(100);
219
220    let rows = sqlx::query_as::<_, JobRow>(
221        r#"
222        SELECT * FROM awa.jobs
223        WHERE ($1::awa.job_state IS NULL OR state = $1)
224          AND ($2::text IS NULL OR kind = $2)
225          AND ($3::text IS NULL OR queue = $3)
226        ORDER BY id DESC
227        LIMIT $4
228        "#,
229    )
230    .bind(filter.state)
231    .bind(&filter.kind)
232    .bind(&filter.queue)
233    .bind(limit)
234    .fetch_all(executor)
235    .await?;
236
237    Ok(rows)
238}
239
240/// Get a single job by ID.
241pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
242where
243    E: PgExecutor<'e>,
244{
245    let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
246        .bind(job_id)
247        .fetch_optional(executor)
248        .await?;
249
250    row.ok_or(AwaError::JobNotFound { id: job_id })
251}