1use crate::{error, helpers, rpc, BatchTransport, Error, RequestId, Transport};
4use futures::{
5 self,
6 task::{Context, Poll},
7 Future, FutureExt, StreamExt,
8};
9use hyper::header::HeaderValue;
10use std::{
11 env, fmt,
12 ops::Deref,
13 pin::Pin,
14 sync::{
15 atomic::{self, AtomicUsize},
16 Arc,
17 },
18};
19use url::Url;
20
21impl From<hyper::Error> for Error {
22 fn from(err: hyper::Error) -> Self {
23 Error::Transport(format!("{:?}", err))
24 }
25}
26
27impl From<hyper::http::uri::InvalidUri> for Error {
28 fn from(err: hyper::http::uri::InvalidUri) -> Self {
29 Error::Transport(format!("{:?}", err))
30 }
31}
32
33impl From<hyper::header::InvalidHeaderValue> for Error {
34 fn from(err: hyper::header::InvalidHeaderValue) -> Self {
35 Error::Transport(format!("{}", err))
36 }
37}
38
/// Length threshold for the serialized request: `Http::send_request` sets an
/// explicit `Content-Length` header only when the request's `len()` is
/// strictly below this value.
const MAX_SINGLE_CHUNK: usize = 256;
41
/// The underlying `hyper` client used when the `http-tls` feature is enabled.
///
/// Both variants use a `hyper_tls::HttpsConnector`; they differ only in
/// whether the connector is wrapped in a `hyper_proxy::ProxyConnector`.
#[cfg(feature = "http-tls")]
#[derive(Debug, Clone)]
enum Client {
    /// Client whose connections go through a proxy connector.
    Proxy(hyper::Client<hyper_proxy::ProxyConnector<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>>),
    /// Client that uses the HTTPS connector directly.
    NoProxy(hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>),
}
48
/// The underlying `hyper` client used when the `http-tls` feature is disabled.
///
/// Both variants use a plain `hyper::client::HttpConnector`; they differ only
/// in whether the connector is wrapped in a `hyper_proxy::ProxyConnector`.
#[cfg(not(feature = "http-tls"))]
#[derive(Debug, Clone)]
enum Client {
    /// Client whose connections go through a proxy connector.
    Proxy(hyper::Client<hyper_proxy::ProxyConnector<hyper::client::HttpConnector>>),
    /// Client that uses the HTTP connector directly.
    NoProxy(hyper::Client<hyper::client::HttpConnector>),
}
55
56impl Client {
57 pub fn request(&self, req: hyper::Request<hyper::Body>) -> hyper::client::ResponseFuture {
58 match self {
59 Client::Proxy(client) => client.request(req),
60 Client::NoProxy(client) => client.request(req),
61 }
62 }
63}
64
/// HTTP transport.
///
/// The type is `Clone`; because `id` is held in an `Arc`, clones share the
/// same request-id counter.
#[derive(Debug, Clone)]
pub struct Http {
    /// Counter from which request ids are drawn (initialised to 1 in `new`).
    id: Arc<AtomicUsize>,
    /// URI every request is sent to.
    url: hyper::Uri,
    /// Pre-built `Authorization` header value, present when the URL passed to
    /// `new` carried a username and/or password.
    basic_auth: Option<HeaderValue>,
    /// The `hyper` client (proxied or direct) used to issue requests.
    client: Client,
}
73
74impl Http {
75 pub fn new(url: &str) -> error::Result<Self> {
77 #[cfg(feature = "http-tls")]
78 let (proxy_env, connector) = { (env::var("HTTPS_PROXY"), hyper_tls::HttpsConnector::new()) };
79 #[cfg(not(feature = "http-tls"))]
80 let (proxy_env, connector) = { (env::var("HTTP_PROXY"), hyper::client::HttpConnector::new()) };
81
82 let client = match proxy_env {
83 Ok(proxy) => {
84 let mut url = url::Url::parse(&proxy)?;
85 let username = String::from(url.username());
86 let password = String::from(url.password().unwrap_or_default());
87
88 url.set_username("").map_err(|_| Error::Internal)?;
89 url.set_password(None).map_err(|_| Error::Internal)?;
90
91 let uri = url.to_string().parse()?;
92
93 let mut proxy = hyper_proxy::Proxy::new(hyper_proxy::Intercept::All, uri);
94
95 if username != "" {
96 let credentials =
97 typed_headers::Credentials::basic(&username, &password).map_err(|_| Error::Internal)?;
98
99 proxy.set_authorization(credentials);
100 }
101
102 let proxy_connector = hyper_proxy::ProxyConnector::from_proxy(connector, proxy)?;
103
104 Client::Proxy(hyper::Client::builder().build(proxy_connector))
105 }
106 Err(_) => Client::NoProxy(hyper::Client::builder().build(connector)),
107 };
108
109 let basic_auth = {
110 let url = Url::parse(url)?;
111 let user = url.username();
112 let auth = format!("{}:{}", user, url.password().unwrap_or_default());
113 if &auth == ":" {
114 None
115 } else {
116 Some(HeaderValue::from_str(&format!("Basic {}", base64::encode(&auth)))?)
117 }
118 };
119
120 Ok(Http {
121 id: Arc::new(AtomicUsize::new(1)),
122 url: url.parse()?,
123 basic_auth,
124 client,
125 })
126 }
127
128 fn send_request<F, O>(&self, id: RequestId, request: rpc::Request, extract: F) -> Response<F>
129 where
130 F: Fn(Vec<u8>) -> O,
131 {
132 let request = helpers::to_string(&request);
133 log::debug!("[{}] Sending: {} to {}", id, request, self.url);
134 let len = request.len();
135 let mut req = hyper::Request::new(hyper::Body::from(request));
136 *req.method_mut() = hyper::Method::POST;
137 *req.uri_mut() = self.url.clone();
138 req.headers_mut().insert(
139 hyper::header::CONTENT_TYPE,
140 HeaderValue::from_static("application/json"),
141 );
142 req.headers_mut()
143 .insert(hyper::header::USER_AGENT, HeaderValue::from_static("web3.rs"));
144
145 if len < MAX_SINGLE_CHUNK {
147 req.headers_mut().insert(hyper::header::CONTENT_LENGTH, len.into());
148 }
149
150 if let Some(ref basic_auth) = self.basic_auth {
152 req.headers_mut()
153 .insert(hyper::header::AUTHORIZATION, basic_auth.clone());
154 }
155 let result = self.client.request(req);
156
157 Response::new(id, result, extract)
158 }
159}
160
161impl Transport for Http {
162 type Out = Response<fn(Vec<u8>) -> error::Result<rpc::Value>>;
163
164 fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
165 let id = self.id.fetch_add(1, atomic::Ordering::AcqRel);
166 let request = helpers::build_request(id, method, params);
167
168 (id, request)
169 }
170
171 fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
172 self.send_request(id, rpc::Request::Single(request), single_response)
173 }
174}
175
176impl BatchTransport for Http {
177 type Batch = Response<fn(Vec<u8>) -> error::Result<Vec<error::Result<rpc::Value>>>>;
178
179 fn send_batch<T>(&self, requests: T) -> Self::Batch
180 where
181 T: IntoIterator<Item = (RequestId, rpc::Call)>,
182 {
183 let mut it = requests.into_iter();
184 let (id, first) = it.next().map(|x| (x.0, Some(x.1))).unwrap_or_else(|| (0, None));
185 let requests = first.into_iter().chain(it.map(|x| x.1)).collect();
186
187 self.send_request(id, rpc::Request::Batch(requests), batch_response)
188 }
189}
190
191fn single_response<T: Deref<Target = [u8]>>(response: T) -> error::Result<rpc::Value> {
193 let response =
194 helpers::to_response_from_slice(&*response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?;
195 match response {
196 rpc::Response::Single(output) => helpers::to_result_from_output(output),
197 _ => Err(Error::InvalidResponse("Expected single, got batch.".into())),
198 }
199}
200
201fn batch_response<T: Deref<Target = [u8]>>(response: T) -> error::Result<Vec<error::Result<rpc::Value>>> {
203 let response =
204 helpers::to_response_from_slice(&*response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?;
205 match response {
206 rpc::Response::Batch(outputs) => Ok(outputs.into_iter().map(helpers::to_result_from_output).collect()),
207 _ => Err(Error::InvalidResponse("Expected batch, got single.".into())),
208 }
209}
210
/// Progress of an in-flight HTTP request, as driven by `Response::poll`.
enum ResponseState {
    /// The request was issued and the response has not arrived yet.
    Waiting(hyper::client::ResponseFuture),
    /// The response arrived with a success status; holds the body bytes
    /// collected so far and the body stream still being read.
    Reading(Vec<u8>, hyper::Body),
}
215
/// Future for a pending HTTP request; resolves to the value produced by
/// applying `extract` to the full response body.
pub struct Response<T> {
    /// Request id, used to tag log messages.
    id: RequestId,
    /// Callback that turns the complete response body into the final result.
    extract: T,
    /// Current stage of the request.
    state: ResponseState,
}
222
223impl<T> Response<T> {
224 pub fn new(id: RequestId, response: hyper::client::ResponseFuture, extract: T) -> Self {
226 log::trace!("[{}] Request pending.", id);
227 Response {
228 id,
229 extract,
230 state: ResponseState::Waiting(response),
231 }
232 }
233}
234
// `Response` is declared `Unpin` for every `T`; the `Future` implementation
// below relies on this to mutate its fields through `Pin<&mut Self>`.
impl<T> Unpin for Response<T> {}
237
238impl<T, Out> Future for Response<T>
239where
240 T: Fn(Vec<u8>) -> error::Result<Out>,
241 Out: fmt::Debug,
242{
243 type Output = error::Result<Out>;
244
245 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
246 let id = self.id;
247 loop {
248 match self.state {
249 ResponseState::Waiting(ref mut waiting) => {
250 log::trace!("[{}] Checking response.", id);
251 let response = ready!(waiting.poll_unpin(ctx))?;
252 if !response.status().is_success() {
253 return Poll::Ready(Err(Error::Transport(format!(
254 "Unexpected response status code: {}",
255 response.status()
256 ))));
257 }
258 self.state = ResponseState::Reading(Default::default(), response.into_body());
259 }
260 ResponseState::Reading(ref mut content, ref mut body) => {
261 log::trace!("[{}] Reading body.", id);
262 match ready!(body.poll_next_unpin(ctx)) {
263 Some(chunk) => {
264 content.extend(&*chunk?);
265 }
266 None => {
267 let response = std::mem::take(content);
268 log::trace!(
269 "[{}] Extracting result from:\n{}",
270 self.id,
271 std::str::from_utf8(&response).unwrap_or("<invalid utf8>")
272 );
273 return Poll::Ready((self.extract)(response));
274 }
275 }
276 }
277 }
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn http_supports_basic_auth_with_user_and_password() {
        let http = Http::new("https://user:password@127.0.0.1:8545").unwrap();
        assert!(http.basic_auth.is_some());
        assert_eq!(
            http.basic_auth,
            Some(HeaderValue::from_static("Basic dXNlcjpwYXNzd29yZA=="))
        )
    }

    #[test]
    fn http_supports_basic_auth_with_user_no_password() {
        let http = Http::new("https://username:@127.0.0.1:8545").unwrap();
        assert!(http.basic_auth.is_some());
        assert_eq!(http.basic_auth, Some(HeaderValue::from_static("Basic dXNlcm5hbWU6")))
    }

    #[test]
    fn http_supports_basic_auth_with_only_password() {
        let http = Http::new("https://:password@127.0.0.1:8545").unwrap();
        assert!(http.basic_auth.is_some());
        assert_eq!(http.basic_auth, Some(HeaderValue::from_static("Basic OnBhc3N3b3Jk")))
    }

    /// Test handler: checks that the incoming request is the expected JSON-RPC
    /// POST to `/` and answers with a fixed JSON-RPC result.
    async fn server(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
        use hyper::body::HttpBody;

        let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":1}"#;
        let response = r#"{"jsonrpc":"2.0","id":1,"result":"x"}"#;

        assert_eq!(req.method(), &hyper::Method::POST);
        assert_eq!(req.uri().path(), "/");
        let mut content: Vec<u8> = vec![];
        let mut body = req.into_body();
        while let Some(Ok(chunk)) = body.data().await {
            content.extend(&*chunk);
        }
        assert_eq!(std::str::from_utf8(&*content), Ok(expected));

        Ok(hyper::Response::new(response.into()))
    }

    #[tokio::test]
    async fn should_make_a_request() {
        use hyper::service::{make_service_fn, service_fn};

        // Bind to port 0 so the OS picks a free port; a fixed port makes the
        // test fail whenever that port is taken or tests run in parallel.
        let bind_addr: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap();
        let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(server)) });
        let server = hyper::Server::bind(&bind_addr).serve(service);
        // The address actually bound, including the port the OS assigned.
        let addr = server.local_addr();
        tokio::spawn(async move {
            println!("Listening on http://{}", addr);
            server.await.unwrap();
        });

        let client = Http::new(&format!("http://{}", addr)).unwrap();
        println!("Sending request");
        let response = client.execute("eth_getAccounts", vec![]).await;
        println!("Got response");

        assert_eq!(response, Ok(rpc::Value::String("x".into())));
    }
}