jsonrpsee_server/
utils.rs1use std::future::Future;
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use crate::{HttpBody, HttpRequest};
32
33use futures_util::future::{self, Either};
34use hyper_util::rt::{TokioExecutor, TokioIo};
35use jsonrpsee_core::BoxError;
36use pin_project::pin_project;
37use tower::ServiceExt;
38use tower::util::Oneshot;
39
40#[derive(Debug, Copy, Clone)]
41pub(crate) struct TowerToHyperService<S> {
42 service: S,
43}
44
45impl<S> TowerToHyperService<S> {
46 pub(crate) fn new(service: S) -> Self {
47 Self { service }
48 }
49}
50
51impl<S> hyper::service::Service<HttpRequest<hyper::body::Incoming>> for TowerToHyperService<S>
52where
53 S: tower::Service<HttpRequest> + Clone,
54{
55 type Response = S::Response;
56 type Error = S::Error;
57 type Future = TowerToHyperServiceFuture<S, HttpRequest>;
58
59 fn call(&self, req: HttpRequest<hyper::body::Incoming>) -> Self::Future {
60 let req = req.map(HttpBody::new);
61 TowerToHyperServiceFuture { future: self.service.clone().oneshot(req) }
62 }
63}
64
65#[pin_project]
66pub(crate) struct TowerToHyperServiceFuture<S, R>
67where
68 S: tower::Service<R>,
69{
70 #[pin]
71 future: Oneshot<S, R>,
72}
73
74impl<S, R> std::future::Future for TowerToHyperServiceFuture<S, R>
75where
76 S: tower::Service<R>,
77{
78 type Output = Result<S::Response, S::Error>;
79
80 #[inline]
81 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82 self.project().future.poll(cx)
83 }
84}
85
86pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
91where
92 S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
93 S::Future: Send,
94 S::Response: Send,
95 S::Error: Into<BoxError>,
96 B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
97 B::Error: Into<BoxError>,
98 I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
99{
100 let service = hyper_util::service::TowerToHyperService::new(service);
101 let io = TokioIo::new(io);
102
103 let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
104 let conn = builder.serve_connection_with_upgrades(io, service);
105 conn.await
106}
107
108pub async fn serve_with_graceful_shutdown<S, B, I>(
111 io: I,
112 service: S,
113 stopped: impl Future<Output = ()>,
114) -> Result<(), BoxError>
115where
116 S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
117 S::Future: Send,
118 S::Response: Send,
119 S::Error: Into<BoxError>,
120 B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
121 B::Error: Into<BoxError>,
122 I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
123{
124 let service = hyper_util::service::TowerToHyperService::new(service);
125 let io = TokioIo::new(io);
126
127 let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
128 let conn = builder.serve_connection_with_upgrades(io, service);
129
130 tokio::pin!(stopped, conn);
131
132 match future::select(conn, stopped).await {
133 Either::Left((conn, _)) => conn,
135 Either::Right((_, mut conn)) => {
138 conn.as_mut().graceful_shutdown();
139 conn.await
140 }
141 }
142}
143
144pub mod deserialize_with_ext {
146 pub mod call {
148 use jsonrpsee_types::Request;
149
150 pub fn from_slice<'a>(
152 data: &'a [u8],
153 extensions: &'a http::Extensions,
154 ) -> Result<Request<'a>, serde_json::Error> {
155 let mut req: Request = serde_json::from_slice(data)?;
156 *req.extensions_mut() = extensions.clone();
157 Ok(req)
158 }
159
160 pub fn from_str<'a>(data: &'a str, extensions: &'a http::Extensions) -> Result<Request<'a>, serde_json::Error> {
162 let mut req: Request = serde_json::from_str(data)?;
163 *req.extensions_mut() = extensions.clone();
164 Ok(req)
165 }
166 }
167
168 pub mod notif {
170 use jsonrpsee_types::Notification;
171
172 pub fn from_slice<'a, T>(
174 data: &'a [u8],
175 extensions: &'a http::Extensions,
176 ) -> Result<Notification<'a, T>, serde_json::Error>
177 where
178 T: serde::Deserialize<'a>,
179 {
180 let mut notif: Notification<T> = serde_json::from_slice(data)?;
181 *notif.extensions_mut() = extensions.clone();
182 Ok(notif)
183 }
184
185 pub fn from_str<'a, T>(
187 data: &'a str,
188 extensions: &http::Extensions,
189 ) -> Result<Notification<'a, T>, serde_json::Error>
190 where
191 T: serde::Deserialize<'a>,
192 {
193 let mut notif: Notification<T> = serde_json::from_str(data)?;
194 *notif.extensions_mut() = extensions.clone();
195 Ok(notif)
196 }
197 }
198}