use std::ops::Not;
use futures::{StreamExt, TryStreamExt};
use gcs_rsync::{
oauth2::token::ServiceAccountCredentials,
storage::{
Metadata, Object, ObjectClient, ObjectMetadata, ObjectsListRequest, PartialObject,
StorageResult,
},
};
mod config;
use config::gcs::GcsTestConfig;
#[tokio::test]
async fn test_test_config() {
let t = GcsTestConfig::from_env().await;
assert!(
t.list_prefix().is_empty().not(),
"list prefix should not be empty"
);
let name = t.object("object_name").name;
assert!(
name.ends_with("/object_name"),
"object name should end with /object_name"
);
assert!(t.bucket().is_empty().not(), "bucket should not be empty");
}
async fn assert_delete_err(object_client: &ObjectClient, object: &Object) {
let delete_result = object_client.delete(object).await;
assert!(
delete_result.is_err(),
"expected an error got {:?} for {}",
delete_result,
object
);
}
async fn assert_delete_ok(object_client: &ObjectClient, object: &Object) {
let delete_result = object_client.delete(object).await;
assert!(
delete_result.is_ok(),
"unexpected error {:?} for {}",
delete_result,
object
);
}
async fn upload_bytes(
object_client: &ObjectClient,
object: &Object,
content: &str,
) -> StorageResult<()> {
let data = bytes::Bytes::copy_from_slice(content.as_bytes());
let stream = futures::stream::once(futures::future::ok::<bytes::Bytes, String>(data));
object_client.upload(object, stream).await
}
async fn assert_upload_bytes(object_client: &ObjectClient, object: &Object, content: &str) {
let upload_result = upload_bytes(object_client, object, content).await;
assert!(
upload_result.is_ok(),
"unexpected error got {:?} for {}",
upload_result,
object
);
}
async fn assert_download_bytes(object_client: &ObjectClient, object: &Object, expected: &str) {
let bytes = futures::stream::once(object_client.download(object))
.try_flatten()
.try_fold(Vec::new(), |mut bytes, buffer| {
for byte in buffer {
bytes.push(byte);
}
futures::future::ok(bytes)
})
.await
.unwrap();
let actual = String::from_utf8(bytes).unwrap();
assert_eq!(expected, actual);
}
#[tokio::test]
async fn test_upload_multipart() {
let test_config = GcsTestConfig::from_env().await;
let object = test_config.object("with/path/object_mutlipart.txt");
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
let content = b"test multipart";
let data = bytes::Bytes::copy_from_slice(content);
let stream = futures::stream::once(futures::future::ok::<bytes::Bytes, String>(data));
let now = chrono::offset::Utc::now().timestamp();
let metadata = ObjectMetadata {
metadata: Metadata {
modification_time: Some(now),
},
};
object_client
.upload_with_metadata(&metadata, &object, stream)
.await
.unwrap();
let actual = object_client
.get(&object, "size, metadata/goog-reserved-file-mtime")
.await
.unwrap();
assert_delete_ok(&object_client, &object).await;
assert_eq!(Some(now), actual.metadata.and_then(|x| x.modification_time));
assert_eq!(Some(content.len() as u64), actual.size);
}
#[tokio::test]
async fn test_delete_upload_download_delete() {
let test_config = GcsTestConfig::from_env().await;
let object = test_config.object("object.txt");
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
let content = "hello";
assert_delete_err(&object_client, &object).await;
assert_upload_bytes(&object_client, &object, content).await;
assert_download_bytes(&object_client, &object, content).await;
assert_delete_ok(&object_client, &object).await;
}
#[tokio::test]
async fn test_get_object_ok() {
let test_config = GcsTestConfig::from_env().await;
let object = test_config.object("object.txt");
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
let content = "hello";
assert_delete_err(&object_client, &object).await;
assert_upload_bytes(&object_client, &object, content).await;
let partial_object = object_client.get(&object, "name,selfLink").await.unwrap();
assert!(partial_object.name.unwrap().ends_with("object.txt"));
let self_link = partial_object.self_link.unwrap();
assert!(self_link.ends_with("%2Fobject.txt") || self_link.ends_with("%5Cobject.txt"));
assert_eq!(None, partial_object.crc32c);
assert_delete_ok(&object_client, &object).await;
}
#[tokio::test]
async fn test_get_object_size() {
let test_config = GcsTestConfig::from_env().await;
let object = test_config.object("object.txt");
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
let content = "hello";
assert_delete_err(&object_client, &object).await;
assert_upload_bytes(&object_client, &object, content).await;
let partial_object = object_client.get(&object, "size").await.unwrap();
assert_eq!(5, partial_object.size.unwrap());
assert_delete_ok(&object_client, &object).await;
}
#[tokio::test]
async fn test_get_object_not_found() {
let test_config = GcsTestConfig::from_env().await;
let object = test_config.object("object.txt");
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
let err = object_client
.get(&object, "name,selfLink")
.await
.unwrap_err();
assert_not_found_response(err);
}
#[tokio::test]
async fn test_upload_with_detailed_error() {
let test_config = GcsTestConfig::from_env().await;
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
let object = Object::new("the_bad_bucket", "name").unwrap();
let err = upload_bytes(&object_client, &object, "").await.unwrap_err();
assert_unexpected_response(err, r#""code": 403"#);
}
#[tokio::test]
async fn test_api_list_objects() {
let test_config = GcsTestConfig::from_env().await;
let count = 11;
let prefix = test_config.list_prefix();
let bucket = test_config.bucket();
let test_objects = (0..count + 2)
.map(|i| test_config.object(format!("object_{}", i).as_str()))
.collect::<Vec<_>>();
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
futures::stream::iter(test_objects.iter())
.for_each_concurrent(config::default::CONCURRENCY_LEVEL, |object| {
assert_upload_bytes(&object_client, object, "hello")
})
.await;
let objects_list_request = ObjectsListRequest {
prefix: Some(prefix),
fields: Some("items(selfLink,name),nextPageToken".to_owned()),
max_results: Some(2),
..Default::default()
};
let result: Vec<PartialObject> = object_client
.list(bucket.as_str(), &objects_list_request)
.await
.take(count)
.try_collect()
.await
.unwrap();
assert_eq!(count, result.len());
futures::stream::iter(test_objects.iter())
.for_each_concurrent(config::default::CONCURRENCY_LEVEL, |object| {
assert_delete_ok(&object_client, object)
})
.await;
}
#[tokio::test]
async fn test_crc32c_object() {
let test_config = GcsTestConfig::from_env().await;
let bucket = test_config.bucket();
let prefix = test_config.list_prefix();
let test_object = &test_config.object("test_crc32c");
let object_client = ObjectClient::new(Box::new(test_config.token()))
.await
.unwrap();
assert_upload_bytes(&object_client, test_object, "hello world!").await;
let objects_list_request = ObjectsListRequest {
prefix: Some(prefix),
fields: Some("items(name,crc32c),nextPageToken".to_owned()),
max_results: Some(2),
..Default::default()
};
let mut result: Vec<PartialObject> = object_client
.list(bucket.as_str(), &objects_list_request)
.await
.try_collect()
.await
.unwrap();
assert_eq!(1, result.len());
let crc32c = result.pop().unwrap().crc32c.unwrap_or_default().to_u32();
assert_eq!(1238062967, crc32c);
assert_delete_ok(&object_client, test_object).await;
}
fn assert_unexpected_response(err: gcs_rsync::storage::Error, content: &str) {
match err {
gcs_rsync::storage::Error::GcsUnexpectedResponse { value: actual, .. } => {
assert!(
actual.contains(content),
"{:?} not found in json {}",
content,
actual
);
}
e => panic!("expected UnexpectedApiResponse error got {:?}", e),
}
}
fn assert_not_found_response(err: gcs_rsync::storage::Error) {
match err {
gcs_rsync::storage::Error::GcsResourceNotFound { .. } => (),
e => panic!("expected GcsResourceNotFound error got {:?}", e),
}
}
#[tokio::test]
async fn test_api_list_objects_not_found_error() {
let path = env!("TEST_SERVICE_ACCOUNT");
let sac = ServiceAccountCredentials::from_file(path)
.await
.unwrap()
.with_scope("https://www.googleapis.com/auth/devstorage.full_control");
let object_client = ObjectClient::new(Box::new(sac)).await.unwrap();
let objects_list_request = ObjectsListRequest::default();
let err = object_client
.list("my_very_bad_bucket", &objects_list_request)
.await
.take(1)
.try_collect::<Vec<_>>()
.await
.unwrap_err();
assert_not_found_response(err);
}