1use crate::{get_pool, models::FailedJob, DBType, Error, Job};
2use sqlx::{Any, AnyPool, Connection};
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4use uuid::Uuid;
5
/// Optional overrides that callers can pass when dispatching a job.
///
/// Both fields default to `None`.
#[derive(Clone, Debug, Default)]
pub struct DispatchOptions {
    /// Queue to insert the job into. When `None`, the dispatcher uses the
    /// queue reported by the job itself.
    pub queue: Option<String>,
    /// How long to wait before the job becomes available. The dispatcher
    /// only uses the whole-second part of this duration.
    pub delay: Option<Duration>,
}
11
12#[derive(Debug, Clone)]
13pub struct Client {
14 pool: AnyPool,
15 db_type: DBType,
16}
17
18impl Client {
19 pub fn builder() -> ClientBuilder {
20 ClientBuilder::new()
21 }
22
23 pub async fn dispatch(&self, job: &impl Job) -> Result<(), Error> {
24 let queue = job.queue();
25
26 self.dispatch_on_queue(job, &queue).await
27 }
28
29 pub async fn dispatch_on_queue(&self, job: &impl Job, queue: &str) -> Result<(), Error> {
30 let options = DispatchOptions {
31 queue: Some(queue.to_string()),
32 ..Default::default()
33 };
34
35 self.custom_dispatch(job, &options).await
36 }
37
38 pub async fn custom_dispatch(
39 &self,
40 job: &impl Job,
41 options: &DispatchOptions,
42 ) -> Result<(), Error> {
43 let mut conn = self.pool.clone().acquire().await?;
44 let payload = serde_json::to_string(job as &dyn Job).map_err(Error::SerdeError)?;
45 let time = SystemTime::now()
46 .duration_since(UNIX_EPOCH)
47 .map_err(|_| Error::Unknown)?
48 .as_secs();
49
50 let job_id = Uuid::new_v4().to_string();
51 let queue = options.queue.clone().unwrap_or_else(|| job.queue());
52
53 sqlx::query(&format!(
54 "INSERT INTO jobs (uuid, queue, payload, attempts, available_at, created_at) VALUES {}",
55 match self.db_type {
56 DBType::Mysql => "(?, ?, ?, ?, ?, ?)",
57 DBType::Postgres => "($1, $2, $3, $4, $5, $6)",
58 }
59 ))
60 .bind(job_id)
61 .bind(queue)
62 .bind(payload)
63 .bind(0)
64 .bind(
65 (time
66 + options
67 .delay
68 .unwrap_or_else(|| Duration::from_secs(0))
69 .as_secs()) as i64,
70 )
71 .bind(time as i64)
72 .execute(&mut *conn)
73 .await
74 .map_err(Error::DatabaseError)?;
75
76 conn.close().await?;
77
78 Ok(())
79 }
80
81 pub async fn retry_failed_job(&self, job_id: &str) -> Result<(), Error> {
82 let mut pool = self.pool.acquire().await?;
83 let mut conn = pool.begin().await?;
84
85 let failed_job = sqlx::query_as::<Any, FailedJob>(&format!(
86 "SELECT id, uuid, queue, payload FROM failed_jobs WHERE uuid = {}",
87 match self.db_type {
88 DBType::Mysql => "?",
89 DBType::Postgres => "$1",
90 }
91 ))
92 .bind(job_id)
93 .fetch_one(&mut *conn)
94 .await
95 .map_err(Error::DatabaseError)?;
96
97 let time = SystemTime::now()
98 .duration_since(UNIX_EPOCH)
99 .map_err(|_| Error::Unknown)?
100 .as_secs() as i64;
101
102 sqlx::query(&format!(
103 "INSERT INTO jobs (uuid, queue, payload, attempts, available_at, created_at) VALUES {}",
104 match self.db_type {
105 DBType::Mysql => "(?, ?, ?, ?, ?, ?)",
106 DBType::Postgres => "($1, $2, $3, $4, $5, $6)",
107 }
108 ))
109 .bind(job_id)
110 .bind(failed_job.queue)
111 .bind(failed_job.payload.0.to_string())
112 .bind(0)
113 .bind(time)
114 .bind(time)
115 .execute(&mut *conn)
116 .await
117 .map_err(Error::DatabaseError)?;
118
119 sqlx::query(&format!(
120 "DELETE FROM failed_jobs WHERE uuid = {}",
121 match self.db_type {
122 DBType::Mysql => "?",
123 DBType::Postgres => "$1",
124 }
125 ))
126 .bind(job_id)
127 .execute(&mut *conn)
128 .await
129 .map_err(Error::DatabaseError)?;
130
131 conn.commit().await?;
132 pool.close().await?;
133
134 Ok(())
135 }
136
137 pub async fn retry_all_failed_jobs(&self) -> Result<(), Error> {
138 let mut pool = self.pool.acquire().await?;
139 let mut conn = pool.begin().await?;
140
141 let failed_jobs = sqlx::query_as::<Any, (String,)>("SELECT uuid FROM failed_jobs")
142 .fetch_all(&mut *conn)
143 .await
144 .map_err(Error::DatabaseError)?;
145
146 conn.commit().await?;
147 pool.close().await?;
148
149 for failed_job in failed_jobs {
150 self.retry_failed_job(&failed_job.0).await?;
151 }
152
153 Ok(())
154 }
155
156 pub async fn delete_failed_job(&self, job_id: &str) -> Result<(), Error> {
157 let mut conn = self.pool.clone().acquire().await?;
158
159 sqlx::query(&format!(
160 "DELETE FROM failed_jobs WHERE uuid = {}",
161 match self.db_type {
162 DBType::Mysql => "?",
163 DBType::Postgres => "$1",
164 }
165 ))
166 .bind(job_id)
167 .execute(&mut *conn)
168 .await
169 .map_err(Error::DatabaseError)?;
170
171 conn.close().await?;
172
173 Ok(())
174 }
175
176 pub async fn delete_all_failed_jobs(&self) -> Result<(), Error> {
177 let mut conn = self.pool.clone().acquire().await?;
178
179 sqlx::query("DELETE FROM failed_jobs")
180 .execute(&mut *conn)
181 .await
182 .map_err(Error::DatabaseError)?;
183
184 conn.close().await?;
185
186 Ok(())
187 }
188
189 pub async fn delete_job(&self, job_id: &str) -> Result<(), Error> {
190 let mut conn = self.pool.clone().acquire().await?;
191
192 sqlx::query(&format!(
193 "DELETE FROM jobs WHERE uuid = {}",
194 match self.db_type {
195 DBType::Mysql => "?",
196 DBType::Postgres => "$1",
197 }
198 ))
199 .bind(job_id)
200 .execute(&mut *conn)
201 .await
202 .map_err(Error::DatabaseError)?;
203
204 conn.close().await?;
205
206 Ok(())
207 }
208
209 pub async fn delete_all_jobs(&self) -> Result<(), Error> {
210 let mut conn = self.pool.clone().acquire().await?;
211
212 sqlx::query("DELETE FROM jobs")
213 .execute(&mut *conn)
214 .await
215 .map_err(Error::DatabaseError)?;
216
217 conn.close().await?;
218
219 Ok(())
220 }
221}
222
/// Collects connection-pool settings before a `Client` is created.
#[derive(Debug)]
pub struct ClientBuilder {
    /// Upper bound on the number of pooled connections.
    max_connections: u32,
    /// Lower bound on the number of pooled connections.
    min_connections: u32,
}

impl Default for ClientBuilder {
    /// Returns the same settings as `ClientBuilder::new()`: at most 10 and
    /// at least 0 connections.
    ///
    /// A derived `Default` would set `max_connections` to 0, which disagrees
    /// with `new()` and describes a pool that can never hold a connection.
    fn default() -> Self {
        Self {
            max_connections: 10,
            min_connections: 0,
        }
    }
}
228
229impl ClientBuilder {
230 pub fn new() -> Self {
231 Self {
232 max_connections: 10,
233 min_connections: 0,
234 }
235 }
236
237 pub fn max_connections(mut self, max_connections: u32) -> Self {
238 self.max_connections = max_connections;
239 self
240 }
241
242 pub fn min_connections(mut self, min_connections: u32) -> Self {
243 self.min_connections = min_connections;
244 self
245 }
246
247 pub async fn connect(self, database_url: &str) -> Result<Client, Error> {
248 let (pool, db_type) = get_pool(
249 database_url,
250 crate::PoolOptions {
251 max_connections: self.max_connections,
252 min_connections: self.min_connections,
253 },
254 )
255 .await?;
256
257 let client = Client { db_type, pool };
258
259 Ok(client)
260 }
261}