Skip to main content

blueprint_tangle_extra/
layers.rs

1//! Tangle Layers
2//!
3//! Middleware layers for processing Tangle job calls and results.
4
5use crate::extract;
6use blueprint_core::{JobCall, JobResult};
7use core::future::Future;
8use core::pin::Pin;
9use core::task::ready;
10use core::task::{Context, Poll};
11use pin_project_lite::pin_project;
12use tower::{Layer, Service};
13
/// Service wrapper that tags successful job results with Tangle metadata.
///
/// For every incoming [`JobCall`] the call ID and service ID are extracted
/// from the call parts and, once the wrapped service produces an `Ok` job
/// result, written into that result's metadata.
#[derive(Clone, Copy, Debug)]
pub struct TangleSubmissionService<S> {
    // The wrapped service that actually executes the job.
    service: S,
}
19
20pin_project! {
21    /// Response future of [`TangleSubmissionService`].
22    #[derive(Debug)]
23    pub struct TangleSubmissionFuture<F> {
24        #[pin]
25        kind: Kind<F>
26    }
27}
28
29impl<F> TangleSubmissionFuture<F> {
30    fn valid(future: F, call_id: extract::CallId, service_id: extract::ServiceId) -> Self {
31        Self {
32            kind: Kind::Valid {
33                future,
34                call_id,
35                service_id,
36            },
37        }
38    }
39
40    fn invalid() -> Self {
41        Self {
42            kind: Kind::Invalid,
43        }
44    }
45}
46
47pin_project! {
48    #[project = KindProj]
49    #[derive(Debug)]
50    enum Kind<F> {
51        Valid {
52            #[pin]
53            future: F,
54            call_id: extract::CallId,
55            service_id: extract::ServiceId,
56        },
57        Invalid,
58    }
59}
60
61impl<F, B, E> Future for TangleSubmissionFuture<F>
62where
63    F: Future<Output = Result<Option<JobResult<B>>, E>>,
64{
65    type Output = F::Output;
66
67    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68        match self.project().kind.project() {
69            KindProj::Valid {
70                future,
71                call_id,
72                service_id,
73            } => {
74                let result = ready!(future.poll(cx)?);
75                match result {
76                    Some(mut result) => {
77                        let JobResult::Ok { head, .. } = &mut result else {
78                            // Result is an error, ignore
79                            return Poll::Ready(Ok(Some(result)));
80                        };
81
82                        head.metadata
83                            .insert(extract::CallId::METADATA_KEY, call_id.0);
84                        head.metadata
85                            .insert(extract::ServiceId::METADATA_KEY, service_id.0);
86                        Poll::Ready(Ok(Some(result)))
87                    }
88                    None => Poll::Ready(Ok(None)),
89                }
90            }
91            KindProj::Invalid => {
92                // Malformed call, ignore
93                Poll::Ready(Ok(None))
94            }
95        }
96    }
97}
98
99impl<S> Service<JobCall> for TangleSubmissionService<S>
100where
101    S: Service<JobCall, Response = Option<JobResult>>,
102{
103    type Response = S::Response;
104    type Error = S::Error;
105    type Future = TangleSubmissionFuture<S::Future>;
106
107    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
108        self.service.poll_ready(cx)
109    }
110
111    fn call(&mut self, call: JobCall) -> Self::Future {
112        let (mut parts, body) = call.into_parts();
113        let Ok(call_id) = extract::CallId::try_from(&mut parts) else {
114            return TangleSubmissionFuture::invalid();
115        };
116        let Ok(service_id) = extract::ServiceId::try_from(&mut parts) else {
117            return TangleSubmissionFuture::invalid();
118        };
119
120        let call = JobCall::from_parts(parts, body);
121        TangleSubmissionFuture::valid(self.service.call(call), call_id, service_id)
122    }
123}
124
/// Layer that makes [`JobResult`]s usable by a [`TangleConsumer`].
///
/// Services wrapped by this layer read the `call_id` and `service_id` of each
/// incoming job call and attach both to the resulting job result, so the
/// consumer can submit the result for the correct service and call.
///
/// [`TangleConsumer`]: crate::consumer::TangleConsumer
#[derive(Clone, Copy, Debug, Default)]
pub struct TangleLayer;
134
135impl<S> Layer<S> for TangleLayer {
136    type Service = TangleSubmissionService<S>;
137
138    fn layer(&self, service: S) -> Self::Service {
139        TangleSubmissionService { service }
140    }
141}