Skip to main content

apalis_core/worker/
builder.rs

1//! Builder types for composing and building workers.
2//!
3//! The `WorkerBuilder` component is the recommended
4//! way to construct [`Worker`] instances in a flexible and
5//! composable manner.
6//!
7//! The builder pattern enables customization of various parts of a worker,
8//! in the following order:
9//!
10//! 1. Setting a backend that implements the [`Backend`] trait
11//! 2. Adding application state via [`Data`](crate::task::data)
12//! 3. Decorating the service pipeline with middleware
13//! 4. Handling lifecycle events with `on_event`
14//! 5. Providing task processing logic using [`build`](WorkerBuilder::build) that implements [`IntoWorkerService`].
15//!
16//! The [`IntoWorkerService`] trait can be used to convert a function or a service into a worker service. The following implementations are provided:
17//! - For async functions via [`task_fn`](crate::task_fn::task_fn)
18//! - For any type that implements the [`Service`] trait for `T: Task`
19//! - For workflows via [`apalis-workflow`](https://docs.rs/apalis-workflow)
20//!
21//! ## Basic usage
22//!
23//! ```rust,no_run
24//! # use apalis_core::worker::builder::WorkerBuilder;
25//! # use apalis_core::backend::memory::MemoryStorage;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::task::data::Data;
28//! # use apalis_core::backend::TaskSink;
29//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
30//!
31//! # #[tokio::main]
32//! # async fn main() {
33//! # let mut in_memory = MemoryStorage::new();
34//! # in_memory.push(24).await.unwrap();
35//! async fn task(job: u32, count: Data<usize>, ctx: WorkerContext) {
36//!     println!("Received job: {job:?}");
37//!     ctx.stop().unwrap();
38//! }
39//!
40//! let worker = WorkerBuilder::new("rango-tango")
41//!     .backend(in_memory)
42//!     .data(0usize)
43//!     .on_event(|ctx, ev| {
44//!         println!("On Event = {:?}", ev);
45//!     })
46//!     .build(task);
47//!
48//! worker.run().await.unwrap();
49//! # }
50//! ```
51//! ## Order
52//!
53//! The order in which you add layers affects how tasks are processed. Layers added earlier are wrapped by those added later.
54//!
55//! ### Why does order matter?
56//! Each layer wraps the previous one, so the outermost layer is applied last. This means that middleware added later can observe or modify the effects of earlier layers. For example, tracing added before retry will see all retries as a single operation, while tracing added after retry will log each retry attempt separately.
57//!
58//! For example:
59//! ```ignore
60//! WorkerBuilder::new()
61//!     .enable_tracing()
62//!     .retry(RetryPolicy::retries(3))
63//!     .build(task);
64//! ```
65//! In this case, tracing is applied before retry. The tracing span may not reflect the correct attempt count.
66//!
67//! Reversing the order:
68//! ```ignore
69//! WorkerBuilder::new()
70//!     .retry(RetryPolicy::retries(3))
71//!     .enable_tracing()
72//!     .build(task);
73//! ```
74//! Now, retry is applied first, and tracing wraps around it. The tracing span will correctly capture retries.
75//!
76//! **Tip:** Add layers in the order you want them to wrap task processing.
77use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82    backend::Backend,
83    monitor::shutdown::Shutdown,
84    task::{Task, data::Data},
85    worker::{Worker, event::EventHandlerBuilder},
86};
87
88/// Declaratively builds a [`Worker`]
89pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
90    pub(crate) name: String,
91    pub(crate) request: PhantomData<(Args, Ctx)>,
92    pub(crate) layer: Middleware,
93    pub(crate) source: Source,
94    pub(crate) event_handler: EventHandlerBuilder,
95    pub(crate) shutdown: Option<Shutdown>,
96}
97
98impl<Args, Ctx, Source, Middleware> std::fmt::Debug
99    for WorkerBuilder<Args, Ctx, Source, Middleware>
100{
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("WorkerBuilder")
103            .field("id", &self.name)
104            .field("job", &std::any::type_name::<(Args, Ctx)>())
105            .field("layer", &std::any::type_name::<Middleware>())
106            .field("source", &std::any::type_name::<Source>())
107            .finish()
108    }
109}
110
111impl WorkerBuilder<(), (), (), Identity> {
112    /// Build a new [`WorkerBuilder`] instance with a name for the worker to build
113    pub fn new<T: AsRef<str>>(name: T) -> Self {
114        Self {
115            request: PhantomData,
116            layer: Identity::new(),
117            source: (),
118            name: name.as_ref().to_owned(),
119            event_handler: EventHandlerBuilder::default(),
120            shutdown: None,
121        }
122    }
123}
124
125impl WorkerBuilder<(), (), (), Identity> {
126    /// Set the source to a backend that implements [Backend]
127    pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
128    where
129        NB: Backend<Args = NJ, Context = Ctx>,
130    {
131        WorkerBuilder {
132            request: PhantomData,
133            layer: self.layer,
134            source: backend,
135            name: self.name,
136            shutdown: self.shutdown,
137            event_handler: self.event_handler,
138        }
139    }
140}
141
142impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
143where
144    B: Backend<Args = Args>,
145{
146    /// Allows of decorating the service that consumes jobs.
147    /// Allows adding multiple middleware in one call
148    pub fn chain<NewLayer>(
149        self,
150        f: impl FnOnce(M) -> NewLayer,
151    ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
152        let middleware = f(self.layer);
153
154        WorkerBuilder {
155            request: self.request,
156            layer: middleware,
157            name: self.name,
158            source: self.source,
159            shutdown: self.shutdown,
160            event_handler: self.event_handler,
161        }
162    }
163    /// Allows adding middleware to the layer stack
164    pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
165        WorkerBuilder {
166            request: self.request,
167            source: self.source,
168            layer: Stack::new(layer, self.layer),
169            name: self.name,
170            shutdown: self.shutdown,
171            event_handler: self.event_handler,
172        }
173    }
174
175    /// Adds data to the context
176    ///
177    /// This will be shared by all requests
178    pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
179    where
180        M: Layer<Data<D>>,
181    {
182        WorkerBuilder {
183            request: self.request,
184            source: self.source,
185            layer: Stack::new(Data::new(data), self.layer),
186            name: self.name,
187            shutdown: self.shutdown,
188            event_handler: self.event_handler,
189        }
190    }
191}
192
193/// Finalizes the builder and constructs a [`Worker`] with the provided service
194impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
195where
196    B: Backend<Args = Args, Context = Ctx>,
197{
198    /// Consumes the builder and a service to construct the final worker
199    pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
200        self,
201        service: W,
202    ) -> Worker<Args, Ctx, W::Backend, Svc, M>
203    where
204        Svc: Service<Task<Args, Ctx, B::IdType>>,
205    {
206        service.build_with(self)
207    }
208}
209
210/// A worker service composed of a backend and a service
211#[derive(Debug, Clone)]
212pub struct WorkerService<Backend, Svc> {
213    /// The backend for the worker
214    pub backend: Backend,
215    /// The service that processes tasks
216    pub service: Svc,
217}
218
219/// Trait for building a worker service provided a backend
220pub trait IntoWorkerService<B, Svc, Args, Ctx>
221where
222    B: crate::backend::Backend<Args = Args, Context = Ctx>,
223    Svc: Service<Task<Args, Ctx, B::IdType>>,
224{
225    /// The backend type for the worker
226    type Backend;
227    /// Build the service from the backend
228    fn into_service(self, backend: B) -> WorkerService<Self::Backend, Svc>;
229}
230
231/// Extension trait for building a worker from a builder
232pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
233where
234    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
235    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
236    Self: IntoWorkerService<Backend, Svc, Args, Ctx>,
237{
238    /// Consumes the builder and returns a worker
239    fn build_with(
240        self,
241        builder: WorkerBuilder<Args, Ctx, Backend, M>,
242    ) -> Worker<Args, Ctx, Self::Backend, Svc, M>;
243}
244
245/// Implementation of the IntoWorkerServiceExt trait for any type
246///
247/// Rust doest offer specialization yet, the [`IntoWorkerServiceExt`] and [`IntoWorkerService`]
248/// traits are used to allow the [build](WorkerBuilder::build) method to be more flexible.
249impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
250where
251    T: IntoWorkerService<B, Svc, Args, Ctx>,
252    B: Backend<Args = Args, Context = Ctx>,
253    Svc: Service<Task<Args, Ctx, B::IdType>>,
254{
255    fn build_with(
256        self,
257        builder: WorkerBuilder<Args, Ctx, B, M>,
258    ) -> Worker<Args, Ctx, T::Backend, Svc, M> {
259        let svc = self.into_service(builder.source);
260        let mut worker = Worker::new(builder.name, svc.backend, svc.service, builder.layer);
261        worker.event_handler = builder
262            .event_handler
263            .write()
264            .map(|mut d| d.take())
265            .unwrap()
266            .unwrap_or(Box::new(|_ctx, _e| {
267                debug!("Worker [{}] received event {_e}", _ctx.name());
268            }));
269        worker.shutdown = builder.shutdown;
270        worker
271    }
272}
273
274/// Module for validating task function implementations
275/// This module provides macros and utilities to ensure that task functions
276/// conform to the expected signatures and can be converted into worker services.
277#[cfg(feature = "test-utils")]
278pub mod task_fn_validator {
279    use crate::backend::Backend;
280    use crate::task::Task;
281    use tower_service::Service;
282
283    use crate::task_fn::{FromRequest, TaskFn};
284
285    /// Macro for implementing the check functions
286    macro_rules! impl_check_fn {
287        ($($num:tt => $($arg:ident),+);+ $(;)?) => {
288            $(
289                #[inline]
290                #[doc = concat!("A helper for checking that the builder can build a worker with the provided service (", stringify!($num), " arguments)")]
291                pub fn $num<
292                    F, B, Args, Ctx,
293                    $($arg: FromRequest<Task<Args, Ctx, B::IdType>>),+
294                >(
295                    _: F,
296                ) where
297                    TaskFn<F, Args, Ctx, ($($arg,)+)>: Service<Task<Args, Ctx, B::IdType>>,
298                    B: Backend<Args = Args>
299                {
300                }
301            )+
302        };
303    }
304
305    impl_check_fn! {
306        check_fn_1 => A1;
307        check_fn_2 => A1, A2;
308        check_fn_3 => A1, A2, A3;
309        check_fn_4 => A1, A2, A3, A4;
310        check_fn_5 => A1, A2, A3, A4, A5;
311        check_fn_6 => A1, A2, A3, A4, A5, A6;
312        check_fn_7 => A1, A2, A3, A4, A5, A6, A7;
313        check_fn_8 => A1, A2, A3, A4, A5, A6, A7, A8;
314        check_fn_9 => A1, A2, A3, A4, A5, A6, A7, A8, A9;
315        check_fn_10 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10;
316        check_fn_11 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11;
317        check_fn_12 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12;
318        check_fn_13 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13;
319        check_fn_14 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14;
320        check_fn_15 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15;
321        check_fn_16 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16;
322    }
323}