1#![crate_name = "apalis_core"]
2#![warn(
3 missing_debug_implementations,
4 missing_docs,
5 rust_2018_idioms,
6 unreachable_pub,
7 bad_style,
8 dead_code,
9 improper_ctypes,
10 non_shorthand_field_patterns,
11 no_mangle_generic_items,
12 overflowing_literals,
13 path_statements,
14 patterns_in_fns_without_body,
15 unconditional_recursion,
16 unused,
17 unused_allocation,
18 unused_comparisons,
19 unused_parens,
20 while_true
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23pub mod builder;
27
28pub mod backend;
30pub mod error;
32pub mod layers;
34pub mod monitor;
36pub mod request;
38pub mod response;
40pub mod service_fn;
42pub mod storage;
44pub mod worker;
46
47pub mod data;
49pub mod mq;
51pub mod notify;
53pub mod poller;
55
56pub mod memory;
58
59pub mod task;
61
62pub mod codec;
64
/// Completes once `duration` has elapsed, using a `futures_timer::Delay`.
///
/// Only compiled when the `sleep` feature is enabled.
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
    let delay = futures_timer::Delay::new(duration);
    delay.await;
}
70
/// A periodic tick stream built on top of [`crate::sleep`].
///
/// Only compiled when the `sleep` feature is enabled.
#[cfg(feature = "sleep")]
pub mod interval {
    use std::fmt;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use futures::future::BoxFuture;
    use futures::Stream;

    use crate::sleep;

    /// Creates an [`Interval`] whose timer is armed for `duration` and re-armed
    /// with the same `duration` after every tick.
    pub fn interval(duration: Duration) -> Interval {
        let timer: BoxFuture<'static, ()> = Box::pin(sleep(duration));
        Interval {
            timer,
            interval: duration,
        }
    }

    /// Stream that yields `()` every time its internal sleep future completes.
    #[must_use = "streams do nothing unless polled or .awaited"]
    pub struct Interval {
        /// The sleep currently being waited on.
        timer: BoxFuture<'static, ()>,
        /// Duration used whenever the timer is re-armed.
        interval: Duration,
    }

    impl fmt::Debug for Interval {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The boxed future has no `Debug` impl, so a fixed placeholder is printed for it.
            let mut repr = f.debug_struct("Interval");
            repr.field("interval", &self.interval);
            repr.field("timer", &"a future represented `apalis_core::sleep`");
            repr.finish()
        }
    }

    impl Stream for Interval {
        type Item = ();

        /// Yields `Some(())` when the current sleep finishes, then arms a fresh one.
        /// This implementation never returns `None`.
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if self.timer.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            // Assigning the new sleep drops the finished one.
            let period = self.interval;
            self.timer = Box::pin(sleep(period));
            Poll::Ready(Some(()))
        }
    }
}
123
124#[cfg(feature = "test-utils")]
125pub mod test_utils {
127 use crate::backend::Backend;
128 use crate::error::BoxDynError;
129 use crate::request::Request;
130 use crate::task::task_id::TaskId;
131 use crate::worker::{Worker, WorkerId};
132 use futures::channel::mpsc::{channel, Receiver, Sender};
133 use futures::future::BoxFuture;
134 use futures::stream::{Stream, StreamExt};
135 use futures::{Future, FutureExt, SinkExt};
136 use std::fmt::Debug;
137 use std::marker::PhantomData;
138 use std::ops::{Deref, DerefMut};
139 use std::pin::Pin;
140 use std::task::{Context, Poll};
141 use tower::{Layer, Service};
142
143 #[derive(Debug, Clone)]
145 pub struct DummyService;
146
147 impl<Request: Send + 'static> Service<Request> for DummyService {
148 type Response = Request;
149 type Error = std::convert::Infallible;
150 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
151
152 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153 Poll::Ready(Ok(()))
154 }
155
156 fn call(&mut self, req: Request) -> Self::Future {
157 let fut = async move { Ok(req) };
158 Box::pin(fut)
159 }
160 }
161
162 #[derive(Debug)]
164 pub struct TestWrapper<B, Req, Res> {
165 stop_tx: Sender<()>,
166 res_rx: Receiver<(TaskId, Result<String, String>)>,
167 _p: PhantomData<Req>,
168 _r: PhantomData<Res>,
169 backend: B,
170 }
171 impl<B, Req, Res, Ctx> TestWrapper<B, Request<Req, Ctx>, Res>
204 where
205 B: Backend<Request<Req, Ctx>, Res> + Send + Sync + 'static + Clone,
206 Req: Send + 'static,
207 Ctx: Send,
208 B::Stream: Send + 'static,
209 B::Stream: Stream<Item = Result<Option<Request<Req, Ctx>>, crate::error::Error>> + Unpin,
210 {
211 pub fn new_with_service<S>(backend: B, service: S) -> (Self, BoxFuture<'static, ()>)
213 where
214 S: Service<Request<Req, Ctx>, Response = Res> + Send + 'static,
215 B::Layer: Layer<S>,
216 <<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service:
217 Service<Request<Req, Ctx>> + Send + 'static,
218 <<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
219 Request<Req, Ctx>,
220 >>::Response: Send + Debug,
221 <<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
222 Request<Req, Ctx>,
223 >>::Error: Send + Into<BoxDynError> + Sync,
224 <<<B as Backend<Request<Req, Ctx>, Res>>::Layer as Layer<S>>::Service as Service<
225 Request<Req, Ctx>,
226 >>::Future: Send + 'static,
227 {
228 let worker_id = WorkerId::new("test-worker");
229 let worker = Worker::new(worker_id, crate::worker::Context::default());
230 worker.start();
231 let b = backend.clone();
232 let mut poller = b.poll::<S>(&worker);
233 let (stop_tx, mut stop_rx) = channel::<()>(1);
234
235 let (mut res_tx, res_rx) = channel(10);
236
237 let mut service = poller.layer.layer(service);
238
239 let poller = async move {
240 let heartbeat = poller.heartbeat.shared();
241 loop {
242 futures::select! {
243
244 item = poller.stream.next().fuse() => match item {
245 Some(Ok(Some(req))) => {
246 let task_id = req.parts.task_id.clone();
247 match service.call(req).await {
248 Ok(res) => {
249 res_tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap();
250 },
251 Err(err) => {
252 res_tx.send((task_id, Err(err.into().to_string()))).await.unwrap();
253 }
254 }
255 }
256 Some(Ok(None)) | None => break,
257 Some(Err(_e)) => {
258 break;
260 }
261 },
262 _ = stop_rx.next().fuse() => break,
263 _ = heartbeat.clone().fuse() => {
264
265 },
266 }
267 }
268 };
269 (
270 TestWrapper {
271 stop_tx,
272 res_rx,
273 _p: PhantomData,
274 backend,
275 _r: PhantomData,
276 },
277 poller.boxed(),
278 )
279 }
280
281 pub fn stop(mut self) {
283 self.stop_tx.try_send(()).unwrap();
284 }
285
286 pub async fn execute_next(&mut self) -> (TaskId, Result<String, String>) {
288 self.res_rx.next().await.unwrap()
289 }
290 }
291
292 impl<B, Req, Res, Ctx> Deref for TestWrapper<B, Request<Req, Ctx>, Res>
293 where
294 B: Backend<Request<Req, Ctx>, Res>,
295 {
296 type Target = B;
297
298 fn deref(&self) -> &Self::Target {
299 &self.backend
300 }
301 }
302
303 impl<B, Req, Ctx, Res> DerefMut for TestWrapper<B, Request<Req, Ctx>, Res>
304 where
305 B: Backend<Request<Req, Ctx>, Res>,
306 {
307 fn deref_mut(&mut self) -> &mut Self::Target {
308 &mut self.backend
309 }
310 }
311
312 pub use tower::service_fn as apalis_test_service_fn;
313
/// Expands to a `#[tokio::test]` named `it_works_as_an_mq_backend` that enqueues
/// one message on the given backend and waits for the poll loop to report it.
///
/// The expansion refers to `apalis_test_service_fn`, `Request`, `io`,
/// `TestWrapper`, `Duration` and `tokio` by their bare names, so all of them
/// must be in scope where the macro is invoked. `enqueue` is called on the
/// wrapper and is presumably resolved on the backend through `Deref`.
#[macro_export]
macro_rules! test_message_queue {
    ($backend_instance:expr) => {
        #[tokio::test]
        async fn it_works_as_an_mq_backend() {
            let queue = $backend_instance;
            // Echo service: the response is the request itself.
            let echo = apalis_test_service_fn(|request: Request<u32, ()>| async {
                Ok::<_, io::Error>(request)
            });
            let (mut wrapper, poll_loop) = TestWrapper::new_with_service(queue, echo);
            tokio::spawn(poll_loop);
            wrapper.enqueue(1).await.unwrap();
            tokio::time::sleep(Duration::from_secs(1)).await;
            let _outcome = wrapper.execute_next().await;
        }
    };
}
/// Expands to two `#[tokio::test]` functions that exercise a storage backend
/// produced by awaiting `$setup()`:
///
/// * `integration_test_storage_push_and_consume` pushes one job, checks `len`
///   before and after the push, checks the reported result, then vacuums.
/// * `integration_test_storage_vacuum` does the same and additionally checks
///   that `len` is `0` after the vacuum.
///
/// The expansion refers to `apalis_test_service_fn`, `Request`, `io`,
/// `TestWrapper` and `tokio` by their bare names, so they must be in scope
/// where the macro is invoked.
#[macro_export]
macro_rules! generic_storage_test {
    ($setup:path ) => {
        #[tokio::test]
        async fn integration_test_storage_push_and_consume() {
            let storage = $setup().await;
            // The service answers with the job's arguments.
            let echo_args = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Ok::<_, io::Error>(request.args)
            });
            let (mut wrapper, poll_loop) = TestWrapper::new_with_service(storage, echo_args);
            tokio::spawn(poll_loop);

            assert_eq!(wrapper.len().await.unwrap(), 0);
            wrapper.push(1).await.unwrap();
            assert_eq!(wrapper.len().await.unwrap(), 1);

            let (_task_id, outcome) = wrapper.execute_next().await;
            assert_eq!(outcome, Ok("1".to_owned()));
            wrapper.vacuum().await.unwrap();
        }

        #[tokio::test]
        async fn integration_test_storage_vacuum() {
            let storage = $setup().await;
            // The service answers with the job's arguments.
            let echo_args = apalis_test_service_fn(|request: Request<u32, _>| async move {
                Ok::<_, io::Error>(request.args)
            });
            let (mut wrapper, poll_loop) = TestWrapper::new_with_service(storage, echo_args);
            tokio::spawn(poll_loop);

            assert_eq!(wrapper.len().await.unwrap(), 0);
            wrapper.push(1).await.unwrap();
            assert_eq!(wrapper.len().await.unwrap(), 1);

            let (_task_id, outcome) = wrapper.execute_next().await;
            assert_eq!(outcome, Ok("1".to_owned()));

            wrapper.vacuum().await.unwrap();
            assert_eq!(wrapper.len().await.unwrap(), 0);
        }
    };
}
378}