use super::request_options::RequestOptions;
use crate::Error;
use crate::builder::storage::ReadObject;
use crate::builder::storage::UploadObject;
use crate::upload_source::{InsertPayload, Seek, StreamingSource};
use auth::credentials::CacheableResource;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use http::Extensions;
use sha2::{Digest, Sha256};
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Storage {
inner: std::sync::Arc<StorageInner>,
}
#[derive(Clone, Debug)]
pub(crate) struct StorageInner {
pub client: reqwest::Client,
pub cred: auth::credentials::Credentials,
pub endpoint: String,
pub options: RequestOptions,
}
impl Storage {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
pub fn upload_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> UploadObject<P>
where
B: Into<String>,
O: Into<String>,
T: Into<InsertPayload<P>>,
InsertPayload<P>: StreamingSource + Seek,
{
UploadObject::new(self.inner.clone(), bucket, object, payload)
}
pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject
where
B: Into<String>,
O: Into<String>,
{
ReadObject::new(self.inner.clone(), bucket, object)
}
pub(crate) fn new(builder: ClientBuilder) -> gax::client_builder::Result<Self> {
use gax::client_builder::Error;
let client = reqwest::Client::builder()
.no_brotli()
.no_deflate()
.no_gzip()
.no_zstd()
.build()
.map_err(Error::transport)?;
let mut builder = builder;
let cred = if let Some(c) = builder.credentials {
c
} else {
auth::credentials::Builder::default()
.build()
.map_err(Error::cred)?
};
let endpoint = builder
.endpoint
.unwrap_or_else(|| self::DEFAULT_HOST.to_string());
builder.credentials = Some(cred);
builder.endpoint = Some(endpoint);
let inner = Arc::new(StorageInner::new(client, builder));
Ok(Self { inner })
}
}
impl StorageInner {
pub(self) fn new(client: reqwest::Client, builder: ClientBuilder) -> Self {
Self {
client,
cred: builder
.credentials
.expect("StorageInner assumes the credentials are initialized"),
endpoint: builder
.endpoint
.expect("StorageInner assumes the endpoint is initialized"),
options: builder.default_options,
}
}
pub async fn apply_auth_headers(
&self,
builder: reqwest::RequestBuilder,
) -> crate::Result<reqwest::RequestBuilder> {
let cached_auth_headers = self
.cred
.headers(Extensions::new())
.await
.map_err(Error::authentication)?;
let auth_headers = match cached_auth_headers {
CacheableResource::New { data, .. } => data,
CacheableResource::NotModified => {
unreachable!("headers are not cached");
}
};
let builder = builder.headers(auth_headers);
Ok(builder)
}
}
pub struct ClientBuilder {
pub(crate) endpoint: Option<String>,
pub(crate) credentials: Option<auth::credentials::Credentials>,
pub(crate) default_options: RequestOptions,
}
impl ClientBuilder {
pub(crate) fn new() -> Self {
Self {
endpoint: None,
credentials: None,
default_options: RequestOptions::new(),
}
}
pub async fn build(self) -> gax::client_builder::Result<Storage> {
Storage::new(self)
}
pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
self.endpoint = Some(v.into());
self
}
pub fn with_credentials<V: Into<auth::credentials::Credentials>>(mut self, v: V) -> Self {
self.credentials = Some(v.into());
self
}
pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
self.default_options.retry_policy = v.into().into();
self
}
pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
mut self,
v: V,
) -> Self {
self.default_options.backoff_policy = v.into().into();
self
}
pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
mut self,
v: V,
) -> Self {
self.default_options.retry_throttler = v.into().into();
self
}
}
const DEFAULT_HOST: &str = "https://storage.googleapis.com";
pub(crate) mod info {
const NAME: &str = env!("CARGO_PKG_NAME");
const VERSION: &str = env!("CARGO_PKG_VERSION");
lazy_static::lazy_static! {
pub(crate) static ref X_GOOG_API_CLIENT_HEADER: String = {
let ac = gaxi::api_header::XGoogApiClient{
name: NAME,
version: VERSION,
library_type: gaxi::api_header::GCCL,
};
ac.grpc_header_value()
};
}
}
const ENCODED_CHARS: percent_encoding::AsciiSet = percent_encoding::CONTROLS
.add(b'!')
.add(b'#')
.add(b'$')
.add(b'&')
.add(b'\'')
.add(b'(')
.add(b')')
.add(b'*')
.add(b'+')
.add(b',')
.add(b'/')
.add(b':')
.add(b';')
.add(b'=')
.add(b'?')
.add(b'@')
.add(b'[')
.add(b']')
.add(b' ');
pub(crate) fn enc(value: &str) -> String {
percent_encoding::utf8_percent_encode(value, &ENCODED_CHARS).to_string()
}
#[derive(thiserror::Error, Debug, PartialEq)]
#[non_exhaustive]
pub(crate) enum RangeError {
#[error("read limit was negative, expected non-negative value.")]
NegativeLimit,
#[error("negative read offsets cannot be used with read limits.")]
NegativeOffsetWithLimit,
}
#[derive(Debug)]
pub struct KeyAes256 {
key: [u8; 32],
}
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum KeyAes256Error {
#[error("Key has an invalid length: expected 32 bytes.")]
InvalidLength,
}
impl KeyAes256 {
pub fn new(key: &[u8]) -> std::result::Result<Self, KeyAes256Error> {
match key.len() {
32 => Ok(Self {
key: key[..32].try_into().unwrap(),
}),
_ => Err(KeyAes256Error::InvalidLength),
}
}
}
impl std::convert::From<KeyAes256> for crate::model::CommonObjectRequestParams {
fn from(value: KeyAes256) -> Self {
crate::model::CommonObjectRequestParams::new()
.set_encryption_algorithm("AES256")
.set_encryption_key_bytes(value.key.to_vec())
.set_encryption_key_sha256_bytes(Sha256::digest(value.key).as_slice().to_owned())
}
}
pub(crate) fn apply_customer_supplied_encryption_headers(
builder: reqwest::RequestBuilder,
common_object_request_params: &Option<crate::model::CommonObjectRequestParams>,
) -> reqwest::RequestBuilder {
common_object_request_params.iter().fold(builder, |b, v| {
b.header(
"x-goog-encryption-algorithm",
v.encryption_algorithm.clone(),
)
.header(
"x-goog-encryption-key",
BASE64_STANDARD.encode(v.encryption_key_bytes.clone()),
)
.header(
"x-goog-encryption-key-sha256",
BASE64_STANDARD.encode(v.encryption_key_sha256_bytes.clone()),
)
})
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use std::{sync::Arc, time::Duration};
use test_case::test_case;
type Result = std::result::Result<(), Box<dyn std::error::Error>>;
pub(crate) fn test_builder() -> ClientBuilder {
ClientBuilder::new()
.with_credentials(auth::credentials::testing::test_credentials())
.with_endpoint("http://private.googleapis.com")
.with_backoff_policy(
gax::exponential_backoff::ExponentialBackoffBuilder::new()
.with_initial_delay(Duration::from_millis(1))
.with_maximum_delay(Duration::from_millis(2))
.clamp(),
)
}
pub(crate) fn test_inner_client(builder: ClientBuilder) -> Arc<StorageInner> {
let client = reqwest::Client::new();
Arc::new(StorageInner::new(client, builder))
}
pub(crate) fn create_key_helper() -> (Vec<u8>, String, Vec<u8>, String) {
let key = vec![b'a'; 32];
let key_base64 = BASE64_STANDARD.encode(key.clone());
let key_sha256 = Sha256::digest(key.clone());
let key_sha256_base64 = BASE64_STANDARD.encode(key_sha256);
(key, key_base64, key_sha256.to_vec(), key_sha256_base64)
}
#[test]
fn test_key_aes_256() -> Result {
let v_slice: &[u8] = &[b'c'; 32];
KeyAes256::new(v_slice)?;
let v_vec: Vec<u8> = vec![b'a'; 32];
KeyAes256::new(&v_vec)?;
let v_array: [u8; 32] = [b'a'; 32];
KeyAes256::new(&v_array)?;
let v_bytes: bytes::Bytes = bytes::Bytes::copy_from_slice(&v_array);
KeyAes256::new(&v_bytes)?;
Ok(())
}
#[test_case(&[b'a'; 0]; "no bytes")]
#[test_case(&[b'a'; 1]; "not enough bytes")]
#[test_case(&[b'a'; 33]; "too many bytes")]
fn test_key_aes_256_err(input: &[u8]) {
KeyAes256::new(input).unwrap_err();
}
#[test]
fn test_key_aes_256_to_control_model_object() -> Result {
let (key, _, key_sha256, _) = create_key_helper();
let key_aes_256 = KeyAes256::new(&key)?;
let params = crate::model::CommonObjectRequestParams::from(key_aes_256);
assert_eq!(params.encryption_algorithm, "AES256");
assert_eq!(params.encryption_key_bytes, key);
assert_eq!(params.encryption_key_sha256_bytes, key_sha256);
Ok(())
}
}