1#![crate_name = "apalis_core"]
2#![warn(
3 missing_debug_implementations,
4 missing_docs,
5 rust_2018_idioms,
6 unreachable_pub,
7 bad_style,
8 dead_code,
9 improper_ctypes,
10 non_shorthand_field_patterns,
11 no_mangle_generic_items,
12 overflowing_literals,
13 path_statements,
14 patterns_in_fns_without_body,
15 unconditional_recursion,
16 unused,
17 unused_allocation,
18 unused_comparisons,
19 unused_parens,
20 while_true
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23pub mod builder;
27
28pub mod backend;
30pub mod error;
32pub mod layers;
34pub mod monitor;
36pub mod request;
38pub mod response;
40pub mod service_fn;
42pub mod storage;
44pub mod worker;
46
47pub mod data;
49pub mod mq;
51pub mod notify;
53pub mod poller;
55
56pub mod memory;
58
59pub mod task;
61
62pub mod codec;
64
65pub mod step;
67
/// Completes once `duration` has elapsed.
///
/// The wait is implemented by awaiting a [`futures_timer::Delay`], so it does not
/// block the calling thread.
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
    let delay = futures_timer::Delay::new(duration);
    delay.await
}
73
/// A repeating timer exposed as a [`futures::Stream`], built on [`crate::sleep`].
#[cfg(feature = "sleep")]
pub mod interval {
    use std::fmt;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use futures::future::BoxFuture;
    use futures::Stream;

    use crate::sleep;

    /// Creates a stream that yields `()` every time a `sleep(duration)` future completes.
    ///
    /// After each tick a new `sleep` future for the same duration replaces the finished
    /// one, so the stream never ends on its own.
    pub fn interval(duration: Duration) -> Interval {
        let timer: BoxFuture<'static, ()> = Box::pin(sleep(duration));
        Interval {
            timer,
            interval: duration,
        }
    }

    /// Stream returned by [`interval`].
    #[must_use = "streams do nothing unless polled or .awaited"]
    pub struct Interval {
        /// The `sleep` future for the period currently being waited on.
        timer: BoxFuture<'static, ()>,
        /// Duration used for every period.
        interval: Duration,
    }

    impl fmt::Debug for Interval {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The boxed future is not `Debug`, so a fixed description stands in for it.
            let mut out = f.debug_struct("Interval");
            out.field("interval", &self.interval);
            out.field("timer", &"a future represented `apalis_core::sleep`");
            out.finish()
        }
    }

    impl Stream for Interval {
        type Item = ();

        /// Yields `Some(())` when the current timer has completed and arms the next one;
        /// otherwise reports `Pending`. `None` is never returned.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Both fields are `Unpin`, so plain mutable access is available.
            let this = self.get_mut();
            if this.timer.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            // Swap in a fresh timer; the completed one is dropped by the assignment.
            this.timer = Box::pin(sleep(this.interval));
            Poll::Ready(Some(()))
        }
    }
}
126
/// Helpers for testing backends and services: a pass-through service, a wrapper that
/// steps a worker one task at a time, and macros that generate backend integration tests.
#[cfg(feature = "test-utils")]
#[allow(unused)]
pub mod test_utils {
    use crate::backend::Backend;
    use crate::builder::{WorkerBuilder, WorkerFactory};
    use crate::request::Request;
    use crate::task::task_id::TaskId;
    use crate::worker::Worker;
    use futures::channel::mpsc::{self, channel, Receiver, Sender, TryRecvError};
    use futures::future::BoxFuture;
    use futures::stream::{Stream, StreamExt};
    use futures::{FutureExt, SinkExt};
    use std::fmt::Debug;
    use std::future::Future;
    use std::marker::PhantomData;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use tower::{Layer, Service, ServiceBuilder};

    /// A service that is always ready and returns each request unchanged as its response.
    #[derive(Debug, Clone)]
    pub struct DummyService;

    impl<Req: Send + 'static> Service<Req> for DummyService {
        type Response = Req;
        type Error = std::convert::Infallible;
        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, req: Req) -> Self::Future {
            Box::pin(std::future::ready(Ok::<Req, Self::Error>(req)))
        }
    }

    /// Couples a backend with a worker whose service only accepts a task after
    /// [`execute_next`](TestWrapper::execute_next) or
    /// [`try_execute_next`](TestWrapper::try_execute_next) has been called.
    ///
    /// Every task outcome is reported back over an internal channel as
    /// `(TaskId, Result<String, String>)`: the `Debug` rendering of the response on success,
    /// or the `Display` rendering of the error on failure.
    ///
    /// The wrapper dereferences to the backend, so backend methods can be called on it directly.
    #[derive(Debug)]
    pub struct TestWrapper<B, Req, Res> {
        /// Signals the poller future to finish.
        stop_tx: Sender<()>,
        /// Receives one entry per task handled by the wrapped service.
        res_rx: Receiver<(TaskId, Result<String, String>)>,
        _p: PhantomData<Req>,
        _r: PhantomData<Res>,
        /// Gate shared with [`TestEmitService`]; `true` lets the next task through.
        should_next: Arc<AtomicBool>,
        /// The backend under test.
        pub backend: B,
        /// Handle obtained from the running worker via `get_handle`.
        pub worker: Worker<crate::worker::Context>,
    }

    impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
    where
        B: Backend<Request<Req, Ctx>> + Send + Sync + 'static + Clone,
        Req: Send + 'static,
        Ctx: Send,
        B::Stream: Send + 'static,
        B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
        Res: Debug,
    {
        /// Builds a worker named `"test-worker"` on a clone of `backend`, with `service`
        /// wrapped in a [`TestEmitService`].
        ///
        /// Returns the wrapper together with a poller future. The poller must be driven
        /// (for example by spawning it) for the worker to make progress. It finishes when
        /// [`stop`](TestWrapper::stop) is called, when the wrapper is dropped, or when the
        /// worker future itself completes; it then closes the result channel.
        pub fn new_with_service<Svc>(backend: B, service: Svc) -> (Self, BoxFuture<'static, ()>)
        where
            Svc::Future: Send,
            Svc: Send + Sync + Service<Request<Req, Ctx>, Response = Res> + 'static,
            Req: Send + Sync + 'static,
            Svc::Response: Send + Sync + 'static,
            Svc::Error: Send + Sync + std::error::Error,
            Ctx: Send + Sync + 'static,
            B: Backend<Request<Req, Ctx>> + 'static,
            B::Layer: Layer<Svc>,
            <B::Layer as Layer<Svc>>::Service: Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
            B::Stream: Unpin + Send,
            Res: Send,
            <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
            <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Error:
                Send + Sync + std::error::Error,
            B::Layer: Layer<TestEmitService<Svc>>,
            <B::Layer as Layer<TestEmitService<Svc>>>::Service:
                Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
            <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Future:
                Send,
            <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Error:
                Send + Sync + std::error::Error,
        {
            let (mut res_tx, res_rx) = channel(10);
            let should_next = Arc::new(AtomicBool::new(false));
            let service = ServiceBuilder::new()
                .layer_fn(|service| TestEmitService {
                    service,
                    tx: res_tx.clone(),
                    should_next: should_next.clone(),
                })
                .service(service);
            let worker = WorkerBuilder::new("test-worker")
                .backend(backend.clone())
                .build(service)
                .run();
            let handle = worker.get_handle();
            let (stop_tx, mut stop_rx) = channel::<()>(1);

            let poller = async move {
                // Finish on whichever happens first: a stop signal (or the stop sender
                // being dropped), or the worker future completing. Re-polling a completed
                // worker in a loop would be ready on every iteration and spin without
                // yielding to the executor.
                futures::select! {
                    _ = stop_rx.next().fuse() => {},
                    _ = Box::pin(worker).fuse() => {},
                }
                res_tx.close_channel();
            };
            (
                TestWrapper {
                    stop_tx,
                    res_rx,
                    _p: PhantomData,
                    backend,
                    _r: PhantomData,
                    should_next,
                    worker: handle,
                },
                poller.boxed(),
            )
        }

        /// Asks the poller future to finish.
        ///
        /// If the poller has already gone away (its receiver is disconnected) this is a no-op.
        ///
        /// # Panics
        ///
        /// Panics if the stop signal cannot be queued for any other reason.
        pub fn stop(mut self) {
            if let Err(err) = self.stop_tx.try_send(()) {
                assert!(
                    err.is_disconnected(),
                    "failed to signal the test worker to stop: {err}"
                );
            }
        }

        /// Opens the gate for one task and waits for its outcome.
        ///
        /// Returns `None` once the result channel has been closed and drained.
        pub async fn execute_next(&mut self) -> Option<(TaskId, Result<String, String>)> {
            self.should_next.store(true, Ordering::Release);
            self.res_rx.next().await
        }

        /// Opens the gate for one task and returns an outcome only if one is already queued.
        ///
        /// An `Err` means no outcome is available yet; in that case the gate is left open.
        /// When the channel answers (`Ok`), the gate is closed again.
        pub fn try_execute_next(
            &mut self,
        ) -> Result<Option<(TaskId, Result<String, String>)>, TryRecvError> {
            self.should_next.store(true, Ordering::Release);
            let received = self.res_rx.try_next();
            if received.is_ok() {
                self.should_next.store(false, Ordering::Release);
            }
            received
        }
    }

    impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
    where
        B: Backend<Request<Req, Ctx>>,
    {
        type Target = B;

        fn deref(&self) -> &Self::Target {
            &self.backend
        }
    }

    impl<B, Req, Ctx, Res> DerefMut for TestWrapper<B, Request<Req, Ctx>, Res>
    where
        B: Backend<Request<Req, Ctx>>,
    {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.backend
        }
    }

    /// Service wrapper used by [`TestWrapper`]: it reports readiness only while the shared
    /// gate is open, and publishes the outcome of every call on a channel.
    #[derive(Debug, Clone)]
    pub struct TestEmitService<S> {
        /// Outcome channel shared with the owning [`TestWrapper`].
        tx: mpsc::Sender<(TaskId, Result<String, String>)>,
        /// The wrapped service.
        service: S,
        /// Gate flag; set by the wrapper, cleared when a call starts.
        should_next: Arc<AtomicBool>,
    }

    impl<S, Req, Ctx> Service<Request<Req, Ctx>> for TestEmitService<S>
    where
        S: Service<Request<Req, Ctx>> + Send + 'static,
        S::Future: Send + 'static,
        Req: Send + 'static,
        S::Response: Debug + Send + Sync,
        S::Error: std::error::Error + Send,
        Ctx: Send + 'static,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            if !self.should_next.load(Ordering::Relaxed) {
                // NOTE(review): no waker is registered on this path, so the caller is only
                // polled again if something else wakes the task after the gate opens.
                return Poll::Pending;
            }
            self.service.poll_ready(cx)
        }

        fn call(&mut self, req: Request<Req, Ctx>) -> Self::Future {
            // Close the gate so only this one task runs until the wrapper opens it again.
            self.should_next.store(false, Ordering::Relaxed);
            let task_id = req.parts.task_id.clone();
            let mut tx = self.tx.clone();
            let fut = self.service.call(req);
            Box::pin(async move {
                let res = fut.await;
                let outcome = match &res {
                    Ok(value) => Ok(format!("{value:?}")),
                    Err(err) => Err(err.to_string()),
                };
                tx.send((task_id, outcome)).await.unwrap();
                res
            })
        }
    }

    /// Re-export of [`tower::service_fn`] under the name the test macros in this module use.
    pub use tower::service_fn as apalis_test_service_fn;

    /// Generates a `#[tokio::test]` that enqueues one message on the given backend
    /// expression and expects the worker to process it.
    ///
    /// The expansion refers to `apalis_test_service_fn`, `Request`, `TestWrapper`, `io` and
    /// `Duration` by their bare names, so they must be in scope at the call site.
    #[macro_export]
    macro_rules! test_message_queue {
        ($backend_instance:expr) => {
            #[tokio::test]
            async fn it_works_as_an_mq_backend() {
                let backend = $backend_instance;
                let service = apalis_test_service_fn(|request: Request<u32, ()>| async {
                    Ok::<_, io::Error>(request)
                });
                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
                tokio::spawn(poller);
                t.enqueue(1).await.unwrap();
                tokio::time::sleep(Duration::from_secs(1)).await;
                let _res = t.execute_next().await.unwrap();
            }
        };
    }

    /// Generates a suite of `#[tokio::test]` integration tests for a storage backend.
    ///
    /// `$setup` is the path of an async function that returns the backend to test. The
    /// generated tests cover push/consume, vacuum, retries, aborts and recovery after a
    /// panicking handler.
    ///
    /// The expansion refers to `apalis_test_service_fn`, `Request`, `TestWrapper`, `Error`,
    /// `io` and `Duration` by their bare names, so they must be in scope at the call site.
    /// It also calls this crate's `sleep`, which requires the `sleep` feature.
    #[macro_export]
    macro_rules! generic_storage_test {
        ($setup:path) => {
            #[tokio::test]
            async fn integration_test_storage_push_and_consume() {
                let backend = $setup().await;
                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                    Ok::<_, io::Error>(request.args)
                });
                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
                tokio::spawn(poller);
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "There should be no jobs");
                t.push(1).await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 1, "There should be 1 job");
                let res = t.execute_next().await.unwrap();
                assert_eq!(res.1, Ok("1".to_owned()));

                $crate::sleep(Duration::from_secs(1)).await;
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "There should be no jobs");

                t.vacuum().await.unwrap();
            }

            #[tokio::test]
            async fn integration_test_storage_vacuum() {
                let backend = $setup().await;
                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                    Ok::<_, io::Error>(request.args)
                });
                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
                tokio::spawn(poller);
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "There should be no jobs");
                t.push(1).await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 1, "A job exists");
                let res = t.execute_next().await.unwrap();
                assert_eq!(res.1, Ok("1".to_owned()));
                $crate::sleep(Duration::from_secs(1)).await;
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "There should be no job");

                t.vacuum().await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "After vacuuming, there should be nothing");
            }

            #[tokio::test]
            async fn integration_test_storage_retry_persists() {
                use std::io::{Error, ErrorKind};
                let mut backend = $setup().await;
                let service = apalis_test_service_fn(|_request: Request<u32, _>| async move {
                    Err::<String, io::Error>(Error::new(ErrorKind::Other, "oh no!"))
                });
                let (mut t, poller) = TestWrapper::new_with_service(backend.clone(), service);
                tokio::spawn(poller);
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "should have no jobs");
                let parts = t.push(1).await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 1, "should have 1 job");
                let res = t.execute_next().await.unwrap();
                assert_eq!(res.1, Err("oh no!".to_owned()));

                $crate::sleep(Duration::from_secs(1)).await;

                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
                assert_eq!(task.parts.attempt.current(), 1, "should have 1 attempt");

                let res = t
                    .execute_next()
                    .await
                    .expect("Job must be added back to the queue after attempt 1");
                assert_eq!(res.1, Err("oh no!".to_owned()));

                $crate::sleep(Duration::from_secs(1)).await;
                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
                assert_eq!(task.parts.attempt.current(), 2, "should have 2 attempts");

                let res = t
                    .execute_next()
                    .await
                    .expect("Job must be added back to the queue after attempt 2");
                assert_eq!(res.1, Err("oh no!".to_owned()));

                $crate::sleep(Duration::from_secs(1)).await;
                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
                assert_eq!(task.parts.attempt.current(), 3, "should have 3 attempts");

                let res = t
                    .execute_next()
                    .await
                    .expect("Job must be added back to the queue after attempt 3");
                assert_eq!(res.1, Err("oh no!".to_owned()));
                $crate::sleep(Duration::from_secs(1)).await;

                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
                assert_eq!(task.parts.attempt.current(), 4, "should have 4 attempts");

                let res = t
                    .execute_next()
                    .await
                    .expect("Job must be added back to the queue after attempt 4");
                assert_eq!(res.1, Err("oh no!".to_owned()));
                $crate::sleep(Duration::from_secs(1)).await;

                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
                assert_eq!(task.parts.attempt.current(), 5, "should have 5 attempts");

                $crate::sleep(Duration::from_secs(1)).await;

                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "should have no job");

                // Nothing further should be executed once the attempts are used up.
                let res = t.try_execute_next();
                assert!(res.is_err());

                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
                assert_eq!(
                    task.parts.attempt.current(),
                    5,
                    "should still have 5 attempts"
                );

                t.vacuum().await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "After vacuuming, there should be nothing");
            }

            #[tokio::test]
            async fn integration_test_storage_abort() {
                let backend = $setup().await;
                let service = apalis_test_service_fn(|_request: Request<u32, _>| async move {
                    Err::<(), _>(Error::Abort(std::sync::Arc::new(Box::new(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "request was invalid",
                    )))))
                });
                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
                tokio::spawn(poller);
                let res = t.len().await.unwrap();
                assert_eq!(res, 0);
                t.push(1).await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 1);
                let res = t.execute_next().await.unwrap();
                assert_eq!(res.1, Err("AbortError: request was invalid".to_owned()));
                $crate::sleep(Duration::from_secs(1)).await;
                let res = t.len().await.unwrap();
                assert_eq!(
                    res, 0,
                    "The queue should be empty as the previous task was aborted"
                );

                t.vacuum().await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "After vacuuming, there should be nothing");
            }

            #[tokio::test]
            async fn integration_test_storage_unexpected_abort() {
                let backend = $setup().await;
                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                    // Panic inside the handler to simulate a worker dying mid-task.
                    None::<()>.unwrap();
                    Ok::<_, io::Error>(request.args)
                });
                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
                tokio::spawn(poller);
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "There should be no jobs");
                let _parts = t.push(1).await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 1, "A job exists");
                let res = t.execute_next().await;
                assert_eq!(res, None, "Our worker is dead");

                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "Job should not have been re-added to the queue");

                // Start a second worker on the same backend with a handler that succeeds.
                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                    Ok::<_, io::Error>(request.args)
                });
                let (mut t, poller) = TestWrapper::new_with_service(t.backend, service);
                tokio::spawn(poller);

                $crate::sleep(Duration::from_secs(1)).await;
                let res = t
                    .execute_next()
                    .await
                    .expect("Task must have been recovered and added to the queue");
                assert_eq!(res.1, Ok("1".to_owned()));

                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "Task should have been consumed");

                t.vacuum().await.unwrap();
                let res = t.len().await.unwrap();
                assert_eq!(res, 0, "After vacuuming, there should be nothing");
            }
        };
    }
}