#![doc = include_str!("../../docs/jobs_intro.md")]
#![doc = include_str!("../../docs/debugging_job_type_errors.md")]
use crate::{
JobCall, JobResult,
extract::{FromJobCall, FromJobCallParts},
};
use alloc::boxed::Box;
use core::{fmt, future::Future, marker::PhantomData, pin::Pin};
use tower::{Layer, Service, ServiceExt};
pub mod call;
pub mod future;
mod id;
pub use id::*;
pub mod result;
pub mod service;
use result::IntoJobResult;
pub use self::service::JobService;
/// Something that can be invoked with a [`JobCall`] and a context of type `Ctx`.
///
/// `T` is not used by [`Job::call`]; the implementations in this module pick different
/// `T`s (for example `((),)` for zero-argument functions) so their blanket impls stay
/// distinct.
///
#[doc = include_str!("../../docs/debugging_job_type_errors.md")]
#[diagnostic::on_unimplemented(
    note = "Consider using `#[blueprint_sdk::debug_job]` to improve the error message"
)]
pub trait Job<T, Ctx>: Clone + Send + Sync + Sized + 'static {
    /// The future returned by [`Job::call`]; it resolves to an `Option<JobResult>`.
    type Future: Future<Output = Option<JobResult>> + Send + 'static;

    /// Runs the job for `call`, handing it the context `ctx`.
    fn call(self, call: JobCall, ctx: Ctx) -> Self::Future;

    /// Pairs this job with `layer`, returning a [`Layered`] value that holds both.
    ///
    /// Nothing is applied here: the job and the layer are only stored.
    fn layer<L>(self, layer: L) -> Layered<L, Self, T, Ctx>
    where
        L: Layer<JobService<Self, T, Ctx>> + Clone,
        L::Service: Service<JobCall>,
    {
        let job = self;
        Layered {
            job,
            layer,
            _marker: PhantomData,
        }
    }

    /// Builds a [`JobService`] from this job and the context `ctx`.
    fn with_context(self, ctx: Ctx) -> JobService<Self, T, Ctx> {
        JobService::new(self, ctx)
    }
}
/// [`Job`] implementation for functions and closures that take no arguments.
///
/// Both the incoming call and the context are ignored; only the function's own output,
/// converted through [`IntoJobResult`], is produced.
impl<F, Fut, Res, Ctx> Job<((),), Ctx> for F
where
    F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
    Fut: Future<Output = Res> + Send,
    Res: IntoJobResult,
{
    type Future = Pin<Box<dyn Future<Output = Option<JobResult>> + Send>>;

    fn call(self, _call: JobCall, _ctx: Ctx) -> Self::Future {
        let run = async move {
            let output = self().await;
            output.into_job_result()
        };
        Box::pin(run)
    }
}
// Implements `Job` for functions and closures that take one or more arguments.
//
// `$ty` are the leading argument types, each produced with `FromJobCallParts`;
// `$last` is the final argument type, produced with `FromJobCall`.
macro_rules! impl_job {
(
[$($ty:ident),*], $last:ident
) => {
// `non_snake_case`: the type identifiers double as variable names in the body below.
// `unused_mut`: when the `$ty` list is empty nothing borrows `parts` mutably.
#[allow(non_snake_case, unused_mut)]
impl<F, Fut, Ctx, Res, M, $($ty,)* $last> Job<(M, $($ty,)* $last,), Ctx> for F
where
F: FnOnce($($ty,)* $last,) -> Fut + Clone + Send + Sync + 'static,
Fut: Future<Output = Res> + Send,
Ctx: Send + Sync + 'static,
Res: IntoJobResult,
$( $ty: FromJobCallParts<Ctx> + Send, )*
$last: FromJobCall<Ctx, M> + Send,
{
type Future = Pin<Box<dyn Future<Output = Option<JobResult>> + Send>>;
fn call(self, call: JobCall, context: Ctx) -> Self::Future {
Box::pin(async move {
// Split the call so the leading arguments can be built from the parts alone.
let (mut parts, body) = call.into_parts();
let context = &context;
// Build each leading argument in declaration order. The first rejection ends the
// job: it is converted with `into_job_result` and the function is never invoked.
$(
let $ty = match $ty::from_job_call_parts(&mut parts, context).await {
Ok(value) => value,
Err(rejection) => return rejection.into_job_result(),
};
)*
// Reassemble the call; the final argument is built from the whole call, by value.
let call = JobCall::from_parts(parts, body);
let $last = match $last::from_job_call(call, context).await {
Ok(value) => value,
Err(rejection) => return rejection.into_job_result(),
};
// Every argument was built successfully: run the function and convert its output.
let res = self($($ty,)* $last,).await;
res.into_job_result()
})
}
}
};
}
// `all_the_tuples!` is defined outside this section; presumably it invokes `impl_job!`
// once per supported argument count — confirm against its definition.
all_the_tuples!(impl_job);
// The module itself is private while the enum inside is `pub`; presumably this keeps the
// marker usable in the public `Job` impl below without letting other crates name it —
// confirm.
mod private {
// Marker used as the `T` parameter of the `Job` impl for `IntoJobResult` values.
// It has no variants, so no value of this type can be constructed.
#[allow(missing_debug_implementations)]
pub enum IntoJobResultHandler {}
}
/// [`Job`] implementation for plain values that implement [`IntoJobResult`].
///
/// The call and the context are ignored; the value itself is converted and returned
/// through an already-completed future.
impl<T, Ctx> Job<private::IntoJobResultHandler, Ctx> for T
where
    T: IntoJobResult + Clone + Send + Sync + 'static,
{
    type Future = core::future::Ready<Option<JobResult>>;

    fn call(self, _call: JobCall, _ctx: Ctx) -> Self::Future {
        let outcome = self.into_job_result();
        core::future::ready(outcome)
    }
}
/// A job of type `J` stored together with a layer of type `L`.
///
/// Values of this type are created by [`Job::layer`].
pub struct Layered<L, J, T, Ctx> {
/// The layer that is applied to the job's service when the job is called.
layer: L,
/// The wrapped job.
job: J,
/// Zero-sized marker that mentions `T` and `Ctx` without storing a value of either.
_marker: PhantomData<fn() -> (T, Ctx)>,
}
/// Formats a [`Layered`] by showing only its `layer` field.
///
/// Only `L` has to implement [`fmt::Debug`]; the job and the marker are not printed.
impl<L, J, T, Ctx> fmt::Debug for Layered<L, J, T, Ctx>
where
    L: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `job` and `_marker` are deliberately skipped so `J`, `T` and `Ctx` need no
        // `Debug` bound. `finish_non_exhaustive` shows that omission as a trailing `..`
        // instead of presenting `layer` as the only field.
        f.debug_struct("Layered")
            .field("layer", &self.layer)
            .finish_non_exhaustive()
    }
}
/// Clones a [`Layered`] by cloning its layer and its job.
///
/// Written by hand so that only `L` and `J` must be `Clone`; `T` and `Ctx` carry no
/// such requirement.
impl<L, J, T, Ctx> Clone for Layered<L, J, T, Ctx>
where
    L: Clone,
    J: Clone,
{
    fn clone(&self) -> Self {
        let Self { layer, job, .. } = self;
        Self {
            layer: layer.clone(),
            job: job.clone(),
            _marker: PhantomData,
        }
    }
}
/// Runs the wrapped job through the stored layer.
///
/// Each call builds a [`JobService`] from the job and the context, wraps it with the
/// layer, and drives the resulting service once with the incoming call.
impl<L, J, Ctx, T> Job<T, Ctx> for Layered<L, J, T, Ctx>
where
L: Layer<JobService<J, T, Ctx>> + Clone + Send + Sync + 'static,
L::Service: Service<JobCall> + Clone + Send + 'static,
<L::Service as Service<JobCall>>::Response: IntoJobResult,
<L::Service as Service<JobCall>>::Future: Send,
J: Job<T, Ctx>,
T: 'static,
Ctx: 'static,
{
type Future = future::LayeredFuture<L::Service>;
fn call(self, call: JobCall, context: Ctx) -> Self::Future {
use futures_util::future::{FutureExt, Map};
// Turn the job into a service bound to `context`, then apply the layer to it.
let svc = self.job.with_context(context);
let svc = self.layer.layer(svc);
// The annotation coerces the non-capturing closure below into a plain `fn` pointer,
// presumably so `LayeredFuture` can name the mapped future's type — confirm against
// `future::LayeredFuture`.
#[allow(clippy::type_complexity)]
let future: Map<
_,
fn(
Result<
<L::Service as Service<JobCall>>::Response,
<L::Service as Service<JobCall>>::Error,
>,
) -> _,
> = svc.oneshot(call).map(|result| match result {
Ok(res) => res.into_job_result(),
// NOTE(review): an error from the layered service is discarded here and reported
// as `None`; confirm that dropping it silently is intended.
Err(_err) => None,
});
future::LayeredFuture::new(future)
}
}
/// Extension trait for [`Job`]s whose context type is `()`.
pub trait JobWithoutContextExt<T>: Job<T, ()> {
/// Converts the job into a [`JobService`] whose context type is `()`.
fn into_service(self) -> JobService<Self, T, ()>;
}
/// Blanket implementation covering every [`Job`] whose context type is `()`.
impl<J, T> JobWithoutContextExt<T> for J
where
    J: Job<T, ()>,
{
    fn into_service(self) -> JobService<Self, T, ()> {
        // Goes through `Job::with_context` rather than building the service directly,
        // so an implementor's override of that method is still honoured.
        <J as Job<T, ()>>::with_context(self, ())
    }
}