1use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82 backend::Backend,
83 monitor::shutdown::Shutdown,
84 task::{Task, data::Data},
85 worker::{Worker, event::EventHandlerBuilder},
86};
87
/// Builder that collects everything needed to construct a [`Worker`].
///
/// Type parameters:
/// - `Args`, `Ctx`: the task argument and context types. They are tracked only
///   through [`PhantomData`]; no value of either type is stored.
/// - `Source`: the type of the value kept in `source` (`()` for a freshly
///   created builder, a backend once [`WorkerBuilder::backend`] was called).
/// - `Middleware`: the type of the layer stack kept in `layer` (starts as
///   [`Identity`]).
pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
    /// Name given to the builder; handed to `Worker::new` when building.
    pub(crate) name: String,
    /// Zero-sized marker that ties `Args` and `Ctx` to the builder type.
    pub(crate) request: PhantomData<(Args, Ctx)>,
    /// Middleware accumulated so far; handed to `Worker::new` when building.
    pub(crate) layer: Middleware,
    /// Task source (backend) the worker is built around.
    pub(crate) source: Source,
    /// Holder of an optional event handler; it is taken out when the worker is built.
    pub(crate) event_handler: EventHandlerBuilder,
    /// Optional shutdown handle copied onto the built worker.
    pub(crate) shutdown: Option<Shutdown>,
}
97
98impl<Args, Ctx, Source, Middleware> std::fmt::Debug
99 for WorkerBuilder<Args, Ctx, Source, Middleware>
100{
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("WorkerBuilder")
103 .field("id", &self.name)
104 .field("job", &std::any::type_name::<(Args, Ctx)>())
105 .field("layer", &std::any::type_name::<Middleware>())
106 .field("source", &std::any::type_name::<Source>())
107 .finish()
108 }
109}
110
111impl WorkerBuilder<(), (), (), Identity> {
112 pub fn new<T: AsRef<str>>(name: T) -> Self {
114 Self {
115 request: PhantomData,
116 layer: Identity::new(),
117 source: (),
118 name: name.as_ref().to_owned(),
119 event_handler: EventHandlerBuilder::default(),
120 shutdown: None,
121 }
122 }
123}
124
125impl WorkerBuilder<(), (), (), Identity> {
126 pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
128 where
129 NB: Backend<Args = NJ, Context = Ctx>,
130 {
131 WorkerBuilder {
132 request: PhantomData,
133 layer: self.layer,
134 source: backend,
135 name: self.name,
136 shutdown: self.shutdown,
137 event_handler: self.event_handler,
138 }
139 }
140}
141
142impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
143where
144 B: Backend<Args = Args>,
145{
146 pub fn chain<NewLayer>(
149 self,
150 f: impl FnOnce(M) -> NewLayer,
151 ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
152 let middleware = f(self.layer);
153
154 WorkerBuilder {
155 request: self.request,
156 layer: middleware,
157 name: self.name,
158 source: self.source,
159 shutdown: self.shutdown,
160 event_handler: self.event_handler,
161 }
162 }
163 pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
165 WorkerBuilder {
166 request: self.request,
167 source: self.source,
168 layer: Stack::new(layer, self.layer),
169 name: self.name,
170 shutdown: self.shutdown,
171 event_handler: self.event_handler,
172 }
173 }
174
175 pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
178 where
179 M: Layer<Data<D>>,
180 {
181 WorkerBuilder {
182 request: self.request,
183 source: self.source,
184 layer: Stack::new(Data::new(data), self.layer),
185 name: self.name,
186 shutdown: self.shutdown,
187 event_handler: self.event_handler,
188 }
189 }
190}
191
192impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
194where
195 B: Backend<Args = Args, Context = Ctx>,
196{
197 pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
199 self,
200 service: W,
201 ) -> Worker<Args, Ctx, W::Backend, Svc, M>
202 where
203 Svc: Service<Task<Args, Ctx, B::IdType>>,
204 {
205 service.build_with(self)
206 }
207}
208
/// A backend paired with the service that handles its tasks.
///
/// Produced by [`IntoWorkerService::into_service`]; both parts are passed on
/// to `Worker::new` when a worker is built.
#[derive(Debug, Clone)]
pub struct WorkerService<Backend, Svc> {
    /// The backend the worker will be built with.
    pub backend: Backend,
    /// The service the worker will be built with.
    pub service: Svc,
}
217
/// Conversion of a value into a [`WorkerService`] for a given backend.
///
/// Type parameters:
/// - `B`: the backend handed to [`IntoWorkerService::into_service`].
/// - `Svc`: the resulting service, which must accept `Task<Args, Ctx, B::IdType>`.
/// - `Args`, `Ctx`: the backend's argument and context types.
pub trait IntoWorkerService<B, Svc, Args, Ctx>
where
    B: crate::backend::Backend<Args = Args, Context = Ctx>,
    Svc: Service<Task<Args, Ctx, B::IdType>>,
{
    /// Backend type carried by the returned [`WorkerService`]; it is declared
    /// separately from `B`, so an implementation may return a different type.
    type Backend;
    /// Consumes `self` and `backend`, returning the backend/service pair.
    fn into_service(self, backend: B) -> WorkerService<Self::Backend, Svc>;
}
229
/// Extension of [`IntoWorkerService`] that turns a [`WorkerBuilder`] into a
/// finished [`Worker`].
///
/// The `Backend` type parameter shadows the imported `Backend` trait inside
/// this declaration, which is why the trait bound below is written with its
/// full `crate::backend::Backend` path.
pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
where
    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
    Svc: Service<Task<Args, Ctx, Backend::IdType>>,
    Self: IntoWorkerService<Backend, Svc, Args, Ctx>,
{
    /// Consumes `self` and `builder` and returns the resulting worker, whose
    /// backend type is the one named by `IntoWorkerService::Backend`.
    fn build_with(
        self,
        builder: WorkerBuilder<Args, Ctx, Backend, M>,
    ) -> Worker<Args, Ctx, Self::Backend, Svc, M>;
}
243
244impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
249where
250 T: IntoWorkerService<B, Svc, Args, Ctx>,
251 B: Backend<Args = Args, Context = Ctx>,
252 Svc: Service<Task<Args, Ctx, B::IdType>>,
253{
254 fn build_with(
255 self,
256 builder: WorkerBuilder<Args, Ctx, B, M>,
257 ) -> Worker<Args, Ctx, T::Backend, Svc, M> {
258 let svc = self.into_service(builder.source);
259 let mut worker = Worker::new(builder.name, svc.backend, svc.service, builder.layer);
260 worker.event_handler = builder
261 .event_handler
262 .write()
263 .map(|mut d| d.take())
264 .unwrap()
265 .unwrap_or(Box::new(|_ctx, _e| {
266 debug!("Worker [{}] received event {_e}", _ctx.name());
267 }));
268 worker.shutdown = builder.shutdown;
269 worker
270 }
271}
272
/// Compile-time helpers for checking that a task function can be used as a
/// worker service.
///
/// Each `check_fn_N` function has an empty body: calling it only succeeds in
/// compiling when `TaskFn<F, Args, Ctx, (A1, .., AN)>` implements
/// `Service<Task<Args, Ctx, B::IdType>>` for the chosen backend `B`.
#[cfg(feature = "test-utils")]
pub mod task_fn_validator {
    use crate::backend::Backend;
    use crate::task::Task;
    use tower_service::Service;

    use crate::task_fn::{FromRequest, TaskFn};

    /// Generates one checker function per `name(arity) => A1, .., AN` entry.
    ///
    /// `name` is the identifier of the generated function; `arity` is used only
    /// in the generated documentation and should equal the number of type
    /// parameters listed after `=>`.
    macro_rules! impl_check_fn {
        ($($name:ident($arity:literal) => $($arg:ident),+);+ $(;)?) => {
            $(
                #[inline]
                #[doc = concat!("A helper for checking that the builder can build a worker with the provided service (number of `FromRequest` arguments: ", stringify!($arity), ")")]
                pub fn $name<
                    F, B, Args, Ctx,
                    $($arg: FromRequest<Task<Args, Ctx, B::IdType>>),+
                >(
                    _: F,
                ) where
                    TaskFn<F, Args, Ctx, ($($arg,)+)>: Service<Task<Args, Ctx, B::IdType>>,
                    B: Backend<Args = Args>
                {
                }
            )+
        };
    }

    impl_check_fn! {
        check_fn_1(1) => A1;
        check_fn_2(2) => A1, A2;
        check_fn_3(3) => A1, A2, A3;
        check_fn_4(4) => A1, A2, A3, A4;
        check_fn_5(5) => A1, A2, A3, A4, A5;
        check_fn_6(6) => A1, A2, A3, A4, A5, A6;
        check_fn_7(7) => A1, A2, A3, A4, A5, A6, A7;
        check_fn_8(8) => A1, A2, A3, A4, A5, A6, A7, A8;
        check_fn_9(9) => A1, A2, A3, A4, A5, A6, A7, A8, A9;
        check_fn_10(10) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10;
        check_fn_11(11) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11;
        check_fn_12(12) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12;
        check_fn_13(13) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13;
        check_fn_14(14) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14;
        check_fn_15(15) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15;
        check_fn_16(16) => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16;
    }
}