job_queue/
client.rs

1use crate::{get_pool, models::FailedJob, DBType, Error, Job};
2use sqlx::{Any, AnyPool, Connection};
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4use uuid::Uuid;
5
/// Per-dispatch overrides accepted by `Client::custom_dispatch`.
///
/// `DispatchOptions::default()` leaves both fields as `None`.
#[derive(Clone, Debug, Default)]
pub struct DispatchOptions {
    /// Name of the queue to insert the job into. When `None`, the dispatching
    /// code falls back to the queue reported by the job itself (`job.queue()`).
    pub queue: Option<String>,
    /// How long to wait before the job becomes available. Only the whole-second
    /// part is used when computing `available_at`; `None` means no delay.
    pub delay: Option<Duration>,
}
11
12#[derive(Debug, Clone)]
13pub struct Client {
14    pool: AnyPool,
15    db_type: DBType,
16}
17
18impl Client {
19    pub fn builder() -> ClientBuilder {
20        ClientBuilder::new()
21    }
22
23    pub async fn dispatch(&self, job: &impl Job) -> Result<(), Error> {
24        let queue = job.queue();
25
26        self.dispatch_on_queue(job, &queue).await
27    }
28
29    pub async fn dispatch_on_queue(&self, job: &impl Job, queue: &str) -> Result<(), Error> {
30        let options = DispatchOptions {
31            queue: Some(queue.to_string()),
32            ..Default::default()
33        };
34
35        self.custom_dispatch(job, &options).await
36    }
37
38    pub async fn custom_dispatch(
39        &self,
40        job: &impl Job,
41        options: &DispatchOptions,
42    ) -> Result<(), Error> {
43        let mut conn = self.pool.clone().acquire().await?;
44        let payload = serde_json::to_string(job as &dyn Job).map_err(Error::SerdeError)?;
45        let time = SystemTime::now()
46            .duration_since(UNIX_EPOCH)
47            .map_err(|_| Error::Unknown)?
48            .as_secs();
49
50        let job_id = Uuid::new_v4().to_string();
51        let queue = options.queue.clone().unwrap_or_else(|| job.queue());
52
53        sqlx::query(&format!(
54            "INSERT INTO jobs (uuid, queue, payload, attempts, available_at, created_at) VALUES {}",
55            match self.db_type {
56                DBType::Mysql => "(?, ?, ?, ?, ?, ?)",
57                DBType::Postgres => "($1, $2, $3, $4, $5, $6)",
58            }
59        ))
60        .bind(job_id)
61        .bind(queue)
62        .bind(payload)
63        .bind(0)
64        .bind(
65            (time
66                + options
67                    .delay
68                    .unwrap_or_else(|| Duration::from_secs(0))
69                    .as_secs()) as i64,
70        )
71        .bind(time as i64)
72        .execute(&mut *conn)
73        .await
74        .map_err(Error::DatabaseError)?;
75
76        conn.close().await?;
77
78        Ok(())
79    }
80
81    pub async fn retry_failed_job(&self, job_id: &str) -> Result<(), Error> {
82        let mut pool = self.pool.acquire().await?;
83        let mut conn = pool.begin().await?;
84
85        let failed_job = sqlx::query_as::<Any, FailedJob>(&format!(
86            "SELECT id, uuid, queue, payload FROM failed_jobs WHERE uuid = {}",
87            match self.db_type {
88                DBType::Mysql => "?",
89                DBType::Postgres => "$1",
90            }
91        ))
92        .bind(job_id)
93        .fetch_one(&mut *conn)
94        .await
95        .map_err(Error::DatabaseError)?;
96
97        let time = SystemTime::now()
98            .duration_since(UNIX_EPOCH)
99            .map_err(|_| Error::Unknown)?
100            .as_secs() as i64;
101
102        sqlx::query(&format!(
103            "INSERT INTO jobs (uuid, queue, payload, attempts, available_at, created_at) VALUES {}",
104            match self.db_type {
105                DBType::Mysql => "(?, ?, ?, ?, ?, ?)",
106                DBType::Postgres => "($1, $2, $3, $4, $5, $6)",
107            }
108        ))
109        .bind(job_id)
110        .bind(failed_job.queue)
111        .bind(failed_job.payload.0.to_string())
112        .bind(0)
113        .bind(time)
114        .bind(time)
115        .execute(&mut *conn)
116        .await
117        .map_err(Error::DatabaseError)?;
118
119        sqlx::query(&format!(
120            "DELETE FROM failed_jobs WHERE uuid = {}",
121            match self.db_type {
122                DBType::Mysql => "?",
123                DBType::Postgres => "$1",
124            }
125        ))
126        .bind(job_id)
127        .execute(&mut *conn)
128        .await
129        .map_err(Error::DatabaseError)?;
130
131        conn.commit().await?;
132        pool.close().await?;
133
134        Ok(())
135    }
136
137    pub async fn retry_all_failed_jobs(&self) -> Result<(), Error> {
138        let mut pool = self.pool.acquire().await?;
139        let mut conn = pool.begin().await?;
140
141        let failed_jobs = sqlx::query_as::<Any, (String,)>("SELECT uuid FROM failed_jobs")
142            .fetch_all(&mut *conn)
143            .await
144            .map_err(Error::DatabaseError)?;
145
146        conn.commit().await?;
147        pool.close().await?;
148
149        for failed_job in failed_jobs {
150            self.retry_failed_job(&failed_job.0).await?;
151        }
152
153        Ok(())
154    }
155
156    pub async fn delete_failed_job(&self, job_id: &str) -> Result<(), Error> {
157        let mut conn = self.pool.clone().acquire().await?;
158
159        sqlx::query(&format!(
160            "DELETE FROM failed_jobs WHERE uuid = {}",
161            match self.db_type {
162                DBType::Mysql => "?",
163                DBType::Postgres => "$1",
164            }
165        ))
166        .bind(job_id)
167        .execute(&mut *conn)
168        .await
169        .map_err(Error::DatabaseError)?;
170
171        conn.close().await?;
172
173        Ok(())
174    }
175
176    pub async fn delete_all_failed_jobs(&self) -> Result<(), Error> {
177        let mut conn = self.pool.clone().acquire().await?;
178
179        sqlx::query("DELETE FROM failed_jobs")
180            .execute(&mut *conn)
181            .await
182            .map_err(Error::DatabaseError)?;
183
184        conn.close().await?;
185
186        Ok(())
187    }
188
189    pub async fn delete_job(&self, job_id: &str) -> Result<(), Error> {
190        let mut conn = self.pool.clone().acquire().await?;
191
192        sqlx::query(&format!(
193            "DELETE FROM jobs WHERE uuid = {}",
194            match self.db_type {
195                DBType::Mysql => "?",
196                DBType::Postgres => "$1",
197            }
198        ))
199        .bind(job_id)
200        .execute(&mut *conn)
201        .await
202        .map_err(Error::DatabaseError)?;
203
204        conn.close().await?;
205
206        Ok(())
207    }
208
209    pub async fn delete_all_jobs(&self) -> Result<(), Error> {
210        let mut conn = self.pool.clone().acquire().await?;
211
212        sqlx::query("DELETE FROM jobs")
213            .execute(&mut *conn)
214            .await
215            .map_err(Error::DatabaseError)?;
216
217        conn.close().await?;
218
219        Ok(())
220    }
221}
222
/// Builder collecting connection-pool limits before a `Client` is created.
#[derive(Debug)]
pub struct ClientBuilder {
    /// Upper bound on the number of pooled connections.
    max_connections: u32,
    /// Lower bound on the number of pooled connections.
    min_connections: u32,
}

impl Default for ClientBuilder {
    /// Returns the same configuration as `ClientBuilder::new()`.
    ///
    /// Implemented by hand because a derived `Default` would zero both fields,
    /// yielding `max_connections: 0` — a pool that can never hand out a
    /// connection — and disagreeing with `new()`.
    fn default() -> Self {
        // Keep these values in sync with `ClientBuilder::new()`.
        Self {
            max_connections: 10,
            min_connections: 0,
        }
    }
}
228
229impl ClientBuilder {
230    pub fn new() -> Self {
231        Self {
232            max_connections: 10,
233            min_connections: 0,
234        }
235    }
236
237    pub fn max_connections(mut self, max_connections: u32) -> Self {
238        self.max_connections = max_connections;
239        self
240    }
241
242    pub fn min_connections(mut self, min_connections: u32) -> Self {
243        self.min_connections = min_connections;
244        self
245    }
246
247    pub async fn connect(self, database_url: &str) -> Result<Client, Error> {
248        let (pool, db_type) = get_pool(
249            database_url,
250            crate::PoolOptions {
251                max_connections: self.max_connections,
252                min_connections: self.min_connections,
253            },
254        )
255        .await?;
256
257        let client = Client { db_type, pool };
258
259        Ok(client)
260    }
261}