futures_jsonrpcv2/rasi/
http.rs

1use std::{any::Any, io, net::ToSocketAddrs, path::Path, str::from_utf8, time::Duration};
2
3use futures::TryStreamExt;
4use futures_http::{
5    body::BodyReader,
6    client::rasio::{HttpClient, HttpClientOptions, HttpClientOptionsBuilder},
7    types::{
8        request::{Builder as RequestBuilder, Parts},
9        Error as HttpError, HeaderName, HeaderValue, Request, StatusCode, Uri,
10    },
11};
12use rasi::{task::spawn_ok, timer::TimeoutExt};
13use serde_json::json;
14
15use crate::{
16    client::{JsonRpcClient, JsonRpcClientState},
17    Error, ErrorCode,
18};
19
20/// A builder to create a http jsonrpc client.
21pub struct HttpJsonRpcClient {
22    max_body_size: usize,
23    send_cached_len: usize,
24    timeout: Duration,
25    builder: RequestBuilder,
26    send_ops: HttpClientOptionsBuilder,
27}
28
29impl HttpJsonRpcClient {
30    /// Create new http rpc client builder with server uri.
31    pub fn new<T>(uri: T) -> Self
32    where
33        Uri: TryFrom<T>,
34        <Uri as TryFrom<T>>::Error: Into<HttpError>,
35    {
36        HttpJsonRpcClient {
37            max_body_size: 1024 * 1024,
38            send_cached_len: 10,
39            timeout: Duration::from_secs(5),
40            builder: RequestBuilder::new().method("POST").uri(uri),
41            send_ops: HttpClientOptions::new(),
42        }
43    }
44
45    /// Appends a http header to the http `JsonRpcClient` builder.
46    pub fn header<K, V>(mut self, key: K, value: V) -> Self
47    where
48        HeaderName: TryFrom<K>,
49        <HeaderName as TryFrom<K>>::Error: Into<HttpError>,
50        HeaderValue: TryFrom<V>,
51        <HeaderValue as TryFrom<V>>::Error: Into<HttpError>,
52    {
53        self.builder = self.builder.header(key, value);
54
55        self
56    }
57
58    /// Add an extension to the http `JsonRpcClient` builder.
59    pub fn extension<T>(mut self, extension: T) -> Self
60    where
61        T: Clone + Any + Send + Sync + 'static,
62    {
63        self.builder = self.builder.extension(extension);
64        self
65    }
66
67    /// Rewrite http request's host:port fields and send request to the specified `raddrs`.
68    pub fn redirect<R: ToSocketAddrs>(mut self, raddrs: R) -> Self {
69        self.send_ops = self.send_ops.redirect(raddrs);
70
71        self
72    }
73
74    /// Set remote server's server name, this option will rewrite request's host field.
75    pub fn with_server_name(mut self, server_name: &str) -> Self {
76        self.send_ops = self.send_ops.with_server_name(server_name);
77
78        self
79    }
80
81    /// Set the server verification ca file, this is useful for self signed server.
82    pub fn with_ca_file<P: AsRef<Path>>(mut self, ca_file: P) -> Self {
83        self.send_ops = self.send_ops.with_ca_file(ca_file);
84        self
85    }
86
87    /// Set the timeout duration of the jsonrpc call via http request.
88    pub fn timeout(mut self, duration: Duration) -> Self {
89        self.timeout = duration;
90        self
91    }
92
93    /// Configures the use of Server Name Indication (SNI) when connecting.
94    /// Defaults to true.
95    pub fn set_use_server_name_indication(mut self, value: bool) -> Self {
96        self.send_ops = self.send_ops.set_use_server_name_indication(value);
97        self
98    }
99
100    /// Consume builder and create a new `JsonRpcClient` instance.
101    pub fn create(self) -> io::Result<JsonRpcClient> {
102        let client = JsonRpcClient::new(self.send_cached_len);
103
104        let background = client.to_state();
105
106        spawn_ok(async move {
107            if let Err(err) = self.run_loop(background).await {
108                log::error!(target: "HttpJsonRpcClient", "stop background task, {}",err);
109            } else {
110                log::info!(target: "HttpJsonRpcClient", "stop background task");
111            }
112        });
113
114        Ok(client)
115    }
116
117    async fn run_loop(self, background: JsonRpcClientState) -> std::io::Result<()> {
118        let request = self
119            .builder
120            .body(())
121            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
122
123        let (parts, _) = request.into_parts();
124
125        let ops: HttpClientOptions = self.send_ops.try_into()?;
126
127        loop {
128            let (id, packet) = background.send().await?;
129
130            log::trace!("send jsonrpc: {}", from_utf8(&packet).unwrap());
131
132            let call = Self::send_request(&ops, self.max_body_size, parts.clone(), packet)
133                .timeout(self.timeout)
134                .await;
135
136            let buf = match call {
137                Some(Ok(buf)) => buf,
138                Some(Err(err)) => {
139                    _ = background
140                        .recv(
141                            json!({
142                            "id":id,"jsonrpc":"2.0","error": Error {
143                                         code: ErrorCode::InternalError,
144                                         message: err.to_string(),
145                                         data: None::<()>
146                                      }
147                             })
148                            .to_string(),
149                        )
150                        .await;
151
152                    continue;
153                }
154                None => {
155                    _ = background
156                        .recv(
157                            json!({
158                            "id":id,"jsonrpc":"2.0","error": Error {
159                                         code: ErrorCode::InternalError,
160                                         message: "Timeout",
161                                         data: None::<()>
162                                      }
163                             })
164                            .to_string(),
165                        )
166                        .await;
167
168                    continue;
169                }
170            };
171
172            Self::handle_recv(&background, buf).await;
173        }
174    }
175
176    async fn handle_recv<P: AsRef<[u8]>>(client: &JsonRpcClientState, packet: P) {
177        log::trace!("recv jsonrpc: {}", from_utf8(packet.as_ref()).unwrap());
178
179        if let Err(err) = client.recv(packet).await {
180            log::error!("handle http jsonrpc recv with error: {}", err);
181        }
182    }
183
184    async fn send_request(
185        ops: &HttpClientOptions,
186        max_body_size: usize,
187        parts: Parts,
188        packet: Vec<u8>,
189    ) -> io::Result<Vec<u8>> {
190        let request = Request::from_parts(parts, BodyReader::from(packet));
191
192        let resp = request.send(ops).await?;
193
194        if StatusCode::OK != resp.status() {
195            return Err(io::Error::new(io::ErrorKind::Other, resp.status().as_str()));
196        }
197
198        let (_, mut body) = resp.into_parts();
199
200        let mut buf = vec![];
201
202        while let Some(mut chunk) = body.try_next().await? {
203            buf.append(&mut chunk);
204
205            if buf.len() > max_body_size {
206                return Err(io::Error::new(
207                    io::ErrorKind::Other,
208                    format!("Body length too long: {}", max_body_size),
209                ));
210            }
211        }
212
213        Ok(buf)
214    }
215}