1use std::{any::Any, io, net::ToSocketAddrs, path::Path, str::from_utf8, time::Duration};
2
3use futures::TryStreamExt;
4use futures_http::{
5 body::BodyReader,
6 client::rasio::{HttpClient, HttpClientOptions, HttpClientOptionsBuilder},
7 types::{
8 request::{Builder as RequestBuilder, Parts},
9 Error as HttpError, HeaderName, HeaderValue, Request, StatusCode, Uri,
10 },
11};
12use rasi::{task::spawn_ok, timer::TimeoutExt};
13use serde_json::json;
14
15use crate::{
16 client::{JsonRpcClient, JsonRpcClientState},
17 Error, ErrorCode,
18};
19
20pub struct HttpJsonRpcClient {
22 max_body_size: usize,
23 send_cached_len: usize,
24 timeout: Duration,
25 builder: RequestBuilder,
26 send_ops: HttpClientOptionsBuilder,
27}
28
29impl HttpJsonRpcClient {
30 pub fn new<T>(uri: T) -> Self
32 where
33 Uri: TryFrom<T>,
34 <Uri as TryFrom<T>>::Error: Into<HttpError>,
35 {
36 HttpJsonRpcClient {
37 max_body_size: 1024 * 1024,
38 send_cached_len: 10,
39 timeout: Duration::from_secs(5),
40 builder: RequestBuilder::new().method("POST").uri(uri),
41 send_ops: HttpClientOptions::new(),
42 }
43 }
44
45 pub fn header<K, V>(mut self, key: K, value: V) -> Self
47 where
48 HeaderName: TryFrom<K>,
49 <HeaderName as TryFrom<K>>::Error: Into<HttpError>,
50 HeaderValue: TryFrom<V>,
51 <HeaderValue as TryFrom<V>>::Error: Into<HttpError>,
52 {
53 self.builder = self.builder.header(key, value);
54
55 self
56 }
57
58 pub fn extension<T>(mut self, extension: T) -> Self
60 where
61 T: Clone + Any + Send + Sync + 'static,
62 {
63 self.builder = self.builder.extension(extension);
64 self
65 }
66
67 pub fn redirect<R: ToSocketAddrs>(mut self, raddrs: R) -> Self {
69 self.send_ops = self.send_ops.redirect(raddrs);
70
71 self
72 }
73
74 pub fn with_server_name(mut self, server_name: &str) -> Self {
76 self.send_ops = self.send_ops.with_server_name(server_name);
77
78 self
79 }
80
81 pub fn with_ca_file<P: AsRef<Path>>(mut self, ca_file: P) -> Self {
83 self.send_ops = self.send_ops.with_ca_file(ca_file);
84 self
85 }
86
87 pub fn timeout(mut self, duration: Duration) -> Self {
89 self.timeout = duration;
90 self
91 }
92
93 pub fn set_use_server_name_indication(mut self, value: bool) -> Self {
96 self.send_ops = self.send_ops.set_use_server_name_indication(value);
97 self
98 }
99
100 pub fn create(self) -> io::Result<JsonRpcClient> {
102 let client = JsonRpcClient::new(self.send_cached_len);
103
104 let background = client.to_state();
105
106 spawn_ok(async move {
107 if let Err(err) = self.run_loop(background).await {
108 log::error!(target: "HttpJsonRpcClient", "stop background task, {}",err);
109 } else {
110 log::info!(target: "HttpJsonRpcClient", "stop background task");
111 }
112 });
113
114 Ok(client)
115 }
116
117 async fn run_loop(self, background: JsonRpcClientState) -> std::io::Result<()> {
118 let request = self
119 .builder
120 .body(())
121 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
122
123 let (parts, _) = request.into_parts();
124
125 let ops: HttpClientOptions = self.send_ops.try_into()?;
126
127 loop {
128 let (id, packet) = background.send().await?;
129
130 log::trace!("send jsonrpc: {}", from_utf8(&packet).unwrap());
131
132 let call = Self::send_request(&ops, self.max_body_size, parts.clone(), packet)
133 .timeout(self.timeout)
134 .await;
135
136 let buf = match call {
137 Some(Ok(buf)) => buf,
138 Some(Err(err)) => {
139 _ = background
140 .recv(
141 json!({
142 "id":id,"jsonrpc":"2.0","error": Error {
143 code: ErrorCode::InternalError,
144 message: err.to_string(),
145 data: None::<()>
146 }
147 })
148 .to_string(),
149 )
150 .await;
151
152 continue;
153 }
154 None => {
155 _ = background
156 .recv(
157 json!({
158 "id":id,"jsonrpc":"2.0","error": Error {
159 code: ErrorCode::InternalError,
160 message: "Timeout",
161 data: None::<()>
162 }
163 })
164 .to_string(),
165 )
166 .await;
167
168 continue;
169 }
170 };
171
172 Self::handle_recv(&background, buf).await;
173 }
174 }
175
176 async fn handle_recv<P: AsRef<[u8]>>(client: &JsonRpcClientState, packet: P) {
177 log::trace!("recv jsonrpc: {}", from_utf8(packet.as_ref()).unwrap());
178
179 if let Err(err) = client.recv(packet).await {
180 log::error!("handle http jsonrpc recv with error: {}", err);
181 }
182 }
183
184 async fn send_request(
185 ops: &HttpClientOptions,
186 max_body_size: usize,
187 parts: Parts,
188 packet: Vec<u8>,
189 ) -> io::Result<Vec<u8>> {
190 let request = Request::from_parts(parts, BodyReader::from(packet));
191
192 let resp = request.send(ops).await?;
193
194 if StatusCode::OK != resp.status() {
195 return Err(io::Error::new(io::ErrorKind::Other, resp.status().as_str()));
196 }
197
198 let (_, mut body) = resp.into_parts();
199
200 let mut buf = vec![];
201
202 while let Some(mut chunk) = body.try_next().await? {
203 buf.append(&mut chunk);
204
205 if buf.len() > max_body_size {
206 return Err(io::Error::new(
207 io::ErrorKind::Other,
208 format!("Body length too long: {}", max_body_size),
209 ));
210 }
211 }
212
213 Ok(buf)
214 }
215}