use super::{
super::{
region::Region,
uploader::{UploadPolicy, UploadToken, UploadTokenParseError},
},
BatchUploader, ObjectUploader,
};
use crate::{config::Config, credential::Credential, http::Client};
use assert_impl::assert_impl;
use rayon::{ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder};
use std::{borrow::Cow, result::Result, sync::Arc};
use thiserror::Error;
#[derive(Clone)]
pub struct UploadManager {
http_client: Client,
thread_pool: Option<Arc<ThreadPool>>,
}
impl UploadManager {
pub fn new(config: Config) -> Self {
UploadManager {
http_client: Client::new(config),
thread_pool: None,
}
}
pub fn new_with_thread_pool(config: Config, thread_pool: Arc<ThreadPool>) -> Self {
UploadManager {
http_client: Client::new(config),
thread_pool: Some(thread_pool),
}
}
pub fn new_with_exclusive_thread_pool(
config: Config,
thread_pool_size: usize,
) -> Result<Self, ThreadPoolBuildError> {
let upload_manager = UploadManager {
http_client: Client::new(config),
thread_pool: Some(Arc::new(
ThreadPoolBuilder::new()
.num_threads(thread_pool_size)
.thread_name(move |index| format!("upload_manager_thread_{}_{}", thread_pool_size, index))
.build()?,
)),
};
Ok(upload_manager)
}
pub fn upload_for_upload_token<'a>(
&'a self,
upload_token: impl Into<Cow<'a, UploadToken>>,
) -> CreateUploaderResult<ObjectUploader<'a>> {
let upload_token = upload_token.into();
let access_key = upload_token.access_key()?;
if let Some(bucket_name) = upload_token.policy()?.bucket() {
let bucket_name = bucket_name.to_owned();
let up_urls_list = Region::query(&bucket_name, access_key, self.config().to_owned())
.map(|regions| extract_up_urls_list_from_regions(regions.iter(), self.config().use_https()))
.unwrap_or_else(|_| all_possible_up_urls_list(self.config().use_https()));
Ok(ObjectUploader::new(
self,
upload_token,
bucket_name.into(),
up_urls_list,
))
} else {
Err(CreateUploaderError::BucketIsMissingInUploadToken)
}
}
pub(in super::super) fn upload_for_internal_generated_upload_token_with_regions<'a>(
&'a self,
bucket_name: Cow<'a, str>,
upload_token: Cow<'a, UploadToken>,
regions: Option<impl Iterator<Item = &'a Region>>,
) -> ObjectUploader<'a> {
let up_urls_list = regions
.map(|regions| extract_up_urls_list_from_regions(regions, self.config().use_https()))
.unwrap_or_else(|| all_possible_up_urls_list(self.config().use_https()));
ObjectUploader::new(self, upload_token, bucket_name, up_urls_list)
}
pub fn upload_for_upload_policy(
&self,
upload_policy: UploadPolicy,
credential: Credential,
) -> CreateUploaderResult<ObjectUploader> {
self.upload_for_upload_token(UploadToken::new(upload_policy, credential))
}
pub fn upload_for_bucket(&self, bucket: impl Into<Cow<'static, str>>, credential: Credential) -> ObjectUploader {
self.upload_for_upload_token(UploadToken::new_from_bucket(bucket.into(), credential, self.config()))
.unwrap()
}
pub fn batch_uploader_for_upload_token(
&self,
upload_token: impl Into<UploadToken>,
) -> CreateUploaderResult<BatchUploader> {
BatchUploader::new_for_upload_manager(self.to_owned(), upload_token.into())
}
pub fn batch_uploader_for_upload_policy(
&self,
upload_policy: UploadPolicy,
credential: Credential,
) -> CreateUploaderResult<BatchUploader> {
self.batch_uploader_for_upload_token(UploadToken::new(upload_policy, credential))
}
pub fn batch_uploader_for_bucket(
&self,
bucket: impl Into<Cow<'static, str>>,
credential: Credential,
) -> BatchUploader {
self.batch_uploader_for_upload_token(UploadToken::new_from_bucket(bucket, credential, self.config()))
.unwrap()
}
#[inline]
pub(crate) fn thread_pool(&self) -> Option<&Arc<ThreadPool>> {
self.thread_pool.as_ref()
}
#[inline]
pub(crate) fn http_client(&self) -> &Client {
&self.http_client
}
#[inline]
pub(crate) fn config(&self) -> &Config {
self.http_client.config()
}
#[allow(dead_code)]
fn ignore() {
assert_impl!(Send: Self);
assert_impl!(Sync: Self);
}
}
#[derive(Error, Debug)]
pub enum CreateUploaderError {
#[error("Failed to parse upload token: {0}")]
UploadTokenParseError(#[from] UploadTokenParseError),
#[error("Bucket is missing in upload token")]
BucketIsMissingInUploadToken,
}
pub type CreateUploaderResult<T> = Result<T, CreateUploaderError>;
fn extract_up_urls_list_from_regions<'a>(
iter: impl Iterator<Item = &'a Region>,
use_https: bool,
) -> Box<[Box<[Box<str>]>]> {
iter.map(|region| {
region
.up_urls_owned(use_https)
.into_iter()
.map(|url| url.into_owned().into_boxed_str())
.collect::<Box<[_]>>()
})
.collect()
}
fn all_possible_up_urls_list(use_https: bool) -> Box<[Box<[Box<str>]>]> {
Region::all()
.iter()
.map(|region| {
region
.up_urls_owned(use_https)
.into_iter()
.map(|url| url.into_owned().into_boxed_str())
.collect::<Box<[_]>>()
})
.collect()
}