apalis_core/worker/
test_worker.rs

1//! Provides a worker that allows testing and debugging
2//!
3//! This module enables comprehensive testing of task services and backends by simulating
4//! a real worker’s lifecycle. It allows developers to push jobs to a backend, run them
5//! through a test worker, and capture the results for assertion without needing full runtime orchestration.
6//!
7//! # Features
8//! - Pluggable with any backend implementing [`Backend`].
//! - Accepts any [`Service`] over a [`Task`], or a value convertible into one via [`IntoWorkerService`].
10//! - Captures task output through [`TestEmitService`] for validation.
11//!
12//! # Example
13//! ```rust
//! # use apalis_core::backend::{memory::MemoryStorage, TaskSink};
//! # use apalis_core::worker::test_worker::{ExecuteNext, TestWorker};
//! # use apalis_core::error::BoxDynError;
//! async fn is_even(req: usize) -> Result<(), BoxDynError> {
//!     if req % 2 == 0 {
//!         Ok(())
//!     } else {
//!         Err("Not an even number".into())
//!     }
//! }
//!
//! #[tokio::test]
//! async fn test_accepts_even() {
//!     let mut backend = MemoryStorage::new();
//!     backend.push(42usize).await.unwrap();
//!     let mut worker = TestWorker::new(backend, is_even);
//!
//!     // `resp` is the service's own `Result`, so an even input yields `Ok(())`.
//!     let (_task_id, resp) = worker.execute_next().await.unwrap().unwrap();
//!     assert!(resp.is_ok());
//! }
33//! ```
34//!
35//! This module is intended for use in tests and local development.
36
37use crate::backend::Backend;
38use crate::error::BoxDynError;
39use crate::task::task_id::{RandomId, TaskId};
40use crate::task::Task;
41use crate::worker::builder::{IntoWorkerService, WorkerBuilder};
42use crate::worker::{Event, ReadinessService, TrackerService, WorkerError};
43use futures_channel::mpsc::{self, channel};
44use futures_core::future::BoxFuture;
45use futures_core::stream::BoxStream;
46use futures_util::{SinkExt, StreamExt};
47use std::fmt::{self, Debug};
48use std::future::Future;
49use std::marker::PhantomData;
50use std::task::{Context, Poll};
51use tower_layer::Layer;
52use tower_service::Service;
53
54/// A test worker to allow you to test services.
55/// Important for testing backends and tasks
56/// # Example
57/// ```
58/// mod tests {
59///    use apalis_core::{error::BoxDynError, backend::memory::MemoryStorage};
60///
61///    use super::*;
62///
63///    async fn is_even(req: usize) -> Result<(), BoxDynError> {
64///        if req % 2 == 0 {
65///            Ok(())
66///        } else {
67///            Err("Not an even number".into())
68///        }
69///    }
70///
71///    #[tokio::test]
72///    async fn test_accepts_even() {
73///        let mut backend = MemoryStorage::new();
74///        backend.push(42usize).await.unwrap();
75///        let mut worker = TestWorker::new(backend, is_even);
76///        let (_task_id, resp) = worker.execute_next().await.unwrap().unwrap();
77///        assert_eq!(resp, Ok("()".to_string()));
78///    }
79///}
80/// ````
81
82pub struct TestWorker<B, S, Res, IdType = RandomId> {
83    stream: BoxStream<'static, Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>,
84    backend: PhantomData<B>,
85    service: PhantomData<(S, Res)>,
86}
87
88impl<B, S, Res, IdType> fmt::Debug for TestWorker<B, S, Res, IdType> {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.debug_struct("TestWorker")
91            .field("stream", &"BoxStream<...>") // can't really debug streams
92            .field("backend", &std::any::type_name::<B>())
93            .field("service", &std::any::type_name::<(S, Res)>())
94            .field("id_type", &std::any::type_name::<IdType>())
95            .finish()
96    }
97}
98
/// Step-wise execution: pull exactly one item out of a test worker per call.
///
/// `Args` and `Ctx` name the task argument and context types of the
/// implementor; they do not appear in the method signature.
pub trait ExecuteNext<Args, Ctx> {
    /// What a single step yields.
    type Result;

    /// Advances the worker to its next item and returns it.
    ///
    /// The returned future must be `Send`. Nothing is polled between calls.
    fn execute_next(&mut self) -> impl Future<Output = Self::Result> + Send;
}
107
108impl<B, S, Args, Ctx, Res, IdType> ExecuteNext<Args, Ctx> for TestWorker<B, S, Res, IdType>
109where
110    S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
111    B: Send,
112    Res: Send,
113{
114    type Result = Option<Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
115    async fn execute_next(&mut self) -> Self::Result {
116        self.stream.next().await
117    }
118}
119
120impl<B, S, Res> TestWorker<B, S, Res, ()> {
121    /// Create a new test worker
122    pub fn new<Args, Ctx, W>(backend: B, factory: W) -> TestWorker<B, S, Res, B::IdType>
123    where
124        W: IntoWorkerService<B, S, Args, Ctx>,
125        B: Backend<Args, Context = Ctx> + 'static,
126        S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
127        B::Stream: Unpin + Send + 'static,
128        B::Beat: Unpin + Send + 'static,
129        Args: Send + 'static,
130        Ctx: Send + 'static,
131        B::Error: Into<BoxDynError> + Send + 'static,
132        B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
133        S::Future: Send,
134        S::Error: Into<BoxDynError> + Send + Sync,
135        S::Response: Clone + Send,
136        Res: 'static,
137        <<B as Backend<Args>>::Layer as Layer<
138            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
139        >>::Service: Service<Task<Args, Ctx, B::IdType>>,
140        <<<B as Backend<Args>>::Layer as Layer<
141            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
142        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
143        <<<B as Backend<Args>>::Layer as Layer<
144            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
145        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
146        <<B as Backend<Args>>::Layer as Layer<
147            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
148        >>::Service: std::marker::Send + 'static,
149        B::IdType: Send + Clone + 'static,
150    {
151        let service = factory.into_service(&backend);
152        TestWorker::new_with_svc(backend, service)
153    }
154
155    /// Create a new test worker with a service
156    pub fn new_with_svc<Args, Ctx>(backend: B, service: S) -> TestWorker<B, S, Res, B::IdType>
157    where
158        B: Backend<Args, Context = Ctx> + 'static,
159        S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
160        B::Stream: Unpin + Send + 'static,
161        B::Beat: Unpin + Send + 'static,
162        Args: Send + 'static,
163        Ctx: Send + 'static,
164        B::Error: Into<BoxDynError> + Send + 'static,
165        B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
166        S::Future: Send,
167        S::Error: Into<BoxDynError> + Send + Sync,
168        S::Response: Clone + Send,
169        Res: 'static,
170        <<B as Backend<Args>>::Layer as Layer<
171            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
172        >>::Service: Service<Task<Args, Ctx, B::IdType>>,
173        <<<B as Backend<Args>>::Layer as Layer<
174            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
175        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
176        <<<B as Backend<Args>>::Layer as Layer<
177            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
178        >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
179        <<B as Backend<Args>>::Layer as Layer<
180            ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
181        >>::Service: std::marker::Send + 'static,
182        B::IdType: Send + Clone + 'static,
183    {
184        enum Item<R, IdType> {
185            Ev(Result<Event, WorkerError>),
186            Res((TaskId<IdType>, Result<R, BoxDynError>)),
187        }
188        let (tx, rx) = channel(1);
189        let sender = tx.clone();
190        let service: TestEmitService<S, Res, B::IdType> = TestEmitService {
191            service,
192            tx: tx.clone(),
193        };
194        let stream = WorkerBuilder::new("test-worker")
195            .backend(backend)
196            .build(service)
197            .stream()
198            .map(|r| Item::Ev(r));
199        let task_stream = rx.map(|s| Item::Res(s));
200        let stream = futures_util::stream::select(task_stream, stream)
201            .filter_map(move |s| {
202                let mut tx = sender.clone();
203                async move {
204                    match s {
205                        Item::Ev(Err(e)) => {
206                            tx.close().await.unwrap();
207                            Some(Err(e))
208                        }
209                        Item::Ev(_) => None,
210                        Item::Res(r) => Some(Ok(r)),
211                    }
212                }
213            })
214            .boxed();
215        TestWorker {
216            stream,
217            service: PhantomData,
218            backend: PhantomData,
219        }
220    }
221}
222
223/// A generic service that emits the result of a test
224#[derive(Debug, Clone)]
225pub struct TestEmitService<S, Response, IdType> {
226    tx: mpsc::Sender<(TaskId<IdType>, Result<Response, BoxDynError>)>,
227    service: S,
228}
229
230impl<S, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for TestEmitService<S, Res, IdType>
231where
232    S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
233    S::Future: Send + 'static,
234    Args: Send + 'static,
235    Ctx: Send + 'static,
236    S::Response: Send + 'static,
237    S::Error: Into<BoxDynError> + Send,
238    IdType: Send + 'static + Clone,
239{
240    type Response = ();
241    type Error = String;
242    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
243
244    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
245        self.service
246            .poll_ready(cx)
247            .map_err(|e| e.into().to_string())
248    }
249
250    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
251        let task_id = task.parts.task_id.clone().unwrap();
252        let mut tx = Clone::clone(&self.tx);
253        let fut = self.service.call(task);
254        Box::pin(async move {
255            let res = fut.await;
256            match res {
257                Ok(res) => {
258                    tx.send((task_id, Ok(res))).await.unwrap();
259                    Ok(())
260                }
261                Err(err) => {
262                    let e = err.into();
263                    let e_str = e.to_string();
264                    tx.send((task_id, Err(e))).await.unwrap();
265                    Err(e_str)
266                }
267            }
268        })
269    }
270}
271
#[cfg(test)]
mod tests {
    use crate::{
        backend::{memory::MemoryStorage, TaskSink},
        error::BoxDynError,
        task_fn::task_fn,
        worker::{
            test_worker::{ExecuteNext, TestWorker},
            WorkerContext,
        },
    };
    use std::time::Duration;

    /// Pushes the tasks `0..=10`, asks the worker to stop from inside the last
    /// one, and checks that results are surfaced through `execute_next`.
    #[tokio::test]
    async fn basic_worker() {
        let mut backend = MemoryStorage::new();

        for i in 0..=10 {
            backend.push(i).await.unwrap();
        }

        let service = task_fn(|req: u32, w: WorkerContext| async move {
            if req == 10 {
                w.stop()?;
            }
            // NOTE(review): presumably simulates slow work; confirm the 1s delay is needed.
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, BoxDynError>(req)
        });
        let mut worker = TestWorker::new(backend, service);

        // Step until the stream ends or the worker reports an error.
        let mut completed = 0usize;
        while let Some(Ok((_, ret))) = worker.execute_next().await {
            ret.expect("the service only fails if stopping the worker fails");
            completed += 1;
        }
        assert!(
            completed > 0,
            "expected at least one task result before the worker stopped"
        );
    }
}
306}