apalis_core/task_fn/
mod.rs

1//! Utilities for adapting async functions into a task handler.
2//!
3//! The [`task_fn`] helper and the [`TaskFn`] struct in this module allow you to wrap
4//! async functions or closures into a [`TaskFn`] implementation, which can then be
5//! used in service middleware pipelines or other components expecting a [`TaskFn`].
6//!
7//! This is particularly useful when building lightweight, composable services from plain
8//! functions, including those with extracted arguments via [`FromRequest`].
9//!
10//! # Features
11//!
12//! - Supports functions with up to 16 additional arguments beyond the core request.
13//! - Automatically applies argument extraction using the [`FromRequest`] trait.
14//! - Converts output to responses using the [`IntoResponse`] trait.
15//! - Captures function argument types at compile time via generics for static dispatch.
16//!
17//! # Example
18//!
19//! ```rust
20//! # use apalis_core::task::data::Data;
21//! #[derive(Clone)]
22//! struct State;
23//!
24//! async fn handler(id: u32, state: Data<State>) -> String {
25//!     format!("Got id {} with state", id)
26//! }
27//! ```
28//! # How It Works
29//!
30//! Internally, [`task_fn`] returns a [`TaskFn`] wrapper that implements [`Service`]
31//! for [`Task`] values. When the service is called, it:
32//!
33//! 1. Extracts arguments using [`FromRequest`].
34//! 2. Calls the function with extracted arguments.
35//! 3. Wraps the result using [`IntoResponse`].
36//!
37//! [`FromRequest`]: crate::task_fn::from_request::FromRequest
38//! [`IntoResponse`]: crate::task_fn::into_response::IntoResponse
39//! [`task_fn`]: crate::task_fn::task_fn
40//! [`TaskFn`]: crate::task_fn::TaskFn
41//! [`Service`]: tower_service::Service
42//! [`Task`]: crate::task
43use crate::backend::Backend;
44use crate::error::BoxDynError;
45use crate::task::Task;
46use crate::worker::builder::{IntoWorkerService, WorkerService};
47use futures_util::FutureExt;
48use futures_util::future::Map;
49use std::fmt;
50use std::future::Future;
51use std::marker::PhantomData;
52use std::task::{Context, Poll};
53use tower_service::Service;
54
55pub mod from_request;
56pub mod into_response;
57
58// #[doc(hidden)]
59pub mod guide;
60
61pub use self::{from_request::FromRequest, into_response::IntoResponse};
62
63/// A helper method to build a [`TaskFn`] from an async function or closure.
64///
65/// # Example
66/// ```rust
67/// # use apalis_core::task::data::Data;
68/// #[derive(Clone)]
69/// struct State {
70///     // db: Arc<DatabaseConnection>,
71/// }
72/// async fn handler(id: u32, state: Data<State>) -> String {
73///     format!("Got id {} with state", id)
74/// }   
75///```
76/// This method can take functions with up to 16 additional arguments beyond the core request.
77///
78/// See Also:
79///
80/// - [`FromRequest`]
81/// - [`IntoResponse`]
82pub fn task_fn<F, Args, Ctx, FnArgs>(f: F) -> TaskFn<F, Args, Ctx, FnArgs> {
83    TaskFn {
84        f,
85        req: PhantomData,
86        fn_args: PhantomData,
87    }
88}
89
90/// An executable service implemented by a closure.
91///
92/// See [`task_fn`] for more details.
93pub struct TaskFn<F, Args, Ctx, FnArgs> {
94    f: F,
95    req: PhantomData<(Args, Ctx)>,
96    fn_args: PhantomData<FnArgs>,
97}
98
99impl<T: Copy, Args, Ctx, FnArgs> Copy for TaskFn<T, Args, Ctx, FnArgs> {}
100
101impl<T: Clone, Args, Ctx, FnArgs> Clone for TaskFn<T, Args, Ctx, FnArgs> {
102    fn clone(&self) -> Self {
103        Self {
104            f: self.f.clone(),
105            req: PhantomData,
106            fn_args: PhantomData,
107        }
108    }
109}
110
111impl<T, Args, Ctx, FnArgs> fmt::Debug for TaskFn<T, Args, Ctx, FnArgs> {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        f.debug_struct("TaskFn")
114            .field("f", &std::any::type_name::<T>())
115            .field(
116                "req",
117                &format_args!(
118                    "PhantomData<Task<{}, {}>>",
119                    std::any::type_name::<Args>(),
120                    std::any::type_name::<Ctx>()
121                ),
122            )
123            .field(
124                "fn_args",
125                &format_args!("PhantomData<{}>", std::any::type_name::<FnArgs>()),
126            )
127            .finish()
128    }
129}
130
131/// The Future returned from [`TaskFn`] service.
132type FnFuture<F, O, R, E> = Map<F, fn(O) -> std::result::Result<R, E>>;
133
134macro_rules! impl_service_fn {
135    ($($K:ident),+) => {
136        #[allow(unused_parens)]
137        impl<T, F, Args: Send + 'static, R, Ctx: Send + 'static, IdType: Send + Clone + 'static, $($K),+> Service<Task<Args, Ctx, IdType>> for TaskFn<T, Args, Ctx, ($($K),+)>
138        where
139            T: FnMut(Args, $($K),+) -> F + Send + Clone + 'static,
140            F: Future + Send,
141            F::Output: IntoResponse<Output = R>,
142            $(
143                $K: FromRequest<Task<Args, Ctx, IdType>> + Send,
144                < $K as FromRequest<Task<Args, Ctx, IdType>> >::Error: std::error::Error + 'static + Send + Sync,
145            )+
146        {
147            type Response = R;
148            type Error = BoxDynError;
149            type Future = futures_util::future::BoxFuture<'static, Result<R, BoxDynError>>;
150
151            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152                Poll::Ready(Ok(()))
153            }
154
155            fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
156                let mut svc = self.f.clone();
157                #[allow(non_snake_case)]
158                let fut = async move {
159                    let results: Result<($($K),+), BoxDynError> = { Ok(($($K::from_request(&task).await.map_err(|e| Box::new(e) as BoxDynError)?),+)) };
160                    match results {
161                        Ok(($($K),+)) => {
162                            let req = task.args;
163                            (svc)(req, $($K),+).map(F::Output::into_response).await
164                        }
165                        Err(e) => Err(e),
166                    }
167                };
168                fut.boxed()
169            }
170        }
171
172        #[allow(unused_parens)]
173        impl<T, Args, Ctx, F, R, B, $($K),+>
174            IntoWorkerService<B, TaskFn<T, Args, Ctx, ($($K),+)>, Args, Ctx> for T
175        where
176            B: Backend<Args= Args, Context = Ctx>,
177            T: FnMut(Args, $($K),+) -> F + Send + Clone + 'static,
178            F: Future + Send,
179            Args: Send + 'static,
180            Ctx: Send + Sync + 'static,
181            B::IdType: Send + 'static,
182            F::Output: IntoResponse<Output = R>,
183            $(
184                $K: FromRequest<Task<Args, Ctx, B::IdType>> + Send,
185                < $K as FromRequest<Task<Args, Ctx, B::IdType>> >::Error: std::error::Error + 'static + Send + Sync,
186            )+
187        {
188            type Backend = B;
189            fn into_service(self, backend: B) -> WorkerService<B, TaskFn<T, Args, Ctx, ($($K),+)>> {
190                WorkerService {
191                    backend,
192                    service: task_fn(self),
193                }
194            }
195        }
196    };
197}
198
199impl<T, F, Args, R, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TaskFn<T, Args, Ctx, ()>
200where
201    T: FnMut(Args) -> F,
202    F: Future,
203    F::Output: IntoResponse<Output = R>,
204{
205    type Response = R;
206    type Error = BoxDynError;
207    type Future = FnFuture<F, F::Output, R, BoxDynError>;
208
209    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210        Poll::Ready(Ok(()))
211    }
212
213    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
214        let fut = (self.f)(task.args);
215
216        fut.map(F::Output::into_response)
217    }
218}
219
220impl<T, Args, Ctx, F, R, Backend> IntoWorkerService<Backend, TaskFn<T, Args, Ctx, ()>, Args, Ctx>
221    for T
222where
223    T: FnMut(Args) -> F,
224    F: Future,
225    F::Output: IntoResponse<Output = R>,
226    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
227    Args: Send,
228{
229    type Backend = Backend;
230    fn into_service(self, backend: Backend) -> WorkerService<Backend, TaskFn<T, Args, Ctx, ()>> {
231        WorkerService {
232            backend,
233            service: task_fn(self),
234        }
235    }
236}
237
238impl<Args, Ctx, S, B> IntoWorkerService<B, S, Args, Ctx> for S
239where
240    S: Service<Task<Args, Ctx, B::IdType>>,
241    B: Backend<Args = Args, Context = Ctx>,
242{
243    type Backend = B;
244    fn into_service(self, backend: B) -> WorkerService<B, S> {
245        WorkerService {
246            backend,
247            service: self,
248        }
249    }
250}
251
252impl_service_fn!(A);
253impl_service_fn!(A1, A2);
254impl_service_fn!(A1, A2, A3);
255impl_service_fn!(A1, A2, A3, A4);
256impl_service_fn!(A1, A2, A3, A4, A5);
257impl_service_fn!(A1, A2, A3, A4, A5, A6);
258impl_service_fn!(A1, A2, A3, A4, A5, A6, A7);
259impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8);
260impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9);
261impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10);
262impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11);
263impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12);
264impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13);
265impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14);
266impl_service_fn!(
267    A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15
268);
269impl_service_fn!(
270    A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16
271);