apalis_core/task_fn/
mod.rs

1//! Utilities for adapting async functions into a task handler.
2//!
3//! The [`task_fn`] helper and the [`TaskFn`] struct in this module allow you to wrap
4//! async functions or closures into a [`TaskFn`] implementation, which can then be
5//! used in service middleware pipelines or other components expecting a [`TaskFn`].
6//!
7//! This is particularly useful when building lightweight, composable services from plain
8//! functions, including those with extracted arguments via [`FromRequest`].
9//!
10//! # Features
11//!
12//! - Supports functions with up to 16 additional arguments beyond the core request.
13//! - Automatically applies argument extraction using the [`FromRequest`] trait.
14//! - Converts output to responses using the [`IntoResponse`] trait.
15//! - Captures function argument types at compile time via generics for static dispatch.
16//!
17//! # Example
18//!
19//! ```rust
20//! # use apalis_core::task::data::Data;
21//! #[derive(Clone)]
22//! struct State;
23//!
24//! async fn handler(id: u32, state: Data<State>) -> String {
25//!     format!("Got id {} with state", id)
26//! }
27//! ```
28//! # How It Works
29//!
30//! Internally, [`task_fn`] returns a [`TaskFn`] wrapper that implements [`Service`]
31//! for [`Task`] values. When the service is called, it:
32//!
33//! 1. Extracts arguments using [`FromRequest`].
34//! 2. Calls the function with extracted arguments.
35//! 3. Wraps the result using [`IntoResponse`].
36//!
37//! [`FromRequest`]: crate::task_fn::from_request::FromRequest
38//! [`IntoResponse`]: crate::task_fn::into_response::IntoResponse
39//! [`task_fn`]: crate::task_fn::task_fn
40//! [`TaskFn`]: crate::task_fn::TaskFn
41//! [`Service`]: tower_service::Service
42//! [`Task`]: crate::task
43use crate::backend::Backend;
44use crate::error::BoxDynError;
45use crate::task::Task;
46use crate::worker::builder::IntoWorkerService;
47use futures_util::FutureExt;
48use futures_util::future::Map;
49use std::fmt;
50use std::future::Future;
51use std::marker::PhantomData;
52use std::task::{Context, Poll};
53use tower_service::Service;
54
55pub mod from_request;
56pub mod into_response;
57
58// #[doc(hidden)]
59pub mod guide;
60
61pub use self::{from_request::FromRequest, into_response::IntoResponse};
62
63/// A helper method to build a [`TaskFn`] from an async function or closure.
64///
65/// # Example
66/// ```rust
67/// # use apalis_core::task::data::Data;
68/// #[derive(Clone)]
69/// struct State {
70///     // db: Arc<DatabaseConnection>,
71/// }
72/// async fn handler(id: u32, state: Data<State>) -> String {
73///     format!("Got id {} with state", id)
74/// }   
75///```
76/// This method can take functions with up to 16 additional arguments beyond the core request.
77///
78/// See Also:
79///
80/// - [`FromRequest`]
81/// - [`IntoResponse`]
82
83pub fn task_fn<F, Args, Ctx, FnArgs>(f: F) -> TaskFn<F, Args, Ctx, FnArgs> {
84    TaskFn {
85        f,
86        req: PhantomData,
87        fn_args: PhantomData,
88    }
89}
90
91/// An executable service implemented by a closure.
92///
93/// See [`task_fn`] for more details.
94pub struct TaskFn<F, Args, Ctx, FnArgs> {
95    f: F,
96    req: PhantomData<(Args, Ctx)>,
97    fn_args: PhantomData<FnArgs>,
98}
99
100impl<T: Copy, Args, Ctx, FnArgs> Copy for TaskFn<T, Args, Ctx, FnArgs> {}
101
102impl<T: Clone, Args, Ctx, FnArgs> Clone for TaskFn<T, Args, Ctx, FnArgs> {
103    fn clone(&self) -> Self {
104        TaskFn {
105            f: self.f.clone(),
106            req: PhantomData,
107            fn_args: PhantomData,
108        }
109    }
110}
111
112impl<T, Args, Ctx, FnArgs> fmt::Debug for TaskFn<T, Args, Ctx, FnArgs> {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("TaskFn")
115            .field("f", &std::any::type_name::<T>())
116            .field(
117                "req",
118                &format_args!(
119                    "PhantomData<Task<{}, {}>>",
120                    std::any::type_name::<Args>(),
121                    std::any::type_name::<Ctx>()
122                ),
123            )
124            .field(
125                "fn_args",
126                &format_args!("PhantomData<{}>", std::any::type_name::<FnArgs>()),
127            )
128            .finish()
129    }
130}
131
132/// The Future returned from [`TaskFn`] service.
133type FnFuture<F, O, R, E> = Map<F, fn(O) -> std::result::Result<R, E>>;
134
135macro_rules! impl_service_fn {
136    ($($K:ident),+) => {
137        #[allow(unused_parens)]
138        impl<T, F, Args: Send + 'static, R, Ctx: Send + 'static, IdType: Send + Clone + 'static, $($K),+> Service<Task<Args, Ctx, IdType>> for TaskFn<T, Args, Ctx, ($($K),+)>
139        where
140            T: FnMut(Args, $($K),+) -> F + Send + Clone + 'static,
141            F: Future + Send,
142            F::Output: IntoResponse<Output = R>,
143            $(
144                $K: FromRequest<Task<Args, Ctx, IdType>> + Send,
145                < $K as FromRequest<Task<Args, Ctx, IdType>> >::Error: std::error::Error + 'static + Send + Sync,
146            )+
147        {
148            type Response = R;
149            type Error = BoxDynError;
150            type Future = futures_util::future::BoxFuture<'static, Result<R, BoxDynError>>;
151
152            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153                Poll::Ready(Ok(()))
154            }
155
156            fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
157                let mut svc = self.f.clone();
158                #[allow(non_snake_case)]
159                let fut = async move {
160                    let results: Result<($($K),+), BoxDynError> = { Ok(($($K::from_request(&task).await.map_err(|e| Box::new(e) as BoxDynError)?),+)) };
161                    match results {
162                        Ok(($($K),+)) => {
163                            let req = task.args;
164                            (svc)(req, $($K),+).map(F::Output::into_response).await
165                        }
166                        Err(e) => Err(e),
167                    }
168                };
169                fut.boxed()
170            }
171        }
172
173        #[allow(unused_parens)]
174        impl<T, Args, Ctx, F, R, B, $($K),+>
175            IntoWorkerService<B, TaskFn<T, Args, Ctx, ($($K),+)>, Args, Ctx> for T
176        where
177            B: Backend<Args= Args, Context = Ctx>,
178            T: FnMut(Args, $($K),+) -> F + Send + Clone + 'static,
179            F: Future + Send,
180            Args: Send + 'static,
181            Ctx: Send + Sync + 'static,
182            B::IdType: Send + 'static,
183            F::Output: IntoResponse<Output = R>,
184            $(
185                $K: FromRequest<Task<Args, Ctx, B::IdType>> + Send,
186                < $K as FromRequest<Task<Args, Ctx, B::IdType>> >::Error: std::error::Error + 'static + Send + Sync,
187            )+
188        {
189            fn into_service(self, _: &B) -> TaskFn<T, Args, Ctx, ($($K),+)> {
190                task_fn(self)
191            }
192        }
193    };
194}
195
196impl<T, F, Args, R, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for TaskFn<T, Args, Ctx, ()>
197where
198    T: FnMut(Args) -> F,
199    F: Future,
200    F::Output: IntoResponse<Output = R>,
201{
202    type Response = R;
203    type Error = BoxDynError;
204    type Future = FnFuture<F, F::Output, R, BoxDynError>;
205
206    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207        Poll::Ready(Ok(()))
208    }
209
210    fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
211        let fut = (self.f)(task.args);
212
213        fut.map(F::Output::into_response)
214    }
215}
216
217impl<T, Args, Ctx, F, R, Backend> IntoWorkerService<Backend, TaskFn<T, Args, Ctx, ()>, Args, Ctx>
218    for T
219where
220    T: FnMut(Args) -> F,
221    F: Future,
222    F::Output: IntoResponse<Output = R>,
223    Backend: crate::backend::Backend<Args = Args, Context = Ctx>,
224    Args: Send,
225{
226    fn into_service(self, _: &Backend) -> TaskFn<T, Args, Ctx, ()> {
227        task_fn(self)
228    }
229}
230
231impl<Args, Ctx, S, B> IntoWorkerService<B, S, Args, Ctx> for S
232where
233    S: Service<Task<Args, Ctx, B::IdType>>,
234    B: Backend<Args = Args, Context = Ctx>,
235{
236    fn into_service(self, _: &B) -> S {
237        self
238    }
239}
240
241impl_service_fn!(A);
242impl_service_fn!(A1, A2);
243impl_service_fn!(A1, A2, A3);
244impl_service_fn!(A1, A2, A3, A4);
245impl_service_fn!(A1, A2, A3, A4, A5);
246impl_service_fn!(A1, A2, A3, A4, A5, A6);
247impl_service_fn!(A1, A2, A3, A4, A5, A6, A7);
248impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8);
249impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9);
250impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10);
251impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11);
252impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12);
253impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13);
254impl_service_fn!(A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14);
255impl_service_fn!(
256    A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15
257);
258impl_service_fn!(
259    A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16
260);