use once_cell::sync::OnceCell;
use std::{sync::Arc, time::Duration};
use crate::{
aws::sts_service::AwsConfig,
awss3::aws::{AwsClient, normalize_s3_endpoint},
response::error::{AppError, AppResult},
};
/// Connection settings for the object-storage backend selected by
/// `AwsConfig::cos_type`.
///
/// Values are copied from the provider-specific fields of `AwsConfig` by
/// `OssConfig::from_env_config` and later handed to `AwsClient::new_with_options`.
pub struct OssConfig {
/// Bucket name passed to the client.
pub bucket: String,
/// Region identifier; also fed to `normalize_s3_endpoint` together with `endpoint`.
pub region: String,
/// Endpoint exactly as configured (before `normalize_s3_endpoint` is applied).
pub endpoint: String,
/// Access key id used to build the client.
pub access_key: String,
/// Secret access key used to build the client.
pub secret_key: String,
/// Forwarded to `AwsClient::new_with_options`; `true` for `rustfs` and `minio`,
/// `false` for `aliyun`.
pub force_path_style: bool,
}
/// Process-wide storage configuration. Written by
/// `AwsService::init_from_env_config` and read by every `AwsService` operation,
/// each of which panics via `expect` if the cell is still empty.
static OSS_CONFIG: OnceCell<OssConfig> = OnceCell::new();
impl OssConfig {
    /// Builds the storage settings for the provider named by `cfg.cos_type`.
    ///
    /// Supported values are `"aliyun"`, `"rustfs"` and `"minio"`; the matching
    /// provider-specific fields of `cfg` are cloned into the result. Path-style
    /// addressing is enabled for `rustfs` and `minio` and disabled for `aliyun`.
    ///
    /// # Panics
    ///
    /// Panics when `cfg.cos_type` is none of the supported values.
    pub fn from_env_config(cfg: &AwsConfig) -> OssConfig {
        // Select the provider-specific sources first, then build the struct once.
        let (bucket, region, endpoint, access_key, secret_key, force_path_style) =
            match cfg.cos_type.as_str() {
                "aliyun" => (
                    &cfg.aliyun_bucket,
                    &cfg.aliyun_region_id,
                    &cfg.aliyun_endpoint,
                    &cfg.aliyun_accesskey_id,
                    &cfg.aliyun_accesskey_secret,
                    false,
                ),
                "rustfs" => (
                    &cfg.rustfs_bucket,
                    &cfg.rustfs_region_id,
                    &cfg.rustfs_endpoint,
                    &cfg.rustfs_accesskey_id,
                    &cfg.rustfs_accesskey_secret,
                    true,
                ),
                "minio" => (
                    &cfg.minio_bucket,
                    &cfg.minio_region_id,
                    &cfg.minio_endpoint,
                    &cfg.minio_accesskey_id,
                    &cfg.minio_accesskey_secret,
                    true,
                ),
                _ => panic!("Unsupported COS type: {}", cfg.cos_type),
            };

        OssConfig {
            bucket: bucket.clone(),
            region: region.clone(),
            endpoint: endpoint.clone(),
            access_key: access_key.clone(),
            secret_key: secret_key.clone(),
            force_path_style,
        }
    }

    /// Returns the endpoint to hand to the SDK: the configured endpoint and
    /// region run through `normalize_s3_endpoint`.
    fn sdk_endpoint(&self) -> String {
        let (raw_endpoint, region) = (&self.endpoint, &self.region);
        normalize_s3_endpoint(raw_endpoint, region)
    }
}
/// Stateless entry point for object-storage operations. All functionality is
/// exposed as associated functions that read the shared `OSS_CONFIG`.
pub struct AwsService;
impl AwsService {
pub fn init_from_env_config(config: &Arc<AwsConfig>) {
let _ = OSS_CONFIG.set(OssConfig::from_env_config(config.as_ref()));
}
/// Fetches the object at `path` through a freshly built `AwsClient` and
/// returns its bytes.
///
/// # Errors
///
/// Returns the error from `build_client` unchanged, or
/// `AppError::ClientError` carrying the message of a failed `get_object` call.
/// Both failures are logged before being returned.
///
/// # Panics
///
/// Panics if `OSS_CONFIG` has not been initialized.
pub async fn download_object(path: &str) -> AppResult<Vec<u8>> {
    let cfg = OSS_CONFIG.get().expect("OSS_CONFIG not initialized");

    let client = Self::build_client(cfg).await.map_err(|err| {
        tracing::error!("「download_object」Failed to create AWS client: {}", err);
        err
    })?;

    client.get_object(path).await.map_err(|e| {
        tracing::error!(
            "「download_object」Failed to download object from AWS: {}",
            e
        );
        AppError::ClientError(e.to_string())
    })
}
/// Uploads `data` to `path` through a freshly built `AwsClient`.
///
/// # Errors
///
/// Returns the error from `build_client` unchanged, or
/// `AppError::ClientError` carrying the message of a failed `put_object` call.
/// Both failures are logged before being returned.
///
/// # Panics
///
/// Panics if `OSS_CONFIG` has not been initialized.
pub async fn put_object(path: &str, data: Vec<u8>) -> AppResult<()> {
    let cfg = OSS_CONFIG.get().expect("OSS_CONFIG not initialized");

    let client = Self::build_client(cfg).await.map_err(|err| {
        tracing::error!("「put_object」Failed to create AWS client: {}", err);
        err
    })?;

    // The value returned on success is not needed; only failures matter here.
    if let Err(e) = client.put_object(path, data).await {
        tracing::error!("「put_object」Failed to upload object to AWS: {}", e);
        return Err(AppError::ClientError(e.to_string()));
    }
    Ok(())
}
/// Generates a presigned URL for `path` via `AwsClient::get_presigned_url`.
///
/// `expires_in` is a lifetime in seconds and is raised to at least one second
/// before being passed to the client.
///
/// # Errors
///
/// Returns the error from `build_client` unchanged, or
/// `AppError::ClientError` carrying the message of a failed presign call.
///
/// # Panics
///
/// Panics if `OSS_CONFIG` has not been initialized.
pub async fn get_signed_url(path: &str, expires_in: u64) -> AppResult<String> {
    let cfg = OSS_CONFIG.get().expect("OSS_CONFIG not initialized");

    let client = Self::build_client(cfg).await.map_err(|err| {
        tracing::error!("「get_signed_url」Failed to create AWS client: {}", err);
        err
    })?;

    let ttl = Duration::from_secs(expires_in.max(1));
    let url = client.get_presigned_url(path, ttl).await.map_err(|e| {
        tracing::error!("「get_signed_url」Failed to generate signed URL: {}", e);
        AppError::ClientError(e.to_string())
    })?;

    tracing::info!("「get_signed_url」Generated signed URL for path: {}", path);
    Ok(url)
}
/// Generates a presigned PUT URL for `path` via
/// `AwsClient::get_presigned_put_url`.
///
/// `expires_in` is a lifetime in seconds and is raised to at least one second
/// before being passed to the client.
///
/// # Errors
///
/// Returns the error from `build_client` unchanged, or
/// `AppError::ClientError` carrying the message of a failed presign call.
///
/// # Panics
///
/// Panics if `OSS_CONFIG` has not been initialized.
pub async fn get_signed_put_url(path: &str, expires_in: u64) -> AppResult<String> {
    let cfg = OSS_CONFIG.get().expect("OSS_CONFIG not initialized");

    let client = Self::build_client(cfg).await.map_err(|err| {
        tracing::error!("「get_signed_put_url」Failed to create AWS client: {}", err);
        err
    })?;

    let ttl = Duration::from_secs(expires_in.max(1));
    let url = client.get_presigned_put_url(path, ttl).await.map_err(|e| {
        tracing::error!(
            "「get_signed_put_url」Failed to generate signed PUT URL: {}",
            e
        );
        AppError::ClientError(e.to_string())
    })?;

    tracing::info!(
        "「get_signed_put_url」Generated signed PUT URL for path: {}",
        path
    );
    Ok(url)
}
/// Downloads the object at `path` by presigning a GET URL (valid for
/// `expires_in` seconds, see `get_signed_url`) and fetching it with `reqwest`.
///
/// # Errors
///
/// Propagates any error from `get_signed_url`. Returns
/// `AppError::ClientError` when the request cannot be sent, when the response
/// status is not a success (the response body is included in the message), or
/// when the body cannot be read. Error messages only ever contain the URL with
/// its query string removed.
///
/// # Panics
///
/// Panics if `OSS_CONFIG` has not been initialized (via `get_signed_url`).
pub async fn download_object_via_signed_url(path: &str, expires_in: u64) -> AppResult<Vec<u8>> {
    let signed_url = Self::get_signed_url(path, expires_in).await?;
    // The query string carries the signature, so only the redacted form may
    // appear in error messages.
    let safe_url = Self::redact_url(&signed_url);
    let response = reqwest::Client::new()
        .get(&signed_url)
        .send()
        .await
        .map_err(|err| {
            // `reqwest::Error`'s Display output embeds the full request URL;
            // strip it so the signed query string does not leak through `err`.
            AppError::ClientError(format!(
                "download_object_via_signed_url request failed: url={} err={}",
                safe_url,
                err.without_url()
            ))
        })?;
    if !response.status().is_success() {
        let status = response.status();
        // Best effort: an unreadable error body is reported as empty.
        let body = response.text().await.unwrap_or_default();
        return Err(AppError::ClientError(format!(
            "download_object_via_signed_url failed: status={} url={} body={}",
            status, safe_url, body
        )));
    }
    let bytes = response.bytes().await.map_err(|err| {
        // Same URL-stripping as above for body-read failures.
        AppError::ClientError(format!(
            "download_object_via_signed_url read body failed: url={} err={}",
            safe_url,
            err.without_url()
        ))
    })?;
    Ok(bytes.to_vec())
}
/// Uploads `data` to `path` by presigning a PUT URL (valid for `expires_in`
/// seconds, see `get_signed_put_url`) and sending the bytes with `reqwest`.
///
/// # Errors
///
/// Propagates any error from `get_signed_put_url`. Returns
/// `AppError::ClientError` when the request cannot be sent or when the
/// response status is not a success (the response body is included in the
/// message). Error messages only ever contain the URL with its query string
/// removed.
///
/// # Panics
///
/// Panics if `OSS_CONFIG` has not been initialized (via `get_signed_put_url`).
pub async fn put_object_via_signed_url(
    path: &str,
    data: Vec<u8>,
    expires_in: u64,
) -> AppResult<()> {
    let signed_put_url = Self::get_signed_put_url(path, expires_in).await?;
    // The query string carries the signature, so only the redacted form may
    // appear in error messages.
    let safe_url = Self::redact_url(&signed_put_url);
    let response = reqwest::Client::new()
        .put(&signed_put_url)
        .body(data)
        .send()
        .await
        .map_err(|err| {
            // `reqwest::Error`'s Display output embeds the full request URL;
            // strip it so the signed query string does not leak through `err`.
            AppError::ClientError(format!(
                "put_object_via_signed_url request failed: url={} err={}",
                safe_url,
                err.without_url()
            ))
        })?;
    if !response.status().is_success() {
        let status = response.status();
        // Best effort: an unreadable error body is reported as empty.
        let body = response.text().await.unwrap_or_default();
        return Err(AppError::ClientError(format!(
            "put_object_via_signed_url failed: status={} url={} body={}",
            status, safe_url, body
        )));
    }
    Ok(())
}
/// Returns `url` truncated just before its first `?`, or the whole string when
/// it contains no `?`. Used to keep signed query parameters out of messages.
fn redact_url(url: &str) -> String {
    match url.find('?') {
        Some(query_start) => url[..query_start].to_owned(),
        None => url.to_owned(),
    }
}
async fn build_client(cfg: &OssConfig) -> AppResult<AwsClient> {
let endpoint = cfg.sdk_endpoint();
AwsClient::new_with_options(
&cfg.bucket,
&cfg.region,
&endpoint,
&cfg.access_key,
&cfg.secret_key,
cfg.force_path_style,
)
.await
.map_err(|e| AppError::ClientError(e.to_string()))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AwsConfig` with placeholder values for every provider and the
    /// requested `cos_type`.
    fn sample_aws_config(cos_type: &str) -> AwsConfig {
        AwsConfig {
            cos_type: String::from(cos_type),
            aliyun_accesskey_id: String::from("aliyun-ak"),
            aliyun_accesskey_secret: String::from("aliyun-sk"),
            aliyun_role_arn: String::from("aliyun-role"),
            aliyun_expiration: 3600,
            aliyun_role_session_name: String::from("aliyun-session"),
            aliyun_endpoint: String::from("https://oss-cn-hangzhou.aliyuncs.com"),
            aliyun_region_id: String::from("cn-hangzhou"),
            aliyun_bucket: String::from("aliyun-bucket"),
            rustfs_accesskey_id: String::from("rustfs-ak"),
            rustfs_accesskey_secret: String::from("rustfs-sk"),
            rustfs_endpoint: String::from("https://rustfs.internal.example.com"),
            rustfs_region_id: String::from("cn-local"),
            rustfs_bucket: String::from("rustfs-bucket"),
            rustfs_expiration: 3600,
            minio_accesskey_id: String::from("minio-ak"),
            minio_accesskey_secret: String::from("minio-sk"),
            minio_endpoint: String::from("https://minio.internal.example.com"),
            minio_region_id: String::from("us-east-1"),
            minio_bucket: String::from("minio-bucket"),
            minio_expiration: 3600,
        }
    }

    /// Resolves the SDK endpoint that `OssConfig` produces for `cos_type`.
    fn sdk_endpoint_for(cos_type: &str) -> String {
        let env_config = sample_aws_config(cos_type);
        OssConfig::from_env_config(&env_config).sdk_endpoint()
    }

    /// The Aliyun endpoint is rewritten to its `s3.`-prefixed form.
    #[test]
    fn aliyun_oss_config_normalizes_sdk_endpoint() {
        let resolved = sdk_endpoint_for("aliyun");
        assert_eq!(resolved, "https://s3.oss-cn-hangzhou.aliyuncs.com");
    }

    /// A MinIO endpoint passes through unchanged.
    #[test]
    fn non_aliyun_oss_config_preserves_endpoint() {
        let resolved = sdk_endpoint_for("minio");
        assert_eq!(resolved, "https://minio.internal.example.com");
    }
}