1use crate::api::{Api, serialize_api_name};
2use crate::error::{TushareError, TushareResult};
3use crate::logging::{LogConfig, LogLevel, Logger};
4use crate::types::{TushareEntityList, TushareRequest, TushareResponse};
5use reqwest::Client;
6use serde::Serialize;
7use serde_json;
8use std::collections::HashMap;
9use std::time::{Duration, Instant};
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// Settings used to construct the underlying `reqwest` HTTP client.
///
/// Every field maps onto the `reqwest::ClientBuilder` option of the same
/// name when the client is built.
#[derive(Clone, Debug)]
pub struct HttpClientConfig {
    /// Value handed to the builder's `connect_timeout` option.
    pub connect_timeout: Duration,
    /// Value handed to the builder's `timeout` option.
    pub timeout: Duration,
    /// Value handed to the builder's `pool_max_idle_per_host` option.
    pub pool_max_idle_per_host: usize,
    /// Value handed to the builder's `pool_idle_timeout` option.
    pub pool_idle_timeout: Duration,
    /// `User-Agent` to configure; when `None` the builder's own setting is left untouched.
    pub user_agent: Option<String>,
    /// Value handed to the builder's `tcp_nodelay` option.
    pub tcp_nodelay: bool,
    /// TCP keepalive to configure; when `None` the builder's own setting is left untouched.
    pub tcp_keepalive: Option<Duration>,
}
30
31impl Default for HttpClientConfig {
32 fn default() -> Self {
33 Self {
34 connect_timeout: Duration::from_secs(10),
35 timeout: Duration::from_secs(30),
36 pool_max_idle_per_host: 20, pool_idle_timeout: Duration::from_secs(90), user_agent: Some("tushare-api-rust/1.0.0".to_string()),
39 tcp_nodelay: true, tcp_keepalive: Some(Duration::from_secs(60)), }
42 }
43}
44
45impl HttpClientConfig {
46 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
53 self.connect_timeout = timeout;
54 self
55 }
56
57 pub fn with_timeout(mut self, timeout: Duration) -> Self {
59 self.timeout = timeout;
60 self
61 }
62
63 pub fn with_pool_max_idle_per_host(mut self, max_idle: usize) -> Self {
65 self.pool_max_idle_per_host = max_idle;
66 self
67 }
68
69 pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
71 self.pool_idle_timeout = timeout;
72 self
73 }
74
75 pub fn with_user_agent<S: Into<String>>(mut self, user_agent: S) -> Self {
77 self.user_agent = Some(user_agent.into());
78 self
79 }
80
81 pub fn with_tcp_nodelay(mut self, enabled: bool) -> Self {
83 self.tcp_nodelay = enabled;
84 self
85 }
86
87 pub fn with_tcp_keepalive(mut self, duration: Option<Duration>) -> Self {
89 self.tcp_keepalive = duration;
90 self
91 }
92
93 pub(crate) fn build_client(&self) -> Result<Client, reqwest::Error> {
95 let mut builder = Client::builder()
96 .connect_timeout(self.connect_timeout)
97 .timeout(self.timeout)
98 .pool_max_idle_per_host(self.pool_max_idle_per_host)
99 .pool_idle_timeout(self.pool_idle_timeout)
100 .tcp_nodelay(self.tcp_nodelay);
101
102 if let Some(ref user_agent) = self.user_agent {
103 builder = builder.user_agent(user_agent);
104 }
105
106 if let Some(keepalive) = self.tcp_keepalive {
107 builder = builder.tcp_keepalive(keepalive);
108 }
109
110 builder.build()
111 }
112}
113
114#[derive(Debug)]
116struct ApiNameRef<'a>(&'a Api);
117
118impl<'a> Serialize for ApiNameRef<'a> {
119 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
120 where
121 S: serde::Serializer,
122 {
123 serialize_api_name(self.0, serializer)
124 }
125}
126
127#[derive(Debug, Serialize)]
128struct InternalTushareRequest<'a> {
129 api_name: ApiNameRef<'a>,
130 token: &'a str,
131 params: &'a HashMap<String, String>,
132 fields: &'a [String],
133}
134
135#[derive(Debug)]
137pub struct TushareClient {
138 token: String,
139 client: Client,
140 logger: Logger,
141}
142
143#[derive(Debug)]
145pub struct TushareClientBuilder {
146 token: Option<String>,
147 http_config: HttpClientConfig,
148 log_config: LogConfig,
149}
150
151impl TushareClientBuilder {
152 pub fn new() -> Self {
153 Self {
154 token: None,
155 http_config: HttpClientConfig::default(),
156 log_config: LogConfig::default(),
157 }
158 }
159
160 pub fn with_token(mut self, token: &str) -> Self {
161 self.token = Some(token.to_string());
162 self
163 }
164
165 pub fn with_connect_timeout(mut self, connect_timeout: Duration) -> Self {
166 self.http_config = self.http_config.with_connect_timeout(connect_timeout);
167 self
168 }
169
170 pub fn with_timeout(mut self, timeout: Duration) -> Self {
171 self.http_config = self.http_config.with_timeout(timeout);
172 self
173 }
174
175 pub fn with_http_config(mut self, http_config: HttpClientConfig) -> Self {
177 self.http_config = http_config;
178 self
179 }
180
181 pub fn with_pool_max_idle_per_host(mut self, max_idle: usize) -> Self {
183 self.http_config = self.http_config.with_pool_max_idle_per_host(max_idle);
184 self
185 }
186
187 pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
189 self.http_config = self.http_config.with_pool_idle_timeout(timeout);
190 self
191 }
192
193 pub fn with_log_config(mut self, log_config: LogConfig) -> Self {
194 self.log_config = log_config;
195 self
196 }
197
198 pub fn with_log_level(mut self, level: LogLevel) -> Self {
200 self.log_config.level = level;
201 self
202 }
203
204 pub fn log_requests(mut self, enabled: bool) -> Self {
206 self.log_config.log_requests = enabled;
207 self
208 }
209
210 pub fn log_responses(mut self, enabled: bool) -> Self {
212 self.log_config.log_responses = enabled;
213 self
214 }
215
216 pub fn log_sensitive_data(mut self, enabled: bool) -> Self {
218 self.log_config.log_sensitive_data = enabled;
219 self
220 }
221
222 pub fn log_performance(mut self, enabled: bool) -> Self {
224 self.log_config.log_performance = enabled;
225 self
226 }
227
228 pub fn build(self) -> TushareResult<TushareClient> {
229 let token = self.token.ok_or(TushareError::InvalidToken)?;
230
231 let client = self
232 .http_config
233 .build_client()
234 .map_err(TushareError::HttpError)?;
235
236 Ok(TushareClient {
237 token,
238 client,
239 logger: Logger::new(self.log_config),
240 })
241 }
242}
243
244impl TushareClient {
245 pub fn builder() -> TushareClientBuilder {
247 TushareClientBuilder::new()
248 }
249
250 pub(crate) fn logger(&self) -> &Logger {
251 &self.logger
252 }
253
254 pub fn new(token: &str) -> Self {
268 Self::with_timeout(token, Duration::from_secs(10), Duration::from_secs(30))
269 }
270
271 pub fn from_env() -> TushareResult<Self> {
287 let token = std::env::var("TUSHARE_TOKEN")
288 .map_err(|_| TushareError::InvalidToken)?
289 .trim()
290 .to_string();
291
292 if token.is_empty() {
293 return Err(TushareError::InvalidToken);
294 }
295
296 Ok(Self::new(&token))
297 }
298
299 pub fn from_env_with_timeout(
324 connect_timeout: Duration,
325 timeout: Duration,
326 ) -> TushareResult<Self> {
327 let token = std::env::var("TUSHARE_TOKEN")
328 .map_err(|_| TushareError::InvalidToken)?
329 .trim()
330 .to_string();
331
332 if token.is_empty() {
333 return Err(TushareError::InvalidToken);
334 }
335
336 Ok(Self::with_timeout(&token, connect_timeout, timeout))
337 }
338
339 pub fn with_timeout(token: &str, connect_timeout: Duration, timeout: Duration) -> Self {
365 Self::try_with_timeout(token, connect_timeout, timeout)
366 .expect("failed to build HTTP client with the given configuration")
367 }
368
369 pub fn try_with_timeout(
388 token: &str,
389 connect_timeout: Duration,
390 timeout: Duration,
391 ) -> TushareResult<Self> {
392 let http_config = HttpClientConfig::new()
393 .with_connect_timeout(connect_timeout)
394 .with_timeout(timeout);
395
396 let client = http_config.build_client()?;
397
398 Ok(TushareClient {
399 token: token.to_string(),
400 client,
401 logger: Logger::new(LogConfig::default()),
402 })
403 }
404
405 pub async fn call_api<T>(&self, request: &T) -> TushareResult<TushareResponse>
436 where
437 for<'a> &'a T: TryInto<TushareRequest>,
438 for<'a> <&'a T as TryInto<TushareRequest>>::Error: Into<TushareError>,
439 {
440 let request = request.try_into().map_err(Into::into)?;
441 let request_id = generate_request_id();
442 self.call_api_inner_with_request_id(&request_id, &request)
443 .await
444 }
445
446 pub(crate) async fn call_api_request(
447 &self,
448 request: &TushareRequest,
449 ) -> TushareResult<TushareResponse> {
450 let request_id = generate_request_id();
451 self.call_api_inner_with_request_id(&request_id, request)
452 .await
453 }
454
455 pub(crate) async fn call_api_request_with_request_id(
456 &self,
457 request_id: &str,
458 request: &TushareRequest,
459 ) -> TushareResult<TushareResponse> {
460 self.call_api_inner_with_request_id(request_id, request)
461 .await
462 }
463
464 async fn call_api_inner_with_request_id(
465 &self,
466 request_id: &str,
467 request: &TushareRequest,
468 ) -> TushareResult<TushareResponse> {
469 let start_time = Instant::now();
470
471 self.log_request_start(request_id, request);
472
473 let response_text = self
474 .send_http_request(request_id, request, &start_time)
475 .await?;
476
477 let tushare_response = self.parse_response(request_id, &response_text, &start_time)?;
478
479 self.validate_and_log_response(request_id, tushare_response, &start_time)
480 }
481
482 fn log_request_start(&self, request_id: &str, request: &TushareRequest) {
483 self.logger.log_api_start(
484 request_id,
485 &request.api_name.name(),
486 request.params.len(),
487 request.fields.len(),
488 );
489
490 let token_preview = if self.logger.config().log_sensitive_data {
491 Some(format!(
492 "token: {}***",
493 &self.token[..self.token.len().min(8)]
494 ))
495 } else {
496 None
497 };
498
499 self.logger.log_request_details(
500 request_id,
501 &request.api_name.name(),
502 &format!("{:?}", request.params),
503 &format!("{:?}", request.fields),
504 token_preview.as_deref(),
505 );
506 }
507
508 async fn send_http_request(
509 &self,
510 request_id: &str,
511 request: &TushareRequest,
512 start_time: &Instant,
513 ) -> TushareResult<String> {
514 let internal_request = InternalTushareRequest {
515 api_name: ApiNameRef(&request.api_name),
516 token: &self.token,
517 params: &request.params,
518 fields: &request.fields,
519 };
520
521 self.logger.log_http_request(request_id);
522
523 let response = self
524 .client
525 .post("http://api.tushare.pro")
526 .json(&internal_request)
527 .send()
528 .await
529 .map_err(|e| {
530 self.logger
531 .log_http_error(request_id, start_time.elapsed(), &e.to_string());
532 e
533 })?;
534
535 let status = response.status();
536 self.logger.log_http_response(request_id, status.as_u16());
537
538 let response_text = response.text().await.map_err(|e| {
539 self.logger
540 .log_response_read_error(request_id, start_time.elapsed(), &e.to_string());
541 e
542 })?;
543
544 self.logger.log_raw_response(request_id, &response_text);
545 Ok(response_text)
546 }
547
548 fn parse_response(
549 &self,
550 request_id: &str,
551 response_text: &str,
552 start_time: &Instant,
553 ) -> TushareResult<TushareResponse> {
554 serde_json::from_str(response_text).map_err(|e| {
555 self.logger.log_json_parse_error(
556 request_id,
557 start_time.elapsed(),
558 &e.to_string(),
559 response_text,
560 );
561 TushareError::from(e)
562 })
563 }
564
565 fn validate_and_log_response(
566 &self,
567 request_id: &str,
568 response: TushareResponse,
569 start_time: &Instant,
570 ) -> TushareResult<TushareResponse> {
571 let elapsed = start_time.elapsed();
572
573 if response.code != 0 {
574 let msg = response.msg.as_deref().unwrap_or("<no message>");
575 let message = format!("error code: {}, error msg: {}", response.code, msg);
576 self.logger
577 .log_api_error(request_id, elapsed, response.code, &message);
578 return Err(TushareError::ApiError {
579 code: response.code,
580 message,
581 });
582 }
583
584 let data_count = response
585 .data
586 .as_ref()
587 .map(|data| data.items.len())
588 .unwrap_or(0);
589 self.logger.log_api_success(request_id, elapsed, data_count);
590
591 self.logger.log_response_details(
592 request_id,
593 &response.request_id,
594 &format!("{:?}", response.data.as_ref().map(|d| &d.fields)),
595 );
596
597 Ok(response)
598 }
599
600 pub async fn call_api_as<T, R>(&self, request: R) -> TushareResult<TushareEntityList<T>>
627 where
628 T: crate::traits::FromTushareData,
629 for<'a> &'a R: TryInto<TushareRequest>,
630 for<'a> <&'a R as TryInto<TushareRequest>>::Error: Into<TushareError>,
631 {
632 let response = self.call_api(&request).await?;
633 TushareEntityList::try_from(response).map_err(Into::into)
634 }
635}
636
/// Generates a process-unique request id of the form `req_<number>`.
///
/// The number is the current time in nanoseconds since the Unix epoch, bumped
/// when necessary so that it is strictly greater than every id previously
/// issued by this process. A bare timestamp can repeat when two calls land in
/// the same clock tick, which would make log lines of different requests
/// indistinguishable.
pub(crate) fn generate_request_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Numeric part of the most recently issued id.
    static LAST_ID: AtomicU64 = AtomicU64::new(0);

    // A clock set before the epoch yields 0; the counter below still keeps
    // ids unique in that case.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let now = u64::try_from(nanos).unwrap_or(u64::MAX);

    let mut previous = LAST_ID.load(Ordering::Relaxed);
    loop {
        let candidate = now.max(previous.saturating_add(1));
        match LAST_ID.compare_exchange_weak(
            previous,
            candidate,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => return format!("req_{}", candidate),
            // Another thread issued an id in the meantime; retry above it.
            Err(actual) => previous = actual,
        }
    }
}
645
// Compiled only for test builds so the module and its imports do not leak
// into (or warn in) regular builds.
#[cfg(test)]
mod tests {
    use crate::TushareClient;

    /// Smoke test: builds a client from the environment and issues one
    /// `stock_basic` call from a JSON string request.
    ///
    /// The outcome is only printed, not asserted, because the token is a
    /// placeholder.
    #[tokio::test]
    async fn test() {
        // SAFETY: `set_var` is only sound while no other thread accesses the
        // process environment.
        // NOTE(review): the test harness may run tests on several threads;
        // confirm no other test reads or writes the environment concurrently.
        unsafe {
            std::env::set_var("TUSHARE_TOKEN", "xxxx");
        }
        let client = TushareClient::from_env().unwrap();
        let response = client
            .call_api(
                &r#"
            {
                "api_name": "stock_basic",
                "params": { "list_status": "L"},
                "fields": [ "ts_code",
                        "symbol",
                        "name",
                        "area",
                        "industry",
                        "list_date",
                        "exchange",
                        "market"]
            }
            "#,
            )
            .await;
        println!("response = {:?}", response);
    }
}