multiplex_tonic_hyper/
make.rs

1use std::future::Future;
2use std::task::Poll;
3
4use futures::future::Join;
5use hyper::{Body, Request};
6use pin_project::pin_project;
7use tower::Service;
8
9use crate::to_boxed;
10use crate::BoxedError;
11use crate::Multiplexer;
12
/// A MakeService for [Multiplexer]
///
/// This type is used when more than one Multiplexer instance is needed.
/// It holds one make service per side and, when called as a `Service`,
/// asks both of them for a new inner service.
#[derive(Debug)]
pub struct MakeMultiplexer<MakeGrpc, MakeWeb> {
	/// Make service that produces the gRPC side of each multiplexer.
	make_grpc: MakeGrpc,
	/// Make service that produces the web side of each multiplexer.
	make_web: MakeWeb,
}
20
21impl<MakeGrpc, MakeWeb> MakeMultiplexer<MakeGrpc, MakeWeb> {
22	/// Move two make services into a new MakeService for Multiplexer
23	pub fn new(make_grpc: MakeGrpc, make_web: MakeWeb) -> Self {
24		MakeMultiplexer {
25			make_grpc,
26			make_web,
27		}
28	}
29}
30
31impl<Grpc, Web, GrpcError, WebError, MakeGrpc, MakeWeb, Target> Service<Target>
32	for MakeMultiplexer<MakeGrpc, MakeWeb>
33where
34	MakeGrpc: Service<Target, Response = Grpc, Error = GrpcError>,
35	MakeWeb: Service<Target, Response = Web, Error = WebError>,
36	Grpc: Service<Request<Body>>,
37	Web: Service<Request<Body>>,
38	GrpcError: Into<BoxedError>,
39	WebError: Into<BoxedError>,
40	Target: Clone,
41{
42	type Response = Multiplexer<Grpc, Web>;
43
44	type Error = BoxedError;
45
46	type Future = MakeMultiplexerFuture<MakeGrpc::Future, MakeWeb::Future>;
47
48	fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
49		match (
50			self.make_grpc.poll_ready(cx).map_err(to_boxed)?,
51			self.make_web.poll_ready(cx).map_err(to_boxed)?,
52		) {
53			(Poll::Ready(_), Poll::Ready(_)) => Poll::Ready(Ok(())),
54			_ => Poll::Pending,
55		}
56	}
57
58	fn call(&mut self, req: Target) -> Self::Future {
59		let make_grpc_future = self.make_grpc.call(req.clone());
60		let make_web_future = self.make_web.call(req);
61		MakeMultiplexerFuture::new(make_grpc_future, make_web_future)
62	}
63}
64
65#[pin_project]
66pub struct MakeMultiplexerFuture<MakeGrpcFuture, MakeWebFuture>
67where
68	MakeGrpcFuture: Future,
69	MakeWebFuture: Future,
70{
71	#[pin]
72	inner: Join<MakeGrpcFuture, MakeWebFuture>,
73}
74
75impl<MakeGrpcFuture, MakeWebFuture> MakeMultiplexerFuture<MakeGrpcFuture, MakeWebFuture>
76where
77	MakeGrpcFuture: Future,
78	MakeWebFuture: Future,
79{
80	fn new(make_grpc_future: MakeGrpcFuture, make_web_future: MakeWebFuture) -> Self {
81		let joined_future = futures::future::join(make_grpc_future, make_web_future);
82		MakeMultiplexerFuture {
83			inner: joined_future,
84		}
85	}
86}
87
88impl<MakeGrpcFuture, MakeWebFuture, MakeGrpcError, MakeWebError, Grpc, Web> Future
89	for MakeMultiplexerFuture<MakeGrpcFuture, MakeWebFuture>
90where
91	MakeGrpcFuture: Future<Output = Result<Grpc, MakeGrpcError>>,
92	MakeWebFuture: Future<Output = Result<Web, MakeWebError>>,
93	MakeGrpcError: Into<BoxedError>,
94	MakeWebError: Into<BoxedError>,
95	Grpc: Service<Request<Body>>,
96	Web: Service<Request<Body>>,
97{
98	type Output = Result<Multiplexer<Grpc, Web>, BoxedError>;
99
100	fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
101		let poll = self.project().inner.poll(cx);
102		if let Poll::Ready(output) = poll {
103			match output {
104				(Ok(grpc), Ok(web)) => Poll::Ready(Ok(Multiplexer::new(grpc, web))),
105				(Err(grpc_error), _) => Poll::Ready(Err(grpc_error.into())),
106				(_, Err(web_error)) => Poll::Ready(Err(web_error.into())),
107			}
108		} else {
109			Poll::Pending
110		}
111	}
112}
113
#[cfg(test)]
mod tests {
	use hyper::{service::service_fn, Body, Request, Response};
	use tower::Service;

	use super::MakeMultiplexer;

	/// Minimal handler used for both sides; always answers with the body "service".
	async fn service(_req: Request<Body>) -> Result<Response<Body>, String> {
		Ok(Response::new(Body::from("service")))
	}

	#[test]
	fn make_multiplexer_receives_two_make_service_impl_make_service() {
		let grpc_maker = tower::make::Shared::new(service_fn(service));
		let web_maker = tower::make::Shared::new(service_fn(service));

		// Compile-time check only: the constructor must accept these services.
		let _multiplexer_maker = MakeMultiplexer::new(grpc_maker, web_maker);
	}

	#[tokio::test]
	async fn make_multiplexer_is_a_make_service() {
		use tower::make::MakeService;

		let grpc_maker = tower::make::Shared::new(service_fn(service));
		let web_maker = tower::make::Shared::new(service_fn(service));
		let mut multiplexer_maker = MakeMultiplexer::new(grpc_maker, web_maker);

		// `make_service` is only available if MakeMultiplexer is a MakeService.
		let _service = multiplexer_maker.make_service(()).await.unwrap();
	}

	#[tokio::test]
	async fn use_make_multiplexer_as_service() {
		use tower::ServiceExt;

		let grpc_maker = tower::make::Shared::new(service_fn(service));
		let web_maker = tower::make::Shared::new(service_fn(service));
		let mut multiplexer_maker = MakeMultiplexer::new(grpc_maker, web_maker);

		// Drive it through the plain `Service` interface: wait for readiness, then call.
		ServiceExt::<()>::ready(&mut multiplexer_maker)
			.await
			.unwrap();
		multiplexer_maker.call(()).await.unwrap();
	}
}