alloy_transport_http/
hyper_transport.rs1use crate::{Http, HttpConnect};
2use alloy_json_rpc::{RequestPacket, ResponsePacket};
3use alloy_transport::{
4 utils::guess_local_url, BoxTransport, TransportConnect, TransportError, TransportErrorKind,
5 TransportFut, TransportResult,
6};
7use http_body_util::{BodyExt, Full};
8use hyper::{
9 body::{Bytes, Incoming},
10 header, Request, Response,
11};
12use hyper_util::client::legacy::Error;
13use std::{future::Future, marker::PhantomData, pin::Pin, task};
14use tower::{Layer, Service};
15use tracing::{debug, debug_span, trace, Instrument};
16
17#[cfg(feature = "hyper-tls")]
18type Hyper = hyper_util::client::legacy::Client<
19 hyper_tls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
20 http_body_util::Full<::hyper::body::Bytes>,
21>;
22
23#[cfg(not(feature = "hyper-tls"))]
24type Hyper = hyper_util::client::legacy::Client<
25 hyper_util::client::legacy::connect::HttpConnector,
26 http_body_util::Full<::hyper::body::Bytes>,
27>;
28
29pub type HyperTransport = Http<HyperClient>;
31
32impl HyperTransport {
33 pub fn new_hyper(url: url::Url) -> Self {
35 let client = HyperClient::new();
36 Self::with_client(client, url)
37 }
38}
39
40#[derive(Clone, Debug)]
42pub struct HyperClient<B = Full<Bytes>, S = Hyper> {
43 service: S,
44 _pd: PhantomData<B>,
45}
46
47pub type HyperResponse = Response<Incoming>;
49
50pub type HyperResponseFut<T = HyperResponse, E = Error> =
52 Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'static>>;
53
54impl HyperClient {
55 pub fn new() -> Self {
57 let executor = hyper_util::rt::TokioExecutor::new();
58
59 #[cfg(feature = "hyper-tls")]
60 let service = hyper_util::client::legacy::Client::builder(executor)
61 .build(hyper_tls::HttpsConnector::new());
62
63 #[cfg(not(feature = "hyper-tls"))]
64 let service =
65 hyper_util::client::legacy::Client::builder(executor).build_http::<Full<Bytes>>();
66 Self { service, _pd: PhantomData }
67 }
68}
69
70impl Default for HyperClient {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl<B, S> HyperClient<B, S> {
77 pub const fn with_service(service: S) -> Self {
79 Self { service, _pd: PhantomData }
80 }
81
82 pub fn layer<L>(self, layer: L) -> HyperClient<B, L::Service>
98 where
99 L: Layer<S>,
100 {
101 HyperClient::with_service(layer.layer(self.service))
102 }
103}
104
105impl<B, S, ResBody> Http<HyperClient<B, S>>
106where
107 S: Service<Request<B>, Response = Response<ResBody>> + Clone + Send + Sync + 'static,
108 S::Future: Send,
109 S::Error: std::error::Error + Send + Sync + 'static,
110 B: From<Vec<u8>> + Send + 'static + Clone,
111 ResBody: BodyExt + Send + 'static,
112 ResBody::Error: std::error::Error + Send + Sync + 'static,
113 ResBody::Data: Send,
114{
115 async fn do_hyper(self, req: RequestPacket) -> TransportResult<ResponsePacket> {
116 debug!(count = req.len(), "sending request packet to server");
117
118 let mut builder = hyper::Request::builder()
119 .method(hyper::Method::POST)
120 .uri(self.url.as_str())
121 .header(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
122
123 for (name, value) in req.headers().iter() {
125 builder = builder.header(name, value);
126 }
127
128 let ser = req.serialize().map_err(TransportError::ser_err)?;
129 let body = ser.get().as_bytes().to_owned().into();
131
132 let req = builder.body(body).map_err(TransportErrorKind::custom)?;
133
134 let mut service = self.client.service;
135 let resp = service.call(req).await.map_err(TransportErrorKind::custom)?;
136
137 let status = resp.status();
138
139 debug!(%status, "received response from server");
140
141 let body = resp.into_body().collect().await.map_err(TransportErrorKind::custom)?.to_bytes();
145
146 if tracing::enabled!(tracing::Level::TRACE) {
147 trace!(body = %String::from_utf8_lossy(&body), "response body");
148 } else {
149 debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
150 }
151
152 if !status.is_success() {
153 return Err(TransportErrorKind::http_error(
154 status.as_u16(),
155 String::from_utf8_lossy(&body).into_owned(),
156 ));
157 }
158
159 serde_json::from_slice(&body)
163 .map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref())))
164 }
165}
166
167impl TransportConnect for HttpConnect<HyperTransport> {
168 fn is_local(&self) -> bool {
169 guess_local_url(self.url.as_str())
170 }
171
172 async fn get_transport(&self) -> Result<BoxTransport, TransportError> {
173 Ok(BoxTransport::new(Http::with_client(HyperClient::new(), self.url.clone())))
174 }
175}
176
177impl<B, S> Service<RequestPacket> for Http<HyperClient<B, S>>
178where
179 S: Service<Request<B>, Response = HyperResponse> + Clone + Send + Sync + 'static,
180 S::Future: Send,
181 S::Error: std::error::Error + Send + Sync + 'static,
182 B: From<Vec<u8>> + Send + 'static + Clone + Sync,
183{
184 type Response = ResponsePacket;
185 type Error = TransportError;
186 type Future = TransportFut<'static>;
187
188 #[inline]
189 fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
190 task::Poll::Ready(Ok(()))
192 }
193
194 #[inline]
195 fn call(&mut self, req: RequestPacket) -> Self::Future {
196 let this = self.clone();
197 let span = debug_span!("HyperTransport", url = %this.url);
198 Box::pin(this.do_hyper(req).instrument(span.or_current()))
199 }
200}