apalis_core/worker/
builder.rs

1//! Builder types for composing and building workers.
2//!
3//! The `WorkerBuilder` component is the recommended
4//! way to construct [`Worker`] instances in a flexible and
5//! composable manner.
6//!
7//! The builder pattern enables customization of various parts of a worker,
8//! in the following order:
9//!
10//! 1. Setting a backend that implements the [`Backend`] trait
11//! 2. Adding application state via [`Data`](crate::task::data)
12//! 3. Decorating the service pipeline with middleware
13//! 4. Handling lifecycle events with `on_event`
14//! 5. Providing task processing logic using [`build`](WorkerBuilder::build) that implements [`IntoWorkerService`].
15//!
16//! The [`IntoWorkerService`] trait can be used to convert a function or a service into a worker service. The following implementations are provided:
17//! - For async functions via [`task_fn`](crate::task_fn::task_fn)
18//! - For any type that implements the [`Service`] trait for `T: Task`
19//! - For workflows via [`apalis-workflow`](https://docs.rs/apalis-workflow)
20//!
21//! ## Basic usage
22//!
23//! ```rust,no_run
24//! # use apalis_core::worker::builder::WorkerBuilder;
25//! # use apalis_core::backend::memory::MemoryStorage;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::task::data::Data;
28//! # use apalis_core::backend::TaskSink;
29//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
30//!
31//! # #[tokio::main]
32//! # async fn main() {
33//! # let mut in_memory = MemoryStorage::new();
34//! # in_memory.push(24).await.unwrap();
35//! async fn task(job: u32, count: Data<usize>, ctx: WorkerContext) {
36//!     println!("Received job: {job:?}");
37//!     ctx.stop().unwrap();
38//! }
39//!
40//! let worker = WorkerBuilder::new("rango-tango")
41//!     .backend(in_memory)
42//!     .data(0usize)
43//!     .on_event(|ctx, ev| {
44//!         println!("On Event = {:?}", ev);
45//!     })
46//!     .build(task);
47//!
48//! worker.run().await.unwrap();
49//! # }
50//! ```
51//! ## Order
52//!
53//! The order in which you add layers affects how tasks are processed. Layers added earlier are wrapped by those added later.
54//!
55//! ### Why does order matter?
56//! Each layer wraps the previous one, so the outermost layer is applied last. This means that middleware added later can observe or modify the effects of earlier layers. For example, tracing added before retry will see all retries as a single operation, while tracing added after retry will log each retry attempt separately.
57//!
58//! For example:
59//! ```ignore
60//! WorkerBuilder::new()
61//!     .enable_tracing()
62//!     .retry(RetryPolicy::retries(3))
63//!     .build(task);
64//! ```
65//! In this case, tracing is applied before retry. The tracing span may not reflect the correct attempt count.
66//!
67//! Reversing the order:
68//! ```ignore
69//! WorkerBuilder::new()
70//!     .retry(RetryPolicy::retries(3))
71//!     .enable_tracing()
72//!     .build(task);
73//! ```
74//! Now, retry is applied first, and tracing wraps around it. The tracing span will correctly capture retries.
75//!
76//! **Tip:** Add layers in the order you want them to wrap task processing.
77use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82    backend::Backend,
83    monitor::shutdown::Shutdown,
84    task::{data::Data, Task},
85    task_fn::{FromRequest, TaskFn},
86    worker::{event::EventHandlerBuilder, Worker},
87};
88
89/// Declaratively builds a [`Worker`]
90pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
91    pub(crate) name: String,
92    pub(crate) request: PhantomData<(Args, Ctx)>,
93    pub(crate) layer: Middleware,
94    pub(crate) source: Source,
95    pub(crate) event_handler: EventHandlerBuilder,
96    pub(crate) shutdown: Option<Shutdown>,
97}
98
99impl<Args, Ctx, Source, Middleware> std::fmt::Debug
100    for WorkerBuilder<Args, Ctx, Source, Middleware>
101{
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("WorkerBuilder")
104            .field("id", &self.name)
105            .field("job", &std::any::type_name::<(Args, Ctx)>())
106            .field("layer", &std::any::type_name::<Middleware>())
107            .field("source", &std::any::type_name::<Source>())
108            .finish()
109    }
110}
111
112impl WorkerBuilder<(), (), (), Identity> {
113    /// Build a new [`WorkerBuilder`] instance with a name for the worker to build
114    pub fn new<T: AsRef<str>>(name: T) -> WorkerBuilder<(), (), (), Identity> {
115        WorkerBuilder {
116            request: PhantomData,
117            layer: Identity::new(),
118            source: (),
119            name: name.as_ref().to_owned(),
120            event_handler: EventHandlerBuilder::default(),
121            shutdown: None,
122        }
123    }
124}
125
126impl WorkerBuilder<(), (), (), Identity> {
127    /// Set the source to a backend that implements [Backend]
128    pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
129    where
130        NB: Backend<NJ, Context = Ctx>,
131    {
132        WorkerBuilder {
133            request: PhantomData,
134            layer: self.layer,
135            source: backend,
136            name: self.name,
137            shutdown: self.shutdown,
138            event_handler: self.event_handler,
139        }
140    }
141}
142
143impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
144where
145    B: Backend<Args>,
146{
147    /// Allows of decorating the service that consumes jobs.
148    /// Allows adding multiple middleware in one call
149    pub fn chain<NewLayer>(
150        self,
151        f: impl FnOnce(M) -> NewLayer,
152    ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
153        let middleware = f(self.layer);
154
155        WorkerBuilder {
156            request: self.request,
157            layer: middleware,
158            name: self.name,
159            source: self.source,
160            shutdown: self.shutdown,
161            event_handler: self.event_handler,
162        }
163    }
164    /// Allows adding middleware to the layer stack
165    pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
166        WorkerBuilder {
167            request: self.request,
168            source: self.source,
169            layer: Stack::new(layer, self.layer),
170            name: self.name,
171            shutdown: self.shutdown,
172            event_handler: self.event_handler,
173        }
174    }
175
176    /// Adds data to the context
177    /// This will be shared by all requests
178    pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
179    where
180        M: Layer<Data<D>>,
181    {
182        WorkerBuilder {
183            request: self.request,
184            source: self.source,
185            layer: Stack::new(Data::new(data), self.layer),
186            name: self.name,
187            shutdown: self.shutdown,
188            event_handler: self.event_handler,
189        }
190    }
191
192    #[inline]
193    /// A helper for checking that the builder can build a worker with the provided service
194    pub fn build_check_fn<
195        F,
196        A1: FromRequest<Task<Args, Ctx, B::IdType>>,
197        A2: FromRequest<Task<Args, Ctx, B::IdType>>,
198    >(
199        self,
200        _: F,
201    ) where
202        TaskFn<F, Args, Ctx, (A1, A2)>: Service<Task<Args, Ctx, B::IdType>>,
203    {
204    }
205}
206
207/// Finalizes the builder and constructs a [`Worker`] with the provided service
208impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
209where
210    B: Backend<Args>,
211{
212    /// Consumes the builder and a service to construct the final worker
213    pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
214        self,
215        service: W,
216    ) -> Worker<Args, Ctx, B, Svc, M>
217    where
218        Svc: Service<Task<Args, Ctx, B::IdType>>,
219        B: Backend<Args, Context = Ctx>,
220    {
221        service.build_with(self)
222    }
223}
224
225/// Trait for building a worker service provided a backend
226pub trait IntoWorkerService<Backend, Svc, Args, Ctx>
227where
228    Backend: crate::backend::Backend<Args, Context = Ctx>,
229    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
230{
231    /// Build the service from the backend
232    fn into_service(self, backend: &Backend) -> Svc;
233}
234
235/// Extension trait for building a worker from a builder
236pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
237where
238    Backend: crate::backend::Backend<Args, Context = Ctx>,
239    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
240{
241    /// Consumes the builder and returns a worker
242    fn build_with(
243        self,
244        builder: WorkerBuilder<Args, Ctx, Backend, M>,
245    ) -> Worker<Args, Ctx, Backend, Svc, M>;
246}
247
248/// Implementation of the IntoWorkerServiceExt trait for any type
249///
250/// Rust doest offer specialization yet, the [`IntoWorkerServiceExt`] and [`IntoWorkerService`]
251/// traits are used to allow the [build](WorkerBuilder::build) method to be more flexible.
252impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
253where
254    T: IntoWorkerService<B, Svc, Args, Ctx>,
255    B: Backend<Args, Context = Ctx>,
256    Svc: Service<Task<Args, Ctx, B::IdType>>,
257{
258    fn build_with(self, builder: WorkerBuilder<Args, Ctx, B, M>) -> Worker<Args, Ctx, B, Svc, M> {
259        let svc = self.into_service(&builder.source);
260        let mut worker = Worker::new(builder.name, builder.source, svc, builder.layer);
261        worker.event_handler = builder
262            .event_handler
263            .write()
264            .map(|mut d| d.take())
265            .unwrap()
266            .unwrap_or(Box::new(|_ctx, _e| {
267                trace!("Worker [{}] received event {_e}", _ctx.name());
268            }));
269        worker.shutdown = builder.shutdown;
270        worker
271    }
272}