apalis_core/
lib.rs

1#![crate_name = "apalis_core"]
2#![warn(
3    missing_debug_implementations,
4    missing_docs,
5    rust_2018_idioms,
6    unreachable_pub,
7    bad_style,
8    dead_code,
9    improper_ctypes,
10    non_shorthand_field_patterns,
11    no_mangle_generic_items,
12    overflowing_literals,
13    path_statements,
14    patterns_in_fns_without_body,
15    unconditional_recursion,
16    unused,
17    unused_allocation,
18    unused_comparisons,
19    unused_parens,
20    while_true
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23//! # apalis-core
24//! Utilities for building job and message processing tools.
25/// Represent utilities for creating worker instances.
26pub mod builder;
27
28/// Represents a task source eg Postgres or Redis
29pub mod backend;
30/// Includes all possible error types.
31pub mod error;
32/// Represents middleware offered through [`tower`]
33pub mod layers;
34/// Represents monitoring of running workers
35pub mod monitor;
36/// Represents the request to be processed.
37pub mod request;
38/// Represents different possible responses.
39pub mod response;
40/// Represents a service that is created from a function.
41pub mod service_fn;
42/// Represents ability to persist and consume jobs from storages.
43pub mod storage;
44/// Represents the utils for building workers.
45pub mod worker;
46
47/// Represents the utils needed to extend a task's context.
48pub mod data;
49/// Message queuing utilities
50pub mod mq;
51/// Allows async listening in a mpsc style.
52pub mod notify;
53/// Controlled polling and streaming
54pub mod poller;
55
56/// In-memory utilities for testing and mocking
57pub mod memory;
58
59/// Task management utilities
60pub mod task;
61
62/// Codec for handling data
63pub mod codec;
64
65/// Sleep utilities
66#[cfg(feature = "sleep")]
67pub async fn sleep(duration: std::time::Duration) {
68    futures_timer::Delay::new(duration).await;
69}
70
71#[cfg(feature = "sleep")]
72/// Interval utilities
73pub mod interval {
74    use std::fmt;
75    use std::future::Future;
76    use std::pin::Pin;
77    use std::task::{Context, Poll};
78    use std::time::Duration;
79
80    use futures::future::BoxFuture;
81    use futures::Stream;
82
83    use crate::sleep;
84    /// Creates a new stream that yields at a set interval.
85    pub fn interval(duration: Duration) -> Interval {
86        Interval {
87            timer: Box::pin(sleep(duration)),
88            interval: duration,
89        }
90    }
91
92    /// A stream representing notifications at fixed interval
93    #[must_use = "streams do nothing unless polled or .awaited"]
94    pub struct Interval {
95        timer: BoxFuture<'static, ()>,
96        interval: Duration,
97    }
98
99    impl fmt::Debug for Interval {
100        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101            f.debug_struct("Interval")
102                .field("interval", &self.interval)
103                .field("timer", &"a future represented `apalis_core::sleep`")
104                .finish()
105        }
106    }
107
108    impl Stream for Interval {
109        type Item = ();
110
111        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112            match Pin::new(&mut self.timer).poll(cx) {
113                Poll::Ready(_) => {}
114                Poll::Pending => return Poll::Pending,
115            };
116            let interval = self.interval;
117            let fut = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
118            drop(fut);
119            Poll::Ready(Some(()))
120        }
121    }
122}
123
124#[cfg(feature = "test-utils")]
125/// Test utilities that allows you to test backends
126pub mod test_utils {
127    use crate::backend::Backend;
128    use crate::error::BoxDynError;
129    use crate::request::Request;
130    use crate::task::task_id::TaskId;
131    use crate::worker::{Worker, WorkerId};
132    use futures::channel::mpsc::{channel, Receiver, Sender};
133    use futures::future::BoxFuture;
134    use futures::stream::{Stream, StreamExt};
135    use futures::{Future, FutureExt, SinkExt};
136    use std::fmt::Debug;
137    use std::marker::PhantomData;
138    use std::ops::{Deref, DerefMut};
139    use std::pin::Pin;
140    use std::task::{Context, Poll};
141    use tower::{Layer, Service};
142
143    /// Define a dummy service
144    #[derive(Debug, Clone)]
145    pub struct DummyService;
146
147    impl<Request: Send + 'static> Service<Request> for DummyService {
148        type Response = Request;
149        type Error = std::convert::Infallible;
150        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
151
152        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153            Poll::Ready(Ok(()))
154        }
155
156        fn call(&mut self, req: Request) -> Self::Future {
157            let fut = async move { Ok(req) };
158            Box::pin(fut)
159        }
160    }
161
162    /// A generic backend wrapper that polls and executes jobs
163    #[derive(Debug)]
164    pub struct TestWrapper<B, Req, Res> {
165        stop_tx: Sender<()>,
166        res_rx: Receiver<(TaskId, Result<String, String>)>,
167        _p: PhantomData<Req>,
168        _r: PhantomData<Res>,
169        backend: B,
170    }
171    /// A test wrapper to allow you to test without requiring a worker.
172    /// Important for testing backends and jobs
173    /// # Example
174    /// ```no_run
175    /// #[cfg(tests)]
176    /// mod tests {
177    ///    use crate::{
178    ///        error::Error, memory::MemoryStorage, mq::MessageQueue, service_fn::service_fn,
179    ///    };
180    ///
181    ///    use super::*;
182    ///
183    ///    async fn is_even(req: usize) -> Result<(), Error> {
184    ///        if req % 2 == 0 {
185    ///            Ok(())
186    ///        } else {
187    ///            Err(Error::Abort("Not an even number".to_string()))
188    ///        }
189    ///    }
190    ///
191    ///    #[tokio::test]
192    ///    async fn test_accepts_even() {
193    ///        let backend = MemoryStorage::new();
194    ///        let (mut tester, poller) = TestWrapper::new_with_service(backend, service_fn(is_even));
195    ///        tokio::spawn(poller);
196    ///        tester.enqueue(42usize).await.unwrap();
197    ///        assert_eq!(tester.size().await.unwrap(), 1);
198    ///        let (_, resp) = tester.execute_next().await;
199    ///        assert_eq!(resp, Ok("()".to_string()));
200    ///    }
201    ///}
202    /// ````
203    impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
204    where
205        B: Backend<Request<Req, Ctx>, Res> + Send + Sync + 'static + Clone,
206        Req: Send + 'static,
207        Ctx: Send,
208        B::Stream: Send + 'static,
209        B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
210    {
211        /// Build a new instance provided a custom service
212        pub fn new_with_service<S>(backend: B, service: S) -> (Self, BoxFuture<'static, ()>)
213        where
214            S: Service<Request<Req, Ctx>, Response = Res> + Send + 'static,
215            B::Layer: Layer<S>,
216            <<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service:
217                Service<Request<Req, Ctx>> + Send + 'static,
218            <<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
219                Request<Req, Ctx>,
220            >>::Response: Send + Debug,
221            <<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
222                Request<Req, Ctx>,
223            >>::Error: Send + Into<BoxDynError> + Sync,
224            <<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
225                Request<Req, Ctx>,
226            >>::Future: Send + 'static,
227        {
228            let worker_id = WorkerId::new("test-worker");
229            let worker = Worker::new(worker_id, crate::worker::Context::default());
230            worker.start();
231            let b = backend.clone();
232            let mut poller = b.poll::<S>(&worker);
233            let (stop_tx, mut stop_rx) = channel::<()>(1);
234
235            let (mut res_tx, res_rx) = channel(10);
236
237            let mut service = poller.layer.layer(service);
238
239            let poller = async move {
240                let heartbeat = poller.heartbeat.shared();
241                loop {
242                    futures::select! {
243
244                        item = poller.stream.next().fuse() => match item {
245                            Some(Ok(Some(req))) => {
246                                let task_id = req.parts.task_id.clone();
247                                match service.call(req).await {
248                                    Ok(res) => {
249                                        res_tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap();
250                                    },
251                                    Err(err) => {
252                                        res_tx.send((task_id, Err(err.into().to_string()))).await.unwrap();
253                                    }
254                                }
255                            }
256                            Some(Ok(None)) | None => break,
257                            Some(Err(_e)) => {
258                                // handle error
259                                break;
260                            }
261                        },
262                        _ = stop_rx.next().fuse() => break,
263                        _ = heartbeat.clone().fuse() => {
264
265                        },
266                    }
267                }
268            };
269            (
270                TestWrapper {
271                    stop_tx,
272                    res_rx,
273                    _p: PhantomData,
274                    backend,
275                    _r: PhantomData,
276                },
277                poller.boxed(),
278            )
279        }
280
281        /// Stop polling
282        pub fn stop(mut self) {
283            self.stop_tx.try_send(()).unwrap();
284        }
285
286        /// Gets the current state of results
287        pub async fn execute_next(&mut self) -> (TaskId, Result<String, String>) {
288            self.res_rx.next().await.unwrap()
289        }
290    }
291
292    impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
293    where
294        B: Backend<Request<Req, Ctx>, Res>,
295    {
296        type Target = B;
297
298        fn deref(&self) -> &Self::Target {
299            &self.backend
300        }
301    }
302
303    impl<B, Req, Ctx, Res> DerefMut for TestWrapper<B, Request<Req, Ctx>, Res>
304    where
305        B: Backend<Request<Req, Ctx>, Res>,
306    {
307        fn deref_mut(&mut self) -> &mut Self::Target {
308            &mut self.backend
309        }
310    }
311
312    pub use tower::service_fn as apalis_test_service_fn;
313
314    #[macro_export]
315    /// Tests a generic mq
316    macro_rules! test_message_queue {
317        ($backend_instance:expr) => {
318            #[tokio::test]
319            async fn it_works_as_an_mq_backend() {
320                let backend = $backend_instance;
321                let service = apalis_test_service_fn(|request: Request<u32, ()>| async {
322                    Ok::<_, io::Error>(request)
323                });
324                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
325                tokio::spawn(poller);
326                t.enqueue(1).await.unwrap();
327                tokio::time::sleep(Duration::from_secs(1)).await;
328                let _res = t.execute_next().await;
329                // assert_eq!(res.len(), 1); // One job is done
330            }
331        };
332    }
333    #[macro_export]
334    /// Tests a generic storage
335    macro_rules! generic_storage_test {
336        ($setup:path ) => {
337            #[tokio::test]
338            async fn integration_test_storage_push_and_consume() {
339                let backend = $setup().await;
340                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
341                    Ok::<_, io::Error>(request.args)
342                });
343                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
344                tokio::spawn(poller);
345                let res = t.len().await.unwrap();
346                assert_eq!(res, 0); // No jobs
347                t.push(1).await.unwrap();
348                let res = t.len().await.unwrap();
349                assert_eq!(res, 1); // A job exists
350                let res = t.execute_next().await;
351                assert_eq!(res.1, Ok("1".to_owned()));
352                // TODO: all storages need to satisfy this rule, redis does not
353                // let res = t.len().await.unwrap();
354                // assert_eq!(res, 0);
355                t.vacuum().await.unwrap();
356            }
357            #[tokio::test]
358            async fn integration_test_storage_vacuum() {
359                let backend = $setup().await;
360                let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
361                    Ok::<_, io::Error>(request.args)
362                });
363                let (mut t, poller) = TestWrapper::new_with_service(backend, service);
364                tokio::spawn(poller);
365                let res = t.len().await.unwrap();
366                assert_eq!(res, 0); // No jobs
367                t.push(1).await.unwrap();
368                let res = t.len().await.unwrap();
369                assert_eq!(res, 1); // A job exists
370                let res = t.execute_next().await;
371                assert_eq!(res.1, Ok("1".to_owned()));
372                t.vacuum().await.unwrap();
373                let res = t.len().await.unwrap();
374                assert_eq!(res, 0); // After vacuuming, there should be nothing
375            }
376        };
377    }
378}