blueprint_core/job/mod.rs
1//! Async functions that can be used to handle jobs.
2#![doc = include_str!("../../docs/jobs_intro.md")]
3//!
4//! Some examples of jobs:
5//!
6//! ```rust
7//! use blueprint_sdk::Bytes;
8//!
9//! // Job that immediately returns an empty result.
10//! async fn unit() {}
11//!
12//! // Job that immediately returns a result with a body of "Hello, World!".
13//! async fn string() -> String {
14//! "Hello, World!".to_string()
15//! }
16//!
17//! // Job that buffers the request body and returns it.
18//! //
19//! // This works because `Bytes` implements `FromJobCall`
20//! // and therefore can be used as an extractor.
21//! //
22//! // `String` implements `IntoJobResult` and therefore `Result<String, String>`
23//! // also implements `IntoJobResult`
24//! async fn echo(body: Bytes) -> Result<String, String> {
25//! if let Ok(string) = String::from_utf8(body.to_vec()) {
26//! Ok(string)
27//! } else {
28//! Err(String::from("Invalid UTF-8"))
29//! }
30//! }
31//! ```
32//!
33//! Instead of a direct `String`, it makes sense to use an intermediate error type
34//! that can ultimately be converted to `JobResult`. This allows using the `?` operator
35//! in jobs.
36#![doc = include_str!("../../docs/debugging_job_type_errors.md")]
37
38use crate::{
39 JobCall, JobResult,
40 extract::{FromJobCall, FromJobCallParts},
41};
42use alloc::boxed::Box;
43use core::{fmt, future::Future, marker::PhantomData, pin::Pin};
44use tower::{Layer, Service, ServiceExt};
45
46pub mod call;
47pub mod future;
48mod id;
49pub use id::*;
50pub mod result;
51pub mod service;
52use result::IntoJobResult;
53
54pub use self::service::JobService;
55
56/// Trait for async functions that can be used to handle requests.
57///
58/// You shouldn't need to depend on this trait directly. It is automatically
59/// implemented for functions of the right types.
60///
61/// See the [module docs](crate::job) for more details.
62///
63/// # Converting `Job`s into [`Service`]s
64///
65/// To convert `Job`s into [`Service`]s you have to call either
66/// [`JobWithoutContextExt::into_service`] or [`Job::with_context`]:
67///
68/// ```
69/// use blueprint_sdk::extract::Context;
70/// use blueprint_sdk::job::JobWithoutContextExt;
71/// use blueprint_sdk::{Job, JobCall};
72/// use tower::Service;
73///
74/// // this job doesn't require any state
75/// async fn one() {}
76/// // so it can be converted to a service with `JobWithoutContextExt::into_service`
77/// assert_service(one.into_service());
78///
79/// // this job requires a context
80/// async fn two(_: Context<String>) {}
81/// // so we have to provide it
82/// let job_with_state = two.with_context(String::new());
83/// // which gives us a `Service`
84/// assert_service(job_with_state);
85///
86/// // helper to check that a value implements `Service`
87/// fn assert_service<S>(service: S)
88/// where
89/// S: Service<JobCall>,
90/// {
91/// }
92/// ```
93#[doc = include_str!("../../docs/debugging_job_type_errors.md")]
94///
95/// # Jobs that aren't functions
96///
97/// The `Job` trait is also implemented for `T: IntoJobResult`. That allows easily returning
98/// fixed data for routes:
99///
100/// ```
101/// use blueprint_sdk::Router;
102/// use serde_json::json;
103///
104/// const HELLO_JOB_ID: u32 = 0;
105/// const USERS_JOB_ID: u32 = 1;
106///
107/// let app = Router::new()
108/// // respond with a fixed string
109/// .route(HELLO_JOB_ID, "Hello, World!")
110/// // or return some mock data
111/// .route(USERS_JOB_ID, json!({ "id": 1, "username": "alice" }).to_string());
112/// # let _: Router = app;
113/// ```
114#[diagnostic::on_unimplemented(
115 note = "Consider using `#[blueprint_sdk::debug_job]` to improve the error message"
116)]
117pub trait Job<T, Ctx>: Clone + Send + Sync + Sized + 'static {
118 /// The type of future calling this job returns.
119 type Future: Future<Output = Option<JobResult>> + Send + 'static;
120
121 /// Call the job with the given request.
122 fn call(self, call: JobCall, ctx: Ctx) -> Self::Future;
123
124 /// Apply a [`tower::Layer`] to the job.
125 ///
126 /// All requests to the job will be processed by the layer's
127 /// corresponding middleware.
128 ///
129 /// This can be used to add additional processing to a request for a single
130 /// job.
131 ///
132 /// Note this differs from [`routing::Router::layer`]
133 /// which adds a middleware to a group of routes.
134 ///
135 /// If you're applying middleware that produces errors you have to handle the errors
136 /// so they're converted into responses. You can learn more about doing that
137 /// [here](crate::error_handling).
138 ///
139 /// # Example
140 ///
141 /// Adding the [`tower::limit::ConcurrencyLimit`] middleware to a job
142 /// can be done like so:
143 ///
144 /// ```rust
145 /// use blueprint_sdk::{Job, Router};
146 /// use tower::limit::{ConcurrencyLimit, ConcurrencyLimitLayer};
147 ///
148 /// async fn job() { /* ... */
149 /// }
150 ///
151 /// const MY_JOB_ID: u32 = 0;
152 ///
153 /// let layered_job = job.layer(ConcurrencyLimitLayer::new(64));
154 /// let app = Router::new().route(MY_JOB_ID, layered_job);
155 /// # let _: Router = app;
156 /// ```
157 ///
158 /// [`routing::Router::layer`]: https://docs.rs/blueprint-sdk/latest/blueprint_sdk/struct.Router.html#method.layer
159 fn layer<L>(self, layer: L) -> Layered<L, Self, T, Ctx>
160 where
161 L: Layer<JobService<Self, T, Ctx>> + Clone,
162 L::Service: Service<JobCall>,
163 {
164 Layered {
165 layer,
166 job: self,
167 _marker: PhantomData,
168 }
169 }
170
171 /// Convert the job into a [`Service`] by providing the context
172 fn with_context(self, ctx: Ctx) -> JobService<Self, T, Ctx> {
173 JobService::new(self, ctx)
174 }
175}
176
177impl<F, Fut, Res, Ctx> Job<((),), Ctx> for F
178where
179 F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
180 Fut: Future<Output = Res> + Send,
181 Res: IntoJobResult,
182{
183 type Future = Pin<Box<dyn Future<Output = Option<JobResult>> + Send>>;
184
185 fn call(self, _call: JobCall, _ctx: Ctx) -> Self::Future {
186 Box::pin(async move { self().await.into_job_result() })
187 }
188}
189
190macro_rules! impl_job {
191 (
192 [$($ty:ident),*], $last:ident
193 ) => {
194 #[allow(non_snake_case, unused_mut)]
195 impl<F, Fut, Ctx, Res, M, $($ty,)* $last> Job<(M, $($ty,)* $last,), Ctx> for F
196 where
197 F: FnOnce($($ty,)* $last,) -> Fut + Clone + Send + Sync + 'static,
198 Fut: Future<Output = Res> + Send,
199 Ctx: Send + Sync + 'static,
200 Res: IntoJobResult,
201 $( $ty: FromJobCallParts<Ctx> + Send, )*
202 $last: FromJobCall<Ctx, M> + Send,
203 {
204 type Future = Pin<Box<dyn Future<Output = Option<JobResult>> + Send>>;
205
206 fn call(self, call: JobCall, context: Ctx) -> Self::Future {
207 Box::pin(async move {
208 let (mut parts, body) = call.into_parts();
209 let context = &context;
210
211 $(
212 let $ty = match $ty::from_job_call_parts(&mut parts, context).await {
213 Ok(value) => value,
214 Err(rejection) => return rejection.into_job_result(),
215 };
216 )*
217
218 let call = JobCall::from_parts(parts, body);
219
220 let $last = match $last::from_job_call(call, context).await {
221 Ok(value) => value,
222 Err(rejection) => return rejection.into_job_result(),
223 };
224
225 let res = self($($ty,)* $last,).await;
226
227 res.into_job_result()
228 })
229 }
230 }
231 };
232}
233
234all_the_tuples!(impl_job);
235
236mod private {
237 // Marker type for `impl<T: IntoJobResult> Job for T`
238 #[allow(missing_debug_implementations)]
239 pub enum IntoJobResultHandler {}
240}
241
242impl<T, Ctx> Job<private::IntoJobResultHandler, Ctx> for T
243where
244 T: IntoJobResult + Clone + Send + Sync + 'static,
245{
246 type Future = core::future::Ready<Option<JobResult>>;
247
248 fn call(self, _call: JobCall, _ctx: Ctx) -> Self::Future {
249 core::future::ready(self.into_job_result())
250 }
251}
252
253/// A [`Service`] created from a [`Job`] by applying a Tower middleware.
254///
255/// Created with [`Job::layer`]. See that method for more details.
256pub struct Layered<L, J, T, Ctx> {
257 layer: L,
258 job: J,
259 _marker: PhantomData<fn() -> (T, Ctx)>,
260}
261
262impl<L, J, T, Ctx> fmt::Debug for Layered<L, J, T, Ctx>
263where
264 L: fmt::Debug,
265{
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 f.debug_struct("Layered")
268 .field("layer", &self.layer)
269 .finish()
270 }
271}
272
273impl<L, J, T, Ctx> Clone for Layered<L, J, T, Ctx>
274where
275 L: Clone,
276 J: Clone,
277{
278 fn clone(&self) -> Self {
279 Self {
280 layer: self.layer.clone(),
281 job: self.job.clone(),
282 _marker: PhantomData,
283 }
284 }
285}
286
287impl<L, J, Ctx, T> Job<T, Ctx> for Layered<L, J, T, Ctx>
288where
289 L: Layer<JobService<J, T, Ctx>> + Clone + Send + Sync + 'static,
290 L::Service: Service<JobCall> + Clone + Send + 'static,
291 <L::Service as Service<JobCall>>::Response: IntoJobResult,
292 <L::Service as Service<JobCall>>::Future: Send,
293 J: Job<T, Ctx>,
294 T: 'static,
295 Ctx: 'static,
296{
297 type Future = future::LayeredFuture<L::Service>;
298
299 fn call(self, call: JobCall, context: Ctx) -> Self::Future {
300 use futures_util::future::{FutureExt, Map};
301
302 let svc = self.job.with_context(context);
303 let svc = self.layer.layer(svc);
304
305 #[allow(clippy::type_complexity)]
306 let future: Map<
307 _,
308 fn(
309 Result<
310 <L::Service as Service<JobCall>>::Response,
311 <L::Service as Service<JobCall>>::Error,
312 >,
313 ) -> _,
314 > = svc.oneshot(call).map(|result| match result {
315 Ok(res) => res.into_job_result(),
316 Err(_err) => todo!("JobService needs to return a result"),
317 });
318
319 future::LayeredFuture::new(future)
320 }
321}
322
323/// Extension trait for [`Job`]s that don't have context.
324///
325/// This provides convenience methods to convert the [`Job`] into a [`Service`].
326pub trait JobWithoutContextExt<T>: Job<T, ()> {
327 /// Convert the handler into a [`Service`] and no context.
328 fn into_service(self) -> JobService<Self, T, ()>;
329}
330
331impl<H, T> JobWithoutContextExt<T> for H
332where
333 H: Job<T, ()>,
334{
335 fn into_service(self) -> JobService<Self, T, ()> {
336 self.with_context(())
337 }
338}