apalis_core/
lib.rs

1#![crate_name = "apalis_core"]
2#![warn(
3    missing_debug_implementations,
4    missing_docs,
5    rust_2018_idioms,
6    unreachable_pub,
7    bad_style,
8    dead_code,
9    improper_ctypes,
10    non_shorthand_field_patterns,
11    no_mangle_generic_items,
12    overflowing_literals,
13    path_statements,
14    patterns_in_fns_without_body,
15    unconditional_recursion,
16    unused,
17    unused_allocation,
18    unused_comparisons,
19    unused_parens,
20    while_true
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23//! # apalis-core
24//! Utilities for building job and message processing tools.
/// Represents utilities for creating worker instances.
26pub mod builder;
27
/// Represents a task source, e.g. Postgres or Redis.
29pub mod backend;
30/// Includes all possible error types.
31pub mod error;
32/// Represents middleware offered through [`tower`]
33pub mod layers;
34/// Represents monitoring of running workers
35pub mod monitor;
36/// Represents the request to be processed.
37pub mod request;
38/// Represents different possible responses.
39pub mod response;
40/// Represents a service that is created from a function.
41pub mod service_fn;
42/// Represents ability to persist and consume jobs from storages.
43pub mod storage;
44/// Represents the utils for building workers.
45pub mod worker;
46
47/// Represents the utils needed to extend a task's context.
48pub mod data;
49/// Message queuing utilities
50pub mod mq;
51/// Allows async listening in a mpsc style.
52pub mod notify;
53/// Controlled polling and streaming
54pub mod poller;
55
56/// In-memory utilities for testing and mocking
57pub mod memory;
58
59/// Task management utilities
60pub mod task;
61
62/// Codec for handling data
63pub mod codec;
64
65/// Allows stepped tasks
66pub mod step;
67
/// Asynchronously waits for `duration` to elapse.
///
/// The wait is backed by [`futures_timer::Delay`]; the returned future
/// resolves to `()` once the delay has completed.
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
    let delay = futures_timer::Delay::new(duration);
    delay.await;
}
73
#[cfg(feature = "sleep")]
/// Interval utilities
pub mod interval {
    use std::fmt;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use futures::future::BoxFuture;
    use futures::Stream;

    use crate::sleep;

    /// A stream that yields `()` every time its internal timer elapses.
    ///
    /// The timer is re-armed for another `interval` only when an elapsed
    /// timer is observed by `poll_next`, so each period is measured from the
    /// moment the previous tick was delivered.
    #[must_use = "streams do nothing unless polled or .awaited"]
    pub struct Interval {
        /// The currently armed [`sleep`] future.
        timer: BoxFuture<'static, ()>,
        /// The period used every time the timer is re-armed.
        interval: Duration,
    }

    /// Creates a new stream that yields at a set interval.
    ///
    /// The first item is produced after `duration` has elapsed.
    pub fn interval(duration: Duration) -> Interval {
        let timer: BoxFuture<'static, ()> = Box::pin(sleep(duration));
        Interval {
            timer,
            interval: duration,
        }
    }

    impl fmt::Debug for Interval {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The boxed timer is opaque, so a fixed placeholder is printed for it.
            let mut out = f.debug_struct("Interval");
            out.field("interval", &self.interval);
            out.field("timer", &"a future represented `apalis_core::sleep`");
            out.finish()
        }
    }

    impl Stream for Interval {
        type Item = ();

        /// Yields `Some(())` once the current timer has elapsed and arms a
        /// fresh one; this stream never yields `None`.
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if self.timer.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            // Assigning drops the finished timer and installs the next one.
            let period = self.interval;
            self.timer = Box::pin(sleep(period));
            Poll::Ready(Some(()))
        }
    }
}
126
127#[cfg(feature = "test-utils")]
128#[allow(unused)]
/// Test utilities that allow you to test backends
130pub mod test_utils {
131    use crate::backend::Backend;
132    use crate::builder::{WorkerBuilder, WorkerFactory};
133    use crate::request::Request;
134    use crate::task::task_id::TaskId;
135    use crate::worker::Worker;
136    use futures::channel::mpsc::{self, channel, Receiver, Sender, TryRecvError};
137    use futures::future::BoxFuture;
138    use futures::stream::{Stream, StreamExt};
139    use futures::{FutureExt, SinkExt};
140    use std::fmt::Debug;
141    use std::future::Future;
142    use std::marker::PhantomData;
143    use std::ops::{Deref, DerefMut};
144    use std::pin::Pin;
145    use std::sync::atomic::{AtomicBool, Ordering};
146    use std::sync::Arc;
147    use std::task::{Context, Poll};
148    use tower::{Layer, Service, ServiceBuilder};
149
150    /// Define a dummy service
151    #[derive(Debug, Clone)]
152    pub struct DummyService;
153
154    impl<Request: Send + 'static> Service<Request> for DummyService {
155        type Response = Request;
156        type Error = std::convert::Infallible;
157        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
158
159        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160            Poll::Ready(Ok(()))
161        }
162
163        fn call(&mut self, req: Request) -> Self::Future {
164            let fut = async move { Ok(req) };
165            Box::pin(fut)
166        }
167    }
168
    /// A generic backend wrapper that polls and executes jobs
    ///
    /// It is created by `TestWrapper::new_with_service`, which also returns
    /// the poller future that drives the wrapped worker. Results of executed
    /// tasks are read back through `execute_next` / `try_execute_next`.
    #[derive(Debug)]
    pub struct TestWrapper<B, Req, Res> {
        /// Sends the stop signal that makes the poller future exit.
        stop_tx: Sender<()>,
        /// Receives `(task id, stringified result)` pairs emitted by
        /// `TestEmitService` after each task has run.
        res_rx: Receiver<(TaskId, Result<String, String>)>,
        /// Marker for the request type; no value of `Req` is stored.
        _p: PhantomData<Req>,
        /// Marker for the response type; no value of `Res` is stored.
        _r: PhantomData<Res>,
        /// Flag shared with `TestEmitService`: while `false`, the service
        /// reports itself as not ready.
        should_next: Arc<AtomicBool>,
        /// The inner backend
        pub backend: B,
        /// The inner worker
        pub worker: Worker<crate::worker::Context>,
    }
182    /// A test wrapper to allow you to test without requiring a worker.
183    /// Important for testing backends and jobs
184    /// # Example
185    /// ```no_run
    /// #[cfg(test)]
187    /// mod tests {
188    ///    use crate::{
189    ///        error::Error, memory::MemoryStorage, mq::MessageQueue, service_fn::service_fn,
190    ///    };
191    ///
192    ///    use super::*;
193    ///
194    ///    async fn is_even(req: usize) -> Result<(), Error> {
195    ///        if req % 2 == 0 {
196    ///            Ok(())
197    ///        } else {
198    ///            Err(Error::Abort("Not an even number".to_string()))
199    ///        }
200    ///    }
201    ///
202    ///    #[tokio::test]
203    ///    async fn test_accepts_even() {
204    ///        let backend = MemoryStorage::new();
205    ///        let (mut tester, poller) = TestWrapper::new_with_service(backend, service_fn(is_even));
206    ///        tokio::spawn(poller);
207    ///        tester.enqueue(42usize).await.unwrap();
208    ///        assert_eq!(tester.size().await.unwrap(), 1);
209    ///        let (_, resp) = tester.execute_next().await.unwrap();
210    ///        assert_eq!(resp, Ok("()".to_string()));
211    ///    }
212    ///}
    /// ```
214
215    impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
216    where
217        B: Backend<Request<Req, Ctx>> + Send + Sync + 'static + Clone,
218        Req: Send + 'static,
219        Ctx: Send,
220        B::Stream: Send + 'static,
221        B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
222        Res: Debug,
223    {
224        /// Build a new instance provided a custom service
225    pub fn new_with_service<Svc>(backend: B, service: Svc) -> (Self, BoxFuture<'static, ()>)
226    where
227        Svc::Future: Send,
228        Svc: Send + Sync + Service<Request<Req, Ctx>, Response = Res> + 'static,
229        Req: Send + Sync + 'static,
230        Svc::Response: Send + Sync + 'static,
231        Svc::Error: Send + Sync + std::error::Error,
232        Ctx: Send + Sync + 'static,
233        B: Backend<Request<Req, Ctx>> + 'static,
234        B::Layer: Layer<Svc>,
235        <B::Layer as Layer<Svc>>::Service: Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
236        B::Stream: Unpin + Send,
237        Res: Send,
238        <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
239        <<B::Layer as Layer<Svc>>::Service as Service<Request<Req, Ctx>>>::Error:
240            Send + Sync + std::error::Error,
241        B::Layer: Layer<TestEmitService<Svc>>,
242        <B::Layer as Layer<TestEmitService<Svc>>>::Service:
243            Service<Request<Req, Ctx>, Response = Res> + Send + Sync,
244        <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Future:
245            Send,
246        <<B::Layer as Layer<TestEmitService<Svc>>>::Service as Service<Request<Req, Ctx>>>::Error:
247            Send + Sync + Sync + std::error::Error,
248    {
249            let (mut res_tx, res_rx) = channel(10);
250            let should_next = Arc::new(AtomicBool::new(false));
251            let service = ServiceBuilder::new()
252                .layer_fn(|service| TestEmitService {
253                    service,
254                    tx: res_tx.clone(),
255                    should_next: should_next.clone(),
256                })
257                .service(service);
258            let worker = WorkerBuilder::new("test-worker")
259                .backend(backend.clone())
260                .build(service)
261                .run();
262            let handle = worker.get_handle();
263            let (stop_tx, mut stop_rx) = channel::<()>(1);
264
265            let poller = async move {
266                let worker = worker.shared();
267                loop {
268                    futures::select! {
269                        _ = stop_rx.next().fuse() => break,
270                        _ = worker.clone().fuse() => {
271
272                        },
273                    }
274                }
275                res_tx.close_channel();
276            };
277            (
278                TestWrapper {
279                    stop_tx,
280                    res_rx,
281                    _p: PhantomData,
282                    backend,
283                    _r: PhantomData,
284                    should_next,
285                    worker: handle,
286                },
287                poller.boxed(),
288            )
289        }
290
291        /// Stop polling
292        pub fn stop(mut self) {
293            self.stop_tx.try_send(()).unwrap();
294        }
295
296        /// Gets the current state of results
297        pub async fn execute_next(&mut self) -> Option<(TaskId, Result<String, String>)> {
298            self.should_next.store(true, Ordering::Release);
299            self.res_rx.next().await
300        }
301
302        /// Gets the current state of results
303        pub fn try_execute_next(
304            &mut self,
305        ) -> Result<Option<(TaskId, Result<String, String>)>, TryRecvError> {
306            self.should_next.store(true, Ordering::Release);
307            self.res_rx.try_next().map(|res| {
308                self.should_next.store(false, Ordering::Release);
309                res
310            })
311        }
312    }
313
314    impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
315    where
316        B: Backend<Request<Req, Ctx>>,
317    {
318        type Target = B;
319
320        fn deref(&self) -> &Self::Target {
321            &self.backend
322        }
323    }
324
325    impl<B, Req, Ctx, Res> DerefMut for TestWrapper<B, Request<Req, Ctx>, Res>
326    where
327        B: Backend<Request<Req, Ctx>>,
328    {
329        fn deref_mut(&mut self) -> &mut Self::Target {
330            &mut self.backend
331        }
332    }
333
334    /// A generic service that emits the result of a test
335    #[derive(Debug, Clone)]
336    pub struct TestEmitService<S> {
337        tx: mpsc::Sender<(TaskId, Result<String, String>)>,
338        service: S,
339        should_next: Arc<AtomicBool>,
340    }
341
342    impl<S, Req, Ctx> Service<Request<Req, Ctx>> for TestEmitService<S>
343    where
344        S: Service<Request<Req, Ctx>> + Send + 'static,
345        S::Future: Send + 'static,
346        Req: Send + 'static,
347        S::Response: Debug + Send + Sync,
348        S::Error: std::error::Error + Send,
349        Ctx: Send + 'static,
350    {
351        type Response = S::Response;
352        type Error = S::Error;
353        type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
354
355        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
356            if self.should_next.load(Ordering::Relaxed) {
357                self.service.poll_ready(cx)
358            } else {
359                Poll::Pending
360            }
361        }
362
363        fn call(&mut self, req: Request<Req, Ctx>) -> Self::Future {
364            self.should_next.store(false, Ordering::Relaxed);
365            let task_id = req.parts.task_id.clone();
366            let mut tx = Clone::clone(&self.tx);
367            let fut = self.service.call(req);
368            Box::pin(async move {
369                let res = fut.await;
370                match &res {
371                    Ok(res) => {
372                        tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap();
373                    }
374                    Err(err) => {
375                        tx.send((task_id, Err(err.to_string()))).await.unwrap();
376                    }
377                }
378                res
379            })
380        }
381    }
382
383    pub use tower::service_fn as apalis_test_service_fn;
384
385    #[macro_export]
386    /// Tests a generic mq
387    macro_rules! test_message_queue {
388        ($backend_instance:expr) => {
389            #[tokio::test]
390            async fn it_works_as_an_mq_backend() {
391                let backend = $backend_instance;
392                let service = apalis_test_service_fn(|request: Request<u32, ()>| async {
393                    Ok::<_, io::Error>(request)
394                });
395                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
396                tokio::spawn(poller);
397                t.enqueue(1).await.unwrap();
398                tokio::time::sleep(Duration::from_secs(1)).await;
399                let _res = t.execute_next().await.unwrap();
400                // assert_eq!(res.len(), 1); // One job is done
401            }
402        };
403    }
404    #[macro_export]
405    /// Tests a generic storage
406    macro_rules! generic_storage_test {
407        ($setup:path ) => {
408            #[tokio::test]
409            async fn integration_test_storage_push_and_consume() {
410                let backend = $setup().await;
411                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
412                    Ok::<_, io::Error>(request.args)
413                });
414                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
415                tokio::spawn(poller);
416                let res = t.len().await.unwrap();
417                assert_eq!(res, 0, "There should be no jobs");
418                t.push(1).await.unwrap();
419                let res = t.len().await.unwrap();
420                assert_eq!(res, 1, "There should be 1 job");
421                let res = t.execute_next().await.unwrap();
422                assert_eq!(res.1, Ok("1".to_owned()));
423
424                apalis_core::sleep(Duration::from_secs(1)).await;
425                let res = t.len().await.unwrap();
426                assert_eq!(res, 0, "There should be no jobs");
427
428                t.vacuum().await.unwrap();
429            }
430            #[tokio::test]
431            async fn integration_test_storage_vacuum() {
432                let backend = $setup().await;
433                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
434                    Ok::<_, io::Error>(request.args)
435                });
436                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
437                tokio::spawn(poller);
438                let res = t.len().await.unwrap();
439                assert_eq!(res, 0, "There should be no jobs");
440                t.push(1).await.unwrap();
441                let res = t.len().await.unwrap();
442                assert_eq!(res, 1, "A job exists");
443                let res = t.execute_next().await.unwrap();
444                assert_eq!(res.1, Ok("1".to_owned()));
445                apalis_core::sleep(Duration::from_secs(1)).await;
446                let res = t.len().await.unwrap();
447                assert_eq!(res, 0, "There should be no job");
448
449                t.vacuum().await.unwrap();
450                let res = t.len().await.unwrap();
451                assert_eq!(res, 0, "After vacuuming, there should be nothing");
452            }
453
454            #[tokio::test]
455            async fn integration_test_storage_retry_persists() {
456                use std::io::{Error, ErrorKind};
457                let mut backend = $setup().await;
458                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
459                    Err::<String, io::Error>(Error::new(ErrorKind::Other, "oh no!"))
460                });
461                let (mut t, poller) = TestWrapper::new_with_service(backend.clone(), service);
462                tokio::spawn(poller);
463                let res = t.len().await.unwrap();
464                assert_eq!(res, 0, "should have no jobs"); // No jobs
465                let parts = t.push(1).await.unwrap();
466                let res = t.len().await.unwrap();
467                assert_eq!(res, 1, "should have 1 job"); // A job exists
468                let res = t.execute_next().await.unwrap();
469                assert_eq!(res.1, Err("oh no!".to_owned()));
470
471                apalis_core::sleep(Duration::from_secs(1)).await;
472
473                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
474                assert_eq!(task.parts.attempt.current(), 1, "should have 1 attempt");
475
476                let res = t
477                    .execute_next()
478                    .await
479                    .expect("Job must be added back to the queue after attempt 1");
480                assert_eq!(res.1, Err("oh no!".to_owned()));
481
482                apalis_core::sleep(Duration::from_secs(1)).await;
483                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
484                assert_eq!(task.parts.attempt.current(), 2, "should have 2 attempts");
485
486                let res = t
487                    .execute_next()
488                    .await
489                    .expect("Job must be added back to the queue after attempt 2");
490                assert_eq!(res.1, Err("oh no!".to_owned()));
491
492                apalis_core::sleep(Duration::from_secs(1)).await;
493                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
494                assert_eq!(task.parts.attempt.current(), 3, "should have 3 attempts");
495
496                let res = t
497                    .execute_next()
498                    .await
499                    .expect("Job must be added back to the queue after attempt 3");
500                assert_eq!(res.1, Err("oh no!".to_owned()));
501                apalis_core::sleep(Duration::from_secs(1)).await;
502
503                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
504                assert_eq!(task.parts.attempt.current(), 4, "should have 4 attempts");
505
506                let res = t
507                    .execute_next()
508                    .await
509                    .expect("Job must be added back to the queue after attempt 5");
510                assert_eq!(res.1, Err("oh no!".to_owned()));
511                apalis_core::sleep(Duration::from_secs(1)).await;
512
513                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
514                assert_eq!(task.parts.attempt.current(), 5, "should have 5 attempts");
515
516                apalis_core::sleep(Duration::from_secs(1)).await;
517
518                let res = t.len().await.unwrap();
519                // Integration tests should include a max of 5 retries after that job should be aborted
520                assert_eq!(res, 0, "should have no job");
521
522                let res = t.try_execute_next();
523                assert!(res.is_err());
524
525                let task = backend.fetch_by_id(&parts.task_id).await.unwrap().unwrap();
526                assert_eq!(
527                    task.parts.attempt.current(),
528                    5,
529                    "should still have 5 attempts"
530                );
531
532                t.vacuum().await.unwrap();
533                let res = t.len().await.unwrap();
534                assert_eq!(res, 0, "After vacuuming, there should be nothing");
535            }
536
537            #[tokio::test]
538            async fn integration_test_storage_abort() {
539                let backend = $setup().await;
540                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
541                    Err::<(), _>(Error::Abort(std::sync::Arc::new(Box::new(io::Error::new(
542                        io::ErrorKind::InvalidData,
543                        "request was invalid",
544                    )))))
545                });
546                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
547                tokio::spawn(poller);
548                let res = t.len().await.unwrap();
549                assert_eq!(res, 0); // No jobs
550                t.push(1).await.unwrap();
551                let res = t.len().await.unwrap();
552                assert_eq!(res, 1); // A job exists
553                let res = t.execute_next().await.unwrap();
554                assert_eq!(res.1, Err("AbortError: request was invalid".to_owned()));
555                // Allow lazy storages to sync
556                apalis_core::sleep(Duration::from_secs(1)).await;
557                // Rechecking the queue len should return 0
558                let res = t.len().await.unwrap();
559                assert_eq!(
560                    res, 0,
561                    "The queue should be empty as the previous task was aborted"
562                );
563
564                t.vacuum().await.unwrap();
565                let res = t.len().await.unwrap();
566                assert_eq!(res, 0, "After vacuuming, there should be nothing");
567            }
568
569            #[tokio::test]
570            async fn integration_test_storage_unexpected_abort() {
571                let backend = $setup().await;
572                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
573                    None::<()>.unwrap(); // unexpected abort
574                    Ok::<_, io::Error>(request.args)
575                });
576                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
577                tokio::spawn(poller);
578                let res = t.len().await.unwrap();
579                assert_eq!(res, 0, "There should be no jobs");
580                let parts = t.push(1).await.unwrap();
581                let res = t.len().await.unwrap();
582                assert_eq!(res, 1, "A job exists");
583                let res = t.execute_next().await;
584                assert_eq!(res, None, "Our worker is dead");
585
586                let res = t.len().await.unwrap();
587                assert_eq!(res, 0, "Job should not have been re-added to the queue");
588
589                // We start a healthy worker
590                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
591                    Ok::<_, io::Error>(request.args)
592                });
593                let (mut t, poller) = TestWrapper::new_with_service(t.backend, service);
594                tokio::spawn(poller);
595
596                apalis_core::sleep(Duration::from_secs(1)).await;
597                // This is testing resuming the same worker
598                // This ensures that the worker resumed any jobs lost during an interruption
599                let res = t
600                    .execute_next()
601                    .await
602                    .expect("Task must have been recovered and added to the queue");
603                assert_eq!(res.1, Ok("1".to_owned()));
604
605                let res = t.len().await.unwrap();
606                assert_eq!(res, 0, "Task should have been consumed");
607
608                t.vacuum().await.unwrap();
609                let res = t.len().await.unwrap();
610                assert_eq!(res, 0, "After vacuuming, there should be nothing");
611            }
612        };
613    }
614}