1use crate::backend::Backend;
44use crate::error::BoxDynError;
45use crate::task::Task;
46use crate::worker::builder::{IntoWorkerService, WorkerService};
47use futures_util::FutureExt;
48use futures_util::future::Map;
49use std::fmt;
50use std::future::Future;
51use std::marker::PhantomData;
52use std::task::{Context, Poll};
53use tower_service::Service;
54
55pub mod from_request;
56pub mod into_response;
57
58pub mod guide;
60
61pub use self::{from_request::FromRequest, into_response::IntoResponse};
62
63pub fn task_fn<F, Args, Ctx, FnArgs>(f: F) -> TaskFn<F, Args, Ctx, FnArgs> {
83 TaskFn {
84 f,
85 req: PhantomData,
86 fn_args: PhantomData,
87 }
88}
89
/// A callable `f` together with type-level markers describing the task it handles.
///
/// * `F` - the wrapped callable.
/// * `Args`, `Ctx` - recorded as `PhantomData<(Args, Ctx)>`.
/// * `FnArgs` - recorded as `PhantomData<FnArgs>`.
///
/// `Copy`, `Clone` and `Debug` are implemented by hand so that they place no
/// bounds on the marker parameters: only the callable has to be `Copy`/`Clone`,
/// and `Debug` needs no bound at all because it prints type names.
pub struct TaskFn<F, Args, Ctx, FnArgs> {
    f: F,
    req: PhantomData<(Args, Ctx)>,
    fn_args: PhantomData<FnArgs>,
}

impl<Func: Copy, Args, Ctx, FnArgs> Copy for TaskFn<Func, Args, Ctx, FnArgs> {}

impl<Func: Clone, Args, Ctx, FnArgs> Clone for TaskFn<Func, Args, Ctx, FnArgs> {
    /// Clones the wrapped callable; the markers are zero-sized and simply recreated.
    fn clone(&self) -> Self {
        let f = self.f.clone();
        TaskFn {
            f,
            req: PhantomData,
            fn_args: PhantomData,
        }
    }
}

impl<Func, Args, Ctx, FnArgs> fmt::Debug for TaskFn<Func, Args, Ctx, FnArgs> {
    /// Formats the wrapper using the *type names* of its parameters, never their values.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let callable = std::any::type_name::<Func>();
        let args = std::any::type_name::<Args>();
        let ctx = std::any::type_name::<Ctx>();
        let extra = std::any::type_name::<FnArgs>();

        let mut out = f.debug_struct("TaskFn");
        out.field("f", &callable);
        // `format_args!` stays inline so its temporaries live for the whole statement.
        out.field(
            "req",
            &format_args!("PhantomData<Task<{}, {}>>", args, ctx),
        );
        out.field("fn_args", &format_args!("PhantomData<{}>", extra));
        out.finish()
    }
}
130
131type FnFuture<F, O, R, E> = Map<F, fn(O) -> std::result::Result<R, E>>;
133
// Generates, for one arity of extra handler arguments `$K, ...`:
//
//   * a `Service<Task<Args, Ctx, IdType>>` impl for `TaskFn<T, Args, Ctx, ($K, ...)>`
//     where `T: FnMut(Args, $K, ...) -> F`, and
//   * an `IntoWorkerService` impl for the bare callable `T`, which wraps it with
//     `task_fn` and pairs it with the backend.
//
// Every `$K` is produced from the task via `FromRequest::from_request(&task)`
// before the handler is invoked; extraction errors are boxed into `BoxDynError`.
macro_rules! impl_service_fn {
    ($($K:ident),+) => {
        // For arity one, `($($K),+)` expands to a parenthesized single type.
        #[allow(unused_parens)]
        impl<T, F, Args: Send + 'static, R, Ctx: Send + 'static, IdType: Send + Clone + 'static, $($K),+>
            Service<Task<Args, Ctx, IdType>> for TaskFn<T, Args, Ctx, ($($K),+)>
        where
            T: FnMut(Args, $($K),+) -> F + Send + Clone + 'static,
            F: Future + Send,
            F::Output: IntoResponse<Output = R>,
            $(
                $K: FromRequest<Task<Args, Ctx, IdType>> + Send,
                < $K as FromRequest<Task<Args, Ctx, IdType>> >::Error: std::error::Error + 'static + Send + Sync,
            )+
        {
            type Response = R;
            type Error = BoxDynError;
            type Future = futures_util::future::BoxFuture<'static, Result<R, BoxDynError>>;

            // This impl reports readiness unconditionally.
            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
                // The returned future is `'static`, so it owns its own copy of the callable.
                let mut svc = self.f.clone();
                // The generic parameter names double as local binding names below.
                #[allow(non_snake_case)]
                let fut = async move {
                    // Extract the extra arguments in declaration order. `?` returns from
                    // this async block on the first failure, so no intermediate `Result`
                    // (and no separate error arm) is needed.
                    $(
                        let $K = $K::from_request(&task)
                            .await
                            .map_err(|e| Box::new(e) as BoxDynError)?;
                    )+
                    // All borrows of `task` have ended; its args can now be moved out.
                    (svc)(task.args, $($K),+).map(F::Output::into_response).await
                };
                fut.boxed()
            }
        }

        #[allow(unused_parens)]
        impl<T, Args, Ctx, F, R, B, $($K),+>
            IntoWorkerService<B, TaskFn<T, Args, Ctx, ($($K),+)>, Args, Ctx> for T
        where
            B: Backend<Args = Args, Context = Ctx>,
            T: FnMut(Args, $($K),+) -> F + Send + Clone + 'static,
            F: Future + Send,
            Args: Send + 'static,
            Ctx: Send + Sync + 'static,
            B::IdType: Send + 'static,
            F::Output: IntoResponse<Output = R>,
            $(
                $K: FromRequest<Task<Args, Ctx, B::IdType>> + Send,
                < $K as FromRequest<Task<Args, Ctx, B::IdType>> >::Error: std::error::Error + 'static + Send + Sync,
            )+
        {
            type Backend = B;

            // Wraps the callable with `task_fn` and bundles it with `backend`.
            fn into_service(self, backend: B) -> WorkerService<B, TaskFn<T, Args, Ctx, ($($K),+)>> {
                WorkerService {
                    backend,
                    service: task_fn(self),
                }
            }
        }
    };
}
199
200impl<T, F, Args, R, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TaskFn<T, Args, Ctx, ()>
201where
202 T: FnMut(Args) -> F,
203 F: Future,
204 F::Output: IntoResponse<Output = R>,
205{
206 type Response = R;
207 type Error = BoxDynError;
208 type Future = FnFuture<F, F::Output, R, BoxDynError>;
209
210 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211 Poll::Ready(Ok(()))
212 }
213
214 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
215 let fut = (self.f)(task.args);
216
217 fut.map(F::Output::into_response)
218 }
219}
220
221impl<T, Args, Ctx, F, R, Backend> IntoWorkerService<Backend, TaskFn<T, Args, Ctx, ()>, Args, Ctx>
222 for T
223where
224 T: FnMut(Args) -> F,
225 F: Future,
226 F::Output: IntoResponse<Output = R>,
227 Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
228 Args: Send,
229{
230 type Backend = Backend;
231 fn into_service(self, backend: Backend) -> WorkerService<Backend, TaskFn<T, Args, Ctx, ()>> {
232 WorkerService {
233 backend,
234 service: task_fn(self),
235 }
236 }
237}
238
239impl<Args, Ctx, S, B> IntoWorkerService<B, S, Args, Ctx> for S
240where
241 S: Service<Task<Args, Ctx, B::IdType>>,
242 B: Backend<Args = Args, Context = Ctx>,
243{
244 type Backend = B;
245 fn into_service(self, backend: B) -> WorkerService<B, S> {
246 WorkerService {
247 backend,
248 service: self,
249 }
250 }
251}
252
253impl_service_fn!(A);
254impl_service_fn!(A1, A2);
255impl_service_fn!(A1, A2, A3);
256impl_service_fn!(A1, A2, A3, A4);
257impl_service_fn!(A1, A2, A3, A4, A5);
258impl_service_fn!(A1, A2, A3, A4, A5, A6);
259impl_service_fn!(A1, A2, A3, A4, A5, A6, A7);
260impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8);
261impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9);
262impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10);
263impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11);
264impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12);
265impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13);
266impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14);
267impl_service_fn!(
268 A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15
269);
270impl_service_fn!(
271 A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16
272);