1use std::fmt::Debug;
2
3use anyhow::{anyhow, Result};
4use bytes::Bytes;
5use http_body_util::{BodyExt, Full};
6use hyper::{header::HeaderValue, Request, StatusCode, Uri};
7use hyper_tls::HttpsConnector;
8use hyper_util::{client::legacy::Client, rt::TokioExecutor};
9use serde::{Deserialize, Serialize};
10
11use crate::{RPCError, RPCRequest, RPCResponse, RPCResult};
12
13pub async fn call<P, R>(
14 url: &str,
15 method: &str,
16 params: &P,
17 auth: Option<&str>,
18) -> RPCResult<Option<R>>
19where
20 R: for<'de> Deserialize<'de> + Debug,
21 P: Serialize,
22{
23 let req = RPCRequest::new(method, params);
24 let s = serde_json::to_string(&req).map_err(|e| RPCError::internal_error(format!("{e:?}")))?;
25 let mut headers = vec![
26 ("content-type", String::from("application/json")),
27 ("User-Agent", String::from("hyper-client")),
28 ];
29 if let Some(t) = auth {
30 let r = format!("Bearer {}", t);
31 headers.push(("Authorization", r));
32 }
33
34 let (status_code, bytes) = http_post(url, s.as_bytes(), Some(&headers))
35 .await
36 .map_err(|e| RPCError::internal_error(format!("{e:?}")))?;
37
38 if !status_code.is_success() {
39 log::error!(
40 "StatusCode:{:?}, Response is: {:?}",
41 status_code,
42 String::from_utf8_lossy(&bytes)
43 );
44 return Err(RPCError::internal_error(String::from(
45 "Failed to request uri",
46 )));
47 } else {
48 log::debug!(
49 "StatusCode:{:?}, Response is: {:?}",
50 status_code,
51 String::from_utf8_lossy(&bytes)
52 );
53 }
54
55 let resp: RPCResponse<R> =
56 serde_json::from_slice(&bytes).map_err(|e| RPCError::internal_error(format!("{e:?}")))?;
57
58 if let Some(e) = resp.error {
59 Err(e)
60 } else {
61 Ok(resp.result)
62 }
63}
64
65pub async fn batch_call<P, R>(
66 url: &str,
67 requests: &Vec<RPCRequest<P>>,
68 auth: Option<&str>,
69) -> Result<Vec<RPCResponse<R>>>
70where
71 R: for<'de> Deserialize<'de>,
72 P: Serialize + Clone,
73{
74 let s = serde_json::to_string(&requests)?;
75
76 let mut headers = vec![("content-type", String::from("application/json"))];
77 if let Some(t) = auth {
78 let r = format!("Bearer {}", t);
79 headers.push(("Authorization", r));
80 }
81
82 let (status_code, bytes) = http_post(url, s.as_bytes(), Some(&headers)).await?;
83 log::debug!(
84 "StatusCode:{:?}, Response is: {:?}",
85 status_code,
86 String::from_utf8_lossy(&bytes)
87 );
88
89 if status_code.is_success() {
90 Ok(serde_json::from_slice(&bytes)?)
91 } else {
92 log::error!(
93 "StatusCode:{:?}, Response is: {:?}",
94 status_code,
95 String::from_utf8_lossy(&bytes)
96 );
97 Err(anyhow!("Failed to request uri"))
98 }
99}
100
101pub async fn http_post_ret_string(
102 url: &str,
103 body: &[u8],
104 headers: Option<&[(&'static str, String)]>,
105) -> Result<(StatusCode, String)> {
106 http_post(url, body, headers)
107 .await
108 .map(|(code, msg)| (code, String::from_utf8_lossy(&msg).into_owned()))
109}
110
111pub async fn http_post(
112 url: &str,
113 body: &[u8],
114 headers: Option<&[(&'static str, String)]>,
115) -> Result<(StatusCode, Vec<u8>)> {
116 let uri: Uri = url.parse()?;
117 let request = Request::post(uri).body(Full::from(body.to_vec()))?;
118 send_http_request(request, headers).await
119}
120pub async fn http_get_ret_string(
121 url: &str,
122 body: &[u8],
123 headers: Option<&[(&'static str, String)]>,
124) -> Result<(StatusCode, String)> {
125 http_get(url, body, headers)
126 .await
127 .map(|(code, msg)| (code, String::from_utf8_lossy(&msg).into_owned()))
128}
129
130pub async fn http_get(
131 url: &str,
132 body: &[u8],
133 headers: Option<&[(&'static str, String)]>,
134) -> Result<(StatusCode, Vec<u8>)> {
135 let uri: Uri = url.parse()?;
136 let request = Request::get(uri).body(Full::from(body.to_vec()))?;
137 send_http_request(request, headers).await
138}
139
140async fn send_http_request(
141 mut request: Request<Full<Bytes>>,
142 headers: Option<&[(&'static str, String)]>,
143) -> Result<(StatusCode, Vec<u8>)> {
144 let connector = HttpsConnector::new();
145 let client = Client::builder(TokioExecutor::new()).build(connector);
146
147 if let Some(v) = headers {
148 let hs = request.headers_mut();
149 for (h, v) in v.iter() {
150 hs.insert(*h, HeaderValue::from_str(v)?);
151 }
152 }
153
154 let response = client.request(request).await?;
155 let status_code = response.status();
156 let body = response.into_body().collect().await?.to_bytes().to_vec();
157 Ok((status_code, body))
158}