use dragonfly_api::common;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::error::BackendError;
use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
use dragonfly_client_util::tls::NoVerifier;
use opendal::{layers::HttpClientLayer, layers::TimeoutLayer, raw::HttpClient, Operator};
use percent_encoding::percent_decode_str;
use std::fmt;
use std::result::Result;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument};
use url::Url;
/// Scheme enumerates the object-storage backends this module supports. Each
/// variant maps to an opendal service (see the `*_operator` constructors) and
/// to the URL scheme string produced/consumed by `Display`/`FromStr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Scheme {
    /// Amazon S3 (opendal `S3` service), URL scheme "s3".
    S3,
    /// Google Cloud Storage (opendal `Gcs` service), URL scheme "gs".
    GCS,
    /// Azure Blob Storage (opendal `Azblob` service), URL scheme "abs".
    ABS,
    /// Aliyun OSS (opendal `Oss` service), URL scheme "oss".
    OSS,
    /// Huawei OBS (opendal `Obs` service), URL scheme "obs".
    OBS,
    /// Tencent COS (opendal `Cos` service), URL scheme "cos".
    COS,
}
impl fmt::Display for Scheme {
    /// Writes the URL scheme string for this backend (the inverse of
    /// `Scheme::from_str`), e.g. `Scheme::GCS` renders as "gs".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Scheme::S3 => "s3",
            Scheme::GCS => "gs",
            Scheme::ABS => "abs",
            Scheme::OSS => "oss",
            Scheme::OBS => "obs",
            Scheme::COS => "cos",
        };
        f.write_str(name)
    }
}
impl FromStr for Scheme {
    type Err = String;

    /// Parses a URL scheme string ("s3", "gs", "abs", "oss", "obs", "cos")
    /// into the matching `Scheme` variant; any other input yields an
    /// "invalid scheme: …" error message.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const SCHEMES: [(&str, Scheme); 6] = [
            ("s3", Scheme::S3),
            ("gs", Scheme::GCS),
            ("abs", Scheme::ABS),
            ("oss", Scheme::OSS),
            ("obs", Scheme::OBS),
            ("cos", Scheme::COS),
        ];

        SCHEMES
            .iter()
            .find(|(name, _)| *name == s)
            .map(|(_, scheme)| *scheme)
            .ok_or_else(|| format!("invalid scheme: {}", s))
    }
}
/// ParsedURL is an object-storage URL decomposed into the pieces the backend
/// needs (see `TryFrom<Url>` below for how each field is derived).
#[derive(Debug)]
pub struct ParsedURL {
    /// The original URL the request was made with.
    pub url: Url,
    /// Backend scheme parsed from the URL's scheme component.
    pub scheme: Scheme,
    /// Bucket name, taken from the URL's host component.
    pub bucket: String,
    /// Object key: the URL path without its leading '/', percent-decoded.
    pub key: String,
}
impl ParsedURL {
    /// Reports whether the URL names a directory, i.e. its path component
    /// ends with a '/'.
    pub fn is_dir(&self) -> bool {
        matches!(self.url.path().chars().last(), Some('/'))
    }

    /// Builds a URL that keeps this URL's scheme and host (bucket) but
    /// replaces the path with `entry_path`; used to address listed entries.
    pub fn make_url_by_entry_path(&self, entry_path: &str) -> Url {
        let mut entry_url = self.url.clone();
        entry_url.set_path(entry_path);
        entry_url
    }
}
impl TryFrom<Url> for ParsedURL {
type Error = ClientError;
fn try_from(url: Url) -> Result<Self, Self::Error> {
let bucket = url
.host_str()
.ok_or_else(|| ClientError::InvalidURI(url.to_string()))?
.to_string();
let scheme: Scheme = url.scheme().to_string().parse().map_err(|err| {
error!("parse scheme failed {}: {}", url, err);
ClientError::InvalidURI(url.to_string())
})?;
let key = url
.path()
.strip_prefix('/')
.ok_or_else(|| ClientError::InvalidURI(url.to_string()))?;
let decoded_key = percent_decode_str(key).decode_utf8_lossy().to_string();
Ok(Self {
url,
scheme,
bucket,
key: decoded_key,
})
}
}
/// make_need_fields_message builds a "need a, b" style message listing every
/// named field of `$var` whose `Option` value is `None`.
macro_rules! make_need_fields_message {
    ($var:ident {$($field:ident),*}) => {{
        let missing: Vec<&'static str> = IntoIterator::into_iter([
            $( ($var.$field.is_none(), stringify!($field)), )*
        ])
        .filter(|(is_missing, _)| *is_missing)
        .map(|(_, name)| name)
        .collect();
        format!("need {}", missing.join(", "))
    }};
}
/// ObjectStorage is the object-storage backend; it builds opendal operators
/// per request from the credentials carried in the request.
pub struct ObjectStorage {
    /// The scheme this backend instance serves.
    scheme: Scheme,
    /// Daemon configuration; used for put chunk size and concurrency.
    config: Arc<Config>,
    /// HTTP client with standard TLS certificate verification.
    client: reqwest::Client,
    /// HTTP client that skips TLS certificate verification; selected when a
    /// request sets `insecure_skip_verify`.
    danger_client: reqwest::Client,
}
impl ObjectStorage {
pub fn new(scheme: Scheme, config: Arc<Config>) -> ClientResult<ObjectStorage> {
let client = reqwest::Client::builder()
.no_gzip()
.no_brotli()
.no_zstd()
.no_deflate()
.hickory_dns(true)
.pool_max_idle_per_host(super::POOL_MAX_IDLE_PER_HOST)
.tcp_keepalive(super::KEEP_ALIVE_INTERVAL)
.tcp_nodelay(true)
.http2_adaptive_window(true)
.http2_initial_stream_window_size(Some(super::HTTP2_STREAM_WINDOW_SIZE))
.http2_initial_connection_window_size(Some(super::HTTP2_CONNECTION_WINDOW_SIZE))
.http2_keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.http2_keep_alive_while_idle(true)
.build()?;
let client_config_builder = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(NoVerifier::new())
.with_no_client_auth();
let danger_client = reqwest::Client::builder()
.no_gzip()
.no_brotli()
.no_zstd()
.no_deflate()
.hickory_dns(true)
.use_preconfigured_tls(client_config_builder)
.pool_max_idle_per_host(super::POOL_MAX_IDLE_PER_HOST)
.tcp_keepalive(super::KEEP_ALIVE_INTERVAL)
.tcp_nodelay(true)
.http2_adaptive_window(true)
.http2_initial_stream_window_size(Some(super::HTTP2_STREAM_WINDOW_SIZE))
.http2_initial_connection_window_size(Some(super::HTTP2_CONNECTION_WINDOW_SIZE))
.http2_keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.http2_keep_alive_while_idle(true)
.build()?;
Ok(Self {
scheme,
config,
client,
danger_client,
})
}
pub fn operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: Option<common::v2::ObjectStorage>,
timeout: Duration,
) -> ClientResult<Operator> {
let Some(object_storage) = object_storage else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!("{} need object_storage parameter", self.scheme),
status_code: None,
header: None,
})));
};
match self.scheme {
Scheme::S3 => self.s3_operator(parsed_url, object_storage, timeout),
Scheme::GCS => self.gcs_operator(parsed_url, object_storage, timeout),
Scheme::ABS => self.abs_operator(parsed_url, object_storage, timeout),
Scheme::OSS => self.oss_operator(parsed_url, object_storage, timeout),
Scheme::OBS => self.obs_operator(parsed_url, object_storage, timeout),
Scheme::COS => self.cos_operator(parsed_url, object_storage, timeout),
}
}
pub fn s3_operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
let (Some(access_key_id), Some(access_key_secret), Some(region)) = (
&object_storage.access_key_id,
&object_storage.access_key_secret,
&object_storage.region,
) else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!(
"{} {}",
self.scheme,
make_need_fields_message!(object_storage {
access_key_id,
access_key_secret,
region
})
),
status_code: None,
header: None,
})));
};
let mut builder = opendal::services::S3::default();
builder = builder
.access_key_id(access_key_id)
.secret_access_key(access_key_secret)
.bucket(&parsed_url.bucket)
.region(region);
if let Some(endpoint) = object_storage.endpoint.as_deref() {
builder = builder.endpoint(endpoint);
}
if let Some(session_token) = object_storage.session_token.as_deref() {
builder = builder.session_token(session_token);
}
let http_client = match object_storage.insecure_skip_verify {
Some(true) => self.danger_client.clone(),
_ => self.client.clone(),
};
Ok(Operator::new(builder)?
.finish()
.layer(TimeoutLayer::new().with_timeout(timeout))
.layer(HttpClientLayer::new(HttpClient::with(http_client))))
}
pub fn gcs_operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
let mut builder = opendal::services::Gcs::default();
builder = builder.bucket(&parsed_url.bucket);
if let Some(credential_path) = object_storage.credential_path.as_deref() {
builder = builder.credential_path(credential_path);
}
if let Some(endpoint) = object_storage.endpoint.as_deref() {
builder = builder.endpoint(endpoint);
}
if let Some(predefined_acl) = object_storage.predefined_acl.as_deref() {
builder = builder.predefined_acl(predefined_acl);
}
let http_client = match object_storage.insecure_skip_verify {
Some(true) => self.danger_client.clone(),
_ => self.client.clone(),
};
Ok(Operator::new(builder)?
.finish()
.layer(TimeoutLayer::new().with_timeout(timeout))
.layer(HttpClientLayer::new(HttpClient::with(http_client))))
}
pub fn abs_operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
let (Some(access_key_id), Some(access_key_secret), Some(endpoint)) = (
&object_storage.access_key_id,
&object_storage.access_key_secret,
&object_storage.endpoint,
) else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!(
"{} {}",
self.scheme,
make_need_fields_message!(object_storage {
access_key_id,
access_key_secret,
endpoint
})
),
status_code: None,
header: None,
})));
};
let mut builder = opendal::services::Azblob::default();
builder = builder
.account_name(access_key_id)
.account_key(access_key_secret)
.container(&parsed_url.bucket)
.endpoint(endpoint);
let http_client = match object_storage.insecure_skip_verify {
Some(true) => self.danger_client.clone(),
_ => self.client.clone(),
};
Ok(Operator::new(builder)?
.finish()
.layer(TimeoutLayer::new().with_timeout(timeout))
.layer(HttpClientLayer::new(HttpClient::with(http_client))))
}
pub fn oss_operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
let (Some(access_key_id), Some(access_key_secret), Some(endpoint)) = (
&object_storage.access_key_id,
&object_storage.access_key_secret,
&object_storage.endpoint,
) else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!(
"{} {}",
self.scheme,
make_need_fields_message!(object_storage {
access_key_id,
access_key_secret,
endpoint
})
),
status_code: None,
header: None,
})));
};
let mut builder = opendal::services::Oss::default();
builder = if let Some(security_token) = &object_storage.security_token {
builder
.access_key_id(access_key_id)
.access_key_secret(access_key_secret)
.endpoint(endpoint)
.root("/")
.bucket(&parsed_url.bucket)
.security_token(security_token)
} else {
builder
.access_key_id(access_key_id)
.access_key_secret(access_key_secret)
.endpoint(endpoint)
.root("/")
.bucket(&parsed_url.bucket)
};
let http_client = match object_storage.insecure_skip_verify {
Some(true) => self.danger_client.clone(),
_ => self.client.clone(),
};
Ok(Operator::new(builder)?
.finish()
.layer(TimeoutLayer::new().with_timeout(timeout))
.layer(HttpClientLayer::new(HttpClient::with(http_client))))
}
pub fn obs_operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
let (Some(access_key_id), Some(access_key_secret), Some(endpoint)) = (
&object_storage.access_key_id,
&object_storage.access_key_secret,
&object_storage.endpoint,
) else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!(
"{} {}",
self.scheme,
make_need_fields_message!(object_storage {
access_key_id,
access_key_secret,
endpoint
})
),
status_code: None,
header: None,
})));
};
let mut builder = opendal::services::Obs::default();
builder = builder
.access_key_id(access_key_id)
.secret_access_key(access_key_secret)
.endpoint(endpoint)
.bucket(&parsed_url.bucket);
let http_client = match object_storage.insecure_skip_verify {
Some(true) => self.danger_client.clone(),
_ => self.client.clone(),
};
Ok(Operator::new(builder)?
.finish()
.layer(TimeoutLayer::new().with_timeout(timeout))
.layer(HttpClientLayer::new(HttpClient::with(http_client))))
}
pub fn cos_operator(
&self,
parsed_url: &super::object_storage::ParsedURL,
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
let (Some(access_key_id), Some(access_key_secret), Some(endpoint)) = (
&object_storage.access_key_id,
&object_storage.access_key_secret,
&object_storage.endpoint,
) else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!(
"{} {}",
self.scheme,
make_need_fields_message!(object_storage {
access_key_id,
access_key_secret,
endpoint
})
),
status_code: None,
header: None,
})));
};
let mut builder = opendal::services::Cos::default();
builder = builder
.secret_id(access_key_id)
.secret_key(access_key_secret)
.endpoint(endpoint)
.bucket(&parsed_url.bucket);
let http_client = match object_storage.insecure_skip_verify {
Some(true) => self.danger_client.clone(),
_ => self.client.clone(),
};
Ok(Operator::new(builder)?
.finish()
.layer(TimeoutLayer::new().with_timeout(timeout))
.layer(HttpClientLayer::new(HttpClient::with(http_client))))
}
}
/// Backend implementation: stat/get/put/exists are served by opendal
/// operators built per request from the request's object-storage credentials.
#[tonic::async_trait]
impl crate::Backend for ObjectStorage {
    /// scheme returns the URL scheme string this backend serves (e.g. "s3").
    fn scheme(&self) -> String {
        self.scheme.to_string()
    }

    /// stat returns the object's content length and, when the URL path ends
    /// with '/', a recursive listing of the entries under that key.
    #[instrument(skip_all)]
    async fn stat(&self, request: super::StatRequest) -> ClientResult<super::StatResponse> {
        debug!(
            "stat request {} {}: {:?}",
            request.task_id, request.url, request.http_header
        );

        // Parse the request URL into scheme/bucket/key.
        let url: Url = request
            .url
            .parse()
            .map_err(|_| ClientError::InvalidURI(request.url.clone()))?;
        let parsed_url: super::object_storage::ParsedURL = url.try_into().inspect_err(|err| {
            error!(
                "parse stat request url failed {} {}: {}",
                request.task_id, request.url, err
            );
        })?;

        let operator = self.operator(&parsed_url, request.object_storage, request.timeout)?;

        // Directory URLs get a recursive listing; file URLs get no entries.
        let entries = if parsed_url.is_dir() {
            operator
                .list_with(&parsed_url.key)
                .recursive(true)
                .await
                .map_err(|err| {
                    error!(
                        "list request failed {} {}: {}",
                        request.task_id, request.url, err
                    );
                    ClientError::BackendError(Box::new(BackendError {
                        message: err.to_string(),
                        status_code: None,
                        header: None,
                    }))
                })?
                .into_iter()
                .map(|entry| {
                    let metadata = entry.metadata();
                    super::DirEntry {
                        // Rebuild a full URL for each listed entry path.
                        url: parsed_url.make_url_by_entry_path(entry.path()).to_string(),
                        content_length: metadata.content_length() as usize,
                        is_dir: metadata.is_dir(),
                    }
                })
                .collect()
        } else {
            Vec::new()
        };

        // Stat the key itself for its content length.
        let response = operator.stat_with(&parsed_url.key).await.map_err(|err| {
            error!(
                "stat request failed {} {}: {}",
                request.task_id, request.url, err
            );
            ClientError::BackendError(Box::new(BackendError {
                message: err.to_string(),
                status_code: None,
                header: None,
            }))
        })?;

        debug!(
            "stat response {} {}: {}",
            request.task_id,
            request.url,
            response.content_length()
        );
        Ok(super::StatResponse {
            success: true,
            content_length: Some(response.content_length()),
            http_header: None,
            http_status_code: None,
            error_message: None,
            entries,
        })
    }

    /// get streams the object's bytes, optionally restricted to the byte
    /// range given in the request.
    #[instrument(skip_all)]
    async fn get(
        &self,
        request: super::GetRequest,
    ) -> ClientResult<super::GetResponse<super::Body>> {
        debug!(
            "get request {} {}: {:?}",
            request.piece_id, request.url, request.http_header
        );

        // Parse the request URL into scheme/bucket/key.
        let url: Url = request
            .url
            .parse()
            .map_err(|_| ClientError::InvalidURI(request.url.clone()))?;
        let parsed_url: super::object_storage::ParsedURL = url.try_into().inspect_err(|err| {
            error!(
                "parse get request url failed {} {}: {}",
                request.piece_id, request.url, err
            );
        })?;

        let operator_reader = self
            .operator(&parsed_url, request.object_storage, request.timeout)?
            .reader(&parsed_url.key)
            .await
            .map_err(|err| {
                error!(
                    "get request failed {} {}: {}",
                    request.piece_id, request.url, err
                );
                ClientError::BackendError(Box::new(BackendError {
                    message: err.to_string(),
                    status_code: None,
                    header: None,
                }))
            })?;

        // With a range, stream only [start, start + length); otherwise stream
        // the whole object.
        let stream = match request.range {
            Some(range) => operator_reader
                .into_bytes_stream(range.start..range.start + range.length)
                .await
                .map_err(|err| {
                    error!(
                        "get request failed {} {}: {}",
                        request.piece_id, request.url, err
                    );
                    ClientError::BackendError(Box::new(BackendError {
                        message: err.to_string(),
                        status_code: None,
                        header: None,
                    }))
                })?,
            None => operator_reader.into_bytes_stream(..).await.map_err(|err| {
                error!(
                    "get request failed {} {}: {}",
                    request.piece_id, request.url, err
                );
                ClientError::BackendError(Box::new(BackendError {
                    message: err.to_string(),
                    status_code: None,
                    header: None,
                }))
            })?,
        };

        Ok(crate::GetResponse {
            success: true,
            http_header: None,
            http_status_code: Some(reqwest::StatusCode::OK),
            reader: Box::new(StreamReader::new(stream)),
            error_message: None,
        })
    }

    /// put uploads the local file at `request.path` to the remote key,
    /// reading and writing in chunks sized by the backend config.
    #[instrument(skip_all)]
    async fn put(&self, request: super::PutRequest) -> ClientResult<super::PutResponse> {
        debug!("put request {:?} {}", request.path, request.url);

        // Parse the request URL into scheme/bucket/key.
        let url: Url = request
            .url
            .parse()
            .map_err(|_| ClientError::InvalidURI(request.url.clone()))?;
        let parsed_url: super::object_storage::ParsedURL = url.try_into().inspect_err(|err| {
            error!(
                "parse put request url failed {:?} {}: {}",
                request.path, request.url, err
            );
        })?;

        // Remote writer with concurrency/chunking from the backend config.
        let mut object_storage_writer = self
            .operator(&parsed_url, request.object_storage, request.timeout)?
            .writer_with(&parsed_url.key)
            .concurrent(self.config.backend.put_concurrent_chunk_count as usize)
            .chunk(self.config.backend.put_chunk_size.as_u64() as usize)
            .await
            .map_err(|err| {
                error!(
                    "put request failed {:?} {}: {}",
                    request.path, request.url, err
                );
                ClientError::BackendError(Box::new(BackendError {
                    message: err.to_string(),
                    status_code: None,
                    header: None,
                }))
            })?;

        // Local filesystem operator rooted at '/' to read the source file.
        let fs_operator = Operator::new(opendal::services::Fs::default().root("/"))
            .inspect_err(|err| {
                error!("initialize fs operator failed: {}", err);
            })?
            .finish();

        let fs_reader = fs_operator
            .reader_with(&request.path.to_string_lossy())
            .concurrent(self.config.backend.put_concurrent_chunk_count as usize)
            .chunk(self.config.backend.put_chunk_size.as_u64() as usize)
            .await?;

        // Total bytes to copy, from the local file's metadata.
        let content_length = fs_operator
            .stat(&request.path.to_string_lossy())
            .await
            .inspect_err(|err| {
                error!(
                    "stat local file failed {:?} {}: {}",
                    request.path, request.url, err
                );
            })?
            .content_length();

        // Copy the file chunk by chunk: read [offset, end) locally, then
        // write it to the remote writer.
        let mut offset: u64 = 0;
        while offset < content_length {
            let end = std::cmp::min(
                offset + self.config.backend.put_chunk_size.as_u64(),
                content_length,
            );
            let buf = fs_reader.read(offset..end).await.inspect_err(|err| {
                error!(
                    "read local file failed {:?} {}: {}",
                    request.path, request.url, err
                );
            })?;
            object_storage_writer.write(buf).await.inspect_err(|err| {
                error!(
                    "put request failed {:?} {}: {}",
                    request.path, request.url, err
                );
            })?;
            offset = end;
        }

        // Close the writer to complete the upload.
        object_storage_writer.close().await.inspect_err(|err| {
            error!(
                "close put request failed {:?} {}: {}",
                request.path, request.url, err
            );
        })?;

        Ok(crate::PutResponse {
            success: true,
            http_header: None,
            http_status_code: Some(reqwest::StatusCode::OK),
            content_length: Some(content_length),
            error_message: None,
        })
    }

    /// exists reports whether the object key exists in the bucket.
    #[instrument(skip_all)]
    async fn exists(&self, request: super::ExistsRequest) -> ClientResult<bool> {
        debug!(
            "exists request {} {}: {:?}",
            request.task_id, request.url, request.http_header
        );

        // Parse the request URL into scheme/bucket/key.
        let url: Url = request
            .url
            .parse()
            .map_err(|_| ClientError::InvalidURI(request.url.clone()))?;
        let parsed_url: super::object_storage::ParsedURL = url.try_into().inspect_err(|err| {
            error!(
                "parse exists request url failed {} {}: {}",
                request.task_id, request.url, err
            );
        })?;

        let operator = self.operator(&parsed_url, request.object_storage, request.timeout)?;
        Ok(operator.exists(&parsed_url.key).await?)
    }
}
// Unit tests for URL parsing and per-scheme operator construction.
#[cfg(test)]
mod tests {
    use super::*;
    use dragonfly_api::common::v2::ObjectStorage as ObjectStorageInfo;

    /// Verifies ParsedURL splits file and directory URLs into scheme, bucket,
    /// and key for every supported scheme.
    #[test]
    fn should_get_parsed_url() {
        let file_key = "test-bucket/file";
        let dir_key = "test-bucket/path/to/dir/";
        let schemes = vec![
            Scheme::OBS,
            Scheme::S3,
            Scheme::ABS,
            Scheme::OSS,
            Scheme::COS,
            Scheme::GCS,
        ];
        for scheme in schemes {
            let file_url = format!("{}://{}", scheme, file_key);
            let url: Url = file_url.parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            assert!(!parsed_url.is_dir());
            assert_eq!(parsed_url.bucket, "test-bucket");
            assert_eq!(parsed_url.key, "file");
            assert_eq!(parsed_url.scheme, scheme);
            let dir_url = format!("{}://{}", scheme, dir_key);
            let url: Url = dir_url.parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            assert!(parsed_url.is_dir());
            assert_eq!(parsed_url.bucket, "test-bucket");
            assert_eq!(parsed_url.key, "path/to/dir/");
            assert_eq!(parsed_url.scheme, scheme);
        }
    }

    /// Verifies make_url_by_entry_path keeps the scheme and bucket while
    /// replacing the key.
    #[test]
    fn should_get_url_with_the_same_prefix() {
        let file_key = "test-bucket/file";
        let schemes = vec![
            Scheme::OBS,
            Scheme::S3,
            Scheme::ABS,
            Scheme::OSS,
            Scheme::COS,
            Scheme::GCS,
        ];
        for scheme in schemes {
            let file_url = format!("{}://{}", scheme, file_key);
            let url: Url = file_url.parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let new_url = parsed_url.make_url_by_entry_path("test-entry");
            let new_parsed_url: ParsedURL = new_url.try_into().unwrap();
            assert_eq!(parsed_url.bucket, new_parsed_url.bucket);
            assert_eq!(parsed_url.scheme, new_parsed_url.scheme);
            assert_eq!(new_parsed_url.key, "test-entry");
        }
    }

    /// An unsupported scheme must be rejected as an invalid URI.
    #[test]
    fn should_return_error_when_scheme_not_valid() {
        let url: Url = "github://test-bucket/file".parse().unwrap();
        let result = TryInto::<ParsedURL>::try_into(url);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ClientError::InvalidURI(..)));
    }

    /// A URL without a host (bucket) must be rejected as an invalid URI.
    #[test]
    fn should_return_error_when_bucket_not_valid() {
        let schemes = vec![
            Scheme::OBS,
            Scheme::S3,
            Scheme::ABS,
            Scheme::OSS,
            Scheme::COS,
            Scheme::GCS,
        ];
        for scheme in schemes {
            let url: Url = format!("{}:///file", scheme).parse().unwrap();
            let result = TryInto::<ParsedURL>::try_into(url);
            assert!(result.is_err());
            assert!(matches!(result.unwrap_err(), ClientError::InvalidURI(..)));
        }
    }

    /// Each scheme yields an operator when given sufficient credentials.
    #[test]
    fn should_get_operator() {
        let test_cases = vec![
            (
                Scheme::S3,
                ObjectStorageInfo {
                    region: Some("test-region".into()),
                    access_key_id: Some("access-key-id".into()),
                    access_key_secret: Some("access-key-secret".into()),
                    ..Default::default()
                },
            ),
            (Scheme::GCS, ObjectStorageInfo::default()),
            (
                Scheme::ABS,
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    access_key_id: Some("access-key-id".into()),
                    access_key_secret: Some("YWNjZXNzLWtleS1zZWNyZXQK".into()),
                    ..Default::default()
                },
            ),
            (
                Scheme::OSS,
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    access_key_id: Some("access-key-id".into()),
                    access_key_secret: Some("access-key-secret".into()),
                    ..Default::default()
                },
            ),
            (
                Scheme::OBS,
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    access_key_id: Some("access-key-id".into()),
                    access_key_secret: Some("access-key-secret".into()),
                    ..Default::default()
                },
            ),
            (
                Scheme::COS,
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    access_key_id: Some("access-key-id".into()),
                    access_key_secret: Some("access-key-secret".into()),
                    ..Default::default()
                },
            ),
        ];
        for (scheme, object_storage) in test_cases {
            let url: Url = format!("{}://test-bucket/file", scheme).parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(scheme, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(
                result.is_ok(),
                "can not get {} operator, due to: {}",
                scheme,
                result.unwrap_err()
            );
        }
    }

    /// Optional S3 fields (endpoint, session_token) are accepted.
    #[test]
    fn should_get_s3_operator_with_extra_info() {
        let test_cases = vec![
            ObjectStorageInfo {
                access_key_id: Some("access_key_id".into()),
                access_key_secret: Some("access_key_secret".into()),
                region: Some("test-region".into()),
                endpoint: Some("test-endpoint.local".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                access_key_id: Some("access_key_id".into()),
                access_key_secret: Some("access_key_secret".into()),
                region: Some("test-region".into()),
                session_token: Some("session_token".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                access_key_id: Some("access_key_id".into()),
                access_key_secret: Some("access_key_secret".into()),
                region: Some("test-region".into()),
                endpoint: Some("test-endpoint.local".into()),
                session_token: Some("session_token".into()),
                ..Default::default()
            },
        ];
        for object_storage in test_cases {
            let url: Url = "s3://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::S3, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_ok());
            assert_eq!(result.unwrap().info().scheme().to_string(), "s3");
        }
    }

    /// Optional GCS fields are accepted in any combination.
    #[test]
    fn should_get_gcs_operator_with_extra_info() {
        let test_cases = vec![
            ObjectStorageInfo {
                credential_path: Some("credential_path".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                endpoint: Some("test-endpoint".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                predefined_acl: Some("predefine_acl".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                credential_path: Some("credential_path".into()),
                endpoint: Some("test-endpoint".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                credential_path: Some("credential_path".into()),
                predefined_acl: Some("predefine_acl".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                endpoint: Some("test-endpoint".into()),
                predefined_acl: Some("predefine_acl".into()),
                ..Default::default()
            },
            ObjectStorageInfo {
                credential_path: Some("credential_path".into()),
                endpoint: Some("test-endpoint".into()),
                predefined_acl: Some("predefine_acl".into()),
                ..Default::default()
            },
        ];
        for object_storage in test_cases {
            let url: Url = "gs://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::GCS, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_ok());
            assert_eq!(result.unwrap().info().scheme().to_string(), "gcs");
        }
    }

    /// A missing object_storage parameter yields the scheme-prefixed error.
    #[test]
    fn should_return_error_when_lacks_of_info() {
        let url: Url = "s3://test-bucket/file".parse().unwrap();
        let parsed_url: ParsedURL = url.try_into().unwrap();
        let result = ObjectStorage::new(Scheme::S3, Arc::new(Config::default()))
            .unwrap()
            .operator(&parsed_url, None, Duration::from_secs(3));
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            "backend error: s3 need object_storage parameter"
        )
    }

    /// Each missing S3 field combination produces the exact expected message.
    #[test]
    fn should_return_error_when_s3_lacks_of_info() {
        let test_cases = vec![
            (
                ObjectStorageInfo::default(),
                "backend error: s3 need access_key_id, access_key_secret, region",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    ..Default::default()
                },
                "backend error: s3 need access_key_secret, region",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: s3 need access_key_id, region",
            ),
            (
                ObjectStorageInfo {
                    region: Some("test-region".into()),
                    ..Default::default()
                },
                "backend error: s3 need access_key_id, access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: s3 need region",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    region: Some("test-region".into()),
                    ..Default::default()
                },
                "backend error: s3 need access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    region: Some("test-region".into()),
                    ..Default::default()
                },
                "backend error: s3 need access_key_id",
            ),
        ];
        for (object_storage, error_message) in test_cases {
            let url: Url = "s3://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::S3, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), error_message);
        }
    }

    /// Each missing ABS field combination produces the exact expected message.
    #[test]
    fn should_return_error_when_abs_lacks_of_info() {
        let test_cases = vec![
            (
                ObjectStorageInfo::default(),
                "backend error: abs need access_key_id, access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    ..Default::default()
                },
                "backend error: abs need access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: abs need access_key_id, endpoint",
            ),
            (
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: abs need access_key_id, access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: abs need endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: abs need access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: abs need access_key_id",
            ),
        ];
        for (object_storage, error_message) in test_cases {
            let url: Url = "abs://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::ABS, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), error_message);
        }
    }

    /// Each missing OSS field combination produces the exact expected message.
    #[test]
    fn should_return_error_when_oss_lacks_of_info() {
        let test_cases = vec![
            (
                ObjectStorageInfo::default(),
                "backend error: oss need access_key_id, access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    ..Default::default()
                },
                "backend error: oss need access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: oss need access_key_id, endpoint",
            ),
            (
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: oss need access_key_id, access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: oss need endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: oss need access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: oss need access_key_id",
            ),
        ];
        for (object_storage, error_message) in test_cases {
            let url: Url = "oss://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::OSS, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), error_message);
        }
    }

    /// Each missing OBS field combination produces the exact expected message.
    #[test]
    fn should_return_error_when_obs_lacks_of_info() {
        let test_cases = vec![
            (
                ObjectStorageInfo::default(),
                "backend error: obs need access_key_id, access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    ..Default::default()
                },
                "backend error: obs need access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: obs need access_key_id, endpoint",
            ),
            (
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: obs need access_key_id, access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: obs need endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: obs need access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: obs need access_key_id",
            ),
        ];
        for (object_storage, error_message) in test_cases {
            let url: Url = "obs://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::OBS, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), error_message);
        }
    }

    /// Each missing COS field combination produces the exact expected message.
    #[test]
    fn should_return_error_when_cos_lacks_of_info() {
        let test_cases = vec![
            (
                ObjectStorageInfo::default(),
                "backend error: cos need access_key_id, access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    ..Default::default()
                },
                "backend error: cos need access_key_secret, endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: cos need access_key_id, endpoint",
            ),
            (
                ObjectStorageInfo {
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: cos need access_key_id, access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    access_key_secret: Some("access_key_secret".into()),
                    ..Default::default()
                },
                "backend error: cos need endpoint",
            ),
            (
                ObjectStorageInfo {
                    access_key_id: Some("access_key_id".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: cos need access_key_secret",
            ),
            (
                ObjectStorageInfo {
                    access_key_secret: Some("access_key_secret".into()),
                    endpoint: Some("test-endpoint.local".into()),
                    ..Default::default()
                },
                "backend error: cos need access_key_id",
            ),
        ];
        for (object_storage, error_message) in test_cases {
            let url: Url = "cos://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result = ObjectStorage::new(Scheme::COS, Arc::new(Config::default()))
                .unwrap()
                .operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), error_message);
        }
    }

    /// insecure_skip_verify true/false/unset must all yield a valid operator.
    #[test]
    fn should_handle_insecure_skip_verify_parameter() {
        let test_cases = vec![
            ObjectStorageInfo {
                endpoint: Some("https://oss-cn-beijing.aliyuncs.com".into()),
                access_key_id: Some("test-access-key-id".into()),
                access_key_secret: Some("test-access-key-secret".into()),
                insecure_skip_verify: Some(true),
                ..Default::default()
            },
            ObjectStorageInfo {
                endpoint: Some("https://oss-cn-beijing.aliyuncs.com".into()),
                access_key_id: Some("test-access-key-id".into()),
                access_key_secret: Some("test-access-key-secret".into()),
                insecure_skip_verify: Some(false),
                ..Default::default()
            },
            ObjectStorageInfo {
                endpoint: Some("https://oss-cn-beijing.aliyuncs.com".into()),
                access_key_id: Some("test-access-key-id".into()),
                access_key_secret: Some("test-access-key-secret".into()),
                insecure_skip_verify: None,
                ..Default::default()
            },
        ];
        for object_storage in test_cases {
            let config = Arc::new(Config::default());
            let backend = ObjectStorage::new(Scheme::OSS, config).unwrap();
            let url: Url = "oss://test-bucket/file".parse().unwrap();
            let parsed_url: ParsedURL = url.try_into().unwrap();
            let result =
                backend.operator(&parsed_url, Some(object_storage), Duration::from_secs(3));
            assert!(result.is_ok());
        }
    }
}