apalis_core/
lib.rs

1#![crate_name = "apalis_core"]
2#![warn(
3    missing_debug_implementations,
4    missing_docs,
5    rust_2018_idioms,
6    unreachable_pub,
7    bad_style,
8    dead_code,
9    improper_ctypes,
10    non_shorthand_field_patterns,
11    no_mangle_generic_items,
12    overflowing_literals,
13    path_statements,
14    patterns_in_fns_without_body,
15    unconditional_recursion,
16    unused,
17    unused_allocation,
18    unused_comparisons,
19    unused_parens,
20    while_true
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23//! # apalis-core
24//! Utilities for building job and message processing tools.
/// Represents utilities for creating worker instances.
26pub mod builder;
27
/// Represents a task source, e.g. Postgres or Redis.
29pub mod backend;
30/// Includes all possible error types.
31pub mod error;
32/// Represents middleware offered through [`tower`]
33pub mod layers;
34/// Represents monitoring of running workers
35pub mod monitor;
36/// Represents the request to be processed.
37pub mod request;
38/// Represents different possible responses.
39pub mod response;
40/// Represents a service that is created from a function.
41pub mod service_fn;
42/// Represents ability to persist and consume jobs from storages.
43pub mod storage;
44/// Represents the utils for building workers.
45pub mod worker;
46
47/// Represents the utils needed to extend a task's context.
48pub mod data;
49/// Message queuing utilities
50pub mod mq;
/// Allows async listening in an mpsc style.
52pub mod notify;
53/// Controlled polling and streaming
54pub mod poller;
55
56/// In-memory utilities for testing and mocking
57pub mod memory;
58
59/// Task management utilities
60pub mod task;
61
62/// Codec for handling data
63pub mod codec;
64
65/// Allows stepped tasks
66pub mod step;
67
/// Waits asynchronously until `duration` has elapsed.
///
/// The wait is implemented with a `futures_timer::Delay`. Only available
/// when the `sleep` feature is enabled.
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
    let delay = futures_timer::Delay::new(duration);
    delay.await;
}
73
74#[cfg(feature = "sleep")]
75/// Interval utilities
76pub mod interval {
77    use std::fmt;
78    use std::future::Future;
79    use std::pin::Pin;
80    use std::task::{Context, Poll};
81    use std::time::Duration;
82
83    use futures::future::BoxFuture;
84    use futures::Stream;
85
86    use crate::sleep;
87    /// Creates a new stream that yields at a set interval.
88    pub fn interval(duration: Duration) -> Interval {
89        Interval {
90            timer: Box::pin(sleep(duration)),
91            interval: duration,
92        }
93    }
94
95    /// A stream representing notifications at fixed interval
96    #[must_use = "streams do nothing unless polled or .awaited"]
97    pub struct Interval {
98        timer: BoxFuture<'static, ()>,
99        interval: Duration,
100    }
101
102    impl fmt::Debug for Interval {
103        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104            f.debug_struct("Interval")
105                .field("interval", &self.interval)
106                .field("timer", &"a future represented `apalis_core::sleep`")
107                .finish()
108        }
109    }
110
111    impl Stream for Interval {
112        type Item = ();
113
114        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115            match Pin::new(&mut self.timer).poll(cx) {
116                Poll::Ready(_) => {}
117                Poll::Pending => return Poll::Pending,
118            };
119            let interval = self.interval;
120            let fut = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
121            drop(fut);
122            Poll::Ready(Some(()))
123        }
124    }
125}
126
127#[cfg(feature = "test-utils")]
128/// Test utilities that allows you to test backends
129pub mod test_utils {
130    use crate::backend::Backend;
131    use crate::builder::{WorkerBuilder, WorkerFactory};
132    use crate::request::Request;
133    use crate::task::task_id::TaskId;
134    use crate::worker::Worker;
135    use futures::channel::mpsc::{self, channel, Receiver, Sender, TryRecvError};
136    use futures::future::{BoxFuture, Either};
137    use futures::stream::{Stream, StreamExt};
138    use futures::{FutureExt, SinkExt};
139    use std::fmt::Debug;
140    use std::future::Future;
141    use std::marker::PhantomData;
142    use std::ops::{Deref, DerefMut};
143    use std::pin::Pin;
144    use std::sync::atomic::{AtomicBool, Ordering};
145    use std::sync::Arc;
146    use std::task::{Context, Poll};
147    use tower::{Layer, Service, ServiceBuilder};
148
149    /// Define a dummy service
150    #[derive(Debug, Clone)]
151    pub struct DummyService;
152
153    impl<Request: Send + 'static> Service<Request> for DummyService {
154        type Response = Request;
155        type Error = std::convert::Infallible;
156        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
157
158        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
159            Poll::Ready(Ok(()))
160        }
161
162        fn call(&mut self, req: Request) -> Self::Future {
163            let fut = async move { Ok(req) };
164            Box::pin(fut)
165        }
166    }
167
168    /// A generic backend wrapper that polls and executes jobs
169    #[derive(Debug)]
170    pub struct TestWrapper<B, Req, Res> {
171        stop_tx: Sender<()>,
172        res_rx: Receiver<(TaskId, Result<String, String>)>,
173        _p: PhantomData<Req>,
174        _r: PhantomData<Res>,
175        should_next: Arc<AtomicBool>,
176        /// The inner backend
177        pub backend: B,
178        /// The inner worker
179        pub worker: Worker<crate::worker::Context>,
180    }
181    /// A test wrapper to allow you to test without requiring a worker.
182    /// Important for testing backends and jobs
183    /// # Example
184    /// ```no_run
    /// #[cfg(test)]
186    /// mod tests {
187    ///    use crate::{
188    ///        error::Error, memory::MemoryStorage, mq::MessageQueue, service_fn::service_fn,
189    ///    };
190    ///
191    ///    use super::*;
192    ///
193    ///    async fn is_even(req: usize) -> Result<(), Error> {
194    ///        if req % 2 == 0 {
195    ///            Ok(())
196    ///        } else {
197    ///            Err(Error::Abort("Not an even number".to_string()))
198    ///        }
199    ///    }
200    ///
201    ///    #[tokio::test]
202    ///    async fn test_accepts_even() {
203    ///        let backend = MemoryStorage::new();
204    ///        let (mut tester, poller) = TestWrapper::new_with_service(backend, service_fn(is_even));
205    ///        tokio::spawn(poller);
206    ///        tester.enqueue(42usize).await.unwrap();
207    ///        assert_eq!(tester.size().await.unwrap(), 1);
208    ///        let (_, resp) = tester.execute_next().await.unwrap();
209    ///        assert_eq!(resp, Ok("()".to_string()));
210    ///    }
211    ///}
    /// ```
213
214    impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
215    where
216        B: Backend<Request<Req, Ctx>> + Send + Sync + 'static + Clone,
217        Req: Send + 'static,
218        Ctx: Send,
219        B::Stream: Send + 'static,
220        B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
221        Res: Debug,
222    {
223        /// Build a new instance provided a custom service
224    pub fn new_with_service<Svc>(backend: B, service: Svc) -> (Self, BoxFuture<'static, ()>)
225    where
226        Svc::Future: Send,
227        Svc: Send + Sync + Service<Request<Req, Ctx>, Response = Res> + 'static,
228        Req: Send + Sync + 'static,
229        Svc::Response: Send + Sync + 'static,
230        Svc::Error: Send + Sync + std::error::Error,
231        Ctx: Send + Sync + 'static,
232        B: Backend<Request<Req, Ctx>> + 'static,
233        B::Layer: Layer<Svc>,
234        <B::Layer as Layer<Svc>>::Service: Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
235        B::Stream: Unpin + Send,
236        Res: Send,
237        <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
238        <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Error:
239            Send + Sync + std::error::Error,
240        B::Layer: Layer<TestEmitService<Svc>>,
241        <B::Layer as Layer<TestEmitService<Svc>>>::Service:
242            Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
243        <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Future:
244            Send,
245        <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Error:
246            Send + Sync + Sync + std::error::Error,
247    {
248            let (mut res_tx, res_rx) = channel(10);
249            let should_next = Arc::new(AtomicBool::new(false));
250            let service = ServiceBuilder::new()
251                .layer_fn(|service| TestEmitService {
252                    service,
253                    tx: res_tx.clone(),
254                    should_next: should_next.clone(),
255                })
256                .service(service);
257            let worker = WorkerBuilder::new("test-worker")
258                .backend(backend.clone())
259                .build(service)
260                .run();
261            let handle = worker.get_handle();
262            let (stop_tx, mut stop_rx) = channel::<()>(1);
263
264            let poller = async move {
265                let worker = worker.shared();
266                loop {
267                    futures::select! {
268                        _ = stop_rx.next().fuse() => break,
269                        _ = worker.clone().fuse() => {
270
271                        },
272                    }
273                }
274                res_tx.close_channel();
275            };
276            (
277                TestWrapper {
278                    stop_tx,
279                    res_rx,
280                    _p: PhantomData,
281                    backend,
282                    _r: PhantomData,
283                    should_next,
284                    worker: handle,
285                },
286                poller.boxed(),
287            )
288        }
289
290        /// Stop polling
291        pub fn stop(mut self) {
292            self.stop_tx.try_send(()).unwrap();
293        }
294
295        /// Gets the current state of results
296        pub async fn execute_next(&mut self) -> Option<(TaskId, Result<String, String>)> {
297            self.should_next.store(true, Ordering::Release);
298            #[cfg(feature = "sleep")]
299            let fut = async {
300                crate::sleep(std::time::Duration::from_secs(2))
301                    .boxed()
302                    .await;
303            }
304            .boxed();
305            #[cfg(not(feature = "sleep"))]
306            let fut = async {
307                std::future::pending::<()>().await;
308            }
309            .boxed();
310
311            let res = futures::future::select(self.res_rx.next(), fut).await;
312            match res {
313                Either::Left(next) => next.0,
314                Either::Right(_) => None,
315            }
316        }
317
318        /// Gets the current state of results
319        pub fn try_execute_next(
320            &mut self,
321        ) -> Result<Option<(TaskId, Result<String, String>)>, TryRecvError> {
322            self.should_next.store(true, Ordering::Release);
323            self.res_rx.try_next().map(|res| {
324                self.should_next.store(false, Ordering::Release);
325                res
326            })
327        }
328    }
329
330    impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
331    where
332        B: Backend<Request<Req, Ctx>>,
333    {
334        type Target = B;
335
336        fn deref(&self) -> &Self::Target {
337            &self.backend
338        }
339    }
340
341    impl<B, Req, Ctx, Res> DerefMut for TestWrapper<B, Request<Req, Ctx>, Res>
342    where
343        B: Backend<Request<Req, Ctx>>,
344    {
345        fn deref_mut(&mut self) -> &mut Self::Target {
346            &mut self.backend
347        }
348    }
349
350    /// A generic service that emits the result of a test
351    #[derive(Debug, Clone)]
352    pub struct TestEmitService<S> {
353        tx: mpsc::Sender<(TaskId, Result<String, String>)>,
354        service: S,
355        should_next: Arc<AtomicBool>,
356    }
357
358    impl<S, Req, Ctx> Service<Request<Req, Ctx>> for TestEmitService<S>
359    where
360        S: Service<Request<Req, Ctx>> + Send + 'static,
361        S::Future: Send + 'static,
362        Req: Send + 'static,
363        S::Response: Debug + Send + Sync,
364        S::Error: std::error::Error + Send,
365        Ctx: Send + 'static,
366    {
367        type Response = S::Response;
368        type Error = S::Error;
369        type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
370
371        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372            if self.should_next.load(Ordering::Relaxed) {
373                self.service.poll_ready(cx)
374            } else {
375                Poll::Pending
376            }
377        }
378
379        fn call(&mut self, req: Request<Req, Ctx>) -> Self::Future {
380            self.should_next.store(false, Ordering::Relaxed);
381            let task_id = req.parts.task_id.clone();
382            let mut tx = Clone::clone(&self.tx);
383            let fut = self.service.call(req);
384            Box::pin(async move {
385                let res = fut.await;
386                match &res {
387                    Ok(res) => {
388                        tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap();
389                    }
390                    Err(err) => {
391                        tx.send((task_id, Err(err.to_string()))).await.unwrap();
392                    }
393                }
394                res
395            })
396        }
397    }
398
399    pub use tower::service_fn as apalis_test_service_fn;
400
401    #[macro_export]
402    /// Tests a generic mq
403    macro_rules! test_message_queue {
404        ($backend_instance:expr) => {
405            #[tokio::test]
406            async fn it_works_as_an_mq_backend() {
407                let backend = $backend_instance;
408                let service = apalis_test_service_fn(|request: Request<u32, ()>| async {
409                    Ok::<_, io::Error>(request)
410                });
411                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
412                tokio::spawn(poller);
413                t.enqueue(1).await.unwrap();
414                tokio::time::sleep(Duration::from_secs(1)).await;
415                let _res = t.execute_next().await.unwrap();
416                // assert_eq!(res.len(), 1); // One job is done
417            }
418        };
419    }
420    #[macro_export]
421    /// Tests a generic storage
422    macro_rules! generic_storage_test {
423        ($setup:path ) => {
424            #[tokio::test]
425            async fn integration_test_storage_push_and_consume() {
426                let backend = $setup().await;
427                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
428                    Ok::<_, io::Error>(request.args)
429                });
430                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
431                tokio::spawn(poller);
432                let res = t.len().await.unwrap();
433                assert_eq!(res, 0, "There should be no jobs");
434                t.push(1).await.unwrap();
435                let res = t.len().await.unwrap();
436                assert_eq!(res, 1, "There should be 1 job");
437                let res = t.execute_next().await.unwrap();
438                assert_eq!(res.1, Ok("1".to_owned()));
439
440                apalis_core::sleep(Duration::from_secs(1)).await;
441                let res = t.len().await.unwrap();
442                assert_eq!(res, 0, "There should be no jobs");
443
444                t.vacuum().await.unwrap();
445            }
446            #[tokio::test]
447            async fn integration_test_storage_vacuum() {
448                let backend = $setup().await;
449                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
450                    Ok::<_, io::Error>(request.args)
451                });
452                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
453                tokio::spawn(poller);
454                let res = t.len().await.unwrap();
455                assert_eq!(res, 0, "There should be no jobs");
456                t.push(1).await.unwrap();
457                let res = t.len().await.unwrap();
458                assert_eq!(res, 1, "A job exists");
459                let res = t.execute_next().await.unwrap();
460                assert_eq!(res.1, Ok("1".to_owned()));
461                apalis_core::sleep(Duration::from_secs(1)).await;
462                let res = t.len().await.unwrap();
463                assert_eq!(res, 0, "There should be no job");
464
465                t.vacuum().await.unwrap();
466                let res = t.len().await.unwrap();
467                assert_eq!(res, 0, "After vacuuming, there should be nothing");
468            }
469
470            #[tokio::test]
471            async fn integration_test_storage_retry_persists() {
472                use std::io::{Error, ErrorKind};
473                let mut backend = $setup().await;
474                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
475                    Err::<String, io::Error>(Error::new(ErrorKind::Other, "oh no!"))
476                });
477                let (mut t, poller) = TestWrapper::new_with_service(backend.clone(), service);
478                tokio::spawn(poller);
479                let res = t.len().await.unwrap();
480                assert_eq!(res, 0, "should have no jobs"); // No jobs
481                let parts = t.push(1).await.unwrap();
482                let res = t.len().await.unwrap();
483                assert_eq!(res, 1, "should have 1 job"); // A job exists
484                let res = t.execute_next().await.unwrap();
485                assert_eq!(res.1, Err("oh no!".to_owned()));
486
487                apalis_core::sleep(Duration::from_secs(1)).await;
488
489                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
490                assert_eq!(task.parts.attempt.current(), 1, "should have 1 attempt");
491
492                let res = t
493                    .execute_next()
494                    .await
495                    .expect("Job must be added back to the queue after attempt 1");
496                assert_eq!(res.1, Err("oh no!".to_owned()));
497
498                apalis_core::sleep(Duration::from_secs(1)).await;
499                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
500                assert_eq!(task.parts.attempt.current(), 2, "should have 2 attempts");
501
502                let res = t
503                    .execute_next()
504                    .await
505                    .expect("Job must be added back to the queue after attempt 2");
506                assert_eq!(res.1, Err("oh no!".to_owned()));
507
508                apalis_core::sleep(Duration::from_secs(1)).await;
509                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
510                assert_eq!(task.parts.attempt.current(), 3, "should have 3 attempts");
511
512                let res = t
513                    .execute_next()
514                    .await
515                    .expect("Job must be added back to the queue after attempt 3");
516                assert_eq!(res.1, Err("oh no!".to_owned()));
517                apalis_core::sleep(Duration::from_secs(1)).await;
518
519                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
520                assert_eq!(task.parts.attempt.current(), 4, "should have 4 attempts");
521
522                let res = t
523                    .execute_next()
524                    .await
525                    .expect("Job must be added back to the queue after attempt 5");
526                assert_eq!(res.1, Err("oh no!".to_owned()));
527                apalis_core::sleep(Duration::from_secs(1)).await;
528
529                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
530                assert_eq!(task.parts.attempt.current(), 5, "should have 5 attempts");
531
532                apalis_core::sleep(Duration::from_secs(1)).await;
533
534                let res = t.len().await.unwrap();
535                // Integration tests should include a max of 5 retries after that job should be aborted
536                assert_eq!(res, 0, "should have no job");
537
538                let res = t.try_execute_next();
539                assert!(res.is_err());
540
541                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
542                assert_eq!(
543                    task.parts.attempt.current(),
544                    5,
545                    "should still have 5 attempts"
546                );
547
548                t.vacuum().await.unwrap();
549                let res = t.len().await.unwrap();
550                assert_eq!(res, 0, "After vacuuming, there should be nothing");
551            }
552
553            #[tokio::test]
554            async fn integration_test_storage_abort() {
555                let backend = $setup().await;
556                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
557                    Err::<(), _>(Error::Abort(std::sync::Arc::new(Box::new(io::Error::new(
558                        io::ErrorKind::InvalidData,
559                        "request was invalid",
560                    )))))
561                });
562                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
563                tokio::spawn(poller);
564                let res = t.len().await.unwrap();
565                assert_eq!(res, 0); // No jobs
566                t.push(1).await.unwrap();
567                let res = t.len().await.unwrap();
568                assert_eq!(res, 1); // A job exists
569                let res = t.execute_next().await.unwrap();
570                assert_eq!(res.1, Err("AbortError: request was invalid".to_owned()));
571                // Allow lazy storages to sync
572                apalis_core::sleep(Duration::from_secs(1)).await;
573                // Rechecking the queue len should return 0
574                let res = t.len().await.unwrap();
575                assert_eq!(
576                    res, 0,
577                    "The queue should be empty as the previous task was aborted"
578                );
579
580                t.vacuum().await.unwrap();
581                let res = t.len().await.unwrap();
582                assert_eq!(res, 0, "After vacuuming, there should be nothing");
583            }
584
585            #[tokio::test]
586            async fn integration_test_storage_unexpected_abort() {
587                let backend = $setup().await;
588                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
589                    None::<()>.unwrap(); // unexpected abort
590                    Ok::<_, io::Error>(request.args)
591                });
592                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
593                tokio::spawn(poller);
594                let res = t.len().await.unwrap();
595                assert_eq!(res, 0, "There should be no jobs");
596                let parts = t.push(1).await.unwrap();
597                let res = t.len().await.unwrap();
598                assert_eq!(res, 1, "A job exists");
599                let res = t.execute_next().await;
600                assert_eq!(res, None, "Our worker is dead");
601
602                let res = t.len().await.unwrap();
603                assert_eq!(res, 0, "Job should not have been re-added to the queue");
604
605                // We start a healthy worker
606                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
607                    Ok::<_, io::Error>(request.args)
608                });
609                let (mut t, poller) = TestWrapper::new_with_service(t.backend, service);
610                tokio::spawn(poller);
611
612                apalis_core::sleep(Duration::from_secs(1)).await;
613                // This is testing resuming the same worker
614                // This ensures that the worker resumed any jobs lost during an interruption
615                let res = t
616                    .execute_next()
617                    .await
618                    .expect("Task must have been recovered and added to the queue");
619                assert_eq!(res.1, Ok("1".to_owned()));
620
621                let res = t.len().await.unwrap();
622                assert_eq!(res, 0, "Task should have been consumed");
623
624                t.vacuum().await.unwrap();
625                let res = t.len().await.unwrap();
626                assert_eq!(res, 0, "After vacuuming, there should be nothing");
627            }
628        };
629    }
630}