apalis_core/worker/
builder.rs

1//! Builder types for composing and building workers.
2//!
3//! The `WorkerBuilder` component is the recommended
4//! way to construct [`Worker`] instances in a flexible and
5//! composable manner.
6//!
7//! The builder pattern enables customization of various parts of a worker,
8//! in the following order:
9//!
10//! 1. Setting a backend that implements the [`Backend`] trait
11//! 2. Adding application state via [`Data`](crate::task::data)
12//! 3. Decorating the service pipeline with middleware
13//! 4. Handling lifecycle events with `on_event`
14//! 5. Providing task processing logic using [`build`](WorkerBuilder::build) that implements [`IntoWorkerService`].
15//!
16//! The [`IntoWorkerService`] trait can be used to convert a function or a service into a worker service. The following implementations are provided:
17//! - For async functions via [`task_fn`](crate::task_fn::task_fn)
18//! - For any type that implements the [`Service`] trait for `T: Task`
19//! - For workflows via [`apalis-workflow`](https://docs.rs/apalis-workflow)
20//!
21//! ## Basic usage
22//!
23//! ```rust,no_run
24//! # use apalis_core::worker::builder::WorkerBuilder;
25//! # use apalis_core::backend::memory::MemoryStorage;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::task::data::Data;
28//! # use apalis_core::backend::TaskSink;
29//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
30//!
31//! # #[tokio::main]
32//! # async fn main() {
33//! # let mut in_memory = MemoryStorage::new();
34//! # in_memory.push(24).await.unwrap();
35//! async fn task(job: u32, count: Data<usize>, ctx: WorkerContext) {
36//!     println!("Received job: {job:?}");
37//!     ctx.stop().unwrap();
38//! }
39//!
40//! let worker = WorkerBuilder::new("rango-tango")
41//!     .backend(in_memory)
42//!     .data(0usize)
43//!     .on_event(|ctx, ev| {
44//!         println!("On Event = {:?}", ev);
45//!     })
46//!     .build(task);
47//!
48//! worker.run().await.unwrap();
49//! # }
50//! ```
51//! ## Order
52//!
53//! The order in which you add layers affects how tasks are processed. Layers added earlier are wrapped by those added later.
54//!
55//! ### Why does order matter?
56//! Each layer wraps the previous one, so the outermost layer is applied last. This means that middleware added later can observe or modify the effects of earlier layers. For example, tracing added before retry will see all retries as a single operation, while tracing added after retry will log each retry attempt separately.
57//!
58//! For example:
59//! ```ignore
60//! WorkerBuilder::new()
61//!     .enable_tracing()
62//!     .retry(RetryPolicy::retries(3))
63//!     .build(task);
64//! ```
65//! In this case, tracing is applied before retry. The tracing span may not reflect the correct attempt count.
66//!
67//! Reversing the order:
68//! ```ignore
69//! WorkerBuilder::new()
70//!     .retry(RetryPolicy::retries(3))
71//!     .enable_tracing()
72//!     .build(task);
73//! ```
74//! Now, retry is applied first, and tracing wraps around it. The tracing span will correctly capture retries.
75//!
76//! **Tip:** Add layers in the order you want them to wrap task processing.
77use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82    backend::Backend,
83    monitor::shutdown::Shutdown,
84    task::{Task, data::Data},
85    worker::{Worker, event::EventHandlerBuilder},
86};
87
88/// Declaratively builds a [`Worker`]
89pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
90    pub(crate) name: String,
91    pub(crate) request: PhantomData<(Args, Ctx)>,
92    pub(crate) layer: Middleware,
93    pub(crate) source: Source,
94    pub(crate) event_handler: EventHandlerBuilder,
95    pub(crate) shutdown: Option<Shutdown>,
96}
97
98impl<Args, Ctx, Source, Middleware> std::fmt::Debug
99    for WorkerBuilder<Args, Ctx, Source, Middleware>
100{
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("WorkerBuilder")
103            .field("id", &self.name)
104            .field("job", &std::any::type_name::<(Args, Ctx)>())
105            .field("layer", &std::any::type_name::<Middleware>())
106            .field("source", &std::any::type_name::<Source>())
107            .finish()
108    }
109}
110
111impl WorkerBuilder<(), (), (), Identity> {
112    /// Build a new [`WorkerBuilder`] instance with a name for the worker to build
113    pub fn new<T: AsRef<str>>(name: T) -> WorkerBuilder<(), (), (), Identity> {
114        WorkerBuilder {
115            request: PhantomData,
116            layer: Identity::new(),
117            source: (),
118            name: name.as_ref().to_owned(),
119            event_handler: EventHandlerBuilder::default(),
120            shutdown: None,
121        }
122    }
123}
124
125impl WorkerBuilder<(), (), (), Identity> {
126    /// Set the source to a backend that implements [Backend]
127    pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
128    where
129        NB: Backend<Args = NJ, Context = Ctx>,
130    {
131        WorkerBuilder {
132            request: PhantomData,
133            layer: self.layer,
134            source: backend,
135            name: self.name,
136            shutdown: self.shutdown,
137            event_handler: self.event_handler,
138        }
139    }
140}
141
142impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
143where
144    B: Backend<Args = Args>,
145{
146    /// Allows of decorating the service that consumes jobs.
147    /// Allows adding multiple middleware in one call
148    pub fn chain<NewLayer>(
149        self,
150        f: impl FnOnce(M) -> NewLayer,
151    ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
152        let middleware = f(self.layer);
153
154        WorkerBuilder {
155            request: self.request,
156            layer: middleware,
157            name: self.name,
158            source: self.source,
159            shutdown: self.shutdown,
160            event_handler: self.event_handler,
161        }
162    }
163    /// Allows adding middleware to the layer stack
164    pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
165        WorkerBuilder {
166            request: self.request,
167            source: self.source,
168            layer: Stack::new(layer, self.layer),
169            name: self.name,
170            shutdown: self.shutdown,
171            event_handler: self.event_handler,
172        }
173    }
174
175    /// Adds data to the context
176    /// This will be shared by all requests
177    pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
178    where
179        M: Layer<Data<D>>,
180    {
181        WorkerBuilder {
182            request: self.request,
183            source: self.source,
184            layer: Stack::new(Data::new(data), self.layer),
185            name: self.name,
186            shutdown: self.shutdown,
187            event_handler: self.event_handler,
188        }
189    }
190}
191
192/// Finalizes the builder and constructs a [`Worker`] with the provided service
193impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
194where
195    B: Backend<Args = Args, Context = Ctx>,
196{
197    /// Consumes the builder and a service to construct the final worker
198    pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
199        self,
200        service: W,
201    ) -> Worker<Args, Ctx, B, Svc, M>
202    where
203        Svc: Service<Task<Args, Ctx, B::IdType>>,
204    {
205        service.build_with(self)
206    }
207}
208
209/// Trait for building a worker service provided a backend
210pub trait IntoWorkerService<Backend, Svc, Args, Ctx>
211where
212    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
213    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
214{
215    /// Build the service from the backend
216    fn into_service(self, backend: &Backend) -> Svc;
217}
218
219/// Extension trait for building a worker from a builder
220pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
221where
222    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
223    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
224{
225    /// Consumes the builder and returns a worker
226    fn build_with(
227        self,
228        builder: WorkerBuilder<Args, Ctx, Backend, M>,
229    ) -> Worker<Args, Ctx, Backend, Svc, M>;
230}
231
232/// Implementation of the IntoWorkerServiceExt trait for any type
233///
234/// Rust doest offer specialization yet, the [`IntoWorkerServiceExt`] and [`IntoWorkerService`]
235/// traits are used to allow the [build](WorkerBuilder::build) method to be more flexible.
236impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
237where
238    T: IntoWorkerService<B, Svc, Args, Ctx>,
239    B: Backend<Args = Args, Context = Ctx>,
240    Svc: Service<Task<Args, Ctx, B::IdType>>,
241{
242    fn build_with(self, builder: WorkerBuilder<Args, Ctx, B, M>) -> Worker<Args, Ctx, B, Svc, M> {
243        let svc = self.into_service(&builder.source);
244        let mut worker = Worker::new(builder.name, builder.source, svc, builder.layer);
245        worker.event_handler = builder
246            .event_handler
247            .write()
248            .map(|mut d| d.take())
249            .unwrap()
250            .unwrap_or(Box::new(|_ctx, _e| {
251                debug!("Worker [{}] received event {_e}", _ctx.name());
252            }));
253        worker.shutdown = builder.shutdown;
254        worker
255    }
256}
257
258/// Module for validating task function implementations
259/// This module provides macros and utilities to ensure that task functions
260/// conform to the expected signatures and can be converted into worker services.
261#[cfg(feature = "test-utils")]
262pub mod task_fn_validator {
263    use crate::backend::Backend;
264    use crate::task::Task;
265    use tower_service::Service;
266
267    use crate::task_fn::{FromRequest, TaskFn};
268
269    /// Macro for implementing the check functions
270    macro_rules! impl_check_fn {
271        ($($num:tt => $($arg:ident),+);+ $(;)?) => {
272            $(
273                #[inline]
274                #[doc = concat!("A helper for checking that the builder can build a worker with the provided service (", stringify!($num), " arguments)")]
275                pub fn $num<
276                    F, B, Args, Ctx,
277                    $($arg: FromRequest<Task<Args, Ctx, B::IdType>>),+
278                >(
279                    _: F,
280                ) where
281                    TaskFn<F, Args, Ctx, ($($arg,)+)>: Service<Task<Args, Ctx, B::IdType>>,
282                    B: Backend<Args = Args>
283                {
284                }
285            )+
286        };
287    }
288
289    impl_check_fn! {
290        check_fn_1 => A1;
291        check_fn_2 => A1, A2;
292        check_fn_3 => A1, A2, A3;
293        check_fn_4 => A1, A2, A3, A4;
294        check_fn_5 => A1, A2, A3, A4, A5;
295        check_fn_6 => A1, A2, A3, A4, A5, A6;
296        check_fn_7 => A1, A2, A3, A4, A5, A6, A7;
297        check_fn_8 => A1, A2, A3, A4, A5, A6, A7, A8;
298        check_fn_9 => A1, A2, A3, A4, A5, A6, A7, A8, A9;
299        check_fn_10 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10;
300        check_fn_11 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11;
301        check_fn_12 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12;
302        check_fn_13 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13;
303        check_fn_14 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14;
304        check_fn_15 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15;
305        check_fn_16 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16;
306    }
307}