apalis_core/
builder.rs

1use std::marker::PhantomData;
2
3use futures::Stream;
4use tower::{
5    layer::util::{Identity, Stack},
6    Layer, Service, ServiceBuilder,
7};
8
9use crate::{
10    backend::Backend,
11    error::Error,
12    layers::extensions::Data,
13    request::Request,
14    service_fn::service_fn,
15    service_fn::ServiceFn,
16    worker::{Ready, Worker, WorkerId},
17};
18
19/// Allows building a [`Worker`].
20/// Usually the output is [`Worker<Ready>`]
21pub struct WorkerBuilder<Req, Ctx, Source, Middleware, Serv> {
22    pub(crate) id: WorkerId,
23    pub(crate) request: PhantomData<Request<Req, Ctx>>,
24    pub(crate) layer: ServiceBuilder<Middleware>,
25    pub(crate) source: Source,
26    service: PhantomData<Serv>,
27}
28
29impl<Req, Ctx, Source, Middleware, Serv> std::fmt::Debug
30    for WorkerBuilder<Req, Ctx, Source, Middleware, Serv>
31{
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("WorkerBuilder")
34            .field("id", &self.id)
35            .field("job", &std::any::type_name::<Req>())
36            .field("layer", &std::any::type_name::<Middleware>())
37            .field("source", &std::any::type_name::<Source>())
38            .finish()
39    }
40}
41
42impl<Serv> WorkerBuilder<(), (), (), Identity, Serv> {
43    /// Build a new [`WorkerBuilder`] instance with a name for the worker to build
44    pub fn new<T: AsRef<str>>(name: T) -> WorkerBuilder<(), (), (), Identity, Serv> {
45        let job: PhantomData<Request<(), ()>> = PhantomData;
46        WorkerBuilder {
47            request: job,
48            layer: ServiceBuilder::new(),
49            source: (),
50            id: WorkerId::new(name),
51            service: PhantomData,
52        }
53    }
54}
55
56impl<M, Serv> WorkerBuilder<(), (), (), M, Serv> {
57    /// Consume a stream directly
58    #[deprecated(since = "0.6.0", note = "Consider using the `.backend`")]
59    pub fn stream<
60        NS: Stream<Item = Result<Option<Request<NJ, Ctx>>, Error>> + Send + 'static,
61        NJ,
62        Ctx,
63    >(
64        self,
65        stream: NS,
66    ) -> WorkerBuilder<NJ, Ctx, NS, M, Serv> {
67        WorkerBuilder {
68            request: PhantomData,
69            layer: self.layer,
70            source: stream,
71            id: self.id,
72            service: self.service,
73        }
74    }
75
76    /// Set the source to a backend that implements [Backend]
77    pub fn backend<NB: Backend<Request<NJ, Ctx>>, NJ, Res: Send, Ctx>(
78        self,
79        backend: NB,
80    ) -> WorkerBuilder<NJ, Ctx, NB, M, Serv>
81    where
82        Serv: Service<Request<NJ, Ctx>, Response = Res>,
83    {
84        WorkerBuilder {
85            request: PhantomData,
86            layer: self.layer,
87            source: backend,
88            id: self.id,
89            service: self.service,
90        }
91    }
92}
93
94impl<Req, M, Serv, Ctx> WorkerBuilder<Req, Ctx, (), M, Serv> {
95    /// Allows of decorating the service that consumes jobs.
96    /// Allows adding multiple [`tower`] middleware
97    pub fn chain<NewLayer>(
98        self,
99        f: impl FnOnce(ServiceBuilder<M>) -> ServiceBuilder<NewLayer>,
100    ) -> WorkerBuilder<Req, Ctx, (), NewLayer, Serv> {
101        let middleware = f(self.layer);
102
103        WorkerBuilder {
104            request: self.request,
105            layer: middleware,
106            id: self.id,
107            source: self.source,
108            service: self.service,
109        }
110    }
111    /// Allows adding a single layer [tower] middleware
112    pub fn layer<U>(self, layer: U) -> WorkerBuilder<Req, Ctx, (), Stack<U, M>, Serv>
113    where
114        M: Layer<U>,
115    {
116        WorkerBuilder {
117            request: self.request,
118            source: self.source,
119            layer: self.layer.layer(layer),
120            id: self.id,
121            service: self.service,
122        }
123    }
124
125    /// Adds data to the context
126    /// This will be shared by all requests
127    pub fn data<D>(self, data: D) -> WorkerBuilder<Req, Ctx, (), Stack<Data<D>, M>, Serv>
128    where
129        M: Layer<Data<D>>,
130    {
131        WorkerBuilder {
132            request: self.request,
133            source: self.source,
134            layer: self.layer.layer(Data::new(data)),
135            id: self.id,
136            service: self.service,
137        }
138    }
139}
140
141impl<Req, P, M, S, Ctx> WorkerFactory<Req, Ctx, S> for WorkerBuilder<Req, Ctx, P, M, S>
142where
143    S: Service<Request<Req, Ctx>>,
144    M: Layer<S>,
145    P: Backend<Request<Req, Ctx>>,
146{
147    type Source = P;
148
149    type Service = M::Service;
150
151    fn build(self, service: S) -> Worker<Ready<M::Service, P>> {
152        let worker_id = self.id;
153        let poller = self.source;
154        let middleware = self.layer;
155        let service = middleware.service(service);
156
157        Worker::new(worker_id, Ready::new(service, poller))
158    }
159}
160/// Helper trait for building new Workers from [`WorkerBuilder`]
161pub trait WorkerFactory<Req, Ctx, S> {
162    /// The request source for the worker
163    type Source;
164
165    /// The service that the worker will run jobs against
166    type Service;
167    /// Builds a [`WorkerFactory`] using a [`tower`] service
168    /// that can be used to generate a new [`Worker`] using the `build` method
169    /// # Arguments
170    ///
171    /// * `service` - A tower service
172    ///
173    /// # Examples
174    ///
175    fn build(self, service: S) -> Worker<Ready<Self::Service, Self::Source>>;
176}
177
178/// Helper trait for building new Workers from [`WorkerBuilder`]
179pub trait WorkerFactoryFn<Req, Ctx, F, FnArgs> {
180    /// The request source for the [`Worker`]
181    type Source;
182
183    /// The service that the worker will run jobs against
184    type Service;
185    /// Builds a [`WorkerFactoryFn`] using [`ServiceFn`]
186    /// that can be used to generate new [`Worker`] using the `build_fn` method
187    /// # Arguments
188    ///
189    /// * `f` - A functional service.
190    ///
191    /// # Examples
192    ///
193    /// A function can take many forms to allow flexibility
194    /// - An async function with a single argument of the item being processed
195    /// - An async function with an argument of the item being processed plus up-to 16 arguments that are extracted from the request [`Data`]
196    ///
197    /// A function can return:
198    /// - ()
199    /// - primitive
200    /// - Result<T, E: Error>
201    /// - impl IntoResponse
202    ///
203    /// ```rust
204    /// # use apalis_core::layers::extensions::Data;
205    /// #[derive(Debug)]
206    /// struct Email;
207    /// #[derive(Debug)]
208    /// struct PgPool;
209    ///
210    /// async fn send_email(email: Email, data: Data<PgPool>) {
211    ///     // Implementation of the task function?
212    /// }
213    /// ```
214    ///
215    fn build_fn(self, f: F) -> Worker<Ready<Self::Service, Self::Source>>;
216}
217
218impl<Req, W, F, Ctx, FnArgs> WorkerFactoryFn<Req, Ctx, F, FnArgs> for W
219where
220    W: WorkerFactory<Req, Ctx, ServiceFn<F, Req, Ctx, FnArgs>>,
221{
222    type Source = W::Source;
223
224    type Service = W::Service;
225
226    fn build_fn(self, f: F) -> Worker<Ready<Self::Service, Self::Source>> {
227        self.build(service_fn(f))
228    }
229}