blueprint_core/job/
mod.rs

1//! Async functions that can be used to handle jobs.
2#![doc = include_str!("../../docs/jobs_intro.md")]
3//!
4//! Some examples of jobs:
5//!
6//! ```rust
7//! use blueprint_sdk::Bytes;
8//!
9//! // Job that immediately returns an empty result.
10//! async fn unit() {}
11//!
12//! // Job that immediately returns a result with a body of "Hello, World!".
13//! async fn string() -> String {
14//!     "Hello, World!".to_string()
15//! }
16//!
17//! // Job that buffers the request body and returns it.
18//! //
19//! // This works because `Bytes` implements `FromJobCall`
20//! // and therefore can be used as an extractor.
21//! //
22//! // `String` implements `IntoJobResult` and therefore `Result<String, String>`
23//! // also implements `IntoJobResult`
24//! async fn echo(body: Bytes) -> Result<String, String> {
25//!     if let Ok(string) = String::from_utf8(body.to_vec()) {
26//!         Ok(string)
27//!     } else {
28//!         Err(String::from("Invalid UTF-8"))
29//!     }
30//! }
31//! ```
32//!
33//! Instead of a direct `String`, it makes sense to use an intermediate error type
34//! that can ultimately be converted to `JobResult`. This allows using the `?` operator
35//! in jobs.
36#![doc = include_str!("../../docs/debugging_job_type_errors.md")]
37
38use crate::{
39    JobCall, JobResult,
40    extract::{FromJobCall, FromJobCallParts},
41};
42use alloc::boxed::Box;
43use core::{fmt, future::Future, marker::PhantomData, pin::Pin};
44use tower::{Layer, Service, ServiceExt};
45
46pub mod call;
47pub mod future;
48mod id;
49pub use id::*;
50pub mod result;
51pub mod service;
52use result::IntoJobResult;
53
54pub use self::service::JobService;
55
56/// Trait for async functions that can be used to handle requests.
57///
58/// You shouldn't need to depend on this trait directly. It is automatically
59/// implemented for functions of the right types.
60///
61/// See the [module docs](crate::job) for more details.
62///
63/// # Converting `Job`s into [`Service`]s
64///
65/// To convert `Job`s into [`Service`]s you have to call either
66/// [`JobWithoutContextExt::into_service`] or [`Job::with_context`]:
67///
68/// ```
69/// use blueprint_sdk::extract::Context;
70/// use blueprint_sdk::job::JobWithoutContextExt;
71/// use blueprint_sdk::{Job, JobCall};
72/// use tower::Service;
73///
74/// // this job doesn't require any state
75/// async fn one() {}
76/// // so it can be converted to a service with `JobWithoutContextExt::into_service`
77/// assert_service(one.into_service());
78///
79/// // this job requires a context
80/// async fn two(_: Context<String>) {}
81/// // so we have to provide it
82/// let job_with_state = two.with_context(String::new());
83/// // which gives us a `Service`
84/// assert_service(job_with_state);
85///
86/// // helper to check that a value implements `Service`
87/// fn assert_service<S>(service: S)
88/// where
89///     S: Service<JobCall>,
90/// {
91/// }
92/// ```
93#[doc = include_str!("../../docs/debugging_job_type_errors.md")]
94///
95/// # Jobs that aren't functions
96///
97/// The `Job` trait is also implemented for `T: IntoJobResult`. That allows easily returning
98/// fixed data for routes:
99///
100/// ```
101/// use blueprint_sdk::Router;
102/// use serde_json::json;
103///
104/// const HELLO_JOB_ID: u32 = 0;
105/// const USERS_JOB_ID: u32 = 1;
106///
107/// let app = Router::new()
108///     // respond with a fixed string
109///     .route(HELLO_JOB_ID, "Hello, World!")
110///     // or return some mock data
111///     .route(USERS_JOB_ID, json!({ "id": 1, "username": "alice" }).to_string());
112/// # let _: Router = app;
113/// ```
114#[diagnostic::on_unimplemented(
115    note = "Consider using `#[blueprint_sdk::debug_job]` to improve the error message"
116)]
117pub trait Job<T, Ctx>: Clone + Send + Sync + Sized + 'static {
118    /// The type of future calling this job returns.
119    type Future: Future<Output = Option<JobResult>> + Send + 'static;
120
121    /// Call the job with the given request.
122    fn call(self, call: JobCall, ctx: Ctx) -> Self::Future;
123
124    /// Apply a [`tower::Layer`] to the job.
125    ///
126    /// All requests to the job will be processed by the layer's
127    /// corresponding middleware.
128    ///
129    /// This can be used to add additional processing to a request for a single
130    /// job.
131    ///
132    /// Note this differs from [`routing::Router::layer`]
133    /// which adds a middleware to a group of routes.
134    ///
135    /// If you're applying middleware that produces errors you have to handle the errors
136    /// so they're converted into responses. You can learn more about doing that
137    /// [here](crate::error_handling).
138    ///
139    /// # Example
140    ///
141    /// Adding the [`tower::limit::ConcurrencyLimit`] middleware to a job
142    /// can be done like so:
143    ///
144    /// ```rust
145    /// use blueprint_sdk::{Job, Router};
146    /// use tower::limit::{ConcurrencyLimit, ConcurrencyLimitLayer};
147    ///
148    /// async fn job() { /* ... */
149    /// }
150    ///
151    /// const MY_JOB_ID: u32 = 0;
152    ///
153    /// let layered_job = job.layer(ConcurrencyLimitLayer::new(64));
154    /// let app = Router::new().route(MY_JOB_ID, layered_job);
155    /// # let _: Router = app;
156    /// ```
157    ///
158    /// [`routing::Router::layer`]: https://docs.rs/blueprint-sdk/latest/blueprint_sdk/struct.Router.html#method.layer
159    fn layer<L>(self, layer: L) -> Layered<L, Self, T, Ctx>
160    where
161        L: Layer<JobService<Self, T, Ctx>> + Clone,
162        L::Service: Service<JobCall>,
163    {
164        Layered {
165            layer,
166            job: self,
167            _marker: PhantomData,
168        }
169    }
170
171    /// Convert the job into a [`Service`] by providing the context
172    fn with_context(self, ctx: Ctx) -> JobService<Self, T, Ctx> {
173        JobService::new(self, ctx)
174    }
175}
176
177impl<F, Fut, Res, Ctx> Job<((),), Ctx> for F
178where
179    F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
180    Fut: Future<Output = Res> + Send,
181    Res: IntoJobResult,
182{
183    type Future = Pin<Box<dyn Future<Output = Option<JobResult>> + Send>>;
184
185    fn call(self, _call: JobCall, _ctx: Ctx) -> Self::Future {
186        Box::pin(async move { self().await.into_job_result() })
187    }
188}
189
190macro_rules! impl_job {
191    (
192        [$($ty:ident),*], $last:ident
193    ) => {
194        #[allow(non_snake_case, unused_mut)]
195        impl<F, Fut, Ctx, Res, M, $($ty,)* $last> Job<(M, $($ty,)* $last,), Ctx> for F
196        where
197            F: FnOnce($($ty,)* $last,) -> Fut + Clone + Send + Sync + 'static,
198            Fut: Future<Output = Res> + Send,
199            Ctx: Send + Sync + 'static,
200            Res: IntoJobResult,
201            $( $ty: FromJobCallParts<Ctx> + Send, )*
202            $last: FromJobCall<Ctx, M> + Send,
203        {
204            type Future = Pin<Box<dyn Future<Output = Option<JobResult>> + Send>>;
205
206            fn call(self, call: JobCall, context: Ctx) -> Self::Future {
207                Box::pin(async move {
208                    let (mut parts, body) = call.into_parts();
209                    let context = &context;
210
211                    $(
212                        let $ty = match $ty::from_job_call_parts(&mut parts, context).await {
213                            Ok(value) => value,
214                            Err(rejection) => return rejection.into_job_result(),
215                        };
216                    )*
217
218                    let call = JobCall::from_parts(parts, body);
219
220                    let $last = match $last::from_job_call(call, context).await {
221                        Ok(value) => value,
222                        Err(rejection) => return rejection.into_job_result(),
223                    };
224
225                    let res = self($($ty,)* $last,).await;
226
227                    res.into_job_result()
228                })
229            }
230        }
231    };
232}
233
234all_the_tuples!(impl_job);
235
236mod private {
237    // Marker type for `impl<T: IntoJobResult> Job for T`
238    #[allow(missing_debug_implementations)]
239    pub enum IntoJobResultHandler {}
240}
241
242impl<T, Ctx> Job<private::IntoJobResultHandler, Ctx> for T
243where
244    T: IntoJobResult + Clone + Send + Sync + 'static,
245{
246    type Future = core::future::Ready<Option<JobResult>>;
247
248    fn call(self, _call: JobCall, _ctx: Ctx) -> Self::Future {
249        core::future::ready(self.into_job_result())
250    }
251}
252
253/// A [`Service`] created from a [`Job`] by applying a Tower middleware.
254///
255/// Created with [`Job::layer`]. See that method for more details.
256pub struct Layered<L, J, T, Ctx> {
257    layer: L,
258    job: J,
259    _marker: PhantomData<fn() -> (T, Ctx)>,
260}
261
262impl<L, J, T, Ctx> fmt::Debug for Layered<L, J, T, Ctx>
263where
264    L: fmt::Debug,
265{
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        f.debug_struct("Layered")
268            .field("layer", &self.layer)
269            .finish()
270    }
271}
272
273impl<L, J, T, Ctx> Clone for Layered<L, J, T, Ctx>
274where
275    L: Clone,
276    J: Clone,
277{
278    fn clone(&self) -> Self {
279        Self {
280            layer: self.layer.clone(),
281            job: self.job.clone(),
282            _marker: PhantomData,
283        }
284    }
285}
286
287impl<L, J, Ctx, T> Job<T, Ctx> for Layered<L, J, T, Ctx>
288where
289    L: Layer<JobService<J, T, Ctx>> + Clone + Send + Sync + 'static,
290    L::Service: Service<JobCall> + Clone + Send + 'static,
291    <L::Service as Service<JobCall>>::Response: IntoJobResult,
292    <L::Service as Service<JobCall>>::Future: Send,
293    J: Job<T, Ctx>,
294    T: 'static,
295    Ctx: 'static,
296{
297    type Future = future::LayeredFuture<L::Service>;
298
299    fn call(self, call: JobCall, context: Ctx) -> Self::Future {
300        use futures_util::future::{FutureExt, Map};
301
302        let svc = self.job.with_context(context);
303        let svc = self.layer.layer(svc);
304
305        #[allow(clippy::type_complexity)]
306        let future: Map<
307            _,
308            fn(
309                Result<
310                    <L::Service as Service<JobCall>>::Response,
311                    <L::Service as Service<JobCall>>::Error,
312                >,
313            ) -> _,
314        > = svc.oneshot(call).map(|result| match result {
315            Ok(res) => res.into_job_result(),
316            Err(_err) => todo!("JobService needs to return a result"),
317        });
318
319        future::LayeredFuture::new(future)
320    }
321}
322
323/// Extension trait for [`Job`]s that don't have context.
324///
325/// This provides convenience methods to convert the [`Job`] into a [`Service`].
326pub trait JobWithoutContextExt<T>: Job<T, ()> {
327    /// Convert the handler into a [`Service`] and no context.
328    fn into_service(self) -> JobService<Self, T, ()>;
329}
330
331impl<H, T> JobWithoutContextExt<T> for H
332where
333    H: Job<T, ()>,
334{
335    fn into_service(self) -> JobService<Self, T, ()> {
336        self.with_context(())
337    }
338}