1use super::bucket::*;
17use super::config::ConfigHolder;
18use super::credential::{CommonCredentials, CommonCredentialsProvider, Credentials, CredentialsProvider};
19use super::error::{ErrorResponse, GenericError, TosError};
20use super::internal::{auto_recognize_content_type, build_certificate, build_identity, check_bucket_and_key, check_need_retry, exceed_high_latency_log_threshold, get_request_url, parse_json, sleep_for_retry, trans_header_value, AdditionalContext, InputTranslator, OutputParser};
21use crate::auth::{pre_signed_policy_url, pre_signed_post_signature, pre_signed_url, sign_header, PreSignedPolicyURLInput, PreSignedPolicyURLOutput, PreSignedPostSignatureInput, PreSignedPostSignatureOutput, PreSignedURLInput, PreSignedURLOutput, SignerAPI};
22use crate::common::{get_common_log_target, Meta, RequestInfo, RequestInfoTrait};
23use crate::constant::*;
24use crate::enumeration::HttpMethodType::HttpMethodHead;
25use crate::http::{HttpRequest, HttpResponse, RequestContext};
26use crate::multipart::*;
27use crate::object::*;
28use crate::reader::{InternalReader, MultiBytes, MultifunctionalReader};
29use arc_swap::ArcSwap;
30use bytes::Bytes;
31use reqwest::blocking::{Body, Client, RequestBuilder};
32use reqwest::{redirect, Proxy};
33use std::collections::HashMap;
34use std::fmt::Debug;
35use std::fs::File;
36use std::io::{Cursor, Read};
37use std::marker::PhantomData;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tracing::log::{info, warn};
42
43#[derive(Debug, Clone, Default)]
44pub struct TosClientBuilder<P, C>
45{
46 ak: String,
47 sk: String,
48 security_token: String,
49 region: String,
50 endpoint: String,
51 credentials_provider: Option<P>,
52 config_holder: ConfigHolder,
53 c: PhantomData<C>,
54}
55
56impl<P, C> TosClientBuilder<P, C>
57where
58 P: CredentialsProvider<C> + Debug,
59 C: Credentials + Debug,
60{
61 pub fn build(mut self) -> Result<TosClientImpl<P, C>, TosError> {
62 self.config_holder.check(self.endpoint, self.region)?;
63 self.config_holder.gen_user_agent();
64 let mut client = Client::builder()
65 .user_agent(self.config_holder.user_agent.as_str())
66 .tcp_nodelay(true)
67 .tcp_keepalive(None)
68 .no_gzip()
69 .no_deflate()
70 .no_brotli()
71 .connect_timeout(Duration::from_millis(self.config_holder.connection_timeout as u64))
72 .pool_idle_timeout(Duration::from_millis(self.config_holder.idle_connection_time as u64))
73 .pool_max_idle_per_host(self.config_holder.max_connections as usize);
74 if self.config_holder.request_timeout > 0 {
75 client = client.timeout(Duration::from_millis(self.config_holder.request_timeout as u64));
76 }
77
78 if self.config_holder.follow_redirect_times > 0 {
79 client = client.redirect(redirect::Policy::limited(self.config_holder.follow_redirect_times as usize));
80 } else {
81 client = client.redirect(redirect::Policy::none());
82 }
83
84 if self.config_holder.proxy_host != "" {
85 let mut proxy_url = self.config_holder.proxy_host.as_str();
86 while proxy_url.len() > 0 && proxy_url.ends_with("/") {
87 proxy_url = &proxy_url[0..proxy_url.len() - 1];
88 }
89
90 if proxy_url != "" {
91 let mut proxy_url = proxy_url.to_lowercase();
92 if !proxy_url.starts_with(SCHEMA_HTTP) && !proxy_url.starts_with(SCHEMA_HTTPS) {
93 proxy_url = format!("{}{}", SCHEMA_HTTP, proxy_url);
94 }
95 if self.config_holder.proxy_port >= 0 {
96 proxy_url = format!("{}:{}", proxy_url, self.config_holder.proxy_port);
97 }
98
99 let (domain, schema, _) = self.config_holder.parse_domain(proxy_url.as_str())?;
100 if self.config_holder.proxy_username != "" && self.config_holder.proxy_password != "" {
101 proxy_url = format!("{}://{}:{}@{}", schema, self.config_holder.proxy_username, self.config_holder.proxy_password, domain);
102 } else {
103 proxy_url = format!("{}://{}", schema, domain);
104 }
105 match Proxy::http(proxy_url.as_str()) {
106 Err(e) => return Err(TosError::client_error_with_cause("build http proxy error", GenericError::DefaultError(e.to_string()))),
107 Ok(proxy) => {
108 client = client.proxy(proxy);
109 }
110 }
111
112 match Proxy::https(proxy_url) {
113 Err(e) => return Err(TosError::client_error_with_cause("build https proxy error", GenericError::DefaultError(e.to_string()))),
114 Ok(proxy) => {
115 client = client.proxy(proxy);
116 }
117 }
118 } else {
119 client = client.no_proxy();
120 }
121 } else {
122 client = client.no_proxy();
123 }
124
125 if !self.config_holder.enable_verify_ssl {
126 client = client.danger_accept_invalid_certs(true);
127 #[cfg(feature = "use-native-tls")]
128 {
129 client = client.danger_accept_invalid_hostnames(true);
130 }
131 }
132
133 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
134 if self.config_holder.ca_crt != "" {
135 client = client.add_root_certificate(build_certificate(self.config_holder.ca_crt.as_str())?);
136 }
137
138 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
139 if self.config_holder.client_crt != "" && self.config_holder.client_key != "" {
140 client = client.identity(build_identity(self.config_holder.client_crt.as_str(), self.config_holder.client_key.as_str())?);
141 }
142
143 let cp;
144 let mut credentials_can_refresh = false;
145 match self.credentials_provider {
146 Some(p) => {
147 cp = p;
148 }
149 None => {
150 match C::new(self.ak, self.sk, self.security_token) {
151 Err(ex) => return Err(TosError::client_error_with_cause("create credentials error",
152 GenericError::DefaultError(ex.to_string()))),
153 Ok(c) => {
154 match P::new(c) {
155 Err(ex) => return Err(TosError::client_error_with_cause("create credentials provider error",
156 GenericError::DefaultError(ex.to_string()))),
157 Ok(p) => {
158 credentials_can_refresh = true;
159 cp = p;
160 }
161 }
162 }
163 }
164 }
165 }
166
167 match client.build() {
168 Ok(client) => {
169 Ok(TosClientImpl {
170 client,
171 config_holder: ArcSwap::from(Arc::new(self.config_holder)),
172 credentials_provider: ArcSwap::from(Arc::new(cp)),
173 credentials_can_refresh,
174 c: self.c,
175 })
176 }
177 Err(e) => {
178 Err(TosError::client_error_with_cause("build tos client error", GenericError::DefaultError(e.to_string())))
179 }
180 }
181 }
182
183 pub fn build_as_trait(self) -> Result<impl TosClient, TosError> {
184 let client = self.build()?;
185 Ok(client)
186 }
187
188 pub fn ak(mut self, ak: impl Into<String>) -> Self {
189 self.ak = ak.into();
190 self
191 }
192
193 pub fn sk(mut self, sk: impl Into<String>) -> Self {
194 self.sk = sk.into();
195 self
196 }
197
198 pub fn security_token(mut self, security_token: impl Into<String>) -> Self {
199 self.security_token = security_token.into();
200 self
201 }
202
203 pub fn credentials_provider(mut self, p: P) -> Self {
204 self.credentials_provider = Some(p);
205 self
206 }
207
208 pub fn region(mut self, region: impl Into<String>) -> Self {
209 self.region = region.into();
210 self
211 }
212
213 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
214 self.endpoint = endpoint.into();
215 self
216 }
217
218 pub fn request_timeout(mut self, request_timeout: isize) -> Self {
219 if request_timeout > 0 {
220 self.config_holder.request_timeout = request_timeout;
221 }
222 self
223 }
224
225 pub fn connection_timeout(mut self, connection_timeout: isize) -> Self {
226 if connection_timeout > 0 {
227 self.config_holder.connection_timeout = connection_timeout;
228 }
229 self
230 }
231
232 pub fn max_connections(mut self, max_connections: isize) -> Self {
233 if max_connections > 0 {
234 self.config_holder.max_connections = max_connections;
235 }
236 self
237 }
238 pub fn idle_connection_time(mut self, idle_connection_time: isize) -> Self {
239 if idle_connection_time > 0 {
240 self.config_holder.idle_connection_time = idle_connection_time;
241 }
242 self
243 }
244
245 pub fn enable_crc(mut self, enable_crc: bool) -> Self {
246 self.config_holder.enable_crc = enable_crc;
247 self
248 }
249
250 pub fn enable_verify_ssl(mut self, enable_verify_ssl: bool) -> Self {
251 self.config_holder.enable_verify_ssl = enable_verify_ssl;
252 self
253 }
254
255 pub fn max_retry_count(mut self, max_retry_count: isize) -> Self {
256 self.config_holder.max_retry_count = max_retry_count;
257 self
258 }
259 pub fn auto_recognize_content_type(mut self, auto_recognize_content_type: bool) -> Self {
260 self.config_holder.auto_recognize_content_type = auto_recognize_content_type;
261 self
262 }
263 pub fn is_custom_domain(mut self, is_custom_domain: bool) -> Self {
264 self.config_holder.is_custom_domain = is_custom_domain;
265 self
266 }
267 pub fn proxy_host(mut self, proxy_host: impl Into<String>) -> Self {
268 self.config_holder.proxy_host = proxy_host.into();
269 self
270 }
271 pub fn proxy_port(mut self, proxy_host: isize) -> Self {
272 self.config_holder.proxy_port = proxy_host.into();
273 self
274 }
275 pub fn proxy_username(mut self, proxy_username: impl Into<String>) -> Self {
276 self.config_holder.proxy_username = proxy_username.into();
277 self
278 }
279 pub fn proxy_password(mut self, proxy_password: impl Into<String>) -> Self {
280 self.config_holder.proxy_password = proxy_password.into();
281 self
282 }
283 pub fn disable_encoding_meta(mut self, disable_encoding_meta: bool) -> Self {
284 self.config_holder.disable_encoding_meta = disable_encoding_meta;
285 self
286 }
287 pub fn expect_100_continue_threshold(mut self, expect_100_continue_threshold: isize) -> Self {
288 self.config_holder.expect_100_continue_threshold = expect_100_continue_threshold;
289 self
290 }
291 pub fn high_latency_log_threshold(mut self, high_latency_log_threshold: isize) -> Self {
292 self.config_holder.high_latency_log_threshold = high_latency_log_threshold;
293 self
294 }
295 pub fn user_agent_product_name(mut self, user_agent_product_name: impl Into<String>) -> Self {
296 self.config_holder.user_agent_product_name = user_agent_product_name.into();
297 self
298 }
299 pub fn user_agent_soft_name(mut self, user_agent_soft_name: impl Into<String>) -> Self {
300 self.config_holder.user_agent_soft_name = user_agent_soft_name.into();
301 self
302 }
303 pub fn user_agent_soft_version(mut self, user_agent_soft_version: impl Into<String>) -> Self {
304 self.config_holder.user_agent_soft_version = user_agent_soft_version.into();
305 self
306 }
307 pub fn user_agent_customized_key_values(mut self, user_agent_customized_key_values: impl Into<HashMap<String, String>>) -> Self {
308 self.config_holder.user_agent_customized_key_values = Some(user_agent_customized_key_values.into());
309 self
310 }
311
312 pub fn follow_redirect_times(mut self, follow_redirect_times: isize) -> Self {
313 self.config_holder.follow_redirect_times = follow_redirect_times;
314 self
315 }
316 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
317 pub fn client_crt(mut self, client_crt: impl Into<String>) -> Self {
318 self.config_holder.client_crt = client_crt.into();
319 self
320 }
321 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
322 pub fn client_key(mut self, client_key: impl Into<String>) -> Self {
323 self.config_holder.client_key = client_key.into();
324 self
325 }
326 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
327 pub fn ca_crt(mut self, ca_crt: impl Into<String>) -> Self {
328 self.config_holder.ca_crt = ca_crt.into();
329 self
330 }
331}
332
333pub fn builder() -> TosClientBuilder<CommonCredentialsProvider<CommonCredentials>, CommonCredentials> {
334 TosClientBuilder::default()
335}
336
337pub trait ConfigAware {
338 fn is_custom_domain(&self) -> bool;
339}
340
341pub trait TosClient: BucketAPI + ObjectAPI + MultipartAPI + SignerAPI + ConfigAware + Debug {
342 fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool;
343 fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool;
344}
345
346pub type DefaultTosClient = TosClientImpl<CommonCredentialsProvider<CommonCredentials>, CommonCredentials>;
347
348#[derive(Debug)]
349pub struct TosClientImpl<P, C> {
350 pub(crate) client: Client,
351 pub(crate) config_holder: ArcSwap<ConfigHolder>,
352 pub(crate) credentials_provider: ArcSwap<P>,
353 pub(crate) credentials_can_refresh: bool,
354 c: PhantomData<C>,
355}
356
357
358impl<P, C> BucketAPI for TosClientImpl<P, C>
359where
360 C: Credentials,
361 P: CredentialsProvider<C>,
362{
363 fn create_bucket(&self, input: &CreateBucketInput) -> Result<CreateBucketOutput, TosError> {
364 self.do_request::<_, _, Cursor<String>>(input)
365 }
366
367 fn head_bucket(&self, input: &HeadBucketInput) -> Result<HeadBucketOutput, TosError> {
368 self.do_request::<_, _, Cursor<String>>(input)
369 }
370
371 fn delete_bucket(&self, input: &DeleteBucketInput) -> Result<DeleteBucketOutput, TosError> {
372 self.do_request::<_, _, Cursor<String>>(input)
373 }
374
375 fn list_buckets(&self, input: &ListBucketsInput) -> Result<ListBucketsOutput, TosError> {
376 self.do_request::<_, _, Cursor<String>>(input)
377 }
378}
379
380impl<P, C> ObjectAPI for TosClientImpl<P, C>
381where
382 C: Credentials,
383 P: CredentialsProvider<C>,
384{
385 fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError> {
386 self.do_request::<_, _, Cursor<String>>(input)
387 }
388
389 fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError> {
390 self.do_request::<_, _, Cursor<String>>(input)
391 }
392
393 fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError> {
394 self.do_request::<DeleteMultiObjectsInput, DeleteMultiObjectsOutput, InternalReader<Cursor<Bytes>>>(input)
395 }
396
397 fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError> {
398 self.do_request::<_, _, Cursor<String>>(input)
399 }
400
401 fn get_object_to_file(&self, input: &GetObjectToFileInput) -> Result<GetObjectToFileOutput, TosError> {
402 self.do_request::<_, _, Cursor<String>>(input)
403 }
404
405 fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError> {
406 self.do_request::<_, _, Cursor<String>>(input)
407 }
408
409 fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError> {
410 self.do_request::<_, _, Cursor<String>>(input)
411 }
412
413 fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
414 where
415 B: Read + Send + 'static,
416 {
417 self.do_request(input)
418 }
419
420 fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError> {
421 self.do_request::<_, _, InternalReader<MultiBytes>>(input)
422 }
423
424 fn list_objects(&self, input: &ListObjectsInput) -> Result<ListObjectsOutput, TosError> {
425 self.do_request::<_, _, Cursor<String>>(input)
426 }
427
428 fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError> {
429 if input.list_only_once {
430 return self.do_request::<_, _, Cursor<String>>(input);
431 }
432
433 let mut input = input.clone();
434 if input.max_keys <= 0 {
435 input.max_keys = DEFAULT_MAX_KEYS;
436 }
437 let mut _output: Option<ListObjectsType2Output> = None;
438 loop {
439 let mut temp_output = self.do_request::<ListObjectsType2Input, ListObjectsType2Output, Cursor<String>>(&input)?;
440 if _output.is_none() {
441 _output = Some(temp_output);
442 } else {
443 let output = _output.as_mut().unwrap();
444 output.key_count += temp_output.key_count;
445 output.is_truncated = temp_output.is_truncated;
446 output.next_continuation_token = temp_output.next_continuation_token;
447 output.contents.append(&mut temp_output.contents);
448 output.common_prefixes.append(&mut temp_output.common_prefixes);
449 }
450
451 let output = _output.as_ref().unwrap();
452 if !output.is_truncated || output.contents.len() + output.common_prefixes.len() >= input.max_keys as usize || output.key_count >= input.max_keys {
453 break;
454 }
455 input.continuation_token = output.next_continuation_token.clone();
456 input.max_keys = input.max_keys - output.key_count;
457 }
458
459 Ok(_output.unwrap())
460 }
461
462 fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError> {
463 self.do_request::<_, _, Cursor<String>>(input)
464 }
465
466 fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
467 where
468 B: Read + Send + 'static,
469 {
470 self.do_request(input)
471 }
472
473 fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError> {
474 self.do_request::<_, _, InternalReader<MultiBytes>>(input)
475 }
476
477 fn put_object_from_file(&self, input: &PutObjectFromFileInput) -> Result<PutObjectOutput, TosError> {
478 self.do_request::<_, _, InternalReader<File>>(input)
479 }
480
481 fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError> {
482 self.do_request::<_, _, InternalReader<Cursor<Bytes>>>(input)
483 }
484
485 fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError> {
486 self.do_request::<_, _, Cursor<String>>(input)
487 }
488}
489
490impl<P, C> MultipartAPI for TosClientImpl<P, C>
491where
492 C: Credentials,
493 P: CredentialsProvider<C>,
494{
495 fn create_multipart_upload(&self, input: &CreateMultipartUploadInput) -> Result<CreateMultipartUploadOutput, TosError> {
496 self.do_request::<_, _, Cursor<String>>(input)
497 }
498
499 fn upload_part<B>(&self, input: &UploadPartInput<B>) -> Result<UploadPartOutput, TosError>
500 where
501 B: Read + Send + 'static,
502 {
503 self.do_request(input)
504 }
505
506 fn upload_part_from_buffer(&self, input: &UploadPartFromBufferInput) -> Result<UploadPartOutput, TosError> {
507 self.do_request::<_, _, InternalReader<MultiBytes>>(input)
508 }
509
510 fn upload_part_from_file(&self, input: &UploadPartFromFileInput) -> Result<UploadPartOutput, TosError> {
511 self.do_request::<_, _, InternalReader<File>>(input)
512 }
513
514 fn complete_multipart_upload(&self, input: &CompleteMultipartUploadInput) -> Result<CompleteMultipartUploadOutput, TosError> {
515 self.do_request::<_, _, InternalReader<Cursor<Bytes>>>(input)
516 }
517
518 fn abort_multipart_upload(&self, input: &AbortMultipartUploadInput) -> Result<AbortMultipartUploadOutput, TosError> {
519 self.do_request::<_, _, Cursor<String>>(input)
520 }
521
522 fn upload_part_copy(&self, input: &UploadPartCopyInput) -> Result<UploadPartCopyOutput, TosError> {
523 self.do_request::<_, _, Cursor<String>>(input)
524 }
525
526 fn list_multipart_uploads(&self, input: &ListMultipartUploadsInput) -> Result<ListMultipartUploadsOutput, TosError> {
527 self.do_request::<_, _, Cursor<String>>(input)
528 }
529
530 fn list_parts(&self, input: &ListPartsInput) -> Result<ListPartsOutput, TosError> {
531 self.do_request::<_, _, Cursor<String>>(input)
532 }
533}
534
535impl<P, C> ConfigAware for TosClientImpl<P, C>
536{
537 fn is_custom_domain(&self) -> bool {
538 self.config_holder.load().is_custom_domain
539 }
540}
541
542impl<P, C> SignerAPI for TosClientImpl<P, C>
543where
544 C: Credentials + Debug,
545 P: CredentialsProvider<C> + Debug,
546{
547 fn pre_signed_url(&self, input: &PreSignedURLInput) -> Result<PreSignedURLOutput, TosError> {
548 let cred = self.load_credentials()?;
549 let ak = cred.ak();
550 let sk = cred.sk();
551 let security_token = cred.security_token();
552 pre_signed_url(&self.config_holder, ak, sk, security_token, input)
553 }
554
555 fn pre_signed_post_signature(&self, input: &PreSignedPostSignatureInput) -> Result<PreSignedPostSignatureOutput, TosError> {
556 let cred = self.load_credentials()?;
557 let ak = cred.ak();
558 let sk = cred.sk();
559 let security_token = cred.security_token();
560 pre_signed_post_signature(&self.config_holder, ak, sk, security_token, input)
561 }
562
563 fn pre_signed_policy_url(&self, input: &PreSignedPolicyURLInput) -> Result<PreSignedPolicyURLOutput, TosError> {
564 let cred = self.load_credentials()?;
565 let ak = cred.ak();
566 let sk = cred.sk();
567 let security_token = cred.security_token();
568 pre_signed_policy_url(&self.config_holder, ak, sk, security_token, input)
569 }
570}
571
572impl<P, C> TosClient for TosClientImpl<P, C>
573where
574 P: CredentialsProvider<C> + Debug,
575 C: Credentials + Debug,
576{
577 fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool {
578 if !self.credentials_can_refresh {
579 return false;
580 }
581
582 match C::new(ak, sk, security_token) {
583 Err(_) => false,
584 Ok(c) => {
585 match P::new(c) {
586 Err(_) => false,
587 Ok(p) => {
588 self.credentials_provider.store(Arc::new(p));
589 true
590 }
591 }
592 }
593 }
594 }
595
596 fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool {
597 let c = self.config_holder.load();
598 let mut config_holder = ConfigHolder {
599 max_retry_count: c.max_retry_count,
600 request_timeout: c.request_timeout,
601 connection_timeout: c.connection_timeout,
602 max_connections: c.max_connections,
603 idle_connection_time: c.idle_connection_time,
604 enable_crc: c.enable_crc,
605 enable_verify_ssl: c.enable_verify_ssl,
606 auto_recognize_content_type: c.auto_recognize_content_type,
607 is_custom_domain: c.is_custom_domain,
608 dns_cache_time: c.dns_cache_time,
609 dns_cache_async_refresh: c.dns_cache_async_refresh,
610 proxy_host: c.proxy_host.to_string(),
611 proxy_port: c.proxy_port,
612 proxy_username: c.proxy_username.clone(),
613 proxy_password: c.proxy_password.clone(),
614 disable_encoding_meta: c.disable_encoding_meta,
615 expect_100_continue_threshold: c.expect_100_continue_threshold,
616 high_latency_log_threshold: c.high_latency_log_threshold,
617 user_agent_product_name: c.user_agent_product_name.clone(),
618 user_agent_soft_name: c.user_agent_soft_name.clone(),
619 user_agent_soft_version: c.user_agent_soft_version.clone(),
620 user_agent_customized_key_values: c.user_agent_customized_key_values.clone(),
621 follow_redirect_times: c.follow_redirect_times,
622 client_crt: c.client_crt.clone(),
623 client_key: c.client_key.clone(),
624 ca_crt: c.ca_crt.clone(),
625 user_agent: c.user_agent.clone(),
626 region: "".to_string(),
627 schema: "".to_string(),
628 domain: "".to_string(),
629 port: None,
630 schema_control: c.schema_control.clone(),
631 domain_control: c.domain_control.clone(),
632 };
633 if let Err(_) = config_holder.check(endpoint, region) {
634 return false;
635 }
636 self.config_holder.store(Arc::new(config_holder));
637 true
638 }
639}
640
641impl<P, C> TosClientImpl<P, C>
642where
643 P: CredentialsProvider<C>,
644 C: Credentials,
645{
646 fn load_credentials(&self) -> Result<Arc<C>, TosError> {
647 let credential_provider = self.credentials_provider.load();
648 match credential_provider.credentials(CREDENTIALS_EXPIRES) {
649 Err(ex) => Err(TosError::client_error_with_cause("load credentials error", GenericError::DefaultError(ex.to_string()))),
650 Ok(c) => Ok(c),
651 }
652 }
653
654 fn do_request<T, K, B>(&self, input: &T) -> Result<K, TosError>
655 where
656 T: InputTranslator<B>,
657 K: OutputParser + RequestInfoTrait,
658 B: Read + Send + 'static,
659 {
660 let config_holder = self.config_holder.load();
661 let operation = check_bucket_and_key(input, config_holder.is_custom_domain)?;
662 let mut retry_count = 0;
663 let max_retry_count = config_holder.max_retry_count;
664 loop {
665 let start = Instant::now();
666 let mut ac = AdditionalContext::new();
667 let result = self.do_request_once::<T, K, B>(input, retry_count, config_holder.clone(), &mut ac);
668 let elapsed_ms = start.elapsed().as_millis();
669 let exceed = exceed_high_latency_log_threshold(config_holder.high_latency_log_threshold, elapsed_ms, ac.request_size, operation);
670 match result {
671 Ok(k) => {
672 if exceed {
673 warn!(target: get_common_log_target(), "high latency request {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms);
674 } else {
675 info!(target: get_common_log_target(), "do {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms);
676 }
677 return Ok(k);
678 }
679 Err(mut e) => {
680 match &e {
681 TosError::TosClientError { .. } => {
682 if exceed {
683 warn!(target: get_common_log_target(), "high latency request {} failed, cost: {} ms", operation, elapsed_ms);
684 } else {
685 warn!(target: get_common_log_target(), "do {} failed, cost: {} ms", operation, elapsed_ms);
686 }
687 }
688 TosError::TosServerError { status_code, request_id, ec, .. } => {
689 if exceed {
690 if status_code.to_owned() < 500 {
691 warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
692 request_id, ec, elapsed_ms);
693 } else {
694 warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
695 request_id, ec, elapsed_ms);
696 }
697 } else {
698 if status_code.to_owned() < 500 {
699 warn!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
700 request_id, ec, elapsed_ms);
701 } else {
702 info!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
703 request_id, ec, elapsed_ms);
704 }
705 }
706 }
707 }
708
709 let (retry_after, need_retry) = check_need_retry(&e, retry_count, max_retry_count, operation);
710 if !need_retry {
711 if let Some(request_url) = ac.request_url {
712 e.set_request_url(request_url);
713 }
714 return Err(e);
715 }
716 sleep_for_retry(retry_count, retry_after);
717 retry_count += 1;
718 }
719 }
720 }
721 }
722
723 fn do_request_once<'a, 'b, T, K, B>(&self, input: &'b T, retry_count: isize, config_holder: Arc<ConfigHolder>, ac: &mut AdditionalContext<'a>) -> Result<K, TosError>
724 where
725 T: InputTranslator<B>,
726 K: OutputParser,
727 B: Read + Send + 'static,
728 'b: 'a,
729 {
730 let mut request = input.trans(config_holder)?;
731 ac.request_host = input.request_host();
732 ac.request_date = input.request_date();
733 ac.request_header = input.request_header();
734 ac.request_query = input.request_query();
735 let body = request.body.take();
736 request.retry_count = retry_count;
737 let mut response = self.do_request_by_client(&mut request, body, ac)?;
738 let (request_info, meta) = self.check_response(&request, &mut response)?;
739 if request.operation == GET_OBJECT_TO_FILE_OPERATION {
740 if let Some(cl) = response.content_length() {
741 ac.request_size = cl as i64;
742 }
743 let result = K::parse(request, response, request_info, meta);
744 return result;
745 }
746 K::parse(request, response, request_info, meta)
747 }
748
749 fn do_request_by_client<'a, 'c, B>(&self, request: &mut HttpRequest<'c, B>, body: Option<B>, ac: &mut AdditionalContext<'a>) -> Result<HttpResponse, TosError>
750 where
751 B: Read + Send + 'static,
752 'a: 'c,
753 {
754 let cred = self.load_credentials()?;
755 let ak = cred.ak();
756 let sk = cred.sk();
757 let security_token = cred.security_token();
758 let config_holder = self.config_holder.load();
759 auto_recognize_content_type(request, config_holder.auto_recognize_content_type);
760 sign_header(request, ak, sk, security_token, config_holder.as_ref(), ac)?;
761 request.enable_crc = config_holder.enable_crc;
762 let request_url = get_request_url(request, config_holder.as_ref(), false);
763 ac.request_url = Some(request_url.clone());
764 let mut rb = self.client.request(request.method.as_http_method(), request_url);
765 let mut cl = -1i64;
766 for kv in &request.header {
767 if *kv.0 == HEADER_CONTENT_LENGTH || *kv.0 == HEADER_CONTENT_LENGTH_LOWER {
768 if let Ok(x) = kv.1.parse::<i64>() {
769 cl = x;
770 }
771 }
772 rb = rb.header(*kv.0, kv.1);
773 }
774
775 if let Some(meta) = &request.meta {
776 for kv in meta {
777 rb = rb.header(kv.0, kv.1);
778 }
779 }
780
781 if request.retry_count > 0 {
782 rb = rb.header(HEADER_SDK_RETRY_COUNT, format!("attempt={}; max={}", request.retry_count, config_holder.max_retry_count));
783 }
784 if config_holder.expect_100_continue_threshold > 0 && cl > config_holder.expect_100_continue_threshold as i64 {
785 rb = rb.header(HEADER_EXPECT, "100-continue");
786 }
787 let is_upload_operation = ALL_UPLOAD_OPERATIONS.contains_key(request.operation);
788 let calc_crc = config_holder.enable_crc && is_upload_operation;
789 let crc64 = Arc::new(AtomicU64::new(0u64));
790 if let Some(bd) = body {
792 if calc_crc {
793 let mut reader = MultifunctionalReader::new(bd, Some(crc64.clone()), cl, request);
794 if let Some(ref rc) = request.request_context {
795 if let Some(init_crc64) = rc.init_crc64 {
796 reader.init_crc64 = Some(init_crc64);
797 }
798
799 if is_upload_operation {
800 if let Some(ref rl) = rc.rate_limiter {
801 reader.set_rate_limiter(rl.clone());
802 }
803
804 if let Some(ref dts) = rc.data_transfer_listener {
805 reader.set_data_transfer_listener(dts.clone());
806 }
807 }
808 }
809 rb = self.add_body(rb, reader, cl);
810 } else if is_upload_operation {
811 if let Some(ref rc) = request.request_context {
812 let mut reader = MultifunctionalReader::new(bd, None, cl, request);
813 if let Some(ref rl) = rc.rate_limiter {
814 reader.set_rate_limiter(rl.clone());
815 }
816
817 if let Some(ref dts) = rc.data_transfer_listener {
818 reader.set_data_transfer_listener(dts.clone());
819 }
820 rb = self.add_body(rb, reader, cl);
821 } else {
822 rb = self.add_body(rb, bd, cl);
823 }
824 } else {
825 rb = self.add_body(rb, bd, cl);
826 }
827 } else if cl == -1 {
828 rb = rb.header(HEADER_CONTENT_LENGTH, 0);
829 }
830
831 match rb.build() {
832 Ok(req) => {
833 let result = self.client.execute(req);
834 ac.request_size = cl;
835 match result {
836 Ok(resp) => {
837 if calc_crc {
838 let result = crc64.load(Ordering::Acquire);
839 if request.request_context.is_none() {
840 let mut rc = RequestContext::default();
841 rc.crc64 = Some(result);
842 request.request_context = Some(rc)
843 } else {
844 request.request_context.as_mut().unwrap().crc64 = Some(result);
845 }
846 }
847 Ok(resp)
848 }
849 Err(e) => {
850 Err(TosError::client_error_with_cause("do request error", GenericError::HttpRequestError(e.to_string())))
851 }
852 }
853 }
854 Err(e) => {
855 Err(TosError::client_error_with_cause("build request error", GenericError::DefaultError(e.to_string())))
856 }
857 }
858 }
859
860 fn add_body<B>(&self, mut rb: RequestBuilder, bd: B, cl: i64) -> RequestBuilder
861 where
862 B: Read + Send + 'static,
863 {
864 if cl >= 0 {
865 rb = rb.body(Body::sized(bd, cl as u64));
866 } else {
867 rb = rb.body(Body::new(bd));
868 }
869 rb
870 }
871
872 fn check_response<B>(&self, request: &HttpRequest<B>, response: &mut HttpResponse) -> Result<(RequestInfo, Meta), TosError> {
873 let status_code = response.status().as_u16();
874 let mut header = HashMap::<String, String>::with_capacity(response.headers().len());
875 let mut request_id = "".to_string();
876 let mut id2 = "".to_string();
877 let mut ec = "".to_string();
878 let mut k;
879 let mut v;
880 let mut meta = Meta::new();
881 for (key, value) in response.headers() {
882 k = key.to_string();
883 v = trans_header_value(value);
884 if k == HEADER_REQUEST_ID {
885 request_id = trans_header_value(value);
886 } else if k == HEADER_ID2 {
887 id2 = trans_header_value(value);
888 } else if k == HEADER_EC {
889 ec = trans_header_value(value);
890 } else if k.starts_with(HEADER_PREFIX_META) {
891 if let Ok(dk) = urlencoding::decode(&k[HEADER_PREFIX_META.len()..]) {
892 if let Ok(dv) = urlencoding::decode(v.as_str()) {
893 meta.insert(dk.to_string(), dv.to_string());
894 }
895 }
896 }
897 header.insert(k, v);
898 }
899
900 let request_info = RequestInfo {
901 status_code: status_code as isize,
902 request_id,
903 id2,
904 header,
905 };
906
907 if status_code >= 300 {
908 if request.method != HttpMethodHead {
909 if let Ok(error_response) = parse_json::<ErrorResponse>(response) {
910 return Err(TosError::server_error_with_code(error_response.code, error_response.ec, error_response.key, error_response.message,
914 error_response.host_id, error_response.resource, request_info));
915 }
916 }
917 return Err(TosError::server_error_with_code("", ec, "", String::from("unexpected status code: ") + response.status().as_str(),
918 "", "", request_info));
919 }
920
921 Ok((request_info, meta))
922 }
923}