use super::request_options::RequestOptions;
use crate::Error;
use crate::builder::storage::ReadObject;
use crate::builder::storage::WriteObject;
use crate::read_resume_policy::ReadResumePolicy;
use crate::streaming_source::Payload;
use auth::credentials::CacheableResource;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use http::Extensions;
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Storage<S = crate::storage::transport::Storage>
where
S: crate::storage::stub::Storage + 'static,
{
stub: std::sync::Arc<S>,
options: RequestOptions,
}
#[derive(Clone, Debug)]
pub(crate) struct StorageInner {
pub client: reqwest::Client,
pub cred: auth::credentials::Credentials,
pub endpoint: String,
pub options: RequestOptions,
}
impl Storage {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
}
impl<S> Storage<S>
where
S: crate::storage::stub::Storage + 'static,
{
pub fn from_stub(stub: S) -> Self
where
S: super::stub::Storage + 'static,
{
Self {
stub: std::sync::Arc::new(stub),
options: RequestOptions::new(),
}
}
pub fn write_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> WriteObject<P, S>
where
B: Into<String>,
O: Into<String>,
T: Into<Payload<P>>,
{
WriteObject::new(
self.stub.clone(),
bucket,
object,
payload,
self.options.clone(),
)
}
pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject<S>
where
B: Into<String>,
O: Into<String>,
{
ReadObject::new(self.stub.clone(), bucket, object, self.options.clone())
}
}
impl Storage {
pub(crate) fn new(builder: ClientBuilder) -> gax::client_builder::Result<Self> {
use gax::client_builder::Error;
let client = reqwest::Client::builder()
.no_brotli()
.no_deflate()
.no_gzip()
.no_zstd()
.build()
.map_err(Error::transport)?;
let mut builder = builder;
let cred = if let Some(c) = builder.credentials {
c
} else {
auth::credentials::Builder::default()
.build()
.map_err(Error::cred)?
};
let endpoint = builder
.endpoint
.unwrap_or_else(|| self::DEFAULT_HOST.to_string());
builder.credentials = Some(cred);
builder.endpoint = Some(endpoint);
let inner = Arc::new(StorageInner::new(client, builder));
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
Ok(Self { stub, options })
}
}
impl StorageInner {
pub(self) fn new(client: reqwest::Client, builder: ClientBuilder) -> Self {
Self {
client,
cred: builder
.credentials
.expect("StorageInner assumes the credentials are initialized"),
endpoint: builder
.endpoint
.expect("StorageInner assumes the endpoint is initialized"),
options: builder.default_options,
}
}
pub async fn apply_auth_headers(
&self,
builder: reqwest::RequestBuilder,
) -> crate::Result<reqwest::RequestBuilder> {
let cached_auth_headers = self
.cred
.headers(Extensions::new())
.await
.map_err(Error::authentication)?;
let auth_headers = match cached_auth_headers {
CacheableResource::New { data, .. } => data,
CacheableResource::NotModified => {
unreachable!("headers are not cached");
}
};
let builder = builder.headers(auth_headers);
Ok(builder)
}
}
pub struct ClientBuilder {
pub(crate) endpoint: Option<String>,
pub(crate) credentials: Option<auth::credentials::Credentials>,
pub(crate) default_options: RequestOptions,
}
impl ClientBuilder {
pub(crate) fn new() -> Self {
Self {
endpoint: None,
credentials: None,
default_options: RequestOptions::new(),
}
}
pub async fn build(self) -> gax::client_builder::Result<Storage> {
Storage::new(self)
}
pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
self.endpoint = Some(v.into());
self
}
pub fn with_credentials<V: Into<auth::credentials::Credentials>>(mut self, v: V) -> Self {
self.credentials = Some(v.into());
self
}
pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
self.default_options.retry_policy = v.into().into();
self
}
pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
mut self,
v: V,
) -> Self {
self.default_options.backoff_policy = v.into().into();
self
}
pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
mut self,
v: V,
) -> Self {
self.default_options.retry_throttler = v.into().into();
self
}
pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
self.default_options.resumable_upload_threshold = v.into();
self
}
pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
self.default_options.resumable_upload_buffer_size = v.into();
self
}
pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
where
V: ReadResumePolicy + 'static,
{
self.default_options.read_resume_policy = Arc::new(v);
self
}
}
const DEFAULT_HOST: &str = "https://storage.googleapis.com";
pub(crate) mod info {
const NAME: &str = env!("CARGO_PKG_NAME");
const VERSION: &str = env!("CARGO_PKG_VERSION");
lazy_static::lazy_static! {
pub(crate) static ref X_GOOG_API_CLIENT_HEADER: String = {
let ac = gaxi::api_header::XGoogApiClient{
name: NAME,
version: VERSION,
library_type: gaxi::api_header::GCCL,
};
ac.grpc_header_value()
};
}
}
const ENCODED_CHARS: percent_encoding::AsciiSet = percent_encoding::CONTROLS
.add(b'!')
.add(b'#')
.add(b'$')
.add(b'&')
.add(b'\'')
.add(b'(')
.add(b')')
.add(b'*')
.add(b'+')
.add(b',')
.add(b'/')
.add(b':')
.add(b';')
.add(b'=')
.add(b'?')
.add(b'@')
.add(b'[')
.add(b']')
.add(b' ');
pub(crate) fn enc(value: &str) -> String {
percent_encoding::utf8_percent_encode(value, &ENCODED_CHARS).to_string()
}
pub(crate) fn apply_customer_supplied_encryption_headers(
builder: reqwest::RequestBuilder,
common_object_request_params: &Option<crate::model::CommonObjectRequestParams>,
) -> reqwest::RequestBuilder {
common_object_request_params.iter().fold(builder, |b, v| {
b.header(
"x-goog-encryption-algorithm",
v.encryption_algorithm.clone(),
)
.header(
"x-goog-encryption-key",
BASE64_STANDARD.encode(v.encryption_key_bytes.clone()),
)
.header(
"x-goog-encryption-key-sha256",
BASE64_STANDARD.encode(v.encryption_key_sha256_bytes.clone()),
)
})
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use gax::retry_result::RetryResult;
use gax::retry_state::RetryState;
use std::{sync::Arc, time::Duration};
pub(crate) fn test_builder() -> ClientBuilder {
ClientBuilder::new()
.with_credentials(auth::credentials::anonymous::Builder::new().build())
.with_endpoint("http://private.googleapis.com")
.with_backoff_policy(
gax::exponential_backoff::ExponentialBackoffBuilder::new()
.with_initial_delay(Duration::from_millis(1))
.with_maximum_delay(Duration::from_millis(2))
.build()
.expect("hard coded policy should build correctly"),
)
}
pub(crate) fn test_inner_client(builder: ClientBuilder) -> Arc<StorageInner> {
let client = reqwest::Client::new();
Arc::new(StorageInner::new(client, builder))
}
mockall::mock! {
#[derive(Debug)]
pub RetryThrottler {}
impl gax::retry_throttler::RetryThrottler for RetryThrottler {
fn throttle_retry_attempt(&self) -> bool;
fn on_retry_failure(&mut self, flow: &RetryResult);
fn on_success(&mut self);
}
}
mockall::mock! {
#[derive(Debug)]
pub RetryPolicy {}
impl gax::retry_policy::RetryPolicy for RetryPolicy {
fn on_error(&self, state: &RetryState, error: gax::error::Error) -> RetryResult;
}
}
mockall::mock! {
#[derive(Debug)]
pub BackoffPolicy {}
impl gax::backoff_policy::BackoffPolicy for BackoffPolicy {
fn on_failure(&self, state: &RetryState) -> std::time::Duration;
}
}
mockall::mock! {
#[derive(Debug)]
pub ReadResumePolicy {}
impl crate::read_resume_policy::ReadResumePolicy for ReadResumePolicy {
fn on_error(&self, query: &crate::read_resume_policy::ResumeQuery, error: gax::error::Error) -> crate::read_resume_policy::ResumeResult;
}
}
}