jsonrpsee_server/
utils.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::future::Future;
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use crate::{HttpBody, HttpRequest};
32
33use futures_util::future::{self, Either};
34use hyper_util::rt::{TokioExecutor, TokioIo};
35use jsonrpsee_core::BoxError;
36use pin_project::pin_project;
37use tower::ServiceExt;
38use tower::util::Oneshot;
39
40#[derive(Debug, Copy, Clone)]
41pub(crate) struct TowerToHyperService<S> {
42	service: S,
43}
44
45impl<S> TowerToHyperService<S> {
46	pub(crate) fn new(service: S) -> Self {
47		Self { service }
48	}
49}
50
51impl<S> hyper::service::Service<HttpRequest<hyper::body::Incoming>> for TowerToHyperService<S>
52where
53	S: tower::Service<HttpRequest> + Clone,
54{
55	type Response = S::Response;
56	type Error = S::Error;
57	type Future = TowerToHyperServiceFuture<S, HttpRequest>;
58
59	fn call(&self, req: HttpRequest<hyper::body::Incoming>) -> Self::Future {
60		let req = req.map(HttpBody::new);
61		TowerToHyperServiceFuture { future: self.service.clone().oneshot(req) }
62	}
63}
64
65#[pin_project]
66pub(crate) struct TowerToHyperServiceFuture<S, R>
67where
68	S: tower::Service<R>,
69{
70	#[pin]
71	future: Oneshot<S, R>,
72}
73
74impl<S, R> std::future::Future for TowerToHyperServiceFuture<S, R>
75where
76	S: tower::Service<R>,
77{
78	type Output = Result<S::Response, S::Error>;
79
80	#[inline]
81	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82		self.project().future.poll(cx)
83	}
84}
85
86/// Serve a service over a TCP connection without graceful shutdown.
87/// This means that pending requests will be dropped when the server is stopped.
88///
89/// If you want to gracefully shutdown the server, use [`serve_with_graceful_shutdown`] instead.
90pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
91where
92	S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
93	S::Future: Send,
94	S::Response: Send,
95	S::Error: Into<BoxError>,
96	B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
97	B::Error: Into<BoxError>,
98	I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
99{
100	let service = hyper_util::service::TowerToHyperService::new(service);
101	let io = TokioIo::new(io);
102
103	let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
104	let conn = builder.serve_connection_with_upgrades(io, service);
105	conn.await
106}
107
108/// Serve a service over a TCP connection with graceful shutdown.
109/// This means that pending requests will be completed before the server is stopped.
110pub async fn serve_with_graceful_shutdown<S, B, I>(
111	io: I,
112	service: S,
113	stopped: impl Future<Output = ()>,
114) -> Result<(), BoxError>
115where
116	S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
117	S::Future: Send,
118	S::Response: Send,
119	S::Error: Into<BoxError>,
120	B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
121	B::Error: Into<BoxError>,
122	I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
123{
124	let service = hyper_util::service::TowerToHyperService::new(service);
125	let io = TokioIo::new(io);
126
127	let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
128	let conn = builder.serve_connection_with_upgrades(io, service);
129
130	tokio::pin!(stopped, conn);
131
132	match future::select(conn, stopped).await {
133		// Return if the connection was completed.
134		Either::Left((conn, _)) => conn,
135		// If the server is stopped, we should gracefully shutdown
136		// the connection and poll it until it finishes.
137		Either::Right((_, mut conn)) => {
138			conn.as_mut().graceful_shutdown();
139			conn.await
140		}
141	}
142}
143
144/// Deserialize calls, notifications and responses with HTTP extensions.
145pub mod deserialize_with_ext {
146	/// Method call.
147	pub mod call {
148		use jsonrpsee_types::Request;
149
150		/// Wrapper over `serde_json::from_slice` that sets the extensions.
151		pub fn from_slice<'a>(
152			data: &'a [u8],
153			extensions: &'a http::Extensions,
154		) -> Result<Request<'a>, serde_json::Error> {
155			let mut req: Request = serde_json::from_slice(data)?;
156			*req.extensions_mut() = extensions.clone();
157			Ok(req)
158		}
159
160		/// Wrapper over `serde_json::from_str` that sets the extensions.
161		pub fn from_str<'a>(data: &'a str, extensions: &'a http::Extensions) -> Result<Request<'a>, serde_json::Error> {
162			let mut req: Request = serde_json::from_str(data)?;
163			*req.extensions_mut() = extensions.clone();
164			Ok(req)
165		}
166	}
167
168	/// Notification.
169	pub mod notif {
170		use jsonrpsee_types::Notification;
171
172		/// Wrapper over `serde_json::from_slice` that sets the extensions.
173		pub fn from_slice<'a, T>(
174			data: &'a [u8],
175			extensions: &'a http::Extensions,
176		) -> Result<Notification<'a, T>, serde_json::Error>
177		where
178			T: serde::Deserialize<'a>,
179		{
180			let mut notif: Notification<T> = serde_json::from_slice(data)?;
181			*notif.extensions_mut() = extensions.clone();
182			Ok(notif)
183		}
184
185		/// Wrapper over `serde_json::from_str` that sets the extensions.
186		pub fn from_str<'a, T>(
187			data: &'a str,
188			extensions: &http::Extensions,
189		) -> Result<Notification<'a, T>, serde_json::Error>
190		where
191			T: serde::Deserialize<'a>,
192		{
193			let mut notif: Notification<T> = serde_json::from_str(data)?;
194			*notif.extensions_mut() = extensions.clone();
195			Ok(notif)
196		}
197	}
198}