multiplex_tonic_hyper/
make.rs1use std::future::Future;
2use std::task::Poll;
3
4use futures::future::Join;
5use hyper::{Body, Request};
6use pin_project::pin_project;
7use tower::Service;
8
9use crate::to_boxed;
10use crate::BoxedError;
11use crate::Multiplexer;
12
13pub struct MakeMultiplexer<MakeGrpc, MakeWeb> {
17 make_grpc: MakeGrpc,
18 make_web: MakeWeb,
19}
20
21impl<MakeGrpc, MakeWeb> MakeMultiplexer<MakeGrpc, MakeWeb> {
22 pub fn new(make_grpc: MakeGrpc, make_web: MakeWeb) -> Self {
24 MakeMultiplexer {
25 make_grpc,
26 make_web,
27 }
28 }
29}
30
31impl<Grpc, Web, GrpcError, WebError, MakeGrpc, MakeWeb, Target> Service<Target>
32 for MakeMultiplexer<MakeGrpc, MakeWeb>
33where
34 MakeGrpc: Service<Target, Response = Grpc, Error = GrpcError>,
35 MakeWeb: Service<Target, Response = Web, Error = WebError>,
36 Grpc: Service<Request<Body>>,
37 Web: Service<Request<Body>>,
38 GrpcError: Into<BoxedError>,
39 WebError: Into<BoxedError>,
40 Target: Clone,
41{
42 type Response = Multiplexer<Grpc, Web>;
43
44 type Error = BoxedError;
45
46 type Future = MakeMultiplexerFuture<MakeGrpc::Future, MakeWeb::Future>;
47
48 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
49 match (
50 self.make_grpc.poll_ready(cx).map_err(to_boxed)?,
51 self.make_web.poll_ready(cx).map_err(to_boxed)?,
52 ) {
53 (Poll::Ready(_), Poll::Ready(_)) => Poll::Ready(Ok(())),
54 _ => Poll::Pending,
55 }
56 }
57
58 fn call(&mut self, req: Target) -> Self::Future {
59 let make_grpc_future = self.make_grpc.call(req.clone());
60 let make_web_future = self.make_web.call(req);
61 MakeMultiplexerFuture::new(make_grpc_future, make_web_future)
62 }
63}
64
65#[pin_project]
66pub struct MakeMultiplexerFuture<MakeGrpcFuture, MakeWebFuture>
67where
68 MakeGrpcFuture: Future,
69 MakeWebFuture: Future,
70{
71 #[pin]
72 inner: Join<MakeGrpcFuture, MakeWebFuture>,
73}
74
75impl<MakeGrpcFuture, MakeWebFuture> MakeMultiplexerFuture<MakeGrpcFuture, MakeWebFuture>
76where
77 MakeGrpcFuture: Future,
78 MakeWebFuture: Future,
79{
80 fn new(make_grpc_future: MakeGrpcFuture, make_web_future: MakeWebFuture) -> Self {
81 let joined_future = futures::future::join(make_grpc_future, make_web_future);
82 MakeMultiplexerFuture {
83 inner: joined_future,
84 }
85 }
86}
87
88impl<MakeGrpcFuture, MakeWebFuture, MakeGrpcError, MakeWebError, Grpc, Web> Future
89 for MakeMultiplexerFuture<MakeGrpcFuture, MakeWebFuture>
90where
91 MakeGrpcFuture: Future<Output = Result<Grpc, MakeGrpcError>>,
92 MakeWebFuture: Future<Output = Result<Web, MakeWebError>>,
93 MakeGrpcError: Into<BoxedError>,
94 MakeWebError: Into<BoxedError>,
95 Grpc: Service<Request<Body>>,
96 Web: Service<Request<Body>>,
97{
98 type Output = Result<Multiplexer<Grpc, Web>, BoxedError>;
99
100 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
101 let poll = self.project().inner.poll(cx);
102 if let Poll::Ready(output) = poll {
103 match output {
104 (Ok(grpc), Ok(web)) => Poll::Ready(Ok(Multiplexer::new(grpc, web))),
105 (Err(grpc_error), _) => Poll::Ready(Err(grpc_error.into())),
106 (_, Err(web_error)) => Poll::Ready(Err(web_error.into())),
107 }
108 } else {
109 Poll::Pending
110 }
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use hyper::{service::service_fn, Body, Request, Response};
117 use tower::Service;
118
119 use super::MakeMultiplexer;
120
121 async fn service(_req: Request<Body>) -> Result<Response<Body>, String> {
122 Ok(Response::new(Body::from("service")))
123 }
124
125 #[test]
126 fn make_multiplexer_receives_two_make_service_impl_make_service() {
127 let make_grpc = tower::make::Shared::new(service_fn(service));
128 let make_web = tower::make::Shared::new(service_fn(service));
129
130 let _make_multiplexer = MakeMultiplexer::new(make_grpc, make_web);
132 }
133
134 #[tokio::test]
135 async fn make_multiplexer_is_a_make_service() {
136 let make_grpc = tower::make::Shared::new(service_fn(service));
137 let make_web = tower::make::Shared::new(service_fn(service));
138
139 let mut make_multiplexer = MakeMultiplexer::new(make_grpc, make_web);
140 use tower::make::MakeService;
141 let _service = make_multiplexer.make_service(()).await.unwrap();
143 }
144
145 #[tokio::test]
146 async fn use_make_multiplexer_as_service() {
147 let make_grpc = tower::make::Shared::new(service_fn(service));
148 let make_web = tower::make::Shared::new(service_fn(service));
149 let mut make_multiplexer = MakeMultiplexer::new(make_grpc, make_web);
150 use tower::ServiceExt;
151
152 ServiceExt::<()>::ready(&mut make_multiplexer)
153 .await
154 .unwrap();
155 make_multiplexer.call(()).await.unwrap();
156 }
157}