Skip to main content

ve_tos_rust_sdk/asynchronous/
tos.rs

1/*
2 * Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16use crate::asynchronous::auth::SignerAPI;
17use crate::asynchronous::bucket::BucketAPI;
18use crate::asynchronous::common::DataTransferListener;
19use crate::asynchronous::control::ControlAPI;
20use crate::asynchronous::credential::CredentialsProvider;
21use crate::asynchronous::http::HttpResponse;
22use crate::asynchronous::internal::{AsyncInputTranslator, OutputParser};
23use crate::asynchronous::multipart::MultipartAPI;
24use crate::asynchronous::object::ObjectAPI;
25use crate::asynchronous::paginator::PaginatorAPI;
26use crate::asynchronous::reader::StreamVec;
27use crate::auth::{pre_signed_policy_url, pre_signed_post_signature, pre_signed_url, sign_header, PreSignedPolicyURLInput, PreSignedPolicyURLOutput, PreSignedPostSignatureInput, PreSignedPostSignatureOutput, PreSignedURLInput, PreSignedURLOutput};
28use crate::bucket::{CreateBucketInput, CreateBucketOutput, DeleteBucketCORSInput, DeleteBucketCORSOutput, DeleteBucketCustomDomainInput, DeleteBucketCustomDomainOutput, DeleteBucketEncryptionInput, DeleteBucketEncryptionOutput, DeleteBucketInput, DeleteBucketInventoryInput, DeleteBucketInventoryOutput, DeleteBucketLifecycleInput, DeleteBucketLifecycleOutput, DeleteBucketMirrorBackInput, DeleteBucketMirrorBackOutput, DeleteBucketOutput, DeleteBucketPolicyInput, DeleteBucketPolicyOutput, DeleteBucketRealTimeLogInput, DeleteBucketRealTimeLogOutput, DeleteBucketRenameInput, DeleteBucketRenameOutput, DeleteBucketReplicationInput, DeleteBucketReplicationOutput, DeleteBucketTaggingInput, DeleteBucketTaggingOutput, DeleteBucketWebsiteInput, DeleteBucketWebsiteOutput, DoesBucketExistInput, GetBucketACLInput, GetBucketACLOutput, GetBucketAccessMonitorInput, GetBucketAccessMonitorOutput, GetBucketCORSInput, GetBucketCORSOutput, GetBucketEncryptionInput, GetBucketEncryptionOutput, GetBucketInfoInput, GetBucketInfoOutput, GetBucketInventoryInput, GetBucketInventoryOutput, GetBucketLifecycleInput, GetBucketLifecycleOutput, GetBucketLocationInput, GetBucketLocationOutput, GetBucketMirrorBackInput, GetBucketMirrorBackOutput, GetBucketNotificationType2Input, GetBucketNotificationType2Output, GetBucketPolicyInput, GetBucketPolicyOutput, GetBucketRealTimeLogInput, GetBucketRealTimeLogOutput, GetBucketRenameInput, GetBucketRenameOutput, GetBucketReplicationInput, GetBucketReplicationOutput, GetBucketTaggingInput, GetBucketTaggingOutput, GetBucketTrashInput, GetBucketTrashOutput, GetBucketTypeInput, GetBucketTypeOutput, GetBucketVersioningInput, GetBucketVersioningOutput, GetBucketWebsiteInput, GetBucketWebsiteOutput, HeadBucketInput, HeadBucketOutput, ListBucketCustomDomainInput, ListBucketCustomDomainOutput, ListBucketInventoryInput, ListBucketInventoryOutput, ListBucketsInput, ListBucketsOutput, PutBucketACLInput, PutBucketACLOutput, PutBucketAccessMonitorInput, 
PutBucketAccessMonitorOutput, PutBucketCORSInput, PutBucketCORSOutput, PutBucketCustomDomainInput, PutBucketCustomDomainOutput, PutBucketEncryptionInput, PutBucketEncryptionOutput, PutBucketInventoryInput, PutBucketInventoryOutput, PutBucketLifecycleInput, PutBucketLifecycleOutput, PutBucketMirrorBackInput, PutBucketMirrorBackOutput, PutBucketNotificationType2Input, PutBucketNotificationType2Output, PutBucketPolicyInput, PutBucketPolicyOutput, PutBucketRealTimeLogInput, PutBucketRealTimeLogOutput, PutBucketRenameInput, PutBucketRenameOutput, PutBucketReplicationInput, PutBucketReplicationOutput, PutBucketStorageClassInput, PutBucketStorageClassOutput, PutBucketTaggingInput, PutBucketTaggingOutput, PutBucketTrashInput, PutBucketTrashOutput, PutBucketVersioningInput, PutBucketVersioningOutput, PutBucketWebsiteInput, PutBucketWebsiteOutput};
29use crate::common::{get_common_log_target, GenericInput, RequestInfoTrait};
30use crate::config::ConfigHolder;
31use crate::constant::{ALL_UPLOAD_OPERATIONS, BASE_DELAY_MS, CREDENTIALS_EXPIRES, DEFAULT_MAX_KEYS, GET_OBJECT_TO_FILE_OPERATION, HEADER_CONTENT_LENGTH, HEADER_CONTENT_LENGTH_LOWER, HEADER_EXPECT, HEADER_SDK_RETRY_COUNT, MAX_DELAY_MS, SCHEMA_HTTP, SCHEMA_HTTPS};
32use crate::control::{DeleteQosPolicyInput, DeleteQosPolicyOutput, GetQosPolicyInput, GetQosPolicyOutput, PutQosPolicyInput, PutQosPolicyOutput};
33use crate::credential::{CommonCredentials, CommonCredentialsProvider, Credentials, EnvCredentialsProvider, StaticCredentialsProvider};
34use crate::enumeration::BucketType;
35use crate::error::{GenericError, TosError};
36use crate::http::{HttpRequest, RequestContext};
37use crate::internal::{auto_recognize_content_type, build_certificate, build_identity, check_bucket_and_key, check_need_retry, exceed_high_latency_log_threshold, get_request_url, AdditionalContext, InputTranslator, MockAsyncInputTranslator};
38use crate::multipart::{AbortMultipartUploadInput, AbortMultipartUploadOutput, CompleteMultipartUploadInput, CompleteMultipartUploadOutput, CreateMultipartUploadInput, CreateMultipartUploadOutput, ListMultipartUploadsInput, ListMultipartUploadsOutput, ListPartsInput, ListPartsOutput, UploadPartCopyInput, UploadPartCopyOutput, UploadPartFromBufferInput, UploadPartInput, UploadPartOutput};
39use crate::object::{AppendObjectBasicInput, AppendObjectFromBufferInput, AppendObjectInput, AppendObjectOutput, CopyObjectInput, CopyObjectOutput, DeleteMultiObjectsInput, DeleteMultiObjectsOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectTaggingInput, DeleteObjectTaggingOutput, DoesObjectExistInput, FetchObjectInput, FetchObjectOutput, GetFetchTaskInput, GetFetchTaskOutput, GetFileStatusInput, GetFileStatusOutput, GetObjectACLInput, GetObjectACLOutput, GetObjectInput, GetObjectOutput, GetObjectTaggingInput, GetObjectTaggingOutput, GetSymlinkInput, GetSymlinkOutput, HeadObjectInput, HeadObjectOutput, ListObjectVersionsInput, ListObjectVersionsOutput, ListObjectsType2Input, ListObjectsType2Output, ModifyObjectFromBufferInput, ModifyObjectFromFileInput, ModifyObjectInput, ModifyObjectOutput, PutFetchTaskInput, PutFetchTaskOutput, PutObjectACLInput, PutObjectACLOutput, PutObjectBasicInput, PutObjectFromBufferInput, PutObjectFromFileInput, PutObjectInput, PutObjectOutput, PutObjectTaggingInput, PutObjectTaggingOutput, PutSymlinkInput, PutSymlinkOutput, RenameObjectInput, RenameObjectOutput, RestoreObjectInput, RestoreObjectOutput, SetObjectMetaInput, SetObjectMetaOutput, SetObjectTimeInput, SetObjectTimeOutput};
40use crate::reader::{InternalReader, MultiBytes, MultifunctionalReader};
41use crate::tos::ConfigAware;
42use arc_swap::ArcSwap;
43use async_channel::Sender;
44use async_trait::async_trait;
45use bytes::Bytes;
46use futures_core::future::BoxFuture;
47use futures_core::Stream;
48use reqwest::{redirect, Body, Client, Proxy, RequestBuilder};
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt::{Debug, Formatter};
52use std::future::Future;
53use std::marker::PhantomData;
54use std::pin::Pin;
55use std::sync::atomic::{AtomicI8, AtomicU64, Ordering};
56use std::sync::Arc;
57use std::task::{Context, Poll};
58use std::time::{Duration, Instant};
59use tracing::log::{info, warn};
60
61#[async_trait]
62pub trait AsyncRuntime {
63    type JoinError: Error;
64    async fn sleep(&self, duration: Duration);
65    fn spawn<'a, F>(&self, future: F) -> BoxFuture<'a, Result<F::Output, Self::JoinError>>
66    where
67        F: Future + Send + 'static,
68        F::Output: Send + 'static;
69
70    fn block_on<F: Future>(&self, future: F) -> F::Output;
71}
72
73
74#[derive(Debug, Clone, Default)]
75pub struct TosClientBuilder<P, C, S>
76{
77    ak: String,
78    sk: String,
79    security_token: String,
80    region: String,
81    endpoint: String,
82    control_endpoint: String,
83    credentials_provider: Option<P>,
84    config_holder: ConfigHolder,
85    async_runtime: S,
86    c: PhantomData<C>,
87}
88
89impl<P, C, S> TosClientBuilder<P, C, S>
90where
91    P: CredentialsProvider<C> + Send + Sync + 'static,
92    C: Credentials + Send + Sync + 'static,
93    S: AsyncRuntime + Send + Sync + 'static,
94{
95    pub fn build(mut self) -> Result<TosClientImpl<P, C, S>, TosError> {
96        self.config_holder.check(self.endpoint, self.region)?;
97        self.config_holder.check_control(self.control_endpoint)?;
98        self.config_holder.gen_user_agent();
99        let mut client = Client::builder()
100            .user_agent(self.config_holder.user_agent.as_str())
101            .tcp_nodelay(true)
102            .tcp_keepalive(None)
103            .no_gzip()
104            .no_deflate()
105            .no_brotli()
106            .connect_timeout(Duration::from_millis(self.config_holder.connection_timeout as u64))
107            .pool_idle_timeout(Duration::from_millis(self.config_holder.idle_connection_time as u64))
108            .pool_max_idle_per_host(self.config_holder.max_connections as usize);
109        if self.config_holder.request_timeout > 0 {
110            client = client.timeout(Duration::from_millis(self.config_holder.request_timeout as u64));
111        }
112
113        if self.config_holder.follow_redirect_times > 0 {
114            client = client.redirect(redirect::Policy::limited(self.config_holder.follow_redirect_times as usize));
115        } else {
116            client = client.redirect(redirect::Policy::none());
117        }
118
119        if self.config_holder.proxy_host != "" {
120            let mut proxy_url = self.config_holder.proxy_host.as_str();
121            while proxy_url.len() > 0 && proxy_url.ends_with("/") {
122                proxy_url = &proxy_url[0..proxy_url.len() - 1];
123            }
124
125            if proxy_url != "" {
126                let mut proxy_url = proxy_url.to_lowercase();
127                if !proxy_url.starts_with(SCHEMA_HTTP) && !proxy_url.starts_with(SCHEMA_HTTPS) {
128                    proxy_url = format!("{}{}", SCHEMA_HTTP, proxy_url);
129                }
130                if self.config_holder.proxy_port >= 0 {
131                    proxy_url = format!("{}:{}", proxy_url, self.config_holder.proxy_port);
132                }
133
134                let (domain, schema, _) = self.config_holder.parse_domain(proxy_url.as_str())?;
135                if self.config_holder.proxy_username != "" && self.config_holder.proxy_password != "" {
136                    proxy_url = format!("{}://{}:{}@{}", schema, self.config_holder.proxy_username, self.config_holder.proxy_password, domain);
137                } else {
138                    proxy_url = format!("{}://{}", schema, domain);
139                }
140                match Proxy::http(proxy_url.as_str()) {
141                    Err(e) => return Err(TosError::client_error_with_cause("build http proxy error", GenericError::DefaultError(e.to_string()))),
142                    Ok(proxy) => {
143                        client = client.proxy(proxy);
144                    }
145                }
146
147                match Proxy::https(proxy_url) {
148                    Err(e) => return Err(TosError::client_error_with_cause("build https proxy error", GenericError::DefaultError(e.to_string()))),
149                    Ok(proxy) => {
150                        client = client.proxy(proxy);
151                    }
152                }
153            } else {
154                client = client.no_proxy();
155            }
156        } else {
157            client = client.no_proxy();
158        }
159
160        if !self.config_holder.enable_verify_ssl {
161            client = client.danger_accept_invalid_certs(true);
162            #[cfg(feature = "use-native-tls")]
163            {
164                client = client.danger_accept_invalid_hostnames(true);
165            }
166        }
167
168        #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
169        if self.config_holder.ca_crt != "" {
170            client = client.add_root_certificate(build_certificate(self.config_holder.ca_crt.as_str())?);
171        }
172
173        #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
174        if self.config_holder.client_crt != "" && self.config_holder.client_key != "" {
175            client = client.identity(build_identity(self.config_holder.client_crt.as_str(), self.config_holder.client_key.as_str())?);
176        }
177
178        let async_runtime = Arc::new(self.async_runtime);
179        let closed = Arc::new(AtomicI8::new(0));
180        let mut _handlers = create_handlers::<S>();
181        let (sender, _receiver) = async_channel::bounded(1);
182        #[cfg(feature = "tokio-runtime")]
183        if self.config_holder.dns_cache_time > 0 {
184            let port;
185            if let Some(pt) = self.config_holder.port {
186                port = pt;
187            } else if self.config_holder.schema == SCHEMA_HTTPS {
188                port = 443;
189            } else {
190                port = 80;
191            }
192            let (resolver, handler) = crate::asynchronous::dns::InternalDnsResolver::new(self.config_holder.dns_cache_time, self.config_holder.dns_cache_async_refresh,
193                                                                                         port, async_runtime.clone(), closed.clone(), _receiver.clone());
194            client = client.dns_resolver(Arc::new(resolver));
195            _handlers.push(handler);
196        }
197
198        let cp;
199        let mut credentials_can_refresh = false;
200        match self.credentials_provider {
201            Some(p) => {
202                cp = p;
203            }
204            None => {
205                match C::new(self.ak, self.sk, self.security_token) {
206                    Err(ex) => return Err(TosError::client_error_with_cause("create credentials error",
207                                                                            GenericError::DefaultError(ex.to_string()))),
208                    Ok(c) => {
209                        match P::new(c) {
210                            Err(ex) => return Err(TosError::client_error_with_cause("create credentials provider error",
211                                                                                    GenericError::DefaultError(ex.to_string()))),
212                            Ok(p) => {
213                                credentials_can_refresh = true;
214                                cp = p;
215                            }
216                        }
217                    }
218                }
219            }
220        }
221
222        match client.build() {
223            Ok(client) => {
224                #[cfg(not(feature = "tokio-runtime"))]
225                {
226                    Ok(TosClientImpl {
227                        client,
228                        config_holder: ArcSwap::from(Arc::new(self.config_holder)),
229                        credentials_provider: ArcSwap::from(Arc::new(cp)),
230                        credentials_can_refresh,
231                        async_runtime,
232                        c: self.c,
233                        closed,
234                        closed_sender: sender,
235                    })
236                }
237
238                #[cfg(feature = "tokio-runtime")]
239                {
240                    let credentials_provider = Arc::new(cp);
241                    let inner_credentials = Arc::new(tokio::sync::RwLock::new(None));
242                    if !credentials_can_refresh {
243                        let handler = async_refresh_credentials(closed.clone(), async_runtime.clone(),
244                                                                credentials_provider.clone(), inner_credentials.clone(), _receiver.clone());
245                        _handlers.push(Some(handler));
246                    }
247                    let tos_client = TosClientImpl {
248                        client,
249                        config_holder: ArcSwap::from(Arc::new(self.config_holder)),
250                        credentials_provider: ArcSwap::from(credentials_provider),
251                        credentials_can_refresh,
252                        async_runtime,
253                        c: self.c,
254                        closed,
255                        closed_sender: sender,
256                        inner_credentials,
257                        cached_buckets: tokio::sync::RwLock::new(HashMap::new()),
258                        handlers: tokio::sync::Mutex::new(Some(_handlers)),
259                    };
260                    Ok(tos_client)
261                }
262            }
263            Err(e) => {
264                Err(TosError::client_error_with_cause("build tos client error", GenericError::DefaultError(e.to_string())))
265            }
266        }
267    }
268
269    pub fn build_as_trait(self) -> Result<impl TosClient, TosError> {
270        let client = self.build()?;
271        Ok(client)
272    }
273
274    pub fn ak(mut self, ak: impl Into<String>) -> Self {
275        self.ak = ak.into();
276        self
277    }
278
279    pub fn sk(mut self, sk: impl Into<String>) -> Self {
280        self.sk = sk.into();
281        self
282    }
283
284    pub fn security_token(mut self, security_token: impl Into<String>) -> Self {
285        self.security_token = security_token.into();
286        self
287    }
288
289    pub fn credentials_provider(mut self, p: P) -> Self {
290        self.credentials_provider = Some(p);
291        self
292    }
293
294    pub fn region(mut self, region: impl Into<String>) -> Self {
295        self.region = region.into();
296        self
297    }
298
299    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
300        self.endpoint = endpoint.into();
301        self
302    }
303
304    pub fn control_endpoint(mut self, control_endpoint: impl Into<String>) -> Self {
305        self.control_endpoint = control_endpoint.into();
306        self
307    }
308
309    pub fn request_timeout(mut self, request_timeout: isize) -> Self {
310        if request_timeout > 0 {
311            self.config_holder.request_timeout = request_timeout;
312        }
313        self
314    }
315
316    pub fn connection_timeout(mut self, connection_timeout: isize) -> Self {
317        if connection_timeout > 0 {
318            self.config_holder.connection_timeout = connection_timeout;
319        }
320        self
321    }
322
323    pub fn max_connections(mut self, max_connections: isize) -> Self {
324        if max_connections > 0 {
325            self.config_holder.max_connections = max_connections;
326        }
327        self
328    }
329    pub fn idle_connection_time(mut self, idle_connection_time: isize) -> Self {
330        if idle_connection_time > 0 {
331            self.config_holder.idle_connection_time = idle_connection_time;
332        }
333        self
334    }
335
336    pub fn enable_crc(mut self, enable_crc: bool) -> Self {
337        self.config_holder.enable_crc = enable_crc;
338        self
339    }
340
341    pub fn enable_verify_ssl(mut self, enable_verify_ssl: bool) -> Self {
342        self.config_holder.enable_verify_ssl = enable_verify_ssl;
343        self
344    }
345
346    pub fn max_retry_count(mut self, max_retry_count: isize) -> Self {
347        self.config_holder.max_retry_count = max_retry_count;
348        self
349    }
350    pub fn auto_recognize_content_type(mut self, auto_recognize_content_type: bool) -> Self {
351        self.config_holder.auto_recognize_content_type = auto_recognize_content_type;
352        self
353    }
354    pub fn is_custom_domain(mut self, is_custom_domain: bool) -> Self {
355        self.config_holder.is_custom_domain = is_custom_domain;
356        self
357    }
358    pub fn proxy_host(mut self, proxy_host: impl Into<String>) -> Self {
359        self.config_holder.proxy_host = proxy_host.into();
360        self
361    }
362    pub fn proxy_port(mut self, proxy_port: isize) -> Self {
363        self.config_holder.proxy_port = proxy_port.into();
364        self
365    }
366    pub fn proxy_username(mut self, proxy_username: impl Into<String>) -> Self {
367        self.config_holder.proxy_username = proxy_username.into();
368        self
369    }
370    pub fn proxy_password(mut self, proxy_password: impl Into<String>) -> Self {
371        self.config_holder.proxy_password = proxy_password.into();
372        self
373    }
374    pub fn disable_encoding_meta(mut self, disable_encoding_meta: bool) -> Self {
375        self.config_holder.disable_encoding_meta = disable_encoding_meta;
376        self
377    }
378    pub fn expect_100_continue_threshold(mut self, expect_100_continue_threshold: isize) -> Self {
379        self.config_holder.expect_100_continue_threshold = expect_100_continue_threshold;
380        self
381    }
382    #[cfg(feature = "tokio-runtime")]
383    pub fn dns_cache_time(mut self, dns_cache_time: isize) -> Self {
384        self.config_holder.dns_cache_time = dns_cache_time;
385        self
386    }
387
388    #[cfg(feature = "tokio-runtime")]
389    pub fn dns_cache_async_refresh(mut self, dns_cache_async_refresh: bool) -> Self {
390        self.config_holder.dns_cache_async_refresh = dns_cache_async_refresh;
391        self
392    }
393
394    pub fn high_latency_log_threshold(mut self, high_latency_log_threshold: isize) -> Self {
395        self.config_holder.high_latency_log_threshold = high_latency_log_threshold;
396        self
397    }
398    pub fn user_agent_product_name(mut self, user_agent_product_name: impl Into<String>) -> Self {
399        self.config_holder.user_agent_product_name = user_agent_product_name.into();
400        self
401    }
402    pub fn user_agent_soft_name(mut self, user_agent_soft_name: impl Into<String>) -> Self {
403        self.config_holder.user_agent_soft_name = user_agent_soft_name.into();
404        self
405    }
406    pub fn user_agent_soft_version(mut self, user_agent_soft_version: impl Into<String>) -> Self {
407        self.config_holder.user_agent_soft_version = user_agent_soft_version.into();
408        self
409    }
410    pub fn user_agent_customized_key_values(mut self, user_agent_customized_key_values: impl Into<HashMap<String, String>>) -> Self {
411        self.config_holder.user_agent_customized_key_values = Some(user_agent_customized_key_values.into());
412        self
413    }
414    pub fn follow_redirect_times(mut self, follow_redirect_times: isize) -> Self {
415        self.config_holder.follow_redirect_times = follow_redirect_times;
416        self
417    }
418
419    #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
420    pub fn client_crt(mut self, client_crt: impl Into<String>) -> Self {
421        self.config_holder.client_crt = client_crt.into();
422        self
423    }
424
425    #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
426    pub fn client_key(mut self, client_key: impl Into<String>) -> Self {
427        self.config_holder.client_key = client_key.into();
428        self
429    }
430
431    #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
432    pub fn ca_crt(mut self, ca_crt: impl Into<String>) -> Self {
433        self.config_holder.ca_crt = ca_crt.into();
434        self
435    }
436    pub fn async_runtime(mut self, async_runtime: impl Into<S>) -> Self {
437        self.async_runtime = async_runtime.into();
438        self
439    }
440}
441
442pub fn builder<S>() -> TosClientBuilder<CommonCredentialsProvider<CommonCredentials>, CommonCredentials, S>
443where
444    S: AsyncRuntime + Default,
445{
446    TosClientBuilder::default()
447}
448
449pub fn builder_common<P, C, S>() -> TosClientBuilder<P, C, S>
450where
451    S: AsyncRuntime + Default,
452    P: CredentialsProvider<C> + Send + Sync + Default + 'static,
453    C: Credentials + Send + Sync + Default + 'static,
454{
455    TosClientBuilder::default()
456}
457
458pub fn static_credentials_provider(ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>)
459                                   -> StaticCredentialsProvider<CommonCredentials> {
460    StaticCredentialsProvider::new(ak, sk, security_token).unwrap()
461}
/// Placeholder handler list used when the `tokio-runtime` feature is disabled.
#[cfg(not(feature = "tokio-runtime"))]
fn create_handlers<S>() -> Vec<i32> {
    Vec::<i32>::with_capacity(2)
}
466
/// Creates the (initially empty) list that collects the join futures of the
/// client's background tasks.
#[cfg(feature = "tokio-runtime")]
fn create_handlers<S: AsyncRuntime>() -> Vec<Option<BoxFuture<'static, Result<(), S::JoinError>>>> {
    Vec::with_capacity(2)
}
474
/// Spawns a background task that periodically reloads credentials from
/// `credentials_provider` and stores them in `inner_credentials`.
///
/// Each round waits for `CREDENTIALS_REFRESH_INTERVAL` seconds. The task ends
/// when `closed` reads `1` at the start of a round, or as soon as
/// `receiver.recv()` completes during the wait. A failed load is logged and
/// the previously stored credentials are left untouched.
///
/// Returns the future produced by `async_runtime.spawn` for the task.
#[cfg(feature = "tokio-runtime")]
fn async_refresh_credentials<P, C, S>(
    closed: Arc<AtomicI8>,
    async_runtime: Arc<S>,
    credentials_provider: Arc<P>,
    inner_credentials: Arc<tokio::sync::RwLock<Option<Arc<C>>>>,
    receiver: async_channel::Receiver<()>,
) -> BoxFuture<'static, Result<(), S::JoinError>>
where
    P: CredentialsProvider<C> + Send + Sync + 'static,
    C: Credentials + Send + Sync + 'static,
    S: AsyncRuntime + Send + Sync + 'static,
{
    // The task needs its own handle to the runtime for sleeping.
    let sleeper = async_runtime.clone();
    let refresh_loop = async move {
        while closed.load(Ordering::Acquire) != 1 {
            tokio::select! {
                _ = sleeper.sleep(Duration::from_secs(crate::constant::CREDENTIALS_REFRESH_INTERVAL)) => {}

                _ = receiver.recv() => {
                    return;
                }
            }

            match credentials_provider.credentials(CREDENTIALS_EXPIRES).await {
                Ok(fresh) => {
                    let mut slot = inner_credentials.write().await;
                    *slot = Some(fresh.clone());
                }
                Err(ex) => warn!(target: get_common_log_target(), "async load credentials error, {}", ex),
            }
        }
    };
    async_runtime.spawn(refresh_loop)
}
508
509pub fn env_credentials_provider() -> EnvCredentialsProvider<CommonCredentials> {
510    EnvCredentialsProvider::new().unwrap()
511}
512
513pub struct BufferStream {
514    inner: Option<Bytes>,
515}
516
517impl Stream for BufferStream {
518    type Item = Result<Bytes, crate::error::CommonError>;
519
520    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
521        if self.inner.is_none() {
522            return Poll::Ready(None);
523        }
524        Poll::Ready(Some(Ok(self.inner.take().unwrap())))
525    }
526
527    fn size_hint(&self) -> (usize, Option<usize>) {
528        match &self.inner {
529            None => (0, None),
530            Some(v) => (0, Some(v.len()))
531        }
532    }
533}
534
535pub fn new_stream(data: impl AsRef<[u8]>) -> BufferStream {
536    BufferStream { inner: Some(Bytes::from(data.as_ref().to_owned())) }
537}
538
539pub fn new_stream_nocopy(data: impl Into<Vec<u8>>) -> BufferStream {
540    BufferStream { inner: Some(Bytes::from(data.into())) }
541}
542
543#[async_trait]
544pub trait TosClient: BucketAPI + ObjectAPI + MultipartAPI + PaginatorAPI + ControlAPI + SignerAPI + ConfigAware {
545    fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool;
546    fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool;
547}
548
/// Entry type of the client's bucket cache: a `GetBucketTypeOutput` paired with a UTC timestamp.
///
/// NOTE(review): the timestamp is presumably the time the entry was cached or
/// when it expires — confirm against the code that fills `cached_buckets`.
#[cfg(feature = "tokio-runtime")]
pub(crate) type BucketCache = (GetBucketTypeOutput, chrono::DateTime<chrono::Utc>);
551
552pub struct TosClientImpl<P, C, S>
553where
554    S: AsyncRuntime,
555{
556    pub(crate) client: Client,
557    pub(crate) config_holder: ArcSwap<ConfigHolder>,
558    pub(crate) credentials_provider: ArcSwap<P>,
559    pub(crate) async_runtime: Arc<S>,
560    pub(crate) c: PhantomData<C>,
561    pub(crate) credentials_can_refresh: bool,
562    pub(crate) closed: Arc<AtomicI8>,
563    pub(crate) closed_sender: Sender<()>,
564
565    #[cfg(feature = "tokio-runtime")]
566    pub(crate) inner_credentials: Arc<tokio::sync::RwLock<Option<Arc<C>>>>,
567    #[cfg(feature = "tokio-runtime")]
568    pub(crate) cached_buckets: tokio::sync::RwLock<HashMap<String, BucketCache>>,
569    #[cfg(feature = "tokio-runtime")]
570    pub(crate) handlers: tokio::sync::Mutex<Option<Vec<Option<BoxFuture<'static, Result<(), S::JoinError>>>>>>,
571}
572
573impl<P, C, S> Debug for TosClientImpl<P, C, S>
574where
575    S: AsyncRuntime,
576{
577    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
578        write!(f, "client: {:?}, config_holder: {:?}", self.client, self.config_holder)
579    }
580}
581
582unsafe impl<P, C, S> Sync for TosClientImpl<P, C, S>
583where
584    S: AsyncRuntime,
585{}
586
587impl<P, C, S> ConfigAware for TosClientImpl<P, C, S>
588where
589    S: AsyncRuntime,
590{
591    fn is_custom_domain(&self) -> bool {
592        self.config_holder.load().is_custom_domain
593    }
594}
595#[async_trait]
596impl<P, C, S> ObjectAPI for TosClientImpl<P, C, S>
597where
598    P: CredentialsProvider<C> + Send + Sync + 'static,
599    C: Credentials + Send + Sync + 'static,
600    S: AsyncRuntime + Send + Sync + 'static,
601{
602    async fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
603    where
604        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
605    {
606        self.do_request(input).await
607    }
608
609    async fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError> {
610        self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
611    }
612
    /// Executes a put-object-from-file request: forwards `input` to `do_request_af`
    /// with `FileReader` as the reader type.
    ///
    /// Only compiled when the `tokio-runtime` feature is enabled.
    #[cfg(feature = "tokio-runtime")]
    async fn put_object_from_file(&self, input: &crate::object::PutObjectFromFileInput) -> Result<PutObjectOutput, TosError> {
        self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
    }
617
618    async fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError> {
619        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
620    }
621    #[cfg(feature = "tokio-runtime")]
622    async fn get_object_to_file(&self, input: &crate::object::GetObjectToFileInput) -> Result<crate::object::GetObjectToFileOutput, TosError> {
623        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
624    }
625    async fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError> {
626        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
627    }
628
629    async fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError> {
630        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
631    }
632
633    async fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError> {
634        if input.list_only_once {
635            return self.do_request::<_, _, InternalReader<StreamVec>>(input).await;
636        }
637
638        let mut input = input.clone();
639        if input.max_keys <= 0 {
640            input.max_keys = DEFAULT_MAX_KEYS;
641        }
642        let mut _output: Option<ListObjectsType2Output> = None;
643        loop {
644            let mut temp_output = self.do_request::<ListObjectsType2Input, ListObjectsType2Output, InternalReader<StreamVec>>(&input).await?;
645            if _output.is_none() {
646                _output = Some(temp_output);
647            } else {
648                let output = _output.as_mut().unwrap();
649                output.key_count += temp_output.key_count;
650                output.is_truncated = temp_output.is_truncated;
651                output.next_continuation_token = temp_output.next_continuation_token;
652                output.contents.append(&mut temp_output.contents);
653                output.common_prefixes.append(&mut temp_output.common_prefixes);
654            }
655
656            let output = _output.as_ref().unwrap();
657            if !output.is_truncated || output.contents.len() + output.common_prefixes.len() >= input.max_keys as usize || output.key_count >= input.max_keys {
658                break;
659            }
660            input.continuation_token = output.next_continuation_token.clone();
661            input.max_keys = input.max_keys - output.key_count;
662        }
663
664        Ok(_output.unwrap())
665    }
666
667    async fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError> {
668        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
669    }
670
671    async fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError> {
672        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
673    }
674
675    async fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError> {
676        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
677    }
678
679    async fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError> {
680        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
681    }
682
683    async fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError> {
684        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
685    }
686
687    async fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError> {
688        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
689    }
690    async fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
691    where
692        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
693    {
694        let mut hinput = GetBucketTypeInput::new(input.bucket());
695        hinput.set_request_host(input.request_host());
696        if let Some(request_date) = input.request_date() {
697            hinput.set_request_date(request_date);
698        }
699        if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
700            if bt == &BucketType::BucketTypeHns {
701                let (if_match, forbid_overwrite) = self.check_object_status(input.bucket(), input.key(),
702                                                                            input.offset(), input.content_length(), &input.inner.generic_input).await?;
703                if forbid_overwrite {
704                    let mut minput: PutObjectInput<B> = PutObjectInput::default();
705                    minput.set_request_host(input.request_host());
706                    if let Some(request_date) = input.request_date() {
707                        minput.set_request_date(request_date);
708                    }
709                    minput.inner = self.trans_append_object_input(&input.inner);
710                    minput.set_forbid_overwrite(forbid_overwrite);
711                    if let Some(adts) = input.async_data_transfer_listener() {
712                        minput.set_async_data_transfer_listener(adts.clone());
713                    }
714                    if let Some(b) = input.content.take() {
715                        minput.set_content(b);
716                    }
717                    let output = self.put_object(&minput).await?;
718                    return Ok(AppendObjectOutput {
719                        request_info: output.request_info,
720                        next_append_offset: input.content_length(),
721                        hash_crc64ecma: output.hash_crc64ecma,
722                    });
723                }
724
725                let mut minput: ModifyObjectInput<B> = ModifyObjectInput::new(input.bucket(), input.key());
726                minput.set_request_host(input.request_host());
727                if let Some(request_date) = input.request_date() {
728                    minput.set_request_date(request_date);
729                }
730                minput.pre_hash_crc64ecma = input.pre_hash_crc64ecma();
731                minput.set_if_match(if_match);
732                minput.set_offset(input.offset());
733                minput.set_content_length(input.content_length());
734                minput.set_notification_custom_parameters(input.notification_custom_parameters());
735                minput.set_traffic_limit(input.traffic_limit());
736                if let Some(adts) = input.async_data_transfer_listener() {
737                    minput.set_async_data_transfer_listener(adts.clone());
738                }
739                if let Some(b) = input.content.take() {
740                    minput.set_content(b);
741                }
742                let output = self.modify_object(&minput).await?;
743                return Ok(AppendObjectOutput {
744                    request_info: output.request_info,
745                    next_append_offset: output.next_modify_offset,
746                    hash_crc64ecma: output.hash_crc64ecma,
747                });
748            }
749        }
750        self.do_request(input).await
751    }
752
753    async fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError> {
754        let mut hinput = GetBucketTypeInput::new(input.bucket());
755        hinput.set_request_host(input.request_host());
756        if let Some(request_date) = input.request_date() {
757            hinput.set_request_date(request_date);
758        }
759        if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
760            if bt == &BucketType::BucketTypeHns {
761                let (if_match, forbid_overwrite) = self.check_object_status(input.bucket(), input.key(),
762                                                                            input.offset(), input.content_length(), &input.inner.generic_input).await?;
763                if forbid_overwrite {
764                    let mut minput = PutObjectFromBufferInput::default();
765                    minput.set_request_host(input.request_host());
766                    if let Some(request_date) = input.request_date() {
767                        minput.set_request_date(request_date);
768                    }
769                    minput.inner = self.trans_append_object_input(&input.inner);
770                    minput.set_forbid_overwrite(forbid_overwrite);
771                    if let Some(adts) = input.async_data_transfer_listener() {
772                        minput.set_async_data_transfer_listener(adts.clone());
773                    }
774                    if let Some(b) = input.content() {
775                        for item in b {
776                            minput.append_content_nocopy(item.to_vec());
777                        }
778                    }
779                    let output = self.put_object_from_buffer(&minput).await?;
780                    return Ok(AppendObjectOutput {
781                        request_info: output.request_info,
782                        next_append_offset: input.content_length(),
783                        hash_crc64ecma: output.hash_crc64ecma,
784                    });
785                }
786
787                let mut minput = ModifyObjectFromBufferInput::new(input.bucket(), input.key());
788                minput.set_request_host(input.request_host());
789                if let Some(request_date) = input.request_date() {
790                    minput.set_request_date(request_date);
791                }
792                minput.pre_hash_crc64ecma = input.pre_hash_crc64ecma();
793                minput.set_if_match(if_match);
794                minput.set_offset(input.offset());
795                minput.set_content_length(input.content_length());
796                minput.set_notification_custom_parameters(input.notification_custom_parameters());
797                minput.set_traffic_limit(input.traffic_limit());
798                if let Some(adts) = input.async_data_transfer_listener() {
799                    minput.set_async_data_transfer_listener(adts.clone());
800                }
801                if let Some(b) = input.content() {
802                    for item in b {
803                        minput.append_content_nocopy(item.to_vec());
804                    }
805                }
806                let output = self.modify_object_from_buffer(&minput).await?;
807                return Ok(AppendObjectOutput {
808                    request_info: output.request_info,
809                    next_append_offset: output.next_modify_offset,
810                    hash_crc64ecma: output.hash_crc64ecma,
811                });
812            }
813        }
814        self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
815    }
816    #[cfg(feature = "tokio-runtime")]
817    async fn append_object_from_file(&self, input: &crate::object::AppendObjectFromFileInput) -> Result<AppendObjectOutput, TosError> {
818        let mut hinput = GetBucketTypeInput::new(input.bucket());
819        hinput.set_request_host(input.request_host());
820        if let Some(request_date) = input.request_date() {
821            hinput.set_request_date(request_date);
822        }
823        if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
824            if bt == &BucketType::BucketTypeHns {
825                let (if_match, forbid_overwrite) = self.check_object_status(input.bucket(), input.key(),
826                                                                            input.offset(), input.content_length(), &input.inner.generic_input).await?;
827                if forbid_overwrite {
828                    let mut minput = PutObjectFromFileInput::default();
829                    minput.set_request_host(input.request_host());
830                    if let Some(request_date) = input.request_date() {
831                        minput.set_request_date(request_date);
832                    }
833                    minput.inner = self.trans_append_object_input(&input.inner);
834                    minput.set_forbid_overwrite(forbid_overwrite);
835                    if let Some(adts) = input.async_data_transfer_listener() {
836                        minput.set_async_data_transfer_listener(adts.clone());
837                    }
838                    minput.set_file_path(input.file_path());
839                    let output = self.put_object_from_file(&minput).await?;
840                    return Ok(AppendObjectOutput {
841                        request_info: output.request_info,
842                        next_append_offset: input.content_length(),
843                        hash_crc64ecma: output.hash_crc64ecma,
844                    });
845                }
846
847                let mut minput = ModifyObjectFromFileInput::new(input.bucket(), input.key());
848                minput.set_request_host(input.request_host());
849                if let Some(request_date) = input.request_date() {
850                    minput.set_request_date(request_date);
851                }
852                minput.pre_hash_crc64ecma = input.pre_hash_crc64ecma();
853                minput.set_if_match(if_match);
854                minput.set_offset(input.offset());
855                minput.set_content_length(input.content_length());
856                minput.set_notification_custom_parameters(input.notification_custom_parameters());
857                minput.set_traffic_limit(input.traffic_limit());
858                if let Some(adts) = input.async_data_transfer_listener() {
859                    minput.set_async_data_transfer_listener(adts.clone());
860                }
861                minput.set_file_path(input.file_path());
862                let output = self.modify_object_from_file(&minput).await?;
863                return Ok(AppendObjectOutput {
864                    request_info: output.request_info,
865                    next_append_offset: output.next_modify_offset,
866                    hash_crc64ecma: output.hash_crc64ecma,
867                });
868            }
869        }
870        self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
871    }
872
873    async fn fetch_object(&self, input: &FetchObjectInput) -> Result<FetchObjectOutput, TosError> {
874        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
875    }
876
877    async fn put_fetch_task(&self, input: &PutFetchTaskInput) -> Result<PutFetchTaskOutput, TosError> {
878        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
879    }
880
881    async fn get_fetch_task(&self, input: &GetFetchTaskInput) -> Result<GetFetchTaskOutput, TosError> {
882        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
883    }
884
885    async fn put_object_tagging(&self, input: &PutObjectTaggingInput) -> Result<PutObjectTaggingOutput, TosError> {
886        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
887    }
888
889    async fn get_object_tagging(&self, input: &GetObjectTaggingInput) -> Result<GetObjectTaggingOutput, TosError> {
890        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
891    }
892
893    async fn delete_object_tagging(&self, input: &DeleteObjectTaggingInput) -> Result<DeleteObjectTaggingOutput, TosError> {
894        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
895    }
896
897    async fn rename_object(&self, input: &RenameObjectInput) -> Result<RenameObjectOutput, TosError> {
898        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
899    }
900
901    async fn restore_object(&self, input: &RestoreObjectInput) -> Result<RestoreObjectOutput, TosError> {
902        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
903    }
904
905    async fn put_symlink(&self, input: &PutSymlinkInput) -> Result<PutSymlinkOutput, TosError> {
906        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
907    }
908
909    async fn get_symlink(&self, input: &GetSymlinkInput) -> Result<GetSymlinkOutput, TosError> {
910        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
911    }
912
913    async fn get_file_status(&self, input: &GetFileStatusInput) -> Result<GetFileStatusOutput, TosError> {
914        let mut hinput = GetBucketTypeInput::new(input.bucket());
915        hinput.set_request_host(input.request_host());
916        if let Some(request_date) = input.request_date() {
917            hinput.set_request_date(request_date);
918        }
919        if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
920            if bt == &BucketType::BucketTypeHns {
921                let mut hinput = HeadObjectInput::new(&input.bucket, &input.key);
922                hinput.set_request_host(input.request_host());
923                if let Some(request_date) = input.request_date() {
924                    hinput.set_request_date(request_date);
925                }
926                let o = self.head_object(&hinput).await?;
927                return Ok(GetFileStatusOutput {
928                    request_info: o.request_info,
929                    key: input.key.clone(),
930                    size: o.content_length,
931                    last_modified_string: None,
932                    last_modified: o.last_modified,
933                    crc32: "".to_string(),
934                    crc64: o.hash_crc64ecma.to_string(),
935                    etag: o.etag,
936                    object_type: o.object_type,
937                });
938            }
939        }
940        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
941    }
942
943    async fn does_object_exist(&self, input: &DoesObjectExistInput) -> Result<bool, TosError> {
944        let mut hinput = HeadObjectInput::new_with_version_id(input.bucket(), input.key(), input.version_id());
945        if let Some(request_date) = input.request_date() {
946            hinput.set_request_date(request_date);
947        }
948        hinput.set_request_host(input.request_host());
949        match self.head_object(&hinput).await {
950            Ok(_) => Ok(true),
951            Err(ex) => {
952                if let Some(err) = ex.as_server_error() {
953                    if err.status_code() == 400 && err.ec() == "0015-00000008" {
954                        return Ok(true);
955                    }
956                    if err.status_code() == 404 && err.ec() == "0017-00000003" {
957                        return Ok(false);
958                    }
959                }
960                Err(ex)
961            }
962        }
963    }
964
965    async fn set_object_time(&self, input: &SetObjectTimeInput) -> Result<SetObjectTimeOutput, TosError> {
966        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
967    }
968}
969
/// `BucketAPI` implementation for `TosClientImpl`.
///
/// Most methods forward their input to `do_request` with
/// `InternalReader<StreamVec>` as the reader type and return the result
/// unchanged. `get_bucket_type` and `does_bucket_exist` are derived from
/// `head_bucket`.
#[cfg(feature = "asynchronous")]
#[async_trait]
impl<P, C, S> BucketAPI for TosClientImpl<P, C, S>
where
    P: CredentialsProvider<C> + Send + Sync + 'static,
    C: Credentials + Send + Sync + 'static,
    S: AsyncRuntime + Send + Sync + 'static,
{
    // ---- bucket lifecycle: plain forwards to `do_request` ----

    async fn create_bucket(&self, input: &CreateBucketInput) -> Result<CreateBucketOutput, TosError> {
        self.do_request::<CreateBucketInput, CreateBucketOutput, InternalReader<StreamVec>>(input).await
    }

    async fn head_bucket(&self, input: &HeadBucketInput) -> Result<HeadBucketOutput, TosError> {
        self.do_request::<HeadBucketInput, HeadBucketOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket(&self, input: &DeleteBucketInput) -> Result<DeleteBucketOutput, TosError> {
        self.do_request::<DeleteBucketInput, DeleteBucketOutput, InternalReader<StreamVec>>(input).await
    }

    async fn list_buckets(&self, input: &ListBucketsInput) -> Result<ListBucketsOutput, TosError> {
        self.do_request::<ListBucketsInput, ListBucketsOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- CORS ----

    async fn put_bucket_cors(&self, input: &PutBucketCORSInput) -> Result<PutBucketCORSOutput, TosError> {
        self.do_request::<PutBucketCORSInput, PutBucketCORSOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_cors(&self, input: &GetBucketCORSInput) -> Result<GetBucketCORSOutput, TosError> {
        self.do_request::<GetBucketCORSInput, GetBucketCORSOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_cors(&self, input: &DeleteBucketCORSInput) -> Result<DeleteBucketCORSOutput, TosError> {
        self.do_request::<DeleteBucketCORSInput, DeleteBucketCORSOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- storage class / location ----

    async fn put_bucket_storage_class(&self, input: &PutBucketStorageClassInput) -> Result<PutBucketStorageClassOutput, TosError> {
        self.do_request::<PutBucketStorageClassInput, PutBucketStorageClassOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_location(&self, input: &GetBucketLocationInput) -> Result<GetBucketLocationOutput, TosError> {
        self.do_request::<GetBucketLocationInput, GetBucketLocationOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- lifecycle rules ----

    async fn put_bucket_lifecycle(&self, input: &PutBucketLifecycleInput) -> Result<PutBucketLifecycleOutput, TosError> {
        self.do_request::<PutBucketLifecycleInput, PutBucketLifecycleOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_lifecycle(&self, input: &GetBucketLifecycleInput) -> Result<GetBucketLifecycleOutput, TosError> {
        self.do_request::<GetBucketLifecycleInput, GetBucketLifecycleOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_lifecycle(&self, input: &DeleteBucketLifecycleInput) -> Result<DeleteBucketLifecycleOutput, TosError> {
        self.do_request::<DeleteBucketLifecycleInput, DeleteBucketLifecycleOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- policy ----

    async fn put_bucket_policy(&self, input: &PutBucketPolicyInput) -> Result<PutBucketPolicyOutput, TosError> {
        self.do_request::<PutBucketPolicyInput, PutBucketPolicyOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_policy(&self, input: &GetBucketPolicyInput) -> Result<GetBucketPolicyOutput, TosError> {
        self.do_request::<GetBucketPolicyInput, GetBucketPolicyOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_policy(&self, input: &DeleteBucketPolicyInput) -> Result<DeleteBucketPolicyOutput, TosError> {
        self.do_request::<DeleteBucketPolicyInput, DeleteBucketPolicyOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- mirror-back ----

    async fn put_bucket_mirror_back(&self, input: &PutBucketMirrorBackInput) -> Result<PutBucketMirrorBackOutput, TosError> {
        self.do_request::<PutBucketMirrorBackInput, PutBucketMirrorBackOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_mirror_back(&self, input: &GetBucketMirrorBackInput) -> Result<GetBucketMirrorBackOutput, TosError> {
        self.do_request::<GetBucketMirrorBackInput, GetBucketMirrorBackOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_mirror_back(&self, input: &DeleteBucketMirrorBackInput) -> Result<DeleteBucketMirrorBackOutput, TosError> {
        self.do_request::<DeleteBucketMirrorBackInput, DeleteBucketMirrorBackOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- ACL ----

    async fn put_bucket_acl(&self, input: &PutBucketACLInput) -> Result<PutBucketACLOutput, TosError> {
        self.do_request::<PutBucketACLInput, PutBucketACLOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_acl(&self, input: &GetBucketACLInput) -> Result<GetBucketACLOutput, TosError> {
        self.do_request::<GetBucketACLInput, GetBucketACLOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- replication ----

    async fn put_bucket_replication(&self, input: &PutBucketReplicationInput) -> Result<PutBucketReplicationOutput, TosError> {
        self.do_request::<PutBucketReplicationInput, PutBucketReplicationOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_replication(&self, input: &GetBucketReplicationInput) -> Result<GetBucketReplicationOutput, TosError> {
        self.do_request::<GetBucketReplicationInput, GetBucketReplicationOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_replication(&self, input: &DeleteBucketReplicationInput) -> Result<DeleteBucketReplicationOutput, TosError> {
        self.do_request::<DeleteBucketReplicationInput, DeleteBucketReplicationOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- versioning ----

    async fn put_bucket_versioning(&self, input: &PutBucketVersioningInput) -> Result<PutBucketVersioningOutput, TosError> {
        self.do_request::<PutBucketVersioningInput, PutBucketVersioningOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_versioning(&self, input: &GetBucketVersioningInput) -> Result<GetBucketVersioningOutput, TosError> {
        self.do_request::<GetBucketVersioningInput, GetBucketVersioningOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- website ----

    async fn put_bucket_website(&self, input: &PutBucketWebsiteInput) -> Result<PutBucketWebsiteOutput, TosError> {
        self.do_request::<PutBucketWebsiteInput, PutBucketWebsiteOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_website(&self, input: &GetBucketWebsiteInput) -> Result<GetBucketWebsiteOutput, TosError> {
        self.do_request::<GetBucketWebsiteInput, GetBucketWebsiteOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_website(&self, input: &DeleteBucketWebsiteInput) -> Result<DeleteBucketWebsiteOutput, TosError> {
        self.do_request::<DeleteBucketWebsiteInput, DeleteBucketWebsiteOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- custom domain ----

    async fn put_bucket_custom_domain(&self, input: &PutBucketCustomDomainInput) -> Result<PutBucketCustomDomainOutput, TosError> {
        self.do_request::<PutBucketCustomDomainInput, PutBucketCustomDomainOutput, InternalReader<StreamVec>>(input).await
    }

    async fn list_bucket_custom_domain(&self, input: &ListBucketCustomDomainInput) -> Result<ListBucketCustomDomainOutput, TosError> {
        self.do_request::<ListBucketCustomDomainInput, ListBucketCustomDomainOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_custom_domain(&self, input: &DeleteBucketCustomDomainInput) -> Result<DeleteBucketCustomDomainOutput, TosError> {
        self.do_request::<DeleteBucketCustomDomainInput, DeleteBucketCustomDomainOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- real-time log ----

    async fn put_bucket_real_time_log(&self, input: &PutBucketRealTimeLogInput) -> Result<PutBucketRealTimeLogOutput, TosError> {
        self.do_request::<PutBucketRealTimeLogInput, PutBucketRealTimeLogOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_real_time_log(&self, input: &GetBucketRealTimeLogInput) -> Result<GetBucketRealTimeLogOutput, TosError> {
        self.do_request::<GetBucketRealTimeLogInput, GetBucketRealTimeLogOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_real_time_log(&self, input: &DeleteBucketRealTimeLogInput) -> Result<DeleteBucketRealTimeLogOutput, TosError> {
        self.do_request::<DeleteBucketRealTimeLogInput, DeleteBucketRealTimeLogOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- rename ----

    async fn put_bucket_rename(&self, input: &PutBucketRenameInput) -> Result<PutBucketRenameOutput, TosError> {
        self.do_request::<PutBucketRenameInput, PutBucketRenameOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_rename(&self, input: &GetBucketRenameInput) -> Result<GetBucketRenameOutput, TosError> {
        self.do_request::<GetBucketRenameInput, GetBucketRenameOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_rename(&self, input: &DeleteBucketRenameInput) -> Result<DeleteBucketRenameOutput, TosError> {
        self.do_request::<DeleteBucketRenameInput, DeleteBucketRenameOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- encryption ----

    async fn put_bucket_encryption(&self, input: &PutBucketEncryptionInput) -> Result<PutBucketEncryptionOutput, TosError> {
        self.do_request::<PutBucketEncryptionInput, PutBucketEncryptionOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_encryption(&self, input: &GetBucketEncryptionInput) -> Result<GetBucketEncryptionOutput, TosError> {
        self.do_request::<GetBucketEncryptionInput, GetBucketEncryptionOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_encryption(&self, input: &DeleteBucketEncryptionInput) -> Result<DeleteBucketEncryptionOutput, TosError> {
        self.do_request::<DeleteBucketEncryptionInput, DeleteBucketEncryptionOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- tagging ----

    async fn put_bucket_tagging(&self, input: &PutBucketTaggingInput) -> Result<PutBucketTaggingOutput, TosError> {
        self.do_request::<PutBucketTaggingInput, PutBucketTaggingOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_tagging(&self, input: &GetBucketTaggingInput) -> Result<GetBucketTaggingOutput, TosError> {
        self.do_request::<GetBucketTaggingInput, GetBucketTaggingOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_tagging(&self, input: &DeleteBucketTaggingInput) -> Result<DeleteBucketTaggingOutput, TosError> {
        self.do_request::<DeleteBucketTaggingInput, DeleteBucketTaggingOutput, InternalReader<StreamVec>>(input).await
    }

    // ---- notification (type 2) ----

    async fn put_bucket_notification_type2(&self, input: &PutBucketNotificationType2Input) -> Result<PutBucketNotificationType2Output, TosError> {
        self.do_request::<PutBucketNotificationType2Input, PutBucketNotificationType2Output, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_notification_type2(&self, input: &GetBucketNotificationType2Input) -> Result<GetBucketNotificationType2Output, TosError> {
        self.do_request::<GetBucketNotificationType2Input, GetBucketNotificationType2Output, InternalReader<StreamVec>>(input).await
    }

    // ---- inventory ----

    async fn put_bucket_inventory(&self, input: &PutBucketInventoryInput) -> Result<PutBucketInventoryOutput, TosError> {
        self.do_request::<PutBucketInventoryInput, PutBucketInventoryOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_inventory(&self, input: &GetBucketInventoryInput) -> Result<GetBucketInventoryOutput, TosError> {
        self.do_request::<GetBucketInventoryInput, GetBucketInventoryOutput, InternalReader<StreamVec>>(input).await
    }

    async fn list_bucket_inventory(&self, input: &ListBucketInventoryInput) -> Result<ListBucketInventoryOutput, TosError> {
        self.do_request::<ListBucketInventoryInput, ListBucketInventoryOutput, InternalReader<StreamVec>>(input).await
    }

    async fn delete_bucket_inventory(&self, input: &DeleteBucketInventoryInput) -> Result<DeleteBucketInventoryOutput, TosError> {
        self.do_request::<DeleteBucketInventoryInput, DeleteBucketInventoryOutput, InternalReader<StreamVec>>(input).await
    }

    /// Derives the bucket type from a `head_bucket` call (variant compiled when
    /// the `tokio-runtime` feature is off; nothing is cached).
    ///
    /// Request host and, if present, request date are copied from `input` to
    /// the head request. `expire_at` of the result is its default value.
    #[cfg(not(feature = "tokio-runtime"))]
    async fn get_bucket_type(&self, input: &GetBucketTypeInput) -> Result<GetBucketTypeOutput, TosError> {
        let mut head_req = HeadBucketInput::new(input.bucket());
        head_req.set_request_host(input.request_host());
        if let Some(date) = input.request_date() {
            head_req.set_request_date(date);
        }

        let HeadBucketOutput {
            request_info,
            region,
            storage_class,
            az_redundancy,
            project_name,
            bucket_type,
            ..
        } = self.head_bucket(&head_req).await?;

        Ok(GetBucketTypeOutput {
            request_info,
            region,
            storage_class,
            az_redundancy,
            project_name,
            bucket_type,
            expire_at: Default::default(),
        })
    }

    /// Reports whether the bucket named by `input` exists, based on a
    /// `head_bucket` call.
    ///
    /// A successful head request yields `Ok(true)`; a server error with status
    /// 404 and error code `0006-00000001` yields `Ok(false)`; every other
    /// error is returned unchanged.
    async fn does_bucket_exist(&self, input: &DoesBucketExistInput) -> Result<bool, TosError> {
        let mut head_req = HeadBucketInput::new(input.bucket());
        if let Some(date) = input.request_date() {
            head_req.set_request_date(date);
        }
        head_req.set_request_host(input.request_host());

        let failure = match self.head_bucket(&head_req).await {
            Ok(_) => return Ok(true),
            Err(failure) => failure,
        };

        let bucket_missing = failure
            .as_server_error()
            .map_or(false, |server_err| server_err.status_code() == 404 && server_err.ec() == "0006-00000001");
        if bucket_missing {
            Ok(false)
        } else {
            Err(failure)
        }
    }

    // ---- info / access monitor / trash: plain forwards to `do_request` ----

    async fn get_bucket_info(&self, input: &GetBucketInfoInput) -> Result<GetBucketInfoOutput, TosError> {
        self.do_request::<GetBucketInfoInput, GetBucketInfoOutput, InternalReader<StreamVec>>(input).await
    }

    async fn put_bucket_access_monitor(&self, input: &PutBucketAccessMonitorInput) -> Result<PutBucketAccessMonitorOutput, TosError> {
        self.do_request::<PutBucketAccessMonitorInput, PutBucketAccessMonitorOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_access_monitor(&self, input: &GetBucketAccessMonitorInput) -> Result<GetBucketAccessMonitorOutput, TosError> {
        self.do_request::<GetBucketAccessMonitorInput, GetBucketAccessMonitorOutput, InternalReader<StreamVec>>(input).await
    }

    async fn put_bucket_trash(&self, input: &PutBucketTrashInput) -> Result<PutBucketTrashOutput, TosError> {
        self.do_request::<PutBucketTrashInput, PutBucketTrashOutput, InternalReader<StreamVec>>(input).await
    }

    async fn get_bucket_trash(&self, input: &GetBucketTrashInput) -> Result<GetBucketTrashOutput, TosError> {
        self.do_request::<GetBucketTrashInput, GetBucketTrashOutput, InternalReader<StreamVec>>(input).await
    }

    /// Returns the bucket type, served from `self.cached_buckets` when possible
    /// (variant compiled with the `tokio-runtime` feature).
    ///
    /// 1. The bucket name is validated with `check_bucket`.
    /// 2. Under the read lock, a cached entry whose deadline is not earlier
    ///    than now is cloned and returned with `expire_at` set to that deadline.
    /// 3. Otherwise the write lock is taken and the cache is checked once more.
    /// 4. On a miss, `head_bucket` is called while the write lock is still
    ///    held; the result is cached with a deadline 10 to 14 minutes from now
    ///    (random) and returned.
    ///
    /// # Errors
    /// Returns the error of `check_bucket` or `head_bucket`; nothing is cached
    /// in that case.
    #[cfg(feature = "tokio-runtime")]
    async fn get_bucket_type(&self, input: &GetBucketTypeInput) -> Result<GetBucketTypeOutput, TosError> {
        crate::internal::check_bucket(input.bucket())?;

        // Fast path: shared lock only; the guard is released at the end of the block.
        {
            let cache = self.cached_buckets.read().await;
            match cache.get(input.bucket()) {
                Some((cached, deadline)) if deadline >= &chrono::Utc::now() => {
                    let mut reply = cached.clone();
                    reply.expire_at = *deadline;
                    return Ok(reply);
                }
                _ => {}
            }
        }

        // Slow path: exclusive lock, then look again because another task may
        // have refreshed the entry between the two lock acquisitions.
        let mut cache = self.cached_buckets.write().await;
        match cache.get(input.bucket()) {
            Some((cached, deadline)) if deadline >= &chrono::Utc::now() => {
                let mut reply = cached.clone();
                reply.expire_at = *deadline;
                return Ok(reply);
            }
            _ => {}
        }

        let mut head_req = HeadBucketInput::new(input.bucket());
        head_req.set_request_host(input.request_host());
        if let Some(date) = input.request_date() {
            head_req.set_request_date(date);
        }
        let HeadBucketOutput {
            request_info,
            region,
            storage_class,
            az_redundancy,
            project_name,
            bucket_type,
            ..
        } = self.head_bucket(&head_req).await?;

        // Randomised lifetime: `10..15` is half-open, i.e. 10 to 14 minutes.
        let ttl_minutes = rand::Rng::gen_range(&mut rand::thread_rng(), 10..15) as i64;
        let deadline = chrono::Utc::now() + chrono::Duration::minutes(ttl_minutes);

        let fresh = GetBucketTypeOutput {
            request_info,
            region,
            storage_class,
            az_redundancy,
            project_name,
            bucket_type,
            expire_at: deadline,
        };
        cache.insert(input.bucket.clone(), (fresh.clone(), deadline));
        Ok(fresh)
    }
}
1275
1276#[async_trait]
1277impl<P, C, S> MultipartAPI for TosClientImpl<P, C, S>
1278where
1279    P: CredentialsProvider<C> + Send + Sync + 'static,
1280    C: Credentials + Send + Sync + 'static,
1281    S: AsyncRuntime + Send + Sync + 'static,
1282{
1283    async fn create_multipart_upload(&self, input: &CreateMultipartUploadInput) -> Result<CreateMultipartUploadOutput, TosError> {
1284        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1285    }
1286
1287    async fn upload_part<B>(&self, input: &UploadPartInput<B>) -> Result<UploadPartOutput, TosError>
1288    where
1289        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
1290    {
1291        self.do_request(input).await
1292    }
1293
1294    async fn upload_part_from_buffer(&self, input: &UploadPartFromBufferInput) -> Result<UploadPartOutput, TosError> {
1295        self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
1296    }
1297
1298    #[cfg(feature = "tokio-runtime")]
1299    async fn upload_part_from_file(&self, input: &crate::multipart::UploadPartFromFileInput) -> Result<UploadPartOutput, TosError> {
1300        self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
1301    }
1302
1303    async fn complete_multipart_upload(&self, input: &CompleteMultipartUploadInput) -> Result<CompleteMultipartUploadOutput, TosError> {
1304        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1305    }
1306
1307    async fn abort_multipart_upload(&self, input: &AbortMultipartUploadInput) -> Result<AbortMultipartUploadOutput, TosError> {
1308        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1309    }
1310
1311    async fn upload_part_copy(&self, input: &UploadPartCopyInput) -> Result<UploadPartCopyOutput, TosError> {
1312        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1313    }
1314
1315    async fn list_multipart_uploads(&self, input: &ListMultipartUploadsInput) -> Result<ListMultipartUploadsOutput, TosError> {
1316        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1317    }
1318
1319    async fn list_parts(&self, input: &ListPartsInput) -> Result<ListPartsOutput, TosError> {
1320        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1321    }
1322}
1323
1324#[async_trait]
1325impl<P, C, S> SignerAPI for TosClientImpl<P, C, S>
1326where
1327    P: CredentialsProvider<C> + Send + Sync + 'static,
1328    C: Credentials + Send + Sync + 'static,
1329    S: AsyncRuntime + Send + Sync + 'static,
1330{
1331    async fn pre_signed_url(&self, input: &PreSignedURLInput) -> Result<PreSignedURLOutput, TosError> {
1332        let cred = self.load_credentials().await?;
1333        let ak = cred.ak();
1334        let sk = cred.sk();
1335        let security_token = cred.security_token();
1336        pre_signed_url(&self.config_holder, &ak, &sk, &security_token, input)
1337    }
1338
1339    async fn pre_signed_post_signature(&self, input: &PreSignedPostSignatureInput) -> Result<PreSignedPostSignatureOutput, TosError> {
1340        let cred = self.load_credentials().await?;
1341        let ak = cred.ak();
1342        let sk = cred.sk();
1343        let security_token = cred.security_token();
1344        pre_signed_post_signature(&self.config_holder, &ak, &sk, &security_token, input)
1345    }
1346
1347    async fn pre_signed_policy_url(&self, input: &PreSignedPolicyURLInput) -> Result<PreSignedPolicyURLOutput, TosError> {
1348        let cred = self.do_load_credentials().await?;
1349        let ak = cred.ak();
1350        let sk = cred.sk();
1351        let security_token = cred.security_token();
1352        pre_signed_policy_url(&self.config_holder, &ak, &sk, &security_token, input)
1353    }
1354}
1355
1356#[async_trait]
1357impl<P, C, S> ControlAPI for TosClientImpl<P, C, S>
1358where
1359    C: 'static + Credentials + Send + Sync,
1360    P: 'static + CredentialsProvider<C> + Send + Sync,
1361    S: 'static + AsyncRuntime + Send + Sync,
1362{
1363    async fn put_qos_policy(&self, input: &PutQosPolicyInput) -> Result<PutQosPolicyOutput, TosError> {
1364        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1365    }
1366
1367    async fn get_qos_policy(&self, input: &GetQosPolicyInput) -> Result<GetQosPolicyOutput, TosError> {
1368        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1369    }
1370
1371    async fn delete_qos_policy(&self, input: &DeleteQosPolicyInput) -> Result<DeleteQosPolicyOutput, TosError> {
1372        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1373    }
1374}
1375
1376#[async_trait]
1377impl<P, C, S> TosClient for TosClientImpl<P, C, S>
1378where
1379    P: CredentialsProvider<C> + Send + Sync + 'static,
1380    C: Credentials + Send + Sync + 'static,
1381    S: AsyncRuntime + Send + Sync + 'static,
1382{
1383    fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool {
1384        if !self.credentials_can_refresh {
1385            return false;
1386        }
1387        match C::new(ak, sk, security_token) {
1388            Err(_) => false,
1389            Ok(c) => {
1390                match P::new(c) {
1391                    Err(_) => false,
1392                    Ok(p) => {
1393                        self.credentials_provider.store(Arc::new(p));
1394                        true
1395                    }
1396                }
1397            }
1398        }
1399    }
1400
1401    fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool {
1402        let c = self.config_holder.load();
1403        let mut config_holder = ConfigHolder {
1404            max_retry_count: c.max_retry_count,
1405            request_timeout: c.request_timeout,
1406            connection_timeout: c.connection_timeout,
1407            max_connections: c.max_connections,
1408            idle_connection_time: c.idle_connection_time,
1409            enable_crc: c.enable_crc,
1410            enable_verify_ssl: c.enable_verify_ssl,
1411            auto_recognize_content_type: c.auto_recognize_content_type,
1412            is_custom_domain: c.is_custom_domain,
1413            dns_cache_time: c.dns_cache_time,
1414            dns_cache_async_refresh: c.dns_cache_async_refresh,
1415            proxy_host: c.proxy_host.to_string(),
1416            proxy_port: c.proxy_port,
1417            proxy_username: c.proxy_username.clone(),
1418            proxy_password: c.proxy_password.clone(),
1419            disable_encoding_meta: c.disable_encoding_meta,
1420            expect_100_continue_threshold: c.expect_100_continue_threshold,
1421            high_latency_log_threshold: c.high_latency_log_threshold,
1422            user_agent_product_name: c.user_agent_product_name.clone(),
1423            user_agent_soft_name: c.user_agent_soft_name.clone(),
1424            user_agent_soft_version: c.user_agent_soft_version.clone(),
1425            user_agent_customized_key_values: c.user_agent_customized_key_values.clone(),
1426            follow_redirect_times: c.follow_redirect_times,
1427            client_crt: c.ca_crt.clone(),
1428            client_key: c.client_key.clone(),
1429            ca_crt: c.ca_crt.clone(),
1430            user_agent: c.user_agent.clone(),
1431            region: "".to_string(),
1432            schema: "".to_string(),
1433            domain: "".to_string(),
1434            port: None,
1435            schema_control: c.schema_control.clone(),
1436            domain_control: c.domain_control.clone(),
1437        };
1438        if let Err(_) = config_holder.check(endpoint, region) {
1439            return false;
1440        }
1441        self.config_holder.store(Arc::new(config_holder));
1442        true
1443    }
1444}
1445
1446impl<P, C, S> TosClientImpl<P, C, S>
1447where
1448    P: CredentialsProvider<C> + Send + Sync + 'static,
1449    C: Credentials + Send + Sync + 'static,
1450    S: AsyncRuntime + Send + Sync + 'static,
1451{
1452    async fn load_credentials(&self) -> Result<Arc<C>, TosError> {
1453        #[cfg(feature = "tokio-runtime")]
1454        {
1455            let inner_credentials = self.inner_credentials.read().await;
1456            if let Some(c) = inner_credentials.as_ref() {
1457                return Ok(c.clone());
1458            }
1459        }
1460        self.do_load_credentials().await
1461    }
1462
1463    async fn do_load_credentials(&self) -> Result<Arc<C>, TosError> {
1464        let credential_provider = self.credentials_provider.load();
1465        match credential_provider.credentials(CREDENTIALS_EXPIRES).await {
1466            Err(ex) => Err(TosError::client_error_with_cause("load credentials error", GenericError::DefaultError(ex.to_string()))),
1467            Ok(c) => Ok(c),
1468        }
1469    }
1470
1471    async fn modify_object<B>(&self, input: &ModifyObjectInput<B>) -> Result<ModifyObjectOutput, TosError>
1472    where
1473        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
1474    {
1475        self.do_request(input).await
1476    }
1477
1478    async fn modify_object_from_buffer(&self, input: &ModifyObjectFromBufferInput) -> Result<ModifyObjectOutput, TosError> {
1479        self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
1480    }
1481    #[cfg(feature = "tokio-runtime")]
1482    async fn modify_object_from_file(&self, input: &crate::object::ModifyObjectFromFileInput) -> Result<ModifyObjectOutput, TosError> {
1483        self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
1484    }
1485
1486    async fn do_request<T, K, B>(&self, input: &T) -> Result<K, TosError>
1487    where
1488        T: InputTranslator<B>,
1489        K: OutputParser + RequestInfoTrait + Send,
1490        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1491    {
1492        self.do_request_common::<T, MockAsyncInputTranslator, K, B>(Some(input), None).await
1493    }
1494
1495    pub(crate) async fn do_request_common<T, F, K, B>(&self, input: Option<&T>, input2: Option<&F>) -> Result<K, TosError>
1496    where
1497        T: InputTranslator<B>,
1498        F: AsyncInputTranslator<B>,
1499        K: OutputParser + RequestInfoTrait + Send,
1500        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1501    {
1502        let config_holder = self.config_holder.load();
1503        let operation;
1504        if input.is_some() {
1505            operation = check_bucket_and_key(input.unwrap(), config_holder.is_custom_domain)?;
1506        } else {
1507            operation = check_bucket_and_key(input2.unwrap(), config_holder.is_custom_domain)?;
1508        }
1509        let mut retry_count = 0;
1510        let max_retry_count = config_holder.max_retry_count;
1511        loop {
1512            let start = Instant::now();
1513            let mut ac = AdditionalContext::new();
1514            let result = self.do_request_once::<T, F, K, B>(input, input2, retry_count, config_holder.clone(), &mut ac).await;
1515            let elapsed_ms = start.elapsed().as_millis();
1516            let exceed = exceed_high_latency_log_threshold(config_holder.high_latency_log_threshold, elapsed_ms, ac.request_size, operation);
1517            match result {
1518                Ok(k) => {
1519                    if exceed {
1520                        warn!(target: get_common_log_target(), "high latency request {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms)
1521                    } else {
1522                        info!(target: get_common_log_target(), "do {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms);
1523                    }
1524                    return Ok(k);
1525                }
1526                Err(mut e) => {
1527                    match &e {
1528                        TosError::TosClientError { .. } => {
1529                            if exceed {
1530                                warn!(target: get_common_log_target(), "high latency request {} failed, cost: {} ms", operation, elapsed_ms);
1531                            } else {
1532                                warn!(target: get_common_log_target(), "do {} failed, cost: {} ms", operation, elapsed_ms);
1533                            }
1534                        }
1535                        TosError::TosServerError { status_code, request_id, ec, .. } => {
1536                            if exceed {
1537                                if status_code.to_owned() < 500 {
1538                                    warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1539                                    request_id, ec, elapsed_ms);
1540                                } else {
1541                                    warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1542                                    request_id, ec, elapsed_ms);
1543                                }
1544                            } else {
1545                                if status_code.to_owned() < 500 {
1546                                    warn!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1547                                    request_id, ec, elapsed_ms);
1548                                } else {
1549                                    info!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1550                                    request_id, ec, elapsed_ms);
1551                                }
1552                            }
1553                        }
1554                    }
1555
1556                    let (retry_after, need_retry) = check_need_retry(&e, retry_count, max_retry_count, operation);
1557                    if !need_retry {
1558                        if let Some(request_url) = ac.request_url {
1559                            e.set_request_url(request_url);
1560                        }
1561                        return Err(e);
1562                    }
1563                    self.sleep_for_retry(retry_count, retry_after).await;
1564                    retry_count += 1;
1565                }
1566            }
1567        }
1568    }
1569
1570
1571    async fn sleep_for_retry(&self, retry_count: isize, retry_after: isize) {
1572        let mut delay = BASE_DELAY_MS * 2u64.pow(retry_count as u32);
1573        if delay > MAX_DELAY_MS {
1574            delay = MAX_DELAY_MS;
1575        }
1576        let retry_after = retry_after as u64 * 1000;
1577        if retry_after > delay {
1578            delay = retry_after;
1579        }
1580
1581        self.async_runtime.sleep(Duration::from_millis(delay)).await;
1582    }
1583
1584    async fn do_request_once<'a, 'b, T, F, K, B>(&self, input: Option<&'b T>, input2: Option<&'b F>, retry_count: isize,
1585                                                 config_holder: Arc<ConfigHolder>, ac: &mut AdditionalContext<'a>) -> Result<K, TosError>
1586    where
1587        T: InputTranslator<B>,
1588        F: AsyncInputTranslator<B>,
1589        K: OutputParser + Send,
1590        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1591        'b: 'a,
1592    {
1593        let mut request;
1594        if input.is_some() {
1595            let input = input.unwrap();
1596            request = input.trans(config_holder)?;
1597            ac.request_host = input.request_host();
1598            ac.request_date = input.request_date();
1599            ac.request_header = input.request_header();
1600            ac.request_query = input.request_query();
1601            ac.is_control_operation = input.is_control_operation();
1602        } else {
1603            let input2 = input2.unwrap();
1604            request = input2.trans(config_holder).await?;
1605            ac.request_host = input2.request_host();
1606            ac.request_date = input2.request_date();
1607            ac.request_header = input2.request_header();
1608            ac.request_query = input2.request_query();
1609            ac.is_control_operation = input2.is_control_operation();
1610        }
1611
1612        let body = request.body.take();
1613        request.retry_count = retry_count;
1614        let response = self.do_request_by_client(&mut request, body, ac).await?;
1615        if request.operation == GET_OBJECT_TO_FILE_OPERATION {
1616            if let Some(cl) = response.content_length() {
1617                ac.request_size = cl as i64;
1618            }
1619            let result = K::check_and_parse(request, response).await;
1620            return result;
1621        }
1622        K::check_and_parse(request, response).await
1623    }
1624
    /// Signs `request`, builds the outgoing HTTP request from it and executes it with `self.client`.
    ///
    /// Steps, in order:
    /// 1. Reject control operations when the control schema or domain is empty.
    /// 2. Load credentials, apply `auto_recognize_content_type`, and sign the headers.
    /// 3. Record the request URL in `ac.request_url` and copy headers and meta onto the builder,
    ///    picking up the content length from the headers when it parses as an `i64`.
    /// 4. Add the retry-count header (retries only) and `Expect: 100-continue` when the content
    ///    length exceeds the configured threshold.
    /// 5. Attach the body, wrapped in a `MultifunctionalReader` for upload operations that need
    ///    CRC calculation, rate limiting or a data transfer listener.
    /// 6. Execute the request; on success with CRC enabled, store the computed CRC64 in
    ///    `request.request_context`.
    ///
    /// `ac.request_size` is set to the parsed content length (`-1` when none was found) once the
    /// request has been executed, whether or not it succeeded.
    ///
    /// # Errors
    ///
    /// Returns a client error for an empty control endpoint, a credentials failure, a signing
    /// failure, a request that cannot be built ("build request error") or a transport failure
    /// ("do request error").
    async fn do_request_by_client<'a, 'c, B>(&self, request: &mut HttpRequest<'c, B>, body: Option<B>, ac: &mut AdditionalContext<'a>) -> Result<HttpResponse, TosError>
    where
        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
        'a: 'c,
    {
        let config_holder = self.config_holder.load();
        if ac.is_control_operation && (config_holder.schema_control == "" || config_holder.domain_control == "") {
            return Err(TosError::client_error("request control operation but control endpoint is empty"));
        }

        let cred = self.load_credentials().await?;
        let ak = cred.ak();
        let sk = cred.sk();
        let security_token = cred.security_token();
        auto_recognize_content_type(request, config_holder.auto_recognize_content_type);
        sign_header(request, ak, sk, security_token, config_holder.as_ref(), ac)?;
        request.enable_crc = config_holder.enable_crc;

        let request_url = get_request_url(request, config_holder.as_ref(), ac.is_control_operation);
        // Keep a copy of the URL so the caller can attach it to an error later.
        ac.request_url = Some(request_url.clone());
        let mut rb = self.client.request(request.method.as_http_method(), request_url);
        // -1 means "no parsable Content-Length header was found".
        let mut cl = -1i64;
        for kv in &request.header {
            if *kv.0 == HEADER_CONTENT_LENGTH || *kv.0 == HEADER_CONTENT_LENGTH_LOWER {
                if let Ok(x) = kv.1.parse::<i64>() {
                    cl = x;
                }
            }
            rb = rb.header(*kv.0, kv.1);
        }

        if let Some(meta) = &request.meta {
            for kv in meta {
                rb = rb.header(kv.0, kv.1);
            }
        }

        // Only retries (retry_count > 0) carry the retry-count header.
        if request.retry_count > 0 {
            rb = rb.header(HEADER_SDK_RETRY_COUNT, format!("attempt={}; max={}", request.retry_count, config_holder.max_retry_count));
        }

        if config_holder.expect_100_continue_threshold > 0 && cl > config_holder.expect_100_continue_threshold as i64 {
            rb = rb.header(HEADER_EXPECT, "100-continue");
        }

        let is_upload_operation = ALL_UPLOAD_OPERATIONS.contains_key(request.operation);
        // CRC is only computed for upload operations, so `calc_crc` implies `is_upload_operation`.
        let calc_crc = config_holder.enable_crc && is_upload_operation;
        // Shared with the reader, which is handed a clone; read back after the request completes.
        let crc64 = Arc::new(AtomicU64::new(0));
        // add body
        if let Some(bd) = body {
            if calc_crc {
                let mut reader = MultifunctionalReader::new(bd, Some(crc64.clone()), cl, request);
                if let Some(ref rc) = request.request_context {
                    if let Some(init_crc64) = rc.init_crc64 {
                        reader.init_crc64 = Some(init_crc64);
                    }
                    // Always true on this path (see `calc_crc` above).
                    if is_upload_operation {
                        if let Some(ref rl) = rc.rate_limiter {
                            reader.set_rate_limiter(rl.clone());
                        }
                        if let Some(ref adts) = rc.async_data_transfer_listener {
                            reader.set_async_data_transfer_listener(adts.clone());
                        }
                    }
                }
                rb = self.add_body(rb, reader, cl);
            } else if is_upload_operation {
                // Upload without CRC: wrap the body only when there is a request context that
                // may carry a rate limiter or a transfer listener.
                if let Some(ref rc) = request.request_context {
                    let mut reader = MultifunctionalReader::new(bd, None, cl, request);
                    if let Some(ref rl) = rc.rate_limiter {
                        reader.set_rate_limiter(rl.clone());
                    }
                    if let Some(ref adts) = rc.async_data_transfer_listener {
                        reader.set_async_data_transfer_listener(adts.clone());
                    }
                    rb = self.add_body(rb, reader, cl);
                } else {
                    rb = self.add_body(rb, bd, cl);
                }
            } else {
                rb = self.add_body(rb, bd, cl);
            }
        } else if cl == -1 {
            // No body and no Content-Length header: send an explicit zero length.
            rb = rb.header(HEADER_CONTENT_LENGTH, 0);
        }

        match rb.build() {
            Ok(req) => {
                let result = self.client.execute(req).await;
                ac.request_size = cl;
                match result {
                    Ok(resp) => {
                        if calc_crc {
                            // Store the CRC64 accumulated by the reader, creating a request
                            // context if the request did not have one.
                            let result = crc64.load(Ordering::Acquire);
                            if request.request_context.is_none() {
                                let mut rc = RequestContext::default();
                                rc.crc64 = Some(result);
                                request.request_context = Some(rc)
                            } else {
                                request.request_context.as_mut().unwrap().crc64 = Some(result);
                            }
                        }
                        Ok(resp)
                    }
                    Err(e) => {
                        Err(TosError::client_error_with_cause("do request error", GenericError::HttpRequestError(e.to_string())))
                    }
                }
            }
            Err(e) => {
                Err(TosError::client_error_with_cause("build request error", GenericError::DefaultError(e.to_string())))
            }
        }
    }
1739
1740    fn add_body<B>(&self, rb: RequestBuilder, body: B, _: i64) -> RequestBuilder
1741    where
1742        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + 'static,
1743    {
1744        rb.body(Body::wrap_stream(body))
1745    }
1746
1747    async fn check_object_status(&self, bucket: &str, key: &str, offset: i64, content_length: i64, ginput: &GenericInput) -> Result<(String, bool), TosError> {
1748        let mut if_match = String::new();
1749        let mut forbid_overwrite = false;
1750        if offset <= 0 && content_length >= 0 {
1751            let mut hinput = HeadObjectInput::new(bucket, key);
1752            hinput.set_request_host(ginput.request_host.as_str());
1753            if let Some(request_date) = ginput.request_date {
1754                hinput.set_request_date(request_date);
1755            }
1756            match self.head_object(&hinput).await {
1757                Ok(output) => {
1758                    if output.content_length > 0 {
1759                        return Err(TosError::client_error(format!("invalid offset,  expected {}, actual {}", output.content_length, offset)));
1760                    }
1761                    if_match = output.etag().to_string();
1762                }
1763                Err(ex) => {
1764                    if let Some(err) = ex.as_server_error() {
1765                        if err.status_code() == 404 && err.ec() == "0017-00000003" {
1766                            forbid_overwrite = true;
1767                        }
1768                    }
1769
1770                    if !forbid_overwrite {
1771                        return Err(ex);
1772                    }
1773                }
1774            }
1775        }
1776        Ok((if_match, forbid_overwrite))
1777    }
1778
1779    fn trans_append_object_input(&self, input: &AppendObjectBasicInput) -> PutObjectBasicInput {
1780        let mut minput = PutObjectBasicInput::default();
1781        minput.bucket = input.bucket.clone();
1782        minput.key = input.key.clone();
1783        minput.set_cache_control(input.cache_control());
1784        minput.set_content_disposition(input.content_disposition());
1785        minput.set_content_encoding(input.content_encoding());
1786        minput.set_content_language(input.content_language());
1787        minput.set_content_type(input.content_type());
1788        if let Some(expires) = input.expires() {
1789            minput.set_expires(expires);
1790        }
1791        if let Some(acl) = input.acl() {
1792            minput.set_acl(acl);
1793        }
1794        minput.set_grant_full_control(input.grant_full_control());
1795        minput.set_grant_read(input.grant_read());
1796        minput.set_grant_read_acp(input.grant_read_acp());
1797        minput.set_grant_write_acp(input.grant_write_acp());
1798        minput.set_website_redirect_location(input.website_redirect_location());
1799        minput.object_expires = input.object_expires;
1800        minput.meta = input.meta.clone();
1801        if let Some(sc) = input.storage_class() {
1802            minput.set_storage_class(sc);
1803        }
1804        minput.set_content_length(input.content_length());
1805        minput.notification_custom_parameters = input.notification_custom_parameters.clone();
1806        minput.traffic_limit = input.traffic_limit;
1807        minput
1808    }
1809}
1810
1811impl<P, C, S> TosClientImpl<P, C, S>
1812where
1813    S: AsyncRuntime,
1814{
1815    pub fn close(&self) {
1816        let _ = self.closed.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Relaxed);
1817    }
1818
1819    #[cfg(feature = "tokio-runtime")]
1820    pub async fn shutdown(&self) {
1821        self.close();
1822        self.closed_sender.close();
1823        if let Some(handlers) = self.handlers.lock().await.take() {
1824            for handler in handlers {
1825                if let Some(handler) = handler {
1826                    let _ = handler.await;
1827                }
1828            }
1829        }
1830    }
1831}
1832
1833impl<P, C, S> Drop for TosClientImpl<P, C, S>
1834where
1835    S: AsyncRuntime,
1836{
1837    fn drop(&mut self) {
1838        self.close();
1839    }
1840}