cs_mwc_web3/transports/
http.rs

1//! HTTP Transport
2
3use crate::{error, helpers, rpc, BatchTransport, Error, RequestId, Transport};
4use futures::{
5    self,
6    task::{Context, Poll},
7    Future, FutureExt, StreamExt,
8};
9use hyper::header::HeaderValue;
10use std::{
11    env, fmt,
12    ops::Deref,
13    pin::Pin,
14    sync::{
15        atomic::{self, AtomicUsize},
16        Arc,
17    },
18};
19use url::Url;
20
21impl From<hyper::Error> for Error {
22    fn from(err: hyper::Error) -> Self {
23        Error::Transport(format!("{:?}", err))
24    }
25}
26
27impl From<hyper::http::uri::InvalidUri> for Error {
28    fn from(err: hyper::http::uri::InvalidUri) -> Self {
29        Error::Transport(format!("{:?}", err))
30    }
31}
32
33impl From<hyper::header::InvalidHeaderValue> for Error {
34    fn from(err: hyper::header::InvalidHeaderValue) -> Self {
35        Error::Transport(format!("{}", err))
36    }
37}
38
// Size threshold (in bytes of the serialized request) for sending a request
// without `transfer-encoding: chunked`: bodies strictly shorter than this get
// an explicit `Content-Length` header.
const MAX_SINGLE_CHUNK: usize = 256;
41
/// Connector used by the client when the `http-tls` feature is enabled.
#[cfg(feature = "http-tls")]
type TlsConnector = hyper_tls::HttpsConnector<hyper::client::HttpConnector>;

/// The `hyper` client behind the transport (`http-tls` build).
#[cfg(feature = "http-tls")]
#[derive(Debug, Clone)]
enum Client {
    /// Client whose connector is wrapped in a `hyper_proxy::ProxyConnector`.
    Proxy(hyper::Client<hyper_proxy::ProxyConnector<TlsConnector>>),
    /// Client using the HTTPS connector directly.
    NoProxy(hyper::Client<TlsConnector>),
}
48
49#[cfg(not(feature = "http-tls"))]
50#[derive(Debug, Clone)]
51enum Client {
52    Proxy(hyper::Client<hyper_proxy::ProxyConnector<hyper::client::HttpConnector>>),
53    NoProxy(hyper::Client<hyper::client::HttpConnector>),
54}
55
56impl Client {
57    pub fn request(&self, req: hyper::Request<hyper::Body>) -> hyper::client::ResponseFuture {
58        match self {
59            Client::Proxy(client) => client.request(req),
60            Client::NoProxy(client) => client.request(req),
61        }
62    }
63}
64
65/// HTTP Transport (synchronous)
66#[derive(Debug, Clone)]
67pub struct Http {
68    id: Arc<AtomicUsize>,
69    url: hyper::Uri,
70    basic_auth: Option<HeaderValue>,
71    client: Client,
72}
73
74impl Http {
75    /// Create new HTTP transport connecting to given URL.
76    pub fn new(url: &str) -> error::Result<Self> {
77        #[cfg(feature = "http-tls")]
78        let (proxy_env, connector) = { (env::var("HTTPS_PROXY"), hyper_tls::HttpsConnector::new()) };
79        #[cfg(not(feature = "http-tls"))]
80        let (proxy_env, connector) = { (env::var("HTTP_PROXY"), hyper::client::HttpConnector::new()) };
81
82        let client = match proxy_env {
83            Ok(proxy) => {
84                let mut url = url::Url::parse(&proxy)?;
85                let username = String::from(url.username());
86                let password = String::from(url.password().unwrap_or_default());
87
88                url.set_username("").map_err(|_| Error::Internal)?;
89                url.set_password(None).map_err(|_| Error::Internal)?;
90
91                let uri = url.to_string().parse()?;
92
93                let mut proxy = hyper_proxy::Proxy::new(hyper_proxy::Intercept::All, uri);
94
95                if username != "" {
96                    let credentials =
97                        typed_headers::Credentials::basic(&username, &password).map_err(|_| Error::Internal)?;
98
99                    proxy.set_authorization(credentials);
100                }
101
102                let proxy_connector = hyper_proxy::ProxyConnector::from_proxy(connector, proxy)?;
103
104                Client::Proxy(hyper::Client::builder().build(proxy_connector))
105            }
106            Err(_) => Client::NoProxy(hyper::Client::builder().build(connector)),
107        };
108
109        let basic_auth = {
110            let url = Url::parse(url)?;
111            let user = url.username();
112            let auth = format!("{}:{}", user, url.password().unwrap_or_default());
113            if &auth == ":" {
114                None
115            } else {
116                Some(HeaderValue::from_str(&format!("Basic {}", base64::encode(&auth)))?)
117            }
118        };
119
120        Ok(Http {
121            id: Arc::new(AtomicUsize::new(1)),
122            url: url.parse()?,
123            basic_auth,
124            client,
125        })
126    }
127
128    fn send_request<F, O>(&self, id: RequestId, request: rpc::Request, extract: F) -> Response<F>
129    where
130        F: Fn(Vec<u8>) -> O,
131    {
132        let request = helpers::to_string(&request);
133        log::debug!("[{}] Sending: {} to {}", id, request, self.url);
134        let len = request.len();
135        let mut req = hyper::Request::new(hyper::Body::from(request));
136        *req.method_mut() = hyper::Method::POST;
137        *req.uri_mut() = self.url.clone();
138        req.headers_mut().insert(
139            hyper::header::CONTENT_TYPE,
140            HeaderValue::from_static("application/json"),
141        );
142        req.headers_mut()
143            .insert(hyper::header::USER_AGENT, HeaderValue::from_static("web3.rs"));
144
145        // Don't send chunked request
146        if len < MAX_SINGLE_CHUNK {
147            req.headers_mut().insert(hyper::header::CONTENT_LENGTH, len.into());
148        }
149
150        // Send basic auth header
151        if let Some(ref basic_auth) = self.basic_auth {
152            req.headers_mut()
153                .insert(hyper::header::AUTHORIZATION, basic_auth.clone());
154        }
155        let result = self.client.request(req);
156
157        Response::new(id, result, extract)
158    }
159}
160
161impl Transport for Http {
162    type Out = Response<fn(Vec<u8>) -> error::Result<rpc::Value>>;
163
164    fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
165        let id = self.id.fetch_add(1, atomic::Ordering::AcqRel);
166        let request = helpers::build_request(id, method, params);
167
168        (id, request)
169    }
170
171    fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
172        self.send_request(id, rpc::Request::Single(request), single_response)
173    }
174}
175
176impl BatchTransport for Http {
177    type Batch = Response<fn(Vec<u8>) -> error::Result<Vec<error::Result<rpc::Value>>>>;
178
179    fn send_batch<T>(&self, requests: T) -> Self::Batch
180    where
181        T: IntoIterator<Item = (RequestId, rpc::Call)>,
182    {
183        let mut it = requests.into_iter();
184        let (id, first) = it.next().map(|x| (x.0, Some(x.1))).unwrap_or_else(|| (0, None));
185        let requests = first.into_iter().chain(it.map(|x| x.1)).collect();
186
187        self.send_request(id, rpc::Request::Batch(requests), batch_response)
188    }
189}
190
191/// Parse bytes RPC response into `Result`.
192fn single_response<T: Deref<Target = [u8]>>(response: T) -> error::Result<rpc::Value> {
193    let response =
194        helpers::to_response_from_slice(&*response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?;
195    match response {
196        rpc::Response::Single(output) => helpers::to_result_from_output(output),
197        _ => Err(Error::InvalidResponse("Expected single, got batch.".into())),
198    }
199}
200
201/// Parse bytes RPC batch response into `Result`.
202fn batch_response<T: Deref<Target = [u8]>>(response: T) -> error::Result<Vec<error::Result<rpc::Value>>> {
203    let response =
204        helpers::to_response_from_slice(&*response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?;
205    match response {
206        rpc::Response::Batch(outputs) => Ok(outputs.into_iter().map(helpers::to_result_from_output).collect()),
207        _ => Err(Error::InvalidResponse("Expected batch, got single.".into())),
208    }
209}
210
211enum ResponseState {
212    Waiting(hyper::client::ResponseFuture),
213    Reading(Vec<u8>, hyper::Body),
214}
215
216/// A future representing a response to a pending request.
217pub struct Response<T> {
218    id: RequestId,
219    extract: T,
220    state: ResponseState,
221}
222
223impl<T> Response<T> {
224    /// Creates a new `Response`
225    pub fn new(id: RequestId, response: hyper::client::ResponseFuture, extract: T) -> Self {
226        log::trace!("[{}] Request pending.", id);
227        Response {
228            id,
229            extract,
230            state: ResponseState::Waiting(response),
231        }
232    }
233}
234
235// We can do this because `hyper::client::ResponseFuture: Unpin`.
236impl<T> Unpin for Response<T> {}
237
238impl<T, Out> Future for Response<T>
239where
240    T: Fn(Vec<u8>) -> error::Result<Out>,
241    Out: fmt::Debug,
242{
243    type Output = error::Result<Out>;
244
245    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
246        let id = self.id;
247        loop {
248            match self.state {
249                ResponseState::Waiting(ref mut waiting) => {
250                    log::trace!("[{}] Checking response.", id);
251                    let response = ready!(waiting.poll_unpin(ctx))?;
252                    if !response.status().is_success() {
253                        return Poll::Ready(Err(Error::Transport(format!(
254                            "Unexpected response status code: {}",
255                            response.status()
256                        ))));
257                    }
258                    self.state = ResponseState::Reading(Default::default(), response.into_body());
259                }
260                ResponseState::Reading(ref mut content, ref mut body) => {
261                    log::trace!("[{}] Reading body.", id);
262                    match ready!(body.poll_next_unpin(ctx)) {
263                        Some(chunk) => {
264                            content.extend(&*chunk?);
265                        }
266                        None => {
267                            let response = std::mem::take(content);
268                            log::trace!(
269                                "[{}] Extracting result from:\n{}",
270                                self.id,
271                                std::str::from_utf8(&response).unwrap_or("<invalid utf8>")
272                            );
273                            return Poll::Ready((self.extract)(response));
274                        }
275                    }
276                }
277            }
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a transport for `url` and returns the authorization header it derived.
    fn basic_auth_for(url: &str) -> Option<HeaderValue> {
        Http::new(url).unwrap().basic_auth
    }

    #[test]
    fn http_supports_basic_auth_with_user_and_password() {
        let header = basic_auth_for("https://user:password@127.0.0.1:8545");
        assert!(header.is_some());
        assert_eq!(header, Some(HeaderValue::from_static("Basic dXNlcjpwYXNzd29yZA==")))
    }

    #[test]
    fn http_supports_basic_auth_with_user_no_password() {
        let header = basic_auth_for("https://username:@127.0.0.1:8545");
        assert!(header.is_some());
        assert_eq!(header, Some(HeaderValue::from_static("Basic dXNlcm5hbWU6")))
    }

    #[test]
    fn http_supports_basic_auth_with_only_password() {
        let header = basic_auth_for("https://:password@127.0.0.1:8545");
        assert!(header.is_some());
        assert_eq!(header, Some(HeaderValue::from_static("Basic OnBhc3N3b3Jk")))
    }

    /// Test service: checks the incoming request and answers with a canned response.
    async fn server(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
        use hyper::body::HttpBody;

        const EXPECTED_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":1}"#;
        const CANNED_RESPONSE: &str = r#"{"jsonrpc":"2.0","id":1,"result":"x"}"#;

        assert_eq!(req.method(), &hyper::Method::POST);
        assert_eq!(req.uri().path(), "/");

        let mut body = req.into_body();
        let mut received: Vec<u8> = Vec::new();
        while let Some(Ok(chunk)) = body.data().await {
            received.extend_from_slice(&chunk);
        }
        assert_eq!(std::str::from_utf8(&received), Ok(EXPECTED_REQUEST));

        Ok(hyper::Response::new(CANNED_RESPONSE.into()))
    }

    #[tokio::test]
    async fn should_make_a_request() {
        use hyper::service::{make_service_fn, service_fn};

        // given
        let addr = "127.0.0.1:3001";
        // start server
        let make_service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(server)) });
        let listener = hyper::Server::bind(&addr.parse().unwrap()).serve(make_service);
        tokio::spawn(async move {
            println!("Listening on http://{}", addr);
            listener.await.unwrap();
        });

        // when
        let endpoint = format!("http://{}", addr);
        let client = Http::new(&endpoint).unwrap();
        println!("Sending request");
        let response = client.execute("eth_getAccounts", vec![]).await;
        println!("Got response");

        // then
        assert_eq!(response, Ok(rpc::Value::String("x".into())));
    }
}