1use std::marker::PhantomData;
78use tower_layer::{Identity, Layer, Stack};
79use tower_service::Service;
80
81use crate::{
82 backend::Backend,
83 monitor::shutdown::Shutdown,
84 task::{Task, data::Data},
85 worker::{Worker, event::EventHandlerBuilder},
86};
87
88pub struct WorkerBuilder<Args, Ctx, Source, Middleware> {
90 pub(crate) name: String,
91 pub(crate) request: PhantomData<(Args, Ctx)>,
92 pub(crate) layer: Middleware,
93 pub(crate) source: Source,
94 pub(crate) event_handler: EventHandlerBuilder,
95 pub(crate) shutdown: Option<Shutdown>,
96}
97
98impl<Args, Ctx, Source, Middleware> std::fmt::Debug
99 for WorkerBuilder<Args, Ctx, Source, Middleware>
100{
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("WorkerBuilder")
103 .field("id", &self.name)
104 .field("job", &std::any::type_name::<(Args, Ctx)>())
105 .field("layer", &std::any::type_name::<Middleware>())
106 .field("source", &std::any::type_name::<Source>())
107 .finish()
108 }
109}
110
111impl WorkerBuilder<(), (), (), Identity> {
112 pub fn new<T: AsRef<str>>(name: T) -> WorkerBuilder<(), (), (), Identity> {
114 WorkerBuilder {
115 request: PhantomData,
116 layer: Identity::new(),
117 source: (),
118 name: name.as_ref().to_owned(),
119 event_handler: EventHandlerBuilder::default(),
120 shutdown: None,
121 }
122 }
123}
124
125impl WorkerBuilder<(), (), (), Identity> {
126 pub fn backend<NB, NJ, Ctx>(self, backend: NB) -> WorkerBuilder<NJ, Ctx, NB, Identity>
128 where
129 NB: Backend<Args = NJ, Context = Ctx>,
130 {
131 WorkerBuilder {
132 request: PhantomData,
133 layer: self.layer,
134 source: backend,
135 name: self.name,
136 shutdown: self.shutdown,
137 event_handler: self.event_handler,
138 }
139 }
140}
141
142impl<Args, Ctx, M, B> WorkerBuilder<Args, Ctx, B, M>
143where
144 B: Backend<Args = Args>,
145{
146 pub fn chain<NewLayer>(
149 self,
150 f: impl FnOnce(M) -> NewLayer,
151 ) -> WorkerBuilder<Args, Ctx, B, NewLayer> {
152 let middleware = f(self.layer);
153
154 WorkerBuilder {
155 request: self.request,
156 layer: middleware,
157 name: self.name,
158 source: self.source,
159 shutdown: self.shutdown,
160 event_handler: self.event_handler,
161 }
162 }
163 pub fn layer<U>(self, layer: U) -> WorkerBuilder<Args, Ctx, B, Stack<U, M>> {
165 WorkerBuilder {
166 request: self.request,
167 source: self.source,
168 layer: Stack::new(layer, self.layer),
169 name: self.name,
170 shutdown: self.shutdown,
171 event_handler: self.event_handler,
172 }
173 }
174
175 pub fn data<D>(self, data: D) -> WorkerBuilder<Args, Ctx, B, Stack<Data<D>, M>>
178 where
179 M: Layer<Data<D>>,
180 {
181 WorkerBuilder {
182 request: self.request,
183 source: self.source,
184 layer: Stack::new(Data::new(data), self.layer),
185 name: self.name,
186 shutdown: self.shutdown,
187 event_handler: self.event_handler,
188 }
189 }
190}
191
192impl<Args, Ctx, B, M> WorkerBuilder<Args, Ctx, B, M>
194where
195 B: Backend<Args = Args, Context = Ctx>,
196{
197 pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
199 self,
200 service: W,
201 ) -> Worker<Args, Ctx, B, Svc, M>
202 where
203 Svc: Service<Task<Args, Ctx, B::IdType>>,
204 {
205 service.build_with(self)
206 }
207}
208
209pub trait IntoWorkerService<Backend, Svc, Args, Ctx>
211where
212 Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
213 Svc: Service<Task<Args, Ctx, Backend::IdType>>,
214{
215 fn into_service(self, backend: &Backend) -> Svc;
217}
218
219pub trait IntoWorkerServiceExt<Args, Ctx, Svc, Backend, M>: Sized
221where
222 Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
223 Svc: Service<Task<Args, Ctx, Backend::IdType>>,
224{
225 fn build_with(
227 self,
228 builder: WorkerBuilder<Args, Ctx, Backend, M>,
229 ) -> Worker<Args, Ctx, Backend, Svc, M>;
230}
231
232impl<T, Args, Ctx, Svc, B, M> IntoWorkerServiceExt<Args, Ctx, Svc, B, M> for T
237where
238 T: IntoWorkerService<B, Svc, Args, Ctx>,
239 B: Backend<Args = Args, Context = Ctx>,
240 Svc: Service<Task<Args, Ctx, B::IdType>>,
241{
242 fn build_with(self, builder: WorkerBuilder<Args, Ctx, B, M>) -> Worker<Args, Ctx, B, Svc, M> {
243 let svc = self.into_service(&builder.source);
244 let mut worker = Worker::new(builder.name, builder.source, svc, builder.layer);
245 worker.event_handler = builder
246 .event_handler
247 .write()
248 .map(|mut d| d.take())
249 .unwrap()
250 .unwrap_or(Box::new(|_ctx, _e| {
251 debug!("Worker [{}] received event {_e}", _ctx.name());
252 }));
253 worker.shutdown = builder.shutdown;
254 worker
255 }
256}
257
258#[cfg(feature = "test-utils")]
262pub mod task_fn_validator {
263 use crate::backend::Backend;
264 use crate::task::Task;
265 use tower_service::Service;
266
267 use crate::task_fn::{FromRequest, TaskFn};
268
269 macro_rules! impl_check_fn {
271 ($($num:tt => $($arg:ident),+);+ $(;)?) => {
272 $(
273 #[inline]
274 #[doc = concat!("A helper for checking that the builder can build a worker with the provided service (", stringify!($num), " arguments)")]
275 pub fn $num<
276 F, B, Args, Ctx,
277 $($arg: FromRequest<Task<Args, Ctx, B::IdType>>),+
278 >(
279 _: F,
280 ) where
281 TaskFn<F, Args, Ctx, ($($arg,)+)>: Service<Task<Args, Ctx, B::IdType>>,
282 B: Backend<Args = Args>
283 {
284 }
285 )+
286 };
287 }
288
289 impl_check_fn! {
290 check_fn_1 => A1;
291 check_fn_2 => A1, A2;
292 check_fn_3 => A1, A2, A3;
293 check_fn_4 => A1, A2, A3, A4;
294 check_fn_5 => A1, A2, A3, A4, A5;
295 check_fn_6 => A1, A2, A3, A4, A5, A6;
296 check_fn_7 => A1, A2, A3, A4, A5, A6, A7;
297 check_fn_8 => A1, A2, A3, A4, A5, A6, A7, A8;
298 check_fn_9 => A1, A2, A3, A4, A5, A6, A7, A8, A9;
299 check_fn_10 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10;
300 check_fn_11 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11;
301 check_fn_12 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12;
302 check_fn_13 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13;
303 check_fn_14 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14;
304 check_fn_15 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15;
305 check_fn_16 => A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16;
306 }
307}