use super::request_options::RequestOptions;
use crate::builder::storage::ReadObject;
use crate::builder::storage::WriteObject;
use crate::read_resume_policy::ReadResumePolicy;
use crate::storage::bidi::OpenObject;
use crate::storage::common_options::CommonOptions;
use crate::streaming_source::Payload;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use gaxi::http::HttpRequestBuilder;
use gaxi::options::{ClientConfig, Credentials};
use google_cloud_auth::credentials::Builder as CredentialsBuilder;
use google_cloud_gax::client_builder::{Error as BuilderError, Result as BuilderResult};
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Storage<S = crate::stub::DefaultStorage>
where
S: crate::stub::Storage + 'static,
{
stub: std::sync::Arc<S>,
options: RequestOptions,
}
/// Crate-internal bundle of the HTTP client, the gRPC client, and the request
/// options produced from a `ClientBuilder`.
#[derive(Clone, Debug)]
pub(crate) struct StorageInner {
    /// HTTP client created with automatic decompression and redirect
    /// following disabled (see `StorageInner::from_parts`).
    pub client: gaxi::http::ReqwestClient,
    /// Request options derived from the builder's configuration.
    pub options: RequestOptions,
    /// gRPC client created from the same configuration as `client`.
    pub grpc: gaxi::grpc::Client,
}
impl Storage {
    /// Returns a new [`ClientBuilder`] carrying the crate defaults.
    ///
    /// Call the builder's `with_*` methods to override settings, then
    /// `build()` to obtain a [`Storage`] client.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
}
impl<S> Storage<S>
where
S: crate::storage::stub::Storage + 'static,
{
pub fn from_stub(stub: S) -> Self
where
S: super::stub::Storage + 'static,
{
Self {
stub: std::sync::Arc::new(stub),
options: RequestOptions::new(),
}
}
pub fn write_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> WriteObject<P, S>
where
B: Into<String>,
O: Into<String>,
T: Into<Payload<P>>,
{
WriteObject::new(
self.stub.clone(),
bucket,
object,
payload,
self.options.clone(),
)
}
pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject<S>
where
B: Into<String>,
O: Into<String>,
{
ReadObject::new(self.stub.clone(), bucket, object, self.options.clone())
}
pub fn open_object<B, O>(&self, bucket: B, object: O) -> OpenObject<S>
where
B: Into<String>,
O: Into<String>,
{
OpenObject::new(self.stub.clone(), bucket, object, self.options.clone())
}
}
impl Storage {
pub(crate) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
let tracing = builder.config.tracing;
let inner = StorageInner::from_parts(builder).await?;
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(Arc::new(inner), tracing);
Ok(Self { stub, options })
}
}
impl StorageInner {
    /// Assembles the inner state from already constructed parts.
    pub(self) fn new(
        client: gaxi::http::ReqwestClient,
        options: RequestOptions,
        grpc: gaxi::grpc::Client,
    ) -> Self {
        Self {
            client,
            options,
            grpc,
        }
    }

    /// Creates the HTTP and gRPC clients described by `builder`.
    ///
    /// The builder is first resolved via `ClientBuilder::into_parts`. Automatic
    /// decompression and redirect following are then disabled on the
    /// configuration before either client is created. When
    /// `gaxi::options::tracing_enabled` reports `true` for the configuration,
    /// both clients are set up with `super::info::INSTRUMENTATION`.
    ///
    /// # Errors
    ///
    /// Propagates failures from `into_parts` and from constructing either
    /// client.
    pub(self) async fn from_parts(builder: ClientBuilder) -> BuilderResult<Self> {
        let (mut config, options) = builder.into_parts()?;
        config.disable_automatic_decompression = true;
        config.disable_follow_redirects = true;

        // The HTTP client gets a copy; the gRPC client consumes the original.
        let http = gaxi::http::ReqwestClient::new(config.clone(), super::DEFAULT_HOST).await?;
        let traced = gaxi::options::tracing_enabled(&config);
        let http = if traced {
            http.with_instrumentation(&super::info::INSTRUMENTATION)
        } else {
            http
        };

        let grpc = if traced {
            gaxi::grpc::Client::new_with_instrumentation(
                config,
                super::DEFAULT_HOST,
                &super::info::INSTRUMENTATION,
            )
            .await?
        } else {
            gaxi::grpc::Client::new(config, super::DEFAULT_HOST).await?
        };

        Ok(Self::new(http, options, grpc))
    }
}
/// Builder for [`Storage`] clients.
///
/// Obtain one from `Storage::builder()`, adjust it with the `with_*` methods,
/// and finish with `build()`.
pub struct ClientBuilder {
    /// Transport-level configuration: endpoint, credentials, retry, backoff,
    /// throttling, gRPC subchannel count, and tracing.
    pub(crate) config: ClientConfig,
    /// Storage-specific options: resumable upload settings and the read
    /// resume policy.
    common_options: CommonOptions,
}
impl ClientBuilder {
    /// Creates a builder preloaded with this crate's defaults.
    ///
    /// The retry policy comes from `crate::retry_policy::storage_default()`
    /// and the backoff policy from `crate::backoff_policy::default()`. The gRPC
    /// subchannel count is the value reported by
    /// `std::thread::available_parallelism()`, or `1` when that call fails.
    pub(crate) fn new() -> Self {
        let mut config = ClientConfig::default();
        config.retry_policy = Some(Arc::new(crate::retry_policy::storage_default()));
        config.backoff_policy = Some(Arc::new(crate::backoff_policy::default()));
        let subchannels = std::thread::available_parallelism().map_or(1, |n| n.get());
        config.grpc_subchannel_count = Some(subchannels);
        Self {
            config,
            common_options: CommonOptions::new(),
        }
    }

    /// Consumes the builder and creates the [`Storage`] client.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Storage::new` reports.
    pub async fn build(self) -> BuilderResult<Storage> {
        Storage::new(self).await
    }

    /// Overrides the endpoint stored in the client configuration.
    pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
        self.config.endpoint = Some(v.into());
        self
    }

    /// Sets the credentials; when never called, `into_parts` falls back to
    /// the default credentials.
    pub fn with_credentials<V: Into<Credentials>>(mut self, v: V) -> Self {
        self.config.cred = Some(v.into());
        self
    }

    /// Replaces the retry policy installed by `new()`.
    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.config.retry_policy = Some(v.into().into());
        self
    }

    /// Replaces the backoff policy installed by `new()`.
    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.config.backoff_policy = Some(v.into().into());
        self
    }

    /// Replaces the retry throttler in the client configuration.
    ///
    /// Unlike the retry and backoff policies, the configuration field is
    /// assigned directly rather than wrapped in `Some`.
    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
        mut self,
        v: V,
    ) -> Self {
        self.config.retry_throttler = v.into().into();
        self
    }

    /// Sets the `resumable_upload_threshold` common option.
    pub fn with_resumable_upload_threshold<V: Into<usize>>(mut self, v: V) -> Self {
        self.common_options.resumable_upload_threshold = v.into();
        self
    }

    /// Sets the `resumable_upload_buffer_size` common option.
    pub fn with_resumable_upload_buffer_size<V: Into<usize>>(mut self, v: V) -> Self {
        self.common_options.resumable_upload_buffer_size = v.into();
        self
    }

    /// Sets the read resume policy, stored behind an `Arc`.
    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
    where
        V: ReadResumePolicy + 'static,
    {
        self.common_options.read_resume_policy = Arc::new(v);
        self
    }

    /// Overrides the gRPC subchannel count chosen by `new()`.
    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
        self.config.grpc_subchannel_count = Some(v);
        self
    }

    /// Turns on the `tracing` flag in the client configuration.
    pub fn with_tracing(mut self) -> Self {
        self.config.tracing = true;
        self
    }

    /// Installs default credentials unless the caller already supplied some.
    ///
    /// # Errors
    ///
    /// Returns a credentials builder error when the default credentials
    /// cannot be built.
    pub(crate) fn apply_default_credentials(&mut self) -> BuilderResult<()> {
        if self.config.cred.is_none() {
            let default = CredentialsBuilder::default()
                .build()
                .map_err(BuilderError::cred)?;
            self.config.cred = Some(default);
        }
        Ok(())
    }

    /// Fills in `super::DEFAULT_HOST` when no endpoint was configured.
    ///
    /// The visible body never fails; it always returns `Ok(())`.
    pub(crate) fn apply_default_endpoint(&mut self) -> BuilderResult<()> {
        if self.config.endpoint.is_none() {
            self.config.endpoint = Some(super::DEFAULT_HOST.to_string());
        }
        Ok(())
    }

    /// Resolves defaults and splits the builder into the client configuration
    /// and the request options derived from it.
    ///
    /// # Errors
    ///
    /// Propagates errors from applying the default credentials or endpoint.
    pub(crate) fn into_parts(
        mut self,
    ) -> google_cloud_gax::client_builder::Result<(ClientConfig, RequestOptions)> {
        self.apply_default_credentials()?;
        self.apply_default_endpoint()?;
        let Self {
            config,
            common_options,
        } = self;
        let request_options = RequestOptions::new_with_client_config(&config, common_options);
        Ok((config, request_options))
    }
}
/// Set of ASCII bytes that [`enc`] percent-encodes.
///
/// Starts from `percent_encoding::CONTROLS` and adds
/// `! # $ & ' ( ) * + , / : ; = ? @ [ ]` and the space character.
///
/// NOTE(review): `%` is not added here. Unless `CONTROLS` already covers it, a
/// literal percent sign in the input is emitted unescaped. Confirm this is
/// intended.
pub(crate) const ENCODED_CHARS: percent_encoding::AsciiSet = percent_encoding::CONTROLS
    .add(b'!')
    .add(b'#')
    .add(b'$')
    .add(b'&')
    .add(b'\'')
    .add(b'(')
    .add(b')')
    .add(b'*')
    .add(b'+')
    .add(b',')
    .add(b'/')
    .add(b':')
    .add(b';')
    .add(b'=')
    .add(b'?')
    .add(b'@')
    .add(b'[')
    .add(b']')
    .add(b' ');
/// Percent-encodes `value` using the [`ENCODED_CHARS`] set and returns the
/// result as an owned `String`.
pub(crate) fn enc(value: &str) -> String {
    // The encoder yields `&str` chunks; concatenating them produces the same
    // text as formatting the encoder with `Display`.
    percent_encoding::percent_encode(value.as_bytes(), &ENCODED_CHARS).collect()
}
/// Adds the customer-supplied encryption headers to `builder`.
///
/// When `common_object_request_params` is `None` the builder is returned
/// untouched. Otherwise three headers are set:
///
/// * `x-goog-encryption-algorithm`: the algorithm name, verbatim.
/// * `x-goog-encryption-key`: the key bytes, standard base64 encoded.
/// * `x-goog-encryption-key-sha256`: the key digest bytes, standard base64
///   encoded.
pub(crate) fn apply_customer_supplied_encryption_headers(
    builder: HttpRequestBuilder,
    common_object_request_params: &Option<crate::model::CommonObjectRequestParams>,
) -> HttpRequestBuilder {
    let Some(params) = common_object_request_params else {
        return builder;
    };
    // The encoder only needs to read the bytes, so borrow the key and digest
    // instead of cloning them.
    builder
        .header(
            "x-goog-encryption-algorithm",
            params.encryption_algorithm.clone(),
        )
        .header(
            "x-goog-encryption-key",
            BASE64_STANDARD.encode(&params.encryption_key_bytes),
        )
        .header(
            "x-goog-encryption-key-sha256",
            BASE64_STANDARD.encode(&params.encryption_key_sha256_bytes),
        )
}
/// Unit tests for the builder defaults, plus helpers and mocks that are
/// `pub(crate)` so other test modules in the crate can use them.
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use google_cloud_gax::retry_result::RetryResult;
    use google_cloud_gax::retry_state::RetryState;
    use std::{sync::Arc, time::Duration};

    /// A fresh builder carries retry, backoff, and subchannel defaults.
    #[test]
    fn default_settings() {
        let config = ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .config;
        assert!(config.retry_policy.is_some(), "{config:?}");
        assert!(config.backoff_policy.is_some(), "{config:?}");
        assert!(
            matches!(config.grpc_subchannel_count, Some(n) if n >= 1),
            "{config:?}"
        );
    }

    /// An explicit subchannel count replaces the default.
    #[test]
    fn subchannel_count() {
        let config = ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_grpc_subchannel_count(42)
            .config;
        assert!(
            matches!(config.grpc_subchannel_count, Some(42)),
            "{config:?}"
        );
    }

    /// Builder with anonymous credentials, a fixed endpoint, and a backoff
    /// policy with 1ms initial and 2ms maximum delays.
    pub(crate) fn test_builder() -> ClientBuilder {
        let backoff = google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder::new()
            .with_initial_delay(Duration::from_millis(1))
            .with_maximum_delay(Duration::from_millis(2))
            .build()
            .expect("hard coded policy should build correctly");
        ClientBuilder::new()
            .with_credentials(Anonymous::new().build())
            .with_endpoint("http://private.googleapis.com")
            .with_backoff_policy(backoff)
    }

    /// Creates the shared inner client for `builder`, panicking on failure.
    pub(crate) async fn test_inner_client(builder: ClientBuilder) -> Arc<StorageInner> {
        let inner = StorageInner::from_parts(builder)
            .await
            .expect("creating an test inner client succeeds");
        Arc::new(inner)
    }

    // Mock implementation of the retry throttler trait.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryThrottler {}
        impl google_cloud_gax::retry_throttler::RetryThrottler for RetryThrottler {
            fn throttle_retry_attempt(&self) -> bool;
            fn on_retry_failure(&mut self, flow: &RetryResult);
            fn on_success(&mut self);
        }
    }

    // Mock implementation of the retry policy trait.
    mockall::mock! {
        #[derive(Debug)]
        pub RetryPolicy {}
        impl google_cloud_gax::retry_policy::RetryPolicy for RetryPolicy {
            fn on_error(&self, state: &RetryState, error: google_cloud_gax::error::Error) -> RetryResult;
        }
    }

    // Mock implementation of the backoff policy trait.
    mockall::mock! {
        #[derive(Debug)]
        pub BackoffPolicy {}
        impl google_cloud_gax::backoff_policy::BackoffPolicy for BackoffPolicy {
            fn on_failure(&self, state: &RetryState) -> std::time::Duration;
        }
    }

    // Mock implementation of the read resume policy trait.
    mockall::mock! {
        #[derive(Debug)]
        pub ReadResumePolicy {}
        impl crate::read_resume_policy::ReadResumePolicy for ReadResumePolicy {
            fn on_error(&self, query: &crate::read_resume_policy::ResumeQuery, error: google_cloud_gax::error::Error) -> crate::read_resume_policy::ResumeResult;
        }
    }
}