blueprint_core/job/
service.rs

1use super::Job;
2use super::call::JobCall;
3
4use crate::JobResult;
5use crate::error::BoxError;
6use bytes::Bytes;
7use core::{
8    fmt,
9    marker::PhantomData,
10    task::{Context, Poll},
11};
12use tower::Service;
13
/// Wraps a [`Job`] so that it can be driven as a [`Service`].
///
/// Instances are obtained through [`Job::with_context`] or
/// [`JobWithoutContextExt::into_service`].
///
/// [`JobWithoutContextExt::into_service`]: super::JobWithoutContextExt::into_service
pub struct JobService<J, T, Ctx> {
    /// The wrapped job.
    job: J,
    /// The context stored alongside the job.
    ctx: Ctx,
    /// Ties `T` to the type without storing a `T`. Using `fn() -> T` keeps the service
    /// `Send + Sync` even when `T` itself is neither.
    _marker: PhantomData<fn() -> T>,
}
24
25impl<J, T, Ctx> JobService<J, T, Ctx> {
26    /// Get a reference to the state.
27    pub fn context(&self) -> &Ctx {
28        &self.ctx
29    }
30}
31
#[test]
fn traits() {
    // A marker type that is neither `Send` nor `Sync` because it holds a raw pointer.
    #[allow(dead_code)]
    struct NotSendSync(*const ());

    fn assert_send<S: Send>() {}
    fn assert_sync<S: Sync>() {}

    // Using `NotSendSync` as `T` must not strip `Send`/`Sync` from the service.
    type Probe = JobService<(), NotSendSync, ()>;
    assert_send::<Probe>();
    assert_sync::<Probe>();
}
41
42impl<J, T, Ctx> JobService<J, T, Ctx> {
43    pub(super) fn new(job: J, ctx: Ctx) -> Self {
44        Self {
45            job,
46            ctx,
47            _marker: PhantomData,
48        }
49    }
50}
51
52impl<J, T, Ctx> fmt::Debug for JobService<J, T, Ctx> {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("IntoService").finish_non_exhaustive()
55    }
56}
57
58impl<J, T, Ctx> Clone for JobService<J, T, Ctx>
59where
60    J: Clone,
61    Ctx: Clone,
62{
63    fn clone(&self) -> Self {
64        Self {
65            job: self.job.clone(),
66            ctx: self.ctx.clone(),
67            _marker: PhantomData,
68        }
69    }
70}
71
72impl<J, T, Ctx, B> Service<JobCall<B>> for JobService<J, T, Ctx>
73where
74    J: Job<T, Ctx> + Clone + Send + 'static,
75    B: Into<Bytes> + Send + 'static,
76    Ctx: Clone + Send + Sync,
77{
78    type Response = Option<JobResult>;
79    type Error = BoxError;
80    type Future = super::future::IntoServiceFuture<J::Future>;
81
82    #[inline]
83    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84        // `IntoService` can only be constructed from async functions which are always ready, or
85        // from `Layered` which buffers in `<Layered as Handler>::call` and is therefore
86        // also always ready.
87        Poll::Ready(Ok(()))
88    }
89
90    fn call(&mut self, call: JobCall<B>) -> Self::Future {
91        use futures_util::future::FutureExt;
92
93        let call = call.map(Into::into);
94
95        let handler = self.job.clone();
96        let future = Job::call(handler, call, self.ctx.clone());
97        let future = future.map(Ok as _);
98
99        super::future::IntoServiceFuture::new(future)
100    }
101}