1use crate::asynchronous::auth::SignerAPI;
17use crate::asynchronous::bucket::BucketAPI;
18use crate::asynchronous::common::DataTransferListener;
19use crate::asynchronous::control::ControlAPI;
20use crate::asynchronous::credential::CredentialsProvider;
21use crate::asynchronous::http::HttpResponse;
22use crate::asynchronous::internal::{AsyncInputTranslator, OutputParser};
23use crate::asynchronous::multipart::MultipartAPI;
24use crate::asynchronous::object::ObjectAPI;
25use crate::asynchronous::paginator::PaginatorAPI;
26use crate::asynchronous::reader::StreamVec;
27use crate::auth::{pre_signed_policy_url, pre_signed_post_signature, pre_signed_url, sign_header, PreSignedPolicyURLInput, PreSignedPolicyURLOutput, PreSignedPostSignatureInput, PreSignedPostSignatureOutput, PreSignedURLInput, PreSignedURLOutput};
28use crate::bucket::{CreateBucketInput, CreateBucketOutput, DeleteBucketCORSInput, DeleteBucketCORSOutput, DeleteBucketCustomDomainInput, DeleteBucketCustomDomainOutput, DeleteBucketEncryptionInput, DeleteBucketEncryptionOutput, DeleteBucketInput, DeleteBucketInventoryInput, DeleteBucketInventoryOutput, DeleteBucketLifecycleInput, DeleteBucketLifecycleOutput, DeleteBucketMirrorBackInput, DeleteBucketMirrorBackOutput, DeleteBucketOutput, DeleteBucketPolicyInput, DeleteBucketPolicyOutput, DeleteBucketRealTimeLogInput, DeleteBucketRealTimeLogOutput, DeleteBucketRenameInput, DeleteBucketRenameOutput, DeleteBucketReplicationInput, DeleteBucketReplicationOutput, DeleteBucketTaggingInput, DeleteBucketTaggingOutput, DeleteBucketWebsiteInput, DeleteBucketWebsiteOutput, DoesBucketExistInput, GetBucketACLInput, GetBucketACLOutput, GetBucketAccessMonitorInput, GetBucketAccessMonitorOutput, GetBucketCORSInput, GetBucketCORSOutput, GetBucketEncryptionInput, GetBucketEncryptionOutput, GetBucketInfoInput, GetBucketInfoOutput, GetBucketInventoryInput, GetBucketInventoryOutput, GetBucketLifecycleInput, GetBucketLifecycleOutput, GetBucketLocationInput, GetBucketLocationOutput, GetBucketMirrorBackInput, GetBucketMirrorBackOutput, GetBucketNotificationType2Input, GetBucketNotificationType2Output, GetBucketPolicyInput, GetBucketPolicyOutput, GetBucketRealTimeLogInput, GetBucketRealTimeLogOutput, GetBucketRenameInput, GetBucketRenameOutput, GetBucketReplicationInput, GetBucketReplicationOutput, GetBucketTaggingInput, GetBucketTaggingOutput, GetBucketTrashInput, GetBucketTrashOutput, GetBucketTypeInput, GetBucketTypeOutput, GetBucketVersioningInput, GetBucketVersioningOutput, GetBucketWebsiteInput, GetBucketWebsiteOutput, HeadBucketInput, HeadBucketOutput, ListBucketCustomDomainInput, ListBucketCustomDomainOutput, ListBucketInventoryInput, ListBucketInventoryOutput, ListBucketsInput, ListBucketsOutput, PutBucketACLInput, PutBucketACLOutput, PutBucketAccessMonitorInput, PutBucketAccessMonitorOutput, PutBucketCORSInput, PutBucketCORSOutput, PutBucketCustomDomainInput, PutBucketCustomDomainOutput, PutBucketEncryptionInput, PutBucketEncryptionOutput, PutBucketInventoryInput, PutBucketInventoryOutput, PutBucketLifecycleInput, PutBucketLifecycleOutput, PutBucketMirrorBackInput, PutBucketMirrorBackOutput, PutBucketNotificationType2Input, PutBucketNotificationType2Output, PutBucketPolicyInput, PutBucketPolicyOutput, PutBucketRealTimeLogInput, PutBucketRealTimeLogOutput, PutBucketRenameInput, PutBucketRenameOutput, PutBucketReplicationInput, PutBucketReplicationOutput, PutBucketStorageClassInput, PutBucketStorageClassOutput, PutBucketTaggingInput, PutBucketTaggingOutput, PutBucketTrashInput, PutBucketTrashOutput, PutBucketVersioningInput, PutBucketVersioningOutput, PutBucketWebsiteInput, PutBucketWebsiteOutput};
29use crate::common::{get_common_log_target, GenericInput, RequestInfoTrait};
30use crate::config::ConfigHolder;
31use crate::constant::{ALL_UPLOAD_OPERATIONS, BASE_DELAY_MS, CREDENTIALS_EXPIRES, DEFAULT_MAX_KEYS, GET_OBJECT_TO_FILE_OPERATION, HEADER_CONTENT_LENGTH, HEADER_CONTENT_LENGTH_LOWER, HEADER_EXPECT, HEADER_SDK_RETRY_COUNT, MAX_DELAY_MS, SCHEMA_HTTP, SCHEMA_HTTPS};
32use crate::control::{DeleteQosPolicyInput, DeleteQosPolicyOutput, GetQosPolicyInput, GetQosPolicyOutput, PutQosPolicyInput, PutQosPolicyOutput};
33use crate::credential::{CommonCredentials, CommonCredentialsProvider, Credentials, EnvCredentialsProvider, StaticCredentialsProvider};
34use crate::enumeration::BucketType;
35use crate::error::{GenericError, TosError};
36use crate::http::{HttpRequest, RequestContext};
37use crate::internal::{auto_recognize_content_type, build_certificate, build_identity, check_bucket_and_key, check_need_retry, exceed_high_latency_log_threshold, get_request_url, AdditionalContext, InputTranslator, MockAsyncInputTranslator};
38use crate::multipart::{AbortMultipartUploadInput, AbortMultipartUploadOutput, CompleteMultipartUploadInput, CompleteMultipartUploadOutput, CreateMultipartUploadInput, CreateMultipartUploadOutput, ListMultipartUploadsInput, ListMultipartUploadsOutput, ListPartsInput, ListPartsOutput, UploadPartCopyInput, UploadPartCopyOutput, UploadPartFromBufferInput, UploadPartInput, UploadPartOutput};
39use crate::object::{AppendObjectBasicInput, AppendObjectFromBufferInput, AppendObjectInput, AppendObjectOutput, CopyObjectInput, CopyObjectOutput, DeleteMultiObjectsInput, DeleteMultiObjectsOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectTaggingInput, DeleteObjectTaggingOutput, DoesObjectExistInput, FetchObjectInput, FetchObjectOutput, GetFetchTaskInput, GetFetchTaskOutput, GetFileStatusInput, GetFileStatusOutput, GetObjectACLInput, GetObjectACLOutput, GetObjectInput, GetObjectOutput, GetObjectTaggingInput, GetObjectTaggingOutput, GetSymlinkInput, GetSymlinkOutput, HeadObjectInput, HeadObjectOutput, ListObjectVersionsInput, ListObjectVersionsOutput, ListObjectsType2Input, ListObjectsType2Output, ModifyObjectFromBufferInput, ModifyObjectFromFileInput, ModifyObjectInput, ModifyObjectOutput, PutFetchTaskInput, PutFetchTaskOutput, PutObjectACLInput, PutObjectACLOutput, PutObjectBasicInput, PutObjectFromBufferInput, PutObjectFromFileInput, PutObjectInput, PutObjectOutput, PutObjectTaggingInput, PutObjectTaggingOutput, PutSymlinkInput, PutSymlinkOutput, RenameObjectInput, RenameObjectOutput, RestoreObjectInput, RestoreObjectOutput, SetObjectMetaInput, SetObjectMetaOutput, SetObjectTimeInput, SetObjectTimeOutput};
40use crate::reader::{InternalReader, MultiBytes, MultifunctionalReader};
41use crate::tos::ConfigAware;
42use arc_swap::ArcSwap;
43use async_channel::Sender;
44use async_trait::async_trait;
45use bytes::Bytes;
46use futures_core::future::BoxFuture;
47use futures_core::Stream;
48use reqwest::{redirect, Body, Client, Proxy, RequestBuilder};
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt::{Debug, Formatter};
52use std::future::Future;
53use std::marker::PhantomData;
54use std::pin::Pin;
55use std::sync::atomic::{AtomicI8, AtomicU64, Ordering};
56use std::sync::Arc;
57use std::task::{Context, Poll};
58use std::time::{Duration, Instant};
59use tracing::log::{info, warn};
60
61#[async_trait]
62pub trait AsyncRuntime {
63 type JoinError: Error;
64 async fn sleep(&self, duration: Duration);
65 fn spawn<'a, F>(&self, future: F) -> BoxFuture<'a, Result<F::Output, Self::JoinError>>
66 where
67 F: Future + Send + 'static,
68 F::Output: Send + 'static;
69
70 fn block_on<F: Future>(&self, future: F) -> F::Output;
71}
72
73
74#[derive(Debug, Clone, Default)]
75pub struct TosClientBuilder<P, C, S>
76{
77 ak: String,
78 sk: String,
79 security_token: String,
80 region: String,
81 endpoint: String,
82 control_endpoint: String,
83 credentials_provider: Option<P>,
84 config_holder: ConfigHolder,
85 async_runtime: S,
86 c: PhantomData<C>,
87}
88
89impl<P, C, S> TosClientBuilder<P, C, S>
90where
91 P: CredentialsProvider<C> + Send + Sync + 'static,
92 C: Credentials + Send + Sync + 'static,
93 S: AsyncRuntime + Send + Sync + 'static,
94{
95 pub fn build(mut self) -> Result<TosClientImpl<P, C, S>, TosError> {
96 self.config_holder.check(self.endpoint, self.region)?;
97 self.config_holder.check_control(self.control_endpoint)?;
98 self.config_holder.gen_user_agent();
99 let mut client = Client::builder()
100 .user_agent(self.config_holder.user_agent.as_str())
101 .tcp_nodelay(true)
102 .tcp_keepalive(None)
103 .no_gzip()
104 .no_deflate()
105 .no_brotli()
106 .connect_timeout(Duration::from_millis(self.config_holder.connection_timeout as u64))
107 .pool_idle_timeout(Duration::from_millis(self.config_holder.idle_connection_time as u64))
108 .pool_max_idle_per_host(self.config_holder.max_connections as usize);
109 if self.config_holder.request_timeout > 0 {
110 client = client.timeout(Duration::from_millis(self.config_holder.request_timeout as u64));
111 }
112
113 if self.config_holder.follow_redirect_times > 0 {
114 client = client.redirect(redirect::Policy::limited(self.config_holder.follow_redirect_times as usize));
115 } else {
116 client = client.redirect(redirect::Policy::none());
117 }
118
119 if self.config_holder.proxy_host != "" {
120 let mut proxy_url = self.config_holder.proxy_host.as_str();
121 while proxy_url.len() > 0 && proxy_url.ends_with("/") {
122 proxy_url = &proxy_url[0..proxy_url.len() - 1];
123 }
124
125 if proxy_url != "" {
126 let mut proxy_url = proxy_url.to_lowercase();
127 if !proxy_url.starts_with(SCHEMA_HTTP) && !proxy_url.starts_with(SCHEMA_HTTPS) {
128 proxy_url = format!("{}{}", SCHEMA_HTTP, proxy_url);
129 }
130 if self.config_holder.proxy_port >= 0 {
131 proxy_url = format!("{}:{}", proxy_url, self.config_holder.proxy_port);
132 }
133
134 let (domain, schema, _) = self.config_holder.parse_domain(proxy_url.as_str())?;
135 if self.config_holder.proxy_username != "" && self.config_holder.proxy_password != "" {
136 proxy_url = format!("{}://{}:{}@{}", schema, self.config_holder.proxy_username, self.config_holder.proxy_password, domain);
137 } else {
138 proxy_url = format!("{}://{}", schema, domain);
139 }
140 match Proxy::http(proxy_url.as_str()) {
141 Err(e) => return Err(TosError::client_error_with_cause("build http proxy error", GenericError::DefaultError(e.to_string()))),
142 Ok(proxy) => {
143 client = client.proxy(proxy);
144 }
145 }
146
147 match Proxy::https(proxy_url) {
148 Err(e) => return Err(TosError::client_error_with_cause("build https proxy error", GenericError::DefaultError(e.to_string()))),
149 Ok(proxy) => {
150 client = client.proxy(proxy);
151 }
152 }
153 } else {
154 client = client.no_proxy();
155 }
156 } else {
157 client = client.no_proxy();
158 }
159
160 if !self.config_holder.enable_verify_ssl {
161 client = client.danger_accept_invalid_certs(true);
162 #[cfg(feature = "use-native-tls")]
163 {
164 client = client.danger_accept_invalid_hostnames(true);
165 }
166 }
167
168 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
169 if self.config_holder.ca_crt != "" {
170 client = client.add_root_certificate(build_certificate(self.config_holder.ca_crt.as_str())?);
171 }
172
173 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
174 if self.config_holder.client_crt != "" && self.config_holder.client_key != "" {
175 client = client.identity(build_identity(self.config_holder.client_crt.as_str(), self.config_holder.client_key.as_str())?);
176 }
177
178 let async_runtime = Arc::new(self.async_runtime);
179 let closed = Arc::new(AtomicI8::new(0));
180 let mut _handlers = create_handlers::<S>();
181 let (sender, _receiver) = async_channel::bounded(1);
182 #[cfg(feature = "tokio-runtime")]
183 if self.config_holder.dns_cache_time > 0 {
184 let port;
185 if let Some(pt) = self.config_holder.port {
186 port = pt;
187 } else if self.config_holder.schema == SCHEMA_HTTPS {
188 port = 443;
189 } else {
190 port = 80;
191 }
192 let (resolver, handler) = crate::asynchronous::dns::InternalDnsResolver::new(self.config_holder.dns_cache_time, self.config_holder.dns_cache_async_refresh,
193 port, async_runtime.clone(), closed.clone(), _receiver.clone());
194 client = client.dns_resolver(Arc::new(resolver));
195 _handlers.push(handler);
196 }
197
198 let cp;
199 let mut credentials_can_refresh = false;
200 match self.credentials_provider {
201 Some(p) => {
202 cp = p;
203 }
204 None => {
205 match C::new(self.ak, self.sk, self.security_token) {
206 Err(ex) => return Err(TosError::client_error_with_cause("create credentials error",
207 GenericError::DefaultError(ex.to_string()))),
208 Ok(c) => {
209 match P::new(c) {
210 Err(ex) => return Err(TosError::client_error_with_cause("create credentials provider error",
211 GenericError::DefaultError(ex.to_string()))),
212 Ok(p) => {
213 credentials_can_refresh = true;
214 cp = p;
215 }
216 }
217 }
218 }
219 }
220 }
221
222 match client.build() {
223 Ok(client) => {
224 #[cfg(not(feature = "tokio-runtime"))]
225 {
226 Ok(TosClientImpl {
227 client,
228 config_holder: ArcSwap::from(Arc::new(self.config_holder)),
229 credentials_provider: ArcSwap::from(Arc::new(cp)),
230 credentials_can_refresh,
231 async_runtime,
232 c: self.c,
233 closed,
234 closed_sender: sender,
235 })
236 }
237
238 #[cfg(feature = "tokio-runtime")]
239 {
240 let credentials_provider = Arc::new(cp);
241 let inner_credentials = Arc::new(tokio::sync::RwLock::new(None));
242 if !credentials_can_refresh {
243 let handler = async_refresh_credentials(closed.clone(), async_runtime.clone(),
244 credentials_provider.clone(), inner_credentials.clone(), _receiver.clone());
245 _handlers.push(Some(handler));
246 }
247 let tos_client = TosClientImpl {
248 client,
249 config_holder: ArcSwap::from(Arc::new(self.config_holder)),
250 credentials_provider: ArcSwap::from(credentials_provider),
251 credentials_can_refresh,
252 async_runtime,
253 c: self.c,
254 closed,
255 closed_sender: sender,
256 inner_credentials,
257 cached_buckets: tokio::sync::RwLock::new(HashMap::new()),
258 handlers: tokio::sync::Mutex::new(Some(_handlers)),
259 };
260 Ok(tos_client)
261 }
262 }
263 Err(e) => {
264 Err(TosError::client_error_with_cause("build tos client error", GenericError::DefaultError(e.to_string())))
265 }
266 }
267 }
268
269 pub fn build_as_trait(self) -> Result<impl TosClient, TosError> {
270 let client = self.build()?;
271 Ok(client)
272 }
273
274 pub fn ak(mut self, ak: impl Into<String>) -> Self {
275 self.ak = ak.into();
276 self
277 }
278
279 pub fn sk(mut self, sk: impl Into<String>) -> Self {
280 self.sk = sk.into();
281 self
282 }
283
284 pub fn security_token(mut self, security_token: impl Into<String>) -> Self {
285 self.security_token = security_token.into();
286 self
287 }
288
289 pub fn credentials_provider(mut self, p: P) -> Self {
290 self.credentials_provider = Some(p);
291 self
292 }
293
294 pub fn region(mut self, region: impl Into<String>) -> Self {
295 self.region = region.into();
296 self
297 }
298
299 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
300 self.endpoint = endpoint.into();
301 self
302 }
303
304 pub fn control_endpoint(mut self, control_endpoint: impl Into<String>) -> Self {
305 self.control_endpoint = control_endpoint.into();
306 self
307 }
308
309 pub fn request_timeout(mut self, request_timeout: isize) -> Self {
310 if request_timeout > 0 {
311 self.config_holder.request_timeout = request_timeout;
312 }
313 self
314 }
315
316 pub fn connection_timeout(mut self, connection_timeout: isize) -> Self {
317 if connection_timeout > 0 {
318 self.config_holder.connection_timeout = connection_timeout;
319 }
320 self
321 }
322
323 pub fn max_connections(mut self, max_connections: isize) -> Self {
324 if max_connections > 0 {
325 self.config_holder.max_connections = max_connections;
326 }
327 self
328 }
329 pub fn idle_connection_time(mut self, idle_connection_time: isize) -> Self {
330 if idle_connection_time > 0 {
331 self.config_holder.idle_connection_time = idle_connection_time;
332 }
333 self
334 }
335
336 pub fn enable_crc(mut self, enable_crc: bool) -> Self {
337 self.config_holder.enable_crc = enable_crc;
338 self
339 }
340
341 pub fn enable_verify_ssl(mut self, enable_verify_ssl: bool) -> Self {
342 self.config_holder.enable_verify_ssl = enable_verify_ssl;
343 self
344 }
345
346 pub fn max_retry_count(mut self, max_retry_count: isize) -> Self {
347 self.config_holder.max_retry_count = max_retry_count;
348 self
349 }
350 pub fn auto_recognize_content_type(mut self, auto_recognize_content_type: bool) -> Self {
351 self.config_holder.auto_recognize_content_type = auto_recognize_content_type;
352 self
353 }
354 pub fn is_custom_domain(mut self, is_custom_domain: bool) -> Self {
355 self.config_holder.is_custom_domain = is_custom_domain;
356 self
357 }
358 pub fn proxy_host(mut self, proxy_host: impl Into<String>) -> Self {
359 self.config_holder.proxy_host = proxy_host.into();
360 self
361 }
362 pub fn proxy_port(mut self, proxy_port: isize) -> Self {
363 self.config_holder.proxy_port = proxy_port.into();
364 self
365 }
366 pub fn proxy_username(mut self, proxy_username: impl Into<String>) -> Self {
367 self.config_holder.proxy_username = proxy_username.into();
368 self
369 }
370 pub fn proxy_password(mut self, proxy_password: impl Into<String>) -> Self {
371 self.config_holder.proxy_password = proxy_password.into();
372 self
373 }
374 pub fn disable_encoding_meta(mut self, disable_encoding_meta: bool) -> Self {
375 self.config_holder.disable_encoding_meta = disable_encoding_meta;
376 self
377 }
378 pub fn expect_100_continue_threshold(mut self, expect_100_continue_threshold: isize) -> Self {
379 self.config_holder.expect_100_continue_threshold = expect_100_continue_threshold;
380 self
381 }
382 #[cfg(feature = "tokio-runtime")]
383 pub fn dns_cache_time(mut self, dns_cache_time: isize) -> Self {
384 self.config_holder.dns_cache_time = dns_cache_time;
385 self
386 }
387
388 #[cfg(feature = "tokio-runtime")]
389 pub fn dns_cache_async_refresh(mut self, dns_cache_async_refresh: bool) -> Self {
390 self.config_holder.dns_cache_async_refresh = dns_cache_async_refresh;
391 self
392 }
393
394 pub fn high_latency_log_threshold(mut self, high_latency_log_threshold: isize) -> Self {
395 self.config_holder.high_latency_log_threshold = high_latency_log_threshold;
396 self
397 }
398 pub fn user_agent_product_name(mut self, user_agent_product_name: impl Into<String>) -> Self {
399 self.config_holder.user_agent_product_name = user_agent_product_name.into();
400 self
401 }
402 pub fn user_agent_soft_name(mut self, user_agent_soft_name: impl Into<String>) -> Self {
403 self.config_holder.user_agent_soft_name = user_agent_soft_name.into();
404 self
405 }
406 pub fn user_agent_soft_version(mut self, user_agent_soft_version: impl Into<String>) -> Self {
407 self.config_holder.user_agent_soft_version = user_agent_soft_version.into();
408 self
409 }
410 pub fn user_agent_customized_key_values(mut self, user_agent_customized_key_values: impl Into<HashMap<String, String>>) -> Self {
411 self.config_holder.user_agent_customized_key_values = Some(user_agent_customized_key_values.into());
412 self
413 }
414 pub fn follow_redirect_times(mut self, follow_redirect_times: isize) -> Self {
415 self.config_holder.follow_redirect_times = follow_redirect_times;
416 self
417 }
418
419 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
420 pub fn client_crt(mut self, client_crt: impl Into<String>) -> Self {
421 self.config_holder.client_crt = client_crt.into();
422 self
423 }
424
425 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
426 pub fn client_key(mut self, client_key: impl Into<String>) -> Self {
427 self.config_holder.client_key = client_key.into();
428 self
429 }
430
431 #[cfg(any(feature = "use-native-tls", feature = "use-rustls"))]
432 pub fn ca_crt(mut self, ca_crt: impl Into<String>) -> Self {
433 self.config_holder.ca_crt = ca_crt.into();
434 self
435 }
436 pub fn async_runtime(mut self, async_runtime: impl Into<S>) -> Self {
437 self.async_runtime = async_runtime.into();
438 self
439 }
440}
441
442pub fn builder<S>() -> TosClientBuilder<CommonCredentialsProvider<CommonCredentials>, CommonCredentials, S>
443where
444 S: AsyncRuntime + Default,
445{
446 TosClientBuilder::default()
447}
448
449pub fn builder_common<P, C, S>() -> TosClientBuilder<P, C, S>
450where
451 S: AsyncRuntime + Default,
452 P: CredentialsProvider<C> + Send + Sync + Default + 'static,
453 C: Credentials + Send + Sync + Default + 'static,
454{
455 TosClientBuilder::default()
456}
457
458pub fn static_credentials_provider(ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>)
459 -> StaticCredentialsProvider<CommonCredentials> {
460 StaticCredentialsProvider::new(ak, sk, security_token).unwrap()
461}
462#[cfg(not(feature = "tokio-runtime"))]
463fn create_handlers<S>() -> Vec<i32> {
464 Vec::with_capacity(2)
465}
466
467#[cfg(feature = "tokio-runtime")]
468fn create_handlers<S>() -> Vec<Option<BoxFuture<'static, Result<(), S::JoinError>>>>
469where
470 S: AsyncRuntime,
471{
472 Vec::with_capacity(2)
473}
474
475#[cfg(feature = "tokio-runtime")]
476fn async_refresh_credentials<P, C, S>(closed: Arc<AtomicI8>, async_runtime: Arc<S>,
477 credentials_provider: Arc<P>,
478 inner_credentials: Arc<tokio::sync::RwLock<Option<Arc<C>>>>,
479 receiver: async_channel::Receiver<()>) -> BoxFuture<'static, Result<(), S::JoinError>>
480where
481 P: CredentialsProvider<C> + Send + Sync + 'static,
482 C: Credentials + Send + Sync + 'static,
483 S: AsyncRuntime + Send + Sync + 'static,
484{
485 let async_runtime2 = async_runtime.clone();
486 async_runtime.spawn(async move {
487 loop {
488 if closed.load(Ordering::Acquire) == 1 {
489 return;
490 }
491 tokio::select! {
492 _ = async_runtime2.sleep(Duration::from_secs(crate::constant::CREDENTIALS_REFRESH_INTERVAL)) => {}
493
494 _ = receiver.recv() =>{
495 return;
496 }
497 }
498 match credentials_provider.credentials(CREDENTIALS_EXPIRES).await {
499 Err(ex) => warn!(target: get_common_log_target(), "async load credentials error, {}", ex),
500 Ok(c) => {
501 let mut inner_credentials = inner_credentials.write().await;
502 *inner_credentials = Some(c.clone());
503 }
504 }
505 }
506 })
507}
508
509pub fn env_credentials_provider() -> EnvCredentialsProvider<CommonCredentials> {
510 EnvCredentialsProvider::new().unwrap()
511}
512
513pub struct BufferStream {
514 inner: Option<Bytes>,
515}
516
517impl Stream for BufferStream {
518 type Item = Result<Bytes, crate::error::CommonError>;
519
520 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
521 if self.inner.is_none() {
522 return Poll::Ready(None);
523 }
524 Poll::Ready(Some(Ok(self.inner.take().unwrap())))
525 }
526
527 fn size_hint(&self) -> (usize, Option<usize>) {
528 match &self.inner {
529 None => (0, None),
530 Some(v) => (0, Some(v.len()))
531 }
532 }
533}
534
535pub fn new_stream(data: impl AsRef<[u8]>) -> BufferStream {
536 BufferStream { inner: Some(Bytes::from(data.as_ref().to_owned())) }
537}
538
539pub fn new_stream_nocopy(data: impl Into<Vec<u8>>) -> BufferStream {
540 BufferStream { inner: Some(Bytes::from(data.into())) }
541}
542
543#[async_trait]
544pub trait TosClient: BucketAPI + ObjectAPI + MultipartAPI + PaginatorAPI + ControlAPI + SignerAPI + ConfigAware {
545 fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool;
546 fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool;
547}
548
549#[cfg(feature = "tokio-runtime")]
550pub(crate) type BucketCache = (GetBucketTypeOutput, chrono::DateTime<chrono::Utc>);
551
552pub struct TosClientImpl<P, C, S>
553where
554 S: AsyncRuntime,
555{
556 pub(crate) client: Client,
557 pub(crate) config_holder: ArcSwap<ConfigHolder>,
558 pub(crate) credentials_provider: ArcSwap<P>,
559 pub(crate) async_runtime: Arc<S>,
560 pub(crate) c: PhantomData<C>,
561 pub(crate) credentials_can_refresh: bool,
562 pub(crate) closed: Arc<AtomicI8>,
563 pub(crate) closed_sender: Sender<()>,
564
565 #[cfg(feature = "tokio-runtime")]
566 pub(crate) inner_credentials: Arc<tokio::sync::RwLock<Option<Arc<C>>>>,
567 #[cfg(feature = "tokio-runtime")]
568 pub(crate) cached_buckets: tokio::sync::RwLock<HashMap<String, BucketCache>>,
569 #[cfg(feature = "tokio-runtime")]
570 pub(crate) handlers: tokio::sync::Mutex<Option<Vec<Option<BoxFuture<'static, Result<(), S::JoinError>>>>>>,
571}
572
573impl<P, C, S> Debug for TosClientImpl<P, C, S>
574where
575 S: AsyncRuntime,
576{
577 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
578 write!(f, "client: {:?}, config_holder: {:?}", self.client, self.config_holder)
579 }
580}
581
582unsafe impl<P, C, S> Sync for TosClientImpl<P, C, S>
583where
584 S: AsyncRuntime,
585{}
586
587impl<P, C, S> ConfigAware for TosClientImpl<P, C, S>
588where
589 S: AsyncRuntime,
590{
591 fn is_custom_domain(&self) -> bool {
592 self.config_holder.load().is_custom_domain
593 }
594}
595#[async_trait]
596impl<P, C, S> ObjectAPI for TosClientImpl<P, C, S>
597where
598 P: CredentialsProvider<C> + Send + Sync + 'static,
599 C: Credentials + Send + Sync + 'static,
600 S: AsyncRuntime + Send + Sync + 'static,
601{
602 async fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
603 where
604 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
605 {
606 self.do_request(input).await
607 }
608
609 async fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError> {
610 self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
611 }
612
613 #[cfg(feature = "tokio-runtime")]
614 async fn put_object_from_file(&self, input: &crate::object::PutObjectFromFileInput) -> Result<PutObjectOutput, TosError> {
615 self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
616 }
617
618 async fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError> {
619 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
620 }
621 #[cfg(feature = "tokio-runtime")]
622 async fn get_object_to_file(&self, input: &crate::object::GetObjectToFileInput) -> Result<crate::object::GetObjectToFileOutput, TosError> {
623 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
624 }
625 async fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError> {
626 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
627 }
628
629 async fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError> {
630 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
631 }
632
633 async fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError> {
634 if input.list_only_once {
635 return self.do_request::<_, _, InternalReader<StreamVec>>(input).await;
636 }
637
638 let mut input = input.clone();
639 if input.max_keys <= 0 {
640 input.max_keys = DEFAULT_MAX_KEYS;
641 }
642 let mut _output: Option<ListObjectsType2Output> = None;
643 loop {
644 let mut temp_output = self.do_request::<ListObjectsType2Input, ListObjectsType2Output, InternalReader<StreamVec>>(&input).await?;
645 if _output.is_none() {
646 _output = Some(temp_output);
647 } else {
648 let output = _output.as_mut().unwrap();
649 output.key_count += temp_output.key_count;
650 output.is_truncated = temp_output.is_truncated;
651 output.next_continuation_token = temp_output.next_continuation_token;
652 output.contents.append(&mut temp_output.contents);
653 output.common_prefixes.append(&mut temp_output.common_prefixes);
654 }
655
656 let output = _output.as_ref().unwrap();
657 if !output.is_truncated || output.contents.len() + output.common_prefixes.len() >= input.max_keys as usize || output.key_count >= input.max_keys {
658 break;
659 }
660 input.continuation_token = output.next_continuation_token.clone();
661 input.max_keys = input.max_keys - output.key_count;
662 }
663
664 Ok(_output.unwrap())
665 }
666
667 async fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError> {
668 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
669 }
670
671 async fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError> {
672 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
673 }
674
675 async fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError> {
676 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
677 }
678
679 async fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError> {
680 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
681 }
682
683 async fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError> {
684 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
685 }
686
687 async fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError> {
688 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
689 }
690 async fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
691 where
692 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
693 {
694 let mut hinput = GetBucketTypeInput::new(input.bucket());
695 hinput.set_request_host(input.request_host());
696 if let Some(request_date) = input.request_date() {
697 hinput.set_request_date(request_date);
698 }
699 if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
700 if bt == &BucketType::BucketTypeHns {
701 let (if_match, forbid_overwrite) = self.check_object_status(input.bucket(), input.key(),
702 input.offset(), input.content_length(), &input.inner.generic_input).await?;
703 if forbid_overwrite {
704 let mut minput: PutObjectInput<B> = PutObjectInput::default();
705 minput.set_request_host(input.request_host());
706 if let Some(request_date) = input.request_date() {
707 minput.set_request_date(request_date);
708 }
709 minput.inner = self.trans_append_object_input(&input.inner);
710 minput.set_forbid_overwrite(forbid_overwrite);
711 if let Some(adts) = input.async_data_transfer_listener() {
712 minput.set_async_data_transfer_listener(adts.clone());
713 }
714 if let Some(b) = input.content.take() {
715 minput.set_content(b);
716 }
717 let output = self.put_object(&minput).await?;
718 return Ok(AppendObjectOutput {
719 request_info: output.request_info,
720 next_append_offset: input.content_length(),
721 hash_crc64ecma: output.hash_crc64ecma,
722 });
723 }
724
725 let mut minput: ModifyObjectInput<B> = ModifyObjectInput::new(input.bucket(), input.key());
726 minput.set_request_host(input.request_host());
727 if let Some(request_date) = input.request_date() {
728 minput.set_request_date(request_date);
729 }
730 minput.pre_hash_crc64ecma = input.pre_hash_crc64ecma();
731 minput.set_if_match(if_match);
732 minput.set_offset(input.offset());
733 minput.set_content_length(input.content_length());
734 minput.set_notification_custom_parameters(input.notification_custom_parameters());
735 minput.set_traffic_limit(input.traffic_limit());
736 if let Some(adts) = input.async_data_transfer_listener() {
737 minput.set_async_data_transfer_listener(adts.clone());
738 }
739 if let Some(b) = input.content.take() {
740 minput.set_content(b);
741 }
742 let output = self.modify_object(&minput).await?;
743 return Ok(AppendObjectOutput {
744 request_info: output.request_info,
745 next_append_offset: output.next_modify_offset,
746 hash_crc64ecma: output.hash_crc64ecma,
747 });
748 }
749 }
750 self.do_request(input).await
751 }
752
753 async fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError> {
754 let mut hinput = GetBucketTypeInput::new(input.bucket());
755 hinput.set_request_host(input.request_host());
756 if let Some(request_date) = input.request_date() {
757 hinput.set_request_date(request_date);
758 }
759 if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
760 if bt == &BucketType::BucketTypeHns {
761 let (if_match, forbid_overwrite) = self.check_object_status(input.bucket(), input.key(),
762 input.offset(), input.content_length(), &input.inner.generic_input).await?;
763 if forbid_overwrite {
764 let mut minput = PutObjectFromBufferInput::default();
765 minput.set_request_host(input.request_host());
766 if let Some(request_date) = input.request_date() {
767 minput.set_request_date(request_date);
768 }
769 minput.inner = self.trans_append_object_input(&input.inner);
770 minput.set_forbid_overwrite(forbid_overwrite);
771 if let Some(adts) = input.async_data_transfer_listener() {
772 minput.set_async_data_transfer_listener(adts.clone());
773 }
774 if let Some(b) = input.content() {
775 for item in b {
776 minput.append_content_nocopy(item.to_vec());
777 }
778 }
779 let output = self.put_object_from_buffer(&minput).await?;
780 return Ok(AppendObjectOutput {
781 request_info: output.request_info,
782 next_append_offset: input.content_length(),
783 hash_crc64ecma: output.hash_crc64ecma,
784 });
785 }
786
787 let mut minput = ModifyObjectFromBufferInput::new(input.bucket(), input.key());
788 minput.set_request_host(input.request_host());
789 if let Some(request_date) = input.request_date() {
790 minput.set_request_date(request_date);
791 }
792 minput.pre_hash_crc64ecma = input.pre_hash_crc64ecma();
793 minput.set_if_match(if_match);
794 minput.set_offset(input.offset());
795 minput.set_content_length(input.content_length());
796 minput.set_notification_custom_parameters(input.notification_custom_parameters());
797 minput.set_traffic_limit(input.traffic_limit());
798 if let Some(adts) = input.async_data_transfer_listener() {
799 minput.set_async_data_transfer_listener(adts.clone());
800 }
801 if let Some(b) = input.content() {
802 for item in b {
803 minput.append_content_nocopy(item.to_vec());
804 }
805 }
806 let output = self.modify_object_from_buffer(&minput).await?;
807 return Ok(AppendObjectOutput {
808 request_info: output.request_info,
809 next_append_offset: output.next_modify_offset,
810 hash_crc64ecma: output.hash_crc64ecma,
811 });
812 }
813 }
814 self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
815 }
816 #[cfg(feature = "tokio-runtime")]
817 async fn append_object_from_file(&self, input: &crate::object::AppendObjectFromFileInput) -> Result<AppendObjectOutput, TosError> {
818 let mut hinput = GetBucketTypeInput::new(input.bucket());
819 hinput.set_request_host(input.request_host());
820 if let Some(request_date) = input.request_date() {
821 hinput.set_request_date(request_date);
822 }
823 if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
824 if bt == &BucketType::BucketTypeHns {
825 let (if_match, forbid_overwrite) = self.check_object_status(input.bucket(), input.key(),
826 input.offset(), input.content_length(), &input.inner.generic_input).await?;
827 if forbid_overwrite {
828 let mut minput = PutObjectFromFileInput::default();
829 minput.set_request_host(input.request_host());
830 if let Some(request_date) = input.request_date() {
831 minput.set_request_date(request_date);
832 }
833 minput.inner = self.trans_append_object_input(&input.inner);
834 minput.set_forbid_overwrite(forbid_overwrite);
835 if let Some(adts) = input.async_data_transfer_listener() {
836 minput.set_async_data_transfer_listener(adts.clone());
837 }
838 minput.set_file_path(input.file_path());
839 let output = self.put_object_from_file(&minput).await?;
840 return Ok(AppendObjectOutput {
841 request_info: output.request_info,
842 next_append_offset: input.content_length(),
843 hash_crc64ecma: output.hash_crc64ecma,
844 });
845 }
846
847 let mut minput = ModifyObjectFromFileInput::new(input.bucket(), input.key());
848 minput.set_request_host(input.request_host());
849 if let Some(request_date) = input.request_date() {
850 minput.set_request_date(request_date);
851 }
852 minput.pre_hash_crc64ecma = input.pre_hash_crc64ecma();
853 minput.set_if_match(if_match);
854 minput.set_offset(input.offset());
855 minput.set_content_length(input.content_length());
856 minput.set_notification_custom_parameters(input.notification_custom_parameters());
857 minput.set_traffic_limit(input.traffic_limit());
858 if let Some(adts) = input.async_data_transfer_listener() {
859 minput.set_async_data_transfer_listener(adts.clone());
860 }
861 minput.set_file_path(input.file_path());
862 let output = self.modify_object_from_file(&minput).await?;
863 return Ok(AppendObjectOutput {
864 request_info: output.request_info,
865 next_append_offset: output.next_modify_offset,
866 hash_crc64ecma: output.hash_crc64ecma,
867 });
868 }
869 }
870 self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
871 }
872
873 async fn fetch_object(&self, input: &FetchObjectInput) -> Result<FetchObjectOutput, TosError> {
874 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
875 }
876
877 async fn put_fetch_task(&self, input: &PutFetchTaskInput) -> Result<PutFetchTaskOutput, TosError> {
878 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
879 }
880
881 async fn get_fetch_task(&self, input: &GetFetchTaskInput) -> Result<GetFetchTaskOutput, TosError> {
882 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
883 }
884
885 async fn put_object_tagging(&self, input: &PutObjectTaggingInput) -> Result<PutObjectTaggingOutput, TosError> {
886 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
887 }
888
889 async fn get_object_tagging(&self, input: &GetObjectTaggingInput) -> Result<GetObjectTaggingOutput, TosError> {
890 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
891 }
892
893 async fn delete_object_tagging(&self, input: &DeleteObjectTaggingInput) -> Result<DeleteObjectTaggingOutput, TosError> {
894 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
895 }
896
897 async fn rename_object(&self, input: &RenameObjectInput) -> Result<RenameObjectOutput, TosError> {
898 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
899 }
900
901 async fn restore_object(&self, input: &RestoreObjectInput) -> Result<RestoreObjectOutput, TosError> {
902 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
903 }
904
905 async fn put_symlink(&self, input: &PutSymlinkInput) -> Result<PutSymlinkOutput, TosError> {
906 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
907 }
908
909 async fn get_symlink(&self, input: &GetSymlinkInput) -> Result<GetSymlinkOutput, TosError> {
910 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
911 }
912
913 async fn get_file_status(&self, input: &GetFileStatusInput) -> Result<GetFileStatusOutput, TosError> {
914 let mut hinput = GetBucketTypeInput::new(input.bucket());
915 hinput.set_request_host(input.request_host());
916 if let Some(request_date) = input.request_date() {
917 hinput.set_request_date(request_date);
918 }
919 if let Some(bt) = self.get_bucket_type(&hinput).await?.bucket_type() {
920 if bt == &BucketType::BucketTypeHns {
921 let mut hinput = HeadObjectInput::new(&input.bucket, &input.key);
922 hinput.set_request_host(input.request_host());
923 if let Some(request_date) = input.request_date() {
924 hinput.set_request_date(request_date);
925 }
926 let o = self.head_object(&hinput).await?;
927 return Ok(GetFileStatusOutput {
928 request_info: o.request_info,
929 key: input.key.clone(),
930 size: o.content_length,
931 last_modified_string: None,
932 last_modified: o.last_modified,
933 crc32: "".to_string(),
934 crc64: o.hash_crc64ecma.to_string(),
935 etag: o.etag,
936 object_type: o.object_type,
937 });
938 }
939 }
940 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
941 }
942
943 async fn does_object_exist(&self, input: &DoesObjectExistInput) -> Result<bool, TosError> {
944 let mut hinput = HeadObjectInput::new_with_version_id(input.bucket(), input.key(), input.version_id());
945 if let Some(request_date) = input.request_date() {
946 hinput.set_request_date(request_date);
947 }
948 hinput.set_request_host(input.request_host());
949 match self.head_object(&hinput).await {
950 Ok(_) => Ok(true),
951 Err(ex) => {
952 if let Some(err) = ex.as_server_error() {
953 if err.status_code() == 400 && err.ec() == "0015-00000008" {
954 return Ok(true);
955 }
956 if err.status_code() == 404 && err.ec() == "0017-00000003" {
957 return Ok(false);
958 }
959 }
960 Err(ex)
961 }
962 }
963 }
964
965 async fn set_object_time(&self, input: &SetObjectTimeInput) -> Result<SetObjectTimeOutput, TosError> {
966 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
967 }
968}
969
970#[cfg(feature = "asynchronous")]
971#[async_trait]
972impl<P, C, S> BucketAPI for TosClientImpl<P, C, S>
973where
974 P: CredentialsProvider<C> + Send + Sync + 'static,
975 C: Credentials + Send + Sync + 'static,
976 S: AsyncRuntime + Send + Sync + 'static,
977{
978 async fn create_bucket(&self, input: &CreateBucketInput) -> Result<CreateBucketOutput, TosError> {
979 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
980 }
981
982 async fn head_bucket(&self, input: &HeadBucketInput) -> Result<HeadBucketOutput, TosError> {
983 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
984 }
985
986 async fn delete_bucket(&self, input: &DeleteBucketInput) -> Result<DeleteBucketOutput, TosError> {
987 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
988 }
989
990 async fn list_buckets(&self, input: &ListBucketsInput) -> Result<ListBucketsOutput, TosError> {
991 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
992 }
993
994 async fn put_bucket_cors(&self, input: &PutBucketCORSInput) -> Result<PutBucketCORSOutput, TosError> {
995 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
996 }
997
998 async fn get_bucket_cors(&self, input: &GetBucketCORSInput) -> Result<GetBucketCORSOutput, TosError> {
999 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1000 }
1001
1002 async fn delete_bucket_cors(&self, input: &DeleteBucketCORSInput) -> Result<DeleteBucketCORSOutput, TosError> {
1003 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1004 }
1005
1006 async fn put_bucket_storage_class(&self, input: &PutBucketStorageClassInput) -> Result<PutBucketStorageClassOutput, TosError> {
1007 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1008 }
1009
1010 async fn get_bucket_location(&self, input: &GetBucketLocationInput) -> Result<GetBucketLocationOutput, TosError> {
1011 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1012 }
1013
1014 async fn put_bucket_lifecycle(&self, input: &PutBucketLifecycleInput) -> Result<PutBucketLifecycleOutput, TosError> {
1015 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1016 }
1017
1018 async fn get_bucket_lifecycle(&self, input: &GetBucketLifecycleInput) -> Result<GetBucketLifecycleOutput, TosError> {
1019 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1020 }
1021
1022 async fn delete_bucket_lifecycle(&self, input: &DeleteBucketLifecycleInput) -> Result<DeleteBucketLifecycleOutput, TosError> {
1023 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1024 }
1025
1026 async fn put_bucket_policy(&self, input: &PutBucketPolicyInput) -> Result<PutBucketPolicyOutput, TosError> {
1027 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1028 }
1029
1030 async fn get_bucket_policy(&self, input: &GetBucketPolicyInput) -> Result<GetBucketPolicyOutput, TosError> {
1031 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1032 }
1033
1034 async fn delete_bucket_policy(&self, input: &DeleteBucketPolicyInput) -> Result<DeleteBucketPolicyOutput, TosError> {
1035 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1036 }
1037
1038 async fn put_bucket_mirror_back(&self, input: &PutBucketMirrorBackInput) -> Result<PutBucketMirrorBackOutput, TosError> {
1039 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1040 }
1041
1042 async fn get_bucket_mirror_back(&self, input: &GetBucketMirrorBackInput) -> Result<GetBucketMirrorBackOutput, TosError> {
1043 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1044 }
1045
1046 async fn delete_bucket_mirror_back(&self, input: &DeleteBucketMirrorBackInput) -> Result<DeleteBucketMirrorBackOutput, TosError> {
1047 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1048 }
1049
1050 async fn put_bucket_acl(&self, input: &PutBucketACLInput) -> Result<PutBucketACLOutput, TosError> {
1051 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1052 }
1053
1054 async fn get_bucket_acl(&self, input: &GetBucketACLInput) -> Result<GetBucketACLOutput, TosError> {
1055 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1056 }
1057
1058 async fn put_bucket_replication(&self, input: &PutBucketReplicationInput) -> Result<PutBucketReplicationOutput, TosError> {
1059 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1060 }
1061
1062 async fn get_bucket_replication(&self, input: &GetBucketReplicationInput) -> Result<GetBucketReplicationOutput, TosError> {
1063 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1064 }
1065
1066 async fn delete_bucket_replication(&self, input: &DeleteBucketReplicationInput) -> Result<DeleteBucketReplicationOutput, TosError> {
1067 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1068 }
1069
1070 async fn put_bucket_versioning(&self, input: &PutBucketVersioningInput) -> Result<PutBucketVersioningOutput, TosError> {
1071 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1072 }
1073
1074 async fn get_bucket_versioning(&self, input: &GetBucketVersioningInput) -> Result<GetBucketVersioningOutput, TosError> {
1075 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1076 }
1077
1078 async fn put_bucket_website(&self, input: &PutBucketWebsiteInput) -> Result<PutBucketWebsiteOutput, TosError> {
1079 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1080 }
1081
1082 async fn get_bucket_website(&self, input: &GetBucketWebsiteInput) -> Result<GetBucketWebsiteOutput, TosError> {
1083 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1084 }
1085
1086 async fn delete_bucket_website(&self, input: &DeleteBucketWebsiteInput) -> Result<DeleteBucketWebsiteOutput, TosError> {
1087 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1088 }
1089
1090 async fn put_bucket_custom_domain(&self, input: &PutBucketCustomDomainInput) -> Result<PutBucketCustomDomainOutput, TosError> {
1091 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1092 }
1093 async fn list_bucket_custom_domain(&self, input: &ListBucketCustomDomainInput) -> Result<ListBucketCustomDomainOutput, TosError> {
1094 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1095 }
1096
1097 async fn delete_bucket_custom_domain(&self, input: &DeleteBucketCustomDomainInput) -> Result<DeleteBucketCustomDomainOutput, TosError> {
1098 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1099 }
1100
1101 async fn put_bucket_real_time_log(&self, input: &PutBucketRealTimeLogInput) -> Result<PutBucketRealTimeLogOutput, TosError> {
1102 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1103 }
1104
1105 async fn get_bucket_real_time_log(&self, input: &GetBucketRealTimeLogInput) -> Result<GetBucketRealTimeLogOutput, TosError> {
1106 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1107 }
1108
1109 async fn delete_bucket_real_time_log(&self, input: &DeleteBucketRealTimeLogInput) -> Result<DeleteBucketRealTimeLogOutput, TosError> {
1110 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1111 }
1112
1113 async fn put_bucket_rename(&self, input: &PutBucketRenameInput) -> Result<PutBucketRenameOutput, TosError> {
1114 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1115 }
1116
1117 async fn get_bucket_rename(&self, input: &GetBucketRenameInput) -> Result<GetBucketRenameOutput, TosError> {
1118 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1119 }
1120
1121 async fn delete_bucket_rename(&self, input: &DeleteBucketRenameInput) -> Result<DeleteBucketRenameOutput, TosError> {
1122 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1123 }
1124
1125 async fn put_bucket_encryption(&self, input: &PutBucketEncryptionInput) -> Result<PutBucketEncryptionOutput, TosError> {
1126 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1127 }
1128
1129 async fn get_bucket_encryption(&self, input: &GetBucketEncryptionInput) -> Result<GetBucketEncryptionOutput, TosError> {
1130 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1131 }
1132
1133 async fn delete_bucket_encryption(&self, input: &DeleteBucketEncryptionInput) -> Result<DeleteBucketEncryptionOutput, TosError> {
1134 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1135 }
1136
1137 async fn put_bucket_tagging(&self, input: &PutBucketTaggingInput) -> Result<PutBucketTaggingOutput, TosError> {
1138 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1139 }
1140
1141 async fn get_bucket_tagging(&self, input: &GetBucketTaggingInput) -> Result<GetBucketTaggingOutput, TosError> {
1142 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1143 }
1144
1145 async fn delete_bucket_tagging(&self, input: &DeleteBucketTaggingInput) -> Result<DeleteBucketTaggingOutput, TosError> {
1146 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1147 }
1148
1149 async fn put_bucket_notification_type2(&self, input: &PutBucketNotificationType2Input) -> Result<PutBucketNotificationType2Output, TosError> {
1150 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1151 }
1152
1153 async fn get_bucket_notification_type2(&self, input: &GetBucketNotificationType2Input) -> Result<GetBucketNotificationType2Output, TosError> {
1154 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1155 }
1156
1157 async fn put_bucket_inventory(&self, input: &PutBucketInventoryInput) -> Result<PutBucketInventoryOutput, TosError> {
1158 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1159 }
1160
1161 async fn get_bucket_inventory(&self, input: &GetBucketInventoryInput) -> Result<GetBucketInventoryOutput, TosError> {
1162 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1163 }
1164
1165 async fn list_bucket_inventory(&self, input: &ListBucketInventoryInput) -> Result<ListBucketInventoryOutput, TosError> {
1166 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1167 }
1168
1169 async fn delete_bucket_inventory(&self, input: &DeleteBucketInventoryInput) -> Result<DeleteBucketInventoryOutput, TosError> {
1170 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1171 }
1172
1173 #[cfg(not(feature = "tokio-runtime"))]
1174 async fn get_bucket_type(&self, input: &GetBucketTypeInput) -> Result<GetBucketTypeOutput, TosError> {
1175 let mut hinput = HeadBucketInput::new(input.bucket());
1176 hinput.set_request_host(input.request_host());
1177 if let Some(request_date) = input.request_date() {
1178 hinput.set_request_date(request_date);
1179 }
1180 let output = self.head_bucket(&hinput).await?;
1181 Ok(GetBucketTypeOutput {
1182 request_info: output.request_info,
1183 region: output.region,
1184 storage_class: output.storage_class,
1185 az_redundancy: output.az_redundancy,
1186 project_name: output.project_name,
1187 bucket_type: output.bucket_type,
1188 expire_at: Default::default(),
1189 })
1190 }
1191
1192 async fn does_bucket_exist(&self, input: &DoesBucketExistInput) -> Result<bool, TosError> {
1193 let mut hinput = HeadBucketInput::new(input.bucket());
1194 if let Some(request_date) = input.request_date() {
1195 hinput.set_request_date(request_date);
1196 }
1197 hinput.set_request_host(input.request_host());
1198 match self.head_bucket(&hinput).await {
1199 Ok(_) => Ok(true),
1200 Err(ex) => {
1201 if let Some(err) = ex.as_server_error() {
1202 if err.status_code() == 404 && err.ec() == "0006-00000001" {
1203 return Ok(false);
1204 }
1205 }
1206 Err(ex)
1207 }
1208 }
1209 }
1210
1211 async fn get_bucket_info(&self, input: &GetBucketInfoInput) -> Result<GetBucketInfoOutput, TosError> {
1212 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1213 }
1214
1215 async fn put_bucket_access_monitor(&self, input: &PutBucketAccessMonitorInput) -> Result<PutBucketAccessMonitorOutput, TosError> {
1216 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1217 }
1218
1219 async fn get_bucket_access_monitor(&self, input: &GetBucketAccessMonitorInput) -> Result<GetBucketAccessMonitorOutput, TosError> {
1220 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1221 }
1222
1223 async fn put_bucket_trash(&self, input: &PutBucketTrashInput) -> Result<PutBucketTrashOutput, TosError> {
1224 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1225 }
1226
1227 async fn get_bucket_trash(&self, input: &GetBucketTrashInput) -> Result<GetBucketTrashOutput, TosError> {
1228 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1229 }
1230
1231 #[cfg(feature = "tokio-runtime")]
1232 async fn get_bucket_type(&self, input: &GetBucketTypeInput) -> Result<GetBucketTypeOutput, TosError> {
1233 crate::internal::check_bucket(input.bucket())?;
1234
1235 {
1236 let cached_buckets = self.cached_buckets.read().await;
1237 if let Some((output, ddl)) = cached_buckets.get(input.bucket()) {
1238 if ddl >= &chrono::Utc::now() {
1239 let mut output = output.clone();
1240 output.expire_at = *ddl;
1241 return Ok(output);
1242 }
1243 }
1244 }
1245
1246 let mut cached_buckets = self.cached_buckets.write().await;
1247 if let Some((output, ddl)) = cached_buckets.get(input.bucket()) {
1248 if ddl >= &chrono::Utc::now() {
1249 let mut output = output.clone();
1250 output.expire_at = *ddl;
1251 return Ok(output);
1252 }
1253 }
1254 let mut hinput = HeadBucketInput::new(input.bucket());
1255 hinput.set_request_host(input.request_host());
1256 if let Some(request_date) = input.request_date() {
1257 hinput.set_request_date(request_date);
1258 }
1259 let output = self.head_bucket(&hinput).await?;
1260 let mut rng = rand::thread_rng();
1261 let ddl = std::ops::Add::add(chrono::Utc::now(), chrono::Duration::minutes(rand::Rng::gen_range(&mut rng, 10..15) as i64));
1262 let output = GetBucketTypeOutput {
1263 request_info: output.request_info,
1264 region: output.region,
1265 storage_class: output.storage_class,
1266 az_redundancy: output.az_redundancy,
1267 project_name: output.project_name,
1268 bucket_type: output.bucket_type,
1269 expire_at: ddl,
1270 };
1271 cached_buckets.insert(input.bucket.clone(), (output.clone(), ddl));
1272 Ok(output)
1273 }
1274}
1275
1276#[async_trait]
1277impl<P, C, S> MultipartAPI for TosClientImpl<P, C, S>
1278where
1279 P: CredentialsProvider<C> + Send + Sync + 'static,
1280 C: Credentials + Send + Sync + 'static,
1281 S: AsyncRuntime + Send + Sync + 'static,
1282{
1283 async fn create_multipart_upload(&self, input: &CreateMultipartUploadInput) -> Result<CreateMultipartUploadOutput, TosError> {
1284 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1285 }
1286
1287 async fn upload_part<B>(&self, input: &UploadPartInput<B>) -> Result<UploadPartOutput, TosError>
1288 where
1289 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
1290 {
1291 self.do_request(input).await
1292 }
1293
1294 async fn upload_part_from_buffer(&self, input: &UploadPartFromBufferInput) -> Result<UploadPartOutput, TosError> {
1295 self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
1296 }
1297
1298 #[cfg(feature = "tokio-runtime")]
1299 async fn upload_part_from_file(&self, input: &crate::multipart::UploadPartFromFileInput) -> Result<UploadPartOutput, TosError> {
1300 self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
1301 }
1302
1303 async fn complete_multipart_upload(&self, input: &CompleteMultipartUploadInput) -> Result<CompleteMultipartUploadOutput, TosError> {
1304 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1305 }
1306
1307 async fn abort_multipart_upload(&self, input: &AbortMultipartUploadInput) -> Result<AbortMultipartUploadOutput, TosError> {
1308 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1309 }
1310
1311 async fn upload_part_copy(&self, input: &UploadPartCopyInput) -> Result<UploadPartCopyOutput, TosError> {
1312 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1313 }
1314
1315 async fn list_multipart_uploads(&self, input: &ListMultipartUploadsInput) -> Result<ListMultipartUploadsOutput, TosError> {
1316 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1317 }
1318
1319 async fn list_parts(&self, input: &ListPartsInput) -> Result<ListPartsOutput, TosError> {
1320 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1321 }
1322}
1323
1324#[async_trait]
1325impl<P, C, S> SignerAPI for TosClientImpl<P, C, S>
1326where
1327 P: CredentialsProvider<C> + Send + Sync + 'static,
1328 C: Credentials + Send + Sync + 'static,
1329 S: AsyncRuntime + Send + Sync + 'static,
1330{
1331 async fn pre_signed_url(&self, input: &PreSignedURLInput) -> Result<PreSignedURLOutput, TosError> {
1332 let cred = self.load_credentials().await?;
1333 let ak = cred.ak();
1334 let sk = cred.sk();
1335 let security_token = cred.security_token();
1336 pre_signed_url(&self.config_holder, &ak, &sk, &security_token, input)
1337 }
1338
1339 async fn pre_signed_post_signature(&self, input: &PreSignedPostSignatureInput) -> Result<PreSignedPostSignatureOutput, TosError> {
1340 let cred = self.load_credentials().await?;
1341 let ak = cred.ak();
1342 let sk = cred.sk();
1343 let security_token = cred.security_token();
1344 pre_signed_post_signature(&self.config_holder, &ak, &sk, &security_token, input)
1345 }
1346
1347 async fn pre_signed_policy_url(&self, input: &PreSignedPolicyURLInput) -> Result<PreSignedPolicyURLOutput, TosError> {
1348 let cred = self.do_load_credentials().await?;
1349 let ak = cred.ak();
1350 let sk = cred.sk();
1351 let security_token = cred.security_token();
1352 pre_signed_policy_url(&self.config_holder, &ak, &sk, &security_token, input)
1353 }
1354}
1355
1356#[async_trait]
1357impl<P, C, S> ControlAPI for TosClientImpl<P, C, S>
1358where
1359 C: 'static + Credentials + Send + Sync,
1360 P: 'static + CredentialsProvider<C> + Send + Sync,
1361 S: 'static + AsyncRuntime + Send + Sync,
1362{
1363 async fn put_qos_policy(&self, input: &PutQosPolicyInput) -> Result<PutQosPolicyOutput, TosError> {
1364 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1365 }
1366
1367 async fn get_qos_policy(&self, input: &GetQosPolicyInput) -> Result<GetQosPolicyOutput, TosError> {
1368 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1369 }
1370
1371 async fn delete_qos_policy(&self, input: &DeleteQosPolicyInput) -> Result<DeleteQosPolicyOutput, TosError> {
1372 self.do_request::<_, _, InternalReader<StreamVec>>(input).await
1373 }
1374}
1375
1376#[async_trait]
1377impl<P, C, S> TosClient for TosClientImpl<P, C, S>
1378where
1379 P: CredentialsProvider<C> + Send + Sync + 'static,
1380 C: Credentials + Send + Sync + 'static,
1381 S: AsyncRuntime + Send + Sync + 'static,
1382{
1383 fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool {
1384 if !self.credentials_can_refresh {
1385 return false;
1386 }
1387 match C::new(ak, sk, security_token) {
1388 Err(_) => false,
1389 Ok(c) => {
1390 match P::new(c) {
1391 Err(_) => false,
1392 Ok(p) => {
1393 self.credentials_provider.store(Arc::new(p));
1394 true
1395 }
1396 }
1397 }
1398 }
1399 }
1400
1401 fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool {
1402 let c = self.config_holder.load();
1403 let mut config_holder = ConfigHolder {
1404 max_retry_count: c.max_retry_count,
1405 request_timeout: c.request_timeout,
1406 connection_timeout: c.connection_timeout,
1407 max_connections: c.max_connections,
1408 idle_connection_time: c.idle_connection_time,
1409 enable_crc: c.enable_crc,
1410 enable_verify_ssl: c.enable_verify_ssl,
1411 auto_recognize_content_type: c.auto_recognize_content_type,
1412 is_custom_domain: c.is_custom_domain,
1413 dns_cache_time: c.dns_cache_time,
1414 dns_cache_async_refresh: c.dns_cache_async_refresh,
1415 proxy_host: c.proxy_host.to_string(),
1416 proxy_port: c.proxy_port,
1417 proxy_username: c.proxy_username.clone(),
1418 proxy_password: c.proxy_password.clone(),
1419 disable_encoding_meta: c.disable_encoding_meta,
1420 expect_100_continue_threshold: c.expect_100_continue_threshold,
1421 high_latency_log_threshold: c.high_latency_log_threshold,
1422 user_agent_product_name: c.user_agent_product_name.clone(),
1423 user_agent_soft_name: c.user_agent_soft_name.clone(),
1424 user_agent_soft_version: c.user_agent_soft_version.clone(),
1425 user_agent_customized_key_values: c.user_agent_customized_key_values.clone(),
1426 follow_redirect_times: c.follow_redirect_times,
1427 client_crt: c.ca_crt.clone(),
1428 client_key: c.client_key.clone(),
1429 ca_crt: c.ca_crt.clone(),
1430 user_agent: c.user_agent.clone(),
1431 region: "".to_string(),
1432 schema: "".to_string(),
1433 domain: "".to_string(),
1434 port: None,
1435 schema_control: c.schema_control.clone(),
1436 domain_control: c.domain_control.clone(),
1437 };
1438 if let Err(_) = config_holder.check(endpoint, region) {
1439 return false;
1440 }
1441 self.config_holder.store(Arc::new(config_holder));
1442 true
1443 }
1444}
1445
1446impl<P, C, S> TosClientImpl<P, C, S>
1447where
1448 P: CredentialsProvider<C> + Send + Sync + 'static,
1449 C: Credentials + Send + Sync + 'static,
1450 S: AsyncRuntime + Send + Sync + 'static,
1451{
1452 async fn load_credentials(&self) -> Result<Arc<C>, TosError> {
1453 #[cfg(feature = "tokio-runtime")]
1454 {
1455 let inner_credentials = self.inner_credentials.read().await;
1456 if let Some(c) = inner_credentials.as_ref() {
1457 return Ok(c.clone());
1458 }
1459 }
1460 self.do_load_credentials().await
1461 }
1462
1463 async fn do_load_credentials(&self) -> Result<Arc<C>, TosError> {
1464 let credential_provider = self.credentials_provider.load();
1465 match credential_provider.credentials(CREDENTIALS_EXPIRES).await {
1466 Err(ex) => Err(TosError::client_error_with_cause("load credentials error", GenericError::DefaultError(ex.to_string()))),
1467 Ok(c) => Ok(c),
1468 }
1469 }
1470
1471 async fn modify_object<B>(&self, input: &ModifyObjectInput<B>) -> Result<ModifyObjectOutput, TosError>
1472 where
1473 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
1474 {
1475 self.do_request(input).await
1476 }
1477
1478 async fn modify_object_from_buffer(&self, input: &ModifyObjectFromBufferInput) -> Result<ModifyObjectOutput, TosError> {
1479 self.do_request::<_, _, InternalReader<MultiBytes>>(input).await
1480 }
1481 #[cfg(feature = "tokio-runtime")]
1482 async fn modify_object_from_file(&self, input: &crate::object::ModifyObjectFromFileInput) -> Result<ModifyObjectOutput, TosError> {
1483 self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
1484 }
1485
1486 async fn do_request<T, K, B>(&self, input: &T) -> Result<K, TosError>
1487 where
1488 T: InputTranslator<B>,
1489 K: OutputParser + RequestInfoTrait + Send,
1490 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1491 {
1492 self.do_request_common::<T, MockAsyncInputTranslator, K, B>(Some(input), None).await
1493 }
1494
1495 pub(crate) async fn do_request_common<T, F, K, B>(&self, input: Option<&T>, input2: Option<&F>) -> Result<K, TosError>
1496 where
1497 T: InputTranslator<B>,
1498 F: AsyncInputTranslator<B>,
1499 K: OutputParser + RequestInfoTrait + Send,
1500 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1501 {
1502 let config_holder = self.config_holder.load();
1503 let operation;
1504 if input.is_some() {
1505 operation = check_bucket_and_key(input.unwrap(), config_holder.is_custom_domain)?;
1506 } else {
1507 operation = check_bucket_and_key(input2.unwrap(), config_holder.is_custom_domain)?;
1508 }
1509 let mut retry_count = 0;
1510 let max_retry_count = config_holder.max_retry_count;
1511 loop {
1512 let start = Instant::now();
1513 let mut ac = AdditionalContext::new();
1514 let result = self.do_request_once::<T, F, K, B>(input, input2, retry_count, config_holder.clone(), &mut ac).await;
1515 let elapsed_ms = start.elapsed().as_millis();
1516 let exceed = exceed_high_latency_log_threshold(config_holder.high_latency_log_threshold, elapsed_ms, ac.request_size, operation);
1517 match result {
1518 Ok(k) => {
1519 if exceed {
1520 warn!(target: get_common_log_target(), "high latency request {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms)
1521 } else {
1522 info!(target: get_common_log_target(), "do {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), elapsed_ms);
1523 }
1524 return Ok(k);
1525 }
1526 Err(mut e) => {
1527 match &e {
1528 TosError::TosClientError { .. } => {
1529 if exceed {
1530 warn!(target: get_common_log_target(), "high latency request {} failed, cost: {} ms", operation, elapsed_ms);
1531 } else {
1532 warn!(target: get_common_log_target(), "do {} failed, cost: {} ms", operation, elapsed_ms);
1533 }
1534 }
1535 TosError::TosServerError { status_code, request_id, ec, .. } => {
1536 if exceed {
1537 if status_code.to_owned() < 500 {
1538 warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1539 request_id, ec, elapsed_ms);
1540 } else {
1541 warn!(target: get_common_log_target(), "high latency request {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1542 request_id, ec, elapsed_ms);
1543 }
1544 } else {
1545 if status_code.to_owned() < 500 {
1546 warn!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1547 request_id, ec, elapsed_ms);
1548 } else {
1549 info!(target: get_common_log_target(), "do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
1550 request_id, ec, elapsed_ms);
1551 }
1552 }
1553 }
1554 }
1555
1556 let (retry_after, need_retry) = check_need_retry(&e, retry_count, max_retry_count, operation);
1557 if !need_retry {
1558 if let Some(request_url) = ac.request_url {
1559 e.set_request_url(request_url);
1560 }
1561 return Err(e);
1562 }
1563 self.sleep_for_retry(retry_count, retry_after).await;
1564 retry_count += 1;
1565 }
1566 }
1567 }
1568 }
1569
1570
1571 async fn sleep_for_retry(&self, retry_count: isize, retry_after: isize) {
1572 let mut delay = BASE_DELAY_MS * 2u64.pow(retry_count as u32);
1573 if delay > MAX_DELAY_MS {
1574 delay = MAX_DELAY_MS;
1575 }
1576 let retry_after = retry_after as u64 * 1000;
1577 if retry_after > delay {
1578 delay = retry_after;
1579 }
1580
1581 self.async_runtime.sleep(Duration::from_millis(delay)).await;
1582 }
1583
1584 async fn do_request_once<'a, 'b, T, F, K, B>(&self, input: Option<&'b T>, input2: Option<&'b F>, retry_count: isize,
1585 config_holder: Arc<ConfigHolder>, ac: &mut AdditionalContext<'a>) -> Result<K, TosError>
1586 where
1587 T: InputTranslator<B>,
1588 F: AsyncInputTranslator<B>,
1589 K: OutputParser + Send,
1590 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1591 'b: 'a,
1592 {
1593 let mut request;
1594 if input.is_some() {
1595 let input = input.unwrap();
1596 request = input.trans(config_holder)?;
1597 ac.request_host = input.request_host();
1598 ac.request_date = input.request_date();
1599 ac.request_header = input.request_header();
1600 ac.request_query = input.request_query();
1601 ac.is_control_operation = input.is_control_operation();
1602 } else {
1603 let input2 = input2.unwrap();
1604 request = input2.trans(config_holder).await?;
1605 ac.request_host = input2.request_host();
1606 ac.request_date = input2.request_date();
1607 ac.request_header = input2.request_header();
1608 ac.request_query = input2.request_query();
1609 ac.is_control_operation = input2.is_control_operation();
1610 }
1611
1612 let body = request.body.take();
1613 request.retry_count = retry_count;
1614 let response = self.do_request_by_client(&mut request, body, ac).await?;
1615 if request.operation == GET_OBJECT_TO_FILE_OPERATION {
1616 if let Some(cl) = response.content_length() {
1617 ac.request_size = cl as i64;
1618 }
1619 let result = K::check_and_parse(request, response).await;
1620 return result;
1621 }
1622 K::check_and_parse(request, response).await
1623 }
1624
1625 async fn do_request_by_client<'a, 'c, B>(&self, request: &mut HttpRequest<'c, B>, body: Option<B>, ac: &mut AdditionalContext<'a>) -> Result<HttpResponse, TosError>
1626 where
1627 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
1628 'a: 'c,
1629 {
1630 let config_holder = self.config_holder.load();
1631 if ac.is_control_operation && (config_holder.schema_control == "" || config_holder.domain_control == "") {
1632 return Err(TosError::client_error("request control operation but control endpoint is empty"));
1633 }
1634
1635 let cred = self.load_credentials().await?;
1636 let ak = cred.ak();
1637 let sk = cred.sk();
1638 let security_token = cred.security_token();
1639 auto_recognize_content_type(request, config_holder.auto_recognize_content_type);
1640 sign_header(request, ak, sk, security_token, config_holder.as_ref(), ac)?;
1641 request.enable_crc = config_holder.enable_crc;
1642
1643 let request_url = get_request_url(request, config_holder.as_ref(), ac.is_control_operation);
1644 ac.request_url = Some(request_url.clone());
1645 let mut rb = self.client.request(request.method.as_http_method(), request_url);
1646 let mut cl = -1i64;
1647 for kv in &request.header {
1648 if *kv.0 == HEADER_CONTENT_LENGTH || *kv.0 == HEADER_CONTENT_LENGTH_LOWER {
1649 if let Ok(x) = kv.1.parse::<i64>() {
1650 cl = x;
1651 }
1652 }
1653 rb = rb.header(*kv.0, kv.1);
1654 }
1655
1656 if let Some(meta) = &request.meta {
1657 for kv in meta {
1658 rb = rb.header(kv.0, kv.1);
1659 }
1660 }
1661
1662 if request.retry_count > 0 {
1663 rb = rb.header(HEADER_SDK_RETRY_COUNT, format!("attempt={}; max={}", request.retry_count, config_holder.max_retry_count));
1664 }
1665
1666 if config_holder.expect_100_continue_threshold > 0 && cl > config_holder.expect_100_continue_threshold as i64 {
1667 rb = rb.header(HEADER_EXPECT, "100-continue");
1668 }
1669
1670 let is_upload_operation = ALL_UPLOAD_OPERATIONS.contains_key(request.operation);
1671 let calc_crc = config_holder.enable_crc && is_upload_operation;
1672 let crc64 = Arc::new(AtomicU64::new(0));
1673 if let Some(bd) = body {
1675 if calc_crc {
1676 let mut reader = MultifunctionalReader::new(bd, Some(crc64.clone()), cl, request);
1677 if let Some(ref rc) = request.request_context {
1678 if let Some(init_crc64) = rc.init_crc64 {
1679 reader.init_crc64 = Some(init_crc64);
1680 }
1681 if is_upload_operation {
1682 if let Some(ref rl) = rc.rate_limiter {
1683 reader.set_rate_limiter(rl.clone());
1684 }
1685 if let Some(ref adts) = rc.async_data_transfer_listener {
1686 reader.set_async_data_transfer_listener(adts.clone());
1687 }
1688 }
1689 }
1690 rb = self.add_body(rb, reader, cl);
1691 } else if is_upload_operation {
1692 if let Some(ref rc) = request.request_context {
1693 let mut reader = MultifunctionalReader::new(bd, None, cl, request);
1694 if let Some(ref rl) = rc.rate_limiter {
1695 reader.set_rate_limiter(rl.clone());
1696 }
1697 if let Some(ref adts) = rc.async_data_transfer_listener {
1698 reader.set_async_data_transfer_listener(adts.clone());
1699 }
1700 rb = self.add_body(rb, reader, cl);
1701 } else {
1702 rb = self.add_body(rb, bd, cl);
1703 }
1704 } else {
1705 rb = self.add_body(rb, bd, cl);
1706 }
1707 } else if cl == -1 {
1708 rb = rb.header(HEADER_CONTENT_LENGTH, 0);
1709 }
1710
1711 match rb.build() {
1712 Ok(req) => {
1713 let result = self.client.execute(req).await;
1714 ac.request_size = cl;
1715 match result {
1716 Ok(resp) => {
1717 if calc_crc {
1718 let result = crc64.load(Ordering::Acquire);
1719 if request.request_context.is_none() {
1720 let mut rc = RequestContext::default();
1721 rc.crc64 = Some(result);
1722 request.request_context = Some(rc)
1723 } else {
1724 request.request_context.as_mut().unwrap().crc64 = Some(result);
1725 }
1726 }
1727 Ok(resp)
1728 }
1729 Err(e) => {
1730 Err(TosError::client_error_with_cause("do request error", GenericError::HttpRequestError(e.to_string())))
1731 }
1732 }
1733 }
1734 Err(e) => {
1735 Err(TosError::client_error_with_cause("build request error", GenericError::DefaultError(e.to_string())))
1736 }
1737 }
1738 }
1739
1740 fn add_body<B>(&self, rb: RequestBuilder, body: B, _: i64) -> RequestBuilder
1741 where
1742 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + 'static,
1743 {
1744 rb.body(Body::wrap_stream(body))
1745 }
1746
1747 async fn check_object_status(&self, bucket: &str, key: &str, offset: i64, content_length: i64, ginput: &GenericInput) -> Result<(String, bool), TosError> {
1748 let mut if_match = String::new();
1749 let mut forbid_overwrite = false;
1750 if offset <= 0 && content_length >= 0 {
1751 let mut hinput = HeadObjectInput::new(bucket, key);
1752 hinput.set_request_host(ginput.request_host.as_str());
1753 if let Some(request_date) = ginput.request_date {
1754 hinput.set_request_date(request_date);
1755 }
1756 match self.head_object(&hinput).await {
1757 Ok(output) => {
1758 if output.content_length > 0 {
1759 return Err(TosError::client_error(format!("invalid offset, expected {}, actual {}", output.content_length, offset)));
1760 }
1761 if_match = output.etag().to_string();
1762 }
1763 Err(ex) => {
1764 if let Some(err) = ex.as_server_error() {
1765 if err.status_code() == 404 && err.ec() == "0017-00000003" {
1766 forbid_overwrite = true;
1767 }
1768 }
1769
1770 if !forbid_overwrite {
1771 return Err(ex);
1772 }
1773 }
1774 }
1775 }
1776 Ok((if_match, forbid_overwrite))
1777 }
1778
1779 fn trans_append_object_input(&self, input: &AppendObjectBasicInput) -> PutObjectBasicInput {
1780 let mut minput = PutObjectBasicInput::default();
1781 minput.bucket = input.bucket.clone();
1782 minput.key = input.key.clone();
1783 minput.set_cache_control(input.cache_control());
1784 minput.set_content_disposition(input.content_disposition());
1785 minput.set_content_encoding(input.content_encoding());
1786 minput.set_content_language(input.content_language());
1787 minput.set_content_type(input.content_type());
1788 if let Some(expires) = input.expires() {
1789 minput.set_expires(expires);
1790 }
1791 if let Some(acl) = input.acl() {
1792 minput.set_acl(acl);
1793 }
1794 minput.set_grant_full_control(input.grant_full_control());
1795 minput.set_grant_read(input.grant_read());
1796 minput.set_grant_read_acp(input.grant_read_acp());
1797 minput.set_grant_write_acp(input.grant_write_acp());
1798 minput.set_website_redirect_location(input.website_redirect_location());
1799 minput.object_expires = input.object_expires;
1800 minput.meta = input.meta.clone();
1801 if let Some(sc) = input.storage_class() {
1802 minput.set_storage_class(sc);
1803 }
1804 minput.set_content_length(input.content_length());
1805 minput.notification_custom_parameters = input.notification_custom_parameters.clone();
1806 minput.traffic_limit = input.traffic_limit;
1807 minput
1808 }
1809}
1810
1811impl<P, C, S> TosClientImpl<P, C, S>
1812where
1813 S: AsyncRuntime,
1814{
1815 pub fn close(&self) {
1816 let _ = self.closed.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Relaxed);
1817 }
1818
1819 #[cfg(feature = "tokio-runtime")]
1820 pub async fn shutdown(&self) {
1821 self.close();
1822 self.closed_sender.close();
1823 if let Some(handlers) = self.handlers.lock().await.take() {
1824 for handler in handlers {
1825 if let Some(handler) = handler {
1826 let _ = handler.await;
1827 }
1828 }
1829 }
1830 }
1831}
1832
1833impl<P, C, S> Drop for TosClientImpl<P, C, S>
1834where
1835 S: AsyncRuntime,
1836{
1837 fn drop(&mut self) {
1838 self.close();
1839 }
1840}