1#![crate_name = "apalis_core"]
2#![warn(
3 missing_debug_implementations,
4 missing_docs,
5 rust_2018_idioms,
6 unreachable_pub,
7 bad_style,
8 dead_code,
9 improper_ctypes,
10 non_shorthand_field_patterns,
11 no_mangle_generic_items,
12 overflowing_literals,
13 path_statements,
14 patterns_in_fns_without_body,
15 unconditional_recursion,
16 unused,
17 unused_allocation,
18 unused_comparisons,
19 unused_parens,
20 while_true
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23pub mod builder;
27
28pub mod backend;
30pub mod error;
32pub mod layers;
34pub mod monitor;
36pub mod request;
38pub mod response;
40pub mod service_fn;
42pub mod storage;
44pub mod worker;
46
47pub mod data;
49pub mod mq;
51pub mod notify;
53pub mod poller;
55
56pub mod memory;
58
59pub mod task;
61
62pub mod codec;
64
65pub mod step;
67
/// Suspends the current task until `duration` has elapsed.
///
/// The wait is implemented with a [`futures_timer::Delay`]. This function is
/// only compiled when the `sleep` feature is enabled.
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
    let delay = futures_timer::Delay::new(duration);
    delay.await
}
73
74#[cfg(feature = "sleep")]
75pub mod interval {
77 use std::fmt;
78 use std::future::Future;
79 use std::pin::Pin;
80 use std::task::{Context, Poll};
81 use std::time::Duration;
82
83 use futures::future::BoxFuture;
84 use futures::Stream;
85
86 use crate::sleep;
87 pub fn interval(duration: Duration) -> Interval {
89 Interval {
90 timer: Box::pin(sleep(duration)),
91 interval: duration,
92 }
93 }
94
95 #[must_use = "streams do nothing unless polled or .awaited"]
97 pub struct Interval {
98 timer: BoxFuture<'static, ()>,
99 interval: Duration,
100 }
101
102 impl fmt::Debug for Interval {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 f.debug_struct("Interval")
105 .field("interval", &self.interval)
106 .field("timer", &"a future represented `apalis_core::sleep`")
107 .finish()
108 }
109 }
110
111 impl Stream for Interval {
112 type Item = ();
113
114 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115 match Pin::new(&mut self.timer).poll(cx) {
116 Poll::Ready(_) => {}
117 Poll::Pending => return Poll::Pending,
118 };
119 let interval = self.interval;
120 let fut = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
121 drop(fut);
122 Poll::Ready(Some(()))
123 }
124 }
125}
126
127#[cfg(feature = "test-utils")]
128pub mod test_utils {
130 use crate::backend::Backend;
131 use crate::builder::{WorkerBuilder, WorkerFactory};
132 use crate::request::Request;
133 use crate::task::task_id::TaskId;
134 use crate::worker::Worker;
135 use futures::channel::mpsc::{self, channel, Receiver, Sender, TryRecvError};
136 use futures::future::{BoxFuture, Either};
137 use futures::stream::{Stream, StreamExt};
138 use futures::{FutureExt, SinkExt};
139 use std::fmt::Debug;
140 use std::future::Future;
141 use std::marker::PhantomData;
142 use std::ops::{Deref, DerefMut};
143 use std::pin::Pin;
144 use std::sync::atomic::{AtomicBool, Ordering};
145 use std::sync::Arc;
146 use std::task::{Context, Poll};
147 use tower::{Layer, Service, ServiceBuilder};
148
/// A stateless placeholder service that returns every request it is given
/// as the response.
#[derive(Clone, Debug)]
pub struct DummyService;
152
153 impl<Request: Send + 'static> Service<Request> for DummyService {
154 type Response = Request;
155 type Error = std::convert::Infallible;
156 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
157
158 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
159 Poll::Ready(Ok(()))
160 }
161
162 fn call(&mut self, req: Request) -> Self::Future {
163 let fut = async move { Ok(req) };
164 Box::pin(fut)
165 }
166 }
167
168 #[derive(Debug)]
170 pub struct TestWrapper<B, Req, Res> {
171 stop_tx: Sender<()>,
172 res_rx: Receiver<(TaskId, Result<String, String>)>,
173 _p: PhantomData<Req>,
174 _r: PhantomData<Res>,
175 should_next: Arc<AtomicBool>,
176 pub backend: B,
178 pub worker: Worker<crate::worker::Context>,
180 }
181 impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
215 where
216 B: Backend<Request<Req, Ctx>> + Send + Sync + 'static + Clone,
217 Req: Send + 'static,
218 Ctx: Send,
219 B::Stream: Send + 'static,
220 B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
221 Res: Debug,
222 {
223 pub fn new_with_service<Svc>(backend: B, service: Svc) -> (Self, BoxFuture<'static, ()>)
225 where
226 Svc::Future: Send,
227 Svc: Send + Sync + Service<Request<Req, Ctx>, Response = Res> + 'static,
228 Req: Send + Sync + 'static,
229 Svc::Response: Send + Sync + 'static,
230 Svc::Error: Send + Sync + std::error::Error,
231 Ctx: Send + Sync + 'static,
232 B: Backend<Request<Req, Ctx>> + 'static,
233 B::Layer: Layer<Svc>,
234 <B::Layer as Layer<Svc>>::Service: Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
235 B::Stream: Unpin + Send,
236 Res: Send,
237 <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
238 <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Error:
239 Send + Sync + std::error::Error,
240 B::Layer: Layer<TestEmitService<Svc>>,
241 <B::Layer as Layer<TestEmitService<Svc>>>::Service:
242 Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
243 <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Future:
244 Send,
245 <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Error:
246 Send + Sync + Sync + std::error::Error,
247 {
248 let (mut res_tx, res_rx) = channel(10);
249 let should_next = Arc::new(AtomicBool::new(false));
250 let service = ServiceBuilder::new()
251 .layer_fn(|service| TestEmitService {
252 service,
253 tx: res_tx.clone(),
254 should_next: should_next.clone(),
255 })
256 .service(service);
257 let worker = WorkerBuilder::new("test-worker")
258 .backend(backend.clone())
259 .build(service)
260 .run();
261 let handle = worker.get_handle();
262 let (stop_tx, mut stop_rx) = channel::<()>(1);
263
264 let poller = async move {
265 let worker = worker.shared();
266 loop {
267 futures::select! {
268 _ = stop_rx.next().fuse() => break,
269 _ = worker.clone().fuse() => {
270
271 },
272 }
273 }
274 res_tx.close_channel();
275 };
276 (
277 TestWrapper {
278 stop_tx,
279 res_rx,
280 _p: PhantomData,
281 backend,
282 _r: PhantomData,
283 should_next,
284 worker: handle,
285 },
286 poller.boxed(),
287 )
288 }
289
290 pub fn stop(mut self) {
292 self.stop_tx.try_send(()).unwrap();
293 }
294
295 pub async fn execute_next(&mut self) -> Option<(TaskId, Result<String, String>)> {
297 self.should_next.store(true, Ordering::Release);
298 #[cfg(feature = "sleep")]
299 let fut = async {
300 crate::sleep(std::time::Duration::from_secs(2))
301 .boxed()
302 .await;
303 }
304 .boxed();
305 #[cfg(not(feature = "sleep"))]
306 let fut = async {
307 std::future::pending::<()>().await;
308 }
309 .boxed();
310
311 let res = futures::future::select(self.res_rx.next(), fut).await;
312 match res {
313 Either::Left(next) => next.0,
314 Either::Right(_) => None,
315 }
316 }
317
318 pub fn try_execute_next(
320 &mut self,
321 ) -> Result<Option<(TaskId, Result<String, String>)>, TryRecvError> {
322 self.should_next.store(true, Ordering::Release);
323 self.res_rx.try_next().map(|res| {
324 self.should_next.store(false, Ordering::Release);
325 res
326 })
327 }
328 }
329
330 impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
331 where
332 B: Backend<Request<Req, Ctx>>,
333 {
334 type Target = B;
335
336 fn deref(&self) -> &Self::Target {
337 &self.backend
338 }
339 }
340
341 impl<B, Req, Ctx, Res> DerefMut for TestWrapper<B, Request<Req, Ctx>, Res>
342 where
343 B: Backend<Request<Req, Ctx>>,
344 {
345 fn deref_mut(&mut self) -> &mut Self::Target {
346 &mut self.backend
347 }
348 }
349
350 #[derive(Debug, Clone)]
352 pub struct TestEmitService<S> {
353 tx: mpsc::Sender<(TaskId, Result<String, String>)>,
354 service: S,
355 should_next: Arc<AtomicBool>,
356 }
357
358 impl<S, Req, Ctx> Service<Request<Req, Ctx>> for TestEmitService<S>
359 where
360 S: Service<Request<Req, Ctx>> + Send + 'static,
361 S::Future: Send + 'static,
362 Req: Send + 'static,
363 S::Response: Debug + Send + Sync,
364 S::Error: std::error::Error + Send,
365 Ctx: Send + 'static,
366 {
367 type Response = S::Response;
368 type Error = S::Error;
369 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
370
371 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372 if self.should_next.load(Ordering::Relaxed) {
373 self.service.poll_ready(cx)
374 } else {
375 Poll::Pending
376 }
377 }
378
379 fn call(&mut self, req: Request<Req, Ctx>) -> Self::Future {
380 self.should_next.store(false, Ordering::Relaxed);
381 let task_id = req.parts.task_id.clone();
382 let mut tx = Clone::clone(&self.tx);
383 let fut = self.service.call(req);
384 Box::pin(async move {
385 let res = fut.await;
386 match &res {
387 Ok(res) => {
388 tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap();
389 }
390 Err(err) => {
391 tx.send((task_id, Err(err.to_string()))).await.unwrap();
392 }
393 }
394 res
395 })
396 }
397 }
398
399 pub use tower::service_fn as apalis_test_service_fn;
400
/// Generates a `#[tokio::test]` that pushes one message through a message
/// queue backend and expects it to be executed.
///
/// `$backend_instance` is an expression that evaluates to the backend. The
/// generated code uses unqualified names, so the call site must have
/// `apalis_test_service_fn`, `TestWrapper`, `Request`, `io` and `Duration`
/// in scope, and `tokio` must be available.
#[macro_export]
macro_rules! test_message_queue {
    ($backend_instance:expr) => {
        #[tokio::test]
        async fn it_works_as_an_mq_backend() {
            let backend = $backend_instance;
            // Echo service: the request itself is the response.
            let service = apalis_test_service_fn(|request: Request<u32, ()>| async move {
                Ok::<_, io::Error>(request)
            });
            let (mut harness, poller) = TestWrapper::new_with_service(backend, service);
            tokio::spawn(poller);
            harness.enqueue(1).await.unwrap();
            tokio::time::sleep(Duration::from_secs(1)).await;
            let _outcome = harness.execute_next().await.unwrap();
        }
    };
}
/// Generates a suite of `#[tokio::test]` integration tests for a storage
/// backend.
///
/// `$setup` is the path of an async function; `$setup().await` must evaluate
/// to the backend under test. The generated code uses unqualified names, so
/// the call site must have `apalis_test_service_fn`, `TestWrapper`,
/// `Request`, `Error`, `io` and `Duration` in scope, and the `apalis_core`
/// and `tokio` crates must be available.
///
/// The generated tests cover: push and consume, vacuum, retry persistence,
/// explicit abort, and recovery after a worker dies unexpectedly.
#[macro_export]
macro_rules! generic_storage_test {
    ($setup:path ) => {
        #[tokio::test]
        async fn integration_test_storage_push_and_consume() {
            let backend = $setup().await;
            let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Ok::<_, io::Error>(request.args)
            });
            let (mut t, poller) = TestWrapper::new_with_service(backend, service);
            tokio::spawn(poller);
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "There should be no jobs");
            t.push(1).await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 1, "There should be 1 job");
            let res = t.execute_next().await.unwrap();
            assert_eq!(res.1, Ok("1".to_owned()));

            // Give the backend time to record the outcome.
            apalis_core::sleep(Duration::from_secs(1)).await;
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "There should be no jobs");

            t.vacuum().await.unwrap();
        }
        #[tokio::test]
        async fn integration_test_storage_vacuum() {
            let backend = $setup().await;
            let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Ok::<_, io::Error>(request.args)
            });
            let (mut t, poller) = TestWrapper::new_with_service(backend, service);
            tokio::spawn(poller);
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "There should be no jobs");
            t.push(1).await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 1, "A job exists");
            let res = t.execute_next().await.unwrap();
            assert_eq!(res.1, Ok("1".to_owned()));
            apalis_core::sleep(Duration::from_secs(1)).await;
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "There should be no job");

            t.vacuum().await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "After vacuuming, there should be nothing");
        }

        #[tokio::test]
        async fn integration_test_storage_retry_persists() {
            use std::io::{Error, ErrorKind};
            let mut backend = $setup().await;
            // The service always fails, so every run counts as one attempt.
            let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Err::<String, io::Error>(Error::new(ErrorKind::Other, "oh no!"))
            });
            let (mut t, poller) = TestWrapper::new_with_service(backend.clone(), service);
            tokio::spawn(poller);
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "should have no jobs");
            let parts = t.push(1).await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 1, "should have 1 job");
            let res = t.execute_next().await.unwrap();
            assert_eq!(res.1, Err("oh no!".to_owned()));

            apalis_core::sleep(Duration::from_secs(1)).await;

            let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
            assert_eq!(task.parts.attempt.current(), 1, "should have 1 attempt");

            let res = t
                .execute_next()
                .await
                .expect("Job must be added back to the queue after attempt 1");
            assert_eq!(res.1, Err("oh no!".to_owned()));

            apalis_core::sleep(Duration::from_secs(1)).await;
            let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
            assert_eq!(task.parts.attempt.current(), 2, "should have 2 attempts");

            let res = t
                .execute_next()
                .await
                .expect("Job must be added back to the queue after attempt 2");
            assert_eq!(res.1, Err("oh no!".to_owned()));

            apalis_core::sleep(Duration::from_secs(1)).await;
            let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
            assert_eq!(task.parts.attempt.current(), 3, "should have 3 attempts");

            let res = t
                .execute_next()
                .await
                .expect("Job must be added back to the queue after attempt 3");
            assert_eq!(res.1, Err("oh no!".to_owned()));
            apalis_core::sleep(Duration::from_secs(1)).await;

            let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
            assert_eq!(task.parts.attempt.current(), 4, "should have 4 attempts");

            // This is the run that follows the fourth attempt.
            let res = t
                .execute_next()
                .await
                .expect("Job must be added back to the queue after attempt 4");
            assert_eq!(res.1, Err("oh no!".to_owned()));
            apalis_core::sleep(Duration::from_secs(1)).await;

            let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
            assert_eq!(task.parts.attempt.current(), 5, "should have 5 attempts");

            apalis_core::sleep(Duration::from_secs(1)).await;

            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "should have no job");

            // After five attempts no further run is expected.
            let res = t.try_execute_next();
            assert!(res.is_err());

            let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
            assert_eq!(
                task.parts.attempt.current(),
                5,
                "should still have 5 attempts"
            );

            t.vacuum().await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "After vacuuming, there should be nothing");
        }

        #[tokio::test]
        async fn integration_test_storage_abort() {
            let backend = $setup().await;
            // The service aborts every task explicitly.
            let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Err::<(), _>(Error::Abort(std::sync::Arc::new(Box::new(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "request was invalid",
                )))))
            });
            let (mut t, poller) = TestWrapper::new_with_service(backend, service);
            tokio::spawn(poller);
            let res = t.len().await.unwrap();
            assert_eq!(res, 0);
            t.push(1).await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 1);
            let res = t.execute_next().await.unwrap();
            assert_eq!(res.1, Err("AbortError: request was invalid".to_owned()));
            apalis_core::sleep(Duration::from_secs(1)).await;
            let res = t.len().await.unwrap();
            assert_eq!(
                res, 0,
                "The queue should be empty as the previous task was aborted"
            );

            t.vacuum().await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "After vacuuming, there should be nothing");
        }

        #[tokio::test]
        async fn integration_test_storage_unexpected_abort() {
            let backend = $setup().await;
            let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                // Deliberate panic that simulates a worker dying mid-task.
                None::<()>.unwrap();
                Ok::<_, io::Error>(request.args)
            });
            let (mut t, poller) = TestWrapper::new_with_service(backend, service);
            tokio::spawn(poller);
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "There should be no jobs");
            let parts = t.push(1).await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 1, "A job exists");
            let res = t.execute_next().await;
            assert_eq!(res, None, "Our worker is dead");

            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "Job should not have been re-added to the queue");

            // Start a second, healthy worker on the same backend.
            let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Ok::<_, io::Error>(request.args)
            });
            let (mut t, poller) = TestWrapper::new_with_service(t.backend, service);
            tokio::spawn(poller);

            apalis_core::sleep(Duration::from_secs(1)).await;
            let res = t
                .execute_next()
                .await
                .expect("Task must have been recovered and added to the queue");
            assert_eq!(res.1, Ok("1".to_owned()));

            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "Task should have been consumed");

            t.vacuum().await.unwrap();
            let res = t.len().await.unwrap();
            assert_eq!(res, 0, "After vacuuming, there should be nothing");
        }
    };
}
630}