1use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82 backend::Backend,
83 monitor::shutdown::Shutdown,
84 task::{Task, data::Data},
85 worker::{Worker, event::EventHandlerBuilder},
86};
87
88pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
90 pub(crate) name: String,
91 pub(crate) request: PhantomData<(Args, Ctx)>,
92 pub(crate) layer: Middleware,
93 pub(crate) source: Source,
94 pub(crate) event_handler: EventHandlerBuilder,
95 pub(crate) shutdown: Option<Shutdown>,
96}
97
98impl<Args, Ctx, Source, Middleware> std::fmt::Debug
99 for WorkerBuilder<Args, Ctx, Source, Middleware>
100{
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("WorkerBuilder")
103 .field("id", &self.name)
104 .field("job", &std::any::type_name::<(Args, Ctx)>())
105 .field("layer", &std::any::type_name::<Middleware>())
106 .field("source", &std::any::type_name::<Source>())
107 .finish()
108 }
109}
110
111impl WorkerBuilder<(), (), (), Identity> {
112 pub fn new<T: AsRef<str>>(name: T) -> Self {
114 Self {
115 request: PhantomData,
116 layer: Identity::new(),
117 source: (),
118 name: name.as_ref().to_owned(),
119 event_handler: EventHandlerBuilder::default(),
120 shutdown: None,
121 }
122 }
123}
124
125impl WorkerBuilder<(), (), (), Identity> {
126 pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
128 where
129 NB: Backend<Args = NJ, Context = Ctx>,
130 {
131 WorkerBuilder {
132 request: PhantomData,
133 layer: self.layer,
134 source: backend,
135 name: self.name,
136 shutdown: self.shutdown,
137 event_handler: self.event_handler,
138 }
139 }
140}
141
142impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
143where
144 B: Backend<Args = Args>,
145{
146 pub fn chain<NewLayer>(
149 self,
150 f: impl FnOnce(M) -> NewLayer,
151 ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
152 let middleware = f(self.layer);
153
154 WorkerBuilder {
155 request: self.request,
156 layer: middleware,
157 name: self.name,
158 source: self.source,
159 shutdown: self.shutdown,
160 event_handler: self.event_handler,
161 }
162 }
163 pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
165 WorkerBuilder {
166 request: self.request,
167 source: self.source,
168 layer: Stack::new(layer, self.layer),
169 name: self.name,
170 shutdown: self.shutdown,
171 event_handler: self.event_handler,
172 }
173 }
174
175 pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
179 where
180 M: Layer<Data<D>>,
181 {
182 WorkerBuilder {
183 request: self.request,
184 source: self.source,
185 layer: Stack::new(Data::new(data), self.layer),
186 name: self.name,
187 shutdown: self.shutdown,
188 event_handler: self.event_handler,
189 }
190 }
191}
192
193impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
195where
196 B: Backend<Args = Args, Context = Ctx>,
197{
198 pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
200 self,
201 service: W,
202 ) -> Worker<Args, Ctx, W::Backend, Svc, M>
203 where
204 Svc: Service<Task<Args, Ctx, B::IdType>>,
205 {
206 service.build_with(self)
207 }
208}
209
/// A backend paired with the service that will process its tasks.
///
/// This is the value returned by [`IntoWorkerService::into_service`].
#[derive(Debug, Clone)]
pub struct WorkerService<Backend, Svc> {
    /// The backend half of the pair.
    pub backend: Backend,
    /// The service half of the pair.
    pub service: Svc,
}
218
219pub trait IntoWorkerService<B, Svc, Args, Ctx>
221where
222 B: crate::backend::Backend<Args = Args, Context = Ctx>,
223 Svc: Service<Task<Args, Ctx, B::IdType>>,
224{
225 type Backend;
227 fn into_service(self, backend: B) -> WorkerService<Self::Backend, Svc>;
229}
230
231pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
233where
234 Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
235 Svc: Service<Task<Args, Ctx, Backend::IdType>>,
236 Self: IntoWorkerService<Backend, Svc, Args, Ctx>,
237{
238 fn build_with(
240 self,
241 builder: WorkerBuilder<Args, Ctx, Backend, M>,
242 ) -> Worker<Args, Ctx, Self::Backend, Svc, M>;
243}
244
245impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
250where
251 T: IntoWorkerService<B, Svc, Args, Ctx>,
252 B: Backend<Args = Args, Context = Ctx>,
253 Svc: Service<Task<Args, Ctx, B::IdType>>,
254{
255 fn build_with(
256 self,
257 builder: WorkerBuilder<Args, Ctx, B, M>,
258 ) -> Worker<Args, Ctx, T::Backend, Svc, M> {
259 let svc = self.into_service(builder.source);
260 let mut worker = Worker::new(builder.name, svc.backend, svc.service, builder.layer);
261 worker.event_handler = builder
262 .event_handler
263 .write()
264 .map(|mut d| d.take())
265 .unwrap()
266 .unwrap_or(Box::new(|_ctx, _e| {
267 debug!("Worker [{}] received event {_e}", _ctx.name());
268 }));
269 worker.shutdown = builder.shutdown;
270 worker
271 }
272}
273
/// Compile-time helpers, available with the `test-utils` feature, for checking
/// that a task function with a given number of extra arguments satisfies the
/// `Service` bound needed for a backend `B`.
///
/// Every generated `check_fn_N` has an empty body: calling it does nothing at
/// run time, it only makes the compiler verify the bounds in its signature.
#[cfg(feature = "test-utils")]
pub mod task_fn_validator {
    use tower_service::Service;

    use crate::{
        backend::Backend,
        task::Task,
        task_fn::{FromRequest, TaskFn},
    };

    // Generates one checker function per `name => A1, A2, ...` entry. Each
    // listed identifier becomes a generic parameter bounded by `FromRequest`,
    // and together they form the tuple passed as `TaskFn`'s last type argument.
    macro_rules! impl_check_fn {
        ($($name:ident => $($extractor:ident),+);+ $(;)?) => {
            $(
                #[inline]
                #[doc = concat!("A helper for checking that the builder can build a worker with the provided service (", stringify!($name), " arguments)")]
                pub fn $name<
                    F, B, Args, Ctx,
                    $($extractor: FromRequest<Task<Args, Ctx, B::IdType>>),+
                >(
                    _: F,
                ) where
                    TaskFn<F, Args, Ctx, ($($extractor,)+)>: Service<Task<Args, Ctx, B::IdType>>,
                    B: Backend<Args = Args>
                {
                }
            )+
        };
    }

    impl_check_fn! {
        check_fn_1 => A1;
        check_fn_2 => A1, A2;
        check_fn_3 => A1, A2, A3;
        check_fn_4 => A1, A2, A3, A4;
        check_fn_5 => A1, A2, A3, A4, A5;
        check_fn_6 => A1, A2, A3, A4, A5, A6;
        check_fn_7 => A1, A2, A3, A4, A5, A6, A7;
        check_fn_8 => A1, A2, A3, A4, A5, A6, A7, A8;
        check_fn_9 => A1, A2, A3, A4, A5, A6, A7, A8, A9;
        check_fn_10 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10;
        check_fn_11 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11;
        check_fn_12 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12;
        check_fn_13 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13;
        check_fn_14 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14;
        check_fn_15 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15;
        check_fn_16 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16;
    }
}