use crate::error::{Error, GoogleResponse};
pub use crate::resources::bucket::Owner;
use crate::resources::common::ListResponse;
use crate::resources::object_access_control::ObjectAccessControl;
use futures::{stream, Stream, TryStream};
use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
/// Metadata record for one object stored in a bucket.
///
/// Field names are converted to camelCase when (de)serialized. The associated functions on
/// this type perform the HTTP calls against the storage API and return values of this type.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Object {
/// Resource kind string.
pub kind: String,
/// Identifier of the object.
pub id: String,
/// Link to this object resource.
pub self_link: String,
/// Object name; used (percent-encoded) in request URLs built by this type.
pub name: String,
/// Name of the bucket holding the object; used in request URLs built by this type.
pub bucket: String,
/// Parsed through `crate::from_str` — presumably the API sends this number as a JSON
/// string; confirm against that helper.
#[serde(deserialize_with = "crate::from_str")]
pub generation: i64,
/// Parsed through `crate::from_str`, like `generation`.
#[serde(deserialize_with = "crate::from_str")]
pub metageneration: i64,
/// Content type, when present.
pub content_type: Option<String>,
// Timestamps are UTC `chrono::DateTime` values; the optional ones may be absent.
pub time_created: chrono::DateTime<chrono::Utc>,
pub updated: chrono::DateTime<chrono::Utc>,
pub time_deleted: Option<chrono::DateTime<chrono::Utc>>,
pub temporary_hold: Option<bool>,
pub event_based_hold: Option<bool>,
pub retention_expiration_time: Option<chrono::DateTime<chrono::Utc>>,
pub storage_class: String,
pub time_storage_class_updated: chrono::DateTime<chrono::Utc>,
/// Parsed through `crate::from_str`, like `generation`.
#[serde(deserialize_with = "crate::from_str")]
pub size: u64,
pub md5_hash: Option<String>,
pub media_link: String,
pub content_encoding: Option<String>,
pub content_disposition: Option<String>,
pub content_language: Option<String>,
pub cache_control: Option<String>,
/// Free-form string key/value pairs, when present.
pub metadata: Option<std::collections::HashMap<String, String>>,
/// Access control entries, when present.
pub acl: Option<Vec<ObjectAccessControl>>,
pub owner: Option<Owner>,
pub crc32c: String,
/// May be missing from the payload (`default`); parsed through `crate::from_str_opt`.
#[serde(default, deserialize_with = "crate::from_str_opt")]
pub component_count: Option<i32>,
pub etag: String,
/// Customer-supplied encryption details, when present. The type name's spelling
/// (`CustomerEncrypton`) is part of the public API.
pub customer_encryption: Option<CustomerEncrypton>,
pub kms_key_name: Option<String>,
}
/// Customer-supplied encryption information attached to an [`Object`].
///
/// Serialized with camelCase field names (`encryptionAlgorithm`, `keySha256`).
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CustomerEncrypton {
/// Name of the encryption algorithm.
pub encryption_algorithm: String,
/// SHA-256 value of the key, as a string — NOTE(review): encoding not visible here; confirm.
pub key_sha256: String,
}
/// Request body sent by [`Object::compose`]; serialized with camelCase field names.
#[derive(Debug, PartialEq, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ComposeRequest {
/// Resource kind string; the tests in this file use `"storage#composeRequest"`.
pub kind: String,
/// The objects to combine, in order.
pub source_objects: Vec<SourceObject>,
/// Optional metadata for the destination object.
pub destination: Option<Object>,
}
/// One input object of a [`ComposeRequest`]; serialized with camelCase field names.
#[derive(Debug, PartialEq, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SourceObject {
/// Name of the source object.
pub name: String,
/// Optional generation of the source object.
pub generation: Option<i64>,
/// Optional preconditions for the source object.
pub object_preconditions: Option<ObjectPrecondition>,
}
/// Precondition attached to a [`SourceObject`]; serialized as `ifGenerationMatch`.
#[derive(Debug, PartialEq, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ObjectPrecondition {
/// Generation value the precondition refers to.
pub if_generation_match: i64,
}
/// Deserializable list of objects.
///
/// NOTE(review): nothing in this file references this type — listing goes through
/// `ListResponse<Object>` instead — so it may be dead code.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ObjectList {
kind: String,
items: Vec<Object>,
}
/// Response body deserialized by [`Object::rewrite`]; only `resource` is read there.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct RewriteResponse {
kind: String,
// Kept as strings exactly as deserialized; never parsed in this file.
total_bytes_rewritten: String,
object_size: String,
// NOTE(review): `done` is never inspected by `Object::rewrite`, and `resource` is mandatory
// here — confirm how a not-yet-finished rewrite response is meant to be handled.
done: bool,
resource: Object,
}
impl Object {
pub async fn create(
bucket: &str,
file: Vec<u8>,
filename: &str,
mime_type: &str,
) -> crate::Result<Self> {
use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
const BASE_URL: &str = "https://www.googleapis.com/upload/storage/v1/b";
let url = &format!(
"{}/{}/o?uploadType=media&name={}",
BASE_URL,
percent_encode(&bucket),
percent_encode(&filename),
);
let mut headers = crate::get_headers().await?;
headers.insert(CONTENT_TYPE, mime_type.parse()?);
headers.insert(CONTENT_LENGTH, file.len().to_string().parse()?);
let response = crate::CLIENT
.post(url)
.headers(headers)
.body(file)
.send()
.await?;
if response.status() == 200 {
Ok(serde_json::from_str(&response.text().await?)?)
} else {
Err(Error::new(&response.text().await?))
}
}
/// Blocking counterpart of [`Object::create`]; drives it on the runtime from `crate::runtime()`.
#[cfg(feature = "sync")]
pub fn create_sync(
    bucket: &str,
    file: Vec<u8>,
    filename: &str,
    mime_type: &str,
) -> crate::Result<Self> {
    let runtime = crate::runtime()?;
    let upload = Self::create(bucket, file, filename, mime_type);
    runtime.block_on(upload)
}
pub async fn create_streamed<S>(
bucket: &str,
stream: S,
length: impl Into<Option<u64>>,
filename: &str,
mime_type: &str,
) -> crate::Result<Self>
where
S: TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
bytes::Bytes: From<S::Ok>,
{
use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
const BASE_URL: &str = "https://www.googleapis.com/upload/storage/v1/b";
let url = &format!(
"{}/{}/o?uploadType=media&name={}",
BASE_URL,
percent_encode(&bucket),
percent_encode(&filename),
);
let mut headers = crate::get_headers().await?;
headers.insert(CONTENT_TYPE, mime_type.parse()?);
if let Some(length) = length.into() {
headers.insert(CONTENT_LENGTH, length.into());
}
let body = reqwest::Body::wrap_stream(stream);
let response = crate::CLIENT
.post(url)
.headers(headers)
.body(body)
.send()
.await?;
if response.status() == 200 {
Ok(serde_json::from_str(&response.text().await?)?)
} else {
Err(Error::new(&response.text().await?))
}
}
/// Blocking counterpart of [`Object::create_streamed`] that takes a reader.
///
/// The reader is drained completely into memory first and then handed on as a one-item
/// stream, so the whole payload is buffered. A read failure is reported as
/// `Error::Other` carrying the I/O error's message.
#[cfg(feature = "sync")]
pub fn create_streamed_sync<R: std::io::Read + Send + 'static>(
    bucket: &str,
    mut file: R,
    length: impl Into<Option<u64>>,
    filename: &str,
    mime_type: &str,
) -> crate::Result<Self> {
    let mut contents = Vec::new();
    if let Err(io_error) = file.read_to_end(&mut contents) {
        return Err(Error::Other(io_error.to_string()));
    }
    let single_chunk = stream::once(async { Ok::<_, Error>(contents) });
    let runtime = crate::runtime()?;
    runtime.block_on(Self::create_streamed(
        bucket,
        single_chunk,
        length,
        filename,
        mime_type,
    ))
}
/// Lists every object in `bucket`.
///
/// The returned stream yields one `Vec` of objects per page of results.
pub async fn list(
    bucket: &str,
) -> Result<impl Stream<Item = Result<Vec<Self>, Error>> + '_, Error> {
    let no_prefix = None;
    Self::list_from(bucket, no_prefix).await
}
/// Blocking counterpart of [`Object::list`]; all pages are concatenated into one `Vec`.
#[cfg(feature = "sync")]
pub fn list_sync(bucket: &str) -> Result<Vec<Self>, Error> {
    use futures::TryStreamExt;

    let runtime = crate::runtime()?;
    let pages = runtime.block_on(Self::list_from(bucket, None))?;
    let all_objects = runtime.block_on(pages.try_concat());
    all_objects
}
/// Lists the objects in `bucket` whose names start with `prefix`.
///
/// The returned stream yields one `Vec` of objects per page of results.
pub async fn list_prefix<'a>(
    bucket: &'a str,
    prefix: &'a str,
) -> Result<impl Stream<Item = Result<Vec<Self>, Error>> + 'a, Error> {
    let name_filter = Some(prefix);
    Self::list_from(bucket, name_filter).await
}
/// Blocking counterpart of [`Object::list_prefix`]; all pages are concatenated into one `Vec`.
#[cfg(feature = "sync")]
pub fn list_prefix_sync(bucket: &str, prefix: &str) -> Result<Vec<Self>, Error> {
    use futures::TryStreamExt;

    let runtime = crate::runtime()?;
    let pages = runtime.block_on(Self::list_from(bucket, Some(prefix)))?;
    let all_objects = runtime.block_on(pages.try_concat());
    all_objects
}
/// Shared implementation behind [`Object::list`] and [`Object::list_prefix`].
///
/// Returns a lazy stream in which every item is one page of results. Each poll issues a
/// single GET against the bucket's object collection, passing the `pageToken` from the
/// previous page (if any) and the optional `prefix` filter. The stream ends once a response
/// carries no next-page token.
///
/// # Errors
///
/// A failed page fetch is yielded as an `Err` item and the paging position is left
/// unchanged, so polling the stream again retries the same page.
async fn list_from<'a>(
    bucket: &'a str,
    prefix: Option<&'a str>,
) -> Result<impl Stream<Item = Result<Vec<Self>, Error>> + 'a, Error> {
    enum ListState {
        Start,
        HasMore(String),
        Done,
    }
    use ListState::*;

    Ok(stream::unfold(ListState::Start, move |state| async move {
        // Check for completion before doing any work: a finished stream must end without
        // fetching auth headers, which could otherwise fail and emit a spurious error.
        let mut query = match &state {
            Start => vec![],
            HasMore(page_token) => vec![("pageToken", page_token.clone())],
            Done => return None,
        };
        if let Some(prefix) = prefix {
            query.push(("prefix", prefix.to_string()));
        }

        let url = format!("{}/b/{}/o", crate::BASE_URL, percent_encode(bucket));
        let headers = match crate::get_headers().await {
            Ok(h) => h,
            Err(e) => return Some((Err(e), state)),
        };
        let response = crate::CLIENT
            .get(&url)
            .query(&query)
            .headers(headers)
            .send()
            .await;
        let response = match response {
            Ok(r) => r,
            Err(e) => return Some((Err(e.into()), state)),
        };
        let result: GoogleResponse<ListResponse<Self>> = match response.json().await {
            Ok(json) => json,
            Err(e) => return Some((Err(e.into()), state)),
        };
        let response_body = match result {
            GoogleResponse::Success(success) => success,
            GoogleResponse::Error(e) => return Some((Err(e.into()), state)),
        };

        let next_state = match response_body.next_page_token {
            Some(page_token) => HasMore(page_token),
            None => Done,
        };
        Some((Ok(response_body.items), next_state))
    }))
}
pub async fn read(bucket: &str, file_name: &str) -> crate::Result<Self> {
let url = format!(
"{}/b/{}/o/{}",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);
let result: GoogleResponse<Self> = crate::CLIENT
.get(&url)
.headers(crate::get_headers().await?)
.send()
.await?
.json()
.await?;
match result {
GoogleResponse::Success(s) => Ok(s),
GoogleResponse::Error(e) => Err(e.into()),
}
}
/// Blocking counterpart of [`Object::read`].
#[cfg(feature = "sync")]
pub fn read_sync(bucket: &str, file_name: &str) -> crate::Result<Self> {
    let runtime = crate::runtime()?;
    runtime.block_on(Self::read(bucket, file_name))
}
pub async fn download(bucket: &str, file_name: &str) -> Result<Vec<u8>, Error> {
let url = format!(
"{}/b/{}/o/{}?alt=media",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);
Ok(crate::CLIENT
.get(&url)
.headers(crate::get_headers().await?)
.send()
.await?
.bytes()
.await?
.to_vec())
}
/// Blocking counterpart of [`Object::download`].
#[cfg(feature = "sync")]
pub fn download_sync(bucket: &str, file_name: &str) -> crate::Result<Vec<u8>> {
    let runtime = crate::runtime()?;
    runtime.block_on(Self::download(bucket, file_name))
}
pub async fn download_streamed(
bucket: &str,
file_name: &str,
) -> crate::Result<impl Stream<Item = crate::Result<u8>> + Unpin> {
use futures::{StreamExt, TryStreamExt};
let url = format!(
"{}/b/{}/o/{}?alt=media",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);
let res = crate::CLIENT
.get(&url)
.headers(crate::get_headers().await?)
.send()
.await?;
let size = res.content_length();
let bytes = res
.bytes_stream()
.map(|chunk| chunk.map(|c| futures::stream::iter(c.into_iter().map(Ok))))
.try_flatten();
Ok(SizedByteStream::new(bytes, size))
}
pub async fn update(&self) -> crate::Result<Self> {
let url = format!(
"{}/b/{}/o/{}",
crate::BASE_URL,
percent_encode(&self.bucket),
percent_encode(&self.name),
);
let result: GoogleResponse<Self> = crate::CLIENT
.put(&url)
.headers(crate::get_headers().await?)
.json(&self)
.send()
.await?
.json()
.await?;
match result {
GoogleResponse::Success(s) => Ok(s),
GoogleResponse::Error(e) => Err(e.into()),
}
}
/// Blocking counterpart of [`Object::update`].
#[cfg(feature = "sync")]
pub fn update_sync(&self) -> crate::Result<Self> {
    let runtime = crate::runtime()?;
    runtime.block_on(self.update())
}
pub async fn delete(bucket: &str, file_name: &str) -> Result<(), Error> {
let url = format!(
"{}/b/{}/o/{}",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);
let response = crate::CLIENT
.delete(&url)
.headers(crate::get_headers().await?)
.send()
.await?;
if response.status().is_success() {
Ok(())
} else {
Err(Error::Google(response.json().await?))
}
}
/// Blocking counterpart of [`Object::delete`].
#[cfg(feature = "sync")]
pub fn delete_sync(bucket: &str, file_name: &str) -> Result<(), Error> {
    let runtime = crate::runtime()?;
    runtime.block_on(Self::delete(bucket, file_name))
}
pub async fn compose(
bucket: &str,
req: &ComposeRequest,
destination_object: &str,
) -> crate::Result<Self> {
let url = format!(
"{}/b/{}/o/{}/compose",
crate::BASE_URL,
percent_encode(&bucket),
percent_encode(&destination_object)
);
let result: GoogleResponse<Self> = crate::CLIENT
.post(&url)
.headers(crate::get_headers().await?)
.json(req)
.send()
.await?
.json()
.await?;
match result {
GoogleResponse::Success(s) => Ok(s),
GoogleResponse::Error(e) => Err(e.into()),
}
}
/// Blocking counterpart of [`Object::compose`].
#[cfg(feature = "sync")]
pub fn compose_sync(
    bucket: &str,
    req: &ComposeRequest,
    destination_object: &str,
) -> crate::Result<Self> {
    let runtime = crate::runtime()?;
    runtime.block_on(Self::compose(bucket, req, destination_object))
}
pub async fn copy(&self, destination_bucket: &str, path: &str) -> crate::Result<Self> {
use reqwest::header::CONTENT_LENGTH;
let url = format!(
"{base}/b/{sBucket}/o/{sObject}/copyTo/b/{dBucket}/o/{dObject}",
base = crate::BASE_URL,
sBucket = percent_encode(&self.bucket),
sObject = percent_encode(&self.name),
dBucket = percent_encode(&destination_bucket),
dObject = percent_encode(&path),
);
let mut headers = crate::get_headers().await?;
headers.insert(CONTENT_LENGTH, "0".parse()?);
let result: GoogleResponse<Self> = crate::CLIENT
.post(&url)
.headers(headers)
.send()
.await?
.json()
.await?;
match result {
GoogleResponse::Success(s) => Ok(s),
GoogleResponse::Error(e) => Err(e.into()),
}
}
/// Blocking counterpart of [`Object::copy`].
#[cfg(feature = "sync")]
pub fn copy_sync(&self, destination_bucket: &str, path: &str) -> crate::Result<Self> {
    let runtime = crate::runtime()?;
    runtime.block_on(self.copy(destination_bucket, path))
}
pub async fn rewrite(&self, destination_bucket: &str, path: &str) -> crate::Result<Self> {
use reqwest::header::CONTENT_LENGTH;
let url = format!(
"{base}/b/{sBucket}/o/{sObject}/rewriteTo/b/{dBucket}/o/{dObject}",
base = crate::BASE_URL,
sBucket = percent_encode(&self.bucket),
sObject = percent_encode(&self.name),
dBucket = percent_encode(destination_bucket),
dObject = percent_encode(path),
);
let mut headers = crate::get_headers().await?;
headers.insert(CONTENT_LENGTH, "0".parse()?);
let result: GoogleResponse<RewriteResponse> = crate::CLIENT
.post(&url)
.headers(headers)
.send()
.await?
.json()
.await?;
match result {
GoogleResponse::Success(s) => Ok(s.resource),
GoogleResponse::Error(e) => Err(e.into()),
}
}
/// Blocking counterpart of [`Object::rewrite`].
#[cfg(feature = "sync")]
pub fn rewrite_sync(&self, destination_bucket: &str, path: &str) -> crate::Result<Self> {
    let runtime = crate::runtime()?;
    runtime.block_on(self.rewrite(destination_bucket, path))
}
/// Builds a signed GET URL for this object that is valid for `duration` seconds.
///
/// Fails when `duration` exceeds 604800 or when signing fails.
pub fn download_url(&self, duration: u32) -> crate::Result<String> {
    let no_content_disposition = None;
    self.sign(&self.name, duration, "GET", no_content_disposition)
}
/// Like [`Object::download_url`], but also passes on the `content_disposition` taken from
/// `opts` to the signing step.
pub fn download_url_with(
    &self,
    duration: u32,
    opts: crate::DownloadOptions,
) -> crate::Result<String> {
    let disposition = opts.content_disposition;
    self.sign(&self.name, duration, "GET", disposition)
}
#[inline(always)]
fn sign(
&self,
file_path: &str,
duration: u32,
http_verb: &str,
content_disposition: Option<String>,
) -> crate::Result<String> {
use openssl::sha;
if duration > 604800 {
let msg = format!(
"duration may not be greater than 604800, but was {}",
duration
);
return Err(Error::Other(msg));
}
let mut headers = vec![];
headers.push(("host".to_string(), "storage.googleapis.com".to_string()));
headers.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(&k2));
let canonical_headers: String = headers
.iter()
.map(|(k, v)| format!("{}:{}", k.to_lowercase(), v.to_lowercase()))
.collect::<Vec<String>>()
.join("\n");
let signed_headers = headers
.iter()
.map(|(k, _)| k.to_lowercase())
.collect::<Vec<String>>()
.join(";");
let issue_date = chrono::Utc::now();
let file_path = self.path_to_resource(file_path);
let query_string = Self::get_canonical_query_string(
&issue_date,
duration,
&signed_headers,
content_disposition,
);
let canonical_request =
self.get_canonical_request(&file_path, &query_string, http_verb, &canonical_headers);
let hash = sha::sha256(canonical_request.as_bytes());
let hex_hash = hex::encode(hash);
let string_to_sign = format!(
"{signing_algorithm}\n\
{current_datetime}\n\
{credential_scope}\n\
{hashed_canonical_request}",
signing_algorithm = "GOOG4-RSA-SHA256",
current_datetime = issue_date.format("%Y%m%dT%H%M%SZ"),
credential_scope = Self::get_credential_scope(&issue_date),
hashed_canonical_request = hex_hash,
);
let buffer = Self::sign_str(&string_to_sign)?;
let signature = hex::encode(&buffer);
Ok(format!(
"https://storage.googleapis.com{path_to_resource}?\
{query_string}&\
X-Goog-Signature={request_signature}",
path_to_resource = file_path,
query_string = query_string,
request_signature = signature,
))
}
/// Assembles the canonical request text that gets hashed during signing.
///
/// The result is the newline-separated sequence: verb, resource path, query string,
/// canonical headers, an empty line, the signed header list (always `host` here) and the
/// fixed payload marker `UNSIGNED-PAYLOAD`.
#[inline(always)]
fn get_canonical_request(
    &self,
    path: &str,
    query_string: &str,
    http_verb: &str,
    headers: &str,
) -> String {
    const SIGNED_HEADERS: &str = "host";
    const PAYLOAD: &str = "UNSIGNED-PAYLOAD";
    // The empty element yields the blank line between the headers and the signed-header list.
    let sections = [
        http_verb,
        path,
        query_string,
        headers,
        "",
        SIGNED_HEADERS,
        PAYLOAD,
    ];
    sections.join("\n")
}
/// Builds the `X-Goog-*` query string used both in the canonical request and in the final
/// signed URL.
///
/// `exp` is the validity in seconds and `headers` the `;`-joined signed header names. When
/// `content_disposition` is given it is appended as `response-content-disposition`.
///
/// NOTE(review): the content-disposition value is inserted as-is, without percent-encoding —
/// confirm that callers are expected to pass an already URL-safe value.
#[inline(always)]
fn get_canonical_query_string(
    date: &chrono::DateTime<chrono::Utc>,
    exp: u32,
    headers: &str,
    content_disposition: Option<String>,
) -> String {
    let credential = format!(
        "{authorizer}/{scope}",
        authorizer = crate::SERVICE_ACCOUNT.client_email,
        scope = Self::get_credential_scope(date),
    );
    let mut parameters = vec![
        format!("X-Goog-Algorithm={}", "GOOG4-RSA-SHA256"),
        format!("X-Goog-Credential={}", percent_encode(&credential)),
        format!("X-Goog-Date={}", date.format("%Y%m%dT%H%M%SZ")),
        format!("X-Goog-Expires={}", exp),
        format!("X-Goog-SignedHeaders={}", headers),
    ];
    if let Some(disposition) = content_disposition {
        parameters.push(format!("response-content-disposition={}", disposition));
    }
    parameters.join("&")
}
/// Returns `/<bucket>/<path>` for this object's bucket, with `path` percent-encoded by
/// `percent_encode_noslash` and the bucket name inserted unchanged.
#[inline(always)]
fn path_to_resource(&self, path: &str) -> String {
    let encoded_path = percent_encode_noslash(path);
    format!("/{}/{}", self.bucket, encoded_path)
}
/// Returns the credential scope `<YYYYMMDD>/henk/storage/goog4_request` for `date`.
#[inline(always)]
fn get_credential_scope(date: &chrono::DateTime<chrono::Utc>) -> String {
    let day_stamp = date.format("%Y%m%d");
    format!("{}/henk/storage/goog4_request", day_stamp)
}
/// Signs `message` with the service account's PEM private key using a SHA-256 digest and
/// returns the raw signature bytes.
///
/// Any OpenSSL failure (key parsing, signer setup, update, signing) is propagated.
#[inline(always)]
fn sign_str(message: &str) -> Result<Vec<u8>, Error> {
    use openssl::{hash::MessageDigest, pkey::PKey, sign::Signer};

    let pem_bytes = crate::SERVICE_ACCOUNT.private_key.as_bytes();
    let private_key = PKey::private_key_from_pem(pem_bytes)?;
    let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?;
    signer.update(message.as_bytes())?;
    let signature = signer.sign_to_vec()?;
    Ok(signature)
}
}
// Characters to percent-encode in URL components: every non-alphanumeric ASCII character
// except `*`, `-`, `.` and `_`, which are left as-is.
const ENCODE_SET: &AsciiSet = &NON_ALPHANUMERIC
.remove(b'*')
.remove(b'-')
.remove(b'.')
.remove(b'_');
// Same as `ENCODE_SET`, but `/` and `~` are additionally left unencoded; used for the object
// path inside signed URLs.
const NOSLASH_ENCODE_SET: &AsciiSet = &ENCODE_SET.remove(b'/').remove(b'~');
/// Percent-encodes `input` using `NOSLASH_ENCODE_SET`, which leaves `/` and `~` untouched.
fn percent_encode_noslash(input: &str) -> String {
    let encoded_pieces = utf8_percent_encode(input, NOSLASH_ENCODE_SET);
    encoded_pieces.collect()
}
/// Percent-encodes `input` using `ENCODE_SET`.
fn percent_encode(input: &str) -> String {
    let encoded_pieces = utf8_percent_encode(input, ENCODE_SET);
    encoded_pieces.collect()
}
// Integration tests: every test obtains a bucket through `crate::read_test_bucket()` (or its
// sync variant) and calls the `Object` API, which performs real HTTP requests.
#[cfg(test)]
mod tests {
use super::*;
use futures::{StreamExt, TryStreamExt};
// Uploading a small in-memory payload succeeds.
#[tokio::test]
async fn create() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
Object::create(&bucket.name, vec![0, 1], "test-create", "text/plain").await?;
Ok(())
}
// Uploading from a stream of one-byte chunks with a declared length of 2 succeeds.
#[tokio::test]
async fn create_streamed() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let stream = stream::iter([0u8, 1].iter())
.map(Ok::<_, Box<dyn std::error::Error + Send + Sync>>)
.map_ok(|&b| bytes::BytesMut::from(&[b][..]));
Object::create_streamed(
&bucket.name,
stream,
2,
"test-create-streamed",
"text/plain",
)
.await?;
Ok(())
}
// Listing a bucket and concatenating all pages succeeds.
#[tokio::test]
async fn list() -> Result<(), Box<dyn std::error::Error>> {
let test_bucket = crate::read_test_bucket().await;
let _v: Vec<Object> = Object::list(&test_bucket.name).await?.try_concat().await?;
Ok(())
}
// Helper: runs `list_prefix` and flattens all pages into a single `Vec`.
async fn flattened_list_prefix_stream(
bucket: &str,
prefix: &str,
) -> Result<Vec<Object>, Box<dyn std::error::Error>> {
Ok(Object::list_prefix(bucket, prefix)
.await?
.try_concat()
.await?)
}
// Prefix listing returns exactly the objects created under each prefix.
#[tokio::test]
async fn list_prefix() -> Result<(), Box<dyn std::error::Error>> {
let test_bucket = crate::read_test_bucket().await;
let prefix_names = [
"test-list-prefix/1",
"test-list-prefix/2",
"test-list-prefix/sub/1",
"test-list-prefix/sub/2",
];
for name in &prefix_names {
Object::create(&test_bucket.name, vec![0, 1], name, "text/plain").await?;
}
let list = flattened_list_prefix_stream(&test_bucket.name, "test-list-prefix/").await?;
assert_eq!(list.len(), 4);
let list = flattened_list_prefix_stream(&test_bucket.name, "test-list-prefix/sub").await?;
assert_eq!(list.len(), 2);
Ok(())
}
// An object can be read back after it was created.
#[tokio::test]
async fn read() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
Object::create(&bucket.name, vec![0, 1], "test-read", "text/plain").await?;
Object::read(&bucket.name, "test-read").await?;
Ok(())
}
// Downloaded bytes equal the uploaded bytes.
#[tokio::test]
async fn download() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let content = b"hello world";
Object::create(
&bucket.name,
content.to_vec(),
"test-download",
"application/octet-stream",
)
.await?;
let data = Object::download(&bucket.name, "test-download").await?;
assert_eq!(data, content);
Ok(())
}
// Streamed download collects to the uploaded bytes (shares the object name with `download`).
#[tokio::test]
async fn download_streamed() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let content = b"hello world";
Object::create(
&bucket.name,
content.to_vec(),
"test-download",
"application/octet-stream",
)
.await?;
let result = Object::download_streamed(&bucket.name, "test-download").await?;
let data = result.try_collect::<Vec<_>>().await?;
assert_eq!(data, content);
Ok(())
}
// Streamed download of a 1,000,000-byte object, consumed one byte at a time.
#[tokio::test]
async fn download_streamed_large() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let content = vec![5u8; 1_000_000];
Object::create(
&bucket.name,
content.to_vec(),
"test-download-large",
"application/octet-stream",
)
.await?;
let mut result = Object::download_streamed(&bucket.name, "test-download-large").await?;
let mut data: Vec<u8> = Vec::new();
while let Some(part) = result.next().await {
data.push(part?);
}
assert_eq!(data, content);
Ok(())
}
// Changing the content type locally and calling `update` succeeds.
#[tokio::test]
async fn update() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let mut obj = Object::create(&bucket.name, vec![0, 1], "test-update", "text/plain").await?;
obj.content_type = Some("application/xml".to_string());
obj.update().await?;
Ok(())
}
// After deletion the object no longer shows up in a prefix listing.
#[tokio::test]
async fn delete() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
Object::create(&bucket.name, vec![0, 1], "test-delete", "text/plain").await?;
Object::delete(&bucket.name, "test-delete").await?;
let list: Vec<_> = flattened_list_prefix_stream(&bucket.name, "test-delete").await?;
assert!(list.is_empty());
Ok(())
}
// Deleting a missing object yields `Error::Google` with a "No such object" message.
#[tokio::test]
async fn delete_nonexistent() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let nonexistent_object = "test-delete-nonexistent";
let delete_result = Object::delete(&bucket.name, nonexistent_object).await;
if let Err(Error::Google(google_error_response)) = delete_result {
assert!(google_error_response.to_string().contains(&format!(
"No such object: {}/{}",
bucket.name, nonexistent_object
)));
} else {
panic!("Expected a Google error, instead got {:?}", delete_result);
}
Ok(())
}
// Composing two objects yields their concatenated bytes, fetched through a signed URL.
#[tokio::test]
async fn compose() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let obj1 = Object::create(&bucket.name, vec![0, 1], "test-compose-1", "text/plain").await?;
let obj2 = Object::create(&bucket.name, vec![2, 3], "test-compose-2", "text/plain").await?;
let compose_request = ComposeRequest {
kind: "storage#composeRequest".to_string(),
source_objects: vec![
SourceObject {
name: obj1.name.clone(),
generation: None,
object_preconditions: None,
},
SourceObject {
name: obj2.name.clone(),
generation: None,
object_preconditions: None,
},
],
destination: None,
};
let obj3 = Object::compose(&bucket.name, &compose_request, "test-concatted-file").await?;
let url = obj3.download_url(100)?;
let content = reqwest::get(&url).await?.text().await?;
assert_eq!(content.as_bytes(), &[0, 1, 2, 3]);
Ok(())
}
// Copying to a destination name that contains spaces succeeds.
#[tokio::test]
async fn copy() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let original = Object::create(&bucket.name, vec![2, 3], "test-copy", "text/plain").await?;
original.copy(&bucket.name, "test-copy - copy").await?;
Ok(())
}
// A rewritten object is reachable through its signed URL.
#[tokio::test]
async fn rewrite() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let obj = Object::create(&bucket.name, vec![0, 1], "test-rewrite", "text/plain").await?;
let obj = obj.rewrite(&bucket.name, "test-rewritten").await?;
let url = obj.download_url(100)?;
let download = crate::CLIENT.head(&url).send().await?;
assert_eq!(download.status().as_u16(), 200);
Ok(())
}
// Object names with reserved and non-ASCII characters survive create, read and signed URL.
#[tokio::test]
async fn test_url_encoding() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let complicated_names = [
"asdf",
"asdf+1",
"asdf&&+1?=3,,-_()*&^%$#@!`~{}[]\\|:;\"'<>,.?/äöüëß",
"https://www.google.com",
"परिक्षण फाईल",
"测试很重要",
];
for name in &complicated_names {
let _obj = Object::create(&bucket.name, vec![0, 1], name, "text/plain").await?;
let obj = Object::read(&bucket.name, &name).await.unwrap();
let url = obj.download_url(100)?;
let download = crate::CLIENT.head(&url).send().await?;
assert_eq!(download.status().as_u16(), 200);
}
Ok(())
}
// A signed URL built with a content disposition makes the response carry that header.
// NOTE(review): reuses the object name "test-rewrite" from the `rewrite` test.
#[tokio::test]
async fn test_download_url_with() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket().await;
let client = reqwest::Client::new();
let obj = Object::create(&bucket.name, vec![0, 1], "test-rewrite", "text/plain").await?;
let opts1 = crate::DownloadOptions::new().content_disposition("attachment");
let download_url1 = obj.download_url_with(100, opts1)?;
let download1 = client.head(&download_url1).send().await?;
assert_eq!(download1.headers()["content-disposition"], "attachment");
Ok(())
}
// Blocking variants of the tests above; compiled only with the `sync` feature.
#[cfg(feature = "sync")]
mod sync {
use super::*;
#[test]
fn create() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
Object::create_sync(&bucket.name, vec![0, 1], "test-create", "text/plain")?;
Ok(())
}
// Uses an in-memory cursor as the reader.
#[test]
fn create_streamed() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let cursor = std::io::Cursor::new([0, 1]);
Object::create_streamed_sync(
&bucket.name,
cursor,
2,
"test-create-streamed",
"text/plain",
)?;
Ok(())
}
#[test]
fn list() -> Result<(), Box<dyn std::error::Error>> {
let test_bucket = crate::read_test_bucket_sync();
Object::list_sync(&test_bucket.name)?;
Ok(())
}
#[test]
fn list_prefix() -> Result<(), Box<dyn std::error::Error>> {
let test_bucket = crate::read_test_bucket_sync();
let prefix_names = [
"test-list-prefix/1",
"test-list-prefix/2",
"test-list-prefix/sub/1",
"test-list-prefix/sub/2",
];
for name in &prefix_names {
Object::create_sync(&test_bucket.name, vec![0, 1], name, "text/plain")?;
}
let list = Object::list_prefix_sync(&test_bucket.name, "test-list-prefix/")?;
assert_eq!(list.len(), 4);
let list = Object::list_prefix_sync(&test_bucket.name, "test-list-prefix/sub")?;
assert_eq!(list.len(), 2);
Ok(())
}
#[test]
fn read() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
Object::create_sync(&bucket.name, vec![0, 1], "test-read", "text/plain")?;
Object::read_sync(&bucket.name, "test-read")?;
Ok(())
}
#[test]
fn download() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let content = b"hello world";
Object::create_sync(
&bucket.name,
content.to_vec(),
"test-download",
"application/octet-stream",
)?;
let data = Object::download_sync(&bucket.name, "test-download")?;
assert_eq!(data, content);
Ok(())
}
#[test]
fn update() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let mut obj =
Object::create_sync(&bucket.name, vec![0, 1], "test-update", "text/plain")?;
obj.content_type = Some("application/xml".to_string());
obj.update_sync()?;
Ok(())
}
#[test]
fn delete() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
Object::create_sync(&bucket.name, vec![0, 1], "test-delete", "text/plain")?;
Object::delete_sync(&bucket.name, "test-delete")?;
let list = Object::list_prefix_sync(&bucket.name, "test-delete")?;
assert!(list.is_empty());
Ok(())
}
#[test]
fn delete_nonexistent() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let nonexistent_object = "test-delete-nonexistent";
let delete_result = Object::delete_sync(&bucket.name, nonexistent_object);
if let Err(Error::Google(google_error_response)) = delete_result {
assert!(google_error_response.to_string().contains(&format!(
"No such object: {}/{}",
bucket.name, nonexistent_object
)));
} else {
panic!("Expected a Google error, instead got {:?}", delete_result);
}
Ok(())
}
#[test]
fn compose() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let obj1 =
Object::create_sync(&bucket.name, vec![0, 1], "test-compose-1", "text/plain")?;
let obj2 =
Object::create_sync(&bucket.name, vec![2, 3], "test-compose-2", "text/plain")?;
let compose_request = ComposeRequest {
kind: "storage#composeRequest".to_string(),
source_objects: vec![
SourceObject {
name: obj1.name.clone(),
generation: None,
object_preconditions: None,
},
SourceObject {
name: obj2.name.clone(),
generation: None,
object_preconditions: None,
},
],
destination: None,
};
let obj3 = Object::compose_sync(&bucket.name, &compose_request, "test-concatted-file")?;
let url = obj3.download_url(100)?;
let content = reqwest::blocking::get(&url)?.text()?;
assert_eq!(content.as_bytes(), &[0, 1, 2, 3]);
Ok(())
}
#[test]
fn copy() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let original =
Object::create_sync(&bucket.name, vec![2, 3], "test-copy", "text/plain")?;
original.copy_sync(&bucket.name, "test-copy - copy")?;
Ok(())
}
#[test]
fn rewrite() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let obj = Object::create_sync(&bucket.name, vec![0, 1], "test-rewrite", "text/plain")?;
let obj = obj.rewrite_sync(&bucket.name, "test-rewritten")?;
let url = obj.download_url(100)?;
let client = reqwest::blocking::Client::new();
let download = client.head(&url).send()?;
assert_eq!(download.status().as_u16(), 200);
Ok(())
}
#[test]
fn test_url_encoding() -> Result<(), Box<dyn std::error::Error>> {
let bucket = crate::read_test_bucket_sync();
let complicated_names = [
"asdf",
"asdf+1",
"asdf&&+1?=3,,-_()*&^%$#@!`~{}[]\\|:;\"'<>,.?/äöüëß",
"https://www.google.com",
"परिक्षण फाईल",
"测试很重要",
];
for name in &complicated_names {
let _obj = Object::create_sync(&bucket.name, vec![0, 1], name, "text/plain")?;
let obj = Object::read_sync(&bucket.name, &name).unwrap();
let url = obj.download_url(100)?;
let client = reqwest::blocking::Client::new();
let download = client.head(&url).send()?;
assert_eq!(download.status().as_u16(), 200);
}
Ok(())
}
}
}
/// Byte stream wrapper that pairs an inner stream with an optional byte count, so that
/// `size_hint` can be answered from that count.
///
/// Returned by `Object::download_streamed`, which passes the response's content length.
pub struct SizedByteStream<S: Stream<Item = crate::Result<u8>> + Unpin> {
// Byte count backing `size_hint`; `None` when no length is known.
size: Option<u64>,
// The wrapped stream that actually produces the bytes.
bytes: S,
}
impl<S: Stream<Item = crate::Result<u8>> + Unpin> SizedByteStream<S> {
fn new(bytes: S, size: Option<u64>) -> Self {
Self { bytes, size }
}
}
impl<S: Stream<Item = crate::Result<u8>> + Unpin> Stream for SizedByteStream<S> {
    type Item = crate::Result<u8>;

    /// Forwards to the inner stream and counts down the stored size for every byte that is
    /// successfully yielded, so that `size_hint` keeps describing what is left.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut futures::task::Context,
    ) -> futures::task::Poll<Option<Self::Item>> {
        // `Self` is `Unpin` because `S` is, so a plain mutable reference is available.
        let this = &mut *self;
        let polled = futures::StreamExt::poll_next_unpin(&mut this.bytes, cx);
        if let futures::task::Poll::Ready(Some(Ok(_))) = &polled {
            // Saturate rather than underflow if more bytes arrive than were announced.
            this.size = this.size.map(|left| left.saturating_sub(1));
        }
        polled
    }

    /// Reports the number of bytes still expected as both lower and upper bound.
    ///
    /// When no size is known, or it does not fit into `usize`, the hint is `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self
            .size
            .and_then(|s| std::convert::TryInto::try_into(s).ok());
        (remaining.unwrap_or(0), remaining)
    }
}