json_rpc_server/
client.rs

1use std::fmt::Debug;
2
3use anyhow::{anyhow, Result};
4use bytes::Bytes;
5use http_body_util::{BodyExt, Full};
6use hyper::{header::HeaderValue, Request, StatusCode, Uri};
7use hyper_tls::HttpsConnector;
8use hyper_util::{client::legacy::Client, rt::TokioExecutor};
9use serde::{Deserialize, Serialize};
10
11use crate::{RPCError, RPCRequest, RPCResponse, RPCResult};
12
13pub async fn call<P, R>(
14    url: &str,
15    method: &str,
16    params: &P,
17    auth: Option<&str>,
18) -> RPCResult<Option<R>>
19where
20    R: for<'de> Deserialize<'de> + Debug,
21    P: Serialize,
22{
23    let req = RPCRequest::new(method, params);
24    let s = serde_json::to_string(&req).map_err(|e| RPCError::internal_error(format!("{e:?}")))?;
25    let mut headers = vec![
26        ("content-type", String::from("application/json")),
27        ("User-Agent", String::from("hyper-client")),
28    ];
29    if let Some(t) = auth {
30        let r = format!("Bearer {}", t);
31        headers.push(("Authorization", r));
32    }
33
34    let (status_code, bytes) = http_post(url, s.as_bytes(), Some(&headers))
35        .await
36        .map_err(|e| RPCError::internal_error(format!("{e:?}")))?;
37
38    if !status_code.is_success() {
39        log::error!(
40            "StatusCode:{:?}, Response is: {:?}",
41            status_code,
42            String::from_utf8_lossy(&bytes)
43        );
44        return Err(RPCError::internal_error(String::from(
45            "Failed to request uri",
46        )));
47    } else {
48        log::debug!(
49            "StatusCode:{:?}, Response is: {:?}",
50            status_code,
51            String::from_utf8_lossy(&bytes)
52        );
53    }
54
55    let resp: RPCResponse<R> =
56        serde_json::from_slice(&bytes).map_err(|e| RPCError::internal_error(format!("{e:?}")))?;
57
58    if let Some(e) = resp.error {
59        Err(e)
60    } else {
61        Ok(resp.result)
62    }
63}
64
65pub async fn batch_call<P, R>(
66    url: &str,
67    requests: &Vec<RPCRequest<P>>,
68    auth: Option<&str>,
69) -> Result<Vec<RPCResponse<R>>>
70where
71    R: for<'de> Deserialize<'de>,
72    P: Serialize + Clone,
73{
74    let s = serde_json::to_string(&requests)?;
75
76    let mut headers = vec![("content-type", String::from("application/json"))];
77    if let Some(t) = auth {
78        let r = format!("Bearer {}", t);
79        headers.push(("Authorization", r));
80    }
81
82    let (status_code, bytes) = http_post(url, s.as_bytes(), Some(&headers)).await?;
83    log::debug!(
84        "StatusCode:{:?}, Response is: {:?}",
85        status_code,
86        String::from_utf8_lossy(&bytes)
87    );
88
89    if status_code.is_success() {
90        Ok(serde_json::from_slice(&bytes)?)
91    } else {
92        log::error!(
93            "StatusCode:{:?}, Response is: {:?}",
94            status_code,
95            String::from_utf8_lossy(&bytes)
96        );
97        Err(anyhow!("Failed to request uri"))
98    }
99}
100
101pub async fn http_post_ret_string(
102    url: &str,
103    body: &[u8],
104    headers: Option<&[(&'static str, String)]>,
105) -> Result<(StatusCode, String)> {
106    http_post(url, body, headers)
107        .await
108        .map(|(code, msg)| (code, String::from_utf8_lossy(&msg).into_owned()))
109}
110
111pub async fn http_post(
112    url: &str,
113    body: &[u8],
114    headers: Option<&[(&'static str, String)]>,
115) -> Result<(StatusCode, Vec<u8>)> {
116    let uri: Uri = url.parse()?;
117    let request = Request::post(uri).body(Full::from(body.to_vec()))?;
118    send_http_request(request, headers).await
119}
120pub async fn http_get_ret_string(
121    url: &str,
122    body: &[u8],
123    headers: Option<&[(&'static str, String)]>,
124) -> Result<(StatusCode, String)> {
125    http_get(url, body, headers)
126        .await
127        .map(|(code, msg)| (code, String::from_utf8_lossy(&msg).into_owned()))
128}
129
130pub async fn http_get(
131    url: &str,
132    body: &[u8],
133    headers: Option<&[(&'static str, String)]>,
134) -> Result<(StatusCode, Vec<u8>)> {
135    let uri: Uri = url.parse()?;
136    let request = Request::get(uri).body(Full::from(body.to_vec()))?;
137    send_http_request(request, headers).await
138}
139
140async fn send_http_request(
141    mut request: Request<Full<Bytes>>,
142    headers: Option<&[(&'static str, String)]>,
143) -> Result<(StatusCode, Vec<u8>)> {
144    let connector = HttpsConnector::new();
145    let client = Client::builder(TokioExecutor::new()).build(connector);
146
147    if let Some(v) = headers {
148        let hs = request.headers_mut();
149        for (h, v) in v.iter() {
150            hs.insert(*h, HeaderValue::from_str(v)?);
151        }
152    }
153
154    let response = client.request(request).await?;
155    let status_code = response.status();
156    let body = response.into_body().collect().await?.to_bytes().to_vec();
157    Ok((status_code, body))
158}