apalis_core/worker/
builder.rs

1//! Builder types for composing and building workers.
2//!
3//! The `WorkerBuilder` component is the recommended
4//! way to construct [`Worker`] instances in a flexible and
5//! composable manner.
6//!
7//! The builder pattern enables customization of various parts of a worker,
8//! in the following order:
9//!
10//! 1. Setting a backend that implements the [`Backend`] trait
11//! 2. Adding application state via [`Data`](crate::task::data)
12//! 3. Decorating the service pipeline with middleware
13//! 4. Handling lifecycle events with `on_event`
14//! 5. Providing task processing logic using [`build`](WorkerBuilder::build) that implements [`IntoWorkerService`].
15//!
16//! The [`IntoWorkerService`] trait can be used to convert a function or a service into a worker service. The following implementations are provided:
17//! - For async functions via [`task_fn`](crate::task_fn::task_fn)
18//! - For any type that implements the [`Service`] trait for `T: Task`
19//! - For workflows via [`apalis-workflow`](https://docs.rs/apalis-workflow)
20//!
21//! ## Basic usage
22//!
23//! ```rust,no_run
24//! # use apalis_core::worker::builder::WorkerBuilder;
25//! # use apalis_core::backend::memory::MemoryStorage;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::task::data::Data;
28//! # use apalis_core::backend::TaskSink;
29//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
30//!
31//! # #[tokio::main]
32//! # async fn main() {
33//! # let mut in_memory = MemoryStorage::new();
34//! # in_memory.push(24).await.unwrap();
35//! async fn task(job: u32, count: Data<usize>, ctx: WorkerContext) {
36//!     println!("Received job: {job:?}");
37//!     ctx.stop().unwrap();
38//! }
39//!
40//! let worker = WorkerBuilder::new("rango-tango")
41//!     .backend(in_memory)
42//!     .data(0usize)
43//!     .on_event(|ctx, ev| {
44//!         println!("On Event = {:?}", ev);
45//!     })
46//!     .build(task);
47//!
48//! worker.run().await.unwrap();
49//! # }
50//! ```
51//! ## Order
52//!
53//! The order in which you add layers affects how tasks are processed. Layers added earlier are wrapped by those added later.
54//!
55//! ### Why does order matter?
56//! Each layer wraps the previous one, so the outermost layer is applied last. This means that middleware added later can observe or modify the effects of earlier layers. For example, tracing added before retry will see all retries as a single operation, while tracing added after retry will log each retry attempt separately.
57//!
58//! For example:
59//! ```ignore
60//! WorkerBuilder::new()
61//!     .enable_tracing()
62//!     .retry(RetryPolicy::retries(3))
63//!     .build(task);
64//! ```
65//! In this case, tracing is applied before retry. The tracing span may not reflect the correct attempt count.
66//!
67//! Reversing the order:
68//! ```ignore
69//! WorkerBuilder::new()
70//!     .retry(RetryPolicy::retries(3))
71//!     .enable_tracing()
72//!     .build(task);
73//! ```
74//! Now, retry is applied first, and tracing wraps around it. The tracing span will correctly capture retries.
75//!
76//! **Tip:** Add layers in the order you want them to wrap task processing.
77use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82    backend::Backend,
83    monitor::shutdown::Shutdown,
84    task::{Task, data::Data},
85    worker::{Worker, event::EventHandlerBuilder},
86};
87
88/// Declaratively builds a [`Worker`]
89pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
90    pub(crate) name: String,
91    pub(crate) request: PhantomData<(Args, Ctx)>,
92    pub(crate) layer: Middleware,
93    pub(crate) source: Source,
94    pub(crate) event_handler: EventHandlerBuilder,
95    pub(crate) shutdown: Option<Shutdown>,
96}
97
98impl<Args, Ctx, Source, Middleware> std::fmt::Debug
99    for WorkerBuilder<Args, Ctx, Source, Middleware>
100{
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("WorkerBuilder")
103            .field("id", &self.name)
104            .field("job", &std::any::type_name::<(Args, Ctx)>())
105            .field("layer", &std::any::type_name::<Middleware>())
106            .field("source", &std::any::type_name::<Source>())
107            .finish()
108    }
109}
110
111impl WorkerBuilder<(), (), (), Identity> {
112    /// Build a new [`WorkerBuilder`] instance with a name for the worker to build
113    pub fn new<T: AsRef<str>>(name: T) -> Self {
114        Self {
115            request: PhantomData,
116            layer: Identity::new(),
117            source: (),
118            name: name.as_ref().to_owned(),
119            event_handler: EventHandlerBuilder::default(),
120            shutdown: None,
121        }
122    }
123}
124
125impl WorkerBuilder<(), (), (), Identity> {
126    /// Set the source to a backend that implements [Backend]
127    pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
128    where
129        NB: Backend<Args = NJ, Context = Ctx>,
130    {
131        WorkerBuilder {
132            request: PhantomData,
133            layer: self.layer,
134            source: backend,
135            name: self.name,
136            shutdown: self.shutdown,
137            event_handler: self.event_handler,
138        }
139    }
140}
141
142impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
143where
144    B: Backend<Args = Args>,
145{
146    /// Allows of decorating the service that consumes jobs.
147    /// Allows adding multiple middleware in one call
148    pub fn chain<NewLayer>(
149        self,
150        f: impl FnOnce(M) -> NewLayer,
151    ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
152        let middleware = f(self.layer);
153
154        WorkerBuilder {
155            request: self.request,
156            layer: middleware,
157            name: self.name,
158            source: self.source,
159            shutdown: self.shutdown,
160            event_handler: self.event_handler,
161        }
162    }
163    /// Allows adding middleware to the layer stack
164    pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
165        WorkerBuilder {
166            request: self.request,
167            source: self.source,
168            layer: Stack::new(layer, self.layer),
169            name: self.name,
170            shutdown: self.shutdown,
171            event_handler: self.event_handler,
172        }
173    }
174
175    /// Adds data to the context
176    /// This will be shared by all requests
177    pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
178    where
179        M: Layer<Data<D>>,
180    {
181        WorkerBuilder {
182            request: self.request,
183            source: self.source,
184            layer: Stack::new(Data::new(data), self.layer),
185            name: self.name,
186            shutdown: self.shutdown,
187            event_handler: self.event_handler,
188        }
189    }
190}
191
192/// Finalizes the builder and constructs a [`Worker`] with the provided service
193impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
194where
195    B: Backend<Args = Args, Context = Ctx>,
196{
197    /// Consumes the builder and a service to construct the final worker
198    pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
199        self,
200        service: W,
201    ) -> Worker<Args, Ctx, W::Backend, Svc, M>
202    where
203        Svc: Service<Task<Args, Ctx, B::IdType>>,
204    {
205        service.build_with(self)
206    }
207}
208
209/// A worker service composed of a backend and a service
210#[derive(Debug, Clone)]
211pub struct WorkerService<Backend, Svc> {
212    /// The backend for the worker
213    pub backend: Backend,
214    /// The service that processes tasks
215    pub service: Svc,
216}
217
218/// Trait for building a worker service provided a backend
219pub trait IntoWorkerService<B, Svc, Args, Ctx>
220where
221    B: crate::backend::Backend<Args = Args, Context = Ctx>,
222    Svc: Service<Task<Args, Ctx, B::IdType>>,
223{
224    /// The backend type for the worker
225    type Backend;
226    /// Build the service from the backend
227    fn into_service(self, backend: B) -> WorkerService<Self::Backend, Svc>;
228}
229
230/// Extension trait for building a worker from a builder
231pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
232where
233    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
234    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
235    Self: IntoWorkerService<Backend, Svc, Args, Ctx>,
236{
237    /// Consumes the builder and returns a worker
238    fn build_with(
239        self,
240        builder: WorkerBuilder<Args, Ctx, Backend, M>,
241    ) -> Worker<Args, Ctx, Self::Backend, Svc, M>;
242}
243
244/// Implementation of the IntoWorkerServiceExt trait for any type
245///
246/// Rust doest offer specialization yet, the [`IntoWorkerServiceExt`] and [`IntoWorkerService`]
247/// traits are used to allow the [build](WorkerBuilder::build) method to be more flexible.
248impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
249where
250    T: IntoWorkerService<B, Svc, Args, Ctx>,
251    B: Backend<Args = Args, Context = Ctx>,
252    Svc: Service<Task<Args, Ctx, B::IdType>>,
253{
254    fn build_with(
255        self,
256        builder: WorkerBuilder<Args, Ctx, B, M>,
257    ) -> Worker<Args, Ctx, T::Backend, Svc, M> {
258        let svc = self.into_service(builder.source);
259        let mut worker = Worker::new(builder.name, svc.backend, svc.service, builder.layer);
260        worker.event_handler = builder
261            .event_handler
262            .write()
263            .map(|mut d| d.take())
264            .unwrap()
265            .unwrap_or(Box::new(|_ctx, _e| {
266                debug!("Worker [{}] received event {_e}", _ctx.name());
267            }));
268        worker.shutdown = builder.shutdown;
269        worker
270    }
271}
272
273/// Module for validating task function implementations
274/// This module provides macros and utilities to ensure that task functions
275/// conform to the expected signatures and can be converted into worker services.
276#[cfg(feature = "test-utils")]
277pub mod task_fn_validator {
278    use crate::backend::Backend;
279    use crate::task::Task;
280    use tower_service::Service;
281
282    use crate::task_fn::{FromRequest, TaskFn};
283
284    /// Macro for implementing the check functions
285    macro_rules! impl_check_fn {
286        ($($num:tt => $($arg:ident),+);+ $(;)?) => {
287            $(
288                #[inline]
289                #[doc = concat!("A helper for checking that the builder can build a worker with the provided service (", stringify!($num), " arguments)")]
290                pub fn $num<
291                    F, B, Args, Ctx,
292                    $($arg: FromRequest<Task<Args, Ctx, B::IdType>>),+
293                >(
294                    _: F,
295                ) where
296                    TaskFn<F, Args, Ctx, ($($arg,)+)>: Service<Task<Args, Ctx, B::IdType>>,
297                    B: Backend<Args = Args>
298                {
299                }
300            )+
301        };
302    }
303
304    impl_check_fn! {
305        check_fn_1 => A1;
306        check_fn_2 => A1, A2;
307        check_fn_3 => A1, A2, A3;
308        check_fn_4 => A1, A2, A3, A4;
309        check_fn_5 => A1, A2, A3, A4, A5;
310        check_fn_6 => A1, A2, A3, A4, A5, A6;
311        check_fn_7 => A1, A2, A3, A4, A5, A6, A7;
312        check_fn_8 => A1, A2, A3, A4, A5, A6, A7, A8;
313        check_fn_9 => A1, A2, A3, A4, A5, A6, A7, A8, A9;
314        check_fn_10 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10;
315        check_fn_11 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11;
316        check_fn_12 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12;
317        check_fn_13 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13;
318        check_fn_14 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14;
319        check_fn_15 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15;
320        check_fn_16 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16;
321    }
322}