Skip to main content

apalis_core/worker/
test_worker.rs

1//! Provides a worker that allows testing and debugging
2//!
3//! This module enables comprehensive testing of task services and backends by simulating
4//! a real worker’s lifecycle. It allows developers to push jobs to a backend, run them
5//! through a test worker, and capture the results for assertion without needing full runtime orchestration.
6//!
7//! # Features
8//! - Pluggable with any backend implementing [`Backend`].
9//! - Supports service functions created using [`Service`].
10//! - Captures task output through [`TestEmitService`] for validation.
11//!
12//! # Example
13//! ```rust
14//! # use apalis_core::{backend::memory::MemoryStorage, worker::test_worker::TestWorker};
15//! # use apalis_core::error::BoxDynError;
16//! async fn is_even(req: usize) -> Result<(), BoxDynError> {
17//!     if req % 2 == 0 {
18//!         Ok(())
19//!     } else {
20//!         Err("Not an even number".into())
21//!     }
22//! }
23//!
24//! #[tokio::test]
25//! async fn test_accepts_even() {
//!     let mut backend = MemoryStorage::new();
//!     backend.push(42usize).await.unwrap();
//!     let mut worker = TestWorker::new(backend, is_even);
//!
//!     let (_task_id, resp) = worker.execute_next().await.unwrap().unwrap();
//!     assert!(resp.is_ok());
32//! }
33//! ```
34//!
35//! This module is intended for use in tests and local development.
36
37use crate::backend::Backend;
38use crate::error::BoxDynError;
39use crate::task::Task;
40use crate::task::task_id::{RandomId, TaskId};
41use crate::worker::builder::{IntoWorkerService, WorkerBuilder};
42use crate::worker::{Event, ReadinessService, TrackerService, WorkerError};
43use futures_channel::mpsc::{self, channel};
44use futures_core::future::BoxFuture;
45use futures_core::stream::BoxStream;
46use futures_util::{SinkExt, StreamExt};
47use std::fmt::{self, Debug};
48use std::future::Future;
49use std::marker::PhantomData;
50use std::task::{Context, Poll};
51use tower_layer::Layer;
52use tower_service::Service;
53
/// Boxed stream of the items a test worker yields.
///
/// Each item is either `Ok((task_id, outcome))`, where `outcome` is the result the
/// wrapped service produced for that task, or `Err(WorkerError)` for a failure
/// reported by the worker itself.
type TestStream<IdType, Res> =
    BoxStream<'static, Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
/// A test worker to allow you to test services.
///
/// Important for testing backends and tasks. The worker holds a single boxed stream
/// that yields one item per captured task result (or worker error); step through it
/// with [`ExecuteNext::execute_next`] or take the stream itself with `into_stream`.
///
/// # Example
/// ```
/// mod tests {
///    use apalis_core::{error::BoxDynError, backend::memory::MemoryStorage};
///
///    use super::*;
///
///    async fn is_even(req: usize) -> Result<(), BoxDynError> {
///        if req % 2 == 0 {
///            Ok(())
///        } else {
///            Err("Not an even number".into())
///        }
///    }
///
///    #[tokio::test]
///    async fn test_accepts_even() {
///        let mut backend = MemoryStorage::new();
///        backend.push(42usize).await.unwrap();
///        let mut worker = TestWorker::new(backend, is_even);
///        let (_task_id, resp) = worker.execute_next().await.unwrap().unwrap();
///        assert!(resp.is_ok());
///    }
///}
/// ```
pub struct TestWorker<B, S, Res, IdType = RandomId> {
    /// Boxed stream yielding captured task results and worker errors.
    stream: TestStream<IdType, Res>,
    /// Marker for the backend type `B`; no backend value is stored in the struct.
    backend: PhantomData<B>,
    /// Marker for the service type `S` and its response type `Res`.
    service: PhantomData<(S, Res)>,
}
88
89impl<B, S, Res, IdType> fmt::Debug for TestWorker<B, S, Res, IdType> {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        f.debug_struct("TestWorker")
92            .field("stream", &"BoxStream<...>") // can't really debug streams
93            .field("backend", &std::any::type_name::<B>())
94            .field("service", &std::any::type_name::<(S, Res)>())
95            .field("id_type", &std::any::type_name::<IdType>())
96            .finish()
97    }
98}
99
/// Utility for executing the next item in the queue.
///
/// `Args` and `Ctx` are the task argument and context types of the service being
/// driven.
pub trait ExecuteNext<Args, Ctx> {
    /// The expected result from the provided service.
    type Result;

    /// Allows the test worker to step to the next task.
    ///
    /// No polling is done in between calls. The returned future must be `Send`.
    fn execute_next(&mut self) -> impl Future<Output = Self::Result> + Send;
}
108
109impl<B, S, Args, Ctx, Res, IdType> ExecuteNext<Args, Ctx> for TestWorker<B, S, Res, IdType>
110where
111    S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
112    B: Send,
113    Res: Send,
114{
115    type Result = Option<Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
116    async fn execute_next(&mut self) -> Self::Result {
117        self.stream.next().await
118    }
119}
120
121impl<B, S, Res> TestWorker<B, S, Res, ()> {
122    /// Create a new test worker
123    pub fn new<Args, Ctx, W>(backend: B, factory: W) -> TestWorker<W::Backend, S, Res, B::IdType>
124    where
125        W: IntoWorkerService<B, S, Args, Ctx>,
126        W::Backend: Backend<
127                Args = B::Args,
128                Context = B::Context,
129                IdType = B::IdType,
130                Error = B::Error,
131                Stream = B::Stream,
132                Beat = B::Beat,
133                Layer = B::Layer,
134            > + 'static,
135        B: Backend<Args = Args, Context = Ctx> + 'static,
136        S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
137        B::Stream: Unpin + Send + 'static,
138        B::Beat: Unpin + Send + 'static,
139        Args: Send + 'static,
140        Ctx: Send + 'static,
141        B::Error: Into<BoxDynError> + Send + 'static,
142        B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
143        S::Future: Send,
144        S::Error: Into<BoxDynError> + Send + Sync,
145        S::Response: Clone + Send,
146        Res: 'static,
147        <<B as Backend>::Layer as Layer<
148            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
149        >>::Service: Service<Task<Args, Ctx, B::IdType>>,
150        <<<B as Backend>::Layer as Layer<
151            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
152        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
153        <<<B as Backend>::Layer as Layer<
154            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
155        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
156        <<B as Backend>::Layer as Layer<
157            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
158        >>::Service: std::marker::Send + 'static,
159        B::IdType: Send + Clone + 'static,
160    {
161        let worker_service = factory.into_service(backend);
162        TestWorker::<W::Backend, S, Res, _>::new_with_svc(
163            worker_service.backend,
164            worker_service.service,
165        )
166    }
167}
168
169impl<B, S, Res> TestWorker<B, S, Res, ()> {
170    /// Create a new test worker with a service
171    pub fn new_with_svc<Args, Ctx>(backend: B, service: S) -> TestWorker<B, S, Res, B::IdType>
172    where
173        B: Backend<Args = Args, Context = Ctx> + 'static,
174        S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
175        B::Stream: Unpin + Send + 'static,
176        B::Beat: Unpin + Send + 'static,
177        Args: Send + 'static,
178        Ctx: Send + 'static,
179        B::Error: Into<BoxDynError> + Send + 'static,
180        B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
181        S::Future: Send,
182        S::Error: Into<BoxDynError> + Send + Sync,
183        S::Response: Clone + Send,
184        Res: 'static,
185        <<B as Backend>::Layer as Layer<
186            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
187        >>::Service: Service<Task<Args, Ctx, B::IdType>>,
188        <<<B as Backend>::Layer as Layer<
189            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
190        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
191        <<<B as Backend>::Layer as Layer<
192            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
193        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
194        <<B as Backend>::Layer as Layer<
195            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
196        >>::Service: std::marker::Send + 'static,
197        B::IdType: Send + Clone + 'static,
198    {
199        enum Item<R, IdType> {
200            Ev(Result<Event, WorkerError>),
201            Res((TaskId<IdType>, Result<R, BoxDynError>)),
202        }
203        let (tx, rx) = channel(1);
204        let sender = tx.clone();
205        let service: TestEmitService<S, Res, B::IdType> = TestEmitService {
206            service,
207            tx: tx.clone(),
208        };
209        let stream = WorkerBuilder::new("test-worker")
210            .backend(backend)
211            .build(service)
212            .stream()
213            .map(|r| Item::Ev(r));
214        let task_stream = rx.map(|s| Item::Res(s));
215        let stream = futures_util::stream::select(task_stream, stream)
216            .filter_map(move |s| {
217                let mut tx = sender.clone();
218                async move {
219                    match s {
220                        Item::Ev(Err(e)) => {
221                            tx.close().await.unwrap();
222                            Some(Err(e))
223                        }
224                        Item::Ev(_) => None,
225                        Item::Res(r) => Some(Ok(r)),
226                    }
227                }
228            })
229            .boxed();
230        TestWorker {
231            stream,
232            service: PhantomData,
233            backend: PhantomData,
234        }
235    }
236}
237
238impl<B, S, Res, I> TestWorker<B, S, Res, I> {
239    /// Get the underlying stream
240    pub fn into_stream(self) -> TestStream<I, Res> {
241        self.stream
242    }
243}
244
/// A generic service that emits the result of a test.
///
/// Wraps an inner service and sends the outcome of every call, paired with the
/// task's id, over an mpsc channel.
#[derive(Debug, Clone)]
pub struct TestEmitService<S, Response, IdType> {
    /// Sending half of the channel on which `(task_id, outcome)` pairs are emitted.
    tx: mpsc::Sender<(TaskId<IdType>, Result<Response, BoxDynError>)>,
    /// The wrapped service that actually handles each task.
    service: S,
}
251
252impl<S, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for TestEmitService<S, Res, IdType>
253where
254    S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
255    S::Future: Send + 'static,
256    Args: Send + 'static,
257    Ctx: Send + 'static,
258    S::Response: Send + 'static,
259    S::Error: Into<BoxDynError> + Send,
260    IdType: Send + 'static + Clone,
261{
262    type Response = ();
263    type Error = String;
264    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
265
266    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
267        self.service
268            .poll_ready(cx)
269            .map_err(|e| e.into().to_string())
270    }
271
272    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
273        let task_id = task.parts.task_id.clone().unwrap();
274        let mut tx = Clone::clone(&self.tx);
275        let fut = self.service.call(task);
276        Box::pin(async move {
277            let res = fut.await;
278            match res {
279                Ok(res) => {
280                    tx.send((task_id, Ok(res))).await.unwrap();
281                    Ok(())
282                }
283                Err(err) => {
284                    let e = err.into();
285                    let e_str = e.to_string();
286                    tx.send((task_id, Err(e))).await.unwrap();
287                    Err(e_str)
288                }
289            }
290        })
291    }
292}
293
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use crate::{
        backend::{TaskSink, memory::MemoryStorage},
        error::BoxDynError,
        task_fn::task_fn,
        worker::{
            WorkerContext,
            test_worker::{ExecuteNext, TestWorker},
        },
    };
    use std::time::Duration;

    /// Steps a worker through eleven queued tasks with `execute_next`; the handler
    /// asks the worker to stop when it sees the last one.
    #[tokio::test]
    async fn basic_worker() {
        let mut storage = MemoryStorage::new();
        for n in 0..=10 {
            storage.push(n).await.unwrap();
        }

        let handler = task_fn(|req: u32, ctx: WorkerContext| async move {
            if req == 10 {
                ctx.stop()?;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, BoxDynError>(req)
        });

        let mut worker = TestWorker::new(storage, handler);
        loop {
            // Stop at the first worker error or once the stream is exhausted.
            let Some(Ok((_, outcome))) = worker.execute_next().await else {
                break;
            };
            outcome.unwrap();
        }
        println!("Worker run successfully");
    }

    /// Same scenario as `basic_worker`, but consuming the worker as a plain stream.
    #[tokio::test]
    async fn basic_worker_as_stream() {
        let mut storage = MemoryStorage::new();
        for n in 0..=10 {
            storage.push(n).await.unwrap();
        }

        let handler = task_fn(|req: u32, ctx: WorkerContext| async move {
            if req == 10 {
                ctx.stop()?;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, BoxDynError>(req)
        });

        let mut results = TestWorker::new(storage, handler).into_stream();
        loop {
            // Stop at the first worker error or once the stream is exhausted.
            let Some(Ok((_, outcome))) = results.next().await else {
                break;
            };
            outcome.unwrap();
        }
        println!("Worker run successfully");
    }
}
353}