1use reqwest::Client;
2use std::time::{Duration, Instant};
3use std::collections::HashMap;
4use crate::error::{TushareError, TushareResult};
5use crate::types::{TushareRequest, TushareResponse, TushareEntityList};
6use crate::api::{Api, serialize_api_name};
7use crate::logging::{LogConfig, LogLevel, Logger};
8use serde::{Serialize};
9use serde_json;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12#[derive(Debug, Clone)]
14pub struct HttpClientConfig {
15 pub connect_timeout: Duration,
17 pub timeout: Duration,
19 pub pool_max_idle_per_host: usize,
21 pub pool_idle_timeout: Duration,
23 pub user_agent: Option<String>,
25 pub tcp_nodelay: bool,
27 pub tcp_keepalive: Option<Duration>,
29}
30
31impl Default for HttpClientConfig {
32 fn default() -> Self {
33 Self {
34 connect_timeout: Duration::from_secs(10),
35 timeout: Duration::from_secs(30),
36 pool_max_idle_per_host: 20, pool_idle_timeout: Duration::from_secs(90), user_agent: Some("tushare-api-rust/1.0.0".to_string()),
39 tcp_nodelay: true, tcp_keepalive: Some(Duration::from_secs(60)), }
42 }
43}
44
45impl HttpClientConfig {
46 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
53 self.connect_timeout = timeout;
54 self
55 }
56
57 pub fn with_timeout(mut self, timeout: Duration) -> Self {
59 self.timeout = timeout;
60 self
61 }
62
63 pub fn with_pool_max_idle_per_host(mut self, max_idle: usize) -> Self {
65 self.pool_max_idle_per_host = max_idle;
66 self
67 }
68
69 pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
71 self.pool_idle_timeout = timeout;
72 self
73 }
74
75 pub fn with_user_agent<S: Into<String>>(mut self, user_agent: S) -> Self {
77 self.user_agent = Some(user_agent.into());
78 self
79 }
80
81 pub fn with_tcp_nodelay(mut self, enabled: bool) -> Self {
83 self.tcp_nodelay = enabled;
84 self
85 }
86
87 pub fn with_tcp_keepalive(mut self, duration: Option<Duration>) -> Self {
89 self.tcp_keepalive = duration;
90 self
91 }
92
93 pub(crate) fn build_client(&self) -> Result<Client, reqwest::Error> {
95 let mut builder = Client::builder()
96 .connect_timeout(self.connect_timeout)
97 .timeout(self.timeout)
98 .pool_max_idle_per_host(self.pool_max_idle_per_host)
99 .pool_idle_timeout(self.pool_idle_timeout)
100 .tcp_nodelay(self.tcp_nodelay);
101
102 if let Some(ref user_agent) = self.user_agent {
103 builder = builder.user_agent(user_agent);
104 }
105
106 if let Some(keepalive) = self.tcp_keepalive {
107 builder = builder.tcp_keepalive(keepalive);
108 }
109
110 builder.build()
111 }
112}
113
114#[derive(Debug, Serialize)]
116struct InternalTushareRequest {
117 #[serde(serialize_with = "serialize_api_name")]
118 api_name: Api,
119 token: String,
120 params: HashMap<String, String>,
121 fields: Vec<String>,
122}
123
124#[derive(Debug)]
126pub struct TushareClient {
127 token: String,
128 client: Client,
129 logger: Logger,
130}
131
132#[derive(Debug)]
134pub struct TushareClientBuilder {
135 token: Option<String>,
136 http_config: HttpClientConfig,
137 log_config: LogConfig,
138}
139
140impl TushareClientBuilder {
141 pub fn new() -> Self {
142 Self {
143 token: None,
144 http_config: HttpClientConfig::default(),
145 log_config: LogConfig::default(),
146 }
147 }
148
149 pub fn with_token(mut self, token: &str) -> Self {
150 self.token = Some(token.to_string());
151 self
152 }
153
154 pub fn with_connect_timeout(mut self, connect_timeout: Duration) -> Self {
155 self.http_config = self.http_config.with_connect_timeout(connect_timeout);
156 self
157 }
158
159 pub fn with_timeout(mut self, timeout: Duration) -> Self {
160 self.http_config = self.http_config.with_timeout(timeout);
161 self
162 }
163
164 pub fn with_http_config(mut self, http_config: HttpClientConfig) -> Self {
166 self.http_config = http_config;
167 self
168 }
169
170 pub fn with_pool_max_idle_per_host(mut self, max_idle: usize) -> Self {
172 self.http_config = self.http_config.with_pool_max_idle_per_host(max_idle);
173 self
174 }
175
176 pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
178 self.http_config = self.http_config.with_pool_idle_timeout(timeout);
179 self
180 }
181
182 pub fn with_log_config(mut self, log_config: LogConfig) -> Self {
183 self.log_config = log_config;
184 self
185 }
186
187 pub fn with_log_level(mut self, level: LogLevel) -> Self {
189 self.log_config.level = level;
190 self
191 }
192
193 pub fn log_requests(mut self, enabled: bool) -> Self {
195 self.log_config.log_requests = enabled;
196 self
197 }
198
199 pub fn log_responses(mut self, enabled: bool) -> Self {
201 self.log_config.log_responses = enabled;
202 self
203 }
204
205 pub fn log_sensitive_data(mut self, enabled: bool) -> Self {
207 self.log_config.log_sensitive_data = enabled;
208 self
209 }
210
211 pub fn log_performance(mut self, enabled: bool) -> Self {
213 self.log_config.log_performance = enabled;
214 self
215 }
216
217 pub fn build(self) -> TushareResult<TushareClient> {
218 let token = self.token.ok_or(TushareError::InvalidToken)?;
219
220 let client = self.http_config.build_client()
221 .map_err(TushareError::HttpError)?;
222
223 Ok(TushareClient {
224 token,
225 client,
226 logger: Logger::new(self.log_config),
227 })
228 }
229}
230
231impl TushareClient {
232 pub fn builder() -> TushareClientBuilder {
234 TushareClientBuilder::new()
235 }
236
237
238
239 pub fn new(token: &str) -> Self {
253 Self::with_timeout(token, Duration::from_secs(10), Duration::from_secs(30))
254 }
255
256 pub fn from_env() -> TushareResult<Self> {
272 let token = std::env::var("TUSHARE_TOKEN")
273 .map_err(|_| TushareError::InvalidToken)?
274 .trim()
275 .to_string();
276
277 if token.is_empty() {
278 return Err(TushareError::InvalidToken);
279 }
280
281 Ok(Self::new(&token))
282 }
283
284 pub fn from_env_with_timeout(connect_timeout: Duration, timeout: Duration) -> TushareResult<Self> {
309 let token = std::env::var("TUSHARE_TOKEN")
310 .map_err(|_| TushareError::InvalidToken)?
311 .trim()
312 .to_string();
313
314 if token.is_empty() {
315 return Err(TushareError::InvalidToken);
316 }
317
318 Ok(Self::with_timeout(&token, connect_timeout, timeout))
319 }
320
321 pub fn with_timeout(token: &str, connect_timeout: Duration, timeout: Duration) -> Self {
342 let http_config = HttpClientConfig::new()
343 .with_connect_timeout(connect_timeout)
344 .with_timeout(timeout);
345
346 let client = http_config.build_client()
347 .expect("Failed to create HTTP client");
348
349 TushareClient {
350 token: token.to_string(),
351 client,
352 logger: Logger::new(LogConfig::default()),
353 }
354 }
355
356 pub async fn call_api<T>(&self, request: T) -> TushareResult<TushareResponse>
387 where
388 T: TryInto<TushareRequest>,
389 <T as TryInto<TushareRequest>>::Error: Into<TushareError>,
390 {
391 let request_id = generate_request_id();
392 let start_time = Instant::now();
393 let request = request
394 .try_into()
395 .map_err(Into::into)?;
396 self.logger.log_api_start(
398 &request_id,
399 &request.api_name.name(),
400 request.params.len(),
401 request.fields.len()
402 );
403
404 let token_preview_string = if self.logger.config().log_sensitive_data {
406 Some(format!("token: {}***", &self.token[..self.token.len().min(8)]))
407 } else {
408 None
409 };
410
411 self.logger.log_request_details(
412 &request_id,
413 &request.api_name.name(),
414 &format!("{:?}", request.params),
415 &format!("{:?}", request.fields),
416 token_preview_string.as_deref()
417 );
418
419 let internal_request = InternalTushareRequest {
420 api_name: request.api_name,
421 token: self.token.clone(),
422 params: request.params,
423 fields: request.fields,
424 };
425
426 self.logger.log_http_request(&request_id);
427
428 let response = self.client
429 .post("http://api.tushare.pro")
430 .json(&internal_request)
431 .send()
432 .await
433 .map_err(|e| {
434 let elapsed = start_time.elapsed();
435 self.logger.log_http_error(&request_id, elapsed, &e.to_string());
436 e
437 })?;
438
439 let status = response.status();
440 self.logger.log_http_response(&request_id, status.as_u16());
441
442 let response_text = response.text().await
443 .map_err(|e| {
444 let elapsed = start_time.elapsed();
445 self.logger.log_response_read_error(&request_id, elapsed, &e.to_string());
446 e
447 })?;
448 self.logger.log_raw_response(&request_id, &response_text);
449
450 let tushare_response: TushareResponse = serde_json::from_str(&response_text)
451 .map_err(|e| {
452 let elapsed = start_time.elapsed();
453 self.logger.log_json_parse_error(&request_id, elapsed, &e.to_string(), &response_text);
454 e
455 })?;
456
457 let elapsed = start_time.elapsed();
458
459 if tushare_response.code != 0 {
460 let message = format!("error code: {}, error msg: {}", tushare_response.code, tushare_response.msg.clone().unwrap_or_default());
461 self.logger.log_api_error(&request_id, elapsed, tushare_response.code, &message);
462 return Err(TushareError::ApiError {
463 code: tushare_response.code,
464 message
465 });
466 }
467
468 self.logger.log_api_success(&request_id, elapsed, tushare_response.data.clone().map(|data| data.items.len()).unwrap_or(0));
470
471 self.logger.log_response_details(
473 &request_id,
474 &tushare_response.request_id,
475 &format!("{:?}", tushare_response.data.as_ref().map(|d| &d.fields))
476 );
477
478 Ok(tushare_response)
479 }
480
481 pub async fn call_api_as<T, R>(&self, request: R) -> TushareResult<TushareEntityList<T>>
508 where
509 T: crate::traits::FromTushareData,
510 R: TryInto<TushareRequest>,
511 <R as TryInto<TushareRequest>>::Error: Into<TushareError>,
512 {
513 let response = self.call_api(request).await?;
514 TushareEntityList::try_from(response).map_err(Into::into)
515 }
516 }
517
518 fn generate_request_id() -> String {
520 let timestamp = SystemTime::now()
521 .duration_since(UNIX_EPOCH)
522 .unwrap_or_default()
523 .as_nanos();
524 format!("req_{}", timestamp)
525 }
526
527 mod tests {
528 use crate::{fields, params, Api, TushareClient, TushareRequest};
529
530 #[tokio::test]
531 async fn test() {
532 unsafe { std::env::set_var("TUSHARE_TOKEN", "xxxx"); }
533 let client = TushareClient::from_env().unwrap();
534 let response = client.call_api(
535 r#"
536 {
537 "api_name": "stock_basic",
538 "params": { "list_stauts": "L"},
539 "fields": [ "ts_code",
540 "symbol",
541 "name",
542 "area",
543 "industry",
544 "list_date",
545 "exchange",
546 "market"]
547 }
548 "#
549 ).await;
550 println!("resposne = {:?}", response);
551 }
559}