1use std::{
2 convert::Infallible,
3 error::Error,
4 fmt::Debug,
5 marker::PhantomData,
6 time::{Duration, Instant},
7};
8
9use longbridge_geo::is_cn;
10use reqwest::{
11 Method, StatusCode,
12 header::{HeaderMap, HeaderName, HeaderValue},
13};
14use serde::{Deserialize, Serialize, de::DeserializeOwned};
15
16use crate::{
17 AuthConfig, HttpClient, HttpClientError, HttpClientResult,
18 signature::{SignatureParams, signature},
19 timestamp::Timestamp,
20};
21
/// Default base URL of the OpenAPI HTTP endpoint.
const HTTP_URL: &str = "https://openapi.longbridge.com";
/// Base URL used instead of [`HTTP_URL`] when `is_cn()` reports `true`.
const HTTP_URL_CN: &str = "https://openapi.longbridge.cn";

/// Value sent in the `User-Agent` header of every request.
const USER_AGENT: &str = "openapi-sdk";
/// Upper bound on executing one request and reading its response body.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum number of retries after a `429 Too Many Requests` failure.
const RETRY_COUNT: usize = 5;
/// Delay slept before the first retry.
const RETRY_INITIAL_DELAY: Duration = Duration::from_millis(100);
/// Factor the retry delay is multiplied by after each further `429` failure.
const RETRY_FACTOR: f32 = 2.0;
30
/// Payload wrapper whose inner value is encoded to and decoded from JSON.
#[derive(Debug)]
pub struct Json<T>(pub T);
34
/// A type that can be decoded from the raw bytes of a response payload.
pub trait FromPayload: Sized + Send + Sync + 'static {
    /// Error produced when the bytes cannot be decoded.
    type Err: Error;

    /// Decodes `data` into `Self`.
    fn parse_from_bytes(data: &[u8]) -> Result<Self, Self::Err>;
}
43
/// A type that can be encoded into the raw bytes of a request body.
///
/// `Debug` is required because the body is included in the request log.
pub trait ToPayload: Debug + Sized + Send + Sync + 'static {
    /// Error produced when the value cannot be encoded.
    type Err: Error;

    /// Encodes `self` into the bytes sent as the request body.
    fn to_bytes(&self) -> Result<Vec<u8>, Self::Err>;
}
52
53impl<T> FromPayload for Json<T>
54where
55 T: DeserializeOwned + Send + Sync + 'static,
56{
57 type Err = serde_json::Error;
58
59 #[inline]
60 fn parse_from_bytes(data: &[u8]) -> Result<Self, Self::Err> {
61 Ok(Json(serde_json::from_slice(data)?))
62 }
63}
64
65impl<T> ToPayload for Json<T>
66where
67 T: Debug + Serialize + Send + Sync + 'static,
68{
69 type Err = serde_json::Error;
70
71 #[inline]
72 fn to_bytes(&self) -> Result<Vec<u8>, Self::Err> {
73 serde_json::to_vec(&self.0)
74 }
75}
76
77impl FromPayload for String {
78 type Err = std::string::FromUtf8Error;
79
80 #[inline]
81 fn parse_from_bytes(data: &[u8]) -> Result<Self, Self::Err> {
82 String::from_utf8(data.to_vec())
83 }
84}
85
86impl ToPayload for String {
87 type Err = std::string::FromUtf8Error;
88
89 #[inline]
90 fn to_bytes(&self) -> Result<Vec<u8>, Self::Err> {
91 Ok(self.clone().into_bytes())
92 }
93}
94
95impl FromPayload for () {
96 type Err = Infallible;
97
98 #[inline]
99 fn parse_from_bytes(_data: &[u8]) -> Result<Self, Self::Err> {
100 Ok(())
101 }
102}
103
104impl ToPayload for () {
105 type Err = Infallible;
106
107 #[inline]
108 fn to_bytes(&self) -> Result<Vec<u8>, Self::Err> {
109 Ok(vec![])
110 }
111}
112
/// Envelope that every response body is deserialized into before the
/// payload itself is decoded.
#[derive(Deserialize)]
struct OpenApiResponse {
    /// Result code; `0` is treated as success, any other value is surfaced
    /// as an `HttpClientError::OpenApi` error.
    code: i32,
    /// Message reported together with a non-zero `code`.
    message: String,
    /// Still-encoded payload; on success its raw text is handed to
    /// `FromPayload::parse_from_bytes`.
    data: Option<Box<serde_json::value::RawValue>>,
}
119
/// Builder for a single OpenAPI HTTP request.
///
/// `T` is the request body type, `Q` the query-parameter type and `R` the
/// type the response payload is decoded into.
pub struct RequestBuilder<'a, T, Q, R> {
    /// Client supplying the HTTP connection, configuration and default headers.
    client: &'a HttpClient,
    /// HTTP method of the request.
    method: Method,
    /// Path appended to the base URL.
    path: String,
    /// Per-request headers, applied after the client's default headers.
    headers: HeaderMap,
    /// Optional request body.
    body: Option<T>,
    /// Optional value serialized into the URL query string.
    query_params: Option<Q>,
    /// Marker recording the response type `R`; no value of `R` is stored.
    mark_resp: PhantomData<R>,
}
130
131impl<'a> RequestBuilder<'a, (), (), ()> {
132 pub(crate) fn new(client: &'a HttpClient, method: Method, path: impl Into<String>) -> Self {
133 Self {
134 client,
135 method,
136 path: path.into(),
137 headers: Default::default(),
138 body: None,
139 query_params: None,
140 mark_resp: PhantomData,
141 }
142 }
143}
144
145impl<'a, T, Q, R> RequestBuilder<'a, T, Q, R> {
146 #[must_use]
148 pub fn body<T2>(self, body: T2) -> RequestBuilder<'a, T2, Q, R>
149 where
150 T2: ToPayload,
151 {
152 RequestBuilder {
153 client: self.client,
154 method: self.method,
155 path: self.path,
156 headers: self.headers,
157 body: Some(body),
158 query_params: self.query_params,
159 mark_resp: self.mark_resp,
160 }
161 }
162
163 #[must_use]
165 pub fn header<K, V>(mut self, key: K, value: V) -> Self
166 where
167 K: TryInto<HeaderName>,
168 V: TryInto<HeaderValue>,
169 {
170 let key = key.try_into();
171 let value = value.try_into();
172 if let (Ok(key), Ok(value)) = (key, value) {
173 self.headers.insert(key, value);
174 }
175 self
176 }
177
178 #[must_use]
180 pub fn query_params<Q2>(self, params: Q2) -> RequestBuilder<'a, T, Q2, R>
181 where
182 Q2: Serialize + Send + Sync,
183 {
184 RequestBuilder {
185 client: self.client,
186 method: self.method,
187 path: self.path,
188 headers: self.headers,
189 body: self.body,
190 query_params: Some(params),
191 mark_resp: self.mark_resp,
192 }
193 }
194
195 #[must_use]
197 pub fn response<R2>(self) -> RequestBuilder<'a, T, Q, R2>
198 where
199 R2: FromPayload,
200 {
201 RequestBuilder {
202 client: self.client,
203 method: self.method,
204 path: self.path,
205 headers: self.headers,
206 body: self.body,
207 query_params: self.query_params,
208 mark_resp: PhantomData,
209 }
210 }
211}
212
213impl<T, Q, R> RequestBuilder<'_, T, Q, R>
214where
215 T: ToPayload,
216 Q: Serialize + Send,
217 R: FromPayload,
218{
219 async fn http_url(&self) -> &str {
220 if let Some(url) = self.client.config.http_url.as_deref() {
221 return url;
222 }
223
224 if is_cn().await { HTTP_URL_CN } else { HTTP_URL }
225 }
226
227 async fn do_send(&self) -> HttpClientResult<R> {
228 let HttpClient {
229 http_cli,
230 config,
231 default_headers,
232 } = &self.client;
233 let timestamp = self
234 .headers
235 .get("X-Timestamp")
236 .and_then(|value| value.to_str().ok())
237 .and_then(|value| value.parse().ok())
238 .unwrap_or_else(Timestamp::now);
239
240 let (app_key, access_token, app_secret) = match &config.auth {
242 AuthConfig::ApiKey {
243 app_key,
244 app_secret,
245 access_token,
246 } => (
247 app_key.clone(),
248 access_token.clone(),
249 Some(app_secret.clone()),
250 ),
251 AuthConfig::OAuth(oauth) => {
252 let token = oauth
253 .access_token()
254 .await
255 .map_err(|e| HttpClientError::OAuth(e.to_string()))?;
256 (
257 oauth.client_id().to_string(),
258 format!("Bearer {token}"),
259 None,
260 )
261 }
262 };
263
264 let app_key_value =
265 HeaderValue::from_str(&app_key).map_err(|_| HttpClientError::InvalidApiKey)?;
266 let access_token_value = HeaderValue::from_str(&access_token)
267 .map_err(|_| HttpClientError::InvalidAccessToken)?;
268
269 let url = self.http_url().await;
270 let mut request_builder = http_cli
271 .request(self.method.clone(), format!("{}{}", url, self.path))
272 .headers(default_headers.clone())
273 .headers(self.headers.clone())
274 .header("User-Agent", USER_AGENT)
275 .header("X-Api-Key", app_key_value)
276 .header("Authorization", access_token_value)
277 .header("X-Timestamp", timestamp.to_string())
278 .header("Content-Type", "application/json; charset=utf-8");
279
280 if let Some(body) = &self.body {
282 let body = body
283 .to_bytes()
284 .map_err(|err| HttpClientError::SerializeRequestBody(err.to_string()))?;
285 request_builder = request_builder.body(body);
286 }
287
288 let mut request = request_builder.build().expect("invalid request");
289
290 if let Some(query_params) = &self.query_params {
292 let query_string = crate::qs::to_string(&query_params)?;
293 request.url_mut().set_query(Some(&query_string));
294 }
295
296 if let Some(secret) = app_secret {
298 let sign = signature(SignatureParams {
299 request: &request,
300 app_key: &app_key,
301 access_token: Some(&access_token),
302 app_secret: &secret,
303 timestamp,
304 });
305 if let Some(signature_value) = sign {
306 request.headers_mut().insert(
307 "X-Api-Signature",
308 HeaderValue::from_maybe_shared(signature_value).expect("valid signature"),
309 );
310 }
311 }
312
313 if let Some(body) = &self.body {
314 tracing::info!(method = %request.method(), url = %request.url(), body = ?body, "http request");
315 } else {
316 tracing::info!(method = %request.method(), url = %request.url(), "http request");
317 }
318
319 let s = Instant::now();
320
321 let (status, trace_id, text) = tokio::time::timeout(REQUEST_TIMEOUT, async move {
323 let resp = http_cli
324 .execute(request)
325 .await
326 .map_err(|err| HttpClientError::Http(err.into()))?;
327 let status = resp.status();
328 let trace_id = resp
329 .headers()
330 .get("x-trace-id")
331 .and_then(|value| value.to_str().ok())
332 .unwrap_or_default()
333 .to_string();
334 let text = resp
335 .text()
336 .await
337 .map_err(|err| HttpClientError::Http(err.into()))?;
338 Ok::<_, HttpClientError>((status, trace_id, text))
339 })
340 .await
341 .map_err(|_| HttpClientError::RequestTimeout)??;
342
343 tracing::info!(duration = ?s.elapsed(), body = %text.as_str(), "http response");
344
345 let resp = match serde_json::from_str::<OpenApiResponse>(&text) {
346 Ok(resp) if resp.code == 0 => resp.data.ok_or(HttpClientError::UnexpectedResponse),
347 Ok(resp) => Err(HttpClientError::OpenApi {
348 code: resp.code,
349 message: resp.message,
350 trace_id,
351 }),
352 Err(err) if status == StatusCode::OK => {
353 Err(HttpClientError::DeserializeResponseBody(err.to_string()))
354 }
355 Err(_) => Err(HttpClientError::BadStatus(status)),
356 }?;
357
358 R::parse_from_bytes(resp.get().as_bytes())
359 .map_err(|err| HttpClientError::DeserializeResponseBody(err.to_string()))
360 }
361
362 pub async fn send(self) -> HttpClientResult<R> {
364 match self.do_send().await {
365 Ok(resp) => Ok(resp),
366 Err(HttpClientError::BadStatus(StatusCode::TOO_MANY_REQUESTS)) => {
367 let mut retry_delay = RETRY_INITIAL_DELAY;
368
369 for _ in 0..RETRY_COUNT {
370 tokio::time::sleep(retry_delay).await;
371
372 match self.do_send().await {
373 Ok(resp) => return Ok(resp),
374 Err(HttpClientError::BadStatus(StatusCode::TOO_MANY_REQUESTS)) => {
375 retry_delay =
376 Duration::from_secs_f32(retry_delay.as_secs_f32() * RETRY_FACTOR);
377 continue;
378 }
379 Err(err) => return Err(err),
380 }
381 }
382
383 Err(HttpClientError::BadStatus(StatusCode::TOO_MANY_REQUESTS))
384 }
385 Err(err) => Err(err),
386 }
387 }
388}