Skip to main content

ve_tos_rust_sdk/
tos.rs

1/*
2 * Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16use super::bucket::*;
17use super::config::ConfigHolder;
18use super::credential::{CommonCredentials, CommonCredentialsProvider, Credentials, CredentialsProvider};
19use super::error::{ErrorResponse, GenericError, TosError};
20use super::internal::{auto_recognize_content_type, build_certificate, build_identity, check_bucket_and_key, check_need_retry, exceed_high_latency_log_threshold, get_request_url, parse_json, sleep_for_retry, trans_header_value, AdditionalContext, InputTranslator, OutputParser};
21use crate::auth::{pre_signed_policy_url, pre_signed_post_signature, pre_signed_url, sign_header, PreSignedPolicyURLInput, PreSignedPolicyURLOutput, PreSignedPostSignatureInput, PreSignedPostSignatureOutput, PreSignedURLInput, PreSignedURLOutput, SignerAPI};
22use crate::common::{get_common_log_target, Meta, RequestInfo, RequestInfoTrait};
23use crate::constant::*;
24use crate::enumeration::HttpMethodType::HttpMethodHead;
25use crate::http::{HttpRequest, HttpResponse, RequestContext};
26use crate::multipart::*;
27use crate::object::*;
28use crate::reader::{InternalReader, MultiBytes, MultifunctionalReader};
29use arc_swap::ArcSwap;
30use bytes::Bytes;
31use reqwest::blocking::{Body, Client, RequestBuilder};
32use reqwest::{redirect, Proxy};
33use std::collections::HashMap;
34use std::fmt::Debug;
35use std::fs::File;
36use std::io::{Cursor, Read};
37use std::marker::PhantomData;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tracing::log::{info, warn};
42
43#[derive(Debug, Clone, Default)]
44pub struct TosClientBuilder<P, C>
45{
46    ak: String,
47    sk: String,
48    security_token: String,
49    region: String,
50    endpoint: String,
51    credentials_provider: Option<P>,
52    config_holder: ConfigHolder,
53    c: PhantomData<C>,
54}
55
56impl<P, C> TosClientBuilder<P, C>
57where
58    P: CredentialsProvider<C> + Debug,
59    C: Credentials + Debug,
60{
61    pub fn build(mut self) -> Result<TosClientImpl<P, C>, TosError> {
62        self.config_holder.check(self.endpoint, self.region)?;
63        self.config_holder.gen_user_agent();
64        let mut client = Client::builder()
65            .user_agent(self.config_holder.user_agent.as_str())
66            .tcp_nodelay(true)
67            .tcp_keepalive(None)
68            .no_gzip()
69            .no_deflate()
70            .no_brotli()
71            .connect_timeout(Duration::from_millis(self.config_holder.connection_timeout as u64))
72            .pool_idle_timeout(Duration::from_millis(self.config_holder.idle_connection_time as u64))
73            .pool_max_idle_per_host(self.config_holder.max_connections as usize);
74        if self.config_holder.request_timeout > 0 {
75            client = client.timeout(Duration::from_millis(self.config_holder.request_timeout as u64));
76        }
77
78        if self.config_holder.follow_redirect_times > 0 {
79            client = client.redirect(redirect::Policy::limited(self.config_holder.follow_redirect_times as usize));
80        } else {
81            client = client.redirect(redirect::Policy::none());
82        }
83
84        if self.config_holder.proxy_host != "" {
85            let mut proxy_url = self.config_holder.proxy_host.as_str();
86            while proxy_url.len() > 0 && proxy_url.ends_with("/") {
87                proxy_url = &proxy_url[0..proxy_url.len() - 1];
88            }
89
90            if proxy_url != "" {
91                let mut proxy_url = proxy_url.to_lowercase();
92                if !proxy_url.starts_with(SCHEMA_HTTP) && !proxy_url.starts_with(SCHEMA_HTTPS) {
93                    proxy_url = format!("{}{}", SCHEMA_HTTP, proxy_url);
94                }
95                if self.config_holder.proxy_port >= 0 {
96                    proxy_url = format!("{}:{}", proxy_url, self.config_holder.proxy_port);
97                }
98
99                let (domain, schema, _) = self.config_holder.parse_domain(proxy_url.as_str())?;
100                if self.config_holder.proxy_username != "" && self.config_holder.proxy_password != "" {
101                    proxy_url = format!("{}://{}:{}@{}", schema, self.config_holder.proxy_username, self.config_holder.proxy_password, domain);
102                } else {
103                    proxy_url = format!("{}://{}", schema, domain);
104                }
105                match Proxy::http(proxy_url.as_str()) {
106                    Err(e) => return Err(TosError::client_error_with_cause("build http proxy error", GenericError::DefaultError(e.to_string()))),
107                    Ok(proxy) => {
108                        client = client.proxy(proxy);
109                    }
110                }
111
112                match Proxy::https(proxy_url) {
113                    Err(e) => return Err(TosError::client_error_with_cause("build https proxy error", GenericError::DefaultError(e.to_string()))),
114                    Ok(proxy) => {
115                        client = client.proxy(proxy);
116                    }
117                }
118            } else {
119                client = client.no_proxy();
120            }
121        } else {
122            client = client.no_proxy();
123        }
124
125        if !self.config_holder.enable_verify_ssl {
126            client = client.danger_accept_invalid_certs(true);
127            #[cfg(feature = "use-native-tls")]
128            {
129                client = client.danger_accept_invalid_hostnames(true);
130            }
131        }
132
133        #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
134        if self.config_holder.ca_crt != "" {
135            client = client.add_root_certificate(build_certificate(self.config_holder.ca_crt.as_str())?);
136        }
137
138        #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
139        if self.config_holder.client_crt != "" && self.config_holder.client_key != "" {
140            client = client.identity(build_identity(self.config_holder.client_crt.as_str(), self.config_holder.client_key.as_str())?);
141        }
142
143        let cp;
144        let mut credentials_can_refresh = false;
145        match self.credentials_provider {
146            Some(p) => {
147                cp = p;
148            }
149            None => {
150                match C::new(self.ak, self.sk, self.security_token) {
151                    Err(ex) => return Err(TosError::client_error_with_cause("create credentials error",
152                                                                            GenericError::DefaultError(ex.to_string()))),
153                    Ok(c) => {
154                        match P::new(c) {
155                            Err(ex) => return Err(TosError::client_error_with_cause("create credentials provider error",
156                                                                                    GenericError::DefaultError(ex.to_string()))),
157                            Ok(p) => {
158                                credentials_can_refresh = true;
159                                cp = p;
160                            }
161                        }
162                    }
163                }
164            }
165        }
166
167        match client.build() {
168            Ok(client) => {
169                Ok(TosClientImpl {
170                    client,
171                    config_holder: ArcSwap::from(Arc::new(self.config_holder)),
172                    credentials_provider: ArcSwap::from(Arc::new(cp)),
173                    credentials_can_refresh,
174                    c: self.c,
175                })
176            }
177            Err(e) => {
178                Err(TosError::client_error_with_cause("build tos client error", GenericError::DefaultError(e.to_string())))
179            }
180        }
181    }
182
183    pub fn build_as_trait(self) -> Result<impl TosClient, TosError> {
184        let client = self.build()?;
185        Ok(client)
186    }
187
188    pub fn ak(mut self, ak: impl Into<String>) -> Self {
189        self.ak = ak.into();
190        self
191    }
192
193    pub fn sk(mut self, sk: impl Into<String>) -> Self {
194        self.sk = sk.into();
195        self
196    }
197
198    pub fn security_token(mut self, security_token: impl Into<String>) -> Self {
199        self.security_token = security_token.into();
200        self
201    }
202
203    pub fn credentials_provider(mut self, p: P) -> Self {
204        self.credentials_provider = Some(p);
205        self
206    }
207
208    pub fn region(mut self, region: impl Into<String>) -> Self {
209        self.region = region.into();
210        self
211    }
212
213    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
214        self.endpoint = endpoint.into();
215        self
216    }
217
218    pub fn request_timeout(mut self, request_timeout: isize) -> Self {
219        if request_timeout > 0 {
220            self.config_holder.request_timeout = request_timeout;
221        }
222        self
223    }
224
225    pub fn connection_timeout(mut self, connection_timeout: isize) -> Self {
226        if connection_timeout > 0 {
227            self.config_holder.connection_timeout = connection_timeout;
228        }
229        self
230    }
231
232    pub fn max_connections(mut self, max_connections: isize) -> Self {
233        if max_connections > 0 {
234            self.config_holder.max_connections = max_connections;
235        }
236        self
237    }
238    pub fn idle_connection_time(mut self, idle_connection_time: isize) -> Self {
239        if idle_connection_time > 0 {
240            self.config_holder.idle_connection_time = idle_connection_time;
241        }
242        self
243    }
244
245    pub fn enable_crc(mut self, enable_crc: bool) -> Self {
246        self.config_holder.enable_crc = enable_crc;
247        self
248    }
249
250    pub fn enable_verify_ssl(mut self, enable_verify_ssl: bool) -> Self {
251        self.config_holder.enable_verify_ssl = enable_verify_ssl;
252        self
253    }
254
255    pub fn max_retry_count(mut self, max_retry_count: isize) -> Self {
256        self.config_holder.max_retry_count = max_retry_count;
257        self
258    }
259    pub fn auto_recognize_content_type(mut self, auto_recognize_content_type: bool) -> Self {
260        self.config_holder.auto_recognize_content_type = auto_recognize_content_type;
261        self
262    }
263    pub fn is_custom_domain(mut self, is_custom_domain: bool) -> Self {
264        self.config_holder.is_custom_domain = is_custom_domain;
265        self
266    }
267    pub fn proxy_host(mut self, proxy_host: impl Into<String>) -> Self {
268        self.config_holder.proxy_host = proxy_host.into();
269        self
270    }
271    pub fn proxy_port(mut self, proxy_host: isize) -> Self {
272        self.config_holder.proxy_port = proxy_host.into();
273        self
274    }
275    pub fn proxy_username(mut self, proxy_username: impl Into<String>) -> Self {
276        self.config_holder.proxy_username = proxy_username.into();
277        self
278    }
279    pub fn proxy_password(mut self, proxy_password: impl Into<String>) -> Self {
280        self.config_holder.proxy_password = proxy_password.into();
281        self
282    }
283    pub fn disable_encoding_meta(mut self, disable_encoding_meta: bool) -> Self {
284        self.config_holder.disable_encoding_meta = disable_encoding_meta;
285        self
286    }
287    pub fn expect_100_continue_threshold(mut self, expect_100_continue_threshold: isize) -> Self {
288        self.config_holder.expect_100_continue_threshold = expect_100_continue_threshold;
289        self
290    }
291    pub fn high_latency_log_threshold(mut self, high_latency_log_threshold: isize) -> Self {
292        self.config_holder.high_latency_log_threshold = high_latency_log_threshold;
293        self
294    }
295    pub fn user_agent_product_name(mut self, user_agent_product_name: impl Into<String>) -> Self {
296        self.config_holder.user_agent_product_name = user_agent_product_name.into();
297        self
298    }
299    pub fn user_agent_soft_name(mut self, user_agent_soft_name: impl Into<String>) -> Self {
300        self.config_holder.user_agent_soft_name = user_agent_soft_name.into();
301        self
302    }
303    pub fn user_agent_soft_version(mut self, user_agent_soft_version: impl Into<String>) -> Self {
304        self.config_holder.user_agent_soft_version = user_agent_soft_version.into();
305        self
306    }
307    pub fn user_agent_customized_key_values(mut self, user_agent_customized_key_values: impl Into<HashMap<String, String>>) -> Self {
308        self.config_holder.user_agent_customized_key_values = Some(user_agent_customized_key_values.into());
309        self
310    }
311
312    pub fn follow_redirect_times(mut self, follow_redirect_times: isize) -> Self {
313        self.config_holder.follow_redirect_times = follow_redirect_times;
314        self
315    }
316    #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
317    pub fn client_crt(mut self, client_crt: impl Into<String>) -> Self {
318        self.config_holder.client_crt = client_crt.into();
319        self
320    }
321    #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
322    pub fn client_key(mut self, client_key: impl Into<String>) -> Self {
323        self.config_holder.client_key = client_key.into();
324        self
325    }
326    #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
327    pub fn ca_crt(mut self, ca_crt: impl Into<String>) -> Self {
328        self.config_holder.ca_crt = ca_crt.into();
329        self
330    }
331}
332
333pub fn builder() -> TosClientBuilder<CommonCredentialsProvider<CommonCredentials>, CommonCredentials> {
334    TosClientBuilder::default()
335}
336
337pub trait ConfigAware {
338    fn is_custom_domain(&self) -> bool;
339}
340
341pub trait TosClient: BucketAPI + ObjectAPI + MultipartAPI + SignerAPI + ConfigAware + Debug {
342    fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool;
343    fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool;
344}
345
346pub type DefaultTosClient = TosClientImpl<CommonCredentialsProvider<CommonCredentials>, CommonCredentials>;
347
348#[derive(Debug)]
349pub struct TosClientImpl<P, C> {
350    pub(crate) client: Client,
351    pub(crate) config_holder: ArcSwap<ConfigHolder>,
352    pub(crate) credentials_provider: ArcSwap<P>,
353    pub(crate) credentials_can_refresh: bool,
354    c: PhantomData<C>,
355}
356
357
358impl<P, C> BucketAPI for TosClientImpl<P, C>
359where
360    C: Credentials,
361    P: CredentialsProvider<C>,
362{
363    fn create_bucket(&self, input: &CreateBucketInput) -> Result<CreateBucketOutput, TosError> {
364        self.do_request::<_, _, Cursor<String>>(input)
365    }
366
367    fn head_bucket(&self, input: &HeadBucketInput) -> Result<HeadBucketOutput, TosError> {
368        self.do_request::<_, _, Cursor<String>>(input)
369    }
370
371    fn delete_bucket(&self, input: &DeleteBucketInput) -> Result<DeleteBucketOutput, TosError> {
372        self.do_request::<_, _, Cursor<String>>(input)
373    }
374
375    fn list_buckets(&self, input: &ListBucketsInput) -> Result<ListBucketsOutput, TosError> {
376        self.do_request::<_, _, Cursor<String>>(input)
377    }
378}
379
380impl<P, C> ObjectAPI for TosClientImpl<P, C>
381where
382    C: Credentials,
383    P: CredentialsProvider<C>,
384{
385    fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError> {
386        self.do_request::<_, _, Cursor<String>>(input)
387    }
388
389    fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError> {
390        self.do_request::<_, _, Cursor<String>>(input)
391    }
392
393    fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError> {
394        self.do_request::<DeleteMultiObjectsInput, DeleteMultiObjectsOutput, InternalReader<Cursor<Bytes>>>(input)
395    }
396
397    fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError> {
398        self.do_request::<_, _, Cursor<String>>(input)
399    }
400
401    fn get_object_to_file(&self, input: &GetObjectToFileInput) -> Result<GetObjectToFileOutput, TosError> {
402        self.do_request::<_, _, Cursor<String>>(input)
403    }
404
405    fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError> {
406        self.do_request::<_, _, Cursor<String>>(input)
407    }
408
409    fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError> {
410        self.do_request::<_, _, Cursor<String>>(input)
411    }
412
413    fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
414    where
415        B: Read + Send + 'static,
416    {
417        self.do_request(input)
418    }
419
420    fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError> {
421        self.do_request::<_, _, InternalReader<MultiBytes>>(input)
422    }
423
424    fn list_objects(&self, input: &ListObjectsInput) -> Result<ListObjectsOutput, TosError> {
425        self.do_request::<_, _, Cursor<String>>(input)
426    }
427
428    fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError> {
429        if input.list_only_once {
430            return self.do_request::<_, _, Cursor<String>>(input);
431        }
432
433        let mut input = input.clone();
434        if input.max_keys <= 0 {
435            input.max_keys = DEFAULT_MAX_KEYS;
436        }
437        let mut _output: Option<ListObjectsType2Output> = None;
438        loop {
439            let mut temp_output = self.do_request::<ListObjectsType2Input, ListObjectsType2Output, Cursor<String>>(&input)?;
440            if _output.is_none() {
441                _output = Some(temp_output);
442            } else {
443                let output = _output.as_mut().unwrap();
444                output.key_count += temp_output.key_count;
445                output.is_truncated = temp_output.is_truncated;
446                output.next_continuation_token = temp_output.next_continuation_token;
447                output.contents.append(&mut temp_output.contents);
448                output.common_prefixes.append(&mut temp_output.common_prefixes);
449            }
450
451            let output = _output.as_ref().unwrap();
452            if !output.is_truncated || output.contents.len() + output.common_prefixes.len() >= input.max_keys as usize || output.key_count >= input.max_keys {
453                break;
454            }
455            input.continuation_token = output.next_continuation_token.clone();
456            input.max_keys = input.max_keys - output.key_count;
457        }
458
459        Ok(_output.unwrap())
460    }
461
462    fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError> {
463        self.do_request::<_, _, Cursor<String>>(input)
464    }
465
466    fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
467    where
468        B: Read + Send + 'static,
469    {
470        self.do_request(input)
471    }
472
473    fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError> {
474        self.do_request::<_, _, InternalReader<MultiBytes>>(input)
475    }
476
477    fn put_object_from_file(&self, input: &PutObjectFromFileInput) -> Result<PutObjectOutput, TosError> {
478        self.do_request::<_, _, InternalReader<File>>(input)
479    }
480
481    fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError> {
482        self.do_request::<_, _, InternalReader<Cursor<Bytes>>>(input)
483    }
484
485    fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError> {
486        self.do_request::<_, _, Cursor<String>>(input)
487    }
488}
489
490impl<P, C> MultipartAPI for TosClientImpl<P, C>
491where
492    C: Credentials,
493    P: CredentialsProvider<C>,
494{
495    fn create_multipart_upload(&self, input: &CreateMultipartUploadInput) -> Result<CreateMultipartUploadOutput, TosError> {
496        self.do_request::<_, _, Cursor<String>>(input)
497    }
498
499    fn upload_part<B>(&self, input: &UploadPartInput<B>) -> Result<UploadPartOutput, TosError>
500    where
501        B: Read + Send + 'static,
502    {
503        self.do_request(input)
504    }
505
506    fn upload_part_from_buffer(&self, input: &UploadPartFromBufferInput) -> Result<UploadPartOutput, TosError> {
507        self.do_request::<_, _, InternalReader<MultiBytes>>(input)
508    }
509
510    fn upload_part_from_file(&self, input: &UploadPartFromFileInput) -> Result<UploadPartOutput, TosError> {
511        self.do_request::<_, _, InternalReader<File>>(input)
512    }
513
514    fn complete_multipart_upload(&self, input: &CompleteMultipartUploadInput) -> Result<CompleteMultipartUploadOutput, TosError> {
515        self.do_request::<_, _, InternalReader<Cursor<Bytes>>>(input)
516    }
517
518    fn abort_multipart_upload(&self, input: &AbortMultipartUploadInput) -> Result<AbortMultipartUploadOutput, TosError> {
519        self.do_request::<_, _, Cursor<String>>(input)
520    }
521
522    fn upload_part_copy(&self, input: &UploadPartCopyInput) -> Result<UploadPartCopyOutput, TosError> {
523        self.do_request::<_, _, Cursor<String>>(input)
524    }
525
526    fn list_multipart_uploads(&self, input: &ListMultipartUploadsInput) -> Result<ListMultipartUploadsOutput, TosError> {
527        self.do_request::<_, _, Cursor<String>>(input)
528    }
529
530    fn list_parts(&self, input: &ListPartsInput) -> Result<ListPartsOutput, TosError> {
531        self.do_request::<_, _, Cursor<String>>(input)
532    }
533}
534
535impl<P, C> ConfigAware for TosClientImpl<P, C>
536{
537    fn is_custom_domain(&self) -> bool {
538        self.config_holder.load().is_custom_domain
539    }
540}
541
542impl<P, C> SignerAPI for TosClientImpl<P, C>
543where
544    C: Credentials + Debug,
545    P: CredentialsProvider<C> + Debug,
546{
547    fn pre_signed_url(&self, input: &PreSignedURLInput) -> Result<PreSignedURLOutput, TosError> {
548        let cred = self.load_credentials()?;
549        let ak = cred.ak();
550        let sk = cred.sk();
551        let security_token = cred.security_token();
552        pre_signed_url(&self.config_holder, ak, sk, security_token, input)
553    }
554
555    fn pre_signed_post_signature(&self, input: &PreSignedPostSignatureInput) -> Result<PreSignedPostSignatureOutput, TosError> {
556        let cred = self.load_credentials()?;
557        let ak = cred.ak();
558        let sk = cred.sk();
559        let security_token = cred.security_token();
560        pre_signed_post_signature(&self.config_holder, ak, sk, security_token, input)
561    }
562
563    fn pre_signed_policy_url(&self, input: &PreSignedPolicyURLInput) -> Result<PreSignedPolicyURLOutput, TosError> {
564        let cred = self.load_credentials()?;
565        let ak = cred.ak();
566        let sk = cred.sk();
567        let security_token = cred.security_token();
568        pre_signed_policy_url(&self.config_holder, ak, sk, security_token, input)
569    }
570}
571
572impl<P, C> TosClient for TosClientImpl<P, C>
573where
574    P: CredentialsProvider<C> + Debug,
575    C: Credentials + Debug,
576{
577    fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool {
578        if !self.credentials_can_refresh {
579            return false;
580        }
581
582        match C::new(ak, sk, security_token) {
583            Err(_) => false,
584            Ok(c) => {
585                match P::new(c) {
586                    Err(_) => false,
587                    Ok(p) => {
588                        self.credentials_provider.store(Arc::new(p));
589                        true
590                    }
591                }
592            }
593        }
594    }
595
596    fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool {
597        let c = self.config_holder.load();
598        let mut config_holder = ConfigHolder {
599            max_retry_count: c.max_retry_count,
600            request_timeout: c.request_timeout,
601            connection_timeout: c.connection_timeout,
602            max_connections: c.max_connections,
603            idle_connection_time: c.idle_connection_time,
604            enable_crc: c.enable_crc,
605            enable_verify_ssl: c.enable_verify_ssl,
606            auto_recognize_content_type: c.auto_recognize_content_type,
607            is_custom_domain: c.is_custom_domain,
608            dns_cache_time: c.dns_cache_time,
609            dns_cache_async_refresh: c.dns_cache_async_refresh,
610            proxy_host: c.proxy_host.to_string(),
611            proxy_port: c.proxy_port,
612            proxy_username: c.proxy_username.clone(),
613            proxy_password: c.proxy_password.clone(),
614            disable_encoding_meta: c.disable_encoding_meta,
615            expect_100_continue_threshold: c.expect_100_continue_threshold,
616            high_latency_log_threshold: c.high_latency_log_threshold,
617            user_agent_product_name: c.user_agent_product_name.clone(),
618            user_agent_soft_name: c.user_agent_soft_name.clone(),
619            user_agent_soft_version: c.user_agent_soft_version.clone(),
620            user_agent_customized_key_values: c.user_agent_customized_key_values.clone(),
621            follow_redirect_times: c.follow_redirect_times,
622            client_crt: c.client_crt.clone(),
623            client_key: c.client_key.clone(),
624            ca_crt: c.ca_crt.clone(),
625            user_agent: c.user_agent.clone(),
626            region: "".to_string(),
627            schema: "".to_string(),
628            domain: "".to_string(),
629            port: None,
630            schema_control: c.schema_control.clone(),
631            domain_control: c.domain_control.clone(),
632        };
633        if let Err(_) = config_holder.check(endpoint, region) {
634            return false;
635        }
636        self.config_holder.store(Arc::new(config_holder));
637        true
638    }
639}
640
641impl<P, C> TosClientImpl<P, C>
642where
643    P: CredentialsProvider<C>,
644    C: Credentials,
645{
646    fn load_credentials(&self) -> Result<Arc<C>, TosError> {
647        let credential_provider = self.credentials_provider.load();
648        match credential_provider.credentials(CREDENTIALS_EXPIRES) {
649            Err(ex) => Err(TosError::client_error_with_cause("load credentials error", GenericError::DefaultError(ex.to_string()))),
650            Ok(c) => Ok(c),
651        }
652    }
653
654    fn do_request<T, K, B>(&self, input: &T) -> Result<K, TosError>
655    where
656        T: InputTranslator<B>,
657        K: OutputParser + RequestInfoTrait,
658        B: Read + Send + 'static,
659    {
660        let config_holder = self.config_holder.load();
661        let operation = check_bucket_and_key(input, config_holder.is_custom_domain)?;
662        let mut retry_count = 0;
663        let max_retry_count = config_holder.max_retry_count;
664        loop {
665            let start = Instant::now();
666            let mut ac = AdditionalContext::new();
667            let result = self.do_request_once::<T, K, B>(input, retry_count, config_holder.clone(), &mut ac);
668            let elapsed_ms = start.elapsed().as_millis();
669            let exceed = exceed_high_latency_log_threshold(config_holder.high_latency_log_threshold, elapsed_ms, ac.request_size, operation);
670            match result {
671                Ok(k) => {
672                    if exceed {
673                        warn!(target: get_common_log_target(), "high latency request {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms);
674                    } else {
675                        info!(target: get_common_log_target(), "do {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms);
676                    }
677                    return Ok(k);
678                }
679                Err(mut e) => {
680                    match &e {
681                        TosError::TosClientError { .. } => {
682                            if exceed {
683                                warn!(target: get_common_log_target(), "high latency request {} failed, cost: {} ms", operation, elapsed_ms);
684                            } else {
685                                warn!(target: get_common_log_target(), "do {} failed, cost: {} ms", operation, elapsed_ms);
686                            }
687                        }
688                        TosError::TosServerError { status_code, request_id, ec, .. } => {
689                            if exceed {
690                                if status_code.to_owned() < 500 {
691                                    warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
692                                    request_id, ec, elapsed_ms);
693                                } else {
694                                    warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
695                                    request_id, ec, elapsed_ms);
696                                }
697                            } else {
698                                if status_code.to_owned() < 500 {
699                                    warn!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
700                                    request_id, ec, elapsed_ms);
701                                } else {
702                                    info!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
703                                    request_id, ec, elapsed_ms);
704                                }
705                            }
706                        }
707                    }
708
709                    let (retry_after, need_retry) = check_need_retry(&e, retry_count, max_retry_count, operation);
710                    if !need_retry {
711                        if let Some(request_url) = ac.request_url {
712                            e.set_request_url(request_url);
713                        }
714                        return Err(e);
715                    }
716                    sleep_for_retry(retry_count, retry_after);
717                    retry_count += 1;
718                }
719            }
720        }
721    }
722
723    fn do_request_once<'a, 'b, T, K, B>(&self, input: &'b T, retry_count: isize, config_holder: Arc<ConfigHolder>, ac: &mut AdditionalContext<'a>) -> Result<K, TosError>
724    where
725        T: InputTranslator<B>,
726        K: OutputParser,
727        B: Read + Send + 'static,
728        'b: 'a,
729    {
730        let mut request = input.trans(config_holder)?;
731        ac.request_host = input.request_host();
732        ac.request_date = input.request_date();
733        ac.request_header = input.request_header();
734        ac.request_query = input.request_query();
735        let body = request.body.take();
736        request.retry_count = retry_count;
737        let mut response = self.do_request_by_client(&mut request, body, ac)?;
738        let (request_info, meta) = self.check_response(&request, &mut response)?;
739        if request.operation == GET_OBJECT_TO_FILE_OPERATION {
740            if let Some(cl) = response.content_length() {
741                ac.request_size = cl as i64;
742            }
743            let result = K::parse(request, response, request_info, meta);
744            return result;
745        }
746        K::parse(request, response, request_info, meta)
747    }
748
749    fn do_request_by_client<'a, 'c, B>(&self, request: &mut HttpRequest<'c, B>, body: Option<B>, ac: &mut AdditionalContext<'a>) -> Result<HttpResponse, TosError>
750    where
751        B: Read + Send + 'static,
752        'a: 'c,
753    {
754        let cred = self.load_credentials()?;
755        let ak = cred.ak();
756        let sk = cred.sk();
757        let security_token = cred.security_token();
758        let config_holder = self.config_holder.load();
759        auto_recognize_content_type(request, config_holder.auto_recognize_content_type);
760        sign_header(request, ak, sk, security_token, config_holder.as_ref(), ac)?;
761        request.enable_crc = config_holder.enable_crc;
762        let request_url = get_request_url(request, config_holder.as_ref(), false);
763        ac.request_url = Some(request_url.clone());
764        let mut rb = self.client.request(request.method.as_http_method(), request_url);
765        let mut cl = -1i64;
766        for kv in &request.header {
767            if *kv.0 == HEADER_CONTENT_LENGTH || *kv.0 == HEADER_CONTENT_LENGTH_LOWER {
768                if let Ok(x) = kv.1.parse::<i64>() {
769                    cl = x;
770                }
771            }
772            rb = rb.header(*kv.0, kv.1);
773        }
774
775        if let Some(meta) = &request.meta {
776            for kv in meta {
777                rb = rb.header(kv.0, kv.1);
778            }
779        }
780
781        if request.retry_count > 0 {
782            rb = rb.header(HEADER_SDK_RETRY_COUNT, format!("attempt={}; max={}", request.retry_count, config_holder.max_retry_count));
783        }
784        if config_holder.expect_100_continue_threshold > 0 && cl > config_holder.expect_100_continue_threshold as i64 {
785            rb = rb.header(HEADER_EXPECT, "100-continue");
786        }
787        let is_upload_operation = ALL_UPLOAD_OPERATIONS.contains_key(request.operation);
788        let calc_crc = config_holder.enable_crc && is_upload_operation;
789        let crc64 = Arc::new(AtomicU64::new(0u64));
790        // add body
791        if let Some(bd) = body {
792            if calc_crc {
793                let mut reader = MultifunctionalReader::new(bd, Some(crc64.clone()), cl, request);
794                if let Some(ref rc) = request.request_context {
795                    if let Some(init_crc64) = rc.init_crc64 {
796                        reader.init_crc64 = Some(init_crc64);
797                    }
798
799                    if is_upload_operation {
800                        if let Some(ref rl) = rc.rate_limiter {
801                            reader.set_rate_limiter(rl.clone());
802                        }
803
804                        if let Some(ref dts) = rc.data_transfer_listener {
805                            reader.set_data_transfer_listener(dts.clone());
806                        }
807                    }
808                }
809                rb = self.add_body(rb, reader, cl);
810            } else if is_upload_operation {
811                if let Some(ref rc) = request.request_context {
812                    let mut reader = MultifunctionalReader::new(bd, None, cl, request);
813                    if let Some(ref rl) = rc.rate_limiter {
814                        reader.set_rate_limiter(rl.clone());
815                    }
816
817                    if let Some(ref dts) = rc.data_transfer_listener {
818                        reader.set_data_transfer_listener(dts.clone());
819                    }
820                    rb = self.add_body(rb, reader, cl);
821                } else {
822                    rb = self.add_body(rb, bd, cl);
823                }
824            } else {
825                rb = self.add_body(rb, bd, cl);
826            }
827        } else if cl == -1 {
828            rb = rb.header(HEADER_CONTENT_LENGTH, 0);
829        }
830
831        match rb.build() {
832            Ok(req) => {
833                let result = self.client.execute(req);
834                ac.request_size = cl;
835                match result {
836                    Ok(resp) => {
837                        if calc_crc {
838                            let result = crc64.load(Ordering::Acquire);
839                            if request.request_context.is_none() {
840                                let mut rc = RequestContext::default();
841                                rc.crc64 = Some(result);
842                                request.request_context = Some(rc)
843                            } else {
844                                request.request_context.as_mut().unwrap().crc64 = Some(result);
845                            }
846                        }
847                        Ok(resp)
848                    }
849                    Err(e) => {
850                        Err(TosError::client_error_with_cause("do request error", GenericError::HttpRequestError(e.to_string())))
851                    }
852                }
853            }
854            Err(e) => {
855                Err(TosError::client_error_with_cause("build request error", GenericError::DefaultError(e.to_string())))
856            }
857        }
858    }
859
860    fn add_body<B>(&self, mut rb: RequestBuilder, bd: B, cl: i64) -> RequestBuilder
861    where
862        B: Read + Send + 'static,
863    {
864        if cl >= 0 {
865            rb = rb.body(Body::sized(bd, cl as u64));
866        } else {
867            rb = rb.body(Body::new(bd));
868        }
869        rb
870    }
871
872    fn check_response<B>(&self, request: &HttpRequest<B>, response: &mut HttpResponse) -> Result<(RequestInfo, Meta), TosError> {
873        let status_code = response.status().as_u16();
874        let mut header = HashMap::<String, String>::with_capacity(response.headers().len());
875        let mut request_id = "".to_string();
876        let mut id2 = "".to_string();
877        let mut ec = "".to_string();
878        let mut k;
879        let mut v;
880        let mut meta = Meta::new();
881        for (key, value) in response.headers() {
882            k = key.to_string();
883            v = trans_header_value(value);
884            if k == HEADER_REQUEST_ID {
885                request_id = trans_header_value(value);
886            } else if k == HEADER_ID2 {
887                id2 = trans_header_value(value);
888            } else if k == HEADER_EC {
889                ec = trans_header_value(value);
890            } else if k.starts_with(HEADER_PREFIX_META) {
891                if let Ok(dk) = urlencoding::decode(&k[HEADER_PREFIX_META.len()..]) {
892                    if let Ok(dv) = urlencoding::decode(v.as_str()) {
893                        meta.insert(dk.to_string(), dv.to_string());
894                    }
895                }
896            }
897            header.insert(k, v);
898        }
899
900        let request_info = RequestInfo {
901            status_code: status_code as isize,
902            request_id,
903            id2,
904            header,
905        };
906
907        if status_code >= 300 {
908            if request.method != HttpMethodHead {
909                if let Ok(error_response) = parse_json::<ErrorResponse>(response) {
910                    // println!("{}", error_response.canonical_request);
911                    // println!("{}", error_response.string_to_sign);
912                    // println!("{}", error_response.signature_provided);
913                    return Err(TosError::server_error_with_code(error_response.code, error_response.ec, error_response.key, error_response.message,
914                                                                error_response.host_id, error_response.resource, request_info));
915                }
916            }
917            return Err(TosError::server_error_with_code("", ec, "", String::from("unexpected status code: ") + response.status().as_str(),
918                                                        "", "", request_info));
919        }
920
921        Ok((request_info, meta))
922    }
923}