1#![warn(
2 missing_debug_implementations,
3 missing_docs,
4 rust_2018_idioms,
5 unreachable_pub
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8
9use std::{num::TryFromIntError, time::Duration};
28
29use apalis_core::{error::Error, request::State, response::Response};
30
/// Job context types; this is where [`context::SqlContext`] lives.
pub mod context;
/// Row-conversion helpers.
///
/// NOTE(review): presumably maps database rows into job requests — confirm
/// against the module contents.
pub mod from_row;

/// PostgreSQL backend. Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
pub mod postgres;

/// SQLite backend. Only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
pub mod sqlite;

/// MySQL backend. Only compiled when the `mysql` feature is enabled.
#[cfg(feature = "mysql")]
#[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]
pub mod mysql;
51
52use context::SqlContext;
53pub use sqlx;
55
/// Configuration values for SQL-backed storage.
///
/// Build one with [`Config::new`] or [`Config::default`] and adjust it through
/// the consuming `set_*` methods or the `*_mut` accessors.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names; confirm the exact semantics against the backend implementations.
#[derive(Debug, Clone)]
pub struct Config {
    /// Keep-alive interval (30 seconds by default).
    /// Presumably how often a worker refreshes its liveness record — confirm.
    keep_alive: Duration,
    /// Buffer size (10 by default).
    /// Presumably the number of jobs fetched per query — confirm.
    buffer_size: usize,
    /// Poll interval (100 milliseconds by default).
    /// Presumably the delay between successive database polls — confirm.
    poll_interval: Duration,
    /// Orphan re-enqueue threshold (5 minutes by default).
    /// Presumably how long a job may stay locked by a silent worker before it
    /// is put back on the queue — confirm.
    reenqueue_orphaned_after: Duration,
    /// Namespace string (`"apalis::sql"` by default).
    namespace: String,
}
65
/// Errors surfaced by this crate.
///
/// Both variants carry a `#[from]` conversion, so the wrapped error types can
/// be propagated with `?` in functions returning `Result<_, SqlError>`.
#[derive(Debug, thiserror::Error)]
pub enum SqlError {
    /// An error reported by `sqlx`.
    #[error("sqlx::Error: {0}")]
    Sqlx(#[from] sqlx::Error),
    /// An integer conversion failed (the value did not fit the target type).
    #[error("TryFromIntError: {0}")]
    TryFromInt(#[from] TryFromIntError),
}
76
77impl Default for Config {
78 fn default() -> Self {
79 Self {
80 keep_alive: Duration::from_secs(30),
81 buffer_size: 10,
82 poll_interval: Duration::from_millis(100),
83 reenqueue_orphaned_after: Duration::from_secs(300), namespace: String::from("apalis::sql"),
85 }
86 }
87}
88
89impl Config {
90 pub fn new(namespace: &str) -> Self {
92 Config::default().set_namespace(namespace)
93 }
94
95 pub fn set_poll_interval(mut self, interval: Duration) -> Self {
99 self.poll_interval = interval;
100 self
101 }
102
103 pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
107 self.keep_alive = keep_alive;
108 self
109 }
110
111 pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
115 self.buffer_size = buffer_size;
116 self
117 }
118
119 pub fn set_namespace(mut self, namespace: &str) -> Self {
123 self.namespace = namespace.to_string();
124 self
125 }
126
127 pub fn keep_alive(&self) -> &Duration {
129 &self.keep_alive
130 }
131
132 pub fn keep_alive_mut(&mut self) -> &mut Duration {
134 &mut self.keep_alive
135 }
136
137 pub fn buffer_size(&self) -> usize {
139 self.buffer_size
140 }
141
142 pub fn poll_interval(&self) -> &Duration {
144 &self.poll_interval
145 }
146
147 pub fn poll_interval_mut(&mut self) -> &mut Duration {
149 &mut self.poll_interval
150 }
151
152 pub fn namespace(&self) -> &String {
154 &self.namespace
155 }
156
157 pub fn namespace_mut(&mut self) -> &mut String {
159 &mut self.namespace
160 }
161
162 pub fn reenqueue_orphaned_after(&self) -> Duration {
164 self.reenqueue_orphaned_after
165 }
166
167 pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
169 &mut self.reenqueue_orphaned_after
170 }
171
172 pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
177 self.reenqueue_orphaned_after = after;
178 self
179 }
180}
181
182pub fn calculate_status<Res>(ctx: &SqlContext, res: &Response<Res>) -> State {
184 match &res.inner {
185 Ok(_) => State::Done,
186 Err(e) => match &e {
187 Error::Abort(_) => State::Killed,
188 Error::Failed(_) if ctx.max_attempts() as usize <= res.attempt.current() => {
189 State::Killed
190 }
191 _ => State::Failed,
192 },
193 }
194}
195
/// Generates a shared suite of integration tests for a SQL storage backend.
///
/// # Arguments
///
/// * `$setup` - path to an async function; it is invoked as `$setup().await`
///   to obtain a storage instance for each test.
/// * `$storage_type` - the storage type wrapped by `TestWrapper`.
/// * `$job_type` - the job payload type carried by `Request<_, SqlContext>`.
///
/// The expansion refers to names that must be in scope at the call site:
/// `TestWrapper`, `Request`, `SqlContext`, `Email`, `TaskId`, `State`,
/// `Duration`, `Utc`, `get_job` and the `email_service` module. It also uses
/// the `tokio`, `chrono` and `apalis_core` crates by path.
#[macro_export]
macro_rules! sql_storage_tests {
    ($setup:path, $storage_type:ty, $job_type:ty) => {
        type WrappedStorage = TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()>;

        // Wraps a fresh storage together with the email service, spawns its
        // poller in the background and vacuums the storage before handing it out.
        async fn setup_test_wrapper() -> WrappedStorage {
            let backend = $setup().await;
            let service = apalis_core::service_fn::service_fn(email_service::send_email);
            let (mut wrapper, poller) = TestWrapper::new_with_service(backend, service);
            tokio::spawn(poller);
            wrapper.vacuum().await.unwrap();
            wrapper
        }

        // Pushes `email` with the given `priority` and returns the new task id.
        async fn push_email_priority(
            storage: &mut WrappedStorage,
            email: Email,
            priority: i32,
        ) -> TaskId {
            let mut ctx = SqlContext::new();
            ctx.set_priority(priority);
            let request = Request::new_with_ctx(email, ctx);
            let parts = storage
                .push_request(request)
                .await
                .expect("failed to push a job");
            parts.task_id
        }

        #[tokio::test]
        async fn integration_test_kill_job() {
            let mut wrapper = setup_test_wrapper().await;

            let killed_email = email_service::example_killed_email();
            wrapper.push(killed_email).await.unwrap();

            let (task_id, outcome) = wrapper.execute_next().await.unwrap();
            assert_eq!(outcome, Err("AbortError: Invalid character.".to_owned()));

            // Wait before reading the job back so the result has been recorded.
            apalis_core::sleep(Duration::from_secs(1)).await;

            let fetched = wrapper.fetch_by_id(&task_id).await.unwrap();
            let job = fetched.expect("No job found");
            let context = job.parts.context;
            assert_eq!(*context.status(), State::Killed);

            let last_error = context.last_error().clone().unwrap();
            assert_eq!(
                last_error,
                "{\"Err\":\"AbortError: Invalid character.\"}"
            );
        }

        #[tokio::test]
        async fn integration_test_update_job() {
            let mut wrapper = setup_test_wrapper().await;

            let email = Email {
                subject: String::from("Test Subject"),
                to: String::from("example@sql"),
                text: String::from("Some Text"),
            };
            let pushed = wrapper.push(email).await.expect("failed to push a job");
            let task_id = pushed.task_id;

            // Flip the stored status by hand and persist it.
            let mut pending = get_job(&mut wrapper, &task_id).await;
            pending.parts.context.set_status(State::Killed);
            wrapper.update(pending).await.expect("updating to succeed");

            // Reading the job back must show the updated status.
            let reloaded = get_job(&mut wrapper, &task_id).await;
            let context = reloaded.parts.context;
            assert_eq!(*context.status(), State::Killed);
        }

        #[tokio::test]
        async fn integration_test_acknowledge_good_job() {
            let mut wrapper = setup_test_wrapper().await;

            let good_email = email_service::example_good_email();
            wrapper.push(good_email).await.unwrap();

            let (task_id, outcome) = wrapper.execute_next().await.unwrap();
            assert_eq!(outcome, Ok("()".to_owned()));

            apalis_core::sleep(Duration::from_secs(1)).await;

            let fetched = wrapper.fetch_by_id(&task_id).await.unwrap();
            let context = fetched.unwrap().parts.context;
            assert_eq!(*context.status(), State::Done);
            assert!(context.done_at().is_some());
        }

        #[tokio::test]
        async fn integration_test_acknowledge_failed_job() {
            let mut wrapper = setup_test_wrapper().await;

            let retryable_email = email_service::example_retry_able_email();
            wrapper.push(retryable_email).await.unwrap();

            let (task_id, outcome) = wrapper.execute_next().await.unwrap();
            assert_eq!(
                outcome,
                Err("FailedError: Missing separator character '@'.".to_owned())
            );

            apalis_core::sleep(Duration::from_secs(1)).await;

            let fetched = wrapper.fetch_by_id(&task_id).await.unwrap();
            let job = fetched.unwrap();
            let context = job.parts.context;
            assert_eq!(*context.status(), State::Failed);
            assert!(job.parts.attempt.current() >= 1);

            let last_error = context.last_error().clone().unwrap();
            assert_eq!(
                last_error,
                "{\"Err\":\"FailedError: Missing separator character '@'.\"}"
            );
        }

        #[tokio::test]
        async fn worker_consume() {
            use apalis_core::builder::{WorkerBuilder, WorkerFactoryFn};

            // Handler run by the worker under test.
            async fn task(_job: Email) -> &'static str {
                tokio::time::sleep(Duration::from_millis(100)).await;
                "Job well done"
            }

            let storage = $setup().await;
            let mut handle = storage.clone();

            let parts = handle
                .push(email_service::example_good_email())
                .await
                .unwrap();

            let running = WorkerBuilder::new("rango-tango")
                .backend(storage)
                .build_fn(task)
                .run();
            let worker_handle = running.get_handle();

            // Gives the worker time to process the job, inspects the stored
            // context and then stops the worker so the join below can finish.
            let checker = async move {
                apalis_core::sleep(Duration::from_secs(3)).await;
                let job = get_job(&mut handle, &parts.task_id).await;
                let context = job.parts.context;

                assert_eq!(*context.status(), State::Done);
                assert!(context.done_at().is_some());
                assert!(context.lock_by().is_some());
                assert!(context.lock_at().is_some());
                // `last_error` is expected to be populated for a successful job too.
                assert!(context.last_error().is_some());

                worker_handle.stop();
            };

            tokio::join!(checker, running);
        }

        #[tokio::test]
        async fn test_consume_jobs_with_priority() {
            let mut wrapper = setup_test_wrapper().await;

            // Jobs are pushed out of priority order on purpose; they must be
            // executed from the highest priority value to the lowest.
            let job2 =
                push_email_priority(&mut wrapper, email_service::example_good_email(), 5).await;
            let job1 =
                push_email_priority(&mut wrapper, email_service::example_good_email(), 10).await;
            let job4 =
                push_email_priority(&mut wrapper, email_service::example_good_email(), -1).await;
            let job3 =
                push_email_priority(&mut wrapper, email_service::example_good_email(), 1).await;

            let expected_order = [(job1, 10), (job2, 5), (job3, 1), (job4, -1)];
            for (expected_id, expected_priority) in expected_order.iter() {
                let (executed_id, outcome) = wrapper.execute_next().await.unwrap();
                assert_eq!(expected_id, &executed_id);
                assert_eq!(outcome, Ok("()".to_owned()));

                apalis_core::sleep(Duration::from_millis(500)).await;

                let fetched = wrapper.fetch_by_id(&executed_id).await.unwrap();
                let context = fetched.unwrap().parts.context;

                assert_eq!(*context.status(), State::Done);
                assert_eq!(context.priority(), expected_priority);
            }
        }

        #[tokio::test]
        async fn test_schedule_request() {
            use chrono::SubsecRound;

            let mut storage = $setup().await;

            let email = Email {
                subject: String::from("Scheduled Email"),
                to: String::from("scheduled@example.com"),
                text: String::from("This is a scheduled email."),
            };

            // One minute from now.
            let schedule_time = Utc::now() + Duration::from_secs(60);

            let parts = storage
                .schedule(email, schedule_time.timestamp())
                .await
                .expect("Failed to schedule request");

            let job = get_job(&mut storage, &parts.task_id).await;
            let context = &job.parts.context;

            assert_eq!(*context.status(), State::Pending);
            assert!(context.lock_by().is_none());
            assert!(context.lock_at().is_none());

            // Only whole seconds were handed to `schedule`, so compare against
            // the scheduled time with its sub-second part truncated.
            let expected_run_at = schedule_time.trunc_subsecs(0);
            assert_eq!(context.run_at(), &expected_run_at);
        }

        #[tokio::test]
        async fn test_backend_expose_succeeds() {
            let wrapper = setup_test_wrapper().await;

            let stats = wrapper.stats().await;
            assert!(stats.is_ok());

            let pending_jobs = wrapper.list_jobs(&State::Pending, 1).await;
            assert!(pending_jobs.is_ok());

            let workers = wrapper.list_workers().await;
            assert!(workers.is_ok());
        }

        #[tokio::test]
        async fn integration_test_shedule_and_run_job() {
            // Schedule the job 15 seconds from now.
            let schedule_time = Utc::now() + Duration::from_secs(15);
            let scheduled_at = schedule_time.timestamp();

            let mut wrapper = setup_test_wrapper().await;
            wrapper
                .schedule(email_service::example_good_email(), scheduled_at)
                .await
                .unwrap();

            let (task_id, outcome) = wrapper.execute_next().await.unwrap();
            apalis_core::sleep(Duration::from_secs(1)).await;

            assert_eq!(outcome, Ok("()".to_owned()));

            let fetched = wrapper.fetch_by_id(&task_id).await.unwrap();
            let context = fetched.unwrap().parts.context;
            assert_eq!(*context.status(), State::Done);

            // The job must not have been locked before its scheduled time.
            let locked_at = context.lock_at().unwrap();
            assert!(
                locked_at >= scheduled_at,
                "{} should be greater than {}",
                locked_at,
                scheduled_at
            );
        }
    };
}