apalis_core/worker/builder.rs
1//! Builder types for composing and building workers.
2//!
3//! The `WorkerBuilder` component is the recommended
4//! way to construct [`Worker`] instances in a flexible and
5//! composable manner.
6//!
7//! The builder pattern enables customization of various parts of a worker,
8//! in the following order:
9//!
10//! 1. Setting a backend that implements the [`Backend`] trait
11//! 2. Adding application state via [`Data`](crate::task::data)
12//! 3. Decorating the service pipeline with middleware
13//! 4. Handling lifecycle events with `on_event`
//! 5. Providing the task processing logic to [`build`](WorkerBuilder::build), as any value that implements [`IntoWorkerService`].
15//!
16//! The [`IntoWorkerService`] trait can be used to convert a function or a service into a worker service. The following implementations are provided:
17//! - For async functions via [`task_fn`](crate::task_fn::task_fn)
18//! - For any type that implements the [`Service`] trait for `T: Task`
19//! - For workflows via [`apalis-workflow`](https://docs.rs/apalis-workflow)
20//!
21//! ## Basic usage
22//!
23//! ```rust,no_run
24//! # use apalis_core::worker::builder::WorkerBuilder;
25//! # use apalis_core::backend::memory::MemoryStorage;
26//! # use apalis_core::worker::context::WorkerContext;
27//! # use apalis_core::task::data::Data;
28//! # use apalis_core::backend::TaskSink;
29//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
30//!
31//! # #[tokio::main]
32//! # async fn main() {
33//! # let mut in_memory = MemoryStorage::new();
34//! # in_memory.push(24).await.unwrap();
35//! async fn task(job: u32, count: Data<usize>, ctx: WorkerContext) {
36//! println!("Received job: {job:?}");
37//! ctx.stop().unwrap();
38//! }
39//!
40//! let worker = WorkerBuilder::new("rango-tango")
41//! .backend(in_memory)
42//! .data(0usize)
43//! .on_event(|ctx, ev| {
44//! println!("On Event = {:?}", ev);
45//! })
46//! .build(task);
47//!
48//! worker.run().await.unwrap();
49//! # }
50//! ```
51//! ## Order
52//!
53//! The order in which you add layers affects how tasks are processed. Layers added earlier are wrapped by those added later.
54//!
55//! ### Why does order matter?
56//! Each layer wraps the previous one, so the outermost layer is applied last. This means that middleware added later can observe or modify the effects of earlier layers. For example, tracing added before retry will see all retries as a single operation, while tracing added after retry will log each retry attempt separately.
57//!
58//! For example:
59//! ```ignore
60//! WorkerBuilder::new()
61//! .enable_tracing()
62//! .retry(RetryPolicy::retries(3))
63//! .build(task);
64//! ```
65//! In this case, tracing is applied before retry. The tracing span may not reflect the correct attempt count.
66//!
67//! Reversing the order:
68//! ```ignore
69//! WorkerBuilder::new()
70//! .retry(RetryPolicy::retries(3))
71//! .enable_tracing()
72//! .build(task);
73//! ```
74//! Now, retry is applied first, and tracing wraps around it. The tracing span will correctly capture retries.
75//!
76//! **Tip:** Add layers in the order you want them to wrap task processing.
77use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82 backend::Backend,
83 monitor::shutdown::Shutdown,
84 task::{data::Data, Task},
85 task_fn::{FromRequest, TaskFn},
86 worker::{event::EventHandlerBuilder, Worker},
87};
88
89/// Declaratively builds a [`Worker`]
90pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
91 pub(crate) name: String,
92 pub(crate) request: PhantomData<(Args, Ctx)>,
93 pub(crate) layer: Middleware,
94 pub(crate) source: Source,
95 pub(crate) event_handler: EventHandlerBuilder,
96 pub(crate) shutdown: Option<Shutdown>,
97}
98
99impl<Args, Ctx, Source, Middleware> std::fmt::Debug
100 for WorkerBuilder<Args, Ctx, Source, Middleware>
101{
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("WorkerBuilder")
104 .field("id", &self.name)
105 .field("job", &std::any::type_name::<(Args, Ctx)>())
106 .field("layer", &std::any::type_name::<Middleware>())
107 .field("source", &std::any::type_name::<Source>())
108 .finish()
109 }
110}
111
112impl WorkerBuilder<(), (), (), Identity> {
113 /// Build a new [`WorkerBuilder`] instance with a name for the worker to build
114 pub fn new<T: AsRef<str>>(name: T) -> WorkerBuilder<(), (), (), Identity> {
115 WorkerBuilder {
116 request: PhantomData,
117 layer: Identity::new(),
118 source: (),
119 name: name.as_ref().to_owned(),
120 event_handler: EventHandlerBuilder::default(),
121 shutdown: None,
122 }
123 }
124}
125
126impl WorkerBuilder<(), (), (), Identity> {
127 /// Set the source to a backend that implements [Backend]
128 pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
129 where
130 NB: Backend<NJ, Context = Ctx>,
131 {
132 WorkerBuilder {
133 request: PhantomData,
134 layer: self.layer,
135 source: backend,
136 name: self.name,
137 shutdown: self.shutdown,
138 event_handler: self.event_handler,
139 }
140 }
141}
142
143impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
144where
145 B: Backend<Args>,
146{
147 /// Allows of decorating the service that consumes jobs.
148 /// Allows adding multiple middleware in one call
149 pub fn chain<NewLayer>(
150 self,
151 f: impl FnOnce(M) -> NewLayer,
152 ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
153 let middleware = f(self.layer);
154
155 WorkerBuilder {
156 request: self.request,
157 layer: middleware,
158 name: self.name,
159 source: self.source,
160 shutdown: self.shutdown,
161 event_handler: self.event_handler,
162 }
163 }
164 /// Allows adding middleware to the layer stack
165 pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
166 WorkerBuilder {
167 request: self.request,
168 source: self.source,
169 layer: Stack::new(layer, self.layer),
170 name: self.name,
171 shutdown: self.shutdown,
172 event_handler: self.event_handler,
173 }
174 }
175
176 /// Adds data to the context
177 /// This will be shared by all requests
178 pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
179 where
180 M: Layer<Data<D>>,
181 {
182 WorkerBuilder {
183 request: self.request,
184 source: self.source,
185 layer: Stack::new(Data::new(data), self.layer),
186 name: self.name,
187 shutdown: self.shutdown,
188 event_handler: self.event_handler,
189 }
190 }
191
192 #[inline]
193 /// A helper for checking that the builder can build a worker with the provided service
194 pub fn build_check_fn<
195 F,
196 A1: FromRequest<Task<Args, Ctx, B::IdType>>,
197 A2: FromRequest<Task<Args, Ctx, B::IdType>>,
198 >(
199 self,
200 _: F,
201 ) where
202 TaskFn<F, Args, Ctx, (A1, A2)>: Service<Task<Args, Ctx, B::IdType>>,
203 {
204 }
205}
206
207/// Finalizes the builder and constructs a [`Worker`] with the provided service
208impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
209where
210 B: Backend<Args>,
211{
212 /// Consumes the builder and a service to construct the final worker
213 pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
214 self,
215 service: W,
216 ) -> Worker<Args, Ctx, B, Svc, M>
217 where
218 Svc: Service<Task<Args, Ctx, B::IdType>>,
219 B: Backend<Args, Context = Ctx>,
220 {
221 service.build_with(self)
222 }
223}
224
225/// Trait for building a worker service provided a backend
226pub trait IntoWorkerService<Backend, Svc, Args, Ctx>
227where
228 Backend: crate::backend::Backend<Args, Context = Ctx>,
229 Svc: Service<Task<Args, Ctx, Backend::IdType>>,
230{
231 /// Build the service from the backend
232 fn into_service(self, backend: &Backend) -> Svc;
233}
234
235/// Extension trait for building a worker from a builder
236pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
237where
238 Backend: crate::backend::Backend<Args, Context = Ctx>,
239 Svc: Service<Task<Args, Ctx, Backend::IdType>>,
240{
241 /// Consumes the builder and returns a worker
242 fn build_with(
243 self,
244 builder: WorkerBuilder<Args, Ctx, Backend, M>,
245 ) -> Worker<Args, Ctx, Backend, Svc, M>;
246}
247
248/// Implementation of the IntoWorkerServiceExt trait for any type
249///
250/// Rust doest offer specialization yet, the [`IntoWorkerServiceExt`] and [`IntoWorkerService`]
251/// traits are used to allow the [build](WorkerBuilder::build) method to be more flexible.
252impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
253where
254 T: IntoWorkerService<B, Svc, Args, Ctx>,
255 B: Backend<Args, Context = Ctx>,
256 Svc: Service<Task<Args, Ctx, B::IdType>>,
257{
258 fn build_with(self, builder: WorkerBuilder<Args, Ctx, B, M>) -> Worker<Args, Ctx, B, Svc, M> {
259 let svc = self.into_service(&builder.source);
260 let mut worker = Worker::new(builder.name, builder.source, svc, builder.layer);
261 worker.event_handler = builder
262 .event_handler
263 .write()
264 .map(|mut d| d.take())
265 .unwrap()
266 .unwrap_or(Box::new(|_ctx, _e| {
267 trace!("Worker [{}] received event {_e}", _ctx.name());
268 }));
269 worker.shutdown = builder.shutdown;
270 worker
271 }
272}