simple_queue/queue/
job_api.rs1use super::SimpleQueue;
2use crate::*;
3use sqlx;
4use sqlx::error::BoxDynError;
5impl SimpleQueue {
6 pub async fn insert_job(&self, job: Job) -> Result<Option<uuid::Uuid>, BoxDynError> {
9 let id: Option<uuid::Uuid> = sqlx::query_scalar!(
10 r#"
11 INSERT INTO job_queue (
12 id, fingerprint, unique_key, queue, job_data, status, created_at, run_at, updated_at, attempt, max_attempts
13 )
14 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
15 ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL AND status IN ('pending', 'running') DO NOTHING
16 RETURNING id
17 "#,
18
19 job.id,
20 job.fingerprint,
21 job.unique_key,
22 job.queue,
23 job.job_data,
24 job.status,
25 job.created_at,
26 job.run_at,
27 job.updated_at,
28 job.attempt,
29 job.max_attempts
30 )
31 .fetch_optional(&self.pool)
32 .await.inspect_err(|e| tracing::error!("[{}]Failed to insert job {}: {:?}", job.queue, job.id, e))?;
33 Ok(id)
34 }
35
36 pub async fn insert_jobs(&self, jobs: Vec<Job>) -> Result<Vec<uuid::Uuid>, BoxDynError> {
37 let mut qb = sqlx::QueryBuilder::new(
38 "INSERT INTO job_queue (id, fingerprint, unique_key, queue, job_data, status, created_at, run_at, updated_at, attempt, max_attempts) ",
39 );
40 let query = qb
41 .push_values(jobs, |mut qb, job| {
42 qb.push_bind(job.id)
43 .push_bind(job.fingerprint)
44 .push_bind(job.unique_key)
45 .push_bind(job.queue)
46 .push_bind(job.job_data)
47 .push_bind(job.status)
48 .push_bind(job.created_at)
49 .push_bind(job.run_at)
50 .push_bind(job.updated_at)
51 .push_bind(job.attempt)
52 .push_bind(job.max_attempts);
53 })
54 .push("RETURNING id")
55 .build_query_scalar::<uuid::Uuid>();
56
57 let resulting_ids = query.fetch_all(&self.pool).await?;
58 Ok(resulting_ids)
59 }
60
61 pub async fn cancel_job_by_unique_key(&self, unique_key: &str) -> Result<(), BoxDynError> {
62 sqlx::query!(
63 r#"
64 UPDATE job_queue
65 SET status = 'cancelled'
66 WHERE unique_key = $1
67 "#,
68 unique_key
69 )
70 .execute(&self.pool)
71 .await?;
72 Ok(())
73 }
74 pub async fn cancel_all_jobs_by_fingerprint(
75 &self,
76 fingerprint: &str,
77 ) -> Result<(), BoxDynError> {
78 sqlx::query!(
79 r#"
80 UPDATE job_queue
81 SET status = 'cancelled'
82 WHERE fingerprint = $1
83 "#,
84 fingerprint
85 )
86 .execute(&self.pool)
87 .await?;
88 Ok(())
89 }
90}