/*
* Copyright (2024) Volcengine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::asynchronous::bucket::BucketAPI;
use crate::asynchronous::http::HttpResponse;
use crate::asynchronous::internal::{AsyncInputTranslator, OutputParser};
use crate::asynchronous::multipart::MultipartAPI;
use crate::asynchronous::object::ObjectAPI;
use crate::asynchronous::paginator::PaginatorAPI;
use crate::asynchronous::reader::StreamVec;
use crate::auth::{pre_signed_policy_url, pre_signed_post_signature, pre_signed_url, sign_header, PreSignedPolicyURLInput, PreSignedPolicyURLOutput, PreSignedPostSignatureInput, PreSignedPostSignatureOutput, PreSignedURLInput, PreSignedURLOutput, SignerAPI};
use crate::bucket::{CreateBucketInput, CreateBucketOutput, DeleteBucketCORSInput, DeleteBucketCORSOutput, DeleteBucketCustomDomainInput, DeleteBucketCustomDomainOutput, DeleteBucketInput, DeleteBucketLifecycleInput, DeleteBucketLifecycleOutput, DeleteBucketMirrorBackInput, DeleteBucketMirrorBackOutput, DeleteBucketOutput, DeleteBucketPolicyInput, DeleteBucketPolicyOutput, DeleteBucketRealTimeLogInput, DeleteBucketRealTimeLogOutput, DeleteBucketRenameInput, DeleteBucketRenameOutput, DeleteBucketReplicationInput, DeleteBucketReplicationOutput, DeleteBucketWebsiteInput, DeleteBucketWebsiteOutput, GetBucketACLInput, GetBucketACLOutput, GetBucketCORSInput, GetBucketCORSOutput, GetBucketLifecycleInput, GetBucketLifecycleOutput, GetBucketLocationInput, GetBucketLocationOutput, GetBucketMirrorBackInput, GetBucketMirrorBackOutput, GetBucketPolicyInput, GetBucketPolicyOutput, GetBucketRealTimeLogInput, GetBucketRealTimeLogOutput, GetBucketRenameInput, GetBucketRenameOutput, GetBucketReplicationInput, GetBucketReplicationOutput, GetBucketVersioningInput, GetBucketVersioningOutput, GetBucketWebsiteInput, GetBucketWebsiteOutput, HeadBucketInput, HeadBucketOutput, ListBucketCustomDomainInput, ListBucketCustomDomainOutput, ListBucketsInput, ListBucketsOutput, PutBucketACLInput, PutBucketACLOutput, PutBucketCORSInput, PutBucketCORSOutput, PutBucketCustomDomainInput, PutBucketCustomDomainOutput, PutBucketLifecycleInput, PutBucketLifecycleOutput, PutBucketMirrorBackInput, PutBucketMirrorBackOutput, PutBucketPolicyInput, PutBucketPolicyOutput, PutBucketRealTimeLogInput, PutBucketRealTimeLogOutput, PutBucketRenameInput, PutBucketRenameOutput, PutBucketReplicationInput, PutBucketReplicationOutput, PutBucketStorageClassInput, PutBucketStorageClassOutput, PutBucketVersioningInput, PutBucketVersioningOutput, PutBucketWebsiteInput, PutBucketWebsiteOutput};
use crate::common::RequestInfoTrait;
use crate::config::ConfigHolder;
use crate::constant::{ALL_UPLOAD_OPERATIONS, BASE_DELAY_MS, DEFAULT_MAX_KEYS, HEADER_CONTENT_LENGTH, HEADER_CONTENT_LENGTH_LOWER, HEADER_SDK_RETRY_COUNT, MAX_DELAY_MS, SCHEMA_HTTP, SCHEMA_HTTPS};
use crate::credential::{CommonCredentials, CommonCredentialsProvider, Credentials, CredentialsProvider};
use crate::error::{GenericError, TosError};
use crate::http::{HttpRequest, RequestContext};
use crate::internal::{auto_recognize_content_type, check_bucket_and_key, check_need_retry, get_request_url, InputTranslator, MockAsyncInputTranslator};
use crate::multipart::{AbortMultipartUploadInput, AbortMultipartUploadOutput, CompleteMultipartUploadInput, CompleteMultipartUploadOutput, CreateMultipartUploadInput, CreateMultipartUploadOutput, ListMultipartUploadsInput, ListMultipartUploadsOutput, ListPartsInput, ListPartsOutput, UploadPartCopyInput, UploadPartCopyOutput, UploadPartFromBufferInput, UploadPartInput, UploadPartOutput};
use crate::object::{AppendObjectFromBufferInput, AppendObjectInput, AppendObjectOutput, CopyObjectInput, CopyObjectOutput, DeleteMultiObjectsInput, DeleteMultiObjectsOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectTaggingInput, DeleteObjectTaggingOutput, FetchObjectInput, FetchObjectOutput, GetFetchTaskInput, GetFetchTaskOutput, GetObjectACLInput, GetObjectACLOutput, GetObjectInput, GetObjectOutput, GetObjectTaggingInput, GetObjectTaggingOutput, HeadObjectInput, HeadObjectOutput, ListObjectVersionsInput, ListObjectVersionsOutput, ListObjectsType2Input, ListObjectsType2Output, PutFetchTaskInput, PutFetchTaskOutput, PutObjectACLInput, PutObjectACLOutput, PutObjectFromBufferInput, PutObjectInput, PutObjectOutput, PutObjectTaggingInput, PutObjectTaggingOutput, RenameObjectInput, RenameObjectOutput, RestoreObjectInput, RestoreObjectOutput, SetObjectMetaInput, SetObjectMetaOutput};
use crate::reader::{InternalReader, MultifunctionalReader};
use crate::tos::ConfigAware;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::future::BoxFuture;
use futures_core::Stream;
use reqwest::{redirect, Body, Client, Proxy, RequestBuilder};
use std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tracing::log::{info, warn};
/// Abstraction over the async executor used by the client.
///
/// In the code visible in this file only `sleep` is invoked (to back off between
/// request retries); `spawn` and `block_on` are declared for use elsewhere.
#[async_trait]
pub trait AsyncRuntime {
/// Error type carried by the future returned from `spawn`.
type JoinError: Error;
/// Completes after `duration`; used to delay the next retry attempt.
async fn sleep(&self, duration: Duration);
/// Accepts a `Send + 'static` future and returns a boxed future that resolves to the
/// future's output or a `JoinError`.
/// NOTE(review): presumably runs `future` as a separate task — confirm against implementors.
fn spawn<'a, F>(&self, future: F) -> BoxFuture<'a, Result<F::Output, Self::JoinError>>
where
F: Future + Send + 'static,
F::Output: Send + 'static;
/// Accepts a future and returns its output from a non-async context.
/// NOTE(review): presumably blocks the calling thread until completion — confirm against implementors.
fn block_on<F: Future>(&self, future: F) -> F::Output;
}
/// Collects credentials, endpoint information and tuning options, then produces a
/// `TosClientImpl` through `build`.
///
/// `P` is the credentials provider type, `C` the credentials type it yields and `S`
/// the async runtime implementation.
#[derive(Debug, Clone, Default)]
pub struct TosClientBuilder<P, C, S>
{
// Static credentials; only used by `build` when no `credentials_provider` is set.
ak: String,
sk: String,
security_token: String,
// Passed to `ConfigHolder::check` by `build`.
region: String,
endpoint: String,
// Takes precedence over `ak`/`sk`/`security_token` when present.
credentials_provider: Option<P>,
// Timeouts, retry, proxy and feature switches.
config_holder: ConfigHolder,
// Runtime handed to the built client (used there for retry sleeps).
async_runtime: S,
// Ties the builder to the credentials type without storing a value of it.
c: PhantomData<C>,
}
impl<P, C, S> TosClientBuilder<P, C, S>
where
P: CredentialsProvider<C> + Send + Sync + Debug + 'static,
C: Credentials + Send + Sync + Debug + 'static,
S: AsyncRuntime + Send + Sync + Debug + 'static,
{
pub fn build(mut self) -> Result<TosClientImpl<P, C, S>, TosError> {
self.config_holder.check(self.endpoint, self.region)?;
let mut client = Client::builder()
.user_agent(self.config_holder.user_agent.as_str())
.tcp_nodelay(true)
.tcp_keepalive(None)
.redirect(redirect::Policy::none())
.no_gzip()
.no_deflate()
.no_brotli()
.connect_timeout(Duration::from_millis(self.config_holder.connection_timeout as u64))
.pool_idle_timeout(Duration::from_millis(self.config_holder.idle_connection_time as u64))
.pool_max_idle_per_host(self.config_holder.max_connections as usize);
if self.config_holder.request_timeout > 0 {
client = client.timeout(Duration::from_millis(self.config_holder.request_timeout as u64));
}
if self.config_holder.proxy_host != "" {
let mut proxy_url = self.config_holder.proxy_host.as_str();
while proxy_url.len() > 0 && proxy_url.ends_with("/") {
proxy_url = &proxy_url[0..proxy_url.len() - 1];
}
if proxy_url != "" {
let mut proxy_url = proxy_url.to_lowercase();
if !proxy_url.starts_with(SCHEMA_HTTP) && !proxy_url.starts_with(SCHEMA_HTTPS) {
proxy_url = format!("{}{}", SCHEMA_HTTP, proxy_url);
}
if self.config_holder.proxy_port >= 0 {
proxy_url = format!("{}:{}", proxy_url, self.config_holder.proxy_port);
}
let (domain, schema) = self.config_holder.parse_domain(proxy_url.as_str())?;
if self.config_holder.proxy_username != "" && self.config_holder.proxy_password != "" {
proxy_url = format!("{}//{}:{}@{}", schema, self.config_holder.proxy_username, self.config_holder.proxy_password, domain);
} else {
proxy_url = format!("{}//{}", schema, domain);
}
match Proxy::http(proxy_url.as_str()) {
Err(e) => return Err(TosError::client_error_with_cause("build http proxy error", GenericError::DefaultError(e.to_string()))),
Ok(proxy) => {
client = client.proxy(proxy);
}
}
match Proxy::https(proxy_url) {
Err(e) => return Err(TosError::client_error_with_cause("build https proxy error", GenericError::DefaultError(e.to_string()))),
Ok(proxy) => {
client = client.proxy(proxy);
}
}
} else {
client = client.no_proxy();
}
} else {
client = client.no_proxy();
}
if !self.config_holder.enable_verify_ssl {
client = client.danger_accept_invalid_certs(true).danger_accept_invalid_hostnames(true);
}
let cp;
match self.credentials_provider {
Some(p) => {
cp = p;
}
None => {
cp = P::new(C::new(self.ak, self.sk, self.security_token));
}
}
match client.build() {
Ok(client) => {
Ok(TosClientImpl {
client,
config_holder: ArcSwap::from(Arc::new(self.config_holder)),
credentials_provider: ArcSwap::from(Arc::new(cp)),
async_runtime: Arc::new(self.async_runtime),
c: self.c,
})
}
Err(e) => {
Err(TosError::client_error_with_cause("build tos client error", GenericError::DefaultError(e.to_string())))
}
}
}
pub fn build_as_trait(self) -> Result<impl TosClient, TosError> {
let client = self.build()?;
Ok(client)
}
pub fn ak(mut self, ak: impl Into<String>) -> Self {
self.ak = ak.into();
self
}
pub fn sk(mut self, sk: impl Into<String>) -> Self {
self.sk = sk.into();
self
}
pub fn security_token(mut self, security_token: impl Into<String>) -> Self {
self.security_token = security_token.into();
self
}
pub(crate) fn credentials_provider(mut self, p: P) -> Self {
self.credentials_provider = Some(p);
self
}
pub fn region(mut self, region: impl Into<String>) -> Self {
self.region = region.into();
self
}
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = endpoint.into();
self
}
pub fn request_timeout(mut self, request_timeout: isize) -> Self {
if request_timeout > 0 {
self.config_holder.request_timeout = request_timeout;
}
self
}
pub fn connection_timeout(mut self, connection_timeout: isize) -> Self {
if connection_timeout > 0 {
self.config_holder.connection_timeout = connection_timeout;
}
self
}
pub fn max_connections(mut self, max_connections: isize) -> Self {
if max_connections > 0 {
self.config_holder.max_connections = max_connections;
}
self
}
pub fn idle_connection_time(mut self, idle_connection_time: isize) -> Self {
if idle_connection_time > 0 {
self.config_holder.idle_connection_time = idle_connection_time;
}
self
}
pub fn enable_crc(mut self, enable_crc: bool) -> Self {
self.config_holder.enable_crc = enable_crc;
self
}
pub fn enable_verify_ssl(mut self, enable_verify_ssl: bool) -> Self {
self.config_holder.enable_verify_ssl = enable_verify_ssl;
self
}
pub fn max_retry_count(mut self, max_retry_count: isize) -> Self {
self.config_holder.max_retry_count = max_retry_count;
self
}
pub fn auto_recognize_content_type(mut self, auto_recognize_content_type: bool) -> Self {
self.config_holder.auto_recognize_content_type = auto_recognize_content_type;
self
}
pub fn is_custom_domain(mut self, is_custom_domain: bool) -> Self {
self.config_holder.is_custom_domain = is_custom_domain;
self
}
pub fn proxy_host(mut self, proxy_host: impl Into<String>) -> Self {
self.config_holder.proxy_host = proxy_host.into();
self
}
pub fn proxy_port(mut self, proxy_host: isize) -> Self {
self.config_holder.proxy_port = proxy_host.into();
self
}
pub fn proxy_username(mut self, proxy_username: impl Into<String>) -> Self {
self.config_holder.proxy_username = proxy_username.into();
self
}
pub fn proxy_password(mut self, proxy_password: impl Into<String>) -> Self {
self.config_holder.proxy_password = proxy_password.into();
self
}
pub fn async_sleeper(mut self, async_sleeper: impl Into<S>) -> Self {
self.async_runtime = async_sleeper.into();
self
}
}
/// Returns a default-initialised [`TosClientBuilder`] that uses
/// `CommonCredentialsProvider<CommonCredentials>` and the runtime type `S`.
pub fn builder<S>() -> TosClientBuilder<CommonCredentialsProvider<CommonCredentials>, CommonCredentials, S>
where
    S: AsyncRuntime + Default,
{
    Default::default()
}
/// A stream over an owned in-memory buffer.
pub struct BufferStream {
// `Some` until the buffer has been handed out by `poll_next`, `None` afterwards.
inner: Option<Vec<u8>>,
}
/// Yields the whole buffer as a single `Bytes` item, then ends.
impl Stream for BufferStream {
    type Item = Result<Bytes, crate::error::CommonError>;

    /// Hands out the buffer on the first poll and signals end-of-stream on every later poll.
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = self.inner.take().map(|buffer| Ok(Bytes::from(buffer)));
        Poll::Ready(next)
    }

    /// Reports the buffer's byte length as the upper bound while it is still pending.
    // NOTE(review): `Stream::size_hint` normally counts remaining items, not bytes —
    // confirm whether a consumer relies on the byte length before changing this.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let upper = self.inner.as_ref().map(|buffer| buffer.len());
        (0, upper)
    }
}
/// Copies `data` into an owned buffer and wraps it in a [`BufferStream`].
pub fn new_stream(data: impl AsRef<[u8]>) -> BufferStream {
    let buffer = data.as_ref().to_vec();
    BufferStream { inner: Some(buffer) }
}
/// Umbrella trait combining the bucket, object, multipart, paginator and signer APIs
/// with the ability to swap credentials and endpoint/region on a live client.
#[async_trait]
pub trait TosClient: BucketAPI + ObjectAPI + MultipartAPI + PaginatorAPI + SignerAPI + ConfigAware + Debug {
/// Replaces the credentials used for subsequent requests; the returned flag reports success.
fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool;
/// Replaces the endpoint and region used for subsequent requests; the returned flag reports success.
fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool;
}
/// Concrete client produced by `TosClientBuilder::build`.
#[derive(Debug)]
pub struct TosClientImpl<P, C, S> {
// Underlying reqwest client used to execute every request.
pub(crate) client: Client,
// Swappable configuration; replaced wholesale by `refresh_endpoint_region`.
pub(crate) config_holder: ArcSwap<ConfigHolder>,
// Swappable credentials provider; replaced wholesale by `refresh_credentials`.
pub(crate) credentials_provider: ArcSwap<P>,
// Runtime used to sleep between retries.
pub(crate) async_runtime: Arc<S>,
// Marker for the credentials type; no value of `C` is stored.
pub(crate) c: PhantomData<C>,
}
// SAFETY / NOTE(review): this asserts `Sync` for every `P`, `C` and `S` without any bounds,
// so the compiler no longer checks that the stored provider (`ArcSwap<P>`) and runtime
// (`Arc<S>`) are themselves safe to share between threads. The `#[async_trait]` impls in this
// file bound `S` only by `AsyncRuntime`, so they appear to rely on this blanket impl —
// confirm before tightening it to something like `P: Send + Sync, C: Sync, S: Send + Sync`.
unsafe impl<P, C, S> Sync for TosClientImpl<P, C, S> {}
impl<P, C, S> ConfigAware for TosClientImpl<P, C, S> {
    /// Reads the custom-domain flag from the currently active configuration snapshot.
    fn is_custom_domain(&self) -> bool {
        let snapshot = self.config_holder.load();
        snapshot.is_custom_domain
    }
}
/// Object-level operations.
///
/// Except for `list_objects_type2`, every method simply hands its input to the shared
/// request pipeline (`do_request` / `do_request_af`) and returns the parsed output.
#[async_trait]
impl<P, C, S> ObjectAPI for TosClientImpl<P, C, S>
where
    P: CredentialsProvider<C>,
    C: Credentials,
    S: AsyncRuntime,
{
    /// Sends `input`, whose body is a caller-supplied stream of type `B`.
    async fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
    where
        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
    {
        self.do_request(input).await
    }

    /// Sends `input`, whose body is an in-memory buffer.
    async fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    /// Sends `input` through the async-file pipeline (`do_request_af`).
    #[cfg(feature = "async-file")]
    async fn put_object_from_file(&self, input: &crate::object::PutObjectFromFileInput) -> Result<PutObjectOutput, TosError> {
        self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
    }

    async fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    #[cfg(feature = "async-file")]
    async fn get_object_to_file(&self, input: &crate::object::GetObjectToFileInput) -> Result<crate::object::GetObjectToFileOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    /// Lists objects, transparently following continuation tokens.
    ///
    /// With `list_only_once` set, a single request is issued and returned as-is. Otherwise
    /// pages are fetched and merged until the listing is no longer truncated or the
    /// requested `max_keys` (defaulting to `DEFAULT_MAX_KEYS` when non-positive) is reached.
    ///
    /// # Errors
    /// Returns the first `TosError` produced by any page request.
    async fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError> {
        if input.list_only_once {
            return self.do_request::<_, _, InternalReader<StreamVec>>(input).await;
        }
        let mut input = input.clone();
        if input.max_keys <= 0 {
            input.max_keys = DEFAULT_MAX_KEYS;
        }
        // Keep the total the caller asked for: `input.max_keys` is reduced for every follow-up
        // page, while `output.key_count` is cumulative, so the two must not be compared directly.
        let total_max_keys = input.max_keys;
        let mut output = self.do_request::<ListObjectsType2Input, ListObjectsType2Output, InternalReader<StreamVec>>(&input).await?;
        while output.is_truncated
            && output.contents.len() + output.common_prefixes.len() < total_max_keys as usize
            && output.key_count < total_max_keys
        {
            input.continuation_token = output.next_continuation_token.clone();
            // Only ask for what is still missing from the requested total.
            input.max_keys = total_max_keys - output.key_count;
            let mut page = self.do_request::<ListObjectsType2Input, ListObjectsType2Output, InternalReader<StreamVec>>(&input).await?;
            output.key_count += page.key_count;
            output.is_truncated = page.is_truncated;
            output.next_continuation_token = page.next_continuation_token;
            output.contents.append(&mut page.contents);
            output.common_prefixes.append(&mut page.common_prefixes);
        }
        Ok(output)
    }

    async fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    /// Sends `input`, whose body is a caller-supplied stream of type `B`.
    async fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
    where
        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
    {
        self.do_request(input).await
    }

    /// Sends `input`, whose body is an in-memory buffer.
    async fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn fetch_object(&self, input: &FetchObjectInput) -> Result<FetchObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn put_fetch_task(&self, input: &PutFetchTaskInput) -> Result<PutFetchTaskOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn get_fetch_task(&self, input: &GetFetchTaskInput) -> Result<GetFetchTaskOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn put_object_tagging(&self, input: &PutObjectTaggingInput) -> Result<PutObjectTaggingOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn get_object_tagging(&self, input: &GetObjectTaggingInput) -> Result<GetObjectTaggingOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn delete_object_tagging(&self, input: &DeleteObjectTaggingInput) -> Result<DeleteObjectTaggingOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn rename_object(&self, input: &RenameObjectInput) -> Result<RenameObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }

    async fn restore_object(&self, input: &RestoreObjectInput) -> Result<RestoreObjectOutput, TosError> {
        self.do_request::<_, _, InternalReader<StreamVec>>(input).await
    }
}
/// Bucket-level operations.
///
/// Every method is a thin wrapper: it passes its input to `do_request`, which validates,
/// signs, sends (with retries) and parses the response into the method's output type.
/// `InternalReader<StreamVec>` is named only to fix the body type parameter of `do_request`.
// NOTE(review): this is the only impl in this file gated on the "asynchronous" feature —
// confirm whether the gate is intentional here or should match the sibling impls.
#[cfg(feature = "asynchronous")]
#[async_trait]
impl<P, C, S> BucketAPI for TosClientImpl<P, C, S>
where
P: CredentialsProvider<C>,
C: Credentials,
S: AsyncRuntime,
{
async fn create_bucket(&self, input: &CreateBucketInput) -> Result<CreateBucketOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn head_bucket(&self, input: &HeadBucketInput) -> Result<HeadBucketOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket(&self, input: &DeleteBucketInput) -> Result<DeleteBucketOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn list_buckets(&self, input: &ListBucketsInput) -> Result<ListBucketsOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_cors(&self, input: &PutBucketCORSInput) -> Result<PutBucketCORSOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_cors(&self, input: &GetBucketCORSInput) -> Result<GetBucketCORSOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_cors(&self, input: &DeleteBucketCORSInput) -> Result<DeleteBucketCORSOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_storage_class(&self, input: &PutBucketStorageClassInput) -> Result<PutBucketStorageClassOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_location(&self, input: &GetBucketLocationInput) -> Result<GetBucketLocationOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_lifecycle(&self, input: &PutBucketLifecycleInput) -> Result<PutBucketLifecycleOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_lifecycle(&self, input: &GetBucketLifecycleInput) -> Result<GetBucketLifecycleOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_lifecycle(&self, input: &DeleteBucketLifecycleInput) -> Result<DeleteBucketLifecycleOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_policy(&self, input: &PutBucketPolicyInput) -> Result<PutBucketPolicyOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_policy(&self, input: &GetBucketPolicyInput) -> Result<GetBucketPolicyOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_policy(&self, input: &DeleteBucketPolicyInput) -> Result<DeleteBucketPolicyOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_mirror_back(&self, input: &PutBucketMirrorBackInput) -> Result<PutBucketMirrorBackOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_mirror_back(&self, input: &GetBucketMirrorBackInput) -> Result<GetBucketMirrorBackOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_mirror_back(&self, input: &DeleteBucketMirrorBackInput) -> Result<DeleteBucketMirrorBackOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_acl(&self, input: &PutBucketACLInput) -> Result<PutBucketACLOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_acl(&self, input: &GetBucketACLInput) -> Result<GetBucketACLOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_replication(&self, input: &PutBucketReplicationInput) -> Result<PutBucketReplicationOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_replication(&self, input: &GetBucketReplicationInput) -> Result<GetBucketReplicationOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_replication(&self, input: &DeleteBucketReplicationInput) -> Result<DeleteBucketReplicationOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_versioning(&self, input: &PutBucketVersioningInput) -> Result<PutBucketVersioningOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_versioning(&self, input: &GetBucketVersioningInput) -> Result<GetBucketVersioningOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_website(&self, input: &PutBucketWebsiteInput) -> Result<PutBucketWebsiteOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_website(&self, input: &GetBucketWebsiteInput) -> Result<GetBucketWebsiteOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_website(&self, input: &DeleteBucketWebsiteInput) -> Result<DeleteBucketWebsiteOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_custom_domain(&self, input: &PutBucketCustomDomainInput) -> Result<PutBucketCustomDomainOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn list_bucket_custom_domain(&self, input: &ListBucketCustomDomainInput) -> Result<ListBucketCustomDomainOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_custom_domain(&self, input: &DeleteBucketCustomDomainInput) -> Result<DeleteBucketCustomDomainOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_real_time_log(&self, input: &PutBucketRealTimeLogInput) -> Result<PutBucketRealTimeLogOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_real_time_log(&self, input: &GetBucketRealTimeLogInput) -> Result<GetBucketRealTimeLogOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_real_time_log(&self, input: &DeleteBucketRealTimeLogInput) -> Result<DeleteBucketRealTimeLogOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn put_bucket_rename(&self, input: &PutBucketRenameInput) -> Result<PutBucketRenameOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn get_bucket_rename(&self, input: &GetBucketRenameInput) -> Result<GetBucketRenameOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn delete_bucket_rename(&self, input: &DeleteBucketRenameInput) -> Result<DeleteBucketRenameOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
}
/// Multipart-upload operations.
///
/// Every method is a thin wrapper that passes its input to the shared request pipeline
/// and returns the parsed output.
#[async_trait]
impl<P, C, S> MultipartAPI for TosClientImpl<P, C, S>
where
C: Credentials + Debug,
P: CredentialsProvider<C> + Debug,
S: AsyncRuntime + Debug,
{
async fn create_multipart_upload(&self, input: &CreateMultipartUploadInput) -> Result<CreateMultipartUploadOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
// The body type `B` is chosen by the caller, so it is inferred from `input` instead of being named.
async fn upload_part<B>(&self, input: &UploadPartInput<B>) -> Result<UploadPartOutput, TosError>
where
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static,
{
self.do_request(input).await
}
async fn upload_part_from_buffer(&self, input: &UploadPartFromBufferInput) -> Result<UploadPartOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
// File-based upload goes through `do_request_af`, which is defined outside the code visible here.
#[cfg(feature = "async-file")]
async fn upload_part_from_file(&self, input: &crate::multipart::UploadPartFromFileInput) -> Result<UploadPartOutput, TosError> {
self.do_request_af::<_, _, crate::asynchronous::file::FileReader>(input).await
}
async fn complete_multipart_upload(&self, input: &CompleteMultipartUploadInput) -> Result<CompleteMultipartUploadOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn abort_multipart_upload(&self, input: &AbortMultipartUploadInput) -> Result<AbortMultipartUploadOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn upload_part_copy(&self, input: &UploadPartCopyInput) -> Result<UploadPartCopyOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn list_multipart_uploads(&self, input: &ListMultipartUploadsInput) -> Result<ListMultipartUploadsOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
async fn list_parts(&self, input: &ListPartsInput) -> Result<ListPartsOutput, TosError> {
self.do_request::<_, _, InternalReader<StreamVec>>(input).await
}
}
/// Pre-signing operations; each one snapshots the current credentials and delegates
/// to the matching free function in `crate::auth`.
impl<P, C, S> SignerAPI for TosClientImpl<P, C, S>
where
    C: 'static + Credentials + Debug + Send + Sync,
    P: 'static + CredentialsProvider<C> + Debug + Send + Sync,
    S: 'static + AsyncRuntime + Debug + Send + Sync,
{
    /// Produces a pre-signed URL for `input` using the currently loaded credentials.
    fn pre_signed_url(&self, input: &PreSignedURLInput) -> Result<PreSignedURLOutput, TosError> {
        let provider = self.credentials_provider.load();
        let credentials = provider.credentials();
        pre_signed_url(
            &self.config_holder,
            credentials.ak(),
            credentials.sk(),
            credentials.security_token(),
            input,
        )
    }

    /// Produces a pre-signed POST signature for `input` using the currently loaded credentials.
    fn pre_signed_post_signature(&self, input: &PreSignedPostSignatureInput) -> Result<PreSignedPostSignatureOutput, TosError> {
        let provider = self.credentials_provider.load();
        let credentials = provider.credentials();
        pre_signed_post_signature(
            &self.config_holder,
            credentials.ak(),
            credentials.sk(),
            credentials.security_token(),
            input,
        )
    }

    /// Produces a pre-signed policy URL for `input` using the currently loaded credentials.
    fn pre_signed_policy_url(&self, input: &PreSignedPolicyURLInput) -> Result<PreSignedPolicyURLOutput, TosError> {
        let provider = self.credentials_provider.load();
        let credentials = provider.credentials();
        pre_signed_policy_url(
            &self.config_holder,
            credentials.ak(),
            credentials.sk(),
            credentials.security_token(),
            input,
        )
    }
}
/// Live reconfiguration of credentials and endpoint/region.
#[async_trait]
impl<P, C, S> TosClient for TosClientImpl<P, C, S>
where
    P: CredentialsProvider<C> + Send + Sync + Debug + 'static,
    C: Credentials + Send + Sync + Debug + 'static,
    S: AsyncRuntime + Send + Sync + Debug + 'static,
{
    /// Atomically replaces the credentials provider with one built from the given
    /// key pair and token. Always returns `true`.
    fn refresh_credentials(&self, ak: impl Into<String>, sk: impl Into<String>, security_token: impl Into<String>) -> bool {
        let provider = P::new(C::new(ak, sk, security_token));
        self.credentials_provider.store(Arc::new(provider));
        true
    }

    /// Atomically replaces the configuration with one validated for the new
    /// `endpoint`/`region`, carrying over the settings of the current configuration.
    ///
    /// Returns `false` (leaving the current configuration untouched) when the new
    /// endpoint/region fail validation, `true` otherwise.
    fn refresh_endpoint_region(&self, endpoint: impl Into<String>, region: impl Into<String>) -> bool {
        let current = self.config_holder.load();
        let mut config_holder = ConfigHolder::default();
        // Set before `check`, mirroring `TosClientBuilder::build`, where the flag is already
        // in place when the endpoint is validated.
        config_holder.is_custom_domain = current.is_custom_domain;
        if config_holder.check(endpoint, region).is_err() {
            return false;
        }
        config_holder.max_retry_count = current.max_retry_count;
        config_holder.connection_timeout = current.connection_timeout;
        config_holder.request_timeout = current.request_timeout;
        config_holder.idle_connection_time = current.idle_connection_time;
        config_holder.enable_verify_ssl = current.enable_verify_ssl;
        // These switches are read from the configuration on every request; without copying
        // them a refresh would silently reset them to their defaults.
        config_holder.enable_crc = current.enable_crc;
        config_holder.auto_recognize_content_type = current.auto_recognize_content_type;
        self.config_holder.store(Arc::new(config_holder));
        true
    }
}
impl<P, C, S> TosClientImpl<P, C, S>
where
P: CredentialsProvider<C>,
C: Credentials,
S: AsyncRuntime,
{
async fn do_request<T, K, B>(&self, input: &T) -> Result<K, TosError>
where
T: InputTranslator<B>,
K: OutputParser + RequestInfoTrait + Send,
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
{
self.do_request_common::<T, MockAsyncInputTranslator, K, B>(Some(input), None).await
}
pub(crate) async fn do_request_common<T, F, K, B>(&self, input: Option<&T>, input2: Option<&F>) -> Result<K, TosError>
where
T: InputTranslator<B>,
F: AsyncInputTranslator<B>,
K: OutputParser + RequestInfoTrait + Send,
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
{
let config_holder = self.config_holder.load();
let operation;
if input.is_some() {
operation = check_bucket_and_key(input.unwrap(), config_holder.is_custom_domain)?;
} else {
operation = check_bucket_and_key(input2.unwrap(), config_holder.is_custom_domain)?;
}
let mut retry_count = 0;
let max_retry_count = config_holder.max_retry_count;
loop {
let start = Instant::now();
match self.do_request_once::<T, F, K, B>(input, input2, retry_count).await {
Ok(k) => {
info!("do {} succeed, http status: {}, request id: {}, cost: {} ms", operation, k.status_code(), k.request_id(), start.elapsed().as_millis());
return Ok(k);
}
Err(e) => {
match &e {
TosError::TosClientError { .. } => {
warn!("do {} failed, cost: {} ms", operation, start.elapsed().as_millis());
}
TosError::TosServerError { status_code, request_id, ec, .. } => {
if status_code.to_owned() < 500 {
warn!("do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
request_id, ec, start.elapsed().as_millis());
} else {
info!("do {} finished, http status: {}, request id: {}, ec: {}, cost: {} ms", operation, status_code,
request_id, ec, start.elapsed().as_millis());
}
}
}
let (retry_after, need_retry) = check_need_retry(&e, retry_count, max_retry_count, operation);
if !need_retry {
return Err(e);
}
self.sleep_for_retry(retry_count, retry_after).await;
retry_count += 1;
}
}
}
}
async fn sleep_for_retry(&self, retry_count: isize, retry_after: isize) {
let mut delay = BASE_DELAY_MS * 2u64.pow(retry_count as u32);
if delay > MAX_DELAY_MS {
delay = MAX_DELAY_MS;
}
let retry_after = retry_after as u64 * 1000;
if retry_after > delay {
delay = retry_after;
}
self.async_runtime.sleep(Duration::from_millis(delay)).await;
}
async fn do_request_once<T, F, K, B>(&self, input: Option<&T>, input2: Option<&F>, retry_count: isize) -> Result<K, TosError>
where
T: InputTranslator<B>,
F: AsyncInputTranslator<B>,
K: OutputParser + Send,
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
{
let mut request;
if input.is_some() {
request = input.unwrap().trans()?;
} else {
request = input2.unwrap().trans().await?;
}
let body = request.body.take();
request.retry_count = retry_count;
let response = self.do_request_by_client(&mut request, body).await?;
K::check_and_parse(request, response).await
}
// Signs `request`, turns it into a reqwest request (headers, metadata, optional body) and
// executes it with the underlying HTTP client.
//
// `body` is the payload previously detached from `request`. For upload operations it may be
// wrapped in a `MultifunctionalReader` to compute CRC64 and to apply the rate limiter and
// data-transfer listener found in the request context.
//
// Returns the raw HTTP response, or a client-side `TosError` when signing, building or
// executing the request fails. When CRC calculation is active, the computed value is stored
// in `request.request_context.crc64` after a successful execution.
async fn do_request_by_client<B>(&self, request: &mut HttpRequest<'_, B>, body: Option<B>) -> Result<HttpResponse, TosError>
where
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
{
let config_holder = self.config_holder.load();
auto_recognize_content_type(request, config_holder.auto_recognize_content_type);
sign_header(request, self.credentials_provider.load().as_ref(), config_holder.as_ref())?;
request.enable_crc = config_holder.enable_crc;
let mut rb = self.client.request(request.method.as_http_method(), get_request_url(request, config_holder.as_ref()));
// Content length taken from the request headers; -1 means none was found or parsable.
let mut cl = -1i64;
for kv in &request.header {
if *kv.0 == HEADER_CONTENT_LENGTH || *kv.0 == HEADER_CONTENT_LENGTH_LOWER {
if let Ok(x) = kv.1.parse::<i64>() {
cl = x;
}
}
rb = rb.header(*kv.0, kv.1);
}
// Metadata entries are sent as additional headers.
if let Some(meta) = &request.meta {
for kv in meta {
rb = rb.header(kv.0, kv.1);
}
}
// Retries announce their attempt number and the configured maximum.
if request.retry_count > 0 {
rb = rb.header(HEADER_SDK_RETRY_COUNT, format!("attempt={}; max={}", request.retry_count, config_holder.max_retry_count));
}
let is_upload_operation = ALL_UPLOAD_OPERATIONS.contains_key(request.operation);
// CRC64 is only computed for upload operations and only when enabled in the configuration.
let calc_crc = config_holder.enable_crc && is_upload_operation;
// Shared with the reader wrapping the body; read back after the request has executed.
let crc64 = Arc::new(AtomicU64::new(0));
// Attach the body, wrapping it when CRC, rate limiting or progress reporting is needed.
if let Some(bd) = body {
if calc_crc {
let mut reader = MultifunctionalReader::new(bd, Some(crc64.clone()), cl, request);
if let Some(ref rc) = request.request_context {
if let Some(init_crc64) = rc.init_crc64 {
reader.init_crc64 = Some(init_crc64);
}
// Always true in this branch, because `calc_crc` already implies an upload operation.
if is_upload_operation {
if let Some(ref rl) = rc.rate_limiter {
reader.set_rate_limiter(rl.clone());
}
if let Some(ref adts) = rc.async_data_transfer_listener {
reader.set_async_data_transfer_listener(adts.clone());
}
}
}
rb = self.add_body(rb, reader, cl);
} else if is_upload_operation {
// Upload without CRC: wrap only when a request context is present to supply a
// rate limiter and/or transfer listener.
if let Some(ref rc) = request.request_context {
let mut reader = MultifunctionalReader::new(bd, None, cl, request);
if let Some(ref rl) = rc.rate_limiter {
reader.set_rate_limiter(rl.clone());
}
if let Some(ref adts) = rc.async_data_transfer_listener {
reader.set_async_data_transfer_listener(adts.clone());
}
rb = self.add_body(rb, reader, cl);
} else {
rb = self.add_body(rb, bd, cl);
}
} else {
rb = self.add_body(rb, bd, cl);
}
} else if cl == -1 {
// No body and no content length header: send an explicit zero length.
rb = rb.header(HEADER_CONTENT_LENGTH, 0);
}
match rb.build() {
Ok(req) => {
match self.client.execute(req).await {
Ok(resp) => {
if calc_crc {
// Publish the CRC64 accumulated by the reader into the request context,
// creating a default context when the request had none.
let result = crc64.load(Ordering::Acquire);
if request.request_context.is_none() {
let mut rc = RequestContext::default();
rc.crc64 = Some(result);
request.request_context = Some(rc)
} else {
request.request_context.as_mut().unwrap().crc64 = Some(result);
}
}
Ok(resp)
}
Err(e) => {
Err(TosError::client_error_with_cause("do request error", GenericError::HttpRequestError(e.to_string())))
}
}
}
Err(e) => {
Err(TosError::client_error_with_cause("build request error", GenericError::DefaultError(e.to_string())))
}
}
}
fn add_body<B>(&self, rb: RequestBuilder, body: B, _: i64) -> RequestBuilder
where
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + 'static,
{
rb.body(Body::wrap_stream(body))
}
}