pub mod body;
mod credential;
pub mod credentials;
mod error;
pub mod ops;
mod query_auth_option;
pub mod response;
mod ser;
mod utils;
use std::borrow::Cow;
use std::collections::HashSet;
use std::time::Duration;
use http::HeaderMap;
use http::header::HOST;
use serde::Serialize;
use tracing::trace;
use url::Url;
pub use self::body::MakeBody;
use self::credential::SignContext;
use self::credentials::{
CredentialsProvider,
DefaultCredentialsChain,
DynCredentialsProvider,
StaticCredentialsProvider,
};
pub use self::error::{Error, Result};
pub use self::query_auth_option::{QueryAuthOptions, QueryAuthOptionsBuilder};
pub use self::response::ResponseProcessor;
pub use self::utils::escape_path;
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug, Clone)]
pub struct Prepared<Q = (), B = ()> {
pub method: http::Method,
pub key: Option<String>,
pub additional_headers: HashSet<String>,
pub headers: Option<HeaderMap>,
pub query: Option<Q>,
pub body: Option<B>,
}
impl<Q, B> Default for Prepared<Q, B> {
fn default() -> Self {
Self {
method: http::Method::GET,
key: None,
additional_headers: HashSet::new(),
headers: None,
query: None,
body: None,
}
}
}
pub trait Ops: Sized {
const PRODUCT: &'static str = "oss";
const USE_BUCKET: bool = true;
type Query;
type Body: MakeBody;
type Response;
fn prepare(self) -> Result<Prepared<Self::Query, <Self::Body as MakeBody>::Body>>;
}
pub(crate) trait Request<P> {
type Response;
fn request(&self, ops: P) -> impl Future<Output = Result<Self::Response>>;
fn presign(
&self,
ops: P,
public: bool,
query_auth_options: Option<QueryAuthOptions>,
) -> impl Future<Output = Result<String>>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum UrlStyle {
#[default]
VirtualHosted,
Path,
CName,
}
pub struct ClientConfig {
pub http_timeout: Duration,
pub default_headers: http::HeaderMap,
pub url_style: UrlStyle,
pub public_url_style: UrlStyle,
}
impl Default for ClientConfig {
fn default() -> Self {
ClientConfig {
http_timeout: Duration::from_secs(30),
default_headers: http::HeaderMap::default(),
url_style: UrlStyle::default(),
public_url_style: UrlStyle::default(),
}
}
}
#[derive(Debug, Clone)]
pub struct Client {
http_client: reqwest::Client,
raw_internal_host: String,
raw_internal_scheme: String,
raw_public_host: String,
raw_public_scheme: String,
region: String,
bucket: String,
url_style: UrlStyle,
public_url_style: UrlStyle,
credentials_provider: DynCredentialsProvider,
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
fn build_url<'a>(
&'a self,
bucket: Option<Cow<'a, str>>,
key: Option<Cow<'a, str>>,
public: bool,
) -> (Cow<'a, str>, Cow<'a, str>) {
let host = if public {
self.raw_public_host.as_str()
} else {
self.raw_internal_host.as_str()
};
let url_style = if public {
self.public_url_style
} else {
self.url_style
};
let (host, paths) = match (bucket, url_style) {
(Some(bucket_name), UrlStyle::VirtualHosted) => {
(Cow::Owned(format!("{bucket_name}.{host}")), None)
},
(Some(bucket_name), UrlStyle::Path) => {
let mut paths = Vec::with_capacity(2);
paths.push(bucket_name);
if key.is_none() {
paths.push(Cow::Borrowed(""));
}
(Cow::Borrowed(host), Some(paths))
},
(None, _) | (Some(_), UrlStyle::CName) => (Cow::Borrowed(host), None),
};
let path = match (paths, key.as_ref().map(|k| k.trim().trim_start_matches('/'))) {
(Some(paths), Some(key_str)) => {
let mut paths = paths.clone();
paths.push(escape_path(key_str).into());
Cow::Owned(format!("/{}", paths.join("/")))
},
(Some(paths), None) => Cow::Owned(format!("/{}", paths.join("/"))),
(None, Some(key_str)) => Cow::Owned(format!("/{}", escape_path(key_str))),
(None, None) => Cow::Borrowed("/"),
};
(host, path)
}
async fn prepare_request<P>(
&self,
ops: P,
public: bool,
query_auth_options: Option<QueryAuthOptions>,
) -> Result<reqwest::Request>
where
P: Ops + Send + 'static,
P::Query: Serialize + Clone + Send,
P::Response: ResponseProcessor + Send,
P::Body: MakeBody + Send,
{
let Prepared {
method,
key,
additional_headers,
headers: extra_headers,
query,
body,
} = ops.prepare()?;
let bucket = P::USE_BUCKET.then_some(Cow::Borrowed(self.bucket.as_str()));
let (host, path) = self.build_url(bucket.clone(), key.as_deref().map(Cow::Borrowed), public);
let scheme = if public {
&self.raw_public_scheme
} else {
&self.raw_internal_scheme
};
let request_url = format!("{scheme}://{host}{path}");
let mut request = self.http_client.request(method.clone(), request_url).build()?;
let headers = request.headers_mut();
if let Some(extra_headers) = extra_headers {
headers.extend(extra_headers);
}
headers.insert(HOST, host.parse()?);
let additional_headers = additional_headers
.into_iter()
.map(Cow::Owned)
.collect::<HashSet<Cow<str>>>();
if let Some(body) = body {
P::Body::make_body(body, &mut request)?;
}
let sign_context = SignContext {
region: Cow::Borrowed(&self.region),
product: Cow::Borrowed(P::PRODUCT),
bucket,
key: key.as_deref().map(Cow::Borrowed),
query: query.as_ref(),
additional_headers,
};
let credentials = self.credentials_provider.get_credentials().await?;
credential::auth_to(&credentials, &mut request, sign_context, query_auth_options)?;
Ok(request)
}
}
impl<P> Request<P> for Client
where
P: Ops + Send + 'static,
P::Query: Clone + Serialize + Send,
P::Response: ResponseProcessor + Send,
P::Body: MakeBody + Send,
{
type Response = <P::Response as ResponseProcessor>::Output;
async fn request(&self, ops: P) -> Result<Self::Response> {
let request = self.prepare_request(ops, false, None).await?;
trace!("Sending request: {request:?}");
let resp = self.http_client.execute(request).await?;
P::Response::from_response(resp).await
}
async fn presign(
&self,
ops: P,
public: bool,
query_auth_options: Option<QueryAuthOptions>,
) -> Result<String> {
let request = self.prepare_request(ops, public, query_auth_options).await?;
let sign_url = request.url().to_string();
Ok(sign_url)
}
}
pub struct ClientBuilder {
config: ClientConfig,
endpoint: Option<String>,
public_endpoint: Option<String>,
region: Option<String>,
bucket: Option<String>,
access_key_id: Option<String>,
access_key_secret: Option<String>,
security_token: Option<String>,
credentials_provider: Option<DynCredentialsProvider>,
}
impl ClientBuilder {
pub fn new() -> Self {
Self {
config: ClientConfig::default(),
endpoint: None,
public_endpoint: None,
region: None,
bucket: None,
access_key_id: None,
access_key_secret: None,
security_token: None,
credentials_provider: None,
}
}
pub fn endpoint<T: AsRef<str>>(mut self, endpoint: T) -> Self {
self.endpoint = Some(endpoint.as_ref().to_string());
self
}
pub fn public_endpoint<T: AsRef<str>>(mut self, public_endpoint: T) -> Self {
self.public_endpoint = Some(public_endpoint.as_ref().to_string());
self
}
pub fn region<T: AsRef<str>>(mut self, region: T) -> Self {
self.region = Some(region.as_ref().to_string());
self
}
pub fn bucket<T: AsRef<str>>(mut self, bucket: T) -> Self {
self.bucket = Some(bucket.as_ref().to_string());
self
}
pub fn access_key_id<T: AsRef<str>>(mut self, access_key_id: T) -> Self {
self.access_key_id = Some(access_key_id.as_ref().to_string());
self
}
pub fn access_key_secret<T: AsRef<str>>(mut self, access_key_secret: T) -> Self {
self.access_key_secret = Some(access_key_secret.as_ref().to_string());
self
}
pub fn security_token<T: AsRef<str>>(mut self, security_token: T) -> Self {
self.security_token = Some(security_token.as_ref().to_string());
self
}
pub fn credentials_provider<P>(mut self, provider: P) -> Self
where
P: CredentialsProvider + 'static,
{
self.credentials_provider = Some(DynCredentialsProvider::new(provider));
self
}
pub fn http_timeout(mut self, timeout: Duration) -> Self {
self.config.http_timeout = timeout;
self
}
pub fn default_headers(mut self, headers: http::HeaderMap) -> Self {
self.config.default_headers = headers;
self
}
pub fn url_style(mut self, style: UrlStyle) -> Self {
self.config.url_style = style;
self
}
pub fn public_url_style(mut self, style: UrlStyle) -> Self {
self.config.public_url_style = style;
self
}
pub fn build(self) -> Result<Client> {
let endpoint = self
.endpoint
.ok_or_else(|| Error::InvalidArgument("endpoint is required".to_string()))?;
let region = self
.region
.ok_or_else(|| Error::InvalidArgument("region is required".to_string()))?;
let bucket = self
.bucket
.ok_or_else(|| Error::InvalidArgument("bucket is required".to_string()))?;
let http_client = reqwest::Client::builder()
.default_headers(self.config.default_headers)
.timeout(self.config.http_timeout)
.build()?;
let endpoint_url = Url::parse(&endpoint)?;
let raw_internal_host = endpoint_url.host_str().ok_or(Error::MissingHost)?.to_owned();
let raw_internal_scheme = endpoint_url.scheme().to_owned();
let public_endpoint_str = self.public_endpoint.as_ref().unwrap_or(&endpoint);
let public_endpoint_url = Url::parse(public_endpoint_str)?;
let raw_public_host = public_endpoint_url
.host_str()
.ok_or(Error::MissingHost)?
.to_owned();
let raw_public_scheme = public_endpoint_url.scheme().to_owned();
let credentials_provider = if let Some(provider) = self.credentials_provider {
provider
} else {
match (self.access_key_id, self.access_key_secret) {
(Some(ak), Some(sk)) => {
let provider = if let Some(token) = self.security_token {
StaticCredentialsProvider::with_security_token(ak, sk, token)
} else {
StaticCredentialsProvider::new(ak, sk)
};
DynCredentialsProvider::new(provider)
},
_ => DynCredentialsProvider::new(DefaultCredentialsChain::with_http_client(
http_client.clone(),
)),
}
};
Ok(Client {
region,
bucket,
raw_internal_host,
raw_internal_scheme,
raw_public_host,
raw_public_scheme,
url_style: self.config.url_style,
public_url_style: self.config.public_url_style,
credentials_provider,
http_client,
})
}
}
impl Default for ClientBuilder {
fn default() -> Self {
Self::new()
}
}