blueprint_tangle_extra/
layers.rs1use crate::extract;
6use blueprint_core::{JobCall, JobResult};
7use core::future::Future;
8use core::pin::Pin;
9use core::task::ready;
10use core::task::{Context, Poll};
11use pin_project_lite::pin_project;
12use tower::{Layer, Service};
13
/// Service wrapper produced by [`TangleLayer`].
///
/// Holds the wrapped service `S`. Its `Service<JobCall>` implementation
/// extracts the call and service identifiers from each incoming job call and
/// hands them to the returned [`TangleSubmissionFuture`], which copies them
/// into the metadata of a successful job result.
#[derive(Debug, Clone, Copy)]
pub struct TangleSubmissionService<S> {
    // The wrapped service that actually executes the job call.
    service: S,
}
19
20pin_project! {
21 #[derive(Debug)]
23 pub struct TangleSubmissionFuture<F> {
24 #[pin]
25 kind: Kind<F>
26 }
27}
28
29impl<F> TangleSubmissionFuture<F> {
30 fn valid(future: F, call_id: extract::CallId, service_id: extract::ServiceId) -> Self {
31 Self {
32 kind: Kind::Valid {
33 future,
34 call_id,
35 service_id,
36 },
37 }
38 }
39
40 fn invalid() -> Self {
41 Self {
42 kind: Kind::Invalid,
43 }
44 }
45}
46
47pin_project! {
48 #[project = KindProj]
49 #[derive(Debug)]
50 enum Kind<F> {
51 Valid {
52 #[pin]
53 future: F,
54 call_id: extract::CallId,
55 service_id: extract::ServiceId,
56 },
57 Invalid,
58 }
59}
60
61impl<F, B, E> Future for TangleSubmissionFuture<F>
62where
63 F: Future<Output = Result<Option<JobResult<B>>, E>>,
64{
65 type Output = F::Output;
66
67 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68 match self.project().kind.project() {
69 KindProj::Valid {
70 future,
71 call_id,
72 service_id,
73 } => {
74 let result = ready!(future.poll(cx)?);
75 match result {
76 Some(mut result) => {
77 let JobResult::Ok { head, .. } = &mut result else {
78 return Poll::Ready(Ok(Some(result)));
80 };
81
82 head.metadata
83 .insert(extract::CallId::METADATA_KEY, call_id.0);
84 head.metadata
85 .insert(extract::ServiceId::METADATA_KEY, service_id.0);
86 Poll::Ready(Ok(Some(result)))
87 }
88 None => Poll::Ready(Ok(None)),
89 }
90 }
91 KindProj::Invalid => {
92 Poll::Ready(Ok(None))
94 }
95 }
96 }
97}
98
99impl<S> Service<JobCall> for TangleSubmissionService<S>
100where
101 S: Service<JobCall, Response = Option<JobResult>>,
102{
103 type Response = S::Response;
104 type Error = S::Error;
105 type Future = TangleSubmissionFuture<S::Future>;
106
107 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
108 self.service.poll_ready(cx)
109 }
110
111 fn call(&mut self, call: JobCall) -> Self::Future {
112 let (mut parts, body) = call.into_parts();
113 let Ok(call_id) = extract::CallId::try_from(&mut parts) else {
114 return TangleSubmissionFuture::invalid();
115 };
116 let Ok(service_id) = extract::ServiceId::try_from(&mut parts) else {
117 return TangleSubmissionFuture::invalid();
118 };
119
120 let call = JobCall::from_parts(parts, body);
121 TangleSubmissionFuture::valid(self.service.call(call), call_id, service_id)
122 }
123}
124
/// A [`Layer`] that wraps a service in [`TangleSubmissionService`].
///
/// The layer itself carries no configuration; it is a zero-sized marker.
#[derive(Debug, Default, Clone, Copy)]
pub struct TangleLayer;
134
135impl<S> Layer<S> for TangleLayer {
136 type Service = TangleSubmissionService<S>;
137
138 fn layer(&self, service: S) -> Self::Service {
139 TangleSubmissionService { service }
140 }
141}