alloy_transport_http/
hyper_transport.rs

1use crate::{Http, HttpConnect};
2use alloy_json_rpc::{RequestPacket, ResponsePacket};
3use alloy_transport::{
4    utils::guess_local_url, BoxTransport, TransportConnect, TransportError, TransportErrorKind,
5    TransportFut, TransportResult,
6};
7use http_body_util::{BodyExt, Full};
8use hyper::{
9    body::{Bytes, Incoming},
10    header, Request, Response,
11};
12use hyper_util::client::legacy::Error;
13use std::{future::Future, marker::PhantomData, pin::Pin, task};
14use tower::Service;
15use tracing::{debug, debug_span, trace, Instrument};
16
/// Default hyper client type when the `hyper-tls` feature is enabled: the legacy
/// `hyper_util` client over an HTTPS connector, sending [`Full<Bytes>`] request bodies.
#[cfg(feature = "hyper-tls")]
type Hyper = hyper_util::client::legacy::Client<
    hyper_tls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
    Full<Bytes>,
>;
22
23#[cfg(not(feature = "hyper-tls"))]
24type Hyper = hyper_util::client::legacy::Client<
25    hyper_util::client::legacy::connect::HttpConnector,
26    http_body_util::Full<::hyper::body::Bytes>,
27>;
28
29/// A [`hyper`] based transport client.
30pub type HyperTransport = Http<HyperClient>;
31
32impl HyperTransport {
33    /// Create a new [`HyperTransport`] with the given URL and default hyper client.
34    pub fn new_hyper(url: url::Url) -> Self {
35        let client = HyperClient::new();
36        Self::with_client(client, url)
37    }
38}
39
40/// A [hyper] based client that can be used with tower layers.
41#[derive(Clone, Debug)]
42pub struct HyperClient<B = Full<Bytes>, S = Hyper> {
43    service: S,
44    _pd: PhantomData<B>,
45}
46
47/// Alias for [`Response<Incoming>`]
48pub type HyperResponse = Response<Incoming>;
49
50/// Alias for pinned box future that results in [`HyperResponse`]
51pub type HyperResponseFut<T = HyperResponse, E = Error> =
52    Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'static>>;
53
54impl HyperClient {
55    /// Create a new [HyperClient] with the given URL and default hyper client.
56    pub fn new() -> Self {
57        let executor = hyper_util::rt::TokioExecutor::new();
58
59        #[cfg(feature = "hyper-tls")]
60        let service = hyper_util::client::legacy::Client::builder(executor)
61            .build(hyper_tls::HttpsConnector::new());
62
63        #[cfg(not(feature = "hyper-tls"))]
64        let service =
65            hyper_util::client::legacy::Client::builder(executor).build_http::<Full<Bytes>>();
66        Self { service, _pd: PhantomData }
67    }
68}
69
70impl Default for HyperClient {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl<B, S> HyperClient<B, S> {
77    /// Create a new [HyperClient] with the given URL and service.
78    pub const fn with_service(service: S) -> Self {
79        Self { service, _pd: PhantomData }
80    }
81}
82
83impl<B, S, ResBody> Http<HyperClient<B, S>>
84where
85    S: Service<Request<B>, Response = Response<ResBody>> + Clone + Send + Sync + 'static,
86    S::Future: Send,
87    S::Error: std::error::Error + Send + Sync + 'static,
88    B: From<Vec<u8>> + Send + 'static + Clone,
89    ResBody: BodyExt + Send + 'static,
90    ResBody::Error: std::error::Error + Send + Sync + 'static,
91    ResBody::Data: Send,
92{
93    async fn do_hyper(self, req: RequestPacket) -> TransportResult<ResponsePacket> {
94        debug!(count = req.len(), "sending request packet to server");
95
96        let mut builder = hyper::Request::builder()
97            .method(hyper::Method::POST)
98            .uri(self.url.as_str())
99            .header(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
100
101        // Add any additional headers from the request packet.
102        for (name, value) in req.headers().iter() {
103            builder = builder.header(name, value);
104        }
105
106        let ser = req.serialize().map_err(TransportError::ser_err)?;
107        // convert the Box<RawValue> into a hyper request<B>
108        let body = ser.get().as_bytes().to_owned().into();
109
110        let req = builder.body(body).map_err(TransportErrorKind::custom)?;
111
112        let mut service = self.client.service;
113        let resp = service.call(req).await.map_err(TransportErrorKind::custom)?;
114
115        let status = resp.status();
116
117        debug!(%status, "received response from server");
118
119        // Unpack data from the response body. We do this regardless of
120        // the status code, as we want to return the error in the body
121        // if there is one.
122        let body = resp.into_body().collect().await.map_err(TransportErrorKind::custom)?.to_bytes();
123
124        if tracing::enabled!(tracing::Level::TRACE) {
125            trace!(body = %String::from_utf8_lossy(&body), "response body");
126        } else {
127            debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
128        }
129
130        if !status.is_success() {
131            return Err(TransportErrorKind::http_error(
132                status.as_u16(),
133                String::from_utf8_lossy(&body).into_owned(),
134            ));
135        }
136
137        // Deserialize a Box<RawValue> from the body. If deserialization fails, return
138        // the body as a string in the error. The conversion to String
139        // is lossy and may not cover all the bytes in the body.
140        serde_json::from_slice(&body)
141            .map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref())))
142    }
143}
144
145impl TransportConnect for HttpConnect<HyperTransport> {
146    fn is_local(&self) -> bool {
147        guess_local_url(self.url.as_str())
148    }
149
150    async fn get_transport(&self) -> Result<BoxTransport, TransportError> {
151        Ok(BoxTransport::new(Http::with_client(HyperClient::new(), self.url.clone())))
152    }
153}
154
155impl<B, S> Service<RequestPacket> for Http<HyperClient<B, S>>
156where
157    S: Service<Request<B>, Response = HyperResponse> + Clone + Send + Sync + 'static,
158    S::Future: Send,
159    S::Error: std::error::Error + Send + Sync + 'static,
160    B: From<Vec<u8>> + Send + 'static + Clone + Sync,
161{
162    type Response = ResponsePacket;
163    type Error = TransportError;
164    type Future = TransportFut<'static>;
165
166    #[inline]
167    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
168        // `hyper` always returns `Ok(())`.
169        task::Poll::Ready(Ok(()))
170    }
171
172    #[inline]
173    fn call(&mut self, req: RequestPacket) -> Self::Future {
174        let this = self.clone();
175        let span = debug_span!("HyperTransport", url = %this.url);
176        Box::pin(this.do_hyper(req).instrument(span))
177    }
178}