blueprint_router/
future.rs1use blueprint_core::{IntoJobResult, JobCall, JobResult};
7use bytes::Bytes;
8use core::{
9 fmt,
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll, ready},
13};
14use pin_project_lite::pin_project;
15use tower::util::{BoxCloneSyncService, MapErrLayer, MapResponseLayer, Oneshot};
16use tower::{BoxError, Layer, Service, ServiceExt};
17
18pub struct Route<E = BoxError>(BoxCloneSyncService<JobCall, Option<JobResult>, E>);
23
24impl<E> Route<E> {
25 pub(crate) fn new<T>(svc: T) -> Self
26 where
27 T: Service<JobCall, Error = E> + Clone + Send + Sync + 'static,
28 T::Response: IntoJobResult + 'static,
29 T::Future: Send + 'static,
30 {
31 Self(BoxCloneSyncService::new(
32 svc.map_response(IntoJobResult::into_job_result),
33 ))
34 }
35
36 pub(crate) fn call_owned(self, call: JobCall) -> RouteFuture<E> {
38 self.oneshot_inner_owned(call)
39 }
40
41 pub(crate) fn oneshot_inner(&mut self, call: JobCall) -> RouteFuture<E> {
42 RouteFuture::new(self.0.clone().oneshot(call))
43 }
44
45 pub(crate) fn oneshot_inner_owned(self, call: JobCall) -> RouteFuture<E> {
47 RouteFuture::new(self.0.oneshot(call))
48 }
49
50 pub(crate) fn layer<L, NewError>(self, layer: L) -> Route<NewError>
51 where
52 L: Layer<Route<E>> + Clone + Send + 'static,
53 L::Service: Service<JobCall> + Clone + Send + Sync + 'static,
54 <L::Service as Service<JobCall>>::Response: IntoJobResult + 'static,
55 <L::Service as Service<JobCall>>::Error: Into<NewError> + 'static,
56 <L::Service as Service<JobCall>>::Future: Send + 'static,
57 NewError: 'static,
58 {
59 let layer = (
60 MapErrLayer::new(Into::into),
61 MapResponseLayer::new(IntoJobResult::into_job_result),
62 layer,
63 );
64
65 Route::new(layer.layer(self))
66 }
67}
68
69impl<E> Clone for Route<E> {
70 #[track_caller]
71 fn clone(&self) -> Self {
72 Self(self.0.clone())
73 }
74}
75
76impl<E> fmt::Debug for Route<E> {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("Route").finish()
79 }
80}
81
82impl<B, E> Service<JobCall<B>> for Route<E>
83where
84 B: Into<Bytes>,
85{
86 type Response = Option<JobResult>;
87 type Error = E;
88 type Future = RouteFuture<E>;
89
90 #[inline]
91 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
92 Poll::Ready(Ok(()))
93 }
94
95 #[inline]
96 fn call(&mut self, call: JobCall<B>) -> Self::Future {
97 self.oneshot_inner(call.map(Into::into))
98 }
99}
100
101pin_project! {
102 pub struct RouteFuture<E> {
104 #[pin]
105 inner: Oneshot<BoxCloneSyncService<JobCall, Option<JobResult>, E>, JobCall>,
106 }
107}
108
109impl<E> RouteFuture<E> {
110 fn new(inner: Oneshot<BoxCloneSyncService<JobCall, Option<JobResult>, E>, JobCall>) -> Self {
112 Self { inner }
113 }
114}
115
116impl<E> Future for RouteFuture<E> {
117 type Output = Result<Option<JobResult>, E>;
118
119 #[inline]
120 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121 let this = self.project();
122 let res = ready!(this.inner.poll(cx))?;
123
124 Poll::Ready(Ok(res))
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `Route` is `Send`.
    #[test]
    fn traits() {
        use crate::test_helpers::assert_send;

        assert_send::<Route<()>>();
    }
}